void FilterSeqsCommand::help(){
try {
+ #ifdef USE_MPI
+ int pid;
+ MPI_Comm_rank(MPI_COMM_WORLD, &pid);
+
+ if (pid == 0) {
+ #endif
+
m->mothurOut("The filter.seqs command reads a file containing sequences and creates a .filter and .filter.fasta file.\n");
m->mothurOut("The filter.seqs command parameters are fasta, trump, soft, hard and vertical. \n");
m->mothurOut("The fasta parameter is required. You may enter several fasta files to build the filter from and filter, by separating their names with -'s.\n");
m->mothurOut("Example filter.seqs(fasta=abrecovery.fasta, trump=..., soft=..., hard=..., vertical=T).\n");
m->mothurOut("Note: No spaces between parameter labels (i.e. fasta), '=' and parameters (i.e.yourFasta).\n\n");
+ #ifdef USE_MPI
+ }
+ #endif
+
}
catch(exception& e) {
m->errorOut(e, "FilterSeqsCommand", "help");
if (m->control_pressed) { for(int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } return 0; }
-
+ #ifdef USE_MPI
+ int pid;
+ MPI_Comm_rank(MPI_COMM_WORLD, &pid);
+
+ if (pid == 0) {
+ #endif
+
m->mothurOutEndLine();
m->mothurOut("Length of filtered alignment: " + toString(filteredLength)); m->mothurOutEndLine();
m->mothurOut("Number of columns removed: " + toString((alignmentLength-filteredLength))); m->mothurOutEndLine();
m->mothurOut("Output File Names: "); m->mothurOutEndLine();
for(int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); }
m->mothurOutEndLine();
+
+ #ifdef USE_MPI
+ }
+ #endif
return 0;
for (int s = 0; s < fastafileNames.size(); s++) {
for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear();
+
+#ifdef USE_MPI
+ int pid, rc, ierr;
+ char* buf;
+ int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5;
- #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
- if(processors == 1){
- ifstream inFASTA;
- openInputFile(fastafileNames[s], inFASTA);
- int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
- inFASTA.close();
+ MPI_Status status;
+ MPI_File in;
+ rc = MPI_Comm_size(MPI_COMM_WORLD, &processors);
+ rc = MPI_Comm_rank(MPI_COMM_WORLD, &pid);
- numSeqs += numFastaSeqs;
-
- lines.push_back(new linePair(0, numFastaSeqs));
-
- driverCreateFilter(F, fastafileNames[s], lines[0]);
- }else{
- vector<int> positions;
-
- ifstream inFASTA;
- openInputFile(fastafileNames[s], inFASTA);
-
- string input;
- while(!inFASTA.eof()){
- input = getline(inFASTA);
- if (input.length() != 0) {
- if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); }
+
+ char* tempFileName = &(fastafileNames[s][0]);
+ MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &in); //comm, filename, mode, info, filepointer
+
+ if (pid == 0) { //you are the root process
+ setLines(fastafileNames[s]);
+
+ for (int j = 0; j < lines.size(); j++) { //each process
+ if (j != 0) { //don't send to yourself
+ MPI_Send(&lines[j]->start, 1, MPI_INT, j, 2001, MPI_COMM_WORLD); //start position in file
+ MPI_Send(&lines[j]->numSeqs, 1, MPI_INT, j, 2001, MPI_COMM_WORLD); //how many sequences we are sending
+ MPI_Send(&bufferSizes[j], 1, MPI_INT, j, 2001, MPI_COMM_WORLD); //how bytes for the read
}
}
- inFASTA.close();
-
- int numFastaSeqs = positions.size();
+ cout << "done sending" << endl;
+ cout << "parent = " << pid << " lines = " << lines[pid]->start << '\t' << lines[pid]->numSeqs << " size = " << lines.size() << endl;
+
+ buf = new char(bufferSizes[0]);
+ cout << pid << '\t' << bufferSizes[0] << " line 1 start pos = " << lines[1]->start << " buffer size 0 " << bufferSizes[0] << " buffer size 1 " << bufferSizes[1] << endl;
+ MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status);
+
+ cout << pid << " done reading " << endl;
+ string tempBuf = buf;
+ cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;
+ //do your part
+ MPICreateFilter(F, tempBuf);
+
+ vector<int> temp; temp.resize(numSeqs);
+
+ //get the frequencies from the child processes
+ for(int i = 0; i < ((processors-1)*5); i++) {
+ cout << "i = " << i << endl;
+ int ierr = MPI_Recv(&temp, numSeqs, MPI_INT, MPI_ANY_SOURCE, 2001, MPI_COMM_WORLD, &status);
+
+ int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for
+
+ int sender = status.MPI_SOURCE;
+
+ if (receiveTag == Atag) { //you are recieveing the A frequencies
+ for (int k = 0; k < alignmentLength; k++) { F.a[k] += temp[k]; }
+ }else if (receiveTag == Ttag) { //you are recieveing the T frequencies
+ for (int k = 0; k < alignmentLength; k++) { F.t[k] += temp[k]; }
+ }else if (receiveTag == Ctag) { //you are recieveing the C frequencies
+ for (int k = 0; k < alignmentLength; k++) { F.c[k] += temp[k]; }
+ }else if (receiveTag == Gtag) { //you are recieveing the G frequencies
+ for (int k = 0; k < alignmentLength; k++) { F.g[k] += temp[k]; }
+ }else if (receiveTag == Gaptag) { //you are recieveing the gap frequencies
+ for (int k = 0; k < alignmentLength; k++) { F.gap[k] += temp[k]; }
+ }
+
+ m->mothurOut("receive tag = " + toString(receiveTag) + " " + toString(sender) + " is complete."); m->mothurOutEndLine();
+ }
+
- numSeqs += numFastaSeqs;
+ }else { //i am the child process
+ int startPos, numLines, bufferSize;
+ cout << "child = " << pid << endl;
+ ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
+ ierr = MPI_Recv(&numLines, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
+ ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, 2001, MPI_COMM_WORLD, &status);
+ cout << "child = " << pid << " done recv messages startpos = " << startPos << " numLines = " << numLines << " buffersize = " << bufferSize << endl;
- int numSeqsPerProcessor = numFastaSeqs / processors;
+
+ //send freqs
+ char* buf2 = new char(bufferSize);
+ MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status);
+ cout << pid << " done reading " << endl;
+
+ string tempBuf = buf2;
+ cout << pid << '\t' << (tempBuf.substr(0, 10)) << endl;
+ MPICreateFilter(F, tempBuf);
- for (int i = 0; i < processors; i++) {
- long int startPos = positions[ i * numSeqsPerProcessor ];
- if(i == processors - 1){
- numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;
- }
- lines.push_back(new linePair(startPos, numSeqsPerProcessor));
- }
+ //send my fequency counts
+ F.a.push_back(Atag);
+ int ierr = MPI_Send( &F.a[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+ F.t.push_back(Ttag);
+ ierr = MPI_Send( &F.t[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+ F.c.push_back(Ctag);
+ ierr = MPI_Send( &F.c[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+ F.g.push_back(Gtag);
+ ierr = MPI_Send( &F.g[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+ F.gap.push_back(Gaptag);
+ ierr = MPI_Send( &F.gap[0], alignmentLength, MPI_INT, 0, 2001, MPI_COMM_WORLD);
+
+ cout << "child " << pid << " done sending counts" << endl;
+ }
- createProcessesCreateFilter(F, fastafileNames[s]);
- }
- #else
+ MPI_Barrier(MPI_COMM_WORLD);
+
+#else
+ #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
+ if(processors == 1){
ifstream inFASTA;
openInputFile(fastafileNames[s], inFASTA);
int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
inFASTA.close();
-
+
numSeqs += numFastaSeqs;
-
+
lines.push_back(new linePair(0, numFastaSeqs));
+
+ driverCreateFilter(F, fastafileNames[s], lines[0]);
+ }else{
+
+ setLines(fastafileNames[s]);
+ createProcessesCreateFilter(F, fastafileNames[s]);
+ }
+ #else
+ ifstream inFASTA;
+ openInputFile(fastafileNames[s], inFASTA);
+ int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
+ inFASTA.close();
- driverCreateFilter(F, lines[0], fastafileNames[s]);
- #endif
-
+ numSeqs += numFastaSeqs;
+
+ lines.push_back(new linePair(0, numFastaSeqs));
+
+ driverCreateFilter(F, lines[0], fastafileNames[s]);
+ #endif
+#endif
}
}
-
+
+#ifdef USE_MPI
+
+//merge all frequency data and create filter string
+ //int pid;
+ //MPI_Comm_rank(MPI_COMM_WORLD, &pid);
+
+ //if (pid == 0) { //only one process should output to screen
+#endif
+
+ cout << "made it here" << endl;
F.setNumSeqs(numSeqs);
if(isTrue(vertical) == 1) { F.doVertical(); }
if(soft != 0) { F.doSoft(); }
-
+//cout << "Filter String = " << F.getFilter() << endl;
filterString = F.getFilter();
return filterString;
if(isTrue(vertical) || soft != 0){ F.getFreqs(seq); }
cout.flush();
}
+
+ //report progress
+ if((i+1) % 100 == 0){ m->mothurOut(toString(i+1)); m->mothurOutEndLine(); }
}
-
+
+ //report progress
+ if((line->numSeqs) % 100 != 0){ m->mothurOut(toString(line->numSeqs)); m->mothurOutEndLine(); }
+
in.close();
return 0;
exit(1);
}
}
+/**************************************************************************************/
+int FilterSeqsCommand::MPICreateFilter(Filters& F, string temp) {
+ try {
+
+ vector<string> seqStrings;
+ parseBuffer(temp, seqStrings);
+
+ for(int i=0;i<seqStrings.size();i++){
+
+ if (m->control_pressed) { return 1; }
+
+ Sequence seq("", seqStrings[0]);
+
+ if(trump != '*'){ F.doTrump(seq); }
+ if(isTrue(vertical) || soft != 0){ F.getFreqs(seq); }
+ cout.flush();
+
+ //report progress
+ if((i+1) % 100 == 0){ m->mothurOut(toString(i+1)); m->mothurOutEndLine(); }
+ }
+
+ //report progress
+ if((seqStrings.size()) % 100 != 0){ m->mothurOut(toString(seqStrings.size())); m->mothurOutEndLine(); }
+
+ return 0;
+ }
+ catch(exception& e) {
+ m->errorOut(e, "FilterSeqsCommand", "MPICreateFilter");
+ exit(1);
+ }
+}
+
/**************************************************************************************************/
int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) {
exit(1);
}
}
+/**************************************************************************************************/
+
+int FilterSeqsCommand::setLines(string filename) {
+ try {
+ vector<int> positions;
+ map<int, int> buf;
+ bufferSizes.clear();
+
+ int pid;
+ MPI_Comm_rank(MPI_COMM_WORLD, &pid);
+
+ ifstream inFASTA;
+ openInputFile(filename, inFASTA);
+
+ string input;
+ int numbuf = 0;
+ while(!inFASTA.eof()){
+ input = getline(inFASTA);
+
+ if (input.length() != 0) {
+ numbuf += input.length();
+ if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); buf[(pos - input.length() - 1)] = numbuf; }
+ }
+ }
+
+ inFASTA.close();
+
+ int numFastaSeqs = positions.size();
+
+ numSeqs += numFastaSeqs;
+
+ int numSeqsPerProcessor = numFastaSeqs / processors;
+
+ for (int i = 0; i < processors; i++) {
+
+ long int startPos = positions[ i * numSeqsPerProcessor ];
+ if(i == processors - 1){
+ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;
+ bufferSizes.push_back(numbuf-buf[startPos]);
+ }else{
+ int myEnd = positions[ (i+1) * numSeqsPerProcessor ];
+ bufferSizes.push_back(buf[myEnd]-buf[startPos]);
+ }
+ lines.push_back(new linePair(startPos, numSeqsPerProcessor));
+ }
+
+ return 0;
+ }
+ catch(exception& e) {
+ m->errorOut(e, "FilterSeqsCommand", "setLines");
+ exit(1);
+ }
+}
+/**************************************************************************************************/
+int FilterSeqsCommand::parseBuffer(string file, vector<string>& seqs) {
+ try {
+
+ istringstream iss (file,istringstream::in);
+ string name, seqstring;
+
+ while (iss) {
+
+ if (m->control_pressed) { return 0; }
+ cout << "here" << endl;
+ Sequence seq(iss);
+ cout << "here1" << endl;
+ gobble(iss);
+ cout << seq.getName() << endl;
+ if (seq.getName() != "") {
+ seqs.push_back(seq.getAligned());
+ }
+ }
+
+ return 0;
+ }
+ catch(exception& e) {
+ m->errorOut(e, "FilterSeqsCommand", "parseBuffer");
+ exit(1);
+ }
+}
/**************************************************************************************/