From 9028ff5dfe2cb98f8e153c16e4c7a8241e9f803e Mon Sep 17 00:00:00 2001
From: Mahmoud Bentriou <mahmoud.bentriou@centralesupelec.fr>
Date: Sat, 16 Jan 2021 02:30:55 +0100
Subject: [PATCH] now the package has a better structure for distributed
 simulations by better definition of functions over workers + test

---
 core/network_model.jl                         |  8 ++-
 models/ER.jl                                  | 10 ++-
 models/SIR.jl                                 | 10 ++-
 models/SIR_tauleap.jl                         | 25 ++++----
 tests/run_unit.jl                             |  1 +
 tests/unit/macro_network_model_distributed.jl | 61 +++++++++++++++++++
 6 files changed, 87 insertions(+), 28 deletions(-)
 create mode 100644 tests/unit/macro_network_model_distributed.jl

diff --git a/core/network_model.jl b/core/network_model.jl
index bc3a39d..b55bac4 100644
--- a/core/network_model.jl
+++ b/core/network_model.jl
@@ -225,12 +225,14 @@ macro network_model(expr_network,expr_name...)
     expr_model_f! *= "@inbounds l_tr[1] = l_sym_R[reaction]\n"
     expr_model_f! *= "end\n"
     expr_model_isabsorbing = "isabsorbing_$(basename_func)(p::Vector{Float64},xn::Vector{Int}) = $(str_test_isabsorbing) === 0.0"
-    model_f! = eval(Meta.parse(expr_model_f!))
-    model_isabsorbing = eval(Meta.parse(expr_model_isabsorbing))
+    @everywhere eval(Meta.parse($expr_model_f!))
+    @everywhere eval(Meta.parse($expr_model_isabsorbing))
+    symbol_func_f! = Symbol("$(basename_func)_f!")
+    symbol_func_isabsorbing = Symbol("isabsorbing_$(basename_func)")
     map_idx_var_model = Dict(value => key for (key, value) in dict_species)
     model_g = [map_idx_var_model[i] for i = 1:length(list_species)]
     return :(ContinuousTimeModel($dim_state, $dim_params, $dict_species, $dict_params, $transitions, 
-                                 $(zeros(dim_params)), $(zeros(Int, dim_state)), 0.0, $model_f!, $model_isabsorbing; 
+                                 $(zeros(dim_params)), $(zeros(Int, dim_state)), 0.0, $(getfield(Main, symbol_func_f!)), $(getfield(Main, symbol_func_isabsorbing));
                                  g = $model_g, name=$model_name))
 end
 
diff --git a/models/ER.jl b/models/ER.jl
index 350707d..fffe144 100644
--- a/models/ER.jl
+++ b/models/ER.jl
@@ -1,6 +1,4 @@
 
-import StaticArrays: SVector, SMatrix, @SVector, @SMatrix
-
 d=4
 k=3
 dict_var_ER = Dict(:E => 1, :S => 2, :ES => 3, :P => 4)
@@ -9,8 +7,8 @@ l_tr_ER = [:R1,:R2,:R3]
 p_ER = [1.0, 1.0, 1.0]
 x0_ER = [100, 100, 0, 0]
 t0_ER = 0.0
-function ER_f!(xnplus1::Vector{Int}, l_t::Vector{Float64}, l_tr::Vector{Transition},
-               xn::Vector{Int}, tn::Float64, p::Vector{Float64})
+@everywhere function ER_f!(xnplus1::Vector{Int}, l_t::Vector{Float64}, l_tr::Vector{Transition},
+                           xn::Vector{Int}, tn::Float64, p::Vector{Float64})
     @inbounds a1 = p[1] * xn[1] * xn[2]
     @inbounds a2 = p[2] * xn[3]
     @inbounds a3 = p[3] * xn[3]
