X-Git-Url: https://git.donarmstrong.com/?p=mothur.git;a=blobdiff_plain;f=chimera.cpp;h=53e77322f52ba40eb478e0b385a84982de752d6b;hp=ed843971ac7d4ab229595262631d548494f27d1c;hb=cf9987b67aa49777a4c91c2d21f96e58bf17aa82;hpb=65b6a38d00b3a72021611211e7c25392022c69ed diff --git a/chimera.cpp b/chimera.cpp index ed84397..53e7732 100644 --- a/chimera.cpp +++ b/chimera.cpp @@ -122,69 +122,109 @@ vector Chimera::readSeqs(string file) { m->mothurOut("Reading sequences from " + file + "..."); cout.flush(); - #ifdef USE_MPI + #ifdef USE_MPI int pid, processors; - vector positions; + vector positions; int numSeqs; int tag = 2001; - - MPI_Status status; MPI_File inMPI; - MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are - MPI_Comm_size(MPI_COMM_WORLD, &processors); - - //char* inFileName = new char[file.length()]; - //memcpy(inFileName, file.c_str(), file.length()); - - char inFileName[1024]; - strcpy(inFileName, file.c_str()); - - MPI_File_open(MPI_COMM_WORLD, inFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer - //delete inFileName; - - if (pid == 0) { + MPI_Status status; + + if (byGroup) { + char inFileName[1024]; + strcpy(inFileName, file.c_str()); + + MPI_File_open(MPI_COMM_SELF, inFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer + positions = m->setFilePosFasta(file, numSeqs); //fills MPIPos, returns numSeqs - - //send file positions to all processes - for(int i = 1; i < processors; i++) { - MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); - MPI_Send(&positions[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + + //read file + for(int i=0;icontrol_pressed) { MPI_File_close(&inMPI); return container; } + + //read next sequence + int seqlength = positions[i+1] - positions[i]; + char* buf4 = new char[seqlength]; + + MPI_File_read_at(inMPI, positions[i], buf4, seqlength, MPI_CHAR, &status); + + string tempBuf = buf4; + if (tempBuf.length() > seqlength) { tempBuf = tempBuf.substr(0, seqlength); } + delete buf4; + + istringstream iss (tempBuf,istringstream::in); + + Sequence* current = new Sequence(iss); + if (current->getName() != "") { + if (count == 0) { length = current->getAligned().length(); count++; } //gets first seqs length + else if (length != current->getAligned().length()) { unaligned = true; } + + container.push_back(current); + if (rdb->save) { rdb->referenceSeqs.push_back(*current); } + } } - }else{ - MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); - positions.resize(numSeqs+1); - MPI_Recv(&positions[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); - } - - //read file - for(int i=0;icontrol_pressed) { MPI_File_close(&inMPI); return container; } - - //read next sequence - int seqlength = positions[i+1] - positions[i]; - char* buf4 = new char[seqlength]; - - MPI_File_read_at(inMPI, positions[i], buf4, seqlength, MPI_CHAR, &status); - string tempBuf = buf4; - if (tempBuf.length() > seqlength) { tempBuf = tempBuf.substr(0, seqlength); } - delete buf4; - - istringstream iss (tempBuf,istringstream::in); - - Sequence* current = new Sequence(iss); - if (current->getName() != "") { - if (count == 0) { length = current->getAligned().length(); count++; } //gets first seqs length - else if (length != current->getAligned().length()) { unaligned = true; } - - container.push_back(current); - if (rdb->save) { rdb->referenceSeqs.push_back(*current); } + MPI_File_close(&inMPI); + + }else { + + MPI_Comm_rank(MPI_COMM_WORLD, &pid); //find out who we are + MPI_Comm_size(MPI_COMM_WORLD, &processors); + + //char* inFileName = new char[file.length()]; + //memcpy(inFileName, file.c_str(), file.length()); + + char inFileName[1024]; + strcpy(inFileName, file.c_str()); + + MPI_File_open(MPI_COMM_WORLD, inFileName, MPI_MODE_RDONLY, MPI_INFO_NULL, &inMPI); //comm, filename, mode, info, filepointer + //delete inFileName; + + if (pid == 0) { + positions = m->setFilePosFasta(file, numSeqs); //fills MPIPos, returns numSeqs + + //send file positions to all processes + for(int i = 1; i < processors; i++) { + MPI_Send(&numSeqs, 1, MPI_INT, i, tag, MPI_COMM_WORLD); + MPI_Send(&positions[0], (numSeqs+1), MPI_LONG, i, tag, MPI_COMM_WORLD); + } + }else{ + MPI_Recv(&numSeqs, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &status); + positions.resize(numSeqs+1); + MPI_Recv(&positions[0], (numSeqs+1), MPI_LONG, 0, tag, MPI_COMM_WORLD, &status); + } + + //read file + for(int i=0;icontrol_pressed) { MPI_File_close(&inMPI); return container; } + + //read next sequence + int seqlength = positions[i+1] - positions[i]; + char* buf4 = new char[seqlength]; + + MPI_File_read_at(inMPI, positions[i], buf4, seqlength, MPI_CHAR, &status); + + string tempBuf = buf4; + if (tempBuf.length() > seqlength) { tempBuf = tempBuf.substr(0, seqlength); } + delete buf4; + + istringstream iss (tempBuf,istringstream::in); + + Sequence* current = new Sequence(iss); + if (current->getName() != "") { + if (count == 0) { length = current->getAligned().length(); count++; } //gets first seqs length + else if (length != current->getAligned().length()) { unaligned = true; } + + container.push_back(current); + if (rdb->save) { rdb->referenceSeqs.push_back(*current); } + } } + + MPI_File_close(&inMPI); + MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case } - - MPI_File_close(&inMPI); - MPI_Barrier(MPI_COMM_WORLD); //make everyone wait - just in case #else ifstream in; @@ -198,7 +238,7 @@ vector Chimera::readSeqs(string file) { Sequence* current = new Sequence(in); m->gobble(in); if (count == 0) { length = current->getAligned().length(); count++; } //gets first seqs length - else if (length != current->getAligned().length()) { unaligned = true; } + else if (length != current->getAligned().length()) { unaligned = true; } if (current->getName() != "") { container.push_back(current);