From 88680e52e8c58cd20f29d0de15dceefe15e81c8f Mon Sep 17 00:00:00 2001 From: Daniel Jones Date: Tue, 9 Oct 2012 15:20:04 -0700 Subject: [PATCH] An essentially functional fastq-sort program. --- src/fastq-grep.c | 2 +- src/fastq-sort.c | 282 ++++++++++++++++++++++++++++++++++++++++++----- src/parse.c | 15 ++- 3 files changed, 264 insertions(+), 35 deletions(-) diff --git a/src/fastq-grep.c b/src/fastq-grep.c index b09b2bf..4a5c412 100644 --- a/src/fastq-grep.c +++ b/src/fastq-grep.c @@ -113,7 +113,7 @@ int main(int argc, char* argv[]) FILE* mismatch_file = NULL; static struct option long_options[] = - { + { {"id", no_argument, &id_flag, 1}, {"invert-match", no_argument, &invert_flag, 1}, {"mismatches", required_argument, NULL, 'm'}, diff --git a/src/fastq-sort.c b/src/fastq-sort.c index 522c0ce..171b46a 100644 --- a/src/fastq-sort.c +++ b/src/fastq-sort.c @@ -11,11 +11,170 @@ #include #include #include +#include #include "common.h" #include "parse.h" +/* A collection of filenames of sorted chunks of fastq. */ +typedef struct seq_dumps_t_ +{ + char** fns; + size_t n; + size_t size; +} seq_dumps_t; + + +seq_dumps_t* seq_dumps_create() +{ + seq_dumps_t* d = malloc_or_die(sizeof(seq_dumps_t)); + d->n = 0; + d->size = 64; + d->fns = malloc_or_die(d->size * sizeof(char*)); + return d; +} + + +void seq_dumps_free(seq_dumps_t* d) +{ + size_t i; + for (i = 0; i < d->n; ++i) { + if (unlink(d->fns[i]) == -1) { + fprintf(stderr, "Warning: unable to remove temporary file %s.\n", + d->fns[i]); + } + free(d->fns[i]); + } + free(d->fns); + free(d); +} + + +static inline size_t heap_parent(size_t i) { return (i - 1) / 2; } +static inline size_t heap_left (size_t i) { return 2 * i + 1; } +static inline size_t heap_right (size_t i) { return 2 * i + 2; } + + +/* Enqueue an item in the heap. + * + * Args: + * heap: A binary heap in which stores indexs into seqs. + * n: Fixed maximum size of the heap. + * m: Current size of the heap. + * seqs: Sequences. + * idx: Index to enqueue. + * cmp: Comparison function. + * + */ +void heap_push(size_t* heap, size_t n, size_t* m, seq_t** seqs, + int (*cmp)(const void*, const void*), size_t idx) +{ + if (*m >= n) return; + size_t tmp, j, i = (*m)++; + heap[i] = idx; + + /* percolate up */ + while (i > 0) { + j = heap_parent(i); + if (cmp(seqs[heap[i]], seqs[heap[j]]) < 0) { + tmp = heap[i]; + heap[i] = heap[j]; + heap[j] = tmp; + i = j; + } + else break; + } +} + + +/* Dequeue an item from a heap. */ +size_t heap_pop(size_t* heap, size_t* m, seq_t** seqs, + int (*cmp)(const void*, const void*)) +{ + assert(*m > 0); + size_t ans = heap[0]; + heap[0] = heap[--(*m)]; + size_t tmp, l, r, j, i = 0; + while (true) { + l = heap_left(i); + r = heap_right(i); + + if (l >= (*m)) break; + + if (r >= (*m)) { + j = l; + } + else { + j = cmp(seqs[heap[l]], seqs[heap[r]]) < 0 ? l : r; + } + + if (cmp(seqs[heap[i]], seqs[heap[j]]) > 0) { + tmp = heap[i]; + heap[i] = heap[j]; + heap[j] = tmp; + i = j; + } + else break; + } + + return ans; +} + + +/* n-way merge sort to stdout */ +void merge_sort(const seq_dumps_t* d, int (*cmp)(const void*, const void*)) +{ + FILE** files = malloc_or_die(d->n * sizeof(FILE*)); + size_t i; + for (i = 0; i < d->n; ++i) { + files[i] = fopen(d->fns[i], "rb"); + if (files[i] == NULL) { + fprintf(stderr, "Cannot open temporary file %s for reading.\n", + d->fns[i]); + exit(EXIT_FAILURE); + } + } + + fastq_t** fs = malloc_or_die(d->n * sizeof(fastq_t*)); + seq_t** seqs = malloc_or_die(d->n * sizeof(seq_t*)); + for (i = 0; i < d->n; ++i) { + fs[i] = fastq_create(files[i]); + seqs[i] = seq_create(); + } + + /* A binary heap of indexes to fs. We use this to repeatedly pop the + * smallest fastq entry. */ + size_t* heap = malloc_or_die(d->n * sizeof(size_t)); + + /* heap size */ + size_t m = 0; + + for (i = 0; i < d->n; ++i) { + if (fastq_read(fs[i], seqs[i])) { + heap_push(heap, d->n, &m, seqs, cmp, i); + } + } + + while (m > 0) { + i = heap_pop(heap, &m, seqs, cmp); + fastq_print(stdout, seqs[i]); + if (fastq_read(fs[i], seqs[i])) { + heap_push(heap, d->n, &m, seqs, cmp, i); + } + } + + for (i = 0; i < d->n; ++i) { + seq_free(seqs[i]); + fastq_free(fs[i]); + fclose(files[i]); + } + + free(files); + free(fs); +} + + typedef struct seq_array_t_ { seq_t* seqs; @@ -42,7 +201,7 @@ seq_array_t* seq_array_create(size_t data_size) seq_array_t* a = malloc_or_die(sizeof(seq_array_t)); a->size = 1024; a->n = 0; - a->seqs = malloc_or_die(sizeof(seq_t)); + a->seqs = malloc_or_die(a->size * sizeof(seq_t)); a->data_size = data_size; a->data_used = 0; @@ -74,22 +233,22 @@ bool seq_array_push(seq_array_t* a, const seq_t* seq) a->seqs = realloc_or_die(a->seqs, a->size * sizeof(seq_t)); } - memcpy(seq->id1.s, &a->data[a->data_used], seq->id1.n + 1); + memcpy(&a->data[a->data_used], seq->id1.s, seq->id1.n + 1); a->seqs[a->n].id1.s = &a->data[a->data_used]; a->seqs[a->n].id1.n = seq->id1.n + 1; a->data_used += seq->id1.n + 1; - memcpy(seq->seq.s, &a->data[a->data_used], seq->seq.n + 1); + memcpy(&a->data[a->data_used], seq->seq.s, seq->seq.n + 1); a->seqs[a->n].seq.s = &a->data[a->data_used]; a->seqs[a->n].seq.n = seq->seq.n + 1; a->data_used += seq->seq.n + 1; - memcpy(seq->id2.s, &a->data[a->data_used], seq->id2.n + 1); + memcpy(&a->data[a->data_used], seq->id2.s, seq->id2.n + 1); a->seqs[a->n].id2.s = &a->data[a->data_used]; a->seqs[a->n].id2.n = seq->id2.n + 1; a->data_used += seq->id2.n + 1; - memcpy(seq->qual.s, &a->data[a->data_used], seq->qual.n + 1); + memcpy(&a->data[a->data_used], seq->qual.s, seq->qual.n + 1); a->seqs[a->n].qual.s = &a->data[a->data_used]; a->seqs[a->n].qual.n = seq->qual.n + 1; a->data_used += seq->qual.n + 1; @@ -113,15 +272,61 @@ void seq_array_sort(seq_array_t* a, int (*cmp)(const void*, const void*)) } -int seq_cmp_hash(const void* a_, const void* b_) +int seq_cmp_hash(const void* a, const void* b) { - const seq_t* a = (seq_t*) a_; - const seq_t* b = (seq_t*) b_; - /* TODO: hash and compare. */ + uint32_t ha = seq_hash((seq_t*) a); + uint32_t hb = seq_hash((seq_t*) b); + + if (ha < hb) return -1; + else if (ha > hb) return 1; + else return 0; return 0; } +int seq_cmp_id(const void* a, const void* b) +{ + return strcmp(((seq_t*) a)->id1.s, ((seq_t*) b)->id1.s); +} + + +int seq_cmp_seq(const void* a, const void* b) +{ + return strcmp(((seq_t*) a)->seq.s, ((seq_t*) b)->seq.s); +} + + +void seq_array_dump(seq_dumps_t* d, const seq_array_t* a) +{ + const char* template = "/tmp/fastq_sort.XXXXXXXX"; + char* fn = malloc_or_die(strlen(template) + 1); + memcpy(fn, template, strlen(template) + 1); + if (mktemp(fn) == NULL) { + fprintf(stderr, "Unable to create a temporary file.\n"); + exit(EXIT_FAILURE); + } + + FILE* f = fopen(fn, "wb"); + if (f == NULL) { + fprintf(stderr, "Unable to open temporary file %s for writing.\n", fn); + exit(EXIT_FAILURE); + } + + size_t i; + for (i = 0; i < a->n; ++i) { + fastq_print(f, &a->seqs[i]); + } + + if (d->n == d->size) { + d->size *= 2; + d->fns = realloc_or_die(d->fns, d->size * sizeof(char*)); + } + d->fns[d->n++] = fn; + + fclose(f); +} + + static const char* prog_name = "fastq-sort"; @@ -131,13 +336,17 @@ void print_help() "fastq-sort [OPTION]... [FILE]...\n" "Concatenate and sort FASTQ files and write to standard output.\n" "Options:\n" -" -h, --help print this message\n" -" -V, --version output version information and exit\n" +" -I, --id sort alphabetically by read identifier\n" +" -S, --seq sort alphabetically by sequence\n" +" -R, --random randomly shuffle the sequences\n" +" -G, --gc sort by GC content\n" /* TODO */ +" -M, --median-qual sort by median quality score\n" /* TODO */ +" -h, --help print this message\n" +" -V, --version output version information and exit\n" ); } - int main(int argc, char* argv[]) { int opt, opt_idx; @@ -146,6 +355,9 @@ int main(int argc, char* argv[]) static struct option long_options[] = { + {"id", no_argument, NULL, 'I'}, + {"seq", no_argument, NULL, 'S'}, + {"random", no_argument, NULL, 'R'}, {"help", no_argument, NULL, 'h'}, {"version", no_argument, NULL, 'V'}, {0, 0, 0, 0} @@ -156,6 +368,18 @@ int main(int argc, char* argv[]) if (opt == -1) break; switch (opt) { + case 'I': + cmp = seq_cmp_id; + break; + + case 'S': + cmp = seq_cmp_seq; + break; + + case 'R': + cmp = seq_cmp_hash; + break; + case 'h': print_help(); return 0; @@ -173,6 +397,7 @@ int main(int argc, char* argv[]) } seq_array_t* a = seq_array_create(buffer_size); + seq_dumps_t* d = seq_dumps_create(); seq_t* seq = seq_create(); fastq_t* f; @@ -181,13 +406,9 @@ int main(int argc, char* argv[]) while (fastq_read(f, seq)) { if (!seq_array_push(a, seq)) { seq_array_sort(a, cmp); - - /* TODO: dump a to a temporary file. Push that file name to an - * array somewhere. - * */ - + seq_array_dump(d, a); seq_array_clear(a); - if (seq_array_push(a, seq)) { + if (!seq_array_push(a, seq)) { fprintf(stderr, "The buffer size is to small.\n"); return EXIT_FAILURE; } @@ -208,12 +429,9 @@ int main(int argc, char* argv[]) while (fastq_read(f, seq)) { if (!seq_array_push(a, seq)) { seq_array_sort(a, cmp); - - /* TODO: dump a to a temporary file. Push that file name to - * an array somewhere. */ - + seq_array_dump(d, a); seq_array_clear(a); - if (seq_array_push(a, seq)) { + if (!seq_array_push(a, seq)) { fprintf(stderr, "The buffer size is to small.\n"); return EXIT_FAILURE; } @@ -228,14 +446,20 @@ int main(int argc, char* argv[]) if (a->n > 0) { seq_array_sort(a, cmp); - /* TODO: special case if everything fit into a. Just dump it to output. - */ - - /* TODO: dump to a temp file, push file name to the stack. */ + /* We were able to sort everything in memory. */ + if (d->n == 0) { + size_t i; + for (i = 0; i < a->n; ++i) { + fastq_print(stdout, &a->seqs[i]); + } + } + else { + seq_array_dump(d, a); + merge_sort(d, cmp); + } } - /* TODO: Merge sort on the external files. */ - + seq_dumps_free(d); seq_free(seq); seq_array_free(a); diff --git a/src/parse.c b/src/parse.c index fd13921..560578f 100644 --- a/src/parse.c +++ b/src/parse.c @@ -11,7 +11,7 @@ static void str_init(str_t* str) { str->n = 0; str->size = 128; - str->s = malloc_or_die(str->size = 0); + str->s = malloc_or_die(str->size); str->s[0] = '\0'; } @@ -87,12 +87,12 @@ static inline uint32_t rotl32(uint32_t x, int8_t r) } -uint32_t murmurhash3(const uint8_t* data, size_t len_) +uint32_t murmurhash3(uint32_t seed, const uint8_t* data, size_t len_) { const int len = (int) len_; const int nblocks = len / 4; - uint32_t h1 = 0xc062fb4a; + uint32_t h1 = seed; uint32_t c1 = 0xcc9e2d51; uint32_t c2 = 0x1b873593; @@ -144,10 +144,15 @@ uint32_t murmurhash3(const uint8_t* data, size_t len_) uint32_t seq_hash(const seq_t* seq) { - /* TODO */ - return 0; + uint32_t h = 0xc062fb4a; /* random seed */ + h = murmurhash3(h, (uint8_t*) seq->id1.s, seq->id1.n); + h = murmurhash3(h, (uint8_t*) seq->seq.s, seq->seq.n); + h = murmurhash3(h, (uint8_t*) seq->id2.s, seq->id2.n); + h = murmurhash3(h, (uint8_t*) seq->qual.s, seq->qual.n); + return h; } + static const size_t parser_buf_size = 1000000; -- 2.39.2