#include #include #include #include #include #include "misc.hpp" #include "pd.hpp" void print_help(const std::string & invocation) { std::cout << "Usage: " << invocation << " arguments" << std::endl; std::cout << "Arguments:" << std::endl; std::cout << " --in-lists, -i file_1 file_2" << std::endl; std::cout << " Mandatory. Files containing lists of persistence diagram files to process, one file per line." << std::endl; std::cout << " If the same file name is given twice, the computation will exploit symmetry." << std::endl; std::cout << " --dimension, -d dim" << std::endl; std::cout << " Mandatory integer. Degree to read from the persistence diagrams." << std::endl; std::cout << " --sigma, -s σ" << std::endl; std::cout << " Mandatory decimal. σ parameter in heat kernel." << std::endl; std::cout << " --finitize, -f x" << std::endl; std::cout << " Optional decimal. Finitize all infinite intervals to scale x. If not given (default), infinite intervals are ignored." << std::endl; std::cout << " --discretize, -t x" << std::endl; std::cout << " Optional decimal. Discretize to and x-by-x grid." << std::endl; std::cout << " --out, -o file" << std::endl; std::cout << " Mandatory output file name. Plain text." << std::endl; std::cout << " --chunk, -c c" << std::endl; std::cout << " Optional integer. Size of work chunk to send off to each computational node. Too small a value yields a lot of overhead, too large a value can cause an unbalanced load. Increase if there are many small computations. Default: 10." << std::endl; std::cout << " --help, -h" << std::endl; std::cout << " Print this help text." << std::endl; } void fail(int rank, const std::string & message) { if (rank == 0) { std::cout << message << std::endl; } MPI_Finalize(); exit(1); } void arg_fail(const std::string & invocation, int rank, const std::string & message) { if (rank == 0) { std::cout << message << std::endl; print_help(invocation); } MPI_Finalize(); exit(1); } int main(int argc, char ** argv) { MPI_Init(NULL, NULL); std::string invocation(argv[0]); int world_size; MPI_Comm_size(MPI_COMM_WORLD, &world_size); int world_rank; MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); if (world_size < 2) { fail(world_rank, "Currently there is no support for running with just one process. Please run at least 2 MPI jobs."); } char processor_name_tmp[MPI_MAX_PROCESSOR_NAME]; int processor_name_len; MPI_Get_processor_name(processor_name_tmp, &processor_name_len); std::string processor_name(processor_name_tmp); // Begin processing args. std::string list_1_file_name; std::string list_2_file_name; int dim = -1; double sigma = std::numeric_limits::quiet_NaN(); double finitization = std::numeric_limits::infinity(); double discretization = 0; std::string out_file_name; int chunk_size = 10; for (int i = 1; i < argc; ++i) { std::string arg(argv[i]); if (arg == std::string("--in-lists") || arg == std::string("-i")) { if (i < argc - 2) { list_1_file_name = std::string(argv[++i]); list_2_file_name = std::string(argv[++i]); } else arg_fail(invocation, world_rank, "Need two arguments for --in-lists."); } else if (arg == std::string("--dimension") || arg == std::string("-d")) { if (i < argc - 1) dim = std::atoi(argv[++i]); else arg_fail(invocation, world_rank, "Missing argument for --dimension."); } else if (arg == std::string("--sigma") || arg == std::string("-s")) { if (i < argc - 1) sigma = std::atof(argv[++i]); else arg_fail(invocation, world_rank, "Missing argument for --sigma."); } else if (arg == std::string("--finitize") || arg == std::string("-f")) { if (i < argc - 1) finitization = std::atof(argv[++i]); else arg_fail(invocation, world_rank, "Missing argument for --finitize."); } else if (arg == std::string("--discretize") || arg == std::string("-x")) { if (i < argc - 1) discretization = std::atof(argv[++i]); else arg_fail(invocation, world_rank, "Missing argument for --discretize."); } else if (arg == std::string("--out") || arg == std::string("-o")) { if (i < argc - 1) out_file_name = std::string(argv[++i]); else arg_fail(invocation, world_rank, "Missing argument for --out."); } else if (arg == std::string("--chunk") || arg == std::string("-c")) { if (i < argc - 1) chunk_size = std::atoi(argv[++i]); else arg_fail(invocation, world_rank, "Missing argument for --chunk."); } else if (arg == std::string("--help") || arg == std::string("-h")) { if (world_rank == 0) print_help(invocation); MPI_Finalize(); exit(0); } else { arg_fail(invocation, world_rank, "Incorrect argument."); } } // End processing args. // Begin argument validation. if (list_1_file_name == std::string("") || list_2_file_name == std::string("")) arg_fail(invocation, world_rank, "Need input file lists."); if (dim < 0) arg_fail(invocation, world_rank, "Dimension must be non-negative."); if (sigma <= 0) arg_fail(invocation, world_rank, "σ must be positive."); if (finitization <= 0) arg_fail(invocation, world_rank, "Finitization must be positive."); if (discretization < 0) arg_fail(invocation, world_rank, "Discretization must be non-negative."); if (out_file_name == std::string("")) arg_fail(invocation, world_rank, "Need output file."); if (chunk_size <= 0) arg_fail(invocation, world_rank, "Chunk size must be positive."); // End argument validation. bool symmetric = list_1_file_name == list_2_file_name; std::vector files_1; std::vector files_2; std::ifstream list_file(list_1_file_name, std::ios::in); if (!list_file.is_open()) { fail(world_rank, std::string("Failed to load diagram list file ") + list_1_file_name + std::string(".")); } std::string line; while (std::getline(list_file, line)) { files_1.push_back(line); } list_file.close(); if (symmetric) { files_2 = files_1; } else { list_file.open(list_2_file_name, std::ios::in); if (!list_file.is_open()) { fail(world_rank, std::string("Failed to load diagram list file ") + list_2_file_name + std::string(".")); } while (std::getline(list_file, line)) { files_2.push_back(line); } list_file.close(); } int m = files_1.size(); int n = files_2.size(); std::vector> idxs; if (symmetric) { for (int i = 0; i < m; ++i) { for (int j = i; j < n; ++j) { idxs.push_back(std::make_pair(i, j)); } } } else { for (int i = 0; i < m; ++i) { for (int j = 0; j < n; ++j) { idxs.push_back(std::make_pair(i, j)); } } } // Begin master. if (world_rank == 0) { std::vector result(idxs.size(), std::numeric_limits::quiet_NaN()); double unused_buf = std::numeric_limits::quiet_NaN(); std::vector result_reqs(world_size); // Element zero not used, index by actual rank. int done = 0; int next_chunk = 0; std::vector> assigned(world_size, std::make_pair(-2, -2)); std::cout << "Total things to compute: " << idxs.size() << std::endl; for (int r = 1; r < world_size; ++r) { MPI_Irecv(&unused_buf, 0, MPI_DOUBLE, r, (int)Message_tag::result, MPI_COMM_WORLD, &(result_reqs[r])); } while ((size_t) done < idxs.size()) { int respondent_index = -1; MPI_Status status; MPI_Waitany(world_size - 1, &(result_reqs[1]), &respondent_index, &status); int r = 1 + respondent_index; std::cout << "Heard back from rank " << r << "." << std::endl; int recv_count = -1; MPI_Get_count(&status, MPI_DOUBLE, &recv_count); std::cout << "Received " << recv_count << " elements from rank " << r << "." << std::endl; assert(recv_count == assigned[r].second - assigned[r].first); done += recv_count; int work[2] = {-1, -1}; if ((size_t)next_chunk*chunk_size < idxs.size()) { work[0] = next_chunk*chunk_size; work[1] = std::min((int)idxs.size(), (next_chunk + 1)*chunk_size); MPI_Irecv(&(result[work[0]]), work[1] - work[0], MPI_DOUBLE, r, (int)Message_tag::result, MPI_COMM_WORLD, &(result_reqs[r])); std::cout << "Rank will get new work." << std::endl; } else std::cout << "Rank will terminate." << std::endl; assigned[r] = std::make_pair(work[0], work[1]); ++next_chunk; MPI_Send(work, 2, MPI_INT, r, (int)Message_tag::work, MPI_COMM_WORLD); std::cout << 100*(double)done/(double)idxs.size() << "% complete." << std::endl; std::cout << "------------------" << std::endl; } std::cout << "Done. Writing out." << std::endl; std::ofstream out_file(out_file_name, std::ios::out); for (int i = 0; i < m; ++i) { for (int j = 0; j < i; ++j) { out_file << std::scientific << result[symmetric ? unroll_upper_tri(n, j, i) : unroll_full(n, i, j)] << " "; } for (int j = i; j < n; ++j) { out_file << std::scientific << result[symmetric ? unroll_upper_tri(n, i, j) : unroll_full(n, i, j)] << " "; } out_file << std::endl; } out_file.close(); } // End master. else // Begin slaves. { std::vector result(chunk_size, std::numeric_limits::quiet_NaN()); int work[2] = {-2, -2}; while (work[0] != -1) { int i_prev = -1; int j_prev = -1; int i = -1; int j = -1; PD pd_1; PD pd_2; for (int k = work[0]; k < work[1]; ++k) { i = idxs[k].first; j = idxs[k].second; int load_status = 1; if (i != i_prev) { pd_1.clear(); load_status = read_dipha_degree(files_1[i], dim, pd_1); if (load_status) fail(world_rank, std::string("Failed to load file ") + files_1[i] + std::string(".")); pd_1.finitize(finitization); if (discretization > 0) { std::cout << pd_1.size_2() << " --> "; pd_1.discretize(discretization); pd_1.compress_and_sort(); std::cout << pd_1.size_2() << std::endl; } } if (j != j_prev) { pd_2.clear(); load_status = read_dipha_degree(files_2[j], dim, pd_2); if (load_status) fail(world_rank, std::string("Failed to load file ") + files_2[j] + std::string(".")); pd_2.finitize(finitization); if (discretization > 0) { std::cout << pd_2.size_2() << " --> "; pd_2.discretize(discretization); pd_2.compress_and_sort(); std::cout << pd_2.size_2() << std::endl; } } result[k - work[0]] = heat_kernel(sigma, pd_1, pd_2); i_prev = i; j_prev = j; } MPI_Send(result.data(), work[1] - work[0], MPI_DOUBLE, 0, (int)Message_tag::result, MPI_COMM_WORLD); MPI_Recv(work, 2, MPI_INT, 0, (int)Message_tag::work, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } } // End slave. MPI_Finalize(); return 0; }