X-Git-Url: https://git.donarmstrong.com/?p=rsem.git;a=blobdiff_plain;f=calcCI.cpp;h=89414f9b02f00ba641b8f987845aea8e977ef1b1;hp=c9ccce5909a2be59baabe8b3345232256e8a2da4;hb=757263aa0ff5367f826468db30d983282e51e7f0;hpb=58f823f5be6dfbe00896fc44ac3aac5e881e9c5c diff --git a/calcCI.cpp b/calcCI.cpp index c9ccce5..89414f9 100644 --- a/calcCI.cpp +++ b/calcCI.cpp @@ -5,10 +5,12 @@ #include #include #include - -#include "boost/random.hpp" +#include +#include #include "utils.h" +#include "my_assert.h" +#include "sampling.h" #include "Model.h" #include "SingleModel.h" @@ -19,14 +21,15 @@ #include "Refs.h" #include "GroupInfo.h" +#include "Buffer.h" using namespace std; -typedef unsigned long bufsize_type; -typedef boost::mt19937 engine_type; -typedef boost::gamma_distribution<> distribution_type; -typedef boost::variate_generator generator_type; - -const int FLOATSIZE = sizeof(float); +struct Params { + int no; + FILE *fi; + engine_type *engine; + double *mw; +}; struct CIType { float lb, ub; // the interval is [lb, ub] @@ -34,28 +37,23 @@ struct CIType { CIType() { lb = ub = 0.0; } }; -bool quiet; +struct CIParams { + int no; + int start_gene_id, end_gene_id; +}; + int model_type; +int nMB; double confidence; +int nCV, nSpC, nSamples; // nCV: number of count vectors; nSpC: number of theta vectors sampled per count vector; nSamples: nCV * nSpC +int nThreads; +int cvlen; -int nC, cvlen, nSpC, nSamples; // nSpC : number of sample theta vectors per count vector -int fr, to; // each flush, sample fr .. to - 1 - -int nMB; -bufsize_type size; -float *buffer; char cvsF[STRLEN], tmpF[STRLEN], command[STRLEN]; -ofstream ftmpOut; -int *cvec; -double *theta; CIType *iso_tau, *gene_tau; -engine_type engine(time(NULL)); -distribution_type **gammas; -generator_type **rgs; - int M, m; Refs refs; GroupInfo gi; @@ -63,160 +61,180 @@ char imdName[STRLEN], statName[STRLEN]; char modelF[STRLEN], groupF[STRLEN], refF[STRLEN]; vector eel; //expected effective lengths -double *tau_denoms; // denominators for tau values -template -void calcExpectedEffectiveLengths(ModelType& model) { - int lb, ub, span; - double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = sigma_{j=1}^{i}pdf[i]*(lb+i) - - model.getGLD().copyTo(pdf, cdf, lb, ub, span); - clen = new double[span + 1]; - clen[0] = 0.0; - for (int i = 1; i <= span; i++) { - clen[i] = clen[i - 1] + pdf[i] * (lb + i); - } - - eel.clear(); - eel.resize(M + 1, 0.0); - for (int i = 1; i <= M; i++) { - int totLen = refs.getRef(i).getTotLen(); - int fullLen = refs.getRef(i).getFullLen(); - int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0); - int pos2 = max(min(totLen, ub) - lb, 0); - - if (pos2 == 0) { eel[i] = 0.0; continue; } - - eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1])); - assert(eel[i] >= 0); - if (eel[i] < MINEEL) { eel[i] = 0.0; } - } - - delete[] pdf; - delete[] cdf; - delete[] clen; -} +Buffer *buffer; -void flushToTempFile() { - int gap1 = fr * FLOATSIZE; - int gap2 = (nSamples - to) * FLOATSIZE; - float *p = NULL; - - ftmpOut.seekp(0, ios_base::beg); - for (int i = 0; i < cvlen; i++) { - p = buffer + i; - ftmpOut.seekp(gap1, ios_base::cur); - for (int j = fr; j < to; j++) { - ftmpOut.write((char*)p, FLOATSIZE); - p += cvlen; - } - ftmpOut.seekp(gap2, ios_base::cur); - } -} +bool quiet; -template -void sampling() { - float *p, *ub; - ifstream fin(cvsF); - ModelType model; +Params *paramsArray; +pthread_t *threads; +pthread_attr_t attr; +void *status; +int rc; - model.read(modelF); - calcExpectedEffectiveLengths(model); +CIParams *ciParamsArray; - ftmpOut.open(tmpF, ios::binary); +template +void calcExpectedEffectiveLengths(ModelType& model) { + int lb, ub, span; + double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = \sigma_{j=1}^{i}pdf[i]*(lb+i) + + model.getGLD().copyTo(pdf, cdf, lb, ub, span); + clen = new double[span + 1]; + clen[0] = 0.0; + for (int i = 1; i <= span; i++) { + clen[i] = clen[i - 1] + pdf[i] * (lb + i); + } - fin>>nC>>cvlen; - assert(cvlen = M + 1); + eel.assign(M + 1, 0.0); + for (int i = 1; i <= M; i++) { + int totLen = refs.getRef(i).getTotLen(); + int fullLen = refs.getRef(i).getFullLen(); + int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0); + int pos2 = max(min(totLen, ub) - lb, 0); - nSamples = nC * nSpC; + if (pos2 == 0) { eel[i] = 0.0; continue; } + + eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1])); + assert(eel[i] >= 0); + if (eel[i] < MINEEL) { eel[i] = 0.0; } + } + + delete[] pdf; + delete[] cdf; + delete[] clen; +} - fr = to = 0; +void* sample_theta_from_c(void* arg) { - size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen; - if (size > (bufsize_type)nSamples) size = nSamples; - size *= cvlen; - buffer = new float[size]; + int *cvec; + double *theta; + gamma_dist **gammas; + gamma_generator **rgs; - ub = buffer + size; - p = buffer; + Params *params = (Params*)arg; + FILE *fi = params->fi; + double *mw = params->mw; cvec = new int[cvlen]; theta = new double[cvlen]; - gammas = new distribution_type*[cvlen]; - rgs = new generator_type*[cvlen]; + gammas = new gamma_dist*[cvlen]; + rgs = new gamma_generator*[cvlen]; - tau_denoms = new double[nSamples]; - memset(tau_denoms, 0, sizeof(double) * nSamples); + float **vecs = new float*[nSpC]; + for (int i = 0; i < nSpC; i++) vecs[i] = new float[cvlen]; - double *mw = model.getMW(); - for (int i = 0; i < nC; i++) { - for (int j = 0; j < cvlen; j++) { - fin>>cvec[j]; - } + int cnt = 0; + while (fscanf(fi, "%d", &cvec[0]) == 1) { + for (int j = 1; j < cvlen; j++) assert(fscanf(fi, "%d", &cvec[j]) == 1); + + ++cnt; for (int j = 0; j < cvlen; j++) { - gammas[j] = new distribution_type(cvec[j]); // need change back before publishing - rgs[j] = new generator_type(engine, *gammas[j]); + gammas[j] = new gamma_dist(cvec[j]); + rgs[j] = new gamma_generator(*(params->engine), *gammas[j]); } - for (int j = 0; j < nSpC; j++) { + for (int i = 0; i < nSpC; i++) { double sum = 0.0; - for (int k = 0; k < cvlen; k++) { - theta[k] = (k == 0 || eel[k] > EPSILON ? (*rgs[k])() : 0.0); - sum += theta[k]; + for (int j = 0; j < cvlen; j++) { + theta[j] = ((j == 0 || eel[j] >= EPSILON) ? (*rgs[j])() : 0.0); + sum += theta[j]; } - assert(sum > 0.0); - for (int k = 0; k < cvlen; k++) theta[k] /= sum; + assert(sum >= EPSILON); + for (int j = 0; j < cvlen; j++) theta[j] /= sum; sum = 0.0; - for (int k = 0; k < cvlen; k++) { - theta[k] = (mw[k] < EPSILON ? 0.0 : theta[k] / mw[k]); - sum += theta[k]; + for (int j = 0; j < cvlen; j++) { + theta[j] = (mw[j] < EPSILON ? 0.0 : theta[j] / mw[j]); + sum += theta[j]; } assert(sum >= EPSILON); - for (int k = 0; k < cvlen; k++) theta[k] /= sum; - - *p = (float)theta[0]; ++p; - assert(1.0 - theta[0] > 0.0); - for (int k = 1; k < cvlen; k++) { - if (eel[k] > EPSILON) { - theta[k] /= (1.0 - theta[0]); - tau_denoms[to] += theta[k] / eel[k]; - } - else { - if (theta[k] != 0.0) { fprintf(stderr, "K=%d Theta_K=%lf\n", k, theta[k]); exit(-1); } + for (int j = 0; j < cvlen; j++) theta[j] /= sum; + + + sum = 0.0; + vecs[i][0] = theta[0]; + for (int j = 1; j < cvlen; j++) + if (eel[j] >= EPSILON) { + vecs[i][j] = theta[j] / eel[j]; + sum += vecs[i][j]; } + else assert(theta[j] < EPSILON); - *p = (float)theta[k]; - ++p; - } - ++to; - if (p == ub) { - flushToTempFile(); - p = buffer; - fr = to; - if (verbose) { printf("%d vectors are sampled!\n", to); } - } + assert(sum >= EPSILON); + for (int j = 1; j < cvlen; j++) vecs[i][j] /= sum; } + buffer->write(nSpC, vecs); + for (int j = 0; j < cvlen; j++) { delete gammas[j]; delete rgs[j]; } - } - - if (fr != to) { flushToTempFile(); } - fin.close(); - ftmpOut.close(); - - delete[] buffer; + if (verbose && cnt % 100 == 0) { printf("Thread %d, %d count vectors are processed!\n", params->no, cnt); } + } delete[] cvec; delete[] theta; delete[] gammas; delete[] rgs; + for (int i = 0; i < nSpC; i++) delete[] vecs[i]; + delete[] vecs; + + return NULL; +} + +template +void sample_theta_vectors_from_count_vectors() { + ModelType model; + model.read(modelF); + calcExpectedEffectiveLengths(model); + + + int num_threads = min(nThreads, nCV); + + buffer = new Buffer(nMB, nSamples, cvlen, tmpF); + + paramsArray = new Params[num_threads]; + threads = new pthread_t[num_threads]; + + char inpF[STRLEN]; + for (int i = 0; i < num_threads; i++) { + paramsArray[i].no = i; + sprintf(inpF, "%s%d", cvsF, i); + paramsArray[i].fi = fopen(inpF, "r"); + paramsArray[i].engine = engineFactory::new_engine(); + paramsArray[i].mw = model.getMW(); + } + + /* set thread attribute to be joinable */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + for (int i = 0; i < num_threads; i++) { + rc = pthread_create(&threads[i], &attr, &sample_theta_from_c, (void*)(¶msArray[i])); + pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!"); + } + for (int i = 0; i < num_threads; i++) { + rc = pthread_join(threads[i], &status); + pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!"); + } + + /* destroy attribute */ + pthread_attr_destroy(&attr); + delete[] threads; + + for (int i = 0; i < num_threads; i++) { + fclose(paramsArray[i].fi); + delete paramsArray[i].engine; + } + delete[] paramsArray; + + delete buffer; // Must delete here, force the content left in the buffer be written into the disk + if (verbose) { printf("Sampling is finished!\n"); } } @@ -263,45 +281,96 @@ void calcCI(int nSamples, float *samples, float &lb, float &ub) { } while (p <= threshold); } -void generateResults(char* imdName) { +void* calcCI_batch(void* arg) { float *itsamples, *gtsamples; ifstream fin; - FILE *fo; - char outF[STRLEN]; - - iso_tau = new CIType[M + 1]; - gene_tau = new CIType[m]; + CIParams *ciParams = (CIParams*)arg; itsamples = new float[nSamples]; gtsamples = new float[nSamples]; fin.open(tmpF, ios::binary); + streampos pos = streampos(gi.spAt(ciParams->start_gene_id)) * nSamples * FLOATSIZE; + fin.seekg(pos, ios::beg); - //read sampled theta0 values - for (int k = 0; k < nSamples; k++) fin.read((char*)(&itsamples[k]), FLOATSIZE); - - for (int i = 0; i < m; i++) { + int cnt = 0; + for (int i = ciParams->start_gene_id; i < ciParams->end_gene_id; i++) { int b = gi.spAt(i), e = gi.spAt(i + 1); memset(gtsamples, 0, FLOATSIZE * nSamples); for (int j = b; j < e; j++) { for (int k = 0; k < nSamples; k++) { fin.read((char*)(&itsamples[k]), FLOATSIZE); - if (eel[j] > EPSILON && tau_denoms[k] > EPSILON) { itsamples[k] = itsamples[k] / eel[j] / tau_denoms[k]; } - else { - if (itsamples[k] != 0.0) { fprintf(stderr, "Not equal to 0! K=%d, Sampled Theta Value=%lf\n", k, itsamples[k]); exit(-1); } - itsamples[k] = 0.0; - } gtsamples[k] += itsamples[k]; } calcCI(nSamples, itsamples, iso_tau[j].lb, iso_tau[j].ub); } calcCI(nSamples, gtsamples, gene_tau[i].lb, gene_tau[i].ub); - if (verbose && (i + 1) % 1000 == 0) { printf("%d genes are done!\n", i + 1); } + ++cnt; + if (verbose && cnt % 1000 == 0) { printf("In thread %d, %d genes are processed for CI calculation!\n", ciParams->no, cnt); } } fin.close(); + delete[] itsamples; + delete[] gtsamples; + + return NULL; +} + +void calculate_credibility_intervals(char* imdName) { + FILE *fo; + char outF[STRLEN]; + int num_threads = nThreads; + + iso_tau = new CIType[M + 1]; + gene_tau = new CIType[m]; + + assert(M > 0); + int quotient = M / num_threads; + if (quotient < 1) { num_threads = M; quotient = 1; } + int cur_gene_id = 0; + int num_isoforms = 0; + + // A just so so strategy for paralleling + ciParamsArray = new CIParams[num_threads]; + for (int i = 0; i < num_threads; i++) { + ciParamsArray[i].no = i; + ciParamsArray[i].start_gene_id = cur_gene_id; + num_isoforms = 0; + + while ((m - cur_gene_id > num_threads - i - 1) && (i == num_threads - 1 || num_isoforms < quotient)) { + num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id); + ++cur_gene_id; + } + + ciParamsArray[i].end_gene_id = cur_gene_id; + } + + threads = new pthread_t[num_threads]; + + /* set thread attribute to be joinable */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + // paralleling + for (int i = 0; i < num_threads; i++) { + rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i])); + pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); + } + for (int i = 0; i < num_threads; i++) { + rc = pthread_join(threads[i], &status); + pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); + } + + // releasing resources + + /* destroy attribute */ + pthread_attr_destroy(&attr); + delete[] threads; + + delete[] ciParamsArray; + //isoform level results sprintf(outF, "%s.iso_res", imdName); fo = fopen(outF, "a"); @@ -320,47 +389,31 @@ void generateResults(char* imdName) { fprintf(fo, "%.6g%c", gene_tau[i].ub, (i < m - 1 ? '\t' : '\n')); fclose(fo); - delete[] itsamples; - delete[] gtsamples; - delete[] iso_tau; delete[] gene_tau; if (verbose) { printf("All credibility intervals are calculated!\n"); } - - sprintf(outF, "%s.tau_denoms", imdName); - fo = fopen(outF, "w"); - fprintf(fo, "%d\n", nSamples); - for (int i = 0; i < nSamples; i++) fprintf(fo, "%.15g ", tau_denoms[i]); - fprintf(fo, "\n"); - fclose(fo); } int main(int argc, char* argv[]) { - if (argc < 7) { - printf("Usage: rsem-calculate-credibility-intervals reference_name sample_name sampleToken confidence nSpC nMB[-q]\n"); + if (argc < 8) { + printf("Usage: rsem-calculate-credibility-intervals reference_name sample_name sampleToken confidence nCV nSpC nMB [-p #Threads] [-q]\n"); exit(-1); } confidence = atof(argv[4]); - nSpC = atoi(argv[5]); - nMB = atoi(argv[6]); + nCV = atoi(argv[5]); + nSpC = atoi(argv[6]); + nMB = atoi(argv[7]); + nThreads = 1; quiet = false; - if (argc > 7 && !strcmp(argv[7], "-q")) { - quiet = true; + for (int i = 8; i < argc; i++) { + if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]); + if (!strcmp(argv[i], "-q")) quiet = true; } verbose = !quiet; - sprintf(imdName, "%s.temp/%s", argv[2], argv[3]); - sprintf(statName, "%s.stat/%s", argv[2], argv[3]); - - sprintf(modelF, "%s.model", statName); - FILE *fi = fopen(modelF, "r"); - if (fi == NULL) { fprintf(stderr, "Cannot open %s!\n", modelF); exit(-1); } - fscanf(fi, "%d", &model_type); - fclose(fi); - sprintf(refF, "%s.seq", argv[1]); refs.loadRefs(refF, 1); M = refs.getM(); @@ -368,26 +421,40 @@ int main(int argc, char* argv[]) { gi.load(groupF); m = gi.getm(); + nSamples = nCV * nSpC; + cvlen = M + 1; + assert(nSamples > 0 && cvlen > 1); // for Buffter.h: (bufsize_type)nSamples + + sprintf(imdName, "%s.temp/%s", argv[2], argv[3]); + sprintf(statName, "%s.stat/%s", argv[2], argv[3]); sprintf(tmpF, "%s.tmp", imdName); sprintf(cvsF, "%s.countvectors", imdName); + sprintf(modelF, "%s.model", statName); + FILE *fi = fopen(modelF, "r"); + general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!"); + assert(fscanf(fi, "%d", &model_type) == 1); + fclose(fi); + + // Phase I switch(model_type) { - case 0 : sampling(); break; - case 1 : sampling(); break; - case 2 : sampling(); break; - case 3 : sampling(); break; + case 0 : sample_theta_vectors_from_count_vectors(); break; + case 1 : sample_theta_vectors_from_count_vectors(); break; + case 2 : sample_theta_vectors_from_count_vectors(); break; + case 3 : sample_theta_vectors_from_count_vectors(); break; } - generateResults(imdName); - - delete[] tau_denoms; + // Phase II + calculate_credibility_intervals(imdName); + /* sprintf(command, "rm -f %s", tmpF); int status = system(command); if (status != 0) { fprintf(stderr, "Cannot delete %s!\n", tmpF); exit(-1); } + */ return 0; }