diff --git a/trainer.h b/trainer.h
index 149bb347d72dfd5253c406b707a4726d06719e50..0aaa6df5f2c15d59adcc43f9f1a778f8ee007e86 100644
--- a/trainer.h
+++ b/trainer.h
@@ -180,13 +180,98 @@ class MPI_Trainer: public Trainer {
     dm.Phi.resize(phi_rows1,phi_cols1);
     dm.T.resize(phi_rows1);
     dm.Tlabels.resize(phi_rows1);
-
   }
   void probe() {
     MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
     worker = status.MPI_SOURCE;
     tag = status.MPI_TAG;
   }
+
+  /** This method uses the ScaLAPACK pdgels solver to obtain the vector of weights.
+   *
+   * The model weights are updated at the end of this call.
+   */
+  void solve() {
+    // Descriptors for ScaLAPACK
+    int descPHI[9], descPHI2[9];
+    int descB[9], descB2[9];
+    int info, info2;
+
+    descinit_( descPHI, &PHI_rows, &PHI_cols, &rnb1, &cnb1, &izero,
+        &izero, &context1, /*leading dimension*/&phi_rows1, &info);
+    descinit_( descPHI2, &PHI_rows, &PHI_cols, &rnb2, &cnb2, &izero,
+        &izero, &context2, /*leading dimension*/&phi_rows2, &info2);
+
+    if(info != 0) {
+      printf("Error in descinit 1a, info = %d\n", info);
+    }
+    if(info2 != 0) {
+      printf("Error in descinit 2a, info = %d\n", info2);
+      printf("HINT: Check these CONFIG parameters: MPIWPCKG, MBLOCK, NBLOCK\n");
+    }
+
+    descinit_( descB, &PHI_rows, &ione, &rnb1, &cnb1, &izero,
+        &izero, &context1, /*leading dimension*/&phi_rows1, &info);
+    descinit_( descB2, &PHI_rows, &ione, &rnb2, &cnb2, &izero,
+        &izero, &context2, /*leading dimension*/&phi_rows2, &info2);
+
+    if(info != 0) {
+      printf("Error in descinit 1b, info = %d\n", info);
+    }
+    if(info2 != 0) {
+      printf("Error in descinit 2b, info = %d\n", info2);
+      printf("HINT: Check these CONFIG parameters: MPIWPCKG, MBLOCK, NBLOCK\n");
+    }
+
+    //double MPIt1 = MPI_Wtime();
+    char trans='N';
+    int nrhs = 1;
+
+    int ia = 1;
+    int ja = 1;
+    int ib = 1;
+    int jb = 1;
+    // Redistribute data in a 2D block-cyclic layout
+    DesignMatrix<DM_Function_Base&> dm2(*fb, config);
+    dm2.Phi.resize(phi_rows2,phi_cols2);
+    dm2.T.resize(phi_rows2);
+    dm2.Tlabels.resize(phi_rows2);
+
+    pdgemr2d_(&PHI_rows, &PHI_cols, dm.Phi.ptr(), &ione, &ione, descPHI,
+        dm2.Phi.ptr(), &ione, &ione, descPHI2, &context2);
+
+    pdgemr2d_(&PHI_rows, &ione, dm.T.ptr(), &ione, &ione, descB,
+        dm2.T.ptr(), &ione, &ione, descB2, &context2);
+
+    double *b2 = dm2.T.ptr();
+    double wkopt2;
+    int lwork2 = -1;  // workspace query -> get the required size of the work array
+    pdgels_(&trans, &PHI_rows, &PHI_cols, &nrhs, dm2.Phi.ptr(), &ia, &ja,
+        descPHI2, b2, &ib, &jb, descB2, &wkopt2, &lwork2, &info2);
+    if (info2 != 0) {
+      printf("Error in pdgels, info = %d\n", info2);
+    }
+    lwork2 = (int)wkopt2;
+    double *work2 = new double[lwork2];
+    pdgels_(&trans, &PHI_rows, &PHI_cols, &nrhs, dm2.Phi.ptr(), &ia, &ja,
+        descPHI2, b2, &ib, &jb, descB2, work2, &lwork2, &info2);
+
+    // get the weight vector back onto context1
+    pdgemr2d_(&PHI_rows, &ione, dm2.T.ptr(), &ione, &ione, descB2,
+        dm.T.ptr(), &ione, &ione, descB, &context1);
+
+    if (rank==0) {
+      t_type w(dm.T.ptr(), PHI_cols);
+      model->set_weights(w);
+      model->trained=true;
+    }
+    delete[] work2;
+    MPI_Type_free(&rowvec);
+    MPI_Type_free(&rowvecs);
+
+    blacs_gridexit_(&context1);
+    blacs_gridexit_(&context2);
+  }
 };
 
 class MPI_Trainer_HOST {
@@ -230,7 +315,8 @@ class MPI_Trainer_HOST {
   void work_tag() {
     int rows_available;
-    MPI_Recv (&rows_available, 1, MPI_INT, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
+    MPI_Recv (&rows_available, 1, MPI_INT, tr.worker, tr.tag,
+        MPI_COMM_WORLD, &tr.status);
 
     std::tuple<std::string,int,int> wpckg = wpckgs.back();
     wpckgs.pop_back();
 
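Aside (not part of the patch): the solve() method added above relies on the standard LAPACK/ScaLAPACK two-call workspace query, where pdgels_ is first called with lwork = -1 so the routine reports the optimal workspace size, and then called again with the allocated buffer to actually solve the least-squares system. A minimal, self-contained sketch of the same idiom using the serial dgels_ (the 4x2 system and all names below are illustrative only; link with -llapack):

// Two-call workspace-query idiom, serial LAPACK version for illustration.
#include <cstdio>
#include <vector>

extern "C" void dgels_(const char* trans, const int* m, const int* n,
                       const int* nrhs, double* a, const int* lda,
                       double* b, const int* ldb, double* work,
                       const int* lwork, int* info);

int main() {
  // Overdetermined 4x2 system, stored column-major as LAPACK expects.
  int m = 4, n = 2, nrhs = 1, lda = m, ldb = m, info = 0;
  std::vector<double> A = {1,1,1,1,  1,2,3,4};   // column 1, then column 2
  std::vector<double> b = {6,5,7,10};
  char trans = 'N';

  // 1) Query: lwork = -1 asks dgels_ for the optimal work size,
  //    which is returned in wkopt; nothing is factorised yet.
  double wkopt;
  int lwork = -1;
  dgels_(&trans, &m, &n, &nrhs, A.data(), &lda, b.data(), &ldb,
         &wkopt, &lwork, &info);

  // 2) Solve: allocate the reported workspace and call again.
  lwork = static_cast<int>(wkopt);
  std::vector<double> work(lwork);
  dgels_(&trans, &m, &n, &nrhs, A.data(), &lda, b.data(), &ldb,
         work.data(), &lwork, &info);

  if (info != 0) std::printf("dgels_ failed, info = %d\n", info);
  // On success the first n entries of b hold the least-squares weights.
  std::printf("w = [%g, %g]\n", b[0], b[1]);
  return 0;
}
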
@@ -252,17 +338,25 @@ class MPI_Trainer_HOST {
   void data_tag() {
     int rows_needed;
-    MPI_Recv (&rows_needed, 1, MPI_INT, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
+    MPI_Recv (&rows_needed, 1, MPI_INT, tr.worker, tr.tag,
+        MPI_COMM_WORLD, &tr.status);
     if (tr.rows_available>0) {
-      int rows_accepted = tr.rows_available < rows_needed ? tr.rows_available : rows_needed;
+      int rows_accepted = tr.rows_available < rows_needed ?
+          tr.rows_available : rows_needed;
       MPI_Send (&tr.b_rank, 1, MPI_INT, tr.worker, tr.tag, MPI_COMM_WORLD);
       MPI_Send (&rows_accepted, 1, MPI_INT, tr.worker, tr.tag, MPI_COMM_WORLD);
-      MPI_Recv (&tr.dm.Phi.data()[tr.phi_row], rows_accepted, tr.rowvecs, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
-      MPI_Recv (&tr.dm.T.data()[tr.phi_row], rows_accepted, MPI_DOUBLE, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
-      MPI_Recv (&tr.dm.Tlabels.data()[tr.phi_row], rows_accepted, MPI_DOUBLE, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
+      MPI_Recv (&tr.dm.Phi.data()[tr.phi_row], rows_accepted, tr.rowvecs,
+          tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
+      MPI_Recv (&tr.dm.T.data()[tr.phi_row], rows_accepted, MPI_DOUBLE,
+          tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
+      MPI_Recv (&tr.dm.Tlabels.data()[tr.phi_row], rows_accepted,
+          MPI_DOUBLE, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
       tr.rows_available -= rows_accepted;
       tr.phi_row += rows_accepted;
-      if (tr.rows_available<0 ) { throw std::runtime_error(" HOST2: The number of rows in the local array is smaller than requested.");}
+      if (tr.rows_available<0) {
+        throw std::runtime_error("HOST: The number of rows in the "
+            "local array is smaller than requested.");
+      }
     }
     else {
       // host is unable to fit data we have to ask workers for their storage availability
 
@@ -272,34 +366,35 @@ class MPI_Trainer_HOST {
       // find a worker capable of accepting data
       int w_rows_available;
       while (true) {
-        MPI_Recv (&w_rows_available, 1, MPI_INT, MPI_ANY_SOURCE, TadahCLI::WORK_TAG, MPI_COMM_WORLD, &status2);
+        MPI_Recv (&w_rows_available, 1, MPI_INT, MPI_ANY_SOURCE,
+            TadahCLI::WORK_TAG, MPI_COMM_WORLD, &status2);
         worker2 = status2.MPI_SOURCE;
-        if (tr.worker==worker2) {throw std::runtime_error("worker and worker2 are the same."); }
-        //if (w_rows_available==0 ) {
-        //  // give up on this worker and release him as there is no more work to be done
-        //  MPI_Send (0, 0, MPI_INT, worker2, TadahCLI::RELEASE_TAG, MPI_COMM_WORLD);
-        //  count++;
-        //}
-        //else {
-        //  found a worker
-        break;
-        //}
+        if (tr.worker==worker2) {
+          throw std::runtime_error("worker and worker2 are the same.");
+        }
+        break;
       }
-      int rows_accepted = w_rows_available < rows_needed ? w_rows_available : rows_needed;
-      MPI_Send (&worker2, 1, MPI_INT, tr.worker, TadahCLI::DATA_TAG, MPI_COMM_WORLD);
-      MPI_Send (&rows_accepted, 1, MPI_INT, tr.worker, TadahCLI::DATA_TAG, MPI_COMM_WORLD);
+      int rows_accepted = w_rows_available < rows_needed ?
+          w_rows_available : rows_needed;
+      MPI_Send (&worker2, 1, MPI_INT, tr.worker, TadahCLI::DATA_TAG,
+          MPI_COMM_WORLD);
+      MPI_Send (&rows_accepted, 1, MPI_INT, tr.worker,
+          TadahCLI::DATA_TAG, MPI_COMM_WORLD);
     }
   }
 
   void release_tag(int &count) {
     int rows_available;
-    MPI_Recv (&rows_available, 1, MPI_INT, tr.worker, TadahCLI::WORK_TAG, MPI_COMM_WORLD, &tr.status);
+    MPI_Recv (&rows_available, 1, MPI_INT, tr.worker,
+        TadahCLI::WORK_TAG, MPI_COMM_WORLD, &tr.status);
     // there is no more work so release a worker if full
     if (rows_available==0) {
-      MPI_Send (0, 0, MPI_INT, tr.worker, TadahCLI::RELEASE_TAG, MPI_COMM_WORLD);
+      MPI_Send (0, 0, MPI_INT, tr.worker,
+          TadahCLI::RELEASE_TAG, MPI_COMM_WORLD);
      count++;
     }
     else {
-      MPI_Send (0, 0, MPI_INT, tr.worker, TadahCLI::WAIT_TAG, MPI_COMM_WORLD);
+      MPI_Send (0, 0, MPI_INT, tr.worker,
+          TadahCLI::WAIT_TAG, MPI_COMM_WORLD);
     }
   }
@@ -320,7 +415,10 @@ class MPI_Trainer_WORKER {
   bool release_tag() {
     int temp;
     MPI_Recv (&temp, 1, MPI_INT, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
-    if (tr.rows_available!=0) { throw std::runtime_error("Attempting to release a worker... but the worker requires more data!!");}
+    if (tr.rows_available!=0) {
+      throw std::runtime_error("Attempting to release a worker... "
+          "but the worker requires more data!!");
+    }
     return true;
   }
   void wait_tag() {
@@ -332,10 +430,16 @@ class MPI_Trainer_WORKER {
     int arr_size;
     MPI_Get_count(&tr.status, MPI_DOUBLE, &arr_size);
     int rows_accepted = arr_size/tr.phi_cols1;
-    if (tr.rows_available<rows_accepted) { throw std::runtime_error("Number of rows available is smaller than number of provided rows");}
-    MPI_Recv (&tr.dm.Phi.data()[tr.phi_row], tr.rows_available, tr.rowvecs, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
-    MPI_Recv (&tr.dm.T.data()[tr.phi_row], tr.rows_available, MPI_DOUBLE, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
-    MPI_Recv (&tr.dm.Tlabels.data()[tr.phi_row], tr.rows_available, MPI_DOUBLE, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
+    if (tr.rows_available<rows_accepted) {
+      throw std::runtime_error("Number of rows available is smaller "
+          "than the number of provided rows");
+    }
+    MPI_Recv (&tr.dm.Phi.data()[tr.phi_row], tr.rows_available,
+        tr.rowvecs, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
+    MPI_Recv (&tr.dm.T.data()[tr.phi_row], tr.rows_available,
+        MPI_DOUBLE, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
+    MPI_Recv (&tr.dm.Tlabels.data()[tr.phi_row], tr.rows_available,
+        MPI_DOUBLE, tr.worker, tr.tag, MPI_COMM_WORLD, &tr.status);
     tr.rows_available -= rows_accepted;
     tr.phi_row += rows_accepted;
   }
@@ -348,9 +452,12 @@ class MPI_Trainer_WORKER {
     MPI_Get_count(&tr.status, MPI_CHAR, &fn_length);
 
     char *fn = (char *) malloc(fn_length+1);
-    MPI_Recv (fn, fn_length, MPI_CHAR, 0, TadahCLI::WORK_TAG, MPI_COMM_WORLD, &tr.status);
-    MPI_Recv (&first, 1, MPI_INT, 0, TadahCLI::WORK_TAG, MPI_COMM_WORLD, &tr.status);
-    MPI_Recv (&nstruc, 1, MPI_INT, 0, TadahCLI::WORK_TAG, MPI_COMM_WORLD, &tr.status);
+    MPI_Recv (fn, fn_length, MPI_CHAR, 0, TadahCLI::WORK_TAG,
+        MPI_COMM_WORLD, &tr.status);
+    MPI_Recv (&first, 1, MPI_INT, 0, TadahCLI::WORK_TAG,
+        MPI_COMM_WORLD, &tr.status);
+    MPI_Recv (&nstruc, 1, MPI_INT, 0, TadahCLI::WORK_TAG,
+        MPI_COMM_WORLD, &tr.status);
 
     // do work
     StructureDB stdb;
@@ -402,24 +509,30 @@ class MPI_Trainer_WORKER {
         int rows_accepted;  // number of accepted rows
         int dest;  // receiving process
         // host returns which dest can accept and how much
-        MPI_Recv (&dest, 1, MPI_INT, 0, TadahCLI::DATA_TAG, MPI_COMM_WORLD, &tr.status);
+        MPI_Recv (&dest, 1, MPI_INT, 0, TadahCLI::DATA_TAG,
+            MPI_COMM_WORLD, &tr.status);
 
-        MPI_Recv (&rows_accepted, 1, MPI_INT, 0, TadahCLI::DATA_TAG, MPI_COMM_WORLD, &tr.status);
+        MPI_Recv (&rows_accepted, 1, MPI_INT, 0, TadahCLI::DATA_TAG,
+            MPI_COMM_WORLD, &tr.status);
 
         // we send data to the host or a willing worker
         int start=temp_dm.Phi.rows()-rows_needed;
         // Define temp data type for temp Phi matrix
         // Phi is stored in a column-major order
         MPI_Datatype trowvec, trowvecs;
-        MPI_Type_vector( temp_dm.Phi.cols(), 1, temp_dm.Phi.rows(), MPI_DOUBLE, &trowvec);
+        MPI_Type_vector( temp_dm.Phi.cols(), 1, temp_dm.Phi.rows(),
+            MPI_DOUBLE, &trowvec);
         MPI_Type_commit(&trowvec);
         MPI_Type_create_resized(trowvec, 0, 1*sizeof(double), &trowvecs);
         MPI_Type_commit(&trowvecs);
 
         // ready to send
-        MPI_Send (&temp_dm.Phi.data()[start], rows_accepted, trowvecs, dest, TadahCLI::DATA_TAG, MPI_COMM_WORLD);
-        MPI_Send (&temp_dm.T.data()[start], rows_accepted, MPI_DOUBLE, dest, TadahCLI::DATA_TAG, MPI_COMM_WORLD);
-        MPI_Send (&temp_dm.Tlabels.data()[start], rows_accepted, MPI_DOUBLE, dest, TadahCLI::DATA_TAG, MPI_COMM_WORLD);
+        MPI_Send (&temp_dm.Phi.data()[start], rows_accepted,
+            trowvecs, dest, TadahCLI::DATA_TAG, MPI_COMM_WORLD);
+        MPI_Send (&temp_dm.T.data()[start], rows_accepted,
+            MPI_DOUBLE, dest, TadahCLI::DATA_TAG, MPI_COMM_WORLD);
+        MPI_Send (&temp_dm.Tlabels.data()[start], rows_accepted,
+            MPI_DOUBLE, dest, TadahCLI::DATA_TAG, MPI_COMM_WORLD);
         rows_needed -= rows_accepted;
         MPI_Type_free(&trowvec);
         MPI_Type_free(&trowvecs);
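Aside (not part of the patch): the trowvec/trowvecs datatype built above is the usual trick for shipping whole rows of a column-major matrix with a single MPI_Send: MPI_Type_vector picks one element per column with a stride equal to the number of rows, and MPI_Type_create_resized shrinks the extent to one double so that consecutive rows start at consecutive elements. A minimal, self-contained sketch with made-up sizes (nrows, ncols and the 4x3 matrix are illustrative only; run with mpirun -np 2):

// Sending rows of a column-major matrix via a resized vector datatype.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  const int nrows = 4, ncols = 3;

  // One "row" = ncols elements, stride nrows apart (column-major layout);
  // resizing the extent to one double makes consecutive rows addressable
  // by consecutive starting elements, as trowvecs is used above.
  MPI_Datatype rowvec, rowvecs;
  MPI_Type_vector(ncols, 1, nrows, MPI_DOUBLE, &rowvec);
  MPI_Type_create_resized(rowvec, 0, sizeof(double), &rowvecs);
  MPI_Type_commit(&rowvecs);

  if (rank == 0) {
    // Column-major 4x3 matrix: element (i,j) lives at phi[i + j*nrows].
    std::vector<double> phi(nrows * ncols);
    for (int j = 0; j < ncols; ++j)
      for (int i = 0; i < nrows; ++i)
        phi[i + j * nrows] = 10.0 * i + j;
    // Send two consecutive rows starting at row 1.
    MPI_Send(&phi[1], 2, rowvecs, 1, 0, MPI_COMM_WORLD);
  } else if (rank == 1) {
    // Receive into a 2x3 column-major buffer using the matching layout.
    std::vector<double> rows(2 * ncols);
    MPI_Datatype rrow, rrows;
    MPI_Type_vector(ncols, 1, 2, MPI_DOUBLE, &rrow);
    MPI_Type_create_resized(rrow, 0, sizeof(double), &rrows);
    MPI_Type_commit(&rrows);
    MPI_Recv(rows.data(), 2, rrows, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    std::printf("first received row: %g %g %g\n", rows[0], rows[2], rows[4]);
    MPI_Type_free(&rrow);
    MPI_Type_free(&rrows);
  }

  MPI_Type_free(&rowvec);
  MPI_Type_free(&rowvecs);
  MPI_Finalize();
  return 0;
}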