From 288eb3a063a95c9487997976061f753f3ed319a3 Mon Sep 17 00:00:00 2001 From: Gard Spreemann Date: Thu, 31 May 2018 19:49:57 +0200 Subject: MPI-based version for computing many distances in parallel. --- geom_matching/wasserstein/mpi/CMakeLists.txt | 36 +++ geom_matching/wasserstein/mpi/debug.hpp | 8 + geom_matching/wasserstein/mpi/main.cpp | 386 +++++++++++++++++++++++++++ 3 files changed, 430 insertions(+) create mode 100644 geom_matching/wasserstein/mpi/CMakeLists.txt create mode 100644 geom_matching/wasserstein/mpi/debug.hpp create mode 100644 geom_matching/wasserstein/mpi/main.cpp diff --git a/geom_matching/wasserstein/mpi/CMakeLists.txt b/geom_matching/wasserstein/mpi/CMakeLists.txt new file mode 100644 index 0000000..10a77bf --- /dev/null +++ b/geom_matching/wasserstein/mpi/CMakeLists.txt @@ -0,0 +1,36 @@ +cmake_minimum_required(VERSION 2.8.8) + +project(hera_mpi) + +option(SILENCE "Turn off some warnings." OFF) + +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + add_definitions(-DDEBUG) + remove_definitions(-DNDEBUG) +endif() + +if(SILENCE) + message(WARNING "Silence flag currently ignored.") + add_definitions(-DSILENCE) +endif(SILENCE) + +include(TestBigEndian) +test_big_endian(BIG_ENDIAN) +if(BIG_ENDIAN) + add_definitions(-DBIGENDIAN) +endif() + +find_package(MPI REQUIRED) +include_directories(${MPI_INCLUDE_PATH}) +link_directories(${MPI_LIBRARIES}) + +#find_package(Boost REQUIRED) # Should set version and specific components. +#include_directories(${Boost_INCLUDE_DIRS}) + +include_directories("../include") + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${MPI_COMPILE_FLAGS} -Wall -pedantic -Wextra -std=c++14") +set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${MPI_LINK_FLAGS}") + +add_executable(hera_mpi main.cpp) +target_link_libraries(hera_mpi ${MPI_LIBRARIES}) diff --git a/geom_matching/wasserstein/mpi/debug.hpp b/geom_matching/wasserstein/mpi/debug.hpp new file mode 100644 index 0000000..a74a1d8 --- /dev/null +++ b/geom_matching/wasserstein/mpi/debug.hpp @@ -0,0 +1,8 @@ +#pragma once + +#ifdef DEBUG +#include +#define IFDEBUG(x) do { x } while (false) +#else +#define IFDEBUG(x) +#endif diff --git a/geom_matching/wasserstein/mpi/main.cpp b/geom_matching/wasserstein/mpi/main.cpp new file mode 100644 index 0000000..fa8c62b --- /dev/null +++ b/geom_matching/wasserstein/mpi/main.cpp @@ -0,0 +1,386 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "debug.hpp" +#include "wasserstein.h" + +void print_help(const std::string & invocation) +{ + std::cout << "Usage: " << invocation << " arguments" << std::endl; + std::cout << "Arguments:" << std::endl; + std::cout << " --in-list, -i file" << std::endl; + std::cout << " Mandatory. File containing a list of persistence diagram files to process, one file per line." << std::endl; + std::cout << " --in-type, -t type" << std::endl; + std::cout << " Optional (defaults to dipha). Input file format, dipha|txt." << std::endl; + std::cout << " --dimension, -d dim" << std::endl; + std::cout << " Mandatory if the input is a DIPHA persistence diagram file. Ignored otherwise. Integer." << std::endl; + std::cout << " --outer-norm, -p p" << std::endl; + std::cout << " Mandatory. Floating point. Outer norm. In the interval [1, ∞)." << std::endl; + std::cout << " --inner-norm, -q q" << std::endl; + std::cout << " Mandatory. Floating point. Inner norm. In the interval [1, ∞]. Use inf for infinity." << std::endl; + std::cout << " --error, -e e" << std::endl; + std::cout << " Mandatory. Relative error. Positive floating point." << std::endl; + std::cout << " --finitize, -f f" << std::endl; + std::cout << " Optional. Make infinite intervals die at f." << std::endl; + std::cout << " --out, -o file" << std::endl; + std::cout << " Mandatory. Output file name. Plain text." << std::endl; + std::cout << " --chunk, -c c" << std::endl; + std::cout << " Optional. Size of work chunk to send off to each computational node. Too small a value yields a lot of overhead, too large a value can cause an unbalanced load. Increase if there are many small computations. Default: 100." << std::endl; +} + +void arg_fail(const std::string & invocation, int rank, const std::string & message) +{ + if (rank == 0) + { + std::cout << message << std::endl; + print_help(invocation); + } + MPI_Finalize(); + exit(1); +} + +void fail(int rank, const std::string & message) +{ + if (rank == 0) + { + std::cout << message << std::endl; + } + MPI_Finalize(); + exit(1); +} + +inline int unroll(int n, int i, int j) +{ + IFDEBUG( + assert(i < n); + assert(j < n); + assert(n > 0); + assert(i < j); + assert(i >= 0); + assert(j >= 0); + ); + return i*n + j - (i*(i+1))/2 - i - 1; +} + +enum Message_tag { tag_result, tag_work }; +enum File_type { file_type_dipha, file_type_txt }; + +int main(int argc, char ** argv) +{ + MPI_Init(NULL, NULL); + + std::string invocation(argv[0]); + + int world_size; + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + + int world_rank; + MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); + + if (world_size < 2) + { + arg_fail(invocation, world_rank, "Currently there is no support for running with just one process."); + } + + char processor_name_tmp[MPI_MAX_PROCESSOR_NAME]; + int name_len; + MPI_Get_processor_name(processor_name_tmp, &name_len); + std::string processor_name(processor_name_tmp); + + std::string list_file_name; + std::string out_file_name; + int dim = -1; + int in_type = file_type_dipha; + double finitization = std::numeric_limits::infinity(); + int chunk_size = 100; + hera::AuctionParams params; + params.wasserstein_power = std::numeric_limits::quiet_NaN(); + params.delta = std::numeric_limits::quiet_NaN(); + params.internal_p = std::numeric_limits::quiet_NaN(); + params.initial_epsilon = 0.0; // Default value taken from upstream example code. + params.epsilon_common_ratio = 0.0; // Default value taken from upstream example code. + params.max_bids_per_round = std::numeric_limits::max(); // Default value taken from upstream example code. + params.gamma_threshold = 0.0; // Default value taken from upstream example code. + params.max_num_phases = 800; // Default value taken from upstream example code. + + for (int i = 1; i < argc; ++i) + { + std::string arg(argv[i]); + if (arg == std::string("--in-list") || arg == std::string("-i")) + { + if (i < argc - 1) + list_file_name = std::string(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --in-list."); + } + else if (arg == std::string("--in-type") || arg == std::string("-t")) + { + if (i < argc - 1) + { + std::string argnext(argv[++i]); + if (argnext == std::string("txt")) + in_type = file_type_txt; + else if (argnext == std::string("dipha")) + in_type = file_type_dipha; + else + arg_fail(invocation, world_rank, "Invalid argument for --in-type."); + } + else + arg_fail(invocation, world_rank, "Missing argument for --in-type."); + } + else if (arg == std::string("--dimension") || arg == std::string("-d")) + { + if (i < argc - 1) + dim = std::atoi(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --dimension."); + } + else if (arg == std::string("--outer-norm") || arg == std::string("-p")) + { + if (i < argc - 1) + params.wasserstein_power = std::atof(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --outer-norm."); + } + else if (arg == std::string("--inner-norm") || arg == std::string("-q")) + { + if (i < argc - 1) + { + std::string argnext(argv[++i]); + if (argnext == "inf") + params.internal_p = -1; + else + params.internal_p = std::stod(argnext); + } + else + arg_fail(invocation, world_rank, "Missing argument for --inner-norm."); + } + else if (arg == std::string("--error") || arg == std::string("-e")) + { + if (i < argc - 1) + params.delta = std::atof(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --error."); + } + else if (arg == std::string("--finitize") || arg == std::string("-f")) + { + if (i < argc - 1) + finitization = std::atof(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --finitize."); + } + else if (arg == std::string("--out") || arg == std::string("-o")) + { + if (i < argc - 1) + out_file_name = std::string(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --out."); + } + else if (arg == std::string("--chunk") || arg == std::string("-c")) + { + if (i < argc - 1) + chunk_size = std::atoi(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --chunk."); + } + + } + + + // Argument validation. + if (list_file_name == std::string("")) + arg_fail(invocation, world_rank, "Need input file list."); + if (dim < 0) + arg_fail(invocation, world_rank, "Dimension must be non-negative."); + if (params.wasserstein_power < 1 || !std::isfinite(params.wasserstein_power)) + arg_fail(invocation, world_rank, "Outer norm power must be in the interval [1, ∞)."); + if (params.internal_p < 1 || std::isnan(params.internal_p)) + arg_fail(invocation, world_rank, "Inner norm power must be in ther interval [1, ∞]."); + if (params.delta <= 0 || !std::isfinite(params.delta)) + arg_fail(invocation, world_rank, "Error must be positive."); + if (out_file_name == std::string("")) + arg_fail(invocation, world_rank, "Need output file."); + if (chunk_size <= 0) + arg_fail(invocation, world_rank, "Chunk size must be positive."); + + std::vector files; + + std::ifstream list_file(list_file_name, std::ios::in); + std::string line; + while (std::getline(list_file, line)) + { + files.push_back(line); + } + list_file.close(); + + int n = files.size(); + + std::vector> idxs((n*(n-1))/2); + int k = 0; + for (int i = 0; i < n; ++i) + { + for (int j = i+1; j < n; ++j) + { + idxs[k++] = std::make_pair(i, j); + } + } + + std::vector results; + if (world_rank == 0) + { + results.resize(idxs.size(), std::numeric_limits::quiet_NaN()); + double unused_buf = std::numeric_limits::quiet_NaN(); + std::vector result_reqs(world_size); // Element zero not used, index by actual rank. + int done = 0; + int next_chunk = 0; + std::vector> assigned(world_size, std::make_pair(-2, -2)); + + std::cout << "Total things to compute: " << idxs.size() << std::endl; + + for (int r = 1; r < world_size; ++r) + { + MPI_Irecv(&unused_buf, 0, MPI_DOUBLE, r, tag_result, MPI_COMM_WORLD, &(result_reqs[r])); + } + + while ((size_t)done < idxs.size()) + { + int respondent_index = -1; + MPI_Status status; + MPI_Waitany(world_size - 1, &(result_reqs[1]), &respondent_index, &status); + int r = 1 + respondent_index; + + std::cout << "Heard back from rank " << r << "." << std::endl; + int recv_count = -1; + MPI_Get_count(&status, MPI_DOUBLE, &recv_count); + std::cout << "Received " << recv_count << " elements from rank " << r << "." << std::endl; + + assert(recv_count == assigned[r].second - assigned[r].first); + done += recv_count; + + int work[2] = {-1, -1}; + if ((size_t)next_chunk*chunk_size < idxs.size()) + { + work[0] = next_chunk*chunk_size; + work[1] = std::min((int)idxs.size(), (next_chunk + 1)*chunk_size); + MPI_Irecv(&(results[work[0]]), work[1] - work[0], MPI_DOUBLE, r, tag_result, MPI_COMM_WORLD, &(result_reqs[r])); + std::cout << "Rank will get new work." << std::endl; + } + else + std::cout << "Rank will terminate." << std::endl; + + assigned[r] = std::make_pair(work[0], work[1]); + ++next_chunk; + MPI_Send(work, 2, MPI_INT, r, tag_work, MPI_COMM_WORLD); + std::cout << 100*(double)done/(double)idxs.size() << "% complete." << std::endl; + std::cout << "------------------" << std::endl; + } + } + else // Slaves + { + results.resize(chunk_size, std::numeric_limits::quiet_NaN()); + int work[2] = {-2, -2}; + + while (work[0] != -1) + { + int i_prev = -1; + int j_prev = -1; + int i = -1; + int j = -1; + + std::vector > pd_1; + std::vector > pd_2; + + for (int k = work[0]; k < work[1]; ++k) + { + i = idxs[k].first; + j = idxs[k].second; + + bool load_success = false; + + if (i != i_prev) + { + pd_1.clear(); + if (in_type == file_type_dipha) + { + load_success = hera::read_diagram_dipha > >(files[i], dim, pd_1); + } + else if (in_type == file_type_txt) + { + load_success = hera::read_diagram_point_set > >(files[i], pd_1); + } + else + { + fail(world_rank, "Boo"); + } + + if (!load_success) + fail(world_rank, std::string("Failed to load file ") + files[i] + std::string(".")); + + hera::finitize(finitization, pd_1); + } + + if (j != j_prev) + { + pd_2.clear(); + if (in_type == file_type_dipha) + { + load_success = hera::read_diagram_dipha > >(files[j], dim, pd_2); + } + else if (in_type == file_type_txt) + { + load_success = hera::read_diagram_point_set > >(files[j], pd_2); + } + else + { + fail(world_rank, "Boo"); + } + + if (!load_success) + fail(world_rank, std::string("Failed to load file ") + files[i] + std::string(".")); + + hera::finitize(finitization, pd_2); + } + + std::string fixme(""); + results[k - work[0]] = hera::wasserstein_dist(pd_1, pd_2, params, fixme); + + i_prev = i; + j_prev = j; + } + + MPI_Send(results.data(), work[1] - work[0], MPI_DOUBLE, 0, tag_result, MPI_COMM_WORLD); + MPI_Recv(work, 2, MPI_INT, 0, tag_work, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } + + } + + + if (world_rank == 0) + { + std::cout << "Writing out." << std::endl; + std::ofstream out_file(out_file_name, std::ios::out); + for (int i = 0; i < n; ++i) + { + for (int j = 0; j < i; ++j) + { + out_file << std::scientific << results[unroll(n, j, i)] << " "; + } + out_file << "0 "; + for (int j = i + 1; j < n; ++j) + { + out_file << std::scientific << results[unroll(n, i, j)] << " "; + } + out_file << std::endl; + } + out_file.close(); + } + + + MPI_Finalize(); +} -- cgit v1.2.3