X-Git-Url: https://git.donarmstrong.com/?p=mothur.git;a=blobdiff_plain;f=indicatorcommand.cpp;h=fd818acb44c4c5280c6a54971d3d39413307861d;hp=3c461d7188ea5096de8031200913a1a43bc1101c;hb=050a3ff02473a3d4c0980964e1a9ebe52e55d6b8;hpb=79a7d3273749b08d4f9f8dfe350c964ff0c4351e diff --git a/indicatorcommand.cpp b/indicatorcommand.cpp index 3c461d7..fd818ac 100644 --- a/indicatorcommand.cpp +++ b/indicatorcommand.cpp @@ -14,16 +14,16 @@ //********************************************************************************************************************** vector IndicatorCommand::setParameters(){ try { - CommandParameter piters("iters", "Number", "", "1000", "", "", "",false,false); parameters.push_back(piters); - CommandParameter pdesign("design", "InputTypes", "", "", "TreeDesign", "TreeDesign", "none",false,false); parameters.push_back(pdesign); - CommandParameter pshared("shared", "InputTypes", "", "", "SharedRel", "SharedRel", "none",false,false); parameters.push_back(pshared); - CommandParameter prelabund("relabund", "InputTypes", "", "", "SharedRel", "SharedRel", "none",false,false); parameters.push_back(prelabund); - CommandParameter pgroups("groups", "String", "", "", "", "", "",false,false); parameters.push_back(pgroups); - CommandParameter plabel("label", "String", "", "", "", "", "",false,false); parameters.push_back(plabel); - CommandParameter ptree("tree", "InputTypes", "", "", "TreeDesign", "TreeDesign", "none",false,false); parameters.push_back(ptree); - CommandParameter pinputdir("inputdir", "String", "", "", "", "", "",false,false); parameters.push_back(pinputdir); - CommandParameter poutputdir("outputdir", "String", "", "", "", "", "",false,false); parameters.push_back(poutputdir); - CommandParameter pprocessors("processors", "Number", "", "1", "", "", "",false,false); parameters.push_back(pprocessors); + CommandParameter piters("iters", "Number", "", "1000", "", "", "","",false,false); parameters.push_back(piters); + CommandParameter pdesign("design", "InputTypes", "", "", "TreeDesign", "TreeDesign", "none","summary",false,false,true); parameters.push_back(pdesign); + CommandParameter pshared("shared", "InputTypes", "", "", "SharedRel", "SharedRel", "none","summary",false,false,true); parameters.push_back(pshared); + CommandParameter prelabund("relabund", "InputTypes", "", "", "SharedRel", "SharedRel", "none","summary",false,false); parameters.push_back(prelabund); + CommandParameter pgroups("groups", "String", "", "", "", "", "","",false,false); parameters.push_back(pgroups); + CommandParameter plabel("label", "String", "", "", "", "", "","",false,false); parameters.push_back(plabel); + CommandParameter ptree("tree", "InputTypes", "", "", "TreeDesign", "TreeDesign", "none","tree-summary",false,false,true); parameters.push_back(ptree); + CommandParameter pinputdir("inputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(pinputdir); + CommandParameter poutputdir("outputdir", "String", "", "", "", "", "","",false,false); parameters.push_back(poutputdir); + CommandParameter pprocessors("processors", "Number", "", "1", "", "", "","",false,false); parameters.push_back(pprocessors); vector myArray; for (int i = 0; i < parameters.size(); i++) { myArray.push_back(parameters[i].name); } @@ -58,25 +58,20 @@ string IndicatorCommand::getHelpString(){ } } //********************************************************************************************************************** -string IndicatorCommand::getOutputFileNameTag(string type, string inputName=""){ - try { - string outputFileName = ""; - map >::iterator it; +string IndicatorCommand::getOutputPattern(string type) { + try { + string pattern = ""; - //is this a type this command creates - it = outputTypes.find(type); - if (it == outputTypes.end()) { m->mothurOut("[ERROR]: this command doesn't create a " + type + " output file.\n"); } - else { - if (type == "tree") { outputFileName = "indicator.tre"; } - else if (type == "summary") { outputFileName = "indicator.summary"; } - else { m->mothurOut("[ERROR]: No definition for type " + type + " output file tag.\n"); m->control_pressed = true; } - } - return outputFileName; - } - catch(exception& e) { - m->errorOut(e, "IndicatorCommand", "getOutputFileNameTag"); - exit(1); - } + if (type == "tree") { pattern = "[filename],indicator.tre"; } + else if (type == "summary") { pattern = "[filename],indicator.summary"; } + else { m->mothurOut("[ERROR]: No definition for type " + type + " output pattern.\n"); m->control_pressed = true; } + + return pattern; + } + catch(exception& e) { + m->errorOut(e, "IndicatorCommand", "getOutputPattern"); + exit(1); + } } //********************************************************************************************************************** IndicatorCommand::IndicatorCommand(){ @@ -296,8 +291,8 @@ int IndicatorCommand::execute(){ for (int i = 0; i < m->Treenames.size(); i++) { nameMap.insert(m->Treenames[i]); //sanity check - is this a group that is not in the sharedfile? + if (i == 0) { gps.insert("Group1"); } if (designfile == "") { - if (i == 0) { gps.insert("Group1"); } if (!(m->inUsersGroups(m->Treenames[i], m->getAllGroups()))) { m->mothurOut("[ERROR]: " + m->Treenames[i] + " is not a group in your shared or relabund file."); m->mothurOutEndLine(); mismatch = true; @@ -410,7 +405,9 @@ int IndicatorCommand::GetIndicatorSpecies(){ try { string thisOutputDir = outputDir; if (outputDir == "") { thisOutputDir += m->hasPath(inputFileName); } - string outputFileName = thisOutputDir + m->getRootName(m->getSimpleName(inputFileName)) + getOutputFileNameTag("summary"); + map variables; + variables["[filename]"] = thisOutputDir + m->getRootName(m->getSimpleName(inputFileName)); + string outputFileName = getOutputFileName("summary", variables); outputNames.push_back(outputFileName); outputTypes["summary"].push_back(outputFileName); ofstream out; @@ -456,7 +453,7 @@ int IndicatorCommand::GetIndicatorSpecies(){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues); + pValues = getPValues(groupings, lookup.size(), indicatorValues); }else { vector< vector > groupings; set groupsAlreadyAdded; @@ -479,7 +476,7 @@ int IndicatorCommand::GetIndicatorSpecies(){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues); + pValues = getPValues(groupings, lookupFloat.size(), indicatorValues); } if (m->control_pressed) { out.close(); return 0; } @@ -526,7 +523,9 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ string thisOutputDir = outputDir; if (outputDir == "") { thisOutputDir += m->hasPath(inputFileName); } - string outputFileName = thisOutputDir + m->getRootName(m->getSimpleName(inputFileName)) + getOutputFileNameTag("summary"); + map variables; + variables["[filename]"] = thisOutputDir + m->getRootName(m->getSimpleName(inputFileName)); + string outputFileName = getOutputFileName("summary",variables); outputNames.push_back(outputFileName); outputTypes["summary"].push_back(outputFileName); ofstream out; @@ -546,7 +545,8 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ string treeOutputDir = outputDir; if (outputDir == "") { treeOutputDir += m->hasPath(treefile); } - string outputTreeFileName = treeOutputDir + m->getRootName(m->getSimpleName(treefile)) + getOutputFileNameTag("tree"); + variables["[filename]"] = treeOutputDir + m->getRootName(m->getSimpleName(treefile)); + string outputTreeFileName = getOutputFileName("tree", variables); //create a map from tree node index to names of descendants, save time later to know which sharedRabund you need @@ -627,7 +627,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues); + pValues = getPValues(groupings, lookup.size(), indicatorValues); }else { vector< vector > groupings; @@ -676,7 +676,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues); + pValues = getPValues(groupings, lookupFloat.size(), indicatorValues); } if (m->control_pressed) { out.close(); return 0; } @@ -1142,7 +1142,7 @@ int IndicatorCommand::getSharedFloat(){ } } //********************************************************************************************************************** -vector IndicatorCommand::driver(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues, int numIters){ +vector IndicatorCommand::driver(vector< vector >& groupings, int num, vector indicatorValues, int numIters){ try { vector pvalues; pvalues.resize(indicatorValues.size(), 0); @@ -1150,7 +1150,7 @@ vector IndicatorCommand::driver(vector< vector for(int i=0;icontrol_pressed) { break; } - groupingsMap = randomizeGroupings(groupings, num); + map< vector, vector > groupingsMap = randomizeGroupings(groupings, num); vector randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap); for (int j = 0; j < indicatorValues.size(); j++) { @@ -1166,23 +1166,29 @@ vector IndicatorCommand::driver(vector< vector } } //********************************************************************************************************************** -vector IndicatorCommand::getPValues(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues){ +vector IndicatorCommand::getPValues(vector< vector >& groupings, int num, vector indicatorValues){ try { vector pvalues; - -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + if(processors == 1){ - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + pvalues = driver(groupings, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } }else{ + //divide iters between processors + vector procIters; + int numItersPerProcessor = iters / processors; + + //divide iters between processes + for (int h = 0; h < processors; h++) { + if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; } + procIters.push_back(numItersPerProcessor); + } + + vector processIDS; + int process = 1; - //divide iters between processors - int numItersPerProcessor = iters / processors; - - vector processIDS; - int process = 1; - int num = 0; - +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -1191,7 +1197,7 @@ vector IndicatorCommand::getPValues(vector< vector IndicatorCommand::getPValues(vector< vector IndicatorCommand::getPValues(vector< vectormothurRemove(tempFile); } - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } - } + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #else - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; i > newGroupings; + + for (int k = 0; k < groupings.size(); k++) { + vector newLookup; + for (int l = 0; l < groupings[k].size(); l++) { + SharedRAbundFloatVector* temp = new SharedRAbundFloatVector(); + temp->setLabel(groupings[k][l]->getLabel()); + temp->setGroup(groupings[k][l]->getGroup()); + newLookup.push_back(temp); + } + newGroupings.push_back(newLookup); + } + + //for each bin + for (int l = 0; l < groupings.size(); l++) { + for (int k = 0; k < groupings[l][0]->getNumBins(); k++) { + if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u]; } } return pvalues; } + + for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back(groupings[l][j]->getAbundance(k), groupings[l][j]->getGroup()); } + } + } + + vector copyIValues = indicatorValues; + + indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues); + pDataArray.push_back(temp); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + //do my part + pvalues = driver(groupings, num, indicatorValues, procIters[0]); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j]; } + + for (int l = 0; l < pDataArray[i]->groupings.size(); l++) { + for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; } + } + + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #endif + } + return pvalues; } @@ -1256,7 +1319,7 @@ vector IndicatorCommand::getPValues(vector< vector IndicatorCommand::driver(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues, int numIters){ +vector IndicatorCommand::driver(vector< vector >& groupings, int num, vector indicatorValues, int numIters){ try { vector pvalues; pvalues.resize(indicatorValues.size(), 0); @@ -1264,7 +1327,7 @@ vector IndicatorCommand::driver(vector< vector >& gr for(int i=0;icontrol_pressed) { break; } - groupingsMap = randomizeGroupings(groupings, num); + map< vector, vector > groupingsMap = randomizeGroupings(groupings, num); vector randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap); for (int j = 0; j < indicatorValues.size(); j++) { @@ -1281,22 +1344,29 @@ vector IndicatorCommand::driver(vector< vector >& gr } //********************************************************************************************************************** //same as above, just data type difference -vector IndicatorCommand::getPValues(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues){ +vector IndicatorCommand::getPValues(vector< vector >& groupings, int num, vector indicatorValues){ try { vector pvalues; - -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + if(processors == 1){ - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + pvalues = driver(groupings, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } }else{ + //divide iters between processors + vector procIters; + int numItersPerProcessor = iters / processors; + + //divide iters between processes + for (int h = 0; h < processors; h++) { + if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; } + procIters.push_back(numItersPerProcessor); + } + + vector processIDS; + int process = 1; - //divide iters between processors - int numItersPerProcessor = iters / processors; - - vector processIDS; - int process = 1; - +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -1305,7 +1375,7 @@ vector IndicatorCommand::getPValues(vector< vector > processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later process++; }else if (pid == 0){ - pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + pvalues = driver(groupings, num, indicatorValues, procIters[process]); //pass pvalues to parent ofstream out; @@ -1321,49 +1391,106 @@ vector IndicatorCommand::getPValues(vector< vector > out.close(); exit(0); - }else { - m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); + }else { + m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); } exit(0); } } //do my part - //special case for last processor in case it doesn't divide evenly - numItersPerProcessor = iters - ((processors-1) * numItersPerProcessor); - pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + pvalues = driver(groupings, num, indicatorValues, procIters[0]); //force parent to wait until all the processes are done - for (int i=0;iopenInputFile(tempFile, in); ////// to do /////////// - int numTemp; numTemp = 0; + int numTemp; numTemp = 0; for (int j = 0; j < pvalues.size(); j++) { in >> numTemp; m->gobble(in); pvalues[j] += numTemp; } in.close(); m->mothurRemove(tempFile); } - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } - } + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #else - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; i > newGroupings; + + for (int k = 0; k < groupings.size(); k++) { + vector newLookup; + for (int l = 0; l < groupings[k].size(); l++) { + SharedRAbundFloatVector* temp = new SharedRAbundFloatVector(); + temp->setLabel(groupings[k][l]->getLabel()); + temp->setGroup(groupings[k][l]->getGroup()); + newLookup.push_back(temp); + } + newGroupings.push_back(newLookup); + } + + //for each bin + for (int l = 0; l < groupings.size(); l++) { + for (int k = 0; k < groupings[l][0]->getNumBins(); k++) { + if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u]; } } return pvalues; } + + for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back((float)(groupings[l][j]->getAbundance(k)), groupings[l][j]->getGroup()); } + } + } + + vector copyIValues = indicatorValues; + + indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues); + pDataArray.push_back(temp); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + //do my part + pvalues = driver(groupings, num, indicatorValues, procIters[0]); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j]; } + + for (int l = 0; l < pDataArray[i]->groupings.size(); l++) { + for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; } + } + + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #endif + } + return pvalues; } catch(exception& e) { - m->errorOut(e, "IndicatorCommand", "getPValues"); + m->errorOut(e, "IndicatorCommand", "getPValues"); exit(1); } }