]> git.donarmstrong.com Git - fastq-tools.git/blob - src/fastq-sort.c
Fix a bug causing random sort to be not so random.
[fastq-tools.git] / src / fastq-sort.c
1 /*
2  * This file is part of fastq-tools.
3  *
4  * Copyright (c) 2012 by Daniel C. Jones <dcjones@cs.washington.edu>
5  *
6  * fastq-sort:
7  * Sort fastq files efficiently.
8  *
9  */
10
11 #include <assert.h>
12 #include <ctype.h>
13 #include <getopt.h>
14 #include <string.h>
15 #include <unistd.h>
16
17 #include "common.h"
18 #include "parse.h"
19
20
21 /* User comparison function. */
22 static int (*user_cmp)(const void*, const void*);
23 static int (*cmp)(const void*, const void*);
24
25 int rev_cmp(const void* a, const void* b)
26 {
27     return -user_cmp(a, b);
28 }
29
30
31 /* A collection of filenames of sorted chunks of fastq. */
32 typedef struct seq_dumps_t_
33 {
34     char** fns;
35     size_t n;
36     size_t size;
37 } seq_dumps_t;
38
39
40 seq_dumps_t* seq_dumps_create()
41 {
42     seq_dumps_t* d = malloc_or_die(sizeof(seq_dumps_t));
43     d->n = 0;
44     d->size = 64;
45     d->fns = malloc_or_die(d->size * sizeof(char*));
46     return d;
47 }
48
49
50 void seq_dumps_free(seq_dumps_t* d)
51 {
52     size_t i;
53     for (i = 0; i < d->n; ++i) {
54         if (unlink(d->fns[i]) == -1) {
55             fprintf(stderr, "Warning: unable to remove temporary file %s.\n",
56                     d->fns[i]);
57         }
58         free(d->fns[i]);
59     }
60     free(d->fns);
61     free(d);
62 }
63
64
65 static inline size_t heap_parent(size_t i) { return (i - 1) / 2; }
66 static inline size_t heap_left  (size_t i) { return 2 * i + 1; }
67 static inline size_t heap_right (size_t i) { return 2 * i + 2; }
68
69
70 /* Enqueue an item in the heap.
71  *
72  * Args:
73  *   heap: A binary heap in which stores indexs into seqs.
74  *   n: Fixed maximum size of the heap.
75  *   m: Current size of the heap.
76  *   seqs: Sequences.
77  *   idx: Index to enqueue.
78  *   cmp: Comparison function.
79  *
80  */
81 void heap_push(size_t* heap, size_t n, size_t* m, seq_t** seqs,
82                int (*cmp)(const void*, const void*), size_t idx)
83 {
84     if (*m >= n) return;
85     size_t tmp, j, i = (*m)++;
86     heap[i] = idx;
87
88     /* percolate up */
89     while (i > 0) {
90         j = heap_parent(i);
91         if (cmp(seqs[heap[i]], seqs[heap[j]]) < 0) {
92             tmp = heap[i];
93             heap[i] = heap[j];
94             heap[j] = tmp;
95             i = j;
96         }
97         else break;
98     }
99 }
100
101
102 /* Dequeue an item from a heap. */
103 size_t heap_pop(size_t* heap, size_t* m, seq_t** seqs,
104                int (*cmp)(const void*, const void*))
105 {
106     assert(*m > 0);
107     size_t ans = heap[0];
108     heap[0] = heap[--(*m)];
109     size_t tmp, l, r, j, i = 0;
110     while (true) {
111         l = heap_left(i);
112         r = heap_right(i);
113
114         if (l >= (*m)) break;
115
116         if (r >= (*m)) {
117             j = l;
118         }
119         else {
120             j = cmp(seqs[heap[l]], seqs[heap[r]]) < 0 ? l : r;
121         }
122
123         if (cmp(seqs[heap[i]], seqs[heap[j]]) > 0) {
124             tmp = heap[i];
125             heap[i] = heap[j];
126             heap[j] = tmp;
127             i = j;
128         }
129         else break;
130     }
131
132     return ans;
133 }
134
135
136 /* n-way merge sort to stdout */
137 void merge_sort(const seq_dumps_t* d, int (*cmp)(const void*, const void*))
138 {
139     FILE** files = malloc_or_die(d->n * sizeof(FILE*));
140     size_t i;
141     for (i = 0; i < d->n; ++i) {
142         files[i] = fopen(d->fns[i], "rb");
143         if (files[i] == NULL) {
144             fprintf(stderr, "Cannot open temporary file %s for reading.\n",
145                     d->fns[i]);
146             exit(EXIT_FAILURE);
147         }
148     }
149
150     fastq_t** fs = malloc_or_die(d->n * sizeof(fastq_t*));
151     seq_t** seqs = malloc_or_die(d->n * sizeof(seq_t*));
152     for (i = 0; i < d->n; ++i) {
153         fs[i] = fastq_create(files[i]);
154         seqs[i] = seq_create();
155     }
156
157     /* A binary heap of indexes to fs. We use this to repeatedly pop the
158      * smallest fastq entry. */
159     size_t* heap = malloc_or_die(d->n * sizeof(size_t));
160
161     /* heap size */
162     size_t m = 0;
163
164     for (i = 0; i < d->n; ++i) {
165         if (fastq_read(fs[i], seqs[i])) {
166             heap_push(heap, d->n, &m, seqs, cmp, i);
167         }
168     }
169
170     while (m > 0) {
171         i = heap_pop(heap, &m, seqs, cmp);
172         fastq_print(stdout, seqs[i]);
173         if (fastq_read(fs[i], seqs[i])) {
174             heap_push(heap, d->n, &m, seqs, cmp, i);
175         }
176     }
177
178     for (i = 0; i < d->n; ++i) {
179         seq_free(seqs[i]);
180         fastq_free(fs[i]);
181         fclose(files[i]);
182     }
183
184     free(files);
185     free(fs);
186 }
187
188
189 typedef struct seq_array_t_
190 {
191     seq_t* seqs;
192
193     /* Number of seq objects. */
194     size_t n;
195
196     /* Size reserved in seqs. */
197     size_t size;
198
199     /* Space reserved for strings. */
200     char* data;
201
202     /* Data used. */
203     size_t data_used;
204
205     /* Total data size. */
206     size_t data_size;
207 } seq_array_t;
208
209
210 seq_array_t* seq_array_create(size_t data_size)
211 {
212     seq_array_t* a = malloc_or_die(sizeof(seq_array_t));
213     a->size = 1024;
214     a->n = 0;
215     a->seqs = malloc_or_die(a->size * sizeof(seq_t));
216
217     a->data_size = data_size;
218     a->data_used = 0;
219     a->data = malloc_or_die(data_size);
220
221     return a;
222 }
223
224
225 void seq_array_free(seq_array_t* a)
226 {
227     free(a->seqs);
228     free(a->data);
229     free(a);
230 }
231
232
233 /* Push a fastq entry to back of the array. Return false if there was not enough
234  * space. */
235 bool seq_array_push(seq_array_t* a, const seq_t* seq)
236 {
237     size_t size_needed = (seq->id1.n + 1) + (seq->seq.n + 1) +
238                          (seq->id2.n + 1) + (seq->qual.n + 1);
239
240     if (size_needed > a->data_size - a->data_used) return false;
241
242     if (a->n == a->size) {
243         a->size *= 2;
244         a->seqs = realloc_or_die(a->seqs, a->size * sizeof(seq_t));
245     }
246
247     memcpy(&a->data[a->data_used], seq->id1.s, seq->id1.n + 1);
248     a->seqs[a->n].id1.s = &a->data[a->data_used];
249     a->seqs[a->n].id1.n = seq->id1.n;
250     a->data_used += seq->id1.n + 1;
251
252     memcpy(&a->data[a->data_used], seq->seq.s, seq->seq.n + 1);
253     a->seqs[a->n].seq.s = &a->data[a->data_used];
254     a->seqs[a->n].seq.n = seq->seq.n;
255     a->data_used += seq->seq.n + 1;
256
257     memcpy(&a->data[a->data_used], seq->id2.s, seq->id2.n + 1);
258     a->seqs[a->n].id2.s = &a->data[a->data_used];
259     a->seqs[a->n].id2.n = seq->id2.n;
260     a->data_used += seq->id2.n + 1;
261
262     memcpy(&a->data[a->data_used], seq->qual.s, seq->qual.n + 1);
263     a->seqs[a->n].qual.s = &a->data[a->data_used];
264     a->seqs[a->n].qual.n = seq->qual.n;
265     a->data_used += seq->qual.n + 1;
266
267     ++a->n;
268
269     return true;
270 }
271
272
273 void seq_array_clear(seq_array_t* a)
274 {
275     a->n = 0;
276     a->data_used = 0;
277 }
278
279
280 void seq_array_sort(seq_array_t* a, int (*cmp)(const void*, const void*))
281 {
282     qsort(a->seqs, a->n, sizeof(seq_t), cmp);
283 }
284
285
286 int seq_cmp_hash(const void* a, const void* b)
287 {
288     uint32_t ha = seq_hash((seq_t*) a);
289     uint32_t hb = seq_hash((seq_t*) b);
290
291     if      (ha < hb) return -1;
292     else if (ha > hb) return  1;
293     else              return  0;
294     return 0;
295 }
296
297
298 int seq_cmp_id(const void* a, const void* b)
299 {
300     return strcmp(((seq_t*) a)->id1.s, ((seq_t*) b)->id1.s);
301 }
302
303
304 int seq_cmp_seq(const void* a, const void* b)
305 {
306     return strcmp(((seq_t*) a)->seq.s, ((seq_t*) b)->seq.s);
307 }
308
309
310 void seq_array_dump(seq_dumps_t* d, const seq_array_t* a)
311 {
312     const char* template = "/tmp/fastq_sort.XXXXXXXX";
313     char* fn = malloc_or_die(strlen(template) + 1);
314     memcpy(fn, template, strlen(template) + 1);
315     if (mktemp(fn) == NULL) {
316         fprintf(stderr, "Unable to create a temporary file.\n");
317         exit(EXIT_FAILURE);
318     }
319
320     FILE* f = fopen(fn, "wb");
321     if (f == NULL) {
322         fprintf(stderr, "Unable to open temporary file %s for writing.\n", fn);
323         exit(EXIT_FAILURE);
324     }
325
326     size_t i;
327     for (i = 0; i < a->n; ++i) {
328         fastq_print(f, &a->seqs[i]);
329     }
330
331     if (d->n == d->size) {
332         d->size *= 2;
333         d->fns = realloc_or_die(d->fns, d->size * sizeof(char*));
334     }
335     d->fns[d->n++] = fn;
336
337     fclose(f);
338 }
339
340
341 static const char* prog_name = "fastq-sort";
342
343
344 void print_help()
345 {
346     fprintf(stdout,
347 "fastq-sort [OPTION]... [FILE]...\n"
348 "Concatenate and sort FASTQ files and write to standard output.\n"
349 "Options:\n"
350 "  -I, --id           sort alphabetically by read identifier\n"
351 "  -S, --seq          sort alphabetically by sequence\n"
352 "  -R, --random       randomly shuffle the sequences\n"
353 "  -G, --gc           sort by GC content\n" /* TODO */
354 "  -M, --median-qual  sort by median quality score\n" /* TODO */
355 "  -h, --help         print this message\n"
356 "  -V, --version      output version information and exit\n"
357    );
358 }
359
360
361 /* Parse a size specification, which is just a number with a K, M, G suffix. */
362 size_t parse_size(const char* str)
363 {
364     char* endptr;
365     unsigned long size = strtoul(str, &endptr, 10);
366
367     if      (toupper(*endptr) == 'K') size *= 1000;
368     else if (toupper(*endptr) == 'M') size *= 1000000;
369     else if (toupper(*endptr) == 'G') size *= 1000000000;
370
371     return size;
372 }
373
374
375 int main(int argc, char* argv[])
376 {
377     int opt, opt_idx;
378     size_t buffer_size = 100000000;
379     bool reverse_sort = false;
380     user_cmp = seq_cmp_id;
381
382     static struct option long_options[] =
383     {
384         {"buffer-size", required_argument, NULL, 'S'},
385         {"reverse",     no_argument, NULL, 'r'},
386         {"id",          no_argument, NULL, 'i'},
387         {"seq",         no_argument, NULL, 's'},
388         {"random",      no_argument, NULL, 'R'},
389         {"help",        no_argument, NULL, 'h'},
390         {"version",     no_argument, NULL, 'V'},
391         {0, 0, 0, 0}
392     };
393
394     while (true) {
395         opt = getopt_long(argc, argv, "S:risRhV", long_options, &opt_idx);
396         if (opt == -1) break;
397
398         switch (opt) {
399             case 'S':
400                 buffer_size = parse_size(optarg);
401                 break;
402
403             case 'r':
404                 reverse_sort = true;
405                 break;
406
407             case 'i':
408                 user_cmp = seq_cmp_id;
409                 break;
410
411             case 's':
412                 user_cmp = seq_cmp_seq;
413                 break;
414
415             case 'R':
416                 user_cmp = seq_cmp_hash;
417                 break;
418
419             case 'h':
420                 print_help();
421                 return 0;
422
423             case 'V':
424                 print_version(stdout, prog_name);
425                 return 0;
426
427             case '?':
428                 return 1;
429
430             default:
431                 abort();
432         }
433     }
434
435     cmp = reverse_sort ? rev_cmp : user_cmp;
436
437     seq_array_t* a = seq_array_create(buffer_size);
438     seq_dumps_t* d = seq_dumps_create();
439     seq_t* seq = seq_create();
440
441     fastq_t* f;
442     if (optind >= argc) {
443         f = fastq_create(stdin);
444         while (fastq_read(f, seq)) {
445             if (!seq_array_push(a, seq)) {
446                 seq_array_sort(a, cmp);
447                 seq_array_dump(d, a);
448                 seq_array_clear(a);
449                 if (!seq_array_push(a, seq)) {
450                     fprintf(stderr, "The buffer size is to small.\n");
451                     return EXIT_FAILURE;
452                 }
453             }
454         }
455         fastq_free(f);
456     }
457     else {
458         FILE* file;
459         for (; optind < argc; ++optind) {
460             file = fopen(argv[optind], "rb");
461             if (file == NULL) {
462                 fprintf(stderr, "Cannot open %s for reading.\n", argv[optind]);
463                 return EXIT_FAILURE;
464             }
465             f = fastq_create(file);
466
467             while (fastq_read(f, seq)) {
468                 if (!seq_array_push(a, seq)) {
469                     seq_array_sort(a, cmp);
470                     seq_array_dump(d, a);
471                     seq_array_clear(a);
472                     if (!seq_array_push(a, seq)) {
473                         fprintf(stderr, "The buffer size is to small.\n");
474                         return EXIT_FAILURE;
475                     }
476                 }
477             }
478
479             fastq_free(f);
480             fclose(file);
481         }
482     }
483
484     if (a->n > 0) {
485         seq_array_sort(a, cmp);
486
487         /* We were able to sort everything in memory. */
488         if (d->n == 0) {
489             size_t i;
490             for (i = 0; i < a->n; ++i) {
491                 fastq_print(stdout, &a->seqs[i]);
492             }
493         }
494         else {
495             seq_array_dump(d, a);
496             merge_sort(d, cmp);
497         }
498     }
499
500     seq_dumps_free(d);
501     seq_free(seq);
502     seq_array_free(a);
503
504     return EXIT_SUCCESS;
505 }
506
507