if (m->control_pressed) { return 0; }
+ #ifdef USE_MPI
+ int pid;
+ MPI_Comm_rank(MPI_COMM_WORLD, &pid);
+
+ if (pid == 0) { //only one process should output the filter
+ #endif
+
ofstream outFilter;
string filterFile = outputDir + filterFileName + ".filter";
outFilter.close();
outputNames.push_back(filterFile);
+ #ifdef USE_MPI
+ }
+ #endif
////////////run filter/////////////////
/**************************************************************************************/
int FilterSeqsCommand::filterSequences() {
try {
-
+
+ numSeqs = 0;
+
for (int s = 0; s < fastafileNames.size(); s++) {
for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear();
if (pid == 0) { //you are the root process
setLines(fastafileNames[s]);
-
+
+ char bufF[alignmentLength];
+ strcpy(bufF, filter.c_str());
+
for (int j = 0; j < lines.size(); j++) { //each process
if (j != 0) { //don't send to yourself
MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file
MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read
+ MPI_Send(bufF, alignmentLength, MPI_CHAR, j, tag, MPI_COMM_WORLD);
}
}
}else { //you are a child process
//receive your section of file
- int startPos, numLines, bufferSize;
+ int startPos, bufferSize;
+ char bufF[alignmentLength];
MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
-
+ MPI_Recv(bufF, alignmentLength, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status);
+
+ filter = bufF; //filter was made by process 0 so other processes need to get it
+
//read your peice of file
char buf2[bufferSize];
MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status);
driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
}else{
- setLines(fastafileNames[s]);
+ setLines(fastafileNames[s]);
createProcessesRunFilter(filter, fastafileNames[s]);
-
+
rename((fastafileNames[s] + toString(processIDS[0]) + ".temp").c_str(), filteredFasta.c_str());
//append fasta files
for(int i=1;i<processors;i++){
- appendAlignFiles((fastafileNames[s] + toString(processIDS[i]) + ".temp"), filteredFasta);
+ appendFiles((fastafileNames[s] + toString(processIDS[i]) + ".temp"), filteredFasta);
remove((fastafileNames[s] + toString(processIDS[i]) + ".temp").c_str());
}
}
in.seekg(line->start);
- for(int i=0;i<line->numSeqs;i++){
+ for(int i=0;i<line->num;i++){
if (m->control_pressed) { in.close(); out.close(); return 0; }
/**************************************************************************************/
string FilterSeqsCommand::createFilter() {
try {
- string filterString = "";
-
+ string filterString = "";
Filters F;
if (soft != 0) { F.setSoft(soft); }
else { F.setFilter(string(alignmentLength, '1')); }
numSeqs = 0;
-
if(trump != '*' || isTrue(vertical) || soft != 0){
for (int s = 0; s < fastafileNames.size(); s++) {
+ for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear();
+
#ifdef USE_MPI
- int pid, rc, ierr;
+ int pid;
int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5;
int tag = 2001;
MPI_Status status;
- MPI_File in;
- rc = MPI_Comm_size(MPI_COMM_WORLD, &processors);
- rc = MPI_Comm_rank(MPI_COMM_WORLD, &pid);
+ MPI_File inMPI;
+ MPI_Comm_size(MPI_COMM_WORLD, &processors);
+ MPI_Comm_rank(MPI_COMM_WORLD, &pid);
- char tempFileName[fastafileNames[s].length()];
- strcpy(tempFileName, fastafileNames[s].c_str());
- cout << pid << " tempFileName " << tempFileName << endl;
- MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &in); //comm, filename, mode, info, filepointer
- cout << pid << " here" << endl;
+ char* tempFileName = new char(fastafileNames[s].length());
+ tempFileName = &(fastafileNames[s][0]);
+
+ MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer
+
if (pid == 0) { //you are the root process
setLines(fastafileNames[s]);
- cout << pid << " after setlines" << endl;
+
for (int j = 0; j < lines.size(); j++) { //each process
if (j != 0) { //don't send to yourself
MPI_Send(&lines[j]->start, 1, MPI_INT, j, tag, MPI_COMM_WORLD); //start position in file
+ MPI_Send(&numSeqs, 1, MPI_INT, j, tag, MPI_COMM_WORLD);
MPI_Send(&bufferSizes[j], 1, MPI_INT, j, tag, MPI_COMM_WORLD); //how bytes for the read
}
}
- cout << pid << " done sending" << endl;
+
char buf[bufferSizes[0]];
- MPI_File_read_at(in, 0, buf, bufferSizes[0], MPI_CHAR, &status);
- cout << pid << " done reading" << endl;
+ MPI_File_read_at(inMPI, 0, buf, bufferSizes[0], MPI_CHAR, &status);
+
string tempBuf = buf;
if (tempBuf.length() > bufferSizes[0]) { tempBuf = tempBuf.substr(0, bufferSizes[0]); }
MPICreateFilter(F, tempBuf);
- cout << pid << "done with mpi create filter " << endl;
- if (m->control_pressed) { MPI_File_close(&in); return filterString; }
+
+ if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; }
vector<int> temp; temp.resize(alignmentLength+1);
for(int i = 0; i < ((processors-1)*5); i++) {
MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status);
int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for
-
+
if (receiveTag == Atag) { //you are recieveing the A frequencies
for (int k = 0; k < alignmentLength; k++) { F.a[k] += temp[k]; }
}else if (receiveTag == Ttag) { //you are recieveing the T frequencies
}else { //i am the child process
- cout << pid << endl;
+
int startPos, bufferSize;
- ierr = MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
- ierr = MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
- cout << pid << '\t' << startPos << '\t' << bufferSize << endl;
+ MPI_Recv(&startPos, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+ MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+ MPI_Recv(&bufferSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
+
//send freqs
char buf2[bufferSize];
- MPI_File_read_at( in, startPos, buf2, bufferSize, MPI_CHAR, &status);
+ MPI_File_read_at(inMPI, startPos, buf2, bufferSize, MPI_CHAR, &status);
string tempBuf = buf2;
if (tempBuf.length() > bufferSize) { tempBuf = tempBuf.substr(0, bufferSize); }
MPICreateFilter(F, tempBuf);
- cout << pid << "done with mpi create filter " << endl;
- if (m->control_pressed) { MPI_File_close(&in); return filterString; }
+
+ if (m->control_pressed) { MPI_File_close(&inMPI); return filterString; }
//send my fequency counts
F.a.push_back(Atag);
}
MPI_Barrier(MPI_COMM_WORLD);
- MPI_File_close(&in);
+ MPI_File_close(&inMPI);
#else
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
numSeqs += numFastaSeqs;
- for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear();
lines.push_back(new linePair(0, numFastaSeqs));
driverCreateFilter(F, fastafileNames[s], lines[0]);
numSeqs += numFastaSeqs;
- for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear();
lines.push_back(new linePair(0, numFastaSeqs));
driverCreateFilter(F, fastafileNames[s], lines[0]);
in.seekg(line->start);
- for(int i=0;i<line->numSeqs;i++){
+ for(int i=0;i<line->num;i++){
if (m->control_pressed) { in.close(); return 1; }
}
//report progress
- if((line->numSeqs) % 100 != 0){ m->mothurOut(toString(line->numSeqs)); m->mothurOutEndLine(); }
+ if((line->num) % 100 != 0){ m->mothurOut(toString(line->num)); m->mothurOutEndLine(); }
in.close();
int FilterSeqsCommand::setLines(string filename) {
try {
- for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear();
+
vector<long int> positions;
bufferSizes.clear();