From 8e8ed0f0d520fa4d240ab98bd7cc0b0933777fdb Mon Sep 17 00:00:00 2001 From: derek Date: Wed, 9 Nov 2011 17:34:48 -0500 Subject: [PATCH] Added FTP support (text-tested, not BAM) --- src/api/internal/io/BamFtp_p.cpp | 446 +++++++++++++++++- src/api/internal/io/BamFtp_p.h | 34 ++ src/api/internal/io/BamHttp_p.cpp | 50 +- src/api/internal/io/BamHttp_p.h | 2 +- src/api/internal/io/ByteArray_p.cpp | 6 +- src/api/internal/io/HostAddress_p.cpp | 13 +- src/api/internal/io/HostInfo_p.cpp | 10 +- src/api/internal/io/HttpHeader_p.cpp | 385 +++++++++++++++ src/api/internal/io/HttpHeader_p.h | 112 +++++ src/api/internal/io/NetUnix_p.h | 26 + src/api/internal/io/NetWin_p.h | 39 ++ src/api/internal/io/RollingBuffer_p.cpp | 335 +++++++++++++ src/api/internal/io/RollingBuffer_p.h | 50 ++ src/api/internal/io/TcpSocketEngine_win_p.cpp | 303 ++++++++++++ src/api/internal/io/TcpSocket_p.cpp | 144 +++++- src/api/internal/io/TcpSocket_p.h | 4 +- 16 files changed, 1890 insertions(+), 69 deletions(-) create mode 100644 src/api/internal/io/HttpHeader_p.cpp create mode 100644 src/api/internal/io/HttpHeader_p.h create mode 100644 src/api/internal/io/NetUnix_p.h create mode 100644 src/api/internal/io/NetWin_p.h create mode 100644 src/api/internal/io/RollingBuffer_p.cpp create mode 100644 src/api/internal/io/RollingBuffer_p.h create mode 100644 src/api/internal/io/TcpSocketEngine_win_p.cpp diff --git a/src/api/internal/io/BamFtp_p.cpp b/src/api/internal/io/BamFtp_p.cpp index 10181cb..31195bd 100644 --- a/src/api/internal/io/BamFtp_p.cpp +++ b/src/api/internal/io/BamFtp_p.cpp @@ -2,27 +2,243 @@ // BamFtp_p.cpp (c) 2011 Derek Barnett // Marth Lab, Department of Biology, Boston College // --------------------------------------------------------------------------- -// Last modified: 25 October 2011 (DB) +// Last modified: 8 November 2011 (DB) // --------------------------------------------------------------------------- // Provides reading/writing of BAM files on FTP server // *************************************************************************** +#include "api/BamAux.h" #include "api/internal/io/BamFtp_p.h" +#include "api/internal/io/TcpSocket_p.h" using namespace BamTools; using namespace BamTools::Internal; +#include // debug + +#include +#include +#include +#include using namespace std; +namespace BamTools { +namespace Internal { + +// ----------- +// constants +// ----------- + +static const uint16_t FTP_PORT = 21; +static const string FTP_PREFIX = "ftp://"; +static const size_t FTP_PREFIX_LENGTH = 6; +static const string FTP_NEWLINE = "\r\n"; +static const string DEFAULT_USER = "anonymous"; +static const string DEFAULT_PASS = "anonymous@"; +static const string ABOR_CMD = "ABOR"; +static const string USER_CMD = "USER"; +static const string PASS_CMD = "PASS"; +static const string PASV_CMD = "PASV"; +static const string REIN_CMD = "REIN"; +static const string REST_CMD = "REST"; +static const string RETR_CMD = "RETR"; +static const string TYPE_CMD = "TYPE"; +static const char COLON_CHAR = ':'; +static const char COMMA_CHAR = ','; +static const char DOT_CHAR = '.'; +static const char MINUS_CHAR = '-'; +static const char SLASH_CHAR = '/'; +static const char SPACE_CHAR = ' '; +static const char LEFT_PAREN_CHAR = '('; +static const char RIGHT_PAREN_CHAR = ')'; + +// ----------------- +// utility methods +// ----------------- + +static inline +vector split(const string& source, const char delim) { + + stringstream ss(source); + string field; + vector fields; + + while ( getline(ss, field, delim) ) + fields.push_back(field); + return fields; +} + +static inline +bool startsWith(const string& source, const string& pattern) { + return ( source.find(pattern) == 0 ); +} + +static inline +string toLower(const string& s) { + string out; + const size_t sSize = s.size(); + out.reserve(sSize); + for ( size_t i = 0; i < sSize; ++i ) + out[i] = tolower(s[i]); + return out; +} + +} // namespace Internal +} // namespace BamTools + +// ----------------------- +// BamFtp implementation +// ----------------------- + BamFtp::BamFtp(const string& url) : IBamIODevice() + , m_commandSocket(new TcpSocket) + , m_dataSocket(new TcpSocket) + , m_port(FTP_PORT) + , m_dataPort(0) + , m_username(DEFAULT_USER) + , m_password(DEFAULT_PASS) + , m_isUrlParsed(false) + , m_filePosition(-1) { - BT_ASSERT_X(false, "BamFtp not yet implemented"); + ParseUrl(url); } -BamFtp::~BamFtp(void) { } +BamFtp::~BamFtp(void) { + + // close connection & clean up + Close(); + if ( m_commandSocket ) + delete m_commandSocket; + if ( m_dataSocket ) + delete m_dataSocket; +} void BamFtp::Close(void) { - return ; + + // disconnect socket + m_commandSocket->DisconnectFromHost(); + m_dataSocket->DisconnectFromHost(); + + // reset state - necessary?? + m_isUrlParsed = false; + m_filePosition = -1; + m_username = DEFAULT_USER; + m_password = DEFAULT_PASS; + m_dataHostname.clear(); + m_dataPort = 0; +} + +bool BamFtp::ConnectCommandSocket(void) { + + BT_ASSERT_X(m_commandSocket, "null command socket?"); + + // connect to FTP server + if ( !m_commandSocket->ConnectToHost(m_hostname, m_port, m_mode) ) { + SetErrorString("BamFtp::ConnectCommandSocket", "could not connect to host"); + return false; + } + + // receive initial reply from host + if ( !ReceiveReply() ) { + Close(); + return false; + } + + // send USER command + string userCommand = USER_CMD + SPACE_CHAR + m_username + FTP_NEWLINE; + if ( !SendCommand(userCommand, true) ) { + Close(); + return false; + } + + // send PASS command + string passwordCommand = PASS_CMD + SPACE_CHAR + m_password + FTP_NEWLINE; + if ( !SendCommand(passwordCommand, true) ) { + Close(); + return false; + } + + // send TYPE command + string typeCommand = TYPE_CMD + SPACE_CHAR + "I" + FTP_NEWLINE; + if ( !SendCommand(typeCommand, true) ) { + Close(); + return false; + } + + // return success + return true; +} + +bool BamFtp::ConnectDataSocket(void) { + + // failure if can't connect to command socket first + if ( !m_commandSocket->IsConnected() ) { + if ( !ConnectCommandSocket() ) + return false; + } + + // make sure we're starting with a fresh data channel + if ( m_dataSocket->IsConnected() ) + m_dataSocket->DisconnectFromHost(); + + // send passive connection command + const string passiveCommand = PASV_CMD + FTP_NEWLINE; + if ( !SendCommand(passiveCommand, true) ) { + // TODO: set error string + return false; + } + + // retrieve passive connection port + if ( !ParsePassiveResponse() ) { + // TODO: set error string + return false; + } + + // set up restart command (tell server where to start fetching bytes from) + if ( m_filePosition >= 0 ) { + + stringstream fpStream(""); + fpStream << m_filePosition; + string restartCommand = REST_CMD + SPACE_CHAR + fpStream.str() + FTP_NEWLINE; + if ( !SendCommand(restartCommand, true) ) { + // TODO: set error string + return false; + } + } + + // main file retrieval request + string retrieveCommand = RETR_CMD + SPACE_CHAR + m_filename + FTP_NEWLINE; + if ( !SendCommand(retrieveCommand, false) ) { + // TODO: set error string + return false; + } + + // make data channel connection + if ( !m_dataSocket->ConnectToHost(m_dataHostname, m_dataPort) ) { + // TODO: set error string + return false; + } + + // fetch intial reply from server + if ( !ReceiveReply() ) { + // TODO: set error string + m_dataSocket->DisconnectFromHost(); + return false; + } + + // make sure we have reply code 150 (all good) + if ( !startsWith(m_response, "150") ) { + // TODO: set error string + m_dataSocket->DisconnectFromHost(); + return false; + } + + // return success + return true; +} + +bool BamFtp::IsOpen(void) const { + return IBamIODevice::IsOpen() && m_isUrlParsed; } bool BamFtp::IsRandomAccess(void) const { @@ -31,33 +247,243 @@ bool BamFtp::IsRandomAccess(void) const { bool BamFtp::Open(const IBamIODevice::OpenMode mode) { + // BamFtp only supports read-only access if ( mode != IBamIODevice::ReadOnly ) { SetErrorString("BamFtp::Open", "writing on this device is not supported"); return false; } + // initialize basic valid state + m_mode = mode; + m_filePosition = 0; + // attempt connection to command & data sockets + return ( ConnectCommandSocket() && ConnectDataSocket() ); +} + +bool BamFtp::ParsePassiveResponse(void) { + + // fail if empty + if ( m_response.empty() ) + return false; + + // find parentheses + const size_t leftParenFound = m_response.find(LEFT_PAREN_CHAR); + const size_t rightParenFound = m_response.find(RIGHT_PAREN_CHAR); + if ( leftParenFound == string::npos || rightParenFound == string::npos ) + return false; + + // grab everything between ( should be "h1,h2,h3,h4,p1,p2" ) + string::const_iterator responseBegin = m_response.begin(); + const string hostAndPort(responseBegin+leftParenFound+1, responseBegin+rightParenFound); + + // parse into string fields + vector fields = split(hostAndPort, COMMA_CHAR); + if ( fields.size() != 6 ) + return false; + + // fetch passive connection IP + m_dataHostname = fields[0] + DOT_CHAR + + fields[1] + DOT_CHAR + + fields[2] + DOT_CHAR + + fields[3]; + + // fetch passive connection port + const uint8_t portUpper = static_cast(atoi(fields[4].c_str())); + const uint8_t portLower = static_cast(atoi(fields[5].c_str())); + m_dataPort = ( portUpper<<8 ) + portLower; + + // return success return true; } +void BamFtp::ParseUrl(const string& url) { + + // clear flag to start + m_isUrlParsed = false; + + // make sure url starts with "ftp://", case-insensitive + string tempUrl(url); + toLower(tempUrl); + const size_t prefixFound = tempUrl.find(FTP_PREFIX); + if ( prefixFound == string::npos ) + return; + + // find end of host name portion (first '/' hit after the prefix) + const size_t firstSlashFound = tempUrl.find(SLASH_CHAR, FTP_PREFIX_LENGTH); + if ( firstSlashFound == string::npos ) { + ; // no slash found... no filename given along with host? + } + + // fetch hostname + string hostname = tempUrl.substr(FTP_PREFIX_LENGTH, (firstSlashFound - FTP_PREFIX_LENGTH)); + m_hostname = hostname; + m_port = FTP_PORT; + + // store remainder of URL as filename (must be non-empty) + string filename = tempUrl.substr(firstSlashFound); + if ( filename.empty() ) + return; + m_filename = filename; + + // set parsed OK flag + m_isUrlParsed = true; +} + int64_t BamFtp::Read(char* data, const unsigned int numBytes) { - (void)data; - (void)numBytes; - return 0; + + // if BamHttp not in a valid state + if ( !IsOpen() ) + return -1; + + // read until hit desired @numBytes + int64_t bytesReadSoFar = 0; + while ( bytesReadSoFar < numBytes ) { + + // calculate number of bytes we're going to try to read this iteration + const size_t remainingBytes = ( numBytes - bytesReadSoFar ); + + // if either disconnected somehow, or (more likely) we have seeked since last read + if ( !m_dataSocket->IsConnected() ) { + if ( !ConnectDataSocket() ) { + // TODO: set error string + return -1; + } + } + + // read bytes from data socket + const int64_t socketBytesRead = ReadDataSocket(data+bytesReadSoFar, remainingBytes); + if ( socketBytesRead < 0 ) + return -1; + bytesReadSoFar += socketBytesRead; + m_filePosition += socketBytesRead; + } + + // return actual number bytes successfully read + return bytesReadSoFar; +} + +int64_t BamFtp::ReadCommandSocket(char* data, const unsigned int maxNumBytes) { + + // try to read 'remainingBytes' from socket + const int64_t numBytesRead = m_commandSocket->Read(data, maxNumBytes); + if ( numBytesRead < 0 ) + return -1; + return numBytesRead; +} + +int64_t BamFtp::ReadDataSocket(char* data, const unsigned int maxNumBytes) { + + // try to read 'remainingBytes' from socket + const int64_t numBytesRead = m_dataSocket->Read(data, maxNumBytes); + if ( numBytesRead < 0 ) + return -1; + return numBytesRead; +} + +bool BamFtp::ReceiveReply(void) { + + // failure if not connected + if ( !m_commandSocket->IsConnected() ) { + SetErrorString("BamFtp::ReceiveReply()", "command socket not connected"); + return false; + } + + m_response.clear(); + + // read header data (& discard for now) + bool headerEnd = false; + while ( !headerEnd ) { + + const string headerLine = m_commandSocket->ReadLine(); + m_response += headerLine; + + // if line is of form 'xyz ', quit reading lines + if ( (headerLine.length() >= 4 ) && + isdigit(headerLine[0]) && + isdigit(headerLine[1]) && + isdigit(headerLine[2]) && + ( headerLine[3] != MINUS_CHAR ) + ) + { + headerEnd = true; + } + } + + // return success, depending on response + if ( m_response.empty() ) { + SetErrorString("BamFtp::ReceiveReply", "error reading server reply"); + return false; + } + return true; } bool BamFtp::Seek(const int64_t& position) { - (void)position; + + // if FTP device not in a valid state + if ( !IsOpen() ) { + // TODO: set error string + return false; + } + + // ---------------------- + // UGLY !! but works?? + // ---------------------- + // disconnect from server + m_dataSocket->DisconnectFromHost(); + m_commandSocket->DisconnectFromHost(); + + // update file position & return success + m_filePosition = position; + return true; +} + +bool BamFtp::SendCommand(const string& command, bool waitForReply) { + + // failure if not connected + if ( !m_commandSocket->IsConnected() ) { + SetErrorString("BamFtp::SendCommand", "command socket not connected"); + return false; + } + + // write command to 'command socket' + if ( WriteCommandSocket(command.c_str(), command.length()) == -1 ) { + SetErrorString("BamFtp::SendCommand", "error writing to socket"); + // get actual error from command socket?? + return false; + } + + // if we sent a command that receives a response + if ( waitForReply ) + return ReceiveReply(); + + // return success return true; } int64_t BamFtp::Tell(void) const { - return -1; + return ( IsOpen() ? m_filePosition : -1 ); } int64_t BamFtp::Write(const char* data, const unsigned int numBytes) { (void)data; (void)numBytes; BT_ASSERT_X(false, "BamFtp::Write : write-mode not supported on this device"); - return 0; + SetErrorString("BamFtp::Write", "write-mode not supported on this device"); + return -1; +} + +int64_t BamFtp::WriteCommandSocket(const char* data, const unsigned int numBytes) { + if ( !m_commandSocket->IsConnected() ) + return -1; + m_commandSocket->ClearBuffer(); + return m_commandSocket->Write(data, numBytes); +} + +int64_t BamFtp::WriteDataSocket(const char* data, const unsigned int numBytes) { + (void)data; + (void)numBytes; + BT_ASSERT_X(false, "BamFtp::WriteDataSocket: write-mode not supported on this device"); + SetErrorString("BamFtp::Write", "write-mode not supported on this device"); + return -1; } diff --git a/src/api/internal/io/BamFtp_p.h b/src/api/internal/io/BamFtp_p.h index d049a10..d9e02fb 100644 --- a/src/api/internal/io/BamFtp_p.h +++ b/src/api/internal/io/BamFtp_p.h @@ -26,6 +26,8 @@ namespace BamTools { namespace Internal { +class TcpSocket; + class BamFtp : public IBamIODevice { // ctor & dtor @@ -36,6 +38,7 @@ class BamFtp : public IBamIODevice { // IBamIODevice implementation public: void Close(void); + bool IsOpen(void) const; bool IsRandomAccess(void) const; bool Open(const IBamIODevice::OpenMode mode); int64_t Read(char* data, const unsigned int numBytes); @@ -45,9 +48,40 @@ class BamFtp : public IBamIODevice { // internal methods private: + bool ConnectCommandSocket(void); + bool ConnectDataSocket(void); + bool ParsePassiveResponse(void); + void ParseUrl(const std::string& url); + int64_t ReadCommandSocket(char* data, const unsigned int numBytes); + int64_t ReadDataSocket(char* data, const unsigned int numBytes); + bool ReceiveReply(void); + bool SendCommand(const std::string& command, bool waitForReply); + int64_t WriteCommandSocket(const char* data, const unsigned int numBytes); + int64_t WriteDataSocket(const char* data, const unsigned int numBytes); // data members private: + // our main socket + TcpSocket* m_commandSocket; + TcpSocket* m_dataSocket; + + // our connection data + std::string m_hostname; + uint16_t m_port; + std::string m_dataHostname; + uint16_t m_dataPort; + std::string m_filename; + + std::string m_username; + std::string m_password; + + std::string m_response; + + // internal state flags + bool m_isUrlParsed; + + // file position + int64_t m_filePosition; }; } // namespace Internal diff --git a/src/api/internal/io/BamHttp_p.cpp b/src/api/internal/io/BamHttp_p.cpp index 04fb8e2..d889db4 100644 --- a/src/api/internal/io/BamHttp_p.cpp +++ b/src/api/internal/io/BamHttp_p.cpp @@ -2,7 +2,7 @@ // BamHttp_p.cpp (c) 2011 Derek Barnett // Marth Lab, Department of Biology, Boston College // --------------------------------------------------------------------------- -// Last modified: 7 November 2011 (DB) +// Last modified: 8 November 2011 (DB) // --------------------------------------------------------------------------- // Provides reading/writing of BAM files on HTTP server // *************************************************************************** @@ -37,6 +37,11 @@ static const char SLASH_CHAR = '/'; // utility methods // ----------------- +static inline +bool endsWith(const string& source, const string& pattern) { + return ( source.find(pattern) == (source.length() - pattern.length()) ); +} + static inline string toLower(const string& s) { string out; @@ -161,6 +166,9 @@ bool BamHttp::Open(const IBamIODevice::OpenMode mode) { void BamHttp::ParseUrl(const string& url) { + // clear flag to start + m_isUrlParsed = false; + // make sure url starts with "http://", case-insensitive string tempUrl(url); toLower(tempUrl); @@ -209,13 +217,14 @@ int64_t BamHttp::Read(char* data, const unsigned int numBytes) { // if socket has access to entire file contents // i.e. we received response with full data (status code == 200) - if ( !m_endRangeFilePosition >= 0 ) { + if ( m_endRangeFilePosition < 0 ) { // try to read 'remainingBytes' from socket const int64_t socketBytesRead = ReadFromSocket(data+bytesReadSoFar, remainingBytes); if ( socketBytesRead < 0 ) return -1; bytesReadSoFar += socketBytesRead; + m_filePosition += socketBytesRead; } // socket has access to a range of data (might already be in buffer) @@ -232,6 +241,7 @@ int64_t BamHttp::Read(char* data, const unsigned int numBytes) { if ( socketBytesRead < 0 ) return -1; bytesReadSoFar += socketBytesRead; + m_filePosition += socketBytesRead; } // otherwise, this is a 1st-time read OR we already read everything from the last GET request @@ -256,7 +266,6 @@ int64_t BamHttp::ReadFromSocket(char* data, const unsigned int maxNumBytes) { const int64_t numBytesRead = m_socket->Read(data, maxNumBytes); if ( numBytesRead < 0 ) return -1; - m_filePosition += numBytesRead; return numBytesRead; } @@ -270,21 +279,17 @@ bool BamHttp::ReceiveResponse(void) { if ( !EnsureSocketConnection() ) return false; - // read response header from socket - RaiiBuffer header(0x10000); - size_t l = 0; - while ( m_socket->Read(header.Buffer + l, 1) >= 0 ) { - if ( header.Buffer[l] == '\n' && l >= 3 ) { - if (strncmp(header.Buffer + l - 3, "\r\n\r\n", 4) == 0) - break; - } - ++l; - } + // fetch header, up until double new line string responseHeader; - responseHeader.resize(l+1); - for ( size_t i = 0; i < l; ++i ) - responseHeader[i] = header.Buffer[i]; + static const string doubleNewLine = "\n\n"; + do { + // read line & append to full header + const string headerLine = m_socket->ReadLine(); + responseHeader += headerLine; + + } while ( !endsWith(responseHeader, doubleNewLine) ); + // sanity check if ( responseHeader.empty() ) { // TODO: set error string Close(); @@ -339,7 +344,7 @@ bool BamHttp::Seek(const int64_t& position) { // discard socket's buffer contents, update positions, & return success m_socket->ClearBuffer(); m_filePosition = position; - m_endRangeFilePosition = -1; + m_endRangeFilePosition = position; return true; } @@ -351,8 +356,8 @@ bool BamHttp::SendRequest(const size_t numBytes) { // create range string m_endRangeFilePosition = m_filePosition + numBytes; - stringstream range("bytes="); - range << m_filePosition << "-" << m_endRangeFilePosition; + stringstream range(""); + range << "bytes=" << m_filePosition << "-" << m_endRangeFilePosition; // make sure we're connected if ( !EnsureSocketConnection() ) @@ -377,12 +382,13 @@ int64_t BamHttp::Write(const char* data, const unsigned int numBytes) { (void)data; (void)numBytes; BT_ASSERT_X(false, "BamHttp::Write : write-mode not supported on this device"); - return 0; + SetErrorString("BamHttp::Write", "write-mode not supported on this device"); + return -1; } int64_t BamHttp::WriteToSocket(const char* data, const unsigned int numBytes) { - if ( !EnsureSocketConnection() ) - return false; + if ( !m_socket->IsConnected() ) + return -1; m_socket->ClearBuffer(); return m_socket->Write(data, numBytes); } diff --git a/src/api/internal/io/BamHttp_p.h b/src/api/internal/io/BamHttp_p.h index e48693e..c3d9502 100644 --- a/src/api/internal/io/BamHttp_p.h +++ b/src/api/internal/io/BamHttp_p.h @@ -40,7 +40,7 @@ class BamHttp : public IBamIODevice { // IBamIODevice implementation public: void Close(void); - bool IsOpen(void) const ; + bool IsOpen(void) const; bool IsRandomAccess(void) const; bool Open(const IBamIODevice::OpenMode mode); int64_t Read(char* data, const unsigned int numBytes); diff --git a/src/api/internal/io/ByteArray_p.cpp b/src/api/internal/io/ByteArray_p.cpp index aa74f28..2bfdd1b 100644 --- a/src/api/internal/io/ByteArray_p.cpp +++ b/src/api/internal/io/ByteArray_p.cpp @@ -2,6 +2,8 @@ using namespace BamTools; using namespace BamTools::Internal; +#include // debug + #include #include using namespace std; @@ -59,9 +61,9 @@ char& ByteArray::operator[](size_t i) { } size_t ByteArray::IndexOf(const char c, const size_t from, const size_t to) const { - const size_t size = ( (to == 0 ) ? m_data.size() : to); + const size_t size = ( (to == 0 ) ? m_data.size() : to ); for ( size_t i = from; i < size; ++i ) { - if ( m_data.at(i) == c ) + if ( m_data.at(i) == c ) return i; } return m_data.size(); diff --git a/src/api/internal/io/HostAddress_p.cpp b/src/api/internal/io/HostAddress_p.cpp index aa3c9a3..9d4fc97 100644 --- a/src/api/internal/io/HostAddress_p.cpp +++ b/src/api/internal/io/HostAddress_p.cpp @@ -15,12 +15,6 @@ using namespace std; namespace BamTools { namespace Internal { -// convenience 'isalpha' wrapper -static inline -bool isAlpha(char c) { - return ( isalpha(c) != 0 ); -} - // split a string into fields, on delimiter character static inline vector split(const string& source, char delim) { @@ -57,6 +51,13 @@ bool parseIp4(const string& address, uint32_t& maybeIp4 ) { uint32_t ipv4(0); for ( uint8_t i = 0; i < 4; ++i ) { + const string& field = addressFields.at(i); + const size_t fieldSize = field.size(); + for ( size_t j = 0; j < fieldSize; ++j ) { + if ( !isdigit(field[j]) ) + return false; + } + int value = atoi( addressFields.at(i).c_str() ); if ( value < 0 || value > 255 ) return false; diff --git a/src/api/internal/io/HostInfo_p.cpp b/src/api/internal/io/HostInfo_p.cpp index 2bb0187..693b2f2 100644 --- a/src/api/internal/io/HostInfo_p.cpp +++ b/src/api/internal/io/HostInfo_p.cpp @@ -9,6 +9,8 @@ using namespace BamTools::Internal; # include "api/internal/io/NetUnix_p.h" #endif +#include // debug + // standard C++ includes #include #include @@ -139,6 +141,8 @@ HostInfo HostInfo::Lookup(const string& hostname, const string& port) { // do 'normal' lookup else { + cout << "HostInfo::Lookup() - looking up addresses for domain name: " << hostname << endl; + // setup address lookup 'hints' addrinfo hints; memset(&hints, 0, sizeof(hints)); @@ -153,6 +157,8 @@ HostInfo HostInfo::Lookup(const string& hostname, const string& port) { // if everything OK if ( status == 0 ) { + cout << "HostInfo::Lookup() - found addresses" << endl; + // iterate over all IP addresses found addrinfo* p = res; for ( ; p != NULL; p = p->ai_next ) { @@ -161,6 +167,7 @@ HostInfo HostInfo::Lookup(const string& hostname, const string& port) { if ( p->ai_family == AF_INET ) { sockaddr_in* ipv4 = (sockaddr_in*)p->ai_addr; HostAddress a( ntohl(ipv4->sin_addr.s_addr) ); + cout << "\t" << a.GetIPString() << endl; uniqueAddresses.insert(a); } @@ -168,6 +175,7 @@ HostInfo HostInfo::Lookup(const string& hostname, const string& port) { else if ( p->ai_family == AF_INET6 ) { sockaddr_in6* ipv6 = (sockaddr_in6*)p->ai_addr; HostAddress a(ipv6->sin6_addr.s6_addr); + cout << "\t" << a.GetIPString() << endl; uniqueAddresses.insert(a); } } @@ -185,7 +193,7 @@ HostInfo HostInfo::Lookup(const string& hostname, const string& port) { status == EAI_NONAME || status == EAI_FAIL # ifdef EAI_NODATA - || status == EAI_NODATA // officially deprecated, but just in case we run into it + || status == EAI_NODATA // officially deprecated, but just in case we happen to hit it # endif // EAI_NODATA #else // _WIN32 diff --git a/src/api/internal/io/HttpHeader_p.cpp b/src/api/internal/io/HttpHeader_p.cpp new file mode 100644 index 0000000..1398d4c --- /dev/null +++ b/src/api/internal/io/HttpHeader_p.cpp @@ -0,0 +1,385 @@ +#include "api/internal/io/HttpHeader_p.h" +using namespace BamTools; +using namespace BamTools::Internal; + +#include +#include +#include +using namespace std; + +namespace BamTools { + +// ----------- +// constants +// ----------- + +namespace Constants { + +static const char CAR_RET_CHAR = '\r'; +static const char COLON_CHAR = ':'; +static const char DOT_CHAR = '.'; +static const char NEWLINE_CHAR = '\n'; +static const char SPACE_CHAR = ' '; +static const char TAB_CHAR = '\t'; + +static const string FIELD_NEWLINE = "\r\n"; +static const string FIELD_SEPARATOR = ": "; +static const string HTTP_STRING = "HTTP/"; + +} // namespace Constants + +// ------------------------ +// static utility methods +// ------------------------ + +namespace Internal { + +static inline +bool IsSpace(const char c) { + const int n = static_cast(c); + return ( n== 0 || (n <= 13 && n >= 9) ); +} + +// split on hitting single char delim +static vector Split(const string& source, const char delim) { + stringstream ss(source); + string field; + vector fields; + while ( getline(ss, field, delim) ) + fields.push_back(field); + return fields; +} + +static string Trim(const string& source) { + + // skip if empty string + if ( source.empty() ) + return source; + + // fetch string data + const char* s = source.data(); // ignoring null-term on purpose + const size_t size = source.size(); + size_t start = 0; + size_t end = size-1; + + // skip if no spaces at start or end + if ( !IsSpace(s[start]) && !IsSpace( s[end] ) ) + return source; + + // remove leading whitespace + while ( (start != end) && IsSpace(s[start]) ) + ++start; + + // remove trailing whitespace + if ( start <= end ) { + while ( end && IsSpace(s[end]) ) + --end; + } + + // return result + return string(s + start, (end-start) + 1); +} + +} // namespace Internal +} // namespace BamTools + +// --------------------------- +// HttpHeader implementation +// --------------------------- + +HttpHeader::HttpHeader(void) + : m_isValid(true) + , m_majorVersion(1) + , m_minorVersion(1) +{ } + +HttpHeader::HttpHeader(const string& s) + : m_isValid(true) + , m_majorVersion(1) + , m_minorVersion(1) +{ + Parse(s); +} + +HttpHeader::~HttpHeader(void) { } + +bool HttpHeader::ContainsKey(const string& key) const { + return ( m_fields.find(key) != m_fields.end() ); +} + +int HttpHeader::GetMajorVersion(void) const { + return m_majorVersion; +} + +int HttpHeader::GetMinorVersion(void) const { + return m_minorVersion; +} + +string HttpHeader::GetValue(const string& key) const { + if ( ContainsKey(key) ) + return m_fields.at(key); + else return string(); +} + +bool HttpHeader::IsValid(void) const { + return m_isValid; +} + +void HttpHeader::Parse(const string& s) { + + // trim whitespace from input string + const string trimmed = Trim(s); + + // split into list of header lines + vector rawFields = Split(trimmed, Constants::NEWLINE_CHAR); + + // prep our 'cleaned' fields container + vector cleanFields; + cleanFields.reserve(rawFields.size()); + + // remove any empty fields and clean any trailing windows-style carriage returns ('\r') + vector::iterator rawFieldIter = rawFields.begin(); + vector::iterator rawFieldEnd = rawFields.end(); + for ( ; rawFieldIter != rawFieldEnd; ++rawFieldIter ) { + string& field = (*rawFieldIter); + + // skip empty fields + if ( field.empty() ) + continue; + + // remove carriage returns + const size_t fieldSize = field.size(); + if ( field[fieldSize-1] == Constants::CAR_RET_CHAR ) + field.resize(fieldSize-1); + + // store cleaned field + cleanFields.push_back(field); + } + + // skip add'l processing if nothing here + if ( cleanFields.empty() ) + return; + + // parse header lines + int lineNumber = 0; + vector::const_iterator fieldIter = cleanFields.begin(); + vector::const_iterator fieldEnd = cleanFields.end(); + for ( ; fieldIter != fieldEnd; ++fieldIter, ++lineNumber ) { + if ( !ParseLine( (*fieldIter), lineNumber ) ) { + m_isValid = false; + return; + } + } +} + +bool HttpHeader::ParseLine(const string& line, int) { + + // find colon position, return failure if not found + const size_t colonFound = line.find(Constants::COLON_CHAR); + if ( colonFound == string::npos ) + return false; + + // store key/value (without leading/trailing whitespace) & return success + const string key = Trim(line.substr(0, colonFound)); + const string value = Trim(line.substr(colonFound+1)); + m_fields[key] = value; + return true; +} + +void HttpHeader::RemoveField(const string& key) { + m_fields.erase(key); +} + +void HttpHeader::SetField(const string& key, const string& value) { + m_fields[key] = value; +} + +void HttpHeader::SetValid(bool ok) { + m_isValid = ok; +} + +void HttpHeader::SetVersion(int major, int minor) { + m_majorVersion = major; + m_minorVersion = minor; +} + +string HttpHeader::ToString(void) const { + string result(""); + if ( m_isValid ) { + map::const_iterator fieldIter = m_fields.begin(); + map::const_iterator fieldEnd = m_fields.end(); + for ( ; fieldIter != fieldEnd; ++fieldIter ) { + const string& key = (*fieldIter).first; + const string& value = (*fieldIter).second; + const string& line = key + Constants::FIELD_SEPARATOR + + value + Constants::FIELD_NEWLINE; + result += line; + } + } + return result; +} + +// ---------------------------------- +// HttpRequestHeader implementation +// ---------------------------------- + +HttpRequestHeader::HttpRequestHeader(const string& method, + const string& resource, + int majorVersion, + int minorVersion) + : HttpHeader() + , m_method(method) + , m_resource(resource) +{ + SetVersion(majorVersion, minorVersion); +} + +HttpRequestHeader::~HttpRequestHeader(void) { } + +string HttpRequestHeader::GetMethod(void) const { + return m_method; +} + +string HttpRequestHeader::GetResource(void) const { + return m_resource; +} + +bool HttpRequestHeader::ParseLine(const string& line, int lineNumber) { + + // if not 'request line', just let base class parse + if ( lineNumber != 0 ) + return HttpHeader::ParseLine(line, lineNumber); + + // fail if empty line + if ( line.empty() ) + return false; + + // walk through request line, storing positions + // GET /path/to/resource HTTP/1.1 + // ^ ^^ ^^ + const size_t foundMethod = line.find_first_not_of(Constants::SPACE_CHAR); // skip any leading whitespace + if ( foundMethod == string::npos ) return false; + const size_t foundFirstSpace = line.find(Constants::SPACE_CHAR, foundMethod+1); + if ( foundFirstSpace == string::npos ) return false; + const size_t foundResource = line.find_first_not_of(Constants::SPACE_CHAR, foundFirstSpace+1); + if ( foundResource == string::npos ) return false; + const size_t foundSecondSpace = line.find(Constants::SPACE_CHAR, foundResource+1); + if ( foundSecondSpace == string::npos ) return false; + const size_t foundVersion= line.find_first_not_of(Constants::SPACE_CHAR, foundSecondSpace+1); + if ( foundVersion == string::npos ) return false; + + // parse out method & resource + m_method = line.substr(foundMethod, foundFirstSpace - foundMethod); + m_resource = line.substr(foundResource, foundSecondSpace - foundResource); + + // parse out version numbers + const string temp = line.substr(foundVersion); + if ( (temp.find(Constants::HTTP_STRING) != 0) || (temp.size() != 8) ) + return false; + const int major = static_cast(temp.at(5) - '0'); + const int minor = static_cast(temp.at(7) - '0'); + SetVersion(major, minor); + + // if we get here, return success + return true; +} + +string HttpRequestHeader::ToString(void) const { + stringstream request(""); + request << m_method << Constants::SPACE_CHAR + << m_resource << Constants::SPACE_CHAR + << Constants::HTTP_STRING << GetMajorVersion() << Constants::DOT_CHAR << GetMinorVersion() + << Constants::FIELD_NEWLINE + << HttpHeader::ToString() + << Constants::FIELD_NEWLINE; + return request.str(); +} + +// ----------------------------------- +// HttpResponseHeader implementation +// ----------------------------------- + +HttpResponseHeader::HttpResponseHeader(const int statusCode, + const string& reason, + int majorVersion, + int minorVersion) + + : HttpHeader() + , m_statusCode(statusCode) + , m_reason(reason) +{ + SetVersion(majorVersion, minorVersion); +} + +HttpResponseHeader::HttpResponseHeader(const string& s) + : HttpHeader() + , m_statusCode(0) +{ + Parse(s); +} + +HttpResponseHeader::~HttpResponseHeader(void) { } + +string HttpResponseHeader::GetReason(void) const { + return m_reason; +} + +int HttpResponseHeader::GetStatusCode(void) const { + return m_statusCode; +} + +bool HttpResponseHeader::ParseLine(const string& line, int lineNumber) { + + // if not 'status line', just let base class + if ( lineNumber != 0 ) + return HttpHeader::ParseLine(line, lineNumber); + + // fail if empty line + if ( line.empty() ) + return false; + + // walk through status line, storing positions + // HTTP/1.1 200 OK + // ^ ^^ ^^ + + const size_t foundVersion = line.find_first_not_of(Constants::SPACE_CHAR); // skip any leading whitespace + if ( foundVersion == string::npos ) return false; + const size_t foundFirstSpace = line.find(Constants::SPACE_CHAR, foundVersion+1); + if ( foundFirstSpace == string::npos ) return false; + const size_t foundStatusCode = line.find_first_not_of(Constants::SPACE_CHAR, foundFirstSpace+1); + if ( foundStatusCode == string::npos ) return false; + const size_t foundSecondSpace = line.find(Constants::SPACE_CHAR, foundStatusCode+1); + if ( foundSecondSpace == string::npos ) return false; + const size_t foundReason= line.find_first_not_of(Constants::SPACE_CHAR, foundSecondSpace+1); + if ( foundReason == string::npos ) return false; + + // parse version numbers + string temp = line.substr(foundVersion, foundFirstSpace - foundVersion); + if ( (temp.find(Constants::HTTP_STRING) != 0) || (temp.size() != 8) ) + return false; + const int major = static_cast(temp.at(5) - '0'); + const int minor = static_cast(temp.at(7) - '0'); + SetVersion(major, minor); + + // parse status code + temp = line.substr(foundStatusCode, foundSecondSpace - foundStatusCode); + if ( temp.size() != 3 ) return false; + m_statusCode = atoi( temp.c_str() ); + + // reason phrase should be everything else left + m_reason = line.substr(foundReason); + + // if we get here, return success + return true; +} + +string HttpResponseHeader::ToString(void) const { + stringstream response(""); + response << Constants::HTTP_STRING << GetMajorVersion() << Constants::DOT_CHAR << GetMinorVersion() + << Constants::SPACE_CHAR << m_statusCode + << Constants::SPACE_CHAR << m_reason + << Constants::FIELD_NEWLINE + << HttpHeader::ToString() + << Constants::FIELD_NEWLINE; + return response.str(); +} diff --git a/src/api/internal/io/HttpHeader_p.h b/src/api/internal/io/HttpHeader_p.h new file mode 100644 index 0000000..764ff63 --- /dev/null +++ b/src/api/internal/io/HttpHeader_p.h @@ -0,0 +1,112 @@ +#ifndef HTTP_HEADER_P_H +#define HTTP_HEADER_P_H + +#include "api/api_global.h" +#include +#include + +namespace BamTools { +namespace Internal { + +class HttpHeader { + + // ctors & dtor + public: + HttpHeader(void); + HttpHeader(const std::string& s); + virtual ~HttpHeader(void); + + // HttpHeader interface + public: + + // header field=>value access + bool ContainsKey(const std::string& key) const; + std::string GetValue(const std::string& key) const; + void RemoveField(const std::string& key); + void SetField(const std::string& key, const std::string& value); + + // get formatted header string + virtual std::string ToString(void) const; + + // query HTTP version used + int GetMajorVersion(void) const; + int GetMinorVersion(void) const; + + // see if header was parsed OK + bool IsValid(void) const; + + // internal methods + protected: + void Parse(const std::string& s); + virtual bool ParseLine(const std::string& line, int lineNumber); + void SetValid(bool ok); + void SetVersion(int major, int minor); + + // data members + private: + std::map m_fields; + + bool m_isValid; // should usually be true, only false if error processing a header line + int m_majorVersion; + int m_minorVersion; +}; + +class HttpRequestHeader : public HttpHeader { + + // ctor & dtor + public: + HttpRequestHeader(const std::string& method, // "GET", "PUT", etc + const std::string& resource, // filename + int majorVersion = 1, // version info + int minorVersion = 1); + ~HttpRequestHeader(void); + + // HttpRequestHeader interface + public: + std::string GetMethod(void) const; + std::string GetResource(void) const; + + // HttpHeader implementation + public: + std::string ToString(void) const; + protected: + bool ParseLine(const std::string& line, int lineNumber); + + // data members + private: + std::string m_method; + std::string m_resource; +}; + +class HttpResponseHeader : public HttpHeader { + + // ctor & dtor + public: + HttpResponseHeader(const int statusCode, // 200, 404, etc + const std::string& reason = std::string(), // 'reason phrase' for code + int majorVersion = 1, // version info + int minorVersion = 1); + HttpResponseHeader(const std::string& s); + ~HttpResponseHeader(void); + + // HttpRequestHeader interface + public: + std::string GetReason(void) const; + int GetStatusCode(void) const; + + // HttpHeader implementation + public: + std::string ToString(void) const; + protected: + bool ParseLine(const std::string& line, int lineNumber); + + // data members + private: + int m_statusCode; + std::string m_reason; +}; + +} // namespace Internal +} // namespace BamTools + +#endif // HTTP_HEADER_P_H diff --git a/src/api/internal/io/NetUnix_p.h b/src/api/internal/io/NetUnix_p.h new file mode 100644 index 0000000..14b2132 --- /dev/null +++ b/src/api/internal/io/NetUnix_p.h @@ -0,0 +1,26 @@ +#ifndef NETUNIX_P_H +#define NETUNIX_P_H + +#ifndef _WIN32 // <-- source files only include the proper Net*_p.h, but this is a double-check + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef BT_SOCKLEN_T +# define BT_SOCKLEN_T socklen_t +#endif + +namespace BamTools { +namespace Internal { + +} // namespace Internal +} // namespace BamTools + +#endif // _WIN32 +#endif // NETUNIX_P_H diff --git a/src/api/internal/io/NetWin_p.h b/src/api/internal/io/NetWin_p.h new file mode 100644 index 0000000..fb138b2 --- /dev/null +++ b/src/api/internal/io/NetWin_p.h @@ -0,0 +1,39 @@ +#ifndef NETWIN_P_H +#define NETWIN_P_H + +#ifdef _WIN32 // <-- source files only include the proper Net*_p.h, but this is a double-check + +#include // <-- should bring 'windows.h' along with it +#include + +#ifndef BT_SOCKLEN_T +# define BT_SOCKLEN_T int +#endif + +#ifdef _MSC_VER +# pragma comment(lib, "ws2_32.lib") +#endif + +namespace BamTools { +namespace Internal { + +// use RAII to ensure WSA is en +class WindowsSockInit { + public: + WindowsSockInit(void) { + WSAData wsadata; + WSAStartup(MAKEWORD(2,2), &wsadata); // catch error ? + } + + ~WindowsSockInit(void) { + WSACleanup(); + } +}; + +} // namespace Internal +} // namespace BamTools + +#endif // _WIN32 + +#endif // NETWIN_P_H + diff --git a/src/api/internal/io/RollingBuffer_p.cpp b/src/api/internal/io/RollingBuffer_p.cpp new file mode 100644 index 0000000..ab29253 --- /dev/null +++ b/src/api/internal/io/RollingBuffer_p.cpp @@ -0,0 +1,335 @@ +#include "api/internal/io/RollingBuffer_p.h" +using namespace BamTools; +using namespace BamTools::Internal; + +#include // for debug + +#include +#include +#include +#include +using namespace std; + +// ------------------------------ +// RollingBuffer implementation +// ------------------------------ + +RollingBuffer::RollingBuffer(size_t growth) + : m_bufferGrowth(growth) +{ + // buffer always contains at least 1 (maybe empty) byte array + m_data.push_back( ByteArray() ); + + // set cleared state + Clear(); +} + +RollingBuffer::~RollingBuffer(void) { } + +size_t RollingBuffer::BlockSize(void) const { + + // if only one byte array in buffer <- needed? + if ( m_tailBufferIndex == 0 ) + return m_tail - m_head; + + // otherwise return remaining num bytes in first array + const ByteArray& first = m_data.front(); + return first.Size() - m_head; +} + +bool RollingBuffer::CanReadLine(void) const { + return IndexOf('\n') != string::npos; +} + +void RollingBuffer::Chop(size_t n) { + + // update buffer size + if ( n > m_totalBufferSize ) + m_totalBufferSize = 0; + else + m_totalBufferSize -= n; + + // loop until target case hit + for ( ; ; ) { + + // if only one array, decrement tail + if ( m_tailBufferIndex == 0 ) { + m_tail -= n; + + // if all data chopped + if ( m_tail <= m_head ) { + m_head = 0; + m_tail = 0; + } + return; + } + + // if there's room in last byte array to 'chop', just decrement tail + if ( n <= m_tail ) { + m_tail -= n; + return; + } + + // otherwise we're going to overlap our internal byte arrays + // reduce our chop amount by the amount of data in the last byte array + n -= m_tail; + + // remove last byte array & set tail to it's end + m_data.pop_back(); + --m_tailBufferIndex; + m_tail = m_data.at(m_tailBufferIndex).Size(); + } + + // if buffer is now empty, reset state & clear up memory + if ( IsEmpty() ) + Clear(); +} + +void RollingBuffer::Clear(void) { + + // remove all byte arrays (except first) + m_data.erase( m_data.begin()+1, m_data.end() ); + + // clear out first byte array + m_data[0].Resize(0); + m_data[0].Squeeze(); + + // reset index & size markers + m_head = 0; + m_tail = 0; + m_tailBufferIndex = 0; + m_totalBufferSize = 0; +} + +void RollingBuffer::Free(size_t n) { + + // update buffer size + if ( n > m_totalBufferSize ) + m_totalBufferSize = 0; + else + m_totalBufferSize -= n; + + // loop until target case hit + for ( ; ; ) { + + const size_t blockSize = BlockSize(); + + // if there's room in current array + if ( n < blockSize ) { + + // shift 'head' over @n bytes + m_head += n; + + // check for emptied, single byte array + if ( m_head == m_tail && m_tailBufferIndex == 0 ) { + m_head = 0; + m_tail = 0; + } + + break; + } + + // otherwise we need to check next byte array + // first update amount to remove + n -= blockSize; + + // special case - there was only 1 array + if ( m_data.size() == 1 ) { + if ( m_data.at(0).Size() != m_bufferGrowth ) + m_data[0].Resize(m_bufferGrowth); + m_head = 0; + m_tail = 0; + m_tailBufferIndex = 0; + break; + } + + // otherwise, remove first array and move to next iteration + m_data.pop_front(); + --m_tailBufferIndex; + m_head = 0; + } + + // if buffer is now empty, reset state & clear up memory + if ( IsEmpty() ) + Clear(); +} + +size_t RollingBuffer::IndexOf(char c) const { + + size_t index(0); + + // iterate over byte arrays + const size_t numBuffers = m_data.size(); + for ( size_t i = 0; i < numBuffers; ++i ) { + const ByteArray& current = m_data.at(i); + + // if on first array, use head; else 0 + const size_t start = ( (i==0) ? m_head : 0 ); + + // if on last array, set end; else use current byte array size + const size_t end = ( (i==m_tailBufferIndex) ? m_tail : current.Size()); + + // look through this iteration's byte array for @c + const char* p = current.ConstData()+start; + for ( size_t j = start; j < end; ++j ) { + if ( *p++ == c ) + return index; + ++index; + } + } + + // no match found + return string::npos; +} + +bool RollingBuffer::IsEmpty(void) const { + return (m_tailBufferIndex == 0) && (m_tail == 0); +} + +size_t RollingBuffer::Read(char* dest, size_t max) { + + size_t bytesToRead = std::min(Size(), max); + size_t bytesReadSoFar = 0; + + while ( bytesReadSoFar < bytesToRead ) { + const char* readPtr = ReadPointer(); + size_t blockBytes = std::min( (bytesToRead - bytesReadSoFar), BlockSize() ); + if ( dest ) + memcpy(dest+bytesReadSoFar, readPtr, blockBytes); + bytesReadSoFar += blockBytes; + Free(blockBytes); + } + + return bytesReadSoFar; +} + +size_t RollingBuffer::ReadLine(char* dest, size_t max) { + + // if we can't read line or if max is 0 + if ( !CanReadLine() || max == 0 ) + return 0; + + // otherwise, read until we hit newline + size_t bytesReadSoFar = 0; + bool finished = false; + while ( !finished ) { + + const size_t index = IndexOf('\n'); + const char* readPtr = ReadPointer(); + size_t bytesToRead = std::min( (index+1)-bytesReadSoFar, BlockSize() ); + bytesToRead = std::min( bytesToRead, (max-1)-bytesReadSoFar ); + memcpy(dest+bytesReadSoFar, readPtr, bytesToRead); + bytesReadSoFar += bytesToRead; + Free(bytesToRead); + + if ( !((bytesReadSoFar < index+1)&&(bytesReadSoFar < max-1)) ) + finished = true; + } + + // null terminate 'dest' & return numBytesRead + dest[bytesReadSoFar] = '\0'; + return bytesReadSoFar; +} + +string RollingBuffer::ReadLine(size_t max) { + + ByteArray result; + result.Resize(max); + + size_t numBytesRead = 0; + + // if max not provided, we need to read incrementally + if ( max == 0 ) { + max = UINT_MAX; + + // make sure we leave room for null terminator + result.Resize(1); + + size_t readResult; + do { + result.Resize(std::min(max, result.Size()+m_bufferGrowth)); + readResult = ReadLine(result.Data() + numBytesRead, result.Size() - numBytesRead); + if ( readResult > 0 || numBytesRead == 0 ) + numBytesRead += readResult; + } while ( readResult == m_bufferGrowth && result[numBytesRead-1] != '\n'); + } + + // otherwise read line with provided max + else numBytesRead = ReadLine(result.Data(), result.Size()); + + // adjust byte array depending on numBytesRead + if ( numBytesRead == 0 ) + result.Clear(); + else + result.Resize(numBytesRead); + + // return string from byte array + return string(result.ConstData(), result.Size()); +} + +const char* RollingBuffer::ReadPointer(void) const { + + // return null if empty buffer + if ( m_data.empty() ) + return 0; + + // otherwise return pointer to current position + const ByteArray& first = m_data.front(); + return first.ConstData() + m_head; +} + +char* RollingBuffer::Reserve(size_t n) { + + // if empty buffer + if ( m_totalBufferSize == 0 ) { + m_data[0].Resize( std::max(m_bufferGrowth, n) ); + m_totalBufferSize += n; + m_tail = n; + return m_data[m_tailBufferIndex].Data(); + } + + // increment buffer's byte count + m_totalBufferSize += n; + + // if buffer already contains enough space to fit @n more bytes + if ( (m_tail + n) <= m_data.at(m_tailBufferIndex).Size() ) { + + // fetch write pointer at current 'tail', increment tail by @n & return + char* ptr = m_data[m_tailBufferIndex].Data() + m_tail; + m_tail += n; + return ptr; + } + + // if last byte array isn't half full + if ( m_tail < m_data.at(m_tailBufferIndex).Size()/2 ) { + + // we'll allow simple resize + m_data[m_tailBufferIndex].Resize(m_tail + n); + + // fetch write pointer at current 'tail', increment tail by @n & return + char* ptr = m_data[m_tailBufferIndex].Data() + m_tail; + m_tail += n; + return ptr; + } + + // otherwise, shrink last byte array to current used size + m_data[m_tailBufferIndex].Resize(m_tail); + + // then append new byte array + m_data.push_back( ByteArray() ); + ++m_tailBufferIndex; + m_data[m_tailBufferIndex].Resize( std::max(m_bufferGrowth, n) ); + m_tail = n; + + // return write-able pointer on new array + return m_data[m_tailBufferIndex].Data(); +} + +size_t RollingBuffer::Size(void) const { + return m_totalBufferSize; +} + +void RollingBuffer::Write(const char* src, size_t n) { + char* writePtr = Reserve(n); + memcpy(writePtr, src, n); +} diff --git a/src/api/internal/io/RollingBuffer_p.h b/src/api/internal/io/RollingBuffer_p.h new file mode 100644 index 0000000..70f06f2 --- /dev/null +++ b/src/api/internal/io/RollingBuffer_p.h @@ -0,0 +1,50 @@ +#ifndef ROLLINGBUFFER_P_H +#define ROLLINGBUFFER_P_H + +#include "api/api_global.h" +#include "api/internal/io/ByteArray_p.h" +#include +#include + +namespace BamTools { +namespace Internal { + +class RollingBuffer { + + // ctors & dtor + public: + RollingBuffer(size_t growth); // inits buffer, new byte arrays will try to be of size @growth + ~RollingBuffer(void); // dtor + + // RollingBuffer interface + public: + size_t BlockSize(void) const; // returns current buffer size + bool CanReadLine(void) const; // checks buffer for carriage return + void Chop(size_t n); // frees @n bytes from end of buffer + void Clear(void); // clears entire buffer structure + void Free(size_t n); // frees @n bytes from front of buffer + size_t IndexOf(char c) const; // checks buffer for @c + bool IsEmpty(void) const; // returns whether buffer contains data + size_t Read(char* dest, size_t max); // returns up to @maxLen bytes into @dest, returns exactly how many bytes were read from buffer + size_t ReadLine(char* dest, size_t max); + std::string ReadLine(size_t max = 0); + + const char* ReadPointer(void) const; // returns a C-fxn compatible char* to byte data + char* Reserve(size_t n); // ensures that buffer contains space for @n incoming bytes, returns write-able char* + size_t Size(void) const; // returns current number of bytes stored in buffer + void Write(const char* src, size_t n); // reserves space for @n bytes, then appends contents of @src to buffer + + // data members + private: + size_t m_head; // index into current data (next char) + size_t m_tail; // index into last data position + size_t m_tailBufferIndex; // m_data::size() - 1 + size_t m_totalBufferSize; // total buffer size + size_t m_bufferGrowth; // new buffers are typically initialized with this size + std::deque m_data; // basic 'buffer of buffers' +}; + +} // namespace Internal +} // namespace BamTools + +#endif // ROLLINGBUFFER_P_H diff --git a/src/api/internal/io/TcpSocketEngine_win_p.cpp b/src/api/internal/io/TcpSocketEngine_win_p.cpp new file mode 100644 index 0000000..6438b12 --- /dev/null +++ b/src/api/internal/io/TcpSocketEngine_win_p.cpp @@ -0,0 +1,303 @@ +#include "api/internal/io/TcpSocketEngine_p.h" +#include "api/internal/io/NetWin_p.h" +using namespace BamTools; +using namespace BamTools::Internal; + +#include +using namespace std; + +// ------------------------ +// static utility methods +// ------------------------ + +namespace BamTools { +namespace Internal { + +static inline +void getPortAndAddress(const sockaddr* s, uint16_t& port, HostAddress& address) { + + // IPv6 + if (s->sa_family == AF_INET6) { + sockaddr_in6* ip6 = (sockaddr_in6*)s; + port = ntohs(ip6->sin6_port); + IPv6Address tmp; + memcpy(&tmp, &ip6->sin6_addr.in6_addr, sizeof(tmp)); + address.SetAddress(tmp); + return; + } + + // IPv4 + if ( s->sa_family == AF_INET ) { + sockaddr_in* ip4 = (sockaddr_in*)s; + port = ntohl(ip4->sin_port); + address.SetAddress( ntohl(ip4->sin_addr) ); + return; + } + + // should be unreachable + BT_ASSERT_X(false, "TcpSocketEngine::getPortAndAddress() : unknown network protocol "); + return false; +} + +} // namespace Internal +} // namespace BamTools + +// -------------------------------- +// TcpSocketEngine implementation +// -------------------------------- + +void TcpSocketEngine::nativeClose(void) { + close(m_socketDescriptor); +} + +bool TcpSocketEngine::nativeConnect(const HostAddress& address, const uint16_t port) { + + // setup connection parameters from address/port + sockaddr_in sockAddrIPv4; + sockaddr_in6 sockAddrIPv6; + sockaddr* sockAddrPtr = 0; + BT_SOCKLEN_T sockAddrSize = 0; + + // IPv6 + if ( address.GetProtocol() == HostAddress::IPv6Protocol ) { + + memset(&sockAddrIPv6, 0, sizeof(sockAddrIPv6)); + sockAddrIPv6.sin6_family = AF_INET6; + sockAddrIPv6.sin6_port = htons(port); + + IPv6Address ip6 = address.GetIPv6Address(); + memcpy(&sockAddrIPv6.sin6_addr.s6_addr, &ip6, sizeof(ip6)); + + sockAddrSize = sizeof(sockAddrIPv6); + sockAddrPtr = (sockaddr*)&sockAddrIPv6; + } + + // IPv4 + else if ( address.GetProtocol() == HostAddress::IPv4Protocol ) { + + memset(&sockAddrIPv4, 0, sizeof(sockAddrIPv4)); + sockAddrIPv4.sin_family = AF_INET; + sockAddrIPv4.sin_port = htons(port); + sockAddrIPv4.sin_addr.s_addr = htonl(address.GetIPv4Address()); + + sockAddrSize = sizeof(sockAddrIPv4); + sockAddrPtr = (sockaddr*)&sockAddrIPv4; + } + + // unknown (should be unreachable) + else BT_ASSERT_X(false, "TcpSocketEngine::nativeConnect() : unknown network protocol"); + + // attempt conenction + int connectResult = connect(socketDescriptor, sockAddrPtr, sockAddrSize); + + // if hit error + if ( connectResult == -1 ) { + + // see what error was encountered + switch ( errno ) { + + case EISCONN: + m_socketState = TcpSocket::ConnectedState; + break; + case ECONNREFUSED: + case EINVAL: + m_socketError = TcpSocket::ConnectionRefusedError; + m_socketState = TcpSocket::UnconnectedState; + m_errorString = "connection refused"; + break; + case ETIMEDOUT: + m_socketError = TcpSocket::NetworkError; + m_errorString = "connection timed out"; + break; + case EHOSTUNREACH: + m_socketError = TcpSocket::NetworkError; + m_socketState = TcpSocket::UnconnectedState; + m_errorString = "host unreachable"; + break; + case ENETUNREACH: + m_socketError = TcpSocket::NetworkError; + m_socketState = TcpSocket::UnconnectedState; + m_errorString = "network unreachable"; + break; + case EADDRINUSE: + m_socketError = TcpSocket::NetworkError; + m_errorString = "address already in use"; + break; + case EACCES: + case EPERM: + m_socketError = TcpSocket::SocketAccessError; + m_socketState = TcpSocket::UnconnectedState; + m_errorString = "permission denied"; + case EAFNOSUPPORT: + case EBADF: + case EFAULT: + case ENOTSOCK: + m_socketState = TcpSocket::UnconnectedState; + default: + break; + } + + if ( m_socketState != TcpSocket::ConnectedState ) + return false; + } + + // otherwise, we should be good + // update state & return success + m_socketState = TcpSocket::ConnectedState; + return true; +} + +bool TcpSocketEngine::nativeCreateSocket(HostAddress::NetworkProtocol protocol) { + + // get protocol value for requested protocol type + const int protocolNum = ( (protocol == HostAddress::IPv6Protocol) ? AF_INET6 : AF_INET ); + + // attempt to create socket + int socketFd = socket(protocolNum, SOCK_STREAM, IPPROTO_TCP); + + // if we fetched an invalid socket descriptor + if ( socketFd <= 0 ) { + + // see what error we got + switch ( errno ) { + case EPROTONOSUPPORT: + case EAFNOSUPPORT: + case EINVAL: + m_socketError = TcpSocket::UnsupportedSocketOperationError; + m_errorString = "protocol not supported"; + break; + case ENFILE: + case EMFILE: + case ENOBUFS: + case ENOMEM: + m_socketError = TcpSocket::SocketResourceError; + m_errorString = "out of resources"; + break; + case EACCES: + m_socketError = TcpSocket::SocketAccessError; + m_errorString = "permission denied"; + break; + default: + break; + } + + // return failure + return false; + } + + // otherwise, store our socket FD & return success + m_socketDescriptor = socketFd; + return true; +} + +bool TcpSocketEngine::nativeFetchConnectionParameters(void) { + + // reset addresses/ports + m_localAddress.Clear(); + m_remoteAddress.Clear(); + m_localPort = 0; + m_remotePort = 0; + + // skip (return failure) if invalid socket FD + if ( m_socketDescriptor == -1 ) + return false; + + sockaddr sa; + BT_SOCKLEN_T sockAddrSize = sizeof(sa); + + // fetch local address info + memset(&sa, 0, sizeof(sa)); + if ( getsockname(m_socketDescriptor, &sa, &sockAddrSize) == 0 ) { + getPortAndAddress(&sa, m_localPort, m_localAddress); + } + else if ( errno == EBADF ) { + m_socketError = TcpSocket::UnsupportedSocketOperationError; + m_errorString = "invalid socket descriptor"; + return false; + } + + // fetch remote address + if ( getpeername(m_socketDescriptor, &sa, &sockAddrSize) == 0 ) + getPortAndAddress(&sa, m_remotePort, m_remoteAddress); + + // return success + return true; +} + +size_t TcpSocketEngine::nativeNumBytesAvailable(void) const { + + // fetch number of bytes, return 0 on error + int numBytes(0); + if ( ioctl(m_socketDescriptor, FIONREAD, (char*)&numBytes) < 0 ) + return 0; + return static_cast(numBytes); +} + +int64_t TcpSocketEngine::nativeRead(char* dest, size_t max) { + + if ( !IsValid() ) + return -1; + + ssize_t ret = read(m_socketDescriptor, dest, max); + if ( ret < 0 ) { + ret = -1; + switch ( errno ) { + case EAGAIN : + // No data was available for reading + ret = -2; + break; + case ECONNRESET : + ret = 0; + break; + default: + break; + } + } + + return static_cast(ret); +} + +// negative value for msecs will block (forever) until +int TcpSocketEngine::nativeSelect(int msecs, bool isRead) const { + + // set up FD set + fd_set fds; + FD_ZERO(&fds); + FD_SET(m_socketDescriptor, &fds); + + // setup our timeout + timeval tv; + tv.tv_sec = msecs / 1000; + tv.tv_usec = (msecs % 1000) * 1000; + + // do 'select' + int ret; + if ( isRead ) + ret = select(m_socketDescriptor + 1, &fds, 0, 0, (msecs < 0 ? 0 : &tv)); + else + ret = select(m_socketDescriptor + 1, 0, &fds, 0, (msecs < 0 ? 0 : &tv)); + return ret; +} + +int64_t TcpSocketEngine::nativeWrite(const char* data, size_t length) { + + ssize_t writtenBytes = write(m_socketDescriptor, data, length); + if ( writtenBytes < 0 ) { + switch (errno) { + case EPIPE: + case ECONNRESET: + writtenBytes = -1; + m_socketError = TcpSocket::RemoteHostClosedError; + m_errorString = "remote host closed connection"; + Close(); + break; + case EAGAIN: + writtenBytes = 0; + break; + default: + break; + } + } + + return static_cast(writtenBytes); +} diff --git a/src/api/internal/io/TcpSocket_p.cpp b/src/api/internal/io/TcpSocket_p.cpp index eb9e760..2f24b8b 100644 --- a/src/api/internal/io/TcpSocket_p.cpp +++ b/src/api/internal/io/TcpSocket_p.cpp @@ -7,11 +7,15 @@ // Provides generic TCP socket (buffered) I/O // *************************************************************************** +#include "api/internal/io/ByteArray_p.h" #include "api/internal/io/TcpSocket_p.h" #include "api/internal/io/TcpSocketEngine_p.h" using namespace BamTools; using namespace BamTools::Internal; +#include // debug + +#include #include #include using namespace std; @@ -24,7 +28,7 @@ namespace BamTools { namespace Internal { // constants -static const size_t DEFAULT_BUFFER_SIZE = 0x8000; +static const size_t DEFAULT_BUFFER_SIZE = 0x4000; } // namespace Internal } // namespace BamTools @@ -67,15 +71,16 @@ bool TcpSocket::ConnectImpl(const HostInfo& hostInfo, { // skip if we're already connected if ( m_state == TcpSocket::ConnectedState ) { - m_error = TcpSocket::SocketResourceError; + m_error = TcpSocket::SocketResourceError; + m_errorString = "socket already connected"; return false; } // reset socket state - m_mode = mode; - m_hostName = hostInfo.HostName(); - m_state = TcpSocket::UnconnectedState; - m_error = TcpSocket::UnknownSocketError; + m_hostName = hostInfo.HostName(); + m_mode = mode; + m_state = TcpSocket::UnconnectedState; + m_error = TcpSocket::UnknownSocketError; // m_localPort = 0; m_remotePort = 0; // m_localAddress.Clear(); @@ -86,6 +91,7 @@ bool TcpSocket::ConnectImpl(const HostInfo& hostInfo, vector addresses = hostInfo.Addresses(); if ( addresses.empty() ) { m_error = TcpSocket::HostNotFoundError; + m_errorString = "no IP addresses found for host"; return false; } @@ -124,6 +130,7 @@ bool TcpSocket::ConnectImpl(const HostInfo& hostInfo, // if we get here, no connection could be made m_error = TcpSocket::HostNotFoundError; + m_errorString = "could not connect to any host addresses"; return false; } @@ -149,7 +156,7 @@ bool TcpSocket::ConnectToHost(const string& hostName, // if host name was IP address ("x.x.x.x" or IPv6 format) // otherwise host name was 'plain-text' ("www.foo.bar") // we need to look up IP address(es) - if ( hostAddress.HasIPAddress() ) + if ( hostAddress.HasIPAddress() ) info.SetAddresses( vector(1, hostAddress) ); else info = HostInfo::Lookup(hostName, port); @@ -171,6 +178,9 @@ void TcpSocket::DisconnectFromHost(void) { m_remoteAddress.Clear(); m_hostName.clear(); m_cachedSocketDescriptor = -1; + + // for future, make sure there's outgoing data that needs to be flushed + m_readBuffer.Clear(); } TcpSocket::SocketError TcpSocket::GetError(void) const { @@ -241,6 +251,13 @@ int64_t TcpSocket::Read(char* data, const unsigned int numBytes) { bool TcpSocket::ReadFromSocket(void) { + // check for any socket engine errors + if ( !m_engine->IsValid() ) { + m_errorString = "TcpSocket::ReadFromSocket - socket disconnected"; + ResetSocketEngine(); + return false; + } + // wait for ready read bool timedOut; bool isReadyRead = m_engine->WaitForRead(5000, &timedOut); @@ -250,13 +267,15 @@ bool TcpSocket::ReadFromSocket(void) { // if we simply timed out if ( timedOut ) { - // TODO: set error string + m_errorString = "TcpSocket::ReadFromSocket - timed out waiting for ready read"; + // get error from engine ? return false; } // otherwise, there was an error else { - // TODO: set error string + m_errorString = "TcpSocket::ReadFromSocket - encountered error while waiting for ready read"; + // get error from engine ? return false; } } @@ -264,12 +283,16 @@ bool TcpSocket::ReadFromSocket(void) { // ######################################################################### // clean this up - smells funky, but it's a key step so it has to be right // ######################################################################### + // get number of bytes available from socket // (if 0, still try to read some data so we don't trigger any OS event behavior // that respond to repeated access to a remote closed socket) int64_t bytesToRead = m_engine->NumBytesAvailable(); - if ( bytesToRead < 0 ) + if ( bytesToRead < 0 ) { + m_errorString = "TcpSocket::ReadFromSocket - encountered error while determining numBytesAvailable"; + // get error from engine ? return false; + } else if ( bytesToRead == 0 ) bytesToRead = 4096; @@ -277,29 +300,86 @@ bool TcpSocket::ReadFromSocket(void) { char* buffer = m_readBuffer.Reserve(bytesToRead); int64_t numBytesRead = m_engine->Read(buffer, bytesToRead); - // (Qt uses -2 for no data, not error) - // squeeze buffer back down & return success - if ( numBytesRead == -2 ) { - m_readBuffer.Chop(bytesToRead); - return true; - } - // ######################################################################### - - // check for any socket engine errors - if ( !m_engine->IsValid() ) { - // TODO: set error string - ResetSocketEngine(); + // if error while reading + if ( numBytesRead == -1 ) { + m_errorString = "TcpSocket::ReadFromSocket - encountered error while reading bytes"; + // get error from engine ? return false; } + // handle special case (no data, but not error) + if ( numBytesRead == -2 ) + m_readBuffer.Chop(bytesToRead); + // return success return true; } -string TcpSocket::ReadLine(void) { - if ( m_readBuffer.CanReadLine() ) - return m_readBuffer.ReadLine(); - return string(); +string TcpSocket::ReadLine(int64_t max) { + + // prep result byte buffer + ByteArray result; + + size_t bufferMax = ((max > static_cast(string::npos)) ? string::npos : static_cast(max)); + result.Resize(bufferMax); + + // read data + int64_t readBytes(0); + if ( result.Size() == 0 ) { + + if ( bufferMax == 0 ) + bufferMax = string::npos; + + result.Resize(1); + + int64_t readResult; + do { + result.Resize( static_cast(std::min(bufferMax, result.Size() + DEFAULT_BUFFER_SIZE)) ); + readResult = ReadLine(result.Data()+readBytes, result.Size()-readBytes); + if ( readResult > 0 || readBytes == 0 ) + readBytes += readResult; + } while ( readResult == DEFAULT_BUFFER_SIZE && result[static_cast(readBytes-1)] != '\n' ); + + } else + readBytes = ReadLine(result.Data(), result.Size()); + + // clean up byte buffer + if ( readBytes <= 0 ) + result.Clear(); + else + result.Resize(static_cast(readBytes)); + + // return byte buffer as string + return string( result.ConstData(), result.Size() ); +} + +int64_t TcpSocket::ReadLine(char* dest, size_t max) { + + // wait for buffer to contain line contents + if ( !WaitForReadLine() ) { + m_errorString = "TcpSocket::ReadLine - error waiting for read line"; + return -1; + } + + // leave room for null term + if ( max < 2 ) + return -1; + --max; + + // read from buffer, handle newlines + int64_t readSoFar = m_readBuffer.ReadLine(dest, max); + if ( readSoFar && dest[readSoFar-1] == '\n' ) { + + // adjust for windows-style '\r\n' + if ( readSoFar > 1 && dest[readSoFar-2] == '\r') { + --readSoFar; + dest[readSoFar-1] = '\n'; + } + } + + // null terminate & return number of bytes read + dest[readSoFar] = '\0'; + return readSoFar; } void TcpSocket::ResetSocketEngine(void) { @@ -316,6 +396,18 @@ void TcpSocket::ResetSocketEngine(void) { m_cachedSocketDescriptor = -1; } +bool TcpSocket::WaitForReadLine(void) { + + // wait until we can read a line (will return immediately if already capable) + while ( !CanReadLine() ) { + if ( !ReadFromSocket() ) + return false; + } + + // if we get here, success + return true; +} + int64_t TcpSocket::Write(const char* data, const unsigned int numBytes) { // single-shot attempt at write (not buffered, just try to shove the data through socket) diff --git a/src/api/internal/io/TcpSocket_p.h b/src/api/internal/io/TcpSocket_p.h index 4cd1f1a..9727423 100644 --- a/src/api/internal/io/TcpSocket_p.h +++ b/src/api/internal/io/TcpSocket_p.h @@ -72,7 +72,9 @@ class TcpSocket { bool CanReadLine(void) const; void ClearBuffer(void); // force buffer to clear (not a 'flush', just a 'discard') int64_t Read(char* data, const unsigned int numBytes); - std::string ReadLine(void); + std::string ReadLine(int64_t max = 0); + int64_t ReadLine(char* dest, size_t max); + bool WaitForReadLine(void); int64_t Write(const char* data, const unsigned int numBytes); // connection values -- 2.39.2