summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGard Spreemann <gspr@nonempty.org>2020-08-04 19:51:32 +0200
committerGard Spreemann <gspr@nonempty.org>2020-08-04 19:51:32 +0200
commit4e4a4bfc4ccc3f39288678a7550ff12601d13223 (patch)
tree291a0757e933188f0faf316c4f37cb5ef4f32f84
Initial commit.
-rw-r--r--CMakeLists.txt28
-rw-r--r--include/misc.hpp66
-rw-r--r--include/pd.hpp126
-rw-r--r--src/main.cpp346
4 files changed, 566 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..49f6dd1
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,28 @@
+cmake_minimum_required(VERSION 3.1.0)
+
+project(heatkernel_mpi)
+
+if(CMAKE_BUILD_TYPE STREQUAL "Debug")
+ add_definitions(-DDEBUG)
+ remove_definitions(-DNDEBUG)
+endif()
+
+include(TestBigEndian)
+test_big_endian(BIG_ENDIAN)
+if(BIG_ENDIAN)
+ add_definitions(-DBIGENDIAN)
+endif()
+
+find_package(MPI REQUIRED)
+include_directories(include ${MPI_INCLUDE_PATH})
+link_directories(${MPI_LIBRARIES})
+
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${MPI_COMPILE_FLAGS} -Wall -pedantic -Wextra -std=c++14")
+set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${MPI_LINK_FLAGS}")
+
+add_executable(heatkernel-mpi
+ src/main.cpp
+ include/misc.hpp
+ include/pd.hpp)
+
+target_link_libraries(heatkernel-mpi ${MPI_LIBRARIES} ${Boost_LIBRARIES})
diff --git a/include/misc.hpp b/include/misc.hpp
new file mode 100644
index 0000000..f8afe77
--- /dev/null
+++ b/include/misc.hpp
@@ -0,0 +1,66 @@
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <cstdint>
+#include <sstream>
+
+constexpr int64_t DIPHA_MAGIC = 8067171840;
+constexpr int64_t DIPHA_DISTANCE_MATRIX = 7;
+constexpr int64_t DIPHA_PERSISTENCE_DIAGRAM = 2;
+constexpr int64_t DIPHA_WEIGHTED_BOUNDARY_MATRIX = 0;
+inline constexpr double PI = 4*std::atan(1);
+
+template <typename T> inline void reverse_endianness(T & x)
+{
+ uint8_t * p = reinterpret_cast<uint8_t *>(&x);
+ std::reverse(p, p + sizeof(T));
+}
+
+template <typename T> inline T read_le(std::istream & s)
+{
+ T result;
+ s.read(reinterpret_cast<char *>(&result), sizeof(T));
+ #ifdef BIGENDIAN
+ reverse_endianness(result);
+ #endif
+ return result;
+}
+
+template <typename T> inline void write_le(std::ostream & s, T value)
+{
+ #ifdef BIGENDIAN
+ reverse_endianness(value);
+ #endif
+ s.write(reinterpret_cast<char *>(&value), sizeof(T));
+}
+
+template <typename T> inline T read_be(std::istream & s)
+{
+ T result;
+ s.read(reinterpret_cast<char *>(&result), sizeof(T));
+ #ifndef BIGENDIAN
+ reverse_endianness(result);
+ #endif
+ return result;
+}
+
+template <typename T> inline void write_be(std::ostream & s, T value)
+{
+ #ifndef BIGENDIAN
+ reverse_endianness(value);
+ #endif
+ s.write(reinterpret_cast<char *>(&value), sizeof(T));
+}
+
+enum class Message_tag : int { result, work };
+
+template <typename T> inline T unroll_upper_tri(T n, T i, T j)
+{
+ return i*(n-1) - ((i-1)*i)/2 + j;
+}
+
+template <typename T> inline T unroll_full(T n, T i, T j)
+{
+ return i*n + j;
+}
diff --git a/include/pd.hpp b/include/pd.hpp
new file mode 100644
index 0000000..04b8609
--- /dev/null
+++ b/include/pd.hpp
@@ -0,0 +1,126 @@
+#pragma once
+
+#include <cstdint>
+#include <fstream>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "misc.hpp"
+
+template <typename T> class PD
+{
+public:
+ class Interval
+ {
+ public:
+ T birth;
+ T death;
+ };
+
+
+ PD() : intervals()
+ {
+ };
+
+ void add(T b, T d, unsigned int m)
+ {
+ if (m > 0 && b < d)
+ {
+ Interval interval;
+ interval.birth = b;
+ interval.death = d;
+ intervals.push_back(std::make_pair(interval, m));
+ }
+ }
+
+ void clear() { intervals.clear(); }
+
+ void finitize(T scale)
+ {
+ for (auto it = intervals.begin(); it != intervals.end(); ++it)
+ {
+ if (std::isinf(it->first.death))
+ it->first.death = scale;
+ }
+ }
+
+ typename std::vector<std::pair<PD<T>::Interval, unsigned int> >::size_type size() const { return intervals.size(); }
+
+ using Iterator = typename std::vector<std::pair<PD<T>::Interval, unsigned int> >::const_iterator;
+
+ inline Iterator cbegin() const { return intervals.cbegin(); }
+ inline Iterator cend() const { return intervals.cend(); }
+
+private:
+ std::vector<std::pair<PD<T>::Interval, unsigned int> > intervals;
+};
+
+inline double sqdist(PD<double>::Interval x, PD<double>::Interval y)
+{
+ return (x.birth - y.birth)*(x.birth - y.birth) + (x.death - y.death)*(x.death - y.death);
+}
+
+template <typename T> T heat_kernel(T sigma, const PD<T> & a, const PD<T> & b)
+{
+ T ret = 0;
+
+ for (auto it = a.cbegin(); it != a.cend(); ++it)
+ {
+ auto x = *it;
+ typename PD<T>::Interval xbar;
+ xbar.birth = x.first.death;
+ xbar.death = x.first.birth;
+
+ for (auto jt = b.cbegin(); jt != b.cend(); ++jt)
+ {
+ auto y = *jt;
+ ret += x.second*y.second*(std::exp(-sqdist(x.first, y.first)/(8*sigma)) - std::exp(-sqdist(xbar, y.first)/(8*sigma)));
+ }
+ }
+
+ ret /= 8*sigma*PI;
+
+ return ret;
+}
+
+int read_dipha_degree(const std::string & fname, unsigned int degree, PD<double> & pd)
+{
+ std::ifstream f;
+ f.open(fname, std::ios::in | std::ios::binary);
+ if (!f.is_open())
+ {
+ std::cerr << "Failed to open " << fname << ". Bailing." << std::endl;
+ return 1;
+ }
+
+ if (read_le<int64_t>(f) != DIPHA_MAGIC)
+ {
+ std::cerr << "File " << fname << " is not DIPHA file. Bailing." << std::endl;
+ f.close();
+ return 1;
+ }
+
+ if (read_le<int64_t>(f) != DIPHA_PERSISTENCE_DIAGRAM)
+ {
+ std::cerr << "File " << fname << " is not persistence diagram. Bailing." << std::endl;
+ f.close();
+ return 1;
+ }
+
+ int64_t n = read_le<int64_t>(f);
+
+ for (int64_t i = 0; i < n; ++i)
+ {
+ int64_t d = read_le<int64_t>(f);
+ double birth = read_le<double>(f);
+ double death = read_le<double>(f);
+ if (d == degree)
+ pd.add(birth, death, 1);
+ else if (-d - 1 == degree)
+ pd.add(birth, std::numeric_limits<double>::infinity(), 1);
+ }
+
+ f.close();
+ return 0;
+}
diff --git a/src/main.cpp b/src/main.cpp
new file mode 100644
index 0000000..448ae5a
--- /dev/null
+++ b/src/main.cpp
@@ -0,0 +1,346 @@
+#include <cassert>
+#include <fstream>
+#include <iostream>
+#include <limits>
+
+#include <mpi.h>
+
+#include "misc.hpp"
+#include "pd.hpp"
+
+void print_help(const std::string & invocation)
+{
+ std::cout << "Usage: " << invocation << " arguments" << std::endl;
+ std::cout << "Arguments:" << std::endl;
+ std::cout << " --in-lists, -i file_1 file_2" << std::endl;
+ std::cout << " Mandatory. Files containing lists of persistence diagram files to process, one file per line." << std::endl;
+ std::cout << " If the same file name is given twice, the computation will exploit symmetry." << std::endl;
+ std::cout << " --dimension, -d dim" << std::endl;
+ std::cout << " Mandatory integer. Degree to read from the persistence diagrams." << std::endl;
+ std::cout << " --sigma, -d s" << std::endl;
+ std::cout << " Mandatory decimal. σ parameter in heat kernel." << std::endl;
+ std::cout << " --finitize, -f x" << std::endl;
+ std::cout << " Optional decimal. Finitize all infinite intervals to scale x. If not given (default), infinite intervals are ignored." << std::endl;
+ std::cout << " --out, -o file" << std::endl;
+ std::cout << " Mandatory output file name. Plain text." << std::endl;
+ std::cout << " --chunk, -c c" << std::endl;
+ std::cout << " Optional integer. Size of work chunk to send off to each computational node. Too small a value yields a lot of overhead, too large a value can cause an unbalanced load. Increase if there are many small computations. Default: 10." << std::endl;
+ std::cout << " --help, -h" << std::endl;
+ std::cout << " Print this help text." << std::endl;
+}
+
+void fail(int rank, const std::string & message)
+{
+ if (rank == 0)
+ {
+ std::cout << message << std::endl;
+ }
+ MPI_Finalize();
+ exit(1);
+}
+
+void arg_fail(const std::string & invocation, int rank, const std::string & message)
+{
+ if (rank == 0)
+ {
+ std::cout << message << std::endl;
+ print_help(invocation);
+ }
+ MPI_Finalize();
+ exit(1);
+}
+
+int main(int argc, char ** argv)
+{
+ MPI_Init(NULL, NULL);
+
+ std::string invocation(argv[0]);
+
+ int world_size;
+ MPI_Comm_size(MPI_COMM_WORLD, &world_size);
+ int world_rank;
+ MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
+
+ if (world_size < 2)
+ {
+ fail(world_rank, "Currently there is no support for running with just one process. Please run at least 2 MPI jobs.");
+ }
+
+ char processor_name_tmp[MPI_MAX_PROCESSOR_NAME];
+ int processor_name_len;
+ MPI_Get_processor_name(processor_name_tmp, &processor_name_len);
+ std::string processor_name(processor_name_tmp);
+
+ // Begin processing args.
+ std::string list_1_file_name;
+ std::string list_2_file_name;
+ int dim = -1;
+ double sigma = std::numeric_limits<double>::quiet_NaN();
+ double finitization = std::numeric_limits<double>::infinity();
+ std::string out_file_name;
+ int chunk_size = 10;
+
+ for (int i = 1; i < argc; ++i)
+ {
+ std::string arg(argv[i]);
+ if (arg == std::string("--in-lists") || arg == std::string("-i"))
+ {
+ if (i < argc - 2)
+ {
+ list_1_file_name = std::string(argv[++i]);
+ list_2_file_name = std::string(argv[++i]);
+ }
+ else
+ arg_fail(invocation, world_rank, "Need two arguments for --in-lists.");
+ }
+ else if (arg == std::string("--dimension") || arg == std::string("-d"))
+ {
+ if (i < argc - 1)
+ dim = std::atoi(argv[++i]);
+ else
+ arg_fail(invocation, world_rank, "Missing argument for --dimension.");
+ }
+ else if (arg == std::string("--sigma") || arg == std::string("-s"))
+ {
+ if (i < argc - 1)
+ sigma = std::atof(argv[++i]);
+ else
+ arg_fail(invocation, world_rank, "Missing argument for --sigma.");
+ }
+ else if (arg == std::string("--finitize") || arg == std::string("-f"))
+ {
+ if (i < argc - 1)
+ finitization = std::atof(argv[++i]);
+ else
+ arg_fail(invocation, world_rank, "Missing argument for --finitize.");
+ }
+ else if (arg == std::string("--out") || arg == std::string("-o"))
+ {
+ if (i < argc - 1)
+ out_file_name = std::string(argv[++i]);
+ else
+ arg_fail(invocation, world_rank, "Missing argument for --out.");
+ }
+ else if (arg == std::string("--chunk") || arg == std::string("-c"))
+ {
+ if (i < argc - 1)
+ chunk_size = std::atoi(argv[++i]);
+ else
+ arg_fail(invocation, world_rank, "Missing argument for --chunk.");
+ }
+ else if (arg == std::string("--help") || arg == std::string("-h"))
+ {
+ if (world_rank == 0)
+ print_help(invocation);
+ MPI_Finalize();
+ exit(0);
+ }
+ else
+ {
+ arg_fail(invocation, world_rank, "Incorrect argument.");
+ }
+ }
+ // End processing args.
+
+ // Begin argument validation.
+ if (list_1_file_name == std::string("") || list_2_file_name == std::string(""))
+ arg_fail(invocation, world_rank, "Need input file lists.");
+ if (dim < 0)
+ arg_fail(invocation, world_rank, "Dimension must be non-negative.");
+ if (sigma <= 0)
+ arg_fail(invocation, world_rank, "σ must be positive.");
+ if (finitization <= 0)
+ arg_fail(invocation, world_rank, "Finitization must be positive.");
+ if (out_file_name == std::string(""))
+ arg_fail(invocation, world_rank, "Need output file.");
+ if (chunk_size <= 0)
+ arg_fail(invocation, world_rank, "Chunk size must be positive.");
+ // End argument validation.
+
+ bool symmetric = list_1_file_name == list_2_file_name;
+
+ std::vector<std::string> files_1;
+ std::vector<std::string> files_2;
+
+ std::ifstream list_file(list_1_file_name, std::ios::in);
+ if (!list_file.is_open())
+ {
+ fail(world_rank, std::string("Failed to load diagram list file ") + list_1_file_name + std::string("."));
+ }
+ std::string line;
+ while (std::getline(list_file, line))
+ {
+ files_1.push_back(line);
+ }
+ list_file.close();
+
+ if (symmetric)
+ {
+ files_2 = files_1;
+ }
+ else
+ {
+ list_file.open(list_2_file_name, std::ios::in);
+ if (!list_file.is_open())
+ {
+ fail(world_rank, std::string("Failed to load diagram list file ") + list_2_file_name + std::string("."));
+ }
+ while (std::getline(list_file, line))
+ {
+ files_2.push_back(line);
+ }
+ list_file.close();
+ }
+
+ int m = files_1.size();
+ int n = files_2.size();
+
+ std::vector<std::pair<int, int>> idxs;
+ if (symmetric)
+ {
+ for (int i = 0; i < m; ++i)
+ {
+ for (int j = i; j < n; ++j)
+ {
+ idxs.push_back(std::make_pair(i, j));
+ }
+ }
+ }
+ else
+ {
+ for (int i = 0; i < m; ++i)
+ {
+ for (int j = 0; j < n; ++j)
+ {
+ idxs.push_back(std::make_pair(i, j));
+ }
+ }
+ }
+
+ // Begin master.
+ if (world_rank == 0)
+ {
+ std::vector<double> result(idxs.size(), std::numeric_limits<double>::quiet_NaN());
+
+ double unused_buf = std::numeric_limits<double>::quiet_NaN();
+ std::vector<MPI_Request> result_reqs(world_size); // Element zero not used, index by actual rank.
+
+ int done = 0;
+ int next_chunk = 0;
+
+ std::vector<std::pair<int, int>> assigned(world_size, std::make_pair(-2, -2));
+
+ std::cout << "Total things to compute: " << idxs.size() << std::endl;
+
+ for (int r = 1; r < world_size; ++r)
+ {
+ MPI_Irecv(&unused_buf, 0, MPI_DOUBLE, r, (int)Message_tag::result, MPI_COMM_WORLD, &(result_reqs[r]));
+ }
+
+ while ((size_t) done < idxs.size())
+ {
+ int respondent_index = -1;
+ MPI_Status status;
+ MPI_Waitany(world_size - 1, &(result_reqs[1]), &respondent_index, &status);
+ int r = 1 + respondent_index;
+
+ std::cout << "Heard back from rank " << r << "." << std::endl;
+ int recv_count = -1;
+ MPI_Get_count(&status, MPI_DOUBLE, &recv_count);
+ std::cout << "Received " << recv_count << " elements from rank " << r << "." << std::endl;
+
+ assert(recv_count == assigned[r].second - assigned[r].first);
+ done += recv_count;
+
+ int work[2] = {-1, -1};
+ if ((size_t)next_chunk*chunk_size < idxs.size())
+ {
+ work[0] = next_chunk*chunk_size;
+ work[1] = std::min((int)idxs.size(), (next_chunk + 1)*chunk_size);
+ MPI_Irecv(&(result[work[0]]), work[1] - work[0], MPI_DOUBLE, r, (int)Message_tag::result, MPI_COMM_WORLD, &(result_reqs[r]));
+ std::cout << "Rank will get new work." << std::endl;
+ }
+ else
+ std::cout << "Rank will terminate." << std::endl;
+
+ assigned[r] = std::make_pair(work[0], work[1]);
+ ++next_chunk;
+ MPI_Send(work, 2, MPI_INT, r, (int)Message_tag::work, MPI_COMM_WORLD);
+ std::cout << 100*(double)done/(double)idxs.size() << "% complete." << std::endl;
+ std::cout << "------------------" << std::endl;
+ }
+ std::cout << "Done. Writing out." << std::endl;
+ std::ofstream out_file(out_file_name, std::ios::out);
+ for (int i = 0; i < m; ++i)
+ {
+ for (int j = 0; j < i; ++j)
+ {
+ out_file << std::scientific << result[symmetric ? unroll_upper_tri(n, j, i) : unroll_full(n, i, j)] << " ";
+ }
+ for (int j = i; j < n; ++j)
+ {
+ out_file << std::scientific << result[symmetric ? unroll_upper_tri(n, i, j) : unroll_full(n, i, j)] << " ";
+ }
+ out_file << std::endl;
+ }
+ out_file.close();
+ } // End master.
+
+ else // Begin slaves.
+ {
+ std::vector<double> result(chunk_size, std::numeric_limits<double>::quiet_NaN());
+ int work[2] = {-2, -2};
+
+ while (work[0] != -1)
+ {
+ int i_prev = -1;
+ int j_prev = -1;
+ int i = -1;
+ int j = -1;
+
+ PD<double> pd_1;
+ PD<double> pd_2;
+
+ for (int k = work[0]; k < work[1]; ++k)
+ {
+ i = idxs[k].first;
+ j = idxs[k].second;
+
+ int load_status = 1;
+
+ if (i != i_prev)
+ {
+ pd_1.clear();
+ load_status = read_dipha_degree(files_1[i], dim, pd_1);
+
+ if (load_status)
+ fail(world_rank, std::string("Failed to load file ") + files_1[i] + std::string("."));
+
+ pd_1.finitize(finitization);
+ }
+
+ if (j != j_prev)
+ {
+ pd_2.clear();
+ load_status = read_dipha_degree(files_2[j], dim, pd_2);
+
+ if (load_status)
+ fail(world_rank, std::string("Failed to load file ") + files_2[j] + std::string("."));
+
+ pd_2.finitize(finitization);
+ }
+
+ result[k - work[0]] = heat_kernel<double>(sigma, pd_1, pd_2);
+ i_prev = i;
+ j_prev = j;
+ }
+
+ MPI_Send(result.data(), work[1] - work[0], MPI_DOUBLE, 0, (int)Message_tag::result, MPI_COMM_WORLD);
+ MPI_Recv(work, 2, MPI_INT, 0, (int)Message_tag::work, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+ }
+ }
+ // End slave.
+
+ MPI_Finalize();
+ return 0;
+}