From 4a5e5138d3fc409e7ade5c14de7689612290f74f Mon Sep 17 00:00:00 2001 From: Bo Li Date: Tue, 7 Feb 2012 22:38:51 -0600 Subject: [PATCH] rewrote parallelization of calcCI.cpp --- Buffer.h | 37 +++++++--------------- Gibbs.cpp | 9 ++++++ calcCI.cpp | 72 +++++++++++++++++++++++++++--------------- makefile | 2 -- rsem-prepare-reference | 2 +- 5 files changed, 69 insertions(+), 53 deletions(-) diff --git a/Buffer.h b/Buffer.h index 1ce28bc..1c5a2d6 100644 --- a/Buffer.h +++ b/Buffer.h @@ -3,48 +3,36 @@ #include #include -#include - -#include "my_assert.h" typedef unsigned long long bufsize_type; const int FLOATSIZE = sizeof(float); class Buffer { public: - Buffer(int nMB, int nSamples, int cvlen, const char* tmpF) { + Buffer(bufsize_type size, int sp, int nSamples, int cvlen, const char* tmpF) { cpos = 0; - size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen; - if (size > (bufsize_type)nSamples) size = nSamples; - general_assert(size > 0, "Memory allocated for credibility intervals is not enough!"); - size *= cvlen; - + this->size = size; buffer = new float[size]; ftmpOut.open(tmpF, std::ios::binary); - pthread_mutex_init(&lock, NULL); - fr = to = 0; + fr = to = sp; this->nSamples = nSamples; this->cvlen = cvlen; + } ~Buffer() { if (fr < to) flushToTempFile(); delete[] buffer; - pthread_mutex_destroy(&lock); ftmpOut.close(); } - void write(int n, float **vecs) { - pthread_assert(pthread_mutex_lock(&lock), "pthread_mutex_lock", "Error occurred while acquiring the lock!"); - for (int i = 0; i < n; i++) { - if (size - cpos < bufsize_type(cvlen)) flushToTempFile(); - memcpy(buffer + cpos, vecs[i], FLOATSIZE * cvlen); - cpos += cvlen; - ++to; - } - pthread_assert(pthread_mutex_unlock(&lock), "pthread_mutex_unlock", "Error occurred while releasing the lock!"); + void write(float *vec) { + if (size - cpos < bufsize_type(cvlen)) flushToTempFile(); + memcpy(buffer + cpos, vec, FLOATSIZE * cvlen); + cpos += cvlen; + ++to; } private: @@ -52,7 +40,6 @@ private: float *buffer; std::ofstream ftmpOut; - pthread_mutex_t lock; int fr, to; // each flush, sample fr .. to - 1 int nSamples, cvlen; @@ -62,15 +49,15 @@ private: std::streampos gap2 = std::streampos(nSamples - to) * FLOATSIZE; float *p = NULL; - ftmpOut.seekp(0, std::ios_base::beg); + ftmpOut.seekp(0, std::ios::beg); for (int i = 0; i < cvlen; i++) { p = buffer + i; - ftmpOut.seekp(gap1, std::ios_base::cur); + ftmpOut.seekp(gap1, std::ios::cur); for (int j = fr; j < to; j++) { ftmpOut.write((char*)p, FLOATSIZE); p += cvlen; } - ftmpOut.seekp(gap2, std::ios_base::cur); + ftmpOut.seekp(gap2, std::ios::cur); } cpos = 0; diff --git a/Gibbs.cpp b/Gibbs.cpp index 1a084d6..f911da5 100644 --- a/Gibbs.cpp +++ b/Gibbs.cpp @@ -130,6 +130,7 @@ void load_data(char* reference_name, char* statName, char* imdName) { void init() { int quotient, left; char outF[STRLEN]; + char splitF[STRLEN]; quotient = NSAMPLES / nThreads; left = NSAMPLES % nThreads; @@ -156,6 +157,14 @@ void init() { memset(paramsArray[i].pme_theta, 0, sizeof(double) * (M + 1)); } + // output task splitting information + sprintf(splitF, "%s.split", imdName); + FILE *fo = fopen(splitF, "w"); + fprintf(fo, "%d", nThreads); + for (int i = 0; i < nThreads; i++) fprintf(fo, " %d", paramsArray[i].nsamples); + fprintf(fo, "\n"); + fclose(fo); + /* set thread attribute to be joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); diff --git a/calcCI.cpp b/calcCI.cpp index 79a6bb2..3ae7115 100644 --- a/calcCI.cpp +++ b/calcCI.cpp @@ -27,6 +27,8 @@ using namespace std; struct Params { int no; FILE *fi; + bufsize_type size; + int sp; engine_type *engine; double *mw; }; @@ -115,14 +117,14 @@ void* sample_theta_from_c(void* arg) { Params *params = (Params*)arg; FILE *fi = params->fi; double *mw = params->mw; + Buffer buffer(params->size, params->sp, nSamples, cvlen, tmpF); cvec = new int[cvlen]; theta = new double[cvlen]; gammas = new gamma_dist*[cvlen]; rgs = new gamma_generator*[cvlen]; - float **vecs = new float*[nSpC]; - for (int i = 0; i < nSpC; i++) vecs[i] = new float[cvlen]; + float *vec = new float[cvlen]; int cnt = 0; while (fscanf(fi, "%d", &cvec[0]) == 1) { @@ -154,19 +156,18 @@ void* sample_theta_from_c(void* arg) { sum = 0.0; - vecs[i][0] = theta[0]; + vec[0] = theta[0]; for (int j = 1; j < cvlen; j++) if (eel[j] >= EPSILON) { - vecs[i][j] = theta[j] / eel[j]; - sum += vecs[i][j]; + vec[j] = theta[j] / eel[j]; + sum += vec[j]; } else assert(theta[j] < EPSILON); - assert(sum >= EPSILON); - for (int j = 1; j < cvlen; j++) vecs[i][j] /= sum; - } + for (int j = 1; j < cvlen; j++) vec[j] /= sum; - buffer->write(nSpC, vecs); + buffer.write(vec); + } for (int j = 0; j < cvlen; j++) { delete gammas[j]; @@ -181,8 +182,7 @@ void* sample_theta_from_c(void* arg) { delete[] gammas; delete[] rgs; - for (int i = 0; i < nSpC; i++) delete[] vecs[i]; - delete[] vecs; + delete[] vec; return NULL; } @@ -193,29 +193,57 @@ void sample_theta_vectors_from_count_vectors() { model.read(modelF); calcExpectedEffectiveLengths(model); - buffer = new Buffer(nMB, nSamples, cvlen, tmpF); + char splitF[STRLEN]; + bufsize_type buf_maxcv = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen; + bufsize_type quotient, left; + int sum = 0; - paramsArray = new Params[nThreads]; - threads = new pthread_t[nThreads]; + sprintf(splitF, "%s.split", imdName); + FILE *fi = fopen(splitF, "r"); + int num_threads; + assert(fscanf(fi, "%d", &num_threads) == 1); + assert(num_threads <= nThreads); + + quotient = buf_maxcv / num_threads; + assert(quotient > 0); + left = buf_maxcv % num_threads; + + paramsArray = new Params[num_threads]; + threads = new pthread_t[num_threads]; char inpF[STRLEN]; - for (int i = 0; i < nThreads; i++) { + for (int i = 0; i < num_threads; i++) { paramsArray[i].no = i; sprintf(inpF, "%s%d", cvsF, i); paramsArray[i].fi = fopen(inpF, "r"); + + int num_samples; + assert(fscanf(fi, "%d", &num_samples) == 1); + num_samples *= nSpC; + if (bufsize_type(num_samples) <= quotient) paramsArray[i].size = num_samples; + else { + paramsArray[i].size = quotient; + if (left > 0) { ++paramsArray[i].size; --left; } + } + paramsArray[i].size *= cvlen * FLOATSIZE; + paramsArray[i].sp = sum; + sum += num_samples; + paramsArray[i].engine = engineFactory::new_engine(); paramsArray[i].mw = model.getMW(); } + fclose(fi); + /* set thread attribute to be joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - for (int i = 0; i < nThreads; i++) { + for (int i = 0; i < num_threads; i++) { rc = pthread_create(&threads[i], &attr, &sample_theta_from_c, (void*)(¶msArray[i])); pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!"); } - for (int i = 0; i < nThreads; i++) { + for (int i = 0; i < num_threads; i++) { rc = pthread_join(threads[i], &status); pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!"); } @@ -224,14 +252,12 @@ void sample_theta_vectors_from_count_vectors() { pthread_attr_destroy(&attr); delete[] threads; - for (int i = 0; i < nThreads; i++) { + for (int i = 0; i < num_threads; i++) { fclose(paramsArray[i].fi); delete paramsArray[i].engine; } delete[] paramsArray; - delete buffer; // Must delete here, force the content left in the buffer be written into the disk - if (verbose) { printf("Sampling is finished!\n"); } } @@ -322,6 +348,7 @@ void calculate_credibility_intervals(char* imdName) { iso_tau = new CIType[M + 1]; gene_tau = new CIType[m]; + // nThreads must be intact here. assert(M > 0); int quotient = M / nThreads; if (quotient < 1) { nThreads = M; quotient = 1; } @@ -410,11 +437,6 @@ int main(int argc, char* argv[]) { } verbose = !quiet; - if (nThreads > nCV) { - nThreads = nCV; - printf("Warning: Number of count vectors is less than number of threads! Change the number of threads to %d!\n", nThreads); - } - sprintf(refF, "%s.seq", argv[1]); refs.loadRefs(refF, 1); M = refs.getM(); diff --git a/makefile b/makefile index 45ab35d..211a7ea 100644 --- a/makefile +++ b/makefile @@ -118,8 +118,6 @@ rsem-run-gibbs : Gibbs.o Gibbs.o : utils.h my_assert.h boost/random.hpp sampling.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h Gibbs.cpp $(CC) $(COFLAGS) Gibbs.cpp -Buffer.h : my_assert.h - rsem-calculate-credibility-intervals : calcCI.o $(CC) -o rsem-calculate-credibility-intervals calcCI.o -lpthread diff --git a/rsem-prepare-reference b/rsem-prepare-reference index 78743e9..80a9549 100755 --- a/rsem-prepare-reference +++ b/rsem-prepare-reference @@ -155,7 +155,7 @@ Each line of should be of the form: gene_id transcript_id with the two fields separated by a tab character. - + If you are using a GTF file for the "UCSC Genes" gene set from the UCSC Genome Browser, then the "knownIsoforms.txt" file (obtained from the "Downloads" section of the UCSC Genome Browser site) is of this format. If this option is off, then the mapping of isoforms to genes depends on whether the --gtf option is specified. If --gtf is specified, then RSEM uses the "gene_id" and "transcript_id" attributes in the GTF file. Otherwise, RSEM assumes that each sequence in the reference sequence files is a separate gene. -- 2.39.2