]> git.donarmstrong.com Git - mothur.git/blobdiff - summarysharedcommand.cpp
fixed bug in windows summary.seqs and summary.qual paralellization. paralellized...
[mothur.git] / summarysharedcommand.cpp
index 7116e46e0f9e96045fc3c60a717ae41819949118..f63a2ad16531731f6aee5eb7f6d20dcf81a85ee1 100644 (file)
@@ -8,47 +8,6 @@
  */
 
 #include "summarysharedcommand.h"
-#include "sharedsobscollectsummary.h"
-#include "sharedchao1.h"
-#include "sharedace.h"
-#include "sharednseqs.h"
-#include "sharedjabund.h"
-#include "sharedsorabund.h"
-#include "sharedjclass.h"
-#include "sharedsorclass.h"
-#include "sharedjest.h"
-#include "sharedsorest.h"
-#include "sharedthetayc.h"
-#include "sharedthetan.h"
-#include "sharedkstest.h"
-#include "whittaker.h"
-#include "sharedochiai.h"
-#include "sharedanderbergs.h"
-#include "sharedkulczynski.h"
-#include "sharedkulczynskicody.h"
-#include "sharedlennon.h"
-#include "sharedmorisitahorn.h"
-#include "sharedbraycurtis.h"
-#include "sharedjackknife.h"
-#include "whittaker.h"
-#include "odum.h"
-#include "canberra.h"
-#include "structeuclidean.h"
-#include "structchord.h"
-#include "hellinger.h"
-#include "manhattan.h"
-#include "structpearson.h"
-#include "soergel.h"
-#include "spearman.h"
-#include "structkulczynski.h"
-#include "structchi2.h"
-#include "speciesprofile.h"
-#include "hamming.h"
-#include "gower.h"
-#include "memchi2.h"
-#include "memchord.h"
-#include "memeuclidean.h"
-#include "mempearson.h"
 
 //**********************************************************************************************************************
 vector<string> SummarySharedCommand::setParameters(){  
@@ -508,152 +467,209 @@ int SummarySharedCommand::execute(){
 /***********************************************************/
 int SummarySharedCommand::process(vector<SharedRAbundVector*> thisLookup, string sumFileName, string sumAllFileName) {
        try {
-                       vector< vector<seqDist> > calcDists;  //vector containing vectors that contains the summary results for each group compare
-                       calcDists.resize(sumCalculators.size()); //one for each calc, this will be used to make .dist files
-                               
-                       #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
-                               if(processors == 1){
-                                       driver(thisLookup, 0, numGroups, sumFileName+".temp", sumAllFileName+".temp", calcDists);
-                                       m->appendFiles((sumFileName + ".temp"), sumFileName);
-                                       m->mothurRemove((sumFileName + ".temp"));
-                                       if (mult) {
-                                               m->appendFiles((sumAllFileName + ".temp"), sumAllFileName);
-                                               m->mothurRemove((sumAllFileName + ".temp"));
-                                       }
-                               }else{
-                                       int process = 1;
-                                       vector<int> processIDS;
-               
-                                       //loop through and create all the processes you want
-                                       while (process != processors) {
-                                               int pid = fork();
-                                               
-                                               if (pid > 0) {
-                                                       processIDS.push_back(pid); 
-                                                       process++;
-                                               }else if (pid == 0){
-                                                       driver(thisLookup, lines[process].start, lines[process].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);   
-                                                       
-                                                       //only do this if you want a distance file
-                                                       if (createPhylip) {
-                                                               string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(getpid()) + ".dist";
-                                                               ofstream outtemp;
-                                                               m->openOutputFile(tempdistFileName, outtemp);
-                                                               
-                                                               for (int i = 0; i < calcDists.size(); i++) {
-                                                                       outtemp << calcDists[i].size() << endl;
-                                                                       
-                                                                       for (int j = 0; j < calcDists[i].size(); j++) {
-                                                                               outtemp << calcDists[i][j].seq1 << '\t' << calcDists[i][j].seq2 << '\t' << calcDists[i][j].dist << endl;
-                                                                       }
-                                                               }
-                                                               outtemp.close();
-                                                       }
-                                                       
-                                                       exit(0);
-                                               }else { 
-                                                       m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); 
-                                                       for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
-                                                       exit(0);
-                                               }
-                                       }
-                                       
-                                       //parent do your part
-                                       driver(thisLookup, lines[0].start, lines[0].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);   
-                                       m->appendFiles((sumFileName + toString(getpid()) + ".temp"), sumFileName);
-                                       m->mothurRemove((sumFileName + toString(getpid()) + ".temp"));
-                                       if (mult) { m->appendFiles((sumAllFileName + toString(getpid()) + ".temp"), sumAllFileName); }
-                                               
-                                       //force parent to wait until all the processes are done
-                                       for (int i = 0; i < processIDS.size(); i++) {
-                                               int temp = processIDS[i];
-                                               wait(&temp);
-                                       }
-                                       
-                                       for (int i = 0; i < processIDS.size(); i++) {
-                                               m->appendFiles((sumFileName + toString(processIDS[i]) + ".temp"), sumFileName);
-                                               m->mothurRemove((sumFileName + toString(processIDS[i]) + ".temp"));
-                                               if (mult) {     m->mothurRemove((sumAllFileName + toString(processIDS[i]) + ".temp"));  }
-                                               
-                                               if (createPhylip) {
-                                                       string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(processIDS[i]) +  ".dist";
-                                                       ifstream intemp;
-                                                       m->openInputFile(tempdistFileName, intemp);
-                                                       
-                                                       for (int k = 0; k < calcDists.size(); k++) {
-                                                               int size = 0;
-                                                               intemp >> size; m->gobble(intemp);
-                                                                       
-                                                               for (int j = 0; j < size; j++) {
-                                                                       int seq1 = 0;
-                                                                       int seq2 = 0;
-                                                                       float dist = 1.0;
-                                                                       
-                                                                       intemp >> seq1 >> seq2 >> dist;   m->gobble(intemp);
-                                                                       
-                                                                       seqDist tempDist(seq1, seq2, dist);
-                                                                       calcDists[k].push_back(tempDist);
-                                                               }
-                                                       }
-                                                       intemp.close();
-                                                       m->mothurRemove(tempdistFileName);
-                                               }
-                                       }
+        vector< vector<seqDist> > calcDists;  //vector containing vectors that contains the summary results for each group compare
+        calcDists.resize(sumCalculators.size()); //one for each calc, this will be used to make .dist files
+        
+        
+        if(processors == 1){
+            driver(thisLookup, 0, numGroups, sumFileName+".temp", sumAllFileName+".temp", calcDists);
+            m->appendFiles((sumFileName + ".temp"), sumFileName);
+            m->mothurRemove((sumFileName + ".temp"));
+            if (mult) {
+                m->appendFiles((sumAllFileName + ".temp"), sumAllFileName);
+                m->mothurRemove((sumAllFileName + ".temp"));
+            }
+        }else{
+            
+            int process = 1;
+            vector<int> processIDS;
+            
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+            //loop through and create all the processes you want
+            while (process != processors) {
+                int pid = fork();
+                
+                if (pid > 0) {
+                    processIDS.push_back(pid); 
+                    process++;
+                }else if (pid == 0){
+                    driver(thisLookup, lines[process].start, lines[process].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);   
+                    
+                    //only do this if you want a distance file
+                    if (createPhylip) {
+                        string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(getpid()) + ".dist";
+                        ofstream outtemp;
+                        m->openOutputFile(tempdistFileName, outtemp);
+                        
+                        for (int i = 0; i < calcDists.size(); i++) {
+                            outtemp << calcDists[i].size() << endl;
+                            
+                            for (int j = 0; j < calcDists[i].size(); j++) {
+                                outtemp << calcDists[i][j].seq1 << '\t' << calcDists[i][j].seq2 << '\t' << calcDists[i][j].dist << endl;
+                            }
+                        }
+                        outtemp.close();
+                    }
+                    
+                    exit(0);
+                }else { 
+                    m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); 
+                    for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
+                    exit(0);
+                }
+            }
+            
+            //parent do your part
+            driver(thisLookup, lines[0].start, lines[0].end, sumFileName + toString(getpid()) + ".temp", sumAllFileName + toString(getpid()) + ".temp", calcDists);   
+            m->appendFiles((sumFileName + toString(getpid()) + ".temp"), sumFileName);
+            m->mothurRemove((sumFileName + toString(getpid()) + ".temp"));
+            if (mult) { m->appendFiles((sumAllFileName + toString(getpid()) + ".temp"), sumAllFileName); }
+            
+            //force parent to wait until all the processes are done
+            for (int i = 0; i < processIDS.size(); i++) {
+                int temp = processIDS[i];
+                wait(&temp);
+            }
+            
+            for (int i = 0; i < processIDS.size(); i++) {
+                m->appendFiles((sumFileName + toString(processIDS[i]) + ".temp"), sumFileName);
+                m->mothurRemove((sumFileName + toString(processIDS[i]) + ".temp"));
+                if (mult) {    m->mothurRemove((sumAllFileName + toString(processIDS[i]) + ".temp"));  }
+                
+                if (createPhylip) {
+                    string tempdistFileName = m->getRootName(m->getSimpleName(sumFileName)) + toString(processIDS[i]) +  ".dist";
+                    ifstream intemp;
+                    m->openInputFile(tempdistFileName, intemp);
+                    
+                    for (int k = 0; k < calcDists.size(); k++) {
+                        int size = 0;
+                        intemp >> size; m->gobble(intemp);
+                        
+                        for (int j = 0; j < size; j++) {
+                            int seq1 = 0;
+                            int seq2 = 0;
+                            float dist = 1.0;
+                            
+                            intemp >> seq1 >> seq2 >> dist;   m->gobble(intemp);
+                            
+                            seqDist tempDist(seq1, seq2, dist);
+                            calcDists[k].push_back(tempDist);
+                        }
+                    }
+                    intemp.close();
+                    m->mothurRemove(tempdistFileName);
+                }
+            }
+#else
+            //////////////////////////////////////////////////////////////////////////////////////////////////////
+            //Windows version shared memory, so be careful when passing variables through the summarySharedData struct. 
+            //Above fork() will clone, so memory is separate, but that's not the case with windows, 
+            //Taking advantage of shared memory to pass results vectors.
+            //////////////////////////////////////////////////////////////////////////////////////////////////////
 
-                               }
-                       #else
-                               driver(thisLookup, 0, numGroups, (sumFileName + ".temp"), (sumAllFileName + ".temp"), calcDists);
-                               m->appendFiles((sumFileName + ".temp"), sumFileName);
-                               m->mothurRemove((sumFileName + ".temp"));
-                               if (mult) {
-                                       m->appendFiles((sumAllFileName + ".temp"), sumAllFileName);
-                                       m->mothurRemove((sumAllFileName + ".temp"));
-                               }
-                       #endif
-                       
-                       if (createPhylip) {
-                               for (int i = 0; i < calcDists.size(); i++) {
-                                       if (m->control_pressed) { break; }
+            vector<summarySharedData*> pDataArray; 
+            DWORD   dwThreadIdArray[processors-1];
+            HANDLE  hThreadArray[processors-1]; 
+            
+            //Create processor worker threads.
+            for( int i=1; i<processors; i++ ){
+                
+                //make copy of lookup so we don't get access violations
+                vector<SharedRAbundVector*> newLookup;
+                for (int k = 0; k < thisLookup.size(); k++) {
+                    SharedRAbundVector* temp = new SharedRAbundVector();
+                    temp->setLabel(thisLookup[k]->getLabel());
+                    temp->setGroup(thisLookup[k]->getGroup());
+                    newLookup.push_back(temp);
+                }
+                
+                //for each bin
+                for (int k = 0; k < thisLookup[0]->getNumBins(); k++) {
+                    if (m->control_pressed) { for (int j = 0; j < newLookup.size(); j++) {  delete newLookup[j];  } return 0; }
+                    for (int j = 0; j < thisLookup.size(); j++) { newLookup[j]->push_back(thisLookup[j]->getAbundance(k), thisLookup[j]->getGroup()); }
+                }
+
+                // Allocate memory for thread data.
+                summarySharedData* tempSum = new summarySharedData((sumFileName+toString(i)+".temp"), m, lines[i].start, lines[i].end, Estimators, newLookup);
+                pDataArray.push_back(tempSum);
+                processIDS.push_back(i);
+                
+                hThreadArray[i-1] = CreateThread(NULL, 0, MySummarySharedThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);   
+            }
+            
+            //parent do your part
+            driver(thisLookup, lines[0].start, lines[0].end, sumFileName +"0.temp", sumAllFileName + "0.temp", calcDists);   
+            m->appendFiles((sumFileName + "0.temp"), sumFileName);
+            m->mothurRemove((sumFileName + "0.temp"));
+            if (mult) { m->appendFiles((sumAllFileName + "0.temp"), sumAllFileName); }
+            
+            //Wait until all threads have terminated.
+            WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+            
+            //Close all thread handles and free memory allocations.
+            for(int i=0; i < pDataArray.size(); i++){
+                m->appendFiles((sumFileName + toString(processIDS[i]) + ".temp"), sumFileName);
+                m->mothurRemove((sumFileName + toString(processIDS[i]) + ".temp"));
+                
+                for (int j = 0; j < pDataArray[i]->thisLookup.size(); j++) {  delete pDataArray[i]->thisLookup[j];  } 
+                
+                if (createPhylip) {
+                    for (int k = 0; k < calcDists.size(); k++) {
+                        int size = pDataArray[i]->calcDists[k].size();
+                        for (int j = 0; j < size; j++) {    calcDists[k].push_back(pDataArray[i]->calcDists[k][j]);    }
+                    }
+                }
+              
+                CloseHandle(hThreadArray[i]);
+                delete pDataArray[i];
+            }
+
+#endif
+        }
+              
+        if (createPhylip) {
+            for (int i = 0; i < calcDists.size(); i++) {
+                if (m->control_pressed) { break; }
                                
-                                       string distFileName = outputDir + m->getRootName(m->getSimpleName(sumFileName)) + sumCalculators[i]->getName() + "." + thisLookup[0]->getLabel() + ".dist";
-                                       outputNames.push_back(distFileName);
-                                       ofstream outDist;
-                                       m->openOutputFile(distFileName, outDist);
-                                       outDist.setf(ios::fixed, ios::floatfield); outDist.setf(ios::showpoint);
-                                       
-                                       //initialize matrix
-                                       vector< vector<float> > matrix; //square matrix to represent the distance
-                                       matrix.resize(thisLookup.size());
-                                       for (int k = 0; k < thisLookup.size(); k++) {  matrix[k].resize(thisLookup.size(), 0.0); }
-                                       
-                                       
-                                       for (int j = 0; j < calcDists[i].size(); j++) {
-                                               int row = calcDists[i][j].seq1;
-                                               int column = calcDists[i][j].seq2;
-                                               float dist = calcDists[i][j].dist;
-                                               
-                                               matrix[row][column] = dist;
-                                               matrix[column][row] = dist;
-                                       }
+                string distFileName = outputDir + m->getRootName(m->getSimpleName(sumFileName)) + sumCalculators[i]->getName() + "." + thisLookup[0]->getLabel() + ".dist";
+                outputNames.push_back(distFileName);
+                ofstream outDist;
+                m->openOutputFile(distFileName, outDist);
+                outDist.setf(ios::fixed, ios::floatfield); outDist.setf(ios::showpoint);
+                
+                //initialize matrix
+                vector< vector<float> > matrix; //square matrix to represent the distance
+                matrix.resize(thisLookup.size());
+                for (int k = 0; k < thisLookup.size(); k++) {  matrix[k].resize(thisLookup.size(), 0.0); }
+                
+                
+                for (int j = 0; j < calcDists[i].size(); j++) {
+                    int row = calcDists[i][j].seq1;
+                    int column = calcDists[i][j].seq2;
+                    float dist = calcDists[i][j].dist;
+                    
+                    matrix[row][column] = dist;
+                    matrix[column][row] = dist;
+                }
+                
+                //output to file
+                outDist << thisLookup.size() << endl;
+                for (int r=0; r<thisLookup.size(); r++) { 
+                    //output name
+                    string name = thisLookup[r]->getGroup();
+                    if (name.length() < 10) { //pad with spaces to make compatible
+                        while (name.length() < 10) {  name += " ";  }
+                    }
+                    outDist << name << '\t';
                                        
-                                       //output to file
-                                       outDist << thisLookup.size() << endl;
-                                       for (int r=0; r<thisLookup.size(); r++) { 
-                                               //output name
-                                               string name = thisLookup[r]->getGroup();
-                                               if (name.length() < 10) { //pad with spaces to make compatible
-                                                       while (name.length() < 10) {  name += " ";  }
-                                               }
-                                               outDist << name << '\t';
-                                       
-                                               //output distances
-                                               for (int l = 0; l < r; l++) {   outDist  << matrix[r][l] << '\t';  }
-                                               outDist << endl;
-                                       }
-                                       
-                                       outDist.close();
-                               }
-                       }
+                    //output distances
+                    for (int l = 0; l < r; l++) {      outDist  << matrix[r][l] << '\t';  }
+                    outDist << endl;
+                }
+                
+                outDist.close();
+            }
+        }
                return 0;
        }
        catch(exception& e) {