]> git.donarmstrong.com Git - mothur.git/commitdiff
finished mpi for filter.seqs
authorwestcott <westcott>
Wed, 17 Mar 2010 17:47:33 +0000 (17:47 +0000)
committerwestcott <westcott>
Wed, 17 Mar 2010 17:47:33 +0000 (17:47 +0000)
distancecommand.cpp
distancecommand.h
filterseqscommand.cpp
filterseqscommand.h

index 23fe58c41fcf3e1a534ee5d0fbf8f809b3afdae2..4720df3cf0d968a84ba49c4af449b226a304c1cd 100644 (file)
@@ -385,6 +385,7 @@ int DistanceCommand::driver(int startLine, int endLine, string dFileName, float
                exit(1);
        }
 }
+#ifdef USE_MPI
 /**************************************************************************************************/
 /////// need to fix to work with calcs and sequencedb
 int DistanceCommand::driverMPI(int startLine, int endLine, MPI_File& outMPI, float cutoff){
@@ -444,11 +445,11 @@ int DistanceCommand::driverMPI(int startLine, int endLine, MPI_File& outMPI, flo
                return 1;
        }
        catch(exception& e) {
-               m->errorOut(e, "DistanceCommand", "driver");
+               m->errorOut(e, "DistanceCommand", "driverMPI");
                exit(1);
        }
 }
