dune-fem  2.6-git
cachedcommmanager.hh
Go to the documentation of this file.
1 #ifndef DUNE_FEM_CACHED_COMMUNICATION_MANAGER_HH
2 #define DUNE_FEM_CACHED_COMMUNICATION_MANAGER_HH
3 
#include <cassert>
#include <cstddef>

//- system includes
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <type_traits>
#include <vector>
13 
14 //- dune-common includes
15 #include <dune/common/math.hh>
16 #include <dune/common/timer.hh>
17 #include <dune/common/visibility.hh>
18 
19 //- dune-grid includes
20 #include <dune/grid/common/grid.hh>
21 #include <dune/grid/common/datahandleif.hh>
22 #include <dune/grid/utility/entitycommhelper.hh>
23 
// include ALUGrid headers to have the communicator class from ALUGrid
25 #if HAVE_DUNE_ALUGRID
26 #include <dune/alugrid/3d/alu3dinclude.hh>
27 #endif
28 
29 //- dune-fem includes
34 #include <dune/fem/misc/functor.hh>
36 
37 namespace Dune
38 {
39 
40  namespace Fem
41  {
42 
43  // External Forward Declarations
44  // -----------------------------
45 
// Class template used by the PetscDiscreteFunction< DiscreteFunctionSpace >
// overloads further down in this header; it has to be declared before them.
template< class DiscreteFunctionSpace >
class PetscDiscreteFunction;

// Non-template class; this header only refers to it as `IsDiscreteFunction &`.
class IsDiscreteFunction;
50 
51 
52 
// only if ALUGrid was found and was built for parallel runs;
// if HAVE_DUNE_ALUGRID is not defined, ALU3DGRID_PARALLEL should not be defined either
59 #if ALU3DGRID_PARALLEL
60 
66  template< class Space >
67  class DependencyCache
68  {
69  public:
71  typedef Space SpaceType;
72 
74  typedef typename SpaceType :: GridPartType GridPartType;
75 
76  protected:
77  // type of communication indices
78  typedef CommunicationIndexMap IndexMapType;
79 
80  // type of IndexMapVector
81  typedef IndexMapType *IndexMapVectorType;
82 
83  // type of set of links
84  typedef std :: set< int > LinkStorageType;
85 
86  // ALUGrid send/recv buffers
87  typedef ALU3DSPACE ObjectStream ObjectStreamType;
88 
89  // type of communicator
90  typedef ALU3DSPACE MpAccessLocal MPAccessInterfaceType;
91  // type of communication implementation
92  typedef ALU3DSPACE MpAccessMPI MPAccessImplType;
93 
95  typedef std :: vector< ObjectStreamType > ObjectStreamVectorType;
96 
97  protected:
98  const SpaceType &space_;
99  const GridPartType &gridPart_;
100 
101  const InterfaceType interface_;
102  const CommunicationDirection dir_;
103 
104  const int myRank_;
105  const int mySize_;
106 
107  LinkStorageType linkStorage_;
108 
109  IndexMapType *recvIndexMap_;
110  IndexMapType *sendIndexMap_;
111 
112  // ALUGrid communicatior Class
113  MPAccessInterfaceType *mpAccess_;
114 
115  // exchange time
116  double exchangeTime_;
117  // setup time
118  double buildTime_;
119 
121  int sequence_;
122 
123  int nonBlockingObjects_ ;
124 
125  protected:
126  template< class LinkStorage, class IndexMapVector, InterfaceType CommInterface >
127  class LinkBuilder;
128 
130  // begin NonBlockingCommunication
132 
133  class NonBlockingCommunication
134  {
135  typedef DependencyCache < Space > DependencyCacheType;
136 
137 #if HAVE_DUNE_ALUGRID
138  typedef MPAccessInterfaceType :: NonBlockingExchange NonBlockingExchange;
139 
140  template <class DiscreteFunction>
141  class Pack : public NonBlockingExchange :: DataHandleIF
142  {
143  protected:
144  NonBlockingCommunication& commObj_;
145  const DiscreteFunction& discreteFunction_;
146 
147  public:
148  Pack( NonBlockingCommunication& commObj, const DiscreteFunction& df )
149  : commObj_( commObj ), discreteFunction_( df )
150  {}
151 
152  void pack( const int link, ObjectStreamType& buffer )
153  {
154  commObj_.pack( link, buffer, discreteFunction_ );
155  }
156 
157  void unpack( const int link, ObjectStreamType& buffer )
158  {
159  DUNE_THROW(InvalidStateException,"Pack::unpack should not be called!");
160  }
161  };
162 
163  template <class DiscreteFunction, class Operation>
164  class Unpack : public NonBlockingExchange :: DataHandleIF
165  {
166  protected:
167  NonBlockingCommunication& commObj_;
168  DiscreteFunction& discreteFunction_;
169 
170  // communication operation (usually ADD or COPY)
171  const Operation operation_;
172 
173  public:
174  Unpack( NonBlockingCommunication& commObj, DiscreteFunction& df )
175  : commObj_( commObj ), discreteFunction_( df ), operation_()
176  {}
177 
178  void pack( const int link, ObjectStreamType& buffer )
179  {
180  DUNE_THROW(InvalidStateException,"Unpack::pack should not be called!");
181  }
182 
183  void unpack( const int link, ObjectStreamType& buffer )
184  {
185  commObj_.unpack( link, buffer, discreteFunction_, operation_ );
186  }
187  };
188 #else // ALUGRID_HAS_NONBLOCKING_COMM is false
189  typedef int NonBlockingExchange;
190 #endif
191 
192  // create an unique tag for the communication
193  DUNE_EXPORT static int getMessageTag()
194  {
195  enum { initial = 665 };
196  static int tagCounter = initial ;
197  ++ tagCounter;
198  int messageTag = tagCounter ;
199 
200  // avoid overflow
201  if( messageTag < 0 )
202  {
203  messageTag = initial ;
204  tagCounter = initial ;
205  }
206  return messageTag;
207  }
208 
209  public:
210  NonBlockingCommunication( DependencyCacheType& dependencyCache, const int mySize )
211  : dependencyCache_( dependencyCache ),
212  nonBlockingExchange_(),
213  buffer_(),
214  exchangeTime_( 0.0 ),
215  mySize_( mySize )
216  {
217  // update cache ( if necessary )
218  dependencyCache_.rebuild();
219 
220  // notify dependency cache of open communication
221  dependencyCache_.attachComm();
222  }
223 
224  // copy constructor
225  NonBlockingCommunication( const NonBlockingCommunication& other )
226  : dependencyCache_( other.dependencyCache_ ),
227  nonBlockingExchange_(),
228  buffer_(),
229  exchangeTime_( 0.0 ),
230  mySize_( other.mySize_ )
231  {
232  // update cache ( if necessary )
233  dependencyCache_.rebuild();
234 
235  // notify dependency cache of open communication
236  dependencyCache_.attachComm();
237  }
238 
239  ~NonBlockingCommunication()
240  {
241  // if this assertion fails some communication has not been finished
242  assert( ! nonBlockingExchange_ );
243  // notify dependency cache that comm is finished
244  dependencyCache_.detachComm() ;
245  }
246 
247  template < class DiscreteFunctionSpace >
248  void send( const PetscDiscreteFunction< DiscreteFunctionSpace >& discreteFunction )
249  {
250  // nothing to do for the PetscDiscreteFunction here
251  }
252 
253  template < class DiscreteFunction >
254  void send( const DiscreteFunction& discreteFunction )
255  {
256  // check that object is in non-sent state
257  assert( ! nonBlockingExchange_ );
258 
259  // on serial runs: do nothing
260  if( mySize_ <= 1 ) return;
261 
262  // take time
263  Dune::Timer sendTimer ;
264 
265  // this variable can change during rebuild
266  const int nLinks = dependencyCache_.nlinks();
267 
268  // resize buffer vector
269  buffer_.resize( nLinks );
270 
271 #if HAVE_DUNE_ALUGRID
272  // get non-blocking exchange object from mpAccess including message tag
273  nonBlockingExchange_.reset( dependencyCache_.mpAccess().nonBlockingExchange( getMessageTag() ) );
274 
275  // pack data object
276  Pack< DiscreteFunction > packData( *this, discreteFunction );
277 
278  // perform send operation including packing of data
279  nonBlockingExchange_->send( buffer_, packData );
280 #else
281  // write buffers
282  for( int link = 0; link < nLinks; ++link )
283  pack( link, buffer_[ link ], discreteFunction );
284 #endif
285 
286  // store time needed for sending
287  exchangeTime_ = sendTimer.elapsed();
288  }
289 
291  template < class DiscreteFunctionSpace, class Operation >
292  double receive( PetscDiscreteFunction< DiscreteFunctionSpace >& discreteFunction,
293  const Operation& operation )
294  {
295  // take time
296  Dune::Timer exchTimer;
297 
298  // PetscDiscreteFunction has it's own communication
299  discreteFunction.communicate();
300 
301  return exchTimer.elapsed();
302  }
303 
305  template < class DiscreteFunction, class Operation >
306  double receive( DiscreteFunction& discreteFunction, const Operation& operation )
307  {
308  // on serial runs: do nothing
309  if( mySize_ <= 1 ) return 0.0;
310 
311  // take time
312  Dune::Timer recvTimer ;
313 
314 #if HAVE_DUNE_ALUGRID
315  // unpack data object
316  Unpack< DiscreteFunction, Operation > unpackData( *this, discreteFunction );
317 
318  // receive data and unpack
319  nonBlockingExchange_->receive( unpackData );
320 #else
321  // use exchange for older ALUGrid versions (send and receive)
322  buffer_ = dependencyCache_.mpAccess().exchange( buffer_ );
323 
324  // this variable can change during rebuild
325  const int nLinks = buffer_.size();
326 
327  // read buffers and store to discrete function
328  for( int link = 0; link < nLinks; ++link )
329  unpack( link, buffer_[ link ], discreteFunction, operation );
330 #endif
331 
332  // store time needed for sending
333  exchangeTime_ += recvTimer.elapsed();
334 
335 #if HAVE_DUNE_ALUGRID
336  // clear nonBlockingExchange object
337  nonBlockingExchange_.reset();
338 #endif
339  return exchangeTime_;
340  }
341 
// receive using a default-constructed object of the operation type that the
// CommDataHandle of the discrete function space defines
template < class DiscreteFunction >
double receive( DiscreteFunction& discreteFunction )
{
  typedef typename DiscreteFunction :: DiscreteFunctionSpaceType SpaceOfFunctionType;
  typedef typename SpaceOfFunctionType :: template CommDataHandle< DiscreteFunction > :: OperationType DefaultOperationType;

  DefaultOperationType defaultOperation;
  return receive( discreteFunction, defaultOperation );
}
352 
353  protected:
354  template <class DiscreteFunction>
355  void pack( const int link, ObjectStreamType& buffer, const DiscreteFunction& discreteFunction )
356  {
357  // reset buffer counters
358  buffer.clear();
359  // write data of discrete function to message buffer
360  dependencyCache_.writeBuffer( link, buffer, discreteFunction );
361  }
362 
363  template <class DiscreteFunction, class Operation>
364  void unpack( const int link, ObjectStreamType& buffer,
365  DiscreteFunction& discreteFunction, const Operation& operation )
366  {
367  // read data of discrete function from message buffer
368  dependencyCache_.readBuffer( link, buffer, discreteFunction, operation );
369  }
370 
371  protected:
372  DependencyCacheType& dependencyCache_;
373  std::unique_ptr< NonBlockingExchange > nonBlockingExchange_ ;
374  ObjectStreamVectorType buffer_;
375  double exchangeTime_ ;
376  const int mySize_;
377  };
378 
379  public:
380  typedef NonBlockingCommunication NonBlockingCommunicationType;
381 
383  NonBlockingCommunicationType nonBlockingCommunication()
384  {
385  // create non-blocking communication object
386  return NonBlockingCommunicationType( *this, mySize_ );
387  }
389  // end NonBlockingCommunication
391 
393  DependencyCache( const SpaceType &space, const InterfaceType interface, const CommunicationDirection dir )
394  : space_( space ),
395  gridPart_( space_.gridPart() ),
396  interface_( interface ),
397  dir_(dir),
398  myRank_( gridPart_.comm().rank() ),
399  mySize_( gridPart_.comm().size() ),
400  linkStorage_(),
401  recvIndexMap_( new IndexMapType[ mySize_ ] ),
402  sendIndexMap_( new IndexMapType[ mySize_ ] ),
403  mpAccess_( new MPAccessImplType( MPIHelper::getCommunicator() ) ),
404  exchangeTime_( 0.0 ),
405  buildTime_( 0.0 ),
406  sequence_( -1 ),
407  nonBlockingObjects_( 0 )
408  {}
409 
410  DependencyCache( const DependencyCache & ) = delete;
411 
413  ~DependencyCache()
414  {
415  delete mpAccess_;
416  mpAccess_ = 0;
417 
418  delete [] sendIndexMap_;
419  sendIndexMap_ = 0;
420 
421  delete [] recvIndexMap_;
422  recvIndexMap_ = 0;
423  }
424 
426  InterfaceType communicationInterface() const
427  {
428  return interface_;
429  }
430 
432  CommunicationDirection communicationDirection() const
433  {
434  return dir_;
435  }
436 
438  double buildTime() const
439  {
440  return buildTime_;
441  }
442 
444  double exchangeTime() const
445  {
446  return exchangeTime_;
447  }
448 
449  // build linkage and index maps
450  inline void buildMaps();
451 
452  // notify for open non-blocking communications
453  void attachComm()
454  {
455  ++ nonBlockingObjects_;
456  }
457 
458  // notify for finished non-blocking communication
459  void detachComm()
460  {
461  --nonBlockingObjects_;
462  assert( nonBlockingObjects_ >= 0 );
463  }
464 
465  bool noOpenCommunications() const
466  {
467  return true ;
468  }
469  protected:
470  // check consistency of maps
471  inline void checkConsistency();
472 
473  template< class LS, class IMV, InterfaceType CI >
474  inline void buildMaps( LinkBuilder< LS, IMV, CI > &handle );
475 
476  public:
478  inline int dest( const int link ) const
479  {
480  return mpAccess().dest()[ link ];
481  }
482 
484  inline int nlinks() const
485  {
486  return mpAccess().nlinks();
487  }
488 
490  inline void rebuild()
491  {
492  // only in parallel we have to do something
493  if( mySize_ <= 1 ) return;
494 
495  // make sure all non-blocking communications have been finished by now
496  assert( noOpenCommunications() );
497 #ifndef NDEBUG
498  // make sure buildMaps is called on every process
499  // otherwise the programs wait here until forever
500  int willRebuild = (sequence_ != space_.sequence()) ? 1 : 0;
501  const int myRebuild = willRebuild;
502 
503  // send willRebuild from rank 0 to all
504  gridPart_.comm().broadcast( &willRebuild, 1 , 0);
505 
506  assert( willRebuild == myRebuild );
507 #endif
508 
509  // check whether grid has changed.
510  if( sequence_ != space_.sequence() )
511  {
512  // take timer needed for rebuild
513  Dune::Timer buildTime;
514 
515  buildMaps();
516  sequence_ = space_.sequence();
517 
518  // store time needed
519  buildTime_ = buildTime.elapsed();
520  }
521  }
522 
524  template< class DiscreteFunction, class Operation >
525  inline void exchange( DiscreteFunction &discreteFunction, const Operation& operation );
526 
528  template< class DiscreteFunction >
529  inline void writeBuffer( ObjectStreamVectorType &osv, const DiscreteFunction &discreteFunction ) const;
530 
532  template< class DiscreteFunctionType, class Operation >
533  inline void readBuffer( ObjectStreamVectorType &osv,
534  DiscreteFunctionType &discreteFunction,
535  const Operation& operation ) const;
536 
538  inline MPAccessInterfaceType &mpAccess()
539  {
540  assert( mpAccess_ );
541  return *mpAccess_;
542  }
543 
545  inline const MPAccessInterfaceType &mpAccess() const
546  {
547  assert( mpAccess_ );
548  return *mpAccess_;
549  }
550 
551  protected:
552  // specialization for PetscDiscreteFunction doing nothing
553  template< class DiscreteFunctionSpace >
554  inline void writeBuffer( const int link,
555  ObjectStreamType &str,
556  const PetscDiscreteFunction< DiscreteFunctionSpace > &discreteFunction ) const
557  {
558  DUNE_THROW(NotImplemented,"writeBuffer not implemented for PetscDiscteteFunction" );
559  }
560 
561  // write data of DataImp& vector to object stream
562  // --writeBuffer
563  template< class DiscreteFunction >
564  inline void writeBuffer( const int link,
565  ObjectStreamType &str,
566  const DiscreteFunction &discreteFunction ) const
567  {
568  assert( sequence_ == space_.sequence() );
569  const auto &indexMap = sendIndexMap_[ dest( link ) ];
570  const int size = indexMap.size();
571 
572  typedef typename DiscreteFunction :: DofType DofType;
573 
574  // reserve write buffer for storage of dofs
575  typename DiscreteFunction::DiscreteFunctionSpaceType::LocalBlockIndices localBlockIndices;
576  str.reserve( size * Hybrid::size( localBlockIndices ) * sizeof( DofType ) );
577  for( int i = 0; i < size; ++i )
578  {
579  const auto &block = discreteFunction.dofVector()[ indexMap[ i ] ];
580  Hybrid::forEach( localBlockIndices, [ &str, &block ] ( auto &&k ) { str.writeUnchecked( block[ k ] ); } );
581  }
582  }
583 
584  // read data from object stream to DataImp& data vector
585  // specialization for PetscDiscreteFunction doing nothing
586  template< class DiscreteFunctionSpace, class Operation >
587  inline void readBuffer( const int link,
588  ObjectStreamType &str,
589  PetscDiscreteFunction< DiscreteFunctionSpace > &discreteFunction,
590  const Operation& ) const
591  {
592  DUNE_THROW(NotImplemented,"readBuffer not implemented for PetscDiscteteFunction" );
593  }
594 
595  // read data from object stream to DataImp& data vector
596  // --readBuffer
597  template< class DiscreteFunction, class Operation >
598  inline void readBuffer( const int link,
599  ObjectStreamType &str,
600  DiscreteFunction &discreteFunction,
601  const Operation& operation ) const
602  {
603  static_assert( ! std::is_pointer< Operation > :: value,
604  "DependencyCache::readBuffer: Operation needs to be a reference!");
605 
606  assert( sequence_ == space_.sequence() );
607  typedef typename DiscreteFunction :: DofType DofType;
608 
609  // get index map of rank belonging to link
610  const auto &indexMap = recvIndexMap_[ dest( link ) ];
611 
612  const int size = indexMap.size();
613  // make sure that the receive buffer has the correct size
614  typename DiscreteFunction::DiscreteFunctionSpaceType::LocalBlockIndices localBlockIndices;
615  assert( static_cast< std::size_t >( size * Hybrid::size( localBlockIndices ) * sizeof( DofType ) ) <= static_cast< std::size_t >( str.size() ) );
616  for( int i = 0; i < size; ++i )
617  {
618  auto &&block = discreteFunction.dofVector()[ indexMap[ i ] ];
619  Hybrid::forEach( localBlockIndices, [ &str, &operation, &block ] ( auto &&k ) {
620  DofType value;
621 #if HAVE_DUNE_ALUGRID
622  str.readUnchecked( value );
623 #else // #if HAVE_DUNE_ALUGRID
624  str.read( value );
625 #endif // #else // #if HAVE_DUNE_ALUGRID
626  // apply operation, i.e. COPY, ADD, etc.
627  operation( value, block[ k ] );
628  } );
629  }
630  }
631  };
632 
633  // --LinkBuilder
634  template< class Space >
635  template< class LinkStorage, class IndexMapVector, InterfaceType CommInterface >
636  class DependencyCache< Space > :: LinkBuilder
637  : public CommDataHandleIF
638  < LinkBuilder< LinkStorage, IndexMapVector, CommInterface >,
639  typename SpaceType :: BlockMapperType :: GlobalKeyType >
640  {
641  public:
642  typedef LinkStorage LinkStorageType;
643 
644  typedef IndexMapVector IndexMapVectorType;
645 
646  typedef typename SpaceType :: BlockMapperType BlockMapperType;
647  typedef typename BlockMapperType :: GlobalKeyType GlobalKeyType;
648 
649  typedef GlobalKeyType DataType;
650 
651  protected:
652  const GlobalKeyType myRank_;
653  const GlobalKeyType mySize_;
654 
655  LinkStorageType &linkStorage_;
656 
657  IndexMapVectorType &sendIndexMap_;
658  IndexMapVectorType &recvIndexMap_;
659 
660  const SpaceType &space_;
661  const BlockMapperType &blockMapper_;
662 
663  public:
664  LinkBuilder( LinkStorageType &linkStorage,
665  IndexMapVectorType &sendIdxMap,
666  IndexMapVectorType &recvIdxMap,
667  const SpaceType &space )
668  : myRank_( space.gridPart().comm().rank() ),
669  mySize_( space.gridPart().comm().size() ),
670  linkStorage_( linkStorage ),
671  sendIndexMap_( sendIdxMap ),
672  recvIndexMap_( recvIdxMap ),
673  space_( space ),
674  blockMapper_( space.blockMapper() )
675  {}
676 
677  protected:
678  void sendBackSendMaps()
679  {
680  // create ALU communicator
681  MPAccessImplType mpAccess( MPIHelper::getCommunicator() );
682 
683  // build linkage
684  mpAccess.removeLinkage();
685  // insert new linkage
686  mpAccess.insertRequestSymetric( linkStorage_ );
687  // get destination ranks
688  std::vector<int> dest = mpAccess.dest();
689  // get number of links
690  const int nlinks = mpAccess.nlinks();
691 
692  // create buffers
693  ObjectStreamVectorType osv( nlinks );
694 
696  //
697  // at this point complete send maps exsist on receiving side,
698  // so send them back to sending side
699  //
701 
702  // write all send maps to buffer
703  for(int link=0; link<nlinks; ++link)
704  sendIndexMap_[ dest[link] ].writeToBuffer( osv[link] );
705 
706  // exchange data
707  osv = mpAccess.exchange( osv );
708 
709  // read all send maps from buffer
710  for(int link=0; link<nlinks; ++link)
711  sendIndexMap_[ dest[link] ].readFromBuffer( osv[link] );
712  }
713 
714  public:
716  ~LinkBuilder()
717  {
718  sendBackSendMaps();
719  }
720 
722  bool contains( int dim, int codim ) const
723  {
724  return space_.blockMapper().contains( codim );
725  }
726 
728  bool fixedsize( int dim, int codim ) const
729  {
730  return false;
731  }
732 
734  template< class MessageBuffer, class Entity >
735  void gather( MessageBuffer &buffer, const Entity &entity ) const
736  {
737  // check whether we are a sending entity
738  const auto myPartitionType = entity.partitionType();
739  const bool send = EntityCommHelper< CommInterface > :: send( myPartitionType );
740 
741  // if we send data then send rank and dofs
742  if( send )
743  {
744  // send rank for linkage
745  buffer.write( myRank_ );
746 
747  const int numDofs = blockMapper_.numEntityDofs( entity );
748 
749  // int should be GlobalKey !!!!
750  typedef std::vector< GlobalKeyType > IndicesType ;
751  IndicesType indices( numDofs );
752 
753  // copy all global keys
754  blockMapper_.mapEachEntityDof( entity, AssignFunctor< IndicesType >( indices ) );
755 
756  // write global keys to message buffer
757  for( int i = 0; i < numDofs; ++i )
758  buffer.write( indices[ i ] );
759  }
760  }
761 
763  template< class MessageBuffer, class Entity >
764  void scatter( MessageBuffer &buffer, const Entity &entity, const size_t dataSize )
765  {
766  // if data size > 0 then other side is sender
767  if( dataSize > 0 )
768  {
769  // read rank of other side
770  GlobalKeyType rank;
771  buffer.read( rank );
772  assert( (rank >= 0) && (rank < mySize_) );
773 
774  // check whether we are a sending entity
775  const auto myPartitionType = entity.partitionType();
776  const bool receive = EntityCommHelper< CommInterface > :: receive( myPartitionType );
777 
778  // insert rank of link into set of links
779  linkStorage_.insert( rank );
780 
781  // read indices from stream
782  typedef std::vector< GlobalKeyType > IndicesType ;
783  IndicesType indices( dataSize - 1 );
784  for(size_t i=0; i<dataSize-1; ++i)
785  buffer.read( indices[i] );
786 
787  // if we are a receiving entity
788  if( receive )
789  {
791  //
792  // Problem here: sending and receiving order might differ
793  // Solution: sort all dofs after receiving order and send
794  // senders dofs back at the end
795  //
797 
798  // if data has been send and we are receive entity
799  // then insert indices into send map of rank
800  sendIndexMap_[ rank ].insert( indices );
801 
802  // build local mapping for receiving of dofs
803  const int numDofs = blockMapper_.numEntityDofs( entity );
804  indices.resize( numDofs );
805 
806  // map each entity dof and store in indices
807  blockMapper_.mapEachEntityDof( entity, AssignFunctor< IndicesType >( indices ) );
808 
809  // insert receiving dofs
810  recvIndexMap_[ rank ].insert( indices );
811  }
812  }
813  }
814 
816  template< class Entity >
817  size_t size( const Entity &entity ) const
818  {
819  const PartitionType myPartitionType = entity.partitionType();
820  const bool send = EntityCommHelper< CommInterface > :: send( myPartitionType );
821  return (send) ? (blockMapper_.numEntityDofs( entity ) + 1) : 0;
822  }
823  };
824 
825 
826 
827  template< class Space >
828  inline void DependencyCache< Space > :: buildMaps()
829  {
830  if( interface_ == InteriorBorder_All_Interface )
831  {
832  LinkBuilder< LinkStorageType, IndexMapVectorType,
833  InteriorBorder_All_Interface >
834  handle( linkStorage_, sendIndexMap_, recvIndexMap_, space_ );
835  buildMaps( handle );
836  }
837  else if( interface_ == InteriorBorder_InteriorBorder_Interface )
838  {
839  LinkBuilder< LinkStorageType, IndexMapVectorType,
840  InteriorBorder_InteriorBorder_Interface >
841  handle( linkStorage_, sendIndexMap_, recvIndexMap_, space_ );
842  buildMaps( handle );
843  }
844  else if( interface_ == All_All_Interface )
845  {
846  LinkBuilder< LinkStorageType, IndexMapVectorType, All_All_Interface >
847  handle( linkStorage_, sendIndexMap_, recvIndexMap_, space_ );
848  buildMaps( handle );
849  }
850  else
851  DUNE_THROW( NotImplemented, "DependencyCache for the given interface has not been implemented, yet." );
852 #ifndef NDEBUG
853  // checks that sizes of index maps are equal on sending and receiving proc
854  checkConsistency();
855 #endif
856  }
857 
858 
859  template< class Space >
860  template< class LS, class IMV, InterfaceType CI >
861  inline void DependencyCache< Space > :: buildMaps( LinkBuilder< LS, IMV, CI > &handle )
862  {
863  linkStorage_.clear();
864  for( int i = 0; i < mySize_; ++i )
865  {
866  recvIndexMap_[ i ].clear();
867  sendIndexMap_[ i ].clear();
868  }
869 
870  // make one all to all communication to build up communication pattern
871  gridPart_.communicate( handle, All_All_Interface , ForwardCommunication );
872 
873  // remove old linkage
874  mpAccess().removeLinkage();
875  // create new linkage
876  mpAccess().insertRequestSymetric( linkStorage_ );
877  }
878 
879  template< class Space >
880  inline void DependencyCache< Space > :: checkConsistency()
881  {
882  const int nLinks = nlinks();
883 
884  ObjectStreamVectorType buffer( nLinks );
885 
886  // check that order and size are consistent
887  for(int l=0; l<nLinks; ++l)
888  {
889  buffer[l].clear();
890  const int sendSize = sendIndexMap_[ dest( l ) ].size();
891  buffer[l].write( sendSize );
892  for(int i=0; i<sendSize; ++i)
893  buffer[l].write( i );
894  }
895 
896  // exchange data to other procs
897  buffer = mpAccess().exchange( buffer );
898 
899  // check that order and size are consistent
900  for(int l=0; l<nLinks; ++l)
901  {
902  const int recvSize = recvIndexMap_[ dest( l ) ].size();
903  int sendedSize;
904  buffer[l].read( sendedSize );
905 
906  // compare sizes, must be the same
907  if( recvSize != sendedSize )
908  {
909  DUNE_THROW(InvalidStateException,"Sizes do not match!" << sendedSize << " o|r " << recvSize);
910  }
911 
912  for(int i=0; i<recvSize; ++i)
913  {
914  int idx;
915  buffer[l].read( idx );
916 
917  // ordering should be the same on both sides
918  if( i != idx )
919  {
920  DUNE_THROW(InvalidStateException,"Wrong ordering of send and recv maps!");
921  }
922  }
923  }
924  }
925 
926  template< class Space >
927  template< class DiscreteFunction, class Operation >
928  inline void DependencyCache< Space > :: exchange( DiscreteFunction &discreteFunction, const Operation& operation )
929  {
930  // on serial runs: do nothing
931  if( mySize_ <= 1 ) return;
932 
933  // create non-blocking communication object
934  NonBlockingCommunicationType nbc( *this, mySize_ );
935 
936  // perform send operation
937  nbc.send( discreteFunction );
938 
939  // store time for send and receive of data
940  exchangeTime_ = nbc.receive( discreteFunction, operation );
941  }
942 
943  template< class Space >
944  template< class DiscreteFunction >
945  inline void DependencyCache< Space > :: writeBuffer( ObjectStreamVectorType &osv,
946  const DiscreteFunction &discreteFunction ) const
947  {
948  const int numLinks = nlinks();
949  for( int link = 0; link < numLinks; ++link )
950  writeBuffer( link, osv[ link ], discreteFunction );
951  }
952 
953  template< class Space >
954  template< class DiscreteFunction, class Operation >
955  inline void DependencyCache< Space > :: readBuffer( ObjectStreamVectorType &osv, DiscreteFunction &discreteFunction,
956  const Operation& operation ) const
957  {
958  const int numLinks = nlinks();
959  for( int link = 0; link < numLinks; ++link )
960  readBuffer( link, osv[ link ], discreteFunction, operation );
961  }
962 
964  template <class SpaceImp>
965  class CommManagerSingletonKey
966  {
967  const SpaceImp & space_;
968  const InterfaceType interface_;
969  const CommunicationDirection dir_;
970  public:
972  CommManagerSingletonKey(const SpaceImp & space,
973  const InterfaceType interface,
974  const CommunicationDirection dir)
975  : space_(space), interface_(interface), dir_(dir)
976  {}
977 
979  CommManagerSingletonKey(const CommManagerSingletonKey & org)
980  : space_(org.space_), interface_(org.interface_), dir_(org.dir_)
981  {}
982 
984  bool operator == (const CommManagerSingletonKey & otherKey) const
985  {
986  // mapper of space is singleton
987  return (&(space_.blockMapper()) == & (otherKey.space_.blockMapper()) );
988  }
989 
991  const SpaceImp & space() const
992  {
993  return space_;
994  }
995 
997  InterfaceType interface() const
998  {
999  return interface_;
1000  }
1001 
1003  CommunicationDirection direction() const
1004  {
1005  return dir_;
1006  }
1007  };
1008 
// Factory creating an ObjectImp from space, interface and direction of a
// key and destroying it again.
template <class KeyImp, class ObjectImp>
class CommManagerFactory
{
public:
  // allocate a new object from the data stored in the key
  static ObjectImp * createObject( const KeyImp & key )
  {
    const auto &space = key.space();
    return new ObjectImp( space, key.interface(), key.direction() );
  }

  // free an object obtained from createObject
  static void deleteObject( ObjectImp * obj )
  {
    delete obj;
  }
};
1027 
1029  template <class SpaceImp>
1030  class CommunicationManager
1031  {
1032  typedef CommunicationManager<SpaceImp> ThisType;
1033 
1034  // type of communication manager object which does communication
1035  typedef DependencyCache<SpaceImp> DependencyCacheType;
1036 
1037  typedef CommManagerSingletonKey<SpaceImp> KeyType;
1038  typedef CommManagerFactory<KeyType, DependencyCacheType> FactoryType;
1039 
1040  typedef SingletonList< KeyType , DependencyCacheType , FactoryType > CommunicationProviderType;
1041 
1042  typedef SpaceImp SpaceType;
1043  const SpaceType & space_;
1044 
1045  const KeyType key_;
1046 
1047  const int mySize_;
1048 
1049  typedef ALU3DSPACE MpAccessLocal MPAccessInterfaceType;
1050 
1051  // is singleton per space
1052  DependencyCacheType &cache_;
1053  CommunicationManager(const ThisType& org);
1054  public:
1055  // type of non-blocking communication object
1056  typedef typename DependencyCacheType :: NonBlockingCommunicationType NonBlockingCommunicationType;
1057 
1059  CommunicationManager(const SpaceType & space,
1060  const InterfaceType interface,
1061  const CommunicationDirection dir)
1062  : space_(space)
1063  , key_(space_,interface,dir)
1064  , mySize_(space_.gridPart().comm().size())
1065  , cache_(CommunicationProviderType::getObject(key_))
1066  {}
1067 
1069  CommunicationManager(const SpaceType & space)
1070  : space_(space)
1071  , key_(space_, space.communicationInterface(), space.communicationDirection())
1072  , mySize_(space_.gridPart().comm().size())
1073  , cache_(CommunicationProviderType::getObject(key_))
1074  {}
1075 
1077  ~CommunicationManager()
1078  {
1079  CommunicationProviderType::removeObject(cache_);
1080  }
1081 
1083  InterfaceType communicationInterface() const
1084  {
1085  return cache_.communicationInterface();
1086  }
1087 
1089  CommunicationDirection communicationDirection() const
1090  {
1091  return cache_.communicationDirection();
1092  }
1093 
1095  double buildTime() const
1096  {
1097  return cache_.buildTime();
1098  }
1099 
1101  double exchangeTime() const
1102  {
1103  return cache_.exchangeTime();
1104  }
1105 
1106  MPAccessInterfaceType& mpAccess()
1107  {
1108  return cache_.mpAccess();
1109  }
1110 
1113  {
1114  return cache_.nonBlockingCommunication();
1115  }
1116 
1119  template <class DiscreteFunctionType>
1120  void exchange(DiscreteFunctionType & df) const
1121  {
1122  // get type of default operation
1123  typedef typename DiscreteFunctionType :: DiscreteFunctionSpaceType
1124  :: template CommDataHandle< DiscreteFunctionType > :: OperationType DefaultOperationType;
1125 
1126  // create default operation
1127  DefaultOperationType operation;
1128 
1129  exchange( df, operation );
1130  }
1131 
1134  template <class DiscreteFunctionType, class Operation>
1135  void exchange(DiscreteFunctionType & df, const Operation& operation ) const
1136  {
1137  cache_.exchange( df, operation );
1138  }
1139 
1141  template <class ObjectStreamVectorType, class DiscreteFunctionType>
1142  void writeBuffer(ObjectStreamVectorType& osv, const DiscreteFunctionType & df) const
1143  {
1144  cache_.writeBuffer( osv, df );
1145  }
1146 
1147  // read given df from given buffer
1148  template <class ObjectStreamVectorType, class DiscreteFunctionType>
1149  void readBuffer(ObjectStreamVectorType& osv, DiscreteFunctionType & df) const
1150  {
1151  typedef typename DiscreteFunctionType :: DiscreteFunctionSpaceType
1152  :: template CommDataHandle<DiscreteFunctionType> :: OperationType OperationType;
1153 
1154  // communication operation to be performed on the received data
1155  OperationType operation;
1156 
1157  readBuffer( osv, df, operation );
1158  }
1159 
1160  // read given df from given buffer
1161  template <class ObjectStreamVectorType, class DiscreteFunctionType, class OperationType>
1162  void readBuffer(ObjectStreamVectorType& osv, DiscreteFunctionType & df, const OperationType& operation) const
1163  {
1164  cache_.readBuffer( osv, df , operation);
1165  }
1166 
1168  void rebuildCache()
1169  {
1170  cache_.rebuild();
1171  }
1172  };
1173 
1175  class CommunicationManagerList
1176  {
1178  template <class MPAccessType, class ObjectStreamVectorType>
1179  class DiscreteFunctionCommunicatorInterface
1180  {
1181  protected:
1182  DiscreteFunctionCommunicatorInterface()
1183  {}
1184  public:
1185  virtual ~DiscreteFunctionCommunicatorInterface()
1186  {}
1187 
1188  virtual MPAccessType& mpAccess() = 0;
1189  virtual void writeBuffer(ObjectStreamVectorType&) const = 0;
1190  virtual void readBuffer(ObjectStreamVectorType&) = 0;
1191  virtual void rebuildCache() = 0;
1192 
1193  virtual bool handles ( IsDiscreteFunction &df ) const = 0;
1194  };
1195 
1199  template <class DiscreteFunctionImp,
1200  class MPAccessType,
1201  class ObjectStreamVectorType,
1202  class OperationType >
1203  class DiscreteFunctionCommunicator
1204  : public DiscreteFunctionCommunicatorInterface<MPAccessType,ObjectStreamVectorType>
1205  {
1206  typedef DiscreteFunctionImp DiscreteFunctionType;
1207  typedef typename DiscreteFunctionType :: DiscreteFunctionSpaceType DiscreteFunctionSpaceType;
1208 
1209  typedef CommunicationManager<DiscreteFunctionSpaceType> CommunicationManagerType;
1210 
1211  // object to communicate
1212  DiscreteFunctionType& df_;
1214  CommunicationManagerType comm_;
1215 
1217  const OperationType operation_;
1218  public:
1220  DiscreteFunctionCommunicator(DiscreteFunctionType& df, const OperationType& op )
1221  : df_(df), comm_(df_.space()), operation_( op )
1222  {}
1223 
1225  virtual MPAccessType& mpAccess()
1226  {
1227  return comm_.mpAccess();
1228  }
1229 
1231  virtual void writeBuffer(ObjectStreamVectorType& osv) const
1232  {
1233  comm_.writeBuffer(osv,df_);
1234  }
1235 
1237  virtual void readBuffer(ObjectStreamVectorType& osv)
1238  {
1239  comm_.readBuffer(osv, df_, operation_ );
1240  }
1241 
1243  virtual void rebuildCache()
1244  {
1245  comm_.rebuildCache();
1246  }
1247 
1248  virtual bool handles ( IsDiscreteFunction &df ) const { return (&static_cast< IsDiscreteFunction & >( df_ ) == &df); }
1249  };
1250 
1251  // ALUGrid send/recv buffers
1252  typedef ALU3DSPACE ObjectStream ObjectStreamType;
1253 
1254  // type of buffer vector
1255  typedef std::vector< ObjectStreamType > ObjectStreamVectorType;
1256 
1257  // type of ALUGrid Communicator
1258  typedef ALU3DSPACE MpAccessLocal MPAccessInterfaceType;
1259 
1260  // interface for communicated objects
1261  typedef DiscreteFunctionCommunicatorInterface<MPAccessInterfaceType,ObjectStreamVectorType>
1262  CommObjInterfaceType;
1263 
1264  // list of communicated objects
1265  typedef std::list < std::unique_ptr< CommObjInterfaceType > > CommObjListType;
1266  CommObjListType objList_;
1267 
1268  // number of processors
1269  int mySize_;
1270 
1271  public:
1273  template <class CombinedObjectType>
1274  CommunicationManagerList(CombinedObjectType& cObj) :
1275  mySize_( -1 )
1276  {
1277  // add all discrete functions containd in cObj to list
1278  cObj.addToList(*this);
1279  }
1280 
1283  : mySize_( -1 )
1284  {}
1285 
1287 
1289  template <class DiscreteFunctionImp, class Operation>
1290  void addToList(DiscreteFunctionImp &df, const Operation& operation )
1291  {
1292  // type of communication object
1293  typedef DiscreteFunctionCommunicator<DiscreteFunctionImp,
1294  MPAccessInterfaceType,
1295  ObjectStreamVectorType,
1296  Operation > CommObj;
1297  CommObj * obj = new CommObj(df, operation);
1298  objList_.push_back( std::unique_ptr< CommObjInterfaceType > (obj) );
1299 
1300  // if mySize wasn't set, set to number of processors
1301  if( mySize_ < 0 )
1302  {
1303  // get ALUGrid communicator
1304  MPAccessInterfaceType& mpAccess = objList_.front()->mpAccess();
1305 
1306  // set number of processors
1307  mySize_ = mpAccess.psize();
1308  }
1309  }
1310 
1312  template <class DiscreteFunctionImp>
1313  void addToList(DiscreteFunctionImp &df)
1314  {
1315  DFCommunicationOperation::Copy operation;
1316  addToList( df, operation );
1317  }
1318 
1319  template< class DiscreteFunction >
1320  void removeFromList ( DiscreteFunction &df )
1321  {
1322  const auto handles = [ &df ] ( const std::unique_ptr< CommObjInterfaceType > &commObj ) { assert( commObj ); return commObj->handles( df ); };
1323  CommObjListType::reverse_iterator pos = std::find_if( objList_.rbegin(), objList_.rend(), handles );
1324  if( pos != objList_.rend() )
1325  objList_.erase( --pos.base() );
1326  else
1327  DUNE_THROW( RangeError, "Trying to remove discrete function that was never added" );
1328  }
1329 
1332  void exchange()
1333  {
1334  // if only one process, do nothing
1335  if( mySize_ <= 1 ) return ;
1336 
1337  // exchange data
1338  if(objList_.size() > 0)
1339  {
1340  // rebuild cahce if grid has changed
1341  for(auto& elem : objList_)
1342  elem->rebuildCache();
1343 
1344  // get ALUGrid communicator
1345  auto& mpAccess = objList_.front()->mpAccess();
1346 
1347  // create buffer
1348  ObjectStreamVectorType osv( mpAccess.nlinks() );
1349 
1350  // fill buffers
1351  for(auto& elem : objList_)
1352  elem->writeBuffer(osv);
1353 
1354  // exchange data
1355  osv = mpAccess.exchange(osv);
1356 
1357  // read buffers
1358  for(auto& elem : objList_)
1359  elem->readBuffer(osv);
1360  }
1361  }
1362  };
1363 #endif // #if ALU3DGRID_PARALLEL
1365 
1366  } // namespace Fem
1367 
1368 } // namespace Dune
1369 #endif // #ifndef DUNE_FEM_CACHED_COMMUNICATION_MANAGER_HH
const SpaceType & space_
Definition: communicationmanager.hh:161
double buildTime() const
return time needed for last build
Definition: communicationmanager.hh:199
InterfaceType communicationInterface() const
return communication interface
Definition: communicationmanager.hh:185
DefaultCommunicationManager< SpaceImp > ThisType
Definition: communicationmanager.hh:82
SpaceImp SpaceType
Definition: communicationmanager.hh:79
void exchange() const
Definition: communicationmanager.hh:373
void exchange(DiscreteFunction &discreteFunction) const
exchange data for a discrete function using the copy operation
Definition: communicationmanager.hh:221
CommunicationDirection communicationDirection() const
return communication direction
Definition: communicationmanager.hh:190
NonBlockingCommunication NonBlockingCommunicationType
Definition: communicationmanager.hh:169
NonBlockingCommunicationType nonBlockingCommunication() const
return object for non-blocking communication
Definition: communicationmanager.hh:211
double exchangeTime() const
return time needed for last exchange of data
Definition: communicationmanager.hh:205
void addToList(DiscreteFunctionImp &df, const Operation &operation)
add discrete function to communication list
Definition: communicationmanager.hh:345
void removeFromList(DiscreteFunction &df)
Definition: communicationmanager.hh:361
Definition: bindguard.hh:11
bool operator==(const Double &a, const Double &b)
Definition: double.hh:601
static void forEach(IndexRange< T, sz > range, F &&f)
Definition: hybrid.hh:129
base class for determing whether a class is a discrete function or not
Definition: common/discretefunction.hh:52
Definition: cachedcommmanager.hh:47
Definition: commindexmap.hh:16