Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadMessage/WriteMessage #2833

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
166 changes: 160 additions & 6 deletions src/parallel/bbs.cpp
Expand Up @@ -269,7 +269,6 @@
void BBSImpl::execute(int id) { // assumes a "_todo" message in receive buffer
++etaskcnt;
double st, et;
int userid;
char* rs;
char* s;
size_t n;
Expand All @@ -282,18 +281,17 @@
if (debug_) {
printf("execute begin %g: working_id_=%d\n", st, working_id_);
}
userid = upkint();
int wid = upkint();
Message mess = readMessage(this);
hoc_ac_ = double(id);
rs = execute_helper(&n, id); // builds and execute hoc statement
rs = execute_helper(mess, &n, id); // builds and execute hoc statement
et = time() - st;
total_exec_time += et;
if (debug) {
printf("execute end elapsed %g: working_id_=%d hoc_ac_=%g\n", et, working_id_, hoc_ac_);
}
pkbegin();
pkint(userid);
pkint(wid);
pkint(mess.userid);
pkint(mess.wid);
pkint(rs ? 1 : 0);
if (!rs) {
pkdouble(hoc_ac_);
Expand Down Expand Up @@ -490,3 +488,159 @@
}
started_ = 1;
}

static std::vector<char> fromUpkpickle(BBSImpl* impl) {
std::size_t n;
char* s = impl->upkpickle(&n);
std::vector<char> pickle(s, s + n);
delete[] s;
return pickle;
}

static std::string fromUpkstr(BBSImpl* impl) {
char* s = impl->upkstr();
std::string str(s);
delete[] s;
return str;
}

static MyStr fromUpkstr2(BBSImpl* impl) {
char* s = impl->upkstr();
MyStr str(s);
return str;
}

static std::vector<double> fromUpkvec(BBSImpl* impl) {
std::size_t n = impl->upkint();
std::vector<double> vec(n);
impl->upkvec(n, vec.data());
return vec;
}

std::vector<Message::ArgType> readArgsFromOc(int hoc_arg_index) {
std::vector<Message::ArgType> args;
for (; ifarg(i); ++i) {

Check failure on line 522 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

use of undeclared identifier 'i'

Check failure on line 522 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

use of undeclared identifier 'i'
if (hoc_is_double_arg(i)) {

Check failure on line 523 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

use of undeclared identifier 'i'
args.push_back(*getarg(i));

Check failure on line 524 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

use of undeclared identifier 'i'
} else if (hoc_is_str_arg(i)) {

Check failure on line 525 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

use of undeclared identifier 'i'
char* s = gargstr(i);

Check failure on line 526 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

use of undeclared identifier 'i'
args.push_back(std::string(s));

Check failure on line 527 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

no matching member function for call to 'push_back'
delete[] s;
} else if (is_vector_arg(i)) {

Check failure on line 529 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

use of undeclared identifier 'i'
Vect* vec = vector_arg(i);

Check failure on line 530 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

unknown type name 'Vect'

Check failure on line 530 in src/parallel/bbs.cpp

View workflow job for this annotation

GitHub Actions / ubuntu-22.04 - cmake (-DNRN_ENABLE_CORENEURON=ON -DNRN_ENABLE_INTERVIEWS=OFF -DNMODL_SANITIZERS=undefinedundefined)

use of undeclared identifier 'i'
args.push_back(vec->vec());
} else { // must be a PythonObject
size_t size;
char* s = neuron::python::methods.po2pickle(*hoc_objgetarg(i), &size);
std::vector<char> pickle(size);
std::copy(s, s + size, pickle.data());
delete[] s;
args.push_back(pickle);
}
}
return args;
}

Message readMessage(BBSImpl* impl) {
Message mess{};
mess.userid = impl->upkint();
mess.wid = impl->upkint();
mess.style = impl->upkint();

int arg_manifest = 0;
switch (mess.style) {
case 0:
mess.statement = fromUpkstr(impl);
break;
case 1:
mess.fname = fromUpkstr(impl);
arg_manifest = impl->upkint();
break;
case 2:
mess.template_name = fromUpkstr(impl);
mess.object_index = impl->upkint();
mess.fname = fromUpkstr(impl);
arg_manifest = impl->upkint();
break;
case 3:
mess.pickle = fromUpkpickle(impl);
arg_manifest = impl->upkint();
break;
}

if (arg_manifest != 0) {
for (int i = 0, j = arg_manifest; (i = j % 5) != 0; j /= 5) {
switch (i) {
case 1:
mess.args.push_back(impl->upkdouble());
break;
case 2:
mess.args.push_back(fromUpkstr2(impl));
break;
case 3:
mess.args.push_back(fromUpkvec(impl));
break;
case 4:
mess.args.push_back(fromUpkpickle(impl));
break;
}
}
}

return mess;
}

