]> git.donarmstrong.com Git - fastq-tools.git/blob - src/fastq-sort.c
e96d9bf5164c9711aa965e05a4a47868929e7ffd
[fastq-tools.git] / src / fastq-sort.c
1 /*
2  * This file is part of fastq-tools.
3  *
4  * Copyright (c) 2012 by Daniel C. Jones <dcjones@cs.washington.edu>
5  *
6  * fastq-sample :
7  * Sample reads with or without replacement from a FASTQ file.
8  *
9  */
10
11 #include <assert.h>
12 #include <getopt.h>
13 #include <string.h>
14 #include <unistd.h>
15
16 #include "common.h"
17 #include "parse.h"
18
19
20 /* User comparison function. */
21 static int (*user_cmp)(const void*, const void*);
22 static int (*cmp)(const void*, const void*);
23
24 int rev_cmp(const void* a, const void* b)
25 {
26     return -user_cmp(a, b);
27 }
28
29
30 /* A collection of filenames of sorted chunks of fastq. */
31 typedef struct seq_dumps_t_
32 {
33     char** fns;
34     size_t n;
35     size_t size;
36 } seq_dumps_t;
37
38
39 seq_dumps_t* seq_dumps_create()
40 {
41     seq_dumps_t* d = malloc_or_die(sizeof(seq_dumps_t));
42     d->n = 0;
43     d->size = 64;
44     d->fns = malloc_or_die(d->size * sizeof(char*));
45     return d;
46 }
47
48
49 void seq_dumps_free(seq_dumps_t* d)
50 {
51     size_t i;
52     for (i = 0; i < d->n; ++i) {
53         if (unlink(d->fns[i]) == -1) {
54             fprintf(stderr, "Warning: unable to remove temporary file %s.\n",
55                     d->fns[i]);
56         }
57         free(d->fns[i]);
58     }
59     free(d->fns);
60     free(d);
61 }
62
63
64 static inline size_t heap_parent(size_t i) { return (i - 1) / 2; }
65 static inline size_t heap_left  (size_t i) { return 2 * i + 1; }
66 static inline size_t heap_right (size_t i) { return 2 * i + 2; }
67
68
69 /* Enqueue an item in the heap.
70  *
71  * Args:
72  *   heap: A binary heap in which stores indexs into seqs.
73  *   n: Fixed maximum size of the heap.
74  *   m: Current size of the heap.
75  *   seqs: Sequences.
76  *   idx: Index to enqueue.
77  *   cmp: Comparison function.
78  *
79  */
80 void heap_push(size_t* heap, size_t n, size_t* m, seq_t** seqs,
81                int (*cmp)(const void*, const void*), size_t idx)
82 {
83     if (*m >= n) return;
84     size_t tmp, j, i = (*m)++;
85     heap[i] = idx;
86
87     /* percolate up */
88     while (i > 0) {
89         j = heap_parent(i);
90         if (cmp(seqs[heap[i]], seqs[heap[j]]) < 0) {
91             tmp = heap[i];
92             heap[i] = heap[j];
93             heap[j] = tmp;
94             i = j;
95         }
96         else break;
97     }
98 }
99
100
101 /* Dequeue an item from a heap. */
102 size_t heap_pop(size_t* heap, size_t* m, seq_t** seqs,
103                int (*cmp)(const void*, const void*))
104 {
105     assert(*m > 0);
106     size_t ans = heap[0];
107     heap[0] = heap[--(*m)];
108     size_t tmp, l, r, j, i = 0;
109     while (true) {
110         l = heap_left(i);
111         r = heap_right(i);
112
113         if (l >= (*m)) break;
114
115         if (r >= (*m)) {
116             j = l;
117         }
118         else {
119             j = cmp(seqs[heap[l]], seqs[heap[r]]) < 0 ? l : r;
120         }
121
122         if (cmp(seqs[heap[i]], seqs[heap[j]]) > 0) {
123             tmp = heap[i];
124             heap[i] = heap[j];
125             heap[j] = tmp;
126             i = j;
127         }
128         else break;
129     }
130
131     return ans;
132 }
133
134
135 /* n-way merge sort to stdout */
136 void merge_sort(const seq_dumps_t* d, int (*cmp)(const void*, const void*))
137 {
138     FILE** files = malloc_or_die(d->n * sizeof(FILE*));
139     size_t i;
140     for (i = 0; i < d->n; ++i) {
141         files[i] = fopen(d->fns[i], "rb");
142         if (files[i] == NULL) {
143             fprintf(stderr, "Cannot open temporary file %s for reading.\n",
144                     d->fns[i]);
145             exit(EXIT_FAILURE);
146         }
147     }
148
149     fastq_t** fs = malloc_or_die(d->n * sizeof(fastq_t*));
150     seq_t** seqs = malloc_or_die(d->n * sizeof(seq_t*));
151     for (i = 0; i < d->n; ++i) {
152         fs[i] = fastq_create(files[i]);
153         seqs[i] = seq_create();
154     }
155
156     /* A binary heap of indexes to fs. We use this to repeatedly pop the
157      * smallest fastq entry. */
158     size_t* heap = malloc_or_die(d->n * sizeof(size_t));
159
160     /* heap size */
161     size_t m = 0;
162
163     for (i = 0; i < d->n; ++i) {
164         if (fastq_read(fs[i], seqs[i])) {
165             heap_push(heap, d->n, &m, seqs, cmp, i);
166         }
167     }
168
169     while (m > 0) {
170         i = heap_pop(heap, &m, seqs, cmp);
171         fastq_print(stdout, seqs[i]);
172         if (fastq_read(fs[i], seqs[i])) {
173             heap_push(heap, d->n, &m, seqs, cmp, i);
174         }
175     }
176
177     for (i = 0; i < d->n; ++i) {
178         seq_free(seqs[i]);
179         fastq_free(fs[i]);
180         fclose(files[i]);
181     }
182
183     free(files);
184     free(fs);
185 }
186
187
188 typedef struct seq_array_t_
189 {
190     seq_t* seqs;
191
192     /* Number of seq objects. */
193     size_t n;
194
195     /* Size reserved in seqs. */
196     size_t size;
197
198     /* Space reserved for strings. */
199     char* data;
200
201     /* Data used. */
202     size_t data_used;
203
204     /* Total data size. */
205     size_t data_size;
206 } seq_array_t;
207
208
209 seq_array_t* seq_array_create(size_t data_size)
210 {
211     seq_array_t* a = malloc_or_die(sizeof(seq_array_t));
212     a->size = 1024;
213     a->n = 0;
214     a->seqs = malloc_or_die(a->size * sizeof(seq_t));
215
216     a->data_size = data_size;
217     a->data_used = 0;
218     a->data = malloc_or_die(data_size);
219
220     return a;
221 }
222
223
224 void seq_array_free(seq_array_t* a)
225 {
226     free(a->seqs);
227     free(a->data);
228     free(a);
229 }
230
231
232 /* Push a fastq entry to back of the array. Return false if there was not enough
233  * space. */
234 bool seq_array_push(seq_array_t* a, const seq_t* seq)
235 {
236     size_t size_needed = (seq->id1.n + 1) + (seq->seq.n + 1) +
237                          (seq->id2.n + 1) + (seq->qual.n + 1);
238
239     if (size_needed > a->data_size - a->data_used) return false;
240
241     if (a->n == a->size) {
242         a->size *= 2;
243         a->seqs = realloc_or_die(a->seqs, a->size * sizeof(seq_t));
244     }
245
246     memcpy(&a->data[a->data_used], seq->id1.s, seq->id1.n + 1);
247     a->seqs[a->n].id1.s = &a->data[a->data_used];
248     a->seqs[a->n].id1.n = seq->id1.n + 1;
249     a->data_used += seq->id1.n + 1;
250
251     memcpy(&a->data[a->data_used], seq->seq.s, seq->seq.n + 1);
252     a->seqs[a->n].seq.s = &a->data[a->data_used];
253     a->seqs[a->n].seq.n = seq->seq.n + 1;
254     a->data_used += seq->seq.n + 1;
255
256     memcpy(&a->data[a->data_used], seq->id2.s, seq->id2.n + 1);
257     a->seqs[a->n].id2.s = &a->data[a->data_used];
258     a->seqs[a->n].id2.n = seq->id2.n + 1;
259     a->data_used += seq->id2.n + 1;
260
261     memcpy(&a->data[a->data_used], seq->qual.s, seq->qual.n + 1);
262     a->seqs[a->n].qual.s = &a->data[a->data_used];
263     a->seqs[a->n].qual.n = seq->qual.n + 1;
264     a->data_used += seq->qual.n + 1;
265
266     ++a->n;
267
268     return true;
269 }
270
271
272 void seq_array_clear(seq_array_t* a)
273 {
274     a->n = 0;
275     a->data_used = 0;
276 }
277
278
279 void seq_array_sort(seq_array_t* a, int (*cmp)(const void*, const void*))
280 {
281     qsort(a->seqs, a->n, sizeof(seq_t), cmp);
282 }
283
284
285 int seq_cmp_hash(const void* a, const void* b)
286 {
287     uint32_t ha = seq_hash((seq_t*) a);
288     uint32_t hb = seq_hash((seq_t*) b);
289
290     if      (ha < hb) return -1;
291     else if (ha > hb) return  1;
292     else              return  0;
293     return 0;
294 }
295
296
297 int seq_cmp_id(const void* a, const void* b)
298 {
299     return strcmp(((seq_t*) a)->id1.s, ((seq_t*) b)->id1.s);
300 }
301
302
303 int seq_cmp_seq(const void* a, const void* b)
304 {
305     return strcmp(((seq_t*) a)->seq.s, ((seq_t*) b)->seq.s);
306 }
307
308
309 void seq_array_dump(seq_dumps_t* d, const seq_array_t* a)
310 {
311     const char* template = "/tmp/fastq_sort.XXXXXXXX";
312     char* fn = malloc_or_die(strlen(template) + 1);
313     memcpy(fn, template, strlen(template) + 1);
314     if (mktemp(fn) == NULL) {
315         fprintf(stderr, "Unable to create a temporary file.\n");
316         exit(EXIT_FAILURE);
317     }
318
319     FILE* f = fopen(fn, "wb");
320     if (f == NULL) {
321         fprintf(stderr, "Unable to open temporary file %s for writing.\n", fn);
322         exit(EXIT_FAILURE);
323     }
324
325     size_t i;
326     for (i = 0; i < a->n; ++i) {
327         fastq_print(f, &a->seqs[i]);
328     }
329
330     if (d->n == d->size) {
331         d->size *= 2;
332         d->fns = realloc_or_die(d->fns, d->size * sizeof(char*));
333     }
334     d->fns[d->n++] = fn;
335
336     fclose(f);
337 }
338
339
340 static const char* prog_name = "fastq-sort";
341
342
343 void print_help()
344 {
345     fprintf(stdout,
346 "fastq-sort [OPTION]... [FILE]...\n"
347 "Concatenate and sort FASTQ files and write to standard output.\n"
348 "Options:\n"
349 "  -I, --id           sort alphabetically by read identifier\n"
350 "  -S, --seq          sort alphabetically by sequence\n"
351 "  -R, --random       randomly shuffle the sequences\n"
352 "  -G, --gc           sort by GC content\n" /* TODO */
353 "  -M, --median-qual  sort by median quality score\n" /* TODO */
354 "  -h, --help         print this message\n"
355 "  -V, --version      output version information and exit\n"
356    );
357 }
358
359
360 int main(int argc, char* argv[])
361 {
362     int opt, opt_idx;
363     size_t buffer_size = 100000000;
364     bool reverse_sort = false;
365     user_cmp = seq_cmp_id;
366
367     static struct option long_options[] =
368     {
369         {"reverse", no_argument, NULL, 'r'},
370         {"id",      no_argument, NULL, 'I'},
371         {"seq",     no_argument, NULL, 'S'},
372         {"random",  no_argument, NULL, 'R'},
373         {"help",    no_argument, NULL, 'h'},
374         {"version", no_argument, NULL, 'V'},
375         {0, 0, 0, 0}
376     };
377
378     while (true) {
379         opt = getopt_long(argc, argv, "rISRhV", long_options, &opt_idx);
380         if (opt == -1) break;
381
382         switch (opt) {
383             case 'r':
384                 reverse_sort = true;
385                 break;
386
387             case 'I':
388                 user_cmp = seq_cmp_id;
389                 break;
390
391             case 'S':
392                 user_cmp = seq_cmp_seq;
393                 break;
394
395             case 'R':
396                 user_cmp = seq_cmp_hash;
397                 break;
398
399             case 'h':
400                 print_help();
401                 return 0;
402
403             case 'V':
404                 print_version(stdout, prog_name);
405                 return 0;
406
407             case '?':
408                 return 1;
409
410             default:
411                 abort();
412         }
413     }
414
415     cmp = reverse_sort ? rev_cmp : user_cmp;
416
417     seq_array_t* a = seq_array_create(buffer_size);
418     seq_dumps_t* d = seq_dumps_create();
419     seq_t* seq = seq_create();
420
421     fastq_t* f;
422     if (optind >= argc) {
423         f = fastq_create(stdin);
424         while (fastq_read(f, seq)) {
425             if (!seq_array_push(a, seq)) {
426                 seq_array_sort(a, cmp);
427                 seq_array_dump(d, a);
428                 seq_array_clear(a);
429                 if (!seq_array_push(a, seq)) {
430                     fprintf(stderr, "The buffer size is to small.\n");
431                     return EXIT_FAILURE;
432                 }
433             }
434         }
435         fastq_free(f);
436     }
437     else {
438         FILE* file;
439         for (; optind < argc; ++optind) {
440             file = fopen(argv[optind], "rb");
441             if (file == NULL) {
442                 fprintf(stderr, "Cannot open %s for reading.\n", argv[optind]);
443                 return EXIT_FAILURE;
444             }
445             f = fastq_create(file);
446
447             while (fastq_read(f, seq)) {
448                 if (!seq_array_push(a, seq)) {
449                     seq_array_sort(a, cmp);
450                     seq_array_dump(d, a);
451                     seq_array_clear(a);
452                     if (!seq_array_push(a, seq)) {
453                         fprintf(stderr, "The buffer size is to small.\n");
454                         return EXIT_FAILURE;
455                     }
456                 }
457             }
458
459             fastq_free(f);
460             fclose(file);
461         }
462     }
463
464     if (a->n > 0) {
465         seq_array_sort(a, cmp);
466
467         /* We were able to sort everything in memory. */
468         if (d->n == 0) {
469             size_t i;
470             for (i = 0; i < a->n; ++i) {
471                 fastq_print(stdout, &a->seqs[i]);
472             }
473         }
474         else {
475             seq_array_dump(d, a);
476             merge_sort(d, cmp);
477         }
478     }
479
480     seq_dumps_free(d);
481     seq_free(seq);
482     seq_array_free(a);
483
484     return EXIT_SUCCESS;
485 }
486
487