@@ -48,11 +46,11 @@ function ER_f!(xnplus1::Vector{Int}, l_t::Vector{Float64}, l_tr::Vector{Transiti
     @inbounds l_t[1] = tn + tau
     @inbounds l_tr[1] = l_str_R[reaction]
 end
-isabsorbing_ER(p::Vector{Float64},xn::Vector{Int}) = 
+@everywhere isabsorbing_ER(p::Vector{Float64},xn::Vector{Int}) = 
     @inbounds(p[1]*xn[1]*xn[2] + (p[2]+p[3])*xn[3] === 0.0)
 g_ER = [:P]
 
-ER = ContinuousTimeModel(d,k,dict_var_ER,dict_p_ER,l_tr_ER,p_ER,x0_ER,t0_ER,ER_f!,isabsorbing_ER; g=g_ER,name="ER SSA pkg")
+ER = ContinuousTimeModel(d,k,dict_var_ER,dict_p_ER,l_tr_ER,p_ER,x0_ER,t0_ER, getfield(Main, :ER_f!), getfield(Main, :isabsorbing_ER); g=g_ER,name="ER SSA pkg")
 
 function create_ER(new_p::Vector{Float64})
     ER_new = deepcopy(ER)
diff --git a/models/SIR.jl b/models/SIR.jl
index e2bfadf..16ac093 100644
--- a/models/SIR.jl
+++ b/models/SIR.jl
@@ -1,6 +1,4 @@
 
-import StaticArrays: SVector, SMatrix, @SVector, @SMatrix
-
 d=3
 k=2
 dict_var = Dict(:S => 1, :I => 2, :R => 3)
@@ -9,8 +7,8 @@ l_tr_SIR = [:R1,:R2]
 p_SIR = [0.0012, 0.05]
 x0_SIR = [95, 5, 0]
 t0_SIR = 0.0
-function SIR_f!(xnplus1::Vector{Int}, l_t::Vector{Float64}, l_tr::Vector{Transition},
-                xn::Vector{Int}, tn::Float64, p::Vector{Float64})
+@everywhere function SIR_f!(xnplus1::Vector{Int}, l_t::Vector{Float64}, l_tr::Vector{Transition},
+                            xn::Vector{Int}, tn::Float64, p::Vector{Float64})
     @inbounds a1 = p[1] * xn[1] * xn[2]
     @inbounds a2 = p[2] * xn[2]
     l_a = (a1, a2)
@@ -47,10 +45,10 @@ function SIR_f!(xnplus1::Vector{Int}, l_t::Vector{Float64}, l_tr::Vector{Transit
     @inbounds l_t[1] = tn + tau
     @inbounds l_tr[1] = l_str_R[reaction]
 end
-isabsorbing_SIR(p::Vector{Float64}, xn::Vector{Int}) = (p[1]*xn[1]*xn[2] + p[2]*xn[2]) === 0.0
+@everywhere isabsorbing_SIR(p::Vector{Float64}, xn::Vector{Int}) = (p[1]*xn[1]*xn[2] + p[2]*xn[2]) === 0.0
 g_SIR = [:I]
 
-SIR = ContinuousTimeModel(d,k,dict_var,dict_p,l_tr_SIR,p_SIR,x0_SIR,t0_SIR,SIR_f!,isabsorbing_SIR; g=g_SIR, name="SIR SSA pkg")
+SIR = ContinuousTimeModel(d,k,dict_var,dict_p,l_tr_SIR,p_SIR,x0_SIR,t0_SIR, getfield(Main, :SIR_f!), getfield(Main, :isabsorbing_SIR); g=g_SIR, name="SIR SSA pkg")
 
 function create_SIR(new_p::Vector{Float64})
     SIR_new = deepcopy(SIR)
diff --git a/models/SIR_tauleap.jl b/models/SIR_tauleap.jl
index 40a07d0..9ed8b90 100644
--- a/models/SIR_tauleap.jl
+++ b/models/SIR_tauleap.jl
@@ -1,6 +1,5 @@
 
-import StaticArrays: SVector, SMatrix, @SVector, @SMatrix
-import Distributions: Poisson, rand
+@everywhere import Distributions: Poisson, rand
 
 d=3
 k=3
@@ -10,34 +9,34 @@ l_tr_SIR_tauleap = [:U]
 p_SIR_tauleap = [0.0012, 0.05, 5.0]
 x0_SIR_tauleap = [95, 5, 0]
 t0_SIR_tauleap = 0.0
-function SIR_tauleap_f!(xnplus1::Vector{Int}, l_t::Vector{Float64}, l_tr::Vector{Transition},
-                xn::Vector{Int}, tn::Float64, p::Vector{Float64})
-    tau = p[3]
-    a1 = p[1] * xn[1] * xn[2]
-    a2 = p[2] * xn[2]
-    l_a = SVector(a1, a2)
+@everywhere function SIR_tauleap_f!(xnplus1::Vector{Int}, l_t::Vector{Float64}, l_tr::Vector{Transition},
+                                    xn::Vector{Int}, tn::Float64, p::Vector{Float64})
+    @inbounds tau = p[3]
+    @inbounds a1 = p[1] * xn[1] * xn[2]
+    @inbounds a2 = p[2] * xn[2]
+    l_a = (a1, a2)
     asum = sum(l_a)
     if asum == 0.0
         copyto!(xnplus1, xn)
         return nothing
     end
     # column-major order
-    nu_1 = SVector(-1, 1, 0)
-    nu_2 = SVector(0, -1, 1)
+    nu_1 = (-1, 1, 0)
+    nu_2 = (0, -1, 1)
     nbr_R1 = rand(Poisson(a1*tau))
     nbr_R2 = rand(Poisson(a2*tau))
     for i = 1:3
-        xnplus1[i] = xn[i]+ nbr_R1*nu_1[i] + nbr_R2*nu_2[i]
+        @inbounds xnplus1[i] = xn[i]+ nbr_R1*nu_1[i] + nbr_R2*nu_2[i]
     end
     l_t[1] = tn + tau
     l_tr[1] = :U
 end
-isabsorbing_SIR_tauleap(p::Vector{Float64}, xn::Vector{Int}) = (p[1]*xn[1]*xn[2] + p[2]*xn[2]) === 0.0
+@everywhere isabsorbing_SIR_tauleap(p::Vector{Float64}, xn::Vector{Int}) = (p[1]*xn[1]*xn[2] + p[2]*xn[2]) === 0.0
 g_SIR_tauleap = [:I]
 
 SIR_tauleap = ContinuousTimeModel(d,k,dict_var_SIR_tauleap,dict_p_SIR_tauleap,l_tr_SIR_tauleap,
                                   p_SIR_tauleap,x0_SIR_tauleap,t0_SIR_tauleap,
-                                  SIR_tauleap_f!,isabsorbing_SIR_tauleap; g=g_SIR_tauleap, name="SIR tauleap pkg")
+                                  getfield(Main, :SIR_tauleap_f!), getfield(Main, :isabsorbing_SIR_tauleap); g=g_SIR_tauleap, name="SIR tauleap pkg")
 function create_SIR_tauleap(new_p::Vector{Float64})
     SIR_tauleap_new = deepcopy(SIR_tauleap)
     @assert length(SIR_tauleap_new.p) == length(new_p)
diff --git a/tests/run_unit.jl b/tests/run_unit.jl
index c3f168d..06e4f43 100644
--- a/tests/run_unit.jl
+++ b/tests/run_unit.jl
@@ -27,6 +27,7 @@ using Test
     @test include("unit/long_sim_er.jl")
     
     @test include("unit/macro_network_model.jl")
+    @test include("unit/macro_network_model_distributed.jl")
     @test include("unit/model_prior.jl")
     @test include("unit/models_exps_er_1d.jl")
     @test include("unit/models_exps_er_2d.jl")
diff --git a/tests/unit/macro_network_model_distributed.jl b/tests/unit/macro_network_model_distributed.jl
new file mode 100644
index 0000000..d02d440
--- /dev/null
+++ b/tests/unit/macro_network_model_distributed.jl
@@ -0,0 +1,61 @@
+
+using Distributed
+using MarkovProcesses
+# A bunch of code to define the package on the workers
+# because they are created after the execution of Julia
+addprocs(2)
+module_path = get_module_path()
+@everywhere module_path = $module_path
+@everywhere push!(LOAD_PATH, "$(module_path)/core")
+@everywhere using MarkovProcesses
+
+test_all = true
+
+# SIR created by macro
+model_SIR = @network_model begin
+    R1: (S+I => 2I, ki*S*I)
+    R2: (I => R, kr*I)
+end "SIR"
+set_x0!(model_SIR, [95,5,0])
+set_param!(model_SIR, [0.012, 0.05])
+# Check if we send the model to the workers, 
+# then the simulation works
+@everywhere simulate($model_SIR)
+sym_f! = Symbol(model_SIR.f!)
+sym_isabs = Symbol(model_SIR.isabsorbing)
+test_all = test_all && fetch(@spawnat 2 isdefined(Main, sym_f!)) && fetch(@spawnat 2 isdefined(Main, sym_isabs))
+test_all = test_all && fetch(@spawnat 3 isdefined(Main, sym_f!)) && fetch(@spawnat 3 isdefined(Main, sym_isabs))
+# Check the model object is not defined on workers
+test_all = test_all && !fetch(@spawnat 2 isdefined(Main, :model_SIR)) && !fetch(@spawnat 2 isdefined(Main, :model_SIR))
+test_all = test_all && !fetch(@spawnat 3 isdefined(Main, :model_SIR)) && !fetch(@spawnat 3 isdefined(Main, :model_SIR))
+
+# SIR pkg
+load_model("SIR")
+# Check if we send the model to the workers, 
+# then the simulation works
+@everywhere simulate($SIR)
+sym_f! = Symbol(SIR.f!)
+sym_isabs = Symbol(SIR.isabsorbing)
+test_all = test_all && fetch(@spawnat 2 isdefined(Main, sym_f!)) && fetch(@spawnat 2 isdefined(Main, sym_isabs))
+test_all = test_all && fetch(@spawnat 3 isdefined(Main, sym_f!)) && fetch(@spawnat 3 isdefined(Main, sym_isabs))
+# Check the model object is not defined on workers
+test_all = test_all && !fetch(@spawnat 2 isdefined(Main, :SIR)) && !fetch(@spawnat 2 isdefined(Main, :SIR))
+test_all = test_all && !fetch(@spawnat 3 isdefined(Main, :SIR)) && !fetch(@spawnat 3 isdefined(Main, :SIR))
+
+# ER pkg
+load_model("ER")
+# Check if we send the model to the workers, 
+# then the simulation works
+@everywhere simulate($ER)
+sym_f! = Symbol(ER.f!)
+sym_isabs = Symbol(ER.isabsorbing)
+test_all = test_all && fetch(@spawnat 2 isdefined(Main, sym_f!)) && fetch(@spawnat 2 isdefined(Main, sym_isabs))
+test_all = test_all && fetch(@spawnat 3 isdefined(Main, sym_f!)) && fetch(@spawnat 3 isdefined(Main, sym_isabs))
+# Check the model object is not defined on workers
+test_all = test_all && !fetch(@spawnat 2 isdefined(Main, :ER)) && !fetch(@spawnat 2 isdefined(Main, :ER))
+test_all = test_all && !fetch(@spawnat 3 isdefined(Main, :ER)) && !fetch(@spawnat 3 isdefined(Main, :ER))
+
+rmprocs(2)
+
+return test_all
+
-- 
GitLab