]> git.donarmstrong.com Git - mothur.git/commitdiff
added mpi code to cluster.split command
authorwestcott <westcott>
Tue, 22 Jun 2010 11:41:29 +0000 (11:41 +0000)
committerwestcott <westcott>
Tue, 22 Jun 2010 11:41:29 +0000 (11:41 +0000)
Mothur.xcodeproj/project.pbxproj
clustersplitcommand.cpp
commandfactory.cpp
filterseqscommand.cpp
makefile

index 61b807010ef618807f18b7277f937a59b107cde7..6e28507e7faca5dba63e780f5b7d212b7715da3d 100644 (file)
@@ -12,6 +12,8 @@
                A703FE941194645F002C397E /* makegroupcommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = makegroupcommand.cpp; sourceTree = SOURCE_ROOT; };
                A7118EE511CFCAC000CFDE03 /* degapseqscommand.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = degapseqscommand.h; sourceTree = "<group>"; };
                A7118EE611CFCAC000CFDE03 /* degapseqscommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = degapseqscommand.cpp; sourceTree = "<group>"; };
+               A7118F0F11CFD5DC00CFDE03 /* getrelabundcommand.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = getrelabundcommand.h; sourceTree = "<group>"; };
+               A7118F1011CFD5DC00CFDE03 /* getrelabundcommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = getrelabundcommand.cpp; sourceTree = "<group>"; };
                A71D924211AEB42400D00CBC /* clustersplitcommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = clustersplitcommand.cpp; sourceTree = "<group>"; };
                A71D924311AEB42400D00CBC /* clustersplitcommand.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = clustersplitcommand.h; sourceTree = "<group>"; };
                A71D924411AEB42400D00CBC /* splitabundcommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = splitabundcommand.cpp; sourceTree = "<group>"; };
                                A7DA2060113FECD400BF472F /* getoturepcommand.cpp */,
                                A7DA2062113FECD400BF472F /* getrabundcommand.cpp */,
                                A7DA2063113FECD400BF472F /* getrabundcommand.h */,
+                               A7118F0F11CFD5DC00CFDE03 /* getrelabundcommand.h */,
+                               A7118F1011CFD5DC00CFDE03 /* getrelabundcommand.cpp */,
                                A7DA2064113FECD400BF472F /* getsabundcommand.cpp */,
                                A7DA2065113FECD400BF472F /* getsabundcommand.h */,
                                A7DA2067113FECD400BF472F /* getseqscommand.h */,
