]> git.donarmstrong.com Git - mothur.git/blob - filterseqscommand.cpp
1c70b3e493fd2c3bf407746f8f97ddb276b801ea
[mothur.git] / filterseqscommand.cpp
1 /*
2  *  filterseqscommand.cpp
3  *  Mothur
4  *
5  *  Created by Thomas Ryabin on 5/4/09.
6  *  Copyright 2009 Schloss Lab UMASS Amherst. All rights reserved.
7  *
8  */
9
10 #include "filterseqscommand.h"
11 #include "sequence.hpp"
12
13 /**************************************************************************************/
14
15 FilterSeqsCommand::FilterSeqsCommand(string option)  {
16         try {
17                 abort = false;
18                 filterFileName = "";
19                 
20                 //allow user to run help
21                 if(option == "help") { help(); abort = true; }
22                 
23                 else {
24                         //valid paramters for this command
25                         string Array[] =  {"fasta", "trump", "soft", "hard", "vertical", "outputdir","inputdir", "processors"};
26                         vector<string> myArray (Array, Array+(sizeof(Array)/sizeof(string)));
27                         
28                         OptionParser parser(option);
29                         map<string,string> parameters = parser.getParameters();
30                         
31                         ValidParameters validParameter;
32                         map<string,string>::iterator it;
33                         
34                         //check to make sure all parameters are valid for command
35                         for (it = parameters.begin(); it != parameters.end(); it++) { 
36                                 if (validParameter.isValidParameter(it->first, myArray, it->second) != true) {  abort = true;  }
37                         }
38                         
39                         //if the user changes the input directory command factory will send this info to us in the output parameter 
40                         string inputDir = validParameter.validFile(parameters, "inputdir", false);              
41                         if (inputDir == "not found"){   inputDir = "";          }
42                         else {
43                                 string path;
44                                 it = parameters.find("fasta");
45                                 //user has given a template file
46                                 if(it != parameters.end()){ 
47                                         path = hasPath(it->second);
48                                         //if the user has not given a path then, add inputdir. else leave path alone.
49                                         if (path == "") {       parameters["fasta"] = inputDir + it->second;            }
50                                 }
51                                 
52                                 it = parameters.find("hard");
53                                 //user has given a template file
54                                 if(it != parameters.end()){ 
55                                         path = hasPath(it->second);
56                                         //if the user has not given a path then, add inputdir. else leave path alone.
57                                         if (path == "") {       parameters["hard"] = inputDir + it->second;             }
58                                 }
59                         }
60                         
61                         //check for required parameters
62                         fasta = validParameter.validFile(parameters, "fasta", false);
63                         if (fasta == "not found") { m->mothurOut("fasta is a required parameter for the filter.seqs command."); m->mothurOutEndLine(); abort = true;  }
64                         else { 
65                                 splitAtDash(fasta, fastafileNames);
66                                 
67                                 //go through files and make sure they are good, if not, then disregard them
68                                 for (int i = 0; i < fastafileNames.size(); i++) {
69                                         if (inputDir != "") {
70                                                 string path = hasPath(fastafileNames[i]);
71                                                 //if the user has not given a path then, add inputdir. else leave path alone.
72                                                 if (path == "") {       fastafileNames[i] = inputDir + fastafileNames[i];               }
73                                         }
74
75                                         int ableToOpen;
76                                         ifstream in;
77                                         ableToOpen = openInputFile(fastafileNames[i], in);
78                                         if (ableToOpen == 1) { 
79                                                 m->mothurOut(fastafileNames[i] + " will be disregarded."); m->mothurOutEndLine(); 
80                                                 //erase from file list
81                                                 fastafileNames.erase(fastafileNames.begin()+i);
82                                                 i--;
83                                         }else{  
84                                                 string simpleName = getSimpleName(fastafileNames[i]);
85                                                 filterFileName += simpleName.substr(0, simpleName.find_first_of('.'));
86                                         }
87                                         in.close();
88                                 }
89                                 
90                                 //make sure there is at least one valid file left
91                                 if (fastafileNames.size() == 0) { m->mothurOut("no valid files."); m->mothurOutEndLine(); abort = true; }
92                         }
93                         
94                         if (!abort) {
95                                 //if the user changes the output directory command factory will send this info to us in the output parameter 
96                                 outputDir = validParameter.validFile(parameters, "outputdir", false);           if (outputDir == "not found"){  
97                                         outputDir = ""; 
98                                         outputDir += hasPath(fastafileNames[0]); //if user entered a file with a path then preserve it  
99                                 }
100                         }
101                         //check for optional parameter and set defaults
102                         // ...at some point should added some additional type checking...
103                         
104                         string temp;
105                         temp = validParameter.validFile(parameters, "trump", false);                    if (temp == "not found") { temp = "*"; }
106                         trump = temp[0];
107                         
108                         temp = validParameter.validFile(parameters, "soft", false);                             if (temp == "not found") { soft = 0; }
109                         else {  soft = (float)atoi(temp.c_str()) / 100.0;  }
110                         
111                         temp = validParameter.validFile(parameters, "processors", false);       if (temp == "not found"){       temp = "1";                             }
112                         convert(temp, processors); 
113                         
114                         hard = validParameter.validFile(parameters, "hard", true);                              if (hard == "not found") { hard = ""; }
115                         else if (hard == "not open") { abort = true; }  
116                         
117                         vertical = validParameter.validFile(parameters, "vertical", false);             if (vertical == "not found") { vertical = "T"; }
118                         
119                         numSeqs = 0;
120                         
121                 }
122                 
123         }
124         catch(exception& e) {
125                 m->errorOut(e, "FilterSeqsCommand", "FilterSeqsCommand");
126                 exit(1);
127         }
128 }
129
130 //**********************************************************************************************************************
131
132 void FilterSeqsCommand::help(){
133         try {
134                                 
135                 m->mothurOut("The filter.seqs command reads a file containing sequences and creates a .filter and .filter.fasta file.\n");
136                 m->mothurOut("The filter.seqs command parameters are fasta, trump, soft, hard and vertical. \n");
137                 m->mothurOut("The fasta parameter is required. You may enter several fasta files to build the filter from and filter, by separating their names with -'s.\n");
138                 m->mothurOut("For example: fasta=abrecovery.fasta-amazon.fasta \n");
139                 m->mothurOut("The trump parameter .... The default is ...\n");
140                 m->mothurOut("The soft parameter .... The default is ....\n");
141                 m->mothurOut("The hard parameter .... The default is ....\n");
142                 m->mothurOut("The vertical parameter .... The default is T.\n");
143                 m->mothurOut("The filter.seqs command should be in the following format: \n");
144                 m->mothurOut("filter.seqs(fasta=yourFastaFile, trump=yourTrump, soft=yourSoft, hard=yourHard, vertical=yourVertical) \n");
145                 m->mothurOut("Example filter.seqs(fasta=abrecovery.fasta, trump=..., soft=..., hard=..., vertical=T).\n");
146                 m->mothurOut("Note: No spaces between parameter labels (i.e. fasta), '=' and parameters (i.e.yourFasta).\n\n");
147                 
148         }
149         catch(exception& e) {
150                 m->errorOut(e, "FilterSeqsCommand", "help");
151                 exit(1);
152         }
153 }
154
155 /**************************************************************************************/
156
157 int FilterSeqsCommand::execute() {      
158         try {
159         
160                 if (abort == true) { return 0; }
161                 
162                 ifstream inFASTA;
163                 openInputFile(fastafileNames[0], inFASTA);
164                 
165                 Sequence testSeq(inFASTA);
166                 alignmentLength = testSeq.getAlignLength();
167                 inFASTA.close();
168                 
169                 ////////////create filter/////////////////
170                 m->mothurOut("Creating Filter... "); m->mothurOutEndLine();
171                 
172                 filter = createFilter();
173                 
174                 m->mothurOutEndLine();  m->mothurOutEndLine();
175                 
176                 if (m->control_pressed) { return 0; }
177                 
178                 #ifdef USE_MPI
179                         int pid;
180                         MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
181                                         
182                         if (pid == 0) { //only one process should output the filter
183                 #endif
184                 
185                 ofstream outFilter;
186                 
187                 string filterFile = outputDir + filterFileName + ".filter";
188                 openOutputFile(filterFile, outFilter);
189                 outFilter << filter << endl;
190                 outFilter.close();
191                 outputNames.push_back(filterFile);
192                 
193                 #ifdef USE_MPI
194                         }
195                 #endif
196                 
197                 ////////////run filter/////////////////
198                 
199                 m->mothurOut("Running Filter... "); m->mothurOutEndLine();
200                 
201                 filterSequences();
202                 
203                 m->mothurOutEndLine();  m->mothurOutEndLine();
204                                         
205                 int filteredLength = 0;
206                 for(int i=0;i<alignmentLength;i++){
207                         if(filter[i] == '1'){   filteredLength++;       }
208                 }
209                 
210                 if (m->control_pressed) {  for(int i = 0; i < outputNames.size(); i++) { remove(outputNames[i].c_str()); }  return 0; }
211
212                 
213                 m->mothurOutEndLine();
214                 m->mothurOut("Length of filtered alignment: " + toString(filteredLength)); m->mothurOutEndLine();
215                 m->mothurOut("Number of columns removed: " + toString((alignmentLength-filteredLength))); m->mothurOutEndLine();
216                 m->mothurOut("Length of the original alignment: " + toString(alignmentLength)); m->mothurOutEndLine();
217                 m->mothurOut("Number of sequences used to construct filter: " + toString(numSeqs)); m->mothurOutEndLine();
218                 
219                 
220                 m->mothurOutEndLine();
221                 m->mothurOut("Output File Names: "); m->mothurOutEndLine();
222                 for(int i = 0; i < outputNames.size(); i++) {  m->mothurOut(outputNames[i]); m->mothurOutEndLine();      }
223                 m->mothurOutEndLine();
224                 
225                 return 0;
226                 
227         }
228         catch(exception& e) {
229                 m->errorOut(e, "FilterSeqsCommand", "execute");
230                 exit(1);
231         }
232 }
233 /**************************************************************************************/
234 int FilterSeqsCommand::filterSequences() {      
235         try {
236                 
237                 numSeqs = 0;
238                 
239                 for (int s = 0; s < fastafileNames.size(); s++) {
240                         
241                                 for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
242                                 
243                                 string filteredFasta = outputDir + getRootName(getSimpleName(fastafileNames[s])) + "filter.fasta";
244 #ifdef USE_MPI  
245                                 int pid, start, end, numSeqsPerProcessor, num; 
246                                 int tag = 2001;
247                                 vector<long>MPIPos;
248                                                 
249                                 MPI_Status status; 
250                                 MPI_Comm_size(MPI_COMM_WORLD, &processors); //set processors to the number of mpi processes running
251                                 MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
252                                 
253                                 MPI_File outMPI;
254                                 MPI_File tempMPI;
255                                 MPI_File inMPI;
256                                 int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY; 
257                                 int inMode=MPI_MODE_RDONLY; 
258                                 
259                                 //char* outFilename = new char[filteredFasta.length()];
260                                 //memcpy(outFilename, filteredFasta.c_str(), filteredFasta.length());
261                                 
262                                 char outFilename[1024];
263                                 strcpy(outFilename, filteredFasta.c_str());
264
265                                 //char* inFileName = new char[fastafileNames[s].length()];
266                                 //memcpy(inFileName, fastafileNames[s].c_str(), fastafileNames[s].length());
267                                 
268                                 char inFileName[1024];
269                                 strcpy(inFileName, fastafileNames[s].c_str());
270                                 
271                                 MPI_File_open(MPI_COMM_WORLD, inFileName, inMode, MPI_INFO_NULL, &inMPI);  //comm, filename, mode, info, filepointer
272                                 MPI_File_open(MPI_COMM_WORLD, outFilename, outMode, MPI_INFO_NULL, &outMPI);
273                                 
274                                 //delete inFileName;
275                                 //delete outFilename;
276
277                                 if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPI);  return 0;  }
278
279                                 if (pid == 0) { //you are the root process 
280                                         
281                                         MPIPos = setFilePosFasta(fastafileNames[s], num); //fills MPIPos, returns numSeqs
282                                         numSeqs += num;
283                                         
284                                         //send file positions to all processes
285                                         for(int i = 1; i < processors; i++) { 
286                                                 MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
287                                                 MPI_Send(&MPIPos[0], (num+1), MPI_LONG, i, tag, MPI_COMM_WORLD);
288                                         }
289                                         
290                                         //figure out how many sequences you have to do
291                                         numSeqsPerProcessor = num / processors;
292                                         int startIndex =  pid * numSeqsPerProcessor;
293                                         if(pid == (processors - 1)){    numSeqsPerProcessor = num - pid * numSeqsPerProcessor;  }
294                                         
295                                 
296                                         //do your part
297                                         driverMPIRun(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos);
298                                         
299                                         if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPI);  return 0;  }
300                                         
301                                         //wait on chidren
302                                         for(int i = 1; i < processors; i++) { 
303                                                 char buf[4];
304                                                 MPI_Recv(buf, 4, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); 
305                                         }
306                                         
307                                 }else { //you are a child process
308                                         MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
309                                         MPIPos.resize(num+1);
310                                         numSeqs += num;
311                                         MPI_Recv(&MPIPos[0], (num+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status);
312                                         
313                                         //figure out how many sequences you have to align
314                                         numSeqsPerProcessor = num / processors;
315                                         int startIndex =  pid * numSeqsPerProcessor;
316                                         if(pid == (processors - 1)){    numSeqsPerProcessor = num - pid * numSeqsPerProcessor;  }
317                                         
318                                         
319                                         //align your part
320                                         driverMPIRun(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos);           
321                                         
322                                         if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPI);  return 0;  }
323                                         
324                                         char buf[4];
325                                         strcpy(buf, "done"); 
326                                         
327                                         //tell parent you are done.
328                                         MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
329                                 }
330                                 
331                                 MPI_File_close(&outMPI);
332                                 MPI_File_close(&inMPI);
333                                 MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
334                                 
335 #else
336                 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
337                                 if(processors == 1){
338                                         ifstream inFASTA;
339                                         openInputFile(fastafileNames[s], inFASTA);
340                                         int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
341                                         inFASTA.close();
342                                         
343                                         lines.push_back(new linePair(0, numFastaSeqs));
344                                         
345                                         numSeqs += numFastaSeqs;
346                                         
347                                         driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
348                                 }else{
349                                         setLines(fastafileNames[s]);
350                                         createProcessesRunFilter(filter, fastafileNames[s]); 
351                                 
352                                         rename((fastafileNames[s] + toString(processIDS[0]) + ".temp").c_str(), filteredFasta.c_str());
353                                 
354                                         //append fasta files
355                                         for(int i=1;i<processors;i++){
356                                                 appendFiles((fastafileNames[s] + toString(processIDS[i]) + ".temp"), filteredFasta);
357                                                 remove((fastafileNames[s] + toString(processIDS[i]) + ".temp").c_str());
358                                         }
359                                 }
360                                 
361                                 if (m->control_pressed) {  return 1; }
362                 #else
363                                 ifstream inFASTA;
364                                 openInputFile(fastafileNames[s], inFASTA);
365                                 int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
366                                 inFASTA.close();
367                                         
368                                 lines.push_back(new linePair(0, numFastaSeqs));
369                                 
370                                 numSeqs += numFastaSeqs;
371                                 
372                                 driverRunFilter(filter, filteredFasta, fastafileNames[s], lines[0]);
373
374                                 if (m->control_pressed) {  return 1; }
375                 #endif
376 #endif
377                         outputNames.push_back(filteredFasta);
378                 }
379
380                 return 0;
381         }
382         catch(exception& e) {
383                 m->errorOut(e, "FilterSeqsCommand", "filterSequences");
384                 exit(1);
385         }
386 }
387 #ifdef USE_MPI
388 /**************************************************************************************/
389 int FilterSeqsCommand::driverMPIRun(int start, int num, MPI_File& inMPI, MPI_File& outMPI, vector<long>& MPIPos) {      
390         try {
391                 string outputString = "";
392                 int count = 0;
393                 MPI_Status status; 
394                 
395                 for(int i=0;i<num;i++){
396                 
397                         if (m->control_pressed) { return 0; }
398                 
399                         //read next sequence
400                         int length = MPIPos[start+i+1] - MPIPos[start+i];
401                         char* buf4 = new char[length];
402                         MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status);
403                         
404                         string tempBuf = buf4;
405                         if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length);  }
406                         istringstream iss (tempBuf,istringstream::in);
407                         delete buf4;
408         
409                         Sequence seq(iss);  gobble(iss);
410                         
411                         if (seq.getName() != "") {
412                                 string align = seq.getAligned();
413                                 string filterSeq = "";
414                                         
415                                 for(int j=0;j<alignmentLength;j++){
416                                         if(filter[j] == '1'){
417                                                 filterSeq += align[j];
418                                         }
419                                 }
420                                 
421                                 count++;
422                                 outputString += ">" + seq.getName() + "\n" + filterSeq + "\n";
423                                 
424                                 if(count % 10 == 0){ //output to file 
425                                         //send results to parent
426                                         int length = outputString.length();
427                                         char* buf = new char[length];
428                                         memcpy(buf, outputString.c_str(), length);
429                                 
430                                         MPI_File_write_shared(outMPI, buf, length, MPI_CHAR, &status);
431                                         outputString = "";
432                                         delete buf;
433                                 }
434
435                         }
436                         
437                         if((i+1) % 100 == 0){   cout << (i+1) << endl;   m->mothurOutJustToLog(toString(i+1) + "\n");   }
438                 }
439                 
440                 if(outputString != ""){ //output to file 
441                         //send results to parent
442                         int length = outputString.length();
443                         char* buf = new char[length];
444                         memcpy(buf, outputString.c_str(), length);
445                         
446                         MPI_File_write_shared(outMPI, buf, length, MPI_CHAR, &status);
447                         outputString = "";
448                         delete buf;
449                 }
450                 
451                 if((num) % 100 != 0){   cout << (num) << endl;   m->mothurOutJustToLog(toString(num) + "\n");   }
452                         
453                 return 0;
454         }
455         catch(exception& e) {
456                 m->errorOut(e, "FilterSeqsCommand", "driverRunFilter");
457                 exit(1);
458         }
459 }
460 #endif
461 /**************************************************************************************/
462 int FilterSeqsCommand::driverRunFilter(string F, string outputFilename, string inputFilename, linePair* line) { 
463         try {
464                 ofstream out;
465                 openOutputFile(outputFilename, out);
466                 
467                 ifstream in;
468                 openInputFile(inputFilename, in);
469                                 
470                 in.seekg(line->start);
471                 
472                 for(int i=0;i<line->num;i++){
473                                 
474                                 if (m->control_pressed) { in.close(); out.close(); return 0; }
475                                 
476                                 Sequence seq(in);
477                                 if (seq.getName() != "") {
478                                         string align = seq.getAligned();
479                                         string filterSeq = "";
480                                         
481                                         for(int j=0;j<alignmentLength;j++){
482                                                 if(filter[j] == '1'){
483                                                         filterSeq += align[j];
484                                                 }
485                                         }
486                                         
487                                         out << '>' << seq.getName() << endl << filterSeq << endl;
488                                 }
489                                 gobble(in);
490                 }
491                 out.close();
492                 in.close();
493                 
494                 return 0;
495         }
496         catch(exception& e) {
497                 m->errorOut(e, "FilterSeqsCommand", "driverRunFilter");
498                 exit(1);
499         }
500 }
501 /**************************************************************************************************/
502
503 int FilterSeqsCommand::createProcessesRunFilter(string F, string filename) {
504         try {
505 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
506                 int process = 0;
507                 int exitCommand = 1;
508                 processIDS.clear();
509                 
510                 //loop through and create all the processes you want
511                 while (process != processors) {
512                         int pid = fork();
513                         
514                         if (pid > 0) {
515                                 processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
516                                 process++;
517                         }else if (pid == 0){
518                                 string filteredFasta = filename + toString(getpid()) + ".temp";
519                                 driverRunFilter(F, filteredFasta, filename, lines[process]);
520                                 exit(0);
521                         }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); }
522                 }
523                 
524                 //force parent to wait until all the processes are done
525                 for (int i=0;i<processors;i++) { 
526                         int temp = processIDS[i];
527                         wait(&temp);
528                 }
529                 
530                 return exitCommand;
531 #endif          
532         }
533         catch(exception& e) {
534                 m->errorOut(e, "FilterSeqsCommand", "createProcessesRunFilter");
535                 exit(1);
536         }
537 }
538 /**************************************************************************************/
539 string FilterSeqsCommand::createFilter() {      
540         try {
541                 string filterString = "";                       
542                 Filters F;
543                 
544                 if (soft != 0)                  {  F.setSoft(soft);             }
545                 if (trump != '*')               {  F.setTrump(trump);   }
546                 
547                 F.setLength(alignmentLength);
548                 
549                 if(trump != '*' || isTrue(vertical) || soft != 0){
550                         F.initialize();
551                 }
552                 
553                 if(hard.compare("") != 0)       {       F.doHard(hard);         }
554                 else                                            {       F.setFilter(string(alignmentLength, '1'));      }
555                 
556                 numSeqs = 0;
557                 if(trump != '*' || isTrue(vertical) || soft != 0){
558                         for (int s = 0; s < fastafileNames.size(); s++) {
559                         
560                                 for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
561                         
562 #ifdef USE_MPI  
563                                 int pid, numSeqsPerProcessor, num; 
564                                 int tag = 2001;
565                                 vector<long> MPIPos;
566                                 
567                                 MPI_Status status; 
568                                 MPI_File inMPI; 
569                                 MPI_Comm_size(MPI_COMM_WORLD, &processors);
570                                 MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
571                                                         
572                                 //char* tempFileName = new char(fastafileNames[s].length());
573                                 //tempFileName = &(fastafileNames[s][0]);
574                                 
575                                 char tempFileName[1024];
576                                 strcpy(tempFileName, fastafileNames[s].c_str());
577                 
578                                 MPI_File_open(MPI_COMM_WORLD, tempFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI);  //comm, filename, mode, info, filepointer
579                                 
580                                 if (m->control_pressed) {  MPI_File_close(&inMPI);  return 0;  }
581                                 
582                                 if (pid == 0) { //you are the root process
583                                                 MPIPos = setFilePosFasta(fastafileNames[s], num); //fills MPIPos, returns numSeqs
584                                                 numSeqs += num;
585                                                 
586                                                 //send file positions to all processes
587                                                 for(int i = 1; i < processors; i++) { 
588                                                         MPI_Send(&num, 1, MPI_INT, i, tag, MPI_COMM_WORLD);
589                                                         MPI_Send(&MPIPos[0], (num+1), MPI_LONG, i, tag, MPI_COMM_WORLD);
590                                                 }
591                                                                 
592                                                 //figure out how many sequences you have to do
593                                                 numSeqsPerProcessor = num / processors;
594                                                 int startIndex =  pid * numSeqsPerProcessor;
595                                                 if(pid == (processors - 1)){    numSeqsPerProcessor = num - pid * numSeqsPerProcessor;  }
596                                                 
597                                 
598                                                 //do your part
599                                                 MPICreateFilter(startIndex, numSeqsPerProcessor, F, inMPI, MPIPos);
600                                                 
601                                                 if (m->control_pressed) {  MPI_File_close(&inMPI);  return 0;  }
602                                                                                                 
603                                 }else { //i am the child process
604                                         MPI_Recv(&num, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status);
605                                         MPIPos.resize(num+1);
606                                         numSeqs += num;
607                                         MPI_Recv(&MPIPos[0], (num+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status);
608                                         
609                                         //figure out how many sequences you have to align
610                                         numSeqsPerProcessor = num / processors;
611                                         int startIndex =  pid * numSeqsPerProcessor;
612                                         if(pid == (processors - 1)){    numSeqsPerProcessor = num - pid * numSeqsPerProcessor;  }
613                                         
614                                         
615                                         //do your part
616                                         MPICreateFilter(startIndex, numSeqsPerProcessor, F, inMPI,  MPIPos);
617                                         
618                                         if (m->control_pressed) {  MPI_File_close(&inMPI);  return 0;  }
619                                 }
620                                 
621                                 MPI_File_close(&inMPI);
622                                 MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
623                                 
624 #else
625                 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
626                                 if(processors == 1){
627                                         ifstream inFASTA;
628                                         openInputFile(fastafileNames[s], inFASTA);
629                                         int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
630                                         inFASTA.close();
631                                         
632                                         numSeqs += numFastaSeqs;
633                                         
634                                         lines.push_back(new linePair(0, numFastaSeqs));
635                                         
636                                         driverCreateFilter(F, fastafileNames[s], lines[0]);
637                                 }else{
638                                         setLines(fastafileNames[s]);                                    
639                                         createProcessesCreateFilter(F, fastafileNames[s]); 
640                                 }
641                                 
642                                 if (m->control_pressed) {  return filterString; }
643                 #else
644                                 ifstream inFASTA;
645                                 openInputFile(fastafileNames[s], inFASTA);
646                                 int numFastaSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
647                                 inFASTA.close();
648                                 
649                                 numSeqs += numFastaSeqs;
650                                 
651                                 lines.push_back(new linePair(0, numFastaSeqs));
652                                 
653                                 driverCreateFilter(F, fastafileNames[s], lines[0]);
654                                 if (m->control_pressed) {  return filterString; }
655                 #endif
656 #endif
657                         
658                         }
659                 }
660
661
662 #ifdef USE_MPI  
663                 int pid;
664                 int Atag = 1; int Ttag = 2; int Ctag = 3; int Gtag = 4; int Gaptag = 5;
665                 MPI_Status status;
666                 
667                 MPI_Comm_rank(MPI_COMM_WORLD, &pid); 
668                 
669                 if(trump != '*' || isTrue(vertical) || soft != 0){
670                         
671                         if (pid == 0) { //only one process should output the filter
672                         
673                                 vector<int> temp; temp.resize(alignmentLength+1);
674                                                                 
675                                 //get the frequencies from the child processes
676                                 for(int i = 1; i < processors; i++) { 
677                                 
678                                         for (int j = 0; j < 5; j++) {
679                                         
680                                                 MPI_Recv(&temp[0], (alignmentLength+1), MPI_INT, i, 2001, MPI_COMM_WORLD, &status); 
681                                                 int receiveTag = temp[temp.size()-1];  //child process added a int to the end to indicate what letter count this is for
682                                                 
683                                                 if (receiveTag == Atag) { //you are recieveing the A frequencies
684                                                         for (int k = 0; k < alignmentLength; k++) {             F.a[k] += temp[k];      }
685                                                 }else if (receiveTag == Ttag) { //you are recieveing the T frequencies
686                                                         for (int k = 0; k < alignmentLength; k++) {             F.t[k] += temp[k];      }
687                                                 }else if (receiveTag == Ctag) { //you are recieveing the C frequencies
688                                                         for (int k = 0; k < alignmentLength; k++) {             F.c[k] += temp[k];      }
689                                                 }else if (receiveTag == Gtag) { //you are recieveing the G frequencies
690                                                         for (int k = 0; k < alignmentLength; k++) {             F.g[k] += temp[k];      }
691                                                 }else if (receiveTag == Gaptag) { //you are recieveing the gap frequencies
692                                                         for (int k = 0; k < alignmentLength; k++) {             F.gap[k] += temp[k];    }
693                                                 }
694                                         }
695                                 } 
696                         }else{
697                         
698                                 //send my fequency counts
699                                 F.a.push_back(Atag);
700                                 int ierr = MPI_Send(&(F.a[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
701                                 F.t.push_back(Ttag);
702                                 ierr = MPI_Send (&(F.t[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
703                                 F.c.push_back(Ctag);
704                                 ierr = MPI_Send(&(F.c[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
705                                 F.g.push_back(Gtag);
706                                 ierr = MPI_Send(&(F.g[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
707                                 F.gap.push_back(Gaptag);
708                                 ierr = MPI_Send(&(F.gap[0]), (alignmentLength+1), MPI_INT, 0, 2001, MPI_COMM_WORLD);
709                         }
710                         
711                 }
712                 
713                 MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case
714                 
715                 if (pid == 0) { //only one process should output the filter
716 #endif
717                 F.setNumSeqs(numSeqs);
718                                 
719                 if(isTrue(vertical) == 1)       {       F.doVertical(); }
720                 if(soft != 0)                           {       F.doSoft();             }
721                         
722                 filterString = F.getFilter();
723                 
724 #ifdef USE_MPI
725                 //send filter string to kids
726                 //for(int i = 1; i < processors; i++) { 
727                 //      MPI_Send(&filterString[0], alignmentLength, MPI_CHAR, i, 2001, MPI_COMM_WORLD);
728                 //}
729                 MPI_Bcast(&filterString[0], alignmentLength, MPI_CHAR, 0, MPI_COMM_WORLD);
730         }else{
731                 //recieve filterString
732                 char* tempBuf = new char[alignmentLength];
733                 //MPI_Recv(&tempBuf[0], alignmentLength, MPI_CHAR, 0, 2001, MPI_COMM_WORLD, &status);
734                 MPI_Bcast(tempBuf, alignmentLength, MPI_CHAR, 0, MPI_COMM_WORLD);
735                 
736                 filterString = tempBuf;
737                 if (filterString.length() > alignmentLength) { filterString = filterString.substr(0, alignmentLength);  }
738                 delete tempBuf; 
739         }
740         
741         MPI_Barrier(MPI_COMM_WORLD);
742 #endif
743                                 
744                 return filterString;
745         }
746         catch(exception& e) {
747                 m->errorOut(e, "FilterSeqsCommand", "createFilter");
748                 exit(1);
749         }
750 }
751 /**************************************************************************************/
752 int FilterSeqsCommand::driverCreateFilter(Filters& F, string filename, linePair* line) {        
753         try {
754                 
755                 ifstream in;
756                 openInputFile(filename, in);
757                                 
758                 in.seekg(line->start);
759                 
760                 for(int i=0;i<line->num;i++){
761                                 
762                         if (m->control_pressed) { in.close(); return 1; }
763                                         
764                         Sequence seq(in);
765                         if (seq.getName() != "") {
766                                         if (seq.getAligned().length() != alignmentLength) { m->mothurOut("Sequences are not all the same length, please correct."); m->mothurOutEndLine(); m->control_pressed = true;  }
767                                         
768                                         if(trump != '*'){       F.doTrump(seq); }
769                                         if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
770                                         cout.flush();
771                         }
772                         
773                         //report progress
774                         if((i+1) % 100 == 0){   m->mothurOut(toString(i+1)); m->mothurOutEndLine();             }
775                 }
776                 
777                 //report progress
778                 if((line->num) % 100 != 0){     m->mothurOut(toString(line->num)); m->mothurOutEndLine();               }
779                 
780                 in.close();
781                 
782                 return 0;
783         }
784         catch(exception& e) {
785                 m->errorOut(e, "FilterSeqsCommand", "driverCreateFilter");
786                 exit(1);
787         }
788 }
789 #ifdef USE_MPI
790 /**************************************************************************************/
791 int FilterSeqsCommand::MPICreateFilter(int start, int num, Filters& F, MPI_File& inMPI, vector<long>& MPIPos) { 
792         try {
793                 
794                 MPI_Status status; 
795                 int pid;
796                 MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
797                 
798                 for(int i=0;i<num;i++){
799                         
800                         if (m->control_pressed) { return 0; }
801                         
802                         //read next sequence
803                         int length = MPIPos[start+i+1] - MPIPos[start+i];
804         
805                         char* buf4 = new char[length];
806                         MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status);
807                         
808                         string tempBuf = buf4;
809                         if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length);  }
810                         istringstream iss (tempBuf,istringstream::in);
811                         delete buf4;
812
813                         Sequence seq(iss);  
814
815                         if (seq.getAligned().length() != alignmentLength) {  cout << "Alignment length is " << alignmentLength << " and sequence " << seq.getName() << " has length " << seq.getAligned().length() << ", please correct." << endl; exit(1);  }
816                         
817                         if(trump != '*'){       F.doTrump(seq); }
818                         if(isTrue(vertical) || soft != 0){      F.getFreqs(seq);        }
819                         cout.flush();
820                                                 
821                         //report progress
822                         if((i+1) % 100 == 0){   cout << (i+1) << endl;   m->mothurOutJustToLog(toString(i+1) + "\n");   }
823                 }
824                 
825                 //report progress
826                 if((num) % 100 != 0){   cout << num << endl; m->mothurOutJustToLog(toString(num) + "\n");       }
827                 
828                 return 0;
829         }
830         catch(exception& e) {
831                 m->errorOut(e, "FilterSeqsCommand", "MPICreateFilter");
832                 exit(1);
833         }
834 }
835 #endif
836 /**************************************************************************************************/
837
838 int FilterSeqsCommand::createProcessesCreateFilter(Filters& F, string filename) {
839         try {
840 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
841                 int process = 0;
842                 int exitCommand = 1;
843                 processIDS.clear();
844                 
845                 //loop through and create all the processes you want
846                 while (process != processors) {
847                         int pid = vfork();
848                         
849                         if (pid > 0) {
850                                 processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
851                                 process++;
852                         }else if (pid == 0){
853                                 driverCreateFilter(F, filename, lines[process]);
854                                 exit(0);
855                         }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); }
856                 }
857                 
858                 //force parent to wait until all the processes are done
859                 for (int i=0;i<processors;i++) { 
860                         int temp = processIDS[i];
861                         wait(&temp);
862                 }
863                 
864                 return exitCommand;
865 #endif          
866         }
867         catch(exception& e) {
868                 m->errorOut(e, "FilterSeqsCommand", "createProcessesCreateFilter");
869                 exit(1);
870         }
871 }
872 /**************************************************************************************************/
873
874 int FilterSeqsCommand::setLines(string filename) {
875         try {
876                 
877                 vector<long int> positions;
878                 bufferSizes.clear();
879                 
880                 ifstream inFASTA;
881                 openInputFile(filename, inFASTA);
882                         
883                 string input;
884                 while(!inFASTA.eof()){  
885                         input = getline(inFASTA);
886
887                         if (input.length() != 0) {
888                                 if(input[0] == '>'){ long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1);     }
889                         }
890                 }
891                 inFASTA.close();
892                 
893                 int numFastaSeqs = positions.size();
894         
895                 FILE * pFile;
896                 long size;
897                 
898                 //get num bytes in file
899                 pFile = fopen (filename.c_str(),"rb");
900                 if (pFile==NULL) perror ("Error opening file");
901                 else{
902                         fseek (pFile, 0, SEEK_END);
903                         size=ftell (pFile);
904                         fclose (pFile);
905                 }
906         
907                 numSeqs += numFastaSeqs;
908                 
909                 int numSeqsPerProcessor = numFastaSeqs / processors;
910                 
911                 for (int i = 0; i < processors; i++) {
912
913                         long int startPos = positions[ i * numSeqsPerProcessor ];
914                         if(i == processors - 1){
915                                 numSeqsPerProcessor = numFastaSeqs - i * numSeqsPerProcessor;
916                                 bufferSizes.push_back(size - startPos);
917                         }else{  
918                                 long int myEnd = positions[ (i+1) * numSeqsPerProcessor ];
919                                 bufferSizes.push_back(myEnd-startPos);
920                         }
921                         lines.push_back(new linePair(startPos, numSeqsPerProcessor));
922                 }
923                 
924                 return 0;
925         }
926         catch(exception& e) {
927                 m->errorOut(e, "FilterSeqsCommand", "setLines");
928                 exit(1);
929         }
930 }
931 /**************************************************************************************/