From 725a3d4ff2442c79bfde0a75ed3e0904edcf03b7 Mon Sep 17 00:00:00 2001 From: westcott Date: Thu, 20 May 2010 15:00:33 +0000 Subject: [PATCH] added cluster.split command --- Mothur.xcodeproj/project.pbxproj | 10 +- aligncommand.cpp | 11 +- alignmentdb.cpp | 18 +- bayesian.cpp | 33 +- bellerophon.cpp | 2 +- blastdb.cpp | 49 --- blastdb.hpp | 5 - chimera.cpp | 15 +- chimeraccodecommand.cpp | 12 +- chimeracheckcommand.cpp | 11 +- chimerapintailcommand.cpp | 11 +- chimeraslayercommand.cpp | 11 +- classify.cpp | 34 +- classifyseqscommand.cpp | 11 +- clustersplitcommand.cpp | 632 +++++++++++++++++++++++++++++++ clustersplitcommand.h | 49 +++ commandfactory.cpp | 3 + database.hpp | 5 - distancecommand.cpp | 1 + engine.cpp | 8 +- filterseqscommand.cpp | 70 ++-- hclustercommand.cpp | 2 +- inputdata.cpp | 36 +- inputdata.h | 1 + kmerdb.cpp | 37 -- kmerdb.hpp | 5 - makefile | 20 +- phylotree.cpp | 16 +- readcluster.cpp | 7 +- readcluster.h | 3 +- screenseqscommand.cpp | 13 +- seqsummarycommand.cpp | 96 ++--- splitmatrix.cpp | 256 +++++++++++++ splitmatrix.h | 39 ++ suffixdb.cpp | 32 -- suffixdb.hpp | 5 - validparameter.cpp | 12 +- 37 files changed, 1279 insertions(+), 302 deletions(-) create mode 100644 clustersplitcommand.cpp create mode 100644 clustersplitcommand.h create mode 100644 splitmatrix.cpp create mode 100644 splitmatrix.h diff --git a/Mothur.xcodeproj/project.pbxproj b/Mothur.xcodeproj/project.pbxproj index 2b7d878..d462547 100644 --- a/Mothur.xcodeproj/project.pbxproj +++ b/Mothur.xcodeproj/project.pbxproj @@ -13,6 +13,10 @@ A72B3A63118B37FD004B9F8D /* phylodiversitycommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = phylodiversitycommand.cpp; sourceTree = ""; }; A72B3A7B118B4D1B004B9F8D /* phylodiversity.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = phylodiversity.h; sourceTree = ""; }; A72B3A7C118B4D1B004B9F8D /* phylodiversity.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = phylodiversity.cpp; sourceTree = ""; }; + A730977D11A417BE00117C95 /* splitmatrix.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = splitmatrix.h; sourceTree = SOURCE_ROOT; }; + A730977E11A417BE00117C95 /* splitmatrix.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = splitmatrix.cpp; sourceTree = SOURCE_ROOT; }; + A73097B911A43E1300117C95 /* clustersplitcommand.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = clustersplitcommand.h; sourceTree = ""; }; + A73097BA11A43E1300117C95 /* clustersplitcommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = clustersplitcommand.cpp; sourceTree = ""; }; A73953DA11987ED100B0B160 /* chopseqscommand.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = chopseqscommand.h; sourceTree = ""; }; A73953DB11987ED100B0B160 /* chopseqscommand.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = chopseqscommand.cpp; sourceTree = ""; }; A73F163411A1951D0087CA57 /* splitabundcommand.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = splitabundcommand.h; sourceTree = ""; }; @@ -670,8 +674,10 @@ A7DA201D113FECD400BF472F /* classifyseqscommand.cpp */, A7D215C811996C6E00F13F13 /* clearcutcommand.h */, A7D215C911996C6E00F13F13 /* clearcutcommand.cpp */, - A7DA2021113FECD400BF472F /* clustercommand.cpp */, A7DA2022113FECD400BF472F /* clustercommand.h */, + A7DA2021113FECD400BF472F /* clustercommand.cpp */, + A73097B911A43E1300117C95 /* clustersplitcommand.h */, + A73097BA11A43E1300117C95 /* clustersplitcommand.cpp */, A7DA2025113FECD400BF472F /* collectcommand.cpp */, A7DA2026113FECD400BF472F /* collectcommand.h */, A7DA2029113FECD400BF472F /* collectsharedcommand.cpp */, @@ -919,6 +925,8 @@ A7DA20ED113FECD400BF472F /* readphylip.h */, A7DA20EE113FECD400BF472F /* readtree.cpp */, A7DA20EF113FECD400BF472F /* readtree.h */, + A730977D11A417BE00117C95 /* splitmatrix.h */, + A730977E11A417BE00117C95 /* splitmatrix.cpp */, ); name = read; sourceTree = ""; diff --git a/aligncommand.cpp b/aligncommand.cpp index a246ecc..03992dc 100644 --- a/aligncommand.cpp +++ b/aligncommand.cpp @@ -303,8 +303,10 @@ int AlignCommand::execute(){ MPIPos = setFilePosFasta(candidateFileNames[s], numFastaSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numFastaSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; @@ -323,9 +325,10 @@ int AlignCommand::execute(){ if (tempResult != 0) { MPIWroteAccnos = true; } } }else{ //you are a child process - MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numFastaSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numFastaSeqs+1); - MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); + //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; diff --git a/alignmentdb.cpp b/alignmentdb.cpp index 364ef12..9b2977d 100644 --- a/alignmentdb.cpp +++ b/alignmentdb.cpp @@ -25,12 +25,14 @@ AlignmentDB::AlignmentDB(string fastaFileName, string s, int kmerSize, float gap m->mothurOut("Reading in the " + fastaFileName + " template sequences...\t"); cout.flush(); #ifdef USE_MPI - int pid; + int pid, processors; vector positions; MPI_Status status; MPI_File inMPI; MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + MPI_Comm_size(MPI_COMM_WORLD, &processors); + int tag = 2001; char inFileName[1024]; strcpy(inFileName, fastaFileName.c_str()); @@ -41,12 +43,14 @@ AlignmentDB::AlignmentDB(string fastaFileName, string s, int kmerSize, float gap positions = setFilePosFasta(fastaFileName, numSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&positions[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&positions[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } }else{ - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); positions.resize(numSeqs+1); - MPI_Bcast(&positions[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&positions[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); } //read file @@ -73,7 +77,9 @@ AlignmentDB::AlignmentDB(string fastaFileName, string s, int kmerSize, float gap if (temp.getUnaligned().length() > longest) { longest = temp.getUnaligned().length()+1; } } } - + + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case + MPI_File_close(&inMPI); #else diff --git a/bayesian.cpp b/bayesian.cpp index 22eab72..63f8716 100644 --- a/bayesian.cpp +++ b/bayesian.cpp @@ -362,7 +362,7 @@ void Bayesian::readProbFile(ifstream& in, ifstream& inNum, string inName, string #ifdef USE_MPI - int pid, num, num2; + int pid, num, num2, processors; vector positions; vector positions2; @@ -370,6 +370,8 @@ void Bayesian::readProbFile(ifstream& in, ifstream& inNum, string inName, string MPI_File inMPI; MPI_File inMPI2; MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + MPI_Comm_size(MPI_COMM_WORLD, &processors); + int tag = 2001; char inFileName[1024]; strcpy(inFileName, inNumName.c_str()); @@ -382,26 +384,24 @@ void Bayesian::readProbFile(ifstream& in, ifstream& inNum, string inName, string if (pid == 0) { positions = setFilePosEachLine(inNumName, num); - - //send file positions to all processes - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&positions[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos - positions2 = setFilePosEachLine(inName, num2); - //send file positions to all processes - MPI_Bcast(&num2, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&positions2[0], (num2+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&positions[0], (num+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + + MPI_Send(&num2, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&positions2[0], (num2+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } }else{ - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs - positions.resize(num); - MPI_Bcast(&positions[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + positions.resize(num+1); + MPI_Recv(&positions[0], (num+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); - MPI_Bcast(&num2, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs - positions2.resize(num2); - MPI_Bcast(&positions2[0], (num2+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions - + MPI_Recv(&num2, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + positions2.resize(num2+1); + MPI_Recv(&positions2[0], (num2+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); } //read numKmers @@ -473,6 +473,7 @@ void Bayesian::readProbFile(ifstream& in, ifstream& inNum, string inName, string } MPI_File_close(&inMPI2); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else in >> numKmers; gobble(in); diff --git a/bellerophon.cpp b/bellerophon.cpp index 54025eb..aa26d29 100644 --- a/bellerophon.cpp +++ b/bellerophon.cpp @@ -310,7 +310,7 @@ int Bellerophon::getChimeras() { MPIBestSend.clear(); } - + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else //divide breakpoints between processors diff --git a/blastdb.cpp b/blastdb.cpp index b52c82e..35a19f4 100644 --- a/blastdb.cpp +++ b/blastdb.cpp @@ -177,55 +177,6 @@ void BlastDB::generateDB() { exit(1); } } -#ifdef USE_MPI -/**************************************************************************************************/ -int BlastDB::MPISend(int receiver) { - try { - - //send gapOpen - float - MPI_Send(&gapOpen, 1, MPI_FLOAT, receiver, 2001, MPI_COMM_WORLD); - - //send gapExtend - float - MPI_Send(&gapExtend, 1, MPI_FLOAT, receiver, 2001, MPI_COMM_WORLD); - - //send match - float - MPI_Send(&match, 1, MPI_FLOAT, receiver, 2001, MPI_COMM_WORLD); - - //send mismatch - float - MPI_Send(&misMatch, 1, MPI_FLOAT, receiver, 2001, MPI_COMM_WORLD); - - return 0; - } - catch(exception& e) { - m->errorOut(e, "BlastDB", "MPISend"); - exit(1); - } -} -/**************************************************************************************************/ -int BlastDB::MPIRecv(int sender) { - try { - MPI_Status status; - - //receive gapOpen - float - MPI_Recv(&gapOpen, 1, MPI_FLOAT, sender, 2001, MPI_COMM_WORLD, &status); - - //receive gapExtend - float - MPI_Recv(&gapExtend, 1, MPI_FLOAT, sender, 2001, MPI_COMM_WORLD, &status); - - //receive match - float - MPI_Recv(&match, 1, MPI_FLOAT, sender, 2001, MPI_COMM_WORLD, &status); - - //receive mismatch - float - MPI_Recv(&misMatch, 1, MPI_FLOAT, sender, 2001, MPI_COMM_WORLD, &status); - - return 0; - } - catch(exception& e) { - m->errorOut(e, "BlastDB", "MPIRecv"); - exit(1); - } -} -#endif /**************************************************************************************************/ /**************************************************************************************************/ diff --git a/blastdb.hpp b/blastdb.hpp index d61aaec..2e7423d 100644 --- a/blastdb.hpp +++ b/blastdb.hpp @@ -26,11 +26,6 @@ public: vector findClosestSequences(Sequence*, int); vector findClosestMegaBlast(Sequence*, int); - #ifdef USE_MPI - int MPISend(int); //just sends gapOpen, gapExtend, match and mismatch - int MPIRecv(int); - #endif - private: string dbFileName; string queryFileName; diff --git a/chimera.cpp b/chimera.cpp index b513075..483e553 100644 --- a/chimera.cpp +++ b/chimera.cpp @@ -101,13 +101,15 @@ vector Chimera::readSeqs(string file) { m->mothurOut("Reading sequences from " + file + "..."); cout.flush(); #ifdef USE_MPI - int pid; + int pid, processors; vector positions; int numSeqs; + int tag = 2001; MPI_Status status; MPI_File inMPI; MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + MPI_Comm_size(MPI_COMM_WORLD, &processors); //char* inFileName = new char[file.length()]; //memcpy(inFileName, file.c_str(), file.length()); @@ -122,12 +124,14 @@ vector Chimera::readSeqs(string file) { positions = setFilePosFasta(file, numSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&positions[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&positions[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } }else{ - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); positions.resize(numSeqs+1); - MPI_Bcast(&positions[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&positions[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); } //read file @@ -157,6 +161,7 @@ vector Chimera::readSeqs(string file) { } MPI_File_close(&inMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else ifstream in; diff --git a/chimeraccodecommand.cpp b/chimeraccodecommand.cpp index cc046ea..45f7f54 100644 --- a/chimeraccodecommand.cpp +++ b/chimeraccodecommand.cpp @@ -233,8 +233,10 @@ int ChimeraCcodeCommand::execute(){ MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; @@ -253,9 +255,9 @@ int ChimeraCcodeCommand::execute(){ if (tempResult != 0) { MPIWroteAccnos = true; } } }else{ //you are a child process - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numSeqs+1); - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; @@ -276,6 +278,8 @@ int ChimeraCcodeCommand::execute(){ MPI_File_close(&outMPI); MPI_File_close(&outMPIAccnos); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case + //delete accnos file if blank if (pid == 0) { if (!MPIWroteAccnos) { diff --git a/chimeracheckcommand.cpp b/chimeracheckcommand.cpp index 1b25861..66cf12f 100644 --- a/chimeracheckcommand.cpp +++ b/chimeracheckcommand.cpp @@ -191,8 +191,10 @@ int ChimeraCheckCommand::execute(){ MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; @@ -211,9 +213,9 @@ int ChimeraCheckCommand::execute(){ MPI_Recv(buf, 4, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); } }else{ //you are a child process - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numSeqs+1); - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; @@ -235,6 +237,7 @@ int ChimeraCheckCommand::execute(){ //close files MPI_File_close(&inMPI); MPI_File_close(&outMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else //break up file diff --git a/chimerapintailcommand.cpp b/chimerapintailcommand.cpp index adf060f..6cef8bb 100644 --- a/chimerapintailcommand.cpp +++ b/chimerapintailcommand.cpp @@ -249,8 +249,10 @@ int ChimeraPintailCommand::execute(){ MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; @@ -269,9 +271,9 @@ int ChimeraPintailCommand::execute(){ if (tempResult != 0) { MPIWroteAccnos = true; } } }else{ //you are a child process - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numSeqs+1); - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; @@ -291,6 +293,7 @@ int ChimeraPintailCommand::execute(){ MPI_File_close(&inMPI); MPI_File_close(&outMPI); MPI_File_close(&outMPIAccnos); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case //delete accnos file if blank if (pid == 0) { diff --git a/chimeraslayercommand.cpp b/chimeraslayercommand.cpp index 435cd30..c41e026 100644 --- a/chimeraslayercommand.cpp +++ b/chimeraslayercommand.cpp @@ -261,8 +261,10 @@ int ChimeraSlayerCommand::execute(){ MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; @@ -281,9 +283,9 @@ int ChimeraSlayerCommand::execute(){ if (tempResult != 0) { MPIWroteAccnos = true; } } }else{ //you are a child process - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numSeqs+1); - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; @@ -303,6 +305,7 @@ int ChimeraSlayerCommand::execute(){ MPI_File_close(&inMPI); MPI_File_close(&outMPI); MPI_File_close(&outMPIAccnos); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case //delete accnos file if blank if (pid == 0) { diff --git a/classify.cpp b/classify.cpp index 147f499..e07344d 100644 --- a/classify.cpp +++ b/classify.cpp @@ -27,12 +27,14 @@ void Classify::generateDatabaseAndNames(string tfile, string tempFile, string me m->mothurOut("Generating search database... "); cout.flush(); #ifdef USE_MPI - int pid; + int pid, processors; vector positions; + int tag = 2001; MPI_Status status; MPI_File inMPI; MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + MPI_Comm_size(MPI_COMM_WORLD, &processors); //char* inFileName = new char[tempFile.length()]; //memcpy(inFileName, tempFile.c_str(), tempFile.length()); @@ -47,12 +49,14 @@ void Classify::generateDatabaseAndNames(string tfile, string tempFile, string me positions = setFilePosFasta(tempFile, numSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&positions[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&positions[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } }else{ - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs - positions.resize(numSeqs); - MPI_Bcast(&positions[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + positions.resize(numSeqs+1); + MPI_Recv(&positions[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); } //create database @@ -86,6 +90,7 @@ void Classify::generateDatabaseAndNames(string tfile, string tempFile, string me database->generateDB(); MPI_File_close(&inMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else //need to know number of template seqs for suffixdb @@ -171,12 +176,14 @@ int Classify::readTaxonomy(string file) { m->mothurOut("Reading in the " + file + " taxonomy...\t"); cout.flush(); #ifdef USE_MPI - int pid, num; + int pid, num, processors; vector positions; + int tag = 2001; MPI_Status status; MPI_File inMPI; MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + MPI_Comm_size(MPI_COMM_WORLD, &processors); //char* inFileName = new char[file.length()]; //memcpy(inFileName, file.c_str(), file.length()); @@ -191,12 +198,14 @@ int Classify::readTaxonomy(string file) { positions = setFilePosEachLine(file, num); //send file positions to all processes - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&positions[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&positions[0], (num+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } }else{ - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs - positions.resize(num); - MPI_Bcast(&positions[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + positions.resize(num+1); + MPI_Recv(&positions[0], (num+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); } //read file @@ -218,6 +227,7 @@ int Classify::readTaxonomy(string file) { } MPI_File_close(&inMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else ifstream inTax; openInputFile(file, inTax); diff --git a/classifyseqscommand.cpp b/classifyseqscommand.cpp index 7ae2ee5..eee3160 100644 --- a/classifyseqscommand.cpp +++ b/classifyseqscommand.cpp @@ -419,8 +419,10 @@ int ClassifySeqsCommand::execute(){ MPIPos = setFilePosFasta(fastaFileNames[s], numFastaSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numFastaSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; @@ -438,9 +440,9 @@ int ClassifySeqsCommand::execute(){ MPI_Recv(&done, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); } }else{ //you are a child process - MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numFastaSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numFastaSeqs+1); - MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; @@ -461,6 +463,7 @@ int ClassifySeqsCommand::execute(){ MPI_File_close(&inMPI); MPI_File_close(&outMPINewTax); MPI_File_close(&outMPITempTax); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) diff --git a/clustersplitcommand.cpp b/clustersplitcommand.cpp new file mode 100644 index 0000000..a1e6db0 --- /dev/null +++ b/clustersplitcommand.cpp @@ -0,0 +1,632 @@ +/* + * clustersplitcommand.cpp + * Mothur + * + * Created by westcott on 5/19/10. + * Copyright 2010 Schloss Lab. All rights reserved. + * + */ + +#include "clustersplitcommand.h" +#include "readcluster.h" +#include "splitmatrix.h" +#include "readphylip.h" +#include "readcolumn.h" +#include "readmatrix.hpp" +#include "inputdata.h" + +//********************************************************************************************************************** +//This function checks to make sure the cluster command has no errors and then clusters based on the method chosen. +ClusterSplitCommand::ClusterSplitCommand(string option) { + try{ + globaldata = GlobalData::getInstance(); + abort = false; + + //allow user to run help + if(option == "help") { help(); abort = true; } + + else { + //valid paramters for this command + string Array[] = {"phylip","column","name","cutoff","precision","method","showabund","timing","hard","processors","splitcutoff","outputdir","inputdir"}; + vector myArray (Array, Array+(sizeof(Array)/sizeof(string))); + + OptionParser parser(option); + map parameters = parser.getParameters(); + + ValidParameters validParameter; + + //check to make sure all parameters are valid for command + map::iterator it; + for (it = parameters.begin(); it != parameters.end(); it++) { + if (validParameter.isValidParameter(it->first, myArray, it->second) != true) { + abort = true; + } + } + + globaldata->newRead(); + + //if the user changes the output directory command factory will send this info to us in the output parameter + outputDir = validParameter.validFile(parameters, "outputdir", false); if (outputDir == "not found"){ outputDir = ""; } + + //if the user changes the input directory command factory will send this info to us in the output parameter + string inputDir = validParameter.validFile(parameters, "inputdir", false); + if (inputDir == "not found"){ inputDir = ""; } + else { + string path; + it = parameters.find("phylip"); + //user has given a template file + if(it != parameters.end()){ + path = hasPath(it->second); + //if the user has not given a path then, add inputdir. else leave path alone. + if (path == "") { parameters["phylip"] = inputDir + it->second; } + } + + it = parameters.find("column"); + //user has given a template file + if(it != parameters.end()){ + path = hasPath(it->second); + //if the user has not given a path then, add inputdir. else leave path alone. + if (path == "") { parameters["column"] = inputDir + it->second; } + } + + it = parameters.find("name"); + //user has given a template file + if(it != parameters.end()){ + path = hasPath(it->second); + //if the user has not given a path then, add inputdir. else leave path alone. + if (path == "") { parameters["name"] = inputDir + it->second; } + } + } + + //check for required parameters + phylipfile = validParameter.validFile(parameters, "phylip", true); + if (phylipfile == "not open") { abort = true; } + else if (phylipfile == "not found") { phylipfile = ""; } + else { distfile = phylipfile; format = "phylip"; } + + columnfile = validParameter.validFile(parameters, "column", true); + if (columnfile == "not open") { abort = true; } + else if (columnfile == "not found") { columnfile = ""; } + else { distfile = columnfile; format = "column"; } + + namefile = validParameter.validFile(parameters, "name", true); + if (namefile == "not open") { abort = true; } + else if (namefile == "not found") { namefile = ""; } + + if ((phylipfile == "") && (columnfile == "")) { m->mothurOut("When executing a hcluster command you must enter a phylip or a column."); m->mothurOutEndLine(); abort = true; } + else if ((phylipfile != "") && (columnfile != "")) { m->mothurOut("When executing a hcluster command you must enter ONLY ONE of the following: phylip or column."); m->mothurOutEndLine(); abort = true; } + + if (columnfile != "") { + if (namefile == "") { cout << "You need to provide a namefile if you are going to use the column format." << endl; abort = true; } + } + + //check for optional parameter and set defaults + // ...at some point should added some additional type checking... + //get user cutoff and precision or use defaults + string temp; + temp = validParameter.validFile(parameters, "precision", false); + if (temp == "not found") { temp = "100"; } + //saves precision legnth for formatting below + length = temp.length(); + convert(temp, precision); + + temp = validParameter.validFile(parameters, "hard", false); if (temp == "not found") { temp = "F"; } + hard = isTrue(temp); + + temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = "1"; } + convert(temp, processors); + + temp = validParameter.validFile(parameters, "cutoff", false); + if (temp == "not found") { temp = "10"; } + convert(temp, cutoff); + if (!hard) { cutoff += (5 / (precision * 10.0)); } + + temp = validParameter.validFile(parameters, "splitcutoff", false); + if (temp == "not found") { temp = "0.10"; } + convert(temp, splitcutoff); + if (!hard) { splitcutoff += (5 / (precision * 10.0)); } + + method = validParameter.validFile(parameters, "method", false); + if (method == "not found") { method = "furthest"; } + + if ((method == "furthest") || (method == "nearest") || (method == "average")) { } + else { m->mothurOut("Not a valid clustering method. Valid clustering algorithms are furthest, nearest or average."); m->mothurOutEndLine(); abort = true; } + + showabund = validParameter.validFile(parameters, "showabund", false); + if (showabund == "not found") { showabund = "T"; } + + timing = validParameter.validFile(parameters, "timing", false); + if (timing == "not found") { timing = "F"; } + + } + } + catch(exception& e) { + m->errorOut(e, "ClusterSplitCommand", "ClusterSplitCommand"); + exit(1); + } +} + +//********************************************************************************************************************** + +void ClusterSplitCommand::help(){ + try { + m->mothurOut("The cluster command can only be executed after a successful read.dist command.\n"); + m->mothurOut("The cluster command parameter options are method, cuttoff, hard, precision, showabund and timing. No parameters are required.\n"); + m->mothurOut("The cluster command should be in the following format: \n"); + m->mothurOut("cluster(method=yourMethod, cutoff=yourCutoff, precision=yourPrecision) \n"); + m->mothurOut("The acceptable cluster methods are furthest, nearest and average. If no method is provided then furthest is assumed.\n\n"); + } + catch(exception& e) { + m->errorOut(e, "ClusterSplitCommand", "help"); + exit(1); + } +} + +//********************************************************************************************************************** + +ClusterSplitCommand::~ClusterSplitCommand(){} + +//********************************************************************************************************************** + +int ClusterSplitCommand::execute(){ + try { + + if (abort == true) { return 0; } + + //****************** file prep work ******************************// + + //if user gave a phylip file convert to column file + if (format == "phylip") { + ReadCluster* convert = new ReadCluster(distfile, cutoff, outputDir, false); + + NameAssignment* nameMap = NULL; + convert->read(nameMap); + + if (m->control_pressed) { delete convert; return 0; } + + string distfile = convert->getOutputFile(); + + //if no names file given with phylip file, create it + ListVector* listToMakeNameFile = convert->getListVector(); + if (namefile == "") { //you need to make a namefile for split matrix + ofstream out; + namefile = phylipfile + ".names"; + openOutputFile(namefile, out); + for (int i = 0; i < listToMakeNameFile->getNumBins(); i++) { + string bin = listToMakeNameFile->get(i); + out << bin << '\t' << bin << endl; + } + out.close(); + } + delete listToMakeNameFile; + delete convert; + } + + if (m->control_pressed) { return 0; } + + time_t estart = time(NULL); + + //split matrix into non-overlapping groups + SplitMatrix* split = new SplitMatrix(distfile, namefile, splitcutoff); + split->split(); + + if (m->control_pressed) { delete split; return 0; } + + string singletonName = split->getSingletonNames(); + vector< map > distName = split->getDistanceFiles(); //returns map of distance files -> namefile sorted by distance file size + delete split; + + m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to split the distance file."); m->mothurOutEndLine(); + estart = time(NULL); + + if (m->control_pressed) { return 0; } + + //****************** break up files between processes and cluster each file set ******************************// + vector listFileNames; + set labels; + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ + listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files + }else{ + vector < vector < map > > dividedNames; //distNames[1] = vector of filenames for process 1... + dividedNames.resize(processors); + + //for each file group figure out which process will complete it + //want to divide the load intelligently so the big files are spread between processes + int count = 1; + for (int i = 0; i < distName.size(); i++) { + int processToAssign = (i+1) % processors; + if (processToAssign == 0) { processToAssign = processors; } + + dividedNames[(processToAssign-1)].push_back(distName[i]); + } + + createProcesses(dividedNames); + + if (m->control_pressed) { return 0; } + + //get list of list file names from each process + for(int i=0;i> tempName; gobble(in); + listFileNames.push_back(tempName); + } + in.close(); + remove((toString(processIDS[i]) + ".temp").c_str()); + + //get labels + filename = toString(processIDS[i]) + ".temp.labels"; + ifstream in2; + openInputFile(filename, in2); + + while(!in2.eof()) { + string tempName; + in2 >> tempName; gobble(in); + if (labels.count(tempName) == 0) { labels.insert(tempName); } + } + in2.close(); + remove((toString(processIDS[i]) + ".temp.labels").c_str()); + } + } + #else + listFileNames = cluster(distName, labels); //clusters individual files and returns names of list files + #endif + + if (m->control_pressed) { for (int i = 0; i < listFileNames.size(); i++) { remove(listFileNames[i].c_str()); } return 0; } + + //****************** merge list file and create rabund and sabund files ******************************// + + mergeLists(listFileNames, singletonName, labels); + + if (m->control_pressed) { for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } return 0; } + + m->mothurOut("It took " + toString(time(NULL) - estart) + " seconds to cluster"); m->mothurOutEndLine(); + + m->mothurOutEndLine(); + m->mothurOut("Output File Names: "); m->mothurOutEndLine(); + for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } + m->mothurOutEndLine(); + + return 0; + } + catch(exception& e) { + m->errorOut(e, "ClusterSplitCommand", "execute"); + exit(1); + } +} +//********************************************************************************************************************** +int ClusterSplitCommand::mergeLists(vector listNames, string singleton, set userLabels){ + try { + if (outputDir == "") { outputDir += hasPath(distfile); } + fileroot = outputDir + getRootName(getSimpleName(distfile)); + + openOutputFile(fileroot+ tag + ".sabund", outSabund); + openOutputFile(fileroot+ tag + ".rabund", outRabund); + openOutputFile(fileroot+ tag + ".list", outList); + + outputNames.push_back(fileroot+ tag + ".sabund"); + outputNames.push_back(fileroot+ tag + ".rabund"); + outputNames.push_back(fileroot+ tag + ".list"); + + //read in singletons + ListVector* listSingle = NULL; + if (singleton != "none") { + ifstream in; + openInputFile(singleton, in); + + string firstCol, secondCol; + listSingle = new ListVector(); + while (!in.eof()) { + in >> firstCol >> secondCol; gobble(in); + listSingle->push_back(secondCol); + } + in.close(); + } + + vector orderFloat; + + //go through users set and make them floats so we can sort them + for(set::iterator it = userLabels.begin(); it != userLabels.end(); ++it) { + float temp; + + if ((*it != "unique") && (convertTestFloat(*it, temp) == true)){ + convert(*it, temp); + orderFloat.push_back(temp); + }else if (*it == "unique") { orderFloat.push_back(-1.0); } + else { + userLabels.erase(*it); + it--; + } + } + + //sort order + sort(orderFloat.begin(), orderFloat.end()); + + vector inputs; + vector lastLabels; + for (int i = 0; i < listNames.size(); i++) { + InputData* input = new InputData(listNames[i], "list"); + inputs.push_back(input); + + ifstream in; + openInputFile(listNames[i], in); + ListVector tempList(in); + lastLabels.push_back(tempList.getLabel()); + in.close(); + } + + ListVector* merged = NULL; + + //for each label needed + for(int l = 0; l < orderFloat.size(); l++){ + + string thisLabel; + if (orderFloat[l] == -1) { thisLabel = "unique"; } + else { thisLabel = toString(orderFloat[l], length-1); } + + //get the list info from each file + for (int k = 0; k < listNames.size(); k++) { + + ListVector* list = inputs[k]->getListVector(); + + //this file has reached the end + if (list == NULL) { list = inputs[k]->getListVector(lastLabels[k], true); } + + float labelFloat; + if (list->getLabel() == "unique") { labelFloat = -1.0; } + else { convert(list->getLabel(), labelFloat); } + + //check for missing labels + if (labelFloat > orderFloat[l]) { //you are missing the label, get the next smallest one + //if its bigger get last label, otherwise keep it + delete list; + list = inputs[k]->getListVector(lastLabels[k], true); //get last list vector to use, you actually want to move back in the file + } + lastLabels[k] = list->getLabel(); + + //is this the first file + if (merged == NULL) { merged = new ListVector(); merged->setLabel(thisLabel); } + + for (int j = 0; j < list->getNumBins(); j++) { + merged->push_back(list->get(j)); + } + + delete list; + } + + //add in singletons + for (int j = 0; j < listSingle->getNumBins(); j++) { + merged->push_back(listSingle->get(j)); + } + + //print to files + printData(merged); + + delete merged; merged = NULL; + } + + if (listSingle != NULL) { delete listSingle; remove(singleton.c_str()); } + + for (int i = 0; i < listNames.size(); i++) { delete inputs[i]; remove(listNames[i].c_str()); } + } + catch(exception& e) { + m->errorOut(e, "ClusterSplitCommand", "mergeLists"); + exit(1); + } +} +//********************************************************************************************************************** + +void ClusterSplitCommand::printData(ListVector* oldList){ + try { + string label = oldList->getLabel(); + RAbundVector oldRAbund = oldList->getRAbundVector(); + + oldRAbund.setLabel(label); + if (isTrue(showabund)) { + oldRAbund.getSAbundVector().print(cout); + } + oldRAbund.print(outRabund); + oldRAbund.getSAbundVector().print(outSabund); + + oldList->print(outList); + } + catch(exception& e) { + m->errorOut(e, "ClusterSplitCommand", "printData"); + exit(1); + } +} +//********************************************************************************************************************** +int ClusterSplitCommand::createProcesses(vector < vector < map > > dividedNames){ + try { + + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + int process = 0; + int exitCommand = 1; + processIDS.clear(); + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + set labels; + vector listFileNames = cluster(dividedNames[process], labels); + + //write out names to file + string filename = toString(getpid()) + ".temp"; + ofstream out; + openOutputFile(filename, out); + for (int j = 0; j < listFileNames.size(); j++) { out << listFileNames[j] << endl; } + out.close(); + + //print out labels + ofstream outLabels; + filename = toString(getpid()) + ".temp.labels"; + openOutputFile(filename, outLabels); + + for (set::iterator it = labels.begin(); it != labels.end(); it++) { + outLabels << (*it) << endl; + } + outLabels.close(); + + exit(0); + }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); } + } + + //force parent to wait until all the processes are done + for (int i=0;ierrorOut(e, "ClusterSplitCommand", "createProcesses"); + exit(1); + } +} +//********************************************************************************************************************** + +vector ClusterSplitCommand::cluster(vector< map > distNames, set& labels){ + try { + Cluster* cluster; + SparseMatrix* matrix; + ListVector* list; + ListVector oldList; + RAbundVector* rabund; + + vector listFileNames; + + //cluster each distance file + for (int i = 0; i < distNames.size(); i++) { + + string thisNamefile = distNames[i].begin()->second; + string thisDistFile = distNames[i].begin()->first; + + //read in distance file + globaldata->setNameFile(thisNamefile); + globaldata->setColumnFile(thisDistFile); globaldata->setFormat("column"); + + ReadMatrix* read = new ReadColumnMatrix(thisDistFile); + read->setCutoff(cutoff); + + NameAssignment* nameMap = new NameAssignment(thisNamefile); + nameMap->readMap(); + read->read(nameMap); + + if (m->control_pressed) { delete read; delete nameMap; return listFileNames; } + + list = read->getListVector(); + oldList = *list; + matrix = read->getMatrix(); + + delete read; + delete nameMap; + + m->mothurOutEndLine(); m->mothurOut("Clustering " + thisDistFile); m->mothurOutEndLine(); + + rabund = new RAbundVector(list->getRAbundVector()); + + //create cluster + if (method == "furthest") { cluster = new CompleteLinkage(rabund, list, matrix, cutoff, method); } + else if(method == "nearest"){ cluster = new SingleLinkage(rabund, list, matrix, cutoff, method); } + else if(method == "average"){ cluster = new AverageLinkage(rabund, list, matrix, cutoff, method); } + tag = cluster->getTag(); + + if (outputDir == "") { outputDir += hasPath(thisDistFile); } + fileroot = outputDir + getRootName(getSimpleName(thisDistFile)); + + ofstream listFile; + openOutputFile(fileroot+ tag + ".list", listFile); + + listFileNames.push_back(fileroot+ tag + ".list"); + + time_t estart = time(NULL); + + float previousDist = 0.00000; + float rndPreviousDist = 0.00000; + + oldList = *list; + + print_start = true; + start = time(NULL); + double saveCutoff = cutoff; + + while (matrix->getSmallDist() < cutoff && matrix->getNNodes() > 0){ + + if (m->control_pressed) { //clean up + delete matrix; delete list; delete cluster; delete rabund; + listFile.close(); + for (int i = 0; i < listFileNames.size(); i++) { remove(listFileNames[i].c_str()); } + listFileNames.clear(); return listFileNames; + } + + cluster->update(cutoff); + + float dist = matrix->getSmallDist(); + float rndDist = roundDist(dist, precision); + + if(previousDist <= 0.0000 && dist != previousDist){ + oldList.setLabel("unique"); + oldList.print(listFile); + if (labels.count("unique") == 0) { labels.insert("unique"); } + } + else if(rndDist != rndPreviousDist){ + oldList.setLabel(toString(rndPreviousDist, length-1)); + oldList.print(listFile); + if (labels.count(toString(rndPreviousDist, length-1)) == 0) { labels.insert(toString(rndPreviousDist, length-1)); } + } + + previousDist = dist; + rndPreviousDist = rndDist; + oldList = *list; + } + + + if(previousDist <= 0.0000){ + oldList.setLabel("unique"); + oldList.print(listFile); + if (labels.count("unique") == 0) { labels.insert("unique"); } + } + else if(rndPreviousDistcontrol_pressed) { //clean up + for (int i = 0; i < listFileNames.size(); i++) { remove(listFileNames[i].c_str()); } + listFileNames.clear(); return listFileNames; + } + + remove(thisDistFile.c_str()); + remove(thisNamefile.c_str()); + } + + + return listFileNames; + + } + catch(exception& e) { + m->errorOut(e, "ClusterSplitCommand", "cluster"); + exit(1); + } + + +} + +//********************************************************************************************************************** diff --git a/clustersplitcommand.h b/clustersplitcommand.h new file mode 100644 index 0000000..4d1f435 --- /dev/null +++ b/clustersplitcommand.h @@ -0,0 +1,49 @@ +#ifndef CLUSTERSPLITCOMMAND_H +#define CLUSTERSPLITCOMMAND_H + +/* + * clustersplitcommand.h + * Mothur + * + * Created by westcott on 5/19/10. + * Copyright 2010 Schloss Lab. All rights reserved. + * + */ + +#include "command.hpp" +#include "rabundvector.hpp" +#include "sabundvector.hpp" +#include "listvector.hpp" +#include "cluster.hpp" +#include "sparsematrix.hpp" +#include "globaldata.hpp" + + +class ClusterSplitCommand : public Command { + +public: + ClusterSplitCommand(string); + ~ClusterSplitCommand(); + int execute(); + void help(); + +private: + GlobalData* globaldata; + vector processIDS; //processid + vector outputNames; + + string method, fileroot, tag, outputDir, phylipfile, columnfile, namefile, distfile, format, showabund, timing; + double cutoff, splitcutoff; + int precision, length, processors; + bool print_start, abort, hard; + time_t start; + ofstream outList, outRabund, outSabund; + + void printData(ListVector*); + int createProcesses(vector < vector < map > >); + vector cluster(vector< map >, set&); + int mergeLists(vector, string, set); +}; + +#endif + diff --git a/commandfactory.cpp b/commandfactory.cpp index f358797..0bca8c0 100644 --- a/commandfactory.cpp +++ b/commandfactory.cpp @@ -78,6 +78,7 @@ #include "clearcutcommand.h" #include "catchallcommand.h" #include "splitabundcommand.h" +#include "clustersplitcommand.h" /*******************************************************/ @@ -163,6 +164,7 @@ CommandFactory::CommandFactory(){ commands["clearcut"] = "clearcut"; commands["catchall"] = "catchall"; commands["split.abund"] = "split.abund"; + commands["cluster.split"] = "cluster.split"; commands["classify.seqs"] = "MPIEnabled"; commands["dist.seqs"] = "MPIEnabled"; commands["filter.seqs"] = "MPIEnabled"; @@ -285,6 +287,7 @@ Command* CommandFactory::getCommand(string commandName, string optionString){ else if(commandName == "clearcut") { command = new ClearcutCommand(optionString); } else if(commandName == "catchall") { command = new CatchAllCommand(optionString); } else if(commandName == "split.abund") { command = new SplitAbundCommand(optionString); } + else if(commandName == "cluster.split") { command = new ClusterSplitCommand(optionString); } else { command = new NoCommand(optionString); } return command; diff --git a/database.hpp b/database.hpp index efc7ba7..3191fdf 100644 --- a/database.hpp +++ b/database.hpp @@ -56,11 +56,6 @@ public: virtual vector getSequencesWithKmer(int){ vector filler; return filler; }; virtual int getMaxKmer(){ return 1; }; - #ifdef USE_MPI - virtual int MPISend(int) = 0; - virtual int MPIRecv(int) = 0; - #endif - protected: MothurOut* m; int numSeqs, longest; diff --git a/distancecommand.cpp b/distancecommand.cpp index ec32ffe..503f64b 100644 --- a/distancecommand.cpp +++ b/distancecommand.cpp @@ -300,6 +300,7 @@ int DistanceCommand::execute(){ MPI_Send(&size, 1, MPI_LONG, 0, tag, MPI_COMM_WORLD); } } + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) diff --git a/engine.cpp b/engine.cpp index 9e5801b..90103ea 100644 --- a/engine.cpp +++ b/engine.cpp @@ -73,6 +73,8 @@ bool InteractEngine::getInput(){ int pid; MPI_Comm_rank(MPI_COMM_WORLD, &pid); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case + if ((cFactory->MPIEnabled(commandName)) || (pid == 0)) { #endif //executes valid command @@ -200,7 +202,9 @@ bool BatchEngine::getInput(){ #ifdef USE_MPI int pid; MPI_Comm_rank(MPI_COMM_WORLD, &pid); - +cout << pid << " is waiting " << commandName << endl; + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case +cout << pid << " is here " << commandName << endl; if ((cFactory->MPIEnabled(commandName)) || (pid == 0)) { #endif //executes valid command @@ -292,6 +296,8 @@ bool ScriptEngine::getInput(){ int pid; MPI_Comm_rank(MPI_COMM_WORLD, &pid); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case + if ((cFactory->MPIEnabled(commandName)) || (pid == 0)) { #endif //executes valid command diff --git a/filterseqscommand.cpp b/filterseqscommand.cpp index 41c3219..1c70b3e 100644 --- a/filterseqscommand.cpp +++ b/filterseqscommand.cpp @@ -282,8 +282,10 @@ int FilterSeqsCommand::filterSequences() { numSeqs += num; //send file positions to all processes - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (num+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } //figure out how many sequences you have to do numSeqsPerProcessor = num / processors; @@ -303,10 +305,10 @@ int FilterSeqsCommand::filterSequences() { } }else { //you are a child process - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs - numSeqs += num; + MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(num+1); - MPI_Bcast(&MPIPos[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + numSeqs += num; + MPI_Recv(&MPIPos[0], (num+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); //figure out how many sequences you have to align numSeqsPerProcessor = num / processors; @@ -328,6 +330,7 @@ int FilterSeqsCommand::filterSequences() { MPI_File_close(&outMPI); MPI_File_close(&inMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) @@ -543,7 +546,7 @@ string FilterSeqsCommand::createFilter() { F.setLength(alignmentLength); - if(soft != 0 || isTrue(vertical)){ + if(trump != '*' || isTrue(vertical) || soft != 0){ F.initialize(); } @@ -581,8 +584,10 @@ string FilterSeqsCommand::createFilter() { numSeqs += num; //send file positions to all processes - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (num+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } //figure out how many sequences you have to do numSeqsPerProcessor = num / processors; @@ -596,11 +601,10 @@ string FilterSeqsCommand::createFilter() { if (m->control_pressed) { MPI_File_close(&inMPI); return 0; } }else { //i am the child process - - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(num+1); numSeqs += num; - MPI_Bcast(&MPIPos[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&MPIPos[0], (num+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); //figure out how many sequences you have to align numSeqsPerProcessor = num / processors; @@ -615,6 +619,7 @@ string FilterSeqsCommand::createFilter() { } MPI_File_close(&inMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) @@ -668,20 +673,24 @@ string FilterSeqsCommand::createFilter() { vector temp; temp.resize(alignmentLength+1); //get the frequencies from the child processes - for(int i = 0; i < ((processors-1)*5); i++) { - MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, 2001, MPI_COMM_WORLD, &status); - int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for - - if (receiveTag == Atag) { //you are recieveing the A frequencies - for (int k = 0; k < alignmentLength; k++) { F.a[k] += temp[k]; } - }else if (receiveTag == Ttag) { //you are recieveing the T frequencies - for (int k = 0; k < alignmentLength; k++) { F.t[k] += temp[k]; } - }else if (receiveTag == Ctag) { //you are recieveing the C frequencies - for (int k = 0; k < alignmentLength; k++) { F.c[k] += temp[k]; } - }else if (receiveTag == Gtag) { //you are recieveing the G frequencies - for (int k = 0; k < alignmentLength; k++) { F.g[k] += temp[k]; } - }else if (receiveTag == Gaptag) { //you are recieveing the gap frequencies - for (int k = 0; k < alignmentLength; k++) { F.gap[k] += temp[k]; } + for(int i = 1; i < processors; i++) { + + for (int j = 0; j < 5; j++) { + + MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); + int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for + + if (receiveTag == Atag) { //you are recieveing the A frequencies + for (int k = 0; k < alignmentLength; k++) { F.a[k] += temp[k]; } + }else if (receiveTag == Ttag) { //you are recieveing the T frequencies + for (int k = 0; k < alignmentLength; k++) { F.t[k] += temp[k]; } + }else if (receiveTag == Ctag) { //you are recieveing the C frequencies + for (int k = 0; k < alignmentLength; k++) { F.c[k] += temp[k]; } + }else if (receiveTag == Gtag) { //you are recieveing the G frequencies + for (int k = 0; k < alignmentLength; k++) { F.g[k] += temp[k]; } + }else if (receiveTag == Gaptag) { //you are recieveing the gap frequencies + for (int k = 0; k < alignmentLength; k++) { F.gap[k] += temp[k]; } + } } } }else{ @@ -701,6 +710,8 @@ string FilterSeqsCommand::createFilter() { } + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case + if (pid == 0) { //only one process should output the filter #endif F.setNumSeqs(numSeqs); @@ -712,10 +723,14 @@ string FilterSeqsCommand::createFilter() { #ifdef USE_MPI //send filter string to kids - MPI_Bcast(&filterString[0], alignmentLength, MPI_CHAR, 0, MPI_COMM_WORLD); + //for(int i = 1; i < processors; i++) { + // MPI_Send(&filterString[0], alignmentLength, MPI_CHAR, i, 2001, MPI_COMM_WORLD); + //} + MPI_Bcast(&filterString[0], alignmentLength, MPI_CHAR, 0, MPI_COMM_WORLD); }else{ //recieve filterString char* tempBuf = new char[alignmentLength]; + //MPI_Recv(&tempBuf[0], alignmentLength, MPI_CHAR, 0, 2001, MPI_COMM_WORLD, &status); MPI_Bcast(tempBuf, alignmentLength, MPI_CHAR, 0, MPI_COMM_WORLD); filterString = tempBuf; @@ -725,8 +740,7 @@ string FilterSeqsCommand::createFilter() { MPI_Barrier(MPI_COMM_WORLD); #endif - - + return filterString; } catch(exception& e) { diff --git a/hclustercommand.cpp b/hclustercommand.cpp index 89c6dbd..dc14e16 100644 --- a/hclustercommand.cpp +++ b/hclustercommand.cpp @@ -192,7 +192,7 @@ int HClusterCommand::execute(){ time_t estart = time(NULL); if (!sorted) { - read = new ReadCluster(distfile, cutoff, outputDir); + read = new ReadCluster(distfile, cutoff, outputDir, true); read->setFormat(format); read->read(globaldata->nameMap); diff --git a/inputdata.cpp b/inputdata.cpp index 8e54ab7..65685d6 100644 --- a/inputdata.cpp +++ b/inputdata.cpp @@ -60,7 +60,7 @@ InputData::InputData(string fName, string orderFileName, string f) : format(f){ ListVector* InputData::getListVector(){ try { - if(fileHandle){ + if(!fileHandle.eof()){ if(format == "list") { list = new ListVector(fileHandle); }else{ list = NULL; } @@ -79,7 +79,6 @@ ListVector* InputData::getListVector(){ } /***********************************************************************/ - ListVector* InputData::getListVector(string label){ try { ifstream in; @@ -115,7 +114,40 @@ ListVector* InputData::getListVector(string label){ exit(1); } } +/***********************************************************************/ +ListVector* InputData::getListVector(string label, bool resetFP){ + try { + string thisLabel; + fileHandle.clear(); + fileHandle.seekg(0); + + if(fileHandle){ + if (format == "list") { + + while (fileHandle.eof() != true) { + + list = new ListVector(fileHandle); gobble(fileHandle); + thisLabel = list->getLabel(); + + //if you are at the last label + if (thisLabel == label) { break; } + //so you don't loose this memory + else { delete list; } + } + }else{ list = NULL; } + + return list; + } + else{ + return NULL; + } + } + catch(exception& e) { + m->errorOut(e, "InputData", "getListVector"); + exit(1); + } +} /***********************************************************************/ diff --git a/inputdata.h b/inputdata.h index 251df5b..c91720b 100644 --- a/inputdata.h +++ b/inputdata.h @@ -16,6 +16,7 @@ public: ~InputData(); ListVector* getListVector(); ListVector* getListVector(string); //pass the label you want + ListVector* getListVector(string, bool); //pass the label you want, reset filepointer SharedListVector* getSharedListVector(); SharedListVector* getSharedListVector(string); //pass the label you want OrderVector* getOrderVector(); diff --git a/kmerdb.cpp b/kmerdb.cpp index 77d93da..33761a8 100644 --- a/kmerdb.cpp +++ b/kmerdb.cpp @@ -206,43 +206,6 @@ vector KmerDB::getSequencesWithKmer(int kmer) { exit(1); } } -#ifdef USE_MPI -/**************************************************************************************************/ -int KmerDB::MPISend(int receiver) { - try { - - //send kmerSize - int - MPI_Send(&kmerSize, 1, MPI_INT, receiver, 2001, MPI_COMM_WORLD); - - return 0; - } - catch(exception& e) { - m->errorOut(e, "KmerDB", "MPISend"); - exit(1); - } -} -/**************************************************************************************************/ -int KmerDB::MPIRecv(int sender) { - try { - MPI_Status status; - - //receive kmerSize - int - MPI_Recv(&kmerSize, 1, MPI_INT, sender, 2001, MPI_COMM_WORLD, &status); - - //set maxKmer - int power4s[14] = { 1, 4, 16, 64, 256, 1024, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864 }; - count = 0; - maxKmer = power4s[kmerSize]; - kmerLocations.resize(maxKmer+1); - - return 0; - } - catch(exception& e) { - m->errorOut(e, "KmerDB", "MPIRecv"); - exit(1); - } -} -#endif /**************************************************************************************************/ diff --git a/kmerdb.hpp b/kmerdb.hpp index 513f3f0..becdeda 100644 --- a/kmerdb.hpp +++ b/kmerdb.hpp @@ -37,11 +37,6 @@ public: vector getSequencesWithKmer(int); //returns vector of sequences that contain kmer passed in int getMaxKmer() { return maxKmer; } - #ifdef USE_MPI - int MPISend(int); //just sends kmersize - int MPIRecv(int); - #endif - private: int kmerSize; diff --git a/makefile b/makefile index 6b10078..f4af8e9 100644 --- a/makefile +++ b/makefile @@ -26,10 +26,10 @@ ifeq ($(strip $(USEREADLINE)),yes) -L../readline-6.0 endif -USEMPI ?= no - +USEMPI ?= yes ifeq ($(strip $(USEMPI)),yes) + CC = mpic++ CC_OPTIONS += -DUSE_MPI endif @@ -151,6 +151,8 @@ mothur : \ ./clearcutcommand.o\ ./catchallcommand.o\ ./splitabundcommand.o\ + ./splitmatrix.o\ + ./clustersplitcommand.o\ ./inputdata.o\ ./jackknife.o\ ./kmer.o\ @@ -357,6 +359,8 @@ mothur : \ ./clearcutcommand.o\ ./catchallcommand.o\ ./splitabundcommand.o\ + ./splitmatrix.o\ + ./clustersplitcommand.o\ ./inputdata.o\ ./jackknife.o\ ./kmer.o\ @@ -566,6 +570,8 @@ clean : ./clearcutcommand.o\ ./catchallcommand.o\ ./splitabundcommand.o\ + ./splitmatrix.o\ + ./clustersplitcommand.o\ ./inputdata.o\ ./jackknife.o\ ./kmer.o\ @@ -1680,8 +1686,16 @@ install : mothur ./catchallcommand.o : catchallcommand.cpp $(CC) $(CC_OPTIONS) catchallcommand.cpp -c $(INCLUDE) -o ./catchallcommand.o -# Item # 205 -- catchallcommand -- +# Item # 205 -- splitabundcommand -- ./splitabundcommand : splitabundcommand $(CC) $(CC_OPTIONS) splitabundcommand -c $(INCLUDE) -o ./splitabundcommand + +# Item # 206 -- splitmatrix -- +./splitmatrix : splitmatrix + $(CC) $(CC_OPTIONS) splitmatrix -c $(INCLUDE) -o ./splitmatrix + +# Item # 207 -- splitmatrix -- +./clustersplitcommand : clustersplitcommand + $(CC) $(CC_OPTIONS) clustersplitcommand -c $(INCLUDE) -o ./clustersplitcommand ##### END RUN #### diff --git a/phylotree.cpp b/phylotree.cpp index 399a4bd..bfac59c 100644 --- a/phylotree.cpp +++ b/phylotree.cpp @@ -122,12 +122,13 @@ PhyloTree::PhyloTree(string tfile){ #ifdef USE_MPI - int pid, num; + int pid, num, processors; vector positions; MPI_Status status; MPI_File inMPI; MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + MPI_Comm_size(MPI_COMM_WORLD, &processors); char inFileName[1024]; strcpy(inFileName, tfile.c_str()); @@ -138,12 +139,14 @@ PhyloTree::PhyloTree(string tfile){ positions = setFilePosEachLine(tfile, num); //send file positions to all processes - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&positions[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&num, 1, MPI_INT, i, 2001, MPI_COMM_WORLD); + MPI_Send(&positions[0], (num+1), MPI_LONG, i, 2001, MPI_COMM_WORLD); + } }else{ - MPI_Bcast(&num, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs - positions.resize(num); - MPI_Bcast(&positions[0], (num+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + MPI_Recv(&num, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status); + positions.resize(num+1); + MPI_Recv(&positions[0], (num+1), MPI_LONG, 0, 2001, MPI_COMM_WORLD, &status); } //read file @@ -164,6 +167,7 @@ PhyloTree::PhyloTree(string tfile){ } MPI_File_close(&inMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else ifstream in; diff --git a/readcluster.cpp b/readcluster.cpp index c8a8bc7..0893a14 100644 --- a/readcluster.cpp +++ b/readcluster.cpp @@ -11,12 +11,14 @@ /***********************************************************************/ -ReadCluster::ReadCluster(string distfile, float c, string o){ +ReadCluster::ReadCluster(string distfile, float c, string o, bool s){ globaldata = GlobalData::getInstance(); m = MothurOut::getInstance(); distFile = distfile; cutoff = c; outputDir = o; + sortWanted = s; + list = NULL; } /***********************************************************************/ @@ -29,7 +31,8 @@ int ReadCluster::read(NameAssignment* nameMap){ if (m->control_pressed) { return 0; } - OutPutFile = sortFile(distFile, outputDir); + if (sortWanted) { OutPutFile = sortFile(distFile, outputDir); } + else { OutPutFile = distFile; } //for use by clusters splitMatrix to convert a phylip matrix to column return 0; diff --git a/readcluster.h b/readcluster.h index fae7c26..9ca2a1b 100644 --- a/readcluster.h +++ b/readcluster.h @@ -21,7 +21,7 @@ class ReadCluster { public: - ReadCluster(string, float, string); + ReadCluster(string, float, string, bool); ~ReadCluster(); int read(NameAssignment*); string getOutputFile() { return OutPutFile; } @@ -35,6 +35,7 @@ private: ListVector* list; float cutoff; MothurOut* m; + bool sortWanted; int convertPhylip2Column(NameAssignment*); }; diff --git a/screenseqscommand.cpp b/screenseqscommand.cpp index f67dba7..878a06b 100644 --- a/screenseqscommand.cpp +++ b/screenseqscommand.cpp @@ -217,8 +217,10 @@ int ScreenSeqsCommand::execute(){ MPIPos = setFilePosFasta(fastafile, numFastaSeqs); //fills MPIPos, returns numSeqs //send file positions to all processes - MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + for(int i = 1; i < processors; i++) { + MPI_Send(&numFastaSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; @@ -249,10 +251,10 @@ int ScreenSeqsCommand::execute(){ }*/ } }else{ //you are a child process - MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numFastaSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numFastaSeqs+1); - MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions - + MPI_Recv(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); + //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; int startIndex = pid * numSeqsPerProcessor; @@ -285,6 +287,7 @@ int ScreenSeqsCommand::execute(){ MPI_File_close(&outMPIGood); MPI_File_close(&outMPIBad); MPI_File_close(&outMPIBadAccnos); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else diff --git a/seqsummarycommand.cpp b/seqsummarycommand.cpp index f551edd..f225d50 100644 --- a/seqsummarycommand.cpp +++ b/seqsummarycommand.cpp @@ -144,11 +144,12 @@ int SeqSummaryCommand::execute(){ delete buf2; MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs + + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&MPIPos[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } - //send file positions to all processes - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos - //figure out how many sequences you have to do numSeqsPerProcessor = numSeqs / processors; int startIndex = pid * numSeqsPerProcessor; @@ -156,54 +157,55 @@ int SeqSummaryCommand::execute(){ //do your part MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); - - if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } - - //get the info from the child processes - for(int i = 1; i < processors; i++) { - - int size; - MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); - - vector temp; temp.resize(size+1); - - for(int j = 0; j < 5; j++) { - - MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); - int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what count this is for - - if (receiveTag == startTag) { - for (int k = 0; k < size; k++) { startPosition.push_back(temp[k]); } - }else if (receiveTag == endTag) { - for (int k = 0; k < size; k++) { endPosition.push_back(temp[k]); } - }else if (receiveTag == lengthTag) { - for (int k = 0; k < size; k++) { seqLength.push_back(temp[k]); } - }else if (receiveTag == baseTag) { - for (int k = 0; k < size; k++) { ambigBases.push_back(temp[k]); } - }else if (receiveTag == lhomoTag) { - for (int k = 0; k < size; k++) { longHomoPolymer.push_back(temp[k]); } - } - } - } - - }else { //i am the child process - MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); MPIPos.resize(numSeqs+1); - MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions - + MPI_Recv(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); + //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; int startIndex = pid * numSeqsPerProcessor; if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } - + //do your part MPICreateSummary(startIndex, numSeqsPerProcessor, startPosition, endPosition, seqLength, ambigBases, longHomoPolymer, inMPI, outMPI, MPIPos); - - if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; } - + } + + MPI_File_close(&inMPI); + MPI_File_close(&outMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case + + if (pid == 0) { + //get the info from the child processes + for(int i = 1; i < processors; i++) { + int size; + MPI_Recv(&size, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); + + vector temp; temp.resize(size+1); + + for(int j = 0; j < 5; j++) { + + MPI_Recv(&temp[0], (size+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); + int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what count this is for + + if (receiveTag == startTag) { + for (int k = 0; k < size; k++) { startPosition.push_back(temp[k]); } + }else if (receiveTag == endTag) { + for (int k = 0; k < size; k++) { endPosition.push_back(temp[k]); } + }else if (receiveTag == lengthTag) { + for (int k = 0; k < size; k++) { seqLength.push_back(temp[k]); } + }else if (receiveTag == baseTag) { + for (int k = 0; k < size; k++) { ambigBases.push_back(temp[k]); } + }else if (receiveTag == lhomoTag) { + for (int k = 0; k < size; k++) { longHomoPolymer.push_back(temp[k]); } + } + } + } + + }else{ + //send my counts int size = startPosition.size(); MPI_Send(&size, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); @@ -218,12 +220,9 @@ int SeqSummaryCommand::execute(){ ierr = MPI_Send(&(ambigBases[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); longHomoPolymer.push_back(lhomoTag); ierr = MPI_Send(&(longHomoPolymer[0]), (size+1), MPI_INT, 0, 2001, MPI_COMM_WORLD); - } - MPI_File_close(&inMPI); - MPI_File_close(&outMPI); - + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) if(processors == 1){ @@ -363,7 +362,10 @@ int SeqSummaryCommand::driverCreateSummary(vector& startPosition, vector& startPosition, vector& endPosition, vector& seqLength, vector& ambigBases, vector& longHomoPolymer, MPI_File& inMPI, MPI_File& outMPI, vector& MPIPos) { try { + int pid; MPI_Status status; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + for(int i=0;i& startPo MPI_File_write_shared(outMPI, buf3, length, MPI_CHAR, &status); delete buf3; - } + } } return 0; diff --git a/splitmatrix.cpp b/splitmatrix.cpp new file mode 100644 index 0000000..c52c287 --- /dev/null +++ b/splitmatrix.cpp @@ -0,0 +1,256 @@ +/* + * splitmatrix.cpp + * Mothur + * + * Created by westcott on 5/19/10. + * Copyright 2010 Schloss Lab. All rights reserved. + * + */ + +#include "splitmatrix.h" + +/***********************************************************************/ + +SplitMatrix::SplitMatrix(string distfile, string name, float c){ + m = MothurOut::getInstance(); + distFile = distfile; + cutoff = c; + namefile = name; +} + +/***********************************************************************/ + +int SplitMatrix::split(){ + try { + + vector > groups; + int numGroups = 0; + + ofstream outFile; + ifstream dFile; + openInputFile(distFile, dFile); + + while(dFile){ + string seqA, seqB; + float dist; + + dFile >> seqA >> seqB >> dist; + + if(dist < cutoff){ + //cout << "in cutoff: " << dist << endl; + int groupIDA = -1; + int groupIDB = -1; + int groupID = -1; + int prevGroupID = -1; + + for(int i=0;i::iterator aIt = groups[i].find(seqA); + set::iterator bIt = groups[i].find(seqB); + + if(groupIDA == -1 && aIt != groups[i].end()){//seqA is not already assigned to a group and is in group[i], so assign seqB to group[i] + groups[i].insert(seqB); + groupIDA = i; + groupID = groupIDA; + + //cout << "in aIt: " << groupID << endl; + // break; + } + else if(groupIDB == -1 && bIt != groups[i].end()){//seqB is not already assigned to a group and is in group[i], so assign seqA to group[i] + groups[i].insert(seqA); + groupIDB = i; + groupID = groupIDB; + + // cout << "in bIt: " << groupID << endl; + // break; + } + + if(groupIDA != -1 && groupIDB != -1){//both ifs above have been executed, so we need to decide who to assign them to + if(groupIDA < groupIDB){ + // cout << "A: " << groupIDA << "\t" << groupIDB << endl; + groups[groupIDA].insert(groups[groupIDB].begin(), groups[groupIDB].end()); //merge two groups into groupIDA + groups[groupIDB].clear(); + groupID = groupIDA; + } + else{ + // cout << "B: " << groupIDA << "\t" << groupIDB << endl; + groups[groupIDB].insert(groups[groupIDA].begin(), groups[groupIDA].end()); //merge two groups into groupIDB + groups[groupIDA].clear(); + groupID = groupIDB; + } + break; + } + } + + //windows is gonna gag on the reuse of outFile, will need to make it local... + + if(groupIDA == -1 && groupIDB == -1){ //we need a new group + set newGroup; + newGroup.insert(seqA); + newGroup.insert(seqB); + groups.push_back(newGroup); + + outFile.close(); + string fileName = distFile + "." + toString(numGroups) + ".temp"; + outFile.open(fileName.c_str(), ios::ate); + + outFile << seqA << '\t' << seqB << '\t' << dist << endl; + numGroups++; + } + else{ + string fileName = distFile + "." + toString(groupID) + ".temp"; + if(groupID != prevGroupID){ + outFile.close(); + outFile.open(fileName.c_str(), ios::app); + prevGroupID = groupID; + } + outFile << seqA << '\t' << seqB << '\t' << dist << endl; + + if(groupIDA != -1 && groupIDB != -1){ //merge distance files of two groups you merged above + string row, column, distance; + if(groupIDA> row >> column >> distance; + outFile << row << '\t' << column << '\t' << distance << endl; + gobble(fileB); + } + fileB.close(); + remove(fileName.c_str()); + } + else{ + string fileName = distFile + "." + toString(groupIDA) + ".temp"; + ifstream fileA(fileName.c_str()); + while(fileA){ + fileA >> row >> column >> distance; + outFile << row << '\t' << column << '\t' << distance << endl; + gobble(fileA); + } + fileA.close(); + remove(fileName.c_str()); + } + } + } + } + gobble(dFile); + } + outFile.close(); + dFile.close(); + + ifstream bigNameFile(namefile.c_str()); + if(!bigNameFile){ + cerr << "Error: We can't open the name file\n"; + exit(1); + } + + map nameMap; + string name, nameList; + while(bigNameFile){ + bigNameFile >> name >> nameList; + nameMap[name] = nameList; + gobble(bigNameFile); + } + bigNameFile.close(); + + for(int i=0;i 0){ + string fileName = namefile + "." + toString(i) + ".temp"; + ofstream smallNameFile(fileName.c_str(), ios::ate); + + for(set::iterator gIt=groups[i].begin();gIt!=groups[i].end();gIt++){ + map::iterator nIt = nameMap.find(*gIt); + + if (nIt != nameMap.end()) { + smallNameFile << nIt->first << '\t' << nIt->second << endl; + nameMap.erase(nIt); + }else{ + m->mothurOut((*gIt) + " is in your distance file and not in your namefile. Please correct."); m->mothurOutEndLine(); exit(1); + } + } + smallNameFile.close(); + } + } + + //names of singletons + if (nameMap.size() != 0) { + singleton = namefile + ".extra.temp"; + ofstream remainingNames(singleton.c_str(), ios::ate); + for(map::iterator nIt=nameMap.begin();nIt!=nameMap.end();nIt++){ + remainingNames << nIt->first << '\t' << nIt->second << endl; + } + remainingNames.close(); + }else { singleton = "none"; } + + for(int i=0;i 0){ + string tempNameFile = namefile + "." + toString(i) + ".temp"; + string tempDistFile = distFile + "." + toString(i) + ".temp"; + + map temp; + temp[tempDistFile] = tempNameFile; + dists.push_back(temp); + } + } + + return 0; + + } + catch(exception& e) { + m->errorOut(e, "SplitMatrix", "split"); + exit(1); + } +} +//******************************************************************************************************************** +//sorts biggest to smallest +inline bool compareFileSizes(map left, map right){ + + FILE * pFile; + long leftsize = 0; + + //get num bytes in file + string filename = left.begin()->first; + pFile = fopen (filename.c_str(),"rb"); + string error = "Error opening " + filename; + if (pFile==NULL) perror (error.c_str()); + else{ + fseek (pFile, 0, SEEK_END); + leftsize=ftell (pFile); + fclose (pFile); + } + + FILE * pFile2; + long rightsize = 0; + + //get num bytes in file + filename = right.begin()->first; + pFile2 = fopen (filename.c_str(),"rb"); + error = "Error opening " + filename; + if (pFile2==NULL) perror (error.c_str()); + else{ + fseek (pFile2, 0, SEEK_END); + rightsize=ftell (pFile2); + fclose (pFile2); + } + + return (leftsize > rightsize); +} +/***********************************************************************/ +//returns map of distance files -> namefile sorted by distance file size +vector< map< string, string> > SplitMatrix::getDistanceFiles(){ + try { + + sort(dists.begin(), dists.end(), compareFileSizes); + + return dists; + } + catch(exception& e) { + m->errorOut(e, "SplitMatrix", "getDistanceFiles"); + exit(1); + } +} +/***********************************************************************/ +SplitMatrix::~SplitMatrix(){} +/***********************************************************************/ + diff --git a/splitmatrix.h b/splitmatrix.h new file mode 100644 index 0000000..5974ff1 --- /dev/null +++ b/splitmatrix.h @@ -0,0 +1,39 @@ +#ifndef SPLITMATRIX_H +#define SPLITMATRIX_H +/* + * splitmatrix.h + * Mothur + * + * Created by westcott on 5/19/10. + * Copyright 2010 Schloss Lab. All rights reserved. + * + */ + + +#include "mothur.h" +#include "mothurout.h" + +/******************************************************/ + +class SplitMatrix { + + public: + + SplitMatrix(string, string, float); //column formatted distance file, namesfile, cutoff + ~SplitMatrix(); + int split(); + vector< map > getDistanceFiles(); //returns map of distance files -> namefile sorted by distance file size + string getSingletonNames() { return singleton; } //returns namesfile containing singletons + + private: + string distFile, namefile, singleton; + vector< map< string, string> > dists; + float cutoff; + + MothurOut* m; +}; + +/******************************************************/ + +#endif + diff --git a/suffixdb.cpp b/suffixdb.cpp index b3496f6..4fc342d 100644 --- a/suffixdb.cpp +++ b/suffixdb.cpp @@ -82,36 +82,4 @@ void SuffixDB::addSequence(Sequence seq) { SuffixDB::~SuffixDB(){ for (int i = (suffixForest.size()-1); i >= 0; i--) { suffixForest.pop_back(); } } -#ifdef USE_MPI -/**************************************************************************************************/ -int SuffixDB::MPISend(int receiver) { - try { - - //send numSeqs - int - MPI_Send(&numSeqs, 1, MPI_INT, receiver, 2001, MPI_COMM_WORLD); - - return 0; - } - catch(exception& e) { - m->errorOut(e, "SuffixDB", "MPISend"); - exit(1); - } -} -/**************************************************************************************************/ -int SuffixDB::MPIRecv(int sender) { - try { - MPI_Status status; - //receive numSeqs - int - MPI_Recv(&numSeqs, 1, MPI_INT, sender, 2001, MPI_COMM_WORLD, &status); - - suffixForest.resize(numSeqs); - - return 0; - } - catch(exception& e) { - m->errorOut(e, "SuffixDB", "MPIRecv"); - exit(1); - } -} -#endif /**************************************************************************************************/ diff --git a/suffixdb.hpp b/suffixdb.hpp index 4dc7e0f..393b4a5 100644 --- a/suffixdb.hpp +++ b/suffixdb.hpp @@ -33,11 +33,6 @@ public: void generateDB() {}; //adding sequences generates the db void addSequence(Sequence); vector findClosestSequences(Sequence*, int); - - #ifdef USE_MPI - int MPISend(int); //just sends numSeqs - int MPIRecv(int); - #endif private: vector suffixForest; diff --git a/validparameter.cpp b/validparameter.cpp index 4ea677f..163423b 100644 --- a/validparameter.cpp +++ b/validparameter.cpp @@ -209,8 +209,10 @@ string ValidParameters::validFile(map container, string paramete if(isFile == true) { #ifdef USE_MPI - int pid; + int pid, processors; + MPI_Status status; MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + MPI_Comm_size(MPI_COMM_WORLD, &processors); if (pid == 0) { #endif @@ -219,10 +221,14 @@ string ValidParameters::validFile(map container, string paramete in.close(); #ifdef USE_MPI - MPI_Bcast(&ableToOpen, 1, MPI_INT, 0, MPI_COMM_WORLD); //send ableToOPen + for(int i = 1; i < processors; i++) { + MPI_Send(&ableToOpen, 1, MPI_INT, i, 2001, MPI_COMM_WORLD); + } }else { - MPI_Bcast(&ableToOpen, 1, MPI_INT, 0, MPI_COMM_WORLD); //get ableToOPen + MPI_Recv(&ableToOpen, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status); } + + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #endif if (ableToOpen == 1) { return "not open"; } -- 2.39.2