]> git.donarmstrong.com Git - mothur.git/commitdiff
parallelized filter.seqs for Windows
authorSarah Westcott <mothur.westcott@gmail.com>
Tue, 21 Feb 2012 16:11:35 +0000 (11:11 -0500)
committerSarah Westcott <mothur.westcott@gmail.com>
Tue, 21 Feb 2012 16:11:35 +0000 (11:11 -0500)
filterseqscommand.cpp
filterseqscommand.h

index 82c73f3d439c1d3cc154c8dac67398075134a1e8..806ca0d69cabd6056de8ce1870c628e40916e080 100644 (file)
@@ -420,9 +420,9 @@ int FilterSeqsCommand::filterSequences() {
                                MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
                                
 #else
                                MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
                                
 #else
-                       
+                       vector<unsigned long long> positions = savedPositions[s];
                #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
-                       vector<unsigned long long> positions = m->divideFile(fastafileNames[s], processors);
+                       //vector<unsigned long long> positions = m->divideFile(fastafileNames[s], processors);
                        
                        for (int i = 0; i < (positions.size()-1); i++) {
                                lines.push_back(new linePair(positions[i], positions[(i+1)]));
                        
                        for (int i = 0; i < (positions.size()-1); i++) {
                                lines.push_back(new linePair(positions[i], positions[(i+1)]));
@@ -432,23 +432,31 @@ int FilterSeqsCommand::filterSequences() {
                                        int numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
                                        numSeqs += numFastaSeqs;
                                }else{
                                        int numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
                                        numSeqs += numFastaSeqs;
                                }else{
-                                       int numFastaSeqs = createProcessesRunFilter(filter, fastafileNames[s]); 
+                                       int numFastaSeqs = createProcessesRunFilter(filter, fastafileNames[s], filteredFasta); 
                                        numSeqs += numFastaSeqs;
                                        numSeqs += numFastaSeqs;
-                               
-                                       rename((fastafileNames[s] + toString(processIDS[0]) + ".temp").c_str(), filteredFasta.c_str());
-                               
-                                       //append fasta files
-                                       for(int i=1;i<processors;i++){
-                                               m->appendFiles((fastafileNames[s] + toString(processIDS[i]) + ".temp"), filteredFasta);
-                                               m->mothurRemove((fastafileNames[s] + toString(processIDS[i]) + ".temp"));
-                                       }
                                }
                                
                                if (m->control_pressed) {  return 1; }
                #else
                                }
                                
                                if (m->control_pressed) {  return 1; }
                #else
-                               lines.push_back(new linePair(0, 1000));
+            if(processors == 1){
+                lines.push_back(new linePair(0, 1000));
                                int numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
                                numSeqs += numFastaSeqs;
                                int numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
                                numSeqs += numFastaSeqs;
+            }else {
+                int numFastaSeqs = positions.size()-1;
+                //positions = m->setFilePosFasta(fastafileNames[s], numFastaSeqs); 
+                
+                //figure out how many sequences you have to process
+                int numSeqsPerProcessor = numFastaSeqs / processors;
+                for (int i = 0; i < processors; i++) {
+                    int startIndex =  i * numSeqsPerProcessor;
+                    if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;   }
+                    lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
+                }
+                
+                numFastaSeqs = createProcessesRunFilter(filter, fastafileNames[s], filteredFasta); 
+                numSeqs += numFastaSeqs;
+            }
 
                                if (m->control_pressed) {  return 1; }
                #endif
 
                                if (m->control_pressed) {  return 1; }
                #endif
@@ -596,12 +604,15 @@ int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string i
 }
 /**************************************************************************************************/
 
 }
 /**************************************************************************************************/
 
-int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
+int FilterSeqsCommand::createProcessesRunFilter(string F, string filename, string filteredFastaName) {
        try {
        try {
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
-               int process = 0;
+        
+        int process = 1;
                int num = 0;
                processIDS.clear();
                int num = 0;
                processIDS.clear();
+        
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+               
                
                //loop through and create all the processes you want
                while (process != processors) {
                
                //loop through and create all the processes you want
                while (process != processors) {
@@ -629,8 +640,10 @@ int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
                        }
                }
                
                        }
                }
                
+        num = driverRunFilter(F, filteredFastaName, filename, lines[0]);
+        
                //force parent to wait until all the processes are done
                //force parent to wait until all the processes are done
-               for (int i=0;i<processors;i++) { 
+               for (int i=0;i<processIDS.size();i++) { 
                        int temp = processIDS[i];
                        wait(&temp);
                }       
                        int temp = processIDS[i];
                        wait(&temp);
                }       
@@ -641,11 +654,56 @@ int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
                        m->openInputFile(tempFile, in);
                        if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; }
                        in.close(); m->mothurRemove(tempFile);
                        m->openInputFile(tempFile, in);
                        if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; }
                        in.close(); m->mothurRemove(tempFile);
