diff --git a/algorithms/_utils_abc.jl b/algorithms/_utils_abc.jl index 09422a9954776eb1b06ec770e4205d06a37a3bd7..d2b4afce8554df186199aebe22f6b2ccaf10b348 100644 --- a/algorithms/_utils_abc.jl +++ b/algorithms/_utils_abc.jl @@ -1,16 +1,10 @@ -import StatsBase: cov, ProbabilityWeights -import Statistics: quantile -import NearestNeighbors: knn, KDTree -import Distributions: MvNormal, Categorical -import Random: rand! - -function _init_abc_automaton!(old_mat_p::Matrix{Float64}, vec_dist::Vector{Float64}, +function _init_abc_automaton!(mat_p_old::Matrix{Float64}, vec_dist::Vector{Float64}, pm::ParametricModel, str_var_aut::String) vec_p = zeros(pm.df) for i = eachindex(vec_dist) draw!(vec_p, pm) - old_mat_p[:,i] = vec_p + mat_p_old[:,i] = vec_p σ = simulate(pm, vec_p) vec_dist[i] = σ.S[str_var_aut] end @@ -39,16 +33,35 @@ function _compute_epsilon(vec_dist::Vector{Float64}, α::Float64, epsilon = (epsilon < last_eps) ? last_eps : epsilon end +function _task_worker!(d_mat_p::DArray{Float64,2}, d_vec_dist::DArray{Float64,1}, + d_wl_current::DArray{Float64,1}, + pm::ParametricModel, epsilon::Float64, + wl_old::Vector{Float64}, mat_p_old::Matrix{Float64}, + mat_cov::Matrix{Float64}, tree_mat_p::Union{Nothing,KDTree}, + kernel_type::String, str_var_aut::String) + mat_p = localpart(d_mat_p) + vec_dist = localpart(d_vec_dist) + wl_current = localpart(d_wl_current) + l_nbr_sim = zeros(Int, length(vec_dist)) + Threads.@threads for i = eachindex(vec_dist) + _update_param!(mat_p, vec_dist, l_nbr_sim, wl_current, i, pm, epsilon, + wl_old, mat_p_old, mat_cov, tree_mat_p, kernel_type, str_var_aut) + end + return sum(l_nbr_sim) +end + function _draw_param_kernel!(vec_p_prime::Vector{Float64}, - vec_p_star::SubArray{Float64,1}, old_mat_p::Matrix{Float64}, wl_old::Vector{Float64}, - mat_cov::Matrix{Float64}, tree_mat_p::Union{KDTree,Nothing}, kernel_type::String) + vec_p_star::SubArray{Float64,1}, + mat_p_old::Matrix{Float64}, wl_old::Vector{Float64}, + mat_cov::Matrix{Float64}, + tree_mat_p::Union{KDTree,Nothing}, kernel_type::String) if kernel_type == "mvnormal" d_mvnormal = MvNormal(vec_p_star, mat_cov) rand!(d_mvnormal, vec_p_prime) elseif kernel_type == "knn_mvnormal" - k = Int(round(0.25 * size(old_mat_p)[2])) + k = Int(round(0.25 * size(mat_p_old)[2])) idxs, dist = knn(tree_mat_p, vec_p_star, k, true) - knn_mat_cov = 2 * cov(old_mat_p[:,idxs], ProbabilityWeights(wl_old[idxs]), 2; corrected=false) + knn_mat_cov = 2 * cov(mat_p_old[:,idxs], ProbabilityWeights(wl_old[idxs]), 2; corrected=false) d_knn_mvnormal = Distributions.MvNormal(vec_p_star, knn_mat_cov) rand!(d_knn_mvnormal, vec_p_prime) return knn_mat_cov @@ -58,19 +71,20 @@ function _draw_param_kernel!(vec_p_prime::Vector{Float64}, end function _update_param!(mat_p::Matrix{Float64}, vec_dist::Vector{Float64}, - l_nbr_sim::Vector{Int}, wl_current::Vector{Float64}, idx::Int, - pm::ParametricModel, epsilon::Float64, - wl_old::Vector{Float64}, old_mat_p::Matrix{Float64}, - mat_cov::Matrix{Float64}, tree_mat_p::Union{Nothing,KDTree}, - kernel_type::String, str_var_aut::String) + l_nbr_sim::Vector{Int}, wl_current::Vector{Float64}, + idx::Int, + pm::ParametricModel, epsilon::Float64, + wl_old::Vector{Float64}, mat_p_old::Matrix{Float64}, + mat_cov::Matrix{Float64}, tree_mat_p::Union{Nothing,KDTree}, + kernel_type::String, str_var_aut::String) d_weights = Categorical(wl_old) dist_sim = Inf nbr_sim = 0 vec_p_prime = zeros(pm.df) while dist_sim > epsilon ind_p_star = rand(d_weights) - vec_p_star = view(old_mat_p, :, ind_p_star) - knn_mat_cov = _draw_param_kernel!(vec_p_prime, vec_p_star, old_mat_p, wl_old, mat_cov, tree_mat_p, kernel_type) + vec_p_star = view(mat_p_old, :, ind_p_star) + knn_mat_cov = _draw_param_kernel!(vec_p_prime, vec_p_star, mat_p_old, wl_old, mat_cov, tree_mat_p, kernel_type) if !insupport(pm, vec_p_prime) continue end @@ -90,24 +104,23 @@ function _update_param!(mat_p::Matrix{Float64}, vec_dist::Vector{Float64}, mat_p[:,idx] = vec_p_prime vec_dist[idx] = dist_sim l_nbr_sim[idx] = nbr_sim - _update_weight!(wl_current, idx, pm, wl_old, old_mat_p, vec_p_prime, mat_cov_kernel, kernel_type) + _update_weight!(wl_current, idx, pm, wl_old, mat_p_old, vec_p_prime, mat_cov_kernel, kernel_type) end function _update_weight!(wl_current::Vector{Float64}, idx::Int, - pm::ParametricModel, - wl_old::Vector{Float64}, old_mat_p::Matrix{Float64}, - vec_p_prime::Vector{Float64}, - mat_cov_kernel::Matrix{Float64}, - kernel_type::String) + pm::ParametricModel, + wl_old::Vector{Float64}, mat_p_old::Matrix{Float64}, + vec_p_prime::Vector{Float64}, + mat_cov_kernel::Matrix{Float64}, kernel_type::String) denom = 0.0 for j in 1:length(wl_old) - #denom += wl_old[j] * Distributions.pdf(d_normal, inv_sqrt_mat_cov * (vec_p_current - old_mat_p[:,j]))::Float64 - denom += wl_old[j] * pdf(MvNormal(old_mat_p[:,j], mat_cov_kernel), vec_p_prime)::Float64 + #denom += wl_old[j] * Distributions.pdf(d_normal, inv_sqrt_mat_cov * (vec_p_current - mat_p_old[:,j]))::Float64 + denom += wl_old[j] * pdf(MvNormal(mat_p_old[:,j], mat_cov_kernel), vec_p_prime)::Float64 end wl_current[idx] = prior_pdf(pm, vec_p_prime) / denom end -function effective_sample_size(wl::Vector{Float64}) +function effective_sample_size(wl::AbstractVector{Float64}) n_eff = 0.0 wls = sum(wl) if wls > 0.0 diff --git a/algorithms/automaton_abc.jl b/algorithms/automaton_abc.jl index d88f09bd60ef24ecd78d0d7828bcecebec221000..c12cecb7dbe002f44db8177baff5bb68dd50029e 100644 --- a/algorithms/automaton_abc.jl +++ b/algorithms/automaton_abc.jl @@ -1,15 +1,19 @@ -import Distributed: nprocs, nworkers, @distributed import StatsBase: mean, median, std, cov, ProbabilityWeights +import Statistics: quantile import NearestNeighbors: KDTree, knn -import Distributions: Categorical +import Distributions: MvNormal, Categorical +import Random: rand! -using DistributedArrays +import Distributed: @sync, @async, nworkers, nprocs, workers +import DistributedArrays: DArray, dzeros, convert, localpart +using Distributed using LinearAlgebra using DelimitedFiles using Logging -include(get_module_path() * "/algorithms/_utils_abc.jl") +main_pkg_path = get_module_path() +include("$(main_pkg_path)/algorithms/_utils_abc.jl") struct ResultAutomatonAbc mat_p_end::Matrix{Float64} @@ -36,12 +40,10 @@ function automaton_abc(pm::ParametricModel; nbr_particles::Int = 100, alpha::Flo write(file_cfg, "alpha : $(alpha) \n") write(file_cfg, "kernel type : $(kernel_type) \n") close(file_cfg) - if nprocs() == 1 - @info "Begin multi-threaded ABC PMC" + if nprocs() == 1 return _automaton_abc(pm, nbr_particles, alpha, kernel_type, NT, duration_time, bound_sim, dir_res, str_var_aut) - end - @info "Begin distributed ABC PMC" - _distributed_automaton_abc(pm, nbr_particles, alpha, kernel_type, NT, duration_time, bound_sim, dir_res, str_var_aut) + end + return _distributed_automaton_abc(pm, nbr_particles, alpha, kernel_type, NT, duration_time, bound_sim, dir_res, str_var_aut) end # To code: @@ -50,51 +52,45 @@ end function _automaton_abc(pm::ParametricModel, nbr_particles::Int, alpha::Float64, kernel_type::String, NT::Float64, duration_time::Float64, bound_sim::Int, dir_res::String, str_var_aut::String) - @info "ABC PMC with $(nworkers()) processus and $(Threads.nthreads()) threads" begin_time = time() nbr_p = pm.df last_epsilon = 0.0 - # Init. Iteration 1 t = 1 epsilon = Inf - old_mat_p = zeros(nbr_p, nbr_particles) + mat_p_old = zeros(nbr_p, nbr_particles) vec_dist = zeros(nbr_particles) + wl_old = zeros(nbr_particles) @info "Step 1 : Init" - - _init_abc_automaton!(old_mat_p, vec_dist, pm, str_var_aut) - wl_old = ones(nbr_particles) - prior_pdf!(wl_old, pm, old_mat_p) + _init_abc_automaton!(mat_p_old, vec_dist, pm, str_var_aut) + prior_pdf!(wl_old, pm, mat_p_old) normalize!(wl_old, 1) - ess = effective_sample_size(wl_old) l_ess = zeros(0) l_ess = push!(l_ess, ess) - flush(stdout) nbr_tot_sim = nbr_particles current_time = time() old_epsilon = epsilon mat_p = zeros(nbr_p, nbr_particles) + wl_current = zeros(nbr_particles) + l_nbr_sim = zeros(Int, nbr_particles) while (epsilon > last_epsilon) && (current_time - begin_time <= duration_time) && (nbr_tot_sim <= bound_sim) t += 1 begin_time_ite = time() @info "Step $t" # Set new epsilon epsilon = _compute_epsilon(vec_dist, alpha, old_epsilon, last_epsilon) + @info "Current epsilon and total number of simulations" epsilon nbr_tot_sim @debug "5 first dist values" sort(vec_dist)[1:5] @debug mean(vec_dist), maximum(vec_dist), median(vec_dist), std(vec_dist) - @info "Current epsilon" epsilon - @info "Total number of simulated trajectories" nbr_tot_sim # Broadcast simulations - l_nbr_sim = zeros(Int, nbr_particles) - wl_current = zeros(nbr_particles) mat_cov = nothing tree_mat_p = nothing if kernel_type == "mvnormal" - mat_cov = 2 * cov(old_mat_p, ProbabilityWeights(wl_old), 2; corrected=false) + mat_cov = 2 * cov(mat_p_old, ProbabilityWeights(wl_old), 2; corrected=false) @debug diag(mat_cov) if det(mat_cov) == 0.0 @debug det(mat_cov), rank(mat_cov), effective_sample_size(wl_old) @@ -102,44 +98,145 @@ function _automaton_abc(pm::ParametricModel, nbr_particles::Int, alpha::Float64, end end if kernel_type == "knn_mvnormal" - tree_mat_p = KDTree(old_mat_p) + tree_mat_p = KDTree(mat_p_old) end - Threads.@threads for i = 1:nbr_particles + Threads.@threads for i = eachindex(vec_dist) _update_param!(mat_p, vec_dist, l_nbr_sim, wl_current, i, pm, epsilon, - wl_old, old_mat_p, mat_cov, tree_mat_p, kernel_type, str_var_aut) + wl_old, mat_p_old, mat_cov, tree_mat_p, kernel_type, str_var_aut) end normalize!(wl_current, 1) - nbr_tot_sim += sum(l_nbr_sim) - old_mat_p = mat_p + step_nbr_sim = sum(l_nbr_sim) + nbr_tot_sim += step_nbr_sim ess = effective_sample_size(wl_current) - @debug ess l_ess = push!(l_ess, ess) - + @debug ess # Resampling if ess < NT @info "Resampling.." d_weights = Categorical(wl_current) ind_w = rand(d_weights, nbr_particles) - old_mat_p = old_mat_p[:,ind_w] + mat_p = mat_p[:,ind_w] wl_current = ones(nbr_particles) normalize!(wl_current, 1) @info "End" end - @info "Time spent for this step" steptime=(current_time-begin_time_ite) - @info "Number of simulations for this step" sum(l_nbr_sim) + @info "After this step, time spent and number of simulations" steptime=(current_time-begin_time_ite) step_nbr_sim + mat_p_old = copy(mat_p) wl_old = copy(wl_current) + fill!(l_nbr_sim, 0) flush(stdout) current_time = time() old_epsilon = epsilon end - mat_p = old_mat_p + mat_p = mat_p_old mat_cov = cov(mat_p, ProbabilityWeights(wl_old), 2; corrected=false) save_mat_p_end = false if save_mat_p_end writedlm(dir_res * "weights_end.csv", wl_old, ',') - writedlm(dir_res * "mat_p_end.csv", old_mat_p, ',') + writedlm(dir_res * "mat_p_end.csv", mat_p_old, ',') end r = ResultAutomatonAbc(mat_p, mat_cov, nbr_tot_sim, time() - begin_time, vec_dist, epsilon, wl_old, l_ess) return r end + +function _distributed_automaton_abc(pm::ParametricModel, nbr_particles::Int, alpha::Float64, + kernel_type::String, NT::Float64, duration_time::Float64, bound_sim::Int, + dir_res::String, str_var_aut::String) + @info "Distributed ABC PMC with $(nworkers()) processus and $(Threads.nthreads()) threads" + begin_time = time() + nbr_p = pm.df + last_epsilon = 0.0 + # Init. Iteration 1 + t = 1 + epsilon = Inf + mat_p_old = zeros(nbr_p, nbr_particles) + vec_dist = zeros(nbr_particles) + wl_old = zeros(nbr_particles) + @info "Step 1 : Init" + _init_abc_automaton!(mat_p_old, vec_dist, pm, str_var_aut) + prior_pdf!(wl_old, pm, mat_p_old) + normalize!(wl_old, 1) + ess = effective_sample_size(wl_old) + l_ess = zeros(0) + l_ess = push!(l_ess, ess) + flush(stdout) + nbr_tot_sim = nbr_particles + current_time = time() + old_epsilon = epsilon + d_mat_p = dzeros(nbr_p, nbr_particles) + d_vec_dist = dzeros(nbr_particles) + d_wl_current = dzeros(nbr_particles) + mat_p = zeros(0,0) + wl_current = zeros(0) + while (epsilon > last_epsilon) && (current_time - begin_time <= duration_time) && (nbr_tot_sim <= bound_sim) + t += 1 + begin_time_ite = time() + @info "Step $t" + # Set new epsilon + epsilon = _compute_epsilon(vec_dist, alpha, old_epsilon, last_epsilon) + @info "Current epsilon and total number of simulations" epsilon nbr_tot_sim + @debug "5 first dist values" sort(vec_dist)[1:5] + @debug mean(vec_dist), maximum(vec_dist), median(vec_dist), std(vec_dist) + + # Broadcast simulations + mat_cov = nothing + tree_mat_p = nothing + if kernel_type == "mvnormal" + mat_cov = 2 * cov(mat_p_old, ProbabilityWeights(wl_old), 2; corrected=false) + @debug diag(mat_cov) + if det(mat_cov) == 0.0 + @debug det(mat_cov), rank(mat_cov), effective_sample_size(wl_old) + @error "Bad inv mat cov" + end + end + if kernel_type == "knn_mvnormal" + tree_mat_p = KDTree(mat_p_old) + end + l_nbr_sim = zeros(Int, nworkers()) + @sync for id_w in workers() + t_id_w = id_w - workers()[1] + 1 + @async l_nbr_sim[t_id_w] = + remotecall_fetch(() -> _task_worker!(d_mat_p, d_vec_dist, d_wl_current, + pm, epsilon, wl_old, mat_p_old, mat_cov, tree_mat_p, + kernel_type, str_var_aut), id_w) + end + wl_current = convert(Array, d_wl_current) + normalize!(wl_current, 1) + mat_p = convert(Array, d_mat_p) + step_nbr_sim = sum(l_nbr_sim) + nbr_tot_sim += step_nbr_sim + ess = effective_sample_size(wl_current) + l_ess = push!(l_ess, ess) + @debug ess + # Resampling + if ess < NT + @info "Resampling.." + d_weights = Categorical(wl_current) + ind_w = rand(d_weights, nbr_particles) + mat_p = mat_p[:,ind_w] + wl_current = ones(nbr_particles) + normalize!(wl_current, 1) + @info "End" + end + @info "After this step, time spent and number of simulations" steptime=(current_time-begin_time_ite) step_nbr_sim + mat_p_old = mat_p + wl_old = wl_current + vec_dist = convert(Array, d_vec_dist) + fill!(l_nbr_sim, 0) + flush(stdout) + current_time = time() + old_epsilon = epsilon + end + + mat_cov = cov(mat_p, ProbabilityWeights(wl_old), 2; corrected=false) + save_mat_p_end = false + if save_mat_p_end + writedlm(dir_res * "weights_end.csv", wl_current, ',') + writedlm(dir_res * "mat_p_end.csv", mat_p, ',') + end + r = ResultAutomatonAbc(mat_p, mat_cov, nbr_tot_sim, time() - begin_time, convert(Array, d_vec_dist), epsilon, wl_current, l_ess) + return r +end + + diff --git a/core/MarkovProcesses.jl b/core/MarkovProcesses.jl index 0aa1edfc85e72e2136e6d68f350f8ead50f0f25c..0b4432c44d4e04d6f1d3018997e5c9b4a52899ff 100644 --- a/core/MarkovProcesses.jl +++ b/core/MarkovProcesses.jl @@ -5,6 +5,7 @@ import Base: copy, getfield, getindex, getproperty, lastindex import Base: setindex!, setproperty!, fill! import StaticArrays: SVector +import Distributed: @everywhere import Distributions: Distribution, Product, Uniform, Normal export Distribution, Product, Uniform, Normal diff --git a/core/model.jl b/core/model.jl index 347716b8a4470f1a15712abdae5b42a9392278e7..c1e39b15992c25c16218d7280f20b43dd14b84ff 100644 --- a/core/model.jl +++ b/core/model.jl @@ -340,7 +340,7 @@ draw!(vec_p::AbstractVector{Float64}, pm::ParametricModel) = rand!(pm.dist, vec_ Draw `size(mat_p)[2]` (number of columns of mat_p) parameters from the prior distribution defined in pm and stores it in mat_p. """ -function draw!(mat_p::Matrix{Float64}, pm::ParametricModel) +function draw!(mat_p::AbstractMatrix{Float64}, pm::ParametricModel) for i = 1:size(mat_p)[2] draw!(view(mat_p, :, i), pm) end @@ -357,13 +357,13 @@ prior_pdf(pm::ParametricModel, p_prior::AbstractVector{Float64}) = pdf(pm.dist, Computes the density of the prior distribution defined in pm on each column ov mat_p. Stores it in mat_p. (`length(vec_res) == size(mat_p)[2]`) """ -function prior_pdf!(res_pdf::Vector{Float64}, pm::ParametricModel, mat_p::Matrix{Float64}) +function prior_pdf!(res_pdf::AbstractVector{Float64}, pm::ParametricModel, mat_p::AbstractMatrix{Float64}) for i = eachindex(res_pdf) res_pdf[i] = prior_pdf(pm, view(mat_p, :, i)) end end -fill!(pm::ParametricModel, p_prior::Vector{Float64}) = get_proba_model(pm).p[pm._param_idx] = p_prior -insupport(pm::ParametricModel, p_prior::Vector{Float64}) = insupport(pm.dist, p_prior) +fill!(pm::ParametricModel, p_prior::AbstractVector{Float64}) = get_proba_model(pm).p[pm._param_idx] = p_prior +insupport(pm::ParametricModel, p_prior::AbstractVector{Float64}) = insupport(pm.dist, p_prior) # to do: simulate(pm), create_res_dir, check_bounds, ajouter un champ _param_idx pour pm. # diff --git a/core/utils.jl b/core/utils.jl index 648adee2ba3fa5b2d2fc7f6ebf65ee72c42c2c89..68c60b170b78663b32a4948b05c7215515eff621 100644 --- a/core/utils.jl +++ b/core/utils.jl @@ -19,7 +19,7 @@ function cosmos_get_values(name_file::String) return dict_values end -load_model(name_model::String) = include(get_module_path() * "/models/" * name_model * ".jl") -load_automaton(automaton::String) = include(get_module_path() * "/automata/$(automaton).jl") +load_model(name_model::String) = include("$(get_module_path())/models/$(name_model).jl") +load_automaton(automaton::String) = include("$(get_module_path())/automata/$(automaton).jl") load_plots() = include(get_module_path() * "/core/plots.jl") diff --git a/tests/automaton_abc/R2.jl b/tests/automaton_abc/R2.jl index b84d02bbaba9a5c5579be61b64e4ed27a557b7cd..143182bfeeeff6ad2cc4cbd351fa1e89ec5b976c 100644 --- a/tests/automaton_abc/R2.jl +++ b/tests/automaton_abc/R2.jl @@ -1,16 +1,17 @@ -using MarkovProcesses +@everywhere using MarkovProcesses using Plots -load_model("ER") -load_automaton("automaton_F") +@everywhere load_model("ER") +@everywhere load_automaton("automaton_F") + A_F_R2 = create_automaton_F(ER, 50.0, 75.0, 0.05, 0.075, "P") sync_ER = A_F_R2*ER pm_sync_ER = ParametricModel(sync_ER, ("k3", Uniform(0.0, 100.0))) -r = automaton_abc(pm_sync_ER; nbr_particles = 1000) +@timev r = automaton_abc(pm_sync_ER) @show r.nbr_sim -histogram(r.mat_p_end', weights = r.weights, normalize = :density) +histogram(r.mat_p_end', bins = :sqrt, weights = r.weights, normalize = :density, xlims = (0.0, 100.0)) png("R2_hist.png")