9#include "CollectiveCommBaseInterface.h"
10#include "CollectiveCommunicationSingleNonBlocking.h"
12#include <utils/Logger.h>
13#include "CollectiveCommunicationInterface.h"
14#include "Simulation.h"
29 _currentKey(-1), _comms() {
41 void init(MPI_Comm communicator,
int numValues,
int key = 0)
override {
42 if (_currentKey != -1) {
43 global_log->error() <<
"CollectiveCommunicationNonBlocking: previous communication with key " << _currentKey
44 <<
" not yet finalized" << std::endl;
51 if (_comms.count(_currentKey) == 1) {
53 global_log->debug() <<
"CollectiveCommunicationNonBlocking: key " << _currentKey
54 <<
" already existent. Reusing information." << std::endl;
56 global_log->debug() <<
"CollectiveCommunicationNonBlocking: key " << _currentKey
57 <<
" not existent. Cannot reuse information." << std::endl;
59 auto [_, inserted] = _comms.try_emplace(_currentKey);
61 global_log->error() <<
"CollectiveCommunicationNonBlocking: key " << _currentKey
62 <<
" could not be inserted. Aborting!" << std::endl;
66 _comms.at(_currentKey).init(communicator, numValues, _currentKey);
72 _comms.at(_currentKey).finalize();
73 if (_currentKey == 0) {
74 global_log->debug() <<
"CollectiveCommunicationNonBlocking: finalizing with key " << _currentKey
75 <<
", thus the entry is removed." << std::endl;
76 _comms.erase(_currentKey);
84 _comms.at(_currentKey).appendInt(intValue);
90 _comms.at(_currentKey).appendUnsLong(unsLongValue);
96 _comms.at(_currentKey).appendFloat(floatValue);
102 _comms.at(_currentKey).appendDouble(doubleValue);
108 _comms.at(_currentKey).appendLongDouble(longDoubleValue);
114 return _comms.at(_currentKey).getTopology();
123 return _comms.at(_currentKey).getInt();
132 return _comms.at(_currentKey).getUnsLong();
141 return _comms.at(_currentKey).getFloat();
150 return _comms.at(_currentKey).getDouble();
159 return _comms.at(_currentKey).getLongDouble();
165 _comms.at(_currentKey).broadcast(root);
170 Log::global_log->debug() <<
"CollectiveCommunicationNonBlocking: normal Allreduce" << std::endl;
171 _comms.at(_currentKey).allreduceSum();
178 Log::global_log->debug() <<
"CollectiveCommunicationNonBlocking: nonblocking Allreduce with id "<< _currentKey << std::endl;
179 mardyn_assert(_currentKey > 0);
180 _comms.at(_currentKey).allreduceSumAllowPrevious();
184 _comms.at(_currentKey).allreduceCustom(
type);
188 _comms.at(_currentKey).scanSum();
202 for (
auto& comm : _comms){
203 tmp += comm.second.getTotalSize();
209 std::map<int, CollectiveCommunicationSingleNonBlocking> _comms;
This class provides an interface for the collective communication classes.
Definition: CollectiveCommunicationInterface.h:15
Definition: CollectiveCommunicationNonBlocking.h:25
void appendUnsLong(unsigned long unsLongValue) override
Definition: CollectiveCommunicationNonBlocking.h:89
void finalize() override
delete memory and MPI_Type
Definition: CollectiveCommunicationNonBlocking.h:71
void allreduceCustom(ReduceType type) override
Definition: CollectiveCommunicationNonBlocking.h:183
MPI_Comm getTopology() override
Definition: CollectiveCommunicationNonBlocking.h:113
double getDouble() override
Definition: CollectiveCommunicationNonBlocking.h:149
virtual void allreduceSumAllowPrevious() override
Definition: CollectiveCommunicationNonBlocking.h:177
float getFloat() override
Definition: CollectiveCommunicationNonBlocking.h:140
void init(MPI_Comm communicator, int numValues, int key=0) override
allocate memory for the values to be sent, initialize counters for specific
Definition: CollectiveCommunicationNonBlocking.h:41
CollectiveCommunicationNonBlocking()
Constructor, does nothing yet.
Definition: CollectiveCommunicationNonBlocking.h:28
void appendFloat(float floatValue) override
Definition: CollectiveCommunicationNonBlocking.h:95
void scanSum() override
Performs a scan (sum)
Definition: CollectiveCommunicationNonBlocking.h:187
size_t getTotalSize() override
Definition: CollectiveCommunicationNonBlocking.h:200
void allreduceSum() override
Do Allreduce of all values with reduce operation add.
Definition: CollectiveCommunicationNonBlocking.h:169
~CollectiveCommunicationNonBlocking() override=default
int getInt() override
Definition: CollectiveCommunicationNonBlocking.h:122
unsigned long getUnsLong() override
Definition: CollectiveCommunicationNonBlocking.h:131
void appendDouble(double doubleValue) override
Definition: CollectiveCommunicationNonBlocking.h:101
void appendLongDouble(long double longDoubleValue) override
Definition: CollectiveCommunicationNonBlocking.h:107
long double getLongDouble() override
Definition: CollectiveCommunicationNonBlocking.h:158
void broadcast(int root=0) override
Definition: CollectiveCommunicationNonBlocking.h:164
void appendInt(int intValue) override
Definition: CollectiveCommunicationNonBlocking.h:83
static void exit(int exitcode)
Terminate simulation with given exit code.
Definition: Simulation.cpp:155
Enumeration class corresponding to the type schema type.
Definition: vtk-unstructured.h:1746