]> git.donarmstrong.com Git - mothur.git/commitdiff
paralellized phylo.diversity
authorwestcott <westcott>
Tue, 5 Oct 2010 10:08:43 +0000 (10:08 +0000)
committerwestcott <westcott>
Tue, 5 Oct 2010 10:08:43 +0000 (10:08 +0000)
chimeracheckcommand.cpp
distancecommand.cpp
filterseqscommand.cpp
makefile
mothur
phylodiversitycommand.cpp
phylodiversitycommand.h

index c8761b9bd51ea60fed4e9f5bc1636a8b0afb1987..ac092083a53944207b926a6bb9d725a2249b571a 100644 (file)
@@ -270,8 +270,8 @@ int ChimeraCheckCommand::execute(){
                                        
                                        //wait on chidren
                                        for(int j = 1; j < processors; j++) { 
-                                               char buf[4];
-                                               MPI_Recv(buf, 4, MPI_CHAR, j, tag, MPI_COMM_WORLD, &status); 
+                                               char buf[5];
+                                               MPI_Recv(buf, 5, MPI_CHAR, j, tag, MPI_COMM_WORLD, &status); 
                                        }
                                }else{ //you are a child process
                                        MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
@@ -289,9 +289,9 @@ int ChimeraCheckCommand::execute(){
                                        if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPI);   for (int j = 0; j < outputNames.size(); j++) {   remove(outputNames[j].c_str()); }  delete chimera; return 0;  }
                                        
                                        //tell parent you are done.
-                                       char buf[4];
+                                       char buf[5];
                                        strcpy(buf, "done"); 
-                                       MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
+                                       MPI_Send(buf, 5, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
                                }
                                
                                //close files 
