From 1a1ed6dda1d655ff006459f15c712f057c93ddaf Mon Sep 17 00:00:00 2001 From: westcott Date: Wed, 29 Jun 2011 11:25:17 +0000 Subject: [PATCH] paralellized the indicator command --- corraxescommand.cpp | 7 +- indicatorcommand.cpp | 292 +++++++++++++++++++++++++++++++++++-------- indicatorcommand.h | 7 +- 3 files changed, 249 insertions(+), 57 deletions(-) diff --git a/corraxescommand.cpp b/corraxescommand.cpp index cd188b8..20bdcbd 100644 --- a/corraxescommand.cpp +++ b/corraxescommand.cpp @@ -863,14 +863,14 @@ int CorrAxesCommand::getMetadata(){ //read the first label, because it refers to the groups string columnLabel; iss >> columnLabel; m->gobble(iss); - + //save names of columns you are reading while (!iss.eof()) { iss >> columnLabel; m->gobble(iss); metadataLabels.push_back(columnLabel); } int count = metadataLabels.size(); - + //read rest of file while (!in.eof()) { @@ -879,7 +879,7 @@ int CorrAxesCommand::getMetadata(){ string group = ""; in >> group; m->gobble(in); groupNames.push_back(group); - + SharedRAbundFloatVector* tempLookup = new SharedRAbundFloatVector(); tempLookup->setGroup(group); tempLookup->setLabel("1"); @@ -887,7 +887,6 @@ int CorrAxesCommand::getMetadata(){ for (int i = 0; i < count; i++) { float temp = 0.0; in >> temp; - tempLookup->push_back(temp, group); } diff --git a/indicatorcommand.cpp b/indicatorcommand.cpp index 76816df..e8863f7 100644 --- a/indicatorcommand.cpp +++ b/indicatorcommand.cpp @@ -23,6 +23,7 @@ vector IndicatorCommand::setParameters(){ CommandParameter ptree("tree", "InputTypes", "", "", "TreeDesign", "TreeDesign", "none",false,false); parameters.push_back(ptree); CommandParameter pinputdir("inputdir", "String", "", "", "", "", "",false,false); parameters.push_back(pinputdir); CommandParameter poutputdir("outputdir", "String", "", "", "", "", "",false,false); parameters.push_back(poutputdir); + CommandParameter pprocessors("processors", "Number", "", "1", "", "", "",false,false); parameters.push_back(pprocessors); vector myArray; for (int i = 0; i < parameters.size(); i++) { myArray.push_back(parameters[i].name); } @@ -44,6 +45,7 @@ string IndicatorCommand::getHelpString(){ helpString += "The design parameter allows you to relate the tree to the shared or relabund file, if your tree contains the grouping names, or if no tree is provided to group your groups into groupings.\n"; helpString += "The groups parameter allows you to specify which of the groups in your shared or relabund you would like analyzed, or if you provide a design file the groups in your design file. The groups may be entered separated by dashes.\n"; helpString += "The label parameter indicates at what distance your tree relates to the shared or relabund.\n"; + helpString += "The processors parameter allows you to specify how many processors you would like to use. The default is 1. \n"; helpString += "The iters parameter allows you to set number of randomization for the P value. The default is 1000."; helpString += "The indicator command should be used in the following format: indicator(tree=test.tre, shared=test.shared, label=0.03)\n"; helpString += "Note: No spaces between parameter labels (i.e. tree), '=' and parameters (i.e.yourTreefile).\n"; @@ -174,6 +176,10 @@ IndicatorCommand::IndicatorCommand(string option) { string temp = validParameter.validFile(parameters, "iters", false); if (temp == "not found") { temp = "1000"; } convert(temp, iters); + temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); } + m->setProcessors(temp); + convert(temp, processors); + if ((relabundfile == "") && (sharedfile == "")) { //is there are current file available for either of these? //give priority to shared, then relabund @@ -204,6 +210,7 @@ IndicatorCommand::IndicatorCommand(string option) { if ((relabundfile != "") && (sharedfile != "")) { m->mothurOut("[ERROR]: You may not use both a shared and relabund file."); m->mothurOutEndLine(); abort = true; } + } } catch(exception& e) { @@ -219,6 +226,8 @@ int IndicatorCommand::execute(){ if (abort == true) { if (calledHelp) { return 0; } return 2; } cout.setf(ios::fixed, ios::floatfield); cout.setf(ios::showpoint); + + int start = time(NULL); //read designfile if given and set up globaldatas groups for read of sharedfiles if (designfile != "") { @@ -354,6 +363,8 @@ int IndicatorCommand::execute(){ if ((itTypes->second).size() != 0) { current = (itTypes->second)[0]; m->setTreeFile(current); } } } + + m->mothurOutEndLine(); m->mothurOutEndLine(); m->mothurOut("It took " + toString(time(NULL) - start) + " secs to find the indicator species."); m->mothurOutEndLine(); m->mothurOutEndLine(); m->mothurOut("Output File Names: "); m->mothurOutEndLine(); for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } @@ -418,19 +429,7 @@ int IndicatorCommand::GetIndicatorSpecies(){ indicatorValues = getValues(groupings, randomGroupingsMap); - pValues.resize(indicatorValues.size(), 0); - for(int i=0;icontrol_pressed) { break; } - randomGroupingsMap = randomizeGroupings(groupings, lookup.size()); - vector randomIndicatorValues = getValues(groupings, randomGroupingsMap); - - for (int j = 0; j < indicatorValues.size(); j++) { - if (randomIndicatorValues[j] >= indicatorValues[j]) { pValues[j]++; } - } - } - - for (int i = 0; i < pValues.size(); i++) { pValues[i] /= (double)iters; } - + pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues); }else { vector< vector > groupings; set groupsAlreadyAdded; @@ -453,18 +452,7 @@ int IndicatorCommand::GetIndicatorSpecies(){ indicatorValues = getValues(groupings, randomGroupingsMap); - pValues.resize(indicatorValues.size(), 0); - for(int i=0;icontrol_pressed) { break; } - randomGroupingsMap = randomizeGroupings(groupings, lookupFloat.size()); - vector randomIndicatorValues = getValues(groupings, randomGroupingsMap); - - for (int j = 0; j < indicatorValues.size(); j++) { - if (randomIndicatorValues[j] >= indicatorValues[j]) { pValues[j]++; } - } - } - - for (int i = 0; i < pValues.size(); i++) { pValues[i] /= (double)iters; } + pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues); } if (m->control_pressed) { out.close(); return 0; } @@ -611,19 +599,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ indicatorValues = getValues(groupings, randomGroupingsMap); - pValues.resize(indicatorValues.size(), 0); - for(int i=0;icontrol_pressed) { break; } - randomGroupingsMap = randomizeGroupings(groupings, lookup.size()); - vector randomIndicatorValues = getValues(groupings, randomGroupingsMap); - - for (int j = 0; j < indicatorValues.size(); j++) { - if (randomIndicatorValues[j] >= indicatorValues[j]) { pValues[j]++; } - } - } - - for (int i = 0; i < pValues.size(); i++) { pValues[i] /= (double)iters; } - + pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues); }else { vector< vector > groupings; @@ -672,19 +648,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ indicatorValues = getValues(groupings, randomGroupingsMap); - pValues.resize(indicatorValues.size(), 0); - for(int i=0;icontrol_pressed) { break; } - randomGroupingsMap = randomizeGroupings(groupings, lookup.size()); - vector randomIndicatorValues = getValues(groupings, randomGroupingsMap); - - for (int j = 0; j < indicatorValues.size(); j++) { - if (randomIndicatorValues[j] >= indicatorValues[j]) { pValues[j]++; } - } - } - - for (int i = 0; i < pValues.size(); i++) { pValues[i] /= (double)iters; } + pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues); } if (m->control_pressed) { out.close(); return 0; } @@ -1123,7 +1087,231 @@ int IndicatorCommand::getSharedFloat(){ m->errorOut(e, "IndicatorCommand", "getShared"); exit(1); } -} +} +//********************************************************************************************************************** +vector IndicatorCommand::driver(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues, int numIters){ + try { + vector pvalues; + pvalues.resize(indicatorValues.size(), 0); + + for(int i=0;icontrol_pressed) { break; } + groupingsMap = randomizeGroupings(groupings, num); + vector randomIndicatorValues = getValues(groupings, groupingsMap); + + for (int j = 0; j < indicatorValues.size(); j++) { + if (randomIndicatorValues[j] >= indicatorValues[j]) { pvalues[j]++; } + } + } + + return pvalues; + + }catch(exception& e) { + m->errorOut(e, "IndicatorCommand", "driver"); + exit(1); + } +} +//********************************************************************************************************************** +vector IndicatorCommand::getPValues(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues){ + try { + vector pvalues; + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ + pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + }else{ + + //divide iters between processors + int numItersPerProcessor = iters / processors; + + vector processIDS; + int process = 1; + int num = 0; + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + + //pass pvalues to parent + ofstream out; + string tempFile = toString(getpid()) + ".pvalues.temp"; + m->openOutputFile(tempFile, out); + + //pass values + for (int i = 0; i < pvalues.size(); i++) { + out << pvalues[i] << '\t'; + } + out << endl; + + out.close(); + + exit(0); + }else { + m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); + for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); } + exit(0); + } + } + + //do my part + //special case for last processor in case it doesn't divide evenly + numItersPerProcessor = iters - ((processors-1) * numItersPerProcessor); + pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + + //force parent to wait until all the processes are done + for (int i=0;iopenInputFile(tempFile, in); + + ////// to do /////////// + int numTemp; numTemp = 0; + for (int j = 0; j < pvalues.size(); j++) { + in >> numTemp; m->gobble(in); + pvalues[j] += numTemp; + } + in.close(); remove(tempFile.c_str()); + } + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + } +#else + pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } +#endif + + return pvalues; + } + catch(exception& e) { + m->errorOut(e, "IndicatorCommand", "getPValues"); + exit(1); + } +} + +//********************************************************************************************************************** +//same as above, just data type difference +vector IndicatorCommand::driver(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues, int numIters){ + try { + vector pvalues; + pvalues.resize(indicatorValues.size(), 0); + + for(int i=0;icontrol_pressed) { break; } + groupingsMap = randomizeGroupings(groupings, num); + vector randomIndicatorValues = getValues(groupings, groupingsMap); + + for (int j = 0; j < indicatorValues.size(); j++) { + if (randomIndicatorValues[j] >= indicatorValues[j]) { pvalues[j]++; } + } + } + + return pvalues; + + }catch(exception& e) { + m->errorOut(e, "IndicatorCommand", "driver"); + exit(1); + } +} +//********************************************************************************************************************** +//same as above, just data type difference +vector IndicatorCommand::getPValues(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues){ + try { + vector pvalues; + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ + pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + }else{ + + //divide iters between processors + int numItersPerProcessor = iters / processors; + + vector processIDS; + int process = 1; + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + + //pass pvalues to parent + ofstream out; + string tempFile = toString(getpid()) + ".pvalues.temp"; + m->openOutputFile(tempFile, out); + + //pass values + for (int i = 0; i < pvalues.size(); i++) { + out << pvalues[i] << '\t'; + } + out << endl; + + out.close(); + + exit(0); + }else { + m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); + for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); } + exit(0); + } + } + + //do my part + //special case for last processor in case it doesn't divide evenly + numItersPerProcessor = iters - ((processors-1) * numItersPerProcessor); + pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + + //force parent to wait until all the processes are done + for (int i=0;iopenInputFile(tempFile, in); + + ////// to do /////////// + int numTemp; numTemp = 0; + for (int j = 0; j < pvalues.size(); j++) { + in >> numTemp; m->gobble(in); + pvalues[j] += numTemp; + } + in.close(); remove(tempFile.c_str()); + } + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + } +#else + pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } +#endif + + return pvalues; + } + catch(exception& e) { + m->errorOut(e, "IndicatorCommand", "getPValues"); + exit(1); + } +} //********************************************************************************************************************** //swap groups between groupings, in essence randomizing the second column of the design file map< vector, vector > IndicatorCommand::randomizeGroupings(vector< vector >& groupings, int numLookupGroups){ diff --git a/indicatorcommand.h b/indicatorcommand.h index aa31562..c5112e6 100644 --- a/indicatorcommand.h +++ b/indicatorcommand.h @@ -39,7 +39,7 @@ private: GroupMap* designMap; string treefile, sharedfile, relabundfile, groups, label, inputFileName, outputDir, designfile; bool abort; - int iters; + int iters, processors; vector outputNames, Groups; vector lookup; vector lookupFloat; @@ -54,6 +54,11 @@ private: map getDistToRoot(Tree*&); map< vector, vector > randomizeGroupings(vector< vector >&, int); map< vector, vector > randomizeGroupings(vector< vector >&, int); + vector driver(vector< vector >&, map< vector, vector >, int, vector, int); + vector driver(vector< vector >&, map< vector, vector >, int, vector, int); + vector getPValues(vector< vector >&, map< vector, vector >, int, vector); + vector getPValues(vector< vector >&, map< vector, vector >, int, vector); + }; -- 2.39.2