+            
+            m->appendFiles((filename + toString(processIDS[i]) + ".temp"), filteredFastaName);
+            m->mothurRemove((filename + toString(processIDS[i]) + ".temp"));
                }
                }
-
-               
-               return num;
-#endif         
+               
+#else
+        
+        //////////////////////////////////////////////////////////////////////////////////////////////////////
+               //The Windows version uses threads that share memory, so be careful when passing variables through the filterRunData struct. 
+               //The fork() used above clones the process, so memory is separate, but that is not the case with Windows threads. 
+               //Each worker writes to its own output file (thread 0 writes filteredFastaName itself); the other files are appended to filteredFastaName below.
+               //////////////////////////////////////////////////////////////////////////////////////////////////////
+               
+               vector<filterRunData*> pDataArray; 
+               DWORD   dwThreadIdArray[processors-1];
+               HANDLE  hThreadArray[processors-1]; 
+               
+               //Create processor worker threads.
+               for( int i=0; i<processors-1; i++){
+                       
+            string extension = "";
+                       if (i != 0) { extension = toString(i) + ".temp"; }
+            
+                       filterRunData* tempFilter = new filterRunData(filter, filename, (filteredFastaName + extension), m, lines[i]->start, lines[i]->end, alignmentLength, i);
+                       pDataArray.push_back(tempFilter);
+                       processIDS.push_back(i);
+            
+                       hThreadArray[i] = CreateThread(NULL, 0, MyRunFilterThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);   
+               }
+        
+        num = driverRunFilter(F, (filteredFastaName + toString(processors-1) + ".temp"), filename, lines[processors-1]);
+        
+               //Wait until all threads have terminated.
+               WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+               
+               //Close all thread handles and free memory allocations.
+               for(int i=0; i < pDataArray.size(); i++){
+                       num += pDataArray[i]->count;
+            CloseHandle(hThreadArray[i]);
+                       delete pDataArray[i];
+               }
+        
+        for (int i = 1; i < processors; i++) {
+            m->appendFiles((filteredFastaName + toString(i) + ".temp"), filteredFastaName);
+            m->mothurRemove((filteredFastaName + toString(i) + ".temp"));
+               }
+#endif 
+        
+        return num;
+        
        }
        catch(exception& e) {
                m->errorOut(e, "FilterSeqsCommand", "createProcessesRunFilter");
        }
        catch(exception& e) {
                m->errorOut(e, "FilterSeqsCommand", "createProcessesRunFilter");
@@ -740,9 +798,9 @@ string FilterSeqsCommand::createFilter() {
                                
 #else
                                
                                
 #else
                                
-               
+                vector<unsigned long long> positions;
                #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
-                               vector<unsigned long long> positions = m->divideFile(fastafileNames[s], processors);
+                               positions = m->divideFile(fastafileNames[s], processors);
                                for (int i = 0; i < (positions.size()-1); i++) {
                                        lines.push_back(new linePair(positions[i], positions[(i+1)]));
                                }       
                                for (int i = 0; i < (positions.size()-1); i++) {
                                        lines.push_back(new linePair(positions[i], positions[(i+1)]));
                                }       
@@ -754,14 +812,31 @@ string FilterSeqsCommand::createFilter() {
                                        int numFastaSeqs = createProcessesCreateFilter(F, fastafileNames[s]); 
                                        numSeqs += numFastaSeqs;
                                }
                                        int numFastaSeqs = createProcessesCreateFilter(F, fastafileNames[s]); 
                                        numSeqs += numFastaSeqs;
                                }
-                               
-                               if (m->control_pressed) {  return filterString; }
                #else
                #else
-                               lines.push_back(new linePair(0, 1000));
-                               int numFastaSeqs = driverCreateFilter(F, fastafileNames[s], lines[0]);
-                               numSeqs += numFastaSeqs;
-                               if (m->control_pressed) {  return filterString; }
+                if(processors == 1){
+                    lines.push_back(new linePair(0, 1000));
+                    int numFastaSeqs = driverCreateFilter(F, fastafileNames[s], lines[0]);
+                    numSeqs += numFastaSeqs;
+                               }else {
+                    int numFastaSeqs = 0;
+                    positions = m->setFilePosFasta(fastafileNames[s], numFastaSeqs); 
+                    
+                    //figure out how many sequences you have to process
+                    int numSeqsPerProcessor = numFastaSeqs / processors;
+                    for (int i = 0; i < processors; i++) {
+                        int startIndex =  i * numSeqsPerProcessor;
+                        if(i == (processors - 1)){     numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;   }
+                        lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
+                    }
+                    
+                    numFastaSeqs = createProcessesCreateFilter(F, fastafileNames[s]); 
+                                       numSeqs += numFastaSeqs;
+                }
                #endif
                #endif
+                //save the file positions so we can reuse them in the runFilter function
+                savedPositions[s] = positions;
+                
+                               if (m->control_pressed) {  return filterString; }
 #endif
                        
                        }
 #endif
                        
                        }
