From 70491a12902e89b85cfa6b44a7b7fbe066ee2ac1 Mon Sep 17 00:00:00 2001 From: Sarah Westcott Date: Mon, 6 May 2013 15:08:59 -0400 Subject: [PATCH] paralellized parsimony, unifrac.unweighted, phylo.diversity, indicator commands for windows. --- indicatorcommand.cpp | 233 ++++++++++++++++++++++------- indicatorcommand.h | 137 ++++++++++++++++- parsimony.cpp | 84 +++++++---- parsimony.h | 86 ++++++++++- phylodiversitycommand.cpp | 103 +++++++++---- phylodiversitycommand.h | 139 ++++++++++++++++++ readcolumn.cpp | 20 ++- splitmatrix.cpp | 29 ++-- summarysharedcommand.cpp | 3 +- unweighted.cpp | 188 +++++++++++++++--------- unweighted.h | 302 ++++++++++++++++++++++++++++++++++++++ 11 files changed, 1121 insertions(+), 203 deletions(-) diff --git a/indicatorcommand.cpp b/indicatorcommand.cpp index 8b9a88c..fd818ac 100644 --- a/indicatorcommand.cpp +++ b/indicatorcommand.cpp @@ -453,7 +453,7 @@ int IndicatorCommand::GetIndicatorSpecies(){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues); + pValues = getPValues(groupings, lookup.size(), indicatorValues); }else { vector< vector > groupings; set groupsAlreadyAdded; @@ -476,7 +476,7 @@ int IndicatorCommand::GetIndicatorSpecies(){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues); + pValues = getPValues(groupings, lookupFloat.size(), indicatorValues); } if (m->control_pressed) { out.close(); return 0; } @@ -627,7 +627,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookup.size(), indicatorValues); + pValues = getPValues(groupings, lookup.size(), indicatorValues); }else { vector< vector > groupings; @@ -676,7 +676,7 @@ int IndicatorCommand::GetIndicatorSpecies(Tree*& T){ indicatorValues = getValues(groupings, indicatorGroups, randomGroupingsMap); - pValues = getPValues(groupings, randomGroupingsMap, lookupFloat.size(), indicatorValues); + pValues = getPValues(groupings, lookupFloat.size(), indicatorValues); } if (m->control_pressed) { out.close(); return 0; } @@ -1142,7 +1142,7 @@ int IndicatorCommand::getSharedFloat(){ } } //********************************************************************************************************************** -vector IndicatorCommand::driver(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues, int numIters){ +vector IndicatorCommand::driver(vector< vector >& groupings, int num, vector indicatorValues, int numIters){ try { vector pvalues; pvalues.resize(indicatorValues.size(), 0); @@ -1150,7 +1150,7 @@ vector IndicatorCommand::driver(vector< vector for(int i=0;icontrol_pressed) { break; } - groupingsMap = randomizeGroupings(groupings, num); + map< vector, vector > groupingsMap = randomizeGroupings(groupings, num); vector randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap); for (int j = 0; j < indicatorValues.size(); j++) { @@ -1166,23 +1166,29 @@ vector IndicatorCommand::driver(vector< vector } } //********************************************************************************************************************** -vector IndicatorCommand::getPValues(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues){ +vector IndicatorCommand::getPValues(vector< vector >& groupings, int num, vector indicatorValues){ try { vector pvalues; - -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + if(processors == 1){ - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + pvalues = driver(groupings, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } }else{ + //divide iters between processors + vector procIters; + int numItersPerProcessor = iters / processors; + + //divide iters between processes + for (int h = 0; h < processors; h++) { + if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; } + procIters.push_back(numItersPerProcessor); + } + + vector processIDS; + int process = 1; - //divide iters between processors - int numItersPerProcessor = iters / processors; - - vector processIDS; - int process = 1; - int num = 0; - +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -1191,7 +1197,7 @@ vector IndicatorCommand::getPValues(vector< vector IndicatorCommand::getPValues(vector< vector IndicatorCommand::getPValues(vector< vectormothurRemove(tempFile); } - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } - } + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #else - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; i > newGroupings; + + for (int k = 0; k < groupings.size(); k++) { + vector newLookup; + for (int l = 0; l < groupings[k].size(); l++) { + SharedRAbundFloatVector* temp = new SharedRAbundFloatVector(); + temp->setLabel(groupings[k][l]->getLabel()); + temp->setGroup(groupings[k][l]->getGroup()); + newLookup.push_back(temp); + } + newGroupings.push_back(newLookup); + } + + //for each bin + for (int l = 0; l < groupings.size(); l++) { + for (int k = 0; k < groupings[l][0]->getNumBins(); k++) { + if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u]; } } return pvalues; } + + for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back(groupings[l][j]->getAbundance(k), groupings[l][j]->getGroup()); } + } + } + + vector copyIValues = indicatorValues; + + indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues); + pDataArray.push_back(temp); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + //do my part + pvalues = driver(groupings, num, indicatorValues, procIters[0]); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j]; } + + for (int l = 0; l < pDataArray[i]->groupings.size(); l++) { + for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; } + } + + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #endif + } + return pvalues; } @@ -1256,7 +1319,7 @@ vector IndicatorCommand::getPValues(vector< vector IndicatorCommand::driver(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues, int numIters){ +vector IndicatorCommand::driver(vector< vector >& groupings, int num, vector indicatorValues, int numIters){ try { vector pvalues; pvalues.resize(indicatorValues.size(), 0); @@ -1264,7 +1327,7 @@ vector IndicatorCommand::driver(vector< vector >& gr for(int i=0;icontrol_pressed) { break; } - groupingsMap = randomizeGroupings(groupings, num); + map< vector, vector > groupingsMap = randomizeGroupings(groupings, num); vector randomIndicatorValues = getValues(groupings, notUsedGroupings, groupingsMap); for (int j = 0; j < indicatorValues.size(); j++) { @@ -1281,22 +1344,29 @@ vector IndicatorCommand::driver(vector< vector >& gr } //********************************************************************************************************************** //same as above, just data type difference -vector IndicatorCommand::getPValues(vector< vector >& groupings, map< vector, vector > groupingsMap, int num, vector indicatorValues){ +vector IndicatorCommand::getPValues(vector< vector >& groupings, int num, vector indicatorValues){ try { vector pvalues; - -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + if(processors == 1){ - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + pvalues = driver(groupings, num, indicatorValues, iters); + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } }else{ + //divide iters between processors + vector procIters; + int numItersPerProcessor = iters / processors; + + //divide iters between processes + for (int h = 0; h < processors; h++) { + if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; } + procIters.push_back(numItersPerProcessor); + } + + vector processIDS; + int process = 1; - //divide iters between processors - int numItersPerProcessor = iters / processors; - - vector processIDS; - int process = 1; - +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -1305,7 +1375,7 @@ vector IndicatorCommand::getPValues(vector< vector > processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later process++; }else if (pid == 0){ - pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + pvalues = driver(groupings, num, indicatorValues, procIters[process]); //pass pvalues to parent ofstream out; @@ -1321,49 +1391,106 @@ vector IndicatorCommand::getPValues(vector< vector > out.close(); exit(0); - }else { - m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); + }else { + m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine(); for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); } exit(0); } } //do my part - //special case for last processor in case it doesn't divide evenly - numItersPerProcessor = iters - ((processors-1) * numItersPerProcessor); - pvalues = driver(groupings, groupingsMap, num, indicatorValues, numItersPerProcessor); + pvalues = driver(groupings, num, indicatorValues, procIters[0]); //force parent to wait until all the processes are done - for (int i=0;iopenInputFile(tempFile, in); ////// to do /////////// - int numTemp; numTemp = 0; + int numTemp; numTemp = 0; for (int j = 0; j < pvalues.size(); j++) { in >> numTemp; m->gobble(in); pvalues[j] += numTemp; } in.close(); m->mothurRemove(tempFile); } - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } - } + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #else - pvalues = driver(groupings, groupingsMap, num, indicatorValues, iters); - for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } + + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; i > newGroupings; + + for (int k = 0; k < groupings.size(); k++) { + vector newLookup; + for (int l = 0; l < groupings[k].size(); l++) { + SharedRAbundFloatVector* temp = new SharedRAbundFloatVector(); + temp->setLabel(groupings[k][l]->getLabel()); + temp->setGroup(groupings[k][l]->getGroup()); + newLookup.push_back(temp); + } + newGroupings.push_back(newLookup); + } + + //for each bin + for (int l = 0; l < groupings.size(); l++) { + for (int k = 0; k < groupings[l][0]->getNumBins(); k++) { + if (m->control_pressed) { for (int j = 0; j < newGroupings.size(); j++) { for (int u = 0; u < newGroupings[j].size(); u++) { delete newGroupings[j][u]; } } return pvalues; } + + for (int j = 0; j < groupings[l].size(); j++) { newGroupings[l][j]->push_back((float)(groupings[l][j]->getAbundance(k)), groupings[l][j]->getGroup()); } + } + } + + vector copyIValues = indicatorValues; + + indicatorData* temp = new indicatorData(m, procIters[i], newGroupings, num, copyIValues); + pDataArray.push_back(temp); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyIndicatorThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + //do my part + pvalues = driver(groupings, num, indicatorValues, procIters[0]); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->pvalues.size(); j++) { pvalues[j] += pDataArray[i]->pvalues[j]; } + + for (int l = 0; l < pDataArray[i]->groupings.size(); l++) { + for (int j = 0; j < pDataArray[i]->groupings[l].size(); j++) { delete pDataArray[i]->groupings[l][j]; } + } + + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + for (int i = 0; i < pvalues.size(); i++) { pvalues[i] /= (double)iters; } #endif + } + return pvalues; } catch(exception& e) { - m->errorOut(e, "IndicatorCommand", "getPValues"); + m->errorOut(e, "IndicatorCommand", "getPValues"); exit(1); } } diff --git a/indicatorcommand.h b/indicatorcommand.h index 1f46081..61bd565 100644 --- a/indicatorcommand.h +++ b/indicatorcommand.h @@ -58,15 +58,144 @@ private: map< vector, vector > randomizeGroupings(vector< vector >&, int); map< vector, vector > randomizeGroupings(vector< vector >&, int); - vector driver(vector< vector >&, map< vector, vector >, int, vector, int); - vector driver(vector< vector >&, map< vector, vector >, int, vector, int); + vector driver(vector< vector >&, int, vector, int); + vector driver(vector< vector >&, int, vector, int); - vector getPValues(vector< vector >&, map< vector, vector >, int, vector); - vector getPValues(vector< vector >&, map< vector, vector >, int, vector); + vector getPValues(vector< vector >&, int, vector); + vector getPValues(vector< vector >&, int, vector); }; +/**************************************************************************************************/ + +struct indicatorData { + vector< vector > groupings; + MothurOut* m; + int iters, num; + vector indicatorValues; + vector pvalues; + + indicatorData(){} + indicatorData(MothurOut* mout, int it, vector< vector > ng, int n, vector iv) { + m = mout; + iters = it; + groupings = ng; + indicatorValues = iv; + num = n; + } +}; +/**************************************************************************************************/ +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) +#else +static DWORD WINAPI MyIndicatorThreadFunction(LPVOID lpParam){ + indicatorData* pDataArray; + pDataArray = (indicatorData*)lpParam; + + try { + + pDataArray->pvalues.resize(pDataArray->indicatorValues.size(), 0); + + for(int i=0;iiters;i++){ + if (pDataArray->m->control_pressed) { break; } + + //groupingsMap = randomizeGroupings(groupings, num); + /////////////////////////////////////////////////////////////////////// + map< vector, vector > randomGroupings; + + for (int j = 0; j < pDataArray->num; j++) { + + //get random groups to swap to switch with + //generate random int between 0 and groupings.size()-1 + int z = pDataArray->m->getRandomIndex(pDataArray->groupings.size()-1); + int x = pDataArray->m->getRandomIndex(pDataArray->groupings.size()-1); + int a = pDataArray->m->getRandomIndex(pDataArray->groupings[z].size()-1); + int b = pDataArray->m->getRandomIndex(pDataArray->groupings[x].size()-1); + //cout << i << '\t' << z << '\t' << x << '\t' << a << '\t' << b << endl; + + vector from; + vector to; + + from.push_back(z); from.push_back(a); + to.push_back(x); to.push_back(b); + + randomGroupings[from] = to; + } + /////////////////////////////////////////////////////////////////////// + + //vector randomIndicatorValues = getValues(groupings, notUsedGroupings, randomGroupings); + /////////////////////////////////////////////////////////////////////// + vector randomIndicatorValues; + map< vector, vector >::iterator it; + + //for each otu + for (int i = 0; i < pDataArray->groupings[0][0]->getNumBins(); i++) { + + if (pDataArray->m->control_pressed) { return 0; } + + vector terms; + float AijDenominator = 0.0; + vector Bij; + + //get overall abundance of each grouping + for (int j = 0; j < pDataArray->groupings.size(); j++) { + + float totalAbund = 0; + int numNotZero = 0; + for (int k = 0; k < pDataArray->groupings[j].size(); k++) { + vector temp; temp.push_back(j); temp.push_back(k); + it = randomGroupings.find(temp); + + if (it == randomGroupings.end()) { //this one didnt get moved + totalAbund += pDataArray->groupings[j][k]->getAbundance(i); + if (pDataArray->groupings[j][k]->getAbundance(i) != 0.0) { numNotZero++; } + }else { + totalAbund += pDataArray->groupings[(it->second)[0]][(it->second)[1]]->getAbundance(i); + if (pDataArray->groupings[(it->second)[0]][(it->second)[1]]->getAbundance(i) != 0.0) { numNotZero++; } + } + + } + + //mean abundance + float Aij = (totalAbund / (float) pDataArray->groupings[j].size()); + terms.push_back(Aij); + + //percentage of sites represented + Bij.push_back(numNotZero / (float) pDataArray->groupings[j].size()); + + AijDenominator += Aij; + } + + float maxIndVal = 0.0; + for (int j = 0; j < terms.size(); j++) { + float thisAij = (terms[j] / AijDenominator); //relative abundance + float thisValue = thisAij * Bij[j] * 100.0; + + //save largest + if (thisValue > maxIndVal) { maxIndVal = thisValue; } + } + + randomIndicatorValues.push_back(maxIndVal); + } + + /////////////////////////////////////////////////////////////////////// + + for (int j = 0; j < pDataArray->indicatorValues.size(); j++) { + if (randomIndicatorValues[j] >= pDataArray->indicatorValues[j]) { pDataArray->pvalues[j]++; } + } + } + + return 0; + + } + catch(exception& e) { + pDataArray->m->errorOut(e, "IndicatorCommand", "MyIndicatorThreadFunction"); + exit(1); + } +} +#endif + + #endif diff --git a/parsimony.cpp b/parsimony.cpp index 6a0485c..b12ca1f 100644 --- a/parsimony.cpp +++ b/parsimony.cpp @@ -54,31 +54,18 @@ EstOutput Parsimony::getValues(Tree* t, int p, string o) { namesOfGroupCombos.push_back(groups); } } - - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) - if(processors == 1){ - data = driver(t, namesOfGroupCombos, 0, namesOfGroupCombos.size(), ct); - }else{ - lines.clear(); - int numPairs = namesOfGroupCombos.size(); - - int numPairsPerProcessor = numPairs / processors; - - for (int i = 0; i < processors; i++) { - int startPos = i * numPairsPerProcessor; - - if(i == processors - 1){ - numPairsPerProcessor = numPairs - i * numPairsPerProcessor; - } - - lines.push_back(linePair(startPos, numPairsPerProcessor)); - } - - data = createProcesses(t, namesOfGroupCombos, ct); - } - #else - data = driver(t, namesOfGroupCombos, 0, namesOfGroupCombos.size(), ct); - #endif + + lines.clear(); + int numPairs = namesOfGroupCombos.size(); + int numPairsPerProcessor = numPairs / processors; + + for (int i = 0; i < processors; i++) { + int startPos = i * numPairsPerProcessor; + if(i == processors - 1){ numPairsPerProcessor = numPairs - i * numPairsPerProcessor; } + lines.push_back(linePair(startPos, numPairsPerProcessor)); + } + + data = createProcesses(t, namesOfGroupCombos, ct); return data; @@ -92,12 +79,13 @@ EstOutput Parsimony::getValues(Tree* t, int p, string o) { EstOutput Parsimony::createProcesses(Tree* t, vector< vector > namesOfGroupCombos, CountTable* ct) { try { -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) - int process = 1; + int process = 1; vector processIDS; EstOutput results; - + +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -160,9 +148,47 @@ EstOutput Parsimony::createProcesses(Tree* t, vector< vector > namesOfGr in.close(); m->mothurRemove(s); } +#else + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + vector cts; + vector trees; + + //Create processor worker threads. + for( int i=1; icopy(ct); + Tree* copyTree = new Tree(copyCount); + copyTree->getCopy(t); + + cts.push_back(copyCount); + trees.push_back(copyTree); + + parsData* temppars = new parsData(m, lines[i].start, lines[i].num, namesOfGroupCombos, copyTree, copyCount); + pDataArray.push_back(temppars); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyParsimonyThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + results = driver(t, namesOfGroupCombos, lines[0].start, lines[0].num, ct); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->results.size(); j++) { results.push_back(pDataArray[i]->results[j]); } + delete cts[i]; + delete trees[i]; + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } - return results; #endif + return results; } catch(exception& e) { m->errorOut(e, "Parsimony", "createProcesses"); diff --git a/parsimony.h b/parsimony.h index bf0e0d4..51c0496 100644 --- a/parsimony.h +++ b/parsimony.h @@ -38,7 +38,91 @@ class Parsimony : public TreeCalculator { EstOutput driver(Tree*, vector< vector >, int, int, CountTable*); EstOutput createProcesses(Tree*, vector< vector >, CountTable*); }; - /***********************************************************************/ +struct parsData { + int start; + int num; + MothurOut* m; + EstOutput results; + vector< vector > namesOfGroupCombos; + Tree* t; + CountTable* ct; + + parsData(){} + parsData(MothurOut* mout, int st, int en, vector< vector > ngc, Tree* tree, CountTable* count) { + m = mout; + start = st; + num = en; + namesOfGroupCombos = ngc; + t = tree; + ct = count; + } +}; + +/**************************************************************************************************/ +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) +#else +static DWORD WINAPI MyParsimonyThreadFunction(LPVOID lpParam){ + parsData* pDataArray; + pDataArray = (parsData*)lpParam; + try { + + pDataArray->results.resize(pDataArray->num); + + Tree* copyTree = new Tree(pDataArray->ct); + int count = 0; + + for (int h = pDataArray->start; h < (pDataArray->start+pDataArray->num); h++) { + + if (pDataArray->m->control_pressed) { delete copyTree; return 0; } + + int score = 0; + + //groups in this combo + vector groups = pDataArray->namesOfGroupCombos[h]; + + //copy users tree so that you can redo pgroups + copyTree->getCopy(pDataArray->t); + + //create pgroups that reflect the groups the user want to use + for(int i=copyTree->getNumLeaves();igetNumNodes();i++){ + copyTree->tree[i].pGroups = (copyTree->mergeUserGroups(i, groups)); + } + + for(int i=copyTree->getNumLeaves();igetNumNodes();i++){ + + if (pDataArray->m->control_pressed) { return 0; } + + int lc = copyTree->tree[i].getLChild(); + int rc = copyTree->tree[i].getRChild(); + + int iSize = copyTree->tree[i].pGroups.size(); + int rcSize = copyTree->tree[rc].pGroups.size(); + int lcSize = copyTree->tree[lc].pGroups.size(); + + //if isize are 0 then that branch is to be ignored + if (iSize == 0) { } + else if ((rcSize == 0) || (lcSize == 0)) { } + //if you have more groups than either of your kids then theres been a change. + else if(iSize > rcSize || iSize > lcSize){ + score++; + } + } + + pDataArray->results[count] = score; + count++; + } + + delete copyTree; + + return 0; + + } + catch(exception& e) { + pDataArray->m->errorOut(e, "Parsimony", "MyParsimonyThreadFunction"); + exit(1); + } +} +#endif #endif diff --git a/phylodiversitycommand.cpp b/phylodiversitycommand.cpp index 002c577..339649c 100644 --- a/phylodiversitycommand.cpp +++ b/phylodiversitycommand.cpp @@ -362,34 +362,22 @@ int PhyloDiversityCommand::execute(){ if (numSampledList.count(diversity[mGroups[j]].size()-1) == 0) { numSampledList.insert(diversity[mGroups[j]].size()-1); } } - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) - if(processors == 1){ - driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true); - }else{ - if (rarefy) { - vector procIters; - - int numItersPerProcessor = iters / processors; - - //divide iters between processes - for (int h = 0; h < processors; h++) { - if(h == processors - 1){ - numItersPerProcessor = iters - h * numItersPerProcessor; - } - procIters.push_back(numItersPerProcessor); - } - - createProcesses(procIters, trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum); - - }else{ //no need to paralellize if you dont want to rarefy - driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true); - } - } - - #else - driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true); - #endif - + if (rarefy) { + vector procIters; + int numItersPerProcessor = iters / processors; + + //divide iters between processes + for (int h = 0; h < processors; h++) { + if(h == processors - 1){ numItersPerProcessor = iters - h * numItersPerProcessor; } + procIters.push_back(numItersPerProcessor); + } + + createProcesses(procIters, trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum); + + }else{ //no need to paralellize if you dont want to rarefy + driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true); + } + if (rarefy) { printData(numSampledList, sumDiversity, outRare, iters); } } @@ -415,12 +403,13 @@ int PhyloDiversityCommand::execute(){ //********************************************************************************************************************** int PhyloDiversityCommand::createProcesses(vector& procIters, Tree* t, map< string, vector >& div, map >& sumDiv, int numIters, int increment, vector& randomLeaf, set& numSampledList, ofstream& outCollect, ofstream& outSum){ try { - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) - int process = 1; + int process = 1; vector processIDS; map< string, vector >::iterator itSum; - + + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -489,6 +478,56 @@ int PhyloDiversityCommand::createProcesses(vector& procIters, Tree* t, map< in.close(); m->mothurRemove(inTemp); } +#else + + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + vector cts; + vector trees; + map rootForGroup = getRootForGroups(t); + + //Create processor worker threads. + for( int i=1; icopy(ct); + Tree* copyTree = new Tree(copyCount); + copyTree->getCopy(t); + + cts.push_back(copyCount); + trees.push_back(copyTree); + + map > copydiv = div; + map > copysumDiv = sumDiv; + vector copyrandomLeaf = randomLeaf; + set copynumSampledList = numSampledList; + map copyRootForGrouping = rootForGroup; + + phylodivData* temp = new phylodivData(m, procIters[i], copydiv, copysumDiv, copyTree, copyCount, increment, copyrandomLeaf, copynumSampledList, copyRootForGrouping); + pDataArray.push_back(temp); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyPhyloDivThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + driver(t, div, sumDiv, procIters[0], increment, randomLeaf, numSampledList, outCollect, outSum, true); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (itSum = pDataArray[i]->sumDiv.begin(); itSum != pDataArray[i]->sumDiv.end(); itSum++) { + for (int k = 0; k < (itSum->second).size(); k++) { + sumDiv[itSum->first][k] += (itSum->second)[k]; + } + } + delete cts[i]; + delete trees[i]; + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } #endif @@ -594,7 +633,7 @@ void PhyloDiversityCommand::printSumData(map< string, vector >& div, ofst else { score = div[mGroups[j]][numSampled] / (float)numIters; } out << setprecision(4) << score << endl; - cout << mGroups[j] << '\t' << numSampled << '\t'<< setprecision(4) << score << endl; + //cout << mGroups[j] << '\t' << numSampled << '\t'<< setprecision(4) << score << endl; } out.close(); diff --git a/phylodiversitycommand.h b/phylodiversitycommand.h index ec372ba..045d206 100644 --- a/phylodiversitycommand.h +++ b/phylodiversitycommand.h @@ -51,5 +51,144 @@ private: }; +/***********************************************************************/ +struct phylodivData { + int numIters; + MothurOut* m; + map< string, vector > div; + map > sumDiv; + map rootForGroup; + vector randomLeaf; + set numSampledList; + int increment; + Tree* t; + CountTable* ct; + bool includeRoot; + + + phylodivData(){} + phylodivData(MothurOut* mout, int ni, map< string, vector > cd, map< string, vector > csd, Tree* tree, CountTable* count, int incre, vector crl, set nsl, map rfg) { + m = mout; + t = tree; + ct = count; + div = cd; + numIters = ni; + sumDiv = csd; + increment = incre; + randomLeaf = crl; + numSampledList = nsl; + rootForGroup = rfg; + } +}; + +/**************************************************************************************************/ +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) +#else +static DWORD WINAPI MyPhyloDivThreadFunction(LPVOID lpParam){ + phylodivData* pDataArray; + pDataArray = (phylodivData*)lpParam; + try { + int numLeafNodes = pDataArray->randomLeaf.size(); + vector mGroups = pDataArray->m->getGroups(); + + for (int l = 0; l < pDataArray->numIters; l++) { + random_shuffle(pDataArray->randomLeaf.begin(), pDataArray->randomLeaf.end()); + + //initialize counts + map counts; + vector< map > countedBranch; + for (int i = 0; i < pDataArray->t->getNumNodes(); i++) { + map temp; + for (int j = 0; j < mGroups.size(); j++) { temp[mGroups[j]] = false; } + countedBranch.push_back(temp); + } + + for (int j = 0; j < mGroups.size(); j++) { counts[mGroups[j]] = 0; } + + for(int k = 0; k < numLeafNodes; k++){ + + if (pDataArray->m->control_pressed) { return 0; } + + //calc branch length of randomLeaf k + //vector br = calcBranchLength(t, randomLeaf[k], countedBranch, rootForGroup); + //(Tree* t, int leaf, vector< map >& counted, map roots + ///////////////////////////////////////////////////////////////////////////////////// + vector br; + int index = pDataArray->randomLeaf[k]; + + vector groups = pDataArray->t->tree[pDataArray->randomLeaf[k]].getGroup(); + br.resize(groups.size(), 0.0); + + //you are a leaf + if(pDataArray->t->tree[index].getBranchLength() != -1){ + for (int k = 0; k < groups.size(); k++) { + br[k] += abs(pDataArray->t->tree[index].getBranchLength()); + } + } + + index = pDataArray->t->tree[index].getParent(); + + //while you aren't at root + while(pDataArray->t->tree[index].getParent() != -1){ + + if (pDataArray->m->control_pressed) { return 0; } + + for (int k = 0; k < groups.size(); k++) { + + if (index >= pDataArray->rootForGroup[groups[k]]) { countedBranch[index][groups[k]] = true; } //if you are at this groups "root", then say we are done + + if (!countedBranch[index][groups[k]]){ //if counted[index][groups[k] is true this groups has already added all br from here to root, so quit early + if (pDataArray->t->tree[index].getBranchLength() != -1) { + br[k] += abs(pDataArray->t->tree[index].getBranchLength()); + } + countedBranch[index][groups[k]] = true; + } + } + index = pDataArray->t->tree[index].getParent(); + } + ///////////////////////////////////////////////////////////////////////////////////// + + //for each group in the groups update the total branch length accounting for the names file + groups = pDataArray->t->tree[pDataArray->randomLeaf[k]].getGroup(); + + for (int j = 0; j < groups.size(); j++) { + + if (pDataArray->m->inUsersGroups(groups[j], mGroups)) { + int numSeqsInGroupJ = 0; + map::iterator it; + it = pDataArray->t->tree[pDataArray->randomLeaf[k]].pcount.find(groups[j]); + if (it != pDataArray->t->tree[pDataArray->randomLeaf[k]].pcount.end()) { //this leaf node contains seqs from group j + numSeqsInGroupJ = it->second; + } + + if (numSeqsInGroupJ != 0) { pDataArray->div[groups[j]][(counts[groups[j]]+1)] = pDataArray->div[groups[j]][counts[groups[j]]] + br[j]; } + + for (int s = (counts[groups[j]]+2); s <= (counts[groups[j]]+numSeqsInGroupJ); s++) { + pDataArray->div[groups[j]][s] = pDataArray->div[groups[j]][s-1]; //update counts, but don't add in redundant branch lengths + } + counts[groups[j]] += numSeqsInGroupJ; + } + } + } + + + //add this diversity to the sum + for (int j = 0; j < mGroups.size(); j++) { + for (int g = 0; g < pDataArray->div[mGroups[j]].size(); g++) { + pDataArray->sumDiv[mGroups[j]][g] += pDataArray->div[mGroups[j]][g]; + } + } + + } + + return 0; + } + catch(exception& e) { + pDataArray->m->errorOut(e, "PhyloDiversityCommand", "MyPhyloDivThreadFunction"); + exit(1); + } +} +#endif + #endif diff --git a/readcolumn.cpp b/readcolumn.cpp index 83d11c9..e14125c 100644 --- a/readcolumn.cpp +++ b/readcolumn.cpp @@ -48,7 +48,11 @@ int ReadColumnMatrix::read(NameAssignment* nameMap){ while(fileHandle && lt == 1){ //let's assume it's a triangular matrix... - fileHandle >> firstName >> secondName >> distance; // get the row and column names and distance + fileHandle >> firstName; m->gobble(fileHandle); + fileHandle >> secondName; m->gobble(fileHandle); + fileHandle >> distance; // get the row and column names and distance + + if (m->debug) { cout << firstName << '\t' << secondName << '\t' << distance << endl; } if (m->control_pressed) { fileHandle.close(); delete reading; return 0; } @@ -106,7 +110,9 @@ int ReadColumnMatrix::read(NameAssignment* nameMap){ m->openInputFile(distFile, fileHandle); //let's start over while(fileHandle){ - fileHandle >> firstName >> secondName >> distance; + fileHandle >> firstName; m->gobble(fileHandle); + fileHandle >> secondName; m->gobble(fileHandle); + fileHandle >> distance; // get the row and column names and distance if (m->control_pressed) { fileHandle.close(); delete reading; return 0; } @@ -167,8 +173,10 @@ int ReadColumnMatrix::read(CountTable* countTable){ while(fileHandle && lt == 1){ //let's assume it's a triangular matrix... - fileHandle >> firstName >> secondName >> distance; // get the row and column names and distance - + fileHandle >> firstName; m->gobble(fileHandle); + fileHandle >> secondName; m->gobble(fileHandle); + fileHandle >> distance; // get the row and column names and distance + if (m->control_pressed) { fileHandle.close(); delete reading; return 0; } int itA = countTable->get(firstName); @@ -224,7 +232,9 @@ int ReadColumnMatrix::read(CountTable* countTable){ m->openInputFile(distFile, fileHandle); //let's start over while(fileHandle){ - fileHandle >> firstName >> secondName >> distance; + fileHandle >> firstName; m->gobble(fileHandle); + fileHandle >> secondName; m->gobble(fileHandle); + fileHandle >> distance; // get the row and column names and distance if (m->control_pressed) { fileHandle.close(); delete reading; return 0; } diff --git a/splitmatrix.cpp b/splitmatrix.cpp index a5ae3cb..2e8b905 100644 --- a/splitmatrix.cpp +++ b/splitmatrix.cpp @@ -332,7 +332,7 @@ int SplitMatrix::splitDistanceLarge(){ int numGroups = 0; - ofstream outFile; + //ofstream outFile; ifstream dFile; m->openInputFile(distFile, dFile); @@ -408,6 +408,7 @@ int SplitMatrix::splitDistanceLarge(){ //have we reached the max buffer size if (numOutputs[groupID] > 60) { //write out sequence + ofstream outFile; outFile.open(fileName.c_str(), ios::app); outFile << outputs[groupID] << seqA << '\t' << seqB << '\t' << dist << endl; outFile.close(); @@ -434,7 +435,7 @@ int SplitMatrix::splitDistanceLarge(){ //if groupB is written to file it is above buffer size so read and write to new merged file if (wroteOutPut[groupIDB]) { string fileName2 = distFile + "." + toString(groupIDB) + ".temp"; - ifstream fileB(fileName2.c_str(), ios::ate); + /*ifstream fileB(fileName2.c_str(), ios::ate); outFile.open(fileName.c_str(), ios::app); @@ -469,17 +470,22 @@ int SplitMatrix::splitDistanceLarge(){ outFile << temp.substr(0, lastRead); delete memblock; - fileB.close(); + fileB.close();*/ + m->appendFiles(fileName2, fileName); m->mothurRemove(fileName2); + //write out the merged memory if (numOutputs[groupID] > 60) { - outFile << outputs[groupID]; + ofstream tempOut; + m->openOutputFile(fileName, tempOut); + tempOut << outputs[groupID]; outputs[groupID] = ""; numOutputs[groupID] = 0; + tempOut.close(); } - outFile.close(); + //outFile.close(); wroteOutPut[groupID] = true; wroteOutPut[groupIDB] = false; @@ -494,7 +500,7 @@ int SplitMatrix::splitDistanceLarge(){ if (wroteOutPut[groupIDA]) { string fileName2 = distFile + "." + toString(groupIDA) + ".temp"; - ifstream fileB(fileName2.c_str(), ios::ate); + /*ifstream fileB(fileName2.c_str(), ios::ate); outFile.open(fileName.c_str(), ios::app); @@ -529,17 +535,21 @@ int SplitMatrix::splitDistanceLarge(){ delete memblock; - fileB.close(); + fileB.close();*/ + m->appendFiles(fileName2, fileName); m->mothurRemove(fileName2); //write out the merged memory if (numOutputs[groupID] > 60) { - outFile << outputs[groupID]; + ofstream tempOut; + m->openOutputFile(fileName, tempOut); + tempOut << outputs[groupID]; outputs[groupID] = ""; numOutputs[groupID] = 0; + tempOut.close(); } - outFile.close(); + //outFile.close(); wroteOutPut[groupID] = true; wroteOutPut[groupIDA] = false; @@ -559,6 +569,7 @@ int SplitMatrix::splitDistanceLarge(){ //remove old names files just in case if (numOutputs[i] > 0) { + ofstream outFile; outFile.open(fileName.c_str(), ios::app); outFile << outputs[i]; outFile.close(); diff --git a/summarysharedcommand.cpp b/summarysharedcommand.cpp index e82d1f6..d93fb6e 100644 --- a/summarysharedcommand.cpp +++ b/summarysharedcommand.cpp @@ -703,7 +703,7 @@ int SummarySharedCommand::process(vector thisLookup, string vector pDataArray; DWORD dwThreadIdArray[processors-1]; - HANDLE hThreadArray[processors-1]; + HANDLE hThreadArray[processors-1]; //Create processor worker threads. for( int i=1; i thisLookup, string temp->setGroup(thisLookup[k]->getGroup()); newLookup.push_back(temp); } + //for each bin for (int k = 0; k < thisLookup[0]->getNumBins(); k++) { diff --git a/unweighted.cpp b/unweighted.cpp index 56694e3..a845f9b 100644 --- a/unweighted.cpp +++ b/unweighted.cpp @@ -49,31 +49,21 @@ EstOutput Unweighted::getValues(Tree* t, int p, string o) { namesOfGroupCombos.push_back(groups); } } + + lines.clear(); + int numPairs = namesOfGroupCombos.size(); + int numPairsPerProcessor = numPairs / processors; + + for (int i = 0; i < processors; i++) { + int startPos = i * numPairsPerProcessor; + if(i == processors - 1){ numPairsPerProcessor = numPairs - i * numPairsPerProcessor; } + lines.push_back(linePair(startPos, numPairsPerProcessor)); + } + + data = createProcesses(t, namesOfGroupCombos, ct); - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) - if(processors == 1){ - data = driver(t, namesOfGroupCombos, 0, namesOfGroupCombos.size(), ct); - }else{ - int numPairs = namesOfGroupCombos.size(); - - int numPairsPerProcessor = numPairs / processors; - - for (int i = 0; i < processors; i++) { - int startPos = i * numPairsPerProcessor; - - if(i == processors - 1){ - numPairsPerProcessor = numPairs - i * numPairsPerProcessor; - } - - lines.push_back(linePair(startPos, numPairsPerProcessor)); - } - data = createProcesses(t, namesOfGroupCombos, ct); - lines.clear(); - } - #else - data = driver(t, namesOfGroupCombos, 0, namesOfGroupCombos.size(), ct); - #endif - + lines.clear(); + return data; } catch(exception& e) { @@ -85,11 +75,12 @@ EstOutput Unweighted::getValues(Tree* t, int p, string o) { EstOutput Unweighted::createProcesses(Tree* t, vector< vector > namesOfGroupCombos, CountTable* ct) { try { -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) - int process = 1; + int process = 1; vector processIDS; EstOutput results; +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { @@ -155,11 +146,47 @@ EstOutput Unweighted::createProcesses(Tree* t, vector< vector > namesOfG in.close(); m->mothurRemove(s); } +#else + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + vector cts; + vector trees; + + //Create processor worker threads. + for( int i=1; icopy(ct); + Tree* copyTree = new Tree(copyCount); + copyTree->getCopy(t); + + cts.push_back(copyCount); + trees.push_back(copyTree); + + unweightedData* tempweighted = new unweightedData(m, lines[i].start, lines[i].num, namesOfGroupCombos, copyTree, copyCount, includeRoot); + pDataArray.push_back(tempweighted); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyUnWeightedThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + results = driver(t, namesOfGroupCombos, lines[0].start, lines[0].num, ct); - //m->mothurOut("DONE."); m->mothurOutEndLine(); m->mothurOutEndLine(); + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); - return results; -#endif + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->results.size(); j++) { results.push_back(pDataArray[i]->results[j]); } + delete cts[i]; + delete trees[i]; + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + +#endif + return results; } catch(exception& e) { m->errorOut(e, "Unweighted", "createProcesses"); @@ -175,10 +202,7 @@ EstOutput Unweighted::driver(Tree* t, vector< vector > namesOfGroupCombo int count = 0; int total = num; - int twentyPercent = (total * 0.20); - if (twentyPercent == 0) { twentyPercent = 1; } - - + for (int h = start; h < (start+num); h++) { if (m->control_pressed) { return results; } @@ -240,12 +264,7 @@ EstOutput Unweighted::driver(Tree* t, vector< vector > namesOfGroupCombo } count++; - //report progress - //if((count % twentyPercent) == 0) { float tempOut = (count / (float)total); if (isnan(tempOut) || isinf(tempOut)) { tempOut = 0.0; } m->mothurOut("Percentage complete: " + toString((int(tempOut) * 100.0))); m->mothurOutEndLine(); } - } - - //report progress - //if((count % twentyPercent) != 0) { float tempOut = (count / (float)total); if (isnan(tempOut) || isinf(tempOut)) { tempOut = 0.0; } m->mothurOut("Percentage complete: " + toString((int(tempOut) * 100.0))); m->mothurOutEndLine(); } + } return results; } @@ -294,31 +313,20 @@ EstOutput Unweighted::getValues(Tree* t, string groupA, string groupB, int p, st namesOfGroupCombos.push_back(groups); } } + + lines.clear(); + int numPairs = namesOfGroupCombos.size(); + int numPairsPerProcessor = numPairs / processors; + + for (int i = 0; i < processors; i++) { + int startPos = i * numPairsPerProcessor; + if(i == processors - 1){ numPairsPerProcessor = numPairs - i * numPairsPerProcessor; } + lines.push_back(linePair(startPos, numPairsPerProcessor)); + } + + data = createProcesses(t, namesOfGroupCombos, true, ct); + lines.clear(); - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) - if(processors == 1){ - data = driver(t, namesOfGroupCombos, 0, namesOfGroupCombos.size(), true, ct); - }else{ - int numPairs = namesOfGroupCombos.size(); - - int numPairsPerProcessor = numPairs / processors; - - for (int i = 0; i < processors; i++) { - int startPos = i * numPairsPerProcessor; - if(i == processors - 1){ - numPairsPerProcessor = numPairs - i * numPairsPerProcessor; - } - lines.push_back(linePair(startPos, numPairsPerProcessor)); - } - - data = createProcesses(t, namesOfGroupCombos, true, ct); - - lines.clear(); - } - #else - data = driver(t, namesOfGroupCombos, 0, namesOfGroupCombos.size(), true, ct); - #endif - return data; } catch(exception& e) { @@ -330,12 +338,12 @@ EstOutput Unweighted::getValues(Tree* t, string groupA, string groupB, int p, st EstOutput Unweighted::createProcesses(Tree* t, vector< vector > namesOfGroupCombos, bool usingGroups, CountTable* ct) { try { -#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) - int process = 1; + int process = 1; vector processIDS; EstOutput results; - +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) + //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -399,9 +407,51 @@ EstOutput Unweighted::createProcesses(Tree* t, vector< vector > namesOfG in.close(); m->mothurRemove(s); } +#else + //for some reason it doesn't seem to be calculating hte random trees scores. all scores are the same even though copytree appears to be randomized. + + /* + //fill in functions + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + vector cts; + vector trees; + + //Create processor worker threads. + for( int i=1; icopy(ct); + Tree* copyTree = new Tree(copyCount); + copyTree->getCopy(t); + + cts.push_back(copyCount); + trees.push_back(copyTree); + + unweightedData* tempweighted = new unweightedData(m, lines[i].start, lines[i].num, namesOfGroupCombos, copyTree, copyCount, includeRoot); + pDataArray.push_back(tempweighted); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyUnWeightedRandomThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + results = driver(t, namesOfGroupCombos, lines[0].start, lines[0].num, usingGroups, ct); - return results; -#endif + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->results.size(); j++) { results.push_back(pDataArray[i]->results[j]); } + delete cts[i]; + delete trees[i]; + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } */ + + results = driver(t, namesOfGroupCombos, 0, namesOfGroupCombos.size(), usingGroups, ct); +#endif + return results; } catch(exception& e) { m->errorOut(e, "Unweighted", "createProcesses"); @@ -481,7 +531,7 @@ EstOutput Unweighted::driver(Tree* t, vector< vector > namesOfGroupCombo if (isnan(UW) || isinf(UW)) { UW = 0; } results[count] = UW; - } + } count++; } diff --git a/unweighted.h b/unweighted.h index b136b00..0291853 100644 --- a/unweighted.h +++ b/unweighted.h @@ -46,5 +46,307 @@ class Unweighted : public TreeCalculator { }; /***********************************************************************/ +struct unweightedData { + int start; + int num; + MothurOut* m; + EstOutput results; + vector< vector > namesOfGroupCombos; + Tree* t; + CountTable* ct; + bool includeRoot; + + unweightedData(){} + unweightedData(MothurOut* mout, int st, int en, vector< vector > ngc, Tree* tree, CountTable* count, bool ir) { + m = mout; + start = st; + num = en; + namesOfGroupCombos = ngc; + t = tree; + ct = count; + includeRoot = ir; + } +}; + +/**************************************************************************************************/ +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix) +#else +static DWORD WINAPI MyUnWeightedThreadFunction(LPVOID lpParam){ + unweightedData* pDataArray; + pDataArray = (unweightedData*)lpParam; + try { + pDataArray->results.resize(pDataArray->num); + map< vector, set > rootForGrouping; + + int count = 0; + + for (int h = pDataArray->start; h < (pDataArray->start+pDataArray->num); h++) { + if (pDataArray->m->control_pressed) { return 0; } + + double UniqueBL=0.0000; //a branch length is unique if it's chidren are from the same group + double totalBL = 0.00; //all branch lengths + double UW = 0.00; //Unweighted Value = UniqueBL / totalBL; + + //find a node that belongs to one of the groups in this combo + int nodeBelonging = -1; + for (int g = 0; g < pDataArray->namesOfGroupCombos[h].size(); g++) { + if (pDataArray->t->groupNodeInfo[pDataArray->namesOfGroupCombos[h][g]].size() != 0) { nodeBelonging = pDataArray->t->groupNodeInfo[pDataArray->namesOfGroupCombos[h][g]][0]; break; } + } + + //sanity check + if (nodeBelonging == -1) { + pDataArray->m->mothurOut("[WARNING]: cannot find a nodes in the tree from grouping "); + for (int g = 0; g < pDataArray->namesOfGroupCombos[h].size()-1; g++) { pDataArray->m->mothurOut(pDataArray->namesOfGroupCombos[h][g] + "-"); } + pDataArray->m->mothurOut(pDataArray->namesOfGroupCombos[h][pDataArray->namesOfGroupCombos[h].size()-1]); + pDataArray->m->mothurOut(", skipping."); pDataArray->m->mothurOutEndLine(); pDataArray->results[count] = UW; + }else{ + + //if including the root this clears rootForGrouping[namesOfGroupCombos[h]] + //getRoot(t, nodeBelonging, namesOfGroupCombos[h]); + ///////////////////////////////////////////////////////////////////////////// + //you are a leaf so get your parent + vector grouping = pDataArray->namesOfGroupCombos[h]; + int index = pDataArray->t->tree[nodeBelonging].getParent(); + + if (pDataArray->includeRoot) { + rootForGrouping[grouping].clear(); + }else { + + //my parent is a potential root + rootForGrouping[grouping].insert(index); + + //while you aren't at root + while(pDataArray->t->tree[index].getParent() != -1){ + //cout << index << endl; + if (pDataArray->m->control_pressed) { return 0; } + + //am I the root for this grouping? if so I want to stop "early" + //does my sibling have descendants from the users groups? + //if so I am not the root + int parent = pDataArray->t->tree[index].getParent(); + int lc = pDataArray->t->tree[parent].getLChild(); + int rc = pDataArray->t->tree[parent].getRChild(); + + int sib = lc; + if (lc == index) { sib = rc; } + + map::iterator itGroup; + int pcountSize = 0; + for (int j = 0; j < grouping.size(); j++) { + map::iterator itGroup = pDataArray->t->tree[sib].pcount.find(grouping[j]); + if (itGroup != pDataArray->t->tree[sib].pcount.end()) { pcountSize++; if (pcountSize > 1) { break; } } + } + + //if yes, I am not the root + if (pcountSize != 0) { + rootForGrouping[grouping].clear(); + rootForGrouping[grouping].insert(parent); + } + + index = parent; + } + + //get all nodes above the root to add so we don't add their u values above + index = *(rootForGrouping[grouping].begin()); + while(pDataArray->t->tree[index].getParent() != -1){ + int parent = pDataArray->t->tree[index].getParent(); + rootForGrouping[grouping].insert(parent); + //cout << parent << " in root" << endl; + index = parent; + } + } + ///////////////////////////////////////////////////////////////////////////// + + for(int i=0;it->getNumNodes();i++){ + + if (pDataArray->m->control_pressed) { return 0; } + //cout << i << endl; + //pcountSize = 0, they are from a branch that is entirely from a group the user doesn't want + //pcountSize = 2, not unique to one group + //pcountSize = 1, unique to one group + + int pcountSize = 0; + for (int j = 0; j < pDataArray->namesOfGroupCombos[h].size(); j++) { + map::iterator itGroup = pDataArray->t->tree[i].pcount.find(pDataArray->namesOfGroupCombos[h][j]); + if (itGroup != pDataArray->t->tree[i].pcount.end()) { pcountSize++; if (pcountSize > 1) { break; } } + } + + + //unique calc + if (pcountSize == 0) { } + else if ((pDataArray->t->tree[i].getBranchLength() != -1) && (pcountSize == 1) && (rootForGrouping[pDataArray->namesOfGroupCombos[h]].count(i) == 0)) { //you have a unique branch length and you are not the root + UniqueBL += abs(pDataArray->t->tree[i].getBranchLength()); + } + + //total calc + if (pcountSize == 0) { } + else if ((pDataArray->t->tree[i].getBranchLength() != -1) && (pcountSize != 0) && (rootForGrouping[pDataArray->namesOfGroupCombos[h]].count(i) == 0)) { //you have a branch length and you are not the root + totalBL += abs(pDataArray->t->tree[i].getBranchLength()); + } + } + //cout << UniqueBL << '\t' << totalBL << endl; + UW = (UniqueBL / totalBL); + + if (isnan(UW) || isinf(UW)) { UW = 0; } + + pDataArray->results[count] = UW; + } + count++; + } + + return 0; + + } + catch(exception& e) { + pDataArray->m->errorOut(e, "UnWeighted", "MyUnWeightedThreadFunction"); + exit(1); + } +} +/**************************************************************************************************/ + +static DWORD WINAPI MyUnWeightedRandomThreadFunction(LPVOID lpParam){ + unweightedData* pDataArray; + pDataArray = (unweightedData*)lpParam; + try { + pDataArray->results.resize(pDataArray->num); + + int count = 0; + + Tree* copyTree = new Tree(pDataArray->ct); + + for (int h = pDataArray->start; h < (pDataArray->start+pDataArray->num); h++) { + + if (pDataArray->m->control_pressed) { return 0; } + + map< vector, set > rootForGrouping; + + //copy random tree passed in + copyTree->getCopy(pDataArray->t); + + //swap labels in the groups you want to compare + copyTree->assembleRandomUnifracTree(pDataArray->namesOfGroupCombos[h]); + + double UniqueBL=0.0000; //a branch length is unique if it's chidren are from the same group + double totalBL = 0.00; //all branch lengths + double UW = 0.00; //Unweighted Value = UniqueBL / totalBL; + //find a node that belongs to one of the groups in this combo + int nodeBelonging = -1; + for (int g = 0; g < pDataArray->namesOfGroupCombos[h].size(); g++) { + if (copyTree->groupNodeInfo[pDataArray->namesOfGroupCombos[h][g]].size() != 0) { nodeBelonging = copyTree->groupNodeInfo[pDataArray->namesOfGroupCombos[h][g]][0]; break; } + } + + //sanity check + if (nodeBelonging == -1) { + pDataArray->m->mothurOut("[WARNING]: cannot find a nodes in the tree from grouping "); + for (int g = 0; g < pDataArray->namesOfGroupCombos[h].size()-1; g++) { pDataArray->m->mothurOut(pDataArray->namesOfGroupCombos[h][g] + "-"); } + pDataArray->m->mothurOut(pDataArray->namesOfGroupCombos[h][pDataArray->namesOfGroupCombos[h].size()-1]); + pDataArray->m->mothurOut(", skipping."); pDataArray->m->mothurOutEndLine(); pDataArray->results[count] = UW; + }else{ + + //if including the root this clears rootForGrouping[namesOfGroupCombos[h]] + //getRoot(copyTree, nodeBelonging, namesOfGroupCombos[h]); + ///////////////////////////////////////////////////////////////////////////// + //you are a leaf so get your parent + vector grouping = pDataArray->namesOfGroupCombos[h]; + int index = copyTree->tree[nodeBelonging].getParent(); + + if (pDataArray->includeRoot) { + rootForGrouping[grouping].clear(); + }else { + + //my parent is a potential root + rootForGrouping[grouping].insert(index); + + //while you aren't at root + while(copyTree->tree[index].getParent() != -1){ + //cout << index << endl; + if (pDataArray->m->control_pressed) { return 0; } + + //am I the root for this grouping? if so I want to stop "early" + //does my sibling have descendants from the users groups? + //if so I am not the root + int parent = copyTree->tree[index].getParent(); + int lc = copyTree->tree[parent].getLChild(); + int rc = copyTree->tree[parent].getRChild(); + + int sib = lc; + if (lc == index) { sib = rc; } + + map::iterator itGroup; + int pcountSize = 0; + for (int j = 0; j < grouping.size(); j++) { + map::iterator itGroup = copyTree->tree[sib].pcount.find(grouping[j]); + if (itGroup != copyTree->tree[sib].pcount.end()) { pcountSize++; if (pcountSize > 1) { break; } } + } + + //if yes, I am not the root + if (pcountSize != 0) { + rootForGrouping[grouping].clear(); + rootForGrouping[grouping].insert(parent); + } + + index = parent; + } + + //get all nodes above the root to add so we don't add their u values above + index = *(rootForGrouping[grouping].begin()); + while(copyTree->tree[index].getParent() != -1){ + int parent = copyTree->tree[index].getParent(); + rootForGrouping[grouping].insert(parent); + //cout << parent << " in root" << endl; + index = parent; + } + } + ///////////////////////////////////////////////////////////////////////////// + for(int i=0;igetNumNodes();i++){ + + if (pDataArray->m->control_pressed) { return 0; } + + //pcountSize = 0, they are from a branch that is entirely from a group the user doesn't want + //pcountSize = 2, not unique to one group + //pcountSize = 1, unique to one group + int pcountSize = 0; + for (int j = 0; j < pDataArray->namesOfGroupCombos[h].size(); j++) { + map::iterator itGroup = copyTree->tree[i].pcount.find(pDataArray->namesOfGroupCombos[h][j]); + if (itGroup != copyTree->tree[i].pcount.end()) { pcountSize++; if (pcountSize > 1) { break; } } + } + + //unique calc + if (pcountSize == 0) { } + else if ((copyTree->tree[i].getBranchLength() != -1) && (pcountSize == 1) && (rootForGrouping[pDataArray->namesOfGroupCombos[h]].count(i) == 0)) { //you have a unique branch length and you are not the root + UniqueBL += abs(copyTree->tree[i].getBranchLength()); + } + + //total calc + if (pcountSize == 0) { } + else if ((copyTree->tree[i].getBranchLength() != -1) && (pcountSize != 0) && (rootForGrouping[pDataArray->namesOfGroupCombos[h]].count(i) == 0)) { //you have a branch length and you are not the root + totalBL += abs(copyTree->tree[i].getBranchLength()); + } + + } + cout << h << '\t' << UniqueBL << '\t' << totalBL << endl; + UW = (UniqueBL / totalBL); + + if (isnan(UW) || isinf(UW)) { UW = 0; } + + pDataArray->results[count] = UW; + cout << h << '\t' << UW << endl; + } + count++; + + } + + delete copyTree; + + return 0; + } + catch(exception& e) { + pDataArray->m->errorOut(e, "UnWeighted", "MyUnWeightedRandomThreadFunction"); + exit(1); + } +} + +#endif + #endif -- 2.39.2