From 4e4a4bfc4ccc3f39288678a7550ff12601d13223 Mon Sep 17 00:00:00 2001 From: Gard Spreemann Date: Tue, 4 Aug 2020 19:51:32 +0200 Subject: Initial commit. --- CMakeLists.txt | 28 +++++ include/misc.hpp | 66 +++++++++++ include/pd.hpp | 126 ++++++++++++++++++++ src/main.cpp | 346 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 566 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 include/misc.hpp create mode 100644 include/pd.hpp create mode 100644 src/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..49f6dd1 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,28 @@ +cmake_minimum_required(VERSION 3.1.0) + +project(heatkernel_mpi) + +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + add_definitions(-DDEBUG) + remove_definitions(-DNDEBUG) +endif() + +include(TestBigEndian) +test_big_endian(BIG_ENDIAN) +if(BIG_ENDIAN) + add_definitions(-DBIGENDIAN) +endif() + +find_package(MPI REQUIRED) +include_directories(include ${MPI_INCLUDE_PATH}) +link_directories(${MPI_LIBRARIES}) + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${MPI_COMPILE_FLAGS} -Wall -pedantic -Wextra -std=c++14") +set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${MPI_LINK_FLAGS}") + +add_executable(heatkernel-mpi + src/main.cpp + include/misc.hpp + include/pd.hpp) + +target_link_libraries(heatkernel-mpi ${MPI_LIBRARIES} ${Boost_LIBRARIES}) diff --git a/include/misc.hpp b/include/misc.hpp new file mode 100644 index 0000000..f8afe77 --- /dev/null +++ b/include/misc.hpp @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include +#include + +constexpr int64_t DIPHA_MAGIC = 8067171840; +constexpr int64_t DIPHA_DISTANCE_MATRIX = 7; +constexpr int64_t DIPHA_PERSISTENCE_DIAGRAM = 2; +constexpr int64_t DIPHA_WEIGHTED_BOUNDARY_MATRIX = 0; +inline constexpr double PI = 4*std::atan(1); + +template inline void reverse_endianness(T & x) +{ + uint8_t * p = reinterpret_cast(&x); + std::reverse(p, p + sizeof(T)); +} + +template inline T read_le(std::istream & s) +{ + T result; + s.read(reinterpret_cast(&result), sizeof(T)); + #ifdef BIGENDIAN + reverse_endianness(result); + #endif + return result; +} + +template inline void write_le(std::ostream & s, T value) +{ + #ifdef BIGENDIAN + reverse_endianness(value); + #endif + s.write(reinterpret_cast(&value), sizeof(T)); +} + +template inline T read_be(std::istream & s) +{ + T result; + s.read(reinterpret_cast(&result), sizeof(T)); + #ifndef BIGENDIAN + reverse_endianness(result); + #endif + return result; +} + +template inline void write_be(std::ostream & s, T value) +{ + #ifndef BIGENDIAN + reverse_endianness(value); + #endif + s.write(reinterpret_cast(&value), sizeof(T)); +} + +enum class Message_tag : int { result, work }; + +template inline T unroll_upper_tri(T n, T i, T j) +{ + return i*(n-1) - ((i-1)*i)/2 + j; +} + +template inline T unroll_full(T n, T i, T j) +{ + return i*n + j; +} diff --git a/include/pd.hpp b/include/pd.hpp new file mode 100644 index 0000000..04b8609 --- /dev/null +++ b/include/pd.hpp @@ -0,0 +1,126 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "misc.hpp" + +template class PD +{ +public: + class Interval + { + public: + T birth; + T death; + }; + + + PD() : intervals() + { + }; + + void add(T b, T d, unsigned int m) + { + if (m > 0 && b < d) + { + Interval interval; + interval.birth = b; + interval.death = d; + intervals.push_back(std::make_pair(interval, m)); + } + } + + void clear() { intervals.clear(); } + + void finitize(T scale) + { + for (auto it = intervals.begin(); it != intervals.end(); ++it) + { + if (std::isinf(it->first.death)) + it->first.death = scale; + } + } + + typename std::vector::Interval, unsigned int> >::size_type size() const { return intervals.size(); } + + using Iterator = typename std::vector::Interval, unsigned int> >::const_iterator; + + inline Iterator cbegin() const { return intervals.cbegin(); } + inline Iterator cend() const { return intervals.cend(); } + +private: + std::vector::Interval, unsigned int> > intervals; +}; + +inline double sqdist(PD::Interval x, PD::Interval y) +{ + return (x.birth - y.birth)*(x.birth - y.birth) + (x.death - y.death)*(x.death - y.death); +} + +template T heat_kernel(T sigma, const PD & a, const PD & b) +{ + T ret = 0; + + for (auto it = a.cbegin(); it != a.cend(); ++it) + { + auto x = *it; + typename PD::Interval xbar; + xbar.birth = x.first.death; + xbar.death = x.first.birth; + + for (auto jt = b.cbegin(); jt != b.cend(); ++jt) + { + auto y = *jt; + ret += x.second*y.second*(std::exp(-sqdist(x.first, y.first)/(8*sigma)) - std::exp(-sqdist(xbar, y.first)/(8*sigma))); + } + } + + ret /= 8*sigma*PI; + + return ret; +} + +int read_dipha_degree(const std::string & fname, unsigned int degree, PD & pd) +{ + std::ifstream f; + f.open(fname, std::ios::in | std::ios::binary); + if (!f.is_open()) + { + std::cerr << "Failed to open " << fname << ". Bailing." << std::endl; + return 1; + } + + if (read_le(f) != DIPHA_MAGIC) + { + std::cerr << "File " << fname << " is not DIPHA file. Bailing." << std::endl; + f.close(); + return 1; + } + + if (read_le(f) != DIPHA_PERSISTENCE_DIAGRAM) + { + std::cerr << "File " << fname << " is not persistence diagram. Bailing." << std::endl; + f.close(); + return 1; + } + + int64_t n = read_le(f); + + for (int64_t i = 0; i < n; ++i) + { + int64_t d = read_le(f); + double birth = read_le(f); + double death = read_le(f); + if (d == degree) + pd.add(birth, death, 1); + else if (-d - 1 == degree) + pd.add(birth, std::numeric_limits::infinity(), 1); + } + + f.close(); + return 0; +} diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..448ae5a --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,346 @@ +#include +#include +#include +#include + +#include + +#include "misc.hpp" +#include "pd.hpp" + +void print_help(const std::string & invocation) +{ + std::cout << "Usage: " << invocation << " arguments" << std::endl; + std::cout << "Arguments:" << std::endl; + std::cout << " --in-lists, -i file_1 file_2" << std::endl; + std::cout << " Mandatory. Files containing lists of persistence diagram files to process, one file per line." << std::endl; + std::cout << " If the same file name is given twice, the computation will exploit symmetry." << std::endl; + std::cout << " --dimension, -d dim" << std::endl; + std::cout << " Mandatory integer. Degree to read from the persistence diagrams." << std::endl; + std::cout << " --sigma, -d s" << std::endl; + std::cout << " Mandatory decimal. σ parameter in heat kernel." << std::endl; + std::cout << " --finitize, -f x" << std::endl; + std::cout << " Optional decimal. Finitize all infinite intervals to scale x. If not given (default), infinite intervals are ignored." << std::endl; + std::cout << " --out, -o file" << std::endl; + std::cout << " Mandatory output file name. Plain text." << std::endl; + std::cout << " --chunk, -c c" << std::endl; + std::cout << " Optional integer. Size of work chunk to send off to each computational node. Too small a value yields a lot of overhead, too large a value can cause an unbalanced load. Increase if there are many small computations. Default: 10." << std::endl; + std::cout << " --help, -h" << std::endl; + std::cout << " Print this help text." << std::endl; +} + +void fail(int rank, const std::string & message) +{ + if (rank == 0) + { + std::cout << message << std::endl; + } + MPI_Finalize(); + exit(1); +} + +void arg_fail(const std::string & invocation, int rank, const std::string & message) +{ + if (rank == 0) + { + std::cout << message << std::endl; + print_help(invocation); + } + MPI_Finalize(); + exit(1); +} + +int main(int argc, char ** argv) +{ + MPI_Init(NULL, NULL); + + std::string invocation(argv[0]); + + int world_size; + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + int world_rank; + MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); + + if (world_size < 2) + { + fail(world_rank, "Currently there is no support for running with just one process. Please run at least 2 MPI jobs."); + } + + char processor_name_tmp[MPI_MAX_PROCESSOR_NAME]; + int processor_name_len; + MPI_Get_processor_name(processor_name_tmp, &processor_name_len); + std::string processor_name(processor_name_tmp); + + // Begin processing args. + std::string list_1_file_name; + std::string list_2_file_name; + int dim = -1; + double sigma = std::numeric_limits::quiet_NaN(); + double finitization = std::numeric_limits::infinity(); + std::string out_file_name; + int chunk_size = 10; + + for (int i = 1; i < argc; ++i) + { + std::string arg(argv[i]); + if (arg == std::string("--in-lists") || arg == std::string("-i")) + { + if (i < argc - 2) + { + list_1_file_name = std::string(argv[++i]); + list_2_file_name = std::string(argv[++i]); + } + else + arg_fail(invocation, world_rank, "Need two arguments for --in-lists."); + } + else if (arg == std::string("--dimension") || arg == std::string("-d")) + { + if (i < argc - 1) + dim = std::atoi(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --dimension."); + } + else if (arg == std::string("--sigma") || arg == std::string("-s")) + { + if (i < argc - 1) + sigma = std::atof(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --sigma."); + } + else if (arg == std::string("--finitize") || arg == std::string("-f")) + { + if (i < argc - 1) + finitization = std::atof(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --finitize."); + } + else if (arg == std::string("--out") || arg == std::string("-o")) + { + if (i < argc - 1) + out_file_name = std::string(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --out."); + } + else if (arg == std::string("--chunk") || arg == std::string("-c")) + { + if (i < argc - 1) + chunk_size = std::atoi(argv[++i]); + else + arg_fail(invocation, world_rank, "Missing argument for --chunk."); + } + else if (arg == std::string("--help") || arg == std::string("-h")) + { + if (world_rank == 0) + print_help(invocation); + MPI_Finalize(); + exit(0); + } + else + { + arg_fail(invocation, world_rank, "Incorrect argument."); + } + } + // End processing args. + + // Begin argument validation. + if (list_1_file_name == std::string("") || list_2_file_name == std::string("")) + arg_fail(invocation, world_rank, "Need input file lists."); + if (dim < 0) + arg_fail(invocation, world_rank, "Dimension must be non-negative."); + if (sigma <= 0) + arg_fail(invocation, world_rank, "σ must be positive."); + if (finitization <= 0) + arg_fail(invocation, world_rank, "Finitization must be positive."); + if (out_file_name == std::string("")) + arg_fail(invocation, world_rank, "Need output file."); + if (chunk_size <= 0) + arg_fail(invocation, world_rank, "Chunk size must be positive."); + // End argument validation. + + bool symmetric = list_1_file_name == list_2_file_name; + + std::vector files_1; + std::vector files_2; + + std::ifstream list_file(list_1_file_name, std::ios::in); + if (!list_file.is_open()) + { + fail(world_rank, std::string("Failed to load diagram list file ") + list_1_file_name + std::string(".")); + } + std::string line; + while (std::getline(list_file, line)) + { + files_1.push_back(line); + } + list_file.close(); + + if (symmetric) + { + files_2 = files_1; + } + else + { + list_file.open(list_2_file_name, std::ios::in); + if (!list_file.is_open()) + { + fail(world_rank, std::string("Failed to load diagram list file ") + list_2_file_name + std::string(".")); + } + while (std::getline(list_file, line)) + { + files_2.push_back(line); + } + list_file.close(); + } + + int m = files_1.size(); + int n = files_2.size(); + + std::vector> idxs; + if (symmetric) + { + for (int i = 0; i < m; ++i) + { + for (int j = i; j < n; ++j) + { + idxs.push_back(std::make_pair(i, j)); + } + } + } + else + { + for (int i = 0; i < m; ++i) + { + for (int j = 0; j < n; ++j) + { + idxs.push_back(std::make_pair(i, j)); + } + } + } + + // Begin master. + if (world_rank == 0) + { + std::vector result(idxs.size(), std::numeric_limits::quiet_NaN()); + + double unused_buf = std::numeric_limits::quiet_NaN(); + std::vector result_reqs(world_size); // Element zero not used, index by actual rank. + + int done = 0; + int next_chunk = 0; + + std::vector> assigned(world_size, std::make_pair(-2, -2)); + + std::cout << "Total things to compute: " << idxs.size() << std::endl; + + for (int r = 1; r < world_size; ++r) + { + MPI_Irecv(&unused_buf, 0, MPI_DOUBLE, r, (int)Message_tag::result, MPI_COMM_WORLD, &(result_reqs[r])); + } + + while ((size_t) done < idxs.size()) + { + int respondent_index = -1; + MPI_Status status; + MPI_Waitany(world_size - 1, &(result_reqs[1]), &respondent_index, &status); + int r = 1 + respondent_index; + + std::cout << "Heard back from rank " << r << "." << std::endl; + int recv_count = -1; + MPI_Get_count(&status, MPI_DOUBLE, &recv_count); + std::cout << "Received " << recv_count << " elements from rank " << r << "." << std::endl; + + assert(recv_count == assigned[r].second - assigned[r].first); + done += recv_count; + + int work[2] = {-1, -1}; + if ((size_t)next_chunk*chunk_size < idxs.size()) + { + work[0] = next_chunk*chunk_size; + work[1] = std::min((int)idxs.size(), (next_chunk + 1)*chunk_size); + MPI_Irecv(&(result[work[0]]), work[1] - work[0], MPI_DOUBLE, r, (int)Message_tag::result, MPI_COMM_WORLD, &(result_reqs[r])); + std::cout << "Rank will get new work." << std::endl; + } + else + std::cout << "Rank will terminate." << std::endl; + + assigned[r] = std::make_pair(work[0], work[1]); + ++next_chunk; + MPI_Send(work, 2, MPI_INT, r, (int)Message_tag::work, MPI_COMM_WORLD); + std::cout << 100*(double)done/(double)idxs.size() << "% complete." << std::endl; + std::cout << "------------------" << std::endl; + } + std::cout << "Done. Writing out." << std::endl; + std::ofstream out_file(out_file_name, std::ios::out); + for (int i = 0; i < m; ++i) + { + for (int j = 0; j < i; ++j) + { + out_file << std::scientific << result[symmetric ? unroll_upper_tri(n, j, i) : unroll_full(n, i, j)] << " "; + } + for (int j = i; j < n; ++j) + { + out_file << std::scientific << result[symmetric ? unroll_upper_tri(n, i, j) : unroll_full(n, i, j)] << " "; + } + out_file << std::endl; + } + out_file.close(); + } // End master. + + else // Begin slaves. + { + std::vector result(chunk_size, std::numeric_limits::quiet_NaN()); + int work[2] = {-2, -2}; + + while (work[0] != -1) + { + int i_prev = -1; + int j_prev = -1; + int i = -1; + int j = -1; + + PD pd_1; + PD pd_2; + + for (int k = work[0]; k < work[1]; ++k) + { + i = idxs[k].first; + j = idxs[k].second; + + int load_status = 1; + + if (i != i_prev) + { + pd_1.clear(); + load_status = read_dipha_degree(files_1[i], dim, pd_1); + + if (load_status) + fail(world_rank, std::string("Failed to load file ") + files_1[i] + std::string(".")); + + pd_1.finitize(finitization); + } + + if (j != j_prev) + { + pd_2.clear(); + load_status = read_dipha_degree(files_2[j], dim, pd_2); + + if (load_status) + fail(world_rank, std::string("Failed to load file ") + files_2[j] + std::string(".")); + + pd_2.finitize(finitization); + } + + result[k - work[0]] = heat_kernel(sigma, pd_1, pd_2); + i_prev = i; + j_prev = j; + } + + MPI_Send(result.data(), work[1] - work[0], MPI_DOUBLE, 0, (int)Message_tag::result, MPI_COMM_WORLD); + MPI_Recv(work, 2, MPI_INT, 0, (int)Message_tag::work, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } + } + // End slave. + + MPI_Finalize(); + return 0; +} -- cgit v1.2.3