index 8d21d1a83e1eff3d518861273dff976650d8c56c..38e839496536b6ae525c94efd0c5b024c7eee384 100644 (file)
@@ -208,6 +208,9 @@ void ClusterSplitCommand::help(){
                m->mothurOut("The taxonomy parameter allows you to enter the taxonomy file for your sequences, this is only valid if you are using splitmethod=classify. Be sure your taxonomy file does not include the probability scores. \n");
                m->mothurOut("The taxlevel parameter allows you to specify the taxonomy level you want to use to split the distance file, default=1. \n");
                m->mothurOut("The large parameter allows you to indicate that your distance matrix is too large to fit in RAM.  The default value is false.\n");
+               #ifdef USE_MPI
+               m->mothurOut("When using MPI, the processors parameter is set to the number of MPI processes running. \n");
+               #endif
                m->mothurOut("The cluster.split command should be in the following format: \n");
                m->mothurOut("cluster.split(column=youDistanceFile, name=yourNameFile, method=yourMethod, cutoff=yourCutoff, precision=yourPrecision, splitmethod=yourSplitmethod, taxonomy=yourTaxonomyfile, taxlevel=yourtaxlevel) \n");
                m->mothurOut("Example: cluster.split(column=abrecovery.dist, name=abrecovery.names, method=furthest, cutoff=0.10, precision=1000, splitmethod=classify, taxonomy=abrecovery.silva.slv.taxonomy, taxlevel=5) \n");       
@@ -229,8 +232,23 @@ int ClusterSplitCommand::execute(){
        try {
        
                if (abort == true) {    return 0;       }
+               
                time_t estart;
+               vector<string> listFileNames;
+               set<string> labels;
+               string singletonName = "";
+
                //****************** file prep work ******************************//
+               #ifdef USE_MPI
+                       int pid;
+                       int tag = 2001;
+                       MPI_Status status; 
+                       MPI_Comm_size(MPI_COMM_WORLD, &processors); //set processors to the number of mpi processes running
+                       MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
+                       
+                       if (pid == 0) { //only process 0 converts and splits
+                       
+               #endif
                
                //if user gave a phylip file convert to column file
                if (format == "phylip") {
@@ -280,7 +298,7 @@ int ClusterSplitCommand::execute(){
                
                if (m->control_pressed) { delete split; return 0; }
                
-               string singletonName = split->getSingletonNames();
+               singletonName = split->getSingletonNames();
                vector< map<string, string> > distName = split->getDistanceFiles();  //returns map of distance files -> namefile sorted by distance file size
                delete split;
                
@@ -290,8 +308,156 @@ int ClusterSplitCommand::execute(){
                estart = time(NULL);
                
                //****************** break up files between processes and cluster each file set ******************************//
-               vector<string> listFileNames;
-               set<string> labels;
+       #ifdef USE_MPI
+                       ////you are process 0 from above////
+                       
+                       vector < vector < map<string, string> > > dividedNames; //distNames[1] = vector of filenames for process 1...                           
+                       dividedNames.resize(processors);
+                                       
+                       //for each file group figure out which process will complete it
+                       //want to divide the load intelligently so the big files are spread between processes
+                       int count = 1;
+                       for (int i = 0; i < distName.size(); i++) { 
+                               int processToAssign = (i+1) % processors; 
+                               if (processToAssign == 0) { processToAssign = processors; }
+                                               
+                               dividedNames[(processToAssign-1)].push_back(distName[i]);
+                       }
+                                       
+                       //not lets reverse the order of ever other process, so we balance big files running with little ones
+                       for (int i = 0; i < processors; i++) {
+                               int remainder = ((i+1) % processors);
+                               if (remainder) {  reverse(dividedNames[i].begin(), dividedNames[i].end());  }
+                       }
+                       
+                       
+                       //send each child the list of files it needs to process
+                       for(int i = 1; i < processors; i++) { 
+                               //send number of file pairs
+                               int num = dividedNames[i].size();
+                               MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
+                               
+                               for (int j = 0; j < num; j++) { //send filenames to process i
+                                       char tempDistFileName[1024];
+                                       strcpy(tempDistFileName, (dividedNames[i][j].begin()->first).c_str());
+                                       int lengthDist = (dividedNames[i][j].begin()->first).length();
+                                       
+                                       MPI_Send(&lengthDist, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
+                                       MPI_Send(tempDistFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD);
+                                       
+                                       char tempNameFileName[1024];
+                                       strcpy(tempNameFileName, (dividedNames[i][j].begin()->second).c_str());
+                                       int lengthName = (dividedNames[i][j].begin()->second).length();
+
+                                       MPI_Send(&lengthName, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
+                                       MPI_Send(tempNameFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD);
+                               }
+                       }
+                       
+                       //process your share
+                       listFileNames = cluster(dividedNames[0], labels);
+                       
+                       //receive the other processes info
+                       for(int i = 1; i < processors; i++) { 
+                               int num = dividedNames[i].size();
+                               
+                               //send list filenames to root process
+                               for (int j = 0; j < num; j++) {  
+                                       int lengthList = 0;
+                                       char tempListFileName[1024];
+                               
+                                       MPI_Recv(&lengthList, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
+                                       MPI_Recv(tempListFileName, 1024, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); 
+                               
+                                       string myListFileName = tempListFileName;
+                                       myListFileName = myListFileName.substr(0, lengthList);
+                                       
+                                       listFileNames.push_back(myListFileName);
+                               }
+                               
+                               //send Labels to root process
+                               int numLabels = 0;
+                               MPI_Recv(&numLabels, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
+                               
+                               for (int j = 0; j < numLabels; j++) {  
+                                       int lengthLabel = 0;
+                                       char tempLabel[100];
+                               
+                                       MPI_Recv(&lengthLabel, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status);
+                                       MPI_Recv(tempLabel, 100, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); 
+                               
+                                       string myLabel = tempLabel;
+                                       myLabel = myLabel.substr(0, lengthLabel);
+                       
+                                       if (labels.count(myLabel) == 0) { labels.insert(myLabel); }
+                               }
+                       }
+                       
+               }else { //you are a child process
+                       vector < map<string, string> >  myNames;
+                       
+                       //recieve the files you need to process
+                       //receive number of file pairs
+                       int num = 0;
+                       MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+                       
+                       myNames.resize(num);
+       
+                       for (int j = 0; j < num; j++) { //receive filenames to process 
+                               int lengthDist = 0;
+                               char tempDistFileName[1024];
+                               
+                               MPI_Recv(&lengthDist, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+                               MPI_Recv(tempDistFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status); 
+                               
+                               string myDistFileName = tempDistFileName;
+                               myDistFileName = myDistFileName.substr(0, lengthDist);
+                       
+                               int lengthName = 0;
+                               char tempNameFileName[1024];
+                               
+                               MPI_Recv(&lengthName, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+                               MPI_Recv(tempNameFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status); 
+                               
+                               string myNameFileName = tempNameFileName;
+                               myNameFileName = myNameFileName.substr(0, lengthName);
+                               
+                               //save file name
+                               myNames[j][myDistFileName] = myNameFileName;
+                       }
+       
+                       //process them
+                       listFileNames = cluster(myNames, labels);
+                       
+                       //send list filenames to root process
+                       for (int j = 0; j < num; j++) {  
+                               char tempListFileName[1024];
+                               strcpy(tempListFileName, listFileNames[j].c_str());
+                               int lengthList = listFileNames[j].length();
+                                       
+                               MPI_Send(&lengthList, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
+                               MPI_Send(tempListFileName, 1024, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
+                       }
+                       
+                       //send Labels to root process
+                       int numLabels = labels.size();
+                       MPI_Send(&numLabels, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
+                       
+                       for(set<string>::iterator it = labels.begin(); it != labels.end(); ++it) {
+                               char tempLabel[100];
+                               strcpy(tempLabel, (*it).c_str());
+                               int lengthLabel = (*it).length();
+                                       
+                               MPI_Send(&lengthLabel, 1, MPI_INT, 0, tag, MPI_COMM_WORLD);
+                               MPI_Send(tempLabel, 100, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
+                       }
+               }
+               
+               //make everyone wait
+               MPI_Barrier(MPI_COMM_WORLD);
+               
+       #else
+
                #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
                                if(processors == 1){
                                        listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
@@ -352,7 +518,7 @@ int ClusterSplitCommand::execute(){
                #else
                                listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files
                #endif
-               
+       #endif  
                if (m->control_pressed) { for (int i = 0; i < listFileNames.size(); i++) { remove(listFileNames[i].c_str()); } return 0; }
                
                m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to cluster"); m->mothurOutEndLine();
@@ -361,6 +527,10 @@ int ClusterSplitCommand::execute(){
                estart = time(NULL);
                m->mothurOut("Merging the clustered files..."); m->mothurOutEndLine();
                
+               #ifdef USE_MPI
+                       if (pid == 0) { //only process 0 merges
+               #endif
+
                ListVector* listSingle;
                map<float, int> labelBins = completeListFile(listFileNames, singletonName, labels, listSingle); //returns map of label to numBins
                
@@ -376,6 +546,13 @@ int ClusterSplitCommand::execute(){
                m->mothurOut("Output File Names: "); m->mothurOutEndLine();
                for (int i = 0; i < outputNames.size(); i++) {  m->mothurOut(outputNames[i]); m->mothurOutEndLine();    }
                m->mothurOutEndLine();
+               
+               #ifdef USE_MPI
+                       } //only process 0 merges
+                       
+                       //make everyone wait
+                       MPI_Barrier(MPI_COMM_WORLD);
+               #endif
 
                return 0;
        }
@@ -674,6 +851,16 @@ vector<string> ClusterSplitCommand::cluster(vector< map<string, string> > distNa
                        globaldata->setNameFile(thisNamefile);
                        globaldata->setColumnFile(thisDistFile); globaldata->setFormat("column");
                        
+                       #ifdef USE_MPI
+                               int pid;
+                               MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
+                               
+                               //output your files too
+                               if (pid != 0) {
+                                       cout << endl << "Reading " << thisDistFile << endl;
+                               }
+                       #endif
+                       
                        m->mothurOutEndLine(); m->mothurOut("Reading " + thisDistFile); m->mothurOutEndLine();
                        
                        ReadMatrix* read = new ReadColumnMatrix(thisDistFile);  
@@ -692,6 +879,14 @@ vector<string> ClusterSplitCommand::cluster(vector< map<string, string> > distNa
                        delete read; 
                        delete nameMap; 
                        
+                       
+                       #ifdef USE_MPI
+                               //output your files too
+                               if (pid != 0) {
+                                       cout << endl << "Clustering " << thisDistFile << endl;
+                               }
+                       #endif
+                       
                        m->mothurOutEndLine(); m->mothurOut("Clustering " + thisDistFile); m->mothurOutEndLine();
                
                        rabund = new RAbundVector(list->getRAbundVector());
index 7de4d06f25ddbb1414aaea8e7f4b8d7f72d25590..0612c6c51977327cbbd8f100f7c1b13d6b18c7da 100644 (file)
@@ -166,7 +166,6 @@ CommandFactory::CommandFactory(){
        commands["clearcut"]                    = "clearcut";
        commands["catchall"]                    = "catchall";
        commands["split.abund"]                 = "split.abund";
-       commands["cluster.split"]               = "cluster.split";
        commands["classify.otu"]                = "classify.otu";
        commands["degap.seqs"]                  = "degap.seqs";
        commands["classify.seqs"]               = "MPIEnabled"; 
@@ -181,6 +180,7 @@ CommandFactory::CommandFactory(){
        commands["chimera.bellerophon"] = "MPIEnabled";
        commands["screen.seqs"]                 = "MPIEnabled";
        commands["summary.seqs"]                = "MPIEnabled";
+       commands["cluster.split"]               = "MPIEnabled";
        commands["quit"]                                = "MPIEnabled"; 
 
 }
index 2bf0fa344eaeb924525bd6f906d664c8874f3d61..33790fe97632525cbb6c155ab953da531f770078 100644 (file)
@@ -116,7 +116,7 @@ FilterSeqsCommand::FilterSeqsCommand(string option)  {
                        
                        vertical = validParameter.validFile(parameters, "vertical", false);             
                        if (vertical == "not found") { 
-                               if ((hard == "") && (trump == '*')) { vertical = "T"; } //you have not given a hard file or set the trump char.
+                               if ((hard == "") && (trump == '*') && (soft == 0)) { vertical = "T"; } //you have not given a hard file or set the trump char.
                                else { vertical = "F";  }
                        }
                        
index 415d699a6d30f4deb999a5f398ccfe7bcfaca8c4..8fefa148238c932c2181d8004927eb0c331b306f 100644 (file)
--- a/makefile
+++ b/makefile
@@ -33,7 +33,7 @@ ifeq  ($(strip $(USEREADLINE)),yes)
       -L../readline-6.0
 endif
 
-USEMPI ?= no
+USEMPI ?= yes
 
 ifeq  ($(strip $(USEMPI)),yes)
        CC = mpic++