From 89d6711c2beed6ee75fb00e5e57f1a91564d3e89 Mon Sep 17 00:00:00 2001 From: westcott Date: Thu, 29 Apr 2010 11:15:33 +0000 Subject: [PATCH] parallelized screen.seqs and added mpi code to it. fixed bug with all mpi commands that was caused by the startindex line coming after the numSeqsPerProcess line, only occurred in files where the number of seqs could not be divided evenly between processes. --- aligncommand.cpp | 6 +- chimeraccodecommand.cpp | 6 +- chimeracheckcommand.cpp | 6 +- chimerapintailcommand.cpp | 6 +- chimeraslayercommand.cpp | 6 +- classifyseqscommand.cpp | 6 +- commandfactory.cpp | 2 +- distancecommand.cpp | 8 + filterseqscommand.cpp | 12 +- makefile | 3 +- mothur.h | 26 +- phylosummary.cpp | 4 +- phylotree.cpp | 2 +- screenseqscommand.cpp | 790 +++++++++++++++++++++++++++++--------- screenseqscommand.h | 18 +- 15 files changed, 682 insertions(+), 219 deletions(-) diff --git a/aligncommand.cpp b/aligncommand.cpp index 37089b6..a246ecc 100644 --- a/aligncommand.cpp +++ b/aligncommand.cpp @@ -308,8 +308,9 @@ int AlignCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPIAlign, outMPIReport, outMPIAccnos, MPIPos); @@ -328,8 +329,9 @@ int AlignCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPIAlign, 
outMPIReport, outMPIAccnos, MPIPos); diff --git a/chimeraccodecommand.cpp b/chimeraccodecommand.cpp index 0b96f21..cc046ea 100644 --- a/chimeraccodecommand.cpp +++ b/chimeraccodecommand.cpp @@ -238,8 +238,9 @@ int ChimeraCcodeCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, outMPIAccnos, MPIPos); @@ -258,8 +259,9 @@ int ChimeraCcodeCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, outMPIAccnos, MPIPos); diff --git a/chimeracheckcommand.cpp b/chimeracheckcommand.cpp index 625314a..1b25861 100644 --- a/chimeracheckcommand.cpp +++ b/chimeracheckcommand.cpp @@ -196,8 +196,9 @@ int ChimeraCheckCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos); @@ -216,8 +217,9 @@ int ChimeraCheckCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * 
numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos); diff --git a/chimerapintailcommand.cpp b/chimerapintailcommand.cpp index 62f1f2b..adf060f 100644 --- a/chimerapintailcommand.cpp +++ b/chimerapintailcommand.cpp @@ -254,8 +254,9 @@ int ChimeraPintailCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, outMPIAccnos, MPIPos); @@ -274,8 +275,9 @@ int ChimeraPintailCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, outMPIAccnos, MPIPos); diff --git a/chimeraslayercommand.cpp b/chimeraslayercommand.cpp index 58aecf5..435cd30 100644 --- a/chimeraslayercommand.cpp +++ b/chimeraslayercommand.cpp @@ -266,8 +266,9 @@ int ChimeraSlayerCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, outMPIAccnos, 
MPIPos); @@ -286,8 +287,9 @@ int ChimeraSlayerCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, outMPIAccnos, MPIPos); diff --git a/classifyseqscommand.cpp b/classifyseqscommand.cpp index 3837849..0538bff 100644 --- a/classifyseqscommand.cpp +++ b/classifyseqscommand.cpp @@ -382,8 +382,9 @@ int ClassifySeqsCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPINewTax, outMPITempTax, MPIPos); @@ -401,8 +402,9 @@ int ClassifySeqsCommand::execute(){ //figure out how many sequences you have to align numSeqsPerProcessor = numFastaSeqs / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } + //align your part driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPINewTax, outMPITempTax, MPIPos); diff --git a/commandfactory.cpp b/commandfactory.cpp index cad0442..8d78d63 100644 --- a/commandfactory.cpp +++ b/commandfactory.cpp @@ -132,7 +132,6 @@ CommandFactory::CommandFactory(){ //commands["consensus"] = "consensus"; commands["help"] = "help"; commands["summary.seqs"] = "summary.seqs"; - commands["screen.seqs"] = "screen.seqs"; 
commands["reverse.seqs"] = "reverse.seqs"; commands["trim.seqs"] = "trim.seqs"; commands["list.seqs"] = "list.seqs"; @@ -163,6 +162,7 @@ CommandFactory::CommandFactory(){ commands["chimera.slayer"] = "MPIEnabled"; commands["chimera.pintail"] = "MPIEnabled"; commands["chimera.bellerophon"] = "MPIEnabled"; + commands["screen.seqs"] = "MPIEnabled"; commands["quit"] = "MPIEnabled"; } diff --git a/distancecommand.cpp b/distancecommand.cpp index da5d0f1..ec32ffe 100644 --- a/distancecommand.cpp +++ b/distancecommand.cpp @@ -342,6 +342,14 @@ int DistanceCommand::execute(){ if (output == "square") { convertMatrix(outputFile); } + ifstream fileHandle; + fileHandle.open(outputFile.c_str()); + if(fileHandle) { + gobble(fileHandle); + if (fileHandle.eof()) { m->mothurOut(outputFile + " is blank. This can result if there are no distances below your cutoff."); m->mothurOutEndLine(); } + } + + #ifdef USE_MPI } #endif diff --git a/filterseqscommand.cpp b/filterseqscommand.cpp index b64e2f5..28e1373 100644 --- a/filterseqscommand.cpp +++ b/filterseqscommand.cpp @@ -287,8 +287,9 @@ int FilterSeqsCommand::filterSequences() { //figure out how many sequences you have to do numSeqsPerProcessor = num / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } + //do your part driverMPIRun(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos); @@ -309,8 +310,9 @@ int FilterSeqsCommand::filterSequences() { //figure out how many sequences you have to align numSeqsPerProcessor = num / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } + //align your part driverMPIRun(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos); @@ -580,8 +582,9 
@@ string FilterSeqsCommand::createFilter() { //figure out how many sequences you have to do numSeqsPerProcessor = num / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } + //do your part MPICreateFilter(startIndex, numSeqsPerProcessor, F, inMPI, MPIPos); @@ -597,8 +600,9 @@ string FilterSeqsCommand::createFilter() { //figure out how many sequences you have to align numSeqsPerProcessor = num / processors; - if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; } + //do your part MPICreateFilter(startIndex, numSeqsPerProcessor, F, inMPI, MPIPos); diff --git a/makefile b/makefile index 181b897..b15c44b 100644 --- a/makefile +++ b/makefile @@ -26,7 +26,8 @@ ifeq ($(strip $(USEREADLINE)),yes) -L../readline-6.0 endif -USEMPI ?= no +USEMPI ?= yes + ifeq ($(strip $(USEMPI)),yes) CC_OPTIONS += -DUSE_MPI diff --git a/mothur.h b/mothur.h index 062c481..7f3608e 100644 --- a/mothur.h +++ b/mothur.h @@ -865,14 +865,16 @@ inline void appendFiles(string temp, string filename) { //open output file in append mode openOutputFileAppend(filename, output); - openInputFile(temp, input); + int ableToOpen = openInputFile(temp, input, "no error"); - while(char c = input.get()){ - if(input.eof()) { break; } - else { output << c; } + if (ableToOpen == 0) { //you opened it + while(char c = input.get()){ + if(input.eof()) { break; } + else { output << c; } + } + input.close(); } - input.close(); output.close(); } catch(exception& e) { @@ -964,7 +966,7 @@ inline vector setFilePosFasta(string filename, int& num) { num = positions.size(); - FILE * pFile; + /*FILE * pFile; long size; //get num bytes in file @@ -974,7 +976,19 @@ inline vector 
setFilePosFasta(string filename, int& num) { fseek (pFile, 0, SEEK_END); size=ftell (pFile); fclose (pFile); + }*/ + + long size = positions[(positions.size()-1)]; + ifstream in; + openInputFile(filename, in); + + in.seekg(size); + + while(char c = in.get()){ + if(in.eof()) { break; } + else { size++; } } + in.close(); positions.push_back(size); diff --git a/phylosummary.cpp b/phylosummary.cpp index a5f67c0..915a05f 100644 --- a/phylosummary.cpp +++ b/phylosummary.cpp @@ -128,7 +128,7 @@ int PhyloSummary::addSeqToTree(string seqName, string seqTaxonomy){ currentNode = childPointer->second; }else{ //otherwise, error m->mothurOut("Warning: cannot find taxon " + taxon + " in reference taxonomy tree at level " + toString(tree[currentNode].level) + " for " + seqName + ". This may cause totals of daughter levels not to add up in summary file."); m->mothurOutEndLine(); - seqTaxonomy = ""; + break; } level++; @@ -168,7 +168,7 @@ void PhyloSummary::assignRank(int index){ void PhyloSummary::print(ofstream& out){ try { //print labels - out << "taxlevel\t rank ID\t label\t daughterlevels\t total\t"; + out << "taxlevel\t rankID\t taxon\t daughterlevels\t total\t"; if (groupmap != NULL) { for (int i = 0; i < groupmap->namesOfGroups.size(); i++) { out << groupmap->namesOfGroups[i] << '\t'; diff --git a/phylotree.cpp b/phylotree.cpp index 8c48e4f..855eaf9 100644 --- a/phylotree.cpp +++ b/phylotree.cpp @@ -373,7 +373,7 @@ void PhyloTree::binUnclassified(string file){ //this sequence is unclassified at some levels while(level != maxLevel){ - + level++; string taxon = "unclassified"; diff --git a/screenseqscommand.cpp b/screenseqscommand.cpp index 5b3d7d7..ab75032 100644 --- a/screenseqscommand.cpp +++ b/screenseqscommand.cpp @@ -22,7 +22,7 @@ ScreenSeqsCommand::ScreenSeqsCommand(string option) { else { //valid paramters for this command string AlignArray[] = {"fasta", "start", "end", "maxambig", "maxhomop", "minlength", "maxlength", - "name", "group", 
"alignreport","outputdir","inputdir"}; + "name", "group", "alignreport","processors","outputdir","inputdir"}; vector myArray (AlignArray, AlignArray+(sizeof(AlignArray)/sizeof(string))); OptionParser parser(option); @@ -117,6 +117,10 @@ ScreenSeqsCommand::ScreenSeqsCommand(string option) { temp = validParameter.validFile(parameters, "maxlength", false); if (temp == "not found") { temp = "-1"; } convert(temp, maxLength); + + temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = "1"; } + convert(temp, processors); + } } @@ -163,68 +167,280 @@ int ScreenSeqsCommand::execute(){ if (abort == true) { return 0; } - ifstream inFASTA; - openInputFile(fastafile, inFASTA); - - set badSeqNames; - string goodSeqFile = outputDir + getRootName(getSimpleName(fastafile)) + "good" + getExtension(fastafile); string badSeqFile = outputDir + getRootName(getSimpleName(fastafile)) + "bad" + getExtension(fastafile); + string badAccnosFile = outputDir + getRootName(getSimpleName(fastafile)) + "bad.accnos"; - ofstream goodSeqOut; openOutputFile(goodSeqFile, goodSeqOut); - ofstream badSeqOut; openOutputFile(badSeqFile, badSeqOut); + int numFastaSeqs = 0; + set badSeqNames; + int start = time(NULL); - while(!inFASTA.eof()){ - if (m->control_pressed) { goodSeqOut.close(); badSeqOut.close(); inFASTA.close(); remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } +#ifdef USE_MPI + int pid, end, numSeqsPerProcessor; + int tag = 2001; + vector MPIPos; - Sequence currSeq(inFASTA); - if (currSeq.getName() != "") { - bool goodSeq = 1; // innocent until proven guilty - if(goodSeq == 1 && startPos != -1 && startPos < currSeq.getStartPos()) { goodSeq = 0; } - if(goodSeq == 1 && endPos != -1 && endPos > currSeq.getEndPos()) { goodSeq = 0; } - if(goodSeq == 1 && maxAmbig != -1 && maxAmbig < currSeq.getAmbigBases()) { goodSeq = 0; } - if(goodSeq == 1 && maxHomoP != -1 && maxHomoP < currSeq.getLongHomoPolymer()) { goodSeq = 0; } - if(goodSeq == 1 && 
minLength != -1 && minLength > currSeq.getNumBases()) { goodSeq = 0; } - if(goodSeq == 1 && maxLength != -1 && maxLength < currSeq.getNumBases()) { goodSeq = 0; } + MPI_Status status; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + MPI_Comm_size(MPI_COMM_WORLD, &processors); + + MPI_File inMPI; + MPI_File outMPIGood; + MPI_File outMPIBad; + MPI_File outMPIBadAccnos; + + int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY; + int inMode=MPI_MODE_RDONLY; + + char outGoodFilename[1024]; + strcpy(outGoodFilename, goodSeqFile.c_str()); + + char outBadFilename[1024]; + strcpy(outBadFilename, badSeqFile.c_str()); + + char outBadAccnosFilename[1024]; + strcpy(outBadAccnosFilename, badAccnosFile.c_str()); + + char inFileName[1024]; + strcpy(inFileName, fastafile.c_str()); + + MPI_File_open(MPI_COMM_WORLD, inFileName, inMode, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer + MPI_File_open(MPI_COMM_WORLD, outGoodFilename, outMode, MPI_INFO_NULL, &outMPIGood); + MPI_File_open(MPI_COMM_WORLD, outBadFilename, outMode, MPI_INFO_NULL, &outMPIBad); + MPI_File_open(MPI_COMM_WORLD, outBadAccnosFilename, outMode, MPI_INFO_NULL, &outMPIBadAccnos); + + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPIGood); MPI_File_close(&outMPIBad); MPI_File_close(&outMPIBadAccnos); return 0; } + + if (pid == 0) { //you are the root process - if(goodSeq == 1){ - currSeq.printSequence(goodSeqOut); + MPIPos = setFilePosFasta(fastafile, numFastaSeqs); //fills MPIPos, returns numSeqs + + //send file positions to all processes + MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //send numSeqs + MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos + + //figure out how many sequences you have to align + numSeqsPerProcessor = numFastaSeqs / processors; + int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } + + //align your part + 
driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPIGood, outMPIBad, outMPIBadAccnos, MPIPos, badSeqNames); + + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPIGood); MPI_File_close(&outMPIBadAccnos); MPI_File_close(&outMPIBad); return 0; } + + for (int i = 1; i < processors; i++) { + + //get bad lists + int badSize; + MPI_Recv(&badSize, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); + /*for (int j = 0; j < badSize; j++) { + int length; + MPI_Recv(&length, 1, MPI_INT, i, tag, MPI_COMM_WORLD, &status); //recv the length of the name + char* buf2 = new char[length]; //make space to recieve it + MPI_Recv(buf2, length, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); //get name + + string tempBuf = buf2; + if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); } + delete buf2; + + badSeqNames.insert(tempBuf); + }*/ } - else{ - currSeq.printSequence(badSeqOut); - badSeqNames.insert(currSeq.getName()); + }else{ //you are a child process + MPI_Bcast(&numFastaSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs + MPIPos.resize(numFastaSeqs+1); + MPI_Bcast(&MPIPos[0], (numFastaSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions + + //figure out how many sequences you have to align + numSeqsPerProcessor = numFastaSeqs / processors; + int startIndex = pid * numSeqsPerProcessor; + if(pid == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - pid * numSeqsPerProcessor; } + + //align your part + driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPIGood, outMPIBad, outMPIBadAccnos, MPIPos, badSeqNames); + + if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPIGood); MPI_File_close(&outMPIBad); MPI_File_close(&outMPIBadAccnos); return 0; } + + //send bad list + int badSize = badSeqNames.size(); + MPI_Send(&badSize, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); + + /* + set::iterator it; + for (it = badSeqNames.begin(); it != badSeqNames.end(); it++) { + string name = *it; + int length = name.length(); + char* buf2 = new 
char[length]; + memcpy(buf2, name.c_str(), length); + + MPI_Send(&length, 1, MPI_INT, 0, tag, MPI_COMM_WORLD); + MPI_Send(buf2, length, MPI_CHAR, 0, tag, MPI_COMM_WORLD); + }*/ + } + + //close files + MPI_File_close(&inMPI); + MPI_File_close(&outMPIGood); + MPI_File_close(&outMPIBad); + MPI_File_close(&outMPIBadAccnos); + +#else + + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ + ifstream inFASTA; + openInputFile(fastafile, inFASTA); + numFastaSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + inFASTA.close(); + + lines.push_back(new linePair(0, numFastaSeqs)); + + driver(lines[0], goodSeqFile, badSeqFile, badAccnosFile, fastafile, badSeqNames); + + if (m->control_pressed) { remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } + + }else{ + vector positions; + processIDS.resize(0); + + ifstream inFASTA; + openInputFile(fastafile, inFASTA); + + string input; + while(!inFASTA.eof()){ + input = getline(inFASTA); + if (input.length() != 0) { + if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); } + } + } + inFASTA.close(); + + numFastaSeqs = positions.size(); + + int numSeqsPerProcessor = numFastaSeqs / processors; + + for (int i = 0; i < processors; i++) { + long int startPos = positions[ i * numSeqsPerProcessor ]; + if(i == processors - 1){ + numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; + } + lines.push_back(new linePair(startPos, numSeqsPerProcessor)); + } + + createProcesses(goodSeqFile, badSeqFile, badAccnosFile, fastafile, badSeqNames); + + rename((goodSeqFile + toString(processIDS[0]) + ".temp").c_str(), goodSeqFile.c_str()); + rename((badSeqFile + toString(processIDS[0]) + ".temp").c_str(), badSeqFile.c_str()); + rename((badAccnosFile + toString(processIDS[0]) + ".temp").c_str(), badAccnosFile.c_str()); + + //append alignment and report files + for(int i=1;icontrol_pressed) { remove(goodSeqFile.c_str()); 
remove(badSeqFile.c_str()); return 0; } + + //read badSeqs in because root process doesnt know what other "bad" seqs the children found + ifstream inBad; + int ableToOpen = openInputFile(badAccnosFile, inBad, "no error"); + + if (ableToOpen == 0) { + badSeqNames.clear(); + string tempName; + while (!inBad.eof()) { + inBad >> tempName; gobble(inBad); + badSeqNames.insert(tempName); + } + inBad.close(); } } - gobble(inFASTA); - } + #else + ifstream inFASTA; + openInputFile(fastafile, inFASTA); + numFastaSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + inFASTA.close(); + + lines.push_back(new linePair(0, numFastaSeqs)); + + driver(lines[0], goodSeqFile, badSeqFile, badAccnosFile, fastafile, badSeqNames); + + if (m->control_pressed) { remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } + + #endif + +#endif + + #ifdef USE_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &pid); + + if (pid == 0) { //only one process should fix files + + //read accnos file with all names in it, process 0 just has its names + MPI_File inMPIAccnos; + MPI_Offset size; + + char inFileName[1024]; + strcpy(inFileName, badAccnosFile.c_str()); + + MPI_File_open(MPI_COMM_SELF, inFileName, inMode, MPI_INFO_NULL, &inMPIAccnos); //comm, filename, mode, info, filepointer + MPI_File_get_size(inMPIAccnos, &size); + + char* buffer = new char[size]; + MPI_File_read(inMPIAccnos, buffer, size, MPI_CHAR, &status); + string tempBuf = buffer; + if (tempBuf.length() > size) { tempBuf = tempBuf.substr(0, size); } + istringstream iss (tempBuf,istringstream::in); + + delete buffer; + MPI_File_close(&inMPIAccnos); + + badSeqNames.clear(); + string tempName; + while (!iss.eof()) { + iss >> tempName; gobble(iss); + badSeqNames.insert(tempName); + } + #endif + if(namefile != "" && groupfile != "") { screenNameGroupFile(badSeqNames); - if (m->control_pressed) { goodSeqOut.close(); badSeqOut.close(); inFASTA.close(); remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } + if 
(m->control_pressed) { remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } }else if(namefile != "") { screenNameGroupFile(badSeqNames); - if (m->control_pressed) { goodSeqOut.close(); badSeqOut.close(); inFASTA.close(); remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } + if (m->control_pressed) { remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } }else if(groupfile != "") { screenGroupFile(badSeqNames); } // this screens just the group - if (m->control_pressed) { goodSeqOut.close(); badSeqOut.close(); inFASTA.close(); remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } + if (m->control_pressed) { remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } if(alignreport != "") { screenAlignReport(badSeqNames); } - goodSeqOut.close(); - badSeqOut.close(); - inFASTA.close(); - - if (m->control_pressed) { remove(goodSeqFile.c_str()); remove(badSeqFile.c_str()); return 0; } + + #ifdef USE_MPI + } + #endif m->mothurOutEndLine(); m->mothurOut("Output File Names: "); m->mothurOutEndLine(); m->mothurOut(goodSeqFile); m->mothurOutEndLine(); m->mothurOut(badSeqFile); m->mothurOutEndLine(); + m->mothurOut(badAccnosFile); m->mothurOutEndLine(); for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } m->mothurOutEndLine(); + m->mothurOutEndLine(); + + m->mothurOut("It took " + toString(time(NULL) - start) + " secs to screen " + toString(numFastaSeqs) + " sequences."); + m->mothurOutEndLine(); - return 0; } catch(exception& e) { @@ -236,63 +452,118 @@ int ScreenSeqsCommand::execute(){ //*************************************************************************************************************** int ScreenSeqsCommand::screenNameGroupFile(set badSeqNames){ + try { + ifstream inputNames; + openInputFile(namefile, inputNames); + set badSeqGroups; + string seqName, seqList, group; + set::iterator it; + + string goodNameFile = outputDir + 
getRootName(getSimpleName(namefile)) + "good" + getExtension(namefile); + string badNameFile = outputDir + getRootName(getSimpleName(namefile)) + "bad" + getExtension(namefile); + + outputNames.push_back(goodNameFile); outputNames.push_back(badNameFile); + + ofstream goodNameOut; openOutputFile(goodNameFile, goodNameOut); + ofstream badNameOut; openOutputFile(badNameFile, badNameOut); + + while(!inputNames.eof()){ + if (m->control_pressed) { goodNameOut.close(); badNameOut.close(); inputNames.close(); remove(goodNameFile.c_str()); remove(badNameFile.c_str()); return 0; } - ifstream inputNames; - openInputFile(namefile, inputNames); - set badSeqGroups; - string seqName, seqList, group; - set::iterator it; - - string goodNameFile = outputDir + getRootName(getSimpleName(namefile)) + "good" + getExtension(namefile); - string badNameFile = outputDir + getRootName(getSimpleName(namefile)) + "bad" + getExtension(namefile); - - outputNames.push_back(goodNameFile); outputNames.push_back(badNameFile); - - ofstream goodNameOut; openOutputFile(goodNameFile, goodNameOut); - ofstream badNameOut; openOutputFile(badNameFile, badNameOut); - - while(!inputNames.eof()){ - if (m->control_pressed) { goodNameOut.close(); badNameOut.close(); inputNames.close(); remove(goodNameFile.c_str()); remove(badNameFile.c_str()); return 0; } - - inputNames >> seqName >> seqList; - it = badSeqNames.find(seqName); - - if(it != badSeqNames.end()){ - badSeqNames.erase(it); - badNameOut << seqName << '\t' << seqList << endl; - if(namefile != ""){ - int start = 0; - for(int i=0;i> seqName >> seqList; + it = badSeqNames.find(seqName); + + if(it != badSeqNames.end()){ + badSeqNames.erase(it); + badNameOut << seqName << '\t' << seqList << endl; + if(namefile != ""){ + int start = 0; + for(int i=0;imothurOut("Your namefile does not include the sequence " + *it + " please correct."); + m->mothurOutEndLine(); + } } - gobble(inputNames); - } - inputNames.close(); - goodNameOut.close(); - badNameOut.close(); - - 
//we were unable to remove some of the bad sequences - if (badSeqNames.size() != 0) { - for (it = badSeqNames.begin(); it != badSeqNames.end(); it++) { - m->mothurOut("Your namefile does not include the sequence " + *it + " please correct."); - m->mothurOutEndLine(); + + if(groupfile != ""){ + + ifstream inputGroups; + openInputFile(groupfile, inputGroups); + + string goodGroupFile = outputDir + getRootName(getSimpleName(groupfile)) + "good" + getExtension(groupfile); + string badGroupFile = outputDir + getRootName(getSimpleName(groupfile)) + "bad" + getExtension(groupfile); + + outputNames.push_back(goodGroupFile); outputNames.push_back(badGroupFile); + + ofstream goodGroupOut; openOutputFile(goodGroupFile, goodGroupOut); + ofstream badGroupOut; openOutputFile(badGroupFile, badGroupOut); + + while(!inputGroups.eof()){ + if (m->control_pressed) { goodGroupOut.close(); badGroupOut.close(); inputGroups.close(); remove(goodNameFile.c_str()); remove(badNameFile.c_str()); remove(goodGroupFile.c_str()); remove(badGroupFile.c_str()); return 0; } + + inputGroups >> seqName >> group; + + it = badSeqGroups.find(seqName); + + if(it != badSeqGroups.end()){ + badSeqGroups.erase(it); + badGroupOut << seqName << '\t' << group << endl; + } + else{ + goodGroupOut << seqName << '\t' << group << endl; + } + gobble(inputGroups); + } + inputGroups.close(); + goodGroupOut.close(); + badGroupOut.close(); + + //we were unable to remove some of the bad sequences + if (badSeqGroups.size() != 0) { + for (it = badSeqGroups.begin(); it != badSeqGroups.end(); it++) { + m->mothurOut("Your namefile does not include the sequence " + *it + " please correct."); + m->mothurOutEndLine(); + } + } } + + return 0; + + } + catch(exception& e) { + m->errorOut(e, "ScreenSeqsCommand", "screenNameGroupFile"); + exit(1); } +} - if(groupfile != ""){ - +//*************************************************************************************************************** + +int ScreenSeqsCommand::screenGroupFile(set 
badSeqNames){ + try { ifstream inputGroups; openInputFile(groupfile, inputGroups); - + string seqName, group; + set::iterator it; + string goodGroupFile = outputDir + getRootName(getSimpleName(groupfile)) + "good" + getExtension(groupfile); string badGroupFile = outputDir + getRootName(getSimpleName(groupfile)) + "bad" + getExtension(groupfile); @@ -302,14 +573,13 @@ int ScreenSeqsCommand::screenNameGroupFile(set badSeqNames){ ofstream badGroupOut; openOutputFile(badGroupFile, badGroupOut); while(!inputGroups.eof()){ - if (m->control_pressed) { goodGroupOut.close(); badGroupOut.close(); inputGroups.close(); remove(goodNameFile.c_str()); remove(badNameFile.c_str()); remove(goodGroupFile.c_str()); remove(badGroupFile.c_str()); return 0; } + if (m->control_pressed) { goodGroupOut.close(); badGroupOut.close(); inputGroups.close(); remove(goodGroupFile.c_str()); remove(badGroupFile.c_str()); return 0; } inputGroups >> seqName >> group; - - it = badSeqGroups.find(seqName); + it = badSeqNames.find(seqName); - if(it != badSeqGroups.end()){ - badSeqGroups.erase(it); + if(it != badSeqNames.end()){ + badSeqNames.erase(it); badGroupOut << seqName << '\t' << group << endl; } else{ @@ -317,142 +587,280 @@ int ScreenSeqsCommand::screenNameGroupFile(set badSeqNames){ } gobble(inputGroups); } - inputGroups.close(); - goodGroupOut.close(); - badGroupOut.close(); + if (m->control_pressed) { goodGroupOut.close(); badGroupOut.close(); inputGroups.close(); remove(goodGroupFile.c_str()); remove(badGroupFile.c_str()); return 0; } + //we were unable to remove some of the bad sequences - if (badSeqGroups.size() != 0) { - for (it = badSeqGroups.begin(); it != badSeqGroups.end(); it++) { - m->mothurOut("Your namefile does not include the sequence " + *it + " please correct."); + if (badSeqNames.size() != 0) { + for (it = badSeqNames.begin(); it != badSeqNames.end(); it++) { + m->mothurOut("Your groupfile does not include the sequence " + *it + " please correct."); m->mothurOutEndLine(); } } - 
} - return 0; + inputGroups.close(); + goodGroupOut.close(); + badGroupOut.close(); + + if (m->control_pressed) { remove(goodGroupFile.c_str()); remove(badGroupFile.c_str()); } + + return 0; + + } + catch(exception& e) { + m->errorOut(e, "ScreenSeqsCommand", "screenGroupFile"); + exit(1); + } } //*************************************************************************************************************** -int ScreenSeqsCommand::screenGroupFile(set badSeqNames){ - - ifstream inputGroups; - openInputFile(groupfile, inputGroups); - string seqName, group; - set::iterator it; - - string goodGroupFile = outputDir + getRootName(getSimpleName(groupfile)) + "good" + getExtension(groupfile); - string badGroupFile = outputDir + getRootName(getSimpleName(groupfile)) + "bad" + getExtension(groupfile); - - outputNames.push_back(goodGroupFile); outputNames.push_back(badGroupFile); - - ofstream goodGroupOut; openOutputFile(goodGroupFile, goodGroupOut); - ofstream badGroupOut; openOutputFile(badGroupFile, badGroupOut); - - while(!inputGroups.eof()){ - if (m->control_pressed) { goodGroupOut.close(); badGroupOut.close(); inputGroups.close(); remove(goodGroupFile.c_str()); remove(badGroupFile.c_str()); return 0; } - - inputGroups >> seqName >> group; - it = badSeqNames.find(seqName); +int ScreenSeqsCommand::screenAlignReport(set badSeqNames){ + try { + ifstream inputAlignReport; + openInputFile(alignreport, inputAlignReport); + string seqName, group; + set::iterator it; + + string goodAlignReportFile = outputDir + getRootName(getSimpleName(alignreport)) + "good" + getExtension(alignreport); + string badAlignReportFile = outputDir + getRootName(getSimpleName(alignreport)) + "bad" + getExtension(alignreport); - if(it != badSeqNames.end()){ - badSeqNames.erase(it); - badGroupOut << seqName << '\t' << group << endl; + outputNames.push_back(goodAlignReportFile); outputNames.push_back(badAlignReportFile); + + ofstream goodAlignReportOut; openOutputFile(goodAlignReportFile, 
goodAlignReportOut); + ofstream badAlignReportOut; openOutputFile(badAlignReportFile, badAlignReportOut); + + while (!inputAlignReport.eof()) { // need to copy header + char c = inputAlignReport.get(); + goodAlignReportOut << c; + badAlignReportOut << c; + if (c == 10 || c == 13){ break; } } - else{ - goodGroupOut << seqName << '\t' << group << endl; + + while(!inputAlignReport.eof()){ + if (m->control_pressed) { goodAlignReportOut.close(); badAlignReportOut.close(); inputAlignReport.close(); remove(goodAlignReportFile.c_str()); remove(badAlignReportFile.c_str()); return 0; } + + inputAlignReport >> seqName; + it = badSeqNames.find(seqName); + string line; + while (!inputAlignReport.eof()) { // need to copy header + char c = inputAlignReport.get(); + line += c; + if (c == 10 || c == 13){ break; } + } + + if(it != badSeqNames.end()){ + badSeqNames.erase(it); + badAlignReportOut << seqName << '\t' << line; + } + else{ + goodAlignReportOut << seqName << '\t' << line; + } + gobble(inputAlignReport); } - gobble(inputGroups); - } - - if (m->control_pressed) { goodGroupOut.close(); badGroupOut.close(); inputGroups.close(); remove(goodGroupFile.c_str()); remove(badGroupFile.c_str()); return 0; } + + if (m->control_pressed) { goodAlignReportOut.close(); badAlignReportOut.close(); inputAlignReport.close(); remove(goodAlignReportFile.c_str()); remove(badAlignReportFile.c_str()); return 0; } - //we were unable to remove some of the bad sequences - if (badSeqNames.size() != 0) { - for (it = badSeqNames.begin(); it != badSeqNames.end(); it++) { - m->mothurOut("Your groupfile does not include the sequence " + *it + " please correct."); - m->mothurOutEndLine(); + //we were unable to remove some of the bad sequences + if (badSeqNames.size() != 0) { + for (it = badSeqNames.begin(); it != badSeqNames.end(); it++) { + m->mothurOut("Your file does not include the sequence " + *it + " please correct."); + m->mothurOutEndLine(); + } } - } - - inputGroups.close(); - goodGroupOut.close(); 
- badGroupOut.close(); - - if (m->control_pressed) { remove(goodGroupFile.c_str()); remove(badGroupFile.c_str()); } + inputAlignReport.close(); + goodAlignReportOut.close(); + badAlignReportOut.close(); + + if (m->control_pressed) { remove(goodAlignReportFile.c_str()); remove(badAlignReportFile.c_str()); return 0; } + + return 0; - return 0; + } + catch(exception& e) { + m->errorOut(e, "ScreenSeqsCommand", "screenAlignReport"); + exit(1); + } } +//********************************************************************************************************************** -//*************************************************************************************************************** +int ScreenSeqsCommand::driver(linePair* line, string goodFName, string badFName, string badAccnosFName, string filename, set& badSeqNames){ + try { + ofstream goodFile; + openOutputFile(goodFName, goodFile); + + ofstream badFile; + openOutputFile(badFName, badFile); + + ofstream badAccnosFile; + openOutputFile(badAccnosFName, badAccnosFile); + + ifstream inFASTA; + openInputFile(filename, inFASTA); -int ScreenSeqsCommand::screenAlignReport(set badSeqNames){ + inFASTA.seekg(line->start); - ifstream inputAlignReport; - openInputFile(alignreport, inputAlignReport); - string seqName, group; - set::iterator it; - - string goodAlignReportFile = outputDir + getRootName(getSimpleName(alignreport)) + "good" + getExtension(alignreport); - string badAlignReportFile = outputDir + getRootName(getSimpleName(alignreport)) + "bad" + getExtension(alignreport); - - outputNames.push_back(goodAlignReportFile); outputNames.push_back(badAlignReportFile); - - ofstream goodAlignReportOut; openOutputFile(goodAlignReportFile, goodAlignReportOut); - ofstream badAlignReportOut; openOutputFile(badAlignReportFile, badAlignReportOut); - - while (!inputAlignReport.eof()) { // need to copy header - char c = inputAlignReport.get(); - goodAlignReportOut << c; - badAlignReportOut << c; - if (c == 10 || c == 13){ break; } + 
for(int i=0;inumSeqs;i++){ + + if (m->control_pressed) { return 0; } + + Sequence currSeq(inFASTA); + if (currSeq.getName() != "") { + bool goodSeq = 1; // innocent until proven guilty + if(goodSeq == 1 && startPos != -1 && startPos < currSeq.getStartPos()) { goodSeq = 0; } + if(goodSeq == 1 && endPos != -1 && endPos > currSeq.getEndPos()) { goodSeq = 0; } + if(goodSeq == 1 && maxAmbig != -1 && maxAmbig < currSeq.getAmbigBases()) { goodSeq = 0; } + if(goodSeq == 1 && maxHomoP != -1 && maxHomoP < currSeq.getLongHomoPolymer()) { goodSeq = 0; } + if(goodSeq == 1 && minLength != -1 && minLength > currSeq.getNumBases()) { goodSeq = 0; } + if(goodSeq == 1 && maxLength != -1 && maxLength < currSeq.getNumBases()) { goodSeq = 0; } + + if(goodSeq == 1){ + currSeq.printSequence(goodFile); + } + else{ + currSeq.printSequence(badFile); + badAccnosFile << currSeq.getName() << endl; + badSeqNames.insert(currSeq.getName()); + } + } + gobble(inFASTA); + } + + + goodFile.close(); + inFASTA.close(); + badFile.close(); + badAccnosFile.close(); + + return 1; + } + catch(exception& e) { + m->errorOut(e, "ScreenSeqsCommand", "driver"); + exit(1); } +} +//********************************************************************************************************************** +#ifdef USE_MPI +int ScreenSeqsCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& goodFile, MPI_File& badFile, MPI_File& badAccnosFile, vector& MPIPos, set& badSeqNames){ + try { + string outputString = ""; + MPI_Status statusGood; + MPI_Status statusBad; + MPI_Status statusBadAccnos; + MPI_Status status; + int pid; + MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + + for(int i=0;icontrol_pressed) { return 0; } + + //read next sequence + int length = MPIPos[start+i+1] - MPIPos[start+i]; - while(!inputAlignReport.eof()){ - if (m->control_pressed) { goodAlignReportOut.close(); badAlignReportOut.close(); inputAlignReport.close(); remove(goodAlignReportFile.c_str()); 
remove(badAlignReportFile.c_str()); return 0; } + char* buf4 = new char[length]; + memcpy(buf4, outputString.c_str(), length); - inputAlignReport >> seqName; - it = badSeqNames.find(seqName); - string line; - while (!inputAlignReport.eof()) { // need to copy header - char c = inputAlignReport.get(); - line += c; - if (c == 10 || c == 13){ break; } + MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status); + + string tempBuf = buf4; delete buf4; + if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); } + istringstream iss (tempBuf,istringstream::in); + + Sequence currSeq(iss); + + //process seq + if (currSeq.getName() != "") { + bool goodSeq = 1; // innocent until proven guilty + if(goodSeq == 1 && startPos != -1 && startPos < currSeq.getStartPos()) { goodSeq = 0; } + if(goodSeq == 1 && endPos != -1 && endPos > currSeq.getEndPos()) { goodSeq = 0; } + if(goodSeq == 1 && maxAmbig != -1 && maxAmbig < currSeq.getAmbigBases()) { goodSeq = 0; } + if(goodSeq == 1 && maxHomoP != -1 && maxHomoP < currSeq.getLongHomoPolymer()) { goodSeq = 0; } + if(goodSeq == 1 && minLength != -1 && minLength > currSeq.getNumBases()) { goodSeq = 0; } + if(goodSeq == 1 && maxLength != -1 && maxLength < currSeq.getNumBases()) { goodSeq = 0; } + + if(goodSeq == 1){ + outputString = ">" + currSeq.getName() + "\n" + currSeq.getAligned() + "\n"; + + //print good seq + length = outputString.length(); + char* buf2 = new char[length]; + memcpy(buf2, outputString.c_str(), length); + + MPI_File_write_shared(goodFile, buf2, length, MPI_CHAR, &statusGood); + delete buf2; + } + else{ + outputString = ">" + currSeq.getName() + "\n" + currSeq.getAligned() + "\n"; + + //print bad seq to fasta + length = outputString.length(); + char* buf2 = new char[length]; + memcpy(buf2, outputString.c_str(), length); + + MPI_File_write_shared(badFile, buf2, length, MPI_CHAR, &statusBad); + delete buf2; + + badSeqNames.insert(currSeq.getName()); + + //write to bad accnos file + outputString 
= currSeq.getName() + "\n"; + + length = outputString.length(); + char* buf3 = new char[length]; + memcpy(buf3, outputString.c_str(), length); + + MPI_File_write_shared(badAccnosFile, buf3, length, MPI_CHAR, &statusBadAccnos); + delete buf3; + } + } } + + return 1; + } + catch(exception& e) { + m->errorOut(e, "ScreenSeqsCommand", "driverMPI"); + exit(1); + } +} +#endif +/**************************************************************************************************/ + +int ScreenSeqsCommand::createProcesses(string goodFileName, string badFileName, string badAccnos, string filename, set& badSeqNames) { + try { +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + int process = 0; + int exitCommand = 1; - if(it != badSeqNames.end()){ - badSeqNames.erase(it); - badAlignReportOut << seqName << '\t' << line;; + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); + + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + exitCommand = driver(lines[process], goodFileName + toString(getpid()) + ".temp", badFileName + toString(getpid()) + ".temp", badAccnos + toString(getpid()) + ".temp", filename, badSeqNames); + exit(0); + }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); } } - else{ - goodAlignReportOut << seqName << '\t' << line; + + //force parent to wait until all the processes are done + for (int i=0;icontrol_pressed) { goodAlignReportOut.close(); badAlignReportOut.close(); inputAlignReport.close(); remove(goodAlignReportFile.c_str()); remove(badAlignReportFile.c_str()); return 0; } - - //we were unable to remove some of the bad sequences - if (badSeqNames.size() != 0) { - for (it = badSeqNames.begin(); it != badSeqNames.end(); it++) { - m->mothurOut("Your file does not include the sequence " + *it + " please correct."); - 
m->mothurOutEndLine(); - } + catch(exception& e) { + m->errorOut(e, "ScreenSeqsCommand", "createProcesses"); + exit(1); } - - inputAlignReport.close(); - goodAlignReportOut.close(); - badAlignReportOut.close(); - - if (m->control_pressed) { remove(goodAlignReportFile.c_str()); remove(badAlignReportFile.c_str()); return 0; } - - return 0; - - } //*************************************************************************************************************** diff --git a/screenseqscommand.h b/screenseqscommand.h index 54f4bb1..f1b7205 100644 --- a/screenseqscommand.h +++ b/screenseqscommand.h @@ -21,13 +21,29 @@ public: void help(); private: + + struct linePair { + int start; + int numSeqs; + linePair(long int i, int j) : start(i), numSeqs(j) {} + }; + vector processIDS; //processid + vector lines; + int screenNameGroupFile(set); int screenGroupFile(set); int screenAlignReport(set); + int driver(linePair*, string, string, string, string, set&); + int createProcesses(string, string, string, string, set&); + + #ifdef USE_MPI + int driverMPI(int, int, MPI_File&, MPI_File&, MPI_File&, MPI_File&, vector&, set&); + #endif + bool abort; string fastafile, namefile, groupfile, alignreport, outputDir; - int startPos, endPos, maxAmbig, maxHomoP, minLength, maxLength; + int startPos, endPos, maxAmbig, maxHomoP, minLength, maxLength, processors; vector outputNames; }; -- 2.39.2