X-Git-Url: https://git.donarmstrong.com/?p=mothur.git;a=blobdiff_plain;f=indicatorcommand.cpp;h=fd818acb44c4c5280c6a54971d3d39413307861d;hp=b92c58b54852b35c80ad039720c847b8d35d02bb;hb=050a3ff02473a3d4c0980964e1a9ebe52e55d6b8;hpb=deba0af0ccdcb6005ed5b2b82649b137c63fbdf7 diff --git a/indicatorcommand.cpp b/indicatorcommand.cpp index b92c58b..fd818ac 100644 --- a/indicatorcommand.cpp +++ b/indicatorcommand.cpp @@ -291,8 +291,8 @@ int IndicatorCommand::execute(){ for (int i = 0; i < m->Treenames.size(); i++) { nameMap.insert(m->Treenames[i]); //sanity check - is this a group that is not in the sharedfile? + if (i == 0) { gps.insert("Group1"); } if (designfile == "") { - if (i == 0) { gps.insert("Group1"); } if (!(m->inUsersGroups(m->Treenames[i], m->getAllGroups()))) { m->mothurOut("[ERROR]: " + m->Treenames[i] + " is not a group in your shared or relabund file."); m->mothurOutEndLine(); mismatch = true; @@ -453,7 +453,7 @@ int IndicatorCommand::GetIndicatorSpecies(){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues); + pValues = getPValues(groupings, lookup.size(), indicatorValues); }else { vector< vector > groupings; set groupsAlreadyAdded; @@ -476,7 +476,7 @@ int IndicatorCommand::GetIndicatorSpecies(){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues); + pValues = getPValues(groupings, lookupFloat.size(), indicatorValues); } if (m->control_pressed) { out.close(); return 0; } @@ -627,7 +627,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues); + pValues = getPValues(groupings, lookup.size(), indicatorValues); }else { vector< vector > groupings; @@ -676,7 +676,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues); + pValues = getPValues(groupings, lookupFloat.size(), indicatorValues); } if (m->control_pressed) { out.close(); return 0; } @@ -1142,7 +1142,7 @@ int IndicatorCommand::getSharedFloat(){ } } //********************************************************************************************************************** -vector IndicatorCommand::driver(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues, int numIters){ +vector IndicatorCommand::driver(vector< vector >& groupings, int num, vector indicatorValues, int numIters){ try { vector pvalues; pvalues.resize(indicatorValues.size(), 0); @@ -1150,7 +1150,7 @@ vector IndicatorCommand::driver(vector< vector for(int i=0;icontrol_pressed) { break; } - groupingsMap = randomizeGroupings(groupings, num); + map< vector, vector > groupingsMap = randomizeGroupings(groupings, num); vector randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap); for (int j = 0; j < indicatorValues.size(); j++) { @@ -1166,23 +1166,29 @@ vector IndicatorCommand::driver(vector< vector } } //********************************************************************************************************************** -vector IndicatorCommand::getPValues(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues){ +vector IndicatorCommand::getPValues(vector< vector >& groupings, int num, vector indicatorValues){ try { vector pvalues; - -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + if(processors == 1){ - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + pvalues = driver(groupings, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } }else{ + //divide iters between processors + vector procIters; + int numItersPerProcessor = iters / processors; + + //divide iters between processes + for (int h = 0; h < processors; h++) { + if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; } + procIters.push_back(numItersPerProcessor); + } + + vector processIDS; + int process = 1; - //divide iters between processors - int numItersPerProcessor = iters / processors; - - vector processIDS; - int process = 1; - int num = 0; - +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -1191,7 +1197,7 @@ vector IndicatorCommand::getPValues(vector< vector IndicatorCommand::getPValues(vector< vector IndicatorCommand::getPValues(vector< vectormothurRemove(tempFile); } - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } - } + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #else - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; i > newGroupings; + + for (int k = 0; k < groupings.size(); k++) { + vector newLookup; + for (int l = 0; l < groupings[k].size(); l++) { + SharedRAbundFloatVector* temp = new SharedRAbundFloatVector(); + temp->setLabel(groupings[k][l]->getLabel()); + temp->setGroup(groupings[k][l]->getGroup()); + newLookup.push_back(temp); + } + newGroupings.push_back(newLookup); + } + + //for each bin + for (int l = 0; l < groupings.size(); l++) { + for (int k = 0; k < groupings[l][0]->getNumBins(); k++) { + if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u]; } } return pvalues; } + + for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back(groupings[l][j]->getAbundance(k), groupings[l][j]->getGroup()); } + } + } + + vector copyIValues = indicatorValues; + + indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues); + pDataArray.push_back(temp); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + //do my part + pvalues = driver(groupings, num, indicatorValues, procIters[0]); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j]; } + + for (int l = 0; l < pDataArray[i]->groupings.size(); l++) { + for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; } + } + + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #endif + } + return pvalues; } @@ -1256,7 +1319,7 @@ vector IndicatorCommand::getPValues(vector< vector IndicatorCommand::driver(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues, int numIters){ +vector IndicatorCommand::driver(vector< vector >& groupings, int num, vector indicatorValues, int numIters){ try { vector pvalues; pvalues.resize(indicatorValues.size(), 0); @@ -1264,7 +1327,7 @@ vector IndicatorCommand::driver(vector< vector >& gr for(int i=0;icontrol_pressed) { break; } - groupingsMap = randomizeGroupings(groupings, num); + map< vector, vector > groupingsMap = randomizeGroupings(groupings, num); vector randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap); for (int j = 0; j < indicatorValues.size(); j++) { @@ -1281,22 +1344,29 @@ vector IndicatorCommand::driver(vector< vector >& gr } //********************************************************************************************************************** //same as above, just data type difference -vector IndicatorCommand::getPValues(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues){ +vector IndicatorCommand::getPValues(vector< vector >& groupings, int num, vector indicatorValues){ try { vector pvalues; - -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + if(processors == 1){ - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + pvalues = driver(groupings, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } }else{ + //divide iters between processors + vector procIters; + int numItersPerProcessor = iters / processors; + + //divide iters between processes + for (int h = 0; h < processors; h++) { + if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; } + procIters.push_back(numItersPerProcessor); + } + + vector processIDS; + int process = 1; - //divide iters between processors - int numItersPerProcessor = iters / processors; - - vector processIDS; - int process = 1; - +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -1305,7 +1375,7 @@ vector IndicatorCommand::getPValues(vector< vector > processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later process++; }else if (pid == 0){ - pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + pvalues = driver(groupings, num, indicatorValues, procIters[process]); //pass pvalues to parent ofstream out; @@ -1321,49 +1391,106 @@ vector IndicatorCommand::getPValues(vector< vector > out.close(); exit(0); - }else { - m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); + }else { + m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); } exit(0); } } //do my part - //special case for last processor in case it doesn't divide evenly - numItersPerProcessor = iters - ((processors-1) * numItersPerProcessor); - pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + pvalues = driver(groupings, num, indicatorValues, procIters[0]); //force parent to wait until all the processes are done - for (int i=0;iopenInputFile(tempFile, in); ////// to do /////////// - int numTemp; numTemp = 0; + int numTemp; numTemp = 0; for (int j = 0; j < pvalues.size(); j++) { in >> numTemp; m->gobble(in); pvalues[j] += numTemp; } in.close(); m->mothurRemove(tempFile); } - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } - } + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #else - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; i > newGroupings; + + for (int k = 0; k < groupings.size(); k++) { + vector newLookup; + for (int l = 0; l < groupings[k].size(); l++) { + SharedRAbundFloatVector* temp = new SharedRAbundFloatVector(); + temp->setLabel(groupings[k][l]->getLabel()); + temp->setGroup(groupings[k][l]->getGroup()); + newLookup.push_back(temp); + } + newGroupings.push_back(newLookup); + } + + //for each bin + for (int l = 0; l < groupings.size(); l++) { + for (int k = 0; k < groupings[l][0]->getNumBins(); k++) { + if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u]; } } return pvalues; } + + for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back((float)(groupings[l][j]->getAbundance(k)), groupings[l][j]->getGroup()); } + } + } + + vector copyIValues = indicatorValues; + + indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues); + pDataArray.push_back(temp); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + //do my part + pvalues = driver(groupings, num, indicatorValues, procIters[0]); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j]; } + + for (int l = 0; l < pDataArray[i]->groupings.size(); l++) { + for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; } + } + + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #endif + } + return pvalues; } catch(exception& e) { - m->errorOut(e, "IndicatorCommand", "getPValues"); + m->errorOut(e, "IndicatorCommand", "getPValues"); exit(1); } }