duo_distributed_vector.h

Go to the documentation of this file.
00001 /* ./bridging/duo_distributed_vector.h 
00002 **********************************
00003 Copyright INRIA and CEA 
00004 
00005 author : Guillaume ANCIAUX (anciaux@labri.fr, g.anciaux@laposte.net)
00006 
00007 The LibMultiScale is a C++ parallel framework for the multiscale
00008 coupling methods dedicated to material simulations. This framework
00009 provides an API which makes it possible to program coupled simulations
00010 and integration of already existing codes.
00011 
00012 This Project is done in a collaboration between INRIA Futurs Bordeaux
00013 within ScAlApplix team and CEA/DPTA Ile de France. 
00014 
00015 This software is governed by the CeCILL-C license under French law and
00016 abiding by the rules of distribution of free software.  You can  use, 
00017 modify and/ or redistribute the software under the terms of the CeCILL-C
00018 license as circulated by CEA, CNRS and INRIA at the following URL
00019 "http://www.cecill.info". 
00020 
00021 As a counterpart to the access to the source code and  rights to copy,
00022 modify and redistribute granted by the license, users are provided only
00023 with a limited warranty  and the software's author,  the holder of the
00024 economic rights,  and the successive licensors  have only  limited
00025 liability. 
00026 
00027 In this respect, the user's attention is drawn to the risks associated
00028 with loading,  using,  modifying and/or developing or reproducing the
00029 software by the user in light of its specific status of free software,
00030 that may mean  that it is complicated to manipulate,  and  that  also
00031 therefore means  that it is reserved for developers  and  experienced
00032 professionals having in-depth computer knowledge. Users are therefore
00033 encouraged to load and test the software's suitability as regards their
00034 requirements in conditions enabling the security of their systems and/or 
00035 data to be ensured and,  more generally, to use and operate it in the 
00036 same conditions as regards security. 
00037 
00038 The fact that you are presently reading this means that you have had
00039 knowledge of the CeCILL-C license and that you accept its terms.
00040 ***********************************/
00041 
00042 #ifndef DUODISTRIBUTEDVECTEUR_H
00043 #define DUODISTRIBUTEDVECTEUR_H
00044 
00045 #include "../communicator/communicator.h"
00046 #include <list>
00047 
/// Contiguous range of local indexes: the unit stored in the compressed
/// communication schemes. The range is half-open, [start, end).
struct Myblocks_ {
  /// first index covered by the block
  unsigned int start;
  /// one past the last index covered by the block
  unsigned int end;
};
typedef struct Myblocks_ blocks;
00056 
00057 
00062 class DuoDistributedVecteur{
00063  public:
00064 
00065   DuoDistributedVecteur(int lsize,unsigned int tsize){
00066     DUMP("Creating duodistributed vector of size " << lsize << "," << tsize,DBG_INFO_STARTUP);
00067     //init des flags et indicateurs
00068     totalsize = tsize;
00069     localsize = lsize;
00070     compressed = false;
00071 
00072     //allocation du tableau principal
00073     //qui sera compressé plus tard
00074     indirection = new int[localsize];
00075     
00076     // mise a zero du vecteur
00077     memset(indirection,-1,sizeof(int)*localsize);
00078 
00079     schNew_index = 1;
00080     sch_index = 0;
00081 
00082     something_changed = true;
00083     step = 0;
00084   };
00085 
00087   void SetNeighbor(int i, int proc){
00088     if (indirection[i]!= -1){
00089       FATAL("distant owner for index " << i << " was already attributed to " << indirection[i] << " i try to change it for  " << proc);
00090     }
00091     indirection[i] = proc;
00092   };
00093 
00095   int GetNeighbor(int i){
00096     blocks b;
00097     int proc;
00098     findBlock(i,b,proc,sch_index);
00099     return proc;
00100   }
00101 
00103   int GetUncompressedNeighbor(int i){
00104     return indirection[i];
00105   }
00106 
00107 
00108   //les methodes ci dessous stocke les envois
00109   //pour le faire en une fois de maniere globale
00110   //par la methode SynchronizeMigration
00112   void receiveAtom(int i,int neighbor){
00113     //    int dist_index;
00114 
00115    AddAtomInBlocks(i,neighbor,schNew_index);
00116     // findDistIndex(i,neighbor,dist_index,schNew_index);
00117    DUMP("atom " << i << " received with new neighbor " << neighbor << " newpos = " << i << " number " 
00118         <<  received[neighbor].size(),DBG_INFO);
00119    //    received[neighbor].push_back(dist_index);
00120    received[neighbor].push_back(i);
00121   }
00122 
00124   void sendAtom(int i , int toproc){
00125     DUMP("atom " << i << " migrate to " << toproc,DBG_INFO);
00126     //je le cherche pour voir qd il est envoye au voisin FE
00127     int neigh,dist_index=0;
00128     dist_index = findDistIndex(i,neigh,sch_index);
00129     RemoveAtomFromBlocks(i,schNew_index);
00130     //j'informe le nouveau proprietaire de qui est sont homologue
00131     //    if (dist_index == sent[neigh][sent[neigh].size()-1].first) FATAL("detection peu couteuse : double declaration de migration d'atome : index" 
00132     //                                                               << dist_index); 
00133     sent[neigh].push_back(std::make_pair<int,int>(dist_index,toproc));
00134     DUMP("actually size to notify to proc " << neigh << " size = " << sent[neigh].size(),DBG_INFO);
00135   }
00136 
00137 
00139   int findDistIndex(unsigned int i,int & neighbor,int schIndex){
00140     DUMP("request findDistIndex "  << i,DBG_DETAIL);
00141     std::map<int,std::list<blocks> > & scheme = compressed_scheme[schIndex];
00142 
00143     neighbor = -2;
00144     std::map<int,std::list<blocks> >::iterator it = scheme.begin();
00145     std::map<int,std::list<blocks> >::iterator last = scheme.end();
00146     
00147     while (it != last){
00148       int proc = (*it).first;
00149       std::list<blocks> & lblocks = (*it).second;
00150       std::list<blocks>::iterator it_block = lblocks.begin();
00151       std::list<blocks>::iterator last_block = lblocks.end();
00152       int dist_index = 0;
00153   
00154       for (it_block = lblocks.begin(); it_block != last_block ; ++it_block){ 
00155         blocks & b = (*it_block);
00156         if (i < b.end && i>= b.start)
00157           {
00158             neighbor = proc;
00159             dist_index += i - b.start;
00160             return dist_index;
00161           }
00162         dist_index += b.end-b.start;
00163       }
00164       ++it;
00165     }
00166    
00167     if (neighbor == -2) FATAL("not found atom " << i << " in any blocks");
00168     return -1;
00169   }
00170 
00171 
00173   int findRealIndex(unsigned int i,int neighbor,int schIndex){
00174     DUMP("request findRealIndex "  << i << "," << neighbor,DBG_DETAIL);
00175     std::map<int,std::list<blocks> > & scheme = compressed_scheme[schIndex];
00176 
00177     std::list<blocks> & lblocks = scheme[neighbor];
00178     std::list<blocks>::iterator it_block = lblocks.begin();
00179     std::list<blocks>::iterator last_block = lblocks.end();
00180     unsigned int dist_index = 0;
00181     
00182     for (it_block = lblocks.begin(); it_block != last_block ; ++it_block){ 
00183       blocks & b = (*it_block);
00184       
00185       if (i < dist_index + b.end - b.start && i>= dist_index)
00186         {
00187           return (b.start + i - dist_index);
00188         }
00189       else if (i >= b.start && i < b.end) FATAL("insertion d'atom impossible sur une position " << i 
00190                                                 << " alors qu'il existe un bloc " << b.start << " - " << b.end);
00191     }
00192     FATAL("distant index not existant for proc " << neighbor << " dist_index " << i);
00193   }
00194 
00195 
00197 
00198   void AddAtomInBlocks(unsigned int i,int neighbor,int schIndex){
00199     std::map<int,std::list<blocks> > & scheme = compressed_scheme[schIndex];
00200 
00201     std::list<blocks> & lblocks = scheme[neighbor];
00202     //je creer systematiquement un bloc a la fin pour la coherence du schemas de com (+ simple)
00203     blocks new_b; 
00204     new_b.start = i; 
00205     new_b.end = i+1; 
00206     lblocks.push_back(new_b);
00207     
00208 
00209 /*     std::list<blocks>::iterator it_block = lblocks.begin(); */
00210 /*     std::list<blocks>::iterator last_block = lblocks.end(); */
00211 /*     int done_flag=false; */
00212 /*     blocks new_b; */
00213 
00214 /*     if (i+1>localsize) */
00215 /*       localsize = i+1; */
00216     
00217 /*     for (it_block = lblocks.begin(); it_block != last_block ; ++it_block){  */
00218 /*       blocks & b = (*it_block); */
00219 /*       if (i == b.end){ */
00220 /*      DUMP("cas 1 " << i,DBG_DETAIL); */
00221 /*      //on etend le bloc */
00222 /*      //      ++b.end;  */
00223 /*      new_b.start = i; */
00224 /*      new_b.end = i+1; */
00225 /*      ++it_block; */
00226 /*      lblocks.insert(it_block,new_b); */
00227 /*      done_flag = true; */
00228 /*      break; */
00229 /*       } */
00230 /*       else if (i+1 == b.start){ */
00231 /*      DUMP("cas 2 " << i,DBG_DETAIL); */
00232 /*      //on etend le bloc par devant */
00233 /*      //      --b.start;  */
00234 /*      new_b.start = i; */
00235 /*      new_b.end = i+1; */
00236 /*      lblocks.insert(it_block,new_b); */
00237 /*      done_flag = true; */
00238 /*      break; */
00239 /*       } */
00240 /*       else if (i < b.start) */
00241 /*      { */
00242 /*        DUMP("cas 3 " << i,DBG_DETAIL); */
00243 /*        //c l'endroit parfait pour creer le nouveau bloc de taille 1 */
00244 /*        new_b.start = i; */
00245 /*        new_b.end = i+1; */
00246 /*        lblocks.insert(it_block,new_b); */
00247 /*        done_flag = true; */
00248 /*        break; */
00249 /*      } */
00250 /*       else if (i >= b.start && i < b.end) FATAL("insertion d'atom impossible sur une position " << i  */
00251 /*                                              << " alors qu'il existe un bloc " << b.start << " - " << b.end); */
00252 /*     } */
00253 
00254 /*     if (!done_flag){ */
00255 /*       DUMP("cas 4 " << i,DBG_DETAIL); */
00256 /*       //je creer un nouveau bloc apres tout les autres */
00257 /*       new_b.start = i; */
00258 /*       new_b.end = i+1; */
00259 /*       lblocks.insert(it_block,new_b); */
00260 /*     } */
00261 
00262   }
00263 
/// Inserts a one-element block for local index `real_index` among the blocks
/// of `neighbor` in scheme `schIndex`, at the position designated by
/// `distant_index` (a position within the concatenation of those blocks).
///
/// When `distant_index` lies beyond the existing blocks the new block is
/// appended at the end of the list. The two branches handling a position that
/// falls inside an existing block both start with a FATAL.
/// NOTE(review): if FATAL does not return, the code following it in those
/// branches never runs - confirm against the macro definition.
00265   void AddAtomInBlocksWithDistIndex(unsigned int distant_index,unsigned int real_index,int neighbor,int schIndex){
00266     DUMP("request  AddAtomInBlocksWithSpecifiedOrder "  << distant_index << "," << real_index << "," << neighbor,DBG_DETAIL);
00267     std::map<int,std::list<blocks> > & scheme = compressed_scheme[schIndex];
00268 
00269     std::list<blocks> & lblocks = scheme[neighbor];
00270     std::list<blocks>::iterator it_block = lblocks.begin();
00271     std::list<blocks>::iterator last_block = lblocks.end();
00272     int done_flag=false;
00273     blocks new_b;
00274 
      // grow the recorded local size when the new index lies past it
00275     if (real_index+1 > localsize){
00276       DUMP("AAAAAAAAAAAAAA this should not append ?",DBG_INFO);
00277       localsize = real_index+1;
00278     }
      // current_index = number of indexes held by the blocks already walked
00279     unsigned int current_index = 0;
00280     for (it_block = lblocks.begin(); it_block != last_block ; ++it_block){
00281       blocks & b = (*it_block);
00282       if (distant_index >= current_index && distant_index < current_index + b.end-b.start){
00283         // the new block has to be inserted inside this block
00284         if(distant_index == current_index){
00285           FATAL("AAAA");
00286           // insert the new block right before the current one
00287           new_b.start = real_index;
00288           new_b.end = real_index+1;
00289           
00290           lblocks.insert(it_block,new_b);
00291           done_flag=true;
00292           DUMP("insertion d'un bloc qui envoie la coordonnee " << real_index << " a " << neighbor,DBG_INFO);
          // note: no break here, the walk over the blocks continues
00293         }
00294         else{
00295           FATAL("BBBB");
00296           // otherwise split the block in two
          // the index at the split position ends up in neither half
00297           int index_to_remove = distant_index - current_index + b.start;
00298           new_b.start = index_to_remove + 1;
00299           new_b.end = b.end;
00300           b.end = index_to_remove;
00301           ++it_block;
00302           lblocks.insert(it_block,new_b);
          // new_b is reused for the one-element block, which is inserted
          // after the second half
00303           new_b.start = real_index;
00304           new_b.end = real_index +1 ;
00305           DUMP("splitting bloc - new blocs -> " << b.start << " - " << b.end << " , " << new_b.start << " - " << new_b.end,DBG_INFO);
00306           lblocks.insert(it_block,new_b);
00307           DUMP("adding new bloc at the end referencing index " << real_index,DBG_INFO);
00308           done_flag=true;
00309           break;
00310         }
00311       }
00312       current_index += b.end-b.start;
00313     }
00314     if (!done_flag){
00315       // create a new block after all the others (it_block is at the end here)
00316       new_b.start = real_index;
00317       new_b.end = real_index+1;
00318       lblocks.insert(it_block,new_b);
00319       DUMP("adding new bloc at the end referencing index " << real_index << " associated with " << neighbor,DBG_INFO) ;
00320     }
00321 
00322   }
00323 
00324 
00326   void RemoveAtomFromBlocks(unsigned int i,int schIndex){
00327     int done_flag = 0;
00328 
00329     std::map<int,std::list<blocks> > & scheme = compressed_scheme[schIndex];
00330 
00331     std::map<int,std::list<blocks> >::iterator it = scheme.begin();
00332     std::map<int,std::list<blocks> >::iterator last = scheme.end();
00333     
00334     while (it != last){
00335       int proc = (*it).first;
00336       std::list<blocks> & lblocks = (*it).second;
00337       std::list<blocks>::iterator it_block = lblocks.begin();
00338       std::list<blocks>::iterator last_block = lblocks.end();
00339   
00340       for (it_block = lblocks.begin(); it_block != last_block ; ++it_block){ 
00341         blocks & b = (*it_block);
00342         if (i < b.end && i>= b.start)
00343           {
00344             if (i == b.start && i+1 == b.end){
00345               lblocks.erase(it_block);
00346               done_flag = 1;
00347               DUMP("found a block with one element (" << i << "," << i+1 << ") -> erased",DBG_INFO);
00348             }
00349             else if (i == b.start && i+1 != b.end){
00350               b.start = i+1;
00351               done_flag = 1;
00352               DUMP("index specified was first of block - none created only first index removed from block",DBG_INFO);
00353             }
00354             else if (i != b.start && i+1 == b.end){
00355               b.end = i;
00356               done_flag = 1;
00357               DUMP("index specified was last of block - none created only last index removed from block",DBG_INFO);
00358             }
00359             else{
00360               done_flag = 1;
00361               DUMP("splitting bloc " << b.start << " - " << b.end << " associated with " << proc << " (" << &b << ")",DBG_INFO);
00362               //si besoin je creer un nouveau bloc      
00363               blocks new_b;
00364               new_b.end = b.end;
00365               new_b.start = i+1;
00366               b.end = i;
00367               ++it_block;
00368               lblocks.insert(it_block,new_b);
00369               DUMP("the new blocs " << b.start << " - " << b.end << " , " << new_b.start << " - " << new_b.end,DBG_INFO);
00370             }
00371             break;
00372           }
00373         
00374       }
00375       if (done_flag) break;
00376       ++it;
00377     }
00378   }
00379 
00381   void findBlock(unsigned int i,blocks & bl,int & neighbor,int index){
00382     neighbor = -2;
00383     std::map<int,std::list<blocks> > & scheme = compressed_scheme[index];
00384 
00385     std::map<int,std::list<blocks> >::iterator it = scheme.begin();
00386     std::map<int,std::list<blocks> >::iterator last = scheme.end();
00387     
00388     while (it != last){
00389       int proc = (*it).first;
00390       std::list<blocks> & lblocks = (*it).second;
00391       std::list<blocks>::iterator it_block = lblocks.begin();
00392       std::list<blocks>::iterator last_block = lblocks.end();
00393       
00394       for (it_block = lblocks.begin(); it_block != last_block ; ++it_block){ 
00395         blocks b = (*it_block);
00396         DUMP("testing if " << i << " is between " << b.start << " and " << b.end << " neigh = " << proc,DBG_DETAIL);
00397         if (i < b.end && i>= b.start)
00398           {
00399             bl = b;
00400             neighbor = proc;
00401             break;
00402           }
00403       }
00404       if (neighbor != -2)
00405         break;
00406       ++it;
00407     }
00408     if (neighbor == -2){
00409       FATAL("block not found for index " << i);
00410     }
00411   }
00412 
00414   int SynchronizeMigration(int groupeAtomic,int groupeFE,Communicator & com){
00415     DUMP("SynchronizeMigration start",DBG_INFO);
00416     // en premier je parcours la map des "sent" pour avertir des coms
00417 
00418     std::map<int,std::list<blocks> > & scheme = compressed_scheme[sch_index];
00419 
00420     if (scheme.size() == 0)
00421       return 0;
00422 
00423     something_changed = false;
00424 
00425     int proc;
00426     int nb_migrated;
00427     std::vector<int> buffer;
00428 
00429     //atoms side
00430     if(com.amIinGroup(groupeAtomic)){
00431 
00432       {
00433         std::map<int,std::list<blocks> >::iterator it = scheme.begin();
00434         std::map<int,std::list<blocks> >::iterator last = scheme.end();
00435         
00436         while (it != last){
00437           proc = (*it).first;
00438           if (proc != -1){
00439             nb_migrated = sent[proc].size();
00440             DUMP("envoie le nombre d'atomes qui ont migres to proc "  << com.RealRank(proc,groupeFE) << " (" << nb_migrated << ")",DBG_INFO);
00441             com.SendIntegers(&nb_migrated,1,proc,groupeFE);
00442             com.waitForPendingComs();
00443             if (nb_migrated){
00444               something_changed = true;
00445               DUMP("i warn proc " << com.RealRank(proc,groupeFE) << " of migration of " << nb_migrated << " atoms",DBG_INFO);
00446               //je construit maintenant le vecteur d'informationd sur les migrations
00447               buffer.resize(2*nb_migrated);
00448               //je parcours les atomes migres
00449               for (int i = 0 ; i < nb_migrated ; ++i){
00450                 buffer[2*i] = sent[proc][i].first;
00451                 buffer[2*i+1] = sent[proc][i].second;
00452               }
00453               
00454               DUMP("i send the migration information to " << com.RealRank(proc,groupeFE),DBG_INFO);
00455               com.SendIntegers(&buffer[0],2*nb_migrated,proc,groupeFE);
00456               com.waitForPendingComs();
00457             }
00458           }
00459           ++it;
00460         }
00461       }
00462       
00463 
00464       std::map<int,std::vector<int> >::iterator it = received.begin();      
00465       std::map<int,std::vector<int> >::iterator last = received.end();      
00466 
00467 
00468       // first converting all the local indexes in subset to blks indexes
00469       while (it != last){
00470         int proc = (*it).first;
00471         vector<int> & new_pos = (*it).second;
00472         int temp;
00473         for (unsigned int cpt = 0 ; cpt < new_pos.size() ; ++cpt){
00474           DUMP("find distant index for index " << cpt,DBG_DETAIL);
00475           int toto = proc;
00476           temp = findDistIndex(new_pos[cpt],toto,schNew_index);
00477           if (toto != proc) FATAL("inconstistency in block management " << toto << " != " << proc); 
00478           DUMP("result is " << temp << " neighbor detected = " << proc,DBG_DETAIL);
00479           
00480           new_pos[cpt] = temp;
00481         }
00482         ++it;
00483       }
00484 
00485       //dont't forget to reset iterator
00486       it = received.begin();      
00487       // then send all the data for received atoms indexes to FE nodes
00488       while (it != last){
00489         int proc = (*it).first;
00490         vector<int> & new_pos = (*it).second;
00491         DUMP("i send " << new_pos.size() << " indexes of new positions to proc " << com.RealRank(proc,groupeFE),DBG_INFO);
00492         if (proc != -1){
00493           something_changed = true;
00494 #ifdef DEBUG_BRIDGE
00495           {
00496             int nb_send = new_pos.size();
00497             com.SendIntegers(&nb_send,1,proc,groupeFE);
00498           }
00499 #endif
00500           com.SendIntegers(&new_pos[0],new_pos.size(),proc,groupeFE);
00501         }
00502         com.waitForPendingComs();
00503         ++it;
00504       }
00505     }
00506     
00507 
00508     // FE side
00509     if(com.amIinGroup(groupeFE)){
00510       std::map<int,std::vector<int> > pending_coms;
00511 
00512       {
00513         std::map<int,std::list<blocks> >::iterator it = scheme.begin();
00514         std::map<int,std::list<blocks> >::iterator last = scheme.end();
00515         
00516         while (it != last){
00517           int proc = (*it).first;
00518           //      std::list<blocks> & lblocks = (*it).second;
00519           //      std::list<blocks>::iterator it_block = lblocks.begin();
00520           //std::list<blocks>::iterator last_block = lblocks.end();
00521           //je recois le nombre de migration
00522           DUMP("recois le nombre d'atomes qui ont migres de " << com.RealRank(proc,groupeAtomic),DBG_INFO);
00523           if (proc == -1){++it;continue;}
00524 
00525           com.ReceiveIntegers(&nb_migrated,1,proc,groupeAtomic);
00526           com.waitForPendingComs();
00527           if (nb_migrated){
00528             something_changed = true;
00529             DUMP("i am warned from proc " << com.RealRank(proc,groupeAtomic) << " of migration of " << nb_migrated << " atoms",DBG_INFO);
00530             buffer.resize(2*nb_migrated);
00531             DUMP("i receive the migration information from " << com.RealRank(proc,groupeAtomic),DBG_INFO);
00532             com.ReceiveIntegers(&buffer[0],2*nb_migrated,proc,groupeAtomic);
00533             com.waitForPendingComs();
00534             
00535             //maintenant que j'ai recupere les informations je peux parcourir mon schemas de com
00536             // et le modifier
00537             //je parcours les infos recues
00538             
00539             for (int i = 0 ; i < nb_migrated ; ++i){    
00540               unsigned int distant_index = buffer[2*i];
00541               unsigned int new_neigh = buffer[2*i+1];
00542               int real_index = 0;
00543               //              int old_neigh;
00544               
00545               DUMP("splitting blocks from distant index " << distant_index,DBG_INFO);
00546               real_index = findRealIndex(distant_index,proc,sch_index);
00547               RemoveAtomFromBlocks(real_index,schNew_index);
00548               pending_coms[new_neigh].push_back(real_index);
00549             }
00550           }         
00551           ++it;
00552         }
00553       }
00554       {
00555         //maintenant je receptionne les coms des nouveaux voisins pour chaque atomes
00556         std::map<int,std::vector<int> >::iterator it = pending_coms.begin();
00557         std::map<int,std::vector<int> >::iterator last = pending_coms.end();
00558         
00559         while (it != last){
00560           int proc = (*it).first;
00561           int nb_coms = (*it).second.size();
00562           if (nb_coms){
00563             buffer.resize(nb_coms);
00564             DUMP("reception of " << nb_coms << " atoms new indexes from " << com.RealRank(proc,groupeAtomic),DBG_INFO);
00565 #ifdef DEBUG_BRIDGE
00566             {
00567               int nb_recv = 0;
00568               com.ReceiveIntegers(&nb_recv,1,proc,groupeAtomic);
00569               com.waitForPendingComs();
00570               if (nb_recv != nb_coms)
00571                 FATAL("redistribution failed : proc " << com.RealRank(proc,groupeAtomic) << " want to send me " << nb_recv << " integers but i planed " <<  nb_coms);
00572             }
00573 #endif
00574             com.ReceiveIntegers(&buffer[0],nb_coms,proc,groupeAtomic);
00575             com.waitForPendingComs();
00576             if (buffer.size() != pending_coms[proc].size()) FATAL("inconstistency");
00577             //      doubleQuickSort(&buffer[0],&(pending_coms[proc][0]),0,pending_coms[proc].size()-1);
00578             for (int i = 0 ; i < nb_coms ; ++i){
00579               if (i > 0 && buffer[i-1] > buffer[i]) FATAL("inconstistency in duo blocks "  
00580                                                           << buffer[i-1] << " " << buffer[i]); 
00581               AddAtomInBlocksWithDistIndex(buffer[i],pending_coms[proc][i],proc,schNew_index);
00582             }
00583           }
00584           ++it;
00585         }
00586       }
00587 
00588     }
00589     
00590     sent.clear();
00591     received.clear();
00592     
00593     swapSchemes();
00594 
00595     return something_changed;
00596 
00597     DUMP("done sync",DBG_INFO);
00598     return 0;
00599   }
00600 
00602   void swapSchemes(){
00603     if (sch_index == 0){
00604       sch_index = 1;
00605       schNew_index = 0;
00606     }
00607     else{
00608       sch_index = 0;
00609       schNew_index = 1;
00610     }
00611     
00612     std::map<int,std::list<blocks> > & scheme = compressed_scheme[sch_index];
00613     std::map<int,std::list<blocks> > & schemeNew = compressed_scheme[schNew_index];    
00614 
00615     schemeNew.clear();
00616     std::map<int,std::list<blocks> >::iterator it = scheme.begin();
00617     std::map<int,std::list<blocks> >::iterator last = scheme.end();
00618     
00619     while (it != last){
00620       int proc = (*it).first;
00621       std::list<blocks> & lblocks = (*it).second;
00622       std::list<blocks>::iterator it_block = lblocks.begin();
00623       std::list<blocks>::iterator last_block = lblocks.end();
00624       blocks b;
00625 
00626       while (it_block != last_block){
00627          b = (*it_block);
00628         schemeNew[proc].push_back(b);
00629         ++it_block;
00630       }
00631       ++it;
00632     }
00633   }
00634 
00636   void compress(){
00637     std::map<int,std::list<blocks> > & scheme = compressed_scheme[sch_index];
00638     std::map<int,std::list<blocks> > & schemeNew = compressed_scheme[schNew_index];
00639 
00640     scheme.clear();
00641 
00642     unsigned int sindex=0;
00643     int current_ind=indirection[0];
00644 
00645     for(unsigned int i = 1; i < localsize ; ++i)
00646       {
00647         if (current_ind == indirection[i]){
00648           continue;
00649         }
00650 
00651         blocks b;
00652         //j'init ses valeurs de block
00653         b.start = sindex;
00654         b.end = i;
00655         //      b.indirection = current_ind;
00656 
00657         //je rajoute un block   
00658         scheme[current_ind].push_back(b);
00659         //je met a jour le nouveau start
00660         sindex = i;
00661 
00662         //je met a jour l'indirection courante
00663         current_ind = indirection[i];
00664       }
00665 
00666     //je rentre le dernier block
00667     blocks b;
00668     b.start = sindex;
00669     b.end = localsize;
00670     //    b.indirection = current_ind;
00671     scheme[current_ind].push_back(b);
00672 
00673     DUMP("compressed to " << scheme.size() << " blocks",DBG_INFO_STARTUP);
00674     compressed = true;
00675 
00676     nb_par_interlocuteurs.clear();
00677 
00678     std::map<int,std::list<blocks> >::iterator it = scheme.begin();
00679     std::map<int,std::list<blocks> >::iterator last = scheme.end();
00680 
00681     while (it != last){
00682       int proc = (*it).first;
00683       std::list<blocks> & lblocks = (*it).second;
00684       std::list<blocks>::iterator it_block = lblocks.begin();
00685       std::list<blocks>::iterator last_block = lblocks.end();
00686 
00687       while (it_block != last_block){
00688         blocks b = (*it_block);
00689         nb_par_interlocuteurs[proc]+=b.end-b.start;
00690         schemeNew[proc].push_back(b);
00691 
00692         ++it_block;
00693         DUMP("dans la compression initiale on a le block (" << b.start << " - " << b.end << ") pour le proc " << proc,DBG_WARNING);
00694       }
00695 
00696       ++it;
00697     }
00698     
00699   };
00700 
/// Sums the values held by groupe1 into the vector held by groupe2, then
/// sends the result back from groupe2 to groupe1.
///
/// A process of groupe1 distributes `vec` towards groupe2. Otherwise, a
/// process of groupe2 receives the data into the member buffer `tmp`
/// (resized to localsize*stride) and adds it to `vec` entry by entry.
/// Every process then takes part in the distribution from groupe2 to groupe1.
/// With DEBUG_BRIDGE the signature takes the extra `nmax` argument, which is
/// forwarded to DistributeVecteur and compared with localsize*stride.
/// NOTE(review): the nmax consistency check on the groupe2 side runs after
/// the reception into `tmp` - confirm the order is intended.
00702 #ifdef DEBUG_BRIDGE
00703   void SynchronizeVecteurBySum(char * name_vec,double * vec,unsigned int nmax,int groupe1,int groupe2,Communicator & com,int stride=1){
00704 #else
00705   void SynchronizeVecteurBySum(char * name_vec,double * vec,int groupe1,int groupe2,Communicator & com,int stride=1){
00706 #endif
00707     if(com.amIinGroup(groupe1)){
00708       DUMP("distribue le vecteur de " << groupe1 << " vers " << groupe2,DBG_INFO);
00709 #ifdef DEBUG_BRIDGE
00710       DistributeVecteur(name_vec,vec,nmax,groupe1,groupe2,com,stride);
00711 #else
00712       DistributeVecteur(name_vec,vec,groupe1,groupe2,com,stride);
00713 #endif
00714     }
00715     else if(com.amIinGroup(groupe2)){
00716       DUMP("receptionne la somme partielle de " << groupe1 << " vers " << groupe2,DBG_INFO);
00717       tmp.resize(localsize*stride); 
00718 #ifdef DEBUG_BRIDGE      
00719       DistributeVecteur("tmp vector for dist-reduction",&tmp[0],localsize*stride,groupe1,groupe2,com,stride);
00720       if ( localsize * stride > nmax ) FATAL("inconstistency found " << localsize*stride << " " << nmax << " " << name_vec); 
00721 #else
00722       DistributeVecteur("tmp vector for dist-reduction",&tmp[0],groupe1,groupe2,com,stride);
00723 #endif
00724       // compute the sum of the local values and the received ones
00725       for(unsigned int i=0; i < localsize*stride;++i)
00726         vec[i] += tmp[i];
00727     }
00728     
00729     // send the result back
00730     DUMP("retour du resultat",DBG_INFO);
00731 #ifdef DEBUG_BRIDGE      
00732     DistributeVecteur(name_vec,vec,nmax,groupe2,groupe1,com,stride);
00733 #else
00734     DistributeVecteur(name_vec,vec,groupe2,groupe1,com,stride);
00735 #endif
00736   }
00737 
/// Transfers `vec` from groupe1 to groupe2, block by block, following the
/// current compressed scheme.
///
/// For every neighbor of the scheme (key -1 skipped) and every block
/// [start, end) associated with it, a process of groupe1 sends the doubles
/// vec[start*stride .. end*stride) to that neighbor, and a process of groupe2
/// receives them at the same offsets. waitForPendingComs() is called once,
/// after all the blocks have been posted.
/// With DEBUG_BRIDGE the signature takes the extra `nmax` argument: blocks
/// reaching past nmax raise a FATAL and the element count of each block is
/// exchanged and checked before the data.
/// NOTE(review): in the DEBUG_BRIDGE path `nb_send` goes out of scope, and
/// `nb_recv` is compared, before waitForPendingComs() is called - if
/// SendIntegers/ReceiveIntegers are non-blocking this needs a wait first;
/// confirm against the Communicator implementation.
00739 #ifdef DEBUG_BRIDGE
00740   void DistributeVecteur(char * name_vec,double * vec,unsigned int nmax,int groupe1,int groupe2,Communicator & com,int stride=1){
00741 #else
00742   void DistributeVecteur(char * name_vec,double * vec,int groupe1,int groupe2,Communicator & com,int stride=1){
00743 #endif
00744     std::map<int,std::list<blocks> > & scheme = compressed_scheme[sch_index];
00745 
00746     std::map<int,std::list<blocks> >::iterator it = scheme.begin();
00747     std::map<int,std::list<blocks> >::iterator last = scheme.end();
00748 
00749     while (it != last){
00750       int proc = (*it).first;
00751       std::list<blocks> & lblocks = (*it).second;
00752       std::list<blocks>::iterator it_block = lblocks.begin();
00753       std::list<blocks>::iterator last_block = lblocks.end();
00754       if (proc == -1){++it;continue;}
00755 
00756       // loop over the blocks to transfer
00757       for (it_block = lblocks.begin(); it_block!=last_block;++it_block){
00758         blocks & b = *it_block;
00759         if(com.amIinGroup(groupe1)){
00760           //if (b.end > localsize){FATAL("y'a un souci dans la mise ajour des blocks du duo");}
00761           DUMP("sending bloc " << b.start*stride << " - " << b.end*stride << " to proc " << proc << "(" << localsize*stride << ")",DBG_INFO);
00762 #ifdef DEBUG_BRIDGE
00763           if (b.end*stride > nmax) FATAL("depassement detecte " << b.end*stride << " " << nmax << " " << name_vec); 
00764 #endif
00765 #ifdef DEBUG_BRIDGE
00766           {
00767             int nb_send = (b.end-b.start)*stride;
00768             com.SendIntegers(&nb_send,1,proc,groupe2);
00769           }
00770 #endif
00771           com.SendDoubles(vec+b.start*stride,(b.end-b.start)*stride,proc,groupe2);
00772           DUMP("sent bloc " << b.start*stride << " - " << b.end*stride << " to proc " << com.RealRank(proc,groupe2) << "(" << localsize*stride << ")",DBG_INFO);
00773         }
00774         if(com.amIinGroup(groupe2)){
00775           //      if (b.end > localsize){FATAL("y'a un souci dans la mise ajour des blocks du duo");}
00776           //      sleep(1);
00777           DUMP("recepting bloc " << b.start*stride << " - " << b.end*stride << " from proc " << com.RealRank(proc,groupe1) << "(" << localsize*stride << ") writing to " << vec,DBG_INFO); 
00778         
00779 #ifdef DEBUG_BRIDGE
00780           if (b.end*stride > nmax) FATAL("depassement detecte " << b.end*stride << " " << nmax << " " << name_vec); 
00781           {
00782             int nb_recv = 0;
00783             com.ReceiveIntegers(&nb_recv,1,proc,groupe1);
00784             if (nb_recv != (int)((b.end-b.start)*stride))
00785               FATAL("redistribution failed : proc " << com.RealRank(proc,groupe1) << " want to send me " << nb_recv << " doubles and i planed " <<  (b.end-b.start)*stride);
00786           }
00787 #endif
00788           com.ReceiveDoubles(vec+b.start*stride,(b.end-b.start)*stride,proc,groupe1);
00789           //      sleep(1);
00790           DUMP("reception bloc " << b.start*stride << " - " << b.end*stride << " from proc " << com.RealRank(proc,groupe1) << "(" << localsize*stride << ")",DBG_INFO); 
00791         }
00792       }
00793       ++it;
00794     }
00795     com.waitForPendingComs();
00796   }
00797 
00799   void print(char * prefix){
00800     char name[255];
00801 
00802 /*     if(something_changed == false){ */
00803 /*       ++step; */
00804 /*       return; */
00805 /*     } */
00806 
00807     sprintf(name,"%s-redistrib-scheme%d-proc%d.mat",prefix,step,my_proc_id);
00808     FILE * fichier = fopen(name,"wb+");
00809 
00810     std::map<int,std::list<blocks> > & scheme = compressed_scheme[sch_index];
00811 
00812     std::map<int,std::list<blocks> >::iterator it = scheme.begin();
00813     std::map<int,std::list<blocks> >::iterator last = scheme.end();
00814     
00815     while (it != last){
00816       int proc = (*it).first;
00817       std::list<blocks> & lblocks = (*it).second;
00818       std::list<blocks>::iterator it_block = lblocks.begin();
00819       std::list<blocks>::iterator last_block = lblocks.end();
00820       
00821       int distant_index_start = 0;
00822       int distant_index_end = 0;
00823       //je fait une boucle sur les block a transférer
00824       for (it_block = lblocks.begin(); it_block!=last_block;++it_block){
00825         blocks & b = *it_block;
00826         distant_index_end += b.end-b.start;
00827         fprintf(fichier,"%d-%d %d %d %d\n",b.start,b.end,distant_index_start,distant_index_end,proc);
00828         distant_index_start += b.end-b.start;
00829       }
00830       ++it;
00831     }
00832 
00833     fprintf(fichier,"\n");
00834     fclose(fichier);
00835 
00836     ++step;
00837     something_changed = false;
00838 /*     for (unsigned int i=0; i < localsize;++i) */
00839 /*       std::cerr << i << " " << indirection[i] << "\n"; */
00840   }
00841 
00843   bool isInBlock(unsigned int i,blocks b){ 
00844     return b.start<=i && b.end>i; 
00845   } 
00846 
00848   void setLocalSize(unsigned int size){ 
00849     localsize = size;
00850   } 
00851 
00852 /*   blocks & block(int i){return compressed_scheme[i];} */
00853 /*   int nb_blocks(){return compressed_scheme.size();} */
00854 
00855 /*   void deleteTmps(){ */
00856 /*     delete [] tmp; */
00857 /*     tmp = NULL; */
00858 /*   }; */
00859 
00861   std::map<int,std::list<blocks> > & GetCompressedScheme(){
00862     return compressed_scheme[sch_index];
00863   }
00865   std::map<int,std::list<blocks> > & GetNewCompressedScheme(){
00866     return compressed_scheme[schNew_index];
00867   }
00868 
00869 
00870  private:
00871 
00873   bool compressed;
00875   unsigned int totalsize;
00877   unsigned int localsize;
00878 
00880   int * indirection;
00882   std::map<int,int> nb_par_interlocuteurs;
00883 
00884   
00885   int schNew_index;
00886   int sch_index;
00887 
00889   std::map<int,std::list<blocks> > compressed_scheme[2];
00890 
00892   std::map<int,std::vector<std::pair<int,int> > > sent;
00894   std::map<int,std::vector<int> > received;
00895 
00896   
00897   std::vector<double> tmp;
00898 
00899   int step;
00900   bool something_changed;
00901 };
00902 
00903 #endif //DUODISTRIBUTEDVECTEUR_H

Generated on Fri Sep 7 13:12:33 2007 for LibMultiScale by  doxygen 1.5.2