]> git.donarmstrong.com Git - rsem.git/blobdiff - calcCI.cpp
Fixed a bug in perl scripts for printing error messages
[rsem.git] / calcCI.cpp
index f3964d18b8472d65a42b07355041ddeaed82a045..99ce14858825c989216dbd6ab309562218a8ae62 100644 (file)
@@ -5,10 +5,12 @@
 #include<cassert>
 #include<fstream>
 #include<algorithm>
-
-#include "boost/random.hpp"
+#include<vector>
+#include<pthread.h>
 
 #include "utils.h"
+#include "my_assert.h"
+#include "sampling.h"
 
 #include "Model.h"
 #include "SingleModel.h"
 #include "Refs.h"
 #include "GroupInfo.h"
 
+#include "Buffer.h"
 using namespace std;
 
-typedef unsigned long bufsize_type;
-typedef boost::mt19937 engine_type;
-typedef boost::gamma_distribution<> distribution_type;
-typedef boost::variate_generator<engine_type&, distribution_type> generator_type;
-
-const int FLOATSIZE = sizeof(float);
+struct Params {
+       int no;
+       FILE *fi;
+       engine_type *engine;
+       double *mw;
+};
 
 struct CIType {
        float lb, ub; // the interval is [lb, ub]
@@ -34,28 +37,23 @@ struct CIType {
        CIType() { lb = ub = 0.0; }
 };
 
-bool quiet;
+struct CIParams {
+       int no;
+       int start_gene_id, end_gene_id;
+};
+
 int model_type;
 
+int nMB;
 double confidence;
+int nCV, nSpC, nSamples; // nCV: number of count vectors; nSpC: number of theta vectors sampled per count vector; nSamples: nCV * nSpC
+int nThreads;
+int cvlen;
 
-int nC, cvlen, nSpC, nSamples; // nSpC : number of sample theta vectors per count vector
-int fr, to; // each flush, sample fr .. to - 1
-
-int nMB;
-bufsize_type size;
-float *buffer;
 char cvsF[STRLEN], tmpF[STRLEN], command[STRLEN];
-ofstream ftmpOut;
 
-int *cvec;
-double *theta;
 CIType *iso_tau, *gene_tau;
 
-engine_type engine(time(NULL));
-distribution_type **gammas;
-generator_type **rgs;
-
 int M, m;
 Refs refs;
 GroupInfo gi;
@@ -63,160 +61,180 @@ char imdName[STRLEN], statName[STRLEN];
 char modelF[STRLEN], groupF[STRLEN], refF[STRLEN];
 
 vector<double> eel; //expected effective lengths
-double *tau_denoms; // denominators for tau values
 
-template<class ModelType>
-void calcExpectedEffectiveLengths(ModelType& model) {
-  int lb, ub, span;
-  double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = sigma_{j=1}^{i}pdf[i]*(lb+i)
-  
-  model.getGLD().copyTo(pdf, cdf, lb, ub, span);
-  clen = new double[span + 1];
-  clen[0] = 0.0;
-  for (int i = 1; i <= span; i++) {
-    clen[i] = clen[i - 1] + pdf[i] * (lb + i);
-  }
-
-  eel.clear();
-  eel.resize(M + 1, 0.0);
-  for (int i = 1; i <= M; i++) {
-    int totLen = refs.getRef(i).getTotLen();
-    int fullLen = refs.getRef(i).getFullLen();
-    int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0);
-    int pos2 = max(min(totLen, ub) - lb, 0);
-
-    if (pos2 == 0) { eel[i] = 0.0; continue; }
-    
-    eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1]));
-    assert(eel[i] >= 0);
-    if (eel[i] < MINEEL) { eel[i] = 0.0; }
-  }
-  
-  delete[] pdf;
-  delete[] cdf;
-  delete[] clen;
-}
+Buffer *buffer;
 
-void flushToTempFile() {
-       int gap1 = fr * FLOATSIZE;
-       int gap2 = (nSamples - to) * FLOATSIZE;
-       float *p = NULL;
-
-       ftmpOut.seekp(0, ios_base::beg);
-       for (int i = 0; i < cvlen; i++) {
-               p = buffer + i;
-               ftmpOut.seekp(gap1, ios_base::cur);
-               for (int j = fr; j < to; j++) {
-                       ftmpOut.write((char*)p, FLOATSIZE);
-                       p += cvlen;
-               }
-               ftmpOut.seekp(gap2, ios_base::cur);
-       }
-}
+bool quiet;
 