@@ -848,7 +923,7 @@ string FilterSeqsCommand::createFilter() {
        
        MPI_Barrier(MPI_COMM_WORLD);
 #endif
        
        MPI_Barrier(MPI_COMM_WORLD);
 #endif
-                               
+            
                return filterString;
        }
        catch(exception& e) {
                return filterString;
        }
        catch(exception& e) {
@@ -954,11 +1029,12 @@ int FilterSeqsCommand::MPICreateFilter(int start, int num, Filters& F, MPI_File&
 
 int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) {
        try {
 
 int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) {
        try {
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
-               int process = 1;
+        int process = 1;
                int num = 0;
                processIDS.clear();
                int num = 0;
                processIDS.clear();
-               
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+                               
                //loop through and create all the processes you want
                while (process != processors) {
                        int pid = fork();
                //loop through and create all the processes you want
                while (process != processors) {
                        int pid = fork();
@@ -1033,8 +1109,50 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename)
                        m->mothurRemove(tempFilename);
                }
                
                        m->mothurRemove(tempFilename);
                }
                
-               return num;
-#endif         
+               
+#else
+        
+        //////////////////////////////////////////////////////////////////////////////////////////////////////
+               //The Windows version uses threads that share memory, so be careful when passing variables through the filterData struct. 
+               //The fork() used above clones the process, so memory is separate, but that is not the case with Windows threads. 
+               //Each thread fills the Filters object inside its own filterData; the results are merged into F after all threads finish.
+               //////////////////////////////////////////////////////////////////////////////////////////////////////
+               
+               vector<filterData*> pDataArray; 
+               DWORD   dwThreadIdArray[processors];
+               HANDLE  hThreadArray[processors]; 
+               
+               //Create processor worker threads.
+               for( int i=0; i<processors; i++ ){
+                       
+                       filterData* tempFilter = new filterData(filename, m, lines[i]->start, lines[i]->end, alignmentLength, trump, vertical, soft, hard, i);
+                       pDataArray.push_back(tempFilter);
+                       processIDS.push_back(i);
+            
+                       hThreadArray[i] = CreateThread(NULL, 0, MyCreateFilterThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);   
+               }
+        
+               //Wait until all threads have terminated.
+               WaitForMultipleObjects(processors, hThreadArray, TRUE, INFINITE);
+               
+               //Close all thread handles and free memory allocations.
+               for(int i=0; i < pDataArray.size(); i++){
+                       num += pDataArray[i]->count;
+            F.mergeFilter(pDataArray[i]->F.getFilter());
+            
+                       for (int k = 0; k < alignmentLength; k++) {      F.a[k] += pDataArray[i]->F.a[k];       }
+                       for (int k = 0; k < alignmentLength; k++) {      F.t[k] += pDataArray[i]->F.t[k];       }
+                       for (int k = 0; k < alignmentLength; k++) {      F.g[k] += pDataArray[i]->F.g[k];       }
+                       for (int k = 0; k < alignmentLength; k++) {      F.c[k] += pDataArray[i]->F.c[k];       }
+                       for (int k = 0; k < alignmentLength; k++) {      F.gap[k] += pDataArray[i]->F.gap[k];   }
+
+                       CloseHandle(hThreadArray[i]);
+                       delete pDataArray[i];
+               }
+               
+#endif 
+        return num;
+        
        }
        catch(exception& e) {
                m->errorOut(e, "FilterSeqsCommand", "createProcessesCreateFilter");
        }
        catch(exception& e) {
                m->errorOut(e, "FilterSeqsCommand", "createProcessesCreateFilter");
index 3bf36c040231c5f801281f9d553c60bed01101c8..16062f33e79582a13e56154a9e0830998cc0d5e0 100644 (file)
@@ -40,6 +40,7 @@ private:
 \r
        vector<linePair*> lines;\r
        vector<int> processIDS;\r
 \r
        vector<linePair*> lines;\r
        vector<int> processIDS;\r
+    map<int, vector<unsigned long long> > savedPositions;\r
 \r
        string vertical, filter, fasta, hard, outputDir, filterFileName;\r
        vector<string> fastafileNames;  \r
 \r
        string vertical, filter, fasta, hard, outputDir, filterFileName;\r
        vector<string> fastafileNames;  \r
@@ -55,7 +56,7 @@ private:
        string createFilter();\r
        int filterSequences();\r
        int createProcessesCreateFilter(Filters&, string);\r
        string createFilter();\r
        int filterSequences();\r
        int createProcessesCreateFilter(Filters&, string);\r
-       int createProcessesRunFilter(string, string);\r
+       int createProcessesRunFilter(string, string, string);\r
        int driverRunFilter(string, string, string, linePair*);\r
        int driverCreateFilter(Filters& F, string filename, linePair* line);\r
        #ifdef USE_MPI\r
        int driverRunFilter(string, string, string, linePair*);\r
        int driverCreateFilter(Filters& F, string filename, linePair* line);\r
        #ifdef USE_MPI\r
@@ -65,4 +66,179 @@ private:
        \r
 };\r
 \r
        \r
 };\r
 \r
+\r
+/**************************************************************************************************/\r
+//custom data structure for threads to use.\r
+// This is passed by void pointer so it can be any data type\r
+// that can be passed using a single void pointer (LPVOID).\r
+struct filterData {\r
+       Filters F;\r
+    int count, tid, alignmentLength;\r
+    unsigned long long start, end;\r
+    MothurOut* m;\r
+    string filename, vertical, hard;\r
+    char trump;\r
+    float soft;\r
+       \r
+       filterData(){}\r
+       filterData(string fn, MothurOut* mout, unsigned long long st, unsigned long long en, int aLength, char tr, string vert, float so, string ha, int t) {\r
+        filename = fn;\r
+               m = mout;\r
+               start = st;\r
+               end = en;\r
+        tid = t;\r
+        trump = tr;\r
+        alignmentLength = aLength;\r
+        vertical = vert;\r
+        soft = so;\r
+        hard = ha;\r
+               count = 0;\r
+       }\r
+};\r
+/**************************************************************************************************/\r
+//custom data structure for threads to use.\r
+// This is passed by void pointer so it can be any data type\r
+// that can be passed using a single void pointer (LPVOID).\r
+struct filterRunData {\r
+    int count, tid, alignmentLength;\r
+    unsigned long long start, end;\r
+    MothurOut* m;\r
+    string filename;\r
+    string filter, outputFilename;\r
+       \r
+       filterRunData(){}\r
+       filterRunData(string f, string fn, string ofn, MothurOut* mout, unsigned long long st, unsigned long long en, int aLength, int t) {\r
+        filter = f;\r
+        outputFilename = ofn;\r
+        filename = fn;\r
+               m = mout;\r
+               start = st;\r
+               end = en;\r
+        tid = t;\r
+        alignmentLength = aLength;\r
+               count = 0;\r
+       }\r
+};\r
+\r
+/**************************************************************************************************/\r
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)\r
+#else\r
+static DWORD WINAPI MyCreateFilterThreadFunction(LPVOID lpParam){ \r
+       filterData* pDataArray;\r
+       pDataArray = (filterData*)lpParam;\r
+       \r
+       try {\r
+\r
+               if (pDataArray->soft != 0)                      {  pDataArray->F.setSoft(pDataArray->soft);             }\r
+               if (pDataArray->trump != '*')           {  pDataArray->F.setTrump(pDataArray->trump);   }\r
+               \r
+               pDataArray->F.setLength(pDataArray->alignmentLength);\r
+               \r
+               if(pDataArray->trump != '*' || pDataArray->m->isTrue(pDataArray->vertical) || pDataArray->soft != 0){\r
+                       pDataArray->F.initialize();\r
+               }\r
+               \r
+               if(pDataArray->hard.compare("") != 0)   {       pDataArray->F.doHard(pDataArray->hard);         }\r
+               else                                            {       pDataArray->F.setFilter(string(pDataArray->alignmentLength, '1'));      }\r
+        \r
+               ifstream in;\r
+               pDataArray->m->openInputFile(pDataArray->filename, in);\r
+        \r
+               //print header if you are process 0\r
+               if ((pDataArray->start == 0) || (pDataArray->start == 1)) {\r
+                       in.seekg(0);\r
+               }else { //this accounts for the difference in line endings. \r
+                       in.seekg(pDataArray->start-1); pDataArray->m->gobble(in); \r
+               }\r
+               \r
+               pDataArray->count = pDataArray->end;\r
+               for(int i = 0; i < pDataArray->end; i++){ //end is the number of sequences to process\r
+                       \r
+                       if (pDataArray->m->control_pressed) { in.close(); pDataArray->count = 1; return 1; }\r
+                       \r
+                       Sequence current(in); pDataArray->m->gobble(in); \r
+                       \r
+                       if (current.getName() != "") {\r
+                               if (current.getAligned().length() != pDataArray->alignmentLength) { pDataArray->m->mothurOut("Sequences are not all the same length, please correct."); pDataArray->m->mothurOutEndLine(); pDataArray->m->control_pressed = true;  }\r
+                \r
+                if(pDataArray->trump != '*')                   {       pDataArray->F.doTrump(current);         }\r
+                if(pDataArray->m->isTrue(pDataArray->vertical) || pDataArray->soft != 0)       {       pDataArray->F.getFreqs(current);        }\r
+                       }\r
+            \r
+            //report progress\r
+                       if((i) % 100 == 0){     pDataArray->m->mothurOut(toString(i)); pDataArray->m->mothurOutEndLine();               }\r
+               }\r
+               \r
+        if((pDataArray->count) % 100 != 0){    pDataArray->m->mothurOut(toString(pDataArray->count)); pDataArray->m->mothurOutEndLine();               }\r
+        \r
+               in.close();\r
+               \r
+               return 0;\r
+               \r
+       }\r
+       catch(exception& e) {\r
+               pDataArray->m->errorOut(e, "FilterSeqsCommand", "MyCreateFilterThreadFunction");\r
+               exit(1);\r
+       }\r
+} \r
+/**************************************************************************************************/\r
+static DWORD WINAPI MyRunFilterThreadFunction(LPVOID lpParam){ \r
+       filterRunData* pDataArray;\r
+       pDataArray = (filterRunData*)lpParam;\r
+       \r
+       try {\r
+        \r
+        ofstream out;\r
+               pDataArray->m->openOutputFile(pDataArray->outputFilename, out);\r
+\r
+               ifstream in;\r
+               pDataArray->m->openInputFile(pDataArray->filename, in);\r
+        \r
+               //print header if you are process 0\r
+               if ((pDataArray->start == 0) || (pDataArray->start == 1)) {\r
+                       in.seekg(0);\r
+               }else { //this accounts for the difference in line endings. \r
+                       in.seekg(pDataArray->start-1); pDataArray->m->gobble(in); \r
+               }\r
+               \r
+               pDataArray->count = pDataArray->end;\r
+               for(int i = 0; i < pDataArray->end; i++){ //end is the number of sequences to process\r
+                       \r
+                       if (pDataArray->m->control_pressed) { in.close(); out.close(); pDataArray->count = 1; return 1; }\r
+                       \r
+                       Sequence seq(in); pDataArray->m->gobble(in);\r
+            if (seq.getName() != "") {\r
+                string align = seq.getAligned();\r
+                string filterSeq = "";\r
+                \r
+                for(int j=0;j<pDataArray->alignmentLength;j++){\r
+                    if(pDataArray->filter[j] == '1'){\r
+                        filterSeq += align[j];\r
+                    }\r
+                }\r
+                \r
+                out << '>' << seq.getName() << endl << filterSeq << endl;\r
+            }\r
+            \r
+            //report progress\r
+                       if((i) % 100 == 0){     pDataArray->m->mothurOut(toString(i)); pDataArray->m->mothurOutEndLine();               }\r
+               }\r
+               \r
+        if((pDataArray->count) % 100 != 0){    pDataArray->m->mothurOut(toString(pDataArray->count)); pDataArray->m->mothurOutEndLine();               }\r
+        \r
+               in.close();\r
+        out.close();\r
+               \r
+               return 0;\r
+               \r
+       }\r
+       catch(exception& e) {\r
+               pDataArray->m->errorOut(e, "FilterSeqsCommand", "MyRunFilterThreadFunction");\r
+               exit(1);\r
+       }\r
+} \r
+/**************************************************************************************************/\r
+#endif\r
+\r
+\r
 #endif\r
 #endif\r