2 * filterseqscommand.cpp
5 * Created by Thomas Ryabin on 5/4/09.
6 * Copyright 2009 Schloss Lab UMASS Amherst. All rights reserved.
10 #include "filterseqscommand.h"
11 #include "sequence.hpp"
14 //**********************************************************************************************************************
15 vector<string> FilterSeqsCommand::setParameters(){
17 CommandParameter pfasta("fasta", "InputTypes", "", "", "none", "none", "none",false,true); parameters.push_back(pfasta);
18 CommandParameter phard("hard", "InputTypes", "", "", "none", "none", "none",false,false); parameters.push_back(phard);
19 CommandParameter ptrump("trump", "String", "", "*", "", "", "",false,false); parameters.push_back(ptrump);
20 CommandParameter psoft("soft", "Number", "", "0", "", "", "",false,false); parameters.push_back(psoft);
21 CommandParameter pvertical("vertical", "Boolean", "", "T", "", "", "",false,false); parameters.push_back(pvertical);
22 CommandParameter pprocessors("processors", "Number", "", "1", "", "", "",false,false); parameters.push_back(pprocessors);
23 CommandParameter pinputdir("inputdir", "String", "", "", "", "", "",false,false); parameters.push_back(pinputdir);
24 CommandParameter poutputdir("outputdir", "String", "", "", "", "", "",false,false); parameters.push_back(poutputdir);
26 vector<string> myArray;
27 for (int i = 0; i < parameters.size(); i++) { myArray.push_back(parameters[i].name); }
31 m->errorOut(e, "FilterSeqsCommand", "setParameters");
35 //**********************************************************************************************************************
36 string FilterSeqsCommand::getHelpString(){
38 string helpString = "";
39 helpString += "The filter.seqs command reads a file containing sequences and creates a .filter and .filter.fasta file.\n";
40 helpString += "The filter.seqs command parameters are fasta, trump, soft, hard, processors and vertical. \n";
41 helpString += "The fasta parameter is required, unless you have a valid current fasta file. You may enter several fasta files to build the filter from and filter, by separating their names with -'s.\n";
42 helpString += "For example: fasta=abrecovery.fasta-amazon.fasta \n";
43 helpString += "The trump option will remove a column if the trump character is found at that position in any sequence of the alignment. Default=*, meaning no trump. \n";
44 helpString += "A soft mask removes any column where the dominant base (i.e. A, T, G, C, or U) does not occur in at least a designated percentage of sequences. Default=0.\n";
45 helpString += "The hard parameter allows you to enter a file containing the filter you want to use.\n";
46 helpString += "The vertical parameter removes columns where all sequences contain a gap character. The default is T.\n";
47 helpString += "The processors parameter allows you to specify the number of processors to use. The default is 1.\n";
48 helpString += "The filter.seqs command should be in the following format: \n";
49 helpString += "filter.seqs(fasta=yourFastaFile, trump=yourTrump) \n";
50 helpString += "Example filter.seqs(fasta=abrecovery.fasta, trump=.).\n";
51 helpString += "Note: No spaces between parameter labels (i.e. fasta), '=' and parameters (i.e.yourFasta).\n";
55 m->errorOut(e, "FilterSeqsCommand", "getHelpString");
59 //**********************************************************************************************************************
60 FilterSeqsCommand::FilterSeqsCommand(){
62 abort = true; calledHelp = true;
64 vector<string> tempOutNames;
65 outputTypes["fasta"] = tempOutNames;
66 outputTypes["filter"] = tempOutNames;
69 m->errorOut(e, "FilterSeqsCommand", "FilterSeqsCommand");
73 /**************************************************************************************/
74 FilterSeqsCommand::FilterSeqsCommand(string option) {
76 abort = false; calledHelp = false;
79 //allow user to run help
80 if(option == "help") { help(); abort = true; calledHelp = true; }
81 else if(option == "citation") { citation(); abort = true; calledHelp = true;}
84 vector<string> myArray = setParameters();
86 OptionParser parser(option);
87 map<string,string> parameters = parser.getParameters();
89 ValidParameters validParameter("filter.seqs");
90 map<string,string>::iterator it;
92 //check to make sure all parameters are valid for command
93 for (it = parameters.begin(); it != parameters.end(); it++) {
94 if (validParameter.isValidParameter(it->first, myArray, it->second) != true) { abort = true; }
97 //initialize outputTypes
98 vector<string> tempOutNames;
99 outputTypes["fasta"] = tempOutNames;
100 outputTypes["filter"] = tempOutNames;
102 //if the user changes the input directory command factory will send this info to us in the output parameter
103 string inputDir = validParameter.validFile(parameters, "inputdir", false);
104 if (inputDir == "not found"){ inputDir = ""; }
107 it = parameters.find("fasta");
108 //user has given a template file
109 if(it != parameters.end()){
110 path = m->hasPath(it->second);
111 //if the user has not given a path then, add inputdir. else leave path alone.
112 if (path == "") { parameters["fasta"] = inputDir + it->second; }
115 it = parameters.find("hard");
116 //user has given a template file
117 if(it != parameters.end()){
118 path = m->hasPath(it->second);
119 //if the user has not given a path then, add inputdir. else leave path alone.
120 if (path == "") { parameters["hard"] = inputDir + it->second; }
124 //check for required parameters
125 fasta = validParameter.validFile(parameters, "fasta", false);
126 if (fasta == "not found") {
127 fasta = m->getFastaFile();
129 fastafileNames.push_back(fasta);
130 m->mothurOut("Using " + fasta + " as input file for the fasta parameter."); m->mothurOutEndLine();
131 string simpleName = m->getSimpleName(fasta);
132 filterFileName += simpleName.substr(0, simpleName.find_first_of('.'));
134 else { m->mothurOut("You have no current fastafile and the fasta parameter is required."); m->mothurOutEndLine(); abort = true; }
137 m->splitAtDash(fasta, fastafileNames);
139 //go through files and make sure they are good, if not, then disregard them
140 for (int i = 0; i < fastafileNames.size(); i++) {
143 if (fastafileNames[i] == "current") {
144 fastafileNames[i] = m->getFastaFile();
145 if (fastafileNames[i] != "") { m->mothurOut("Using " + fastafileNames[i] + " as input file for the fasta parameter where you had given current."); m->mothurOutEndLine(); }
147 m->mothurOut("You have no current fastafile, ignoring current."); m->mothurOutEndLine(); ignore=true;
148 //erase from file list
149 fastafileNames.erase(fastafileNames.begin()+i);
155 if (inputDir != "") {
156 string path = m->hasPath(fastafileNames[i]);
157 //if the user has not given a path then, add inputdir. else leave path alone.
158 if (path == "") { fastafileNames[i] = inputDir + fastafileNames[i]; }
162 int ableToOpen = m->openInputFile(fastafileNames[i], in, "noerror");
164 //if you can't open it, try default location
165 if (ableToOpen == 1) {
166 if (m->getDefaultPath() != "") { //default path is set
167 string tryPath = m->getDefaultPath() + m->getSimpleName(fastafileNames[i]);
168 m->mothurOut("Unable to open " + fastafileNames[i] + ". Trying default " + tryPath); m->mothurOutEndLine();
170 ableToOpen = m->openInputFile(tryPath, in2, "noerror");
172 fastafileNames[i] = tryPath;
176 //if you can't open it, try default location
177 if (ableToOpen == 1) {
178 if (m->getOutputDir() != "") { //default path is set
179 string tryPath = m->getOutputDir() + m->getSimpleName(fastafileNames[i]);
180 m->mothurOut("Unable to open " + fastafileNames[i] + ". Trying output directory " + tryPath); m->mothurOutEndLine();
182 ableToOpen = m->openInputFile(tryPath, in2, "noerror");
184 fastafileNames[i] = tryPath;
190 if (ableToOpen == 1) {
191 m->mothurOut("Unable to open " + fastafileNames[i] + ". It will be disregarded."); m->mothurOutEndLine();
192 //erase from file list
193 fastafileNames.erase(fastafileNames.begin()+i);
196 string simpleName = m->getSimpleName(fastafileNames[i]);
197 filterFileName += simpleName.substr(0, simpleName.find_first_of('.'));
198 m->setFastaFile(fastafileNames[i]);
204 //make sure there is at least one valid file left
205 if (fastafileNames.size() == 0) { m->mothurOut("no valid files."); m->mothurOutEndLine(); abort = true; }
209 //if the user changes the output directory command factory will send this info to us in the output parameter
210 outputDir = validParameter.validFile(parameters, "outputdir", false); if (outputDir == "not found"){
212 outputDir += m->hasPath(fastafileNames[0]); //if user entered a file with a path then preserve it
215 //check for optional parameter and set defaults
216 // ...at some point should added some additional type checking...
219 hard = validParameter.validFile(parameters, "hard", true); if (hard == "not found") { hard = ""; }
220 else if (hard == "not open") { hard = ""; abort = true; }
222 temp = validParameter.validFile(parameters, "trump", false); if (temp == "not found") { temp = "*"; }
225 temp = validParameter.validFile(parameters, "soft", false); if (temp == "not found") { soft = 0; }
226 else { soft = (float)atoi(temp.c_str()) / 100.0; }
228 temp = validParameter.validFile(parameters, "processors", false); if (temp == "not found"){ temp = m->getProcessors(); }
229 m->setProcessors(temp);
230 m->mothurConvert(temp, processors);
232 vertical = validParameter.validFile(parameters, "vertical", false);
233 if (vertical == "not found") {
234 if ((hard == "") && (trump == '*') && (soft == 0)) { vertical = "T"; } //you have not given a hard file or set the trump char.
235 else { vertical = "F"; }
242 catch(exception& e) {
243 m->errorOut(e, "FilterSeqsCommand", "FilterSeqsCommand");
247 /**************************************************************************************/
249 int FilterSeqsCommand::execute() {
252 if (abort == true) { if (calledHelp) { return 0; } return 2; }
255 m->openInputFile(fastafileNames[0], inFASTA);
257 Sequence testSeq(inFASTA);
258 alignmentLength = testSeq.getAlignLength();
261 ////////////create filter/////////////////
262 m->mothurOut("Creating Filter... "); m->mothurOutEndLine();
264 filter = createFilter();
266 m->mothurOutEndLine(); m->mothurOutEndLine();
268 if (m->control_pressed) { outputTypes.clear(); return 0; }
272 MPI_Comm_rank(MPI_COMM_WORLD, &pid);
274 if (pid == 0) { //only one process should output the filter
279 //prevent giantic file name
281 if (fastafileNames.size() > 3) { filterFile = outputDir + "merge.filter"; }
282 else { filterFile = outputDir + filterFileName + ".filter"; }
284 m->openOutputFile(filterFile, outFilter);
285 outFilter << filter << endl;
287 outputNames.push_back(filterFile); outputTypes["filter"].push_back(filterFile);
293 ////////////run filter/////////////////
295 m->mothurOut("Running Filter... "); m->mothurOutEndLine();
299 m->mothurOutEndLine(); m->mothurOutEndLine();
301 int filteredLength = 0;
302 for(int i=0;i<alignmentLength;i++){
303 if(filter[i] == '1'){ filteredLength++; }
306 if (m->control_pressed) { outputTypes.clear(); for(int i = 0; i < outputNames.size(); i++) { m->mothurRemove(outputNames[i]); } return 0; }
309 m->mothurOutEndLine();
310 m->mothurOut("Length of filtered alignment: " + toString(filteredLength)); m->mothurOutEndLine();
311 m->mothurOut("Number of columns removed: " + toString((alignmentLength-filteredLength))); m->mothurOutEndLine();
312 m->mothurOut("Length of the original alignment: " + toString(alignmentLength)); m->mothurOutEndLine();
313 m->mothurOut("Number of sequences used to construct filter: " + toString(numSeqs)); m->mothurOutEndLine();
315 //set fasta file as new current fastafile
317 itTypes = outputTypes.find("fasta");
318 if (itTypes != outputTypes.end()) {
319 if ((itTypes->second).size() != 0) { current = (itTypes->second)[0]; m->setFastaFile(current); }
322 m->mothurOutEndLine();
323 m->mothurOut("Output File Names: "); m->mothurOutEndLine();
324 for(int i = 0; i < outputNames.size(); i++) { m->mothurOut(outputNames[i]); m->mothurOutEndLine(); }
325 m->mothurOutEndLine();
330 catch(exception& e) {
331 m->errorOut(e, "FilterSeqsCommand", "execute");
335 /**************************************************************************************/
336 int FilterSeqsCommand::filterSequences() {
341 for (int s = 0; s < fastafileNames.size(); s++) {
343 for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear();
345 string filteredFasta = outputDir + m->getRootName(m->getSimpleName(fastafileNames[s])) + "filter.fasta";
347 int pid, numSeqsPerProcessor, num;
349 vector<unsigned long long>MPIPos;
352 MPI_Comm_size(MPI_COMM_WORLD, &processors); //set processors to the number of mpi processes running
353 MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
357 int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY;
358 int inMode=MPI_MODE_RDONLY;
360 char outFilename[1024];
361 strcpy(outFilename, filteredFasta.c_str());
363 char inFileName[1024];
364 strcpy(inFileName, fastafileNames[s].c_str());
366 MPI_File_open(MPI_COMM_WORLD, inFileName, inMode, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer
367 MPI_File_open(MPI_COMM_WORLD, outFilename, outMode, MPI_INFO_NULL, &outMPI);
369 if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; }
371 if (pid == 0) { //you are the root process
373 MPIPos = m->setFilePosFasta(fastafileNames[s], num); //fills MPIPos, returns numSeqs
376 //send file positions to all processes
377 for(int i = 1; i < processors; i++) {
378 MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
379 MPI_Send(&MPIPos[0], (num+1), MPI_LONG, i, tag, MPI_COMM_WORLD);
382 //figure out how many sequences you have to do
383 numSeqsPerProcessor = num / processors;
384 int startIndex = pid * numSeqsPerProcessor;
385 if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; }
389 driverMPIRun(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos);
391 if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; }
394 for(int i = 1; i < processors; i++) {
396 MPI_Recv(buf, 5, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status);
399 }else { //you are a child process
400 MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
401 MPIPos.resize(num+1);
403 MPI_Recv(&MPIPos[0], (num+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status);
405 //figure out how many sequences you have to align
406 numSeqsPerProcessor = num / processors;
407 int startIndex = pid * numSeqsPerProcessor;
408 if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; }
412 driverMPIRun(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos);
414 if (m->control_pressed) { MPI_File_close(&inMPI); MPI_File_close(&outMPI); return 0; }
419 //tell parent you are done.
420 MPI_Send(buf, 5, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
423 MPI_File_close(&outMPI);
424 MPI_File_close(&inMPI);
425 MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
429 vector<unsigned long long> positions;
430 if (savedPositions.size() != 0) { positions = savedPositions[s]; }
432 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
433 positions = m->divideFile(fastafileNames[s], processors);
436 int numFastaSeqs = 0;
437 positions = m->setFilePosFasta(fastafileNames[s], numFastaSeqs);
438 if (positions.size() < processors) { processors = positions.size(); }
442 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
443 //vector<unsigned long long> positions = m->divideFile(fastafileNames[s], processors);
445 for (int i = 0; i < (positions.size()-1); i++) {
446 lines.push_back(new linePair(positions[i], positions[(i+1)]));
450 int numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
451 numSeqs += numFastaSeqs;
453 int numFastaSeqs = createProcessesRunFilter(filter, fastafileNames[s], filteredFasta);
454 numSeqs += numFastaSeqs;
457 if (m->control_pressed) { return 1; }
460 lines.push_back(new linePair(0, 1000));
461 int numFastaSeqs = driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
462 numSeqs += numFastaSeqs;
464 int numFastaSeqs = positions.size()-1;
465 //positions = m->setFilePosFasta(fastafileNames[s], numFastaSeqs);
467 //figure out how many sequences you have to process
468 int numSeqsPerProcessor = numFastaSeqs / processors;
469 for (int i = 0; i < processors; i++) {
470 int startIndex = i * numSeqsPerProcessor;
471 if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; }
472 lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
475 numFastaSeqs = createProcessesRunFilter(filter, fastafileNames[s], filteredFasta);
476 numSeqs += numFastaSeqs;
479 if (m->control_pressed) { return 1; }
482 outputNames.push_back(filteredFasta); outputTypes["fasta"].push_back(filteredFasta);
487 catch(exception& e) {
488 m->errorOut(e, "FilterSeqsCommand", "filterSequences");
493 /**************************************************************************************/
494 int FilterSeqsCommand::driverMPIRun(int start, int num, MPI_File& inMPI, MPI_File& outMPI, vector<unsigned long long>& MPIPos) {
496 string outputString = "";
500 for(int i=0;i<num;i++){
502 if (m->control_pressed) { return 0; }
505 int length = MPIPos[start+i+1] - MPIPos[start+i];
506 char* buf4 = new char[length];
507 MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status);
509 string tempBuf = buf4;
510 if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); }
511 istringstream iss (tempBuf,istringstream::in);
514 Sequence seq(iss); m->gobble(iss);
516 if (seq.getName() != "") {
517 string align = seq.getAligned();
518 string filterSeq = "";
520 for(int j=0;j<alignmentLength;j++){
521 if(filter[j] == '1'){
522 filterSeq += align[j];
527 outputString += ">" + seq.getName() + "\n" + filterSeq + "\n";
529 if(count % 10 == 0){ //output to file
530 //send results to parent
531 int length = outputString.length();
532 char* buf = new char[length];
533 memcpy(buf, outputString.c_str(), length);
535 MPI_File_write_shared(outMPI, buf, length, MPI_CHAR, &status);
542 if((i+1) % 100 == 0){ cout << (i+1) << endl; m->mothurOutJustToLog(toString(i+1) + "\n"); }
545 if(outputString != ""){ //output to file
546 //send results to parent
547 int length = outputString.length();
548 char* buf = new char[length];
549 memcpy(buf, outputString.c_str(), length);
551 MPI_File_write_shared(outMPI, buf, length, MPI_CHAR, &status);
556 if((num) % 100 != 0){ cout << (num) << endl; m->mothurOutJustToLog(toString(num) + "\n"); }
560 catch(exception& e) {
561 m->errorOut(e, "FilterSeqsCommand", "driverRunFilter");
566 /**************************************************************************************/
567 int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string inputFilename, linePair* filePos) {
570 m->openOutputFile(outputFilename, out);
573 m->openInputFile(inputFilename, in);
575 in.seekg(filePos->start);
582 if (m->control_pressed) { in.close(); out.close(); return 0; }
584 Sequence seq(in); m->gobble(in);
585 if (seq.getName() != "") {
586 string align = seq.getAligned();
587 string filterSeq = "";
589 for(int j=0;j<alignmentLength;j++){
590 if(filter[j] == '1'){
591 filterSeq += align[j];
595 out << '>' << seq.getName() << endl << filterSeq << endl;
599 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
600 unsigned long long pos = in.tellg();
601 if ((pos == -1) || (pos >= filePos->end)) { break; }
603 if (in.eof()) { break; }
607 if((count) % 100 == 0){ m->mothurOut(toString(count)); m->mothurOutEndLine(); }
610 if((count) % 100 != 0){ m->mothurOut(toString(count)); m->mothurOutEndLine(); }
618 catch(exception& e) {
619 m->errorOut(e, "FilterSeqsCommand", "driverRunFilter");
623 /**************************************************************************************************/
625 int FilterSeqsCommand::createProcessesRunFilter(string F, string filename, string filteredFastaName) {
632 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
635 //loop through and create all the processes you want
636 while (process != processors) {
640 processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
643 string filteredFasta = filename + toString(getpid()) + ".temp";
644 num = driverRunFilter(F, filteredFasta, filename, lines[process]);
646 //pass numSeqs to parent
648 string tempFile = filename + toString(getpid()) + ".num.temp";
649 m->openOutputFile(tempFile, out);
655 m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
656 for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
661 num = driverRunFilter(F, filteredFastaName, filename, lines[0]);
663 //force parent to wait until all the processes are done
664 for (int i=0;i<processIDS.size();i++) {
665 int temp = processIDS[i];
669 for (int i = 0; i < processIDS.size(); i++) {
671 string tempFile = filename + toString(processIDS[i]) + ".num.temp";
672 m->openInputFile(tempFile, in);
673 if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; }
674 in.close(); m->mothurRemove(tempFile);
676 m->appendFiles((filename + toString(processIDS[i]) + ".temp"), filteredFastaName);
677 m->mothurRemove((filename + toString(processIDS[i]) + ".temp"));
682 //////////////////////////////////////////////////////////////////////////////////////////////////////
683 //Windows version shared memory, so be careful when passing variables through the filterData struct.
684 //Above fork() will clone, so memory is separate, but that's not the case with windows,
685 //Taking advantage of shared memory to allow both threads to add info to F.
686 //////////////////////////////////////////////////////////////////////////////////////////////////////
688 vector<filterRunData*> pDataArray;
689 DWORD dwThreadIdArray[processors-1];
690 HANDLE hThreadArray[processors-1];
692 //Create processor worker threads.
693 for( int i=0; i<processors-1; i++){
695 string extension = "";
696 if (i != 0) { extension = toString(i) + ".temp"; }
698 filterRunData* tempFilter = new filterRunData(filter, filename, (filteredFastaName + extension), m, lines[i]->start, lines[i]->end, alignmentLength, i);
699 pDataArray.push_back(tempFilter);
700 processIDS.push_back(i);
702 hThreadArray[i] = CreateThread(NULL, 0, MyRunFilterThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
705 num = driverRunFilter(F, (filteredFastaName + toString(processors-1) + ".temp"), filename, lines[processors-1]);
707 //Wait until all threads have terminated.
708 WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
710 //Close all thread handles and free memory allocations.
711 for(int i=0; i < pDataArray.size(); i++){
712 num += pDataArray[i]->count;
713 CloseHandle(hThreadArray[i]);
714 delete pDataArray[i];
717 for (int i = 1; i < processors; i++) {
718 m->appendFiles((filteredFastaName + toString(i) + ".temp"), filteredFastaName);
719 m->mothurRemove((filteredFastaName + toString(i) + ".temp"));
726 catch(exception& e) {
727 m->errorOut(e, "FilterSeqsCommand", "createProcessesRunFilter");
731 /**************************************************************************************/
732 string FilterSeqsCommand::createFilter() {
734 string filterString = "";
737 if (soft != 0) { F.setSoft(soft); }
738 if (trump != '*') { F.setTrump(trump); }
740 F.setLength(alignmentLength);
742 if(trump != '*' || m->isTrue(vertical) || soft != 0){
746 if(hard.compare("") != 0) { F.doHard(hard); }
747 else { F.setFilter(string(alignmentLength, '1')); }
750 if(trump != '*' || m->isTrue(vertical) || soft != 0){
751 for (int s = 0; s < fastafileNames.size(); s++) {
753 for (int i = 0; i < lines.size(); i++) { delete lines[i]; } lines.clear();
756 int pid, numSeqsPerProcessor, num;
758 vector<unsigned long long> MPIPos;
762 MPI_Comm_size(MPI_COMM_WORLD, &processors);
763 MPI_Comm_rank(MPI_COMM_WORLD, &pid);
765 //char* tempFileName = new char(fastafileNames[s].length());
766 //tempFileName = &(fastafileNames[s][0]);
768 char tempFileName[1024];
769 strcpy(tempFileName, fastafileNames[s].c_str());
771 MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer
773 if (m->control_pressed) { MPI_File_close(&inMPI); return 0; }
775 if (pid == 0) { //you are the root process
776 MPIPos = m->setFilePosFasta(fastafileNames[s], num); //fills MPIPos, returns numSeqs
779 //send file positions to all processes
780 for(int i = 1; i < processors; i++) {
781 MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
782 MPI_Send(&MPIPos[0], (num+1), MPI_LONG, i, tag, MPI_COMM_WORLD);
785 //figure out how many sequences you have to do
786 numSeqsPerProcessor = num / processors;
787 int startIndex = pid * numSeqsPerProcessor;
788 if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; }
792 MPICreateFilter(startIndex, numSeqsPerProcessor, F, inMPI, MPIPos);
794 if (m->control_pressed) { MPI_File_close(&inMPI); return 0; }
796 }else { //i am the child process
797 MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
798 MPIPos.resize(num+1);
800 MPI_Recv(&MPIPos[0], (num+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status);
802 //figure out how many sequences you have to align
803 numSeqsPerProcessor = num / processors;
804 int startIndex = pid * numSeqsPerProcessor;
805 if(pid == (processors - 1)){ numSeqsPerProcessor = num - pid * numSeqsPerProcessor; }
809 MPICreateFilter(startIndex, numSeqsPerProcessor, F, inMPI, MPIPos);
811 if (m->control_pressed) { MPI_File_close(&inMPI); return 0; }
814 MPI_File_close(&inMPI);
815 MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
819 vector<unsigned long long> positions;
820 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
821 positions = m->divideFile(fastafileNames[s], processors);
822 for (int i = 0; i < (positions.size()-1); i++) {
823 lines.push_back(new linePair(positions[i], positions[(i+1)]));
827 int numFastaSeqs = driverCreateFilter(F, fastafileNames[s], lines[0]);
828 numSeqs += numFastaSeqs;
830 int numFastaSeqs = createProcessesCreateFilter(F, fastafileNames[s]);
831 numSeqs += numFastaSeqs;
835 lines.push_back(new linePair(0, 1000));
836 int numFastaSeqs = driverCreateFilter(F, fastafileNames[s], lines[0]);
837 numSeqs += numFastaSeqs;
839 int numFastaSeqs = 0;
840 positions = m->setFilePosFasta(fastafileNames[s], numFastaSeqs);
841 if (positions.size() < processors) { processors = positions.size(); }
843 //figure out how many sequences you have to process
844 int numSeqsPerProcessor = numFastaSeqs / processors;
845 for (int i = 0; i < processors; i++) {
846 int startIndex = i * numSeqsPerProcessor;
847 if(i == (processors - 1)){ numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor; }
848 lines.push_back(new linePair(positions[startIndex], numSeqsPerProcessor));
851 numFastaSeqs = createProcessesCreateFilter(F, fastafileNames[s]);
852 numSeqs += numFastaSeqs;
855 //save the file positions so we can reuse them in the runFilter function
856 savedPositions[s] = positions;
858 if (m->control_pressed) { return filterString; }
867 int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5;
870 MPI_Comm_rank(MPI_COMM_WORLD, &pid);
872 if(trump != '*' || m->isTrue(vertical) || soft != 0){
874 if (pid == 0) { //only one process should output the filter
876 vector<int> temp; temp.resize(alignmentLength+1);
878 //get the frequencies from the child processes
879 for(int i = 1; i < processors; i++) {
881 for (int j = 0; j < 5; j++) {
883 MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status);
884 int receiveTag = temp[temp.size()-1]; //child process added a int to the end to indicate what letter count this is for
886 if (receiveTag == Atag) { //you are recieveing the A frequencies
887 for (int k = 0; k < alignmentLength; k++) { F.a[k] += temp[k]; }
888 }else if (receiveTag == Ttag) { //you are recieveing the T frequencies
889 for (int k = 0; k < alignmentLength; k++) { F.t[k] += temp[k]; }
890 }else if (receiveTag == Ctag) { //you are recieveing the C frequencies
891 for (int k = 0; k < alignmentLength; k++) { F.c[k] += temp[k]; }
892 }else if (receiveTag == Gtag) { //you are recieveing the G frequencies
893 for (int k = 0; k < alignmentLength; k++) { F.g[k] += temp[k]; }
894 }else if (receiveTag == Gaptag) { //you are recieveing the gap frequencies
895 for (int k = 0; k < alignmentLength; k++) { F.gap[k] += temp[k]; }
901 //send my fequency counts
903 int ierr = MPI_Send(&(F.a[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
905 ierr = MPI_Send (&(F.t[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
907 ierr = MPI_Send(&(F.c[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
909 ierr = MPI_Send(&(F.g[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
910 F.gap.push_back(Gaptag);
911 ierr = MPI_Send(&(F.gap[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
916 MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
918 if (pid == 0) { //only one process should output the filter
921 F.setNumSeqs(numSeqs);
922 if(m->isTrue(vertical) == 1) { F.doVertical(); }
923 if(soft != 0) { F.doSoft(); }
924 filterString = F.getFilter();
927 //send filter string to kids
928 //for(int i = 1; i < processors; i++) {
929 // MPI_Send(&filterString[0], alignmentLength, MPI_CHAR, i, 2001, MPI_COMM_WORLD);
931 MPI_Bcast(&filterString[0], alignmentLength, MPI_CHAR, 0, MPI_COMM_WORLD);
933 //recieve filterString
934 char* tempBuf = new char[alignmentLength];
935 //MPI_Recv(&tempBuf[0], alignmentLength, MPI_CHAR, 0, 2001, MPI_COMM_WORLD, &status);
936 MPI_Bcast(tempBuf, alignmentLength, MPI_CHAR, 0, MPI_COMM_WORLD);
938 filterString = tempBuf;
939 if (filterString.length() > alignmentLength) { filterString = filterString.substr(0, alignmentLength); }
943 MPI_Barrier(MPI_COMM_WORLD);
948 catch(exception& e) {
949 m->errorOut(e, "FilterSeqsCommand", "createFilter");
953 /**************************************************************************************/
954 int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair* filePos) {
958 m->openInputFile(filename, in);
960 in.seekg(filePos->start);
967 if (m->control_pressed) { in.close(); return 1; }
969 Sequence seq(in); m->gobble(in);
970 if (seq.getName() != "") {
971 if (seq.getAligned().length() != alignmentLength) { m->mothurOut("Sequences are not all the same length, please correct."); m->mothurOutEndLine(); m->control_pressed = true; }
973 if(trump != '*') { F.doTrump(seq); }
974 if(m->isTrue(vertical) || soft != 0) { F.getFreqs(seq); }
979 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
980 unsigned long long pos = in.tellg();
981 if ((pos == -1) || (pos >= filePos->end)) { break; }
983 if (in.eof()) { break; }
987 if((count) % 100 == 0){ m->mothurOut(toString(count)); m->mothurOutEndLine(); }
990 if((count) % 100 != 0){ m->mothurOut(toString(count)); m->mothurOutEndLine(); }
995 catch(exception& e) {
996 m->errorOut(e, "FilterSeqsCommand", "driverCreateFilter");
1001 /**************************************************************************************/
1002 int FilterSeqsCommand::MPICreateFilter(int start, int num, Filters& F, MPI_File& inMPI, vector<unsigned long long>& MPIPos) {
1007 MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
1009 for(int i=0;i<num;i++){
1011 if (m->control_pressed) { return 0; }
1013 //read next sequence
1014 int length = MPIPos[start+i+1] - MPIPos[start+i];
1016 char* buf4 = new char[length];
1017 MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status);
1019 string tempBuf = buf4;
1020 if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length); }
1021 istringstream iss (tempBuf,istringstream::in);
1026 if (seq.getAligned().length() != alignmentLength) { cout << "Alignment length is " << alignmentLength << " and sequence " << seq.getName() << " has length " << seq.getAligned().length() << ", please correct." << endl; exit(1); }
1028 if(trump != '*'){ F.doTrump(seq); }
1029 if(m->isTrue(vertical) || soft != 0){ F.getFreqs(seq); }
1033 if((i+1) % 100 == 0){ cout << (i+1) << endl; m->mothurOutJustToLog(toString(i+1) + "\n"); }
1037 if((num) % 100 != 0){ cout << num << endl; m->mothurOutJustToLog(toString(num) + "\n"); }
1041 catch(exception& e) {
1042 m->errorOut(e, "FilterSeqsCommand", "MPICreateFilter");
1047 /**************************************************************************************************/
1049 int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) {
1055 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
1057 //loop through and create all the processes you want
1058 while (process != processors) {
1062 processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
1064 }else if (pid == 0){
1065 //reset child's filter counts to 0;
1066 F.a.clear(); F.a.resize(alignmentLength, 0);
1067 F.t.clear(); F.t.resize(alignmentLength, 0);
1068 F.g.clear(); F.g.resize(alignmentLength, 0);
1069 F.c.clear(); F.c.resize(alignmentLength, 0);
1070 F.gap.clear(); F.gap.resize(alignmentLength, 0);
1072 num = driverCreateFilter(F, filename, lines[process]);
1074 //write out filter counts to file
1075 filename += toString(getpid()) + "filterValues.temp";
1077 m->openOutputFile(filename, out);
1080 out << F.getFilter() << endl;
1081 for (int k = 0; k < alignmentLength; k++) { out << F.a[k] << '\t'; } out << endl;
1082 for (int k = 0; k < alignmentLength; k++) { out << F.t[k] << '\t'; } out << endl;
1083 for (int k = 0; k < alignmentLength; k++) { out << F.g[k] << '\t'; } out << endl;
1084 for (int k = 0; k < alignmentLength; k++) { out << F.c[k] << '\t'; } out << endl;
1085 for (int k = 0; k < alignmentLength; k++) { out << F.gap[k] << '\t'; } out << endl;
1087 //cout << F.getFilter() << endl;
1092 m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
1093 for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
1098 //parent do your part
1099 num = driverCreateFilter(F, filename, lines[0]);
1101 //force parent to wait until all the processes are done
1102 for (int i=0;i<(processors-1);i++) {
1103 int temp = processIDS[i];
1107 //parent reads in and combines Filter info
1108 for (int i = 0; i < processIDS.size(); i++) {
1109 string tempFilename = filename + toString(processIDS[i]) + "filterValues.temp";
1111 m->openInputFile(tempFilename, in);
1114 string tempFilterString;
1116 in >> tempNum; m->gobble(in); num += tempNum;
1118 in >> tempFilterString;
1119 F.mergeFilter(tempFilterString);
1121 for (int k = 0; k < alignmentLength; k++) { in >> temp; F.a[k] += temp; } m->gobble(in);
1122 for (int k = 0; k < alignmentLength; k++) { in >> temp; F.t[k] += temp; } m->gobble(in);
1123 for (int k = 0; k < alignmentLength; k++) { in >> temp; F.g[k] += temp; } m->gobble(in);
1124 for (int k = 0; k < alignmentLength; k++) { in >> temp; F.c[k] += temp; } m->gobble(in);
1125 for (int k = 0; k < alignmentLength; k++) { in >> temp; F.gap[k] += temp; } m->gobble(in);
1128 m->mothurRemove(tempFilename);
1134 //////////////////////////////////////////////////////////////////////////////////////////////////////
1135 //Windows version shared memory, so be careful when passing variables through the filterData struct.
1136 //Above fork() will clone, so memory is separate, but that's not the case with windows,
1137 //Taking advantage of shared memory to allow both threads to add info to F.
1138 //////////////////////////////////////////////////////////////////////////////////////////////////////
1140 vector<filterData*> pDataArray;
1141 DWORD dwThreadIdArray[processors];
1142 HANDLE hThreadArray[processors];
1144 //Create processor worker threads.
1145 for( int i=0; i<processors; i++ ){
1147 filterData* tempFilter = new filterData(filename, m, lines[i]->start, lines[i]->end, alignmentLength, trump, vertical, soft, hard, i);
1148 pDataArray.push_back(tempFilter);
1149 processIDS.push_back(i);
1151 hThreadArray[i] = CreateThread(NULL, 0, MyCreateFilterThreadFunction, pDataArray[i], 0, &dwThreadIdArray[i]);
1154 //Wait until all threads have terminated.
1155 WaitForMultipleObjects(processors, hThreadArray, TRUE, INFINITE);
1157 //Close all thread handles and free memory allocations.
1158 for(int i=0; i < pDataArray.size(); i++){
1159 num += pDataArray[i]->count;
1160 F.mergeFilter(pDataArray[i]->F.getFilter());
1162 for (int k = 0; k < alignmentLength; k++) { F.a[k] += pDataArray[i]->F.a[k]; }
1163 for (int k = 0; k < alignmentLength; k++) { F.t[k] += pDataArray[i]->F.t[k]; }
1164 for (int k = 0; k < alignmentLength; k++) { F.g[k] += pDataArray[i]->F.g[k]; }
1165 for (int k = 0; k < alignmentLength; k++) { F.c[k] += pDataArray[i]->F.c[k]; }
1166 for (int k = 0; k < alignmentLength; k++) { F.gap[k] += pDataArray[i]->F.gap[k]; }
1168 CloseHandle(hThreadArray[i]);
1169 delete pDataArray[i];
1176 catch(exception& e) {
1177 m->errorOut(e, "FilterSeqsCommand", "createProcessesCreateFilter");
1181 /**************************************************************************************/