ls1-MarDyn
ls1-MarDyn molecular dynamics code
CollectiveCommunication.h
1#ifndef COLLECTIVECOMMUNICATION_H_
2#define COLLECTIVECOMMUNICATION_H_
3
4#include <mpi.h>
5
6#include "Simulation.h"
7#include "utils/Logger.h"
8#include "CollectiveCommBase.h"
9#include "CollectiveCommunicationInterface.h"
10#include "utils/mardyn_assert.h"
11
12
13/* Enable agglomerated reduce operations. This will store all values in one array and apply a
14 * user defined reduce operation so that the MPI reduce operation is only called once. */
15#define ENABLE_AGGLOMERATED_REDUCE 1
16
66public:
68 _communicator = MPI_COMM_NULL;
69 _agglomeratedType = MPI_DATATYPE_NULL;
70 }
71
72 virtual ~~CollectiveCommunication() {
73 //mardyn_assert(_agglomeratedType == MPI_DATATYPE_NULL);
74 }
75
78 virtual void init(MPI_Comm communicator, int numValues, int key = 0) override {
79 CollectiveCommBase::init(numValues);
80
81 _communicator = communicator;
82 _types.reserve(numValues);
83 }
84
85 // documentation in base class
86 virtual void finalize() override {
88 _types.clear();
89
90 mardyn_assert(_agglomeratedType == MPI_DATATYPE_NULL);
91 }
92
93 // documentation in base class
94 void appendInt(int intValue) override {
96 _types.push_back(MPI_INT);
97 }
98
99 // documentation in base class
100 void appendUnsLong(unsigned long unsLongValue) override {
102 _types.push_back(MPI_UNSIGNED_LONG);
103 }
104
105 // documentation in base class
106 void appendFloat(float floatValue) override {
108 _types.push_back(MPI_FLOAT);
109 }
110
111 // documentation in base class
112 void appendDouble(double doubleValue) override {
114 _types.push_back(MPI_DOUBLE);
115 }
116
117 // documentation in base class
118 void appendLongDouble(long double longDoubleValue) override {
120 _types.push_back(MPI_LONG_DOUBLE);
121 }
122
125 MPI_Comm getTopology() override {
126 return _communicator;
127 }
128
129 // Getters don't need to be overridden, see parent class
130
131 // documentation in base class
132 void broadcast(int root = 0) override {
133 setMPIType();
134 valType * startOfValues = _values.data();
135 MPI_CHECK(
136 MPI_Bcast(startOfValues, 1, _agglomeratedType, root,
138 MPI_CHECK(MPI_Type_free(&_agglomeratedType));
139 }
140
141 // documentation in base class
142 void allreduceSum() override {
143 allreduceCustom(ReduceType::SUM);
144 }
145
146 void allreduceCustom(ReduceType type) override {
147 Log::global_log->debug() << "CollectiveCommunication: custom Allreduce" << std::endl;
148#if ENABLE_AGGLOMERATED_REDUCE
149 setMPIType();
150 MPI_Op agglomeratedTypeAddOperator;
151 const int commutative = 1;
152 valType * startOfValues = _values.data();
153
154 switch (type) {
155 case ReduceType::SUM:
156 MPI_CHECK(
157 MPI_Op_create(
158 (MPI_User_function * ) CollectiveCommunication::add,
159 commutative, &agglomeratedTypeAddOperator));
160 break;
161 case ReduceType::MAX:
162 MPI_CHECK(
163 MPI_Op_create(
164 (MPI_User_function * ) CollectiveCommunication::max,
165 commutative, &agglomeratedTypeAddOperator));
166 break;
167 case ReduceType::MIN:
168 MPI_CHECK(
169 MPI_Op_create(
170 (MPI_User_function * ) CollectiveCommunication::min,
171 commutative, &agglomeratedTypeAddOperator));
172 break;
173 default:
174 Log::global_log->error()<<"invalid reducetype, aborting." << std::endl;
176 }
177
178 MPI_CHECK(
179 MPI_Allreduce(MPI_IN_PLACE, startOfValues, 1, _agglomeratedType, agglomeratedTypeAddOperator, _communicator));
180 MPI_CHECK(MPI_Op_free(&agglomeratedTypeAddOperator));
181 MPI_CHECK(MPI_Type_free(&_agglomeratedType));
182#else
183 for (unsigned int i = 0; i < _types.size(); i++) {
184 MPI_Op op = MPI_NO_OP;
185 switch(type) {
186 case ReduceType::SUM:
187 op = MPI_SUM;
188 break;
189 case ReduceType::MIN:
190 op = MPI_MIN;
191 break;
192 case ReduceType::MAX:
193 op = MPI_MAX;
194 break;
195 default:
196 Log::global_log->error()<<"invalid reducetype, aborting." << std::endl;
198 }
199 MPI_CHECK(MPI_Allreduce( MPI_IN_PLACE, &_values[i], 1, _types[i], op, _communicator ));
200 }
201#endif
202 }
203
207 virtual void allreduceSumAllowPrevious() override{
208 allreduceSum();
209 }
210
211 // documentation in base class
212 void scanSum() override {
213 #if ENABLE_AGGLOMERATED_REDUCE
214 setMPIType();
215 MPI_Op agglomeratedTypeAddOperator;
216 const int commutative = 1;
217 valType * startOfValues = _values.data();
218 MPI_CHECK(
219 MPI_Op_create(
220 (MPI_User_function * ) CollectiveCommunication::add,
221 commutative, &agglomeratedTypeAddOperator));
222 MPI_CHECK(
223 MPI_Scan(MPI_IN_PLACE, startOfValues, 1, _agglomeratedType, agglomeratedTypeAddOperator, _communicator));
224 MPI_CHECK(MPI_Op_free(&agglomeratedTypeAddOperator));
225 MPI_CHECK(MPI_Type_free(&_agglomeratedType));
226 #else
227 for(unsigned int i = 0; i < _types.size(); i++ ) {
228 MPI_CHECK( MPI_Scan( MPI_IN_PLACE, _values.data(), 1, _types[i], MPI_SUM, _communicator ) );
229 }
230 #endif
231 }
232
233 virtual size_t getTotalSize() override{
234 return CollectiveCommBase::getTotalSize() + _types.capacity() * sizeof(MPI_Datatype);
235 }
236
237
238protected:
245 void setMPIType() {
246 int numblocks = _values.size();
247 std::vector<int> blocklengths(numblocks);
248 std::vector<MPI_Aint> disps(numblocks);
249 int disp = 0;
250 for (int i = 0; i < numblocks; i++) {
251 blocklengths[i] = 1;
252 disps[i] = disp;
253 disp += sizeof(valType);
254 }
255 MPI_Datatype * startOfTypes = &(_types[0]);
256#if MPI_VERSION >= 2 && MPI_SUBVERSION >= 0
257 MPI_CHECK(
258 MPI_Type_create_struct(numblocks, blocklengths.data(), disps.data(),
259 startOfTypes, &_agglomeratedType));
260#else
261 MPI_CHECK( MPI_Type_struct(numblocks, blocklengths.data(), disps.data(), startOfTypes, &_agglomeratedType) );
262#endif
263 MPI_CHECK(MPI_Type_commit(&_agglomeratedType));
264 }
265
277 static void add(valType *invec, valType *inoutvec, int */*len*/,
278 MPI_Datatype *dtype) {
279 int numints;
280 int numaddr;
281 int numtypes;
282 int combiner;
283
284 MPI_CHECK(
285 MPI_Type_get_envelope(*dtype, &numints, &numaddr, &numtypes,
286 &combiner));
287
288 std::vector<int> arrayInts(numints);
289 std::vector<MPI_Aint> arrayAddr(numaddr);
290 std::vector<MPI_Datatype> arrayTypes(numtypes);
291
292 MPI_CHECK(
293 MPI_Type_get_contents(*dtype, numints, numaddr, numtypes,
294 arrayInts.data(), arrayAddr.data(), arrayTypes.data()));
295
296 for (int i = 0; i < numtypes; i++) {
297 if (arrayTypes[i] == MPI_INT) {
298 inoutvec[i].v_int += invec[i].v_int;
299 }
300 else if (arrayTypes[i] == MPI_UNSIGNED_LONG) {
301 inoutvec[i].v_unsLong += invec[i].v_unsLong;
302 }
303 else if (arrayTypes[i] == MPI_FLOAT) {
304 inoutvec[i].v_float += invec[i].v_float;
305 }
306 else if (arrayTypes[i] == MPI_DOUBLE) {
307 inoutvec[i].v_double += invec[i].v_double;
308 }
309 else if (arrayTypes[i] == MPI_LONG_DOUBLE) {
310 inoutvec[i].v_longDouble += invec[i].v_longDouble;
311 }
312 }
313 }
314
315 static void max(valType *invec, valType *inoutvec, int */*len*/,
316 MPI_Datatype *dtype) {
317 int numints;
318 int numaddr;
319 int numtypes;
320 int combiner;
321
322 MPI_CHECK(
323 MPI_Type_get_envelope(*dtype, &numints, &numaddr, &numtypes,
324 &combiner));
325
326 std::vector<int> arrayInts(numints);
327 std::vector<MPI_Aint> arrayAddr(numaddr);
328 std::vector<MPI_Datatype> arrayTypes(numtypes);
329
330 MPI_CHECK(
331 MPI_Type_get_contents(*dtype, numints, numaddr, numtypes,
332 arrayInts.data(), arrayAddr.data(), arrayTypes.data()));
333
334 for (int i = 0; i < numtypes; i++) {
335 if (arrayTypes[i] == MPI_INT) {
336 inoutvec[i].v_int = std::max(inoutvec[i].v_int, invec[i].v_int);
337 }
338 else if (arrayTypes[i] == MPI_UNSIGNED_LONG) {
339 inoutvec[i].v_unsLong = std::max(inoutvec[i].v_unsLong, invec[i].v_unsLong);
340 }
341 else if (arrayTypes[i] == MPI_FLOAT) {
342 inoutvec[i].v_float = std::max(inoutvec[i].v_float, invec[i].v_float);
343 }
344 else if (arrayTypes[i] == MPI_DOUBLE) {
345 inoutvec[i].v_double = std::max(inoutvec[i].v_double, invec[i].v_double);
346 }
347 else if (arrayTypes[i] == MPI_LONG_DOUBLE) {
348 inoutvec[i].v_longDouble = std::max(inoutvec[i].v_longDouble, invec[i].v_longDouble);
349 }
350 }
351 }
352
353 static void min(valType *invec, valType *inoutvec, int */*len*/,
354 MPI_Datatype *dtype) {
355 int numints;
356 int numaddr;
357 int numtypes;
358 int combiner;
359
360 MPI_CHECK(
361 MPI_Type_get_envelope(*dtype, &numints, &numaddr, &numtypes,
362 &combiner));
363
364 std::vector<int> arrayInts(numints);
365 std::vector<MPI_Aint> arrayAddr(numaddr);
366 std::vector<MPI_Datatype> arrayTypes(numtypes);
367
368 MPI_CHECK(
369 MPI_Type_get_contents(*dtype, numints, numaddr, numtypes,
370 arrayInts.data(), arrayAddr.data(), arrayTypes.data()));
371
372 for (int i = 0; i < numtypes; i++) {
373 if (arrayTypes[i] == MPI_INT) {
374 inoutvec[i].v_int = std::min(inoutvec[i].v_int, invec[i].v_int);
375 }
376 else if (arrayTypes[i] == MPI_UNSIGNED_LONG) {
377 inoutvec[i].v_unsLong = std::min(inoutvec[i].v_unsLong, invec[i].v_unsLong);
378 }
379 else if (arrayTypes[i] == MPI_FLOAT) {
380 inoutvec[i].v_float = std::min(inoutvec[i].v_float, invec[i].v_float);
381 }
382 else if (arrayTypes[i] == MPI_DOUBLE) {
383 inoutvec[i].v_double = std::min(inoutvec[i].v_double, invec[i].v_double);
384 }
385 else if (arrayTypes[i] == MPI_LONG_DOUBLE) {
386 inoutvec[i].v_longDouble = std::min(inoutvec[i].v_longDouble, invec[i].v_longDouble);
387 }
388 }
389 }
390
392 std::vector<MPI_Datatype> _types;
393
396 MPI_Datatype _agglomeratedType;
397
400
401};
402
403#endif /* COLLECTIVECOMMUNICATION_H_ */
This class is a dummy class which ensures that the collective communication commands also work if the...
Definition: CollectiveCommBase.h:21
virtual void appendUnsLong(unsigned long unsLongValue) override
Definition: CollectiveCommBase.h:58
virtual void appendInt(int intValue) override
Definition: CollectiveCommBase.h:50
virtual void finalize() override
delete memory and MPI_Type
Definition: CollectiveCommBase.h:152
virtual void appendDouble(double doubleValue) override
Definition: CollectiveCommBase.h:74
virtual void appendLongDouble(long double longDoubleValue) override
Definition: CollectiveCommBase.h:82
void init(int numValues)
allocate memory for the values to be stored, initialize getter-iterator
Definition: CollectiveCommBase.h:43
virtual void appendFloat(float floatValue) override
Definition: CollectiveCommBase.h:66
std::vector< valType > _values
Vector to store the values which shall be communicated.
Definition: CollectiveCommBase.h:162
virtual size_t getTotalSize() override
Definition: CollectiveCommBase.h:156
This class provides an interface for the collective communication classes.
Definition: CollectiveCommunicationInterface.h:15
This class is used to transfer several values of different types with a single command.
Definition: CollectiveCommunication.h:65
virtual size_t getTotalSize() override
Definition: CollectiveCommunication.h:233
void setMPIType()
defines a MPI datatype which can be used to transfer a CollectiveCommunication object
Definition: CollectiveCommunication.h:245
void appendInt(int intValue) override
Definition: CollectiveCommunication.h:94
void scanSum() override
Performs a scan (sum)
Definition: CollectiveCommunication.h:212
MPI_Comm _communicator
Communicator to be used by the communication commands.
Definition: CollectiveCommunication.h:399
void appendDouble(double doubleValue) override
Definition: CollectiveCommunication.h:112
void broadcast(int root=0) override
Definition: CollectiveCommunication.h:132
virtual void init(MPI_Comm communicator, int numValues, int key=0) override
allocate memory for the values to be sent, initialize counters
Definition: CollectiveCommunication.h:78
void allreduceSum() override
Performs an all-reduce (sum)
Definition: CollectiveCommunication.h:142
virtual void allreduceSumAllowPrevious() override
Definition: CollectiveCommunication.h:207
void appendUnsLong(unsigned long unsLongValue) override
Definition: CollectiveCommunication.h:100
virtual void finalize() override
delete memory and MPI_Type
Definition: CollectiveCommunication.h:86
std::vector< MPI_Datatype > _types
Vector of the corresponding MPI types for the values stored in _values.
Definition: CollectiveCommunication.h:392
MPI_Datatype _agglomeratedType
Definition: CollectiveCommunication.h:396
void appendFloat(float floatValue) override
Definition: CollectiveCommunication.h:106
void allreduceCustom(ReduceType type) override
Definition: CollectiveCommunication.h:146
void appendLongDouble(long double longDoubleValue) override
Definition: CollectiveCommunication.h:118
static void add(valType *invec, valType *inoutvec, int *, MPI_Datatype *dtype)
method used by MPI to add variables of this type
Definition: CollectiveCommunication.h:277
MPI_Comm getTopology() override
Definition: CollectiveCommunication.h:125
static void exit(int exitcode)
Terminate simulation with given exit code.
Definition: Simulation.cpp:155
Enumeration class corresponding to the type schema type.
Definition: vtk-unstructured.h:1746
Definition: CollectiveCommBase.h:29