From: westcott <westcott>
Date: Wed, 17 Mar 2010 17:47:33 +0000 (+0000)
Subject: finished mpi for filter.seqs
X-Git-Url: https://git.donarmstrong.com/?a=commitdiff_plain;h=aba5f8811829037b0a3004ef33f0ad4ed5e5fcf8;p=mothur.git

finished mpi for filter.seqs
---

diff --git a/distancecommand.cpp b/distancecommand.cpp
index 23fe58c..4720df3 100644
--- a/distancecommand.cpp
+++ b/distancecommand.cpp
@@ -385,6 +385,7 @@ int DistanceCommand::driver(int startLine, int endLine, string dFileName, float
 		exit(1);
 	}
 }
+#ifdef USE_MPI
 /**************************************************************************************************/
 /////// need to fix to work with calcs and sequencedb
 int DistanceCommand::driverMPI(int startLine, int endLine, MPI_File& outMPI, float cutoff){
@@ -444,11 +445,11 @@ int DistanceCommand::driverMPI(int startLine, int endLine, MPI_File& outMPI, flo
 		return 1;
 	}
 	catch(exception& e) {
-		m->errorOut(e, "DistanceCommand", "driver");
+		m->errorOut(e, "DistanceCommand", "driverMPI");
 		exit(1);
 	}
 }
