]> git.donarmstrong.com Git - bamtools.git/blob - src/api/internal/io/BgzfStream_p.cpp
Clarified documentation on BamReader::GetNextAlignmentCore
[bamtools.git] / src / api / internal / io / BgzfStream_p.cpp
1 // ***************************************************************************
2 // BgzfStream_p.cpp (c) 2011 Derek Barnett
3 // Marth Lab, Department of Biology, Boston College
4 // ---------------------------------------------------------------------------
5 // Last modified: 17 January 2012(DB)
6 // ---------------------------------------------------------------------------
7 // Based on BGZF routines developed at the Broad Institute.
8 // Provides the basic functionality for reading & writing BGZF files
9 // Replaces the old BGZF.* files to avoid clashing with other toolkits
10 // ***************************************************************************
11
12 #include "api/BamAux.h"
13 #include "api/BamConstants.h"
14 #include "api/internal/io/BamDeviceFactory_p.h"
15 #include "api/internal/io/BgzfStream_p.h"
16 #include "api/internal/utils/BamException_p.h"
17 using namespace BamTools;
18 using namespace BamTools::Internal;
19
20 #include "zlib.h"
21
22 #include <cstring>
23 #include <algorithm>
24 #include <iostream>
25 #include <sstream>
26 using namespace std;
27
28 // ---------------------------
29 // BgzfStream implementation
30 // ---------------------------
31
32 // constructor
33 BgzfStream::BgzfStream(void)
34   : m_blockLength(0)
35   , m_blockOffset(0)
36   , m_blockAddress(0)
37   , m_isWriteCompressed(true)
38   , m_device(0)
39   , m_uncompressedBlock(Constants::BGZF_DEFAULT_BLOCK_SIZE)
40   , m_compressedBlock(Constants::BGZF_MAX_BLOCK_SIZE)
41 { }
42
43 // destructor
44 BgzfStream::~BgzfStream(void) {
45     Close();
46 }
47
48 // checks BGZF block header
49 bool BgzfStream::CheckBlockHeader(char* header) {
50     return (header[0] == Constants::GZIP_ID1 &&
51             header[1] == Constants::GZIP_ID2 &&
52             header[2] == Z_DEFLATED &&
53             (header[3] & Constants::FLG_FEXTRA) != 0 &&
54             BamTools::UnpackUnsignedShort(&header[10]) == Constants::BGZF_XLEN &&
55             header[12] == Constants::BGZF_ID1 &&
56             header[13] == Constants::BGZF_ID2 &&
57             BamTools::UnpackUnsignedShort(&header[14]) == Constants::BGZF_LEN );
58 }
59
60 // closes BGZF file
61 void BgzfStream::Close(void) {
62
63     // skip if no device open
64     if ( m_device == 0 ) return;
65
66     // if writing to file, flush the current BGZF block,
67     // then write an empty block (as EOF marker)
68     if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
69         FlushBlock();
70         const size_t blockLength = DeflateBlock(0);
71         m_device->Write(m_compressedBlock.Buffer, blockLength);
72     }
73
74     // close device
75     m_device->Close();
76     delete m_device;
77     m_device = 0;
78
79     // ensure our buffers are cleared out
80     m_uncompressedBlock.Clear();
81     m_compressedBlock.Clear();
82
83     // reset state
84     m_blockLength = 0;
85     m_blockOffset = 0;
86     m_blockAddress = 0;
87     m_isWriteCompressed = true;
88 }
89
90 // compresses the current block
91 size_t BgzfStream::DeflateBlock(int32_t blockLength) {
92
93     // initialize the gzip header
94     char* buffer = m_compressedBlock.Buffer;
95     memset(buffer, 0, 18);
96     buffer[0]  = Constants::GZIP_ID1;
97     buffer[1]  = Constants::GZIP_ID2;
98     buffer[2]  = Constants::CM_DEFLATE;
99     buffer[3]  = Constants::FLG_FEXTRA;
100     buffer[9]  = Constants::OS_UNKNOWN;
101     buffer[10] = Constants::BGZF_XLEN;
102     buffer[12] = Constants::BGZF_ID1;
103     buffer[13] = Constants::BGZF_ID2;
104     buffer[14] = Constants::BGZF_LEN;
105
106     // set compression level
107     const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
108
109     // loop to retry for blocks that do not compress enough
110     int inputLength = blockLength;
111     size_t compressedLength = 0;
112     const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE;
113
114     while ( true ) {
115
116         // initialize zstream values
117         z_stream zs;
118         zs.zalloc    = NULL;
119         zs.zfree     = NULL;
120         zs.next_in   = (Bytef*)m_uncompressedBlock.Buffer;
121         zs.avail_in  = inputLength;
122         zs.next_out  = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH];
123         zs.avail_out = bufferSize -
124                        Constants::BGZF_BLOCK_HEADER_LENGTH -
125                        Constants::BGZF_BLOCK_FOOTER_LENGTH;
126
127         // initialize the zlib compression algorithm
128         int status = deflateInit2(&zs,
129                                   compressionLevel,
130                                   Z_DEFLATED,
131                                   Constants::GZIP_WINDOW_BITS,
132                                   Constants::Z_DEFAULT_MEM_LEVEL,
133                                   Z_DEFAULT_STRATEGY);
134         if ( status != Z_OK )
135             throw BamException("BgzfStream::DeflateBlock", "zlib deflateInit2 failed");
136
137         // compress the data
138         status = deflate(&zs, Z_FINISH);
139
140         // if not at stream end
141         if ( status != Z_STREAM_END ) {
142
143             deflateEnd(&zs);
144
145             // there was not enough space available in buffer
146             // try to reduce the input length & re-start loop
147             if ( status == Z_OK ) {
148                 inputLength -= 1024;
149                 if ( inputLength < 0 )
150                     throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
151                 continue;
152             }
153
154             throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
155         }
156
157         // finalize the compression routine
158         status = deflateEnd(&zs);
159         if ( status != Z_OK )
160             throw BamException("BgzfStream::DeflateBlock", "zlib deflateEnd failed");
161
162         // update compressedLength
163         compressedLength = zs.total_out +
164                            Constants::BGZF_BLOCK_HEADER_LENGTH +
165                            Constants::BGZF_BLOCK_FOOTER_LENGTH;
166         if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE )
167             throw BamException("BgzfStream::DeflateBlock", "deflate overflow");
168
169         // quit while loop
170         break;
171     }
172
173     // store the compressed length
174     BamTools::PackUnsignedShort(&buffer[16], static_cast<uint16_t>(compressedLength - 1));
175
176     // store the CRC32 checksum
177     uint32_t crc = crc32(0, NULL, 0);
178     crc = crc32(crc, (Bytef*)m_uncompressedBlock.Buffer, inputLength);
179     BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc);
180     BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
181
182     // ensure that we have less than a block of data left
183     int remaining = blockLength - inputLength;
184     if ( remaining > 0 ) {
185         if ( remaining > inputLength )
186             throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large");
187         memcpy(m_uncompressedBlock.Buffer, m_uncompressedBlock.Buffer + inputLength, remaining);
188     }
189
190     // update block data
191     m_blockOffset = remaining;
192
193     // return result
194     return compressedLength;
195 }
196
197 // flushes the data in the BGZF block
198 void BgzfStream::FlushBlock(void) {
199
200     BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
201
202     // flush all of the remaining blocks
203     while ( m_blockOffset > 0 ) {
204
205         // compress the data block
206         const size_t blockLength = DeflateBlock(m_blockOffset);
207
208         // flush the data to our output device
209         const int64_t numBytesWritten = m_device->Write(m_compressedBlock.Buffer, blockLength);
210
211         // check for device error
212         if ( numBytesWritten < 0 ) {
213             const string message = string("device error: ") + m_device->GetErrorString();
214             throw BamException("BgzfStream::FlushBlock", message);
215         }
216
217         // check that we wrote expected numBytes
218         if ( numBytesWritten != static_cast<int64_t>(blockLength) ) {
219             stringstream s("");
220             s << "expected to write " << blockLength
221               << " bytes during flushing, but wrote " << numBytesWritten;
222             throw BamException("BgzfStream::FlushBlock", s.str());
223         }
224
225         // update block data
226         m_blockAddress += blockLength;
227     }
228 }
229
230 // decompresses the current block
231 size_t BgzfStream::InflateBlock(const size_t& blockLength) {
232
233     // setup zlib stream object
234     z_stream zs;
235     zs.zalloc    = NULL;
236     zs.zfree     = NULL;
237     zs.next_in   = (Bytef*)m_compressedBlock.Buffer + 18;
238     zs.avail_in  = blockLength - 16;
239     zs.next_out  = (Bytef*)m_uncompressedBlock.Buffer;
240     zs.avail_out = Constants::BGZF_DEFAULT_BLOCK_SIZE;
241
242     // initialize
243     int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS);
244     if ( status != Z_OK )
245         throw BamException("BgzfStream::InflateBlock", "zlib inflateInit failed");
246
247     // decompress
248     status = inflate(&zs, Z_FINISH);
249     if ( status != Z_STREAM_END ) {
250         inflateEnd(&zs);
251         throw BamException("BgzfStream::InflateBlock", "zlib inflate failed");
252     }
253
254     // finalize
255     status = inflateEnd(&zs);
256     if ( status != Z_OK ) {
257         inflateEnd(&zs);
258         throw BamException("BgzfStream::InflateBlock", "zlib inflateEnd failed");
259     }
260
261     // return result
262     return zs.total_out;
263 }
264
265 bool BgzfStream::IsOpen(void) const {
266     if ( m_device == 0 )
267         return false;
268     return m_device->IsOpen();
269 }
270
271 void BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
272
273     // close current device if necessary
274     Close();
275     BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
276
277     // retrieve new IO device depending on filename
278     m_device = BamDeviceFactory::CreateDevice(filename);
279     BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
280
281     // if device fails to open
282     if ( !m_device->Open(mode) ) {
283         const string deviceError = m_device->GetErrorString();
284         const string message = string("could not open BGZF stream: \n\t") + deviceError;
285         throw BamException("BgzfStream::Open", message);
286     }
287 }
288
289 // reads BGZF data into a byte buffer
290 size_t BgzfStream::Read(char* data, const size_t dataLength) {
291
292     if ( dataLength == 0 )
293         return 0;
294
295     // if stream not open for reading
296     BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
297     if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
298         return 0;
299
300     // read blocks as needed until desired data length is retrieved
301     char* output = data;
302     size_t numBytesRead = 0;
303     while ( numBytesRead < dataLength ) {
304
305         // determine bytes available in current block
306         int bytesAvailable = m_blockLength - m_blockOffset;
307
308         // read (and decompress) next block if needed
309         if ( bytesAvailable <= 0 ) {
310             ReadBlock();
311             bytesAvailable = m_blockLength - m_blockOffset;
312             if ( bytesAvailable <= 0 )
313                 break;
314         }
315
316         // copy data from uncompressed source buffer into data destination buffer
317         const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable );
318         memcpy(output, m_uncompressedBlock.Buffer + m_blockOffset, copyLength);
319
320         // update counters
321         m_blockOffset += copyLength;
322         output        += copyLength;
323         numBytesRead  += copyLength;
324     }
325
326     // update block data
327     if ( m_blockOffset == m_blockLength ) {
328         m_blockAddress = m_device->Tell();
329         m_blockOffset  = 0;
330         m_blockLength  = 0;
331     }
332
333     // return actual number of bytes read
334     return numBytesRead;
335 }
336
337 // reads a BGZF block
338 void BgzfStream::ReadBlock(void) {
339
340     BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
341
342     // store block's starting address
343     const int64_t blockAddress = m_device->Tell();
344
345     // read block header from file
346     char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
347     int64_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
348
349     // check for device error
350     if ( numBytesRead < 0 ) {
351         const string message = string("device error: ") + m_device->GetErrorString();
352         throw BamException("BgzfStream::ReadBlock", message);
353     }
354
355     // if block header empty
356     if ( numBytesRead == 0 ) {
357         m_blockLength = 0;
358         return;
359     }
360
361     // if block header invalid size
362     if ( numBytesRead != static_cast<int8_t>(Constants::BGZF_BLOCK_HEADER_LENGTH) )
363         throw BamException("BgzfStream::ReadBlock", "invalid block header size");
364
365     // validate block header contents
366     if ( !BgzfStream::CheckBlockHeader(header) )
367         throw BamException("BgzfStream::ReadBlock", "invalid block header contents");
368
369     // copy header contents to compressed buffer
370     const size_t blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1;
371     memcpy(m_compressedBlock.Buffer, header, Constants::BGZF_BLOCK_HEADER_LENGTH);
372
373     // read remainder of block
374     const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
375     numBytesRead = m_device->Read(&m_compressedBlock.Buffer[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
376
377     // check for device error
378     if ( numBytesRead < 0 ) {
379         const string message = string("device error: ") + m_device->GetErrorString();
380         throw BamException("BgzfStream::ReadBlock", message);
381     }
382
383     // check that we read in expected numBytes
384     if ( numBytesRead != static_cast<int64_t>(remaining) )
385         throw BamException("BgzfStream::ReadBlock", "could not read data from block");
386
387     // decompress block data
388     const size_t newBlockLength = InflateBlock(blockLength);
389
390     // update block data
391     if ( m_blockLength != 0 )
392         m_blockOffset = 0;
393     m_blockAddress = blockAddress;
394     m_blockLength  = newBlockLength;
395 }
396
397 // seek to position in BGZF file
398 void BgzfStream::Seek(const int64_t& position) {
399
400     BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
401
402     // skip if device is not open
403     if ( !IsOpen() ) return;
404
405     // determine adjusted offset & address
406     int     blockOffset  = (position & 0xFFFF);
407     int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
408
409     // attempt seek in file
410     if ( m_device->IsRandomAccess() && m_device->Seek(blockAddress) ) {
411
412         // update block data & return success
413         m_blockLength  = 0;
414         m_blockAddress = blockAddress;
415         m_blockOffset  = blockOffset;
416     }
417     else {
418         stringstream s("");
419         s << "unable to seek to position: " << position;
420         throw BamException("BgzfStream::Seek", s.str());
421     }
422 }
423
424 void BgzfStream::SetWriteCompressed(bool ok) {
425     m_isWriteCompressed = ok;
426 }
427
428 // get file position in BGZF file
429 int64_t BgzfStream::Tell(void) const {
430     if ( !IsOpen() )
431         return 0;
432     return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
433 }
434
435 // writes the supplied data into the BGZF buffer
436 size_t BgzfStream::Write(const char* data, const size_t dataLength) {
437
438     BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
439     BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
440                  "BgzfStream::Write() - trying to write to non-writable IO device");
441
442     // skip if file not open for writing
443     if ( !IsOpen() )
444         return 0;
445
446     // write blocks as needed til all data is written
447     size_t numBytesWritten = 0;
448     const char* input = data;
449     const size_t blockLength = Constants::BGZF_DEFAULT_BLOCK_SIZE;
450     while ( numBytesWritten < dataLength ) {
451
452         // copy data contents to uncompressed output buffer
453         unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
454         char* buffer = m_uncompressedBlock.Buffer;
455         memcpy(buffer + m_blockOffset, input, copyLength);
456
457         // update counter
458         m_blockOffset   += copyLength;
459         input           += copyLength;
460         numBytesWritten += copyLength;
461
462         // flush (& compress) output buffer when full
463         if ( m_blockOffset == static_cast<int32_t>(blockLength) )
464             FlushBlock();
465     }
466
467     // return actual number of bytes written
468     return numBytesWritten;
469 }