From: westcott Date: Tue, 22 Jun 2010 11:41:29 +0000 (+0000) Subject: added mpi code to cluster.split command X-Git-Url: https://git.donarmstrong.com/?p=mothur.git;a=commitdiff_plain;h=2bb20fb79f19b8bda48492d89f8e8b7389431413 added mpi code to cluster.split command --- diff --git a/Mothur.xcodeproj/project.pbxproj b/Mothur.xcodeproj/project.pbxproj index 61b8070..6e28507 100644 --- a/Mothur.xcodeproj/project.pbxproj +++ b/Mothur.xcodeproj/project.pbxproj @@ -12,6 +12,8 @@ A703FE941194645F002C397E /* makegroupcommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = makegroupcommand.cpp; sourceTree = SOURCE_ROOT; }; A7118EE511CFCAC000CFDE03 /* degapseqscommand.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = degapseqscommand.h; sourceTree = ""; }; A7118EE611CFCAC000CFDE03 /* degapseqscommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = degapseqscommand.cpp; sourceTree = ""; }; + A7118F0F11CFD5DC00CFDE03 /* getrelabundcommand.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = getrelabundcommand.h; sourceTree = ""; }; + A7118F1011CFD5DC00CFDE03 /* getrelabundcommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = getrelabundcommand.cpp; sourceTree = ""; }; A71D924211AEB42400D00CBC /* clustersplitcommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = clustersplitcommand.cpp; sourceTree = ""; }; A71D924311AEB42400D00CBC /* clustersplitcommand.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = clustersplitcommand.h; sourceTree = ""; }; A71D924411AEB42400D00CBC /* splitabundcommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = splitabundcommand.cpp; sourceTree = ""; }; @@ -710,6 +712,8 @@ A7DA2060113FECD400BF472F /* getoturepcommand.cpp */, A7DA2062113FECD400BF472F /* getrabundcommand.cpp */, A7DA2063113FECD400BF472F /* getrabundcommand.h */, + A7118F0F11CFD5DC00CFDE03 /* getrelabundcommand.h */, + A7118F1011CFD5DC00CFDE03 /* getrelabundcommand.cpp */, A7DA2064113FECD400BF472F /* getsabundcommand.cpp */, A7DA2065113FECD400BF472F /* getsabundcommand.h */, A7DA2067113FECD400BF472F /* getseqscommand.h */, diff --git a/clustersplitcommand.cpp b/clustersplitcommand.cpp index 8d21d1a..38e8394 100644 --- a/clustersplitcommand.cpp +++ b/clustersplitcommand.cpp @@ -208,6 +208,9 @@ void ClusterSplitCommand::help(){ m->mothurOut("The taxonomy parameter allows you to enter the taxonomy file for your sequences, this is only valid if you are using splitmethod=classify. Be sure your taxonomy file does not include the probability scores. \n"); m->mothurOut("The taxlevel parameter allows you to specify the taxonomy level you want to use to split the distance file, default=1. \n"); m->mothurOut("The large parameter allows you to indicate that your distance matrix is too large to fit in RAM. The default value is false.\n"); + #ifdef USE_MPI + m->mothurOut("When using MPI, the processors parameter is set to the number of MPI processes running. \n"); + #endif m->mothurOut("The cluster.split command should be in the following format: \n"); m->mothurOut("cluster.split(column=youDistanceFile, name=yourNameFile, method=yourMethod, cutoff=yourCutoff, precision=yourPrecision, splitmethod=yourSplitmethod, taxonomy=yourTaxonomyfile, taxlevel=yourtaxlevel) \n"); m->mothurOut("Example: cluster.split(column=abrecovery.dist, name=abrecovery.names, method=furthest, cutoff=0.10, precision=1000, splitmethod=classify, taxonomy=abrecovery.silva.slv.taxonomy, taxlevel=5) \n"); @@ -229,8 +232,23 @@ int ClusterSplitCommand::execute(){ try { if (abort == true) { return 0; } + time_t estart; + vector listFileNames; + set labels; + string singletonName = ""; + //****************** file prep work ******************************// + #ifdef USE_MPI + int pid; + int tag = 2001; + MPI_Status status; + MPI_Comm_size(MPI_COMM_WORLD, &processors); //set processors to the number of mpi processes running + MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + + if (pid == 0) { //only process 0 converts and splits + + #endif //if user gave a phylip file convert to column file if (format == "phylip") { @@ -280,7 +298,7 @@ int ClusterSplitCommand::execute(){ if (m->control_pressed) { delete split; return 0; } - string singletonName = split->getSingletonNames(); + singletonName = split->getSingletonNames(); vector< map > distName = split->getDistanceFiles(); //returns map of distance files -> namefile sorted by distance file size delete split; @@ -290,8 +308,156 @@ int ClusterSplitCommand::execute(){ estart = time(NULL); //****************** break up files between processes and cluster each file set ******************************// - vector listFileNames; - set labels; + #ifdef USE_MPI + ////you are process 0 from above//// + + vector < vector < map > > dividedNames; //distNames[1] = vector of filenames for process 1... + dividedNames.resize(processors); + + //for each file group figure out which process will complete it + //want to divide the load intelligently so the big files are spread between processes + int count = 1; + for (int i = 0; i < distName.size(); i++) { + int processToAssign = (i+1) % processors; + if (processToAssign == 0) { processToAssign = processors; } + + dividedNames[(processToAssign-1)].push_back(distName[i]); + } + + //not lets reverse the order of ever other process, so we balance big files running with little ones + for (int i = 0; i < processors; i++) { + int remainder = ((i+1) % processors); + if (remainder) { reverse(dividedNames[i].begin(), dividedNames[i].end()); } + } + + + //send each child the list of files it needs to process + for(int i = 1; i < processors; i++) { + //send number of file pairs + int num = dividedNames[i].size(); + MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + + for (int j = 0; j < num; j++) { //send filenames to process i + char tempDistFileName[1024]; + strcpy(tempDistFileName, (dividedNames[i][j].begin()->first).c_str()); + int lengthDist = (dividedNames[i][j].begin()->first).length(); + + MPI_Send(&lengthDist, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(tempDistFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD); + + char tempNameFileName[1024]; + strcpy(tempNameFileName, (dividedNames[i][j].begin()->second).c_str()); + int lengthName = (dividedNames[i][j].begin()->second).length(); + + MPI_Send(&lengthName, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(tempNameFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD); + } + } + + //process your share + listFileNames = cluster(dividedNames[0], labels); + + //receive the other processes info + for(int i = 1; i < processors; i++) { + int num = dividedNames[i].size(); + + //send list filenames to root process + for (int j = 0; j < num; j++) { + int lengthList = 0; + char tempListFileName[1024]; + + MPI_Recv(&lengthList, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); + MPI_Recv(tempListFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); + + string myListFileName = tempListFileName; + myListFileName = myListFileName.substr(0, lengthList); + + listFileNames.push_back(myListFileName); + } + + //send Labels to root process + int numLabels = 0; + MPI_Recv(&numLabels, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); + + for (int j = 0; j < numLabels; j++) { + int lengthLabel = 0; + char tempLabel[100]; + + MPI_Recv(&lengthLabel, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); + MPI_Recv(tempLabel, 100, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); + + string myLabel = tempLabel; + myLabel = myLabel.substr(0, lengthLabel); + + if (labels.count(myLabel) == 0) { labels.insert(myLabel); } + } + } + + }else { //you are a child process + vector < map > myNames; + + //recieve the files you need to process + //receive number of file pairs + int num = 0; + MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + + myNames.resize(num); + + for (int j = 0; j < num; j++) { //receive filenames to process + int lengthDist = 0; + char tempDistFileName[1024]; + + MPI_Recv(&lengthDist, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + MPI_Recv(tempDistFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status); + + string myDistFileName = tempDistFileName; + myDistFileName = myDistFileName.substr(0, lengthDist); + + int lengthName = 0; + char tempNameFileName[1024]; + + MPI_Recv(&lengthName, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + MPI_Recv(tempNameFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status); + + string myNameFileName = tempNameFileName; + myNameFileName = myNameFileName.substr(0, lengthName); + + //save file name + myNames[j][myDistFileName] = myNameFileName; + } + + //process them + listFileNames = cluster(myNames, labels); + + //send list filenames to root process + for (int j = 0; j < num; j++) { + char tempListFileName[1024]; + strcpy(tempListFileName, listFileNames[j].c_str()); + int lengthList = listFileNames[j].length(); + + MPI_Send(&lengthList, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); + MPI_Send(tempListFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD); + } + + //send Labels to root process + int numLabels = labels.size(); + MPI_Send(&numLabels, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); + + for(set::iterator it = labels.begin(); it != labels.end(); ++it) { + char tempLabel[100]; + strcpy(tempLabel, (*it).c_str()); + int lengthLabel = (*it).length(); + + MPI_Send(&lengthLabel, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); + MPI_Send(tempLabel, 100, MPI_CHAR, 0, tag, MPI_COMM_WORLD); + } + } + + //make everyone wait + MPI_Barrier(MPI_COMM_WORLD); + + #else + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) if(processors == 1){ listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files @@ -352,7 +518,7 @@ int ClusterSplitCommand::execute(){ #else listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files #endif - + #endif if (m->control_pressed) { for (int i = 0; i < listFileNames.size(); i++) { remove(listFileNames[i].c_str()); } return 0; } m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to cluster"); m->mothurOutEndLine(); @@ -361,6 +527,10 @@ int ClusterSplitCommand::execute(){ estart = time(NULL); m->mothurOut("Merging the clustered files..."); m->mothurOutEndLine(); + #ifdef USE_MPI + if (pid == 0) { //only process 0 merges + #endif + ListVector* listSingle; map labelBins = completeListFile(listFileNames, singletonName, labels, listSingle); //returns map of label to numBins @@ -376,6 +546,13 @@ int ClusterSplitCommand::execute(){ m->mothurOut("Output File Names: "); m->mothurOutEndLine(); for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } m->mothurOutEndLine(); + + #ifdef USE_MPI + } //only process 0 merges + + //make everyone wait + MPI_Barrier(MPI_COMM_WORLD); + #endif return 0; } @@ -674,6 +851,16 @@ vector ClusterSplitCommand::cluster(vector< map > distNa globaldata->setNameFile(thisNamefile); globaldata->setColumnFile(thisDistFile); globaldata->setFormat("column"); + #ifdef USE_MPI + int pid; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + + //output your files too + if (pid != 0) { + cout << endl << "Reading " << thisDistFile << endl; + } + #endif + m->mothurOutEndLine(); m->mothurOut("Reading " + thisDistFile); m->mothurOutEndLine(); ReadMatrix* read = new ReadColumnMatrix(thisDistFile); @@ -692,6 +879,14 @@ vector ClusterSplitCommand::cluster(vector< map > distNa delete read; delete nameMap; + + #ifdef USE_MPI + //output your files too + if (pid != 0) { + cout << endl << "Clustering " << thisDistFile << endl; + } + #endif + m->mothurOutEndLine(); m->mothurOut("Clustering " + thisDistFile); m->mothurOutEndLine(); rabund = new RAbundVector(list->getRAbundVector()); diff --git a/commandfactory.cpp b/commandfactory.cpp index 7de4d06..0612c6c 100644 --- a/commandfactory.cpp +++ b/commandfactory.cpp @@ -166,7 +166,6 @@ CommandFactory::CommandFactory(){ commands["clearcut"] = "clearcut"; commands["catchall"] = "catchall"; commands["split.abund"] = "split.abund"; - commands["cluster.split"] = "cluster.split"; commands["classify.otu"] = "classify.otu"; commands["degap.seqs"] = "degap.seqs"; commands["classify.seqs"] = "MPIEnabled"; @@ -181,6 +180,7 @@ CommandFactory::CommandFactory(){ commands["chimera.bellerophon"] = "MPIEnabled"; commands["screen.seqs"] = "MPIEnabled"; commands["summary.seqs"] = "MPIEnabled"; + commands["cluster.split"] = "MPIEnabled"; commands["quit"] = "MPIEnabled"; } diff --git a/filterseqscommand.cpp b/filterseqscommand.cpp index 2bf0fa3..33790fe 100644 --- a/filterseqscommand.cpp +++ b/filterseqscommand.cpp @@ -116,7 +116,7 @@ FilterSeqsCommand::FilterSeqsCommand(string option) { vertical = validParameter.validFile(parameters, "vertical", false); if (vertical == "not found") { - if ((hard == "") && (trump == '*')) { vertical = "T"; } //you have not given a hard file or set the trump char. + if ((hard == "") && (trump == '*') && (soft == 0)) { vertical = "T"; } //you have not given a hard file or set the trump char. else { vertical = "F"; } } diff --git a/makefile b/makefile index 415d699..8fefa14 100644 --- a/makefile +++ b/makefile @@ -33,7 +33,7 @@ ifeq ($(strip $(USEREADLINE)),yes) -L../readline-6.0 endif -USEMPI ?= no +USEMPI ?= yes ifeq ($(strip $(USEMPI)),yes) CC = mpic++