From: westcott Date: Fri, 30 Apr 2010 12:01:35 +0000 (+0000) Subject: paralellized trim.seqs X-Git-Url: https://git.donarmstrong.com/?a=commitdiff_plain;h=c9c6df74a855bc1fcecb93cf56eb09d0907e7cf2;hp=8699fd2c88f10abda5fe32c89be061a89d673fd6;p=mothur.git paralellized trim.seqs --- diff --git a/Mothur.xcodeproj/project.pbxproj b/Mothur.xcodeproj/project.pbxproj index 3ee1d81..ce6918c 100644 --- a/Mothur.xcodeproj/project.pbxproj +++ b/Mothur.xcodeproj/project.pbxproj @@ -726,36 +726,36 @@ A7DA20F1113FECD400BF472F /* readtreecommand.h */, A7DA20F2113FECD400BF472F /* removeseqscommand.cpp */, A7DA20F3113FECD400BF472F /* removeseqscommand.h */, - A7DA20F4113FECD400BF472F /* reversecommand.cpp */, A7DA20F5113FECD400BF472F /* reversecommand.h */, - A7DA20F8113FECD400BF472F /* screenseqscommand.cpp */, + A7DA20F4113FECD400BF472F /* reversecommand.cpp */, A7DA20F9113FECD400BF472F /* screenseqscommand.h */, - A7DA20FA113FECD400BF472F /* secondarystructurecommand.cpp */, + A7DA20F8113FECD400BF472F /* screenseqscommand.cpp */, A7DA20FB113FECD400BF472F /* secondarystructurecommand.h */, - A7DA20FC113FECD400BF472F /* seqsummarycommand.cpp */, + A7DA20FA113FECD400BF472F /* secondarystructurecommand.cpp */, A7DA20FD113FECD400BF472F /* seqsummarycommand.h */, - A7DA2102113FECD400BF472F /* setdircommand.cpp */, + A7DA20FC113FECD400BF472F /* seqsummarycommand.cpp */, A7DA2103113FECD400BF472F /* setdircommand.h */, + A7DA2102113FECD400BF472F /* setdircommand.cpp */, A76C4A1011876BAF0009460B /* setlogfilecommand.h */, A76C4A1111876BAF0009460B /* setlogfilecommand.cpp */, - A7DA210F113FECD400BF472F /* sharedcommand.cpp */, A7DA2110113FECD400BF472F /* sharedcommand.h */, - A7DA2154113FECD400BF472F /* summarycommand.cpp */, + A7DA210F113FECD400BF472F /* sharedcommand.cpp */, A7DA2155113FECD400BF472F /* summarycommand.h */, - A7DA2158113FECD400BF472F /* summarysharedcommand.cpp */, + A7DA2154113FECD400BF472F /* summarycommand.cpp */, A7DA2159113FECD400BF472F /* summarysharedcommand.h */, - A7DA215A113FECD400BF472F /* systemcommand.cpp */, + A7DA2158113FECD400BF472F /* summarysharedcommand.cpp */, A7DA215B113FECD400BF472F /* systemcommand.h */, - A7DA2161113FECD400BF472F /* treegroupscommand.cpp */, + A7DA215A113FECD400BF472F /* systemcommand.cpp */, A7DA2162113FECD400BF472F /* treegroupscommand.h */, - A7DA2167113FECD400BF472F /* trimseqscommand.cpp */, + A7DA2161113FECD400BF472F /* treegroupscommand.cpp */, A7DA2168113FECD400BF472F /* trimseqscommand.h */, - A7DA2169113FECD400BF472F /* unifracunweightedcommand.cpp */, + A7DA2167113FECD400BF472F /* trimseqscommand.cpp */, A7DA216A113FECD400BF472F /* unifracunweightedcommand.h */, - A7DA216B113FECD400BF472F /* unifracweightedcommand.cpp */, + A7DA2169113FECD400BF472F /* unifracunweightedcommand.cpp */, A7DA216C113FECD400BF472F /* unifracweightedcommand.h */, - A7DA2177113FECD400BF472F /* venncommand.cpp */, + A7DA216B113FECD400BF472F /* unifracweightedcommand.cpp */, A7DA2178113FECD400BF472F /* venncommand.h */, + A7DA2177113FECD400BF472F /* venncommand.cpp */, ); name = commands; sourceTree = ""; diff --git a/makefile b/makefile index b15c44b..b7bb8d0 100644 --- a/makefile +++ b/makefile @@ -26,7 +26,7 @@ ifeq ($(strip $(USEREADLINE)),yes) -L../readline-6.0 endif -USEMPI ?= yes +USEMPI ?= no ifeq ($(strip $(USEMPI)),yes) diff --git a/trimseqscommand.cpp b/trimseqscommand.cpp index 3d80f9e..ef7dfdd 100644 --- a/trimseqscommand.cpp +++ b/trimseqscommand.cpp @@ -22,7 +22,7 @@ TrimSeqsCommand::TrimSeqsCommand(string option) { else { //valid paramters for this command string AlignArray[] = {"fasta", "flip", "oligos", "maxambig", "maxhomop", "minlength", "maxlength", "qfile", - "qthreshold", "qaverage", "allfiles", "qtrim", "outputdir","inputdir"}; + "qthreshold", "qaverage", "allfiles", "qtrim", "processors", "outputdir","inputdir"}; vector myArray (AlignArray, AlignArray+(sizeof(AlignArray)/sizeof(string))); @@ -120,6 +120,9 @@ TrimSeqsCommand::TrimSeqsCommand(string option) { temp = validParameter.validFile(parameters, "allfiles", false); if (temp == "not found") { temp = "F"; } allFiles = isTrue(temp); + temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = "1"; } + convert(temp, processors); + if(allFiles && oligoFile == ""){ m->mothurOut("You selected allfiles, but didn't enter an oligos file. Ignoring the allfiles request."); m->mothurOutEndLine(); } @@ -187,40 +190,151 @@ int TrimSeqsCommand::execute(){ numFPrimers = 0; //this needs to be initialized numRPrimers = 0; - ifstream inFASTA; - openInputFile(fastaFile, inFASTA); - - ofstream outFASTA; string trimSeqFile = outputDir + getRootName(getSimpleName(fastaFile)) + "trim.fasta"; - openOutputFile(trimSeqFile, outFASTA); outputNames.push_back(trimSeqFile); + string scrapSeqFile = outputDir + getRootName(getSimpleName(fastaFile)) + "scrap.fasta"; + outputNames.push_back(scrapSeqFile); + string groupFile = outputDir + getRootName(getSimpleName(fastaFile)) + "groups"; - ofstream outGroups; - vector fastaFileNames; + vector fastaFileNames; if(oligoFile != ""){ - string groupFile = outputDir + getRootName(getSimpleName(fastaFile)) + "groups"; - openOutputFile(groupFile, outGroups); outputNames.push_back(groupFile); getOligos(fastaFileNames); } + if(qFileName != "") { setLines(qFileName, qLines); } + + + #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + if(processors == 1){ + ifstream inFASTA; + openInputFile(fastaFile, inFASTA); + int numSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + inFASTA.close(); + + lines.push_back(new linePair(0, numSeqs)); + + driverCreateTrim(fastaFile, qFileName, trimSeqFile, scrapSeqFile, groupFile, fastaFileNames, lines[0], lines[0]); + + for (int j = 0; j < fastaFileNames.size(); j++) { + rename((fastaFileNames[j] + toString(getpid()) + ".temp").c_str(), fastaFileNames[j].c_str()); + } + + }else{ + setLines(fastaFile, lines); + if(qFileName == "") { qLines = lines; } + + createProcessesCreateTrim(fastaFile, qFileName, trimSeqFile, scrapSeqFile, groupFile, fastaFileNames); + + rename((trimSeqFile + toString(processIDS[0]) + ".temp").c_str(), trimSeqFile.c_str()); + rename((scrapSeqFile + toString(processIDS[0]) + ".temp").c_str(), scrapSeqFile.c_str()); + rename((groupFile + toString(processIDS[0]) + ".temp").c_str(), groupFile.c_str()); + for (int j = 0; j < fastaFileNames.size(); j++) { + rename((fastaFileNames[j] + toString(processIDS[0]) + ".temp").c_str(), fastaFileNames[j].c_str()); + } + //append files + for(int i=1;icontrol_pressed) { return 0; } + #else + ifstream inFASTA; + openInputFile(fastafileNames[s], inFASTA); + numSeqs=count(istreambuf_iterator(inFASTA),istreambuf_iterator(), '>'); + inFASTA.close(); + + lines.push_back(new linePair(0, numSeqs)); + + driverCreateSummary(fastafile, qFileName, trimSeqFile, scrapSeqFile, groupFile, fastaFileNames, lines[0], lines[0]); + + if (m->control_pressed) { return 0; } + #endif + + + for(int i=0;i'){ + inFASTA >> seqName; + outGroups << seqName << '\t' << groupVector[i] << endl; + } + while (!inFASTA.eof()) { char c = inFASTA.get(); if (c == 10 || c == 13){ break; } } + } + outGroups.close(); + inFASTA.close(); + } + + if (m->control_pressed) { + for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } + return 0; + } + + m->mothurOutEndLine(); + m->mothurOut("Output File Names: "); m->mothurOutEndLine(); + for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } + m->mothurOutEndLine(); + + return 0; + + } + catch(exception& e) { + m->errorOut(e, "TrimSeqsCommand", "execute"); + exit(1); + } +} + +/**************************************************************************************/ +int TrimSeqsCommand::driverCreateTrim(string filename, string qFileName, string trimFile, string scrapFile, string groupFile, vector fastaNames, linePair* line, linePair* qline) { + try { + + ofstream outFASTA; + int able = openOutputFile(trimFile, outFASTA); + ofstream scrapFASTA; - string scrapSeqFile = outputDir + getRootName(getSimpleName(fastaFile)) + "scrap.fasta"; - openOutputFile(scrapSeqFile, scrapFASTA); - outputNames.push_back(scrapSeqFile); + openOutputFile(scrapFile, scrapFASTA); + + ofstream outGroups; + vector fastaFileNames; + if (oligoFile != "") { + openOutputFile(groupFile, outGroups); + for (int i = 0; i < fastaNames.size(); i++) { + fastaFileNames.push_back(new ofstream((fastaNames[i] + toString(getpid()) + ".temp").c_str(), ios::ate)); + } + } + + ifstream inFASTA; + openInputFile(filename, inFASTA); ifstream qFile; if(qFileName != "") { openInputFile(qFileName, qFile); } - bool success; - - while(!inFASTA.eof()){ + qFile.seekg(qline->start); + inFASTA.seekg(line->start); + for(int i=0;inum;i++){ + if (m->control_pressed) { inFASTA.close(); outFASTA.close(); scrapFASTA.close(); - outGroups.close(); + if (oligoFile != "") { outGroups.close(); } if(qFileName != "") { qFile.close(); } for(int i=0;iclose(); @@ -229,7 +343,8 @@ int TrimSeqsCommand::execute(){ for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } return 0; } - + + bool success = 1; Sequence currSeq(inFASTA); @@ -296,10 +411,11 @@ int TrimSeqsCommand::execute(){ } gobble(inFASTA); } + inFASTA.close(); outFASTA.close(); scrapFASTA.close(); - outGroups.close(); + if (oligoFile != "") { outGroups.close(); } if(qFileName != "") { qFile.close(); } for(int i=0;ierrorOut(e, "TrimSeqsCommand", "driverCreateTrim"); + exit(1); + } +} +/**************************************************************************************************/ +int TrimSeqsCommand::createProcessesCreateTrim(string filename, string qFileName, string trimFile, string scrapFile, string groupFile, vector fastaNames) { + try { +#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) + int process = 0; + int exitCommand = 1; + processIDS.clear(); + + //loop through and create all the processes you want + while (process != processors) { + int pid = fork(); - while(!inFASTA.eof()){ - if(inFASTA.get() == '>'){ - inFASTA >> seqName; - outGroups << seqName << '\t' << groupVector[i] << endl; - } - while (!inFASTA.eof()) { char c = inFASTA.get(); if (c == 10 || c == 13){ break; } } - } - outGroups.close(); - inFASTA.close(); + if (pid > 0) { + processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later + process++; + }else if (pid == 0){ + driverCreateTrim(filename, qFileName, (trimFile + toString(getpid()) + ".temp"), (scrapFile + toString(getpid()) + ".temp"), (groupFile + toString(getpid()) + ".temp"), fastaNames, lines[process], qLines[process]); + exit(0); + }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); } } - if (m->control_pressed) { - for (int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); } - return 0; + //force parent to wait until all the processes are done + for (int i=0;ierrorOut(e, "TrimSeqsCommand", "createProcessesCreateTrim"); + exit(1); + } +} +/**************************************************************************************************/ - m->mothurOutEndLine(); - m->mothurOut("Output File Names: "); m->mothurOutEndLine(); - for (int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); } - m->mothurOutEndLine(); +int TrimSeqsCommand::setLines(string filename, vector& lines) { + try { + + lines.clear(); + + vector positions; + + ifstream inFASTA; + openInputFile(filename, inFASTA); - return 0; + string input; + while(!inFASTA.eof()){ + input = getline(inFASTA); + + if (input.length() != 0) { + if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1); } + } + } + inFASTA.close(); + + int numFastaSeqs = positions.size(); + + FILE * pFile; + long size; + + //get num bytes in file + pFile = fopen (filename.c_str(),"rb"); + if (pFile==NULL) perror ("Error opening file"); + else{ + fseek (pFile, 0, SEEK_END); + size=ftell (pFile); + fclose (pFile); + } + + int numSeqsPerProcessor = numFastaSeqs / processors; + + for (int i = 0; i < processors; i++) { + + long int startPos = positions[ i * numSeqsPerProcessor ]; + if(i == processors - 1){ + numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; + }else{ + long int myEnd = positions[ (i+1) * numSeqsPerProcessor ]; + } + lines.push_back(new linePair(startPos, numSeqsPerProcessor)); + } + + return numFastaSeqs; } catch(exception& e) { - m->errorOut(e, "TrimSeqsCommand", "execute"); + m->errorOut(e, "TrimSeqsCommand", "setLines"); exit(1); } } - //*************************************************************************************************************** -void TrimSeqsCommand::getOligos(vector& outFASTAVec){ +void TrimSeqsCommand::getOligos(vector& outFASTAVec){ //vector& outFASTAVec try { ifstream inOligos; openInputFile(oligoFile, inOligos); @@ -383,8 +561,9 @@ void TrimSeqsCommand::getOligos(vector& outFASTAVec){ groupVector.push_back(group); if(allFiles){ - outFASTAVec.push_back(new ofstream((outputDir + getRootName(getSimpleName(fastaFile)) + group + ".fasta").c_str(), ios::ate)); + //outFASTAVec.push_back(new ofstream((outputDir + getRootName(getSimpleName(fastaFile)) + group + ".fasta").c_str(), ios::ate)); outputNames.push_back((outputDir + getRootName(getSimpleName(fastaFile)) + group + ".fasta")); + outFASTAVec.push_back((outputDir + getRootName(getSimpleName(fastaFile)) + group + ".fasta")); } } } @@ -641,7 +820,7 @@ bool TrimSeqsCommand::cullQualAverage(Sequence& seq, ifstream& qFile){ string name; qFile >> name; - if (name[0] == '>') { if(name.substr(1) != seq.getName()) { m->mothurOut("sequence name mismatch btwn fasta: " + seq.getName() + " and qual file: " + name); m->mothurOutEndLine(); } } + if (name[0] == '>') { if(name.substr(1) != seq.getName()) { m->mothurOut("sequence name mismatch btwn fasta: " + seq.getName() + " and qual file: " + name); m->mothurOutEndLine(); } } while (!qFile.eof()) { char c = qFile.get(); if (c == 10 || c == 13){ break; } } diff --git a/trimseqscommand.h b/trimseqscommand.h index a93eba0..11778fb 100644 --- a/trimseqscommand.h +++ b/trimseqscommand.h @@ -22,7 +22,14 @@ public: void help(); private: - void getOligos(vector&); + + struct linePair { + int start; + int num; + linePair(long int i, int j) : start(i), num(j) {} + }; + + void getOligos(vector&); bool stripQualThreshold(Sequence&, ifstream&); bool cullQualAverage(Sequence&, ifstream&); bool stripBarcode(Sequence&, int&); @@ -37,10 +44,19 @@ private: string fastaFile, oligoFile, qFileName, outputDir; bool flip, allFiles, qtrim; - int numFPrimers, numRPrimers, maxAmbig, maxHomoP, minLength, maxLength, qThreshold, qAverage; + int numFPrimers, numRPrimers, maxAmbig, maxHomoP, minLength, maxLength, qThreshold, qAverage, processors; vector forPrimer, revPrimer, outputNames; map barcodes; vector groupVector; + + vector processIDS; //processid + vector lines; + vector qLines; + + int driverCreateTrim(string, string, string, string, string, vector, linePair*, linePair*); + int createProcessesCreateTrim(string, string, string, string, string, vector); + int setLines(string, vector&); + }; #endif