]> git.donarmstrong.com Git - fastq-tools.git/blob - src/fastq-sort.c
c8f80840232a06dd3ec729edfb3401b76d0b89fc
[fastq-tools.git] / src / fastq-sort.c
1 /*
2  * This file is part of fastq-tools.
3  *
4  * Copyright (c) 2012 by Daniel C. Jones <dcjones@cs.washington.edu>
5  *
6  * fastq-sort:
7  * Sort fastq files efficiently.
8  *
9  */
10
11 #include <assert.h>
12 #include <ctype.h>
13 #include <getopt.h>
14 #include <string.h>
15 #include <time.h>
16 #include <unistd.h>
17
18 #include "common.h"
19 #include "parse.h"
20
21
22 /* User comparison function. */
23 static int (*user_cmp)(const void*, const void*);
24 static int (*cmp)(const void*, const void*);
25
26 int rev_cmp(const void* a, const void* b)
27 {
28     return -user_cmp(a, b);
29 }
30
31
32 /* A collection of filenames of sorted chunks of fastq. */
33 typedef struct seq_dumps_t_
34 {
35     char** fns;
36     size_t n;
37     size_t size;
38 } seq_dumps_t;
39
40
41 seq_dumps_t* seq_dumps_create()
42 {
43     seq_dumps_t* d = malloc_or_die(sizeof(seq_dumps_t));
44     d->n = 0;
45     d->size = 64;
46     d->fns = malloc_or_die(d->size * sizeof(char*));
47     return d;
48 }
49
50
51 void seq_dumps_free(seq_dumps_t* d)
52 {
53     size_t i;
54     for (i = 0; i < d->n; ++i) {
55         if (unlink(d->fns[i]) == -1) {
56             fprintf(stderr, "Warning: unable to remove temporary file %s.\n",
57                     d->fns[i]);
58         }
59         free(d->fns[i]);
60     }
61     free(d->fns);
62     free(d);
63 }
64
65
66 static inline size_t heap_parent(size_t i) { return (i - 1) / 2; }
67 static inline size_t heap_left  (size_t i) { return 2 * i + 1; }
68 static inline size_t heap_right (size_t i) { return 2 * i + 2; }
69
70
71 /* Enqueue an item in the heap.
72  *
73  * Args:
74  *   heap: A binary heap in which stores indexs into seqs.
75  *   n: Fixed maximum size of the heap.
76  *   m: Current size of the heap.
77  *   seqs: Sequences.
78  *   idx: Index to enqueue.
79  *   cmp: Comparison function.
80  *
81  */
82 void heap_push(size_t* heap, size_t n, size_t* m, seq_t** seqs,
83                int (*cmp)(const void*, const void*), size_t idx)
84 {
85     if (*m >= n) return;
86     size_t tmp, j, i = (*m)++;
87     heap[i] = idx;
88
89     /* percolate up */
90     while (i > 0) {
91         j = heap_parent(i);
92         if (cmp(seqs[heap[i]], seqs[heap[j]]) < 0) {
93             tmp = heap[i];
94             heap[i] = heap[j];
95             heap[j] = tmp;
96             i = j;
97         }
98         else break;
99     }
100 }
101
102
103 /* Dequeue an item from a heap. */
104 size_t heap_pop(size_t* heap, size_t* m, seq_t** seqs,
105                int (*cmp)(const void*, const void*))
106 {
107     assert(*m > 0);
108     size_t ans = heap[0];
109     heap[0] = heap[--(*m)];
110     size_t tmp, l, r, j, i = 0;
111     while (true) {
112         l = heap_left(i);
113         r = heap_right(i);
114
115         if (l >= (*m)) break;
116
117         if (r >= (*m)) {
118             j = l;
119         }
120         else {
121             j = cmp(seqs[heap[l]], seqs[heap[r]]) < 0 ? l : r;
122         }
123
124         if (cmp(seqs[heap[i]], seqs[heap[j]]) > 0) {
125             tmp = heap[i];
126             heap[i] = heap[j];
127             heap[j] = tmp;
128             i = j;
129         }
130         else break;
131     }
132
133     return ans;
134 }
135
136
137 /* n-way merge sort to stdout */
138 void merge_sort(const seq_dumps_t* d, int (*cmp)(const void*, const void*))
139 {
140     FILE** files = malloc_or_die(d->n * sizeof(FILE*));
141     size_t i;
142     for (i = 0; i < d->n; ++i) {
143         files[i] = fopen(d->fns[i], "rb");
144         if (files[i] == NULL) {
145             fprintf(stderr, "Cannot open temporary file %s for reading.\n",
146                     d->fns[i]);
147             exit(EXIT_FAILURE);
148         }
149     }
150
151     fastq_t** fs = malloc_or_die(d->n * sizeof(fastq_t*));
152     seq_t** seqs = malloc_or_die(d->n * sizeof(seq_t*));
153     for (i = 0; i < d->n; ++i) {
154         fs[i] = fastq_create(files[i]);
155         seqs[i] = seq_create();
156     }
157
158     /* A binary heap of indexes to fs. We use this to repeatedly pop the
159      * smallest fastq entry. */
160     size_t* heap = malloc_or_die(d->n * sizeof(size_t));
161
162     /* heap size */
163     size_t m = 0;
164
165     for (i = 0; i < d->n; ++i) {
166         if (fastq_read(fs[i], seqs[i])) {
167             heap_push(heap, d->n, &m, seqs, cmp, i);
168         }
169     }
170
171     while (m > 0) {
172         i = heap_pop(heap, &m, seqs, cmp);
173         fastq_print(stdout, seqs[i]);
174         if (fastq_read(fs[i], seqs[i])) {
175             heap_push(heap, d->n, &m, seqs, cmp, i);
176         }
177     }
178
179     for (i = 0; i < d->n; ++i) {
180         seq_free(seqs[i]);
181         fastq_free(fs[i]);
182         fclose(files[i]);
183     }
184
185     free(files);
186     free(fs);
187 }
188
189
190 typedef struct seq_array_t_
191 {
192     seq_t* seqs;
193
194     /* Number of seq objects. */
195     size_t n;
196
197     /* Size reserved in seqs. */
198     size_t size;
199
200     /* Space reserved for strings. */
201     char* data;
202
203     /* Data used. */
204     size_t data_used;
205
206     /* Total data size. */
207     size_t data_size;
208 } seq_array_t;
209
210
211 seq_array_t* seq_array_create(size_t data_size)
212 {
213     seq_array_t* a = malloc_or_die(sizeof(seq_array_t));
214     a->size = 1024;
215     a->n = 0;
216     a->seqs = malloc_or_die(a->size * sizeof(seq_t));
217
218     a->data_size = data_size;
219     a->data_used = 0;
220     a->data = malloc_or_die(data_size);
221
222     return a;
223 }
224
225
226 void seq_array_free(seq_array_t* a)
227 {
228     free(a->seqs);
229     free(a->data);
230     free(a);
231 }
232
233
234 /* Push a fastq entry to back of the array. Return false if there was not enough
235  * space. */
236 bool seq_array_push(seq_array_t* a, const seq_t* seq)
237 {
238     size_t size_needed = (seq->id1.n + 1) + (seq->seq.n + 1) +
239                          (seq->id2.n + 1) + (seq->qual.n + 1);
240
241     if (size_needed > a->data_size - a->data_used) return false;
242
243     if (a->n == a->size) {
244         a->size *= 2;
245         a->seqs = realloc_or_die(a->seqs, a->size * sizeof(seq_t));
246     }
247
248     memcpy(&a->data[a->data_used], seq->id1.s, seq->id1.n + 1);
249     a->seqs[a->n].id1.s = &a->data[a->data_used];
250     a->seqs[a->n].id1.n = seq->id1.n;
251     a->data_used += seq->id1.n + 1;
252
253     memcpy(&a->data[a->data_used], seq->seq.s, seq->seq.n + 1);
254     a->seqs[a->n].seq.s = &a->data[a->data_used];
255     a->seqs[a->n].seq.n = seq->seq.n;
256     a->data_used += seq->seq.n + 1;
257
258     memcpy(&a->data[a->data_used], seq->id2.s, seq->id2.n + 1);
259     a->seqs[a->n].id2.s = &a->data[a->data_used];
260     a->seqs[a->n].id2.n = seq->id2.n;
261     a->data_used += seq->id2.n + 1;
262
263     memcpy(&a->data[a->data_used], seq->qual.s, seq->qual.n + 1);
264     a->seqs[a->n].qual.s = &a->data[a->data_used];
265     a->seqs[a->n].qual.n = seq->qual.n;
266     a->data_used += seq->qual.n + 1;
267
268     ++a->n;
269
270     return true;
271 }
272
273
274 void seq_array_clear(seq_array_t* a)
275 {
276     a->n = 0;
277     a->data_used = 0;
278 }
279
280
281 void seq_array_sort(seq_array_t* a, int (*cmp)(const void*, const void*))
282 {
283     qsort(a->seqs, a->n, sizeof(seq_t), cmp);
284 }
285
286
287 int seq_cmp_hash(const void* a, const void* b)
288 {
289     uint32_t ha = seq_hash((seq_t*) a);
290     uint32_t hb = seq_hash((seq_t*) b);
291
292     if      (ha < hb) return -1;
293     else if (ha > hb) return  1;
294     else              return  0;
295     return 0;
296 }
297
298
299 int seq_cmp_id(const void* a, const void* b)
300 {
301     return strcmp(((seq_t*) a)->id1.s, ((seq_t*) b)->id1.s);
302 }
303
304
305 int seq_cmp_seq(const void* a, const void* b)
306 {
307     return strcmp(((seq_t*) a)->seq.s, ((seq_t*) b)->seq.s);
308 }
309
310
311 static float seq_gc(const seq_t* s)
312 {
313     if (s->seq.n == 0) return 0.0;
314
315     float gc = 0;
316     size_t i;
317     for (i = 0; i < s->seq.n; ++i) {
318         if (s->seq.s[i] == 'g' || s->seq.s[i] == 'G' ||
319             s->seq.s[i] == 'c' || s->seq.s[i] == 'C') {
320             ++gc;
321         }
322     }
323
324     return gc / (float) s->seq.n;
325 }
326
327
328 int seq_cmp_gc(const void* a, const void* b)
329 {
330     float gc_a = seq_gc((seq_t*) a);
331     float gc_b = seq_gc((seq_t*) b);
332
333     if      (gc_a < gc_b) return -1;
334     else if (gc_a > gc_b) return 1;
335     else                  return 0;
336 }
337
338
339 static float seq_mean_qual(const seq_t* s)
340 {
341     if (s->qual.n == 0) return 0.0;
342
343     float q = 0.0;
344     size_t i;
345     for (i = 0; i < s->qual.n; ++i) {
346         q += (float) s->qual.s[i];
347     }
348     return q / (float) s->qual.n;
349 }
350
351
352 int seq_cmp_mean_qual(const void* a, const void* b)
353 {
354     float mq_a = seq_mean_qual((seq_t*) a);
355     float mq_b = seq_mean_qual((seq_t*) b);
356
357     if      (mq_a < mq_b) return -1;
358     else if (mq_a > mq_b) return 1;
359     else                  return 0;
360 }
361
362
363 void seq_array_dump(seq_dumps_t* d, const seq_array_t* a)
364 {
365     const char* template = "/tmp/fastq_sort.XXXXXXXX";
366     char* fn = malloc_or_die(strlen(template) + 1);
367     memcpy(fn, template, strlen(template) + 1);
368     if (mktemp(fn) == NULL) {
369         fprintf(stderr, "Unable to create a temporary file.\n");
370         exit(EXIT_FAILURE);
371     }
372
373     FILE* f = fopen(fn, "wb");
374     if (f == NULL) {
375         fprintf(stderr, "Unable to open temporary file %s for writing.\n", fn);
376         exit(EXIT_FAILURE);
377     }
378
379     size_t i;
380     for (i = 0; i < a->n; ++i) {
381         fastq_print(f, &a->seqs[i]);
382     }
383
384     if (d->n == d->size) {
385         d->size *= 2;
386         d->fns = realloc_or_die(d->fns, d->size * sizeof(char*));
387     }
388     d->fns[d->n++] = fn;
389
390     fclose(f);
391 }
392
393
394 static const char* prog_name = "fastq-sort";
395
396
397 void print_help()
398 {
399     fprintf(stdout,
400 "fastq-sort [OPTION]... [FILE]...\n"
401 "Concatenate and sort FASTQ files and write to standard output.\n"
402 "Options:\n"
403 "  -r, --reverse      sort in reverse (i.e., descending) order\n"
404 "  -I, --id           sort alphabetically by read identifier\n"
405 "  -S, --seq          sort alphabetically by sequence\n"
406 "  -R, --random       randomly shuffle the sequences\n"
407 "      --seed[=SEED]  seed to use for random shuffle.\n"
408 "  -G, --gc           sort by GC content\n"
409 "  -M, --mean-qual  sort by median quality score\n" /* TODO */
410 "  -h, --help         print this message\n"
411 "  -V, --version      output version information and exit\n"
412    );
413 }
414
415
416 /* Parse a size specification, which is just a number with a K, M, G suffix. */
417 size_t parse_size(const char* str)
418 {
419     char* endptr;
420     unsigned long size = strtoul(str, &endptr, 10);
421
422     if      (toupper(*endptr) == 'K') size *= 1000;
423     else if (toupper(*endptr) == 'M') size *= 1000000;
424     else if (toupper(*endptr) == 'G') size *= 1000000000;
425
426     return size;
427 }
428
429
430 int main(int argc, char* argv[])
431 {
432     int opt, opt_idx;
433     size_t buffer_size = 100000000;
434     bool reverse_sort = false;
435     user_cmp = seq_cmp_id;
436
437     static struct option long_options[] =
438     {
439         {"buffer-size", required_argument, NULL, 'S'},
440         {"reverse",     no_argument,       NULL, 'r'},
441         {"id",          no_argument,       NULL, 'i'},
442         {"seq",         no_argument,       NULL, 's'},
443         {"random",      no_argument,       NULL, 'R'},
444         {"seed",        optional_argument, NULL, 0},
445         {"gc",          no_argument,       NULL, 'G'},
446         {"mean-qual",   no_argument,       NULL, 'M'},
447         {"help",        no_argument,       NULL, 'h'},
448         {"version",     no_argument,       NULL, 'V'},
449         {0, 0, 0, 0}
450     };
451
452     while (true) {
453         opt = getopt_long(argc, argv, "S:risRGhV", long_options, &opt_idx);
454         if (opt == -1) break;
455
456         switch (opt) {
457             case 'S':
458                 buffer_size = parse_size(optarg);
459                 break;
460
461             case 'r':
462                 reverse_sort = true;
463                 break;
464
465             case 'i':
466                 user_cmp = seq_cmp_id;
467                 break;
468
469             case 's':
470                 user_cmp = seq_cmp_seq;
471                 break;
472
473             case 'R':
474                 user_cmp = seq_cmp_hash;
475                 break;
476
477             case 'G':
478                 user_cmp = seq_cmp_gc;
479                 break;
480
481             case 'M':
482                 user_cmp = seq_cmp_mean_qual;
483                 break;
484
485             case 'h':
486                 print_help();
487                 return 0;
488
489             case 'V':
490                 print_version(stdout, prog_name);
491                 return 0;
492
493             case 0:
494                 if (strcmp(long_options[opt_idx].name, "seed") == 0) {
495                     uint32_t seed;
496                     if (optarg) {
497                         seed = strtoul(optarg, NULL, 10);
498                     }
499                     else {
500                         seed = (uint32_t) time(NULL);
501                     }
502                     seq_hash_set_seed(seed);
503                 }
504                 break;
505
506             case '?':
507                 return 1;
508
509             default:
510                 abort();
511         }
512     }
513
514     cmp = reverse_sort ? rev_cmp : user_cmp;
515
516     seq_array_t* a = seq_array_create(buffer_size);
517     seq_dumps_t* d = seq_dumps_create();
518     seq_t* seq = seq_create();
519
520     fastq_t* f;
521     if (optind >= argc) {
522         f = fastq_create(stdin);
523         while (fastq_read(f, seq)) {
524             if (!seq_array_push(a, seq)) {
525                 seq_array_sort(a, cmp);
526                 seq_array_dump(d, a);
527                 seq_array_clear(a);
528                 if (!seq_array_push(a, seq)) {
529                     fprintf(stderr, "The buffer size is to small.\n");
530                     return EXIT_FAILURE;
531                 }
532             }
533         }
534         fastq_free(f);
535     }
536     else {
537         FILE* file;
538         for (; optind < argc; ++optind) {
539             file = fopen(argv[optind], "rb");
540             if (file == NULL) {
541                 fprintf(stderr, "Cannot open %s for reading.\n", argv[optind]);
542                 return EXIT_FAILURE;
543             }
544             f = fastq_create(file);
545
546             while (fastq_read(f, seq)) {
547                 if (!seq_array_push(a, seq)) {
548                     seq_array_sort(a, cmp);
549                     seq_array_dump(d, a);
550                     seq_array_clear(a);
551                     if (!seq_array_push(a, seq)) {
552                         fprintf(stderr, "The buffer size is to small.\n");
553                         return EXIT_FAILURE;
554                     }
555                 }
556             }
557
558             fastq_free(f);
559             fclose(file);
560         }
561     }
562
563     if (a->n > 0) {
564         seq_array_sort(a, cmp);
565
566         /* We were able to sort everything in memory. */
567         if (d->n == 0) {
568             size_t i;
569             for (i = 0; i < a->n; ++i) {
570                 fastq_print(stdout, &a->seqs[i]);
571             }
572         }
573         else {
574             seq_array_dump(d, a);
575             merge_sort(d, cmp);
576         }
577     }
578
579     seq_dumps_free(d);
580     seq_free(seq);
581     seq_array_free(a);
582
583     return EXIT_SUCCESS;
584 }
585
586