-
+#endif
 /**************************************************************************************************/
 int DistanceCommand::convertMatrix(string outputFile) {
 	try{
diff --git a/distancecommand.h b/distancecommand.h
index b0a3c04..c1dac14 100644
--- a/distancecommand.h
+++ b/distancecommand.h
@@ -46,7 +46,10 @@ private:
 	//void appendFiles(string, string);
 	void createProcesses(string);
 	int driver(/*Dist*, SequenceDB, */int, int, string, float);
+	
+	#ifdef USE_MPI 
 	int driverMPI(int, int, MPI_File&, float);
+	#endif
 	
 	int convertMatrix(string);
 	int convertToLowerTriangle(string);
diff --git a/filterseqscommand.cpp b/filterseqscommand.cpp
index 6d894d8..b402a5b 100644
--- a/filterseqscommand.cpp
+++ b/filterseqscommand.cpp
@@ -172,6 +172,13 @@ int FilterSeqsCommand::execute() {
 		
 		if (m->control_pressed) { return 0; }
 		
+		#ifdef USE_MPI
+			int pid;
+			MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+					
+			if (pid == 0) { //only one process should output the filter
+		#endif
+		
 		ofstream outFilter;
 		
 		string filterFile = outputDir + filterFileName + ".filter";
@@ -180,6 +187,9 @@ int FilterSeqsCommand::execute() {
 		outFilter.close();
 		outputNames.push_back(filterFile);
 		
+		#ifdef USE_MPI
+			}
+		#endif
 		
 		////////////run filter/////////////////
 		
@@ -216,7 +226,9 @@ int FilterSeqsCommand::execute() {
 /**************************************************************************************/
 int FilterSeqsCommand::filterSequences() {	
 	try {
-	
+		
+		numSeqs = 0;
+		
 		for (int s = 0; s < fastafileNames.size(); s++) {
 			
 				for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
@@ -247,11 +259,15 @@ int FilterSeqsCommand::filterSequences() {
 				if (pid == 0) { //you are the root process 
 					
 					setLines(fastafileNames[s]);
-						
+					
+					char bufF[alignmentLength];
+					strcpy(bufF, filter.c_str()); 
+								
 					for (int j = 0; j < lines.size(); j++) { //each process
 						if (j != 0) { //don't send to yourself
 							MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file
 							MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read
+							MPI_Send(bufF, alignmentLength, MPI_CHAR, j, tag, MPI_COMM_WORLD);
 						}
 					}
 					
@@ -271,10 +287,14 @@ int FilterSeqsCommand::filterSequences() {
 					
 				}else { //you are a child process
 					//receive your section of file
-					int startPos, numLines, bufferSize;
+					int startPos, bufferSize;
+					char bufF[alignmentLength];
 					MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
 					MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
-									
+					MPI_Recv(bufF, alignmentLength, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status); 
+					
+					filter = bufF; //filter was made by process 0 so other processes need to get it
+								
 					//read your peice of file
 					char buf2[bufferSize];
 					MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status);
@@ -305,14 +325,14 @@ int FilterSeqsCommand::filterSequences() {
 					
 					driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
 				}else{
-					setLines(fastafileNames[s]);					
+					setLines(fastafileNames[s]);
 					createProcessesRunFilter(filter, fastafileNames[s]); 
-					
+				
 					rename((fastafileNames[s] + toString(processIDS[0]) + ".temp").c_str(), filteredFasta.c_str());
 				
 					//append fasta files
 					for(int i=1;i<processors;i++){
-						appendAlignFiles((fastafileNames[s] + toString(processIDS[i]) + ".temp"), filteredFasta);
+						appendFiles((fastafileNames[s] + toString(processIDS[i]) + ".temp"), filteredFasta);
 						remove((fastafileNames[s] + toString(processIDS[i]) + ".temp").c_str());
 					}
 				}
@@ -407,7 +427,7 @@ int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string i
 				
 		in.seekg(line->start);
 		
-		for(int i=0;i<line->numSeqs;i++){
+		for(int i=0;i<line->num;i++){
 				
 				if (m->control_pressed) { in.close(); out.close(); return 0; }
 				
@@ -476,8 +496,7 @@ int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
 /**************************************************************************************/
 string FilterSeqsCommand::createFilter() {	
 	try {
-		string filterString = "";
-		
+		string filterString = "";			
 		Filters F;
 		
 		if (soft != 0)			{  F.setSoft(soft);		}
@@ -493,44 +512,46 @@ string FilterSeqsCommand::createFilter() {
 		else						{	F.setFilter(string(alignmentLength, '1'));	}
 		
 		numSeqs = 0;
-		
 		if(trump != '*' || isTrue(vertical) || soft != 0){
 			for (int s = 0; s < fastafileNames.size(); s++) {
 			
+				for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
+			
 #ifdef USE_MPI	
-				int pid, rc, ierr; 
+				int pid; 
 				int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5;
 				int tag = 2001;
 				
 				MPI_Status status; 
-				MPI_File in; 
-				rc = MPI_Comm_size(MPI_COMM_WORLD, &processors);
-				rc = MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
+				MPI_File inMPI; 
+				MPI_Comm_size(MPI_COMM_WORLD, &processors);
+				MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
 							
-				char tempFileName[fastafileNames[s].length()];
-				strcpy(tempFileName, fastafileNames[s].c_str());
-		cout << pid  << " tempFileName " << tempFileName << endl;		
-				MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &in);  //comm, filename, mode, info, filepointer
-		cout << pid  << " here" << endl;			
+				char* tempFileName = new char(fastafileNames[s].length());
+				tempFileName = &(fastafileNames[s][0]);
+		
+				MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI);  //comm, filename, mode, info, filepointer
+				
 				if (pid == 0) { //you are the root process
 						setLines(fastafileNames[s]);
-				cout << pid  << " after setlines" << endl;			
+					
 						for (int j = 0; j < lines.size(); j++) { //each process
 							if (j != 0) { //don't send to yourself
 								MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file
+								MPI_Send(&numSeqs, 1, MPI_INT, j, tag, MPI_COMM_WORLD); 
 								MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read
 							}
 						}
-				cout << pid << " done sending" << endl;
+			
 						char buf[bufferSizes[0]];
-						MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status);
-			cout << pid << " done reading" << endl;
+						MPI_File_read_at(inMPI, 0, buf, bufferSizes[0], MPI_CHAR, &status);
+			
 						string tempBuf = buf;
 						if (tempBuf.length() > bufferSizes[0]) { tempBuf = tempBuf.substr(0, bufferSizes[0]); }
 
 						MPICreateFilter(F, tempBuf);
-				cout << pid << "done with mpi create filter " << endl;				
-						if (m->control_pressed) { MPI_File_close(&in); return filterString; }
+						
+						if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; }
 												
 						vector<int> temp; temp.resize(alignmentLength+1);
 						
@@ -538,7 +559,7 @@ string FilterSeqsCommand::createFilter() {
 						for(int i = 0; i < ((processors-1)*5); i++) { 
 							MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); 
 							int receiveTag = temp[temp.size()-1];  //child process added a int to the end to indicate what letter count this is for
-					
+				
 							if (receiveTag == Atag) { //you are recieveing the A frequencies
 								for (int k = 0; k < alignmentLength; k++) {		F.a[k] += temp[k];	}
 							}else if (receiveTag == Ttag) { //you are recieveing the T frequencies
@@ -554,21 +575,22 @@ string FilterSeqsCommand::createFilter() {
 
 						
 				}else { //i am the child process
-			cout << pid << endl;
+			
 					int startPos, bufferSize;
-					ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
-					ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
-			cout << pid << '\t' << startPos << '\t' << bufferSize << endl;						
+					MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+					MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+					MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+								
 					//send freqs
 					char buf2[bufferSize];
-					MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status);
+					MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status);
 			
 					string tempBuf = buf2;
 					if (tempBuf.length() > bufferSize) { tempBuf = tempBuf.substr(0, bufferSize); }
 			
 					MPICreateFilter(F, tempBuf);
-				cout << pid << "done with mpi create filter " << endl;		
-					if (m->control_pressed) { MPI_File_close(&in); return filterString; }
+				
+					if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; }
 					
 					//send my fequency counts
 					F.a.push_back(Atag);
@@ -584,7 +606,7 @@ string FilterSeqsCommand::createFilter() {
 				}
 				
 				MPI_Barrier(MPI_COMM_WORLD);
-				MPI_File_close(&in);
+				MPI_File_close(&inMPI);
 				
 #else
 		#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
@@ -596,7 +618,6 @@ string FilterSeqsCommand::createFilter() {
 					
 					numSeqs += numFastaSeqs;
 					
-					for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
 					lines.push_back(new linePair(0, numFastaSeqs));
 					
 					driverCreateFilter(F, fastafileNames[s], lines[0]);
@@ -614,7 +635,6 @@ string FilterSeqsCommand::createFilter() {
 				
 				numSeqs += numFastaSeqs;
 				
-				for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
 				lines.push_back(new linePair(0, numFastaSeqs));
 				
 				driverCreateFilter(F, fastafileNames[s], lines[0]);
@@ -648,7 +668,7 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair*
 				
 		in.seekg(line->start);
 		
-		for(int i=0;i<line->numSeqs;i++){
+		for(int i=0;i<line->num;i++){
 				
 			if (m->control_pressed) { in.close(); return 1; }
 					
@@ -666,7 +686,7 @@ int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair*
 		}
 		
 		//report progress
-		if((line->numSeqs) % 100 != 0){	m->mothurOut(toString(line->numSeqs)); m->mothurOutEndLine();		}
+		if((line->num) % 100 != 0){	m->mothurOut(toString(line->num)); m->mothurOutEndLine();		}
 		
 		in.close();
 		
@@ -751,7 +771,7 @@ int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename)
 
 int FilterSeqsCommand::setLines(string filename) {
 	try {
-		for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
+		
 		vector<long int> positions;
 		bufferSizes.clear();
 		
diff --git a/filterseqscommand.h b/filterseqscommand.h
index 2f318c0..1d2526f 100644
--- a/filterseqscommand.h
+++ b/filterseqscommand.h
@@ -26,8 +26,8 @@ public:
 private:
 	struct linePair {
 		int start;
-		int numSeqs;
-		linePair(long int i, int j) : start(i), numSeqs(j) {}
+		int num;
+		linePair(long int i, long int j) : start(i), num(j) {}
 	};
 	vector<linePair*> lines;
 	vector<int> processIDS;