index 871e07b65510d33214d78c9efee3b3448d10d405..60e90500866673aa8e643e024c3c60b840140dce 100644 (file)
@@ -276,8 +276,8 @@ int DistanceCommand::execute(){
                                for(int i = 1; i < processors; i++) { 
                                        if (m->control_pressed) { MPI_File_close(&outMPI);  delete distCalculator;  return 0; }
                                        
-                                       char buf[4];
-                                       MPI_Recv(buf, 4, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); 
+                                       char buf[5];
+                                       MPI_Recv(buf, 5, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); 
                                }
                        }else { //you are a child process
                                //do your part
@@ -285,10 +285,10 @@ int DistanceCommand::execute(){
                                
                                if (m->control_pressed) { MPI_File_close(&outMPI);  delete distCalculator;  return 0; }
                        
-                               char buf[4];
+                               char buf[5];
                                strcpy(buf, "done"); 
                                //tell parent you are done.
-                               MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
+                               MPI_Send(buf, 5, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
                        }
                        
                        MPI_File_close(&outMPI);
index ffffe4e4224c64f0b6b78d4019f8486b47d8511c..a12058e4785d00e1c21b84ea453ce82c4f62797d 100644 (file)
@@ -310,8 +310,8 @@ int FilterSeqsCommand::filterSequences() {
                                        
                                        //wait on chidren
                                        for(int i = 1; i < processors; i++) { 
-                                               char buf[4];
-                                               MPI_Recv(buf, 4, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); 
+                                               char buf[5];
+                                               MPI_Recv(buf, 5, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); 
                                        }
                                        
                                }else { //you are a child process
@@ -331,11 +331,11 @@ int FilterSeqsCommand::filterSequences() {
                                        
                                        if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPI);  return 0;  }
                                        
-                                       char buf[4];
+                                       char buf[5];
                                        strcpy(buf, "done"); 
                                        
                                        //tell parent you are done.
-                                       MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
+                                       MPI_Send(buf, 5, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
                                }
                                
                                MPI_File_close(&outMPI);
index 0fd76c198f15e808686093df22a49eaa1c2b4246..a5c40be3dd5df280bc6dad68eeef5eff97b54ace 100644 (file)
--- a/makefile
+++ b/makefile
@@ -53,7 +53,7 @@ USEREADLINE ?= yes
 
 ifeq  ($(strip $(USEREADLINE)),yes)
     CXXFLAGS += -DUSE_READLINE
-    LDFLAGS += \
+    LIBS = \
       -lreadline\
       -lncurses
 endif
@@ -88,7 +88,7 @@ OBJECTS=$(patsubst %.cpp,%.o,$(wildcard *.cpp))
 OBJECTS+=$(patsubst %.c,%.o,$(wildcard *.c))
 
 mothur : $(OBJECTS)
-       $(CXX) $(LDFLAGS) $(TARGET_ARCH) -o $@ $(OBJECTS)
+       $(CXX) $(LDFLAGS) $(TARGET_ARCH) -o $@ $(OBJECTS) $(LIBS)
 
 install : mothur
        cp mothur ../Release/mothur
diff --git a/mothur b/mothur
index 648dccdfd360c3a212309afd720842a90afeddde..405237b5d8c315b0ed18f87f32d2b5cb7b748a9a 100755 (executable)
Binary files a/mothur and b/mothur differ
index 6be1fddb597dd5ab1c61f69ce59cf590c2307e37..6e9e7b1525301a59c825c283a4de3607f338a25e 100644 (file)
@@ -20,7 +20,7 @@ PhyloDiversityCommand::PhyloDiversityCommand(string option)  {
                
                else {
                        //valid paramters for this command
-                       string Array[] =  {"freq","rarefy","iters","groups","summary","collect","scale","outputdir","inputdir"};
+                       string Array[] =  {"freq","rarefy","iters","groups","processors","summary","collect","scale","outputdir","inputdir"};
                        vector<string> myArray (Array, Array+(sizeof(Array)/sizeof(string)));
                        
                        OptionParser parser(option);
@@ -59,6 +59,9 @@ PhyloDiversityCommand::PhyloDiversityCommand(string option)  {
                        temp = validParameter.validFile(parameters, "collect", false);                  if (temp == "not found") { temp = "F"; }
                        collect = m->isTrue(temp);
                        
+                       temp = validParameter.validFile(parameters, "processors", false);       if (temp == "not found"){       temp = "1";                             }
+                       convert(temp, processors); 
+                       
                        groups = validParameter.validFile(parameters, "groups", false);                 
                        if (groups == "not found") { groups = ""; Groups = globaldata->gTreemap->namesOfGroups;  globaldata->Groups = Groups;  }
                        else { 
@@ -80,7 +83,7 @@ PhyloDiversityCommand::PhyloDiversityCommand(string option)  {
 void PhyloDiversityCommand::help(){
        try {
                m->mothurOut("The phylo.diversity command can only be executed after a successful read.tree command.\n");
-               m->mothurOut("The phylo.diversity command parameters are groups, iters, freq, scale, rarefy, collect and summary.  No parameters are required.\n");
+               m->mothurOut("The phylo.diversity command parameters are groups, iters, freq, processors, scale, rarefy, collect and summary.  No parameters are required.\n");
                m->mothurOut("The groups parameter allows you to specify which of the groups in your groupfile you would like analyzed. The group names are separated by dashes. By default all groups are used.\n");
                m->mothurOut("The iters parameter allows you to specify the number of randomizations to preform, by default iters=1000, if you set rarefy to true.\n");
                m->mothurOut("The freq parameter is used indicate when to output your data, by default it is set to 100. But you can set it to a percentage of the number of sequence. For example freq=0.10, means 10%. \n");
@@ -88,6 +91,7 @@ void PhyloDiversityCommand::help(){
                m->mothurOut("The rarefy parameter allows you to create a rarefaction curve. The default is false.\n");
                m->mothurOut("The collect parameter allows you to create a collectors curve. The default is false.\n");
                m->mothurOut("The summary parameter allows you to create a .summary file. The default is true.\n");
+               m->mothurOut("The processors parameter allows you to specify the number of processors to use. The default is 1.\n");
                m->mothurOut("The phylo.diversity command should be in the following format: phylo.diversity(groups=yourGroups, rarefy=yourRarefy, iters=yourIters).\n");
                m->mothurOut("Example phylo.diversity(groups=A-B-C, rarefy=T, iters=500).\n");
                m->mothurOut("The phylo.diversity command output two files: .phylo.diversity and if rarefy=T, .rarefaction.\n");
@@ -177,8 +181,146 @@ int PhyloDiversityCommand::execute(){
                        for (int j = 0; j < globaldata->Groups.size(); j++) {  
                                if (numSampledList.count(diversity[globaldata->Groups[j]].size()-1) == 0) {  numSampledList.insert(diversity[globaldata->Groups[j]].size()-1); }
                        }
+                       
+                       #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+                               if(processors == 1){
+                                       driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true);      
+                               }else{
+                                       if (rarefy) {
+                                               vector<int> procIters;
+                                               
+                                               int numItersPerProcessor = iters / processors;
+                                               
+                                               //divide iters between processes
+                                               for (int h = 0; h < processors; h++) {
+                                                       if(h == processors - 1){
+                                                               numItersPerProcessor = iters - h * numItersPerProcessor;
+                                                       }
+                                                       procIters.push_back(numItersPerProcessor);
+                                               }
+                                               
+                                               createProcesses(procIters, trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum); 
+                                               
+                                       }else{ //no need to paralellize if you dont want to rarefy
+                                               driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true);      
+                                       }
+                               }
+
+                       #else
+                               driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true);      
+                       #endif
+
+                       if (rarefy) {   printData(numSampledList, sumDiversity, outRare, iters);        }
+               }
+               
+       
+               if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) {        remove(outputNames[i].c_str());         } return 0; }
+
+               m->mothurOutEndLine();
+               m->mothurOut("Output File Names: "); m->mothurOutEndLine();
+               for (int i = 0; i < outputNames.size(); i++) {  m->mothurOut(outputNames[i]); m->mothurOutEndLine();    }
+               m->mothurOutEndLine();
 
-                       for (int l = 0; l < iters; l++) {
+               
+               return 0;
+       }
+       catch(exception& e) {
+               m->errorOut(e, "PhyloDiversityCommand", "execute");
+               exit(1);
+       }
+}
+//**********************************************************************************************************************
+int PhyloDiversityCommand::createProcesses(vector<int>& procIters, Tree* t, map< string, vector<float> >& div, map<string, vector<float> >& sumDiv, int numIters, int increment, vector<int>& randomLeaf, set<int>& numSampledList, ofstream& outCollect, ofstream& outSum){
+       try {
+               #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+               int process = 1;
+               int num = 0;
+               vector<int> processIDS;
+               map< string, vector<float> >::iterator itSum;
+               
+               EstOutput results;
+               
+               //loop through and create all the processes you want
+               while (process != processors) {
+                       int pid = fork();
+                       
+                       if (pid > 0) {
+                               processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
+                               process++;
+                       }else if (pid == 0){
+                               driver(t, div, sumDiv, procIters[process], increment, randomLeaf, numSampledList, outCollect, outSum, false);
+                               
+                               string outTemp = outputDir + toString(getpid()) + ".sumDiv.temp";
+                               ofstream out;
+                               m->openOutputFile(outTemp, out);
+                               
+                               //output the sumDIversity
+                               for (itSum = sumDiv.begin(); itSum != sumDiv.end(); itSum++) {
+                                       out << itSum->first << '\t' << (itSum->second).size() << '\t';
+                                       for (int k = 0; k < (itSum->second).size(); k++) { 
+                                               out << (itSum->second)[k] << '\t';
+                                       }
+                                       out << endl;
+                               }
+                               
+                               out.close();
+                               
+                               exit(0);
+                       }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); }
+               }
+               
+               driver(t, div, sumDiv, procIters[0], increment, randomLeaf, numSampledList, outCollect, outSum, true);
+               
+               //force parent to wait until all the processes are done
+               for (int i=0;i<(processors-1);i++) { 
+                       int temp = processIDS[i];
+                       wait(&temp);
+               }
+               
+               //get data created by processes
+               for (int i=0;i<(processors-1);i++) { 
+                       
+                       //input the sumDIversity
+                       string inTemp = outputDir + toString(processIDS[i]) + ".sumDiv.temp";
+                       ifstream in;
+                       m->openInputFile(inTemp, in);
+                               
+                       //output the sumDIversity
+                       for (int j = 0; j < sumDiv.size(); j++) { 
+                               string group = "";
+                               int size = 0;
+                               
+                               in >> group >> size; m->gobble(in);
+                               
+                               for (int k = 0; k < size; k++) { 
+                                       float tempVal;
+                                       in >> tempVal;
+                                       
+                                       sumDiv[group][k] += tempVal;
+                               }
+                               m->gobble(in);
+                       }
+                               
+                       in.close();
+                       remove(inTemp.c_str());
+               }
+               
+#endif
+
+       return 0;               
+       
+       }
+       catch(exception& e) {
+               m->errorOut(e, "PhyloDiversityCommand", "createProcesses");
+               exit(1);
+       }
+}
+//**********************************************************************************************************************
+int PhyloDiversityCommand::driver(Tree* t, map< string, vector<float> >& div, map<string, vector<float> >& sumDiv, int numIters, int increment, vector<int>& randomLeaf, set<int>& numSampledList, ofstream& outCollect, ofstream& outSum, bool doSumCollect){
+       try {
+               int numLeafNodes = randomLeaf.size();
+       
+               for (int l = 0; l < numIters; l++) {
                                random_shuffle(randomLeaf.begin(), randomLeaf.end());
                
                                //initialize counts
@@ -188,26 +330,26 @@ int PhyloDiversityCommand::execute(){
                                
                                for(int k = 0; k < numLeafNodes; k++){
                                                
-                                       if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) {        remove(outputNames[i].c_str());         } return 0; }
+                                       if (m->control_pressed) { return 0; }
                                        
                                        //calc branch length of randomLeaf k
-                                       float br = calcBranchLength(trees[i], randomLeaf[k], countedBranch);
+                                       float br = calcBranchLength(t, randomLeaf[k], countedBranch);
                        
                                        //for each group in the groups update the total branch length accounting for the names file
-                                       vector<string> groups = trees[i]->tree[randomLeaf[k]].getGroup();
+                                       vector<string> groups = t->tree[randomLeaf[k]].getGroup();
                                        
                                        for (int j = 0; j < groups.size(); j++) {
                                                int numSeqsInGroupJ = 0;
                                                map<string, int>::iterator it;
-                                               it = trees[i]->tree[randomLeaf[k]].pcount.find(groups[j]);
-                                               if (it != trees[i]->tree[randomLeaf[k]].pcount.end()) { //this leaf node contains seqs from group j
+                                               it = t->tree[randomLeaf[k]].pcount.find(groups[j]);
+                                               if (it != t->tree[randomLeaf[k]].pcount.end()) { //this leaf node contains seqs from group j
                                                        numSeqsInGroupJ = it->second;
                                                }
                                                
-                                               if (numSeqsInGroupJ != 0) {     diversity[groups[j]][(counts[groups[j]]+1)] = diversity[groups[j]][counts[groups[j]]] + br;  }
+                                               if (numSeqsInGroupJ != 0) {     div[groups[j]][(counts[groups[j]]+1)] = div[groups[j]][counts[groups[j]]] + br;  }
                                                
                                                for (int s = (counts[groups[j]]+2); s <= (counts[groups[j]]+numSeqsInGroupJ); s++) {
-                                                       diversity[groups[j]][s] = diversity[groups[j]][s-1];  //update counts, but don't add in redundant branch lengths
+                                                       div[groups[j]][s] = div[groups[j]][s-1];  //update counts, but don't add in redundant branch lengths
                                                }
                                                counts[groups[j]] += numSeqsInGroupJ;
                                        }
@@ -216,35 +358,25 @@ int PhyloDiversityCommand::execute(){
                                if (rarefy) {
                                        //add this diversity to the sum
                                        for (int j = 0; j < globaldata->Groups.size(); j++) {  
-                                               for (int g = 0; g < diversity[globaldata->Groups[j]].size(); g++) {
-                                                       sumDiversity[globaldata->Groups[j]][g] += diversity[globaldata->Groups[j]][g];
+                                               for (int g = 0; g < div[globaldata->Groups[j]].size(); g++) {
+                                                       sumDiv[globaldata->Groups[j]][g] += div[globaldata->Groups[j]][g];
                                                }
                                        }
                                }
                                
-                               if ((collect) && (l == 0)) {  printData(numSampledList, diversity, outCollect, 1);  }
-                               if ((summary) && (l == 0)) {  printSumData(diversity, outSum, 1);  }
+                               if ((collect) && (l == 0) && doSumCollect) {  printData(numSampledList, div, outCollect, 1);  }
+                               if ((summary) && (l == 0) && doSumCollect) {  printSumData(div, outSum, 1);  }
                        }
                        
-                       if (rarefy) {   printData(numSampledList, sumDiversity, outRare, iters);        }
-               }
-               
-       
-               if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) {        remove(outputNames[i].c_str());         } return 0; }
+                       return 0;
 
-               m->mothurOutEndLine();
-               m->mothurOut("Output File Names: "); m->mothurOutEndLine();
-               for (int i = 0; i < outputNames.size(); i++) {  m->mothurOut(outputNames[i]); m->mothurOutEndLine();    }
-               m->mothurOutEndLine();
-
-               
-               return 0;
        }
        catch(exception& e) {
-               m->errorOut(e, "PhyloDiversityCommand", "execute");
+               m->errorOut(e, "PhyloDiversityCommand", "driver");
                exit(1);
        }
 }
+
 //**********************************************************************************************************************
 
 void PhyloDiversityCommand::printSumData(map< string, vector<float> >& div, ofstream& out, int numIters){
index a7e0d2e5d090f3e567446fe4e76d9ee3dd7d776b..7d4b2d89484a88e4bc5016f19e2e24c89ab07a4e 100644 (file)
@@ -26,7 +26,7 @@ class PhyloDiversityCommand : public Command {
                GlobalData* globaldata;
                
                float freq;
-               int iters;  
+               int iters, processors;  
                bool abort, rarefy, summary, collect, scale;
                string groups, outputDir;
                vector<string> Groups, outputNames; //holds groups to be used, and outputFile names
@@ -34,6 +34,9 @@ class PhyloDiversityCommand : public Command {
                void printData(set<int>&, map< string, vector<float> >&, ofstream&, int);
                void printSumData(map< string, vector<float> >&, ofstream&, int);
                float calcBranchLength(Tree*, int, map< string, set<int> >&);
+               int driver(Tree*, map< string, vector<float> >&, map<string, vector<float> >&, int, int, vector<int>&, set<int>&, ofstream&, ofstream&, bool);
+               int createProcesses(vector<int>&, Tree*, map< string, vector<float> >&, map<string, vector<float> >&, int, int, vector<int>&, set<int>&, ofstream&, ofstream&);
+
 };
 
 #endif