From 6ff8072e8c5a6dc1301e884f5a648a0b63bdd48a Mon Sep 17 00:00:00 2001 From: Arnur Nigmetov Date: Tue, 3 Dec 2019 20:34:28 +0100 Subject: Rename directories for bottleneck and Wasserstein --- wasserstein/include/dnn/parallel/tbb.h | 237 +++++++++++++++++++++++++++++++ wasserstein/include/dnn/parallel/utils.h | 100 +++++++++++++ 2 files changed, 337 insertions(+) create mode 100644 wasserstein/include/dnn/parallel/tbb.h create mode 100644 wasserstein/include/dnn/parallel/utils.h (limited to 'wasserstein/include/dnn/parallel') diff --git a/wasserstein/include/dnn/parallel/tbb.h b/wasserstein/include/dnn/parallel/tbb.h new file mode 100644 index 0000000..3f811d6 --- /dev/null +++ b/wasserstein/include/dnn/parallel/tbb.h @@ -0,0 +1,237 @@ +#ifndef HERA_WS_PARALLEL_H +#define HERA_WS_PARALLEL_H + +#include + +#include +#include +#include + +#ifdef TBB + +#include +#include +#include + +#include +#include +#include + +namespace hera +{ +namespace ws +{ +namespace dnn +{ + using tbb::mutex; + using tbb::task_scheduler_init; + using tbb::task_group; + using tbb::task; + + template + struct vector + { + typedef tbb::concurrent_vector type; + }; + + template + struct atomic + { + typedef tbb::atomic type; + static T compare_and_swap(type& v, T n, T o) { return v.compare_and_swap(n,o); } + }; + + template + void do_foreach(Iterator begin, Iterator end, const F& f) { tbb::parallel_do(begin, end, f); } + + template + void for_each_range_(const Range& r, const F& f) + { + for (typename Range::iterator cur = r.begin(); cur != r.end(); ++cur) + f(*cur); + } + + template + void for_each_range(size_t from, size_t to, const F& f) + { + //static tbb::affinity_partitioner ap; + //tbb::parallel_for(c.range(), boost::bind(&for_each_range_, _1, f), ap); + tbb::parallel_for(from, to, f); + } + + template + void for_each_range(const Container& c, const F& f) + { + //static tbb::affinity_partitioner ap; + //tbb::parallel_for(c.range(), boost::bind(&for_each_range_, _1, f), ap); + tbb::parallel_for(c.range(), boost::bind(&for_each_range_, _1, f)); + } + + template + void for_each_range(Container& c, const F& f) + { + //static tbb::affinity_partitioner ap; + //tbb::parallel_for(c.range(), boost::bind(&for_each_range_, _1, f), ap); + tbb::parallel_for(c.range(), boost::bind(&for_each_range_, _1, f)); + } + + template + struct map_traits + { + typedef tbb::concurrent_hash_map type; + typedef typename type::range_type range; + }; + + struct progress_timer + { + progress_timer(): start(tbb::tick_count::now()) {} + ~progress_timer() + { std::cout << (tbb::tick_count::now() - start).seconds() << " s" << std::endl; } + + tbb::tick_count start; + }; +} // dnn +} // ws +} // hera + +// Serialization for tbb::concurrent_vector<...> +namespace boost +{ + namespace serialization + { + template + void save(Archive& ar, const tbb::concurrent_vector& v, const unsigned int file_version) + { stl::save_collection(ar, v); } + + template + void load(Archive& ar, tbb::concurrent_vector& v, const unsigned int file_version) + { + stl::load_collection, + stl::archive_input_seq< Archive, tbb::concurrent_vector >, + stl::reserve_imp< tbb::concurrent_vector > + >(ar, v); + } + + template + void serialize(Archive& ar, tbb::concurrent_vector& v, const unsigned int file_version) + { split_free(ar, v, file_version); } + + template + void save(Archive& ar, const tbb::atomic& v, const unsigned int file_version) + { T v_ = v; ar << v_; } + + template + void load(Archive& ar, tbb::atomic& v, const unsigned int file_version) + { T v_; ar >> v_; v = v_; } + + template + void serialize(Archive& ar, tbb::atomic& v, const unsigned int file_version) + { split_free(ar, v, file_version); } + } +} + +#else + +#include +#include +#include + +namespace hera +{ +namespace ws +{ +namespace dnn +{ + template + struct vector + { + typedef ::std::vector type; + }; + + template + struct atomic + { + typedef T type; + static T compare_and_swap(type& v, T n, T o) { if (v != o) return v; v = n; return o; } + }; + + template + void do_foreach(Iterator begin, Iterator end, const F& f) { std::for_each(begin, end, f); } + + template + void for_each_range(size_t from, size_t to, const F& f) + { + for (size_t i = from; i < to; ++i) + f(i); + } + + template + void for_each_range(Container& c, const F& f) + { + BOOST_FOREACH(const typename Container::value_type& i, c) + f(i); + } + + template + void for_each_range(const Container& c, const F& f) + { + BOOST_FOREACH(const typename Container::value_type& i, c) + f(i); + } + + struct mutex + { + struct scoped_lock + { + scoped_lock() {} + scoped_lock(mutex& ) {} + void acquire(mutex& ) const {} + void release() const {} + }; + }; + + struct task_scheduler_init + { + task_scheduler_init(unsigned) {} + void initialize(unsigned) {} + static const unsigned automatic = 0; + static const unsigned deferred = 0; + }; + + struct task_group + { + template + void run(const Functor& f) const { f(); } + void wait() const {} + }; + + template + struct map_traits + { + typedef std::map type; + typedef type range; + }; + + using boost::progress_timer; +} // dnn +} // ws +} // hera + +#endif // TBB + +namespace hera +{ +namespace ws +{ +namespace dnn +{ + template + void do_foreach(const Range& range, const F& f) { do_foreach(boost::begin(range), boost::end(range), f); } +} // dnn +} // ws +} // hera + +#endif diff --git a/wasserstein/include/dnn/parallel/utils.h b/wasserstein/include/dnn/parallel/utils.h new file mode 100644 index 0000000..7104ec3 --- /dev/null +++ b/wasserstein/include/dnn/parallel/utils.h @@ -0,0 +1,100 @@ +#ifndef HERA_WS_PARALLEL_UTILS_H +#define HERA_WS_PARALLEL_UTILS_H + +#include "../utils.h" + +namespace hera +{ +namespace ws +{ +namespace dnn +{ + // Assumes rng is synchronized across ranks + template + void shuffle(mpi::communicator& world, DataVector& data, RNGType& rng, const SwapFunctor& swap, DataVector empty = DataVector()); + + template + void shuffle(mpi::communicator& world, DataVector& data, RNGType& rng) + { + typedef decltype(data[0]) T; + shuffle(world, data, rng, [](T& x, T& y) { std::swap(x,y); }); + } +} // dnn +} // ws +} // hera + +template +void +hera::ws::dnn::shuffle(mpi::communicator& world, DataVector& data, RNGType& rng, const SwapFunctor& swap, DataVector empty) +{ + // This is not a perfect shuffle: it dishes out data in chunks of 1/size. + // (It can be interpreted as generating a bistochastic matrix by taking the + // sum of size random permutation matrices.) Hopefully, it works for our purposes. + + typedef typename RNGType::result_type RNGResult; + + int size = world.size(); + int rank = world.rank(); + + // Generate local seeds + boost::uniform_int uniform; + RNGResult seed; + for (size_t i = 0; i < size; ++i) + { + RNGResult v = uniform(rng); + if (i == rank) + seed = v; + } + RNGType local_rng(seed); + + // Shuffle local data + hera::ws::dnn::random_shuffle(data.begin(), data.end(), local_rng, swap); + + // Decide how much of our data goes to i-th processor + std::vector out_counts(size); + std::vector ranks(boost::counting_iterator(0), + boost::counting_iterator(size)); + for (size_t i = 0; i < size; ++i) + { + hera::ws::dnn::random_shuffle(ranks.begin(), ranks.end(), rng); + ++out_counts[ranks[rank]]; + } + + // Fill the outgoing array + size_t total = 0; + std::vector< DataVector > outgoing(size, empty); + for (size_t i = 0; i < size; ++i) + { + size_t count = data.size()*out_counts[i]/size; + if (total + count > data.size()) + count = data.size() - total; + + outgoing[i].reserve(count); + for (size_t j = total; j < total + count; ++j) + outgoing[i].push_back(data[j]); + + total += count; + } + + boost::uniform_int uniform_outgoing(0,size-1); // in range [0,size-1] + while(total < data.size()) // send leftover to random processes + { + outgoing[uniform_outgoing(local_rng)].push_back(data[total]); + ++total; + } + data.clear(); + + // Exchange the data + std::vector< DataVector > incoming(size, empty); + mpi::all_to_all(world, outgoing, incoming); + outgoing.clear(); + + // Assemble our data + for(const DataVector& vec : incoming) + for (size_t i = 0; i < vec.size(); ++i) + data.push_back(vec[i]); + hera::ws::dnn::random_shuffle(data.begin(), data.end(), local_rng, swap); + // XXX: the final shuffle is irrelevant for our purposes. But it's also cheap. +} + +#endif -- cgit v1.2.3 From 4c3eca516693a333facbb6c6a952da735e902846 Mon Sep 17 00:00:00 2001 From: Arnur Nigmetov Date: Tue, 18 Feb 2020 15:35:02 +0100 Subject: README and boost/progress changes from master. --- README.txt | 22 +++++++++++++++++++--- wasserstein/include/dnn/parallel/tbb.h | 2 -- 2 files changed, 19 insertions(+), 5 deletions(-) (limited to 'wasserstein/include/dnn/parallel') diff --git a/README.txt b/README.txt index 4a132fd..ae3f6a0 100644 --- a/README.txt +++ b/README.txt @@ -1,8 +1,5 @@ This repository contains software to compute bottleneck and Wasserstein distances between persistence diagrams. -See Michael Kerber, Dmitriy Morozov, and Arnur Nigmetov, -"Geometry Helps to Compare Persistence Diagrams.", ALENEX 2016. -http://dx.doi.org/10.1137/1.9781611974317.9 The software is licensed under BSD license, see license.txt file. @@ -10,3 +7,22 @@ If you are going to use this software for research purposes, you probably do not need to worry about that. See README files in subdirectories for usage and building. + +If you use Hera in your project, we would appreciate if you +cite the corresponding paper: +Michael Kerber, Dmitriy Morozov, and Arnur Nigmetov, +"Geometry Helps to Compare Persistence Diagrams.", +Journal of Experimental Algorithmics, vol. 22, 2017, pp. 1--20. +(conference version: ALENEX 2016). +The BibTeX is below: + + +@article{jea_hera, + title={Geometry helps to compare persistence diagrams}, + author={Kerber, Michael and Morozov, Dmitriy and Nigmetov, Arnur}, + journal={Journal of Experimental Algorithmics (JEA)}, + volume={22}, + pages={1--20}, + year={2017}, + publisher={ACM New York, NY, USA} +} diff --git a/wasserstein/include/dnn/parallel/tbb.h b/wasserstein/include/dnn/parallel/tbb.h index 3f811d6..712f812 100644 --- a/wasserstein/include/dnn/parallel/tbb.h +++ b/wasserstein/include/dnn/parallel/tbb.h @@ -135,7 +135,6 @@ namespace boost #include #include -#include namespace hera { @@ -215,7 +214,6 @@ namespace dnn typedef type range; }; - using boost::progress_timer; } // dnn } // ws } // hera -- cgit v1.2.3