From: westcott Date: Tue, 5 Oct 2010 10:08:43 +0000 (+0000) Subject: paralellized phylo.diversity X-Git-Url: https://git.donarmstrong.com/?p=mothur.git;a=commitdiff_plain;h=f663afa231c9bc1b5e18e0ea3bdd2b2ee784f5b2 paralellized phylo.diversity --- diff --git a/chimeracheckcommand.cpp b/chimeracheckcommand.cpp index c8761b9..ac09208 100644 --- a/chimeracheckcommand.cpp +++ b/chimeracheckcommand.cpp @@ -270,8 +270,8 @@ int ChimeraCheckCommand::execute(){ //wait on chidren for(int j = 1; j < processors; j++) { - char buf[4]; - MPI_Recv(buf, 4, MPI_CHAR, j, tag, MPI_COMM_WORLD, &status); + char buf[5]; + MPI_Recv(buf, 5, MPI_CHAR, j, tag, MPI_COMM_WORLD, &status); } }else{ //you are a child process MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); @@ -289,9 +289,9 @@ int ChimeraCheckCommand::execute(){ if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); for (int j = 0; j < outputNames.size(); j++) { remove(outputNames[j].c_str()); } delete chimera; return 0; } //tell parent you are done. - char buf[4]; + char buf[5]; strcpy(buf, "done"); - MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD); + MPI_Send(buf, 5, MPI_CHAR, 0, tag, MPI_COMM_WORLD); } //close files diff --git a/distancecommand.cpp b/distancecommand.cpp index 871e07b..60e9050 100644 --- a/distancecommand.cpp +++ b/distancecommand.cpp @@ -276,8 +276,8 @@ int DistanceCommand::execute(){ for(int i = 1; i < processors; i++) { if (m->control_pressed) { MPI_File_close(&outMPI); delete distCalculator; return 0; } - char buf[4]; - MPI_Recv(buf, 4, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); + char buf[5]; + MPI_Recv(buf, 5, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); } }else { //you are a child process //do your part @@ -285,10 +285,10 @@ int DistanceCommand::execute(){ if (m->control_pressed) { MPI_File_close(&outMPI); delete distCalculator; return 0; } - char buf[4]; + char buf[5]; strcpy(buf, "done"); //tell parent you are done. - MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD); + MPI_Send(buf, 5, MPI_CHAR, 0, tag, MPI_COMM_WORLD); } MPI_File_close(&outMPI); diff --git a/filterseqscommand.cpp b/filterseqscommand.cpp index ffffe4e..a12058e 100644 --- a/filterseqscommand.cpp +++ b/filterseqscommand.cpp @@ -310,8 +310,8 @@ int FilterSeqsCommand::filterSequences() { //wait on chidren for(int i = 1; i < processors; i++) { - char buf[4]; - MPI_Recv(buf, 4, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); + char buf[5]; + MPI_Recv(buf, 5, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); } }else { //you are a child process @@ -331,11 +331,11 @@ int FilterSeqsCommand::filterSequences() { if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } - char buf[4]; + char buf[5]; strcpy(buf, "done"); //tell parent you are done. - MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD); + MPI_Send(buf, 5, MPI_CHAR, 0, tag, MPI_COMM_WORLD); } MPI_File_close(&outMPI); diff --git a/makefile b/makefile index 0fd76c1..a5c40be 100644 --- a/makefile +++ b/makefile @@ -53,7 +53,7 @@ USEREADLINE ?= yes ifeq ($(strip $(USEREADLINE)),yes) CXXFLAGS += -DUSE_READLINE - LDFLAGS += \ + LIBS = \ -lreadline\ -lncurses endif @@ -88,7 +88,7 @@ OBJECTS=$(patsubst %.cpp,%.o,$(wildcard *.cpp)) OBJECTS+=$(patsubst %.c,%.o,$(wildcard *.c)) mothur : $(OBJECTS) - $(CXX) $(LDFLAGS) $(TARGET_ARCH) -o $@ $(OBJECTS) + $(CXX) $(LDFLAGS) $(TARGET_ARCH) -o $@ $(OBJECTS) $(LIBS) install : mothur cp mothur ../Release/mothur diff --git a/mothur b/mothur index 648dccd..405237b 100755 Binary files a/mothur and b/mothur differ diff --git a/phylodiversitycommand.cpp b/phylodiversitycommand.cpp index 6be1fdd..6e9e7b1 100644 --- a/phylodiversitycommand.cpp +++ b/phylodiversitycommand.cpp @@ -20,7 +20,7 @@ PhyloDiversityCommand::PhyloDiversityCommand(string option) { else { //valid paramters for this command - string Array[] = {"freq","rarefy","iters","groups","summary","collect","scale","outputdir","inputdir"}; + string Array[] = {"freq","rarefy","iters","groups","processors","summary","collect","scale","outputdir","inputdir"}; vector myArray (Array, Array+(sizeof(Array)/sizeof(string))); OptionParser parser(option); @@ -59,6 +59,9 @@ PhyloDiversityCommand::PhyloDiversityCommand(string option) { temp = validParameter.validFile(parameters, "collect", false); if (temp == "not found") { temp = "F"; } collect = m->isTrue(temp); + temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = "1"; } + convert(temp, processors); + groups = validParameter.validFile(parameters, "groups", false); if (groups == "not found") { groups = ""; Groups = globaldata->gTreemap->namesOfGroups; globaldata->Groups = Groups; } else { @@ -80,7 +83,7 @@ PhyloDiversityCommand::PhyloDiversityCommand(string option) { void PhyloDiversityCommand::help(){ try { m->mothurOut("The phylo.diversity command can only be executed after a successful read.tree command.\n"); - m->mothurOut("The phylo.diversity command parameters are groups, iters, freq, scale, rarefy, collect and summary. No parameters are required.\n"); + m->mothurOut("The phylo.diversity command parameters are groups, iters, freq, processors, scale, rarefy, collect and summary. No parameters are required.\n"); m->mothurOut("The groups parameter allows you to specify which of the groups in your groupfile you would like analyzed. The group names are separated by dashes. By default all groups are used.\n"); m->mothurOut("The iters parameter allows you to specify the number of randomizations to preform, by default iters=1000, if you set rarefy to true.\n"); m->mothurOut("The freq parameter is used indicate when to output your data, by default it is set to 100. But you can set it to a percentage of the number of sequence. For example freq=0.10, means 10%. \n"); @@ -88,6 +91,7 @@ void PhyloDiversityCommand::help(){ m->mothurOut("The rarefy parameter allows you to create a rarefaction curve. The default is false.\n"); m->mothurOut("The collect parameter allows you to create a collectors curve. The default is false.\n"); m->mothurOut("The summary parameter allows you to create a .summary file. The default is true.\n"); + m->mothurOut("The processors parameter allows you to specify the number of processors to use. The default is 1.\n"); m->mothurOut("The phylo.diversity command should be in the following format: phylo.diversity(groups=yourGroups, rarefy=yourRarefy, iters=yourIters).\n"); m->mothurOut("Example phylo.diversity(groups=A-B-C, rarefy=T, iters=500).\n"); m->mothurOut("The phylo.diversity command output two files: .phylo.diversity and if rarefy=T, .rarefaction.\n"); @@ -177,8 +181,146 @@ int PhyloDiversityCommand::execute(){ for (int j = 0; j < globaldata->Groups.size(); j++) { if (numSampledList.count(diversity[globaldata->Groups[j]].size()-1) == 0) { numSampledList.insert(diversity[globaldata->Groups[j]].size()-1); } } + + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ + driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true); + }else{ + if (rarefy) { + vector procIters; + + int numItersPerProcessor = iters / processors; + + //divide iters between processes + for (int h = 0; h < processors; h++) { + if(h == processors - 1){ + numItersPerProcessor = iters - h * numItersPerProcessor; + } + procIters.push_back(numItersPerProcessor); + } + + createProcesses(procIters, trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum); + + }else{ //no need to paralellize if you dont want to rarefy + driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true); + } + } + + #else + driver(trees[i], diversity, sumDiversity, iters, increment, randomLeaf, numSampledList, outCollect, outSum, true); + #endif + + if (rarefy) { printData(numSampledList, sumDiversity, outRare, iters); } + } + + + if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } return 0; } + + m->mothurOutEndLine(); + m->mothurOut("Output File Names: "); m->mothurOutEndLine(); + for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } + m->mothurOutEndLine(); - for (int l = 0; l < iters; l++) { + + return 0; + } + catch(exception& e) { + m->errorOut(e, "PhyloDiversityCommand", "execute"); + exit(1); + } +} +//********************************************************************************************************************** +int PhyloDiversityCommand::createProcesses(vector& procIters, Tree* t, map< string, vector >& div, map >& sumDiv, int numIters, int increment, vector& randomLeaf, set& numSampledList, ofstream& outCollect, ofstream& outSum){ + try { + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + int process = 1; + int num = 0; + vector processIDS; + map< string, vector >::iterator itSum; + + EstOutput results; + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + driver(t, div, sumDiv, procIters[process], increment, randomLeaf, numSampledList, outCollect, outSum, false); + + string outTemp = outputDir + toString(getpid()) + ".sumDiv.temp"; + ofstream out; + m->openOutputFile(outTemp, out); + + //output the sumDIversity + for (itSum = sumDiv.begin(); itSum != sumDiv.end(); itSum++) { + out << itSum->first << '\t' << (itSum->second).size() << '\t'; + for (int k = 0; k < (itSum->second).size(); k++) { + out << (itSum->second)[k] << '\t'; + } + out << endl; + } + + out.close(); + + exit(0); + }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); } + } + + driver(t, div, sumDiv, procIters[0], increment, randomLeaf, numSampledList, outCollect, outSum, true); + + //force parent to wait until all the processes are done + for (int i=0;i<(processors-1);i++) { + int temp = processIDS[i]; + wait(&temp); + } + + //get data created by processes + for (int i=0;i<(processors-1);i++) { + + //input the sumDIversity + string inTemp = outputDir + toString(processIDS[i]) + ".sumDiv.temp"; + ifstream in; + m->openInputFile(inTemp, in); + + //output the sumDIversity + for (int j = 0; j < sumDiv.size(); j++) { + string group = ""; + int size = 0; + + in >> group >> size; m->gobble(in); + + for (int k = 0; k < size; k++) { + float tempVal; + in >> tempVal; + + sumDiv[group][k] += tempVal; + } + m->gobble(in); + } + + in.close(); + remove(inTemp.c_str()); + } + +#endif + + return 0; + + } + catch(exception& e) { + m->errorOut(e, "PhyloDiversityCommand", "createProcesses"); + exit(1); + } +} +//********************************************************************************************************************** +int PhyloDiversityCommand::driver(Tree* t, map< string, vector >& div, map >& sumDiv, int numIters, int increment, vector& randomLeaf, set& numSampledList, ofstream& outCollect, ofstream& outSum, bool doSumCollect){ + try { + int numLeafNodes = randomLeaf.size(); + + for (int l = 0; l < numIters; l++) { random_shuffle(randomLeaf.begin(), randomLeaf.end()); //initialize counts @@ -188,26 +330,26 @@ int PhyloDiversityCommand::execute(){ for(int k = 0; k < numLeafNodes; k++){ - if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } return 0; } + if (m->control_pressed) { return 0; } //calc branch length of randomLeaf k - float br = calcBranchLength(trees[i], randomLeaf[k], countedBranch); + float br = calcBranchLength(t, randomLeaf[k], countedBranch); //for each group in the groups update the total branch length accounting for the names file - vector groups = trees[i]->tree[randomLeaf[k]].getGroup(); + vector groups = t->tree[randomLeaf[k]].getGroup(); for (int j = 0; j < groups.size(); j++) { int numSeqsInGroupJ = 0; map::iterator it; - it = trees[i]->tree[randomLeaf[k]].pcount.find(groups[j]); - if (it != trees[i]->tree[randomLeaf[k]].pcount.end()) { //this leaf node contains seqs from group j + it = t->tree[randomLeaf[k]].pcount.find(groups[j]); + if (it != t->tree[randomLeaf[k]].pcount.end()) { //this leaf node contains seqs from group j numSeqsInGroupJ = it->second; } - if (numSeqsInGroupJ != 0) { diversity[groups[j]][(counts[groups[j]]+1)] = diversity[groups[j]][counts[groups[j]]] + br; } + if (numSeqsInGroupJ != 0) { div[groups[j]][(counts[groups[j]]+1)] = div[groups[j]][counts[groups[j]]] + br; } for (int s = (counts[groups[j]]+2); s <= (counts[groups[j]]+numSeqsInGroupJ); s++) { - diversity[groups[j]][s] = diversity[groups[j]][s-1]; //update counts, but don't add in redundant branch lengths + div[groups[j]][s] = div[groups[j]][s-1]; //update counts, but don't add in redundant branch lengths } counts[groups[j]] += numSeqsInGroupJ; } @@ -216,35 +358,25 @@ int PhyloDiversityCommand::execute(){ if (rarefy) { //add this diversity to the sum for (int j = 0; j < globaldata->Groups.size(); j++) { - for (int g = 0; g < diversity[globaldata->Groups[j]].size(); g++) { - sumDiversity[globaldata->Groups[j]][g] += diversity[globaldata->Groups[j]][g]; + for (int g = 0; g < div[globaldata->Groups[j]].size(); g++) { + sumDiv[globaldata->Groups[j]][g] += div[globaldata->Groups[j]][g]; } } } - if ((collect) && (l == 0)) { printData(numSampledList, diversity, outCollect, 1); } - if ((summary) && (l == 0)) { printSumData(diversity, outSum, 1); } + if ((collect) && (l == 0) && doSumCollect) { printData(numSampledList, div, outCollect, 1); } + if ((summary) && (l == 0) && doSumCollect) { printSumData(div, outSum, 1); } } - if (rarefy) { printData(numSampledList, sumDiversity, outRare, iters); } - } - - - if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } return 0; } + return 0; - m->mothurOutEndLine(); - m->mothurOut("Output File Names: "); m->mothurOutEndLine(); - for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } - m->mothurOutEndLine(); - - - return 0; } catch(exception& e) { - m->errorOut(e, "PhyloDiversityCommand", "execute"); + m->errorOut(e, "PhyloDiversityCommand", "driver"); exit(1); } } + //********************************************************************************************************************** void PhyloDiversityCommand::printSumData(map< string, vector >& div, ofstream& out, int numIters){ diff --git a/phylodiversitycommand.h b/phylodiversitycommand.h index a7e0d2e..7d4b2d8 100644 --- a/phylodiversitycommand.h +++ b/phylodiversitycommand.h @@ -26,7 +26,7 @@ class PhyloDiversityCommand : public Command { GlobalData* globaldata; float freq; - int iters; + int iters, processors; bool abort, rarefy, summary, collect, scale; string groups, outputDir; vector Groups, outputNames; //holds groups to be used, and outputFile names @@ -34,6 +34,9 @@ class PhyloDiversityCommand : public Command { void printData(set&, map< string, vector >&, ofstream&, int); void printSumData(map< string, vector >&, ofstream&, int); float calcBranchLength(Tree*, int, map< string, set >&); + int driver(Tree*, map< string, vector >&, map >&, int, int, vector&, set&, ofstream&, ofstream&, bool); + int createProcesses(vector&, Tree*, map< string, vector >&, map >&, int, int, vector&, set&, ofstream&, ofstream&); + }; #endif