-template<class ModelType>
-void sampling() {
-       float *p, *ub;
-       ifstream fin(cvsF);
-       ModelType model;
+Params *paramsArray;
+pthread_t *threads;
+pthread_attr_t attr;
+void *status;
+int rc;
 
-       model.read(modelF);
-       calcExpectedEffectiveLengths<ModelType>(model);
+CIParams *ciParamsArray;
 
-       ftmpOut.open(tmpF, ios::binary);
+template<class ModelType>
+void calcExpectedEffectiveLengths(ModelType& model) {
+       int lb, ub, span;
+       double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = \sigma_{j=1}^{i}pdf[i]*(lb+i)
+  
+       model.getGLD().copyTo(pdf, cdf, lb, ub, span);
+       clen = new double[span + 1];
+       clen[0] = 0.0;
+       for (int i = 1; i <= span; i++) {
+               clen[i] = clen[i - 1] + pdf[i] * (lb + i);
+       }
 
-       fin>>nC>>cvlen;
-       assert(cvlen = M + 1);
+       eel.assign(M + 1, 0.0);
+       for (int i = 1; i <= M; i++) {
+               int totLen = refs.getRef(i).getTotLen();
+               int fullLen = refs.getRef(i).getFullLen();
+               int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0);
+               int pos2 = max(min(totLen, ub) - lb, 0);
 
-       nSamples = nC * nSpC;
+               if (pos2 == 0) { eel[i] = 0.0; continue; }
+    
+               eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1]));
+               assert(eel[i] >= 0);
+               if (eel[i] < MINEEL) { eel[i] = 0.0; }
+       }
+  
+       delete[] pdf;
+       delete[] cdf;
+       delete[] clen;
+}
 
