#include<cassert>
#include<fstream>
#include<algorithm>
-
-#include "boost/random.hpp"
+#include<vector>
+#include<pthread.h>
#include "utils.h"
+#include "my_assert.h"
+#include "sampling.h"
#include "Model.h"
#include "SingleModel.h"
#include "Refs.h"
#include "GroupInfo.h"
-using namespace std;
+#include "Buffer.h"
-typedef unsigned long bufsize_type;
-typedef boost::mt19937 engine_type;
-typedef boost::gamma_distribution<> distribution_type;
-typedef boost::variate_generator<engine_type&, distribution_type> generator_type;
+using namespace std;
-const int FLOATSIZE = sizeof(float);
+struct Params {
+ int no;
+ FILE *fi;
+ engine_type *engine;
+ const double *mw;
+};
struct CIType {
float lb, ub; // the interval is [lb, ub]
CIType() { lb = ub = 0.0; }
};
-bool quiet;
+struct CIParams {
+ int no;
+ int start_gene_id, end_gene_id;
+};
+
int model_type;
+int nMB;
double confidence;
+int nCV, nSpC, nSamples; // nCV: number of count vectors; nSpC: number of theta vectors sampled per count vector; nSamples: nCV * nSpC
+int nThreads;
-int nC, cvlen, nSpC, nSamples; // nSpC : number of sample theta vectors per count vector
-int fr, to; // each flush, sample fr .. to - 1
+float *l_bars;
-int nMB;
-bufsize_type size;
-float *buffer;
char cvsF[STRLEN], tmpF[STRLEN], command[STRLEN];
-ofstream ftmpOut;
-
-int *cvec;
-double *theta;
-CIType *iso_tau, *gene_tau;
-engine_type engine(time(NULL));
-distribution_type **gammas;
-generator_type **rgs;
+CIType *iso_tpm, *gene_tpm, *iso_fpkm, *gene_fpkm;
int M, m;
Refs refs;
char modelF[STRLEN], groupF[STRLEN], refF[STRLEN];
vector<double> eel; //expected effective lengths
-double *tau_denoms; // denominators for tau values
-
-template<class ModelType>
-void calcExpectedEffectiveLengths(ModelType& model) {
- int lb, ub, span;
- double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = sigma_{j=1}^{i}pdf[i]*(lb+i)
-
- model.getGLD().copyTo(pdf, cdf, lb, ub, span);
- clen = new double[span + 1];
- clen[0] = 0.0;
- for (int i = 1; i <= span; i++) {
- clen[i] = clen[i - 1] + pdf[i] * (lb + i);
- }
-
- eel.clear();
- eel.resize(M + 1, 0.0);
- for (int i = 1; i <= M; i++) {
- int totLen = refs.getRef(i).getTotLen();
- int fullLen = refs.getRef(i).getFullLen();
- int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0);
- int pos2 = max(min(totLen, ub) - lb, 0);
-
- if (pos2 == 0) { eel[i] = 0.0; continue; }
-
- eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1]));
- assert(eel[i] >= 0);
- if (eel[i] < MINEEL) { eel[i] = 0.0; }
- }
-
- delete[] pdf;
- delete[] cdf;
- delete[] clen;
-}
-
-void flushToTempFile() {
- int gap1 = fr * FLOATSIZE;
- int gap2 = (nSamples - to) * FLOATSIZE;
- float *p = NULL;
-
- ftmpOut.seekp(0, ios_base::beg);
- for (int i = 0; i < cvlen; i++) {
- p = buffer + i;
- ftmpOut.seekp(gap1, ios_base::cur);
- for (int j = fr; j < to; j++) {
- ftmpOut.write((char*)p, FLOATSIZE);
- p += cvlen;
- }
- ftmpOut.seekp(gap2, ios_base::cur);
- }
-}
-template<class ModelType>
-void sampling() {
- float *p, *ub;
- ifstream fin(cvsF);
- ModelType model;
+Buffer *buffer;
- model.read(modelF);
- calcExpectedEffectiveLengths<ModelType>(model);
-
- ftmpOut.open(tmpF, ios::binary);
-
- fin>>nC>>cvlen;
- assert(cvlen = M + 1);
-
- nSamples = nC * nSpC;
-
- fr = to = 0;
+bool quiet;
- size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen;
- if (size > (bufsize_type)nSamples) size = nSamples;
- size *= cvlen;
- buffer = new float[size];
+Params *paramsArray;
+pthread_t *threads;
+pthread_attr_t attr;
+int rc;
- ub = buffer + size;
- p = buffer;
+CIParams *ciParamsArray;
- cvec = new int[cvlen];
- theta = new double[cvlen];
- gammas = new distribution_type*[cvlen];
- rgs = new generator_type*[cvlen];
+template<class ModelType>
+void calcExpectedEffectiveLengths(ModelType& model) {
+ int lb, ub, span;
+ double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = \sigma_{j=1}^{i}pdf[i]*(lb+i)
+
+ model.getGLD().copyTo(pdf, cdf, lb, ub, span);
+ clen = new double[span + 1];
+ clen[0] = 0.0;
+ for (int i = 1; i <= span; i++) {
+ clen[i] = clen[i - 1] + pdf[i] * (lb + i);
+ }
- tau_denoms = new double[nSamples];
- memset(tau_denoms, 0, sizeof(double) * nSamples);
+ eel.assign(M + 1, 0.0);
+ for (int i = 1; i <= M; i++) {
+ int totLen = refs.getRef(i).getTotLen();
+ int fullLen = refs.getRef(i).getFullLen();
+ int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0);
+ int pos2 = max(min(totLen, ub) - lb, 0);
- double *mw = model.getMW();
- for (int i = 0; i < nC; i++) {
- for (int j = 0; j < cvlen; j++) {
- fin>>cvec[j];
- }
+ if (pos2 == 0) { eel[i] = 0.0; continue; }
+
+ eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1]));
+ assert(eel[i] >= 0);
+ if (eel[i] < MINEEL) { eel[i] = 0.0; }
+ }
+
+ delete[] pdf;
+ delete[] cdf;
+ delete[] clen;
+}
- for (int j = 0; j < cvlen; j++) {
- gammas[j] = new distribution_type(cvec[j]); // need change back before publishing
- rgs[j] = new generator_type(engine, *gammas[j]);
+void* sample_theta_from_c(void* arg) {
+ int *cvec;
+ double *theta;
+ float *tpm;
+ gamma_dist **gammas;
+ gamma_generator **rgs;
+
+ Params *params = (Params*)arg;
+ FILE *fi = params->fi;
+ const double *mw = params->mw;
+
+ cvec = new int[M + 1];
+ theta = new double[M + 1];
+ gammas = new gamma_dist*[M + 1];
+ rgs = new gamma_generator*[M + 1];
+ tpm = new float[M + 1];
+ float l_bar; // the mean transcript length over the sample
+
+ int cnt = 0;
+ while (fscanf(fi, "%d", &cvec[0]) == 1) {
+ for (int j = 1; j <= M; j++) assert(fscanf(fi, "%d", &cvec[j]) == 1);
+
+ ++cnt;
+
+ for (int j = 0; j <= M; j++) {
+ gammas[j] = new gamma_dist(cvec[j]);
+ rgs[j] = new gamma_generator(*(params->engine), *gammas[j]);
}
- for (int j = 0; j < nSpC; j++) {
+ for (int i = 0; i < nSpC; i++) {
double sum = 0.0;
- for (int k = 0; k < cvlen; k++) {
- theta[k] = (k == 0 || eel[k] > EPSILON ? (*rgs[k])() : 0.0);
- sum += theta[k];
+ for (int j = 0; j <= M; j++) {
+ theta[j] = ((j == 0 || (eel[j] >= EPSILON && mw[j] >= EPSILON)) ? (*rgs[j])() / mw[j] : 0.0);
+ sum += theta[j];
}
- assert(sum > 0.0);
- for (int k = 0; k < cvlen; k++) theta[k] /= sum;
+ assert(sum >= EPSILON);
+ for (int j = 0; j <= M; j++) theta[j] /= sum;
sum = 0.0;
- for (int k = 0; k < cvlen; k++) {
- theta[k] = (mw[k] < EPSILON ? 0.0 : theta[k] / mw[k]);
- sum += theta[k];
- }
- assert(sum >= EPSILON);
- for (int k = 0; k < cvlen; k++) theta[k] /= sum;
-
- *p = (float)theta[0]; ++p;
- assert(1.0 - theta[0] > 0.0);
- for (int k = 1; k < cvlen; k++) {
- if (eel[k] > EPSILON) {
- theta[k] /= (1.0 - theta[0]);
- tau_denoms[to] += theta[k] / eel[k];
+ tpm[0] = 0.0;
+ for (int j = 1; j <= M; j++)
+ if (eel[j] >= EPSILON) {
+ tpm[j] = theta[j] / eel[j];
+ sum += tpm[j];
}
- else {
- if (theta[k] != 0.0) { fprintf(stderr, "K=%d Theta_K=%lf\n", k, theta[k]); exit(-1); }
- }
-
- *p = (float)theta[k];
- ++p;
- }
- ++to;
- if (p == ub) {
- flushToTempFile();
- p = buffer;
- fr = to;
- if (verbose) { printf("%d vectors are sampled!\n", to); }
- }
+ else assert(theta[j] < EPSILON);
+ assert(sum >= EPSILON);
+ l_bar = 0.0; // store mean effective length of the sample
+ for (int j = 1; j <= M; j++) { tpm[j] /= sum; l_bar += tpm[j] * eel[j]; tpm[j] *= 1e6; }
+ buffer->write(l_bar, tpm + 1); // ommit the first element in tpm
}
- for (int j = 0; j < cvlen; j++) {
+ for (int j = 0; j <= M; j++) {
delete gammas[j];
delete rgs[j];
}
- }
-
- if (fr != to) { flushToTempFile(); }
- fin.close();
- ftmpOut.close();
-
- delete[] buffer;
+ if (verbose && cnt % 100 == 0) { printf("Thread %d, %d count vectors are processed!\n", params->no, cnt); }
+ }
delete[] cvec;
delete[] theta;
delete[] gammas;
delete[] rgs;
+ delete[] tpm;
+
+ return NULL;
+}
+
+template<class ModelType>
+void sample_theta_vectors_from_count_vectors() {
+ ModelType model;
+ model.read(modelF);
+ calcExpectedEffectiveLengths<ModelType>(model);
+
+ int num_threads = min(nThreads, nCV);
+
+ buffer = new Buffer(nMB, nSamples, M, l_bars, tmpF);
+
+ paramsArray = new Params[num_threads];
+ threads = new pthread_t[num_threads];
+
+ char inpF[STRLEN];
+ for (int i = 0; i < num_threads; i++) {
+ paramsArray[i].no = i;
+ sprintf(inpF, "%s%d", cvsF, i);
+ paramsArray[i].fi = fopen(inpF, "r");
+ paramsArray[i].engine = engineFactory::new_engine();
+ paramsArray[i].mw = model.getMW();
+ }
+
+ /* set thread attribute to be joinable */
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ for (int i = 0; i < num_threads; i++) {
+ rc = pthread_create(&threads[i], &attr, &sample_theta_from_c, (void*)(¶msArray[i]));
+ pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
+ }
+ for (int i = 0; i < num_threads; i++) {
+ rc = pthread_join(threads[i], NULL);
+ pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
+ }
+
+ /* destroy attribute */
+ pthread_attr_destroy(&attr);
+ delete[] threads;
+
+ for (int i = 0; i < num_threads; i++) {
+ fclose(paramsArray[i].fi);
+ delete paramsArray[i].engine;
+ }
+ delete[] paramsArray;
+
+ delete buffer; // Must delete here, force the content left in the buffer be written into the disk
if (verbose) { printf("Sampling is finished!\n"); }
}
} while (p <= threshold);
}
-void generateResults(char* imdName) {
- float *itsamples, *gtsamples;
+void* calcCI_batch(void* arg) {
+ float *itsamples, *gtsamples, *ifsamples, *gfsamples;
ifstream fin;
- FILE *fo;
- char outF[STRLEN];
-
- iso_tau = new CIType[M + 1];
- gene_tau = new CIType[m];
+ CIParams *ciParams = (CIParams*)arg;
itsamples = new float[nSamples];
gtsamples = new float[nSamples];
+ ifsamples = new float[nSamples];
+ gfsamples = new float[nSamples];
fin.open(tmpF, ios::binary);
+ // minus 1 here for that theta0 is not written!
+ streampos pos = streampos(gi.spAt(ciParams->start_gene_id) - 1) * nSamples * FLOATSIZE;
+ fin.seekg(pos, ios::beg);
- //read sampled theta0 values
- for (int k = 0; k < nSamples; k++) fin.read((char*)(&itsamples[k]), FLOATSIZE);
-
- for (int i = 0; i < m; i++) {
+ int cnt = 0;
+ for (int i = ciParams->start_gene_id; i < ciParams->end_gene_id; i++) {
int b = gi.spAt(i), e = gi.spAt(i + 1);
memset(gtsamples, 0, FLOATSIZE * nSamples);
+ memset(gfsamples, 0, FLOATSIZE * nSamples);
for (int j = b; j < e; j++) {
for (int k = 0; k < nSamples; k++) {
fin.read((char*)(&itsamples[k]), FLOATSIZE);
- if (eel[j] > EPSILON && tau_denoms[k] > EPSILON) { itsamples[k] = itsamples[k] / eel[j] / tau_denoms[k]; }
- else {
- if (itsamples[k] != 0.0) { fprintf(stderr, "Not equal to 0! K=%d, Sampled Theta Value=%lf\n", k, itsamples[k]); exit(-1); }
- itsamples[k] = 0.0;
- }
gtsamples[k] += itsamples[k];
+ ifsamples[k] = 1e3 / l_bars[k] * itsamples[k];
+ gfsamples[k] += ifsamples[k];
}
- calcCI(nSamples, itsamples, iso_tau[j].lb, iso_tau[j].ub);
+ calcCI(nSamples, itsamples, iso_tpm[j].lb, iso_tpm[j].ub);
+ calcCI(nSamples, ifsamples, iso_fpkm[j].lb, iso_fpkm[j].ub);
+ }
+
+ if (e - b > 1) {
+ calcCI(nSamples, gtsamples, gene_tpm[i].lb, gene_tpm[i].ub);
+ calcCI(nSamples, gfsamples, gene_fpkm[i].lb, gene_fpkm[i].ub);
+ }
+ else {
+ gene_tpm[i].lb = iso_tpm[b].lb; gene_tpm[i].ub = iso_tpm[b].ub;
+ gene_fpkm[i].lb = iso_fpkm[b].lb; gene_fpkm[i].ub = iso_fpkm[b].ub;
}
- calcCI(nSamples, gtsamples, gene_tau[i].lb, gene_tau[i].ub);
- if (verbose && (i + 1) % 1000 == 0) { printf("%d genes are done!\n", i + 1); }
+ ++cnt;
+ if (verbose && cnt % 1000 == 0) { printf("In thread %d, %d genes are processed for CI calculation!\n", ciParams->no, cnt); }
}
fin.close();
+ delete[] itsamples;
+ delete[] gtsamples;
+
+ return NULL;
+}
+
+void calculate_credibility_intervals(char* imdName) {
+ FILE *fo;
+ char outF[STRLEN];
+ int num_threads = nThreads;
+
+ iso_tpm = new CIType[M + 1];
+ gene_tpm = new CIType[m];
+ iso_fpkm = new CIType[M + 1];
+ gene_fpkm = new CIType[m];
+
+ assert(M > 0);
+ int quotient = M / num_threads;
+ if (quotient < 1) { num_threads = M; quotient = 1; }
+ int cur_gene_id = 0;
+ int num_isoforms = 0;
+
+ // A just so so strategy for paralleling
+ ciParamsArray = new CIParams[num_threads];
+ for (int i = 0; i < num_threads; i++) {
+ ciParamsArray[i].no = i;
+ ciParamsArray[i].start_gene_id = cur_gene_id;
+ num_isoforms = 0;
+
+ while ((m - cur_gene_id > num_threads - i - 1) && (i == num_threads - 1 || num_isoforms < quotient)) {
+ num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id);
+ ++cur_gene_id;
+ }
+
+ ciParamsArray[i].end_gene_id = cur_gene_id;
+ }
+
+ threads = new pthread_t[num_threads];
+
+ /* set thread attribute to be joinable */
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ // paralleling
+ for (int i = 0; i < num_threads; i++) {
+ rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i]));
+ pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
+ }
+ for (int i = 0; i < num_threads; i++) {
+ rc = pthread_join(threads[i], NULL);
+ pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
+ }
+
+ // releasing resources
+
+ /* destroy attribute */
+ pthread_attr_destroy(&attr);
+ delete[] threads;
+
+ delete[] ciParamsArray;
+
//isoform level results
sprintf(outF, "%s.iso_res", imdName);
fo = fopen(outF, "a");
for (int i = 1; i <= M; i++)
- fprintf(fo, "%.6g%c", iso_tau[i].lb, (i < M ? '\t' : '\n'));
+ fprintf(fo, "%.6g%c", iso_tpm[i].lb, (i < M ? '\t' : '\n'));
+ for (int i = 1; i <= M; i++)
+ fprintf(fo, "%.6g%c", iso_tpm[i].ub, (i < M ? '\t' : '\n'));
for (int i = 1; i <= M; i++)
- fprintf(fo, "%.6g%c", iso_tau[i].ub, (i < M ? '\t' : '\n'));
+ fprintf(fo, "%.6g%c", iso_fpkm[i].lb, (i < M ? '\t' : '\n'));
+ for (int i = 1; i <= M; i++)
+ fprintf(fo, "%.6g%c", iso_fpkm[i].ub, (i < M ? '\t' : '\n'));
fclose(fo);
//gene level results
sprintf(outF, "%s.gene_res", imdName);
fo = fopen(outF, "a");
for (int i = 0; i < m; i++)
- fprintf(fo, "%.6g%c", gene_tau[i].lb, (i < m - 1 ? '\t' : '\n'));
+ fprintf(fo, "%.6g%c", gene_tpm[i].lb, (i < m - 1 ? '\t' : '\n'));
+ for (int i = 0; i < m; i++)
+ fprintf(fo, "%.6g%c", gene_tpm[i].ub, (i < m - 1 ? '\t' : '\n'));
for (int i = 0; i < m; i++)
- fprintf(fo, "%.6g%c", gene_tau[i].ub, (i < m - 1 ? '\t' : '\n'));
+ fprintf(fo, "%.6g%c", gene_fpkm[i].lb, (i < m - 1 ? '\t' : '\n'));
+ for (int i = 0; i < m; i++)
+ fprintf(fo, "%.6g%c", gene_fpkm[i].ub, (i < m - 1 ? '\t' : '\n'));
fclose(fo);
- delete[] itsamples;
- delete[] gtsamples;
-
- delete[] iso_tau;
- delete[] gene_tau;
+ delete[] iso_tpm;
+ delete[] gene_tpm;
+ delete[] iso_fpkm;
+ delete[] gene_fpkm;
if (verbose) { printf("All credibility intervals are calculated!\n"); }
-
- sprintf(outF, "%s.tau_denoms", imdName);
- fo = fopen(outF, "w");
- fprintf(fo, "%d\n", nSamples);
- for (int i = 0; i < nSamples; i++) fprintf(fo, "%.15g ", tau_denoms[i]);
- fprintf(fo, "\n");
- fclose(fo);
}
int main(int argc, char* argv[]) {
- if (argc < 7) {
- printf("Usage: rsem-calculate-credibility-intervals reference_name sample_name sampleToken confidence nSpC nMB[-q]\n");
+ if (argc < 8) {
+ printf("Usage: rsem-calculate-credibility-intervals reference_name imdName statName confidence nCV nSpC nMB [-p #Threads] [-q]\n");
exit(-1);
}
+ strcpy(imdName, argv[2]);
+ strcpy(statName, argv[3]);
+
confidence = atof(argv[4]);
- nSpC = atoi(argv[5]);
- nMB = atoi(argv[6]);
+ nCV = atoi(argv[5]);
+ nSpC = atoi(argv[6]);
+ nMB = atoi(argv[7]);
+ nThreads = 1;
quiet = false;
- if (argc > 7 && !strcmp(argv[7], "-q")) {
- quiet = true;
+ for (int i = 8; i < argc; i++) {
+ if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]);
+ if (!strcmp(argv[i], "-q")) quiet = true;
}
verbose = !quiet;
- sprintf(imdName, "%s.temp/%s", argv[2], argv[3]);
- sprintf(statName, "%s.stat/%s", argv[2], argv[3]);
-
- sprintf(modelF, "%s.model", statName);
- FILE *fi = fopen(modelF, "r");
- if (fi == NULL) { fprintf(stderr, "Cannot open %s!\n", modelF); exit(-1); }
- fscanf(fi, "%d", &model_type);
- fclose(fi);
-
sprintf(refF, "%s.seq", argv[1]);
refs.loadRefs(refF, 1);
M = refs.getM();
gi.load(groupF);
m = gi.getm();
+ nSamples = nCV * nSpC;
+ assert(nSamples > 0 && M > 0); // for Buffter.h: (bufsize_type)nSamples
+ l_bars = new float[nSamples];
+
sprintf(tmpF, "%s.tmp", imdName);
sprintf(cvsF, "%s.countvectors", imdName);
+ sprintf(modelF, "%s.model", statName);
+ FILE *fi = fopen(modelF, "r");
+ general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!");
+ assert(fscanf(fi, "%d", &model_type) == 1);
+ fclose(fi);
+
+ // Phase I
switch(model_type) {
- case 0 : sampling<SingleModel>(); break;
- case 1 : sampling<SingleQModel>(); break;
- case 2 : sampling<PairedEndModel>(); break;
- case 3 : sampling<PairedEndQModel>(); break;
+ case 0 : sample_theta_vectors_from_count_vectors<SingleModel>(); break;
+ case 1 : sample_theta_vectors_from_count_vectors<SingleQModel>(); break;
+ case 2 : sample_theta_vectors_from_count_vectors<PairedEndModel>(); break;
+ case 3 : sample_theta_vectors_from_count_vectors<PairedEndQModel>(); break;
}
- generateResults(imdName);
-
- delete[] tau_denoms;
+ // Phase II
+ calculate_credibility_intervals(imdName);
+ delete l_bars;
/*
sprintf(command, "rm -f %s", tmpF);
int status = system(command);