Commit df48f395 authored by Marcin Kirsz

Almost there, just need to introduce work balancing scheme for cases where there is no more work and the worker's phi is not full
parent fef81a4a
2 merge requests: !5 MPI version of tadah, !3 MPI version of Tadah
Pipeline #37314 failed
@@ -38,6 +38,10 @@ void TadahCLI::subcommand_train() {
#ifdef TADAH_BUILD_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &ncpu);
if (ncpu<2) {
std::cout << "Minimum number of cpus for an mpi version is 2" << std::endl;
return;
}
MPI_Status status;
#endif
@@ -91,8 +95,11 @@ void TadahCLI::subcommand_train() {
NNFinder nnf(config);
#ifdef TADAH_BUILD_MPI
const int RELEASE_TAG = 2;
const int DATA_TAG = 1;
const int WORK_TAG = 0;
int PHI_cols;
int PHI_rows;
@@ -124,6 +131,7 @@ void TadahCLI::subcommand_train() {
// the required function must take into consideration:
// the size of the block rnb, the number of cols and rows in Phi
// ideally the grid should be as square as possible, unless PHI is narrow
// Probably we want the number of column processes to be <= ~sqrt(ncpu)/cnb
b_ncols = 1;
b_nrows = ncpu;
assert(b_nrows * b_ncols == ncpu);
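// A minimal sketch of one possible grid-selection heuristic along the lines
// suggested above (illustrative only, not part of this change). It picks the
// largest divisor of ncpu not exceeding sqrt(ncpu) so the grid is as square as
// possible, and falls back to a single process column when PHI is narrow.
// It assumes PHI_cols and cnb are already set and <cmath> is available:
//
//   int q = static_cast<int>(std::sqrt(static_cast<double>(ncpu)));
//   while (q > 1 && ncpu % q != 0) --q;   // largest divisor of ncpu <= sqrt(ncpu)
//   if (PHI_cols < q * cnb) q = 1;        // narrow PHI: use a single process column
//   b_ncols = q;
//   b_nrows = ncpu / q;
//   assert(b_nrows * b_ncols == ncpu);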
@@ -142,15 +150,21 @@ void TadahCLI::subcommand_train() {
// int PHI_size = PHI_rows*PHI_cols;
int phi_rows = numroc_( &PHI_rows, &rnb, &b_row, &izero, &b_nrows );
int phi_cols = numroc_( &PHI_cols, &cnb, &b_col, &izero, &b_ncols );
std::cout << "BRANK: " << b_rank << " phi_row: " << phi_rows << " phi_col " << phi_cols << std::endl;
// once we know the size of the PHI, we can assign local phi matrices
Matrix phi;
if (rank!=0) {
// allocate matrix for each worker
phi = Matrix(phi_rows,phi_cols);
}
std::cout << "BRANK: " << b_rank << " phi_rows: " << phi_rows << " phi_col " << phi_cols << std::endl;
size_t phi_row = 0; // next row to be filled in the local phi array
int rows_available=phi_rows; // number of available rows in the local phi array
// once we know the size of the local phi, we can allocate memory for it,
// including on the host. The host will collect excess computations from
// the workers
DesignMatrix<DM_Function_Base&> dm(*fb, config);
dm.Phi.resize(phi_rows,phi_cols);
//Matrix phi;
//if (rank!=0) {
// // allocate matrix for each worker
// phi = Matrix(phi_rows,phi_cols);
//}
// BEGIN HOST-WORKER
if (rank==0) {
@@ -158,7 +172,7 @@ void TadahCLI::subcommand_train() {
// prepare work packages
// filename, first structure index, number of structures to read
std::vector<std::tuple<std::string,int,int>> wpckgs;
int nstruc = 18; // TODO the number of structures in a single work package
for (const std::string &fn : config("DBFILE")) {
// get number of structures
int dbsize = StructureDB::count(fn).first;
@@ -181,83 +195,199 @@ void TadahCLI::subcommand_train() {
if (wpckgs.size()==0) {
// no more packages, just skip remaining workers
// we will collect remaining data and release them outside of this loop
break;
}
// receive ANY request from ANY worker
int arr_size; // array size for DATA_TAG
MPI_Recv (&arr_size, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
int worker = status.MPI_SOURCE;
int tag = status.MPI_TAG;
if (tag==WORK_TAG) {
std::tuple<std::string,int,int> wpckg = wpckgs.back();
wpckgs.pop_back();
// send dataset filename
const char *fn = std::get<0>(wpckg).c_str();
int fn_length = std::get<0>(wpckg).length()+1; // +1 for the null terminator
MPI_Send (&fn_length, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
MPI_Send (fn, fn_length, MPI_CHAR, worker, WORK_TAG, MPI_COMM_WORLD);
// send index of the first structure to load
int first = std::get<1>(wpckg);
MPI_Send (&first, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
// send number of structures to load
int nstruc = std::get<2>(wpckg);
MPI_Send (&nstruc, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
std::cout << "HOST: " << fn << " " << first << " " << nstruc << std::endl;
std::cout << "HOST: " << fn << " " << first << " " << nstruc << std::endl;
}
else if (tag==DATA_TAG) {
// receive the computed rows into the next free block of the host's local phi
// (assumes rows of Phi are contiguous in memory, as implied by start below)
int rows_needed = arr_size/phi_cols;
std::cout << "HOST received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail: " << rows_available << std::endl;
rows_available -= rows_needed;
int start=phi_row*phi_cols;
phi_row+=rows_needed;
if (rows_available==0) { std::cout << "!!! HOST LOCAL MATRIX IS FILLED !!!" << std::endl; }
if (rows_available<0) { throw std::runtime_error("The number of rows in the local array is smaller than requested."); }
MPI_Recv (&dm.Phi.data()[start], arr_size, MPI_DOUBLE, worker, DATA_TAG, MPI_COMM_WORLD, &status);
}
else {
throw std::runtime_error("HOST: Unexpected request from " + std::to_string(worker));
}
}
// work finished, collect remaining data and release all workers
int count=1; // skip host
while(true) {
int arr_size; // array size for DATA_TAG
MPI_Recv (&arr_size, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
int worker = status.MPI_SOURCE;
int tag = status.MPI_TAG;
if (tag==DATA_TAG) {
// receive the computed rows into the next free block of the host's local phi
int rows_needed = arr_size/phi_cols;
std::cout << "HOST received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail: " << rows_available << std::endl;
rows_available -= rows_needed;
int start=phi_row*phi_cols;
phi_row+=rows_needed;
if (rows_available==0) { std::cout << "!!! HOST LOCAL MATRIX IS FILLED !!!" << std::endl; }
if (rows_available<0) { throw std::runtime_error("The number of rows in the local array is smaller than requested."); }
MPI_Recv (&dm.Phi.data()[start], arr_size, MPI_DOUBLE, worker, DATA_TAG, MPI_COMM_WORLD, &status);
}
else {
// there is no more work so release a worker
MPI_Send (0, 0, MPI_INT, worker, RELEASE_TAG, MPI_COMM_WORLD);
count++;
if (count==ncpu) { break; }
}
}
std::cout << "HOST: " << "r_avail: " << rows_available << std::endl;
std::cout << "HOST EXIT" << std::endl;
}
else {
// worker
int fn_length;
int first; // index of the first structure to read from the file
int nstruc; // number of structures to be processed
while (true) {
// ask for more work...
std::cout << "WORKER NUM: " << b_rank << " waiting..." << std::endl;
MPI_Send (NULL, 0, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
// get either work or release signal
MPI_Recv (&fn_length, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
std::cout << "WORKER NUM: " << b_rank << " got message!" << std::endl;
std::cout << "Lenght of char fn: " << fn_length << std::endl;
std::cout << "MPI_TAG: " << status.MPI_TAG << std::endl;
// if the release tag arrives but the worker's local phi is not yet full,
// the worker should request the remaining data from the host (not implemented yet)
if (status.MPI_TAG == RELEASE_TAG) {
std::cout << "WORKER: " << b_rank << "RELEASE TAG" << std::endl;
if (rows_available>0 ) { throw std::runtime_error("Attempting to release worker... but the workers requires more data !!");}
break;
}
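// A minimal sketch of what the missing balancing step could look like
// (illustrative only, not implemented in this commit): instead of throwing
// above when rows_available>0, the worker could request the leftover rows
// it still needs from the host, e.g.
//
//   int missing = rows_available*phi_cols;   // doubles still missing locally
//   MPI_Send (&missing, 1, MPI_INT, 0, DATA_TAG, MPI_COMM_WORLD);
//   MPI_Recv (&dm.Phi.data()[phi_row*phi_cols], missing, MPI_DOUBLE, 0, DATA_TAG, MPI_COMM_WORLD, &status);
//   phi_row += rows_available;
//   rows_available = 0;
//
// with a matching host-side send that draws on the excess rows the host has
// collected from the other workers.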
// otherwise get work package
char *fn = (char *) malloc(fn_length+1);
MPI_Recv (fn, fn_length, MPI_CHAR, 0, WORK_TAG, MPI_COMM_WORLD, &status);
MPI_Recv (&first, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
MPI_Recv (&nstruc, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
std::cout << "WORKER: " << fn << " " << first << " " << nstruc << std::endl;
std::cout << "WORKER: " << b_rank << "file: " << fn << " first: " << first << " nstruc: " << nstruc << std::endl;
// do work
StructureDB stdb;
stdb.add(std::string(fn,fn_length),first,nstruc);
std::cout << "WORKER: " << b_rank << stdb << std::endl;
std::cout << "WORKER: " << b_rank << " stdb.size(): " << stdb.size() << std::endl;
nnf.calc(stdb);
// temporary solution: compute a temp phi inside a DesignMatrix and then copy the data into the local phi matrix
// one way to find out what we need in terms of phi size
int rows_needed = 0;
for (size_t s=0; s<stdb.size(); ++s) {
int natoms = stdb(s).natoms();
rows_needed += DesignMatrixBase::phi_rows_num(config, 1, natoms);
}
std::cout << "WORKER: " << b_rank << " R avail: " << rows_available << " R needed: " << rows_needed << std::endl;
if ((rows_available-rows_needed)<0) {
// we do not have enough rows in the local phi matrix
// so we create temp DM of required size
DesignMatrix<DM_Function_Base&> temp_dm(*fb, config);
temp_dm.Phi.resize(rows_needed,phi_cols);
// and compute all rows
size_t temp_phi_row=0;
for (size_t s=0; s<stdb.size(); ++s) {
StDescriptors st_d = dc.calc(stdb(s));
temp_dm.build(temp_phi_row,stdb(s),st_d); // phi_row++
}
// first we try to fill the remaining rows in the local phi matrix
int rows_copied = 0; // rows of temp_dm.Phi already copied into the local phi
if (rows_available>0) {
for (int r=0; r<rows_available; r++) {
for (int c=0; c<phi_cols; c++) {
dm.Phi(phi_row,c) = temp_dm.Phi(r,c);
}
phi_row++;
rows_needed--;
rows_copied++;
}
}
// there are no more available rows
rows_available=0;
std::cout << "FULL WORKER: " << b_rank << " phi_row: " << phi_row << " phi_rows: " << phi_rows << std::endl;
// then send remaining data to the host
int arr_size=rows_needed*phi_cols;
if (arr_size > 0) {
MPI_Send (&arr_size, 1, MPI_INT, 0, DATA_TAG, MPI_COMM_WORLD);
// send only the rows that did not fit locally, skipping those already copied
// (assumes rows of Phi are contiguous in memory)
int start=rows_copied*phi_cols;
MPI_Send (&temp_dm.Phi.data()[start], arr_size, MPI_DOUBLE, 0, DATA_TAG, MPI_COMM_WORLD);
}
}
else {
// just fill local phi array
for (size_t s=0; s<stdb.size(); ++s) {
//std::cout << "phi_row:" << phi_row << " R avail: " << rows_available << " phi rows: " << phi_rows << std::endl;
StDescriptors st_d = dc.calc(stdb(s));
dm.build(phi_row,stdb(s),st_d); // phi_row++
}
rows_available-=rows_needed;
}
// ..copy dm.phi -> phi
// For every new structure in stdb we want to calculate PHI rows
// using the local phi. We have to make sure we do not exceed phi.
// We also have to keep track of the number of filled rows, which
// serves as an index for subsequent calls.
//
// We do not want to train a model here, so use DesignMatrix.
//
// if (phi_row == phi_rows) stop computation
if (fn)
free(fn); // fn was allocated with malloc
}
}
std::cout << "HOW IS THIS POSSIBLE: " << b_rank << std::endl;
std::cout << "RANK" << rank << " HOST-WORKER EXIT SUCCESS" << std::endl;
// END HOST-WORKER
#endif