-       fr = to = 0;
+void* sample_theta_from_c(void* arg) {
 
-       size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen;
-       if (size > (bufsize_type)nSamples) size = nSamples;
-       size *= cvlen;
-       buffer = new float[size];
+       int *cvec;
+       double *theta;
+       gamma_dist **gammas;
+       gamma_generator **rgs;
 
-       ub = buffer + size;
-       p = buffer;
+       Params *params = (Params*)arg;
+       FILE *fi = params->fi;
+       double *mw = params->mw;
 
        cvec = new int[cvlen];
        theta = new double[cvlen];
-       gammas = new distribution_type*[cvlen];
-       rgs = new generator_type*[cvlen];
+       gammas = new gamma_dist*[cvlen];
+       rgs = new gamma_generator*[cvlen];
 
-       tau_denoms = new double[nSamples];
-       memset(tau_denoms, 0, sizeof(double) * nSamples);
+       float **vecs = new float*[nSpC];
+       for (int i = 0; i < nSpC; i++) vecs[i] = new float[cvlen];
 
-       double *mw = model.getMW();
-       for (int i = 0; i < nC; i++) {
-               for (int j = 0; j < cvlen; j++) {
-                       fin>>cvec[j];
-               }
+       int cnt = 0;
+       while (fscanf(fi, "%d", &cvec[0]) == 1) {
+               for (int j = 1; j < cvlen; j++) assert(fscanf(fi, "%d", &cvec[j]) == 1);
+
+               ++cnt;
 
                for (int j = 0; j < cvlen; j++) {
-                       gammas[j] = new distribution_type(cvec[j]); // need change back before publishing
-                       rgs[j] = new generator_type(engine, *gammas[j]);
+                       gammas[j] = new gamma_dist(cvec[j]);
+                       rgs[j] = new gamma_generator(*(params->engine), *gammas[j]);
                }
 
-               for (int j = 0; j < nSpC; j++) {
+               for (int i = 0; i < nSpC; i++) {
                        double sum = 0.0;
-                       for (int k = 0; k < cvlen; k++) {
-                               theta[k] = (k == 0 || eel[k] > EPSILON ? (*rgs[k])() : 0.0);
-                               sum += theta[k];
+                       for (int j = 0; j < cvlen; j++) {
+                               theta[j] = ((j == 0 || eel[j] >= EPSILON) ? (*rgs[j])() : 0.0);
+                               sum += theta[j];
                        }
-                       assert(sum > 0.0);
-                       for (int k = 0; k < cvlen; k++) theta[k] /= sum;
+                       assert(sum >= EPSILON);
+                       for (int j = 0; j < cvlen; j++) theta[j] /= sum;
 
                        sum = 0.0;
-                       for (int k = 0; k < cvlen; k++) {
-                         theta[k] = (mw[k] < EPSILON ? 0.0 : theta[k] / mw[k]);
-                         sum += theta[k];
+                       for (int j = 0; j < cvlen; j++) {
+                               theta[j] = (mw[j] < EPSILON ? 0.0 : theta[j] / mw[j]);
+                               sum += theta[j];
                        }
                        assert(sum >= EPSILON);
-                       for (int k = 0; k < cvlen; k++) theta[k] /= sum;
-
-                       *p = (float)theta[0]; ++p;
-                       assert(1.0 - theta[0] > 0.0);
-                       for (int k = 1; k < cvlen; k++) {
-                               if (eel[k] > EPSILON) {
-                                       theta[k] /= (1.0 - theta[0]);
-                                       tau_denoms[to] += theta[k] / eel[k];
-                               }
-                               else {
-                                       if (theta[k] != 0.0) { fprintf(stderr, "K=%d Theta_K=%lf\n", k, theta[k]); exit(-1); }
+                       for (int j = 0; j < cvlen; j++) theta[j] /= sum;
+
+
+                       sum = 0.0;
+                       vecs[i][0] = theta[0];
+                       for (int j = 1; j < cvlen; j++)
+                               if (eel[j] >= EPSILON) {
+                                       vecs[i][j] = theta[j] / eel[j];
+                                       sum += vecs[i][j];
                                }
+                               else assert(theta[j] < EPSILON);
 
-                               *p = (float)theta[k];
-                               ++p;
-                       }
-                       ++to;
-                       if (p == ub) {
-                               flushToTempFile();
-                               p = buffer;
-                               fr = to;
-                               if (verbose) { printf("%d vectors are sampled!\n", to); }
-                       }
+                       assert(sum >= EPSILON);
+                       for (int j = 1; j < cvlen; j++) vecs[i][j] /= sum;
                }
 
+               buffer->write(nSpC, vecs);
+
                for (int j = 0; j < cvlen; j++) {
                        delete gammas[j];
                        delete rgs[j];
                }
-       }
 
-       if (fr != to) { flushToTempFile(); }
-
-       fin.close();
-       ftmpOut.close();
-
-       delete[] buffer;
+               if (verbose && cnt % 100 == 0) { printf("Thread %d, %d count vectors are processed!\n", params->no, cnt); }
+       }
 
        delete[] cvec;
        delete[] theta;
        delete[] gammas;
        delete[] rgs;
 
+       for (int i = 0; i < nSpC; i++) delete[] vecs[i];
+       delete[] vecs;
+
+       return NULL;
+}
+
+template<class ModelType>
+void sample_theta_vectors_from_count_vectors() {
+       ModelType model;
+       model.read(modelF);
+       calcExpectedEffectiveLengths<ModelType>(model);
+
+
+       int num_threads = min(nThreads, nCV);
+
+       buffer = new Buffer(nMB, nSamples, cvlen, tmpF);
+
+       paramsArray = new Params[num_threads];
+       threads = new pthread_t[num_threads];
+
+       char inpF[STRLEN];
+       for (int i = 0; i < num_threads; i++) {
+               paramsArray[i].no = i;
+               sprintf(inpF, "%s%d", cvsF, i);
+               paramsArray[i].fi = fopen(inpF, "r");
+               paramsArray[i].engine = engineFactory::new_engine();
+               paramsArray[i].mw = model.getMW();
+       }
+
+       /* set thread attribute to be joinable */
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+       for (int i = 0; i < num_threads; i++) {
+               rc = pthread_create(&threads[i], &attr, &sample_theta_from_c, (void*)(&paramsArray[i]));
+               pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
+       }
+       for (int i = 0; i < num_threads; i++) {
+               rc = pthread_join(threads[i], &status);
+               pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
+       }
+
+       /* destroy attribute */
+       pthread_attr_destroy(&attr);
+       delete[] threads;
+
+       for (int i = 0; i < num_threads; i++) {
+               fclose(paramsArray[i].fi);
+               delete paramsArray[i].engine;
+       }
+       delete[] paramsArray;
+
+       delete buffer; // Must delete here, force the content left in the buffer be written into the disk
+
        if (verbose) { printf("Sampling is finished!\n"); }
 }
 
@@ -263,45 +281,96 @@ void calcCI(int nSamples, float *samples, float &lb, float &ub) {
        } while (p <= threshold);
 }
 
-void generateResults(char* imdName) {
+void* calcCI_batch(void* arg) {
        float *itsamples, *gtsamples;
        ifstream fin;
-       FILE *fo;
-       char outF[STRLEN];
-
-       iso_tau = new CIType[M + 1];
-       gene_tau = new CIType[m];
+       CIParams *ciParams = (CIParams*)arg;
 
        itsamples = new float[nSamples];
        gtsamples = new float[nSamples];
 
        fin.open(tmpF, ios::binary);
+       streampos pos = streampos(gi.spAt(ciParams->start_gene_id)) * nSamples * FLOATSIZE;
+       fin.seekg(pos, ios::beg);
 
-       //read sampled theta0 values
-       for (int k = 0; k < nSamples; k++) fin.read((char*)(&itsamples[k]), FLOATSIZE);
-
-       for (int i = 0; i < m; i++) {
+       int cnt = 0;
+       for (int i = ciParams->start_gene_id; i < ciParams->end_gene_id; i++) {
                int b = gi.spAt(i), e = gi.spAt(i + 1);
                memset(gtsamples, 0, FLOATSIZE * nSamples);
                for (int j = b; j < e; j++) {
                        for (int k = 0; k < nSamples; k++) {
                                fin.read((char*)(&itsamples[k]), FLOATSIZE);
-                               if (eel[j] > EPSILON && tau_denoms[k] > EPSILON) { itsamples[k] = itsamples[k] / eel[j] / tau_denoms[k]; }
-                               else {
-                                       if (itsamples[k] != 0.0) { fprintf(stderr, "Not equal to 0! K=%d, Sampled Theta Value=%lf\n", k, itsamples[k]); exit(-1); }
-                                       itsamples[k] = 0.0;
-                               }
                                gtsamples[k] += itsamples[k];
                        }
                        calcCI(nSamples, itsamples, iso_tau[j].lb, iso_tau[j].ub);
                }
                calcCI(nSamples, gtsamples, gene_tau[i].lb, gene_tau[i].ub);
 
-               if (verbose && (i + 1) % 1000 == 0) { printf("%d genes are done!\n", i + 1); }
+               ++cnt;
+               if (verbose && cnt % 1000 == 0) { printf("In thread %d, %d genes are processed for CI calculation!\n", ciParams->no, cnt); }
        }
 
        fin.close();
 
+       delete[] itsamples;
+       delete[] gtsamples;
+
+       return NULL;
+}
+
+void calculate_credibility_intervals(char* imdName) {
+       FILE *fo;
+       char outF[STRLEN];
+       int num_threads = nThreads;
+
+       iso_tau = new CIType[M + 1];
+       gene_tau = new CIType[m];
+
+       assert(M > 0);
+       int quotient = M / num_threads;
+       if (quotient < 1) { num_threads = M; quotient = 1; }
+       int cur_gene_id = 0;
+       int num_isoforms = 0;
+
+       // A just so so strategy for paralleling
+       ciParamsArray = new CIParams[num_threads];
+       for (int i = 0; i < num_threads; i++) {
+               ciParamsArray[i].no = i;
+               ciParamsArray[i].start_gene_id = cur_gene_id;
+               num_isoforms = 0;
+
+               while ((m - cur_gene_id > num_threads - i - 1) && (i == num_threads - 1 || num_isoforms < quotient)) {
+                       num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id);
+                       ++cur_gene_id;
+               }
+
+               ciParamsArray[i].end_gene_id = cur_gene_id;
+       }
+
+       threads = new pthread_t[num_threads];
+
+       /* set thread attribute to be joinable */
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+       // paralleling
+       for (int i = 0; i < num_threads; i++) {
+               rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i]));
+               pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
+       }
+       for (int i = 0; i < num_threads; i++) {
+               rc = pthread_join(threads[i], &status);
+               pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
+       }
+
+       // releasing resources
+
+       /* destroy attribute */
+       pthread_attr_destroy(&attr);
+       delete[] threads;
+
+       delete[] ciParamsArray;
+
        //isoform level results
        sprintf(outF, "%s.iso_res", imdName);
        fo = fopen(outF, "a");
@@ -320,47 +389,34 @@ void generateResults(char* imdName) {
                fprintf(fo, "%.6g%c", gene_tau[i].ub, (i < m - 1 ? '\t' : '\n'));
        fclose(fo);
 
-       delete[] itsamples;
-       delete[] gtsamples;
-
        delete[] iso_tau;
        delete[] gene_tau;
 
        if (verbose) { printf("All credibility intervals are calculated!\n"); }
-
-       sprintf(outF, "%s.tau_denoms", imdName);
-       fo = fopen(outF, "w");
-       fprintf(fo, "%d\n", nSamples);
-       for (int i = 0; i < nSamples; i++) fprintf(fo, "%.15g ", tau_denoms[i]);
-       fprintf(fo, "\n");
-       fclose(fo);
 }
 
 int main(int argc, char* argv[]) {
-       if (argc < 7) {
-               printf("Usage: rsem-calculate-credibility-intervals reference_name sample_name sampleToken confidence nSpC nMB[-q]\n");
+       if (argc < 8) {
+               printf("Usage: rsem-calculate-credibility-intervals reference_name imdName statName confidence nCV nSpC nMB [-p #Threads] [-q]\n");
                exit(-1);
        }
 
+       strcpy(imdName, argv[2]);
+       strcpy(statName, argv[3]);
+
        confidence = atof(argv[4]);
-       nSpC = atoi(argv[5]);
-       nMB = atoi(argv[6]);
+       nCV = atoi(argv[5]);
+       nSpC = atoi(argv[6]);
+       nMB = atoi(argv[7]);
 
+       nThreads = 1;
        quiet = false;
-       if (argc > 7 && !strcmp(argv[7], "-q")) {
-               quiet = true;
+       for (int i = 8; i < argc; i++) {
+               if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]);
+               if (!strcmp(argv[i], "-q")) quiet = true;
        }
        verbose = !quiet;
 
-       sprintf(imdName, "%s.temp/%s", argv[2], argv[3]);
-       sprintf(statName, "%s.stat/%s", argv[2], argv[3]);
-
-       sprintf(modelF, "%s.model", statName);
-       FILE *fi = fopen(modelF, "r");
-       if (fi == NULL) { fprintf(stderr, "Cannot open %s!\n", modelF); exit(-1); }
-       fscanf(fi, "%d", &model_type);
-       fclose(fi);
-
        sprintf(refF, "%s.seq", argv[1]);
        refs.loadRefs(refF, 1);
        M = refs.getM();
@@ -368,19 +424,29 @@ int main(int argc, char* argv[]) {
        gi.load(groupF);
        m = gi.getm();
 
+       nSamples = nCV * nSpC;
+       cvlen = M + 1;
+       assert(nSamples > 0 && cvlen > 1); // for Buffter.h: (bufsize_type)nSamples
+
        sprintf(tmpF, "%s.tmp", imdName);
        sprintf(cvsF, "%s.countvectors", imdName);
 
+       sprintf(modelF, "%s.model", statName);
+       FILE *fi = fopen(modelF, "r");
+       general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!");
+       assert(fscanf(fi, "%d", &model_type) == 1);
+       fclose(fi);
+
+       // Phase I
        switch(model_type) {
-       case 0 : sampling<SingleModel>(); break;
-       case 1 : sampling<SingleQModel>(); break;
-       case 2 : sampling<PairedEndModel>(); break;
-       case 3 : sampling<PairedEndQModel>(); break;
+       case 0 : sample_theta_vectors_from_count_vectors<SingleModel>(); break;
+       case 1 : sample_theta_vectors_from_count_vectors<SingleQModel>(); break;
+       case 2 : sample_theta_vectors_from_count_vectors<PairedEndModel>(); break;
+       case 3 : sample_theta_vectors_from_count_vectors<PairedEndQModel>(); break;
        }
 
-       generateResults(imdName);
-
-       delete[] tau_denoms;
+       // Phase II
+       calculate_credibility_intervals(imdName);
 
        /*
        sprintf(command, "rm -f %s", tmpF);
@@ -389,7 +455,7 @@ int main(int argc, char* argv[]) {
                fprintf(stderr, "Cannot delete %s!\n", tmpF);
                exit(-1);
        }
-        */
+       */
 
        return 0;
 }