X-Git-Url: https://git.donarmstrong.com/?p=rsem.git;a=blobdiff_plain;f=calcCI.cpp;h=89414f9b02f00ba641b8f987845aea8e977ef1b1;hp=3ae71157c58a023bb4cd7fe316e26c7b9e572663;hb=757263aa0ff5367f826468db30d983282e51e7f0;hpb=4a5e5138d3fc409e7ade5c14de7689612290f74f diff --git a/calcCI.cpp b/calcCI.cpp index 3ae7115..89414f9 100644 --- a/calcCI.cpp +++ b/calcCI.cpp @@ -27,8 +27,6 @@ using namespace std; struct Params { int no; FILE *fi; - bufsize_type size; - int sp; engine_type *engine; double *mw; }; @@ -117,14 +115,14 @@ void* sample_theta_from_c(void* arg) { Params *params = (Params*)arg; FILE *fi = params->fi; double *mw = params->mw; - Buffer buffer(params->size, params->sp, nSamples, cvlen, tmpF); cvec = new int[cvlen]; theta = new double[cvlen]; gammas = new gamma_dist*[cvlen]; rgs = new gamma_generator*[cvlen]; - float *vec = new float[cvlen]; + float **vecs = new float*[nSpC]; + for (int i = 0; i < nSpC; i++) vecs[i] = new float[cvlen]; int cnt = 0; while (fscanf(fi, "%d", &cvec[0]) == 1) { @@ -156,19 +154,20 @@ void* sample_theta_from_c(void* arg) { sum = 0.0; - vec[0] = theta[0]; + vecs[i][0] = theta[0]; for (int j = 1; j < cvlen; j++) if (eel[j] >= EPSILON) { - vec[j] = theta[j] / eel[j]; - sum += vec[j]; + vecs[i][j] = theta[j] / eel[j]; + sum += vecs[i][j]; } else assert(theta[j] < EPSILON); - assert(sum >= EPSILON); - for (int j = 1; j < cvlen; j++) vec[j] /= sum; - buffer.write(vec); + assert(sum >= EPSILON); + for (int j = 1; j < cvlen; j++) vecs[i][j] /= sum; } + buffer->write(nSpC, vecs); + for (int j = 0; j < cvlen; j++) { delete gammas[j]; delete rgs[j]; @@ -182,7 +181,8 @@ void* sample_theta_from_c(void* arg) { delete[] gammas; delete[] rgs; - delete[] vec; + for (int i = 0; i < nSpC; i++) delete[] vecs[i]; + delete[] vecs; return NULL; } @@ -193,20 +193,10 @@ void sample_theta_vectors_from_count_vectors() { model.read(modelF); calcExpectedEffectiveLengths(model); - char splitF[STRLEN]; - bufsize_type buf_maxcv = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / cvlen; - bufsize_type quotient, left; - int sum = 0; - sprintf(splitF, "%s.split", imdName); - FILE *fi = fopen(splitF, "r"); - int num_threads; - assert(fscanf(fi, "%d", &num_threads) == 1); - assert(num_threads <= nThreads); + int num_threads = min(nThreads, nCV); - quotient = buf_maxcv / num_threads; - assert(quotient > 0); - left = buf_maxcv % num_threads; + buffer = new Buffer(nMB, nSamples, cvlen, tmpF); paramsArray = new Params[num_threads]; threads = new pthread_t[num_threads]; @@ -216,25 +206,10 @@ void sample_theta_vectors_from_count_vectors() { paramsArray[i].no = i; sprintf(inpF, "%s%d", cvsF, i); paramsArray[i].fi = fopen(inpF, "r"); - - int num_samples; - assert(fscanf(fi, "%d", &num_samples) == 1); - num_samples *= nSpC; - if (bufsize_type(num_samples) <= quotient) paramsArray[i].size = num_samples; - else { - paramsArray[i].size = quotient; - if (left > 0) { ++paramsArray[i].size; --left; } - } - paramsArray[i].size *= cvlen * FLOATSIZE; - paramsArray[i].sp = sum; - sum += num_samples; - paramsArray[i].engine = engineFactory::new_engine(); paramsArray[i].mw = model.getMW(); } - fclose(fi); - /* set thread attribute to be joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); @@ -258,6 +233,8 @@ void sample_theta_vectors_from_count_vectors() { } delete[] paramsArray; + delete buffer; // Must delete here, force the content left in the buffer be written into the disk + if (verbose) { printf("Sampling is finished!\n"); } } @@ -344,25 +321,25 @@ void* calcCI_batch(void* arg) { void calculate_credibility_intervals(char* imdName) { FILE *fo; char outF[STRLEN]; + int num_threads = nThreads; iso_tau = new CIType[M + 1]; gene_tau = new CIType[m]; - // nThreads must be intact here. assert(M > 0); - int quotient = M / nThreads; - if (quotient < 1) { nThreads = M; quotient = 1; } + int quotient = M / num_threads; + if (quotient < 1) { num_threads = M; quotient = 1; } int cur_gene_id = 0; int num_isoforms = 0; // A just so so strategy for paralleling - ciParamsArray = new CIParams[nThreads]; - for (int i = 0; i < nThreads; i++) { + ciParamsArray = new CIParams[num_threads]; + for (int i = 0; i < num_threads; i++) { ciParamsArray[i].no = i; ciParamsArray[i].start_gene_id = cur_gene_id; num_isoforms = 0; - while ((m - cur_gene_id > nThreads - i - 1) && (i == nThreads - 1 || num_isoforms < quotient)) { + while ((m - cur_gene_id > num_threads - i - 1) && (i == num_threads - 1 || num_isoforms < quotient)) { num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id); ++cur_gene_id; } @@ -370,18 +347,18 @@ void calculate_credibility_intervals(char* imdName) { ciParamsArray[i].end_gene_id = cur_gene_id; } - threads = new pthread_t[nThreads]; + threads = new pthread_t[num_threads]; /* set thread attribute to be joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); // paralleling - for (int i = 0; i < nThreads; i++) { + for (int i = 0; i < num_threads; i++) { rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i])); pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); } - for (int i = 0; i < nThreads; i++) { + for (int i = 0; i < num_threads; i++) { rc = pthread_join(threads[i], &status); pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!"); }