From b9800d7b05252b911b10d36febfb5e6da3271766 Mon Sep 17 00:00:00 2001 From: Sarah Westcott Date: Tue, 28 Feb 2012 14:05:17 -0500 Subject: [PATCH] parallelized dist.shared for windows --- matrixoutputcommand.cpp | 109 +++++++++++---------- matrixoutputcommand.h | 203 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 265 insertions(+), 47 deletions(-) diff --git a/matrixoutputcommand.cpp b/matrixoutputcommand.cpp index d1cf54b..6537963 100644 --- a/matrixoutputcommand.cpp +++ b/matrixoutputcommand.cpp @@ -8,47 +8,6 @@ */ #include "matrixoutputcommand.h" -#include "sharedsobscollectsummary.h" -#include "sharedchao1.h" -#include "sharedace.h" -#include "sharednseqs.h" -#include "sharedjabund.h" -#include "sharedsorabund.h" -#include "sharedjclass.h" -#include "sharedsorclass.h" -#include "sharedjest.h" -#include "sharedsorest.h" -#include "sharedthetayc.h" -#include "sharedthetan.h" -#include "sharedkstest.h" -#include "whittaker.h" -#include "sharedochiai.h" -#include "sharedanderbergs.h" -#include "sharedkulczynski.h" -#include "sharedkulczynskicody.h" -#include "sharedlennon.h" -#include "sharedmorisitahorn.h" -#include "sharedbraycurtis.h" -#include "sharedjackknife.h" -#include "whittaker.h" -#include "odum.h" -#include "canberra.h" -#include "structeuclidean.h" -#include "structchord.h" -#include "hellinger.h" -#include "manhattan.h" -#include "structpearson.h" -#include "soergel.h" -#include "spearman.h" -#include "structkulczynski.h" -#include "structchi2.h" -#include "speciesprofile.h" -#include "hamming.h" -#include "gower.h" -#include "memchi2.h" -#include "memchord.h" -#include "memeuclidean.h" -#include "mempearson.h" //********************************************************************************************************************** vector MatrixOutputCommand::setParameters(){ @@ -459,13 +418,14 @@ int MatrixOutputCommand::process(vector thisLookup){ vector subset; vector< vector > calcDists; calcDists.resize(matrixCalculators.size()); //one 
for each calc, this will be used to make .dist files - #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ driver(thisLookup, 0, numGroups, calcDists); }else{ int process = 1; vector processIDS; - + + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) //loop through and create all the processes you want while (process != processors) { int pid = fork(); @@ -529,11 +489,66 @@ int MatrixOutputCommand::process(vector thisLookup){ intemp.close(); m->mothurRemove(tempdistFileName); } - + #else + ////////////////////////////////////////////////////////////////////////////////////////////////////// + //Windows version shared memory, so be careful when passing variables through the distSharedData struct. + //Above fork() will clone, so memory is separate, but that's not the case with windows, + //Taking advantage of shared memory to pass results vectors. + ////////////////////////////////////////////////////////////////////////////////////////////////////// + + vector pDataArray; + DWORD dwThreadIdArray[processors-1]; + HANDLE hThreadArray[processors-1]; + + //Create processor worker threads. + for( int i=1; i newLookup; + for (int k = 0; k < thisLookup.size(); k++) { + SharedRAbundVector* temp = new SharedRAbundVector(); + temp->setLabel(thisLookup[k]->getLabel()); + temp->setGroup(thisLookup[k]->getGroup()); + newLookup.push_back(temp); + } + + //for each bin + for (int k = 0; k < thisLookup[0]->getNumBins(); k++) { + if (m->control_pressed) { for (int j = 0; j < newLookup.size(); j++) { delete newLookup[j]; } return 0; } + for (int j = 0; j < thisLookup.size(); j++) { newLookup[j]->push_back(thisLookup[j]->getAbundance(k), thisLookup[j]->getGroup()); } + } + + // Allocate memory for thread data. 
+ distSharedData* tempSum = new distSharedData(m, lines[i].start, lines[i].end, Estimators, newLookup); + pDataArray.push_back(tempSum); + processIDS.push_back(i); + + hThreadArray[i-1] = CreateThread(NULL, 0, MyDistSharedThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]); + } + + //parent do your part + driver(thisLookup, lines[0].start, lines[0].end, calcDists); + + //Wait until all threads have terminated. + WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE); + + //Close all thread handles and free memory allocations. + for(int i=0; i < pDataArray.size(); i++){ + for (int j = 0; j < pDataArray[i]->thisLookup.size(); j++) { delete pDataArray[i]->thisLookup[j]; } + + for (int k = 0; k < calcDists.size(); k++) { + int size = pDataArray[i]->calcDists[k].size(); + for (int j = 0; j < size; j++) { calcDists[k].push_back(pDataArray[i]->calcDists[k][j]); } + } + + CloseHandle(hThreadArray[i]); + delete pDataArray[i]; + } + + #endif } -#else - driver(thisLookup, 0, numGroups, calcDists); -#endif + + for (int i = 0; i < calcDists.size(); i++) { if (m->control_pressed) { break; } diff --git a/matrixoutputcommand.h b/matrixoutputcommand.h index ae38b46..748d8c2 100644 --- a/matrixoutputcommand.h +++ b/matrixoutputcommand.h @@ -13,6 +13,48 @@ #include "inputdata.h" #include "groupmap.h" #include "validcalculator.h" +#include "sharedsobscollectsummary.h" +#include "sharedchao1.h" +#include "sharedace.h" +#include "sharednseqs.h" +#include "sharedjabund.h" +#include "sharedsorabund.h" +#include "sharedjclass.h" +#include "sharedsorclass.h" +#include "sharedjest.h" +#include "sharedsorest.h" +#include "sharedthetayc.h" +#include "sharedthetan.h" +#include "sharedkstest.h" +#include "whittaker.h" +#include "sharedochiai.h" +#include "sharedanderbergs.h" +#include "sharedkulczynski.h" +#include "sharedkulczynskicody.h" +#include "sharedlennon.h" +#include "sharedmorisitahorn.h" +#include "sharedbraycurtis.h" +#include "sharedjackknife.h" +#include 
"whittaker.h" +#include "odum.h" +#include "canberra.h" +#include "structeuclidean.h" +#include "structchord.h" +#include "hellinger.h" +#include "manhattan.h" +#include "structpearson.h" +#include "soergel.h" +#include "spearman.h" +#include "structkulczynski.h" +#include "structchi2.h" +#include "speciesprofile.h" +#include "hamming.h" +#include "gower.h" +#include "memchi2.h" +#include "memchord.h" +#include "memeuclidean.h" +#include "mempearson.h" + // aka. dist.shared() @@ -66,6 +108,167 @@ private: }; +/**************************************************************************************************/ +//custom data structure for threads to use. +//main process handling the calcs that can do more than 2 groups +// This is passed by void pointer so it can be any data type +// that can be passed using a single void pointer (LPVOID). +struct distSharedData { + vector thisLookup; + vector< vector > calcDists; + vector Estimators; + unsigned long long start; + unsigned long long end; + MothurOut* m; + + distSharedData(){} + distSharedData(MothurOut* mout, unsigned long long st, unsigned long long en, vector est, vector lu) { + m = mout; + start = st; + end = en; + Estimators = est; + thisLookup = lu; + } +}; +/**************************************************************************************************/ +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) +#else +static DWORD WINAPI MyDistSharedThreadFunction(LPVOID lpParam){ + distSharedData* pDataArray; + pDataArray = (distSharedData*)lpParam; + + try { + + vector matrixCalculators; + ValidCalculators validCalculator; + for (int i=0; iEstimators.size(); i++) { + if (validCalculator.isValidCalculator("matrix", pDataArray->Estimators[i]) == true) { + if (pDataArray->Estimators[i] == "sharedsobs") { + matrixCalculators.push_back(new SharedSobsCS()); + }else if (pDataArray->Estimators[i] == "sharedchao") { + matrixCalculators.push_back(new SharedChao1()); + }else if (pDataArray->Estimators[i] == 
"sharedace") { + matrixCalculators.push_back(new SharedAce()); + }else if (pDataArray->Estimators[i] == "jabund") { + matrixCalculators.push_back(new JAbund()); + }else if (pDataArray->Estimators[i] == "sorabund") { + matrixCalculators.push_back(new SorAbund()); + }else if (pDataArray->Estimators[i] == "jclass") { + matrixCalculators.push_back(new Jclass()); + }else if (pDataArray->Estimators[i] == "sorclass") { + matrixCalculators.push_back(new SorClass()); + }else if (pDataArray->Estimators[i] == "jest") { + matrixCalculators.push_back(new Jest()); + }else if (pDataArray->Estimators[i] == "sorest") { + matrixCalculators.push_back(new SorEst()); + }else if (pDataArray->Estimators[i] == "thetayc") { + matrixCalculators.push_back(new ThetaYC()); + }else if (pDataArray->Estimators[i] == "thetan") { + matrixCalculators.push_back(new ThetaN()); + }else if (pDataArray->Estimators[i] == "kstest") { + matrixCalculators.push_back(new KSTest()); + }else if (pDataArray->Estimators[i] == "sharednseqs") { + matrixCalculators.push_back(new SharedNSeqs()); + }else if (pDataArray->Estimators[i] == "ochiai") { + matrixCalculators.push_back(new Ochiai()); + }else if (pDataArray->Estimators[i] == "anderberg") { + matrixCalculators.push_back(new Anderberg()); + }else if (pDataArray->Estimators[i] == "kulczynski") { + matrixCalculators.push_back(new Kulczynski()); + }else if (pDataArray->Estimators[i] == "kulczynskicody") { + matrixCalculators.push_back(new KulczynskiCody()); + }else if (pDataArray->Estimators[i] == "lennon") { + matrixCalculators.push_back(new Lennon()); + }else if (pDataArray->Estimators[i] == "morisitahorn") { + matrixCalculators.push_back(new MorHorn()); + }else if (pDataArray->Estimators[i] == "braycurtis") { + matrixCalculators.push_back(new BrayCurtis()); + }else if (pDataArray->Estimators[i] == "whittaker") { + matrixCalculators.push_back(new Whittaker()); + }else if (pDataArray->Estimators[i] == "odum") { + matrixCalculators.push_back(new Odum()); + }else if 
(pDataArray->Estimators[i] == "canberra") { + matrixCalculators.push_back(new Canberra()); + }else if (pDataArray->Estimators[i] == "structeuclidean") { + matrixCalculators.push_back(new StructEuclidean()); + }else if (pDataArray->Estimators[i] == "structchord") { + matrixCalculators.push_back(new StructChord()); + }else if (pDataArray->Estimators[i] == "hellinger") { + matrixCalculators.push_back(new Hellinger()); + }else if (pDataArray->Estimators[i] == "manhattan") { + matrixCalculators.push_back(new Manhattan()); + }else if (pDataArray->Estimators[i] == "structpearson") { + matrixCalculators.push_back(new StructPearson()); + }else if (pDataArray->Estimators[i] == "soergel") { + matrixCalculators.push_back(new Soergel()); + }else if (pDataArray->Estimators[i] == "spearman") { + matrixCalculators.push_back(new Spearman()); + }else if (pDataArray->Estimators[i] == "structkulczynski") { + matrixCalculators.push_back(new StructKulczynski()); + }else if (pDataArray->Estimators[i] == "speciesprofile") { + matrixCalculators.push_back(new SpeciesProfile()); + }else if (pDataArray->Estimators[i] == "hamming") { + matrixCalculators.push_back(new Hamming()); + }else if (pDataArray->Estimators[i] == "structchi2") { + matrixCalculators.push_back(new StructChi2()); + }else if (pDataArray->Estimators[i] == "gower") { + matrixCalculators.push_back(new Gower()); + }else if (pDataArray->Estimators[i] == "memchi2") { + matrixCalculators.push_back(new MemChi2()); + }else if (pDataArray->Estimators[i] == "memchord") { + matrixCalculators.push_back(new MemChord()); + }else if (pDataArray->Estimators[i] == "memeuclidean") { + matrixCalculators.push_back(new MemEuclidean()); + }else if (pDataArray->Estimators[i] == "mempearson") { + matrixCalculators.push_back(new MemPearson()); + } + } + } + + pDataArray->calcDists.resize(matrixCalculators.size()); + + vector subset; + for (int k = pDataArray->start; k < pDataArray->end; k++) { // pass cdd each set of groups to compare + + for (int l 
= 0; l < k; l++) { + + if (k != l) { //we don't need the similarity of a group to itself + subset.clear(); //clear out old pair of sharedrabunds + //add new pair of sharedrabunds + subset.push_back(pDataArray->thisLookup[k]); subset.push_back(pDataArray->thisLookup[l]); + + for(int i=0;igetNeedsAll()) { + //load subset with rest of lookup for those calcs that need everyone to calc for a pair + for (int w = 0; w < pDataArray->thisLookup.size(); w++) { + if ((w != k) && (w != l)) { subset.push_back(pDataArray->thisLookup[w]); } + } + } + + vector tempdata = matrixCalculators[i]->getValues(subset); //saves the calculator outputs + + if (pDataArray->m->control_pressed) { return 1; } + + seqDist temp(l, k, tempdata[0]); + pDataArray->calcDists[i].push_back(temp); + } + } + } + } + + for(int i=0;im->errorOut(e, "MatrixOutputCommand", "MyDistSharedThreadFunction"); + exit(1); + } +} +#endif #endif -- 2.39.2