Skip to content

Commit

Permalink
MPI: put in netpar variables that should not be global (#2421)
Browse files Browse the repository at this point in the history
  • Loading branch information
alkino committed Jul 17, 2023
1 parent 08bebf0 commit 9a96a3a
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 83 deletions.
25 changes: 23 additions & 2 deletions src/nrniv/netpar.cpp
Expand Up @@ -41,6 +41,19 @@ static IvocVect* all_spiketvec = NULL;
static IvocVect* all_spikegidvec = NULL;
static double t_exchange_;
static double dt1_; // 1/dt
static int localgid_size_;
static int ag_send_size_;
static int ag_send_nspike_;
static int ovfl_capacity_;
static int ovfl_;
static unsigned char* spfixout_;
static unsigned char* spfixin_;
static unsigned char* spfixin_ovfl_;
static int nout_;
static int* nin_;
static NRNMPI_Spike* spikeout_;
static NRNMPI_Spike* spikein_;
static int icapacity_;
static void alloc_space();

extern NetCvode* net_cvode_instance;
Expand Down Expand Up @@ -574,7 +587,7 @@ void nrn_spike_exchange(NrnThread* nt) {
nrnmpi_barrier();
nrnmpi_step_wait_ += nrnmpi_wtime() - wt;
}
n = nrnmpi_spike_exchange();
n = nrnmpi_spike_exchange(&ovfl_, &nout_, nin_, spikeout_, &spikein_, &icapacity_);
wt_ = nrnmpi_wtime() - wt;
wt = nrnmpi_wtime();
TBUF
Expand Down Expand Up @@ -670,7 +683,15 @@ void nrn_spike_exchange_compressed(NrnThread* nt) {
nrnmpi_barrier();
nrnmpi_step_wait_ += nrnmpi_wtime() - wt;
}
n = nrnmpi_spike_exchange_compressed();
n = nrnmpi_spike_exchange_compressed(localgid_size_,
ag_send_size_,
ag_send_nspike_,
&ovfl_capacity_,
&ovfl_,
spfixout_,
spfixin_,
&spfixin_ovfl_,
nin_);
wt_ = nrnmpi_wtime() - wt;
wt = nrnmpi_wtime();
TBUF
Expand Down
1 change: 1 addition & 0 deletions src/nrnmpi/mkdynam.sh
Expand Up @@ -13,6 +13,7 @@ sed -n '
s/, [a-zA-Z0-9_*]* /, /g
s/)([a-zA-Z_0-9*]* /)(/
s/char\* //g
s/char\*\* //g
s/std::string& //g
}
'> nrnmpi_dynam_wrappers.inc
Expand Down
82 changes: 47 additions & 35 deletions src/nrnmpi/mpispike.cpp
Expand Up @@ -66,7 +66,7 @@ void nrnmpi_spike_initialize() {

static MPI_Datatype spikebuf_type;

static void make_spikebuf_type() {
static void make_spikebuf_type(int* nout_) {
NRNMPI_Spikebuf s;
int block_lengths[3];
MPI_Aint displacements[3];
Expand Down Expand Up @@ -95,34 +95,39 @@ static void make_spikebuf_type() {
}
#endif

int nrnmpi_spike_exchange() {
int nrnmpi_spike_exchange(int* ovfl,
int* nout_,
int* nin_,
NRNMPI_Spike* spikeout_,
NRNMPI_Spike** spikein_,
int* icapacity_) {
int i, n, novfl, n1;
if (!displs) {
np = nrnmpi_numprocs;
displs = (int*) hoc_Emalloc(np * sizeof(int));
hoc_malchk();
displs[0] = 0;
#if nrn_spikebuf_size > 0
make_spikebuf_type();
make_spikebuf_type(nout_);
#endif
}
nrnbbs_context_wait();
#if nrn_spikebuf_size == 0
MPI_Allgather(&nout_, 1, MPI_INT, nin_, 1, MPI_INT, nrnmpi_comm);
MPI_Allgather(nout_, 1, MPI_INT, nin_, 1, MPI_INT, nrnmpi_comm);
n = nin_[0];
for (i = 1; i < np; ++i) {
displs[i] = n;
n += nin_[i];
}
if (n) {
if (icapacity_ < n) {
icapacity_ = n + 10;
free(spikein_);
spikein_ = (NRNMPI_Spike*) hoc_Emalloc(icapacity_ * sizeof(NRNMPI_Spike));
if (*icapacity_ < n) {
*icapacity_ = n + 10;
free(*spikein_);
*spikein_ = (NRNMPI_Spike*) hoc_Emalloc(*icapacity_ * sizeof(NRNMPI_Spike));
hoc_malchk();
}
MPI_Allgatherv(
spikeout_, nout_, spike_type, spikein_, nin_, displs, spike_type, nrnmpi_comm);
spikeout_, *nout_, spike_type, *spikein_, nin_, displs, spike_type, nrnmpi_comm);
}
#else
MPI_Allgather(spbufout_, 1, spikebuf_type, spbufin_, 1, spikebuf_type, nrnmpi_comm);
Expand All @@ -146,16 +151,16 @@ int nrnmpi_spike_exchange() {
}
}
if (novfl) {
if (icapacity_ < novfl) {
icapacity_ = novfl + 10;
free(spikein_);
spikein_ = (NRNMPI_Spike*) hoc_Emalloc(icapacity_ * sizeof(NRNMPI_Spike));
if (*icapacity_ < novfl) {
*icapacity_ = novfl + 10;
free(*spikein_);
*spikein_ = (NRNMPI_Spike*) hoc_Emalloc(*icapacity_ * sizeof(NRNMPI_Spike));
hoc_malchk();
}
n1 = (nout_ > nrn_spikebuf_size) ? nout_ - nrn_spikebuf_size : 0;
MPI_Allgatherv(spikeout_, n1, spike_type, spikein_, nin_, displs, spike_type, nrnmpi_comm);
n1 = (*nout_ > nrn_spikebuf_size) ? *nout_ - nrn_spikebuf_size : 0;
MPI_Allgatherv(spikeout_, n1, spike_type, *spikein_, nin_, displs, spike_type, nrnmpi_comm);
}
ovfl_ = novfl;
*ovfl = novfl;
#endif
return n;
}
Expand All @@ -178,7 +183,15 @@ a sequence of spiketime, localgid pairs. There are nspike of them.
The allgather sends the first part of the buf and the allgatherv buffer
sends any overflow.
*/
int nrnmpi_spike_exchange_compressed() {
int nrnmpi_spike_exchange_compressed(int localgid_size,
int ag_send_size,
int ag_send_nspike,
int* ovfl_capacity,
int* ovfl,
unsigned char* spfixout,
unsigned char* spfixin,
unsigned char** spfixin_ovfl,
int* nin_) {
int i, novfl, n, ntot, idx, bs, bstot; /* n is #spikes, bs is #byte overflow */
if (!displs) {
np = nrnmpi_numprocs;
Expand All @@ -192,52 +205,51 @@ int nrnmpi_spike_exchange_compressed() {
}
nrnbbs_context_wait();

MPI_Allgather(
spfixout_, ag_send_size_, MPI_BYTE, spfixin_, ag_send_size_, MPI_BYTE, nrnmpi_comm);
MPI_Allgather(spfixout, ag_send_size, MPI_BYTE, spfixin, ag_send_size, MPI_BYTE, nrnmpi_comm);
novfl = 0;
ntot = 0;
bstot = 0;
for (i = 0; i < np; ++i) {
displs[i] = bstot;
idx = i * ag_send_size_;
n = spfixin_[idx++] * 256;
n += spfixin_[idx++];
idx = i * ag_send_size;
n = spfixin[idx++] * 256;
n += spfixin[idx++];
ntot += n;
nin_[i] = n;
if (n > ag_send_nspike_) {
bs = 2 + n * (1 + localgid_size_) - ag_send_size_;
if (n > ag_send_nspike) {
bs = 2 + n * (1 + localgid_size) - ag_send_size;
byteovfl[i] = bs;
bstot += bs;
novfl += n - ag_send_nspike_;
novfl += n - ag_send_nspike;
} else {
byteovfl[i] = 0;
}
}
if (novfl) {
if (ovfl_capacity_ < novfl) {
ovfl_capacity_ = novfl + 10;
free(spfixin_ovfl_);
spfixin_ovfl_ = (unsigned char*) hoc_Emalloc(ovfl_capacity_ * (1 + localgid_size_) *
if (*ovfl_capacity < novfl) {
*ovfl_capacity = novfl + 10;
free(*spfixin_ovfl);
*spfixin_ovfl = (unsigned char*) hoc_Emalloc(*ovfl_capacity * (1 + localgid_size) *
sizeof(unsigned char));
hoc_malchk();
}
bs = byteovfl[nrnmpi_myid];
/*
note that the spfixout_ buffer is one since the overflow
is contiguous to the first part. But the spfixin_ovfl_ is
completely separate from the spfixin_ since the latter
note that the spfixout buffer is one since the overflow
is contiguous to the first part. But the spfixin_ovfl is
completely separate from the spfixin since the latter
dynamically changes its size during a run.
*/
MPI_Allgatherv(spfixout_ + ag_send_size_,
MPI_Allgatherv(spfixout + ag_send_size,
bs,
MPI_BYTE,
spfixin_ovfl_,
*spfixin_ovfl,
byteovfl,
displs,
MPI_BYTE,
nrnmpi_comm);
}
ovfl_ = novfl;
*ovfl = novfl;
return ntot;
}

Expand Down
29 changes: 0 additions & 29 deletions src/nrnmpi/mpispike.h
Expand Up @@ -13,35 +13,6 @@ typedef struct {
} NRNMPI_Spikebuf;
#endif


#define icapacity_ nrnmpi_i_capacity_
#define spikeout_ nrnmpi_spikeout_
#define spikein_ nrnmpi_spikein_
#define nout_ nrnmpi_nout_
#define nin_ nrnmpi_nin_
extern int nout_;
extern int* nin_;
extern int icapacity_;
extern NRNMPI_Spike* spikeout_;
extern NRNMPI_Spike* spikein_;

#define spfixout_ nrnmpi_spikeout_fixed_
#define spfixin_ nrnmpi_spikein_fixed_
#define spfixin_ovfl_ nrnmpi_spikein_fixed_ovfl_
#define localgid_size_ nrnmpi_localgid_size_
#define ag_send_size_ nrnmpi_ag_send_size_
#define ag_send_nspike_ nrnmpi_send_nspike_
#define ovfl_capacity_ nrnmpi_ovfl_capacity_
#define ovfl_ nrnmpi_ovfl_
extern int localgid_size_; /* bytes */
extern int ag_send_size_; /* bytes */
extern int ag_send_nspike_; /* spikes */
extern int ovfl_capacity_; /* spikes */
extern int ovfl_; /* spikes */
extern unsigned char* spfixout_;
extern unsigned char* spfixin_;
extern unsigned char* spfixin_ovfl_;

#if nrn_spikebuf_size > 0
#define spbufout_ nrnmpi_spbufout_
#define spbufin_ nrnmpi_spbufin_
Expand Down
16 changes: 1 addition & 15 deletions src/nrnmpi/nrnmpi_def_cinc
Expand Up @@ -10,18 +10,4 @@ int nrnmpi_subworld_change_cnt = 0;
int nrnmpi_subworld_id = -1;
int nrnmpi_numprocs_subworld = 1;

int nrnmpi_nout_;
int* nrnmpi_nin_;
int nrnmpi_i_capacity_;
NRNMPI_Spike* nrnmpi_spikeout_;
NRNMPI_Spike* nrnmpi_spikein_;

int nrnmpi_localgid_size_;
int nrnmpi_ag_send_size_;
int nrnmpi_send_nspike_;
int nrnmpi_ovfl_capacity_;
int nrnmpi_ovfl_;
unsigned char* nrnmpi_spikeout_fixed_;
unsigned char* nrnmpi_spikein_fixed_;
unsigned char* nrnmpi_spikein_fixed_ovfl_;
int nrn_cannot_use_threads_and_mpi;
int nrn_cannot_use_threads_and_mpi = 0;
4 changes: 2 additions & 2 deletions src/nrnmpi/nrnmpidec.h
Expand Up @@ -65,8 +65,8 @@ extern void nrnmpi_subworld_size(int n);

/* from mpispike.cpp */
extern void nrnmpi_spike_initialize();
extern int nrnmpi_spike_exchange();
extern int nrnmpi_spike_exchange_compressed();
extern int nrnmpi_spike_exchange(int* ovfl, int* nout, int* nin, NRNMPI_Spike* spikeout, NRNMPI_Spike** spikein, int* icapacity_);
extern int nrnmpi_spike_exchange_compressed(int localgid_size, int ag_send_size, int ag_send_nspike, int* ovfl_capacity, int* ovfl, unsigned char* spfixout, unsigned char* spfixin, unsigned char** spfixin_ovfl, int* nin_);
extern double nrnmpi_mindelay(double maxdel);
extern int nrnmpi_int_allmax(int i);
extern void nrnmpi_int_gather(int* s, int* r, int cnt, int root);
Expand Down

0 comments on commit 9a96a3a

Please sign in to comment.