Commit bf35d240 authored by Bentriou Mahmoud

Distributed ABC

parent 67040dc5
import StatsBase: cov, ProbabilityWeights
import Statistics: quantile
import NearestNeighbors: knn, KDTree
import Distributions: MvNormal, Categorical
import Random: rand!
function _init_abc_automaton!(mat_p_old::Matrix{Float64}, vec_dist::Vector{Float64},
pm::ParametricModel, str_var_aut::String)
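# First iteration: each particle is drawn from the prior, the model is simulated once,
# and the value of the automaton variable str_var_aut is stored as the particle's distance.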
vec_p = zeros(pm.df)
for i = eachindex(vec_dist)
draw!(vec_p, pm)
mat_p_old[:,i] = vec_p
σ = simulate(pm, vec_p)
vec_dist[i] = σ.S[str_var_aut]
end
......@@ -39,16 +33,35 @@ function _compute_epsilon(vec_dist::Vector{Float64}, α::Float64,
epsilon = (epsilon < last_eps) ? last_eps : epsilon
end
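# The body of _compute_epsilon is elided by the hunk above. Below is a hypothetical
# minimal sketch of the tolerance schedule it appears to follow (not the committed code):
# the next tolerance is the α-quantile of the current distances, clamped at last_eps as
# in the line kept above. quantile comes from Statistics, imported at the top of the file.
function _compute_epsilon_sketch(vec_dist::Vector{Float64}, α::Float64, last_eps::Float64)
epsilon = quantile(vec_dist, α)
return (epsilon < last_eps) ? last_eps : epsilon
end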
function _task_worker!(d_mat_p::DArray{Float64,2}, d_vec_dist::DArray{Float64,1},
d_wl_current::DArray{Float64,1},
pm::ParametricModel, epsilon::Float64,
wl_old::Vector{Float64}, mat_p_old::Matrix{Float64},
mat_cov::Matrix{Float64}, tree_mat_p::Union{Nothing,KDTree},
kernel_type::String, str_var_aut::String)
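# Executed on one worker: localpart extracts the block of each distributed array owned
# by this process, and the threaded loop below updates only those local particles.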
mat_p = localpart(d_mat_p)
vec_dist = localpart(d_vec_dist)
wl_current = localpart(d_wl_current)
l_nbr_sim = zeros(Int, length(vec_dist))
Threads.@threads for i = eachindex(vec_dist)
_update_param!(mat_p, vec_dist, l_nbr_sim, wl_current, i, pm, epsilon,
wl_old, mat_p_old, mat_cov, tree_mat_p, kernel_type, str_var_aut)
end
return sum(l_nbr_sim)
end
function _draw_param_kernel!(vec_p_prime::Vector{Float64},
vec_p_star::SubArray{Float64,1},
mat_p_old::Matrix{Float64}, wl_old::Vector{Float64},
mat_cov::Matrix{Float64},
tree_mat_p::Union{KDTree,Nothing}, kernel_type::String)
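# Perturbation kernel: "mvnormal" draws around the selected particle with the global
# weighted covariance passed in mat_cov; "knn_mvnormal" estimates a local covariance
# from the k nearest neighbours of the particle before drawing.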
if kernel_type == "mvnormal"
d_mvnormal = MvNormal(vec_p_star, mat_cov)
rand!(d_mvnormal, vec_p_prime)
elseif kernel_type == "knn_mvnormal"
k = Int(round(0.25 * size(mat_p_old)[2]))
idxs, dist = knn(tree_mat_p, vec_p_star, k, true)
knn_mat_cov = 2 * cov(mat_p_old[:,idxs], ProbabilityWeights(wl_old[idxs]), 2; corrected=false)
d_knn_mvnormal = Distributions.MvNormal(vec_p_star, knn_mat_cov)
rand!(d_knn_mvnormal, vec_p_prime)
return knn_mat_cov
......@@ -58,19 +71,20 @@ function _draw_param_kernel!(vec_p_prime::Vector{Float64},
end
function _update_param!(mat_p::Matrix{Float64}, vec_dist::Vector{Float64},
l_nbr_sim::Vector{Int}, wl_current::Vector{Float64},
idx::Int,
pm::ParametricModel, epsilon::Float64,
wl_old::Vector{Float64}, mat_p_old::Matrix{Float64},
mat_cov::Matrix{Float64}, tree_mat_p::Union{Nothing,KDTree},
kernel_type::String, str_var_aut::String)
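# Move particle idx: sample an index from the previous population with probability
# proportional to its weight, perturb it with the kernel, and repeat until a simulated
# trajectory falls within the tolerance epsilon; out-of-support proposals are skipped.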
d_weights = Categorical(wl_old)
dist_sim = Inf
nbr_sim = 0
vec_p_prime = zeros(pm.df)
while dist_sim > epsilon
ind_p_star = rand(d_weights)
vec_p_star = view(mat_p_old, :, ind_p_star)
knn_mat_cov = _draw_param_kernel!(vec_p_prime, vec_p_star, mat_p_old, wl_old, mat_cov, tree_mat_p, kernel_type)
if !insupport(pm, vec_p_prime)
continue
end
......@@ -90,24 +104,23 @@ function _update_param!(mat_p::Matrix{Float64}, vec_dist::Vector{Float64},
mat_p[:,idx] = vec_p_prime
vec_dist[idx] = dist_sim
l_nbr_sim[idx] = nbr_sim
_update_weight!(wl_current, idx, pm, wl_old, mat_p_old, vec_p_prime, mat_cov_kernel, kernel_type)
end
function _update_weight!(wl_current::Vector{Float64}, idx::Int,
pm::ParametricModel,
wl_old::Vector{Float64}, mat_p_old::Matrix{Float64},
vec_p_prime::Vector{Float64},
mat_cov_kernel::Matrix{Float64}, kernel_type::String)
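# Standard ABC-SMC importance weight: w ∝ prior(θ') / Σ_j wl_old[j] K(θ' | θ_j),
# where K is the multivariate normal perturbation kernel with covariance mat_cov_kernel.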
denom = 0.0
for j in 1:length(wl_old)
#denom += wl_old[j] * Distributions.pdf(d_normal, inv_sqrt_mat_cov * (vec_p_current - mat_p_old[:,j]))::Float64
denom += wl_old[j] * pdf(MvNormal(mat_p_old[:,j], mat_cov_kernel), vec_p_prime)::Float64
end
wl_current[idx] = prior_pdf(pm, vec_p_prime) / denom
end
function effective_sample_size(wl::AbstractVector{Float64})
n_eff = 0.0
wls = sum(wl)
if wls > 0.0
......
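# The remainder of effective_sample_size is elided above. For reference, a hypothetical
# sketch of the standard estimator it presumably implements, ESS = (Σ w)^2 / Σ w^2;
# the name effective_sample_size_sketch is illustrative only.
function effective_sample_size_sketch(wl::AbstractVector{Float64})
wls = sum(wl)
return wls > 0.0 ? wls^2 / sum(abs2, wl) : 0.0
end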
import StatsBase: mean, median, std, cov, ProbabilityWeights
import Statistics: quantile
import NearestNeighbors: KDTree, knn
import Distributions: MvNormal, Categorical
import Random: rand!
using DistributedArrays
import Distributed: @sync, @async, nworkers, nprocs, workers
import DistributedArrays: DArray, dzeros, convert, localpart
using Distributed
using LinearAlgebra
using DelimitedFiles
using Logging
include(get_module_path() * "/algorithms/_utils_abc.jl")
main_pkg_path = get_module_path()
include("$(main_pkg_path)/algorithms/_utils_abc.jl")
struct ResultAutomatonAbc
mat_p_end::Matrix{Float64}
......@@ -36,12 +40,10 @@ function automaton_abc(pm::ParametricModel; nbr_particles::Int = 100, alpha::Flo
write(file_cfg, "alpha : $(alpha) \n")
write(file_cfg, "kernel type : $(kernel_type) \n")
close(file_cfg)
if nprocs() == 1
@info "Begin multi-threaded ABC PMC"
return _automaton_abc(pm, nbr_particles, alpha, kernel_type, NT, duration_time, bound_sim, dir_res, str_var_aut)
end
@info "Begin distributed ABC PMC"
return _distributed_automaton_abc(pm, nbr_particles, alpha, kernel_type, NT, duration_time, bound_sim, dir_res, str_var_aut)
end
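# Hypothetical session showing how the dispatch above is exercised: worker processes
# must be added before the call so that nprocs() > 1, otherwise the multi-threaded
# branch runs on a single process (pm_sync_ER as in the test script at the end of this commit):
# using Distributed
# addprocs(2)
# @everywhere using MarkovProcesses
# r = automaton_abc(pm_sync_ER; nbr_particles = 100)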
# To code:
......@@ -50,51 +52,45 @@ end
function _automaton_abc(pm::ParametricModel, nbr_particles::Int, alpha::Float64,
kernel_type::String, NT::Float64, duration_time::Float64, bound_sim::Int,
dir_res::String, str_var_aut::String)
@info "ABC PMC with $(nworkers()) processus and $(Threads.nthreads()) threads"
begin_time = time()
nbr_p = pm.df
last_epsilon = 0.0
# Init. Iteration 1
t = 1
epsilon = Inf
mat_p_old = zeros(nbr_p, nbr_particles)
vec_dist = zeros(nbr_particles)
wl_old = zeros(nbr_particles)
@info "Step 1 : Init"
_init_abc_automaton!(mat_p_old, vec_dist, pm, str_var_aut)
prior_pdf!(wl_old, pm, mat_p_old)
normalize!(wl_old, 1)
ess = effective_sample_size(wl_old)
l_ess = zeros(0)
l_ess = push!(l_ess, ess)
flush(stdout)
nbr_tot_sim = nbr_particles
current_time = time()
old_epsilon = epsilon
mat_p = zeros(nbr_p, nbr_particles)
wl_current = zeros(nbr_particles)
l_nbr_sim = zeros(Int, nbr_particles)
while (epsilon > last_epsilon) && (current_time - begin_time <= duration_time) && (nbr_tot_sim <= bound_sim)
t += 1
begin_time_ite = time()
@info "Step $t"
# Set new epsilon
epsilon = _compute_epsilon(vec_dist, alpha, old_epsilon, last_epsilon)
@info "Current epsilon and total number of simulations" epsilon nbr_tot_sim
@debug "5 first dist values" sort(vec_dist)[1:5]
@debug mean(vec_dist), maximum(vec_dist), median(vec_dist), std(vec_dist)
@info "Current epsilon" epsilon
@info "Total number of simulated trajectories" nbr_tot_sim
# Broadcast simulations
mat_cov = nothing
tree_mat_p = nothing
if kernel_type == "mvnormal"
mat_cov = 2 * cov(mat_p_old, ProbabilityWeights(wl_old), 2; corrected=false)
@debug diag(mat_cov)
if det(mat_cov) == 0.0
@debug det(mat_cov), rank(mat_cov), effective_sample_size(wl_old)
......@@ -102,44 +98,145 @@ function _automaton_abc(pm::ParametricModel, nbr_particles::Int, alpha::Float64,
end
end
if kernel_type == "knn_mvnormal"
tree_mat_p = KDTree(mat_p_old)
end
Threads.@threads for i = eachindex(vec_dist)
_update_param!(mat_p, vec_dist, l_nbr_sim, wl_current, i, pm, epsilon,
wl_old, mat_p_old, mat_cov, tree_mat_p, kernel_type, str_var_aut)
end
normalize!(wl_current, 1)
step_nbr_sim = sum(l_nbr_sim)
nbr_tot_sim += step_nbr_sim
ess = effective_sample_size(wl_current)
l_ess = push!(l_ess, ess)
@debug ess
# Resampling
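# If the effective sample size falls below the threshold NT, resample the particles
# multinomially according to their weights and reset the weights to uniform.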
if ess < NT
@info "Resampling.."
d_weights = Categorical(wl_current)
ind_w = rand(d_weights, nbr_particles)
mat_p = mat_p[:,ind_w]
wl_current = ones(nbr_particles)
normalize!(wl_current, 1)
@info "End"
end
@info "Time spent for this step" steptime=(current_time-begin_time_ite)
@info "Number of simulations for this step" sum(l_nbr_sim)
@info "After this step, time spent and number of simulations" steptime=(current_time-begin_time_ite) step_nbr_sim
mat_p_old = copy(mat_p)
wl_old = copy(wl_current)
fill!(l_nbr_sim, 0)
flush(stdout)
current_time = time()
old_epsilon = epsilon
end
mat_p = mat_p_old
mat_cov = cov(mat_p, ProbabilityWeights(wl_old), 2; corrected=false)
save_mat_p_end = false
if save_mat_p_end
writedlm(dir_res * "weights_end.csv", wl_old, ',')
writedlm(dir_res * "mat_p_end.csv", old_mat_p, ',')
writedlm(dir_res * "mat_p_end.csv", mat_p_old, ',')
end
r = ResultAutomatonAbc(mat_p, mat_cov, nbr_tot_sim, time() - begin_time, vec_dist, epsilon, wl_old, l_ess)
return r
end
function _distributed_automaton_abc(pm::ParametricModel, nbr_particles::Int, alpha::Float64,
kernel_type::String, NT::Float64, duration_time::Float64, bound_sim::Int,
dir_res::String, str_var_aut::String)
@info "Distributed ABC PMC with $(nworkers()) processus and $(Threads.nthreads()) threads"
begin_time = time()
nbr_p = pm.df
last_epsilon = 0.0
# Init. Iteration 1
t = 1
epsilon = Inf
mat_p_old = zeros(nbr_p, nbr_particles)
vec_dist = zeros(nbr_particles)
wl_old = zeros(nbr_particles)
@info "Step 1 : Init"
_init_abc_automaton!(mat_p_old, vec_dist, pm, str_var_aut)
prior_pdf!(wl_old, pm, mat_p_old)
normalize!(wl_old, 1)
ess = effective_sample_size(wl_old)
l_ess = zeros(0)
l_ess = push!(l_ess, ess)
flush(stdout)
nbr_tot_sim = nbr_particles
current_time = time()
old_epsilon = epsilon
d_mat_p = dzeros(nbr_p, nbr_particles)
d_vec_dist = dzeros(nbr_particles)
d_wl_current = dzeros(nbr_particles)
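# The particle matrix, distances and weights live in distributed arrays partitioned
# across the workers; each worker writes only to its own local block.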
mat_p = zeros(0,0)
wl_current = zeros(0)
while (epsilon > last_epsilon) && (current_time - begin_time <= duration_time) && (nbr_tot_sim <= bound_sim)
t += 1
begin_time_ite = time()
@info "Step $t"
# Set new epsilon
epsilon = _compute_epsilon(vec_dist, alpha, old_epsilon, last_epsilon)
@info "Current epsilon and total number of simulations" epsilon nbr_tot_sim
@debug "5 first dist values" sort(vec_dist)[1:5]
@debug mean(vec_dist), maximum(vec_dist), median(vec_dist), std(vec_dist)
# Broadcast simulations
mat_cov = nothing
tree_mat_p = nothing
if kernel_type == "mvnormal"
mat_cov = 2 * cov(mat_p_old, ProbabilityWeights(wl_old), 2; corrected=false)
@debug diag(mat_cov)
if det(mat_cov) == 0.0
@debug det(mat_cov), rank(mat_cov), effective_sample_size(wl_old)
@error "Bad inv mat cov"
end
end
if kernel_type == "knn_mvnormal"
tree_mat_p = KDTree(mat_p_old)
end
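# Dispatch one _task_worker! call per worker: each call fills the local block of
# d_mat_p, d_vec_dist and d_wl_current and returns the number of simulations it ran.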
l_nbr_sim = zeros(Int, nworkers())
@sync for id_w in workers()
t_id_w = id_w - workers()[1] + 1
@async l_nbr_sim[t_id_w] =
remotecall_fetch(() -> _task_worker!(d_mat_p, d_vec_dist, d_wl_current,
pm, epsilon, wl_old, mat_p_old, mat_cov, tree_mat_p,
kernel_type, str_var_aut), id_w)
end
wl_current = convert(Array, d_wl_current)
normalize!(wl_current, 1)
mat_p = convert(Array, d_mat_p)
step_nbr_sim = sum(l_nbr_sim)
nbr_tot_sim += step_nbr_sim
ess = effective_sample_size(wl_current)
l_ess = push!(l_ess, ess)
@debug ess
# Resampling
if ess < NT
@info "Resampling.."
d_weights = Categorical(wl_current)
ind_w = rand(d_weights, nbr_particles)
mat_p = mat_p[:,ind_w]
wl_current = ones(nbr_particles)
normalize!(wl_current, 1)
@info "End"
end
@info "After this step, time spent and number of simulations" steptime=(current_time-begin_time_ite) step_nbr_sim
mat_p_old = mat_p
wl_old = wl_current
vec_dist = convert(Array, d_vec_dist)
fill!(l_nbr_sim, 0)
flush(stdout)
current_time = time()
old_epsilon = epsilon
end
mat_cov = cov(mat_p, ProbabilityWeights(wl_old), 2; corrected=false)
save_mat_p_end = false
if save_mat_p_end
writedlm(dir_res * "weights_end.csv", wl_current, ',')
writedlm(dir_res * "mat_p_end.csv", mat_p, ',')
end
r = ResultAutomatonAbc(mat_p, mat_cov, nbr_tot_sim, time() - begin_time, convert(Array, d_vec_dist), epsilon, wl_current, l_ess)
return r
end
......@@ -5,6 +5,7 @@ import Base: copy, getfield, getindex, getproperty, lastindex
import Base: setindex!, setproperty!, fill!
import StaticArrays: SVector
import Distributed: @everywhere
import Distributions: Distribution, Product, Uniform, Normal
export Distribution, Product, Uniform, Normal
......
......@@ -340,7 +340,7 @@ draw!(vec_p::AbstractVector{Float64}, pm::ParametricModel) = rand!(pm.dist, vec_
Draw `size(mat_p)[2]` (number of columns of `mat_p`) parameters from the prior distribution
defined in `pm` and store them in `mat_p`.
"""
function draw!(mat_p::AbstractMatrix{Float64}, pm::ParametricModel)
for i = 1:size(mat_p)[2]
draw!(view(mat_p, :, i), pm)
end
......@@ -357,13 +357,13 @@ prior_pdf(pm::ParametricModel, p_prior::AbstractVector{Float64}) = pdf(pm.dist,
Computes the density of the prior distribution defined in `pm` on each column
of `mat_p` and stores it in `res_pdf` (`length(res_pdf) == size(mat_p)[2]`).
"""
function prior_pdf!(res_pdf::AbstractVector{Float64}, pm::ParametricModel, mat_p::AbstractMatrix{Float64})
for i = eachindex(res_pdf)
res_pdf[i] = prior_pdf(pm, view(mat_p, :, i))
end
end
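# Hypothetical usage of the helpers above (mat_p and wl are illustrative names):
# mat_p = zeros(pm.df, 10); wl = zeros(10)
# draw!(mat_p, pm)            # one prior draw per column
# prior_pdf!(wl, pm, mat_p)   # prior density of each column, stored in wl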
fill!(pm::ParametricModel, p_prior::AbstractVector{Float64}) = get_proba_model(pm).p[pm._param_idx] = p_prior
insupport(pm::ParametricModel, p_prior::AbstractVector{Float64}) = insupport(pm.dist, p_prior)
# to do: simulate(pm), create_res_dir, check_bounds, add a _param_idx field for pm.
#
......
......@@ -19,7 +19,7 @@ function cosmos_get_values(name_file::String)
return dict_values
end
load_model(name_model::String) = include("$(get_module_path())/models/$(name_model).jl")
load_automaton(automaton::String) = include("$(get_module_path())/automata/$(automaton).jl")
load_plots() = include(get_module_path() * "/core/plots.jl")
@everywhere using MarkovProcesses
using Plots
load_model("ER")
load_automaton("automaton_F")
@everywhere load_model("ER")
@everywhere load_automaton("automaton_F")
A_F_R2 = create_automaton_F(ER, 50.0, 75.0, 0.05, 0.075, "P")
sync_ER = A_F_R2*ER
pm_sync_ER = ParametricModel(sync_ER, ("k3", Uniform(0.0, 100.0)))
@timev r = automaton_abc(pm_sync_ER)
@show r.nbr_sim
histogram(r.mat_p_end', bins = :sqrt, weights = r.weights, normalize = :density, xlims = (0.0, 100.0))
png("R2_hist.png")