]> git.donarmstrong.com Git - mothur.git/blob - chimeracheckcommand.cpp
paralellized screen.seqs and added mpi code to it. fixed bug with all mpi commands...
[mothur.git] / chimeracheckcommand.cpp
1 /*
2  *  chimeracheckcommand.cpp
3  *  Mothur
4  *
5  *  Created by westcott on 3/31/10.
6  *  Copyright 2010 Schloss Lab. All rights reserved.
7  *
8  */
9
10 #include "chimeracheckcommand.h"
11 #include "chimeracheckrdp.h"
12
13 //***************************************************************************************************************
14
15 ChimeraCheckCommand::ChimeraCheckCommand(string option)  {
16         try {
17                 abort = false;
18                 
19                 //allow user to run help
20                 if(option == "help") { help(); abort = true; }
21                 
22                 else {
23                         //valid paramters for this command
24                         string Array[] =  {"fasta","processors","increment","template","ksize","svg", "name","outputdir","inputdir" };
25                         vector<string> myArray (Array, Array+(sizeof(Array)/sizeof(string)));
26                         
27                         OptionParser parser(option);
28                         map<string,string> parameters = parser.getParameters();
29                         
30                         ValidParameters validParameter;
31                         map<string,string>::iterator it;
32                         
33                         //check to make sure all parameters are valid for command
34                         for (it = parameters.begin(); it != parameters.end(); it++) { 
35                                 if (validParameter.isValidParameter(it->first, myArray, it->second) != true) {  abort = true;  }
36                         }
37                         
38                         //if the user changes the input directory command factory will send this info to us in the output parameter 
39                         string inputDir = validParameter.validFile(parameters, "inputdir", false);              
40                         if (inputDir == "not found"){   inputDir = "";          }
41                         else {
42                                 string path;
43                                 it = parameters.find("fasta");
44                                 //user has given a template file
45                                 if(it != parameters.end()){ 
46                                         path = hasPath(it->second);
47                                         //if the user has not given a path then, add inputdir. else leave path alone.
48                                         if (path == "") {       parameters["fasta"] = inputDir + it->second;            }
49                                 }
50                                 
51                                 it = parameters.find("template");
52                                 //user has given a template file
53                                 if(it != parameters.end()){ 
54                                         path = hasPath(it->second);
55                                         //if the user has not given a path then, add inputdir. else leave path alone.
56                                         if (path == "") {       parameters["template"] = inputDir + it->second;         }
57                                 }
58                                 
59                                 it = parameters.find("name");
60                                 //user has given a template file
61                                 if(it != parameters.end()){ 
62                                         path = hasPath(it->second);
63                                         //if the user has not given a path then, add inputdir. else leave path alone.
64                                         if (path == "") {       parameters["name"] = inputDir + it->second;             }
65                                 }
66                         }
67
68                         
69                         //check for required parameters
70                         fastafile = validParameter.validFile(parameters, "fasta", true);
71                         if (fastafile == "not open") { abort = true; }
72                         else if (fastafile == "not found") { fastafile = ""; m->mothurOut("fasta is a required parameter for the chimera.check command."); m->mothurOutEndLine(); abort = true;  }      
73                         
74                         //if the user changes the output directory command factory will send this info to us in the output parameter 
75                         outputDir = validParameter.validFile(parameters, "outputdir", false);           if (outputDir == "not found"){  
76                                 outputDir = ""; 
77                                 outputDir += hasPath(fastafile); //if user entered a file with a path then preserve it  
78                         }
79
80                         templatefile = validParameter.validFile(parameters, "template", true);
81                         if (templatefile == "not open") { abort = true; }
82                         else if (templatefile == "not found") { templatefile = "";  m->mothurOut("template is a required parameter for the chimera.check command."); m->mothurOutEndLine(); abort = true;  }    
83                         
84                         namefile = validParameter.validFile(parameters, "name", true);
85                         if (namefile == "not open") { abort = true; }
86                         else if (namefile == "not found") { namefile = "";  }
87
88                         string temp = validParameter.validFile(parameters, "processors", false);                if (temp == "not found") { temp = "1"; }
89                         convert(temp, processors);
90                         
91                         temp = validParameter.validFile(parameters, "ksize", false);                    if (temp == "not found") { temp = "7"; }
92                         convert(temp, ksize);
93                         
94                         temp = validParameter.validFile(parameters, "svg", false);                              if (temp == "not found") { temp = "F"; }
95                         svg = isTrue(temp);
96                         
97                         temp = validParameter.validFile(parameters, "increment", false);                if (temp == "not found") { temp = "10"; }
98                         convert(temp, increment);                       
99                 }
100         }
101         catch(exception& e) {
102                 m->errorOut(e, "ChimeraCheckCommand", "ChimeraCheckCommand");
103                 exit(1);
104         }
105 }
106 //**********************************************************************************************************************
107
108 void ChimeraCheckCommand::help(){
109         try {
110         
111                 m->mothurOut("The chimera.check command reads a fastafile and templatefile and outputs potentially chimeric sequences.\n");
112                 m->mothurOut("This command was created using the algorythms described in CHIMERA_CHECK version 2.7 written by Niels Larsen. \n");
113                 m->mothurOut("The chimera.check command parameters are fasta, template, processors, ksize, increment, svg and name.\n");
114                 m->mothurOut("The fasta parameter allows you to enter the fasta file containing your potentially chimeric sequences, and is required. \n");
115                 m->mothurOut("The template parameter allows you to enter a template file containing known non-chimeric sequences, and is required. \n");
116                 m->mothurOut("The processors parameter allows you to specify how many processors you would like to use.  The default is 1. \n");
117                 #ifdef USE_MPI
118                 m->mothurOut("When using MPI, the processors parameter is set to the number of MPI processes running. \n");
119                 #endif
120                 m->mothurOut("The increment parameter allows you to specify how far you move each window while finding chimeric sequences, default is 10.\n");
121                 m->mothurOut("The ksize parameter allows you to input kmersize, default is 7. \n");
122                 m->mothurOut("The svg parameter allows you to specify whether or not you would like a svg file outputted for each query sequence, default is False.\n");
123                 m->mothurOut("The name parameter allows you to enter a file containing names of sequences you would like .svg files for.\n");
124                 m->mothurOut("The chimera.check command should be in the following format: \n");
125                 m->mothurOut("chimera.check(fasta=yourFastaFile, template=yourTemplateFile, processors=yourProcessors, ksize=yourKmerSize) \n");
126                 m->mothurOut("Example: chimera.check(fasta=AD.fasta, template=core_set_aligned,imputed.fasta, processors=4, ksize=8) \n");
127                 m->mothurOut("Note: No spaces between parameter labels (i.e. fasta), '=' and parameters (i.e.yourFastaFile).\n\n");     
128         }
129         catch(exception& e) {
130                 m->errorOut(e, "ChimeraCheckCommand", "help");
131                 exit(1);
132         }
133 }
134
135 //***************************************************************************************************************
136
137 ChimeraCheckCommand::~ChimeraCheckCommand(){    /*      do nothing      */      }
138
139 //***************************************************************************************************************
140
141 int ChimeraCheckCommand::execute(){
142         try{
143                 
144                 if (abort == true) { return 0; }
145                 
146                 int start = time(NULL); 
147                 
148                 chimera = new ChimeraCheckRDP(fastafile, templatefile, namefile, svg, increment, ksize, outputDir);                     
149
150                 if (m->control_pressed) { delete chimera;       return 0;       }
151                 
152                 string outputFileName = outputDir + getRootName(getSimpleName(fastafile))  + "chimeracheck.chimeras";
153                 
154         #ifdef USE_MPI
155         
156                         int pid, end, numSeqsPerProcessor; 
157                         int tag = 2001;
158                         vector<long> MPIPos;
159                         
160                         MPI_Status status; 
161                         MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
162                         MPI_Comm_size(MPI_COMM_WORLD, &processors); 
163
164                         MPI_File inMPI;
165                         MPI_File outMPI;
166                                                 
167                         int outMode=MPI_MODE_CREATE|MPI_MODE_WRONLY; 
168                         int inMode=MPI_MODE_RDONLY; 
169                         
170                         //char* outFilename = new char[outputFileName.length()];
171                         //memcpy(outFilename, outputFileName.c_str(), outputFileName.length());
172                         
173                         char outFilename[1024];
174                         strcpy(outFilename, outputFileName.c_str());
175
176                         //char* inFileName = new char[fastafile.length()];
177                         //memcpy(inFileName, fastafile.c_str(), fastafile.length());
178                         
179                         char inFileName[1024];
180                         strcpy(inFileName, fastafile.c_str());
181
182                         MPI_File_open(MPI_COMM_WORLD, inFileName, inMode, MPI_INFO_NULL, &inMPI);  //comm, filename, mode, info, filepointer
183                         MPI_File_open(MPI_COMM_WORLD, outFilename, outMode, MPI_INFO_NULL, &outMPI);
184                         
185                         //delete outFilename;
186                         //delete inFileName;
187
188                         if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPI);  delete chimera; return 0;  }
189                         
190                         if (pid == 0) { //you are the root process 
191                                 MPIPos = setFilePosFasta(fastafile, numSeqs); //fills MPIPos, returns numSeqs
192                                 
193                                 //send file positions to all processes
194                                 MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD);  //send numSeqs
195                                 MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //send file pos        
196                                 
197                                 //figure out how many sequences you have to align
198                                 numSeqsPerProcessor = numSeqs / processors;
199                                 int startIndex =  pid * numSeqsPerProcessor;
200                                 if(pid == (processors - 1)){    numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor;      }
201                                 
202                         
203                                 //align your part
204                                 driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos);
205                                 
206                                 if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPI);  remove(outputFileName.c_str());  delete chimera; return 0;  }
207                                 
208                                 //wait on chidren
209                                 for(int i = 1; i < processors; i++) { 
210                                         char buf[4];
211                                         MPI_Recv(buf, 4, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status); 
212                                 }
213                         }else{ //you are a child process
214                                 MPI_Bcast(&numSeqs, 1, MPI_INT, 0, MPI_COMM_WORLD); //get numSeqs
215                                 MPIPos.resize(numSeqs+1);
216                                 MPI_Bcast(&MPIPos[0], (numSeqs+1), MPI_LONG, 0, MPI_COMM_WORLD); //get file positions
217                                 
218                                 //figure out how many sequences you have to align
219                                 numSeqsPerProcessor = numSeqs / processors;
220                                 int startIndex =  pid * numSeqsPerProcessor;
221                                 if(pid == (processors - 1)){    numSeqsPerProcessor = numSeqs - pid * numSeqsPerProcessor;      }
222                                 
223                                 
224                                 //align your part
225                                 driverMPI(startIndex, numSeqsPerProcessor, inMPI, outMPI, MPIPos);
226                                 
227                                 if (m->control_pressed) {  MPI_File_close(&inMPI);  MPI_File_close(&outMPI);   delete chimera; return 0;  }
228                                 
229                                 //tell parent you are done.
230                                 char buf[4];
231                                 strcpy(buf, "done"); 
232                                 MPI_Send(buf, 4, MPI_CHAR, 0, tag, MPI_COMM_WORLD);
233                         }
234                         
235                         //close files 
236                         MPI_File_close(&inMPI);
237                         MPI_File_close(&outMPI);
238         #else
239                 
240                 //break up file
241                 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
242                         if(processors == 1){
243                                 ifstream inFASTA;
244                                 openInputFile(fastafile, inFASTA);
245                                 numSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
246                                 inFASTA.close();
247                                 
248                                 lines.push_back(new linePair(0, numSeqs));
249                                 
250                                 driver(lines[0], outputFileName, fastafile);
251                                 
252                                 if (m->control_pressed) { 
253                                         remove(outputFileName.c_str()); 
254                                         for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
255                                         delete chimera;
256                                         return 0;
257                                 }
258                                                                 
259                         }else{
260                                 vector<int> positions;
261                                 processIDS.resize(0);
262                                 
263                                 ifstream inFASTA;
264                                 openInputFile(fastafile, inFASTA);
265                                 
266                                 string input;
267                                 while(!inFASTA.eof()){
268                                         input = getline(inFASTA);
269                                         if (input.length() != 0) {
270                                                 if(input[0] == '>'){    long int pos = inFASTA.tellg(); positions.push_back(pos - input.length() - 1);  }
271                                         }
272                                 }
273                                 inFASTA.close();
274                                 
275                                 numSeqs = positions.size();
276                                 
277                                 int numSeqsPerProcessor = numSeqs / processors;
278                                 
279                                 for (int i = 0; i < processors; i++) {
280                                         long int startPos = positions[ i * numSeqsPerProcessor ];
281                                         if(i == processors - 1){
282                                                 numSeqsPerProcessor = numSeqs - i * numSeqsPerProcessor;
283                                         }
284                                         lines.push_back(new linePair(startPos, numSeqsPerProcessor));
285                                 }
286                                 
287                                 
288                                 createProcesses(outputFileName, fastafile); 
289                         
290                                 rename((outputFileName + toString(processIDS[0]) + ".temp").c_str(), outputFileName.c_str());
291                                         
292                                 //append output files
293                                 for(int i=1;i<processors;i++){
294                                         appendFiles((outputFileName + toString(processIDS[i]) + ".temp"), outputFileName);
295                                         remove((outputFileName + toString(processIDS[i]) + ".temp").c_str());
296                                 }
297                                 
298                                 if (m->control_pressed) { 
299                                         remove(outputFileName.c_str()); 
300                                         for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
301                                         delete chimera;
302                                         return 0;
303                                 }
304                         }
305
306                 #else
307                         ifstream inFASTA;
308                         openInputFile(fastafile, inFASTA);
309                         numSeqs=count(istreambuf_iterator<char>(inFASTA),istreambuf_iterator<char>(), '>');
310                         inFASTA.close();
311                         lines.push_back(new linePair(0, numSeqs));
312                         
313                         driver(lines[0], outputFileName, fastafile);
314                         
315                         if (m->control_pressed) { 
316                                         remove(outputFileName.c_str()); 
317                                         for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
318                                         delete chimera;
319                                         return 0;
320                         }
321                 #endif
322         #endif          
323                 delete chimera;
324                 for (int i = 0; i < lines.size(); i++) {  delete lines[i];  }  lines.clear();
325                 
326                 m->mothurOutEndLine(); m->mothurOut("This method does not determine if a sequence is chimeric, but allows you to make that determination based on the IS values."); m->mothurOutEndLine(); 
327                 
328                 m->mothurOutEndLine();
329                 m->mothurOut("Output File Names: "); m->mothurOutEndLine();
330                 m->mothurOut(outputFileName); m->mothurOutEndLine();    
331                 m->mothurOutEndLine();
332                 m->mothurOutEndLine(); m->mothurOut("It took " + toString(time(NULL) - start) + " secs to check " + toString(numSeqs) + " sequences."); m->mothurOutEndLine();
333
334                 return 0;
335                 
336         }
337         catch(exception& e) {
338                 m->errorOut(e, "ChimeraCheckCommand", "execute");
339                 exit(1);
340         }
341 }
342 //**********************************************************************************************************************
343
344 int ChimeraCheckCommand::driver(linePair* line, string outputFName, string filename){
345         try {
346                 ofstream out;
347                 openOutputFile(outputFName, out);
348                 
349                 ofstream out2;
350                 
351                 ifstream inFASTA;
352                 openInputFile(filename, inFASTA);
353
354                 inFASTA.seekg(line->start);
355                 
356                 for(int i=0;i<line->numSeqs;i++){
357                 
358                         if (m->control_pressed) {       return 1;       }
359                 
360                         Sequence* candidateSeq = new Sequence(inFASTA);  gobble(inFASTA);
361                                 
362                         if (candidateSeq->getName() != "") { //incase there is a commented sequence at the end of a file
363                                 //find chimeras
364                                 chimera->getChimeras(candidateSeq);
365                                 
366                                 if (m->control_pressed) {       delete candidateSeq; return 1;  }
367         
368                                 //print results
369                                 chimera->print(out, out2);
370                         }
371                         delete candidateSeq;
372                         
373                         //report progress
374                         if((i+1) % 100 == 0){   m->mothurOut("Processing sequence: " + toString(i+1)); m->mothurOutEndLine();           }
375                 }
376                 //report progress
377                 if((line->numSeqs) % 100 != 0){ m->mothurOut("Processing sequence: " + toString(line->numSeqs)); m->mothurOutEndLine();         }
378                 
379                 out.close();
380                 inFASTA.close();
381                                 
382                 return 0;
383         }
384         catch(exception& e) {
385                 m->errorOut(e, "ChimeraCheckCommand", "driver");
386                 exit(1);
387         }
388 }
389 //**********************************************************************************************************************
390 #ifdef USE_MPI
391 int ChimeraCheckCommand::driverMPI(int start, int num, MPI_File& inMPI, MPI_File& outMPI, vector<long>& MPIPos){
392         try {
393                 MPI_File outAccMPI;
394                 MPI_Status status; 
395                 int pid;
396                 MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are
397                 
398                 for(int i=0;i<num;i++){
399                         
400                         if (m->control_pressed) { return 0; }
401                         
402                         //read next sequence
403                         int length = MPIPos[start+i+1] - MPIPos[start+i];
404         
405                         char* buf4 = new char[length];
406                         MPI_File_read_at(inMPI, MPIPos[start+i], buf4, length, MPI_CHAR, &status);
407                         
408                         string tempBuf = buf4;
409                         if (tempBuf.length() > length) { tempBuf = tempBuf.substr(0, length);  }
410                         istringstream iss (tempBuf,istringstream::in);
411                         delete buf4;
412
413                         Sequence* candidateSeq = new Sequence(iss);  gobble(iss);
414                                 
415                         if (candidateSeq->getName() != "") { //incase there is a commented sequence at the end of a file
416                                 //find chimeras
417                                 chimera->getChimeras(candidateSeq);
418                                         
419                                 //print results
420                                 chimera->print(outMPI, outAccMPI);
421                         }
422                         delete candidateSeq;
423                         
424                         //report progress
425                         if((i+1) % 100 == 0){  cout << "Processing sequence: " << (i+1) << endl;        m->mothurOutJustToLog("Processing sequence: " + toString(i+1) + "\n");          }
426                 }
427                 //report progress
428                 if(num % 100 != 0){             cout << "Processing sequence: " << num << endl; m->mothurOutJustToLog("Processing sequence: " + toString(num) + "\n");  }
429                 
430                 return 0;
431         }
432         catch(exception& e) {
433                 m->errorOut(e, "ChimeraCheckCommand", "driverMPI");
434                 exit(1);
435         }
436 }
437 #endif
438
439 /**************************************************************************************************/
440
441 int ChimeraCheckCommand::createProcesses(string outputFileName, string filename) {
442         try {
443 #if defined (__APPLE__) || (__MACH__) || (linux) || (__linux)
444                 int process = 0;
445                 //              processIDS.resize(0);
446                 
447                 //loop through and create all the processes you want
448                 while (process != processors) {
449                         int pid = fork();
450                         
451                         if (pid > 0) {
452                                 processIDS.push_back(pid);  //create map from line number to pid so you can append files in correct order later
453                                 process++;
454                         }else if (pid == 0){
455                                 driver(lines[process], outputFileName + toString(getpid()) + ".temp", filename);
456                                 exit(0);
457                         }else { m->mothurOut("unable to spawn the necessary processes."); m->mothurOutEndLine(); exit(0); }
458                 }
459                 
460                 //force parent to wait until all the processes are done
461                 for (int i=0;i<processors;i++) { 
462                         int temp = processIDS[i];
463                         wait(&temp);
464                 }
465                 
466                 return 0;
467 #endif          
468         }
469         catch(exception& e) {
470                 m->errorOut(e, "ChimeraCheckCommand", "createProcesses");
471                 exit(1);
472         }
473 }
474 /**************************************************************************************************/
475
476