]> git.donarmstrong.com Git - mothur.git/blobdiff - preclustercommand.cpp
added processors to pre.cluster for mac and windows. Fixed bug in reNameFile function...
[mothur.git] / preclustercommand.cpp
index 4b04529af1f8181efff2dff2986cf0ab52065a58..0810a02d4351ff6b0285b7a28bf42e3a5bda4f71 100644 (file)
@@ -8,12 +8,8 @@
  */
 
 #include "preclustercommand.h"
-#include "sequenceparser.h"
 #include "deconvolutecommand.h"
 
-//**********************************************************************************************************************
-inline bool comparePriority(seqPNode first, seqPNode second) {  return (first.numIdentical > second.numIdentical); }
-
 //**********************************************************************************************************************
 vector<string> PreClusterCommand::setParameters(){     
        try {
@@ -21,7 +17,7 @@ vector<string> PreClusterCommand::setParameters(){
                CommandParameter pname("name", "InputTypes", "", "", "none", "none", "none",false,false); parameters.push_back(pname);
                CommandParameter pgroup("group", "InputTypes", "", "", "none", "none", "none",false,false); parameters.push_back(pgroup);
                CommandParameter pdiffs("diffs", "Number", "", "0", "", "", "",false,false); parameters.push_back(pdiffs);
-               CommandParameter pbygroup("bygroup", "Boolean", "", "T", "", "", "",false,false); parameters.push_back(pbygroup);
+               CommandParameter pprocessors("processors", "Number", "", "1", "", "", "",false,false); parameters.push_back(pprocessors);
                CommandParameter pinputdir("inputdir", "String", "", "", "", "", "",false,false); parameters.push_back(pinputdir);
                CommandParameter poutputdir("outputdir", "String", "", "", "", "", "",false,false); parameters.push_back(poutputdir);
                
@@ -42,9 +38,8 @@ string PreClusterCommand::getHelpString(){
                helpString += "The pre.cluster command outputs a new fasta and name file.\n";
                helpString += "The pre.cluster command parameters are fasta, names and diffs. The fasta parameter is required. \n";
                helpString += "The names parameter allows you to give a list of seqs that are identical. This file is 2 columns, first column is name or representative sequence, second column is a list of its identical sequences separated by commas.\n";
-               helpString += "The group parameter allows you to provide a group file. \n";
+               helpString += "The group parameter allows you to provide a group file so you can cluster by group. \n";
                helpString += "The diffs parameter allows you to specify maximum number of mismatched bases allowed between sequences in a grouping. The default is 1.\n";
-               helpString += "The bygroup parameter allows you to indicate you whether you want to cluster sequences only within each group, default=T, when a groupfile is given. \n";
                helpString += "The pre.cluster command should be in the following format: \n";
                helpString += "pre.cluster(fasta=yourFastaFile, names=yourNamesFile, diffs=yourMaxDiffs) \n";
                helpString += "Example pre.cluster(fasta=amazon.fasta, diffs=2).\n";
@@ -154,22 +149,18 @@ PreClusterCommand::PreClusterCommand(string option) {
                        else {  m->setNameFile(namefile); }
                        
                        groupfile = validParameter.validFile(parameters, "group", true);
-                       if (groupfile == "not found") { groupfile =  "";  }
+                       if (groupfile == "not found") { groupfile =  "";  bygroup = false; }
                        else if (groupfile == "not open") { abort = true; groupfile =  ""; }    
-                       else {   m->setGroupFile(groupfile); }
+                       else {   m->setGroupFile(groupfile); bygroup = true;  }
                        
                        string temp     = validParameter.validFile(parameters, "diffs", false);         if(temp == "not found"){        temp = "1"; }
                        convert(temp, diffs); 
                        
-                       temp = validParameter.validFile(parameters, "bygroup", false);                  
-                       if (temp == "not found") { 
-                               if (groupfile != "") { temp = "T"; }
-                               else { temp = "F"; }
-                       }
-                       bygroup = m->isTrue(temp); 
+                       temp = validParameter.validFile(parameters, "processors", false);       if (temp == "not found"){       temp = m->getProcessors();      }
+                       m->setProcessors(temp);
+                       convert(temp, processors);
+                       
                        
-                       if (bygroup && (groupfile == "")) { m->mothurOut("You cannot set bygroup=T, unless you provide a groupfile."); m->mothurOutEndLine(); abort=true; }
-
                }
                                
        }
@@ -203,40 +194,17 @@ int PreClusterCommand::execute(){
                        
                        vector<string> groups = parser->getNamesOfGroups();
                        
-                       //precluster each group
-                       for (int i = 0; i < groups.size(); i++) {
-                               
-                               start = time(NULL);
-                               
-                               if (m->control_pressed) {  delete parser; m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; }
-                               
-                               m->mothurOutEndLine(); m->mothurOut("Processing group " + groups[i] + ":"); m->mothurOutEndLine();
-                               
-                               map<string, string> thisNameMap;
-                               if (namefile != "") { thisNameMap = parser->getNameMap(groups[i]); }
-                               vector<Sequence> thisSeqs = parser->getSeqs(groups[i]);
-                               
-                               //fill alignSeqs with this groups info.
-                               int numSeqs = loadSeqs(thisNameMap, thisSeqs);
-                               
-                               if (m->control_pressed) {  delete parser; m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; }
-                               
-                               if (diffs > length) { m->mothurOut("Error: diffs is greater than your sequence length."); m->mothurOutEndLine(); return 0;  }
-                               
-                               int count = process();
-                               
-                               if (m->control_pressed) {  delete parser; m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; }
-
-                               m->mothurOut("Total number of sequences before pre.cluster was " + toString(alignSeqs.size()) + "."); m->mothurOutEndLine();
-                               m->mothurOut("pre.cluster removed " + toString(count) + " sequences."); m->mothurOutEndLine(); m->mothurOutEndLine(); 
-                               printData(newFastaFile, newNamesFile);
-                               
-                               m->mothurOut("It took " + toString(time(NULL) - start) + " secs to cluster " + toString(numSeqs) + " sequences."); m->mothurOutEndLine(); 
-                               
-                       }
+//#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+                       if(processors == 1)     {       driverGroups(parser, newFastaFile, newNamesFile, 0, groups.size(), groups);     }
+                       else                            {       createProcessesGroups(parser, newFastaFile, newNamesFile, groups);                      }
+//#else
+//                     driverGroups(parser, newFastaFile, newNamesFile, 0, groups.size(), groups);
+//#endif
                        
                        delete parser;
                        
+                       if (m->control_pressed) { m->mothurRemove(newFastaFile); m->mothurRemove(newNamesFile); return 0; }
+                       
                        //run unique.seqs for deconvolute results
                        string inputString = "fasta=" + newFastaFile;
                        if (namefile != "") { inputString += ", name=" + newNamesFile; }
@@ -254,6 +222,8 @@ int PreClusterCommand::execute(){
                        m->mothurOut("/******************************************/"); m->mothurOutEndLine(); 
                        
                        m->renameFile(filenames["fasta"][0], newFastaFile);
+                       
+                       m->mothurOut("It took " + toString(time(NULL) - start) + " secs to run pre.cluster."); m->mothurOutEndLine(); 
                                
                }else {
                        if (namefile != "") { readNameFile(); }
@@ -306,6 +276,157 @@ int PreClusterCommand::execute(){
        }
 }
 /**************************************************************************************************/
+//Divides the groups into one contiguous range per processor, runs driverGroups on each range
+//(forked child processes on mac/linux, worker threads otherwise) and then appends every worker's
+//temporary fasta / name output onto newFName / newNName in group order.
+//  parser   - handed unchanged to driverGroups by the calling process and by forked children.
+//  newFName - fasta output file; workers write to newFName + <id> + ".temp".
+//  newNName - name output file; workers write to newNName + <id> + ".temp".
+//  groups   - names of the groups to precluster.
+//Returns the value of the calling process's own driverGroups call; results of the other workers
+//are not added to it. May lower the processors member when there are fewer groups than processors.
+int PreClusterCommand::createProcessesGroups(SequenceParser* parser, string newFName, string newNName, vector<string> groups) {
+       try {
+               
+               vector<int> processIDS;
+               int process = 1;
+               int num = 0;
+               
+               //nothing to do - without this guard processors would become 0 below, which divides by zero
+               //and keeps the fork loop (process != processors) from ever terminating
+               const int numGroups = static_cast<int>(groups.size());
+               if (numGroups == 0) { return num; }
+               
+               //sanity check
+               if (processors < 1) { processors = 1; }
+               if (numGroups < processors) { processors = numGroups; }
+               
+               //divide the groups between the processors, ranges stay contiguous and in order,
+               //the remainder is spread over the first workers instead of all landing on the last one
+               vector<linePair> lines;
+               const int numGroupsPerProcessor = numGroups / processors;
+               const int leftoverGroups = numGroups % processors;
+               int startIndex = 0;
+               for (int i = 0; i < processors; i++) {
+                       int endIndex = startIndex + numGroupsPerProcessor;
+                       if (i < leftoverGroups) { endIndex++; }
+                       lines.push_back(linePair(startIndex, endIndex));
+                       startIndex = endIndex;
+               }
+               
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)          
+               
+               //loop through and create all the processes you want
+               while (process != processors) {
+                       int pid = fork();
+                       
+                       if (pid > 0) {
+                               processIDS.push_back(pid);  //pids are stored in fork order so the temp files are appended in group order later
+                               process++;
+                       }else if (pid == 0){
+                               //child: handle range number "process", writing to temp files named after this child's pid
+                               num = driverGroups(parser, newFName + toString(getpid()) + ".temp", newNName + toString(getpid()) + ".temp", lines[process].start, lines[process].end, groups);
+                               exit(0);
+                       }else { 
+                               m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); 
+                               for (size_t i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
+                               exit(0);
+                       }
+               }
+               
+               //do my part
+               num = driverGroups(parser, newFName, newNName, lines[0].start, lines[0].end, groups);
+               
+               //force parent to wait until all the processes are done, one wait() per forked child
+               for (size_t i = 0; i < processIDS.size(); i++) { 
+                       int status = 0;
+                       wait(&status);
+               }
+               
+#else
+               
+               //////////////////////////////////////////////////////////////////////////////////////////////////////
+               //Windows version shared memory, so be careful when passing variables through the preClusterData struct. 
+               //Above fork() will clone, so memory is separate, but that's not the case with windows, 
+               //////////////////////////////////////////////////////////////////////////////////////////////////////
+               
+               vector<preClusterData*> pDataArray; 
+               DWORD   dwThreadIdArray[processors-1];
+               HANDLE  hThreadArray[processors-1]; 
+               
+               //Create processor worker threads.
+               for( int i=1; i<processors; i++ ){
+                       // Allocate memory for thread data.
+                       string extension = toString(i) + ".temp";
+                       
+                       preClusterData* tempPreCluster = new preClusterData(fastafile, namefile, groupfile, (newFName+extension), (newNName+extension), groups, m, lines[i].start, lines[i].end, diffs, i);
+                       pDataArray.push_back(tempPreCluster);
+                       processIDS.push_back(i);
+                       
+                       //MyPreclusterThreadFunction is the thread entry point handed to CreateThread.
+                       //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier
+                       hThreadArray[i-1] = CreateThread(NULL, 0, MyPreclusterThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);   
+               }
+               
+                               
+               //using the main process as a worker saves time and memory
+               num = driverGroups(parser, newFName, newNName, lines[0].start, lines[0].end, groups);
+               
+               //Wait until all threads have terminated.
+               WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+               
+               //Close all thread handles and free memory allocations.
+               for(int i=0; i < pDataArray.size(); i++){
+                       CloseHandle(hThreadArray[i]);
+                       delete pDataArray[i];
+               }
+               
+#endif         
+               
+               //append output files, processIDS holds the pid (fork) or thread index used in each temp file name
+               for (size_t i = 0; i < processIDS.size(); i++) {
+                       m->appendFiles((newFName + toString(processIDS[i]) + ".temp"), newFName);
+                       m->mothurRemove((newFName + toString(processIDS[i]) + ".temp"));
+                       
+                       m->appendFiles((newNName + toString(processIDS[i]) + ".temp"), newNName);
+                       m->mothurRemove((newNName + toString(processIDS[i]) + ".temp"));
+               }
+               
+               return num;     
+               
+       }
+       catch(exception& e) {
+               m->errorOut(e, "PreClusterCommand", "createProcessesGroups");
+               exit(1);
+       }
+}
+/**************************************************************************************************/
+//Preclusters groups[start] through groups[end-1], one group at a time: fetches the group's name map
+//(only when a name file was given) and sequences from the parser, loads them with loadSeqs, clusters
+//them with process() and passes newFFile / newNFile to printData.
+//  parser   - source of the per-group name map and sequences.
+//  newFFile - fasta output file name handed to printData.
+//  newNFile - name output file name handed to printData.
+//  start    - index of the first group to handle.
+//  end      - one past the index of the last group to handle.
+//  groups   - group names indexed by start..end-1.
+//Returns the loadSeqs result of the last group handled (not a running total), or 0 when the range
+//is empty, control_pressed is set, or diffs is greater than the sequence length.
+int PreClusterCommand::driverGroups(SequenceParser* parser, string newFFile, string newNFile, int start, int end, vector<string> groups){
+       try {
+               
+               int numSeqs = 0;
+               
+               //precluster each group
+               for (int i = start; i < end; i++) {
+                       
+                       //per-group timer, kept in its own time_t so the start parameter is not overwritten
+                       //and the timestamp is not narrowed to int
+                       time_t groupStart = time(NULL);
+                       
+                       if (m->control_pressed) {  return 0; }
+                       
+                       m->mothurOutEndLine(); m->mothurOut("Processing group " + groups[i] + ":"); m->mothurOutEndLine();
+                       
+                       map<string, string> thisNameMap;
+                       if (namefile != "") { thisNameMap = parser->getNameMap(groups[i]); }
+                       vector<Sequence> thisSeqs = parser->getSeqs(groups[i]);
+                       
+                       //fill alignSeqs with this groups info.
+                       numSeqs = loadSeqs(thisNameMap, thisSeqs);
+                       
+                       if (m->control_pressed) {   return 0; }
+                       
+                       //flag the failure through control_pressed so the caller stops as well
+                       if (diffs > length) { m->mothurOut("Error: diffs is greater than your sequence length."); m->mothurOutEndLine(); m->control_pressed = true; return 0;  }
+                       
+                       int count = process();
+                       
+                       if (m->control_pressed) {  return 0; }
+                       
+                       m->mothurOut("Total number of sequences before pre.cluster was " + toString(alignSeqs.size()) + "."); m->mothurOutEndLine();
+                       m->mothurOut("pre.cluster removed " + toString(count) + " sequences."); m->mothurOutEndLine(); m->mothurOutEndLine(); 
+                       printData(newFFile, newNFile);
+                       
+                       m->mothurOut("It took " + toString(time(NULL) - groupStart) + " secs to cluster " + toString(numSeqs) + " sequences."); m->mothurOutEndLine(); 
+                       
+               }
+               
+               return numSeqs;
+       }
+       catch(exception& e) {
+               m->errorOut(e, "PreClusterCommand", "driverGroups");
+               exit(1);
+       }
+}
+/**************************************************************************************************/
 int PreClusterCommand::process(){
        try {