]> git.donarmstrong.com Git - rsem.git/blob - calcCI.cpp
Updated WHAT_IS_NEW
[rsem.git] / calcCI.cpp
1 #include<ctime>
2 #include<cstdio>
3 #include<cstring>
4 #include<cstdlib>
5 #include<cassert>
6 #include<fstream>
7 #include<algorithm>
8 #include<vector>
9 #include<pthread.h>
10
11 #include "utils.h"
12 #include "my_assert.h"
13 #include "sampling.h"
14
15 #include "Model.h"
16 #include "SingleModel.h"
17 #include "SingleQModel.h"
18 #include "PairedEndModel.h"
19 #include "PairedEndQModel.h"
20
21 #include "Refs.h"
22 #include "GroupInfo.h"
23 #include "WriteResults.h"
24
25 #include "Buffer.h"
26
27 using namespace std;
28
29 struct Params {
30         int no;
31         FILE *fi;
32         engine_type *engine;
33         const double *mw;
34 };
35
36 struct CIParams {
37         int no;
38         int start_gene_id, end_gene_id;
39 };
40
41 struct CIType {
42         float lb, ub; // the interval is [lb, ub]
43
44         CIType() { lb = ub = 0.0; }
45 };
46
47 int model_type;
48
49 int nMB;
50 double confidence;
51 int nCV, nSpC, nSamples; // nCV: number of count vectors; nSpC: number of theta vectors sampled per count vector; nSamples: nCV * nSpC
52 int nThreads;
53
54 float *l_bars;
55
56 char cvsF[STRLEN], tmpF[STRLEN], command[STRLEN];
57
58 CIType *tpm, *fpkm;
59 CIType *iso_tpm = NULL, *iso_fpkm = NULL;
60 CIType *gene_tpm, *gene_fpkm;
61
62 int M, m;
63 Refs refs;
64 GroupInfo gi;
65 char refName[STRLEN], imdName[STRLEN], statName[STRLEN];
66 char modelF[STRLEN], groupF[STRLEN], refF[STRLEN];
67
68 bool alleleS;
69 int m_trans;
70 GroupInfo ta;
71
72 vector<double> eel; //expected effective lengths
73
74 Buffer *buffer;
75
76 bool quiet;
77
78 Params *paramsArray;
79 pthread_t *threads;
80 pthread_attr_t attr;
81 int rc;
82
83 bool hasSeed;
84 seedType seed;
85
86 CIParams *ciParamsArray;
87
88 void* sample_theta_from_c(void* arg) {
89         int *cvec;
90         double *theta;
91         float *tpm;
92         gamma_dist **gammas;
93         gamma_generator **rgs;
94
95         Params *params = (Params*)arg;
96         FILE *fi = params->fi;
97         const double *mw = params->mw;
98
99         cvec = new int[M + 1];
100         theta = new double[M + 1];
101         gammas = new gamma_dist*[M + 1];
102         rgs = new gamma_generator*[M + 1];
103         tpm = new float[M + 1];
104         float l_bar; // the mean transcript length over the sample
105
106         int cnt = 0;
107         while (fscanf(fi, "%d", &cvec[0]) == 1) {
108                 for (int j = 1; j <= M; j++) assert(fscanf(fi, "%d", &cvec[j]) == 1);
109                 assert(cvec[0] > 0);
110
111                 ++cnt;
112
113                 for (int j = 0; j <= M; j++) {
114                   gammas[j] = NULL; rgs[j] = NULL;
115                   if (cvec[j] > 0) {
116                     gammas[j] = new gamma_dist(cvec[j]);
117                     rgs[j] = new gamma_generator(*(params->engine), *gammas[j]);
118                   }
119                 }
120                   
121                 for (int i = 0; i < nSpC; i++) {
122                         double sum = 0.0;
123                         for (int j = 0; j <= M; j++) {
124                                 theta[j] = ((j == 0 || (cvec[j] > 0 && eel[j] >= EPSILON && mw[j] >= EPSILON)) ? (*rgs[j])() / mw[j] : 0.0);
125                                 sum += theta[j];
126                         }
127                         assert(sum >= EPSILON);
128                         for (int j = 0; j <= M; j++) theta[j] /= sum;
129
130                         sum = 0.0;
131                         tpm[0] = 0.0;
132                         for (int j = 1; j <= M; j++)
133                                 if (eel[j] >= EPSILON) {
134                                         tpm[j] = theta[j] / eel[j];
135                                         sum += tpm[j];
136                                 }
137                                 else assert(theta[j] < EPSILON);
138                         assert(sum >= EPSILON);
139                         l_bar = 0.0; // store mean effective length of the sample
140                         for (int j = 1; j <= M; j++) { tpm[j] /= sum; l_bar += tpm[j] * eel[j]; tpm[j] *= 1e6; }
141                         buffer->write(l_bar, tpm + 1); // ommit the first element in tpm
142                 }
143
144                 for (int j = 0; j <= M; j++) {
145                   if (gammas[j] != NULL) delete gammas[j];
146                   if (rgs[j] != NULL) delete rgs[j];
147                 }
148
149                 if (verbose && cnt % 100 == 0) { printf("Thread %d, %d count vectors are processed!\n", params->no, cnt); }
150         }
151
152         delete[] cvec;
153         delete[] theta;
154         delete[] gammas;
155         delete[] rgs;
156         delete[] tpm;
157
158         return NULL;
159 }
160
161 template<class ModelType>
162 void sample_theta_vectors_from_count_vectors() {
163         ModelType model;
164         model.read(modelF);
165         calcExpectedEffectiveLengths<ModelType>(M, refs, model, eel);
166
167         int num_threads = min(nThreads, nCV);
168
169         buffer = new Buffer(nMB, nSamples, M, l_bars, tmpF);
170
171         paramsArray = new Params[num_threads];
172         threads = new pthread_t[num_threads];
173
174         char inpF[STRLEN];
175         hasSeed ? engineFactory::init(seed) : engineFactory::init();
176         for (int i = 0; i < num_threads; i++) {
177                 paramsArray[i].no = i;
178                 sprintf(inpF, "%s%d", cvsF, i);
179                 paramsArray[i].fi = fopen(inpF, "r");
180                 paramsArray[i].engine = engineFactory::new_engine();
181                 paramsArray[i].mw = model.getMW();
182         }
183         engineFactory::finish();
184
185         /* set thread attribute to be joinable */
186         pthread_attr_init(&attr);
187         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
188
189         for (int i = 0; i < num_threads; i++) {
190                 rc = pthread_create(&threads[i], &attr, &sample_theta_from_c, (void*)(&paramsArray[i]));
191                 pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
192         }
193         for (int i = 0; i < num_threads; i++) {
194                 rc = pthread_join(threads[i], NULL);
195                 pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in sample_theta_vectors_from_count_vectors!");
196         }
197
198         /* destroy attribute */
199         pthread_attr_destroy(&attr);
200         delete[] threads;
201
202         for (int i = 0; i < num_threads; i++) {
203                 fclose(paramsArray[i].fi);
204                 delete paramsArray[i].engine;
205         }
206         delete[] paramsArray;
207
208         delete buffer; // Must delete here, force the content left in the buffer be written into the disk
209
210         if (verbose) { printf("Sampling is finished!\n"); }
211 }
212
213 void calcCI(int nSamples, float *samples, float &lb, float &ub) {
214         int p, q; // p pointer for lb, q pointer for ub;
215         int newp, newq;
216         int threshold = nSamples - (int(confidence * nSamples - 1e-8) + 1);
217         int nOutside = 0;
218
219         sort(samples, samples + nSamples);
220
221         p = 0; q = nSamples - 1;
222         newq = nSamples - 1;
223         do {
224                 q = newq;
225                 while (newq > 0 && samples[newq - 1] == samples[newq]) newq--;
226                 newq--;
227         } while (newq >= 0 && nSamples - (newq + 1) <= threshold);
228
229         nOutside = nSamples - (q + 1);
230
231         lb = -1e30; ub = 1e30;
232         do {
233                 if (samples[q] - samples[p] < ub - lb) {
234                         lb = samples[p];
235                         ub = samples[q];
236                 }
237
238                 newp = p;
239                 while (newp < nSamples - 1 && samples[newp] == samples[newp + 1]) newp++;
240                 newp++;
241                 if (newp <= threshold) {
242                         nOutside += newp - p;
243                         p = newp;
244                         while (nOutside > threshold && q < nSamples - 1) {
245                                 newq = q + 1;
246                                 while (newq < nSamples - 1 && samples[newq] == samples[newq + 1]) newq++;
247                                 nOutside -= newq - q;
248                                 q = newq;
249                         }
250                         assert(nOutside <= threshold);
251                 }
252                 else p = newp;
253         } while (p <= threshold);
254 }
255
256 void* calcCI_batch(void* arg) {
257         float *tsamples, *fsamples;
258         float *itsamples = NULL, *ifsamples = NULL, *gtsamples, *gfsamples;
259         ifstream fin;
260         CIParams *ciParams = (CIParams*)arg;
261         int curtid, curaid, tid;
262
263         tsamples = new float[nSamples];
264         fsamples = new float[nSamples];
265         if (alleleS) { 
266           itsamples = new float[nSamples];
267           ifsamples = new float[nSamples];
268         }
269         gtsamples = new float[nSamples];
270         gfsamples = new float[nSamples];
271
272         fin.open(tmpF, ios::binary);
273         // minus 1 here for that theta0 is not written!
274         streampos pos = streampos(gi.spAt(ciParams->start_gene_id) - 1) * nSamples * FLOATSIZE;
275         fin.seekg(pos, ios::beg);
276
277         int cnt = 0;
278         if (alleleS) {
279           curtid = curaid = -1;
280           memset(itsamples, 0, FLOATSIZE * nSamples);
281           memset(ifsamples, 0, FLOATSIZE * nSamples);
282         }
283         for (int i = ciParams->start_gene_id; i < ciParams->end_gene_id; i++) {
284                 int b = gi.spAt(i), e = gi.spAt(i + 1);
285                 memset(gtsamples, 0, FLOATSIZE * nSamples);
286                 memset(gfsamples, 0, FLOATSIZE * nSamples);
287                 for (int j = b; j < e; j++) {
288                         if (alleleS) {
289                           tid = ta.gidAt(j);
290                           if (curtid != tid) {
291                             if (curtid >= 0) {
292                               if (j - curaid > 1) {
293                                 calcCI(nSamples, itsamples, iso_tpm[curtid].lb, iso_tpm[curtid].ub);
294                                 calcCI(nSamples, ifsamples, iso_fpkm[curtid].lb, iso_fpkm[curtid].ub);
295                               }
296                               else {
297                                 iso_tpm[curtid] = tpm[curaid];
298                                 iso_fpkm[curtid] = fpkm[curaid];
299                               }
300                             }
301                             curtid = tid;
302                             curaid = j;
303                           }
304                         }
305
306                         for (int k = 0; k < nSamples; k++) {
307                                 fin.read((char*)(&tsamples[k]), FLOATSIZE);
308                                 fsamples[k] = 1e3 / l_bars[k] * tsamples[k];
309                                 if (alleleS) {
310                                   itsamples[k] += tsamples[k];
311                                   ifsamples[k] += fsamples[k];
312                                 }
313                                 gtsamples[k] += tsamples[k];
314                                 gfsamples[k] += fsamples[k];
315                         }
316                         calcCI(nSamples, tsamples, tpm[j].lb, tpm[j].ub);
317                         calcCI(nSamples, fsamples, fpkm[j].lb, fpkm[j].ub);
318                 }
319
320                 if (e - b > 1) {
321                         calcCI(nSamples, gtsamples, gene_tpm[i].lb, gene_tpm[i].ub);
322                         calcCI(nSamples, gfsamples, gene_fpkm[i].lb, gene_fpkm[i].ub);
323                 }
324                 else {
325                         gene_tpm[i] = tpm[b];
326                         gene_fpkm[i] = fpkm[b];
327                 }
328
329                 ++cnt;
330                 if (verbose && cnt % 1000 == 0) { printf("In thread %d, %d genes are processed for CI calculation!\n", ciParams->no, cnt); }
331         }
332         fin.close();
333
334         if (alleleS && (curtid >= 0)) {
335           if (gi.spAt(ciParams->end_gene_id) - curaid > 1) {
336             calcCI(nSamples, itsamples, iso_tpm[curtid].lb, iso_tpm[curtid].ub);
337             calcCI(nSamples, ifsamples, iso_fpkm[curtid].lb, iso_fpkm[curtid].ub);
338           }
339           else {
340             iso_tpm[curtid] = tpm[curaid];
341             iso_fpkm[curtid] = fpkm[curaid];
342           }
343         }
344         
345         delete[] tsamples;
346         delete[] fsamples;
347         if (alleleS) {
348           delete[] itsamples;
349           delete[] ifsamples;
350         }
351         delete[] gtsamples;
352         delete[] gfsamples;
353
354         return NULL;
355 }
356
357 void calculate_credibility_intervals(char* imdName) {
358         FILE *fo;
359         char outF[STRLEN];
360         int num_threads = nThreads;
361
362         tpm = new CIType[M + 1];
363         fpkm = new CIType[M + 1];
364         if (alleleS) {
365           iso_tpm = new CIType[m_trans];
366           iso_fpkm = new CIType[m_trans];
367         }
368         gene_tpm = new CIType[m];
369         gene_fpkm = new CIType[m];
370
371         assert(M > 0);
372         int quotient = M / num_threads;
373         if (quotient < 1) { num_threads = M; quotient = 1; }
374         int cur_gene_id = 0;
375         int num_isoforms = 0;
376
377         // A just so so strategy for paralleling
378         ciParamsArray = new CIParams[num_threads];
379         for (int i = 0; i < num_threads; i++) {
380                 ciParamsArray[i].no = i;
381                 ciParamsArray[i].start_gene_id = cur_gene_id;
382                 num_isoforms = 0;
383
384                 while ((m - cur_gene_id > num_threads - i - 1) && (i == num_threads - 1 || num_isoforms < quotient)) {
385                         num_isoforms += gi.spAt(cur_gene_id + 1) - gi.spAt(cur_gene_id);
386                         ++cur_gene_id;
387                 }
388
389                 ciParamsArray[i].end_gene_id = cur_gene_id;
390         }
391
392         threads = new pthread_t[num_threads];
393
394         /* set thread attribute to be joinable */
395         pthread_attr_init(&attr);
396         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
397
398         // paralleling
399         for (int i = 0; i < num_threads; i++) {
400                 rc = pthread_create(&threads[i], &attr, &calcCI_batch, (void*)(&ciParamsArray[i]));
401                 pthread_assert(rc, "pthread_create", "Cannot create thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
402         }
403         for (int i = 0; i < num_threads; i++) {
404                 rc = pthread_join(threads[i], NULL);
405                 pthread_assert(rc, "pthread_join", "Cannot join thread " + itos(i) + " (numbered from 0) in calculate_credibility_intervals!");
406         }
407
408         // releasing resources
409
410         /* destroy attribute */
411         pthread_attr_destroy(&attr);
412         delete[] threads;
413
414         delete[] ciParamsArray;
415
416         alleleS ? sprintf(outF, "%s.allele_res", imdName) : sprintf(outF, "%s.iso_res", imdName);
417         fo = fopen(outF, "a");
418         for (int i = 1; i <= M; i++)
419           fprintf(fo, "%.6g%c", tpm[i].lb, (i < M ? '\t' : '\n'));
420         for (int i = 1; i <= M; i++)
421           fprintf(fo, "%.6g%c", tpm[i].ub, (i < M ? '\t' : '\n'));
422         for (int i = 1; i <= M; i++)
423           fprintf(fo, "%.6g%c", fpkm[i].lb, (i < M ? '\t' : '\n'));
424         for (int i = 1; i <= M; i++)
425           fprintf(fo, "%.6g%c", fpkm[i].ub, (i < M ? '\t' : '\n'));
426         fclose(fo);
427
428         if (alleleS) {
429           //isoform level results
430           sprintf(outF, "%s.iso_res", imdName);
431           fo = fopen(outF, "a");
432           for (int i = 0; i < m_trans; i++)
433             fprintf(fo, "%.6g%c", iso_tpm[i].lb, (i < m_trans - 1 ? '\t' : '\n'));
434           for (int i = 0; i < m_trans; i++)
435             fprintf(fo, "%.6g%c", iso_tpm[i].ub, (i < m_trans - 1 ? '\t' : '\n'));
436           for (int i = 0; i < m_trans; i++)
437             fprintf(fo, "%.6g%c", iso_fpkm[i].lb, (i < m_trans - 1 ? '\t' : '\n'));
438           for (int i = 0; i < m_trans; i++)
439             fprintf(fo, "%.6g%c", iso_fpkm[i].ub, (i < m_trans - 1 ? '\t' : '\n'));
440           fclose(fo);
441         }
442
443         //gene level results
444         sprintf(outF, "%s.gene_res", imdName);
445         fo = fopen(outF, "a");
446         for (int i = 0; i < m; i++)
447                 fprintf(fo, "%.6g%c", gene_tpm[i].lb, (i < m - 1 ? '\t' : '\n'));
448         for (int i = 0; i < m; i++)
449                 fprintf(fo, "%.6g%c", gene_tpm[i].ub, (i < m - 1 ? '\t' : '\n'));
450         for (int i = 0; i < m; i++)
451                 fprintf(fo, "%.6g%c", gene_fpkm[i].lb, (i < m - 1 ? '\t' : '\n'));
452         for (int i = 0; i < m; i++)
453                 fprintf(fo, "%.6g%c", gene_fpkm[i].ub, (i < m - 1 ? '\t' : '\n'));
454         fclose(fo);
455
456         delete[] tpm;
457         delete[] fpkm;
458         if (alleleS) { 
459           delete[] iso_tpm; 
460           delete[] iso_fpkm; 
461         }
462         delete[] gene_tpm;
463         delete[] gene_fpkm;
464
465         if (verbose) { printf("All credibility intervals are calculated!\n"); }
466 }
467
468 int main(int argc, char* argv[]) {
469         if (argc < 8) {
470                 printf("Usage: rsem-calculate-credibility-intervals reference_name imdName statName confidence nCV nSpC nMB [-p #Threads] [--seed seed] [-q]\n");
471                 exit(-1);
472         }
473
474         strcpy(refName, argv[1]);
475         strcpy(imdName, argv[2]);
476         strcpy(statName, argv[3]);
477
478         confidence = atof(argv[4]);
479         nCV = atoi(argv[5]);
480         nSpC = atoi(argv[6]);
481         nMB = atoi(argv[7]);
482
483         nThreads = 1;
484         quiet = false;
485         hasSeed = false;
486         for (int i = 8; i < argc; i++) {
487                 if (!strcmp(argv[i], "-p")) nThreads = atoi(argv[i + 1]);
488                 if (!strcmp(argv[i], "--seed")) {
489                   hasSeed = true;
490                   int len = strlen(argv[i + 1]);
491                   seed = 0;
492                   for (int k = 0; k < len; k++) seed = seed * 10 + (argv[i + 1][k] - '0');
493                 }
494                 if (!strcmp(argv[i], "-q")) quiet = true;
495         }
496         verbose = !quiet;
497
498         sprintf(refF, "%s.seq", refName);
499         refs.loadRefs(refF, 1);
500         M = refs.getM();
501
502         sprintf(groupF, "%s.grp", refName);
503         gi.load(groupF);
504         m = gi.getm();
505
506         // allele-specific 
507         alleleS = isAlleleSpecific(refName, NULL, &ta);
508         if (alleleS) m_trans = ta.getm();
509
510         nSamples = nCV * nSpC;
511         assert(nSamples > 0 && M > 0); // for Buffter.h: (bufsize_type)nSamples
512         l_bars = new float[nSamples];
513
514         sprintf(tmpF, "%s.tmp", imdName);
515         sprintf(cvsF, "%s.countvectors", imdName);
516
517         sprintf(modelF, "%s.model", statName);
518         FILE *fi = fopen(modelF, "r");
519         general_assert(fi != NULL, "Cannot open " + cstrtos(modelF) + "!");
520         assert(fscanf(fi, "%d", &model_type) == 1);
521         fclose(fi);
522
523         // Phase I
524         switch(model_type) {
525         case 0 : sample_theta_vectors_from_count_vectors<SingleModel>(); break;
526         case 1 : sample_theta_vectors_from_count_vectors<SingleQModel>(); break;
527         case 2 : sample_theta_vectors_from_count_vectors<PairedEndModel>(); break;
528         case 3 : sample_theta_vectors_from_count_vectors<PairedEndQModel>(); break;
529         }
530
531         // Phase II
532         calculate_credibility_intervals(imdName);
533
534         delete l_bars;
535
536         return 0;
537 }