]> git.donarmstrong.com Git - bamtools.git/blob - src/api/internal/BgzfStream_p.cpp
Merge with earlier IODevice work
[bamtools.git] / src / api / internal / BgzfStream_p.cpp
1 // ***************************************************************************
2 // BgzfStream_p.cpp (c) 2011 Derek Barnett
3 // Marth Lab, Department of Biology, Boston College
4 // ---------------------------------------------------------------------------
5 // Last modified: 7 October 2011(DB)
6 // ---------------------------------------------------------------------------
7 // Based on BGZF routines developed at the Broad Institute.
8 // Provides the basic functionality for reading & writing BGZF files
9 // Replaces the old BGZF.* files to avoid clashing with other toolkits
10 // ***************************************************************************
11
12 #include <api/internal/BamDeviceFactory_p.h>
13 #include <api/internal/BamException_p.h>
14 #include <api/internal/BgzfStream_p.h>
15 using namespace BamTools;
16 using namespace BamTools::Internal;
17
18 #include <cstring>
19 #include <algorithm>
20 #include <iostream>
21 #include <sstream>
22 using namespace std;
23
24 // ----------------------------
25 // RaiiWrapper implementation
26 // ----------------------------
27
28 BgzfStream::RaiiWrapper::RaiiWrapper(void)
29     : Stream(0)
30 {
31     CompressedBlock   = new char[Constants::BGZF_MAX_BLOCK_SIZE];
32     UncompressedBlock = new char[Constants::BGZF_DEFAULT_BLOCK_SIZE];
33 }
34
35 BgzfStream::RaiiWrapper::~RaiiWrapper(void) {
36
37     // clean up buffers
38     delete[] CompressedBlock;
39     delete[] UncompressedBlock;
40     CompressedBlock = 0;
41     UncompressedBlock = 0;
42
43     if ( Stream ) {
44         fflush(Stream);
45         fclose(Stream);
46         Stream = 0;
47     }
48 }
49
50 // ---------------------------
51 // BgzfStream implementation
52 // ---------------------------
53
54 // constructor
55 BgzfStream::BgzfStream(void)
56   : m_blockLength(0)
57   , m_blockOffset(0)
58   , m_blockAddress(0)
59   , m_isOpen(false)
60   , m_isWriteOnly(false)
61   , m_isWriteCompressed(true)
62   , m_device(0)
63 { }
64
65 // destructor
66 BgzfStream::~BgzfStream(void) {
67     Close();
68 }
69
70 // checks BGZF block header
71 bool BgzfStream::CheckBlockHeader(char* header) {
72     return (header[0] == Constants::GZIP_ID1 &&
73             header[1] == Constants::GZIP_ID2 &&
74             header[2] == Z_DEFLATED &&
75             (header[3] & Constants::FLG_FEXTRA) != 0 &&
76             BamTools::UnpackUnsignedShort(&header[10]) == Constants::BGZF_XLEN &&
77             header[12] == Constants::BGZF_ID1 &&
78             header[13] == Constants::BGZF_ID2 &&
79             BamTools::UnpackUnsignedShort(&header[14]) == Constants::BGZF_LEN );
80 }
81
82 // closes BGZF file
83 void BgzfStream::Close(void) {
84
85     // skip if no device open
86     if ( m_device == 0 )
87         return;
88
89     // if writing to file, flush the current BGZF block,
90     // then write an empty block (as EOF marker)
91     if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
92         FlushBlock();
93         const size_t blockLength = DeflateBlock();
94         m_device->Write(Resources.CompressedBlock, blockLength);
95     }
96
97     // close device
98     m_device->Close();
99     delete m_device;
100     m_device = 0;
101
102     // ??
103     fflush(Resources.Stream);
104     fclose(Resources.Stream);
105     Resources.Stream = 0;
106
107     // reset state
108     m_blockLength = 0;
109     m_blockOffset = 0;
110     m_blockAddress = 0;
111     m_isOpen = false;
112     m_isWriteOnly = false;
113     m_isWriteCompressed = true;
114
115 }
116
117 // compresses the current block
118 size_t BgzfStream::DeflateBlock(void) {
119
120     // initialize the gzip header
121     char* buffer = Resources.CompressedBlock;
122     memset(buffer, 0, 18);
123     buffer[0]  = Constants::GZIP_ID1;
124     buffer[1]  = Constants::GZIP_ID2;
125     buffer[2]  = Constants::CM_DEFLATE;
126     buffer[3]  = Constants::FLG_FEXTRA;
127     buffer[9]  = Constants::OS_UNKNOWN;
128     buffer[10] = Constants::BGZF_XLEN;
129     buffer[12] = Constants::BGZF_ID1;
130     buffer[13] = Constants::BGZF_ID2;
131     buffer[14] = Constants::BGZF_LEN;
132
133     // set compression level
134     const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
135
136     // loop to retry for blocks that do not compress enough
137     int inputLength = BlockOffset;
138     size_t compressedLength = 0;
139     const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE;
140
141     while ( true ) {
142
143         // initialize zstream values
144         z_stream zs;
145         zs.zalloc    = NULL;
146         zs.zfree     = NULL;
147         zs.next_in   = (Bytef*)Resources.UncompressedBlock;
148         zs.avail_in  = inputLength;
149         zs.next_out  = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH];
150         zs.avail_out = bufferSize -
151                        Constants::BGZF_BLOCK_HEADER_LENGTH -
152                        Constants::BGZF_BLOCK_FOOTER_LENGTH;
153
154         // initialize the zlib compression algorithm
155         int status = deflateInit2(&zs,
156                                   compressionLevel,
157                                   Z_DEFLATED,
158                                   Constants::GZIP_WINDOW_BITS,
159                                   Constants::Z_DEFAULT_MEM_LEVEL,
160                                   Z_DEFAULT_STRATEGY);
161         if ( status != Z_OK )
162             throw BamException("BgzfStream::DeflateBlock", "zlib deflateInit2 failed");
163
164         // compress the data
165         status = deflate(&zs, Z_FINISH);
166
167         // if not at stream end
168         if ( status != Z_STREAM_END ) {
169
170             deflateEnd(&zs);
171
172             // if error status
173             if ( status != Z_OK )
174                 throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
175
176             // not enough space available in buffer
177             // try to reduce the input length & re-start loop
178             inputLength -= 1024;
179             if ( inputLength <= 0 )
180                 throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
181             continue;
182         }
183
184         // finalize the compression routine
185         status = deflateEnd(&zs);
186         if ( status != Z_OK )
187             throw BamException("BgzfStream::DeflateBlock", "zlib deflateEnd failed");
188
189         // update compressedLength
190         compressedLength = zs.total_out +
191                            Constants::BGZF_BLOCK_HEADER_LENGTH +
192                            Constants::BGZF_BLOCK_FOOTER_LENGTH;
193         if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE )
194             throw BamException("BgzfStream::DeflateBlock", "deflate overflow");
195
196         // quit while loop
197         break;
198     }
199
200     // store the compressed length
201     BamTools::PackUnsignedShort(&buffer[16], static_cast<uint16_t>(compressedLength - 1));
202
203     // store the CRC32 checksum
204     uint32_t crc = crc32(0, NULL, 0);
205     crc = crc32(crc, (Bytef*)Resources.UncompressedBlock, inputLength);
206     BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc);
207     BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
208
209     // ensure that we have less than a block of data left
210     int remaining = m_blockOffset - inputLength;
211     if ( remaining > 0 ) {
212         if ( remaining > inputLength )
213             throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large");
214         memcpy(Resources.UncompressedBlock, Resources.UncompressedBlock + inputLength, remaining);
215     }
216
217     // update block data
218     m_blockOffset = remaining;
219
220     // return result
221     return compressedLength;
222 }
223
224 // flushes the data in the BGZF block
225 void BgzfStream::FlushBlock(void) {
226
227     BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
228
229     // flush all of the remaining blocks
230     while ( m_blockOffset > 0 ) {
231
232         // compress the data block
233         const size_t blockLength = DeflateBlock();
234
235         // flush the data to our output device
236         const size_t numBytesWritten = m_device->Write(Resources.CompressedBlock, blockLength);
237         if ( numBytesWritten != blockLength ) {
238             stringstream s("");
239             s << "expected to write " << blockLength
240               << " bytes during flushing, but wrote " << numBytesWritten;
241             throw BamException("BgzfStream::FlushBlock", s.str());
242         }
243
244         // update block data
245         m_blockAddress += blockLength;
246     }
247 }
248
249 // decompresses the current block
250 size_t BgzfStream::InflateBlock(const size_t& blockLength) {
251
252     // setup zlib stream object
253     z_stream zs;
254     zs.zalloc    = NULL;
255     zs.zfree     = NULL;
256     zs.next_in   = (Bytef*)Resources.CompressedBlock + 18;
257     zs.avail_in  = blockLength - 16;
258     zs.next_out  = (Bytef*)Resources.UncompressedBlock;
259     zs.avail_out = Constants::BGZF_DEFAULT_BLOCK_SIZE;
260
261     // initialize
262     int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS);
263     if ( status != Z_OK )
264         throw BamException("BgzfStream::InflateBlock", "zlib inflateInit failed");
265
266     // decompress
267     status = inflate(&zs, Z_FINISH);
268     if ( status != Z_STREAM_END ) {
269         inflateEnd(&zs);
270         throw BamException("BgzfStream::InflateBlock", "zlib inflate failed");
271     }
272
273     // finalize
274     status = inflateEnd(&zs);
275     if ( status != Z_OK ) {
276         inflateEnd(&zs);
277         throw BamException("BgzfStream::InflateBlock", "zlib inflateEnd failed");
278     }
279
280     // return result
281     return zs.total_out;
282 }
283
284 bool BgzfStream::IsOpen(void) const {
285     if ( m_device == 0 )
286         return false;
287     return m_device->IsOpen();
288 }
289
290 bool BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
291
292     // close current device if necessary
293     Close();
294
295     // sanity check
296     BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
297
298     // retrieve new IO device depending on filename
299     m_device = BamDeviceFactory::CreateDevice(filename);
300
301     // sanity check
302     BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
303
304     // if device fails to open
305     if ( !m_device->Open(mode) ) {
306         cerr << "BgzfStream::Open() - unable to open IO device:" << endl;
307         cerr << m_device->ErrorString();
308         return false;
309     }
310
311     // otherwise, set flag & return true
312     m_isOpen = true;
313     m_isWriteOnly = ( mode == IBamIODevice::WriteOnly );
314     return true;
315
316 }
317
318 // opens the BGZF file for reading (mode is either "rb" for reading, or "wb" for writing)
319 void BgzfStream::Open(const string& filename, const char* mode) {
320
321     // make sure we're starting with fresh state
322     if ( IsOpen() )
323         Close();
324
325     // determine open mode
326     if ( strcmp(mode, "rb") == 0 )
327         m_isWriteOnly = false;
328     else if ( strcmp(mode, "wb") == 0)
329         m_isWriteOnly = true;
330     else {
331         const string message = string("unknown file mode: ") + mode;
332         throw BamException("BgzfStream::Open", message);
333     }
334
335     // open BGZF stream on a file
336     if ( (filename != "stdin") && (filename != "stdout") && (filename != "-"))
337         Resources.Stream = fopen(filename.c_str(), mode);
338
339     // open BGZF stream on stdin
340     else if ( (filename == "stdin" || filename == "-") && (strcmp(mode, "rb") == 0 ) )
341         Resources.Stream = freopen(NULL, mode, stdin);
342
343     // open BGZF stream on stdout
344     else if ( (filename == "stdout" || filename == "-") && (strcmp(mode, "wb") == 0) )
345         Resources.Stream = freopen(NULL, mode, stdout);
346
347     // ensure valid Stream
348     if ( !Resources.Stream ) {
349         const string message = string("unable to open file: ") + filename;
350         throw BamException("BgzfStream::Open", message);
351     }
352
353     // set flag & return success
354     m_isOpen = true;
355     return true;
356 }
357
358 // reads BGZF data into a byte buffer
359 size_t BgzfStream::Read(char* data, const size_t dataLength) {
360
361     if ( dataLength == 0 )
362         return 0;
363
364     // if stream not open for reading
365     BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
366     if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
367         return 0;
368
369     // read blocks as needed until desired data length is retrieved
370     size_t numBytesRead = 0;
371     while ( numBytesRead < dataLength ) {
372
373         // determine bytes available in current block
374         int bytesAvailable = m_blockLength - m_blockOffset;
375
376         // read (and decompress) next block if needed
377         if ( bytesAvailable <= 0 ) {
378             ReadBlock();
379             bytesAvailable = m_blockLength - m_blockOffset;
380             if ( bytesAvailable <= 0 )
381                 break;
382         }
383
384         // copy data from uncompressed source buffer into data destination buffer
385         const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable );
386         memcpy(data, Resources.UncompressedBlock + m_blockOffset, copyLength);
387
388         // update counters
389         m_blockOffset  += copyLength;
390         data         += copyLength;
391         numBytesRead += copyLength;
392     }
393
394     // update block data
395     if ( m_blockOffset == m_blockLength ) {
396         m_blockAddress = m_device->Tell();
397         m_BlockOffset  = 0;
398         m_BlockLength  = 0;
399
400     }
401
402     // return actual number of bytes read
403     return numBytesRead;
404 }
405
406 // reads a BGZF block
407 void BgzfStream::ReadBlock(void) {
408
409     BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
410
411     // store block's starting address
412     int64_t blockAddress = m_device->Tell();
413
414     // read block header from file
415     char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
416     size_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
417
418     // if block header empty
419     if ( numBytesRead == 0 ) {
420         m_blockLength = 0;
421         return true;
422     }
423
424     // if block header invalid size
425     if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH )
426         throw BamException("BgzfStream::ReadBlock", "invalid block header size");
427
428     // validate block header contents
429     if ( !BgzfStream::CheckBlockHeader(header) )
430         throw BamException("BgzfStream::ReadBlock", "invalid block header contents");
431
432     // copy header contents to compressed buffer
433     const size_t blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1;
434     memcpy(Resources.CompressedBlock, header, Constants::BGZF_BLOCK_HEADER_LENGTH);
435
436     // read remainder of block
437     const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
438     numBytesRead = m_device->Read(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
439     if ( numBytesRead != remaining )
440         throw BamException("BgzfStream::ReadBlock", "could not read data from block");
441
442     // decompress block data
443     numBytesRead = InflateBlock(blockLength);
444
445     // update block data
446     if ( m_blockLength != 0 )
447         m_blockOffset = 0;
448     m_blockAddress = blockAddress;
449     m_blockLength  = numBytesRead;
450 }
451
452 // seek to position in BGZF file
453 void BgzfStream::Seek(const int64_t& position) {
454
455     BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
456
457     // skip if not open or not seek-able
458     if ( !IsOpen() /*|| !m_device->IsRandomAccess()*/ ) {
459         cerr << "BgzfStream::Seek() - device not open" << endl;
460         return false;
461     }
462
463     // determine adjusted offset & address
464     int     blockOffset  = (position & 0xFFFF);
465     int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
466
467     // attempt seek in file
468     if ( !m_device->Seek(blockAddress) ) {
469         stringstream s("");
470         s << "unable to seek to position: " << position;
471         throw BamException("BgzfStream::Seek", s.str());
472     }
473
474     // update block data & return success
475     m_blockLength  = 0;
476     m_blockAddress = blockAddress;
477     m_blockOffset  = blockOffset;
478 }
479
480 void BgzfStream::SetWriteCompressed(bool ok) {
481     m_isWriteCompressed = ok;
482 }
483
484 // get file position in BGZF file
485 int64_t BgzfStream::Tell(void) const {
486     if ( !m_isOpen ) return 0;
487     return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
488 }
489
490 // writes the supplied data into the BGZF buffer
491 size_t BgzfStream::Write(const char* data, const size_t dataLength) {
492
493     BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
494     BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
495                  "BgzfStream::Write() - trying to write to non-writable IO device");
496
497     // skip if file not open for writing
498     if ( !IsOpen || !IsWriteOnly )
499         return false;
500
501     // write blocks as needed til all data is written
502     size_t numBytesWritten = 0;
503     const char* input = data;
504     const size_t blockLength = Constants::BGZF_DEFAULT_BLOCK_SIZE;
505     while ( numBytesWritten < dataLength ) {
506
507         // copy data contents to uncompressed output buffer
508         unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
509         char* buffer = Resources.UncompressedBlock;
510         memcpy(buffer + m_blockOffset, input, copyLength);
511
512         // update counter
513         m_blockOffset   += copyLength;
514         input           += copyLength;
515         numBytesWritten += copyLength;
516
517         // flush (& compress) output buffer when full
518         if ( m_blockOffset == blockLength )
519             FlushBlock();
520     }
521
522     // return actual number of bytes written
523     return numBytesWritten;
524 }