X-Git-Url: https://git.donarmstrong.com/?p=rsem.git;a=blobdiff_plain;f=Gibbs.cpp;h=d5e0b9c49634e29c9a4f2b5ed2a5c74026f5642f;hp=979860bda97cf07bd6e2a4f37869fa59186c7e3a;hb=7cd7abcad92a44bbd5d8d1b2bb3a871cd479c0bf;hpb=1c7a81621434a852a7f7e11d61621e50ba1b7f2a diff --git a/Gibbs.cpp b/Gibbs.cpp index 979860b..d5e0b9c 100644 --- a/Gibbs.cpp +++ b/Gibbs.cpp @@ -5,8 +5,10 @@ #include #include #include +#include #include "utils.h" +#include "my_assert.h" #include "sampling.h" #include "Model.h" @@ -16,10 +18,22 @@ #include "PairedEndQModel.h" #include "Refs.h" + #include "GroupInfo.h" +#include "WriteResults.h" using namespace std; +struct Params { + int no, nsamples; + FILE *fo; + engine_type *engine; + double *pme_c, *pve_c; //posterior mean and variance vectors on counts + double *pme_tpm, *pme_fpkm; + + double *pve_c_genes, *pve_c_trans; +}; + struct Item { int sid; double conprb; @@ -30,70 +44,76 @@ struct Item { } }; +int nThreads; + int model_type; -int m, M, N0, N1, nHits; +int M; +READ_INT_TYPE N0, N1; +HIT_INT_TYPE nHits; double totc; -int BURNIN, CHAINLEN, GAP; -char imdName[STRLEN], statName[STRLEN]; -char thetaF[STRLEN], ofgF[STRLEN], groupF[STRLEN], refF[STRLEN], modelF[STRLEN]; +int BURNIN, NSAMPLES, GAP; +char refName[STRLEN], imdName[STRLEN], statName[STRLEN]; +char thetaF[STRLEN], ofgF[STRLEN], refF[STRLEN], modelF[STRLEN]; char cvsF[STRLEN]; Refs refs; -GroupInfo gi; - -vector theta, pme_theta, pme_c, eel; -vector s, z; +vector s; vector hits; -vector counts; + +vector eel; +double *mw; + +vector pme_c, pve_c; //global posterior mean and variance vectors on counts +vector pme_tpm, pme_fpkm; bool quiet; -vector arr; +Params *paramsArray; +pthread_t *threads; +pthread_attr_t attr; +int rc; + +bool hasSeed; +seedType seed; + +int m; +char groupF[STRLEN]; +GroupInfo gi; + +bool alleleS; +int m_trans; +GroupInfo gt, ta; +vector pve_c_genes, pve_c_trans; + +void load_group_info(char* refName) { + // Load group info + sprintf(groupF, "%s.grp", refName); + gi.load(groupF); + m = gi.getm(); + + alleleS = isAlleleSpecific(refName, >, &ta); // if allele-specific + m_trans = (alleleS ? ta.getm() : 0); + + if (verbose) { printf("Loading group information is finished!\n"); } +} -void load_data(char* reference_name, char* statName, char* imdName) { +void load_data(char* refName, char* statName, char* imdName) { ifstream fin; string line; int tmpVal; //load reference file - sprintf(refF, "%s.seq", reference_name); + sprintf(refF, "%s.seq", refName); refs.loadRefs(refF, 1); M = refs.getM(); - //load groupF - sprintf(groupF, "%s.grp", reference_name); - gi.load(groupF); - m = gi.getm(); - - //load thetaF - sprintf(thetaF, "%s.theta",statName); - fin.open(thetaF); - if (!fin.is_open()) { - fprintf(stderr, "Cannot open %s!\n", thetaF); - exit(-1); - } - fin>>tmpVal; - if (tmpVal != M + 1) { - fprintf(stderr, "Number of transcripts is not consistent in %s and %s!\n", refF, thetaF); - exit(-1); - } - theta.clear(); theta.resize(M + 1); - for (int i = 0; i <= M; i++) fin>>theta[i]; - fin.close(); - //load ofgF; sprintf(ofgF, "%s.ofg", imdName); fin.open(ofgF); - if (!fin.is_open()) { - fprintf(stderr, "Cannot open %s!\n", ofgF); - exit(-1); - } + general_assert(fin.is_open(), "Cannot open " + cstrtos(ofgF) + "!"); fin>>tmpVal>>N0; - if (tmpVal != M) { - fprintf(stderr, "M in %s is not consistent with %s!\n", ofgF, refF); - exit(-1); - } + general_assert(tmpVal == M, "M in " + cstrtos(ofgF) + " is not consistent with " + cstrtos(refF) + "!"); getline(fin, line); s.clear(); hits.clear(); @@ -113,237 +133,319 @@ void load_data(char* reference_name, char* statName, char* imdName) { N1 = s.size() - 1; nHits = hits.size(); - if (verbose) { printf("Loading Data is finished!\n"); } + totc = N0 + N1 + (M + 1); + + if (verbose) { printf("Loading data is finished!\n"); } } -void init() { - int len, fr, to; +template +void init_model_related(char* modelF) { + ModelType model; + model.read(modelF); - arr.clear(); - z.clear(); - counts.clear(); + calcExpectedEffectiveLengths(M, refs, model, eel); + memcpy(mw, model.getMW(), sizeof(double) * (M + 1)); // otherwise, after exiting this procedure, mw becomes undefined +} - z.resize(N1); - counts.resize(M + 1, 1); // 1 pseudo count - counts[0] += N0; +// assign threads +void init() { + int quotient, left; + char outF[STRLEN]; - for (int i = 0; i < N1; i++) { - fr = s[i]; to = s[i + 1]; - len = to - fr; - arr.resize(len); - for (int j = fr; j < to; j++) { - arr[j - fr] = theta[hits[j].sid] * hits[j].conprb; - if (j > fr) arr[j - fr] += arr[j - fr - 1]; // cumulative + quotient = NSAMPLES / nThreads; + left = NSAMPLES % nThreads; + + sprintf(cvsF, "%s.countvectors", imdName); + paramsArray = new Params[nThreads]; + threads = new pthread_t[nThreads]; + + hasSeed ? engineFactory::init(seed) : engineFactory::init(); + for (int i = 0; i < nThreads; i++) { + paramsArray[i].no = i; + + paramsArray[i].nsamples = quotient; + if (i < left) paramsArray[i].nsamples++; + + sprintf(outF, "%s%d", cvsF, i); + paramsArray[i].fo = fopen(outF, "w"); + + paramsArray[i].engine = engineFactory::new_engine(); + paramsArray[i].pme_c = new double[M + 1]; + memset(paramsArray[i].pme_c, 0, sizeof(double) * (M + 1)); + paramsArray[i].pve_c = new double[M + 1]; + memset(paramsArray[i].pve_c, 0, sizeof(double) * (M + 1)); + paramsArray[i].pme_tpm = new double[M + 1]; + memset(paramsArray[i].pme_tpm, 0, sizeof(double) * (M + 1)); + paramsArray[i].pme_fpkm = new double[M + 1]; + memset(paramsArray[i].pme_fpkm, 0, sizeof(double) * (M + 1)); + + paramsArray[i].pve_c_genes = new double[m]; + memset(paramsArray[i].pve_c_genes, 0, sizeof(double) * m); + + paramsArray[i].pve_c_trans = NULL; + if (alleleS) { + paramsArray[i].pve_c_trans = new double[m_trans]; + memset(paramsArray[i].pve_c_trans, 0, sizeof(double) * m_trans); } - z[i] = hits[fr + sample(arr, len)].sid; - ++counts[z[i]]; } + engineFactory::finish(); - totc = N0 + N1 + (M + 1); + /* set thread attribute to be joinable */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - if (verbose) { printf("Initialization is finished!\n"); } + if (verbose) { printf("Initialization finished!\n"); } } -void writeCountVector(FILE* fo) { +//sample theta from Dir(1) +void sampleTheta(engine_type& engine, vector& theta) { + gamma_dist gm(1); + gamma_generator gmg(engine, gm); + double denom; + + theta.assign(M + 1, 0); + denom = 0.0; + for (int i = 0; i <= M; i++) { + theta[i] = gmg(); + denom += theta[i]; + } + assert(denom > EPSILON); + for (int i = 0; i <= M; i++) theta[i] /= denom; +} + +void writeCountVector(FILE* fo, vector& counts) { for (int i = 0; i < M; i++) { fprintf(fo, "%d ", counts[i]); } fprintf(fo, "%d\n", counts[M]); } -void Gibbs(char* imdName) { - FILE *fo; - int fr, to, len; +void* Gibbs(void* arg) { + int CHAINLEN; + HIT_INT_TYPE len, fr, to; + Params *params = (Params*)arg; - sprintf(cvsF, "%s.countvectors", imdName); - fo = fopen(cvsF, "w"); - assert(CHAINLEN % GAP == 0); - fprintf(fo, "%d %d\n", CHAINLEN / GAP, M + 1); - //fprintf(fo, "%d %d\n", CHAINLEN, M + 1); + vector theta, tpm, fpkm; + vector z, counts; + vector arr; + + uniform_01_generator rg(*params->engine, uniform_01_dist()); - pme_c.clear(); pme_c.resize(M + 1, 0.0); - pme_theta.clear(); pme_theta.resize(M + 1, 0.0); + // generate initial state + sampleTheta(*params->engine, theta); + + z.assign(N1, 0); + + counts.assign(M + 1, 1); // 1 pseudo count + counts[0] += N0; + + for (READ_INT_TYPE i = 0; i < N1; i++) { + fr = s[i]; to = s[i + 1]; + len = to - fr; + arr.assign(len, 0); + for (HIT_INT_TYPE j = fr; j < to; j++) { + arr[j - fr] = theta[hits[j].sid] * hits[j].conprb; + if (j > fr) arr[j - fr] += arr[j - fr - 1]; // cumulative + } + z[i] = hits[fr + sample(rg, arr, len)].sid; + ++counts[z[i]]; + } + + // Gibbs sampling + CHAINLEN = 1 + (params->nsamples - 1) * GAP; for (int ROUND = 1; ROUND <= BURNIN + CHAINLEN; ROUND++) { - for (int i = 0; i < N1; i++) { + for (READ_INT_TYPE i = 0; i < N1; i++) { --counts[z[i]]; fr = s[i]; to = s[i + 1]; len = to - fr; - arr.resize(len); - for (int j = fr; j < to; j++) { + arr.assign(len, 0); + for (HIT_INT_TYPE j = fr; j < to; j++) { arr[j - fr] = counts[hits[j].sid] * hits[j].conprb; if (j > fr) arr[j - fr] += arr[j - fr - 1]; //cumulative } - z[i] = hits[fr + sample(arr, len)].sid; + z[i] = hits[fr + sample(rg, arr, len)].sid; ++counts[z[i]]; } if (ROUND > BURNIN) { - if ((ROUND - BURNIN - 1) % GAP == 0) writeCountVector(fo); - for (int i = 0; i <= M; i++) { - pme_c[i] += counts[i] - 1; - pme_theta[i] += counts[i] / totc; + if ((ROUND - BURNIN - 1) % GAP == 0) { + writeCountVector(params->fo, counts); + for (int i = 0; i <= M; i++) theta[i] = counts[i] / totc; + polishTheta(M, theta, eel, mw); + calcExpressionValues(M, theta, eel, tpm, fpkm); + for (int i = 0; i <= M; i++) { + params->pme_c[i] += counts[i] - 1; + params->pve_c[i] += double(counts[i] - 1) * (counts[i] - 1); + params->pme_tpm[i] += tpm[i]; + params->pme_fpkm[i] += fpkm[i]; + } + + for (int i = 0; i < m; i++) { + int b = gi.spAt(i), e = gi.spAt(i + 1); + double count = 0.0; + for (int j = b; j < e; j++) count += counts[j] - 1; + params->pve_c_genes[i] += count * count; + } + + if (alleleS) + for (int i = 0; i < m_trans; i++) { + int b = ta.spAt(i), e = ta.spAt(i + 1); + double count = 0.0; + for (int j = b; j < e; j++) count += counts[j] - 1; + params->pve_c_trans[i] += count * count; + } } } - if (verbose) { printf("ROUND %d is finished!\n", ROUND); } - } - fclose(fo); - - for (int i = 0; i <= M; i++) { - pme_c[i] /= CHAINLEN; - pme_theta[i] /= CHAINLEN; + if (verbose && ROUND % 100 == 0) { printf("Thread %d, ROUND %d is finished!\n", params->no, ROUND); } } - if (verbose) { printf("Gibbs is finished!\n"); } + return NULL; } -template -void calcExpectedEffectiveLengths(ModelType& model) { - int lb, ub, span; - double *pdf = NULL, *cdf = NULL, *clen = NULL; // clen[i] = sigma_{j=1}^{i}pdf[i]*(lb+i) - - model.getGLD().copyTo(pdf, cdf, lb, ub, span); - clen = new double[span + 1]; - clen[0] = 0.0; - for (int i = 1; i <= span; i++) { - clen[i] = clen[i - 1] + pdf[i] * (lb + i); - } - - eel.clear(); - eel.resize(M + 1, 0.0); - for (int i = 1; i <= M; i++) { - int totLen = refs.getRef(i).getTotLen(); - int fullLen = refs.getRef(i).getFullLen(); - int pos1 = max(min(totLen - fullLen + 1, ub) - lb, 0); - int pos2 = max(min(totLen, ub) - lb, 0); - - if (pos2 == 0) { eel[i] = 0.0; continue; } - - eel[i] = fullLen * cdf[pos1] + ((cdf[pos2] - cdf[pos1]) * (totLen + 1) - (clen[pos2] - clen[pos1])); - assert(eel[i] >= 0); - if (eel[i] < MINEEL) { eel[i] = 0.0; } - } - - delete[] pdf; - delete[] cdf; - delete[] clen; -} - -template -void writeEstimatedParameters(char* modelF, char* imdName) { - ModelType model; - double denom; - char outF[STRLEN]; - FILE *fo; +void release() { +// char inpF[STRLEN], command[STRLEN]; + string line; - model.read(modelF); + /* destroy attribute */ + pthread_attr_destroy(&attr); + delete[] threads; + + pme_c.assign(M + 1, 0); + pve_c.assign(M + 1, 0); + pme_tpm.assign(M + 1, 0); + pme_fpkm.assign(M + 1, 0); + + pve_c_genes.assign(m, 0); + pve_c_trans.clear(); + if (alleleS) pve_c_trans.assign(m_trans, 0); + + for (int i = 0; i < nThreads; i++) { + fclose(paramsArray[i].fo); + delete paramsArray[i].engine; + for (int j = 0; j <= M; j++) { + pme_c[j] += paramsArray[i].pme_c[j]; + pve_c[j] += paramsArray[i].pve_c[j]; + pme_tpm[j] += paramsArray[i].pme_tpm[j]; + pme_fpkm[j] += paramsArray[i].pme_fpkm[j]; + } - calcExpectedEffectiveLengths(model); + for (int j = 0; j < m; j++) + pve_c_genes[j] += paramsArray[i].pve_c_genes[j]; + + if (alleleS) + for (int j = 0; j < m_trans; j++) + pve_c_trans[j] += paramsArray[i].pve_c_trans[j]; - denom = pme_theta[0]; - for (int i = 1; i <= M; i++) - if (eel[i] < EPSILON) pme_theta[i] = 0.0; - else denom += pme_theta[i]; - if (denom <= 0) { fprintf(stderr, "No Expected Effective Length is no less than %.6g?!\n", MINEEL); exit(-1); } - for (int i = 0; i <= M; i++) pme_theta[i] /= denom; + delete[] paramsArray[i].pme_c; + delete[] paramsArray[i].pve_c; + delete[] paramsArray[i].pme_tpm; + delete[] paramsArray[i].pme_fpkm; - denom = 0.0; - double *mw = model.getMW(); - for (int i = 0; i <= M; i++) { - pme_theta[i] = (mw[i] < EPSILON ? 0.0 : pme_theta[i] / mw[i]); - denom += pme_theta[i]; + delete[] paramsArray[i].pve_c_genes; + if (alleleS) delete[] paramsArray[i].pve_c_trans; } - assert(denom >= EPSILON); - for (int i = 0; i <= M; i++) pme_theta[i] /= denom; - - //calculate tau values - double *tau = new double[M + 1]; - memset(tau, 0, sizeof(double) * (M + 1)); + delete[] paramsArray; - denom = 0.0; - for (int i = 1; i <= M; i++) - if (eel[i] > EPSILON) { - tau[i] = pme_theta[i] / eel[i]; - denom += tau[i]; - } - if (denom <= 0) { fprintf(stderr, "No alignable reads?!\n"); exit(-1); } - //assert(denom > 0); - for (int i = 1; i <= M; i++) { - tau[i] /= denom; + for (int i = 0; i <= M; i++) { + pme_c[i] /= NSAMPLES; + pve_c[i] = (pve_c[i] - double(NSAMPLES) * pme_c[i] * pme_c[i]) / double(NSAMPLES - 1); + if (pve_c[i] < 0.0) pve_c[i] = 0.0; + pme_tpm[i] /= NSAMPLES; + pme_fpkm[i] /= NSAMPLES; } - //isoform level results - sprintf(outF, "%s.iso_res", imdName); - fo = fopen(outF, "a"); - if (fo == NULL) { fprintf(stderr, "Cannot open %s!\n", outF); exit(-1); } - for (int i = 1; i <= M; i++) - fprintf(fo, "%.2f%c", pme_c[i], (i < M ? '\t' : '\n')); - for (int i = 1; i <= M; i++) - fprintf(fo, "%.15g%c", tau[i], (i < M ? '\t' : '\n')); - fclose(fo); - - //gene level results - sprintf(outF, "%s.gene_res", imdName); - fo = fopen(outF, "a"); - if (fo == NULL) { fprintf(stderr, "Cannot open %s!\n", outF); exit(-1); } - for (int i = 0; i < m; i++) { - double sumC = 0.0; // sum of pme counts - int b = gi.spAt(i), e = gi.spAt(i + 1); - for (int j = b; j < e; j++) { - sumC += pme_c[j]; - } - fprintf(fo, "%.15g%c", sumC, (i < m - 1 ? '\t' : '\n')); - } for (int i = 0; i < m; i++) { - double sumT = 0.0; // sum of tau values - int b = gi.spAt(i), e = gi.spAt(i + 1); - for (int j = b; j < e; j++) { - sumT += tau[j]; - } - fprintf(fo, "%.15g%c", sumT, (i < m - 1 ? '\t' : '\n')); + int b = gi.spAt(i), e = gi.spAt(i + 1); + double pme_c_gene = 0.0; + for (int j = b; j < e; j++) pme_c_gene += pme_c[j]; + pve_c_genes[i] = (pve_c_genes[i] - double(NSAMPLES) * pme_c_gene * pme_c_gene) / double(NSAMPLES - 1); + if (pve_c_genes[i] < 0.0) pve_c_genes[i] = 0.0; } - fclose(fo); - delete[] tau; - - if (verbose) { printf("Gibbs based expression values are written!\n"); } + if (alleleS) + for (int i = 0; i < m_trans; i++) { + int b = ta.spAt(i), e = ta.spAt(i + 1); + double pme_c_tran = 0.0; + for (int j = b; j < e; j++) pme_c_tran += pme_c[j]; + pve_c_trans[i] = (pve_c_trans[i] - double(NSAMPLES) * pme_c_tran * pme_c_tran) / double(NSAMPLES - 1); + if (pve_c_trans[i] < 0.0) pve_c_trans[i] = 0.0; + } } - int main(int argc, char* argv[]) { if (argc < 7) { - printf("Usage: rsem-run-gibbs reference_name sample_name sampleToken BURNIN CHAINLEN GAP [-q]\n"); + printf("Usage: rsem-run-gibbs reference_name imdName statName BURNIN NSAMPLES GAP [-p #Threads] [--seed seed] [-q]\n"); exit(-1); } + strcpy(refName, argv[1]); + strcpy(imdName, argv[2]); + strcpy(statName, argv[3]); + BURNIN = atoi(argv[4]); - CHAINLEN = atoi(argv[5]); + NSAMPLES = atoi(argv[5]); GAP = atoi(argv[6]); - sprintf(imdName, "%s.temp/%s", argv[2], argv[3]); - sprintf(statName, "%s.stat/%s", argv[2], argv[3]); - load_data(argv[1], statName, imdName); + nThreads = 1; + hasSeed = false; quiet = false; - if (argc > 7 && !strcmp(argv[7], "-q")) { - quiet = true; + + for (int i = 7; i < argc; i++) { + if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]); + if (!strcmp(argv[i], "--seed")) { + hasSeed = true; + int len = strlen(argv[i + 1]); + seed = 0; + for (int k = 0; k < len; k++) seed = seed * 10 + (argv[i + 1][k] - '0'); + } + if (!strcmp(argv[i], "-q")) quiet = true; } verbose = !quiet; - init(); - Gibbs(imdName); + assert(NSAMPLES > 1); // Otherwise, we cannot calculate posterior variance + + if (nThreads > NSAMPLES) { + nThreads = NSAMPLES; + printf("Warning: Number of samples is less than number of threads! Change the number of threads to %d!\n", nThreads); + } + + load_group_info(refName); + load_data(refName, statName, imdName); sprintf(modelF, "%s.model", statName); FILE *fi = fopen(modelF, "r"); - if (fi == NULL) { fprintf(stderr, "Cannot open %s!\n", modelF); exit(-1); } - fscanf(fi, "%d", &model_type); + general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!"); + assert(fscanf(fi, "%d", &model_type) == 1); fclose(fi); + mw = new double[M + 1]; // make an extra copy + switch(model_type) { - case 0 : writeEstimatedParameters(modelF, imdName); break; - case 1 : writeEstimatedParameters(modelF, imdName); break; - case 2 : writeEstimatedParameters(modelF, imdName); break; - case 3 : writeEstimatedParameters(modelF, imdName); break; + case 0 : init_model_related(modelF); break; + case 1 : init_model_related(modelF); break; + case 2 : init_model_related(modelF); break; + case 3 : init_model_related(modelF); break; } + if (verbose) printf("Gibbs started!\n"); + + init(); + for (int i = 0; i < nThreads; i++) { + rc = pthread_create(&threads[i], &attr, Gibbs, (void*)(¶msArray[i])); + pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0)!"); + } + for (int i = 0; i < nThreads; i++) { + rc = pthread_join(threads[i], NULL); + pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0)!"); + } + release(); + + if (verbose) printf("Gibbs finished!\n"); + + writeResultsGibbs(M, m, m_trans, gi, gt, ta, alleleS, imdName, pme_c, pme_fpkm, pme_tpm, pve_c, pve_c_genes, pve_c_trans); + + delete mw; // delete the copy + return 0; }