diff --git a/bin/tadah_cli.cpp b/bin/tadah_cli.cpp
index 5ab39857f6c29d2a5d64bf6a82b40e82407b354a..f151e136e4d5d48002d3a134e0f08d21c250551b 100644
--- a/bin/tadah_cli.cpp
+++ b/bin/tadah_cli.cpp
@@ -35,10 +35,10 @@ void TadahCLI::subcommand_train() {
 
   int rank = 0;
   int ncpu = 1;
-
 #ifdef TADAH_BUILD_MPI
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   MPI_Comm_size(MPI_COMM_WORLD, &ncpu);
+  MPI_Status status;
 #endif
 
   /* MPI CODE:
@@ -88,6 +88,7 @@ void TadahCLI::subcommand_train() {
   M_Tadah_Base *model = CONFIG::factory<M_Tadah_Base,DM_Function_Base&,Config&>
     (config.get<std::string>("MODEL",0),*fb,config);
 
+#ifdef TADAH_BUILD_MPI
   int PHI_cols;
   int PHI_rows;
 
@@ -97,12 +98,10 @@ void TadahCLI::subcommand_train() {
     PHI_cols = fb->get_phi_cols(config);
     PHI_rows = DesignMatrixBase::phi_rows_num(config, nstruct_tot, natoms_tot);
   }
-#ifdef TADAH_BUILD_MPI
   MPI_Bcast(&PHI_rows, 1, MPI_INT, 0, MPI_COMM_WORLD);
   MPI_Bcast(&PHI_cols, 1, MPI_INT, 0, MPI_COMM_WORLD);
   std::cout << "PHI ROWS: " << PHI_rows << std::endl;
   std::cout << "PHI COLS: " << PHI_cols << std::endl;
-#endif
 
   // Initialize BLACS
   int b_rank;
@@ -114,7 +113,6 @@ void TadahCLI::subcommand_train() {
   int rnb = 22; // Row block size
   int cnb = 22; // Column block size
 
-#ifdef TADAH_BUILD_MPI
   blacs_pinfo_(&b_rank, &ncpu) ; // BLACS rank and world size
   std::cout << "RANK: " << rank << " BRANK: " << b_rank << " ncpu: " << ncpu << std::endl;
 
@@ -145,18 +143,17 @@ void TadahCLI::subcommand_train() {
     // allocate matrix for each worker
     phi = Matrix(phi_rows,phi_cols);
   }
-#endif
 
   const int RELEASE_TAG = 2;
   const int WORK_TAG = 0;
 
   if (rank==0) {
-
+    // HOST
     // prepare work packages
     // filename, first structure index, number of structures to read
     std::vector<std::tuple<std::string,int,int>> wpckgs;
-    int nstruc = 88; // TODO
+    int nstruc = 50; // TODO the number of structures in a single work package
     for (const std::string &fn : config("DBFILE")) {
       // get number of structures
       int dbsize = StructureDB::count(fn).first;
@@ -174,62 +171,71 @@ void TadahCLI::subcommand_train() {
       }
     }
 
-
-    // host
-    // seed all workers
-    // implement host function to request next work package
-    // the workpackage must contain:
-    // - dataset file name as *char
-    // - length of above name
-    for (int r= 1; r< ncpu; ++r) {
+    // HOST is waiting for workers' requests
+    while (true) {
       if (wpckgs.size()==0) {
         // no more packages, just skip remaining workers
         // we will release them soon...
         break;
       }
+
+      // receive request from ANY worker
+      MPI_Recv (NULL, 0, MPI_INT, MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status);
+      int worker = status.MPI_SOURCE;
+
       std::tuple<std::string,int,int> wpckg = wpckgs.back();
       wpckgs.pop_back();
 
       // send dataset filename
       const char *fn = std::get<0>(wpckg).c_str();
       int fn_length = std::get<0>(wpckg).length()+1; // +1 for char
-      MPI_Send (&fn_length, 1, MPI_INT, r, WORK_TAG, MPI_COMM_WORLD);
-      MPI_Send (fn, fn_length, MPI_CHAR, r, WORK_TAG, MPI_COMM_WORLD);
+      MPI_Send (&fn_length, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
+      MPI_Send (fn, fn_length, MPI_CHAR, worker, WORK_TAG, MPI_COMM_WORLD);
 
       // send index of the first structure to load
       int first = std::get<1>(wpckg);
-      MPI_Send (&first, 1, MPI_INT, r, WORK_TAG, MPI_COMM_WORLD);
+      MPI_Send (&first, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
 
       // send number of structures to load
       int nstruc = std::get<2>(wpckg);
-      MPI_Send (&nstruc, 1, MPI_INT, r, WORK_TAG, MPI_COMM_WORLD);
+      MPI_Send (&nstruc, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
 
       std::cout << "HOST: " << fn << " " << first << " " << nstruc << std::endl;
     }
 
     // work finised, release all workers
-    for (int r= 1; r< ncpu; ++r) {
-      MPI_Send (0, 0, MPI_INT, r, RELEASE_TAG, MPI_COMM_WORLD);
+    int count=0;
+    while(true) {
+      MPI_Recv (NULL, 0, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
+      int worker = status.MPI_SOURCE;
+      MPI_Send (0, 0, MPI_INT, worker, RELEASE_TAG, MPI_COMM_WORLD);
+      count++;
+      if (count==ncpu-1) break;
     }
-  }
-  else {
+  } else {
     // worker
     int fn_length;
     int first;
     int nstruc;
-    MPI_Status status;
     while (true) {
+      // ask for more work...
+      MPI_Send (NULL, 0, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
       std::cout << "WORKER NUM: " << b_rank << " waiting..." << std::endl;
+
+      // get either work or release signal
       MPI_Recv (&fn_length, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
       std::cout << "WORKER NUM: " << b_rank << " got message!" << std::endl;
       std::cout << "Lenght of char fn: " << fn_length << std::endl;
       std::cout << "MPI_TAG: " << status.MPI_TAG << std::endl;
+
+      // release worker
       if (status.MPI_TAG == RELEASE_TAG) {
         std::cout << "WORKER: " << b_rank << "RELEASE TAG" << std::endl;
         break;
       }
+
+      // otherwise do work
       char *fn = (char *) malloc(fn_length+1);
       MPI_Recv (fn, fn_length, MPI_CHAR, 0, WORK_TAG, MPI_COMM_WORLD, &status);
       MPI_Recv (&first, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
@@ -238,12 +244,13 @@ void TadahCLI::subcommand_train() {
 
       StructureDB stdb;
       stdb.add(std::string(fn,fn_length),first,nstruc);
-      std::cout << "WORKER: " << stdb << std::endl;
+      std::cout << "WORKER: " << b_rank << stdb << std::endl;
 
       if (fn)
         delete fn;
 
     }
   }
+#endif
 
   if (is_verbose()) std::cout << "Loading structures..." << std::flush;
@@ -439,7 +446,7 @@ void TadahCLI::subcommand_hpo(
   int s;
   if ( rank == 0 ) {
     // host proc distributes work equally between available processes
-    // Each process will recieve an array of integers.
+    // Each process will receive an array of integers.
     // Integers correspond to indices in the trg vector
     // e.g. indices 3,4 indicate that the process
     // should work on target files trg[3] and trg[4]
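
The hunk at @@ -174,62 +171,71 @@ replaces the old loop, which handed exactly one package to each rank, with a pull model: a worker sends an empty WORK_TAG message to rank 0, and the host replies either with a work package (filename length, filename, first structure index, number of structures) or, once the package list is empty, with an empty RELEASE_TAG message that tells the worker to stop. The sketch below is a minimal, self-contained illustration of that request/dispatch handshake, not Tadah code: the file names are invented, the package is reduced to a single filename, and the host's two loops (dispatch, then release) are folded into one; only the WORK_TAG/RELEASE_TAG protocol follows the patch.

// Minimal sketch (not Tadah code) of the WORK_TAG/RELEASE_TAG handshake above.
// File names and package contents are invented; error handling is omitted.
#include <mpi.h>
#include <cstdio>
#include <string>
#include <vector>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank, ncpu;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &ncpu);

  const int WORK_TAG = 0;
  const int RELEASE_TAG = 2;

  if (rank == 0) {
    // HOST: hand out packages on demand, then release every worker exactly once.
    std::vector<std::string> wpckgs = {"db1.xyz", "db2.xyz", "db3.xyz"};  // invented
    int released = 0;
    MPI_Status status;
    while (released < ncpu - 1) {
      // Block until any worker asks for work (empty WORK_TAG message).
      MPI_Recv(NULL, 0, MPI_INT, MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &status);
      int worker = status.MPI_SOURCE;
      if (!wpckgs.empty()) {
        std::string fn = wpckgs.back();
        wpckgs.pop_back();
        int fn_length = static_cast<int>(fn.size()) + 1;  // include '\0'
        MPI_Send(&fn_length, 1, MPI_INT, worker, WORK_TAG, MPI_COMM_WORLD);
        MPI_Send(fn.c_str(), fn_length, MPI_CHAR, worker, WORK_TAG, MPI_COMM_WORLD);
      } else {
        // No packages left: the empty RELEASE_TAG reply tells the worker to stop.
        MPI_Send(NULL, 0, MPI_INT, worker, RELEASE_TAG, MPI_COMM_WORLD);
        ++released;
      }
    }
  } else {
    // WORKER: keep asking for work until released.
    MPI_Status status;
    while (true) {
      MPI_Send(NULL, 0, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
      int fn_length = 0;
      MPI_Recv(&fn_length, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      if (status.MPI_TAG == RELEASE_TAG) break;
      std::vector<char> fn(fn_length);
      MPI_Recv(fn.data(), fn_length, MPI_CHAR, 0, WORK_TAG, MPI_COMM_WORLD, &status);
      std::printf("worker %d got package %s\n", rank, fn.data());
      // ...load the structures from fn and build the local slice of the design matrix...
    }
  }

  MPI_Finalize();
  return 0;
}

Because the host always answers whichever rank asked most recently, faster workers naturally pick up more packages, which appears to be the load-balancing motivation for the change.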