-
+#endif
 /**************************************************************************************************/
 int DistanceCommand::convertMatrix(string outputFile) {
        try{
index b0a3c049025066a566365db37b0229c92920e6bf..c1dac1443a4d2a213ab715d79908e6dd76027a62 100644 (file)
@@ -46,7 +46,10 @@ private:
        //void appendFiles(string, string);
        void createProcesses(string);
        int driver(/*Dist*, SequenceDB, */int, int, string, float);
+       
+       #ifdef USE_MPI 
        int driverMPI(int, int, MPI_File&, float);
+       #endif
        
        int convertMatrix(string);
        int convertToLowerTriangle(string);
index 6d894d866a2930eb009d20627cb03f509b94ef2e..b402a5bac5cc8cf8ad6a2ac4b4ab882fc91e306b 100644 (file)
@@ -172,6 +172,13 @@ int FilterSeqsCommand::execute() {
                
                if (m->control_pressed) { return 0; }
                
+               #ifdef USE_MPI
+                       int pid;
+                       MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+                                       
+                       if (pid == 0) { //only one process should output the filter
+               #endif
+               
                ofstream outFilter;
                
                string filterFile = outputDir + filterFileName + ".filter";
@@ -180,6 +187,9 @@ int FilterSeqsCommand::execute() {
                outFilter.close();
                outputNames.push_back(filterFile);
                
+               #ifdef USE_MPI
+                       }
+               #endif
                
                ////////////run filter/////////////////
                
@@ -216,7 +226,9 @@ int FilterSeqsCommand::execute() {
 /**************************************************************************************/
 int FilterSeqsCommand::filterSequences() {     
        try {
-       
+               
+               numSeqs = 0;
+               
                for (int s = 0; s < fastafileNames.size(); s++) {
                        
                                for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
@@ -247,11 +259,15 @@ int FilterSeqsCommand::filterSequences() {
                                if (pid == 0) { //you are the root process 
                                        
                                        setLines(fastafileNames[s]);
-                                               
+                                       
+                                       char bufF[alignmentLength];
+                                       strcpy(bufF, filter.c_str()); 
+                                                               
                                        for (int j = 0; j < lines.size(); j++) { //each process
                                                if (j != 0) { //don't send to yourself
                                                        MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file
                                                        MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read
+                                                       MPI_Send(bufF, alignmentLength, MPI_CHAR, j, tag, MPI_COMM_WORLD);
                                                }
                                        }
                                        
@@ -271,10 +287,14 @@ int FilterSeqsCommand::filterSequences() {
                                        
                                }else { //you are a child process
                                        //receive your section of file
-                                       int startPos, numLines, bufferSize;
+                                       int startPos, bufferSize;
+                                       char bufF[alignmentLength];
                                        MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
                                        MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
-                                                                       
+                                       MPI_Recv(bufF, alignmentLength, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status); 
+                                       
+                                       filter = bufF; //filter was made by process 0 so other processes need to get it
+                                                               
                                        //read your peice of file
                                        char buf2[bufferSize];
                                        MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status);
@@ -305,14 +325,14 @@ int FilterSeqsCommand::filterSequences() {
                                        
                                        driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
                                }else{
-                                       setLines(fastafileNames[s]);                                    
+                                       setLines(fastafileNames[s]);
                                        createProcessesRunFilter(filter, fastafileNames[s]); 
-                                       
+                               
                                        rename((fastafileNames[s] + toString(processIDS[0]) + ".temp").c_str(), filteredFasta.c_str());
                                
                                        //append fasta files
                                        for(int i=1;i<processors;i++){
-                                               appendAlignFiles((fastafileNames[s] + toString(processIDS[i]) + ".temp"), filteredFasta);
+                                               appendFiles((fastafileNames[s] + toString(processIDS[i]) + ".temp"), filteredFasta);
                                                remove((fastafileNames[s] + toString(processIDS[i]) + ".temp").c_str());
                                        }
                                }
@@ -407,7 +427,7 @@ int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string i
                                
                in.seekg(line->start);
                
-               for(int i=0;i<line->numSeqs;i++){
+               for(int i=0;i<line->num;i++){
                                
                                if (m->control_pressed) { in.close(); out.close(); return 0; }
                                
@@ -476,8 +496,7 @@ int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
 /**************************************************************************************/
 string FilterSeqsCommand::createFilter() {     
        try {
-               string filterString = "";
-               
+               string filterString = "";                       
                Filters F;
                
                if (soft != 0)                  {  F.setSoft(soft);             }
@@ -493,44 +512,46 @@ string FilterSeqsCommand::createFilter() {
                else                                            {       F.setFilter(string(alignmentLength, '1'));      }
                
                numSeqs = 0;
-               
                if(trump != '*' || isTrue(vertical) || soft != 0){
                        for (int s = 0; s < fastafileNames.size(); s++) {
                        
+                               for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
+                       
 #ifdef USE_MPI 
-                               int pid, rc, ierr
+                               int pid; 
                                int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5;
                                int tag = 2001;
                                
                                MPI_Status status; 
-                               MPI_File in; 
-                               rc = MPI_Comm_size(MPI_COMM_WORLD, &processors);
-                               rc = MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+                               MPI_File inMPI
+                               MPI_Comm_size(MPI_COMM_WORLD, &processors);
+                               MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
                                                        
-                               char tempFileName[fastafileNames[s].length()];
-                               strcpy(tempFileName, fastafileNames[s].c_str());
-               cout << pid  << " tempFileName " << tempFileName << endl;               
-                               MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &in);  //comm, filename, mode, info, filepointer
-               cout << pid  << " here" << endl;                        
+                               char* tempFileName = new char(fastafileNames[s].length());
+                               tempFileName = &(fastafileNames[s][0]);
+               
+                               MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI);  //comm, filename, mode, info, filepointer
+                               
                                if (pid == 0) { //you are the root process
                                                setLines(fastafileNames[s]);
-                               cout << pid  << " after setlines" << endl;                      
+                                       
                                                for (int j = 0; j < lines.size(); j++) { //each process
                                                        if (j != 0) { //don't send to yourself
                                                                MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file
+                                                               MPI_Send(&numSeqs, 1, MPI_INT, j, tag, MPI_COMM_WORLD); 
                                                                MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read
                                                        }
                                                }
-                               cout << pid << " done sending" << endl;
+                       
                                                char buf[bufferSizes[0]];
-                                               MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status);
-                       cout << pid << " done reading" << endl;
+                                               MPI_File_read_at(inMPI, 0, buf, bufferSizes[0], MPI_CHAR, &status);
+                       
                                                string tempBuf = buf;
                                                if (tempBuf.length() > bufferSizes[0]) { tempBuf = tempBuf.substr(0, bufferSizes[0]); }
 
                                                MPICreateFilter(F, tempBuf);
-                               cout << pid << "done with mpi create filter " << endl;                          
-                                               if (m->control_pressed) { MPI_File_close(&in); return filterString; }
+                                               
+                                               if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; }
                                                                                                
                                                vector<int> temp; temp.resize(alignmentLength+1);
                                                
@@ -538,7 +559,7 @@ string FilterSeqsCommand::createFilter() {
                                                for(int i = 0; i < ((processors-1)*5); i++) { 
                                                        MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); 
                                                        int receiveTag = temp[temp.size()-1];  //child process added a int to the end to indicate what letter count this is for
-                                       
+                               
                                                        if (receiveTag == Atag) { //you are recieveing the A frequencies
                                                                for (int k = 0; k < alignmentLength; k++) {             F.a[k] += temp[k];      }
                                                        }else if (receiveTag == Ttag) { //you are recieveing the T frequencies
@@ -554,21 +575,22 @@ string FilterSeqsCommand::createFilter() {
 
                                                
                                }else { //i am the child process
-                       cout << pid << endl;
+                       
                                        int startPos, bufferSize;
-                                       ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
-                                       ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
-                       cout << pid << '\t' << startPos << '\t' << bufferSize << endl;                                          
+                                       MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+                                       MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+                                       MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+                                                               
                                        //send freqs
                                        char buf2[bufferSize];
-                                       MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status);
+                                       MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status);
                        
                                        string tempBuf = buf2;
                                        if (tempBuf.length() > bufferSize) { tempBuf = tempBuf.substr(0, bufferSize); }
                        
                                        MPICreateFilter(F, tempBuf);
-                               cout << pid << "done with mpi create filter " << endl;          
-                                       if (m->control_pressed) { MPI_File_close(&in); return filterString; }
+                               
+                                       if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; }
                                        
                                        //send my fequency counts
                                        F.a.push_back(Atag);
@@ -584,7 +606,7 @@ string FilterSeqsCommand::createFilter() {
                                }
                                
                                MPI_Barrier(MPI_COMM_WORLD);
-                               MPI_File_close(&in);
+                               MPI_File_close(&inMPI);
                                
 #else
                #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
@@ -596,7 +618,6 @@ string FilterSeqsCommand::createFilter() {
                                        
                                        numSeqs += numFastaSeqs;
                                        
-                                       for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
                                        lines.push_back(new linePair(0, numFastaSeqs));
                                        
                                        driverCreateFilter(F, fastafileNames[s], lines[0]);
@@ -614,7 +635,6 @@ string FilterSeqsCommand::createFilter() {
                                
                                numSeqs += numFastaSeqs;
                                
-                               for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
                                lines.push_back(new linePair(0, numFastaSeqs));
                                
                                driverCreateFilter(F, fastafileNames[s], lines[0]);
@@ -648,7 +668,7 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair*
                                
                in.seekg(line->start);
                
-               for(int i=0;i<line->numSeqs;i++){
+               for(int i=0;i<line->num;i++){
                                
                        if (m->control_pressed) { in.close(); return 1; }
                                        
@@ -666,7 +686,7 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair*
                }
                
                //report progress
-               if((line->numSeqs) % 100 != 0){ m->mothurOut(toString(line->numSeqs)); m->mothurOutEndLine();           }
+               if((line->num) % 100 != 0){     m->mothurOut(toString(line->num)); m->mothurOutEndLine();               }
                
                in.close();
                
@@ -751,7 +771,7 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename)
 
 int FilterSeqsCommand::setLines(string filename) {
        try {
-               for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
+               
                vector<long int> positions;
                bufferSizes.clear();
                
index 2f318c0abbb575b6087477344199275b61305706..1d2526fdca83114dcca76217917324d134bcc391 100644 (file)
@@ -26,8 +26,8 @@ public:
 private:
        struct linePair {
                int start;
-               int numSeqs;
-               linePair(long int i, int j) : start(i), numSeqs(j) {}
+               int num;
+               linePair(long int i, long int j) : start(i), num(j) {}
        };
        vector<linePair*> lines;
        vector<int> processIDS;