diff --git a/bin/tadah_cli.cpp b/bin/tadah_cli.cpp
index 3f3444b348065539b945c53776cf731b6245028e..497aeea6e97200c81501b0f5def015f64980893c 100644
--- a/bin/tadah_cli.cpp
+++ b/bin/tadah_cli.cpp
@@ -19,7 +19,6 @@
 #include <iostream>
 #include <stdexcept>
 #include <chrono>
-#include <thread>
 
 #ifdef TADAH_BUILD_MPI
 extern "C" void blacs_get_(int*, int*, int*);
@@ -152,13 +151,13 @@ void TadahCLI::subcommand_train() {
   int b_ncols1;  // Number of column procs
   int b_nrows2;  // Number of row procs
   int b_ncols2;  // Number of column procs
-  int rnb1 = 1;  // Row block size TODO
+  int rnb1 = 50; // Row block size for context1 does not really matter
   int rnb2 = 1;  // Row block size TODO
-  int cnb1 = PHI_cols;  // Column block size TODO
+  int cnb1 = PHI_cols;  // Column block size for context one is fixed
   int cnb2 = 1;  // Column block size TODO
 
   blacs_pinfo_(&b_rank, &ncpu) ;  // BLACS rank and world size
-  //std::cout << "RANK: " << rank << " BRANK: " << b_rank << " ncpu: " << ncpu << std::endl;
+  //std::cout << "RANK: " << rank << " BRANK: " << b_rank << " ncpu: " << ncpu << std::endl;
 
   b_ncols1 = 1;
   b_nrows1 = ncpu;
@@ -178,29 +177,30 @@ void TadahCLI::subcommand_train() {
   blacs_get_(&izero,&izero, &context1 );  // -> Create context1
   blacs_gridinit_(&context1, &layout, &b_nrows1, &b_ncols1 );  // context1 -> Initialize the grid
   blacs_gridinfo_(&context1, &b_nrows1, &b_ncols1, &b_row1, &b_col1 );
-  //
+  //
   // Create second context
   blacs_get_(&izero,&izero, &context2 );  // -> Create context2
   blacs_gridinit_(&context2, &layout2, &b_nrows2, &b_ncols2 );  // context1 -> Initialize the grid
   blacs_gridinfo_(&context2, &b_nrows2, &b_ncols2, &b_row2, &b_col2 );
-  //std::cout << "BRANK: " << b_rank << " BROW: " << b_row1 << " BCOL: " << b_col1 << std::endl;
-  printf( "rank = %d, nprow = %d, npcol = %d, myrow = %d, mycol = %d\n", rank, b_nrows1, b_ncols1, b_row1, b_col1 );
+  // printf( "context= %d, rank = %d, nprow = %d, npcol = %d, myrow = %d, mycol = %d\n", context1, rank, b_nrows1, b_ncols1, b_row1, b_col1 );
+  // printf( "context= %d, rank = %d, nprow = %d, npcol = %d, myrow = %d, mycol = %d\n", context2, rank, b_nrows2, b_ncols2, b_row2, b_col2 );
 
-  std::cout << "izero: " << izero << " context1: " << context1 << std::endl;
-  std::cout << "ione: " << ione << " context2: " << context2 << std::endl;
+  // std::cout << "izero: " << izero << " context1: " << context1 << std::endl;
+  // std::cout << "ione: " << ione << " context2: " << context2 << std::endl;
 
   // Compute the size of the local phi matrices
   // int PHI_size = PHI_rows*PHI_cols;
   int phi_rows = numroc_( &PHI_rows, &rnb1, &b_row1, &izero, &b_nrows1 );
-  int phi_rows2 = numroc_( &PHI_rows, &rnb2, &b_row2, &izero, &b_nrows2 );
   int phi_cols = numroc_( &PHI_cols, &cnb1, &b_col1, &izero, &b_ncols1 );
+  int phi_rows2 = numroc_( &PHI_rows, &rnb2, &b_row2, &izero, &b_nrows2 );
   int phi_cols2 = numroc_( &PHI_cols, &cnb2, &b_col2, &izero, &b_ncols2 );
-  std::cout << "BRANK: " << b_rank << " phi_rows: " << phi_rows << " phi_col " << phi_cols << std::endl;
+  // std::cout << "BRANK: " << b_rank << " phi_rows1: " << phi_rows << " phi_col1 " << phi_cols << std::endl;
+  // std::cout << "BRANK: " << b_rank << " phi_rows2: " << phi_rows2 << " phi_col2 " << phi_cols2 << std::endl;
 
   // Define MPI datatype to send rows from the PHI matrix with column-major order
   MPI_Datatype rowvec, rowvecs;
   MPI_Type_vector( phi_cols, 1, phi_rows, MPI_DOUBLE, &rowvec);
   MPI_Type_commit(&rowvec);
-  MPI_Type_create_resized(rowvec, 0, 1*sizeof(double), &rowvecs);
+  MPI_Type_create_resized(rowvec, 0, sizeof(double), &rowvecs);
   MPI_Type_commit(&rowvecs);
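For reference, the rowvec/rowvecs pair above is the usual trick for addressing whole rows of a column-major local block: the vector type strides over the leading dimension, and the resized copy shrinks the extent to one double so that a count of k covers k consecutive rows. A minimal sketch of the same idea, not taken from this patch (ld, ncols, r, k, dest are illustrative values):

    // Sketch: a datatype that selects one row of a column-major ld x ncols block.
    int ld = 100, ncols = 8;                  // assumed local leading dimension and column count
    std::vector<double> A(ld * ncols, 0.0);   // local block, column-major storage
    MPI_Datatype row, row1;
    MPI_Type_vector(ncols, 1, ld, MPI_DOUBLE, &row);         // ncols elements, stride ld
    MPI_Type_create_resized(row, 0, sizeof(double), &row1);  // extent = one double
    MPI_Type_commit(&row1);
    // Send rows r..r+k-1 of A in a single call; a receiver can use the same type.
    int r = 3, k = 5, dest = 1;
    MPI_Send(&A[r], k, row1, dest, 0, MPI_COMM_WORLD);
    MPI_Type_free(&row1);
    MPI_Type_free(&row);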
   // COUNTERS
@@ -218,11 +218,11 @@ void TadahCLI::subcommand_train() {
     // HOST: prepare work packages
     // filename, first structure index, number of structures to read
     std::vector<std::tuple<std::string,int,int>> wpckgs;
-    int nstruc = 10;  // TODO the number of structures in a single work package
+    int nstruc = 10;  // TODO: read from Config, the number of structures in a single work package
     for (const std::string &fn : config("DBFILE")) {
       // get number of structures
       int dbsize = StructureDB::count(fn).first;
-      std::cout << "DBSIZE: " << dbsize << std::endl;
+      // std::cout << "DBSIZE: " << dbsize << std::endl;
       int first=0;
       while(true) {
         if (nstruc < dbsize) {
@@ -249,11 +249,11 @@ void TadahCLI::subcommand_train() {
       MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
       int worker = status.MPI_SOURCE;
       int tag = status.MPI_TAG;
-      std::cout << "HOST1 request from WORKER: " << worker << " TAG: " << tag << std::endl;
+      // std::cout << "HOST1 request from WORKER: " << worker << " TAG: " << tag << std::endl;
 
       if (tag==WORK_TAG) {
-        int wsize;  // array size for DATA_TAG, rows_available for WORK_TAG
-        MPI_Recv (&wsize, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
+        int rows_available;
+        MPI_Recv (&rows_available, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
 
         std::tuple<std::string,int,int> wpckg = wpckgs.back();
         wpckgs.pop_back();
@@ -261,7 +261,6 @@ void TadahCLI::subcommand_train() {
         // send dataset filename
         const char *fn = std::get<0>(wpckg).c_str();
         int fn_length = std::get<0>(wpckg).length()+1;  // +1 for char
-        MPI_Send (&fn_length, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
         MPI_Send (fn, fn_length, MPI_CHAR, worker, tag, MPI_COMM_WORLD);
 
         // send index of the first structure to load
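With the explicit fn_length message dropped above, the receiving side can size the filename buffer from the pending message itself via MPI_Probe plus MPI_Get_count (which is what the reworked worker branch below does). A small worker-side sketch of that pattern, with the source rank and use of std::string being assumptions for illustration:

    // Sketch: receive a char array whose length was not sent separately.
    MPI_Status st;
    MPI_Probe(0, WORK_TAG, MPI_COMM_WORLD, &st);   // block until the filename message arrives
    int len = 0;
    MPI_Get_count(&st, MPI_CHAR, &len);            // number of chars in the pending message
    std::string fname(len, '\0');
    MPI_Recv(&fname[0], len, MPI_CHAR, 0, WORK_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    // The sender included the trailing '\0'; trim it if present.
    if (!fname.empty() && fname.back() == '\0') fname.pop_back();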
@@ -272,420 +271,426 @@
 int nstruc = std::get<2>(wpckg);
 MPI_Send (&nstruc, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
- std::cout << "HOST: " << fn << " " << first << " " << nstruc << std::endl;
- }
- else if (tag==DATA_TAG) {
- int rows_needed;
- MPI_Recv (&rows_needed, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
- if (rows_available>0) {
- std::cout << "HOST1 received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail:" << rows_available << std::endl;
- int rows_accepted = rows_available < rows_needed ? rows_available : rows_needed;
- MPI_Send (&b_rank, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
- MPI_Send (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
-
- // this is actually necesarry for a worker to know number of rows
- //MPI_Recv (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
-
- // get data...
- //MPI_Probe(worker, tag, MPI_COMM_WORLD, &status);
- //int arr_size = rows_accepted*phi_cols;
- //int arr_size2;
- //MPI_Get_count(&status, MPI_DOUBLE, &arr_size2);
- //if (arr_size2!=arr_size) { throw std::runtime_error("arr_size != arr_size2\n");}
- //MPI_Recv (&dm.Phi.data()[phi_row], arr_size, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
- MPI_Recv (&dm.Phi.data()[phi_row], rows_accepted, rowvecs, worker, tag, MPI_COMM_WORLD, &status);
- //for (int i=0; i<rows_accepted;++i)
- // MPI_Recv (&dm.Phi.data()[phi_row+i], 1, rowvec, worker, tag, MPI_COMM_WORLD, &status);
- rows_available -= rows_accepted;
- phi_row += rows_accepted;
- if (rows_available==0 ) { std::cout << "!!! HOST1 LOCAL MATRIX IS FILLED !!!" << std::endl;}
- if (rows_available<0 ) { throw std::runtime_error(" HOST1: The number of rows in the local array is smaller than requested.");}
-
+ // std::cout << "HOST: " << fn << " " << first << " " << nstruc << std::endl;
 }
- else {
- std::cout << "HOST1 received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail:" << rows_available << std::endl;
- // host is unable to fit data we have to ask workers for their storage availability
- //MPI_Probe(MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status);
- int w_rows_available;
- // find a worker to accept at least some data
- MPI_Status status2;
- int worker2;
- // find a worker capable of accepting data
- while (true) {
- std::cout << "HOST1 searching for a WORKER!" << std::endl;
- MPI_Recv (&w_rows_available, 1, MPI_INT, MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status2);
- worker2 = status2.MPI_SOURCE;
- if (worker==worker2) {throw std::runtime_error("worker and worker2 are the same."); }
- if (w_rows_available==0 ) {
- // give up on this worker
- MPI_Send (&worker2, 1, MPI_INT, worker2, WAIT_TAG, MPI_COMM_WORLD);
- std::cout << "HOST1 skipping a WORKER!" << std::endl;
- }
- else {
- std::cout << "HOST1 found a WORKER: " << worker2 << " WORKER NEEDS: " << w_rows_available << std::endl;
- break;
- }
+ else if (tag==DATA_TAG) {
+ int rows_needed;
+ MPI_Recv (&rows_needed, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
+ if (rows_available>0) {
+ // std::cout << "HOST1 received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail:" << rows_available << std::endl;
+ int rows_accepted = rows_available < rows_needed ? rows_available : rows_needed;
+ MPI_Send (&b_rank, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
+ MPI_Send (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
+
+ MPI_Recv (&dm.Phi.data()[phi_row], rows_accepted, rowvecs, worker, tag, MPI_COMM_WORLD, &status);
+ rows_available -= rows_accepted;
+ phi_row += rows_accepted;
+ if (rows_available==0 ) { std::cout << "!!! HOST1 LOCAL MATRIX IS FILLED !!!" << std::endl;}
+ if (rows_available<0 ) { throw std::runtime_error(" HOST1: The number of rows in the local array is smaller than requested.");}
+ }
+ else {
+ // std::cout << "HOST1 received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail:" << rows_available << std::endl;
+ // host is unable to fit data we have to ask workers for their storage availability
+ int w_rows_available;
+ // find a worker to accept at least some data
+ MPI_Status status2;
+ int worker2;
+ // find a worker capable of accepting data
+ while (true) {
+ // std::cout << "HOST1 searching for a WORKER!" << std::endl;
+ MPI_Recv (&w_rows_available, 1, MPI_INT, MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status2);
+ worker2 = status2.MPI_SOURCE;
+ if (worker==worker2) {throw std::runtime_error("worker and worker2 are the same."); }
+ if (w_rows_available==0 ) {
+ // give up on this worker
+ MPI_Send (&worker2, 1, MPI_INT, worker2, WAIT_TAG, MPI_COMM_WORLD);
+ // std::cout << "HOST1 skipping a WORKER!" << std::endl;
+ }
+ else {
+ // std::cout << "HOST1 found a WORKER: " << worker2 << " WORKER NEEDS: " << w_rows_available << std::endl;
+ break;
+ }
+ }
-
 int rows_accepted = w_rows_available < rows_needed ? w_rows_available : rows_needed;
- MPI_Send (&worker2, 1, MPI_INT, worker, DATA_TAG, MPI_COMM_WORLD);
- MPI_Send (&rows_accepted, 1, MPI_INT, worker, DATA_TAG, MPI_COMM_WORLD);
+ MPI_Send (&worker2, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
+ MPI_Send (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
+ }
+ }
+ else {
+ throw std::runtime_error("HOST: Unexpected request from " + std::to_string(worker));
 }
 }
- else {
- throw std::runtime_error("HOST: Unexpected request from " + std::to_string(worker));
- }
- }
- std::cout << "---------LOOP 1 FINISHED---------" << std::endl;
-
- // work finised, collect remaining data and release all workers
- int count=1; // count number of release workers, skip host
- while(true) {
- int wsize; // array size for DATA_TAG, rows_available for WORK_TAG
- MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
- int worker = status.MPI_SOURCE;
- int tag = status.MPI_TAG;
-
- if (tag==DATA_TAG) {
- int rows_needed;
- MPI_Recv (&rows_needed, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
- if (rows_available>0) {
- std::cout << "HOST1 received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail:" << rows_available << std::endl;
- int rows_accepted = rows_available < rows_needed ? rows_available : rows_needed;
- MPI_Send (&b_rank, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
- MPI_Send (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
- //MPI_Recv (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
- // this is actually necesarry for a worker to know number of rows
- //MPI_Recv (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
-
- // get data...
- //MPI_Probe(worker, tag, MPI_COMM_WORLD, &status);
- //int arr_size = rows_accepted*phi_cols;
- //int arr_size2;
- //MPI_Get_count(&status, MPI_DOUBLE, &arr_size2);
- //if (arr_size2!=arr_size) { throw std::runtime_error("arr_size != arr_size2\n");}
- MPI_Recv (&dm.Phi.data()[phi_row], rows_accepted, rowvecs, worker, tag, MPI_COMM_WORLD, &status);
- //for (int i=0; i<rows_accepted;++i)
- // MPI_Recv (&dm.Phi.data()[phi_row+i], 1, rowvec, worker, tag, MPI_COMM_WORLD, &status);
- //MPI_Recv (&dm.Phi.data()[phi_row], arr_size, MPI_DOUBLE, worker, tag, MPI_COMM_WORLD, &status);
+ std::cout << "---------LOOP 1 FINISHED---------" << std::endl;
- rows_available -= rows_accepted;
- phi_row += rows_accepted;
- //int start=phi_row*phi_cols;
- if (rows_available==0 ) { std::cout << "!!! HOST1 LOCAL MATRIX IS FILLED !!!" << std::endl;}
- if (rows_available<0 ) { throw std::runtime_error(" HOST1: The number of rows in the local array is smaller than requested.");}
+ // work finished, collect remaining data and release all workers
+ int count=1; // count number of released workers, skip host
+ while(true) {
+ MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
+ int worker = status.MPI_SOURCE;
+ int tag = status.MPI_TAG;
-
- }
- else {
- std::cout << "HOST2 received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail:" << rows_available << std::endl;
- // host is unable to fit data we have to ask workers for their storage availability
- //MPI_Probe(MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status);
- int w_rows_available;
- // find a worker to accept at least some data
- MPI_Status status2;
- int worker2;
-
- // find a worker capable of accepting data
- while (true) {
- std::cout << "HOST2 searching for a WORKER!" << std::endl;
- MPI_Recv (&w_rows_available, 1, MPI_INT, MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status2);
- worker2 = status2.MPI_SOURCE;
- if (worker==worker2) {throw std::runtime_error("worker and worker2 are the same."); }
- if (w_rows_available==0 ) {
- // give up on this worker and release him as there is no more work to be done
- //MPI_Send (&worker2, 1, MPI_INT, worker2, WAIT_TAG, MPI_COMM_WORLD);
- MPI_Send (0, 0, MPI_INT, worker2, RELEASE_TAG, MPI_COMM_WORLD);
- count++;
- std::cout << "HOST2 skipping+releasing a WORKER: " << worker2 << std::endl;
- }
- else {
- std::cout << "HOST2 found a WORKER: " << worker2 << " WORKER NEEDS: " << w_rows_available << std::endl;
- break;
- }
- }
+ if (tag==DATA_TAG) {
+ int rows_needed;
+ MPI_Recv (&rows_needed, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
+ if (rows_available>0) {
+ std::cout << "HOST1 received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail:" << rows_available << std::endl;
+ int rows_accepted = rows_available < rows_needed ? rows_available : rows_needed;
+ MPI_Send (&b_rank, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
+ MPI_Send (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD);
+ MPI_Recv (&dm.Phi.data()[phi_row], rows_accepted, rowvecs, worker, tag, MPI_COMM_WORLD, &status);
+
+ rows_available -= rows_accepted;
+ phi_row += rows_accepted;
+ if (rows_available==0 ) { std::cout << "!!! HOST1 LOCAL MATRIX IS FILLED !!!" << std::endl;}
+ if (rows_available<0 ) { throw std::runtime_error(" HOST1: The number of rows in the local array is smaller than requested.");}
-
 int rows_accepted = w_rows_available < rows_needed ? w_rows_available : rows_needed;
+ }
+ else {
+ //std::cout << "HOST2 received data transfer request from: " << worker << " rows needed: " << rows_needed << " rows_avail:" << rows_available << std::endl;
+ // host is unable to fit data we have to ask workers for their storage availability
+ int w_rows_available;
+ // find a worker to accept at least some data
+ MPI_Status status2;
+ int worker2;
+
+ // find a worker capable of accepting data
+ while (true) {
+ // std::cout << "HOST2 searching for a WORKER!" << std::endl;
+ MPI_Recv (&w_rows_available, 1, MPI_INT, MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status2);
+ worker2 = status2.MPI_SOURCE;
+ if (worker==worker2) {throw std::runtime_error("worker and worker2 are the same."); }
+ if (w_rows_available==0 ) {
+ // give up on this worker and release him as there is no more work to be done
+ MPI_Send (0, 0, MPI_INT, worker2, RELEASE_TAG, MPI_COMM_WORLD);
+ count++;
+ // std::cout << "HOST2 skipping+releasing a WORKER: " << worker2 << std::endl;
+ }
+ else {
+ // std::cout << "HOST2 found a WORKER: " << worker2 << " WORKER NEEDS: " << w_rows_available << std::endl;
+ break;
+ }
+ }
-
 int rows_accepted = w_rows_available < rows_needed ? w_rows_available : rows_needed;
- MPI_Send (&worker2, 1, MPI_INT, worker, DATA_TAG, MPI_COMM_WORLD);
- MPI_Send (&rows_accepted, 1, MPI_INT, worker, DATA_TAG, MPI_COMM_WORLD);
+ MPI_Send (&worker2, 1, MPI_INT, worker, DATA_TAG, MPI_COMM_WORLD);
+ MPI_Send (&rows_accepted, 1, MPI_INT, worker, DATA_TAG, MPI_COMM_WORLD);
+ }
+ }
-
- //MPI_Send (&worker2, 1, MPI_INT, worker2, WAIT_TAG, MPI_COMM_WORLD);
- }
- }
- else {
- MPI_Recv (&wsize, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD, &status);
- // there is no more work so release a worker
- if (wsize==0) {
- MPI_Send (0, 0, MPI_INT, worker, RELEASE_TAG, MPI_COMM_WORLD);
- count++;
 }
 else {
- MPI_Send (0, 0, MPI_INT, worker, WAIT_TAG, MPI_COMM_WORLD);
+ int rows_available;
+ MPI_Recv (&rows_available, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD, &status);
+ // there is no more work so release a worker
+ if (rows_available==0) {
+ MPI_Send (0, 0, MPI_INT, worker, RELEASE_TAG, MPI_COMM_WORLD);
+ count++;
+ }
+ else {
+ MPI_Send (0, 0, MPI_INT, worker, WAIT_TAG, MPI_COMM_WORLD);
+ }
+ if (count==ncpu) { break; } // count starts from 1
 }
- if (count==ncpu) { break; } // count starts from 1
- }
- }
- std::cout << "HOST: " << "r_avail: " << rows_available << std::endl;
- std::cout << "HOST EXIT" << std::endl;
-}
-else { // WORKER
- std::cout << "WORKER INIT: " << b_rank << std::endl;
- while (true) {
- // ask for more work...
- MPI_Send (&rows_available, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
- // request from root or from other workers
- MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
- int worker = status.MPI_SOURCE;
- int tag = status.MPI_TAG;
-
- // release a worker
- if (status.MPI_TAG == RELEASE_TAG) {
- int temp;
- MPI_Recv (&temp, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
- if (rows_available!=0) { throw std::runtime_error("Attempting to release a worker... but the workers requires more data !!");}
- break;
 }
- else if (status.MPI_TAG == WAIT_TAG) {
- int temp;
- MPI_Recv (&temp, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
- // do nothing here; ask for more work in the next cycle
- // std::this_thread::sleep_for(std::chrono::milliseconds(1000)); //TODO
- }
- else if (status.MPI_TAG == DATA_TAG) {
- // other worker is giving me some data
- // this is actually necesarry for a worker to know number of rows
- //int rows_accepted;
- //MPI_Recv (&rows_accepted, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
- int arr_size;
- MPI_Get_count(&status, MPI_DOUBLE, &arr_size);
- //MPI_Get_count(&status, MPI_DOUBLE, &rows_accepted);
- int rows_accepted = arr_size/phi_cols;
- if (rows_available<rows_accepted) { throw std::runtime_error("Number of rows available is smaller than number of provided rows");}
- MPI_Recv (&dm.Phi.data()[phi_row], rows_available, rowvecs, worker, tag, MPI_COMM_WORLD, &status);
- rows_available -= rows_accepted;
- phi_row += rows_accepted;
- }
- else if (status.MPI_TAG == WORK_TAG) {
- // otherwise get work package
- int fn_length; // length of the filename char array
- int first; // index of the first structure to read from the file
- int nstruc; // number of structures to be processed
- MPI_Recv (&fn_length, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status);
-
- char *fn = (char *) malloc(fn_length+1);
- MPI_Recv (fn, fn_length, MPI_CHAR, 0, WORK_TAG, MPI_COMM_WORLD, &status);
- MPI_Recv (&first, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
- MPI_Recv (&nstruc, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
- std::cout << "WORKER: " << b_rank << "file: " << fn << " first: " << first << " nstruc: " << nstruc << std::endl;
-
- // do work
- StructureDB stdb;
- stdb.add(std::string(fn,fn_length),first,nstruc);
- std::cout << "WORKER: " << b_rank << " stdb.size(): " << stdb.size() << std::endl;
"WORKER: " << b_rank << " stdb.size(): " << stdb.size() << std::endl; - nnf.calc(stdb); - - // compute number of rows needed for a given StructureDB - int rows_needed = 0; - for (size_t s=0; s<stdb.size(); ++s) { - int natoms = stdb(s).natoms(); - rows_needed += DesignMatrixBase::phi_rows_num(config, 1, natoms); - } - - std:: cout << "WORKER BEGIN COMP: " << b_rank << " R avail: " << rows_available << " R needed: " << rows_needed << std::endl; - if (rows_available<rows_needed) { - // we do not have enough rows in the local phi matrix - // so we create temp DM of required size - DesignMatrix<DM_Function_Base&> temp_dm(*fb, config); - temp_dm.Phi.resize(rows_needed,phi_cols); + std::cout << "HOST: " << "r_avail: " << rows_available << std::endl; + std::cout << "HOST EXIT" << std::endl; + } + else { // WORKER + std::cout << "WORKER INIT: " << b_rank << std::endl; + while (true) { + // ask for more work... + MPI_Send (&rows_available, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD); + // request from root or from other workers + MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status); + int worker = status.MPI_SOURCE; + int tag = status.MPI_TAG; - // and compute all rows - size_t temp_phi_row=0; + // release a worker + if (status.MPI_TAG == RELEASE_TAG) { + int temp; + MPI_Recv (&temp, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status); + if (rows_available!=0) { throw std::runtime_error("Attempting to release a worker... but the workers requires more data !!");} + break; + } + else if (status.MPI_TAG == WAIT_TAG) { + int temp; + MPI_Recv (&temp, 1, MPI_INT, worker, tag, MPI_COMM_WORLD, &status); + // do nothing here; ask for more work in the next cycle + } + else if (status.MPI_TAG == DATA_TAG) { + // other worker is giving me some data + + int arr_size; + MPI_Get_count(&status, MPI_DOUBLE, &arr_size); + int rows_accepted = arr_size/phi_cols; + if (rows_available<rows_accepted) { throw std::runtime_error("Number of rows available is smaller than number of provided rows");} + MPI_Recv (&dm.Phi.data()[phi_row], rows_available, rowvecs, worker, tag, MPI_COMM_WORLD, &status); + rows_available -= rows_accepted; + phi_row += rows_accepted; + } + else if (status.MPI_TAG == WORK_TAG) { + // otherwise get work package + int fn_length; // length of the filename char array + int first; // index of the first structure to read from the file + int nstruc; // number of structures to be processed + MPI_Get_count(&status, MPI_CHAR, &fn_length); + + char *fn = (char *) malloc(fn_length+1); + MPI_Recv (fn, fn_length, MPI_CHAR, 0, WORK_TAG, MPI_COMM_WORLD, &status); + MPI_Recv (&first, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status); + MPI_Recv (&nstruc, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status); + // std::cout << "WORKER: " << b_rank << "file: " << fn << " first: " << first << " nstruc: " << nstruc << std::endl; + + // do work + StructureDB stdb; + stdb.add(std::string(fn,fn_length),first,nstruc); + // std::cout << "WORKER: " << b_rank << " stdb.size(): " << stdb.size() << std::endl; + nnf.calc(stdb); + + // compute number of rows needed for a given StructureDB + int rows_needed = 0; for (size_t s=0; s<stdb.size(); ++s) { - StDescriptors st_d = dc.calc(stdb(s)); - temp_dm.build(temp_phi_row,stdb(s),st_d); // phi_row++ + int natoms = stdb(s).natoms(); + rows_needed += DesignMatrixBase::phi_rows_num(config, 1, natoms); } - // first we try to fill remaining rows in the local phi matrix - // copy top of temp_dm.Phi to the bottom of dm. 
- if (rows_available>0) {
- for (rows_available; rows_available>0; rows_available--) {
- for (int c=0; c<phi_cols; c++) {
- dm.Phi(phi_row,c) = temp_dm.Phi(rows_available-1,c);
+ // std:: cout << "WORKER BEGIN COMP: " << b_rank << " R avail: " << rows_available << " R needed: " << rows_needed << std::endl;
+ if (rows_available<rows_needed) {
+ // we do not have enough rows in the local phi matrix
+ // so we create temp DM of required size
+ DesignMatrix<DM_Function_Base&> temp_dm(*fb, config);
+ temp_dm.Phi.resize(rows_needed,phi_cols);
+
+ // and compute all rows
+ size_t temp_phi_row=0;
+ for (size_t s=0; s<stdb.size(); ++s) {
+ StDescriptors st_d = dc.calc(stdb(s));
+ temp_dm.build(temp_phi_row,stdb(s),st_d); // phi_row++
+ }
+
+ // first we try to fill remaining rows in the local phi matrix
+ // copy top of temp_dm.Phi to the bottom of dm.Phi in reverse order
+ if (rows_available>0) {
+ for (rows_available; rows_available>0; rows_available--) {
+ for (int c=0; c<phi_cols; c++) {
+ dm.Phi(phi_row,c) = temp_dm.Phi(rows_available-1,c);
+ }
+ phi_row++;
+ rows_needed--;
 }
- phi_row++;
- rows_needed--;
 }
- }
- // there are no more available rows
- // send remaining data to available processes
- while (rows_needed > 0) {
- // request host
- MPI_Send (&rows_needed, 1, MPI_INT, 0, DATA_TAG, MPI_COMM_WORLD);
- int rows_accepted; // number of accepted rows
- int dest; // receiving process
- // host returns which dest can accept and how much
- MPI_Recv (&dest, 1, MPI_INT, 0, DATA_TAG, MPI_COMM_WORLD, &status);
-
- MPI_Recv (&rows_accepted, 1, MPI_INT, 0, DATA_TAG, MPI_COMM_WORLD, &status);
- // we send data to the host or a willing worker
- int start=temp_dm.Phi.rows()-rows_needed;
-
- // Define temp data type for temp Phi matrix
- // Phi is stored in a column-major order
- MPI_Datatype trowvec, trowvecs;
- MPI_Type_vector( temp_dm.Phi.cols(), 1, temp_dm.Phi.rows(), MPI_DOUBLE, &trowvec);
- MPI_Type_commit(&trowvec);
- MPI_Type_create_resized(trowvec, 0, 1*sizeof(double), &trowvecs);
- MPI_Type_commit(&trowvecs);
-
- // ready to send
- MPI_Send (&temp_dm.Phi.data()[start], rows_accepted, trowvecs, dest, DATA_TAG, MPI_COMM_WORLD);
- rows_needed -= rows_accepted;
+ // there are no more available rows
+ // send remaining data to available processes
+ while (rows_needed > 0) {
+ // request host
+ MPI_Send (&rows_needed, 1, MPI_INT, 0, DATA_TAG, MPI_COMM_WORLD);
+ int rows_accepted; // number of accepted rows
+ int dest; // receiving process
+ // host returns which dest can accept and how much
+ MPI_Recv (&dest, 1, MPI_INT, 0, DATA_TAG, MPI_COMM_WORLD, &status);
+
+ MPI_Recv (&rows_accepted, 1, MPI_INT, 0, DATA_TAG, MPI_COMM_WORLD, &status);
+ // we send data to the host or a willing worker
+ int start=temp_dm.Phi.rows()-rows_needed;
+
+ // Define temp data type for temp Phi matrix
+ // Phi is stored in a column-major order
+ MPI_Datatype trowvec, trowvecs;
+ MPI_Type_vector( temp_dm.Phi.cols(), 1, temp_dm.Phi.rows(), MPI_DOUBLE, &trowvec);
+ MPI_Type_commit(&trowvec);
+ MPI_Type_create_resized(trowvec, 0, 1*sizeof(double), &trowvecs);
+ MPI_Type_commit(&trowvecs);
+
+ // ready to send
+ MPI_Send (&temp_dm.Phi.data()[start], rows_accepted, trowvecs, dest, DATA_TAG, MPI_COMM_WORLD);
+ rows_needed -= rows_accepted;
+ MPI_Type_free(&trowvec);
+ }
 }
-
- }
- else {
- // just fill local phi array as it is large enough
+ else {
+ // just fill local phi array as it is large enough
+ for (size_t s=0; s<stdb.size(); ++s) {
+ StDescriptors st_d = dc.calc(stdb(s));
+ dm.build(phi_row,stdb(s),st_d); // build() increments phi_row++
+ }
+ rows_available-=rows_needed;
+ }
- for (size_t s=0; s<stdb.size(); ++s) {
- StDescriptors st_d = dc.calc(stdb(s));
- dm.build(phi_row,stdb(s),st_d); // build() increments phi_row++
- }
- rows_available-=rows_needed;
- }
- if (fn)
- delete fn;
+ if (fn)
+ free(fn);  // fn was allocated with malloc
+ }
 }
+ if (rows_available==0) std::cout << "+++++++ WORKER IS FULL++++++++ " << b_rank << " ROWS AVAILABLE: " << rows_available << std::endl;
+
+ }
+ std::cout << "RANK" << rank << " HOST-WORKER EXIT SUCCESS" << std::endl;
+ // END HOST-WORKER
+
+ // All phi matrices are computed by this point
+
+ // Descriptor for scalaPACK
+ int descPHI[9];
+ int descPHI2[9];
+ int descB[9];
+ int descB2[9];
+ int info;
+ int info2;
+ descinit_( descPHI, &PHI_rows, &PHI_cols, &rnb1, &cnb1, &izero, &izero, &context1, /*leading dimension*/&phi_rows, &info);
+ descinit_( descPHI2, &PHI_rows, &PHI_cols, &rnb2, &cnb2, &izero, &izero, &context2, /*leading dimension*/&phi_rows2, &info2);
+ if(info != 0) {
+ printf("Error in descinit, info = %d\n", info);
+ }
+ if(info2 != 0) {
+ printf("Error in descinit, info = %d\n", info2);
 }
- if (rows_available==0) std::cout << "+++++++ WORKER IS FULL++++++++ " << b_rank << " ROWS AVAILABLE: " << rows_available << std::endl;
-}
-std::cout << "RANK" << rank << " HOST-WORKER EXIT SUCCESS" << std::endl;
-// END HOST-WORKER
-
-// All phi matrices are computed by this point
-
-// Descriptor for scalaPACK
-int descPHI[9];
-int descPHI2[9];
-int descB[9];
-int info;
-int info2;
-descinit_( descPHI, &PHI_rows, &PHI_cols, &rnb1, &cnb1, &izero, &izero, &context1, /*leading dimension*/&phi_rows, &info);
-descinit_( descPHI2, &PHI_rows, &PHI_cols, &rnb2, &cnb2, &izero, &izero, &context2, /*leading dimension*/&phi_rows2, &info2);
-descinit_( descB , &PHI_rows, &ione, &rnb1, &cnb1, &izero, &izero, &context1, /*leading dimension*/&phi_rows, &info2);
-
-if (rank==1) {
- std::cout << "descPHI:"<< std::endl;
- for (int i=0;i<9;++i) {
- std::cout << descPHI[i] << " ";
+ descinit_( descB , &PHI_rows, &ione, &rnb1, &cnb1, &izero, &izero, &context1, /*leading dimension*/&phi_rows, &info);
+ descinit_( descB2 , &PHI_rows, &ione, &rnb2, &cnb2, &izero, &izero, &context2, /*leading dimension*/&phi_rows2, &info2);
+
+ // if (rank==1) {
+ // std::cout << "descPHI:"<< std::endl;
+ // for (int i=0;i<9;++i) {
+ // std::cout << descPHI[i] << " ";
+ // }
+ // std::cout << std::endl;
+ // std::cout << "descB:"<< std::endl;
+ // for (int i=0;i<9;++i) {
+ // std::cout << descB[i] << " ";
+ // }
+ // std::cout << std::endl;
+ // }
+ if(info != 0) {
+ printf("Error in descinit, info = %d\n", info);
 }
- std::cout << std::endl;
- std::cout << "descB:"<< std::endl;
- for (int i=0;i<9;++i) {
- std::cout << descB[i] << " ";
+ if(info2 != 0) {
+ printf("Error in descinit, info = %d\n", info2);
 }
- std::cout << std::endl;
-}
-if(info != 0) {
- printf("Error in descinit, info = %d\n", info);
-}
-if(info2 != 0) {
- printf("Error in descinit, info = %d\n", info2);
-}
-double MPIt1 = MPI_Wtime();
-char trans='N';
-int nrhs = 1;
-double *b = new double[phi_rows];
-for (int i=0;i<phi_rows;++i) b[i]=i;
+ double MPIt1 = MPI_Wtime();
+ char trans='N';
+ int nrhs = 1;
+ double *b = new double[phi_rows];
+ for (int i=0;i<phi_rows;++i) b[i]=i;
+
+ double *b2 = new double[phi_rows2];
+ for (int i=0;i<phi_rows2;++i) b2[i]=i;
-
-if (rank==0)
- std::cout << dm.Phi;
+ if (rank==0)
+ std::cout << dm.Phi;
-
-int ia = 1;
-int ja = 1;
-int ib = 1;
-int jb = 1;
+ int ia = 1;
+ int ja = 1;
+ int ib = 1;
+ int jb = 1;
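As a reminder of the convention used by the descinit_ calls above: the descriptor is only a 9-integer summary, and the local allocation still has to agree with what numroc_ reports for this process. A compact sketch of the pairing, reusing the names already present in the patch (M, N, MB, NB, loc_rows, loc_cols, lld, desc, ierr are illustrative):

    // Sketch: descriptor for an M x N matrix in MB x NB blocks on the context1 grid.
    int M = PHI_rows, N = PHI_cols, MB = rnb1, NB = cnb1;
    int loc_rows = numroc_(&M, &MB, &b_row1, &izero, &b_nrows1);  // rows owned locally
    int loc_cols = numroc_(&N, &NB, &b_col1, &izero, &b_ncols1);  // columns owned locally
    int lld = (loc_rows > 1) ? loc_rows : 1;                      // leading dimension must be >= 1
    int desc[9], ierr;
    descinit_(desc, &M, &N, &MB, &NB, &izero, &izero, &context1, &lld, &ierr);
    if (ierr != 0) { /* argument -ierr was illegal; abort or report */ }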
-//
+
+//
 // Distribute data in 2D block cyclic
 // DesignMatrix<DM_Function_Base&> dm2(*fb, config);
 // dm2.Phi.resize(phi_rows2,phi_cols2);
 //
-//pdgemr2d_(&PHI_rows, &PHI_cols, dm.Phi.ptr(), &ione, &ione, descPHI,
-// dm2.Phi.ptr(), &ione, &ione, descPHI2, &context2);
-//if (rank==0) {
-// std::cout << "----------Making copy-----------" << std::endl;
-// std::cout << dm2.Phi;
-//}
-
-double wkopt;
-int lwork = -1; // query -> get size of the work matrix
-pdgels_(&trans, &phi_rows, &phi_cols, &nrhs, dm.Phi.ptr(), &ia, &ja, descPHI, b, &ib, &jb, descB, &wkopt, &lwork, &info);
-if (info != 0) {
- printf("Error in potrf, info = %d\n", info);
-}
+// pdgemr2d_(&PHI_rows, &PHI_cols, dm.Phi.ptr(), &ione, &ione, descPHI,
+// dm2.Phi.ptr(), &ione, &ione, descPHI2, &context2);
+
+
+// if (rank==0) {
+// std::cout << "----------Making copy-----------" << std::endl;
+// printf( "context= %d, rank = %d, nprow = %d, npcol = %d, myrow = %d, mycol = %d\n", context2, rank, b_nrows2, b_ncols2, b_row2, b_col2 );
+// std::cout << dm2.Phi;
+// }
+
+ double wkopt;
+ double wkopt2;
+ int lwork = -1; // query -> get size of the work matrix
+ int lwork2 = -1; // query -> get size of the work matrix
+ //
+ std::cout << "phi_cols: " << phi_cols << std::endl;
+ pdgels_(&trans, &PHI_rows, &PHI_cols, &nrhs, dm.Phi.ptr(), &ia, &ja, descPHI, b, &ib, &jb, descB, &wkopt, &lwork, &info);
+ if (info != 0) {
+ printf("Error in pdgels, info = %d\n", info);
+ }
+
+ //std::cout << "phi_cols2: " << phi_cols2 << std::endl;
+ //pdgels_(&trans, &phi_rows2, &phi_cols2, &nrhs, dm2.Phi.ptr(), &ia, &ja, descPHI2, b2, &ib, &jb, descB2, &wkopt2, &lwork2, &info2);
+ //if (info2 != 0) {
+ // printf("Error in pdgels, info = %d\n", info);
+ //}
-
-lwork = (int)wkopt;
-//std::cout << "work array size: " << lwork << std::endl;
+ lwork = (int)wkopt;
+ //lwork2 = (int)wkopt2;
+
+ //std::cout << "work array size: " << lwork << std::endl;
-
-double *work = new double[lwork];
-pdgels_(&trans, &phi_rows, &phi_cols, &nrhs, dm.Phi.ptr(), &ia, &ja, descPHI, b, &ib, &jb, descB, work, &lwork, &info);
+ double *work = new double[lwork];
+ pdgels_(&trans, &PHI_rows, &PHI_cols, &nrhs, dm.Phi.ptr(), &ia, &ja, descPHI, b, &ib, &jb, descB, work, &lwork, &info);
+ std::cout << "---b vec: rank: " << b_rank << " ";
+ for (int i=0;i<phi_rows;++i) std::cout << b[i] << " ";
+ std::cout << std::endl;
+ std::cout << std::endl;
-
-double MPIt2 = MPI_Wtime();
-//printf("[%dx%d] Done, time %e s.\n", b_row1, b_col1, MPIt2 - MPIt1);
+ //double *work2 = new double[lwork2];
+ //pdgels_(&trans, &phi_rows2, &phi_cols2, &nrhs, dm2.Phi.ptr(), &ia, &ja, descPHI2, b2, &ib, &jb, descB2, work2, &lwork2, &info2);
+ double MPIt2 = MPI_Wtime();
+ //printf("[%dx%d] Done, time %e s.\n", b_row1, b_col1, MPIt2 - MPIt1);
+
+ //std::cout << "---b2: ";
+ //for (int i=0;i<phi_rows2;++i) std::cout << b2[i] << " ";
+ //std::cout << std::endl;
+ //std::cout << std::endl;
-
-delete[] b;
-delete[] work;
-MPI_Type_free(&rowvec);
-MPI_Type_free(&rowvecs);
+ delete[] b;
+ delete[] b2;
+ delete[] work;
+ //delete[] work2;
+ MPI_Type_free(&rowvec);
+ MPI_Type_free(&rowvecs);
 #endif
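The pdgels_ calls above follow the usual ScaLAPACK workspace convention: a first call with lwork = -1 only writes the optimal workspace size into work[0], and the actual least-squares solve happens in a second call with the allocated buffer. A schematic restatement of that two-step pattern; M, N, A_loc, b_loc, descA and descB here are assumed to be an already-distributed matrix, right-hand side and their descriptors, not names from this file:

    // Sketch: min ||A x - b|| with ScaLAPACK's pdgels_ (column-major local blocks).
    char trans = 'N';
    int ione = 1, info = 0, lwork = -1;
    double wkopt = 0.0;
    // 1) workspace query: lwork = -1 returns the optimal size in wkopt
    pdgels_(&trans, &M, &N, &ione, A_loc, &ione, &ione, descA,
            b_loc, &ione, &ione, descB, &wkopt, &lwork, &info);
    lwork = static_cast<int>(wkopt);
    std::vector<double> work(lwork);
    // 2) actual solve; on exit the first N entries of the distributed b hold the solution
    pdgels_(&trans, &M, &N, &ione, A_loc, &ione, &ione, descA,
            b_loc, &ione, &ione, descB, work.data(), &lwork, &info);
    if (info != 0) { /* info < 0: argument -info was illegal */ }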
-// if (is_verbose()) std::cout << "Loading structures..." << std::flush;
-// StructureDB stdb(config);
-// if (is_verbose()) std::cout << "Done!" << std::endl;
-//
-// if (is_verbose()) std::cout << "Finding nearest neighbours within: " <<
-// config.get<double>("RCUTMAX") << " cutoff distance..." << std::flush;
-// nnf.calc(stdb);
-// if (is_verbose()) std::cout << "Done!" << std::endl;
-//
-// if (is_verbose()) std::cout << "Training start..." << std::flush;
-//
-// // here we want to train with the locally allocated phi
-// model->train(stdb,dc);
-//
-// //if (is_verbose()) std::cout << "Done!" << std::endl;
-//
-// Config param_file = model->get_param_file();
-// //param_file.check_pot_file();
-// //std::ofstream outfile;
-// //outfile.open ("pot.tadah");
-// //outfile << param_file << std::endl;;
-//
-// //if(train->count("--uncertainty")) {
-// // t_type weights = model->get_weights();
-// // t_type unc = model->get_weights_uncertainty();
-// // Output(param_file,false).print_train_unc(weights, unc);
-// //}
-//
-// //if (is_verbose()) std::cout << timer_tot.to_string() << std::endl;
-//
-if(model)
- delete model;
-if(fb)
- delete fb;
+ // if (is_verbose()) std::cout << "Loading structures..." << std::flush;
+ // StructureDB stdb(config);
+ // if (is_verbose()) std::cout << "Done!" << std::endl;
+ //
+ // if (is_verbose()) std::cout << "Finding nearest neighbours within: " <<
+ // config.get<double>("RCUTMAX") << " cutoff distance..." << std::flush;
+ // nnf.calc(stdb);
+ // if (is_verbose()) std::cout << "Done!" << std::endl;
+ //
+ // if (is_verbose()) std::cout << "Training start..." << std::flush;
+ //
+ // // here we want to train with the locally allocated phi
+ // model->train(stdb,dc);
+ //
+ // //if (is_verbose()) std::cout << "Done!" << std::endl;
+ //
+ // Config param_file = model->get_param_file();
+ // //param_file.check_pot_file();
+ // //std::ofstream outfile;
+ // //outfile.open ("pot.tadah");
+ // //outfile << param_file << std::endl;;
+ //
+ // //if(train->count("--uncertainty")) {
+ // // t_type weights = model->get_weights();
+ // // t_type unc = model->get_weights_uncertainty();
+ // // Output(param_file,false).print_train_unc(weights, unc);
+ // //}
+ //
+ // //if (is_verbose()) std::cout << timer_tot.to_string() << std::endl;
+ //
+ if(model)
+ delete model;
+ if(fb)
+ delete fb;
 blacs_gridexit_(&context1);
 blacs_gridexit_(&context2);
 //MPI_Win_free(&window);
- }
+}
 
 void TadahCLI::subcommand_predict() {
 CLI::Timer timer_tot {"Prediction", CLI::Timer::Big};