]> git.donarmstrong.com Git - bamtools.git/blob - src/api/internal/BgzfStream_p.cpp
Merge branches 'master' and 'iodevice' into iodevice
[bamtools.git] / src / api / internal / BgzfStream_p.cpp
1 // ***************************************************************************
2 // BgzfStream_p.cpp (c) 2011 Derek Barnett
3 // Marth Lab, Department of Biology, Boston College
4 // ---------------------------------------------------------------------------
5 // Last modified: 9 September 2011(DB)
6 // ---------------------------------------------------------------------------
7 // Based on BGZF routines developed at the Broad Institute.
8 // Provides the basic functionality for reading & writing BGZF files
9 // Replaces the old BGZF.* files to avoid clashing with other toolkits
10 // ***************************************************************************
11
12 #include <api/internal/BamDeviceFactory_p.h>
13 #include <api/internal/BgzfStream_p.h>
14 using namespace BamTools;
15 using namespace BamTools::Internal;
16
17 #include <cstring>
18 #include <algorithm>
19 #include <iostream>
20 using namespace std;
21
22 // constructor
23 BgzfStream::BgzfStream(void)
24     : m_uncompressedBlockSize(Constants::BGZF_DEFAULT_BLOCK_SIZE)
25     , m_compressedBlockSize(Constants::BGZF_MAX_BLOCK_SIZE)
26     , m_blockLength(0)
27     , m_blockOffset(0)
28     , m_blockAddress(0)
29     , m_uncompressedBlock(NULL)
30     , m_compressedBlock(NULL)
31     , m_isOpen(false)
32     , m_isWriteOnly(false)
33     , m_isWriteCompressed(true)
34     , m_device(0)
35     , m_stream(NULL)
36 {
37     try {
38         m_compressedBlock   = new char[m_compressedBlockSize];
39         m_uncompressedBlock = new char[m_uncompressedBlockSize];
40     } catch( std::bad_alloc& ba ) {
41         fprintf(stderr, "BgzfStream ERROR: unable to allocate memory\n");
42         exit(1);
43     }
44 }
45
46 // destructor
47 BgzfStream::~BgzfStream(void) {
48     if( m_compressedBlock   ) delete[] m_compressedBlock;
49     if( m_uncompressedBlock ) delete[] m_uncompressedBlock;
50 }
51
52 // closes BGZF file
53 void BgzfStream::Close(void) {
54
55     // skip if no device open
56     if ( m_device == 0 )
57         return;
58
59     // if writing to file, flush the current BGZF block,
60     // then write an empty block (as EOF marker)
61     if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
62         FlushBlock();
63         int blockLength = DeflateBlock();
64         m_device->Write(m_compressedBlock, blockLength);
65     }
66
67     // close device
68     m_device->Close();
69
70     // clean up & reset flags
71     delete m_device;
72     m_device = 0;
73     m_isWriteCompressed = true;
74     m_isOpen = false;
75 }
76
77 // compresses the current block
78 unsigned int BgzfStream::DeflateBlock(void) {
79
80     // initialize the gzip header
81     char* buffer = m_compressedBlock;
82     memset(buffer, 0, 18);
83     buffer[0]  = Constants::GZIP_ID1;
84     buffer[1]  = (char)Constants::GZIP_ID2;
85     buffer[2]  = Constants::CM_DEFLATE;
86     buffer[3]  = Constants::FLG_FEXTRA;
87     buffer[9]  = (char)Constants::OS_UNKNOWN;
88     buffer[10] = Constants::BGZF_XLEN;
89     buffer[12] = Constants::BGZF_ID1;
90     buffer[13] = Constants::BGZF_ID2;
91     buffer[14] = Constants::BGZF_LEN;
92
93     // set compression level
94     const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
95
96     // loop to retry for blocks that do not compress enough
97     int inputLength = m_blockOffset;
98     unsigned int compressedLength = 0;
99     unsigned int bufferSize = m_compressedBlockSize;
100
101     while ( true ) {
102
103         // initialize zstream values
104         z_stream zs;
105         zs.zalloc    = NULL;
106         zs.zfree     = NULL;
107         zs.next_in   = (Bytef*)m_uncompressedBlock;
108         zs.avail_in  = inputLength;
109         zs.next_out  = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH];
110         zs.avail_out = bufferSize - Constants::BGZF_BLOCK_HEADER_LENGTH - Constants::BGZF_BLOCK_FOOTER_LENGTH;
111
112         // initialize the zlib compression algorithm
113         if ( deflateInit2(&zs,
114                           compressionLevel,
115                           Z_DEFLATED,
116                           Constants::GZIP_WINDOW_BITS,
117                           Constants::Z_DEFAULT_MEM_LEVEL,
118                           Z_DEFAULT_STRATEGY) != Z_OK )
119         {
120             fprintf(stderr, "BgzfStream ERROR: zlib deflate initialization failed\n");
121             exit(1);
122         }
123
124         // compress the data
125         int status = deflate(&zs, Z_FINISH);
126         if ( status != Z_STREAM_END ) {
127
128             deflateEnd(&zs);
129
130             // reduce the input length and try again
131             if ( status == Z_OK ) {
132                 inputLength -= 1024;
133                 if ( inputLength < 0 ) {
134                     fprintf(stderr, "BgzfStream ERROR: input reduction failed\n");
135                     exit(1);
136                 }
137                 continue;
138             }
139
140             fprintf(stderr, "BgzfStream ERROR: zlib::deflateEnd() failed\n");
141             exit(1);
142         }
143
144         // finalize the compression routine
145         if ( deflateEnd(&zs) != Z_OK ) {
146             fprintf(stderr, "BgzfStream ERROR: zlib::deflateEnd() failed\n");
147             exit(1);
148         }
149
150         compressedLength = zs.total_out;
151         compressedLength += Constants::BGZF_BLOCK_HEADER_LENGTH + Constants::BGZF_BLOCK_FOOTER_LENGTH;
152         if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE ) {
153             fprintf(stderr, "BgzfStream ERROR: deflate overflow\n");
154             exit(1);
155         }
156
157         break;
158     }
159
160     // store the compressed length
161     BamTools::PackUnsignedShort(&buffer[16], (unsigned short)(compressedLength - 1));
162
163     // store the CRC32 checksum
164     unsigned int crc = crc32(0, NULL, 0);
165     crc = crc32(crc, (Bytef*)m_uncompressedBlock, inputLength);
166     BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc);
167     BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
168
169     // ensure that we have less than a block of data left
170     int remaining = m_blockOffset - inputLength;
171     if ( remaining > 0 ) {
172         if ( remaining > inputLength ) {
173             fprintf(stderr, "BgzfStream ERROR: after deflate, remainder too large\n");
174             exit(1);
175         }
176         memcpy(m_uncompressedBlock, m_uncompressedBlock + inputLength, remaining);
177     }
178
179     // update block data
180     m_blockOffset = remaining;
181
182     // return result
183     return compressedLength;
184 }
185
186 // flushes the data in the BGZF block
187 void BgzfStream::FlushBlock(void) {
188
189     BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
190
191     // flush all of the remaining blocks
192     while ( m_blockOffset > 0 ) {
193
194         // compress the data block
195         unsigned int blockLength = DeflateBlock();
196
197         // flush the data to our output stream
198         unsigned int numBytesWritten = m_device->Write(m_compressedBlock, blockLength);
199         if ( numBytesWritten != blockLength ) {
200             fprintf(stderr, "BgzfStream ERROR: expected to write %u bytes during flushing, but wrote %u bytes\n",
201                     blockLength, numBytesWritten);
202             exit(1);
203         }
204
205         // update block data
206         m_blockAddress += blockLength;
207     }
208 }
209
210 // decompresses the current block
211 int BgzfStream::InflateBlock(const int& blockLength) {
212
213     // inflate the data from compressed buffer into uncompressed buffer
214     z_stream zs;
215     zs.zalloc    = NULL;
216     zs.zfree     = NULL;
217     zs.next_in   = (Bytef*)m_compressedBlock + 18;
218     zs.avail_in  = blockLength - 16;
219     zs.next_out  = (Bytef*)m_uncompressedBlock;
220     zs.avail_out = m_uncompressedBlockSize;
221
222     int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS);
223     if ( status != Z_OK ) {
224         fprintf(stderr, "BgzfStream ERROR: could not decompress block - zlib::inflateInit() failed\n");
225         return -1;
226     }
227
228     status = inflate(&zs, Z_FINISH);
229     if ( status != Z_STREAM_END ) {
230         inflateEnd(&zs);
231         fprintf(stderr, "BgzfStream ERROR: could not decompress block - zlib::inflate() failed\n");
232         return -1;
233     }
234
235     status = inflateEnd(&zs);
236     if ( status != Z_OK ) {
237         fprintf(stderr, "BgzfStream ERROR: could not decompress block - zlib::inflateEnd() failed\n");
238         return -1;
239     }
240
241     // return result
242     return zs.total_out;
243 }
244
245 bool BgzfStream::IsOpen(void) const {
246     if ( m_device == 0 )
247         return false;
248     return m_device->IsOpen();
249 }
250
251 bool BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
252
253     // close current device if necessary
254     Close();
255
256     // sanity check
257     BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
258
259     // retrieve new IO device depending on filename
260     m_device = BamDeviceFactory::CreateDevice(filename);
261
262     // sanity check
263     BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
264
265     // if device fails to open
266     if ( !m_device->Open(mode) ) {
267         cerr << "BgzfStream::Open() - unable to open IO device:" << endl;
268         cerr << m_device->ErrorString();
269         return false;
270     }
271
272     // otherwise, set flag & return true
273     m_isOpen = true;
274     m_isWriteOnly = ( mode == IBamIODevice::WriteOnly );
275     return true;
276
277 }
278
279 // opens the BGZF file for reading (mode is either "rb" for reading, or "wb" for writing)
280 bool BgzfStream::Open(const string& filename, const char* mode) {
281
282     // close current stream, if necessary, before opening next
283     if ( m_isOpen ) Close();
284
285     // determine open mode
286     if ( strcmp(mode, "rb") == 0 )
287         m_isWriteOnly = false;
288     else if ( strcmp(mode, "wb") == 0)
289         m_isWriteOnly = true;
290     else {
291         fprintf(stderr, "BgzfStream ERROR: unknown file mode: %s\n", mode);
292         return false;
293     }
294
295     // open BGZF stream on a file
296     if ( (filename != "stdin") && (filename != "stdout") && (filename != "-"))
297         m_stream = fopen(filename.c_str(), mode);
298
299     // open BGZF stream on stdin
300     else if ( (filename == "stdin" || filename == "-") && (strcmp(mode, "rb") == 0 ) )
301         m_stream = freopen(NULL, mode, stdin);
302
303     // open BGZF stream on stdout
304     else if ( (filename == "stdout" || filename == "-") && (strcmp(mode, "wb") == 0) )
305         m_stream = freopen(NULL, mode, stdout);
306
307     if ( !m_stream ) {
308         fprintf(stderr, "BgzfStream ERROR: unable to open file %s\n", filename.c_str() );
309         return false;
310     }
311
312     // set flag & return success
313     m_isOpen = true;
314     return true;
315 }
316
317 // reads BGZF data into a byte buffer
318 unsigned int BgzfStream::Read(char* data, const unsigned int dataLength) {
319
320     if ( dataLength == 0 )
321         return 0;
322
323     // if stream not open for reading
324     BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
325     if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
326         return 0;
327
328     // read blocks as needed until desired data length is retrieved
329     char* output = data;
330     unsigned int numBytesRead = 0;
331     while ( numBytesRead < dataLength ) {
332
333         // determine bytes available in current block
334         int bytesAvailable = m_blockLength - m_blockOffset;
335
336         // read (and decompress) next block if needed
337         if ( bytesAvailable <= 0 ) {
338             if ( !ReadBlock() ) return -1;
339             bytesAvailable = m_blockLength - m_blockOffset;
340             if ( bytesAvailable <= 0 ) break;
341         }
342
343         // copy data from uncompressed source buffer into data destination buffer
344         char* buffer   = m_uncompressedBlock;
345         int copyLength = min( (int)(dataLength-numBytesRead), bytesAvailable );
346         memcpy(output, buffer + m_blockOffset, copyLength);
347
348         // update counters
349         m_blockOffset += copyLength;
350         output        += copyLength;
351         numBytesRead  += copyLength;
352     }
353
354     // update block data
355     if ( m_blockOffset == m_blockLength ) {
356         m_blockAddress = m_device->Tell();
357         m_blockOffset  = 0;
358         m_blockLength  = 0;
359     }
360
361     return numBytesRead;
362 }
363
364 // reads a BGZF block
365 bool BgzfStream::ReadBlock(void) {
366
367     BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
368
369     // store block's starting address
370     int64_t blockAddress = m_device->Tell();
371
372     // read block header from file
373     char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
374     int numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
375
376     // if block header empty
377     if ( numBytesRead == 0 ) {
378         m_blockLength = 0;
379         return true;
380     }
381
382     // if block header invalid size
383     if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH ) {
384         fprintf(stderr, "BgzfStream ERROR: read block failed - could not read block header\n");
385         return false;
386     }
387
388     // validate block header contents
389     if ( !BgzfStream::CheckBlockHeader(header) ) {
390         fprintf(stderr, "BgzfStream ERROR: read block failed - invalid block header\n");
391         return false;
392     }
393
394     // copy header contents to compressed buffer
395     int blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1;
396     char* compressedBlock = m_compressedBlock;
397     memcpy(compressedBlock, header, Constants::BGZF_BLOCK_HEADER_LENGTH);
398     int remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
399
400     // read remainder of block
401     numBytesRead = m_device->Read(&compressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
402     if ( numBytesRead != remaining ) {
403         fprintf(stderr, "BgzfStream ERROR: read block failed - could not read data from block\n");
404         return false;
405     }
406
407     // decompress block data
408     numBytesRead = InflateBlock(blockLength);
409     if ( numBytesRead < 0 ) {
410         fprintf(stderr, "BgzfStream ERROR: read block failed - could not decompress block data\n");
411         return false;
412     }
413
414     // update block data
415     if ( m_blockLength != 0 )
416         m_blockOffset = 0;
417     m_blockAddress = blockAddress;
418     m_blockLength  = numBytesRead;
419
420     // return success
421     return true;
422 }
423
424 // seek to position in BGZF file
425 bool BgzfStream::Seek(const int64_t& position) {
426
427     BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
428
429     // skip if not open or not seek-able
430     if ( !IsOpen() /*|| !m_device->IsRandomAccess()*/ ) {
431         cerr << "BgzfStream::Seek() - device not open" << endl;
432         return false;
433     }
434
435     // determine adjusted offset & address
436     int     blockOffset  = (position & 0xFFFF);
437     int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
438
439     // attempt seek in file
440     if ( !m_device->Seek(blockAddress) ) {
441         cerr << "BgzfStream ERROR: unable to seek in file" << endl;
442         return false;
443     }
444
445     // update block data & return success
446     m_blockLength  = 0;
447     m_blockAddress = blockAddress;
448     m_blockOffset  = blockOffset;
449     return true;
450 }
451
452 void BgzfStream::SetWriteCompressed(bool ok) {
453     m_isWriteCompressed = ok;
454 }
455
456 // get file position in BGZF file
457 int64_t BgzfStream::Tell(void) const {
458     if ( !m_isOpen ) return 0;
459     return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
460 }
461
462 // writes the supplied data into the BGZF buffer
463 unsigned int BgzfStream::Write(const char* data, const unsigned int dataLength) {
464
465     BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
466     BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
467                  "BgzfStream::Write() - trying to write to non-writable IO device");
468
469     // write blocks as needed til all data is written
470     unsigned int numBytesWritten = 0;
471     const char* input = data;
472     unsigned int blockLength = m_uncompressedBlockSize;
473     while ( numBytesWritten < dataLength ) {
474
475         // copy data contents to uncompressed output buffer
476         unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
477         char* buffer = m_uncompressedBlock;
478         memcpy(buffer + m_blockOffset, input, copyLength);
479
480         // update counter
481         m_blockOffset   += copyLength;
482         input           += copyLength;
483         numBytesWritten += copyLength;
484
485         // flush (& compress) output buffer when full
486         if ( m_blockOffset == blockLength )
487             FlushBlock();
488     }
489
490     // return result
491     return numBytesWritten;
492 }