diff options
Diffstat (limited to 'lib/cpp/src/thrift/qt')
-rw-r--r-- | lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp | 66 | ||||
-rw-r--r-- | lib/cpp/src/thrift/qt/TQIODeviceTransport.h | 24 | ||||
-rw-r--r-- | lib/cpp/src/thrift/qt/TQTcpServer.cpp | 61 | ||||
-rw-r--r-- | lib/cpp/src/thrift/qt/TQTcpServer.h | 25 |
4 files changed, 81 insertions, 95 deletions
diff --git a/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp b/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp index 2c8284751..686f24297 100644 --- a/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp +++ b/lib/cpp/src/thrift/qt/TQIODeviceTransport.cpp @@ -26,43 +26,37 @@ using boost::shared_ptr; -namespace apache { namespace thrift { namespace transport { +namespace apache { +namespace thrift { +namespace transport { -TQIODeviceTransport::TQIODeviceTransport(shared_ptr<QIODevice> dev) - : dev_(dev) -{ +TQIODeviceTransport::TQIODeviceTransport(shared_ptr<QIODevice> dev) : dev_(dev) { } -TQIODeviceTransport::~TQIODeviceTransport() -{ +TQIODeviceTransport::~TQIODeviceTransport() { dev_->close(); } -void TQIODeviceTransport::open() -{ +void TQIODeviceTransport::open() { if (!isOpen()) { throw TTransportException(TTransportException::NOT_OPEN, "open(): underlying QIODevice isn't open"); } } -bool TQIODeviceTransport::isOpen() -{ +bool TQIODeviceTransport::isOpen() { return dev_->isOpen(); } -bool TQIODeviceTransport::peek() -{ +bool TQIODeviceTransport::peek() { return dev_->bytesAvailable() > 0; } -void TQIODeviceTransport::close() -{ +void TQIODeviceTransport::close() { dev_->close(); } -uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len) -{ +uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len) { uint32_t requestLen = len; while (len) { uint32_t readSize; @@ -86,8 +80,7 @@ uint32_t TQIODeviceTransport::readAll(uint8_t* buf, uint32_t len) return requestLen; } -uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len) -{ +uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len) { uint32_t actualSize; qint64 readSize; @@ -97,24 +90,22 @@ uint32_t TQIODeviceTransport::read(uint8_t* buf, uint32_t len) } actualSize = (uint32_t)std::min((qint64)len, dev_->bytesAvailable()); - readSize = dev_->read(reinterpret_cast<char *>(buf), actualSize); + readSize = dev_->read(reinterpret_cast<char*>(buf), actualSize); if (readSize < 0) { QAbstractSocket* socket; - if ((socket = qobject_cast<QAbstractSocket* >(dev_.get()))) { + if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) { throw TTransportException(TTransportException::UNKNOWN, "Failed to read() from QAbstractSocket", socket->error()); } - throw TTransportException(TTransportException::UNKNOWN, - "Failed to read from from QIODevice"); + throw TTransportException(TTransportException::UNKNOWN, "Failed to read from from QIODevice"); } return (uint32_t)readSize; } -void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len) -{ +void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len) { while (len) { uint32_t written = write_partial(buf, len); len -= written; @@ -122,8 +113,7 @@ void TQIODeviceTransport::write(const uint8_t* buf, uint32_t len) } } -uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len) -{ +uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len) { qint64 written; if (!dev_->isOpen()) { @@ -136,7 +126,8 @@ uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len) QAbstractSocket* socket; if ((socket = qobject_cast<QAbstractSocket*>(dev_.get()))) { throw TTransportException(TTransportException::UNKNOWN, - "write_partial(): failed to write to QAbstractSocket", socket->error()); + "write_partial(): failed to write to QAbstractSocket", + socket->error()); } throw TTransportException(TTransportException::UNKNOWN, @@ -146,8 +137,7 @@ uint32_t TQIODeviceTransport::write_partial(const uint8_t* buf, uint32_t len) return (uint32_t)written; } -void TQIODeviceTransport::flush() -{ +void TQIODeviceTransport::flush() { if (!dev_->isOpen()) { throw TTransportException(TTransportException::NOT_OPEN, "flush(): underlying QIODevice is not open"); @@ -162,18 +152,16 @@ void TQIODeviceTransport::flush() } } -uint8_t* TQIODeviceTransport::borrow(uint8_t* buf, uint32_t* len) -{ - (void) buf; - (void) len; +uint8_t* TQIODeviceTransport::borrow(uint8_t* buf, uint32_t* len) { + (void)buf; + (void)len; return NULL; } -void TQIODeviceTransport::consume(uint32_t len) -{ - (void) len; +void TQIODeviceTransport::consume(uint32_t len) { + (void)len; throw TTransportException(TTransportException::UNKNOWN); } - -}}} // apache::thrift::transport - +} +} +} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/qt/TQIODeviceTransport.h b/lib/cpp/src/thrift/qt/TQIODeviceTransport.h index c5221dd6b..8091d3287 100644 --- a/lib/cpp/src/thrift/qt/TQIODeviceTransport.h +++ b/lib/cpp/src/thrift/qt/TQIODeviceTransport.h @@ -26,13 +26,16 @@ class QIODevice; -namespace apache { namespace thrift { namespace transport { +namespace apache { +namespace thrift { +namespace transport { /** * Transport that operates on a QIODevice (socket, file, etc). */ -class TQIODeviceTransport : public apache::thrift::transport::TVirtualTransport<TQIODeviceTransport> { - public: +class TQIODeviceTransport + : public apache::thrift::transport::TVirtualTransport<TQIODeviceTransport> { +public: explicit TQIODeviceTransport(boost::shared_ptr<QIODevice> dev); virtual ~TQIODeviceTransport(); @@ -41,7 +44,7 @@ class TQIODeviceTransport : public apache::thrift::transport::TVirtualTransport< bool peek(); void close(); - uint32_t readAll(uint8_t *buf, uint32_t len); + uint32_t readAll(uint8_t* buf, uint32_t len); uint32_t read(uint8_t* buf, uint32_t len); void write(const uint8_t* buf, uint32_t len); @@ -52,13 +55,14 @@ class TQIODeviceTransport : public apache::thrift::transport::TVirtualTransport< uint8_t* borrow(uint8_t* buf, uint32_t* len); void consume(uint32_t len); - private: - TQIODeviceTransport(const TQIODeviceTransport&); - TQIODeviceTransport& operator=(const TQIODeviceTransport&); +private: + TQIODeviceTransport(const TQIODeviceTransport&); + TQIODeviceTransport& operator=(const TQIODeviceTransport&); - boost::shared_ptr<QIODevice> dev_; + boost::shared_ptr<QIODevice> dev_; }; -}}} // apache::thrift::transport +} +} +} // apache::thrift::transport #endif // #ifndef _THRIFT_ASYNC_TQIODEVICE_TRANSPORT_H_ - diff --git a/lib/cpp/src/thrift/qt/TQTcpServer.cpp b/lib/cpp/src/thrift/qt/TQTcpServer.cpp index 2b3cf9852..a3211df5e 100644 --- a/lib/cpp/src/thrift/qt/TQTcpServer.cpp +++ b/lib/cpp/src/thrift/qt/TQTcpServer.cpp @@ -38,7 +38,9 @@ using apache::thrift::stdcxx::bind; QT_USE_NAMESPACE -namespace apache { namespace thrift { namespace async { +namespace apache { +namespace thrift { +namespace async { struct TQTcpServer::ConnectionContext { shared_ptr<QTcpSocket> connection_; @@ -50,31 +52,21 @@ struct TQTcpServer::ConnectionContext { shared_ptr<TTransport> transport, shared_ptr<TProtocol> iprot, shared_ptr<TProtocol> oprot) - : connection_(connection) - , transport_(transport) - , iprot_(iprot) - , oprot_(oprot) - {} + : connection_(connection), transport_(transport), iprot_(iprot), oprot_(oprot) {} }; TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server, shared_ptr<TAsyncProcessor> processor, shared_ptr<TProtocolFactory> pfact, QObject* parent) - : QObject(parent) - , server_(server) - , processor_(processor) - , pfact_(pfact) -{ + : QObject(parent), server_(server), processor_(processor), pfact_(pfact) { connect(server.get(), SIGNAL(newConnection()), SLOT(processIncoming())); } -TQTcpServer::~TQTcpServer() -{ +TQTcpServer::~TQTcpServer() { } -void TQTcpServer::processIncoming() -{ +void TQTcpServer::processIncoming() { while (server_->hasPendingConnections()) { // take ownership of the QTcpSocket; technically it could be deleted // when the QTcpServer is destroyed, but any real app should delete this @@ -89,25 +81,22 @@ void TQTcpServer::processIncoming() transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection)); iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport)); oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport)); - } catch(...) { + } catch (...) { qWarning("[TQTcpServer] Failed to initialize transports/protocols"); continue; } - ctxMap_[connection.get()] = - shared_ptr<ConnectionContext>( - new ConnectionContext(connection, transport, iprot, oprot)); + ctxMap_[connection.get()] + = shared_ptr<ConnectionContext>(new ConnectionContext(connection, transport, iprot, oprot)); connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode())); // need to use QueuedConnection since we will be deleting the socket in the slot - connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()), - Qt::QueuedConnection); + connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()), Qt::QueuedConnection); } } -void TQTcpServer::beginDecode() -{ +void TQTcpServer::beginDecode() { QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender())); Q_ASSERT(connection); @@ -119,22 +108,20 @@ void TQTcpServer::beginDecode() shared_ptr<ConnectionContext> ctx = ctxMap_[connection]; try { - processor_->process( - bind(&TQTcpServer::finish, this, - ctx, apache::thrift::stdcxx::placeholders::_1), - ctx->iprot_, ctx->oprot_); - } catch(const TTransportException& ex) { - qWarning("[TQTcpServer] TTransportException during processing: '%s'", - ex.what()); + processor_ + ->process(bind(&TQTcpServer::finish, this, ctx, apache::thrift::stdcxx::placeholders::_1), + ctx->iprot_, + ctx->oprot_); + } catch (const TTransportException& ex) { + qWarning("[TQTcpServer] TTransportException during processing: '%s'", ex.what()); ctxMap_.erase(connection); - } catch(...) { + } catch (...) { qWarning("[TQTcpServer] Unknown processor exception"); ctxMap_.erase(connection); } } -void TQTcpServer::socketClosed() -{ +void TQTcpServer::socketClosed() { QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender())); Q_ASSERT(connection); @@ -146,12 +133,12 @@ void TQTcpServer::socketClosed() ctxMap_.erase(connection); } -void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy) -{ +void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy) { if (!healthy) { qWarning("[TQTcpServer] Processor failed to process data successfully"); ctxMap_.erase(ctx->connection_.get()); } } - -}}} // apache::thrift::async +} +} +} // apache::thrift::async diff --git a/lib/cpp/src/thrift/qt/TQTcpServer.h b/lib/cpp/src/thrift/qt/TQTcpServer.h index 2ef64a75e..3403f1e66 100644 --- a/lib/cpp/src/thrift/qt/TQTcpServer.h +++ b/lib/cpp/src/thrift/qt/TQTcpServer.h @@ -25,11 +25,17 @@ #include <boost/shared_ptr.hpp> -namespace apache { namespace thrift { namespace protocol { +namespace apache { +namespace thrift { +namespace protocol { class TProtocolFactory; -}}} // apache::thrift::protocol +} +} +} // apache::thrift::protocol -namespace apache { namespace thrift { namespace async { +namespace apache { +namespace thrift { +namespace async { class TAsyncProcessor; @@ -39,20 +45,20 @@ class TAsyncProcessor; * processor and a protocol factory, and then run the Qt event loop. */ class TQTcpServer : public QObject { - Q_OBJECT - public: + Q_OBJECT +public: TQTcpServer(boost::shared_ptr<QTcpServer> server, boost::shared_ptr<TAsyncProcessor> processor, boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocolFactory, QT_PREPEND_NAMESPACE(QObject)* parent = NULL); virtual ~TQTcpServer(); - private Q_SLOTS: +private Q_SLOTS: void processIncoming(); void beginDecode(); void socketClosed(); - private: +private: TQTcpServer(const TQTcpServer&); TQTcpServer& operator=(const TQTcpServer&); @@ -66,7 +72,8 @@ class TQTcpServer : public QObject { std::map<QT_PREPEND_NAMESPACE(QTcpSocket)*, boost::shared_ptr<ConnectionContext> > ctxMap_; }; - -}}} // apache::thrift::async +} +} +} // apache::thrift::async #endif // #ifndef _THRIFT_TASYNC_QTCP_SERVER_H_ |