diff --git a/bin/tadah_cli.cpp b/bin/tadah_cli.cpp
index 7037367cae0263351423a57284dded3dd3662f0c..0e7711b2a9da567c7b6fc894ce4a18fa2bb8ba4e 100644
--- a/bin/tadah_cli.cpp
+++ b/bin/tadah_cli.cpp
@@ -38,6 +38,10 @@ void TadahCLI::subcommand_train() {
 #ifdef TADAH_BUILD_MPI
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   MPI_Comm_size(MPI_COMM_WORLD, &ncpu);
+  if (ncpu<2) {
+    std::cout << "The MPI version requires at least 2 MPI ranks." << std::endl;
+    return;
+  }
   MPI_Status status;
 #endif
 
@@ -91,8 +95,11 @@ void TadahCLI::subcommand_train() {
   NNFinder nnf(config);
 
 #ifdef TADAH_BUILD_MPI
+  const int RELEASE_TAG = 2;
+  const int DATA_TAG = 1;
   const int WORK_TAG = 0;
+
   int PHI_cols;
   int PHI_rows;
 
@@ -124,6 +131,7 @@ void TadahCLI::subcommand_train() {
   // the required function must take into considersation:
   // the size of the blok rnb, number of cols and rows in Phi
   // ideally the grid should be as square as possible, unless PHI is narrow
+  // Probably we want the number of column processes to be <= ~sqrt(ncpu)/cnb
   b_ncols = 1;
   b_nrows = ncpu;
   assert(b_nrows * b_ncols == ncpu);
@@ -142,15 +150,21 @@ void TadahCLI::subcommand_train() {
   // int PHI_size = PHI_rows*PHI_cols;
   int phi_rows = numroc_( &PHI_rows, &rnb, &b_row, &izero, &b_nrows );
   int phi_cols = numroc_( &PHI_cols, &cnb, &b_col, &izero, &b_ncols );
-  std::cout << "BRANK: " << b_rank << " phi_row: " << phi_rows << " phi_col " << phi_cols << std::endl;
-
-  // once we know the size of the PHI, we can assign local phi matrices
-  Matrix phi;
-  if (rank!=0) {
-    // allocate matrix for each worker
-    phi = Matrix(phi_rows,phi_cols);
-  }
-
+  std::cout << "BRANK: " << b_rank << " phi_rows: " << phi_rows << " phi_cols: " << phi_cols << std::endl;
+
+  size_t phi_row = 0;            // next row to be filled in the local phi array
+  int rows_available = phi_rows; // number of available rows in the local phi array
+
+  // Once we know the size of the local phi we can allocate memory for it,
+  // on the host as well: the host will collect excess computations from
+  // the workers.
+  DesignMatrix<DM_Function_Base&> dm(*fb, config);
+  dm.Phi.resize(phi_rows,phi_cols);
+  //Matrix phi;
+  //if (rank!=0) {
+  //  // allocate matrix for each worker
+  //  phi = Matrix(phi_rows,phi_cols);
+  //}
 
   // BEGIN HOST-WORKER
   if (rank==0) {
@@ -158,7 +172,7 @@ void TadahCLI::subcommand_train() {
     // prepare work packages
     // filename, first structure index, number of structures to read
     std::vector<std::tuple<std::string,int,int>> wpckgs;
-    int nstruc = 8; // TODO the number of structures in a single work package
+    int nstruc = 18; // TODO: the number of structures in a single work package
     for (const std::string &fn : config("DBFILE")) {
       // get number of structures
       int dbsize = StructureDB::count(fn).first;
@@ -181,83 +195,199 @@ void TadahCLI::subcommand_train() {
 
       if (wpckgs.size()==0) {
         // no more packages, just skip remaining workers
-        // we will release them soon...
+        // we will collect remaining data and release them outside of this loop
        break;
      }
 
-      // receive request from ANY worker
-      MPI_Recv (NULL, 0, MPI_INT, MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status);
+      // receive ANY request from ANY worker
+      int arr_size; // array size for DATA_TAG requests (unused for WORK_TAG)
+      MPI_Recv (&arr_size, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
       int worker = status.MPI_SOURCE;
+      int tag = status.MPI_TAG;
 
-      std::tuple<std::string,int,int> wpckg = wpckgs.back();
-      wpckgs.pop_back();
+      if (tag==WORK_TAG) {
+        std::tuple<std::string,int,int> wpckg = wpckgs.back();
+        wpckgs.pop_back();
 
-      // send dataset filename
-      const char *fn = std::get<0>(wpckg).c_str();
-      int fn_length = std::get<0>(wpckg).length()+1; // +1 for char
-      MPI_Send (&fn_length, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
-      MPI_Send (fn, fn_length, MPI_CHAR, worker, WORK_TAG, MPI_COMM_WORLD);
+        // send dataset filename
+        const char *fn = std::get<0>(wpckg).c_str();
+        int fn_length = std::get<0>(wpckg).length()+1; // +1 for the terminating null character
+        MPI_Send (&fn_length, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
+        MPI_Send (fn, fn_length, MPI_CHAR, worker, WORK_TAG, MPI_COMM_WORLD);
 
-      // send index of the first structure to load
-      int first = std::get<1>(wpckg);
-      MPI_Send (&first, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
+        // send index of the first structure to load
+        int first = std::get<1>(wpckg);
+        MPI_Send (&first, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
 
-      // send number of structures to load
-      int nstruc = std::get<2>(wpckg);
-      MPI_Send (&nstruc, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
+        // send number of structures to load
+        int nstruc = std::get<2>(wpckg);
+        MPI_Send (&nstruc, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
 
-      std::cout << "HOST: " << fn << " " << first << " " << nstruc << std::endl;
+        std::cout << "HOST: " << fn << " " << first << " " << nstruc << std::endl;
+      }
+      else if (tag==DATA_TAG) {
+        // TODO
+        int rows_needed = arr_size/phi_cols;
+        std::cout << "HOST received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail: " << rows_available << std::endl;
+        rows_available -= rows_needed;
+        int start = phi_row*phi_cols;
+        phi_row += rows_needed;
+        if (rows_available==0) { std::cout << "!!! HOST LOCAL MATRIX IS FILLED !!!" << std::endl; }
+        if (rows_available<0) { throw std::runtime_error("The number of rows in the local array is smaller than requested."); }
+        MPI_Recv (&dm.Phi.data()[start], arr_size, MPI_DOUBLE, worker, DATA_TAG, MPI_COMM_WORLD, &status);
+      }
+      else {
+        throw std::runtime_error("HOST: Unexpected request from " + std::to_string(worker));
+      }
     }
 
-    // work finised, release all workers
-    int count=0;
+    // work finished, collect remaining data and release all workers
+    int count=1; // skip the host
     while(true) {
-      MPI_Recv (NULL, 0, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
+      int arr_size; // array size for DATA_TAG requests
+      MPI_Recv (&arr_size, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
       int worker = status.MPI_SOURCE;
-      MPI_Send (0, 0, MPI_INT, worker, RELEASE_TAG, MPI_COMM_WORLD);
-      count++;
-      if (count==ncpu-1) break;
+      int tag = status.MPI_TAG;
+
+      if (tag==DATA_TAG) {
+        // TODO
+        int rows_needed = arr_size/phi_cols;
+        std::cout << "HOST received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail: " << rows_available << std::endl;
+        rows_available -= rows_needed;
+        int start = phi_row*phi_cols;
+        phi_row += rows_needed;
+        if (rows_available==0) { std::cout << "!!! HOST LOCAL MATRIX IS FILLED !!!" << std::endl; }
+        if (rows_available<0) { throw std::runtime_error("The number of rows in the local array is smaller than requested."); }
+        MPI_Recv (&dm.Phi.data()[start], arr_size, MPI_DOUBLE, worker, DATA_TAG, MPI_COMM_WORLD, &status);
+      }
+      else {
+        // there is no more work, so release the worker
+        MPI_Send (0, 0, MPI_INT, worker, RELEASE_TAG, MPI_COMM_WORLD);
+        count++;
+        if (count==ncpu) { break; }
+      }
     }
-  } else {
+    std::cout << "HOST: " << "r_avail: " << rows_available << std::endl;
+    std::cout << "HOST EXIT" << std::endl;
+  }
+  else {
+    // worker
     int fn_length;
-    int first;
-    int nstruc;
+    int first;  // index of the first structure to read from the file
+    int nstruc; // number of structures to be processed
+
     while (true) {
       // ask for more work...
-      MPI_Send (NULL, 0, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
-      std::cout << "WORKER NUM: " << b_rank << " waiting..." << std::endl;
+      MPI_Send (NULL, 0, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
 
       // get either work or release signal
       MPI_Recv (&fn_length, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
-      std::cout << "WORKER NUM: " << b_rank << " got message!" << std::endl;
-      std::cout << "Lenght of char fn: " << fn_length << std::endl;
-      std::cout << "MPI_TAG: " << status.MPI_TAG << std::endl;
+
+      // if the release tag arrives but the worker's local phi is not full,
+      // the worker sends a request to the host for the remaining data
 
       // release worker
       if (status.MPI_TAG == RELEASE_TAG) {
        std::cout << "WORKER: " << b_rank << "RELEASE TAG" << std::endl;
+        if (rows_available>0) { throw std::runtime_error("Attempting to release a worker which still requires more data!"); }
        break;
      }
 
-      // otherwise do work
+      // otherwise get a work package
      char *fn = (char *) malloc(fn_length+1);
      MPI_Recv (fn, fn_length, MPI_CHAR, 0, WORK_TAG, MPI_COMM_WORLD, &status);
      MPI_Recv (&first, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
      MPI_Recv (&nstruc, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
-      std::cout << "WORKER: " << fn << " " << first << " " << nstruc << std::endl;
+      std::cout << "WORKER: " << b_rank << " file: " << fn << " first: " << first << " nstruc: " << nstruc << std::endl;
 
+      // do work
      StructureDB stdb;
      stdb.add(std::string(fn,fn_length),first,nstruc);
-      std::cout << "WORKER: " << b_rank << stdb << std::endl;
+      std::cout << "WORKER: " << b_rank << " stdb.size(): " << stdb.size() << std::endl;
 
      nnf.calc(stdb);
 
+      // temp solution: compute a temporary phi inside a DM and then copy the data into the local phi matrix
+      //Config c = config;
+      //DesignMatrix<DM_Function_Base&> dm(*fb, c);
+      //Normaliser norm(c); // <-- we don't want this
+      //dm.build(stdb,norm,dc); // this is just for resizing for now
+
+      // size_t phi_row = 0; // local just for now, this is of course wrong
+
+      // one way to find out how many phi rows this work package needs
+      int rows_needed = 0;
+      for (size_t s=0; s<stdb.size(); ++s) {
+        int natoms = stdb(s).natoms();
+        rows_needed += DesignMatrixBase::phi_rows_num(config, 1, natoms);
+      }
+
+      std::cout << "WORKER: " << b_rank << " R avail: " << rows_available << " R needed: " << rows_needed << std::endl;
+      if ((rows_available-rows_needed)<0) {
+        // we do not have enough rows in the local phi matrix,
+        // so we create a temporary DM of the required size
+        DesignMatrix<DM_Function_Base&> temp_dm(*fb, config);
+        temp_dm.Phi.resize(rows_needed,phi_cols);
+
+        // and compute all rows
+        size_t temp_phi_row=0;
+        for (size_t s=0; s<stdb.size(); ++s) {
+          StDescriptors st_d = dc.calc(stdb(s));
+          temp_dm.build(temp_phi_row,stdb(s),st_d); // increments temp_phi_row
+        }
+
+        // first try to fill the remaining rows of the local phi matrix
+        if (rows_available>0) {
+          for (int r=0; r<rows_available; r++) {
+            for (int c=0; c<phi_cols; c++) {
+              dm.Phi(phi_row,c) = temp_dm.Phi(r,c);
+            }
+            phi_row++;
+            rows_needed--;
+          }
+        }
+        // there are no more available rows
+        rows_available=0;
+        std::cout << "FULL WORKER: " << b_rank << " phi_row: " << phi_row << " phi_rows: " << phi_rows << std::endl;
+
+        // then send the remaining rows to the host
+        int arr_size=rows_needed*phi_cols;
+        if (arr_size > 0) {
+          MPI_Send (&arr_size, 1, MPI_INT, 0, DATA_TAG, MPI_COMM_WORLD);
+          // TODO
+          int start=0;
+          MPI_Send (&temp_dm.Phi.data()[start], arr_size, MPI_DOUBLE, 0, DATA_TAG, MPI_COMM_WORLD);
+        }
+      }
+      else {
+        // just fill the local phi array
+        for (size_t s=0; s<stdb.size(); ++s) {
+          //std::cout << "phi_row: " << phi_row << " R avail: " << rows_available << " phi rows: " << phi_rows << std::endl;
+          StDescriptors st_d = dc.calc(stdb(s));
+          dm.build(phi_row,stdb(s),st_d); // increments phi_row
+        }
+        rows_available-=rows_needed;
+      }
+
+      // ..copy dm.phi -> phi
+
+      // For every new structure in stdb we calculate the PHI rows directly
+      // into the local phi. We have to make sure we do not exceed phi, and
+      // we keep track of the number of filled rows, which serves as an index
+      // for subsequent calls.
+      //
+      // We do not want to train a model here, so we use DesignMatrix.
+      //
+      // if (phi_row == phi_rows) stop computation
+
      if (fn) delete fn;
    }
  }
-  std::cout << "HOW IS THIS POSSIBLE: " << b_rank << std::endl;
-
+  std::cout << "RANK " << rank << " HOST-WORKER EXIT SUCCESS" << std::endl;
 
   // END HOST-WORKER
 #endif