diff --git a/bin/tadah_cli.cpp b/bin/tadah_cli.cpp
index 7a4edbb010c13c0a204c7cbc4771c59c28d28ceb..5ab39857f6c29d2a5d64bf6a82b40e82407b354a 100644
--- a/bin/tadah_cli.cpp
+++ b/bin/tadah_cli.cpp
@@ -42,11 +42,15 @@ void TadahCLI::subcommand_train() {
 #endif

   /* MPI CODE:
+   * The PHI matrix is divided into local phi matrices.
+   *
    * 1. each rank reads config file
-   * 2. root calculate total number of structures
-   * 3. root calculate the dimensions of the PHI matrix
+   * 2. host calculates the total number of structures
+   * 3. host calculates the dimensions of the PHI matrix
    *    based on force and stress flags and type of the regression
-   * 4. root broadcast PHI cols number to all cpus
+   * 4. host broadcasts the number of PHI cols and rows to all workers
+   * 5. compute the sizes of the local phi matrices
+   * 6. each worker allocates memory for a local matrix phi
    */
@@ -109,6 +113,7 @@ void TadahCLI::subcommand_train() {
   int b_ncols;   // Number of column procs
   int rnb = 22;  // Row block size
   int cnb = 22;  // Column block size
+
 #ifdef TADAH_BUILD_MPI
   blacs_pinfo_(&b_rank, &ncpu) ;   // BLACS rank and world size
   std::cout << "RANK: " << rank << " BRANK: " << b_rank << " ncpu: " << ncpu << std::endl;
@@ -132,17 +137,112 @@ void TadahCLI::subcommand_train() {
   // int PHI_size = PHI_rows*PHI_cols;
   int phi_rows = numroc_( &PHI_rows, &rnb, &b_row, &izero, &b_nrows );
   int phi_cols = numroc_( &PHI_cols, &cnb, &b_col, &izero, &b_ncols );
-  std::cout << "BRANK: " << b_rank << " phi_row: " << phi_rows << " phi_col " << phi_cols << std::endl;
+  // once we know the size of PHI, we can assign the local phi matrices
+  Matrix phi;
+  if (rank!=0) {
+    // allocate a local matrix on each worker
+    phi = Matrix(phi_rows,phi_cols);
+  }
 #endif
-  // once we know the size of the PHI, we can assign local phi matrices
+  const int RELEASE_TAG = 2;
+  const int WORK_TAG = 0;
+
+  if (rank==0) {
+
+    // prepare work packages:
+    // (filename, first structure index, number of structures to read)
+    std::vector<std::tuple<std::string,int,int>> wpckgs;
+    int nstruc = 88;  // TODO: number of structures per work package
+    for (const std::string &fn : config("DBFILE")) {
+      // get the number of structures in this dataset
+      int dbsize = StructureDB::count(fn).first;
+      std::cout << "DBSIZE: " << dbsize << std::endl;
+      int first=0;
+      while(true) {
+        if (nstruc < dbsize) {
+          wpckgs.push_back(std::make_tuple(fn,first,nstruc));
+          first += nstruc;
+        } else {
+          wpckgs.push_back(std::make_tuple(fn,first,dbsize));
+          break;
+        }
+        dbsize-=nstruc;
+      }
+    }
+
+    // host: seed all workers
+    // TODO: implement a host function to request the next work package
+    // A work package must contain:
+    //   - the dataset file name as a char array
+    //   - the length of that name
+    for (int r=1; r<ncpu; ++r) {
+      if (wpckgs.size()==0) {
+        // no more packages, just skip the remaining workers;
+        // we will release them soon...
+        break;
+      }
+      std::tuple<std::string,int,int> wpckg = wpckgs.back();
+      wpckgs.pop_back();
+
+      // send the dataset filename
+      const char *fn = std::get<0>(wpckg).c_str();
+      int fn_length = std::get<0>(wpckg).length()+1;  // +1 for the terminating null character
+      MPI_Send (&fn_length, 1, MPI_INT, r, WORK_TAG, MPI_COMM_WORLD);
+      MPI_Send (fn, fn_length, MPI_CHAR, r, WORK_TAG, MPI_COMM_WORLD);
+
+      // send the index of the first structure to load
+      int first = std::get<1>(wpckg);
+      MPI_Send (&first, 1, MPI_INT, r, WORK_TAG, MPI_COMM_WORLD);
+
+      // send the number of structures to load
+      int nstruc = std::get<2>(wpckg);
+      MPI_Send (&nstruc, 1, MPI_INT, r, WORK_TAG, MPI_COMM_WORLD);
-  for (const std::string &fn : config("DBFILE")) {
-    // get number of structures and tot atoms in all structures
-    std::pair<int,int> dbsize = StructureDB::count(fn);
+      std::cout << "HOST: " << fn << " " << first << " " << nstruc << std::endl;
+    }
+
+    // work finished, release all workers
+    for (int r=1; r<ncpu; ++r) {
+      MPI_Send (0, 0, MPI_INT, r, RELEASE_TAG, MPI_COMM_WORLD);
+    }
+  }
+  else {
+    // worker
+    int fn_length;
+    int first;
+    int nstruc;
+    MPI_Status status;
+
+    while (true) {
+      std::cout << "WORKER NUM: " << b_rank << " waiting..." << std::endl;
+      MPI_Recv (&fn_length, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
+      std::cout << "WORKER NUM: " << b_rank << " got message!" << std::endl;
+      std::cout << "Length of char fn: " << fn_length << std::endl;
+      std::cout << "MPI_TAG: " << status.MPI_TAG << std::endl;
+      if (status.MPI_TAG == RELEASE_TAG) {
+        std::cout << "WORKER: " << b_rank << " RELEASE TAG" << std::endl;
+        break;
+      }
+      char *fn = (char *) malloc(fn_length+1);
+      MPI_Recv (fn, fn_length, MPI_CHAR, 0, WORK_TAG, MPI_COMM_WORLD, &status);
+      MPI_Recv (&first, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
+      MPI_Recv (&nstruc, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
+      std::cout << "WORKER: " << fn << " " << first << " " << nstruc << std::endl;
+
+      StructureDB stdb;
+      stdb.add(std::string(fn),first,nstruc);
+      std::cout << "WORKER: " << stdb << std::endl;
+
+      if (fn)
+        free(fn);  // fn was allocated with malloc, so release it with free, not delete
+    }
   }
@@ -151,14 +251,6 @@ void TadahCLI::subcommand_train() {

   if (is_verbose()) std::cout << "Done!" << std::endl;

-  //StructureDB temp;
-  //int nst = temp.add("db.train", 99, 3);
-  //std::cout << "Number of structures loaded: " << temp.size() << std::endl;
-  //std::cout << "Number of structures loaded: " << nst << std::endl;
-  //std::cout << temp << std::endl;
-  //std::cout << temp(0) << std::endl;
-
-
   if (is_verbose()) std::cout << "Finding nearest neighbours within: "
     << config.get<double>("RCUTMAX") << " cutoff distance..." << std::flush;
@@ -170,6 +262,7 @@ void TadahCLI::subcommand_train() {

+  // here we want to train with the locally allocated phi
   model->train(stdb,dc);

   if (is_verbose()) std::cout << "Done!" << std::endl;
@@ -342,10 +435,10 @@ void TadahCLI::subcommand_hpo(
   std::vector<fs::path> trg=read_targets(targets_dir);

   // the number of files the process will work on
-  // also the size of an array it will get from the root process.
+  // also the size of an array it will get from the host process.
   int s;
   if ( rank == 0 ) {
-    // root proc distributes work equally between available processes
+    // host proc distributes work equally between available processes
     // Each process will recieve an array of integers.
     // Integers correspond to indices in the trg vector
    // e.g. indices 3,4 indicate that the process
@@ -382,7 +475,7 @@ void TadahCLI::subcommand_hpo(
 #endif
   }

-  // and prepare root process for its own work
+  // and prepare host process for its own work
   s=counts[rank];
   local_trg_indices.resize(s);
 #ifdef TADAH_BUILD_MPI
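
For orientation, below is a minimal, self-contained sketch of the tag-based host/worker hand-off introduced in subcommand_train() above. It uses plain MPI only (no BLACS, ScaLAPACK, Config or StructureDB); the file names, chunk sizes and the file name mpi_workpackage_sketch.cpp are made up for illustration. It mirrors the message sequence of the patch (name length, name, first structure index, structure count, then a zero-length release message) but is not the Tadah implementation.

// mpi_workpackage_sketch.cpp (illustrative only)
// build/run, assuming an MPI toolchain: mpicxx mpi_workpackage_sketch.cpp && mpirun -n 4 ./a.out
#include <mpi.h>
#include <iostream>
#include <string>
#include <tuple>
#include <vector>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank, ncpu;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &ncpu);

  const int WORK_TAG = 0;
  const int RELEASE_TAG = 2;

  if (rank == 0) {
    // Hypothetical work packages: (filename, first structure index, count).
    std::vector<std::tuple<std::string,int,int>> wpckgs;
    wpckgs.push_back(std::make_tuple("db1.train", 0, 88));
    wpckgs.push_back(std::make_tuple("db1.train", 88, 12));
    wpckgs.push_back(std::make_tuple("db2.train", 0, 50));

    // Seed each worker with one package, while packages last.
    for (int r = 1; r < ncpu; ++r) {
      if (wpckgs.empty()) break;
      std::tuple<std::string,int,int> wpckg = wpckgs.back();
      wpckgs.pop_back();
      std::string fn = std::get<0>(wpckg);
      int first      = std::get<1>(wpckg);
      int nstruc     = std::get<2>(wpckg);
      int fn_length  = static_cast<int>(fn.size()) + 1;  // include '\0'
      MPI_Send(&fn_length, 1, MPI_INT, r, WORK_TAG, MPI_COMM_WORLD);
      MPI_Send(fn.c_str(), fn_length, MPI_CHAR, r, WORK_TAG, MPI_COMM_WORLD);
      MPI_Send(&first, 1, MPI_INT, r, WORK_TAG, MPI_COMM_WORLD);
      MPI_Send(&nstruc, 1, MPI_INT, r, WORK_TAG, MPI_COMM_WORLD);
    }
    // Release every worker: an empty message whose tag carries the meaning.
    for (int r = 1; r < ncpu; ++r)
      MPI_Send(nullptr, 0, MPI_INT, r, RELEASE_TAG, MPI_COMM_WORLD);
  } else {
    while (true) {
      MPI_Status status;
      int fn_length = 0;
      // The first receive doubles as the control channel: the tag decides
      // whether a package follows or the worker should stop.
      MPI_Recv(&fn_length, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      if (status.MPI_TAG == RELEASE_TAG) break;

      std::string fn(fn_length, '\0');
      int first = 0, nstruc = 0;
      MPI_Recv(&fn[0], fn_length, MPI_CHAR, 0, WORK_TAG, MPI_COMM_WORLD, &status);
      MPI_Recv(&first, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
      MPI_Recv(&nstruc, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD, &status);
      std::cout << "worker " << rank << ": " << fn.c_str()
                << " [" << first << ", " << first + nstruc << ")\n";
      // A real worker would now load these structures and fill its local phi block.
    }
  }
  MPI_Finalize();
  return 0;
}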