X-Git-Url: https://git.donarmstrong.com/?p=mothur.git;a=blobdiff_plain;f=getmetacommunitycommand.cpp;h=ad069d00745fd3627015c6166cd4101047f8284c;hp=59efe608e624ebc4ab9e71ce77aba23554bd40cb;hb=2cfb747aa8f63bde9c1114001e6d2e81ccd26178;hpb=4a760c2d164aa955dee7d3d38da323822763d906 diff --git a/getmetacommunitycommand.cpp b/getmetacommunitycommand.cpp index 59efe60..ad069d0 100644 --- a/getmetacommunitycommand.cpp +++ b/getmetacommunitycommand.cpp @@ -7,7 +7,7 @@ // #include "getmetacommunitycommand.h" -#include "qFinderDMM.h" + //********************************************************************************************************************** vector GetMetaCommunityCommand::setParameters(){ @@ -16,8 +16,9 @@ vector GetMetaCommunityCommand::setParameters(){ CommandParameter pgroups("groups", "String", "", "", "", "", "","",false,false); parameters.push_back(pgroups); CommandParameter plabel("label", "String", "", "", "", "", "","",false,false); parameters.push_back(plabel); CommandParameter pminpartitions("minpartitions", "Number", "", "5", "", "", "","",false,false,true); parameters.push_back(pminpartitions); - CommandParameter pmaxpartitions("maxpartitions", "Number", "", "10", "", "", "","",false,false,true); parameters.push_back(pmaxpartitions); + CommandParameter pmaxpartitions("maxpartitions", "Number", "", "100", "", "", "","",false,false,true); parameters.push_back(pmaxpartitions); CommandParameter poptimizegap("optimizegap", "Number", "", "3", "", "", "","",false,false,true); parameters.push_back(poptimizegap); + CommandParameter pprocessors("processors", "Number", "", "1", "", "", "","",false,false,true); parameters.push_back(pprocessors); CommandParameter pinputdir("inputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(pinputdir); CommandParameter poutputdir("outputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(poutputdir); @@ -34,12 +35,13 @@ vector GetMetaCommunityCommand::setParameters(){ string GetMetaCommunityCommand::getHelpString(){ try { string helpString = ""; - helpString += "The get.metacommunity command parameters are shared, label, groups, minpartitions, maxpartitions and optimizegap. The shared file is required. \n"; + helpString += "The get.metacommunity command parameters are shared, label, groups, minpartitions, maxpartitions, optimizegap and processors. The shared file is required. \n"; helpString += "The label parameter is used to analyze specific labels in your input. labels are separated by dashes.\n"; helpString += "The groups parameter allows you to specify which of the groups in your shared file you would like analyzed. Group names are separated by dashes.\n"; helpString += "The minpartitions parameter is used to .... Default=5.\n"; helpString += "The maxpartitions parameter is used to .... Default=10.\n"; helpString += "The optimizegap parameter is used to .... Default=3.\n"; + helpString += "The processors parameter allows you to specify number of processors to use. The default is 1.\n"; helpString += "The get.metacommunity command should be in the following format: get.metacommunity(shared=yourSharedFile).\n"; return helpString; } @@ -55,7 +57,7 @@ string GetMetaCommunityCommand::getOutputPattern(string type) { if (type == "fit") { pattern = "[filename],[distance],mix.fit"; } else if (type == "relabund") { pattern = "[filename],[distance],[tag],mix.relabund"; } - else if (type == "design") { pattern = "[filename],mix.design"; } + else if (type == "design") { pattern = "[filename],[distance],mix.design"; } else if (type == "matrix") { pattern = "[filename],[distance],[tag],mix.posterior"; } else if (type == "parameters") { pattern = "[filename],[distance],mix.parameters"; } else if (type == "summary") { pattern = "[filename],[distance],mix.summary"; } @@ -154,6 +156,10 @@ GetMetaCommunityCommand::GetMetaCommunityCommand(string option) { temp = validParameter.validFile(parameters, "optimizegap", false); if (temp == "not found"){ temp = "3"; } m->mothurConvert(temp, optimizegap); + temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); } + m->setProcessors(temp); + m->mothurConvert(temp, processors); + string groups = validParameter.validFile(parameters, "groups", false); if (groups == "not found") { groups = ""; } else { m->splitAtDash(groups, Groups); } @@ -197,7 +203,7 @@ int GetMetaCommunityCommand::execute(){ m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine(); - process(lookup); + createProcesses(lookup); processedLabels.insert(lookup[0]->getLabel()); userLabels.erase(lookup[0]->getLabel()); @@ -210,7 +216,7 @@ int GetMetaCommunityCommand::execute(){ lookup = input.getSharedRAbundVectors(lastLabel); m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine(); - process(lookup); + createProcesses(lookup); processedLabels.insert(lookup[0]->getLabel()); userLabels.erase(lookup[0]->getLabel()); @@ -251,7 +257,7 @@ int GetMetaCommunityCommand::execute(){ m->mothurOut(lookup[0]->getLabel()); m->mothurOutEndLine(); - process(lookup); + createProcesses(lookup); for (int i = 0; i < lookup.size(); i++) { delete lookup[i]; } } @@ -270,17 +276,245 @@ int GetMetaCommunityCommand::execute(){ } } //********************************************************************************************************************** -int GetMetaCommunityCommand::process(vector& thislookup){ +int GetMetaCommunityCommand::createProcesses(vector& thislookup){ try { - double minLaplace = 1e10; + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + #else + processors=1; //qFinderDMM not thread safe + #endif + + vector processIDS; + int process = 1; + int num = 0; int minPartition = 0; + + //sanity check + if (maxpartitions < processors) { processors = maxpartitions; } map variables; variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(sharedfile)); variables["[distance]"] = thislookup[0]->getLabel(); string outputFileName = getOutputFileName("fit", variables); outputNames.push_back(outputFileName); outputTypes["fit"].push_back(outputFileName); + + //divide the partitions between the processors + vector< vector > dividedPartitions; + vector< vector > rels, matrix; + vector doneFlags; + dividedPartitions.resize(processors); + rels.resize(processors); + matrix.resize(processors); + + //for each file group figure out which process will complete it + //want to divide the load intelligently so the big files are spread between processes + for (int i=1; i<=maxpartitions; i++) { + //cout << i << endl; + int processToAssign = (i+1) % processors; + if (processToAssign == 0) { processToAssign = processors; } + + if (m->debug) { m->mothurOut("[DEBUG]: assigning " + toString(i) + " to process " + toString(processToAssign-1) + "\n"); } + dividedPartitions[(processToAssign-1)].push_back(i); + + variables["[tag]"] = toString(i); + string relName = getOutputFileName("relabund", variables); + string mName = getOutputFileName("matrix", variables); + rels[(processToAssign-1)].push_back(relName); + matrix[(processToAssign-1)].push_back(mName); + } + + for (int i = 0; i < processors; i++) { //read from everyone elses, just write to yours + string tempDoneFile = toString(i) + ".done.temp"; + doneFlags.push_back(tempDoneFile); + ofstream out; + m->openOutputFile(tempDoneFile, out); //clear out + out.close(); + } + + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + outputNames.clear(); + num = processDriver(thislookup, dividedPartitions[process], (outputFileName + toString(getpid())), rels[process], matrix[process], doneFlags, process); + + //pass numSeqs to parent + ofstream out; + string tempFile = toString(getpid()) + ".outputNames.temp"; + m->openOutputFile(tempFile, out); + out << num << endl; + out << outputNames.size() << endl; + for (int i = 0; i < outputNames.size(); i++) { out << outputNames[i] << endl; } + out.close(); + + exit(0); + }else { + m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); + for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); } + exit(0); + } + } + + //do my part + minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0], doneFlags, 0); + + //force parent to wait until all the processes are done + for (int i=0;i tempOutputNames = outputNames; + for (int i=0;iopenInputFile(tempFile, in); + if (!in.eof()) { + int tempNum = 0; + in >> tempNum; m->gobble(in); + if (tempNum < minPartition) { minPartition = tempNum; } + in >> tempNum; m->gobble(in); + for (int i = 0; i < tempNum; i++) { + string tempName = ""; + in >> tempName; m->gobble(in); + tempOutputNames.push_back(tempName); + } + } + in.close(); m->mothurRemove(tempFile); + + m->appendFilesWithoutHeaders(outputFileName + toString(processIDS[i]), outputFileName); + m->mothurRemove(outputFileName + toString(processIDS[i])); + } + + if (processors > 1) { + outputNames.clear(); + for (int i = 0; i < tempOutputNames.size(); i++) { //remove files if needed + string name = tempOutputNames[i]; + vector parts; + m->splitAtChar(name, parts, '.'); + bool keep = true; + if (((parts[parts.size()-1] == "relabund") || (parts[parts.size()-1] == "posterior")) && (parts[parts.size()-2] == "mix")) { + string tempNum = parts[parts.size()-3]; + int num; m->mothurConvert(tempNum, num); + //if (num > minPartition) { + // m->mothurRemove(tempOutputNames[i]); + // keep = false; if (m->debug) { m->mothurOut("[DEBUG]: removing " + tempOutputNames[i] + ".\n"); } + //} + } + if (keep) { outputNames.push_back(tempOutputNames[i]); } + } + + //reorder fit file + ifstream in; + m->openInputFile(outputFileName, in); + string headers = m->getline(in); m->gobble(in); + + map file; + while (!in.eof()) { + string numString, line; + int num; + in >> numString; line = m->getline(in); m->gobble(in); + m->mothurConvert(numString, num); + file[num] = line; + } + in.close(); + ofstream out; + m->openOutputFile(outputFileName, out); + out << headers << endl; + for (map::iterator it = file.begin(); it != file.end(); it++) { + out << it->first << '\t' << it->second << endl; + if (m->debug) { m->mothurOut("[DEBUG]: printing: " + toString(it->first) + '\t' + it->second + ".\n"); } + } + out.close(); + } + +#else + /* + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; i newLookup; + for (int k = 0; k < thislookup.size(); k++) { + SharedRAbundVector* temp = new SharedRAbundVector(); + temp->setLabel(thislookup[k]->getLabel()); + temp->setGroup(thislookup[k]->getGroup()); + newLookup.push_back(temp); + } + + //for each bin + for (int k = 0; k < thislookup[0]->getNumBins(); k++) { + if (m->control_pressed) { for (int j = 0; j < newLookup.size(); j++) { delete newLookup[j]; } return 0; } + for (int j = 0; j < thislookup.size(); j++) { newLookup[j]->push_back(thislookup[j]->getAbundance(k), thislookup[j]->getGroup()); } + } + + processIDS.push_back(i); + + // Allocate memory for thread data. + metaCommunityData* tempMeta = new metaCommunityData(newLookup, m, dividedPartitions[i], outputFileName + toString(i), rels[i], matrix[i], minpartitions, maxpartitions, optimizegap); + pDataArray.push_back(tempMeta); + + hThreadArray[i] = CreateThread(NULL, 0, MyMetaCommunityThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]); + } + + //do my part + minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0]); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + if (pDataArray[i]->minPartition < minPartition) { minPartition = pDataArray[i]->minPartition; } + for (int j = 0; j < pDataArray[i]->outputNames.size(); j++) { + outputNames.push_back(pDataArray[i]->outputNames[j]); + } + m->appendFilesWithoutHeaders(outputFileName + toString(processIDS[i]), outputFileName); + m->mothurRemove(outputFileName + toString(processIDS[i])); + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } */ + //do my part + minPartition = processDriver(thislookup, dividedPartitions[0], outputFileName, rels[0], matrix[0], doneFlags, 0); +#endif + for (int i = 0; i < processors; i++) { //read from everyone elses, just write to yours + string tempDoneFile = toString(i) + ".done.temp"; + m->mothurRemove(tempDoneFile); + } + + if (m->control_pressed) { return 0; } + + if (m->debug) { m->mothurOut("[DEBUG]: minPartition = " + toString(minPartition) + "\n"); } + + //run generate Summary function for smallest minPartition + variables["[tag]"] = toString(minPartition); + generateSummaryFile(minPartition, variables); + + return 0; + + } + catch(exception& e) { + m->errorOut(e, "GetMetaCommunityCommand", "createProcesses"); + exit(1); + } +} +//********************************************************************************************************************** +int GetMetaCommunityCommand::processDriver(vector& thislookup, vector& parts, string outputFileName, vector relabunds, vector matrix, vector doneFlags, int processID){ + try { + + double minLaplace = 1e10; + int minPartition = 0; ofstream fitData; m->openOutputFile(outputFileName, fitData); @@ -295,10 +529,26 @@ int GetMetaCommunityCommand::process(vector& thislookup){ m->mothurOut("K\tNLE\t\tlogDet\tBIC\t\tAIC\t\tLaplace\n"); fitData << "K\tNLE\tlogDet\tBIC\tAIC\tLaplace" << endl; - for(int numPartitions=1;numPartitions<=maxpartitions;numPartitions++){ + for(int i=0;idebug) { m->mothurOut("[DEBUG]: running partition " + toString(numPartitions) + "\n"); } if (m->control_pressed) { break; } + //check to see if anyone else is done + for (int j = 0; j < doneFlags.size(); j++) { + if (!m->isBlank(doneFlags[j])) { //another process has finished + //are they done at a lower partition? + ifstream in; + m->openInputFile(doneFlags[j], in); + int tempNum; + in >> tempNum; in.close(); + if (tempNum < numPartitions) { break; } //quit, because someone else has finished + } + } + qFinderDMM findQ(sharedMatrix, numPartitions); double laplace = findQ.getLaplace(); @@ -319,43 +569,47 @@ int GetMetaCommunityCommand::process(vector& thislookup){ } m->mothurOutEndLine(); - variables["[tag]"] = toString(numPartitions); - string relabund = getOutputFileName("relabund", variables); + string relabund = relabunds[i]; outputNames.push_back(relabund); outputTypes["relabund"].push_back(relabund); - string matrix = getOutputFileName("matrix", variables); - outputNames.push_back(matrix); outputTypes["matrix"].push_back(matrix); + string matrixName = matrix[i]; + outputNames.push_back(matrixName); outputTypes["matrix"].push_back(matrixName); - findQ.printZMatrix(matrix, m->getGroups()); + findQ.printZMatrix(matrixName, m->getGroups()); findQ.printRelAbund(relabund, m->currentBinLabels); - if(optimizegap != -1 && (numPartitions - minPartition) >= optimizegap && numPartitions >= minpartitions){ break; } + if(optimizegap != -1 && (numPartitions - minPartition) >= optimizegap && numPartitions >= minpartitions){ + string tempDoneFile = toString(processID) + ".done.temp"; + ofstream outDone; + m->openOutputFile(tempDoneFile, outDone); + outDone << minPartition << endl; + outDone.close(); + break; + } } fitData.close(); //minPartition = 4; if (m->control_pressed) { return 0; } - - generateSummaryFile(minpartitions, outputTypes["relabund"][0], outputTypes["relabund"][outputTypes["relabund"].size()-1], outputTypes["matrix"][outputTypes["matrix"].size()-1], thislookup[0]->getLabel()); - - return 0; + return minPartition; } catch(exception& e) { - m->errorOut(e, "GetMetaCommunityCommand", "process"); + m->errorOut(e, "GetMetaCommunityCommand", "processDriver"); exit(1); } } /**************************************************************************************************/ -vector GetMetaCommunityCommand::generateDesignFile(int numPartitions, string input){ +vector GetMetaCommunityCommand::generateDesignFile(int numPartitions, map variables){ try { vector piValues(numPartitions, 0); ifstream postFile; + variables["[tag]"] = toString(numPartitions); + string input = getOutputFileName("matrix", variables); m->openInputFile(input, postFile);//((fileRoot + toString(numPartitions) + "mix.posterior").c_str()); //matrix file - map variables; - variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(input)); + variables.erase("[tag]"); string outputFileName = getOutputFileName("design", variables); ofstream designFile; m->openOutputFile(outputFileName, designFile); @@ -416,7 +670,7 @@ vector GetMetaCommunityCommand::generateDesignFile(int numPartitions, st inline bool summaryFunction(summaryData i, summaryData j){ return i.difference > j.difference; } /**************************************************************************************************/ -int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string reference, string partFile, string designInput, string label){ +int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, map v){ try { vector summary; @@ -428,10 +682,17 @@ int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string refer double mean, lci, uci; - vector piValues = generateDesignFile(numPartitions, designInput); + vector piValues = generateDesignFile(numPartitions, v); ifstream referenceFile; + map variables; + variables["[filename]"] = v["[filename]"]; + variables["[distance]"] = v["[distance]"]; + variables["[tag]"] = "1"; + string reference = getOutputFileName("relabund", variables); m->openInputFile(reference, referenceFile); //((fileRoot + label + ".1mix.relabund").c_str()); + variables["[tag]"] = toString(numPartitions); + string partFile = getOutputFileName("relabund", variables); ifstream partitionFile; m->openInputFile(partFile, partitionFile); //((fileRoot + toString(numPartitions) + "mix.relabund").c_str()); @@ -486,9 +747,7 @@ int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string refer sort(summary.begin(), summary.end(), summaryFunction); - map variables; - variables["[filename]"] = outputDir + m->getRootName(m->getSimpleName(sharedfile)); - variables["[distance]"] = label; + variables.erase("[tag]"); string outputFileName = getOutputFileName("parameters", variables); outputNames.push_back(outputFileName); outputTypes["parameters"].push_back(outputFileName); @@ -509,7 +768,7 @@ int GetMetaCommunityCommand::generateSummaryFile(int numPartitions, string refer if (m->control_pressed) { return 0; } string summaryFileName = getOutputFileName("summary", variables); - outputNames.push_back(outputFileName); outputTypes["summary"].push_back(outputFileName); + outputNames.push_back(summaryFileName); outputTypes["summary"].push_back(summaryFileName); ofstream summaryFile; m->openOutputFile(summaryFileName, summaryFile); //((fileRoot + "mix.summary").c_str());