]> git.donarmstrong.com Git - mothur.git/blobdiff - getmetacommunitycommand.cpp
changes while testing
[mothur.git] / getmetacommunitycommand.cpp
index 59efe608e624ebc4ab9e71ce77aba23554bd40cb..8f78ca2a45fce56f6ef8996c880b48677ae32357 100644 (file)
@@ -7,7 +7,7 @@
 //
 
 #include "getmetacommunitycommand.h"
-#include "qFinderDMM.h"
+
 
 //**********************************************************************************************************************
 vector<string> GetMetaCommunityCommand::setParameters(){
@@ -16,8 +16,9 @@ vector<string> GetMetaCommunityCommand::setParameters(){
         CommandParameter pgroups("groups", "String", "", "", "", "", "","",false,false); parameters.push_back(pgroups);
                CommandParameter plabel("label", "String", "", "", "", "", "","",false,false); parameters.push_back(plabel);
         CommandParameter pminpartitions("minpartitions", "Number", "", "5", "", "", "","",false,false,true); parameters.push_back(pminpartitions);
-        CommandParameter pmaxpartitions("maxpartitions", "Number", "", "10", "", "", "","",false,false,true); parameters.push_back(pmaxpartitions);
+        CommandParameter pmaxpartitions("maxpartitions", "Number", "", "100", "", "", "","",false,false,true); parameters.push_back(pmaxpartitions);
         CommandParameter poptimizegap("optimizegap", "Number", "", "3", "", "", "","",false,false,true); parameters.push_back(poptimizegap);
+        CommandParameter pprocessors("processors", "Number", "", "1", "", "", "","",false,false,true); parameters.push_back(pprocessors);
                CommandParameter pinputdir("inputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(pinputdir);
                CommandParameter poutputdir("outputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(poutputdir);
                
@@ -34,13 +35,14 @@ vector<string> GetMetaCommunityCommand::setParameters(){
 string GetMetaCommunityCommand::getHelpString(){
        try {
                string helpString = "";
-               helpString += "The get.metacommunity command parameters are shared, label, groups, minpartitions, maxpartitions and optimizegap. The shared file is required. \n";
+               helpString += "The get.communitytype command parameters are shared, label, groups, minpartitions, maxpartitions, optimizegap and processors. The shared file is required. \n";
         helpString += "The label parameter is used to analyze specific labels in your input. labels are separated by dashes.\n";
                helpString += "The groups parameter allows you to specify which of the groups in your shared file you would like analyzed.  Group names are separated by dashes.\n";
                helpString += "The minpartitions parameter is used to .... Default=5.\n";
         helpString += "The maxpartitions parameter is used to .... Default=10.\n";
         helpString += "The optimizegap parameter is used to .... Default=3.\n";
-               helpString += "The get.metacommunity command should be in the following format: get.metacommunity(shared=yourSharedFile).\n";
+        helpString += "The processors parameter allows you to specify number of processors to use.  The default is 1.\n";
+               helpString += "The get.communitytype command should be in the following format: get.communitytype(shared=yourSharedFile).\n";
                return helpString;
        }
        catch(exception& e) {
@@ -55,7 +57,7 @@ string GetMetaCommunityCommand::getOutputPattern(string type) {
         
         if (type == "fit")              {  pattern = "[filename],[distance],mix.fit"; }
         else if (type == "relabund")    {  pattern = "[filename],[distance],[tag],mix.relabund"; }
-        else if (type == "design")      {  pattern = "[filename],mix.design"; }
+        else if (type == "design")      {  pattern = "[filename],[distance],mix.design"; }
         else if (type == "matrix")      {  pattern = "[filename],[distance],[tag],mix.posterior"; }
         else if (type == "parameters")  {  pattern = "[filename],[distance],mix.parameters"; }
         else if (type == "summary")  {  pattern = "[filename],[distance],mix.summary"; }
@@ -154,6 +156,10 @@ GetMetaCommunityCommand::GetMetaCommunityCommand(string option)  {
             temp = validParameter.validFile(parameters, "optimizegap", false);          if (temp == "not found"){      temp = "3";      }
                        m->mothurConvert(temp, optimizegap);
             
+            temp = validParameter.validFile(parameters, "processors", false);  if (temp == "not found"){       temp = m->getProcessors();      }
+                       m->setProcessors(temp);
+                       m->mothurConvert(temp, processors);
+            
             string groups = validParameter.validFile(parameters, "groups", false);
                        if (groups == "not found") { groups = ""; }
                        else { m->splitAtDash(groups, Groups); }
@@ -197,7 +203,7 @@ int GetMetaCommunityCommand::execute(){
                 
                 m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
                 
-                process(lookup);
+                createProcesses(lookup);
                 
                 processedLabels.insert(lookup[0]->getLabel());
                 userLabels.erase(lookup[0]->getLabel());
@@ -210,7 +216,7 @@ int GetMetaCommunityCommand::execute(){
                 lookup = input.getSharedRAbundVectors(lastLabel);
                 m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
                 
-                process(lookup);
+                createProcesses(lookup);
                 
                 processedLabels.insert(lookup[0]->getLabel());
                 userLabels.erase(lookup[0]->getLabel());
@@ -251,7 +257,7 @@ int GetMetaCommunityCommand::execute(){
             
             m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
             
-            process(lookup);
+            createProcesses(lookup);
             
             for (int i = 0; i < lookup.size(); i++) {  delete lookup[i];  }
         }
@@ -270,17 +276,247 @@ int GetMetaCommunityCommand::execute(){
        }
 }
 //**********************************************************************************************************************
-int GetMetaCommunityCommand::process(vector<SharedRAbundVector*>& thislookup){
+int GetMetaCommunityCommand::createProcesses(vector<SharedRAbundVector*>& thislookup){
        try {
         
-        double minLaplace = 1e10;
+        #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+        #else 
+        processors=1; //qFinderDMM not thread safe
+        #endif
+        
+        vector<int> processIDS;
+               int process = 1;
+               int num = 0;
         int minPartition = 0;
+               
+               //sanity check
+               if (maxpartitions < processors) { processors = maxpartitions; }
         
         map<string, string> variables;
         variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(sharedfile));
         variables["[distance]"] = thislookup[0]->getLabel();
                string outputFileName = getOutputFileName("fit", variables);
         outputNames.push_back(outputFileName); outputTypes["fit"].push_back(outputFileName);
+                
+               //divide the partitions between the processors
+               vector< vector<int> > dividedPartitions;
+        vector< vector<string> > rels, matrix;
+        vector<string> doneFlags;
+        dividedPartitions.resize(processors);
+        rels.resize(processors);
+        matrix.resize(processors);
+        
+        //for each file group figure out which process will complete it
+        //want to divide the load intelligently so the big files are spread between processes
+        for (int i=1; i<=maxpartitions; i++) {
+            //cout << i << endl;
+            int processToAssign = (i+1) % processors;
+            if (processToAssign == 0) { processToAssign = processors; }
+            
+            if (m->debug) { m->mothurOut("[DEBUG]: assigning " + toString(i) + " to process " + toString(processToAssign-1) + "\n"); }
+            dividedPartitions[(processToAssign-1)].push_back(i);
+            
+            variables["[tag]"] = toString(i);
+            string relName = getOutputFileName("relabund", variables);
+            string mName = getOutputFileName("matrix", variables);
+            rels[(processToAssign-1)].push_back(relName);
+            matrix[(processToAssign-1)].push_back(mName);
+        }
+        
+        for (int i = 0; i < processors; i++) { //read from everyone elses, just write to yours
+            string tempDoneFile = m->getRootName(m->getSimpleName(sharedfile)) + toString(i) + ".done.temp";
+            doneFlags.push_back(tempDoneFile);
+            ofstream out;
+            m->openOutputFile(tempDoneFile, out); //clear out 
+            out.close();
+        }
+        
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+               
+               //loop through and create all the processes you want
+               while (process != processors) {
+                       int pid = fork();
+                       
+                       if (pid > 0) {
+                               processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
+                               process++;
+                       }else if (pid == 0){
+                outputNames.clear();
+                               num = processDriver(thislookup, dividedPartitions[process], (outputFileName + toString(getpid())), rels[process], matrix[process], doneFlags, process);
+                
+                //pass numSeqs to parent
+                               ofstream out;
+                               string tempFile = toString(getpid()) + ".outputNames.temp";
+                               m->openOutputFile(tempFile, out);
+                out << num << endl;
+                out << outputNames.size() << endl;
+                               for (int i = 0; i < outputNames.size(); i++) { out << outputNames[i] << endl; }
+                               out.close();
+                
+                               exit(0);
+                       }else {
+                               m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
+                               for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
+                               exit(0);
+                       }
+               }
+               
+               //do my part
+        m->mothurOut("K\tNLE\t\tlogDet\tBIC\t\tAIC\t\tLaplace\n");
+               minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0], doneFlags, 0);
+               
+               //force parent to wait until all the processes are done
+               for (int i=0;i<processIDS.size();i++) {
+                       int temp = processIDS[i];
+                       wait(&temp);
+               }
+        
+        vector<string> tempOutputNames = outputNames;
+        for (int i=0;i<processIDS.size();i++) {
+            ifstream in;
+                       string tempFile = toString(processIDS[i]) + ".outputNames.temp";
+                       m->openInputFile(tempFile, in);
+                       if (!in.eof()) {
+                int tempNum = 0;
+                in >> tempNum; m->gobble(in);
+                if (tempNum < minPartition) { minPartition = tempNum; }
+                in >> tempNum; m->gobble(in);
+                for (int i = 0; i < tempNum; i++) {
+                    string tempName = "";
+                    in >> tempName; m->gobble(in);
+                    tempOutputNames.push_back(tempName);
+                }
+            }
+                       in.close(); m->mothurRemove(tempFile);
+            
+            m->appendFilesWithoutHeaders(outputFileName + toString(processIDS[i]), outputFileName);
+            m->mothurRemove(outputFileName + toString(processIDS[i]));
+        }
+        
+        if (processors > 1) { 
+            outputNames.clear();
+            for (int i = 0; i < tempOutputNames.size(); i++) { //remove files if needed
+                string name = tempOutputNames[i];
+                vector<string> parts;
+                m->splitAtChar(name, parts, '.');
+                bool keep = true;
+                if (((parts[parts.size()-1] == "relabund") || (parts[parts.size()-1] == "posterior")) && (parts[parts.size()-2] == "mix")) {
+                    string tempNum = parts[parts.size()-3];
+                    int num;  m->mothurConvert(tempNum, num);
+                    //if (num > minPartition) {
+                     //   m->mothurRemove(tempOutputNames[i]);
+                    //    keep = false; if (m->debug) { m->mothurOut("[DEBUG]: removing " + tempOutputNames[i] + ".\n"); }
+                    //}
+                }
+                if (keep) { outputNames.push_back(tempOutputNames[i]); }
+            }
+            
+            //reorder fit file
+            ifstream in;
+            m->openInputFile(outputFileName, in);
+            string headers = m->getline(in); m->gobble(in);
+            
+            map<int, string> file;
+            while (!in.eof()) {
+                string numString, line;
+                int num;
+                in >> numString; line = m->getline(in); m->gobble(in);
+                m->mothurConvert(numString, num);
+                file[num] = line;
+            }
+            in.close();
+            ofstream out;
+            m->openOutputFile(outputFileName, out);
+            out << headers << endl;
+            for (map<int, string>::iterator it = file.begin(); it != file.end(); it++) {
+                out << it->first << '\t' << it->second << endl;
+                if (m->debug) { m->mothurOut("[DEBUG]: printing: " + toString(it->first) + '\t' + it->second + ".\n"); }
+            }
+            out.close();
+        }
+        
+#else
+        /*
+        vector<metaCommunityData*> pDataArray;
+               DWORD   dwThreadIdArray[processors-1];
+               HANDLE  hThreadArray[processors-1];
+        
+               //Create processor worker threads.
+               for( int i=1; i<processors; i++ ){
+            //copy lookup
+            //make copy of lookup so we don't get access violations
+            vector<SharedRAbundVector*> newLookup;
+            for (int k = 0; k < thislookup.size(); k++) {
+                SharedRAbundVector* temp = new SharedRAbundVector();
+                temp->setLabel(thislookup[k]->getLabel());
+                temp->setGroup(thislookup[k]->getGroup());
+                newLookup.push_back(temp);
+            }
+            
+            //for each bin
+            for (int k = 0; k < thislookup[0]->getNumBins(); k++) {
+                if (m->control_pressed) { for (int j = 0; j < newLookup.size(); j++) {  delete newLookup[j];  } return 0; }
+                for (int j = 0; j < thislookup.size(); j++) { newLookup[j]->push_back(thislookup[j]->getAbundance(k), thislookup[j]->getGroup()); }
+            }
+
+            processIDS.push_back(i);
+            
+                       // Allocate memory for thread data.
+                       metaCommunityData* tempMeta = new metaCommunityData(newLookup, m, dividedPartitions[i], outputFileName + toString(i), rels[i], matrix[i], minpartitions, maxpartitions, optimizegap);
+                       pDataArray.push_back(tempMeta);
+                       
+                       hThreadArray[i] = CreateThread(NULL, 0, MyMetaCommunityThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
+               }
+               
+        //do my part
+               minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0]);
+        
+               //Wait until all threads have terminated.
+               WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+               
+               //Close all thread handles and free memory allocations.
+               for(int i=0; i < pDataArray.size(); i++){
+            if (pDataArray[i]->minPartition < minPartition) { minPartition = pDataArray[i]->minPartition; }
+            for (int j = 0; j < pDataArray[i]->outputNames.size(); j++) {
+                outputNames.push_back(pDataArray[i]->outputNames[j]);
+            }
+            m->appendFilesWithoutHeaders(outputFileName + toString(processIDS[i]), outputFileName);
+            m->mothurRemove(outputFileName + toString(processIDS[i]));
+                       CloseHandle(hThreadArray[i]);
+                       delete pDataArray[i];
+               } */
+        //do my part
+        m->mothurOut("K\tNLE\t\tlogDet\tBIC\t\tAIC\t\tLaplace\n");
+               minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0], doneFlags, 0);
+#endif
+        for (int i = 0; i < processors; i++) { //read from everyone elses, just write to yours
+            string tempDoneFile = m->getRootName(m->getSimpleName(sharedfile)) + toString(i) + ".done.temp";
+            m->mothurRemove(tempDoneFile);
+        }
+        
+        if (m->control_pressed) { return 0; }
+        
+        if (m->debug) { m->mothurOut("[DEBUG]: minPartition = " + toString(minPartition) + "\n"); }
+        
+        //run generate Summary function for smallest minPartition
+        variables["[tag]"] = toString(minPartition);
+        generateSummaryFile(minPartition, variables);
+        
+        return 0;
+
+    }
+       catch(exception& e) {
+               m->errorOut(e, "GetMetaCommunityCommand", "createProcesses");
+               exit(1);
+       }
+}
+//**********************************************************************************************************************
+int GetMetaCommunityCommand::processDriver(vector<SharedRAbundVector*>& thislookup, vector<int>& parts, string outputFileName, vector<string> relabunds, vector<string> matrix, vector<string> doneFlags, int processID){
+       try {
+        
+        double minLaplace = 1e10;
+        int minPartition = 0;
         
                ofstream fitData;
                m->openOutputFile(outputFileName, fitData);
@@ -290,15 +526,31 @@ int GetMetaCommunityCommand::process(vector<SharedRAbundVector*>& thislookup){
         cout.setf(ios::showpoint);
 
         vector< vector<int> > sharedMatrix;
-        for (int i = 0; i < thislookup.size(); i++) { sharedMatrix.push_back(thislookup[i]->getAbundances()); }
+        vector<string> thisGroups;
+        for (int i = 0; i < thislookup.size(); i++) { sharedMatrix.push_back(thislookup[i]->getAbundances()); thisGroups.push_back(thislookup[i]->getGroup()); }
         
-        m->mothurOut("K\tNLE\t\tlogDet\tBIC\t\tAIC\t\tLaplace\n");
         fitData << "K\tNLE\tlogDet\tBIC\tAIC\tLaplace" << endl;
         
-        for(int numPartitions=1;numPartitions<=maxpartitions;numPartitions++){
+        for(int i=0;i<parts.size();i++){
+            
+            int numPartitions = parts[i];
+            
+            if (m->debug) { m->mothurOut("[DEBUG]: running partition " + toString(numPartitions) + "\n"); }
             
             if (m->control_pressed) { break; }
             
+            //check to see if anyone else is done
+            for (int j = 0; j < doneFlags.size(); j++) {
+                if (!m->isBlank(doneFlags[j])) { //another process has finished
+                    //are they done at a lower partition?
+                    ifstream in;
+                    m->openInputFile(doneFlags[j], in);
+                    int tempNum;
+                    in >> tempNum; in.close();
+                    if (tempNum < numPartitions) { break; } //quit, because someone else has finished
+                }
+            }
+            
             qFinderDMM findQ(sharedMatrix, numPartitions);
             
             double laplace = findQ.getLaplace();
@@ -319,43 +571,47 @@ int GetMetaCommunityCommand::process(vector<SharedRAbundVector*>& thislookup){
             }
             m->mothurOutEndLine();
             
-            variables["[tag]"] = toString(numPartitions);
-            string relabund = getOutputFileName("relabund", variables);
+            string relabund = relabunds[i];
             outputNames.push_back(relabund); outputTypes["relabund"].push_back(relabund);
-            string matrix = getOutputFileName("matrix", variables);
-            outputNames.push_back(matrix); outputTypes["matrix"].push_back(matrix);
+            string matrixName = matrix[i];
+            outputNames.push_back(matrixName); outputTypes["matrix"].push_back(matrixName);
             
-            findQ.printZMatrix(matrix, m->getGroups());
+            findQ.printZMatrix(matrixName, thisGroups);
             findQ.printRelAbund(relabund, m->currentBinLabels);
             
-            if(optimizegap != -1 && (numPartitions - minPartition) >= optimizegap && numPartitions >= minpartitions){ break;  }
+            if(optimizegap != -1 && (numPartitions - minPartition) >= optimizegap && numPartitions >= minpartitions){
+                string tempDoneFile = m->getRootName(m->getSimpleName(sharedfile)) + toString(processID) + ".done.temp";
+                ofstream outDone;
+                m->openOutputFile(tempDoneFile, outDone);
+                outDone << minPartition << endl;
+                outDone.close();
+                break;
+            }
         }
         fitData.close();
         
         //minPartition = 4;
         
         if (m->control_pressed) { return 0; }
-        
-        generateSummaryFile(minpartitions, outputTypes["relabund"][0], outputTypes["relabund"][outputTypes["relabund"].size()-1], outputTypes["matrix"][outputTypes["matrix"].size()-1], thislookup[0]->getLabel());
 
-        
-        return 0;
+        return minPartition;
     }
        catch(exception& e) {
-               m->errorOut(e, "GetMetaCommunityCommand", "process");
+               m->errorOut(e, "GetMetaCommunityCommand", "processDriver");
                exit(1);
        }
 }
 /**************************************************************************************************/
 
-vector<double> GetMetaCommunityCommand::generateDesignFile(int numPartitions, string input){
+vector<double> GetMetaCommunityCommand::generateDesignFile(int numPartitions, map<string,string> variables){
     try {
         vector<double> piValues(numPartitions, 0);
         
         ifstream postFile;
+        variables["[tag]"] = toString(numPartitions);
+        string input = getOutputFileName("matrix", variables);
         m->openInputFile(input, postFile);//((fileRoot + toString(numPartitions) + "mix.posterior").c_str()); //matrix file
-        map<string, string> variables;
-        variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(input));
+        variables.erase("[tag]");
                string outputFileName = getOutputFileName("design", variables);
         ofstream designFile;
         m->openOutputFile(outputFileName, designFile);
@@ -416,7 +672,7 @@ vector<double> GetMetaCommunityCommand::generateDesignFile(int numPartitions, st
 inline bool summaryFunction(summaryData i, summaryData j){ return i.difference > j.difference;   }
 
 /**************************************************************************************************/
-int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string reference, string partFile, string designInput, string label){
+int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, map<string,string> v){
     try {
         vector<summaryData> summary;
         
@@ -428,10 +684,17 @@ int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string refer
         double mean, lci, uci;
         
         
-        vector<double> piValues = generateDesignFile(numPartitions, designInput);
+        vector<double> piValues = generateDesignFile(numPartitions, v);
         
         ifstream referenceFile;
+        map<string, string> variables;
+        variables["[filename]"] = v["[filename]"];
+        variables["[distance]"] = v["[distance]"];
+        variables["[tag]"] = "1";
+        string reference = getOutputFileName("relabund", variables);
         m->openInputFile(reference, referenceFile); //((fileRoot + label + ".1mix.relabund").c_str());
+        variables["[tag]"] = toString(numPartitions);
+        string partFile = getOutputFileName("relabund", variables);
         ifstream partitionFile;
         m->openInputFile(partFile, partitionFile); //((fileRoot + toString(numPartitions) + "mix.relabund").c_str());
         
@@ -486,9 +749,7 @@ int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string refer
         
         sort(summary.begin(), summary.end(), summaryFunction);
         
-        map<string, string> variables;
-        variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(sharedfile));
-        variables["[distance]"] = label;
+        variables.erase("[tag]");
                string outputFileName = getOutputFileName("parameters", variables);
         outputNames.push_back(outputFileName); outputTypes["parameters"].push_back(outputFileName);
         
@@ -509,7 +770,7 @@ int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string refer
         if (m->control_pressed) { return 0; }
         
         string summaryFileName = getOutputFileName("summary", variables);
-        outputNames.push_back(outputFileName); outputTypes["summary"].push_back(outputFileName);
+        outputNames.push_back(summaryFileName); outputTypes["summary"].push_back(summaryFileName);
         
         ofstream summaryFile;
         m->openOutputFile(summaryFileName, summaryFile); //((fileRoot + "mix.summary").c_str());