Commit 2310db26 authored by Marcin Kirsz

Update

parent fa3041cd
2 merge requests: !5 MPI version of tadah, !3 MPI version of Tadah
Pipeline #43169 failed
@@ -21,26 +21,6 @@
#include <stdexcept>
#include <chrono>
#ifdef TADAH_BUILD_MPI
extern "C" void blacs_get_(int*, int*, int*);
extern "C" void blacs_pinfo_(int*, int*);
extern "C" void blacs_gridinit_(int*, char*, int*, int*);
extern "C" void blacs_gridinfo_(int*, int*, int*, int*, int*);
extern "C" void descinit_(int*, int*, int*, int*, int*,
int*, int*, int*, int*, int*);
extern "C" void pdpotrf_(char*, int*, double*, int*, int*, int*, int*);
extern "C" void blacs_gridexit_(int*);
extern "C" int numroc_(int*, int*, int*, int*, int*);
extern "C" void pdgels_(char* trans, int* m, int* n, int* nrhs,
double* a, int* ia, int* ja, int* desca, double* b, int* ib,
int* jb, int* descb, double* work, int* lwork, int* info);
extern "C" void pdgemr2d_(int *m, int *n, double *a, int *ia, int *ja, int *desca,
double *b, int *ib, int *jb, int *descb, int *context);
extern "C" void pdgemv_(char* transa, int* m, int* n, double* alpha, double* a,
int* ia, int* ja, int* desc_a, double* x, int* ix, int* jx, int* desc_x,
int* incx, double* beta, double* y, int* iy, int* jy, int* desc_y, int* incy);
#endif
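// The prototypes above are the Fortran BLACS/ScaLAPACK entry points used by the
// MPI training path. A minimal sketch of the usual calling pattern (illustrative
// values only, not part of this file):
//
//   int izero = 0, rank, nprocs, ctxt, nprow = 2, npcol = 2, myrow, mycol;
//   blacs_pinfo_(&rank, &nprocs);                      // process rank and count
//   blacs_get_(&izero, &izero, &ctxt);                 // default system context
//   char layout = 'R';
//   blacs_gridinit_(&ctxt, &layout, &nprow, &npcol);   // build a 2x2 grid
//   blacs_gridinfo_(&ctxt, &nprow, &npcol, &myrow, &mycol);
//   int m = 1000, mb = 64;
//   int loc_rows = numroc_(&m, &mb, &myrow, &izero, &nprow);  // local row count
//   blacs_gridexit_(&ctxt);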
void TadahCLI::subcommand_train() {
int rank = 0;
@@ -88,216 +68,51 @@ void TadahCLI::subcommand_train() {
if(train->count("--verbose"))
set_verbose();
Trainer tr(config_file);
Config config(config_file);
if(train->count("--Force")) {
tr.config.remove("FORCE");
tr.config.add("FORCE", "true");
config.remove("FORCE");
config.add("FORCE", "true");
}
if(train->count("--Stress")) {
tr.config.remove("STRESS");
tr.config.add("STRESS", "true");
config.remove("STRESS");
config.add("STRESS", "true");
}
if (rank==0)
if (is_verbose()) std::cout << "Training mode" << std::endl;
MPI_Trainer tr(config);
#ifdef TADAH_BUILD_MPI
int PHI_cols;
int PHI_rows;
if (rank==0) {
int nstruct_tot = StructureDB::count(tr.config).first;
int natoms_tot = StructureDB::count(tr.config).second;
PHI_cols = tr.fb->get_phi_cols(tr.config);
PHI_rows = DesignMatrixBase::phi_rows_num(tr.config, nstruct_tot, natoms_tot);
}
MPI_Bcast(&PHI_rows, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(&PHI_cols, 1, MPI_INT, 0, MPI_COMM_WORLD);
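// The global PHI dimensions are needed on every rank to size the local blocks,
// hence rank 0 computes them and broadcasts both integers to all ranks.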
// Initialize BLACS
// We create two contexts.
// context1 is used for the computation of phi matrices
// context2 is used for distribution of local phi to "block cyclic phi"
int b_rank;
blacs_pinfo_(&b_rank, &ncpu) ; // BLACS rank and world size
int context1, context2;
int b_row1, b_row2;
int b_col1, b_col2;
int b_nrows1, b_nrows2; // Number of row procs
int b_ncols1, b_ncols2; // Number of column procs
int rnb1 = (PHI_rows + ncpu - 1) / ncpu, rnb2 = tr.config.get<int>("MBLOCK"); // Row block size (ceiling division)
int cnb1 = PHI_cols, cnb2 = tr.config.get<int>("NBLOCK"); // Column block size
b_ncols1 = 1; b_ncols2 = 2;
b_nrows1 = ncpu; b_nrows2 = ncpu/b_ncols2;
// make the process grid as square as possible
int sr = sqrt(ncpu);
if (sr*sr==ncpu) {
b_nrows2 = sr;
b_ncols2 = sr;
}
else {
// loop over all possible divisors
int before, after;
for (int i = 1; i <= ncpu; ++i){
if (ncpu % i == 0) {
if (i>sqrt(ncpu)) {
after=i;
break;
}
before=i;
}
}
b_nrows2 = after;
b_ncols2 = before;
}
assert(b_nrows2 * b_ncols2 == ncpu);
assert(b_nrows1 * b_ncols1 == ncpu);
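// The divisor search above picks the factorisation of ncpu closest to a square.
// A self-contained sketch of the same idea (hypothetical helper, not part of
// this codebase):
//
//   // Return {rows, cols} with rows * cols == n and rows >= cols.
//   static std::pair<int,int> near_square_grid(int n) {
//     int cols = 1;
//     for (int i = 1; i * i <= n; ++i)
//       if (n % i == 0) cols = i;      // largest divisor <= sqrt(n)
//     return {n / cols, cols};
//   }
//   // e.g. near_square_grid(12) -> {4, 3}, near_square_grid(7) -> {7, 1}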
int izero = 0;
int ione = 1;
char layout='R'; // Block cyclic, Row major processor mapping
// Create first context
blacs_get_(&izero,&izero, &context1 ); // -> Create context1
blacs_gridinit_(&context1, &layout, &b_nrows1, &b_ncols1 ); // context1 -> Initialize the grid
blacs_gridinfo_(&context1, &b_nrows1, &b_ncols1, &b_row1, &b_col1 );
// Create second context
blacs_get_(&izero,&izero, &context2 ); // -> Create context2
blacs_gridinit_(&context2, &layout, &b_nrows2, &b_ncols2 ); // context2 -> Initialize the grid
blacs_gridinfo_(&context2, &b_nrows2, &b_ncols2, &b_row2, &b_col2 );
// Compute the size of the local phi matrices
int phi_rows1 = numroc_( &PHI_rows, &rnb1, &b_row1, &izero, &b_nrows1 );
int phi_cols1 = numroc_( &PHI_cols, &cnb1, &b_col1, &izero, &b_ncols1 );
int phi_rows2 = numroc_( &PHI_rows, &rnb2, &b_row2, &izero, &b_nrows2 );
int phi_cols2 = numroc_( &PHI_cols, &cnb2, &b_col2, &izero, &b_ncols2 );
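// numroc_ ("number of rows or columns") returns how much of a global dimension
// lands on this process for a given block size and grid coordinate. For example,
// with PHI_rows = 1000, a block size of 64 and 4 process rows, process row 0
// holds 256 local rows and process row 3 holds 232 (illustrative numbers only).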
// Define an MPI datatype for sending rows of the column-major PHI matrix;
// used only in context1
MPI_Datatype rowvec, rowvecs;
MPI_Type_vector( phi_cols1, 1, phi_rows1, MPI_DOUBLE, &rowvec);
MPI_Type_commit(&rowvec);
MPI_Type_create_resized(rowvec, 0, sizeof(double), &rowvecs);
MPI_Type_commit(&rowvecs);
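// Phi is stored column-major, so one matrix row is phi_cols1 doubles spaced
// phi_rows1 apart; MPI_Type_vector(phi_cols1, 1, phi_rows1, ...) describes
// exactly that pattern. Resizing the extent to sizeof(double) lets a receive of
// N of these types starting at &Phi.data()[row] fill N consecutive rows.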
// COUNTERS
size_t phi_row = 0; // next row to be filled in the local phi array
int rows_available=phi_rows1; // number of available rows in the local phi array
// Once we know the size of the local phi we can allocate memory for it,
// on the host as well: the host will collect excess computations from
// the workers.
//DesignMatrix<DM_Function_Base&> dm(*tr.fb, tr.config);
tr.dm.Phi.resize(phi_rows1,phi_cols1);
tr.dm.T.resize(phi_rows1);
tr.dm.Tlabels.resize(phi_rows1);
tr.init(rank, ncpu);
// BEGIN HOST-WORKER
if (rank==0) {
// HOST: prepare work packages
// filename, first structure index, number of structures to read
std::vector<std::tuple<std::string,int,int>> wpckgs;
int nstruc = tr.config.get<int>("MPIWPCKG");
for (const std::string &fn : tr.config("DBFILE")) {
// get number of structures
int dbsize = StructureDB::count(fn).first;
int first=0;
while(true) {
if (nstruc < dbsize) {
wpckgs.push_back(std::make_tuple(fn,first,nstruc));
first += nstruc;
} else {
wpckgs.push_back(std::make_tuple(fn,first,dbsize));
break;
}
dbsize-=nstruc;
}
}
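// Each package is (filename, first structure index, structure count), capped at
// MPIWPCKG structures. E.g. a file with 10 structures and MPIWPCKG = 4 yields
// (fn, 0, 4), (fn, 4, 4), (fn, 8, 2).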
// HOST waits for worker requests
MPI_Trainer_HOST host(tr, status, rank, ncpu);
host.prep_wpckgs();
while (true) {
if (wpckgs.size()==0) {
if (!host.has_packages()) {
// no more packages, just skip remaining workers
// we will collect remaining data and release them outside of this loop
break;
}
// probe ANY request from ANY worker
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
int worker = status.MPI_SOURCE;
int tag = status.MPI_TAG;
if (tag==WORK_TAG) {
int rows_available;
MPI_Recv (&rows_available, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
std::tuple<std::string,int,int> wpckg = wpckgs.back();
wpckgs.pop_back();
// send dataset filename
const char *fn = std::get<0>(wpckg).c_str();
int fn_length = std::get<0>(wpckg).length()+1; // +1 for the terminating null character
MPI_Send (fn, fn_length, MPI_CHAR, worker, tag, MPI_COMM_WORLD);
// send index of the first structure to load
int first = std::get<1>(wpckg);
MPI_Send (&first, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
// send number of structures to load
int nstruc = std::get<2>(wpckg);
MPI_Send (&nstruc, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
if (tag==TadahCLI::WORK_TAG) {
host.work_tag(worker, tag);
}
else if (tag==DATA_TAG) {
int rows_needed;
MPI_Recv (&rows_needed, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
if (rows_available>0) {
int rows_accepted = rows_available < rows_needed ? rows_available : rows_needed;
MPI_Send (&b_rank, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
MPI_Send (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
MPI_Recv (&tr.dm.Phi.data()[phi_row], rows_accepted, rowvecs, worker, tag, MPI_COMM_WORLD, &status);
MPI_Recv (&tr.dm.T.data()[phi_row], rows_accepted, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
MPI_Recv (&tr.dm.Tlabels.data()[phi_row], rows_accepted, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
rows_available -= rows_accepted;
phi_row += rows_accepted;
if (rows_available<0 ) { throw std::runtime_error(" HOST1: The number of rows in the local array is smaller than requested.");}
}
else {
// The host cannot fit the data, so ask the workers for their storage availability
// and locate one willing to accept at least some of it.
int worker2;
MPI_Status status2;
// find a worker capable of accepting data
int w_rows_available;
while (true) {
MPI_Recv (&w_rows_available, 1, MPI_INT, MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status2);
worker2 = status2.MPI_SOURCE;
if (worker==worker2) {throw std::runtime_error("worker and worker2 are the same."); }
if (w_rows_available==0 ) {
// give up on this worker
MPI_Send (&worker2, 1, MPI_INT, worker2, WAIT_TAG, MPI_COMM_WORLD);
}
else {
// found a worker
break;
}
}
int rows_accepted = w_rows_available < rows_needed ? w_rows_available : rows_needed;
// tell worker about worker2
MPI_Send (&worker2, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
MPI_Send (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
}
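// Redirection protocol: when the host's own block is full it polls WORK_TAG
// requests until it finds a worker with spare rows, then replies to the sender
// with that worker's rank so the two can exchange the data directly.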
else if (tag==TadahCLI::DATA_TAG) {
int dummy=0;
host.data_tag(worker, tag, dummy);
}
else {
throw std::runtime_error("HOST1: Unexpected request from " + std::to_string(worker));
throw std::runtime_error("HOST1: Unexpected request from "
+ std::to_string(worker));
}
}
@@ -307,58 +122,11 @@ void TadahCLI::subcommand_train() {
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
int worker = status.MPI_SOURCE;
int tag = status.MPI_TAG;
if (tag==DATA_TAG) {
int rows_needed;
MPI_Recv (&rows_needed, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
if (rows_available>0) {
int rows_accepted = rows_available < rows_needed ? rows_available : rows_needed;
MPI_Send (&b_rank, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
MPI_Send (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
MPI_Recv (&tr.dm.Phi.data()[phi_row], rows_accepted, rowvecs, worker, tag, MPI_COMM_WORLD, &status);
MPI_Recv (&tr.dm.T.data()[phi_row], rows_accepted, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
MPI_Recv (&tr.dm.Tlabels.data()[phi_row], rows_accepted, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
rows_available -= rows_accepted;
phi_row += rows_accepted;
if (rows_available<0 ) { throw std::runtime_error(" HOST2: The number of rows in the local array is smaller than requested.");}
}
else {
// The host cannot fit the data, so ask the workers for their storage availability;
// find a worker to accept at least some of it.
MPI_Status status2;
int worker2;
// find a worker capable of accepting data
int w_rows_available;
while (true) {
MPI_Recv (&w_rows_available, 1, MPI_INT, MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status2);
worker2 = status2.MPI_SOURCE;
if (worker==worker2) {throw std::runtime_error("worker and worker2 are the same."); }
if (w_rows_available==0 ) {
// give up on this worker and release it, as there is no more work to be done
MPI_Send (0, 0, MPI_INT, worker2, RELEASE_TAG, MPI_COMM_WORLD);
count++;
}
else {
// found a worker
break;
}
}
int rows_accepted = w_rows_available < rows_needed ? w_rows_available : rows_needed;
MPI_Send (&worker2, 1, MPI_INT, worker, DATA_TAG, MPI_COMM_WORLD);
MPI_Send (&rows_accepted, 1, MPI_INT, worker, DATA_TAG, MPI_COMM_WORLD);
}
if (tag==TadahCLI::DATA_TAG) {
host.data_tag(worker, tag, count);
}
else {
int rows_available;
MPI_Recv (&rows_available, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD, &status);
// there is no more work, so release the worker
if (rows_available==0) {
MPI_Send (0, 0, MPI_INT, worker, RELEASE_TAG, MPI_COMM_WORLD);
count++;
}
else {
MPI_Send (0, 0, MPI_INT, worker, WAIT_TAG, MPI_COMM_WORLD);
}
host.b(worker, tag, count);
}
if (count==ncpu) { break; } // count starts from 1
}
@@ -366,7 +134,7 @@ void TadahCLI::subcommand_train() {
else { // WORKER
while (true) {
// ask for more work...
MPI_Send (&rows_available, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
MPI_Send (&tr.rows_available, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
// request from root or from other workers
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
@@ -377,7 +145,7 @@ void TadahCLI::subcommand_train() {
if (tag == TadahCLI::RELEASE_TAG) {
int temp;
MPI_Recv (&temp, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
if (rows_available!=0) { throw std::runtime_error("Attempting to release a worker... but the worker requires more data!!");}
if (tr.rows_available!=0) { throw std::runtime_error("Attempting to release a worker... but the worker requires more data!!");}
break;
}
else if (tag == TadahCLI::WAIT_TAG) {
@@ -389,16 +157,16 @@ void TadahCLI::subcommand_train() {
// another worker is sending us some data
int arr_size;
MPI_Get_count(&status, MPI_DOUBLE, &arr_size);
int rows_accepted = arr_size/phi_cols1;
if (rows_available<rows_accepted) { throw std::runtime_error("Number of rows available is smaller than number of provided rows");}
MPI_Recv (&tr.dm.Phi.data()[phi_row], rows_available, rowvecs, worker, tag, MPI_COMM_WORLD, &status);
MPI_Recv (&tr.dm.T.data()[phi_row], rows_available, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
MPI_Recv (&tr.dm.Tlabels.data()[phi_row], rows_available, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
rows_available -= rows_accepted;
phi_row += rows_accepted;
int rows_accepted = arr_size/tr.phi_cols1;
if (tr.rows_available<rows_accepted) { throw std::runtime_error("Number of rows available is smaller than number of provided rows");}
MPI_Recv (&tr.dm.Phi.data()[tr.phi_row], tr.rows_available, tr.rowvecs, worker, tag, MPI_COMM_WORLD, &status);
MPI_Recv (&tr.dm.T.data()[tr.phi_row], tr.rows_available, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
MPI_Recv (&tr.dm.Tlabels.data()[tr.phi_row], tr.rows_available, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
tr.rows_available -= rows_accepted;
tr.phi_row += rows_accepted;
}
else if (tag == TadahCLI::WORK_TAG) {
tr.train(status, rows_available, phi_row, phi_cols1, rowvecs, worker);
tr.calc_desc(status, tr.rows_available, tr.phi_row);
}
}
}
@@ -410,8 +178,8 @@ void TadahCLI::subcommand_train() {
int descB[9], descB2[9];
int info, info2;
descinit_( descPHI, &PHI_rows, &PHI_cols, &rnb1, &cnb1, &izero, &izero, &context1, /*leading dimension*/&phi_rows1, &info);
descinit_( descPHI2, &PHI_rows, &PHI_cols, &rnb2, &cnb2, &izero, &izero, &context2, /*leading dimension*/&phi_rows2, &info2);
descinit_( descPHI, &tr.PHI_rows, &tr.PHI_cols, &tr.rnb1, &tr.cnb1, &tr.izero, &tr.izero, &tr.context1, /*leading dimension*/&tr.phi_rows1, &info);
descinit_( descPHI2, &tr.PHI_rows, &tr.PHI_cols, &tr.rnb2, &tr.cnb2, &tr.izero, &tr.izero, &tr.context2, /*leading dimension*/&tr.phi_rows2, &info2);
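// descinit_ fills the 9-element ScaLAPACK array descriptor
// (m, n, mb, nb, source process row/column, BLACS context, local leading dimension).
// descPHI describes the tall single-process-column layout used while assembling
// PHI (context1); descPHI2 describes the 2D block-cyclic layout used by the
// solver (context2).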
if(info != 0) {
printf("Error in descinit 1a, info = %d\n", info);
@@ -421,8 +189,8 @@ void TadahCLI::subcommand_train() {
printf("HINT: Check these CONFIG parameters: MPIWPCKG, MBLOCK, NBLOCK\n");
}
descinit_( descB, &PHI_rows, &ione, &rnb1, &cnb1, &izero, &izero, &context1, /*leading dimension*/&phi_rows1, &info);
descinit_( descB2, &PHI_rows, &ione, &rnb2, &cnb2, &izero, &izero, &context2, /*leading dimension*/&phi_rows2, &info2);
descinit_( descB, &tr.PHI_rows, &tr.ione, &tr.rnb1, &tr.cnb1, &tr.izero, &tr.izero, &tr.context1, /*leading dimension*/&tr.phi_rows1, &info);
descinit_( descB2, &tr.PHI_rows, &tr.ione, &tr.rnb2, &tr.cnb2, &tr.izero, &tr.izero, &tr.context2, /*leading dimension*/&tr.phi_rows2, &info2);
if(info != 0) {
printf("Error in descinit 1b, info = %d\n", info);
@@ -448,15 +216,15 @@ void TadahCLI::subcommand_train() {
// Redistribute the data into the 2D block-cyclic layout
DesignMatrix<DM_Function_Base&> dm2(*tr.fb, tr.config);
dm2.Phi.resize(phi_rows2,phi_cols2);
dm2.T.resize(phi_rows2);
dm2.Tlabels.resize(phi_rows2);
dm2.Phi.resize(tr.phi_rows2,tr.phi_cols2);
dm2.T.resize(tr.phi_rows2);
dm2.Tlabels.resize(tr.phi_rows2);
pdgemr2d_(&PHI_rows, &PHI_cols, tr.dm.Phi.ptr(), &ione, &ione, descPHI,
dm2.Phi.ptr(), &ione, &ione, descPHI2, &context2);
pdgemr2d_(&tr.PHI_rows, &tr.PHI_cols, tr.dm.Phi.ptr(), &tr.ione, &tr.ione, descPHI,
dm2.Phi.ptr(), &tr.ione, &tr.ione, descPHI2, &tr.context2);
pdgemr2d_(&PHI_rows, &ione, tr.dm.T.ptr(), &ione, &ione, descB,
dm2.T.ptr(), &ione, &ione, descB2, &context2);
pdgemr2d_(&tr.PHI_rows, &tr.ione, tr.dm.T.ptr(), &tr.ione, &tr.ione, descB,
dm2.T.ptr(), &tr.ione, &tr.ione, descB2, &tr.context2);
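// pdgemr2d_ copies a distributed (sub)matrix between two descriptors, here moving
// PHI and T from the assembly layout on context1 to the block-cyclic layout on
// context2 that pdgels_ expects. The trailing context argument must include every
// process that owns part of either distribution.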
// make a copy of dm.Phi and dm2.Phi
//Matrix Phi_cpy = dm.Phi;
@@ -479,7 +247,7 @@ void TadahCLI::subcommand_train() {
//std::cout << "phi_rows1, phi_cols1: " << phi_rows1 << ", " << phi_cols1<< std::endl;
//std::cout << "phi_rows2, phi_cols2: " << phi_rows2 << ", " << phi_cols2<< std::endl;
pdgels_(&trans, &PHI_rows, &PHI_cols, &nrhs, dm2.Phi.ptr(), &ia, &ja, descPHI2, b2, &ib, &jb, descB2, &wkopt2, &lwork2, &info2);
pdgels_(&trans, &tr.PHI_rows, &tr.PHI_cols, &nrhs, dm2.Phi.ptr(), &ia, &ja, descPHI2, b2, &ib, &jb, descB2, &wkopt2, &lwork2, &info2);
if (info2 != 0) {
printf("Error in pdgels, info = %d\n", info);
}
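// The first pdgels_ call appears to be the standard LAPACK/ScaLAPACK workspace
// query: with lwork set to -1 no solve is performed and the optimal workspace
// size is returned in wkopt2, which then sizes the work2 allocation below.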
@@ -495,13 +263,13 @@ void TadahCLI::subcommand_train() {
// std::cout << std::endl;
//}
double *work2 = new double[lwork2];
pdgels_(&trans, &PHI_rows, &PHI_cols, &nrhs, dm2.Phi.ptr(), &ia, &ja, descPHI2, b2, &ib, &jb, descB2, work2, &lwork2, &info2);
pdgels_(&trans, &tr.PHI_rows, &tr.PHI_cols, &nrhs, dm2.Phi.ptr(), &ia, &ja, descPHI2, b2, &ib, &jb, descB2, work2, &lwork2, &info2);
if (rank==0) {
//std::cout << "---b2 vec: rank: " << rank << " ";
//for (int i=0;i<phi_cols2;++i) std::cout << b2[i] << " ";
//std::cout << std::endl;
t_type w(b2,phi_cols2);
t_type w(b2,tr.phi_cols2);
//w.resize(phi_cols2);
//for (int i=0;i<phi_cols2;++i) w[i] = b2[i];
tr.model->set_weights(w);
@@ -546,11 +314,11 @@ void TadahCLI::subcommand_train() {
//delete[] work;
delete[] work2;
MPI_Type_free(&rowvec);
MPI_Type_free(&rowvecs);
MPI_Type_free(&tr.rowvec);
MPI_Type_free(&tr.rowvecs);
blacs_gridexit_(&context1);
blacs_gridexit_(&context2);
blacs_gridexit_(&tr.context1);
blacs_gridexit_(&tr.context2);
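// Clean-up: release the derived MPI datatypes and both BLACS grids created for
// this solve; the contexts must not be used after blacs_gridexit_.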
#else // NON MPI VERSION
if (is_verbose()) std::cout << "Loading structures..." << std::flush;
@@ -964,6 +732,3 @@ void TadahCLI::run(int argc, char** argv) {
subcommand_predict();
}
}