+// Prepares the input for contig assembly and returns one entry per input that will be processed.
+//
+// The source is chosen in this order: ffastqfile (with rfastqfile), then file (a list of file
+// pairs obtained from readFileNames()), then ffastafile (with rfastafile). Each returned entry is
+// the vector< vector<string> > produced by readFastqFiles()/readFastaFiles(); its outer index is
+// treated here as a processor slot and its inner vector as the file names belonging to that slot.
+//
+// numReads: set by the reader in the single-input branches. In the multi-file branch the good
+//           reads of every accepted pair are ADDED to it (assumes the caller passes in 0 - TODO confirm).
+//
+// Side effects:
+//   - lowers the member 'processors' when a single input has fewer good reads than processors,
+//   - removes (m->mothurRemove) the files of discarded slots and of skipped or aborted inputs,
+//   - sets m->control_pressed when there is nothing usable to process.
+vector< vector< vector<string> > > MakeContigsCommand::preProcessData(unsigned long int& numReads) {
+    try {
+        vector< vector< vector<string> > > filesToProcess;
+
+        if (ffastqfile != "") { //reading one file
+            vector< vector<string> > files = readFastqFiles(numReads, ffastqfile, rfastqfile);
+
+            //adjust for really large processors or really small files
+            if (numReads == 0) { m->mothurOut("[ERROR]: no good reads.\n"); m->control_pressed = true; }
+            if (numReads < processors) {
+                //drop the slots that would get no reads; bounded by files.size() so a short result is never over-indexed
+                for (int i = numReads; i < processors && i < static_cast<int>(files.size()); i++) {
+                    for (int j = 0; j < files[i].size(); j++) { m->mothurRemove(files[i][j]); }
+                    files[i].clear();
+                }
+                files.resize(numReads);
+                //with no reads at all the run is already flagged as failed; keep processors >= 1 so
+                //nothing downstream computes processors-1 == -1
+                if (numReads != 0) { processors = numReads; }
+            }
+            filesToProcess.push_back(files);
+        }else if (file != "") { //reading multiple files
+            //return only valid pairs
+            vector< vector<string> > filePairsToProcess = readFileNames(file);
+
+            if (m->control_pressed) { return filesToProcess; }
+
+            if (filePairsToProcess.size() != 0) {
+                for (int i = 0; i < filePairsToProcess.size(); i++) {
+
+                    if (m->control_pressed) {
+                        //abort: remove the files of EVERY set accepted so far, then stop
+                        for (int l = 0; l < filesToProcess.size(); l++) {
+                            for (int k = 0; k < filesToProcess[l].size(); k++) {
+                                for (int j = 0; j < filesToProcess[l][k].size(); j++) { m->mothurRemove(filesToProcess[l][k][j]); }
+                                filesToProcess[l][k].clear();
+                            }
+                        }
+                        return filesToProcess;
+                    }
+
+                    //initialised so the comparison below is well defined whatever the reader does with it
+                    unsigned long int thisFilesReads = 0;
+                    vector< vector<string> > files = readFastqFiles(thisFilesReads, filePairsToProcess[i][0], filePairsToProcess[i][1]);
+
+                    //adjust for really large processors or really small files
+                    if (thisFilesReads < processors) {
+                        m->mothurOut("[ERROR]: " + filePairsToProcess[i][0] + " has less than " + toString(processors) + " good reads, skipping\n");
+                        for (int k = 0; k < files.size(); k++) {
+                            for (int j = 0; j < files[k].size(); j++) { m->mothurRemove(files[k][j]); }
+                            files[k].clear();
+                        }
+                    }else {
+                        filesToProcess.push_back(files);
+                        numReads += thisFilesReads;
+                    }
+                }
+                //all files are bad
+                if (numReads == 0) { m->control_pressed = true; }
+            }
+        }else if (ffastafile != "") {
+            vector< vector<string> > files = readFastaFiles(numReads, ffastafile, rfastafile);
+
+            //adjust for really large processors or really small files
+            if (numReads == 0) { m->mothurOut("[ERROR]: no good reads.\n"); m->control_pressed = true; }
+            if (numReads < processors) {
+                //drop the slots that would get no reads; bounded by files.size() so a short result is never over-indexed
+                for (int i = numReads; i < processors && i < static_cast<int>(files.size()); i++) {
+                    for (int j = 0; j < files[i].size(); j++) { m->mothurRemove(files[i][j]); }
+                    files[i].clear();
+                }
+                files.resize(numReads);
+                //see the fastq branch: never let processors fall to 0
+                if (numReads != 0) { processors = numReads; }
+            }
+            filesToProcess.push_back(files);
+        }else { m->control_pressed = true; } //should not get here
+
+        return filesToProcess;
+    }
+    catch(exception& e) {
+        m->errorOut(e, "MakeContigsCommand", "preProcessData");
+        exit(1);
+    }
+}
+//**********************************************************************************************************************
+// Runs driver() once per processor slot and merges the per-slot results into the final outputs.
+//
+// files           : one entry per processor slot; files[p] is handed to driver() for slot p.
+// outputFasta, outputScrapFasta, outputMisMatches : final output names; workers write to
+//                   "<name><id>.temp" files that are appended to these and removed at the end.
+// fastaFileNames  : when allFiles is set, each non-empty name gets a matching per-worker
+//                   "<name><id>.temp" file that is merged back the same way.
+// Returns the sum of the counts reported by all workers.
+//
+// Unix-like builds fork() one child per extra slot; each child reports its count (plus
+// groupCounts/groupMap when createGroup is set) through "<pid>.num.temp". Other builds use
+// Win32 threads that share memory with the parent.
+int MakeContigsCommand::createProcesses(vector< vector<string> > files, string outputFasta, string outputScrapFasta, string outputMisMatches, vector<vector<string> > fastaFileNames) {
+    try {
+        int num = 0;
+        vector<int> processIDS;
+
+        //every slot 0..processors-1 is indexed below, so refuse to start when that cannot work
+        if (processors < 1 || static_cast<int>(files.size()) < processors) {
+            m->mothurOut("[ERROR]: cannot divide the work between " + toString(processors) + " processors."); m->mothurOutEndLine();
+            m->control_pressed = true;
+            return num;
+        }
+
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+        int process = 0;
+
+        //loop through and create all the processes you want
+        //('<' instead of '!=' so the loop can never run away)
+        while (process < processors-1) {
+            int pid = fork();
+
+            if (pid > 0) {
+                processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
+                process++;
+            }else if (pid == 0){
+                //child: work on private, pid-tagged copies of every output
+                vector<vector<string> > tempFASTAFileNames = fastaFileNames;
+
+                if(allFiles){
+                    ofstream temp;
+
+                    for(int i=0;i<tempFASTAFileNames.size();i++){
+                        for(int j=0;j<tempFASTAFileNames[i].size();j++){
+                            if (tempFASTAFileNames[i][j] != "") {
+                                tempFASTAFileNames[i][j] += toString(getpid()) + ".temp";
+                                m->openOutputFile(tempFASTAFileNames[i][j], temp); temp.close();
+                            }
+                        }
+                    }
+                }
+
+                num = driver(files[process],
+                             outputFasta + toString(getpid()) + ".temp",
+                             outputScrapFasta + toString(getpid()) + ".temp",
+                             outputMisMatches + toString(getpid()) + ".temp",
+                             tempFASTAFileNames, process);
+
+                //pass groupCounts to parent
+                //NOTE(review): the child writes its whole groupCounts/groupMap, which includes whatever
+                //the parent held when it forked - confirm both are empty on entry, otherwise existing
+                //entries are counted again / reported as duplicates by the parent below.
+                ofstream out;
+                string tempFile = toString(getpid()) + ".num.temp";
+                m->openOutputFile(tempFile, out);
+                out << num << endl;
+                if(createGroup){
+                    out << groupCounts.size() << endl;
+
+                    for (map<string, int>::iterator it = groupCounts.begin(); it != groupCounts.end(); it++) {
+                        out << it->first << '\t' << it->second << endl;
+                    }
+
+                    out << groupMap.size() << endl;
+                    for (map<string, string>::iterator it = groupMap.begin(); it != groupMap.end(); it++) {
+                        out << it->first << '\t' << it->second << endl;
+                    }
+                }
+                out.close();
+
+                exit(0);
+            }else {
+                m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
+                for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
+                exit(0);
+            }
+        }
+
+        ofstream temp;
+        m->openOutputFile(outputFasta, temp); temp.close();
+        m->openOutputFile(outputScrapFasta, temp); temp.close();
+
+        //do my part
+        num = driver(files[processors-1], outputFasta, outputScrapFasta, outputMisMatches, fastaFileNames, processors-1);
+
+        //force parent to wait until all the processes are done
+        //(one wait() per child; the status value itself is not used)
+        for (int i=0;i<processIDS.size();i++) {
+            int status = 0;
+            wait(&status);
+        }
+
+        //collect what each child reported
+        for (int i = 0; i < processIDS.size(); i++) {
+            ifstream in;
+            string tempFile = toString(processIDS[i]) + ".num.temp";
+            m->openInputFile(tempFile, in);
+            int tempNum = 0; //stays 0 if the child never wrote its result file
+            in >> tempNum; num += tempNum; m->gobble(in);
+
+            if(createGroup){
+                string group;
+                tempNum = 0;
+                in >> tempNum; m->gobble(in);
+
+                if (tempNum != 0) {
+                    for (int j = 0; j < tempNum; j++) {
+                        int groupNum = 0;
+                        in >> group >> groupNum; m->gobble(in);
+
+                        map<string, int>::iterator it = groupCounts.find(group);
+                        if (it == groupCounts.end()) { groupCounts[group] = groupNum; }
+                        else { groupCounts[it->first] += groupNum; }
+                    }
+                }
+                tempNum = 0;
+                in >> tempNum; m->gobble(in);
+                if (tempNum != 0) {
+                    for (int j = 0; j < tempNum; j++) {
+                        string group, seqName;
+                        in >> seqName >> group; m->gobble(in);
+
+                        map<string, string>::iterator it = groupMap.find(seqName);
+                        if (it == groupMap.end()) { groupMap[seqName] = group; }
+                        else { m->mothurOut("[ERROR]: " + seqName + " is in your fasta file more than once. Sequence names must be unique. please correct.\n"); }
+                    }
+                }
+            }
+            in.close(); m->mothurRemove(tempFile);
+        }
+#else
+
+        //////////////////////////////////////////////////////////////////////////////////////////////////////
+        //Windows version shared memory, so be careful when passing variables through the contigsData struct.
+        //Above fork() will clone, so memory is separate, but that's not the case with windows,
+        //////////////////////////////////////////////////////////////////////////////////////////////////////
+
+        vector<contigsData*> pDataArray;
+        //vectors instead of variable length arrays (non-standard, and zero length when processors == 1)
+        vector<DWORD> dwThreadIdArray(processors-1);
+        vector<HANDLE> hThreadArray(processors-1);
+
+        //create the final output files BEFORE any worker starts: worker 0 is handed the final names
+        //(empty extension), so truncating them after it is running could wipe what it already wrote
+        ofstream finalOut;
+        m->openOutputFile(outputFasta, finalOut); finalOut.close();
+        m->openOutputFile(outputScrapFasta, finalOut); finalOut.close();
+
+        //Create processor worker threads.
+        for( int h=0; h<processors-1; h++ ){
+            string extension = "";
+            if (h != 0) { extension = toString(h) + ".temp"; processIDS.push_back(h); }
+            vector<vector<string> > tempFASTAFileNames = fastaFileNames;
+
+            if(allFiles){
+                ofstream temp;
+
+                for(int i=0;i<tempFASTAFileNames.size();i++){
+                    for(int j=0;j<tempFASTAFileNames[i].size();j++){
+                        if (tempFASTAFileNames[i][j] != "") {
+                            tempFASTAFileNames[i][j] += extension;
+                            m->openOutputFile(tempFASTAFileNames[i][j], temp); temp.close();
+                        }
+                    }
+                }
+            }
+
+            contigsData* tempcontig = new contigsData(files[h], (outputFasta + extension), (outputScrapFasta + extension), (outputMisMatches + extension), align, m, match, misMatch, gapOpen, gapExtend, insert, deltaq, barcodes, primers, tempFASTAFileNames, barcodeNameVector, primerNameVector, pdiffs, bdiffs, tdiffs, createGroup, allFiles, trimOverlap, h);
+            pDataArray.push_back(tempcontig);
+
+            hThreadArray[h] = CreateThread(NULL, 0, MyContigsThreadFunction, pDataArray[h], 0, &dwThreadIdArray[h]);
+        }
+
+        //the parent handles the last slot and writes to its own numbered temp files
+        vector<vector<string> > tempFASTAFileNames = fastaFileNames;
+
+        if(allFiles){
+            ofstream temp;
+            string extension = toString(processors-1) + ".temp";
+
+            for(int i=0;i<tempFASTAFileNames.size();i++){
+                for(int j=0;j<tempFASTAFileNames[i].size();j++){
+                    if (tempFASTAFileNames[i][j] != "") {
+                        tempFASTAFileNames[i][j] += extension;
+                        m->openOutputFile(tempFASTAFileNames[i][j], temp); temp.close();
+                    }
+                }
+            }
+        }
+
+        //do my part
+        processIDS.push_back(processors-1);
+        num = driver(files[processors-1], (outputFasta+ toString(processors-1) + ".temp"), (outputScrapFasta+ toString(processors-1) + ".temp"), (outputMisMatches+ toString(processors-1) + ".temp"), tempFASTAFileNames, processors-1);
+
+        //Wait until all threads have terminated (nothing to wait for when the parent is the only worker).
+        if (!hThreadArray.empty()) {
+            WaitForMultipleObjects(processors-1, &hThreadArray[0], TRUE, INFINITE);
+        }
+
+        //Close all thread handles and free memory allocations.
+        for(int i=0; i < pDataArray.size(); i++){
+            num += pDataArray[i]->count;
+            if (!pDataArray[i]->done) {
+                m->mothurOut("[ERROR]: process " + toString(i) + " only processed " + toString(pDataArray[i]->count) + " of sequences assigned to it, quitting. \n"); m->control_pressed = true;
+            }
+            for (map<string, int>::iterator it = pDataArray[i]->groupCounts.begin(); it != pDataArray[i]->groupCounts.end(); it++) {
+                map<string, int>::iterator it2 = groupCounts.find(it->first);
+                if (it2 == groupCounts.end()) { groupCounts[it->first] = it->second; }
+                else { groupCounts[it->first] += it->second; }
+            }
+            for (map<string, string>::iterator it = pDataArray[i]->groupMap.begin(); it != pDataArray[i]->groupMap.end(); it++) {
+                map<string, string>::iterator it2 = groupMap.find(it->first);
+                if (it2 == groupMap.end()) { groupMap[it->first] = it->second; }
+                else { m->mothurOut("[ERROR]: " + it->first + " is in your fasta file more than once. Sequence names must be unique. please correct.\n"); }
+            }
+            CloseHandle(hThreadArray[i]);
+            delete pDataArray[i];
+        }
+
+#endif
+
+        //append every worker's temp output to the final files, in processIDS order, and remove the temps
+        for (int i = 0; i < processIDS.size(); i++) {
+            m->appendFiles((outputFasta + toString(processIDS[i]) + ".temp"), outputFasta);
+            m->mothurRemove((outputFasta + toString(processIDS[i]) + ".temp"));
+
+            m->appendFiles((outputScrapFasta + toString(processIDS[i]) + ".temp"), outputScrapFasta);
+            m->mothurRemove((outputScrapFasta + toString(processIDS[i]) + ".temp"));
+
+            m->appendFiles((outputMisMatches + toString(processIDS[i]) + ".temp"), outputMisMatches);
+            m->mothurRemove((outputMisMatches + toString(processIDS[i]) + ".temp"));
+
+            if(allFiles){
+                for(int j=0;j<fastaFileNames.size();j++){
+                    for(int k=0;k<fastaFileNames[j].size();k++){
+                        if (fastaFileNames[j][k] != "") {
+                            m->appendFiles((fastaFileNames[j][k] + toString(processIDS[i]) + ".temp"), fastaFileNames[j][k]);
+                            m->mothurRemove((fastaFileNames[j][k] + toString(processIDS[i]) + ".temp"));
+                        }
+                    }
+                }
+            }
+        }
+
+        return num;
+    }
+    catch(exception& e) {
+        m->errorOut(e, "MakeContigsCommand", "createProcesses");
+        exit(1);
+    }
+}
+//**********************************************************************************************************************
+int MakeContigsCommand::driver(vector<string> files, string outputFasta, string outputScrapFasta, string outputMisMatches, vector<vector<string> > fastaFileNames, int process){