]> git.donarmstrong.com Git - bamtools.git/blob - src/api/internal/BgzfStream_p.cpp
Bug fix in BgzfStream I/O
[bamtools.git] / src / api / internal / BgzfStream_p.cpp
1 // ***************************************************************************
2 // BgzfStream_p.cpp (c) 2011 Derek Barnett
3 // Marth Lab, Department of Biology, Boston College
4 // ---------------------------------------------------------------------------
5 // Last modified: 11 October 2011(DB)
6 // ---------------------------------------------------------------------------
7 // Based on BGZF routines developed at the Broad Institute.
8 // Provides the basic functionality for reading & writing BGZF files
9 // Replaces the old BGZF.* files to avoid clashing with other toolkits
10 // ***************************************************************************
11
12 #include "api/BamAux.h"
13 #include "api/BamConstants.h"
14 #include "api/internal/BamDeviceFactory_p.h"
15 #include "api/internal/BamException_p.h"
16 #include "api/internal/BgzfStream_p.h"
17 using namespace BamTools;
18 using namespace BamTools::Internal;
19
20 #include "zlib.h"
21
22 #include <cstring>
23 #include <algorithm>
24 #include <iostream>
25 #include <sstream>
26 using namespace std;
27
28 // ----------------------------
29 // RaiiWrapper implementation
30 // ----------------------------
31
32 BgzfStream::RaiiWrapper::RaiiWrapper(void) {
33     CompressedBlock   = new char[Constants::BGZF_MAX_BLOCK_SIZE];
34     UncompressedBlock = new char[Constants::BGZF_DEFAULT_BLOCK_SIZE];
35 }
36
37 BgzfStream::RaiiWrapper::~RaiiWrapper(void) {
38
39     // clean up buffers
40     delete[] CompressedBlock;
41     delete[] UncompressedBlock;
42     CompressedBlock = 0;
43     UncompressedBlock = 0;
44 }
45
46 // ---------------------------
47 // BgzfStream implementation
48 // ---------------------------
49
50 // constructor
51 BgzfStream::BgzfStream(void)
52   : m_blockLength(0)
53   , m_blockOffset(0)
54   , m_blockAddress(0)
55   , m_isWriteCompressed(true)
56   , m_device(0)
57 { }
58
59 // destructor
60 BgzfStream::~BgzfStream(void) {
61     Close();
62 }
63
64 // checks BGZF block header
65 bool BgzfStream::CheckBlockHeader(char* header) {
66     return (header[0] == Constants::GZIP_ID1 &&
67             header[1] == Constants::GZIP_ID2 &&
68             header[2] == Z_DEFLATED &&
69             (header[3] & Constants::FLG_FEXTRA) != 0 &&
70             BamTools::UnpackUnsignedShort(&header[10]) == Constants::BGZF_XLEN &&
71             header[12] == Constants::BGZF_ID1 &&
72             header[13] == Constants::BGZF_ID2 &&
73             BamTools::UnpackUnsignedShort(&header[14]) == Constants::BGZF_LEN );
74 }
75
76 // closes BGZF file
77 void BgzfStream::Close(void) {
78
79     // skip if no device open
80     if ( m_device == 0 ) return;
81
82     // if writing to file, flush the current BGZF block,
83     // then write an empty block (as EOF marker)
84     if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
85         FlushBlock();
86         const size_t blockLength = DeflateBlock();
87         m_device->Write(Resources.CompressedBlock, blockLength);
88     }
89
90     // close device
91     m_device->Close();
92     delete m_device;
93     m_device = 0;
94
95     // reset state
96     m_blockLength = 0;
97     m_blockOffset = 0;
98     m_blockAddress = 0;
99     m_isWriteCompressed = true;
100 }
101
102 // compresses the current block
103 size_t BgzfStream::DeflateBlock(void) {
104
105     // initialize the gzip header
106     char* buffer = Resources.CompressedBlock;
107     memset(buffer, 0, 18);
108     buffer[0]  = Constants::GZIP_ID1;
109     buffer[1]  = Constants::GZIP_ID2;
110     buffer[2]  = Constants::CM_DEFLATE;
111     buffer[3]  = Constants::FLG_FEXTRA;
112     buffer[9]  = Constants::OS_UNKNOWN;
113     buffer[10] = Constants::BGZF_XLEN;
114     buffer[12] = Constants::BGZF_ID1;
115     buffer[13] = Constants::BGZF_ID2;
116     buffer[14] = Constants::BGZF_LEN;
117
118     // set compression level
119     const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
120
121     // loop to retry for blocks that do not compress enough
122     int inputLength = m_blockOffset;
123     size_t compressedLength = 0;
124     const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE;
125
126     while ( true ) {
127
128         // initialize zstream values
129         z_stream zs;
130         zs.zalloc    = NULL;
131         zs.zfree     = NULL;
132         zs.next_in   = (Bytef*)Resources.UncompressedBlock;
133         zs.avail_in  = inputLength;
134         zs.next_out  = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH];
135         zs.avail_out = bufferSize -
136                        Constants::BGZF_BLOCK_HEADER_LENGTH -
137                        Constants::BGZF_BLOCK_FOOTER_LENGTH;
138
139         // initialize the zlib compression algorithm
140         int status = deflateInit2(&zs,
141                                   compressionLevel,
142                                   Z_DEFLATED,
143                                   Constants::GZIP_WINDOW_BITS,
144                                   Constants::Z_DEFAULT_MEM_LEVEL,
145                                   Z_DEFAULT_STRATEGY);
146         if ( status != Z_OK )
147             throw BamException("BgzfStream::DeflateBlock", "zlib deflateInit2 failed");
148
149         // compress the data
150         status = deflate(&zs, Z_FINISH);
151
152         // if not at stream end
153         if ( status != Z_STREAM_END ) {
154
155             deflateEnd(&zs);
156
157             // there was not enough space available in buffer
158             // try to reduce the input length & re-start loop
159             if ( status == Z_OK ) {
160                 inputLength -= 1024;
161                 if ( inputLength < 0 )
162                     throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
163                 continue;
164             }
165
166             throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
167         }
168
169         // finalize the compression routine
170         status = deflateEnd(&zs);
171         if ( status != Z_OK )
172             throw BamException("BgzfStream::DeflateBlock", "zlib deflateEnd failed");
173
174         // update compressedLength
175         compressedLength = zs.total_out +
176                            Constants::BGZF_BLOCK_HEADER_LENGTH +
177                            Constants::BGZF_BLOCK_FOOTER_LENGTH;
178         if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE )
179             throw BamException("BgzfStream::DeflateBlock", "deflate overflow");
180
181         // quit while loop
182         break;
183     }
184
185     // store the compressed length
186     BamTools::PackUnsignedShort(&buffer[16], static_cast<uint16_t>(compressedLength - 1));
187
188     // store the CRC32 checksum
189     uint32_t crc = crc32(0, NULL, 0);
190     crc = crc32(crc, (Bytef*)Resources.UncompressedBlock, inputLength);
191     BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc);
192     BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
193
194     // ensure that we have less than a block of data left
195     int remaining = m_blockOffset - inputLength;
196     if ( remaining > 0 ) {
197         if ( remaining > inputLength )
198             throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large");
199         memcpy(Resources.UncompressedBlock, Resources.UncompressedBlock + inputLength, remaining);
200     }
201
202     // update block data
203     m_blockOffset = remaining;
204
205     // return result
206     return compressedLength;
207 }
208
209 // flushes the data in the BGZF block
210 void BgzfStream::FlushBlock(void) {
211
212     BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
213
214     // flush all of the remaining blocks
215     while ( m_blockOffset > 0 ) {
216
217         // compress the data block
218         const size_t blockLength = DeflateBlock();
219
220         // flush the data to our output device
221         const size_t numBytesWritten = m_device->Write(Resources.CompressedBlock, blockLength);
222         if ( numBytesWritten != blockLength ) {
223             stringstream s("");
224             s << "expected to write " << blockLength
225               << " bytes during flushing, but wrote " << numBytesWritten;
226             throw BamException("BgzfStream::FlushBlock", s.str());
227         }
228
229         // update block data
230         m_blockAddress += blockLength;
231     }
232 }
233
234 // decompresses the current block
235 size_t BgzfStream::InflateBlock(const size_t& blockLength) {
236
237     // setup zlib stream object
238     z_stream zs;
239     zs.zalloc    = NULL;
240     zs.zfree     = NULL;
241     zs.next_in   = (Bytef*)Resources.CompressedBlock + 18;
242     zs.avail_in  = blockLength - 16;
243     zs.next_out  = (Bytef*)Resources.UncompressedBlock;
244     zs.avail_out = Constants::BGZF_DEFAULT_BLOCK_SIZE;
245
246     // initialize
247     int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS);
248     if ( status != Z_OK )
249         throw BamException("BgzfStream::InflateBlock", "zlib inflateInit failed");
250
251     // decompress
252     status = inflate(&zs, Z_FINISH);
253     if ( status != Z_STREAM_END ) {
254         inflateEnd(&zs);
255         throw BamException("BgzfStream::InflateBlock", "zlib inflate failed");
256     }
257
258     // finalize
259     status = inflateEnd(&zs);
260     if ( status != Z_OK ) {
261         inflateEnd(&zs);
262         throw BamException("BgzfStream::InflateBlock", "zlib inflateEnd failed");
263     }
264
265     // return result
266     return zs.total_out;
267 }
268
269 bool BgzfStream::IsOpen(void) const {
270     if ( m_device == 0 )
271         return false;
272     return m_device->IsOpen();
273 }
274
275 void BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
276
277     // close current device if necessary
278     Close();
279     BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
280
281     // retrieve new IO device depending on filename
282     m_device = BamDeviceFactory::CreateDevice(filename);
283     BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
284
285     // if device fails to open
286     if ( !m_device->Open(mode) ) {
287         const string deviceError = m_device->GetErrorString();
288         const string message = string("could not open BGZF stream: \n\t") + deviceError;
289         throw BamException("BgzfStream::Open", message);
290     }
291 }
292
293 // reads BGZF data into a byte buffer
294 size_t BgzfStream::Read(char* data, const size_t dataLength) {
295
296     if ( dataLength == 0 )
297         return 0;
298
299     // if stream not open for reading
300     BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
301     if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
302         return 0;
303
304     // read blocks as needed until desired data length is retrieved
305     char* output = data;
306     size_t numBytesRead = 0;
307     while ( numBytesRead < dataLength ) {
308
309         // determine bytes available in current block
310         int bytesAvailable = m_blockLength - m_blockOffset;
311
312         // read (and decompress) next block if needed
313         if ( bytesAvailable <= 0 ) {
314             ReadBlock();
315             bytesAvailable = m_blockLength - m_blockOffset;
316             if ( bytesAvailable <= 0 )
317                 break;
318         }
319
320         // copy data from uncompressed source buffer into data destination buffer
321         const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable );
322         memcpy(output, Resources.UncompressedBlock + m_blockOffset, copyLength);
323
324         // update counters
325         m_blockOffset += copyLength;
326         output        += copyLength;
327         numBytesRead  += copyLength;
328     }
329
330     // update block data
331     if ( m_blockOffset == m_blockLength ) {
332         m_blockAddress = m_device->Tell();
333         m_blockOffset  = 0;
334         m_blockLength  = 0;
335
336     }
337
338     // return actual number of bytes read
339     return numBytesRead;
340 }
341
342 // reads a BGZF block
343 void BgzfStream::ReadBlock(void) {
344
345     BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
346
347     // store block's starting address
348     int64_t blockAddress = m_device->Tell();
349
350     // read block header from file
351     char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
352     size_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
353
354     // if block header empty
355     if ( numBytesRead == 0 ) {
356         m_blockLength = 0;
357         return;
358     }
359
360     // if block header invalid size
361     if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH )
362         throw BamException("BgzfStream::ReadBlock", "invalid block header size");
363
364     // validate block header contents
365     if ( !BgzfStream::CheckBlockHeader(header) )
366         throw BamException("BgzfStream::ReadBlock", "invalid block header contents");
367
368     // copy header contents to compressed buffer
369     const size_t blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1;
370     memcpy(Resources.CompressedBlock, header, Constants::BGZF_BLOCK_HEADER_LENGTH);
371
372     // read remainder of block
373     const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
374     numBytesRead = m_device->Read(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
375     if ( numBytesRead != remaining )
376         throw BamException("BgzfStream::ReadBlock", "could not read data from block");
377
378     // decompress block data
379     numBytesRead = InflateBlock(blockLength);
380
381     // update block data
382     if ( m_blockLength != 0 )
383         m_blockOffset = 0;
384     m_blockAddress = blockAddress;
385     m_blockLength  = numBytesRead;
386 }
387
388 // seek to position in BGZF file
389 void BgzfStream::Seek(const int64_t& position) {
390
391     BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
392
393     // skip if device is not open
394     if ( !IsOpen() ) return;
395
396     // determine adjusted offset & address
397     int     blockOffset  = (position & 0xFFFF);
398     int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
399
400     // attempt seek in file
401     if ( m_device->IsRandomAccess() && m_device->Seek(blockAddress) ) {
402
403         // update block data & return success
404         m_blockLength  = 0;
405         m_blockAddress = blockAddress;
406         m_blockOffset  = blockOffset;
407     }
408     else {
409         stringstream s("");
410         s << "unable to seek to position: " << position;
411         throw BamException("BgzfStream::Seek", s.str());
412     }
413 }
414
415 void BgzfStream::SetWriteCompressed(bool ok) {
416     m_isWriteCompressed = ok;
417 }
418
419 // get file position in BGZF file
420 int64_t BgzfStream::Tell(void) const {
421     if ( !IsOpen() )
422         return 0;
423     return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
424 }
425
426 // writes the supplied data into the BGZF buffer
427 size_t BgzfStream::Write(const char* data, const size_t dataLength) {
428
429     BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
430     BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
431                  "BgzfStream::Write() - trying to write to non-writable IO device");
432
433     // skip if file not open for writing
434     if ( !IsOpen() )
435         return 0;
436
437     // write blocks as needed til all data is written
438     size_t numBytesWritten = 0;
439     const char* input = data;
440     const size_t blockLength = Constants::BGZF_DEFAULT_BLOCK_SIZE;
441     while ( numBytesWritten < dataLength ) {
442
443         // copy data contents to uncompressed output buffer
444         unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
445         char* buffer = Resources.UncompressedBlock;
446         memcpy(buffer + m_blockOffset, input, copyLength);
447
448         // update counter
449         m_blockOffset   += copyLength;
450         input           += copyLength;
451         numBytesWritten += copyLength;
452
453         // flush (& compress) output buffer when full
454         if ( m_blockOffset == blockLength )
455             FlushBlock();
456     }
457
458     // return actual number of bytes written
459     return numBytesWritten;
460 }