ls1-MarDyn
ls1-MarDyn molecular dynamics code
CollectiveCommunicationSingleNonBlocking.h
1/*
2 * CollectiveCommunicationSingleNonBlocking.h
3 *
4 * Created on: May 2, 2016
5 * Author: seckler
6 */
7
8#pragma once
9
10#include "CollectiveCommunication.h"
11#if MPI_VERSION >= 3
12
18
19public:
24
30
36
41 if (_request) {
42 if (_communicationInitiated) {
43 MPI_Wait(_request.get(), MPI_STATUS_IGNORE);
44 }
45 }
46 if (_agglomeratedTypeAddOperator != MPI_OP_NULL) {
47 MPI_CHECK(MPI_Op_free(&_agglomeratedTypeAddOperator));
48 }
49 if (_agglomeratedType != MPI_DATATYPE_NULL) {
50 MPI_CHECK(MPI_Type_free(&_agglomeratedType));
51 }
52 }
53
54 void allreduceSumAllowPrevious() override {
55 if (!_valuesValid) {
56 if (_communicationInitiated) {
57 // if the values are not valid and communication is already initiated, first wait for the communication to finish.
58 waitAndUpdateData();
59 }
60 // if the values were invalid, we have to do a proper allreduce!
62 _tempValues = _values; // save it somewhere safe for next iteration!
63 _valuesValid = true;
64 } else {
65
66 if (_communicationInitiated) {
67 // if the values are not valid and communication is already initiated, first wait for the communication to finish.
68 // safe the data, that needs to be sent
69 std::vector<valType> toSendValues = _values;
70 // get the new data, this updates _values
71 waitAndUpdateData();
72 // initiate the next allreduce
73 initAllreduceSum(toSendValues);
74 } else {
75 std::vector<valType> previous = _tempValues;
76 // initiate the next allreduce
77 initAllreduceSum(_values); // this updates _tempValues
78 // restore saved data from previous operations.
79 _values = previous;
80 }
81
82 }
83 }
84
85 // documentation in base class
86 void init(MPI_Comm communicator, int numValues, int key = 0) override {
87 CollectiveCommunication::init(communicator, numValues);
88#ifndef NDEBUG
89 if (!_firstComm) {
90 mardyn_assert(static_cast<int>(_values.size()) == numValues);
91 }
92#endif
93 }
94
95 // documentation in base class
96 void finalize() override {
97 // We intentionally use CollectiveCommBase::finalize(), as _agglomeratedType might not be MPI_DATATYPE_NULL.
99 _types.clear();
100 }
101
102private:
103
104 /*int testReceived() {
105 int flag;
106 MPI_CHECK(MPI_Test(&_request, &flag, MPI_STATUS_IGNORE));
107 return flag;
108 }*/
109
113 void waitAndUpdateData() {
114 MPI_CHECK(MPI_Wait(_request.get(), MPI_STATUS_IGNORE));
115 // copy the temporary values to the real values!
116 _values = _tempValues;
117
118 _communicationInitiated = false;
119 _valuesValid = true;
120 }
121
122 void initAllreduceSum(std::vector<valType> values) {
123 // copy the values to a temporary vector:
124 // all nonblocking operations have to be performed on this vector!
125 // this is necessary to maintain the validity of the data from previous steps
126 _tempValues = values;
127#if ENABLE_AGGLOMERATED_REDUCE
128 if (_agglomeratedType == MPI_DATATYPE_NULL) {
129 setMPIType();
130 }
131 const int commutative = 1;
132 valType * startOfValues = &(_tempValues[0]);
133 if (_agglomeratedTypeAddOperator == MPI_OP_NULL) {
134 MPI_CHECK(
135 MPI_Op_create((MPI_User_function * ) CollectiveCommunication::add, commutative,
136 &_agglomeratedTypeAddOperator));
137 }
138 MPI_CHECK(
139 MPI_Iallreduce(MPI_IN_PLACE, startOfValues, 1, _agglomeratedType, _agglomeratedTypeAddOperator, _communicator, _request.get()));
140 _communicationInitiated = true;
141#else
142 for( unsigned int i = 0; i < _types.size(); i++ ) {
143 MPI_CHECK( MPI_Allreduce( MPI_IN_PLACE, &(_values[i]), 1, _types[i], MPI_SUM, _communicator ) );
144 }
145#endif
146
147 }
148
149 std::unique_ptr<MPI_Request> _request{new MPI_Request()};
150 MPI_Op _agglomeratedTypeAddOperator{MPI_OP_NULL};
151 bool _communicationInitiated{false};
152 bool _valuesValid{false};
154 std::vector<valType> _tempValues;
155 bool _firstComm{true};
156};
157
158#endif // MPI_VERSION >= 3
virtual void finalize() override
delete memory and MPI_Type
Definition: CollectiveCommBase.h:152
std::vector< valType > _values
Vector to store the values which shall be communicated.
Definition: CollectiveCommBase.h:162
Definition: CollectiveCommunicationSingleNonBlocking.h:17
void allreduceSumAllowPrevious() override
Definition: CollectiveCommunicationSingleNonBlocking.h:54
CollectiveCommunicationSingleNonBlocking(const CollectiveCommunicationSingleNonBlocking &)=delete
~CollectiveCommunicationSingleNonBlocking() override
Definition: CollectiveCommunicationSingleNonBlocking.h:40
void init(MPI_Comm communicator, int numValues, int key=0) override
allocate memory for the values to be sent, initialize counters
Definition: CollectiveCommunicationSingleNonBlocking.h:86
CollectiveCommunicationSingleNonBlocking & operator=(const CollectiveCommunicationSingleNonBlocking &)=delete
void finalize() override
delete memory and MPI_Type
Definition: CollectiveCommunicationSingleNonBlocking.h:96
This class is used to transfer several values of different types with a single command.
Definition: CollectiveCommunication.h:65
void setMPIType()
defines a MPI datatype which can be used to transfer a CollectiveCommunication object
Definition: CollectiveCommunication.h:245
MPI_Comm _communicator
Communicator to be used by the communication commands.
Definition: CollectiveCommunication.h:399
virtual void init(MPI_Comm communicator, int numValues, int key=0) override
allocate memory for the values to be sent, initialize counters
Definition: CollectiveCommunication.h:78
void allreduceSum() override
Performs an all-reduce (sum)
Definition: CollectiveCommunication.h:142
std::vector< MPI_Datatype > _types
Vector of the corresponding MPI types for the values stored in _values.
Definition: CollectiveCommunication.h:392
MPI_Datatype _agglomeratedType
Definition: CollectiveCommunication.h:396
static void add(valType *invec, valType *inoutvec, int *, MPI_Datatype *dtype)
method used by MPI to add variables of this type
Definition: CollectiveCommunication.h:277