]> git.donarmstrong.com Git - bamtools.git/blob - src/api/internal/BgzfStream_p.cpp
Basic internal implementation of BamFile & BamPipe
[bamtools.git] / src / api / internal / BgzfStream_p.cpp
1 // ***************************************************************************
2 // BgzfStream_p.cpp (c) 2011 Derek Barnett
3 // Marth Lab, Department of Biology, Boston College
4 // All rights reserved.
5 // ---------------------------------------------------------------------------
6 // Last modified: 9 September 2011(DB)
7 // ---------------------------------------------------------------------------
8 // Based on BGZF routines developed at the Broad Institute.
9 // Provides the basic functionality for reading & writing BGZF files
10 // Replaces the old BGZF.* files to avoid clashing with other toolkits
11 // ***************************************************************************
12
13 #include <api/internal/BamDeviceFactory_p.h>
14 #include <api/internal/BgzfStream_p.h>
15 using namespace BamTools;
16 using namespace BamTools::Internal;
17
18 #include <cstring>
19 #include <algorithm>
20 #include <iostream>
21 using namespace std;
22
23 // constructor
24 BgzfStream::BgzfStream(void)
25     : m_uncompressedBlockSize(Constants::BGZF_DEFAULT_BLOCK_SIZE)
26     , m_compressedBlockSize(Constants::BGZF_MAX_BLOCK_SIZE)
27     , m_blockLength(0)
28     , m_blockOffset(0)
29     , m_blockAddress(0)
30     , m_uncompressedBlock(NULL)
31     , m_compressedBlock(NULL)
32     , m_isOpen(false)
33     , m_isWriteOnly(false)
34     , m_isWriteCompressed(true)
35     , m_device(0)
36     , m_stream(NULL)
37 {
38     try {
39         m_compressedBlock   = new char[m_compressedBlockSize];
40         m_uncompressedBlock = new char[m_uncompressedBlockSize];
41     } catch( std::bad_alloc& ba ) {
42         fprintf(stderr, "BgzfStream ERROR: unable to allocate memory\n");
43         exit(1);
44     }
45 }
46
47 // destructor
48 BgzfStream::~BgzfStream(void) {
49     if( m_compressedBlock   ) delete[] m_compressedBlock;
50     if( m_uncompressedBlock ) delete[] m_uncompressedBlock;
51 }
52
53 // closes BGZF file
54 void BgzfStream::Close(void) {
55
56     // skip if no device open
57     if ( m_device == 0 )
58         return;
59
60     // if writing to file, flush the current BGZF block,
61     // then write an empty block (as EOF marker)
62     if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
63         FlushBlock();
64         int blockLength = DeflateBlock();
65         m_device->Write(m_compressedBlock, blockLength);
66     }
67
68     // close device
69     m_device->Close();
70
71     // clean up & reset flags
72     delete m_device;
73     m_device = 0;
74     m_isWriteCompressed = true;
75     m_isOpen = false;
76 }
77
78 // compresses the current block
79 unsigned int BgzfStream::DeflateBlock(void) {
80
81     // initialize the gzip header
82     char* buffer = m_compressedBlock;
83     memset(buffer, 0, 18);
84     buffer[0]  = Constants::GZIP_ID1;
85     buffer[1]  = (char)Constants::GZIP_ID2;
86     buffer[2]  = Constants::CM_DEFLATE;
87     buffer[3]  = Constants::FLG_FEXTRA;
88     buffer[9]  = (char)Constants::OS_UNKNOWN;
89     buffer[10] = Constants::BGZF_XLEN;
90     buffer[12] = Constants::BGZF_ID1;
91     buffer[13] = Constants::BGZF_ID2;
92     buffer[14] = Constants::BGZF_LEN;
93
94     // set compression level
95     const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
96
97     // loop to retry for blocks that do not compress enough
98     int inputLength = m_blockOffset;
99     unsigned int compressedLength = 0;
100     unsigned int bufferSize = m_compressedBlockSize;
101
102     while ( true ) {
103
104         // initialize zstream values
105         z_stream zs;
106         zs.zalloc    = NULL;
107         zs.zfree     = NULL;
108         zs.next_in   = (Bytef*)m_uncompressedBlock;
109         zs.avail_in  = inputLength;
110         zs.next_out  = (Bytef*)&buffer[Constants::BGZF_BLOCK_HEADER_LENGTH];
111         zs.avail_out = bufferSize - Constants::BGZF_BLOCK_HEADER_LENGTH - Constants::BGZF_BLOCK_FOOTER_LENGTH;
112
113         // initialize the zlib compression algorithm
114         if ( deflateInit2(&zs,
115                           compressionLevel,
116                           Z_DEFLATED,
117                           Constants::GZIP_WINDOW_BITS,
118                           Constants::Z_DEFAULT_MEM_LEVEL,
119                           Z_DEFAULT_STRATEGY) != Z_OK )
120         {
121             fprintf(stderr, "BgzfStream ERROR: zlib deflate initialization failed\n");
122             exit(1);
123         }
124
125         // compress the data
126         int status = deflate(&zs, Z_FINISH);
127         if ( status != Z_STREAM_END ) {
128
129             deflateEnd(&zs);
130
131             // reduce the input length and try again
132             if ( status == Z_OK ) {
133                 inputLength -= 1024;
134                 if ( inputLength < 0 ) {
135                     fprintf(stderr, "BgzfStream ERROR: input reduction failed\n");
136                     exit(1);
137                 }
138                 continue;
139             }
140
141             fprintf(stderr, "BgzfStream ERROR: zlib::deflateEnd() failed\n");
142             exit(1);
143         }
144
145         // finalize the compression routine
146         if ( deflateEnd(&zs) != Z_OK ) {
147             fprintf(stderr, "BgzfStream ERROR: zlib::deflateEnd() failed\n");
148             exit(1);
149         }
150
151         compressedLength = zs.total_out;
152         compressedLength += Constants::BGZF_BLOCK_HEADER_LENGTH + Constants::BGZF_BLOCK_FOOTER_LENGTH;
153         if ( compressedLength > Constants::BGZF_MAX_BLOCK_SIZE ) {
154             fprintf(stderr, "BgzfStream ERROR: deflate overflow\n");
155             exit(1);
156         }
157
158         break;
159     }
160
161     // store the compressed length
162     BamTools::PackUnsignedShort(&buffer[16], (unsigned short)(compressedLength - 1));
163
164     // store the CRC32 checksum
165     unsigned int crc = crc32(0, NULL, 0);
166     crc = crc32(crc, (Bytef*)m_uncompressedBlock, inputLength);
167     BamTools::PackUnsignedInt(&buffer[compressedLength - 8], crc);
168     BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
169
170     // ensure that we have less than a block of data left
171     int remaining = m_blockOffset - inputLength;
172     if ( remaining > 0 ) {
173         if ( remaining > inputLength ) {
174             fprintf(stderr, "BgzfStream ERROR: after deflate, remainder too large\n");
175             exit(1);
176         }
177         memcpy(m_uncompressedBlock, m_uncompressedBlock + inputLength, remaining);
178     }
179
180     // update block data
181     m_blockOffset = remaining;
182
183     // return result
184     return compressedLength;
185 }
186
187 // flushes the data in the BGZF block
188 void BgzfStream::FlushBlock(void) {
189
190     BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" );
191
192     // flush all of the remaining blocks
193     while ( m_blockOffset > 0 ) {
194
195         // compress the data block
196         unsigned int blockLength = DeflateBlock();
197
198         // flush the data to our output stream
199         unsigned int numBytesWritten = m_device->Write(m_compressedBlock, blockLength);
200         if ( numBytesWritten != blockLength ) {
201             fprintf(stderr, "BgzfStream ERROR: expected to write %u bytes during flushing, but wrote %u bytes\n",
202                     blockLength, numBytesWritten);
203             exit(1);
204         }
205
206         // update block data
207         m_blockAddress += blockLength;
208     }
209 }
210
211 // decompresses the current block
212 int BgzfStream::InflateBlock(const int& blockLength) {
213
214     // inflate the data from compressed buffer into uncompressed buffer
215     z_stream zs;
216     zs.zalloc    = NULL;
217     zs.zfree     = NULL;
218     zs.next_in   = (Bytef*)m_compressedBlock + 18;
219     zs.avail_in  = blockLength - 16;
220     zs.next_out  = (Bytef*)m_uncompressedBlock;
221     zs.avail_out = m_uncompressedBlockSize;
222
223     int status = inflateInit2(&zs, Constants::GZIP_WINDOW_BITS);
224     if ( status != Z_OK ) {
225         fprintf(stderr, "BgzfStream ERROR: could not decompress block - zlib::inflateInit() failed\n");
226         return -1;
227     }
228
229     status = inflate(&zs, Z_FINISH);
230     if ( status != Z_STREAM_END ) {
231         inflateEnd(&zs);
232         fprintf(stderr, "BgzfStream ERROR: could not decompress block - zlib::inflate() failed\n");
233         return -1;
234     }
235
236     status = inflateEnd(&zs);
237     if ( status != Z_OK ) {
238         fprintf(stderr, "BgzfStream ERROR: could not decompress block - zlib::inflateEnd() failed\n");
239         return -1;
240     }
241
242     // return result
243     return zs.total_out;
244 }
245
246 bool BgzfStream::IsOpen(void) const {
247     if ( m_device == 0 )
248         return false;
249     return m_device->IsOpen();
250 }
251
252 bool BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) {
253
254     // close current device if necessary
255     Close();
256
257     // sanity check
258     BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
259
260     // retrieve new IO device depending on filename
261     m_device = BamDeviceFactory::CreateDevice(filename);
262
263     // sanity check
264     BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" );
265
266     // if device fails to open
267     if ( !m_device->Open(mode) ) {
268         cerr << "BgzfStream::Open() - unable to open IO device:" << endl;
269         cerr << m_device->ErrorString();
270         return false;
271     }
272
273     // otherwise, set flag & return true
274     m_isOpen = true;
275     m_isWriteOnly = ( mode == IBamIODevice::WriteOnly );
276     return true;
277
278 }
279
280 // opens the BGZF file for reading (mode is either "rb" for reading, or "wb" for writing)
281 bool BgzfStream::Open(const string& filename, const char* mode) {
282
283     // close current stream, if necessary, before opening next
284     if ( m_isOpen ) Close();
285
286     // determine open mode
287     if ( strcmp(mode, "rb") == 0 )
288         m_isWriteOnly = false;
289     else if ( strcmp(mode, "wb") == 0)
290         m_isWriteOnly = true;
291     else {
292         fprintf(stderr, "BgzfStream ERROR: unknown file mode: %s\n", mode);
293         return false;
294     }
295
296     // open BGZF stream on a file
297     if ( (filename != "stdin") && (filename != "stdout") && (filename != "-"))
298         m_stream = fopen(filename.c_str(), mode);
299
300     // open BGZF stream on stdin
301     else if ( (filename == "stdin" || filename == "-") && (strcmp(mode, "rb") == 0 ) )
302         m_stream = freopen(NULL, mode, stdin);
303
304     // open BGZF stream on stdout
305     else if ( (filename == "stdout" || filename == "-") && (strcmp(mode, "wb") == 0) )
306         m_stream = freopen(NULL, mode, stdout);
307
308     if ( !m_stream ) {
309         fprintf(stderr, "BgzfStream ERROR: unable to open file %s\n", filename.c_str() );
310         return false;
311     }
312
313     // set flag & return success
314     m_isOpen = true;
315     return true;
316 }
317
318 // reads BGZF data into a byte buffer
319 unsigned int BgzfStream::Read(char* data, const unsigned int dataLength) {
320
321     if ( dataLength == 0 )
322         return 0;
323
324     // if stream not open for reading
325     BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
326     if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
327         return 0;
328
329     // read blocks as needed until desired data length is retrieved
330     char* output = data;
331     unsigned int numBytesRead = 0;
332     while ( numBytesRead < dataLength ) {
333
334         // determine bytes available in current block
335         int bytesAvailable = m_blockLength - m_blockOffset;
336
337         // read (and decompress) next block if needed
338         if ( bytesAvailable <= 0 ) {
339             if ( !ReadBlock() ) return -1;
340             bytesAvailable = m_blockLength - m_blockOffset;
341             if ( bytesAvailable <= 0 ) break;
342         }
343
344         // copy data from uncompressed source buffer into data destination buffer
345         char* buffer   = m_uncompressedBlock;
346         int copyLength = min( (int)(dataLength-numBytesRead), bytesAvailable );
347         memcpy(output, buffer + m_blockOffset, copyLength);
348
349         // update counters
350         m_blockOffset += copyLength;
351         output        += copyLength;
352         numBytesRead  += copyLength;
353     }
354
355     // update block data
356     if ( m_blockOffset == m_blockLength ) {
357         m_blockAddress = m_device->Tell();
358         m_blockOffset  = 0;
359         m_blockLength  = 0;
360     }
361
362     return numBytesRead;
363 }
364
365 // reads a BGZF block
366 bool BgzfStream::ReadBlock(void) {
367
368     BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
369
370     // store block's starting address
371     int64_t blockAddress = m_device->Tell();
372
373     // read block header from file
374     char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
375     int numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
376
377     // if block header empty
378     if ( numBytesRead == 0 ) {
379         m_blockLength = 0;
380         return true;
381     }
382
383     // if block header invalid size
384     if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH ) {
385         fprintf(stderr, "BgzfStream ERROR: read block failed - could not read block header\n");
386         return false;
387     }
388
389     // validate block header contents
390     if ( !BgzfStream::CheckBlockHeader(header) ) {
391         fprintf(stderr, "BgzfStream ERROR: read block failed - invalid block header\n");
392         return false;
393     }
394
395     // copy header contents to compressed buffer
396     int blockLength = BamTools::UnpackUnsignedShort(&header[16]) + 1;
397     char* compressedBlock = m_compressedBlock;
398     memcpy(compressedBlock, header, Constants::BGZF_BLOCK_HEADER_LENGTH);
399     int remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
400
401     // read remainder of block
402     numBytesRead = m_device->Read(&compressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
403     if ( numBytesRead != remaining ) {
404         fprintf(stderr, "BgzfStream ERROR: read block failed - could not read data from block\n");
405         return false;
406     }
407
408     // decompress block data
409     numBytesRead = InflateBlock(blockLength);
410     if ( numBytesRead < 0 ) {
411         fprintf(stderr, "BgzfStream ERROR: read block failed - could not decompress block data\n");
412         return false;
413     }
414
415     // update block data
416     if ( m_blockLength != 0 )
417         m_blockOffset = 0;
418     m_blockAddress = blockAddress;
419     m_blockLength  = numBytesRead;
420
421     // return success
422     return true;
423 }
424
425 // seek to position in BGZF file
426 bool BgzfStream::Seek(const int64_t& position) {
427
428     BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
429
430     // skip if not open or not seek-able
431     if ( !IsOpen() || !m_device->IsRandomAccess() )
432         return false;
433
434     // determine adjusted offset & address
435     int     blockOffset  = (position & 0xFFFF);
436     int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
437
438     // attempt seek in file
439     if ( !m_device->Seek(blockAddress) ) {
440         fprintf(stderr, "BgzfStream ERROR: unable to seek in file\n");
441         return false;
442     }
443
444     // update block data & return success
445     m_blockLength  = 0;
446     m_blockAddress = blockAddress;
447     m_blockOffset  = blockOffset;
448     return true;
449 }
450
451 void BgzfStream::SetWriteCompressed(bool ok) {
452     m_isWriteCompressed = ok;
453 }
454
455 // get file position in BGZF file
456 int64_t BgzfStream::Tell(void) const {
457     if ( !m_isOpen ) return 0;
458     return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) );
459 }
460
461 // writes the supplied data into the BGZF buffer
462 unsigned int BgzfStream::Write(const char* data, const unsigned int dataLength) {
463
464     BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
465     BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
466                  "BgzfStream::Write() - trying to write to non-writable IO device");
467
468     // write blocks as needed til all data is written
469     unsigned int numBytesWritten = 0;
470     const char* input = data;
471     unsigned int blockLength = m_uncompressedBlockSize;
472     while ( numBytesWritten < dataLength ) {
473
474         // copy data contents to uncompressed output buffer
475         unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
476         char* buffer = m_uncompressedBlock;
477         memcpy(buffer + m_blockOffset, input, copyLength);
478
479         // update counter
480         m_blockOffset   += copyLength;
481         input           += copyLength;
482         numBytesWritten += copyLength;
483
484         // flush (& compress) output buffer when full
485         if ( m_blockOffset == blockLength )
486             FlushBlock();
487     }
488
489     // return result
490     return numBytesWritten;
491 }