X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=calcCI.cpp;h=45c76838c150490240e24a8c215cb4d05069d203;hb=52f1bd6f44f9b2630b839f192fb9ece18581983b;hp=3ae71157c58a023bb4cd7fe316e26c7b9e572663;hpb=4a5e5138d3fc409e7ade5c14de7689612290f74f;p=rsem.git diff --git a/calcCI.cpp b/calcCI.cpp index 3ae7115..45c7683 100644 --- a/calcCI.cpp +++ b/calcCI.cpp @@ -22,15 +22,14 @@ #include "GroupInfo.h" #include "Buffer.h" + using namespace std; struct Params { int no; FILE *fi; - bufsize_type size; - int sp; engine_type *engine; - double *mw; + const double *mw; }; struct CIType { @@ -50,11 +49,12 @@ int nMB; double confidence; int nCV, nSpC, nSamples; // nCV: number of count vectors; nSpC: number of theta vectors sampled per count vector; nSamples: nCV * nSpC int nThreads; -int cvlen; + +float *l_bars; char cvsF[STRLEN], tmpF[STRLEN], command[STRLEN]; -CIType *iso_tau, *gene_tau; +CIType *iso_tpm, *gene_tpm, *iso_fpkm, *gene_fpkm; int M, m; Refs refs; @@ -71,7 +71,6 @@ bool quiet; Params *paramsArray; pthread_t *threads; pthread_attr_t attr; -void *status; int rc; CIParams *ciParamsArray; @@ -108,68 +107,58 @@ void calcExpectedEffectiveLengths(ModelType& model) { } void* sample_theta_from_c(void* arg) { - int *cvec; double *theta; + float *tpm; gamma_dist **gammas; gamma_generator **rgs; Params *params = (Params*)arg; FILE *fi = params->fi; - double *mw = params->mw; - Buffer buffer(params->size, params->sp, nSamples, cvlen, tmpF); + const double *mw = params->mw; - cvec = new int[cvlen]; - theta = new double[cvlen]; - gammas = new gamma_dist*[cvlen]; - rgs = new gamma_generator*[cvlen]; - - float *vec = new float[cvlen]; + cvec = new int[M + 1]; + theta = new double[M + 1]; + gammas = new gamma_dist*[M + 1]; + rgs = new gamma_generator*[M + 1]; + tpm = new float[M + 1]; + float l_bar; // the mean transcript length over the sample int cnt = 0; while (fscanf(fi, "%d", &cvec[0]) == 1) { - for (int j = 1; j < cvlen; j++) assert(fscanf(fi, "%d", &cvec[j]) == 1); + for (int j = 1; j <= M; j++) assert(fscanf(fi, "%d", &cvec[j]) == 1); ++cnt; - for (int j = 0; j < cvlen; j++) { + for (int j = 0; j <= M; j++) { gammas[j] = new gamma_dist(cvec[j]); rgs[j] = new gamma_generator(*(params->engine), *gammas[j]); } for (int i = 0; i < nSpC; i++) { double sum = 0.0; - for (int j = 0; j < cvlen; j++) { - theta[j] = ((j == 0 || eel[j] >= EPSILON) ? (*rgs[j])() : 0.0); + for (int j = 0; j <= M; j++) { + theta[j] = ((j == 0 || (eel[j] >= EPSILON && mw[j] >= EPSILON)) ? (*rgs[j])() / mw[j] : 0.0); sum += theta[j]; } assert(sum >= EPSILON); - for (int j = 0; j < cvlen; j++) theta[j] /= sum; + for (int j = 0; j <= M; j++) theta[j] /= sum; sum = 0.0; - for (int j = 0; j < cvlen; j++) { - theta[j] = (mw[j] < EPSILON ? 0.0 : theta[j] / mw[j]); - sum += theta[j]; - } - assert(sum >= EPSILON); - for (int j = 0; j < cvlen; j++) theta[j] /= sum; - - - sum = 0.0; - vec[0] = theta[0]; - for (int j = 1; j < cvlen; j++) + tpm[0] = 0.0; + for (int j = 1; j <= M; j++) if (eel[j] >= EPSILON) { - vec[j] = theta[j] / eel[j]; - sum += vec[j]; + tpm[j] = theta[j] / eel[j]; + sum += tpm[j]; } else assert(theta[j] < EPSILON); assert(sum >= EPSILON); - for (int j = 1; j < cvlen; j++) vec[j] /= sum; - - buffer.write(vec); + l_bar = 0.0; // store mean effective length of the sample + for (int j = 1; j <= M; j++) { tpm[j] /= sum; l_bar += tpm[j] * eel[j]; tpm[j] *= 1e6; } + buffer->write(l_bar, tpm + 1); // ommit the first element in tpm } - for (int j = 0; j < cvlen; j++) { + for (int j = 0; j <= M; j++) { delete gammas[j]; delete rgs[j]; } @@ -181,8 +170,7 @@ void* sample_theta_from_c(void* arg) { delete[] theta; delete[] gammas; delete[] rgs; - - delete[] vec; + delete[] tpm; return NULL; } @@ -193,20 +181,9 @@ void sample_theta_vectors_from_count_vectors() { model.read(modelF); calcExpectedEffectiveLengths(model); - char splitF[STRLEN]; - bufsize_type buf_maxcv = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen; - bufsize_type quotient, left; - int sum = 0; - - sprintf(splitF, "%s.split", imdName); - FILE *fi = fopen(splitF, "r"); - int num_threads; - assert(fscanf(fi, "%d", &num_threads) == 1); - assert(num_threads <= nThreads); + int num_threads = min(nThreads, nCV); - quotient = buf_maxcv / num_threads; - assert(quotient > 0); - left = buf_maxcv % num_threads; + buffer = new Buffer(nMB, nSamples, M, l_bars, tmpF); paramsArray = new Params[num_threads]; threads = new pthread_t[num_threads]; @@ -216,25 +193,10 @@ void sample_theta_vectors_from_count_vectors() { paramsArray[i].no = i; sprintf(inpF, "%s%d", cvsF, i); paramsArray[i].fi = fopen(inpF, "r"); - - int num_samples; - assert(fscanf(fi, "%d", &num_samples) == 1); - num_samples *= nSpC; - if (bufsize_type(num_samples) <= quotient) paramsArray[i].size = num_samples; - else { - paramsArray[i].size = quotient; - if (left > 0) { ++paramsArray[i].size; --left; } - } - paramsArray[i].size *= cvlen * FLOATSIZE; - paramsArray[i].sp = sum; - sum += num_samples; - paramsArray[i].engine = engineFactory::new_engine(); paramsArray[i].mw = model.getMW(); } - fclose(fi); - /* set thread attribute to be joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); @@ -244,7 +206,7 @@ void sample_theta_vectors_from_count_vectors() { pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!"); } for (int i = 0; i < num_threads; i++) { - rc = pthread_join(threads[i], &status); + rc = pthread_join(threads[i], NULL); pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!"); } @@ -258,6 +220,8 @@ void sample_theta_vectors_from_count_vectors() { } delete[] paramsArray; + delete buffer; // Must delete here, force the content left in the buffer be written into the disk + if (verbose) { printf("Sampling is finished!\n"); } } @@ -305,29 +269,44 @@ void calcCI(int nSamples, float *samples, float &lb, float &ub) { } void* calcCI_batch(void* arg) { - float *itsamples, *gtsamples; + float *itsamples, *gtsamples, *ifsamples, *gfsamples; ifstream fin; CIParams *ciParams = (CIParams*)arg; itsamples = new float[nSamples]; gtsamples = new float[nSamples]; + ifsamples = new float[nSamples]; + gfsamples = new float[nSamples]; fin.open(tmpF, ios::binary); - streampos pos = streampos(gi.spAt(ciParams->start_gene_id)) * nSamples * FLOATSIZE; + // minus 1 here for that theta0 is not written! + streampos pos = streampos(gi.spAt(ciParams->start_gene_id) - 1) * nSamples * FLOATSIZE; fin.seekg(pos, ios::beg); int cnt = 0; for (int i = ciParams->start_gene_id; i < ciParams->end_gene_id; i++) { int b = gi.spAt(i), e = gi.spAt(i + 1); memset(gtsamples, 0, FLOATSIZE * nSamples); + memset(gfsamples, 0, FLOATSIZE * nSamples); for (int j = b; j < e; j++) { for (int k = 0; k < nSamples; k++) { fin.read((char*)(&itsamples[k]), FLOATSIZE); gtsamples[k] += itsamples[k]; + ifsamples[k] = 1e3 / l_bars[k] * itsamples[k]; + gfsamples[k] += ifsamples[k]; } - calcCI(nSamples, itsamples, iso_tau[j].lb, iso_tau[j].ub); + calcCI(nSamples, itsamples, iso_tpm[j].lb, iso_tpm[j].ub); + calcCI(nSamples, ifsamples, iso_fpkm[j].lb, iso_fpkm[j].ub); + } + + if (e - b > 1) { + calcCI(nSamples, gtsamples, gene_tpm[i].lb, gene_tpm[i].ub); + calcCI(nSamples, gfsamples, gene_fpkm[i].lb, gene_fpkm[i].ub); + } + else { + gene_tpm[i].lb = iso_tpm[b].lb; gene_tpm[i].ub = iso_tpm[b].ub; + gene_fpkm[i].lb = iso_fpkm[b].lb; gene_fpkm[i].ub = iso_fpkm[b].ub; } - calcCI(nSamples, gtsamples, gene_tau[i].lb, gene_tau[i].ub); ++cnt; if (verbose && cnt % 1000 == 0) { printf("In thread %d, %d genes are processed for CI calculation!\n", ciParams->no, cnt); } @@ -344,25 +323,27 @@ void* calcCI_batch(void* arg) { void calculate_credibility_intervals(char* imdName) { FILE *fo; char outF[STRLEN]; + int num_threads = nThreads; - iso_tau = new CIType[M + 1]; - gene_tau = new CIType[m]; + iso_tpm = new CIType[M + 1]; + gene_tpm = new CIType[m]; + iso_fpkm = new CIType[M + 1]; + gene_fpkm = new CIType[m]; - // nThreads must be intact here. assert(M > 0); - int quotient = M / nThreads; - if (quotient < 1) { nThreads = M; quotient = 1; } + int quotient = M / num_threads; + if (quotient < 1) { num_threads = M; quotient = 1; } int cur_gene_id = 0; int num_isoforms = 0; // A just so so strategy for paralleling - ciParamsArray = new CIParams[nThreads]; - for (int i = 0; i < nThreads; i++) { + ciParamsArray = new CIParams[num_threads]; + for (int i = 0; i < num_threads; i++) { ciParamsArray[i].no = i; ciParamsArray[i].start_gene_id = cur_gene_id; num_isoforms = 0; - while ((m - cur_gene_id > nThreads - i - 1) && (i == nThreads - 1 || num_isoforms < quotient)) { + while ((m - cur_gene_id > num_threads - i - 1) && (i == num_threads - 1 || num_isoforms < quotient)) { num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id); ++cur_gene_id; } @@ -370,19 +351,19 @@ void calculate_credibility_intervals(char* imdName) { ciParamsArray[i].end_gene_id = cur_gene_id; } - threads = new pthread_t[nThreads]; + threads = new pthread_t[num_threads]; /* set thread attribute to be joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); // paralleling - for (int i = 0; i < nThreads; i++) { + for (int i = 0; i < num_threads; i++) { rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i])); pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); } - for (int i = 0; i < nThreads; i++) { - rc = pthread_join(threads[i], &status); + for (int i = 0; i < num_threads; i++) { + rc = pthread_join(threads[i], NULL); pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); } @@ -398,32 +379,45 @@ void calculate_credibility_intervals(char* imdName) { sprintf(outF, "%s.iso_res", imdName); fo = fopen(outF, "a"); for (int i = 1; i <= M; i++) - fprintf(fo, "%.6g%c", iso_tau[i].lb, (i < M ? '\t' : '\n')); + fprintf(fo, "%.6g%c", iso_tpm[i].lb, (i < M ? '\t' : '\n')); + for (int i = 1; i <= M; i++) + fprintf(fo, "%.6g%c", iso_tpm[i].ub, (i < M ? '\t' : '\n')); for (int i = 1; i <= M; i++) - fprintf(fo, "%.6g%c", iso_tau[i].ub, (i < M ? '\t' : '\n')); + fprintf(fo, "%.6g%c", iso_fpkm[i].lb, (i < M ? '\t' : '\n')); + for (int i = 1; i <= M; i++) + fprintf(fo, "%.6g%c", iso_fpkm[i].ub, (i < M ? '\t' : '\n')); fclose(fo); //gene level results sprintf(outF, "%s.gene_res", imdName); fo = fopen(outF, "a"); for (int i = 0; i < m; i++) - fprintf(fo, "%.6g%c", gene_tau[i].lb, (i < m - 1 ? '\t' : '\n')); + fprintf(fo, "%.6g%c", gene_tpm[i].lb, (i < m - 1 ? '\t' : '\n')); + for (int i = 0; i < m; i++) + fprintf(fo, "%.6g%c", gene_tpm[i].ub, (i < m - 1 ? '\t' : '\n')); for (int i = 0; i < m; i++) - fprintf(fo, "%.6g%c", gene_tau[i].ub, (i < m - 1 ? '\t' : '\n')); + fprintf(fo, "%.6g%c", gene_fpkm[i].lb, (i < m - 1 ? '\t' : '\n')); + for (int i = 0; i < m; i++) + fprintf(fo, "%.6g%c", gene_fpkm[i].ub, (i < m - 1 ? '\t' : '\n')); fclose(fo); - delete[] iso_tau; - delete[] gene_tau; + delete[] iso_tpm; + delete[] gene_tpm; + delete[] iso_fpkm; + delete[] gene_fpkm; if (verbose) { printf("All credibility intervals are calculated!\n"); } } int main(int argc, char* argv[]) { if (argc < 8) { - printf("Usage: rsem-calculate-credibility-intervals reference_name sample_name sampleToken confidence nCV nSpC nMB [-p #Threads] [-q]\n"); + printf("Usage: rsem-calculate-credibility-intervals reference_name imdName statName confidence nCV nSpC nMB [-p #Threads] [-q]\n"); exit(-1); } + strcpy(imdName, argv[2]); + strcpy(statName, argv[3]); + confidence = atof(argv[4]); nCV = atoi(argv[5]); nSpC = atoi(argv[6]); @@ -445,11 +439,9 @@ int main(int argc, char* argv[]) { m = gi.getm(); nSamples = nCV * nSpC; - cvlen = M + 1; - assert(nSamples > 0 && cvlen > 1); // for Buffter.h: (bufsize_type)nSamples + assert(nSamples > 0 && M > 0); // for Buffter.h: (bufsize_type)nSamples + l_bars = new float[nSamples]; - sprintf(imdName, "%s.temp/%s", argv[2], argv[3]); - sprintf(statName, "%s.stat/%s", argv[2], argv[3]); sprintf(tmpF, "%s.tmp", imdName); sprintf(cvsF, "%s.countvectors", imdName); @@ -470,6 +462,7 @@ int main(int argc, char* argv[]) { // Phase II calculate_credibility_intervals(imdName); + delete l_bars; /* sprintf(command, "rm -f %s", tmpF); int status = system(command);