]> git.donarmstrong.com Git - bamtools.git/blob - src/api/internal/BgzfStream_p.cpp
f70b97eac1eb692035040ebf26a23bdc3686f75b
[bamtools.git] / src / api / internal / BgzfStream_p.cpp
1 // ***************************************************************************
2 // BgzfStream_p.cpp (c) 2011 Derek Barnett
3 // Marth Lab, Department of Biology, Boston College
4 // ---------------------------------------------------------------------------
5 // Last modified: 7 October 2011(DB)
6 // ---------------------------------------------------------------------------
7 // Based on BGZF routines developed at the Broad Institute.
8 // Provides the basic functionality for reading & writing BGZF files
9 // Replaces the old BGZF.* files to avoid clashing with other toolkits
10 // ***************************************************************************
11
12 #include <api/internal/BamDeviceFactory_p.h>
13 #include <api/internal/BamException_p.h>
14 #include <api/internal/BgzfStream_p.h>
15 using namespace BamTools;
16 using namespace BamTools::Internal;
17
18 #include <cstring>
19 #include <algorithm>
20 #include <iostream>
21 #include <sstream>
22 using namespace std;
23
24 // ----------------------------
25 // RaiiWrapper implementation
26 // ----------------------------
27
28 BgzfStream::RaiiWrapper::RaiiWrapper(void) {
29     CompressedBlock   = new char[Constants::BGZF_MAX_BLOCK_SIZE];
30     UncompressedBlock = new char[Constants::BGZF_DEFAULT_BLOCK_SIZE];
31 }
32
33 BgzfStream::RaiiWrapper::~RaiiWrapper(void) {
34
35     // clean up buffers
36     delete[] CompressedBlock;
37     delete[] UncompressedBlock;
38     CompressedBlock = 0;
39     UncompressedBlock = 0;
40 }
41
42 // ---------------------------
43 // BgzfStream implementation
44 // ---------------------------
45
46 // constructor
47 BgzfStream::BgzfStream(void)
48   : m_blockLength(0)
49   , m_blockOffset(0)
50   , m_blockAddress(0)
51   , m_isWriteCompressed(true)
52   , m_device(0)
53 { }
54
55 // destructor
56 BgzfStream::~BgzfStream(void) {
57     Close();
58 }
59
60 // checks BGZF block header
61 bool BgzfStream::CheckBlockHeader(char* header) {
62     return (header[0] == Constants::GZIP_ID1 &&
63             header[1] == Constants::GZIP_ID2 &&
64             header[2] == Z_DEFLATED &&
65             (header[3] & Constants::FLG_FEXTRA) != 0 &&
66             BamTools::UnpackUnsignedShort(&header[10]) == Constants::BGZF_XLEN &&
67             header[12] == Constants::BGZF_ID1 &&
68             header[13] == Constants::BGZF_ID2 &&
69             BamTools::UnpackUnsignedShort(&header[14]) == Constants::BGZF_LEN );
70 }
71
72 // closes BGZF file
73 void BgzfStream::Close(void) {
74
75     // reset state
76     m_blockLength = 0;
77     m_blockOffset = 0;
78     m_blockAddress = 0;
79     m_isWriteCompressed = true;
80
81     // skip if no device open
82     if ( m_device == 0 ) return;
83
84     // if writing to file, flush the current BGZF block,
85     // then write an empty block (as EOF marker)
86     if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
87         FlushBlock();
88         const size_t blockLength = DeflateBlock();
89         m_device->Write(Resources.CompressedBlock, blockLength);
90     }
91
92     // close device
93     m_device->Close();
94     delete m_device;
95     m_device = 0;
96 }
97
98 // compresses the current block
99 size_t BgzfStream::DeflateBlock(void) {
100
101     // initialize the gzip header
102     char* buffer = Resources.CompressedBlock;
103     memset(buffer, 0, 18);
104     buffer[0]  = Constants::GZIP_ID1;
105     buffer[1]  = Constants::GZIP_ID2;
106     buffer[2]  = Constants::CM_DEFLATE;
107     buffer[3]  = Constants::FLG_FEXTRA;
108     buffer[9]  = Constants::OS_UNKNOWN;
109     buffer[10] = Constants::BGZF_XLEN;
110     buffer[12] = Constants::BGZF_ID1;
111     buffer[13] = Constants::BGZF_ID2;
112     buffer[14] = Constants::BGZF_LEN;
113
114     // set compression level
115     const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
116
117     // loop to retry for blocks that do not compress enough
118     int inputLength = m_blockOffset;
119     size_t compressedLength = 0;
120     const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE;
121
122     while ( true ) {
123
124         // initialize zstream values
125         z_stream zs;
126         zs.zalloc    = NULL;
127         zs.zfree     = NULL;
128         zs.next_in   = (Bytef*)Resources.UncompressedBlock;
129         zs.avail_in  = inputLength;
130         zs.next_out  = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH];
131         zs.avail_out = bufferSize -
132                        Constants::BGZF_BLOCK_HEADER_LENGTH -
133                        Constants::BGZF_BLOCK_FOOTER_LENGTH;
134
135         // initialize the zlib compression algorithm
136         int status = deflateInit2(&zs,
137                                   compressionLevel,
138                                   Z_DEFLATED,
139                                   Constants::GZIP_WINDOW_BITS,
140                                   Constants::Z_DEFAULT_MEM_LEVEL,
141                                   Z_DEFAULT_STRATEGY);
142         if ( status != Z_OK )
143             throw BamException("BgzfStream::DeflateBlock", "zlib deflateInit2 failed");
144
145         // compress the data
146         status = deflate(&zs, Z_FINISH);
147
148         // if not at stream end
149         if ( status != Z_STREAM_END ) {
150
151             deflateEnd(&zs);
152
153             // if error status
154             if ( status != Z_OK )
155                 throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
156
157             // not enough space available in buffer
158             // try to reduce the input length & re-start loop
159             inputLength -= 1024;
160             if ( inputLength <= 0 )
161                 throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
162             continue;
163         }
164
165         // finalize the compression routine
166         status = deflateEnd(&zs);
167         if ( status != Z_OK )
168             throw BamException("BgzfStream::DeflateBlock", "zlib deflateEnd failed");
169
170         // update compressedLength
171         compressedLength = zs.total_out +
172                            Constants::BGZF_BLOCK_HEADER_LENGTH +
173                            Constants::BGZF_BLOCK_FOOTER_LENGTH;
174         if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE )
175             throw BamException("BgzfStream::DeflateBlock", "deflate overflow");
176
177         // quit while loop
178         break;
179     }
180
181     // store the compressed length
182     BamTools::PackUnsignedShort(&buffer[16], static_cast<uint16_t>(compressedLength - 1));
183
184     // store the CRC32 checksum
185     uint32_t crc = crc32(0, NULL, 0);
186     crc = crc32(crc, (Bytef*)Resources.UncompressedBlock, inputLength);
187     BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc);
188     BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
189
190     // ensure that we have less than a block of data left
191     int remaining = m_blockOffset - inputLength;
192     if ( remaining > 0 ) {
193         if ( remaining > inputLength )
194             throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large");
195         memcpy(Resources.UncompressedBlock, Resources.UncompressedBlock + inputLength, remaining);
196     }
197
198     // update block data
199     m_blockOffset = remaining;
200
201     // return result
202     return compressedLength;
203 }
204
205 // flushes the data in the BGZF block
206 void BgzfStream::FlushBlock(void) {
207
208     BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
209
210     // flush all of the remaining blocks
211     while ( m_blockOffset > 0 ) {
212
213         // compress the data block
214         const size_t blockLength = DeflateBlock();
215
216         // flush the data to our output device
217         const size_t numBytesWritten = m_device->Write(Resources.CompressedBlock, blockLength);
218         if ( numBytesWritten != blockLength ) {
219             stringstream s("");
220             s << "expected to write " << blockLength
221               << " bytes during flushing, but wrote " << numBytesWritten;
222             throw BamException("BgzfStream::FlushBlock", s.str());
223         }
224
225         // update block data
226         m_blockAddress += blockLength;
227     }
228 }
229
230 // decompresses the current block
231 size_t BgzfStream::InflateBlock(const size_t& blockLength) {
232
233     // setup zlib stream object
234     z_stream zs;
235     zs.zalloc    = NULL;
236     zs.zfree     = NULL;
237     zs.next_in   = (Bytef*)Resources.CompressedBlock + 18;
238     zs.avail_in  = blockLength - 16;
239     zs.next_out  = (Bytef*)Resources.UncompressedBlock;
240     zs.avail_out = Constants::BGZF_DEFAULT_BLOCK_SIZE;
241
242     // initialize
243     int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS);
244     if ( status != Z_OK )
245         throw BamException("BgzfStream::InflateBlock", "zlib inflateInit failed");
246
247     // decompress
248     status = inflate(&zs, Z_FINISH);
249     if ( status != Z_STREAM_END ) {
250         inflateEnd(&zs);
251         throw BamException("BgzfStream::InflateBlock", "zlib inflate failed");
252     }
253
254     // finalize
255     status = inflateEnd(&zs);
256     if ( status != Z_OK ) {
257         inflateEnd(&zs);
258         throw BamException("BgzfStream::InflateBlock", "zlib inflateEnd failed");
259     }
260
261     // return result
262     return zs.total_out;
263 }
264
265 bool BgzfStream::IsOpen(void) const {
266     if ( m_device == 0 )
267         return false;
268     return m_device->IsOpen();
269 }
270
271 void BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
272
273     // close current device if necessary
274     Close();
275     BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
276
277     // retrieve new IO device depending on filename
278     m_device = BamDeviceFactory::CreateDevice(filename);
279     BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
280
281     // if device fails to open
282     if ( !m_device->Open(mode) ) {
283         const string deviceError = m_device->GetErrorString();
284         const string message = string("could not open BGZF stream: \n\t") + deviceError;
285         throw BamException("BgzfStream::Open", message);
286     }
287 }
288
289 // reads BGZF data into a byte buffer
290 size_t BgzfStream::Read(char* data, const size_t dataLength) {
291
292     if ( dataLength == 0 )
293         return 0;
294
295     // if stream not open for reading
296     BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
297     if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
298         return 0;
299
300     // read blocks as needed until desired data length is retrieved
301     size_t numBytesRead = 0;
302     while ( numBytesRead < dataLength ) {
303
304         // determine bytes available in current block
305         int bytesAvailable = m_blockLength - m_blockOffset;
306
307         // read (and decompress) next block if needed
308         if ( bytesAvailable <= 0 ) {
309             ReadBlock();
310             bytesAvailable = m_blockLength - m_blockOffset;
311             if ( bytesAvailable <= 0 )
312                 break;
313         }
314
315         // copy data from uncompressed source buffer into data destination buffer
316         const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable );
317         memcpy(data, Resources.UncompressedBlock + m_blockOffset, copyLength);
318
319         // update counters
320         m_blockOffset  += copyLength;
321         data         += copyLength;
322         numBytesRead += copyLength;
323     }
324
325     // update block data
326     if ( m_blockOffset == m_blockLength ) {
327         m_blockAddress = m_device->Tell();
328         m_blockOffset  = 0;
329         m_blockLength  = 0;
330
331     }
332
333     // return actual number of bytes read
334     return numBytesRead;
335 }
336
337 // reads a BGZF block
338 void BgzfStream::ReadBlock(void) {
339
340     BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
341
342     // store block's starting address
343     int64_t blockAddress = m_device->Tell();
344
345     // read block header from file
346     char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
347     size_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
348
349     // if block header empty
350     if ( numBytesRead == 0 ) {
351         m_blockLength = 0;
352         return;
353     }
354
355     // if block header invalid size
356     if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH )
357         throw BamException("BgzfStream::ReadBlock", "invalid block header size");
358
359     // validate block header contents
360     if ( !BgzfStream::CheckBlockHeader(header) )
361         throw BamException("BgzfStream::ReadBlock", "invalid block header contents");
362
363     // copy header contents to compressed buffer
364     const size_t blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1;
365     memcpy(Resources.CompressedBlock, header, Constants::BGZF_BLOCK_HEADER_LENGTH);
366
367     // read remainder of block
368     const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
369     numBytesRead = m_device->Read(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
370     if ( numBytesRead != remaining )
371         throw BamException("BgzfStream::ReadBlock", "could not read data from block");
372
373     // decompress block data
374     numBytesRead = InflateBlock(blockLength);
375
376     // update block data
377     if ( m_blockLength != 0 )
378         m_blockOffset = 0;
379     m_blockAddress = blockAddress;
380     m_blockLength  = numBytesRead;
381 }
382
383 // seek to position in BGZF file
384 void BgzfStream::Seek(const int64_t& position) {
385
386     BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
387
388     // skip if device is not open
389     if ( !IsOpen() ) return;
390
391     // determine adjusted offset & address
392     int     blockOffset  = (position & 0xFFFF);
393     int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
394
395     // attempt seek in file
396     if ( m_device->IsRandomAccess() && m_device->Seek(blockAddress) ) {
397
398         // update block data & return success
399         m_blockLength  = 0;
400         m_blockAddress = blockAddress;
401         m_blockOffset  = blockOffset;
402     }
403     else {
404         stringstream s("");
405         s << "unable to seek to position: " << position;
406         throw BamException("BgzfStream::Seek", s.str());
407     }
408 }
409
410 void BgzfStream::SetWriteCompressed(bool ok) {
411     m_isWriteCompressed = ok;
412 }
413
414 // get file position in BGZF file
415 int64_t BgzfStream::Tell(void) const {
416     if ( !IsOpen() )
417         return 0;
418     return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
419 }
420
421 // writes the supplied data into the BGZF buffer
422 size_t BgzfStream::Write(const char* data, const size_t dataLength) {
423
424     BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
425     BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
426                  "BgzfStream::Write() - trying to write to non-writable IO device");
427
428     // skip if file not open for writing
429     if ( !IsOpen() )
430         return 0;
431
432     // write blocks as needed til all data is written
433     size_t numBytesWritten = 0;
434     const char* input = data;
435     const size_t blockLength = Constants::BGZF_DEFAULT_BLOCK_SIZE;
436     while ( numBytesWritten < dataLength ) {
437
438         // copy data contents to uncompressed output buffer
439         unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
440         char* buffer = Resources.UncompressedBlock;
441         memcpy(buffer + m_blockOffset, input, copyLength);
442
443         // update counter
444         m_blockOffset   += copyLength;
445         input           += copyLength;
446         numBytesWritten += copyLength;
447
448         // flush (& compress) output buffer when full
449         if ( m_blockOffset == blockLength )
450             FlushBlock();
451     }
452
453     // return actual number of bytes written
454     return numBytesWritten;
455 }