]> git.donarmstrong.com Git - rsem.git/commitdiff
rewrote parallelization of calcCI.cpp
authorBo Li <bli@cs.wisc.edu>
Wed, 8 Feb 2012 04:38:51 +0000 (22:38 -0600)
committerBo Li <bli@cs.wisc.edu>
Wed, 8 Feb 2012 04:38:51 +0000 (22:38 -0600)
Buffer.h
Gibbs.cpp
calcCI.cpp
makefile
rsem-prepare-reference

index 1ce28bca18d4e21193965886cd92e0d64e5a927e..1c5a2d6f5415a901a9ec7ee335207a2644d2d6d6 100644 (file)
--- a/Buffer.h
+++ b/Buffer.h
@@ -3,48 +3,36 @@
 
 #include<cstdio>
 #include<fstream>
-#include<pthread.h>
-
-#include "my_assert.h"
 
 typedef unsigned long long bufsize_type;
 const int FLOATSIZE = sizeof(float);
 
 class Buffer {
 public:
-       Buffer(int nMB, int nSamples, int cvlen, const char* tmpF) {
+       Buffer(bufsize_type size, int sp, int nSamples, int cvlen, const char* tmpF) {
                cpos = 0;
-               size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen;
-               if (size > (bufsize_type)nSamples) size = nSamples;
-               general_assert(size > 0, "Memory allocated for credibility intervals is not enough!");
-               size *= cvlen;
-
+               this->size = size;
                buffer = new float[size];
                ftmpOut.open(tmpF, std::ios::binary);
-               pthread_mutex_init(&lock, NULL);
 
-               fr = to = 0;
+               fr = to = sp;
                this->nSamples = nSamples;
                this->cvlen = cvlen;
+
        }
 
        ~Buffer() {
                if (fr < to) flushToTempFile();
 
                delete[] buffer;
-               pthread_mutex_destroy(&lock);
                ftmpOut.close();
        }
 
-       void write(int n, float **vecs) {
-               pthread_assert(pthread_mutex_lock(&lock), "pthread_mutex_lock", "Error occurred while acquiring the lock!");
-               for (int i = 0; i < n; i++) {
-                       if (size - cpos < bufsize_type(cvlen)) flushToTempFile();
-                       memcpy(buffer + cpos, vecs[i], FLOATSIZE * cvlen);
-                       cpos += cvlen;
-                       ++to;
-               }
-               pthread_assert(pthread_mutex_unlock(&lock), "pthread_mutex_unlock", "Error occurred while releasing the lock!");
+       void write(float *vec) {
+               if (size - cpos < bufsize_type(cvlen)) flushToTempFile();
+               memcpy(buffer + cpos, vec, FLOATSIZE * cvlen);
+               cpos += cvlen;
+               ++to;
        }
 
 private:
@@ -52,7 +40,6 @@ private:
 
        float *buffer;
        std::ofstream ftmpOut;
-       pthread_mutex_t lock;
 
        int fr, to; // each flush, sample fr .. to - 1
        int nSamples, cvlen;
@@ -62,15 +49,15 @@ private:
                std::streampos gap2 = std::streampos(nSamples - to) * FLOATSIZE;
                float *p = NULL;
 
-               ftmpOut.seekp(0, std::ios_base::beg);
+               ftmpOut.seekp(0, std::ios::beg);
                for (int i = 0; i < cvlen; i++) {
                        p = buffer + i;
-                       ftmpOut.seekp(gap1, std::ios_base::cur);
+                       ftmpOut.seekp(gap1, std::ios::cur);
                        for (int j = fr; j < to; j++) {
                                ftmpOut.write((char*)p, FLOATSIZE);
                                p += cvlen;
                        }
-                       ftmpOut.seekp(gap2, std::ios_base::cur);
+                       ftmpOut.seekp(gap2, std::ios::cur);
                }
 
                cpos = 0;
index 1a084d6b2828731daefe71d8b591d686fa13c19c..f911da53c3e95370a224b95d64250abb3434bb7c 100644 (file)
--- a/Gibbs.cpp
+++ b/Gibbs.cpp
@@ -130,6 +130,7 @@ void load_data(char* reference_name, char* statName, char* imdName) {
 void init() {
        int quotient, left;
        char outF[STRLEN];
+       char splitF[STRLEN];
 
        quotient = NSAMPLES / nThreads;
        left = NSAMPLES % nThreads;
@@ -156,6 +157,14 @@ void init() {
                memset(paramsArray[i].pme_theta, 0, sizeof(double) * (M + 1));
        }
 
+       // output task splitting information
+       sprintf(splitF, "%s.split", imdName);
+       FILE *fo = fopen(splitF, "w");
+       fprintf(fo, "%d", nThreads);
+       for (int i = 0; i < nThreads; i++) fprintf(fo, " %d", paramsArray[i].nsamples);
+       fprintf(fo, "\n");
+       fclose(fo);
+
        /* set thread attribute to be joinable */
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
index 79a6bb2f924ddd0746e0cd816430541ad00e0370..3ae71157c58a023bb4cd7fe316e26c7b9e572663 100644 (file)
@@ -27,6 +27,8 @@ using namespace std;
 struct Params {
        int no;
        FILE *fi;
+       bufsize_type size;
+       int sp;
        engine_type *engine;
        double *mw;
 };
@@ -115,14 +117,14 @@ void* sample_theta_from_c(void* arg) {
        Params *params = (Params*)arg;
        FILE *fi = params->fi;
        double *mw = params->mw;
+       Buffer buffer(params->size, params->sp, nSamples, cvlen, tmpF);
 
        cvec = new int[cvlen];
        theta = new double[cvlen];
        gammas = new gamma_dist*[cvlen];
        rgs = new gamma_generator*[cvlen];
 
-       float **vecs = new float*[nSpC];
-       for (int i = 0; i < nSpC; i++) vecs[i] = new float[cvlen];
+       float *vec = new float[cvlen];
 
        int cnt = 0;
        while (fscanf(fi, "%d", &cvec[0]) == 1) {
@@ -154,19 +156,18 @@ void* sample_theta_from_c(void* arg) {
 
 
                        sum = 0.0;
-                       vecs[i][0] = theta[0];
+                       vec[0] = theta[0];
                        for (int j = 1; j < cvlen; j++)
                                if (eel[j] >= EPSILON) {
-                                       vecs[i][j] = theta[j] / eel[j];
-                                       sum += vecs[i][j];
+                                       vec[j] = theta[j] / eel[j];
+                                       sum += vec[j];
                                }
                                else assert(theta[j] < EPSILON);
-
                        assert(sum >= EPSILON);
-                       for (int j = 1; j < cvlen; j++) vecs[i][j] /= sum;
-               }
+                       for (int j = 1; j < cvlen; j++) vec[j] /= sum;
 
-               buffer->write(nSpC, vecs);
+                       buffer.write(vec);
+               }
 
                for (int j = 0; j < cvlen; j++) {
                        delete gammas[j];
@@ -181,8 +182,7 @@ void* sample_theta_from_c(void* arg) {
        delete[] gammas;
        delete[] rgs;
 
-       for (int i = 0; i < nSpC; i++) delete[] vecs[i];
-       delete[] vecs;
+       delete[] vec;
 
        return NULL;
 }
@@ -193,29 +193,57 @@ void sample_theta_vectors_from_count_vectors() {
        model.read(modelF);
        calcExpectedEffectiveLengths<ModelType>(model);
 
-       buffer = new Buffer(nMB, nSamples, cvlen, tmpF);
+       char splitF[STRLEN];
+       bufsize_type buf_maxcv = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen;
+       bufsize_type quotient, left;
+       int sum = 0;
 
-       paramsArray = new Params[nThreads];
-       threads = new pthread_t[nThreads];
+       sprintf(splitF, "%s.split", imdName);
+       FILE *fi = fopen(splitF, "r");
+       int num_threads;
+       assert(fscanf(fi, "%d", &num_threads) == 1);
+       assert(num_threads <= nThreads);
+
+       quotient = buf_maxcv / num_threads;
+       assert(quotient > 0);
+       left = buf_maxcv % num_threads;
+
+       paramsArray = new Params[num_threads];
+       threads = new pthread_t[num_threads];
 
        char inpF[STRLEN];
-       for (int i = 0; i < nThreads; i++) {
+       for (int i = 0; i < num_threads; i++) {
                paramsArray[i].no = i;
                sprintf(inpF, "%s%d", cvsF, i);
                paramsArray[i].fi = fopen(inpF, "r");
+
+               int num_samples;
+               assert(fscanf(fi, "%d", &num_samples) == 1);
+               num_samples *= nSpC;
+               if (bufsize_type(num_samples) <= quotient) paramsArray[i].size = num_samples;
+               else {
+                       paramsArray[i].size = quotient;
+                       if (left > 0) { ++paramsArray[i].size; --left; }
+               }
+               paramsArray[i].size *=  cvlen * FLOATSIZE;
+               paramsArray[i].sp = sum;
+               sum += num_samples;
+
                paramsArray[i].engine = engineFactory::new_engine();
                paramsArray[i].mw = model.getMW();
        }
 
+       fclose(fi);
+
        /* set thread attribute to be joinable */
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
-       for (int i = 0; i < nThreads; i++) {
+       for (int i = 0; i < num_threads; i++) {
                rc = pthread_create(&threads[i], &attr, &sample_theta_from_c, (void*)(&paramsArray[i]));
                pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
        }
-       for (int i = 0; i < nThreads; i++) {
+       for (int i = 0; i < num_threads; i++) {
                rc = pthread_join(threads[i], &status);
                pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
        }
@@ -224,14 +252,12 @@ void sample_theta_vectors_from_count_vectors() {
        pthread_attr_destroy(&attr);
        delete[] threads;
 
-       for (int i = 0; i < nThreads; i++) {
+       for (int i = 0; i < num_threads; i++) {
                fclose(paramsArray[i].fi);
                delete paramsArray[i].engine;
        }
        delete[] paramsArray;
 
-       delete buffer; // Must delete here, force the content left in the buffer be written into the disk
-
        if (verbose) { printf("Sampling is finished!\n"); }
 }
 
@@ -322,6 +348,7 @@ void calculate_credibility_intervals(char* imdName) {
        iso_tau = new CIType[M + 1];
        gene_tau = new CIType[m];
 
+       // nThreads must be intact here.
        assert(M > 0);
        int quotient = M / nThreads;
        if (quotient < 1) { nThreads = M; quotient = 1; }
@@ -410,11 +437,6 @@ int main(int argc, char* argv[]) {
        }
        verbose = !quiet;
 
-       if (nThreads > nCV) {
-               nThreads = nCV;
-               printf("Warning: Number of count vectors is less than number of threads! Change the number of threads to %d!\n", nThreads);
-       }
-
        sprintf(refF, "%s.seq", argv[1]);
        refs.loadRefs(refF, 1);
        M = refs.getM();
index 45ab35d11cee55d44654949830bf4c0b43869d66..211a7ea5bbc86ee39362e3465c54fa6dd05f7304 100644 (file)
--- a/makefile
+++ b/makefile
@@ -118,8 +118,6 @@ rsem-run-gibbs : Gibbs.o
 Gibbs.o : utils.h my_assert.h boost/random.hpp sampling.h Model.h SingleModel.h SingleQModel.h PairedEndModel.h PairedEndQModel.h RefSeq.h RefSeqPolicy.h PolyARules.h Refs.h GroupInfo.h Gibbs.cpp 
        $(CC) $(COFLAGS) Gibbs.cpp
 
-Buffer.h : my_assert.h
-
 rsem-calculate-credibility-intervals : calcCI.o
        $(CC) -o rsem-calculate-credibility-intervals calcCI.o -lpthread
 
index 78743e946f2b2f1b6b02ecdb6cbeeaaebdf3c70e..80a954942031c88f6e6e50eb3fd84c86793ad20a 100755 (executable)
@@ -155,7 +155,7 @@ Each line of <file> should be of the form:
 gene_id transcript_id
 
 with the two fields separated by a tab character.
+
 If you are using a GTF file for the "UCSC Genes" gene set from the UCSC Genome Browser, then the "knownIsoforms.txt" file (obtained from the "Downloads" section of the UCSC Genome Browser site) is of this format.
 
 If this option is off, then the mapping of isoforms to genes depends on whether the --gtf option is specified.  If --gtf is specified, then RSEM uses the "gene_id" and "transcript_id" attributes in the GTF file.  Otherwise, RSEM assumes that each sequence in the reference sequence files is a separate gene.