ls1-MarDyn
ls1-MarDyn molecular dynamics code
CollectiveCommunicationNonBlocking.h
1/*
2 * CollectiveCommunicationNonBlocking.h
3 *
4 * Created on: May 2, 2016
5 * Author: seckler
6 */
7#pragma once
8
9#include "CollectiveCommBaseInterface.h"
10#include "CollectiveCommunicationSingleNonBlocking.h"
11#include <map>
12#include <utils/Logger.h>
13#include "CollectiveCommunicationInterface.h"
14#include "Simulation.h"
15
16#if MPI_VERSION >= 3
17
18using Log::global_log;
26public:
29 _currentKey(-1), _comms() {
30 }
31
36
41 void init(MPI_Comm communicator, int numValues, int key = 0) override {
42 if (_currentKey != -1) {
43 global_log->error() << "CollectiveCommunicationNonBlocking: previous communication with key " << _currentKey
44 << " not yet finalized" << std::endl;
46 }
47
48 _currentKey = key;
49
50 // add the key, if it is not yet existent:
51 if (_comms.count(_currentKey) == 1) {
52 // this happens, if the key is already existent.
53 global_log->debug() << "CollectiveCommunicationNonBlocking: key " << _currentKey
54 << " already existent. Reusing information." << std::endl;
55 } else {
56 global_log->debug() << "CollectiveCommunicationNonBlocking: key " << _currentKey
57 << " not existent. Cannot reuse information." << std::endl;
58 // Creates the CollectiveCommunicationSingleNonBlocking object
59 auto [_, inserted] = _comms.try_emplace(_currentKey);
60 if (not inserted) {
61 global_log->error() << "CollectiveCommunicationNonBlocking: key " << _currentKey
62 << " could not be inserted. Aborting!" << std::endl;
63 Simulation::exit(498789);
64 }
65 }
66 _comms.at(_currentKey).init(communicator, numValues, _currentKey);
67 }
68
71 void finalize() override {
72 _comms.at(_currentKey).finalize();
73 if (_currentKey == 0) {
74 global_log->debug() << "CollectiveCommunicationNonBlocking: finalizing with key " << _currentKey
75 << ", thus the entry is removed." << std::endl;
76 _comms.erase(_currentKey);
77 }
78 _currentKey = -1;
79 }
80
83 void appendInt(int intValue) override {
84 _comms.at(_currentKey).appendInt(intValue);
85 }
86
89 void appendUnsLong(unsigned long unsLongValue) override {
90 _comms.at(_currentKey).appendUnsLong(unsLongValue);
91 }
92
95 void appendFloat(float floatValue) override {
96 _comms.at(_currentKey).appendFloat(floatValue);
97 }
98
101 void appendDouble(double doubleValue) override {
102 _comms.at(_currentKey).appendDouble(doubleValue);
103 }
104
107 void appendLongDouble(long double longDoubleValue) override {
108 _comms.at(_currentKey).appendLongDouble(longDoubleValue);
109 }
110
113 MPI_Comm getTopology() override {
114 return _comms.at(_currentKey).getTopology();
115 }
116
122 int getInt() override {
123 return _comms.at(_currentKey).getInt();
124 }
125
131 unsigned long getUnsLong() override {
132 return _comms.at(_currentKey).getUnsLong();
133 }
134
140 float getFloat() override {
141 return _comms.at(_currentKey).getFloat();
142 }
143
149 double getDouble() override {
150 return _comms.at(_currentKey).getDouble();
151 }
152
158 long double getLongDouble() override {
159 return _comms.at(_currentKey).getLongDouble();
160 }
161
164 void broadcast(int root = 0) override {
165 _comms.at(_currentKey).broadcast(root);
166 }
167
169 void allreduceSum() override {
170 Log::global_log->debug() << "CollectiveCommunicationNonBlocking: normal Allreduce" << std::endl;
171 _comms.at(_currentKey).allreduceSum();
172 }
173
177 virtual void allreduceSumAllowPrevious() override {
178 Log::global_log->debug() << "CollectiveCommunicationNonBlocking: nonblocking Allreduce with id "<< _currentKey << std::endl;
179 mardyn_assert(_currentKey > 0); // _currentKey has to be positive non-zero and should be unique for allreduceSumAllowPrevious
180 _comms.at(_currentKey).allreduceSumAllowPrevious();
181 }
182
183 void allreduceCustom(ReduceType type) override{
184 _comms.at(_currentKey).allreduceCustom(type);
185 }
186
187 void scanSum() override {
188 _comms.at(_currentKey).scanSum();
189 }
190
195 /*
196 void waitSpecific(int key) {
197 _comms[key].waitAndUpdateData();
198 }*/
199
200 size_t getTotalSize() override {
201 size_t tmp = 0;
202 for (auto& comm : _comms){
203 tmp += comm.second.getTotalSize();
204 }
205 return tmp;
206 }
207private:
208 int _currentKey;
209 std::map<int, CollectiveCommunicationSingleNonBlocking> _comms;
210};
211
212#endif // MPI_VERSION >= 3
This class provides an interface for the collective communication classes.
Definition: CollectiveCommunicationInterface.h:15
Definition: CollectiveCommunicationNonBlocking.h:25
void appendUnsLong(unsigned long unsLongValue) override
Definition: CollectiveCommunicationNonBlocking.h:89
void finalize() override
delete memory and MPI_Type
Definition: CollectiveCommunicationNonBlocking.h:71
void allreduceCustom(ReduceType type) override
Definition: CollectiveCommunicationNonBlocking.h:183
MPI_Comm getTopology() override
Definition: CollectiveCommunicationNonBlocking.h:113
double getDouble() override
Definition: CollectiveCommunicationNonBlocking.h:149
virtual void allreduceSumAllowPrevious() override
Definition: CollectiveCommunicationNonBlocking.h:177
float getFloat() override
Definition: CollectiveCommunicationNonBlocking.h:140
void init(MPI_Comm communicator, int numValues, int key=0) override
allocate memory for the values to be sent, initialize counters for specific
Definition: CollectiveCommunicationNonBlocking.h:41
CollectiveCommunicationNonBlocking()
Constructor, does nothing yet.
Definition: CollectiveCommunicationNonBlocking.h:28
void appendFloat(float floatValue) override
Definition: CollectiveCommunicationNonBlocking.h:95
void scanSum() override
Performs a scan (sum)
Definition: CollectiveCommunicationNonBlocking.h:187
size_t getTotalSize() override
Definition: CollectiveCommunicationNonBlocking.h:200
void allreduceSum() override
Do Allreduce of all values with reduce operation add.
Definition: CollectiveCommunicationNonBlocking.h:169
~CollectiveCommunicationNonBlocking() override=default
int getInt() override
Definition: CollectiveCommunicationNonBlocking.h:122
unsigned long getUnsLong() override
Definition: CollectiveCommunicationNonBlocking.h:131
void appendDouble(double doubleValue) override
Definition: CollectiveCommunicationNonBlocking.h:101
void appendLongDouble(long double longDoubleValue) override
Definition: CollectiveCommunicationNonBlocking.h:107
long double getLongDouble() override
Definition: CollectiveCommunicationNonBlocking.h:158
void broadcast(int root=0) override
Definition: CollectiveCommunicationNonBlocking.h:164
void appendInt(int intValue) override
Definition: CollectiveCommunicationNonBlocking.h:83
static void exit(int exitcode)
Terminate simulation with given exit code.
Definition: Simulation.cpp:155
Enumeration class corresponding to the type schema type.
Definition: vtk-unstructured.h:1746