int computeArgType(const std::vector<Message::ArgType>& args) {
int argtype = 0;
int ii = 1;
for (auto& arg: args) {
if (auto* var = std::get_if<double>(&arg)) {
argtype += 1 * ii;
} else if (auto* var = std::get_if<MyStr>(&arg)) {
argtype += 2 * ii;
} else if (auto* var = std::get_if<std::vector<double>>(&arg)) {
argtype += 3 * ii;
} else if (auto* var = std::get_if<std::vector<char>>(&arg)) {
argtype += 4 * ii;
}
ii *= 5;
}
return argtype;
}

void writeArgs(BBS* impl, std::vector<Message::ArgType>& args) {
for (auto& arg: args) {
if (auto* var = std::get_if<double>(&arg)) {
impl->pkdouble(*var);
} else if (auto* var = std::get_if<MyStr>(&arg)) {
impl->pkstr(var->data());
} else if (auto* var = std::get_if<std::vector<double>>(&arg)) {
impl->pkint(var->size());
impl->pkvec(var->size(), var->data());
} else if (auto* var = std::get_if<std::vector<char>>(&arg)) {
impl->pkpickle(var->data(), var->size());
}
}
}

void writeMessage(BBS* impl, Message& mess) {
impl->pkint(mess.userid);
impl->pkint(mess.wid);
impl->pkint(mess.style);

if (mess.style == 0) {
impl->pkstr(mess.statement.c_str());
return;
} else if (mess.style == 1) {
impl->pkstr(mess.fname.c_str());
} else if (mess.style == 2) {
impl->pkstr(mess.template_name.c_str());
impl->pkint(mess.object_index);
impl->pkstr(mess.fname.c_str());
} else if (mess.style == 3) {
impl->pkpickle(mess.pickle.data(), mess.pickle.size());
}

impl->pkint(computeArgType(mess.args));
writeArgs(impl, mess.args);
}
2 changes: 2 additions & 0 deletions src/parallel/bbs.h
Expand Up @@ -72,3 +72,5 @@ class BBS {
protected:
BBSImpl* impl_;
};

void writeMessage(BBS* impl, Message& mess);
5 changes: 2 additions & 3 deletions src/parallel/bbsclimpi.cpp
Expand Up @@ -226,13 +226,12 @@ int BBSClient::take_todo() {
size_t n;
while ((type = get(0, TAKE_TODO)) == CONTEXT) {
upkbegin();
upkint(); // throw away userid
upkint(); // throw away info in reserved second slot for worker_id
Message mess = readMessage(this);
#if debug
printf("%d execute context\n", nrnmpi_myid_bbs);
fflush(stdout);
#endif
rs = execute_helper(&n, -1);
rs = execute_helper(mess, &n, -1);
if (rs) {
delete[] rs;
}
Expand Down
5 changes: 2 additions & 3 deletions src/parallel/bbsdirectmpi.cpp
Expand Up @@ -56,9 +56,8 @@ void BBSDirect::context() {
nrnmpi_ref(recvbuf_);
nrnmpi_copy(recvbuf_, sendbuf_);
nrnmpi_upkbegin(recvbuf_);
nrnmpi_upkint(recvbuf_);
nrnmpi_upkint(recvbuf_); // slot reserved for tag
execute_helper(&n, -1, false);
Message mess = readMessage(this);
execute_helper(mess, &n, -1, false);
nrnmpi_unref(recvbuf_);
recvbuf_ = rsav;
}
Expand Down
52 changes: 49 additions & 3 deletions src/parallel/bbsimpl.h
@@ -1,5 +1,11 @@
#pragma once

#include <string>
#include <variant>
#include <vector>

struct Message;

class BBSImpl {
public:
BBSImpl();
Expand Down Expand Up @@ -63,12 +69,52 @@ class BBSImpl {
static bool master_works_;

protected:
char* execute_helper(size_t*, int id, bool exec = true); // involves hoc specific details in
// ocbbs.cpp
void subworld_worker_execute(); // shadows execute_helper. ie. each of
char* execute_helper(Message&, size_t*, int id, bool exec = true); // involves hoc specific
// details in ocbbs.cpp
void subworld_worker_execute(); // shadows execute_helper. ie. each of
// the nrnmpi_myid_bbs workers (and master) need to execute
// the same thing on each of the subworld processes
// associated with nrnmpi_myid==0. A subworld does not
// intracommunicate via the bulletin board but only via
// mpi on the subworld communicator.
};

class MyStr {
public:
MyStr(char* s)
: s_(s){};

~MyStr() {
delete[] s_;
}

std::size_t size() {
return strlen(s_);
}

char*& data() {
return s_;
}

private:
char* s_ = nullptr;
};

struct Message {
int userid;
int wid; // working_id
int style;

std::string statement;

std::string fname;
std::string template_name;
int object_index;

std::vector<char> pickle;

using ArgType = std::variant<double, std::vector<double>, std::vector<char>, MyStr>;
std::vector<ArgType> args;
};

Message readMessage(BBSImpl* impl);