]> git.donarmstrong.com Git - mothur.git/commitdiff
paralellized unifrac.weighted for windows. added get.metacommunity command. fixed...
authorSarahsWork <sarahswork@imac.westcotts.net>
Fri, 19 Apr 2013 13:37:01 +0000 (09:37 -0400)
committerSarahsWork <sarahswork@imac.westcotts.net>
Fri, 19 Apr 2013 13:37:01 +0000 (09:37 -0400)
getmetacommunitycommand.cpp
getmetacommunitycommand.h
listseqscommand.cpp
mothurout.cpp
seqsummarycommand.cpp
sequence.cpp
unifracweightedcommand.cpp
unifracweightedcommand.h
weighted.cpp
weighted.h

index 59efe608e624ebc4ab9e71ce77aba23554bd40cb..ad069d00745fd3627015c6166cd4101047f8284c 100644 (file)
@@ -7,7 +7,7 @@
 //
 
 #include "getmetacommunitycommand.h"
 //
 
 #include "getmetacommunitycommand.h"
-#include "qFinderDMM.h"
+
 
 //**********************************************************************************************************************
 vector<string> GetMetaCommunityCommand::setParameters(){
 
 //**********************************************************************************************************************
 vector<string> GetMetaCommunityCommand::setParameters(){
@@ -16,8 +16,9 @@ vector<string> GetMetaCommunityCommand::setParameters(){
         CommandParameter pgroups("groups", "String", "", "", "", "", "","",false,false); parameters.push_back(pgroups);
                CommandParameter plabel("label", "String", "", "", "", "", "","",false,false); parameters.push_back(plabel);
         CommandParameter pminpartitions("minpartitions", "Number", "", "5", "", "", "","",false,false,true); parameters.push_back(pminpartitions);
         CommandParameter pgroups("groups", "String", "", "", "", "", "","",false,false); parameters.push_back(pgroups);
                CommandParameter plabel("label", "String", "", "", "", "", "","",false,false); parameters.push_back(plabel);
         CommandParameter pminpartitions("minpartitions", "Number", "", "5", "", "", "","",false,false,true); parameters.push_back(pminpartitions);
-        CommandParameter pmaxpartitions("maxpartitions", "Number", "", "10", "", "", "","",false,false,true); parameters.push_back(pmaxpartitions);
+        CommandParameter pmaxpartitions("maxpartitions", "Number", "", "100", "", "", "","",false,false,true); parameters.push_back(pmaxpartitions);
         CommandParameter poptimizegap("optimizegap", "Number", "", "3", "", "", "","",false,false,true); parameters.push_back(poptimizegap);
         CommandParameter poptimizegap("optimizegap", "Number", "", "3", "", "", "","",false,false,true); parameters.push_back(poptimizegap);
+        CommandParameter pprocessors("processors", "Number", "", "1", "", "", "","",false,false,true); parameters.push_back(pprocessors);
                CommandParameter pinputdir("inputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(pinputdir);
                CommandParameter poutputdir("outputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(poutputdir);
                
                CommandParameter pinputdir("inputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(pinputdir);
                CommandParameter poutputdir("outputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(poutputdir);
                
@@ -34,12 +35,13 @@ vector<string> GetMetaCommunityCommand::setParameters(){
 string GetMetaCommunityCommand::getHelpString(){
        try {
                string helpString = "";
 string GetMetaCommunityCommand::getHelpString(){
        try {
                string helpString = "";
-               helpString += "The get.metacommunity command parameters are shared, label, groups, minpartitions, maxpartitions and optimizegap. The shared file is required. \n";
+               helpString += "The get.metacommunity command parameters are shared, label, groups, minpartitions, maxpartitions, optimizegap and processors. The shared file is required. \n";
         helpString += "The label parameter is used to analyze specific labels in your input. labels are separated by dashes.\n";
                helpString += "The groups parameter allows you to specify which of the groups in your shared file you would like analyzed.  Group names are separated by dashes.\n";
                helpString += "The minpartitions parameter is used to .... Default=5.\n";
         helpString += "The maxpartitions parameter is used to .... Default=10.\n";
         helpString += "The optimizegap parameter is used to .... Default=3.\n";
         helpString += "The label parameter is used to analyze specific labels in your input. labels are separated by dashes.\n";
                helpString += "The groups parameter allows you to specify which of the groups in your shared file you would like analyzed.  Group names are separated by dashes.\n";
                helpString += "The minpartitions parameter is used to .... Default=5.\n";
         helpString += "The maxpartitions parameter is used to .... Default=10.\n";
         helpString += "The optimizegap parameter is used to .... Default=3.\n";
+        helpString += "The processors parameter allows you to specify number of processors to use.  The default is 1.\n";
                helpString += "The get.metacommunity command should be in the following format: get.metacommunity(shared=yourSharedFile).\n";
                return helpString;
        }
                helpString += "The get.metacommunity command should be in the following format: get.metacommunity(shared=yourSharedFile).\n";
                return helpString;
        }
@@ -55,7 +57,7 @@ string GetMetaCommunityCommand::getOutputPattern(string type) {
         
         if (type == "fit")              {  pattern = "[filename],[distance],mix.fit"; }
         else if (type == "relabund")    {  pattern = "[filename],[distance],[tag],mix.relabund"; }
         
         if (type == "fit")              {  pattern = "[filename],[distance],mix.fit"; }
         else if (type == "relabund")    {  pattern = "[filename],[distance],[tag],mix.relabund"; }
-        else if (type == "design")      {  pattern = "[filename],mix.design"; }
+        else if (type == "design")      {  pattern = "[filename],[distance],mix.design"; }
         else if (type == "matrix")      {  pattern = "[filename],[distance],[tag],mix.posterior"; }
         else if (type == "parameters")  {  pattern = "[filename],[distance],mix.parameters"; }
         else if (type == "summary")  {  pattern = "[filename],[distance],mix.summary"; }
         else if (type == "matrix")      {  pattern = "[filename],[distance],[tag],mix.posterior"; }
         else if (type == "parameters")  {  pattern = "[filename],[distance],mix.parameters"; }
         else if (type == "summary")  {  pattern = "[filename],[distance],mix.summary"; }
@@ -154,6 +156,10 @@ GetMetaCommunityCommand::GetMetaCommunityCommand(string option)  {
             temp = validParameter.validFile(parameters, "optimizegap", false);          if (temp == "not found"){      temp = "3";      }
                        m->mothurConvert(temp, optimizegap);
             
             temp = validParameter.validFile(parameters, "optimizegap", false);          if (temp == "not found"){      temp = "3";      }
                        m->mothurConvert(temp, optimizegap);
             
+            temp = validParameter.validFile(parameters, "processors", false);  if (temp == "not found"){       temp = m->getProcessors();      }
+                       m->setProcessors(temp);
+                       m->mothurConvert(temp, processors);
+            
             string groups = validParameter.validFile(parameters, "groups", false);
                        if (groups == "not found") { groups = ""; }
                        else { m->splitAtDash(groups, Groups); }
             string groups = validParameter.validFile(parameters, "groups", false);
                        if (groups == "not found") { groups = ""; }
                        else { m->splitAtDash(groups, Groups); }
@@ -197,7 +203,7 @@ int GetMetaCommunityCommand::execute(){
                 
                 m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
                 
                 
                 m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
                 
-                process(lookup);
+                createProcesses(lookup);
                 
                 processedLabels.insert(lookup[0]->getLabel());
                 userLabels.erase(lookup[0]->getLabel());
                 
                 processedLabels.insert(lookup[0]->getLabel());
                 userLabels.erase(lookup[0]->getLabel());
@@ -210,7 +216,7 @@ int GetMetaCommunityCommand::execute(){
                 lookup = input.getSharedRAbundVectors(lastLabel);
                 m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
                 
                 lookup = input.getSharedRAbundVectors(lastLabel);
                 m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
                 
-                process(lookup);
+                createProcesses(lookup);
                 
                 processedLabels.insert(lookup[0]->getLabel());
                 userLabels.erase(lookup[0]->getLabel());
                 
                 processedLabels.insert(lookup[0]->getLabel());
                 userLabels.erase(lookup[0]->getLabel());
@@ -251,7 +257,7 @@ int GetMetaCommunityCommand::execute(){
             
             m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
             
             
             m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine();
             
-            process(lookup);
+            createProcesses(lookup);
             
             for (int i = 0; i < lookup.size(); i++) {  delete lookup[i];  }
         }
             
             for (int i = 0; i < lookup.size(); i++) {  delete lookup[i];  }
         }
@@ -270,17 +276,245 @@ int GetMetaCommunityCommand::execute(){
        }
 }
 //**********************************************************************************************************************
        }
 }
 //**********************************************************************************************************************
-int GetMetaCommunityCommand::process(vector<SharedRAbundVector*>& thislookup){
+int GetMetaCommunityCommand::createProcesses(vector<SharedRAbundVector*>& thislookup){
        try {
         
        try {
         
-        double minLaplace = 1e10;
+        #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+        #else 
+        processors=1; //qFinderDMM not thread safe
+        #endif
+        
+        vector<int> processIDS;
+               int process = 1;
+               int num = 0;
         int minPartition = 0;
         int minPartition = 0;
+               
+               //sanity check
+               if (maxpartitions < processors) { processors = maxpartitions; }
         
         map<string, string> variables;
         variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(sharedfile));
         variables["[distance]"] = thislookup[0]->getLabel();
                string outputFileName = getOutputFileName("fit", variables);
         outputNames.push_back(outputFileName); outputTypes["fit"].push_back(outputFileName);
         
         map<string, string> variables;
         variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(sharedfile));
         variables["[distance]"] = thislookup[0]->getLabel();
                string outputFileName = getOutputFileName("fit", variables);
         outputNames.push_back(outputFileName); outputTypes["fit"].push_back(outputFileName);
+                
+               //divide the partitions between the processors
+               vector< vector<int> > dividedPartitions;
+        vector< vector<string> > rels, matrix;
+        vector<string> doneFlags;
+        dividedPartitions.resize(processors);
+        rels.resize(processors);
+        matrix.resize(processors);
+        
+        //for each file group figure out which process will complete it
+        //want to divide the load intelligently so the big files are spread between processes
+        for (int i=1; i<=maxpartitions; i++) {
+            //cout << i << endl;
+            int processToAssign = (i+1) % processors;
+            if (processToAssign == 0) { processToAssign = processors; }
+            
+            if (m->debug) { m->mothurOut("[DEBUG]: assigning " + toString(i) + " to process " + toString(processToAssign-1) + "\n"); }
+            dividedPartitions[(processToAssign-1)].push_back(i);
+            
+            variables["[tag]"] = toString(i);
+            string relName = getOutputFileName("relabund", variables);
+            string mName = getOutputFileName("matrix", variables);
+            rels[(processToAssign-1)].push_back(relName);
+            matrix[(processToAssign-1)].push_back(mName);
+        }
+        
+        for (int i = 0; i < processors; i++) { //read from everyone elses, just write to yours
+            string tempDoneFile = toString(i) + ".done.temp";
+            doneFlags.push_back(tempDoneFile);
+            ofstream out;
+            m->openOutputFile(tempDoneFile, out); //clear out 
+            out.close();
+        }
+        
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+               
+               //loop through and create all the processes you want
+               while (process != processors) {
+                       int pid = fork();
+                       
+                       if (pid > 0) {
+                               processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
+                               process++;
+                       }else if (pid == 0){
+                outputNames.clear();
+                               num = processDriver(thislookup, dividedPartitions[process], (outputFileName + toString(getpid())), rels[process], matrix[process], doneFlags, process);
+                
+                //pass numSeqs to parent
+                               ofstream out;
+                               string tempFile = toString(getpid()) + ".outputNames.temp";
+                               m->openOutputFile(tempFile, out);
+                out << num << endl;
+                out << outputNames.size() << endl;
+                               for (int i = 0; i < outputNames.size(); i++) { out << outputNames[i] << endl; }
+                               out.close();
+                
+                               exit(0);
+                       }else {
+                               m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
+                               for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
+                               exit(0);
+                       }
+               }
+               
+               //do my part
+               minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0], doneFlags, 0);
+               
+               //force parent to wait until all the processes are done
+               for (int i=0;i<processIDS.size();i++) {
+                       int temp = processIDS[i];
+                       wait(&temp);
+               }
+        
+        vector<string> tempOutputNames = outputNames;
+        for (int i=0;i<processIDS.size();i++) {
+            ifstream in;
+                       string tempFile = toString(processIDS[i]) + ".outputNames.temp";
+                       m->openInputFile(tempFile, in);
+                       if (!in.eof()) {
+                int tempNum = 0;
+                in >> tempNum; m->gobble(in);
+                if (tempNum < minPartition) { minPartition = tempNum; }
+                in >> tempNum; m->gobble(in);
+                for (int i = 0; i < tempNum; i++) {
+                    string tempName = "";
+                    in >> tempName; m->gobble(in);
+                    tempOutputNames.push_back(tempName);
+                }
+            }
+                       in.close(); m->mothurRemove(tempFile);
+            
+            m->appendFilesWithoutHeaders(outputFileName + toString(processIDS[i]), outputFileName);
+            m->mothurRemove(outputFileName + toString(processIDS[i]));
+        }
+        
+        if (processors > 1) { 
+            outputNames.clear();
+            for (int i = 0; i < tempOutputNames.size(); i++) { //remove files if needed
+                string name = tempOutputNames[i];
+                vector<string> parts;
+                m->splitAtChar(name, parts, '.');
+                bool keep = true;
+                if (((parts[parts.size()-1] == "relabund") || (parts[parts.size()-1] == "posterior")) && (parts[parts.size()-2] == "mix")) {
+                    string tempNum = parts[parts.size()-3];
+                    int num;  m->mothurConvert(tempNum, num);
+                    //if (num > minPartition) {
+                     //   m->mothurRemove(tempOutputNames[i]);
+                    //    keep = false; if (m->debug) { m->mothurOut("[DEBUG]: removing " + tempOutputNames[i] + ".\n"); }
+                    //}
+                }
+                if (keep) { outputNames.push_back(tempOutputNames[i]); }
+            }
+            
+            //reorder fit file
+            ifstream in;
+            m->openInputFile(outputFileName, in);
+            string headers = m->getline(in); m->gobble(in);
+            
+            map<int, string> file;
+            while (!in.eof()) {
+                string numString, line;
+                int num;
+                in >> numString; line = m->getline(in); m->gobble(in);
+                m->mothurConvert(numString, num);
+                file[num] = line;
+            }
+            in.close();
+            ofstream out;
+            m->openOutputFile(outputFileName, out);
+            out << headers << endl;
+            for (map<int, string>::iterator it = file.begin(); it != file.end(); it++) {
+                out << it->first << '\t' << it->second << endl;
+                if (m->debug) { m->mothurOut("[DEBUG]: printing: " + toString(it->first) + '\t' + it->second + ".\n"); }
+            }
+            out.close();
+        }
+        
+#else
+        /*
+        vector<metaCommunityData*> pDataArray;
+               DWORD   dwThreadIdArray[processors-1];
+               HANDLE  hThreadArray[processors-1];
+        
+               //Create processor worker threads.
+               for( int i=1; i<processors; i++ ){
+            //copy lookup
+            //make copy of lookup so we don't get access violations
+            vector<SharedRAbundVector*> newLookup;
+            for (int k = 0; k < thislookup.size(); k++) {
+                SharedRAbundVector* temp = new SharedRAbundVector();
+                temp->setLabel(thislookup[k]->getLabel());
+                temp->setGroup(thislookup[k]->getGroup());
+                newLookup.push_back(temp);
+            }
+            
+            //for each bin
+            for (int k = 0; k < thislookup[0]->getNumBins(); k++) {
+                if (m->control_pressed) { for (int j = 0; j < newLookup.size(); j++) {  delete newLookup[j];  } return 0; }
+                for (int j = 0; j < thislookup.size(); j++) { newLookup[j]->push_back(thislookup[j]->getAbundance(k), thislookup[j]->getGroup()); }
+            }
+
+            processIDS.push_back(i);
+            
+                       // Allocate memory for thread data.
+                       metaCommunityData* tempMeta = new metaCommunityData(newLookup, m, dividedPartitions[i], outputFileName + toString(i), rels[i], matrix[i], minpartitions, maxpartitions, optimizegap);
+                       pDataArray.push_back(tempMeta);
+                       
+                       hThreadArray[i] = CreateThread(NULL, 0, MyMetaCommunityThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
+               }
+               
+        //do my part
+               minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0]);
+        
+               //Wait until all threads have terminated.
+               WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+               
+               //Close all thread handles and free memory allocations.
+               for(int i=0; i < pDataArray.size(); i++){
+            if (pDataArray[i]->minPartition < minPartition) { minPartition = pDataArray[i]->minPartition; }
+            for (int j = 0; j < pDataArray[i]->outputNames.size(); j++) {
+                outputNames.push_back(pDataArray[i]->outputNames[j]);
+            }
+            m->appendFilesWithoutHeaders(outputFileName + toString(processIDS[i]), outputFileName);
+            m->mothurRemove(outputFileName + toString(processIDS[i]));
+                       CloseHandle(hThreadArray[i]);
+                       delete pDataArray[i];
+               } */
+        //do my part
+               minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0], doneFlags, 0);
+#endif
+        for (int i = 0; i < processors; i++) { //read from everyone elses, just write to yours
+            string tempDoneFile = toString(i) + ".done.temp";
+            m->mothurRemove(tempDoneFile);
+        }
+        
+        if (m->control_pressed) { return 0; }
+        
+        if (m->debug) { m->mothurOut("[DEBUG]: minPartition = " + toString(minPartition) + "\n"); }
+        
+        //run generate Summary function for smallest minPartition
+        variables["[tag]"] = toString(minPartition);
+        generateSummaryFile(minPartition, variables);
+        
+        return 0;
+
+    }
+       catch(exception& e) {
+               m->errorOut(e, "GetMetaCommunityCommand", "createProcesses");
+               exit(1);
+       }
+}
+//**********************************************************************************************************************
+int GetMetaCommunityCommand::processDriver(vector<SharedRAbundVector*>& thislookup, vector<int>& parts, string outputFileName, vector<string> relabunds, vector<string> matrix, vector<string> doneFlags, int processID){
+       try {
+        
+        double minLaplace = 1e10;
+        int minPartition = 0;
         
                ofstream fitData;
                m->openOutputFile(outputFileName, fitData);
         
                ofstream fitData;
                m->openOutputFile(outputFileName, fitData);
@@ -295,10 +529,26 @@ int GetMetaCommunityCommand::process(vector<SharedRAbundVector*>& thislookup){
         m->mothurOut("K\tNLE\t\tlogDet\tBIC\t\tAIC\t\tLaplace\n");
         fitData << "K\tNLE\tlogDet\tBIC\tAIC\tLaplace" << endl;
         
         m->mothurOut("K\tNLE\t\tlogDet\tBIC\t\tAIC\t\tLaplace\n");
         fitData << "K\tNLE\tlogDet\tBIC\tAIC\tLaplace" << endl;
         
-        for(int numPartitions=1;numPartitions<=maxpartitions;numPartitions++){
+        for(int i=0;i<parts.size();i++){
+            
+            int numPartitions = parts[i];
+            
+            if (m->debug) { m->mothurOut("[DEBUG]: running partition " + toString(numPartitions) + "\n"); }
             
             if (m->control_pressed) { break; }
             
             
             if (m->control_pressed) { break; }
             
+            //check to see if anyone else is done
+            for (int j = 0; j < doneFlags.size(); j++) {
+                if (!m->isBlank(doneFlags[j])) { //another process has finished
+                    //are they done at a lower partition?
+                    ifstream in;
+                    m->openInputFile(doneFlags[j], in);
+                    int tempNum;
+                    in >> tempNum; in.close();
+                    if (tempNum < numPartitions) { break; } //quit, because someone else has finished
+                }
+            }
+            
             qFinderDMM findQ(sharedMatrix, numPartitions);
             
             double laplace = findQ.getLaplace();
             qFinderDMM findQ(sharedMatrix, numPartitions);
             
             double laplace = findQ.getLaplace();
@@ -319,43 +569,47 @@ int GetMetaCommunityCommand::process(vector<SharedRAbundVector*>& thislookup){
             }
             m->mothurOutEndLine();
             
             }
             m->mothurOutEndLine();
             
-            variables["[tag]"] = toString(numPartitions);
-            string relabund = getOutputFileName("relabund", variables);
+            string relabund = relabunds[i];
             outputNames.push_back(relabund); outputTypes["relabund"].push_back(relabund);
             outputNames.push_back(relabund); outputTypes["relabund"].push_back(relabund);
-            string matrix = getOutputFileName("matrix", variables);
-            outputNames.push_back(matrix); outputTypes["matrix"].push_back(matrix);
+            string matrixName = matrix[i];
+            outputNames.push_back(matrixName); outputTypes["matrix"].push_back(matrixName);
             
             
-            findQ.printZMatrix(matrix, m->getGroups());
+            findQ.printZMatrix(matrixName, m->getGroups());
             findQ.printRelAbund(relabund, m->currentBinLabels);
             
             findQ.printRelAbund(relabund, m->currentBinLabels);
             
-            if(optimizegap != -1 && (numPartitions - minPartition) >= optimizegap && numPartitions >= minpartitions){ break;  }
+            if(optimizegap != -1 && (numPartitions - minPartition) >= optimizegap && numPartitions >= minpartitions){
+                string tempDoneFile = toString(processID) + ".done.temp";
+                ofstream outDone;
+                m->openOutputFile(tempDoneFile, outDone);
+                outDone << minPartition << endl;
+                outDone.close();
+                break;
+            }
         }
         fitData.close();
         
         //minPartition = 4;
         
         if (m->control_pressed) { return 0; }
         }
         fitData.close();
         
         //minPartition = 4;
         
         if (m->control_pressed) { return 0; }
-        
-        generateSummaryFile(minpartitions, outputTypes["relabund"][0], outputTypes["relabund"][outputTypes["relabund"].size()-1], outputTypes["matrix"][outputTypes["matrix"].size()-1], thislookup[0]->getLabel());
 
 
-        
-        return 0;
+        return minPartition;
     }
        catch(exception& e) {
     }
        catch(exception& e) {
-               m->errorOut(e, "GetMetaCommunityCommand", "process");
+               m->errorOut(e, "GetMetaCommunityCommand", "processDriver");
                exit(1);
        }
 }
 /**************************************************************************************************/
 
                exit(1);
        }
 }
 /**************************************************************************************************/
 
-vector<double> GetMetaCommunityCommand::generateDesignFile(int numPartitions, string input){
+vector<double> GetMetaCommunityCommand::generateDesignFile(int numPartitions, map<string,string> variables){
     try {
         vector<double> piValues(numPartitions, 0);
         
         ifstream postFile;
     try {
         vector<double> piValues(numPartitions, 0);
         
         ifstream postFile;
+        variables["[tag]"] = toString(numPartitions);
+        string input = getOutputFileName("matrix", variables);
         m->openInputFile(input, postFile);//((fileRoot + toString(numPartitions) + "mix.posterior").c_str()); //matrix file
         m->openInputFile(input, postFile);//((fileRoot + toString(numPartitions) + "mix.posterior").c_str()); //matrix file
-        map<string, string> variables;
-        variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(input));
+        variables.erase("[tag]");
                string outputFileName = getOutputFileName("design", variables);
         ofstream designFile;
         m->openOutputFile(outputFileName, designFile);
                string outputFileName = getOutputFileName("design", variables);
         ofstream designFile;
         m->openOutputFile(outputFileName, designFile);
@@ -416,7 +670,7 @@ vector<double> GetMetaCommunityCommand::generateDesignFile(int numPartitions, st
 inline bool summaryFunction(summaryData i, summaryData j){ return i.difference > j.difference;   }
 
 /**************************************************************************************************/
 inline bool summaryFunction(summaryData i, summaryData j){ return i.difference > j.difference;   }
 
 /**************************************************************************************************/
-int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string reference, string partFile, string designInput, string label){
+int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, map<string,string> v){
     try {
         vector<summaryData> summary;
         
     try {
         vector<summaryData> summary;
         
@@ -428,10 +682,17 @@ int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string refer
         double mean, lci, uci;
         
         
         double mean, lci, uci;
         
         
-        vector<double> piValues = generateDesignFile(numPartitions, designInput);
+        vector<double> piValues = generateDesignFile(numPartitions, v);
         
         ifstream referenceFile;
         
         ifstream referenceFile;
+        map<string, string> variables;
+        variables["[filename]"] = v["[filename]"];
+        variables["[distance]"] = v["[distance]"];
+        variables["[tag]"] = "1";
+        string reference = getOutputFileName("relabund", variables);
         m->openInputFile(reference, referenceFile); //((fileRoot + label + ".1mix.relabund").c_str());
         m->openInputFile(reference, referenceFile); //((fileRoot + label + ".1mix.relabund").c_str());
+        variables["[tag]"] = toString(numPartitions);
+        string partFile = getOutputFileName("relabund", variables);
         ifstream partitionFile;
         m->openInputFile(partFile, partitionFile); //((fileRoot + toString(numPartitions) + "mix.relabund").c_str());
         
         ifstream partitionFile;
         m->openInputFile(partFile, partitionFile); //((fileRoot + toString(numPartitions) + "mix.relabund").c_str());
         
@@ -486,9 +747,7 @@ int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string refer
         
         sort(summary.begin(), summary.end(), summaryFunction);
         
         
         sort(summary.begin(), summary.end(), summaryFunction);
         
-        map<string, string> variables;
-        variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(sharedfile));
-        variables["[distance]"] = label;
+        variables.erase("[tag]");
                string outputFileName = getOutputFileName("parameters", variables);
         outputNames.push_back(outputFileName); outputTypes["parameters"].push_back(outputFileName);
         
                string outputFileName = getOutputFileName("parameters", variables);
         outputNames.push_back(outputFileName); outputTypes["parameters"].push_back(outputFileName);
         
@@ -509,7 +768,7 @@ int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string refer
         if (m->control_pressed) { return 0; }
         
         string summaryFileName = getOutputFileName("summary", variables);
         if (m->control_pressed) { return 0; }
         
         string summaryFileName = getOutputFileName("summary", variables);
-        outputNames.push_back(outputFileName); outputTypes["summary"].push_back(outputFileName);
+        outputNames.push_back(summaryFileName); outputTypes["summary"].push_back(summaryFileName);
         
         ofstream summaryFile;
         m->openOutputFile(summaryFileName, summaryFile); //((fileRoot + "mix.summary").c_str());
         
         ofstream summaryFile;
         m->openOutputFile(summaryFileName, summaryFile); //((fileRoot + "mix.summary").c_str());
index 62070d6191ca75a09b6980f6cd5aabe73af89023..2264c87b9af87c5f3363f5774116ff95bb691b8f 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "command.hpp"
 #include "inputdata.h"
 
 #include "command.hpp"
 #include "inputdata.h"
+#include "qFinderDMM.h"
 
 /**************************************************************************************************/
 
 
 /**************************************************************************************************/
 
@@ -34,17 +35,23 @@ public:
     void help() { m->mothurOut(getHelpString()); }
     
 private:
     void help() { m->mothurOut(getHelpString()); }
     
 private:
+    struct linePair {
+               unsigned long long start;
+               unsigned long long end;
+               linePair(unsigned long long i, unsigned long long j) : start(i), end(j) {}
+       };
     bool abort, allLines;
     string outputDir;
     vector<string> outputNames;
     string sharedfile;
     bool abort, allLines;
     string outputDir;
     vector<string> outputNames;
     string sharedfile;
-    int minpartitions, maxpartitions, optimizegap;
+    int minpartitions, maxpartitions, optimizegap, processors;
     vector<string> Groups;
     set<string> labels;
     
     vector<string> Groups;
     set<string> labels;
     
-    int process(vector<SharedRAbundVector*>&);
-    vector<double> generateDesignFile(int, string);
-    int generateSummaryFile(int, string, string, string, string);
+    int processDriver(vector<SharedRAbundVector*>&, vector<int>&, string, vector<string>, vector<string>, vector<string>, int);
+    int createProcesses(vector<SharedRAbundVector*>&);
+    vector<double> generateDesignFile(int, map<string,string>);
+    int generateSummaryFile(int, map<string,string>);
 
 };
 
 
 };
 
@@ -58,6 +65,108 @@ struct summaryData {
 };
 /**************************************************************************************************/
 
 };
 /**************************************************************************************************/
 
+struct metaCommunityData {
+    vector<SharedRAbundVector*> thislookup;
+       MothurOut* m;
+       string outputFileName;
+    vector<string> relabunds, matrix, outputNames;
+    int minpartitions, maxpartitions, optimizegap;
+    vector<int> parts;
+    int minPartition;
+       
+       metaCommunityData(){}
+       metaCommunityData(vector<SharedRAbundVector*> lu, MothurOut* mout, vector<int> dp, string fit, vector<string> rels, vector<string> mat, int minp, int maxp, int opg) {
+               m = mout;
+        thislookup = lu;
+        parts = dp;
+        outputFileName = fit;
+        relabunds = rels;
+        matrix = mat;
+        minpartitions = minp;
+        maxpartitions = maxp;
+        optimizegap = opg;
+        minPartition = 0;
+       }
+};
+/**************************************************************************************************/
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+#else
+static DWORD WINAPI MyMetaCommunityThreadFunction(LPVOID lpParam){
+       metaCommunityData* pDataArray;
+       pDataArray = (metaCommunityData*)lpParam;
+       
+       try {
+        
+        double minLaplace = 1e10;
+        
+               ofstream fitData;
+               pDataArray->m->openOutputFile(pDataArray->outputFileName, fitData);
+        fitData.setf(ios::fixed, ios::floatfield);
+        fitData.setf(ios::showpoint);
+        cout.setf(ios::fixed, ios::floatfield);
+        cout.setf(ios::showpoint);
+        
+        vector< vector<int> > sharedMatrix;
+        for (int i = 0; i < pDataArray->thislookup.size(); i++) { sharedMatrix.push_back(pDataArray->thislookup[i]->getAbundances()); }
+        
+        pDataArray->m->mothurOut("K\tNLE\t\tlogDet\tBIC\t\tAIC\t\tLaplace\n");
+        fitData << "K\tNLE\tlogDet\tBIC\tAIC\tLaplace" << endl;
+        
+        for(int i=0;i<pDataArray->parts.size();i++){
+            
+            int numPartitions = pDataArray->parts[i];
+            
+            if (pDataArray->m->debug) { pDataArray->m->mothurOut("[DEBUG]: running partition " + toString(numPartitions) + "\n"); }
+            
+            if (pDataArray->m->control_pressed) { break; }
+            
+            qFinderDMM* findQ = new qFinderDMM(sharedMatrix, numPartitions);
+            
+            if (pDataArray->m->debug) { pDataArray->m->mothurOut("[DEBUG]: done finding Q " + toString(numPartitions) + "\n"); }
+            
+            double laplace = findQ->getLaplace();
+            pDataArray->m->mothurOut(toString(numPartitions) + '\t');
+            cout << setprecision (2) << findQ->getNLL() << '\t' << findQ->getLogDet() << '\t';
+            pDataArray->m->mothurOutJustToLog(toString(findQ->getNLL()) + '\t' + toString(findQ->getLogDet()) + '\t');
+            cout << findQ->getBIC() << '\t' << findQ->getAIC() << '\t' << laplace;
+            pDataArray->m->mothurOutJustToLog(toString(findQ->getBIC()) + '\t' + toString(findQ->getAIC()) + '\t' + toString(laplace));
+            
+            fitData << numPartitions << '\t';
+            fitData << setprecision (2) << findQ->getNLL() << '\t' << findQ->getLogDet() << '\t';
+            fitData << findQ->getBIC() << '\t' << findQ->getAIC() << '\t' << laplace << endl;
+            
+            if(laplace < minLaplace){
+                pDataArray->minPartition = numPartitions;
+                minLaplace = laplace;
+                pDataArray->m->mothurOut("***");
+            }
+            pDataArray->m->mothurOutEndLine();
+            
+            pDataArray->outputNames.push_back(pDataArray->relabunds[i]);
+            pDataArray->outputNames.push_back(pDataArray->matrix[i]);
+            
+            findQ->printZMatrix(pDataArray->matrix[i], pDataArray->m->getGroups());
+            findQ->printRelAbund(pDataArray->relabunds[i], pDataArray->m->currentBinLabels);
+            
+            if(pDataArray->optimizegap != -1 && (numPartitions - pDataArray->minPartition) >= pDataArray->optimizegap && numPartitions >= pDataArray->minpartitions){ break; }
+            
+            delete findQ;
+        }
+        fitData.close();
+        
+        //minPartition = 4;
+        
+        if (pDataArray->m->control_pressed) { return 0; }
+        
+        return 0;
+               
+       }
+       catch(exception& e) {
+               pDataArray->m->errorOut(e, "GetMetaCommunityCommand", "MyMetaCommunityThreadFunction");
+               exit(1);
+       }
+}
+#endif
 
 
 
 
 
 
index 8a1effeb7deaf02f6931633f1abfc41f355a9957..d10dfb4935c390d0353976eeb5c182340519ebfb 100644 (file)
@@ -297,7 +297,7 @@ int ListSeqsCommand::readFasta(){
                //ofstream out;
                //string newFastaName = outputDir + m->getRootName(m->getSimpleName(fastafile)) + "numsAdded.fasta";
                //m->openOutputFile(newFastaName, out);
                //ofstream out;
                //string newFastaName = outputDir + m->getRootName(m->getSimpleName(fastafile)) + "numsAdded.fasta";
                //m->openOutputFile(newFastaName, out);
-               //int count = 1;
+               int count = 1;
                //string lastName = "";
                
                while(!in.eof()){
                //string lastName = "";
                
                while(!in.eof()){
@@ -310,7 +310,7 @@ int ListSeqsCommand::readFasta(){
                        if (name != "") {  names.push_back(name);  }
                        
                        m->gobble(in);
                        if (name != "") {  names.push_back(name);  }
                        
                        m->gobble(in);
-                       //count++;
+                       if (m->debug) { count++; cout << "[DEBUG]: count = " + toString(count) + ", name = " + currSeq.getName() + "\n"; }
                }
                in.close();     
                //out.close();
                }
                in.close();     
                //out.close();
index 8892febb384cbb07233858f2c6b608f8153ca09f..0efa551565c6e18001cd6ed53e0643a50f525f37 100644 (file)
@@ -1296,15 +1296,15 @@ vector<unsigned long long> MothurOut::setFilePosFasta(string filename, int& num)
                                char c = inFASTA.get(); count++;
                                if (c == '>') {
                                        positions.push_back(count-1);
                                char c = inFASTA.get(); count++;
                                if (c == '>') {
                                        positions.push_back(count-1);
-                                       //cout << count << endl;
+                                       if (debug) { mothurOut("[DEBUG]: numSeqs = " + toString(positions.size()) +  " count = " + toString(count) + ".\n"); }
                                }
                        }
                        inFASTA.close();
                
                        num = positions.size();
                                }
                        }
                        inFASTA.close();
                
                        num = positions.size();
-               
-                       /*FILE * pFile;
-                       long size;
+            if (debug) { mothurOut("[DEBUG]: num = " + toString(num) + ".\n"); }
+                       FILE * pFile;
+                       unsigned long long size;
                
                        //get num bytes in file
                        pFile = fopen (filename.c_str(),"rb");
                
                        //get num bytes in file
                        pFile = fopen (filename.c_str(),"rb");
@@ -1313,9 +1313,9 @@ vector<unsigned long long> MothurOut::setFilePosFasta(string filename, int& num)
                                fseek (pFile, 0, SEEK_END);
                                size=ftell (pFile);
                                fclose (pFile);
                                fseek (pFile, 0, SEEK_END);
                                size=ftell (pFile);
                                fclose (pFile);
-                       }*/
+                       }
                        
                        
-                       unsigned long long size = positions[(positions.size()-1)];
+                       /*unsigned long long size = positions[(positions.size()-1)];
                        ifstream in;
                        openInputFile(filename, in);
                        
                        ifstream in;
                        openInputFile(filename, in);
                        
@@ -1325,8 +1325,10 @@ vector<unsigned long long> MothurOut::setFilePosFasta(string filename, int& num)
                                if(in.eof())            {       break;  }
                                else                            {       size++; }
                        }
                                if(in.eof())            {       break;  }
                                else                            {       size++; }
                        }
-                       in.close();
-               
+                       in.close();*/
+        
+            if (debug) { mothurOut("[DEBUG]: size = " + toString(size) + ".\n"); }
+        
                        positions.push_back(size);
                        positions[0] = 0;
                
                        positions.push_back(size);
                        positions[0] = 0;
                
index e9002bd7c12781eb6fd2fdb284da2ea06e65c4fa..90a66f8f633212cf69a2930914a1911c5bca1cb1 100644 (file)
@@ -452,11 +452,15 @@ int SeqSummaryCommand::driverCreateSummary(vector<int>& startPosition, vector<in
                while (!done) {
                                
                        if (m->control_pressed) { in.close(); outSummary.close(); return 1; }
                while (!done) {
                                
                        if (m->control_pressed) { in.close(); outSummary.close(); return 1; }
-                                       
+            
+            if (m->debug) { m->mothurOut("[DEBUG]: count = " + toString(count) + "\n");  }
+            
                        Sequence current(in); m->gobble(in);
            
                        if (current.getName() != "") {
                                
                        Sequence current(in); m->gobble(in);
            
                        if (current.getName() != "") {
                                
+                if (m->debug) { m->mothurOut("[DEBUG]: " + current.getName() + '\t' + toString(current.getNumBases()) + "\n");  }
+                
                                int num = 1;
                                if ((namefile != "") || (countfile != "")) {
                                        //make sure this sequence is in the namefile, else error 
                                int num = 1;
                                if ((namefile != "") || (countfile != "")) {
                                        //make sure this sequence is in the namefile, else error 
@@ -480,6 +484,8 @@ int SeqSummaryCommand::driverCreateSummary(vector<int>& startPosition, vector<in
                                outSummary << current.getStartPos() << '\t' << current.getEndPos() << '\t';
                                outSummary << current.getNumBases() << '\t' << current.getAmbigBases() << '\t';
                                outSummary << current.getLongHomoPolymer() << '\t' << num << endl;
                                outSummary << current.getStartPos() << '\t' << current.getEndPos() << '\t';
                                outSummary << current.getNumBases() << '\t' << current.getAmbigBases() << '\t';
                                outSummary << current.getLongHomoPolymer() << '\t' << num << endl;
+                
+                if (m->debug) { m->mothurOut("[DEBUG]: " + current.getName() + '\t' + toString(current.getNumBases()) + "\n");  }
                        }
                        
                        #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
                        }
                        
                        #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
index d1cf7299a46c522255eb725dccb058673f6b1c78..ddc7d4c5a410ca99a415ee1e15d7c60b2199060f 100644 (file)
@@ -683,12 +683,12 @@ int Sequence::filterToPos(int start){
     
     if (start > aligned.length()) { start = aligned.length(); m->mothurOut("[ERROR]: start to large.\n"); }
     
     
     if (start > aligned.length()) { start = aligned.length(); m->mothurOut("[ERROR]: start to large.\n"); }
     
-       for(int j = 0; j < start-1; j++) {
+       for(int j = 0; j < start; j++) {
                aligned[j] = '.';
        }
        
     //things like ......----------AT become ................AT
                aligned[j] = '.';
        }
        
     //things like ......----------AT become ................AT
-    for(int j = start-1; j < aligned.length(); j++) {
+    for(int j = start; j < aligned.length(); j++) {
         if (isalpha(aligned[j])) { break; }
         else { aligned[j] = '.'; }
     }
         if (isalpha(aligned[j])) { break; }
         else { aligned[j] = '.'; }
     }
index 94ae125962c85c2cc0b7f100bf5178b89da89bdd..81ea326e67665cb006500ed8d04f273b15fca2b4 100644 (file)
@@ -698,39 +698,32 @@ int UnifracWeightedCommand::runRandomCalcs(Tree* thisTree, vector<double> usersS
         
         lines.clear();
         
         
         lines.clear();
         
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
-        if(processors != 1){
-            int numPairs = namesOfGroupCombos.size();
-            int numPairsPerProcessor = numPairs / processors;
+        //breakdown work between processors
+        int numPairs = namesOfGroupCombos.size();
+        int numPairsPerProcessor = numPairs / processors;
             
             
-            for (int i = 0; i < processors; i++) {
-                int startPos = i * numPairsPerProcessor;
-                if(i == processors - 1){
-                    numPairsPerProcessor = numPairs - i * numPairsPerProcessor;
-                }
-                lines.push_back(linePair(startPos, numPairsPerProcessor));
-            }
+        for (int i = 0; i < processors; i++) {
+            int startPos = i * numPairsPerProcessor;
+            if(i == processors - 1){ numPairsPerProcessor = numPairs - i * numPairsPerProcessor; }
+            lines.push_back(linePair(startPos, numPairsPerProcessor));
         }
         }
-#endif
         
         
         //get scores for random trees
         for (int j = 0; j < iters; j++) {
         
         
         //get scores for random trees
         for (int j = 0; j < iters; j++) {
-            cout << j << endl; 
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
-            if(processors == 1){
-                driver(thisTree,  namesOfGroupCombos, 0, namesOfGroupCombos.size(),  rScores);
-            }else{
+//#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+            //if(processors == 1){
+              //  driver(thisTree,  namesOfGroupCombos, 0, namesOfGroupCombos.size(),  rScores);
+           // }else{
                 createProcesses(thisTree,  namesOfGroupCombos, rScores);
                 createProcesses(thisTree,  namesOfGroupCombos, rScores);
-            }
-#else
-            driver(thisTree, namesOfGroupCombos, 0, namesOfGroupCombos.size(), rScores);
-#endif
+           // }
+//#else
+            //driver(thisTree, namesOfGroupCombos, 0, namesOfGroupCombos.size(), rScores);
+//#endif
             
             if (m->control_pressed) { delete ct;  for (int i = 0; i < T.size(); i++) { delete T[i]; } delete output; outSum.close(); for (int i = 0; i < outputNames.size(); i++) {    m->mothurRemove(outputNames[i]);  } return 0; }
             
             
             if (m->control_pressed) { delete ct;  for (int i = 0; i < T.size(); i++) { delete T[i]; } delete output; outSum.close(); for (int i = 0; i < outputNames.size(); i++) {    m->mothurRemove(outputNames[i]);  } return 0; }
             
-            //report progress
-            //                                 m->mothurOut("Iter: " + toString(j+1)); m->mothurOutEndLine();          
         }
         lines.clear();
         
         }
         lines.clear();
         
@@ -766,12 +759,11 @@ int UnifracWeightedCommand::runRandomCalcs(Tree* thisTree, vector<double> usersS
 
 int UnifracWeightedCommand::createProcesses(Tree* t, vector< vector<string> > namesOfGroupCombos, vector< vector<double> >& scores) {
        try {
 
 int UnifracWeightedCommand::createProcesses(Tree* t, vector< vector<string> > namesOfGroupCombos, vector< vector<double> >& scores) {
        try {
-#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
-               int process = 1;
+        int process = 1;
                vector<int> processIDS;
                vector<int> processIDS;
-               
                EstOutput results;
                EstOutput results;
-               
+        
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
                //loop through and create all the processes you want
                while (process != processors) {
                        int pid = fork();
                //loop through and create all the processes you want
                while (process != processors) {
                        int pid = fork();
@@ -817,9 +809,53 @@ int UnifracWeightedCommand::createProcesses(Tree* t, vector< vector<string> > na
                        in.close();
                        m->mothurRemove(s);
                }
                        in.close();
                        m->mothurRemove(s);
                }
+#else
+        //fill in functions
+        vector<weightedRandomData*> pDataArray;
+               DWORD   dwThreadIdArray[processors-1];
+               HANDLE  hThreadArray[processors-1];
+        vector<CountTable*> cts;
+        vector<Tree*> trees;
                
                
-               return 0;
-#endif         
+               //Create processor worker threads.
+               for( int i=1; i<processors; i++ ){
+            CountTable* copyCount = new CountTable();
+            copyCount->copy(ct);
+            Tree* copyTree = new Tree(copyCount);
+            copyTree->getCopy(t);
+            
+            cts.push_back(copyCount);
+            trees.push_back(copyTree);
+            
+            vector< vector<double> > copyScores = rScores;
+            
+            weightedRandomData* tempweighted = new weightedRandomData(m, lines[i].start, lines[i].num, namesOfGroupCombos, copyTree, copyCount, includeRoot, copyScores);
+                       pDataArray.push_back(tempweighted);
+                       processIDS.push_back(i);
+            
+                       hThreadArray[i-1] = CreateThread(NULL, 0, MyWeightedRandomThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+               }
+               
+               driver(t, namesOfGroupCombos, lines[0].start, lines[0].num, scores);
+               
+               //Wait until all threads have terminated.
+               WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+               
+               //Close all thread handles and free memory allocations.
+               for(int i=0; i < pDataArray.size(); i++){
+            for (int j = pDataArray[i]->start; j < (pDataArray[i]->start+pDataArray[i]->num); j++) {
+                scores[j].push_back(pDataArray[i]->scores[j][pDataArray[i]->scores[j].size()-1]);
+            }
+                       delete cts[i];
+            delete trees[i];
+                       CloseHandle(hThreadArray[i]);
+                       delete pDataArray[i];
+               }
+
+               
+#endif 
+        
+        return 0;
        }
        catch(exception& e) {
                m->errorOut(e, "UnifracWeightedCommand", "createProcesses");
        }
        catch(exception& e) {
                m->errorOut(e, "UnifracWeightedCommand", "createProcesses");
index 1c67c32f81afb2535943934ab82d66629bbc05b2..10cbc7b83f51d70a11b42ae761fe66864b8f7898 100644 (file)
@@ -80,6 +80,77 @@ class UnifracWeightedCommand : public Command {
                
 };
 
                
 };
 
+/***********************************************************************/
+struct weightedRandomData {
+    int start;
+       int num;
+       MothurOut* m;
+    vector< vector<double> > scores;
+    vector< vector<string> > namesOfGroupCombos;
+    Tree* t;
+    CountTable* ct;
+    bool includeRoot;
+       
+       weightedRandomData(){}
+       weightedRandomData(MothurOut* mout, int st, int en, vector< vector<string> > ngc, Tree* tree, CountTable* count, bool ir, vector< vector<double> > sc) {
+        m = mout;
+               start = st;
+               num = en;
+        namesOfGroupCombos = ngc;
+        t = tree;
+        ct = count;
+        includeRoot = ir;
+        scores = sc;
+       }
+};
+
+/**************************************************************************************************/
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+#else
+static DWORD WINAPI MyWeightedRandomThreadFunction(LPVOID lpParam){
+       weightedRandomData* pDataArray;
+       pDataArray = (weightedRandomData*)lpParam;
+       try {
+        
+        Tree* randT = new Tree(pDataArray->ct);
+        
+        Weighted weighted(pDataArray->includeRoot);
+        
+               for (int h = pDataArray->start; h < (pDataArray->start+pDataArray->num); h++) {
+            
+                       if (pDataArray->m->control_pressed) { return 0; }
+            
+                       //initialize weighted score
+                       string groupA = pDataArray->namesOfGroupCombos[h][0];
+                       string groupB = pDataArray->namesOfGroupCombos[h][1];
+                       
+                       //copy T[i]'s info.
+                       randT->getCopy(pDataArray->t);
+            
+                       //create a random tree with same topology as T[i], but different labels
+                       randT->assembleRandomUnifracTree(groupA, groupB);
+                       
+                       if (pDataArray->m->control_pressed) { delete randT;  return 0;  }
+            
+                       //get wscore of random tree
+                       EstOutput randomData = weighted.getValues(randT, groupA, groupB);
+            
+                       if (pDataArray->m->control_pressed) { delete randT;  return 0;  }
+            
+                       //save scores
+                       pDataArray->scores[h].push_back(randomData[0]);
+               }
+        
+               delete randT;
+        
+        return 0;
+    }
+       catch(exception& e) {
+               pDataArray->m->errorOut(e, "Weighted", "MyWeightedRandomThreadFunction");
+               exit(1);
+       }
+}
+#endif
 
 
 #endif
 
 
 #endif
index cf1291dfa91af84e4d821541b2a8f43a169d64f7..b0d06fb0078e0201975e7738d1321e7932230253 100644 (file)
@@ -36,30 +36,19 @@ EstOutput Weighted::getValues(Tree* t, int p, string o) {
                        }
                }
                
                        }
                }
                
-               #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
-                       if(processors == 1){
-                               data = driver(t, namesOfGroupCombos, 0, namesOfGroupCombos.size(), ct);
-                       }else{
-                               int numPairs = namesOfGroupCombos.size();
-                               
-                               int numPairsPerProcessor = numPairs / processors;
-                               
-                               for (int i = 0; i < processors; i++) {
-                                       int startPos = i * numPairsPerProcessor;
-                                       if(i == processors - 1){
-                                               numPairsPerProcessor = numPairs - i * numPairsPerProcessor;
-                                       }
-                                       lines.push_back(linePair(startPos, numPairsPerProcessor));
-                               }
+        int numPairs = namesOfGroupCombos.size();
+        int numPairsPerProcessor = numPairs / processors;
+        
+        for (int i = 0; i < processors; i++) {
+            int startPos = i * numPairsPerProcessor;
+            if(i == processors - 1){ numPairsPerProcessor = numPairs - i * numPairsPerProcessor; }
+            lines.push_back(linePair(startPos, numPairsPerProcessor));
+        }
+        
+        data = createProcesses(t, namesOfGroupCombos, ct);
+        
+        lines.clear();
 
 
-                               data = createProcesses(t, namesOfGroupCombos, ct);
-                               
-                               lines.clear();
-                       }
-               #else
-                       data = driver(t, namesOfGroupCombos, 0, namesOfGroupCombos.size(), ct);
-               #endif
-               
                return data;
        }
        catch(exception& e) {
                return data;
        }
        catch(exception& e) {
@@ -71,11 +60,10 @@ EstOutput Weighted::getValues(Tree* t, int p, string o) {
 
 EstOutput Weighted::createProcesses(Tree* t, vector< vector<string> > namesOfGroupCombos, CountTable* ct) {
        try {
 
 EstOutput Weighted::createProcesses(Tree* t, vector< vector<string> > namesOfGroupCombos, CountTable* ct) {
        try {
+        vector<int> processIDS;
+               EstOutput results;
 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
                int process = 1;
 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
                int process = 1;
-               vector<int> processIDS;
-               
-               EstOutput results;
                
                //loop through and create all the processes you want
                while (process != processors) {
                
                //loop through and create all the processes you want
                while (process != processors) {
@@ -85,17 +73,12 @@ EstOutput Weighted::createProcesses(Tree* t, vector< vector<string> > namesOfGro
                                processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
                                process++;
                        }else if (pid == 0){
                                processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
                                process++;
                        }else if (pid == 0){
-       
                                EstOutput Myresults;
                                Myresults = driver(t, namesOfGroupCombos, lines[process].start, lines[process].num, ct);
                        
                                EstOutput Myresults;
                                Myresults = driver(t, namesOfGroupCombos, lines[process].start, lines[process].num, ct);
                        
-                               //m->mothurOut("Merging results."); m->mothurOutEndLine();
-                               
                                //pass numSeqs to parent
                                ofstream out;
                                //pass numSeqs to parent
                                ofstream out;
-
                                string tempFile = outputDir + toString(getpid()) + ".weighted.results.temp";
                                string tempFile = outputDir + toString(getpid()) + ".weighted.results.temp";
-       
                                m->openOutputFile(tempFile, out);
        
                                out << Myresults.size() << endl;
                                m->openOutputFile(tempFile, out);
        
                                out << Myresults.size() << endl;
@@ -143,11 +126,48 @@ EstOutput Weighted::createProcesses(Tree* t, vector< vector<string> > namesOfGro
                        in.close();
                        m->mothurRemove(s);
                }
                        in.close();
                        m->mothurRemove(s);
                }
+#else
+        
+        //fill in functions
+        vector<weightedData*> pDataArray;
+               DWORD   dwThreadIdArray[processors-1];
+               HANDLE  hThreadArray[processors-1];
+        vector<CountTable*> cts;
+        vector<Tree*> trees;
+               
+               //Create processor worker threads.
+               for( int i=1; i<processors; i++ ){
+            CountTable* copyCount = new CountTable();
+            copyCount->copy(ct);
+            Tree* copyTree = new Tree(copyCount);
+            copyTree->getCopy(t);
+            
+            cts.push_back(copyCount);
+            trees.push_back(copyTree);
+            
+            weightedData* tempweighted = new weightedData(m, lines[i].start, lines[i].num, namesOfGroupCombos, copyTree, copyCount, includeRoot);
+                       pDataArray.push_back(tempweighted);
+                       processIDS.push_back(i);
+            
+                       hThreadArray[i-1] = CreateThread(NULL, 0, MyWeightedThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+               }
                
                
-               //m->mothurOut("DONE."); m->mothurOutEndLine(); m->mothurOutEndLine();
+               results = driver(t, namesOfGroupCombos, lines[0].start, lines[0].num, ct);
+               
+               //Wait until all threads have terminated.
+               WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
                
                
-               return results;
-#endif         
+               //Close all thread handles and free memory allocations.
+               for(int i=0; i < pDataArray.size(); i++){
+            for (int j = 0; j < pDataArray[i]->results.size(); j++) {  results.push_back(pDataArray[i]->results[j]);  }
+                       delete cts[i];
+            delete trees[i];
+                       CloseHandle(hThreadArray[i]);
+                       delete pDataArray[i];
+               } 
+#endif
+        
+        return results;
        }
        catch(exception& e) {
                m->errorOut(e, "Weighted", "createProcesses");
        }
        catch(exception& e) {
                m->errorOut(e, "Weighted", "createProcesses");
index d4082fe9c82a7aeeb41aeaad62f016d0ec588d4d..e97f01b31d0e99ae5b4316464ab7f9382bc53d47 100644 (file)
@@ -47,6 +47,288 @@ class Weighted : public TreeCalculator  {
 };
 
 /***********************************************************************/
 };
 
 /***********************************************************************/
+struct weightedData {
+    int start;
+       int num;
+       MothurOut* m;
+    EstOutput results;
+    vector< vector<string> > namesOfGroupCombos;
+    Tree* t;
+    CountTable* ct;
+    bool includeRoot;
+    
+       
+       weightedData(){}
+       weightedData(MothurOut* mout, int st, int en, vector< vector<string> > ngc, Tree* tree, CountTable* count, bool ir) {
+        m = mout;
+               start = st;
+               num = en;
+        namesOfGroupCombos = ngc;
+        t = tree;
+        ct = count;
+        includeRoot = ir;
+       }
+};
+
+/**************************************************************************************************/
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+#else
+static DWORD WINAPI MyWeightedThreadFunction(LPVOID lpParam){
+       weightedData* pDataArray;
+       pDataArray = (weightedData*)lpParam;
+       try {
+        map<string, int>::iterator it;
+               vector<double> D;
+               int count = 0;
+        map< vector<string>, set<int> > rootForGrouping;
+        map<string, double> WScore;
+        
+               for (int h = pDataArray->start; h < (pDataArray->start+pDataArray->num); h++) {
+            
+                       if (pDataArray->m->control_pressed) { return 0; }
+            
+                       //initialize weighted score
+                       string groupA = pDataArray->namesOfGroupCombos[h][0];
+                       string groupB = pDataArray->namesOfGroupCombos[h][1];
+                       
+                       set<int> validBranches;
+                       WScore[groupA+groupB] = 0.0;
+                       D.push_back(0.0000); //initialize a spot in D for each combination
+                       
+                       //adding the wieghted sums from group i
+                       for (int j = 0; j < pDataArray->t->groupNodeInfo[groupA].size(); j++) { //the leaf nodes that have seqs from group i
+                               map<string, int>::iterator it = pDataArray->t->tree[pDataArray->t->groupNodeInfo[groupA][j]].pcount.find(groupA);
+                               int numSeqsInGroupI = it->second;
+
+                               //double sum = getLengthToRoot(pDataArray->t, pDataArray->t->groupNodeInfo[groupA][j], groupA, groupB);
+                /*************************************************************************************/
+                double sum = 0.0;
+                int index = pDataArray->t->groupNodeInfo[groupA][j];
+                
+                //you are a leaf
+                if(pDataArray->t->tree[index].getBranchLength() != -1){        sum += abs(pDataArray->t->tree[index].getBranchLength());       }
+                double tempTotal = 0.0;
+                index = pDataArray->t->tree[index].getParent();
+                
+                vector<string> grouping; grouping.push_back(groupA); grouping.push_back(groupB);
+                
+                rootForGrouping[grouping].insert(index);
+                
+                //while you aren't at root
+                while(pDataArray->t->tree[index].getParent() != -1){
+                    
+                    if (pDataArray->m->control_pressed) {  return 0; }
+                    
+                    int parent = pDataArray->t->tree[index].getParent();
+                    
+                    if (pDataArray->includeRoot) { //add everyone
+                        if(pDataArray->t->tree[index].getBranchLength() != -1){        sum += abs(pDataArray->t->tree[index].getBranchLength());       }
+                    }else {
+                        
+                        //am I the root for this grouping? if so I want to stop "early"
+                        //does my sibling have descendants from the users groups?
+                        int lc = pDataArray->t->tree[parent].getLChild();
+                        int rc = pDataArray->t->tree[parent].getRChild();
+                        
+                        int sib = lc;
+                        if (lc == index) { sib = rc; }
+                        
+                        map<string, int>::iterator itGroup;
+                        int pcountSize = 0;
+                        itGroup = pDataArray->t->tree[sib].pcount.find(groupA);
+                        if (itGroup != pDataArray->t->tree[sib].pcount.end()) { pcountSize++;  }
+                        itGroup = pDataArray->t->tree[sib].pcount.find(groupB);
+                        if (itGroup != pDataArray->t->tree[sib].pcount.end()) { pcountSize++;  }
+                        
+                        //if yes, I am not the root so add me
+                        if (pcountSize != 0) {
+                            if (pDataArray->t->tree[index].getBranchLength() != -1) {
+                                sum += abs(pDataArray->t->tree[index].getBranchLength()) + tempTotal;
+                                tempTotal = 0.0;
+                            }else {
+                                sum += tempTotal;
+                                tempTotal = 0.0;
+                            }
+                            rootForGrouping[grouping].clear();
+                            rootForGrouping[grouping].insert(parent);
+                        }else { //if no, I may be the root so add my br to tempTotal until I am proven innocent
+                            if (pDataArray->t->tree[index].getBranchLength() != -1) {
+                                tempTotal += abs(pDataArray->t->tree[index].getBranchLength()); 
+                            }
+                        }
+                    }
+                    
+                    index = parent;    
+                }
+                
+                //get all nodes above the root to add so we don't add their u values above
+                index = *(rootForGrouping[grouping].begin());
+                
+                while(pDataArray->t->tree[index].getParent() != -1){
+                    int parent = pDataArray->t->tree[index].getParent();
+                    rootForGrouping[grouping].insert(parent);
+                    index = parent;
+                }
+                
+                /*************************************************************************************/
+                               double weightedSum = ((numSeqsInGroupI * sum) / (double)pDataArray->ct->getGroupCount(groupA));
+                
+                               D[count] += weightedSum;
+                       }
+                       
+                       //adding the wieghted sums from group l
+                       for (int j = 0; j < pDataArray->t->groupNodeInfo[groupB].size(); j++) { //the leaf nodes that have seqs from group l
+                               map<string, int>::iterator it = pDataArray->t->tree[pDataArray->t->groupNodeInfo[groupB][j]].pcount.find(groupB);
+                               int numSeqsInGroupL = it->second;
+                               
+                               //double sum = getLengthToRoot(pDataArray->t, pDataArray->t->groupNodeInfo[groupB][j], groupA, groupB);
+                /*************************************************************************************/
+                double sum = 0.0;
+                int index = pDataArray->t->groupNodeInfo[groupB][j];
+                
+                //you are a leaf
+                if(pDataArray->t->tree[index].getBranchLength() != -1){        sum += abs(pDataArray->t->tree[index].getBranchLength());       }
+                double tempTotal = 0.0;
+                index = pDataArray->t->tree[index].getParent();
+                
+                vector<string> grouping; grouping.push_back(groupA); grouping.push_back(groupB);
+                
+                rootForGrouping[grouping].insert(index);
+                
+                //while you aren't at root
+                while(pDataArray->t->tree[index].getParent() != -1){
+                    
+                    if (pDataArray->m->control_pressed) {  return 0; }
+                    
+                    int parent = pDataArray->t->tree[index].getParent();
+                    
+                    if (pDataArray->includeRoot) { //add everyone
+                        if(pDataArray->t->tree[index].getBranchLength() != -1){        sum += abs(pDataArray->t->tree[index].getBranchLength());       }
+                    }else {
+                        
+                        //am I the root for this grouping? if so I want to stop "early"
+                        //does my sibling have descendants from the users groups?
+                        int lc = pDataArray->t->tree[parent].getLChild();
+                        int rc = pDataArray->t->tree[parent].getRChild();
+                        
+                        int sib = lc;
+                        if (lc == index) { sib = rc; }
+                        
+                        map<string, int>::iterator itGroup;
+                        int pcountSize = 0;
+                        itGroup = pDataArray->t->tree[sib].pcount.find(groupA);
+                        if (itGroup != pDataArray->t->tree[sib].pcount.end()) { pcountSize++;  }
+                        itGroup = pDataArray->t->tree[sib].pcount.find(groupB);
+                        if (itGroup != pDataArray->t->tree[sib].pcount.end()) { pcountSize++;  }
+                        
+                        //if yes, I am not the root so add me
+                        if (pcountSize != 0) {
+                            if (pDataArray->t->tree[index].getBranchLength() != -1) {
+                                sum += abs(pDataArray->t->tree[index].getBranchLength()) + tempTotal;
+                                tempTotal = 0.0;
+                            }else {
+                                sum += tempTotal;
+                                tempTotal = 0.0;
+                            }
+                            rootForGrouping[grouping].clear();
+                            rootForGrouping[grouping].insert(parent);
+                        }else { //if no, I may be the root so add my br to tempTotal until I am proven innocent
+                            if (pDataArray->t->tree[index].getBranchLength() != -1) {
+                                tempTotal += abs(pDataArray->t->tree[index].getBranchLength());
+                            }
+                        }
+                    }
+                    
+                    index = parent;
+                }
+                
+                //get all nodes above the root to add so we don't add their u values above
+                index = *(rootForGrouping[grouping].begin());
+                
+                while(pDataArray->t->tree[index].getParent() != -1){
+                    int parent = pDataArray->t->tree[index].getParent();
+                    rootForGrouping[grouping].insert(parent);
+                    index = parent;
+                }
+                
+                /*************************************************************************************/
+
+                               double weightedSum = ((numSeqsInGroupL * sum) / (double)pDataArray->ct->getGroupCount(groupB));
+                
+                               D[count] += weightedSum;
+                       }
+                       count++;
+               }
+        
+               //calculate u for the group comb
+               for (int h = pDataArray->start; h < (pDataArray->start+pDataArray->num); h++) {
+                       //report progress
+                       //pDataArray->m->mothurOut("Processing combo: " + toString(h)); pDataArray->m->mothurOutEndLine();
+            
+                       string groupA = pDataArray->namesOfGroupCombos[h][0];
+                       string groupB = pDataArray->namesOfGroupCombos[h][1];
+                       
+                       //calculate u for the group comb
+                       for(int i=0;i<pDataArray->t->getNumNodes();i++){
+                               
+                               if (pDataArray->m->control_pressed) { return 0; }
+                               
+                               double u;
+                               //int pcountSize = 0;
+                               //does this node have descendants from groupA
+                               it = pDataArray->t->tree[i].pcount.find(groupA);
+                               //if it does u = # of its descendants with a certain group / total number in tree with a certain group
+                               if (it != pDataArray->t->tree[i].pcount.end()) {
+                                       u = (double) pDataArray->t->tree[i].pcount[groupA] / (double) pDataArray->ct->getGroupCount(groupA);
+                               }else { u = 0.00; }
+                               
+                               
+                               //does this node have descendants from group l
+                               it = pDataArray->t->tree[i].pcount.find(groupB);
+                               
+                               //if it does subtract their percentage from u
+                               if (it != pDataArray->t->tree[i].pcount.end()) {
+                                       u -= (double) pDataArray->t->tree[i].pcount[groupB] / (double) pDataArray->ct->getGroupCount(groupB);
+                               }
+                               
+                               if (pDataArray->includeRoot) {
+                                       if (pDataArray->t->tree[i].getBranchLength() != -1) {
+                                               u = abs(u * pDataArray->t->tree[i].getBranchLength());
+                                               WScore[(groupA+groupB)] += u;
+                                       }
+                               }else {
+                                       //if this is not the root then add it
+                                       if (rootForGrouping[pDataArray->namesOfGroupCombos[h]].count(i) == 0) {
+                                               if (pDataArray->t->tree[i].getBranchLength() != -1) {
+                                                       u = abs(u * pDataArray->t->tree[i].getBranchLength());
+                                                       WScore[(groupA+groupB)] += u;
+                                               }
+                                       }
+                               }
+                       }
+                       
+               }
+               
+               /********************************************************/
+               //calculate weighted score for the group combination
+               double UN;
+               count = 0;
+               for (int h = pDataArray->start; h < (pDataArray->start+pDataArray->num); h++) {
+                       UN = (WScore[pDataArray->namesOfGroupCombos[h][0]+pDataArray->namesOfGroupCombos[h][1]] / D[count]);
+                       if (isnan(UN) || isinf(UN)) { UN = 0; }
+                       pDataArray->results.push_back(UN);
+                       count++;
+               }
+        
+               return 0;
+
+    }
+       catch(exception& e) {
+               pDataArray->m->errorOut(e, "Weighted", "MyWeightedThreadFunction");
+               exit(1);
+       }
+}
+#endif
 
 
 #endif
 
 
 #endif