// BgzfStream_p.cpp (c) 2011 Derek Barnett
// Marth Lab, Department of Biology, Boston College
// ---------------------------------------------------------------------------
-// Last modified: 6 October 2011(DB)
+// Last modified: 11 October 2011(DB)
// ---------------------------------------------------------------------------
// Based on BGZF routines developed at the Broad Institute.
// Provides the basic functionality for reading & writing BGZF files
// Replaces the old BGZF.* files to avoid clashing with other toolkits
// ***************************************************************************
-#include <api/internal/BamException_p.h>
-#include <api/internal/BgzfStream_p.h>
+#include "api/BamAux.h"
+#include "api/BamConstants.h"
+#include "api/internal/BamDeviceFactory_p.h"
+#include "api/internal/BamException_p.h"
+#include "api/internal/BgzfStream_p.h"
using namespace BamTools;
using namespace BamTools::Internal;
+#include "zlib.h"
+
#include <cstring>
#include <algorithm>
+#include <iostream>
#include <sstream>
using namespace std;
// RaiiWrapper implementation
// ----------------------------
-BgzfStream::RaiiWrapper::RaiiWrapper(void)
- : Stream(0)
-{
+BgzfStream::RaiiWrapper::RaiiWrapper(void) { // allocates the two BGZF block buffers; no Stream handle is initialized here any more
CompressedBlock = new char[Constants::BGZF_MAX_BLOCK_SIZE];
UncompressedBlock = new char[Constants::BGZF_DEFAULT_BLOCK_SIZE];
}
delete[] UncompressedBlock;
CompressedBlock = 0;
UncompressedBlock = 0;
-
- if ( Stream ) {
- fflush(Stream);
- fclose(Stream);
- Stream = 0;
- }
}
// ---------------------------
// constructor
BgzfStream::BgzfStream(void)
- : BlockLength(0)
- , BlockOffset(0)
- , BlockAddress(0)
- , IsOpen(false)
- , IsWriteOnly(false)
- , IsWriteCompressed(true)
+ : m_blockLength(0)
+ , m_blockOffset(0)
+ , m_blockAddress(0)
+ , m_isWriteCompressed(true) // compression stays on unless SetWriteCompressed(false) is called
+ , m_device(0) // null until Open() obtains a device; open state & mode are now queried from the device
{ }
// destructor
// closes BGZF file
void BgzfStream::Close(void) {
- // skip if file not open
- if ( !IsOpen )
- return;
+ // skip if no device exists (never opened, or already closed & deleted below)
+ if ( m_device == 0 ) return;
// if writing to file, flush the current BGZF block,
// then write an empty block (as EOF marker)
- if ( IsWriteOnly ) {
+ if ( m_device->IsOpen() && (m_device->Mode() == IBamIODevice::WriteOnly) ) {
FlushBlock();
const size_t blockLength = DeflateBlock();
- fwrite(Resources.CompressedBlock, 1, blockLength, Resources.Stream);
+ m_device->Write(Resources.CompressedBlock, blockLength); // NOTE(review): result is not checked here, unlike the write in FlushBlock()
}
- // flush and close stream
- fflush(Resources.Stream);
- fclose(Resources.Stream);
- Resources.Stream = 0;
-
- // reset initial state
- BlockLength = 0;
- BlockOffset = 0;
- BlockAddress = 0;
- IsOpen = false;
- IsWriteOnly = false;
- IsWriteCompressed = true;
+ // close device, then delete it; the null pointer is what marks "no device" for later calls
+ m_device->Close();
+ delete m_device;
+ m_device = 0;
+
+ // reset state to the same values the constructor uses
+ m_blockLength = 0;
+ m_blockOffset = 0;
+ m_blockAddress = 0;
+ m_isWriteCompressed = true;
}
// compresses the current block
buffer[14] = Constants::BGZF_LEN;
// set compression level
- const int compressionLevel = ( IsWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
+ const int compressionLevel = ( m_isWriteCompressed ? Z_DEFAULT_COMPRESSION : 0 );
// loop to retry for blocks that do not compress enough
- int inputLength = BlockOffset;
+ int inputLength = m_blockOffset;
size_t compressedLength = 0;
const unsigned int bufferSize = Constants::BGZF_MAX_BLOCK_SIZE;
deflateEnd(&zs);
- // if error status
- if ( status != Z_OK )
- throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
-
- // not enough space available in buffer
+ // there was not enough space available in buffer
// try to reduce the input length & re-start loop
- inputLength -= 1024;
- if ( inputLength <= 0 )
- throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
- continue;
+ if ( status == Z_OK ) {
+ inputLength -= 1024;
+ if ( inputLength < 0 )
+ throw BamException("BgzfStream::DeflateBlock", "input reduction failed");
+ continue;
+ }
+
+ throw BamException("BgzfStream::DeflateBlock", "zlib deflate failed");
}
// finalize the compression routine
BamTools::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
// ensure that we have less than a block of data left
- int remaining = BlockOffset - inputLength;
+ int remaining = m_blockOffset - inputLength;
if ( remaining > 0 ) {
if ( remaining > inputLength )
throw BamException("BgzfStream::DeflateBlock", "after deflate, remainder too large");
memcpy(Resources.UncompressedBlock, Resources.UncompressedBlock + inputLength, remaining);
}
- // update block data & return compressedlength
- BlockOffset = remaining;
+ // update block data
+ m_blockOffset = remaining;
+
+ // return result
return compressedLength;
}
// flushes the data in the BGZF block
void BgzfStream::FlushBlock(void) {
+ BT_ASSERT_X( m_device, "BgzfStream::FlushBlock() - attempting to flush to null device" ); // Close() only gets here after its own null-device check
+
// flush all of the remaining blocks
- while ( BlockOffset > 0 ) {
+ while ( m_blockOffset > 0 ) { // DeflateBlock() lowers m_blockOffset to the bytes it did not consume
// compress the data block
const size_t blockLength = DeflateBlock();
- // flush the data to our output stream
- const size_t numBytesWritten = fwrite(Resources.CompressedBlock, 1, blockLength, Resources.Stream);
+ // flush the data to our output device; a short write is detected just below
+ const size_t numBytesWritten = m_device->Write(Resources.CompressedBlock, blockLength);
if ( numBytesWritten != blockLength ) {
stringstream s("");
s << "expected to write " << blockLength
}
// update block data
- BlockAddress += blockLength;
+ m_blockAddress += blockLength; // advance by the compressed size that was just written
}
}
return zs.total_out;
}
-// opens the BGZF file for reading (mode is either "rb" for reading, or "wb" for writing)
-void BgzfStream::Open(const string& filename, const char* mode) {
-
- // make sure we're starting with fresh state
- if ( IsOpen )
- Close();
-
- // determine open mode
- if ( strcmp(mode, "rb") == 0 )
- IsWriteOnly = false;
- else if ( strcmp(mode, "wb") == 0)
- IsWriteOnly = true;
- else {
- const string message = string("unknown file mode: ") + mode;
- throw BamException("BgzfStream::Open", message);
- }
+bool BgzfStream::IsOpen(void) const { // true only when a device exists AND that device reports itself open
+ if ( m_device == 0 )
+ return false; // no device: never opened, or Close() already deleted it
+ return m_device->IsOpen();
+}
- // open BGZF stream on a file
- if ( (filename != "stdin") && (filename != "stdout") && (filename != "-"))
- Resources.Stream = fopen(filename.c_str(), mode);
+void BgzfStream::Open(const string& filename, const IBamIODevice::OpenMode mode) { // opens a device for 'filename' in 'mode'; throws BamException if the device cannot be opened
- // open BGZF stream on stdin
- else if ( (filename == "stdin" || filename == "-") && (strcmp(mode, "rb") == 0 ) )
- Resources.Stream = freopen(NULL, mode, stdin);
+ // close current device if necessary (Close() returns immediately when there is no device)
+ Close();
+ BT_ASSERT_X( (m_device == 0), "BgzfStream::Open() - unable to properly close previous IO device" );
- // open BGZF stream on stdout
- else if ( (filename == "stdout" || filename == "-") && (strcmp(mode, "wb") == 0) )
- Resources.Stream = freopen(NULL, mode, stdout);
+ // retrieve new IO device depending on filename; this stream deletes the device in Close()
+ m_device = BamDeviceFactory::CreateDevice(filename);
+ BT_ASSERT_X( m_device, "BgzfStream::Open() - unable to create IO device from filename" ); // NOTE(review): if BT_ASSERT_X compiles out in release builds, a null device would be dereferenced below - confirm
- // ensure valid Stream
- if ( !Resources.Stream ) {
- const string message = string("unable to open file: ") + filename;
+ // if device fails to open, report the device's own error text; m_device stays set, so a later Close() still deletes it
+ if ( !m_device->Open(mode) ) {
+ const string deviceError = m_device->GetErrorString();
+ const string message = string("could not open BGZF stream: \n\t") + deviceError;
throw BamException("BgzfStream::Open", message);
}
-
- // set flag
- IsOpen = true;
}
// reads BGZF data into a byte buffer
size_t BgzfStream::Read(char* data, const size_t dataLength) {
- // if stream not open for reading (or empty request)
- if ( !IsOpen || IsWriteOnly || dataLength == 0 )
+ if ( dataLength == 0 )
+ return 0;
+
+ // if stream not open for reading
+ BT_ASSERT_X( m_device, "BgzfStream::Read() - trying to read from null device");
+ if ( !m_device->IsOpen() || (m_device->Mode() != IBamIODevice::ReadOnly) )
return 0;
// read blocks as needed until desired data length is retrieved
+ char* output = data;
size_t numBytesRead = 0;
while ( numBytesRead < dataLength ) {
// determine bytes available in current block
- int bytesAvailable = BlockLength - BlockOffset;
+ int bytesAvailable = m_blockLength - m_blockOffset;
// read (and decompress) next block if needed
if ( bytesAvailable <= 0 ) {
ReadBlock();
- bytesAvailable = BlockLength - BlockOffset;
+ bytesAvailable = m_blockLength - m_blockOffset;
if ( bytesAvailable <= 0 )
break;
}
// copy data from uncompressed source buffer into data destination buffer
const size_t copyLength = min( (dataLength-numBytesRead), (size_t)bytesAvailable );
- memcpy(data, Resources.UncompressedBlock + BlockOffset, copyLength);
+ memcpy(output, Resources.UncompressedBlock + m_blockOffset, copyLength);
// update counters
- BlockOffset += copyLength;
- data += copyLength;
- numBytesRead += copyLength;
+ m_blockOffset += copyLength;
+ output += copyLength;
+ numBytesRead += copyLength;
}
// update block data
- if ( BlockOffset == BlockLength ) {
- BlockAddress = ftell64(Resources.Stream);
- BlockOffset = 0;
- BlockLength = 0;
+ if ( m_blockOffset == m_blockLength ) {
+ m_blockAddress = m_device->Tell();
+ m_blockOffset = 0;
+ m_blockLength = 0;
+
}
// return actual number of bytes read
// reads a BGZF block
void BgzfStream::ReadBlock(void) {
- // store block start
- int64_t blockAddress = ftell64(Resources.Stream);
+ BT_ASSERT_X( m_device, "BgzfStream::ReadBlock() - trying to read from null IO device");
+
+ // store block's starting address (device position before the header is consumed)
+ int64_t blockAddress = m_device->Tell();
// read block header from file
char header[Constants::BGZF_BLOCK_HEADER_LENGTH];
- size_t count = fread(header, 1, Constants::BGZF_BLOCK_HEADER_LENGTH, Resources.Stream);
+ size_t numBytesRead = m_device->Read(header, Constants::BGZF_BLOCK_HEADER_LENGTH);
- // if block header empty, set marker & skip rest of method
- if ( count == 0 ) {
- BlockLength = 0;
+ // if block header empty: zero the block length & stop, which makes Read() end its loop
+ if ( numBytesRead == 0 ) {
+ m_blockLength = 0;
return;
}
// if block header invalid size
- if ( count != sizeof(header) )
+ if ( numBytesRead != Constants::BGZF_BLOCK_HEADER_LENGTH )
throw BamException("BgzfStream::ReadBlock", "invalid block header size");
// validate block header contents
// read remainder of block
const size_t remaining = blockLength - Constants::BGZF_BLOCK_HEADER_LENGTH;
- count = fread(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], 1, remaining, Resources.Stream);
- if ( count != remaining )
+ numBytesRead = m_device->Read(&Resources.CompressedBlock[Constants::BGZF_BLOCK_HEADER_LENGTH], remaining);
+ if ( numBytesRead != remaining )
throw BamException("BgzfStream::ReadBlock", "could not read data from block");
// decompress block data
- count = InflateBlock(blockLength);
+ numBytesRead = InflateBlock(blockLength); // from here on numBytesRead holds InflateBlock()'s result, not a device read count
- // update block metadata
- if ( BlockLength != 0 )
- BlockOffset = 0;
- BlockAddress = blockAddress;
- BlockLength = count;
+ // update block data; the offset is reset only when a block was already loaded, so an offset set by Seek() (length 0) survives
+ if ( m_blockLength != 0 )
+ m_blockOffset = 0;
+ m_blockAddress = blockAddress;
+ m_blockLength = numBytesRead;
}
// seek to position in BGZF file
void BgzfStream::Seek(const int64_t& position) {
+ BT_ASSERT_X( m_device, "BgzfStream::Seek() - trying to seek on null IO device");
+
+ // skip if device is not open (silent no-op: nothing is thrown and block state is left untouched)
+ if ( !IsOpen() ) return;
+
// determine adjusted offset & address
int blockOffset = (position & 0xFFFF);
int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
// attempt seek in file
- if ( fseek64(Resources.Stream, blockAddress, SEEK_SET) != 0 ) {
+ if ( m_device->IsRandomAccess() && m_device->Seek(blockAddress) ) {
+
+ // update block data (function is void: success simply means no exception); length 0 makes the next Read() call ReadBlock()
+ m_blockLength = 0;
+ m_blockAddress = blockAddress;
+ m_blockOffset = blockOffset;
+ }
+ else { // device is not random-access, or its Seek() reported failure
stringstream s("");
s << "unable to seek to position: " << position;
throw BamException("BgzfStream::Seek", s.str());
}
-
- // if successful, update block metadata
- BlockLength = 0;
- BlockAddress = blockAddress;
- BlockOffset = blockOffset;
}
void BgzfStream::SetWriteCompressed(bool ok) {
- IsWriteCompressed = ok;
+ m_isWriteCompressed = ok; // read by DeflateBlock(): true selects Z_DEFAULT_COMPRESSION, false selects level 0
}
// get file position in BGZF file
int64_t BgzfStream::Tell(void) const {
- if ( !IsOpen )
+ if ( !IsOpen() ) // IsOpen() is now a method; it is also false when no device exists
return 0;
- return ( (BlockAddress << 16) | (BlockOffset & 0xFFFF) );
+ return ( (m_blockAddress << 16) | (m_blockOffset & 0xFFFF) ); // same packing Seek() decodes: block address in the upper bits, in-block offset in the low 16
}
// writes the supplied data into the BGZF buffer
size_t BgzfStream::Write(const char* data, const size_t dataLength) {
+ BT_ASSERT_X( m_device, "BgzfStream::Write() - trying to write to null IO device");
+ BT_ASSERT_X( (m_device->Mode() == IBamIODevice::WriteOnly),
+ "BgzfStream::Write() - trying to write to non-writable IO device");
+
// skip if file not open for writing
- if ( !IsOpen || !IsWriteOnly )
- return false;
+ if ( !IsOpen() )
+ return 0;
// write blocks as needed til all data is written
size_t numBytesWritten = 0;
while ( numBytesWritten < dataLength ) {
// copy data contents to uncompressed output buffer
- const size_t copyLength = min(blockLength - BlockOffset, dataLength - numBytesWritten);
+ unsigned int copyLength = min(blockLength - m_blockOffset, dataLength - numBytesWritten);
char* buffer = Resources.UncompressedBlock;
- memcpy(buffer + BlockOffset, input, copyLength);
+ memcpy(buffer + m_blockOffset, input, copyLength);
- // update counters
- BlockOffset += copyLength;
+ // update counter
+ m_blockOffset += copyLength;
input += copyLength;
numBytesWritten += copyLength;
// flush (& compress) output buffer when full
- if ( BlockOffset == blockLength )
+ if ( m_blockOffset == blockLength )
FlushBlock();
}