return ret;
}
+/***************
+ * BAM sorting *
+ ***************/
+
+#include <pthread.h>
+
typedef bam1_t *bam1_p;
static inline int bam1_lt(const bam1_p a, const bam1_p b)
}
KSORT_INIT(sort, bam1_p, bam1_lt)
-static void sort_blocks(int n, int k, bam1_p *buf, const char *prefix, const bam_header_t *h, int is_stdout)
+typedef struct {
+ size_t buf_len;
+ const char *prefix;
+ bam1_p *buf;
+ const bam_header_t *h;
+ int index;
+} worker_t;
+
+static void write_buffer(const char *fn, const char *mode, size_t l, bam1_p *buf, const bam_header_t *h, int n_threads)
{
- char *name, mode[3];
- int i;
+ size_t i;
bamFile fp;
- ks_mergesort(sort, k, buf, 0);
- name = (char*)calloc(strlen(prefix) + 20, 1);
- if (n >= 0) {
- sprintf(name, "%s.%.4d.bam", prefix, n);
- strcpy(mode, "w1");
- } else {
- sprintf(name, "%s.bam", prefix);
- strcpy(mode, "w");
- }
- fp = is_stdout? bam_dopen(fileno(stdout), mode) : bam_open(name, mode);
- if (fp == 0) {
- fprintf(stderr, "[sort_blocks] fail to create file %s.\n", name);
- free(name);
- // FIXME: possible memory leak
- return;
- }
- free(name);
+ fp = strcmp(fn, "-")? bam_open(fn, mode) : bam_dopen(fileno(stdout), mode);
+ if (fp == 0) return;
bam_header_write(fp, h);
- for (i = 0; i < k; ++i)
+ if (n_threads > 1) bgzf_mt(fp, n_threads, 256);
+ for (i = 0; i < l; ++i)
bam_write1_core(fp, &buf[i]->core, buf[i]->data_len, buf[i]->data);
bam_close(fp);
}
+static void *worker(void *data)
+{
+ worker_t *w = (worker_t*)data;
+ char *name;
+ ks_mergesort(sort, w->buf_len, w->buf, 0);
+ name = (char*)calloc(strlen(w->prefix) + 20, 1);
+ sprintf(name, "%s.%.4d.bam", w->prefix, w->index);
+ write_buffer(name, "w1", w->buf_len, w->buf, w->h, 0);
+ free(name);
+ return 0;
+}
+
+static int sort_blocks(int n_files, size_t k, bam1_p *buf, const char *prefix, const bam_header_t *h, int n_threads)
+{
+ int i;
+ size_t rest;
+ bam1_p *b;
+ pthread_t *tid;
+ pthread_attr_t attr;
+ worker_t *w;
+
+ if (n_threads < 1) n_threads = 1;
+ if (k < n_threads * 64) n_threads = 1; // use a single thread if we only sort a small batch of records
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+ w = calloc(n_threads, sizeof(worker_t));
+ tid = calloc(n_threads, sizeof(pthread_t));
+ b = buf; rest = k;
+ for (i = 0; i < n_threads; ++i) {
+ w[i].buf_len = rest / (n_threads - i);
+ w[i].buf = b;
+ w[i].prefix = prefix;
+ w[i].h = h;
+ w[i].index = n_files + i;
+ b += w[i].buf_len; rest -= w[i].buf_len;
+ pthread_create(&tid[i], &attr, worker, &w[i]);
+ }
+ for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
+ free(tid); free(w);
+ return n_files + n_threads;
+}
+
/*!
@abstract Sort an unsorted BAM file based on the chromosome order
and the leftmost position of an alignment
and then merge them by calling bam_merge_core(). This function is
NOT thread safe.
*/
-void bam_sort_core_ext(int is_by_qname, const char *fn, const char *prefix, size_t max_mem, int is_stdout)
+void bam_sort_core_ext(int is_by_qname, const char *fn, const char *prefix, size_t _max_mem, int is_stdout, int n_threads)
{
- int n, ret, k, i;
- size_t mem;
+ int ret, i, extra_mem, n_files = 0;
+ size_t mem, max_k, k, max_mem;
bam_header_t *header;
bamFile fp;
bam1_t *b, **buf;
+ char *fnout = 0;
+ if (n_threads < 2) n_threads = 1;
+ extra_mem = sizeof(void*) + sizeof(void*) + (sizeof(bam1_t) - sizeof(bam1_core_t));
g_is_by_qname = is_by_qname;
- n = k = 0; mem = 0;
+ max_k = k = 0; mem = 0;
+ max_mem = _max_mem * n_threads;
+ buf = 0;
fp = strcmp(fn, "-")? bam_open(fn, "r") : bam_dopen(fileno(stdin), "r");
if (fp == 0) {
fprintf(stderr, "[bam_sort_core] fail to open file %s\n", fn);
return;
}
header = bam_header_read(fp);
- buf = (bam1_t**)calloc(max_mem / BAM_CORE_SIZE, sizeof(bam1_t*));
// write sub files
for (;;) {
+ if (k == max_k) {
+ size_t old_max = max_k;
+ max_k = max_k? max_k<<1 : 0x10000;
+ buf = realloc(buf, max_k * sizeof(void*));
+ memset(buf + old_max, 0, sizeof(void*) * (max_k - old_max));
+ }
if (buf[k] == 0) buf[k] = (bam1_t*)calloc(1, sizeof(bam1_t));
b = buf[k];
if ((ret = bam_read1(fp, b)) < 0) break;
- mem += ret;
+ mem += ret + extra_mem;
++k;
if (mem >= max_mem) {
- sort_blocks(n++, k, buf, prefix, header, 0);
- mem = 0; k = 0;
+ n_files = sort_blocks(n_files, k, buf, prefix, header, n_threads);
+ mem = k = 0;
}
}
if (ret != -1)
fprintf(stderr, "[bam_sort_core] truncated file. Continue anyway.\n");
- if (n == 0) sort_blocks(-1, k, buf, prefix, header, is_stdout);
- else { // then merge
- char **fns, *fnout;
- fprintf(stderr, "[bam_sort_core] merging from %d files...\n", n+1);
- sort_blocks(n++, k, buf, prefix, header, 0);
- fnout = (char*)calloc(strlen(prefix) + 20, 1);
- if (is_stdout) sprintf(fnout, "-");
- else sprintf(fnout, "%s.bam", prefix);
- fns = (char**)calloc(n, sizeof(char*));
- for (i = 0; i < n; ++i) {
+ // output file name
+ fnout = calloc(strlen(prefix) + 20, 1);
+ if (is_stdout) sprintf(fnout, "-");
+ else sprintf(fnout, "%s.bam", prefix);
+ // write the final output
+ if (n_files == 0) { // a single block
+ ks_mergesort(sort, k, buf, 0);
+ write_buffer(fnout, "w", k, buf, header, n_threads);
+ } else { // then merge
+ char **fns;
+ n_files = sort_blocks(n_files, k, buf, prefix, header, n_threads);
+ fprintf(stderr, "[bam_sort_core] merging from %d files...\n", n_files);
+ fns = (char**)calloc(n_files, sizeof(char*));
+ for (i = 0; i < n_files; ++i) {
fns[i] = (char*)calloc(strlen(prefix) + 20, 1);
sprintf(fns[i], "%s.%.4d.bam", prefix, i);
}
- bam_merge_core(is_by_qname, fnout, 0, n, fns, 0, 0);
- free(fnout);
- for (i = 0; i < n; ++i) {
+ bam_merge_core2(is_by_qname, fnout, 0, n_files, fns, 0, 0, n_threads);
+ for (i = 0; i < n_files; ++i) {
unlink(fns[i]);
free(fns[i]);
}
free(fns);
}
- for (k = 0; k < max_mem / BAM_CORE_SIZE; ++k) {
- if (buf[k]) {
- free(buf[k]->data);
- free(buf[k]);
- }
+ free(fnout);
+ // free
+ for (k = 0; k < max_k; ++k) {
+ if (!buf[k]) continue;
+ free(buf[k]->data);
+ free(buf[k]);
}
free(buf);
bam_header_destroy(header);
void bam_sort_core(int is_by_qname, const char *fn, const char *prefix, size_t max_mem)
{
- bam_sort_core_ext(is_by_qname, fn, prefix, max_mem, 0);
-}
-
-
-size_t bam_sort_get_max_mem(char *max_mem_string)
-{
- char c;
- size_t max_mem;
- size_t multiplier=1;
- c=max_mem_string[strlen(max_mem_string)-1];
- switch(c) {
- case 'G':
- multiplier*=1024;
- case 'M':
- multiplier*=1024;
- case 'K':
- multiplier*=1024;
- case 'B':
- max_mem_string[strlen(max_mem_string)-1]='\0';
- break;
- default:
- break;
- }
- max_mem = multiplier * atol(max_mem_string);
- // max_mem should be checked that it was not zero after atol!
- return max_mem;
+ bam_sort_core_ext(is_by_qname, fn, prefix, max_mem, 0, 0);
}
int bam_sort(int argc, char *argv[])
{
- size_t max_mem = 500000000;
- int c, is_by_qname = 0, is_stdout = 0;
- while ((c = getopt(argc, argv, "nom:")) >= 0) {
+ size_t max_mem = 1<<29; // 512MB
+ int c, is_by_qname = 0, is_stdout = 0, n_threads = 0;
+ while ((c = getopt(argc, argv, "nom:@:")) >= 0) {
switch (c) {
case 'o': is_stdout = 1; break;
case 'n': is_by_qname = 1; break;
- case 'm': max_mem = bam_sort_get_max_mem(optarg); break;
+ case 'm': {
+ char *q;
+ max_mem = strtol(optarg, &q, 0);
+ if (*q == 'k' || *q == 'K') max_mem <<= 10;
+ else if (*q == 'm' || *q == 'M') max_mem <<= 20;
+ else if (*q == 'g' || *q == 'G') max_mem <<= 30;
+ break;
+ }
+ case '@': n_threads = atoi(optarg); break;
}
}
if (optind + 2 > argc) {
- fprintf(stderr, "Usage: samtools sort [-on] [-m <maxMem>] <in.bam> <out.prefix>\n");
+ fprintf(stderr, "Usage: samtools sort [-on] [-m maxMem=1G] <in.bam> <out.prefix>\n");
return 1;
}
- bam_sort_core_ext(is_by_qname, argv[optind], argv[optind+1], max_mem, is_stdout);
+ bam_sort_core_ext(is_by_qname, argv[optind], argv[optind+1], max_mem, is_stdout, n_threads);
return 0;
}