From e785a01818b6f65ca365122c20a4c4f746b7080e Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Fri, 18 Apr 2008 19:44:25 +0000 Subject: Split AsynchIOAcceptor into IOHandler and connection control code git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649666 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/Makefile.am | 5 +- cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 296 ---------------------------------- cpp/src/qpid/sys/AsynchIOHandler.cpp | 172 ++++++++++++++++++++ cpp/src/qpid/sys/AsynchIOHandler.h | 70 ++++++++ cpp/src/qpid/sys/TCPIOPlugin.cpp | 109 +++++++++++++ 5 files changed, 354 insertions(+), 298 deletions(-) delete mode 100644 cpp/src/qpid/sys/AsynchIOAcceptor.cpp create mode 100644 cpp/src/qpid/sys/AsynchIOHandler.cpp create mode 100644 cpp/src/qpid/sys/AsynchIOHandler.h create mode 100644 cpp/src/qpid/sys/TCPIOPlugin.cpp (limited to 'cpp') diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 40e094de5d..09fd75d5c1 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -172,7 +172,7 @@ libqpidcommon_la_SOURCES = \ qpid/Plugin.cpp \ qpid/Url.cpp \ qpid/sys/AggregateOutput.cpp \ - qpid/sys/AsynchIOAcceptor.cpp \ + qpid/sys/AsynchIOHandler.cpp \ qpid/sys/Dispatcher.cpp \ qpid/sys/Runnable.cpp \ qpid/sys/SystemInfo.cpp \ @@ -259,7 +259,8 @@ libqpidbroker_la_SOURCES = \ qpid/management/Manageable.cpp \ qpid/management/ManagementAgent.cpp \ qpid/management/ManagementExchange.cpp \ - qpid/management/ManagementObject.cpp + qpid/management/ManagementObject.cpp \ + qpid/sys/TCPIOPlugin.cpp libqpidclient_la_LIBADD = libqpidcommon.la libqpidclient_la_SOURCES = \ diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp deleted file mode 100644 index 5133fde183..0000000000 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ /dev/null @@ -1,296 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Acceptor.h" - -#include "Socket.h" -#include "AsynchIO.h" -#include "Mutex.h" -#include "Thread.h" - -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/log/Statement.h" - -#include -#include -#include -#include -#include -#include - -namespace qpid { -namespace sys { - -class AsynchIOAcceptor : public Acceptor { - Socket listener; - const uint16_t listeningPort; - std::auto_ptr acceptor; - - public: - AsynchIOAcceptor(int16_t port, int backlog); - void run(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); - - uint16_t getPort() const; - std::string getHost() const; - - private: - void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); -}; - -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog) -{ - return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog)); -} - -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : - listeningPort(listener.listen(port, backlog)), - acceptor(0) -{} - -// Buffer definition -struct Buff : public AsynchIO::BufferBase { - Buff() : - AsynchIO::BufferBase(new char[65536], 65536) - {} - ~Buff() - { delete [] bytes;} -}; - -class AsynchIOHandler : public OutputControl { - std::string identifier; - AsynchIO* aio; - ConnectionCodec::Factory* factory; - ConnectionCodec* codec; - bool readError; - bool isClient; - - void write(const framing::ProtocolInitiation&); - - public: - AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : - identifier(id), - aio(0), - factory(f), - codec(0), - readError(false), - isClient(false) - {} - - ~AsynchIOHandler() { - if (codec) - codec->closed(); - delete codec; - } - - void setClient() { isClient = true; } - - void init(AsynchIO* a) { - aio = a; - } - - // Output side - void close(); - void activateOutput(); - - // Input side - void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); - void eof(AsynchIO& aio); - void disconnect(AsynchIO& aio); - - // Notifications - void nobuffs(AsynchIO& aio); - void idle(AsynchIO& aio); - void closedSocket(AsynchIO& aio, const Socket& s); -}; - -void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { - AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); - AsynchIO* aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio); - - // Give connection some buffers to use - for (int i = 0; i < 4; i++) { - aio->queueReadBuffer(new Buff); - } - aio->start(poller); -} - - -uint16_t AsynchIOAcceptor::getPort() const { - return listeningPort; // Immutable no need for lock. -} - -std::string AsynchIOAcceptor::getHost() const { - return listener.getSockname(); -} - -void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { - acceptor.reset( - new AsynchAcceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact))); - acceptor->start(poller); -} - -void AsynchIOAcceptor::connect( - Poller::shared_ptr poller, - const std::string& host, int16_t port, - ConnectionCodec::Factory* f) -{ - Socket* socket = new Socket();//Should be deleted by handle when socket closes - socket->connect(host, port); - AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f); - async->setClient(); - AsynchIO* aio = new AsynchIO(*socket, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio); - - // Give connection some buffers to use - for (int i = 0; i < 4; i++) { - aio->queueReadBuffer(new Buff); - } - aio->start(poller); -} - -void AsynchIOHandler::write(const framing::ProtocolInitiation& data) -{ - QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")"); - AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; - framing::Buffer out(buff->bytes, buff->byteCount); - data.encode(out); - buff->dataCount = data.size(); - aio->queueWrite(buff); -} - -void AsynchIOHandler::activateOutput() { - aio->notifyPendingWrite(); -} - -// Input side -void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { - if (readError) { - return; - } - size_t decoded = 0; - if (codec) { // Already initiated - try { - decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); - }catch(const std::exception& e){ - QPID_LOG(error, e.what()); - readError = true; - aio->queueWriteClose(); - } - }else{ - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - decoded = in.getPosition(); - QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")"); - try { - codec = factory->create(protocolInit.getVersion(), *this, identifier); - if (!codec) { - //TODO: may still want to revise this... - //send valid version header & close connection. - write(framing::ProtocolInitiation(framing::highestProtocolVersion)); - readError = true; - aio->queueWriteClose(); - } - } catch (const std::exception& e) { - QPID_LOG(error, e.what()); - readError = true; - aio->queueWriteClose(); - } - } - } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (decoded != size_t(buff->dataCount)) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += decoded; - buff->dataCount -= decoded; - aio->unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio->queueReadBuffer(buff); - } -} - -void AsynchIOHandler::eof(AsynchIO&) { - QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); - if (codec) codec->closed(); - aio->queueWriteClose(); -} - -void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { - // If we closed with data still to send log a warning - if (!aio->writeQueueEmpty()) { - QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)"); - } - delete &s; - aio->queueForDeletion(); - delete this; -} - -void AsynchIOHandler::disconnect(AsynchIO& a) { - // treat the same as eof - eof(a); -} - -// Notifications -void AsynchIOHandler::nobuffs(AsynchIO&) { -} - -void AsynchIOHandler::idle(AsynchIO&){ - if (isClient && codec == 0) { - codec = factory->create(*this, identifier); - write(framing::ProtocolInitiation(codec->getVersion())); - return; - } - if (codec == 0) return; - if (codec->canEncode()) { - // Try and get a queued buffer if not then construct new one - AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) buff = new Buff; - size_t encoded=codec->encode(buff->bytes, buff->byteCount); - buff->dataCount = encoded; - aio->queueWrite(buff); - } - if (codec->isClosed()) - aio->queueWriteClose(); -} - -}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp new file mode 100644 index 0000000000..ca2bd7c93c --- /dev/null +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -0,0 +1,172 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AsynchIOHandler.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace sys { + +// Buffer definition +struct Buff : public AsynchIO::BufferBase { + Buff() : + AsynchIO::BufferBase(new char[65536], 65536) + {} + ~Buff() + { delete [] bytes;} +}; + +AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : + identifier(id), + aio(0), + factory(f), + codec(0), + readError(false), + isClient(false) +{} + +AsynchIOHandler::~AsynchIOHandler() { + if (codec) + codec->closed(); + delete codec; +} + +void AsynchIOHandler::init(AsynchIO* a, int numBuffs) { + aio = a; + + // Give connection some buffers to use + for (int i = 0; i < numBuffs; i++) { + aio->queueReadBuffer(new Buff); + } +} + +void AsynchIOHandler::write(const framing::ProtocolInitiation& data) +{ + QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")"); + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (!buff) + buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + data.encode(out); + buff->dataCount = data.size(); + aio->queueWrite(buff); +} + +void AsynchIOHandler::activateOutput() { + aio->notifyPendingWrite(); +} + +// Input side +void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { + if (readError) { + return; + } + size_t decoded = 0; + if (codec) { // Already initiated + try { + decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); + }catch(const std::exception& e){ + QPID_LOG(error, e.what()); + readError = true; + aio->queueWriteClose(); + } + }else{ + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + framing::ProtocolInitiation protocolInit; + if (protocolInit.decode(in)) { + decoded = in.getPosition(); + QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")"); + try { + codec = factory->create(protocolInit.getVersion(), *this, identifier); + if (!codec) { + //TODO: may still want to revise this... + //send valid version header & close connection. + write(framing::ProtocolInitiation(framing::highestProtocolVersion)); + readError = true; + aio->queueWriteClose(); + } + } catch (const std::exception& e) { + QPID_LOG(error, e.what()); + readError = true; + aio->queueWriteClose(); + } + } + } + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (decoded != size_t(buff->dataCount)) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += decoded; + buff->dataCount -= decoded; + aio->unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio->queueReadBuffer(buff); + } +} + +void AsynchIOHandler::eof(AsynchIO&) { + QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); + if (codec) codec->closed(); + aio->queueWriteClose(); +} + +void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { + // If we closed with data still to send log a warning + if (!aio->writeQueueEmpty()) { + QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)"); + } + delete &s; + aio->queueForDeletion(); + delete this; +} + +void AsynchIOHandler::disconnect(AsynchIO& a) { + // treat the same as eof + eof(a); +} + +// Notifications +void AsynchIOHandler::nobuffs(AsynchIO&) { +} + +void AsynchIOHandler::idle(AsynchIO&){ + if (isClient && codec == 0) { + codec = factory->create(*this, identifier); + write(framing::ProtocolInitiation(codec->getVersion())); + return; + } + if (codec == 0) return; + if (codec->canEncode()) { + // Try and get a queued buffer if not then construct new one + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (!buff) buff = new Buff; + size_t encoded=codec->encode(buff->bytes, buff->byteCount); + buff->dataCount = encoded; + aio->queueWrite(buff); + } + if (codec->isClosed()) + aio->queueWriteClose(); +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h new file mode 100644 index 0000000000..530613367a --- /dev/null +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -0,0 +1,70 @@ +#ifndef _sys_AsynchIOHandler_h +#define _sys_AsynchIOHandler_h +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "OutputControl.h" +#include "ConnectionCodec.h" +#include "AsynchIO.h" + +namespace qpid { + +namespace framing { + class ProtocolInitiation; +} + +namespace sys { + +class AsynchIOHandler : public OutputControl { + std::string identifier; + AsynchIO* aio; + ConnectionCodec::Factory* factory; + ConnectionCodec* codec; + bool readError; + bool isClient; + + void write(const framing::ProtocolInitiation&); + + public: + AsynchIOHandler(std::string id, ConnectionCodec::Factory* f); + ~AsynchIOHandler(); + void init(AsynchIO* a, int numBuffs); + + void setClient() { isClient = true; } + + // Output side + void close(); + void activateOutput(); + + // Input side + void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); + void eof(AsynchIO& aio); + void disconnect(AsynchIO& aio); + + // Notifications + void nobuffs(AsynchIO& aio); + void idle(AsynchIO& aio); + void closedSocket(AsynchIO& aio, const Socket& s); +}; + +}} // namespace qpid::sys + +#endif // _sys_AsynchIOHandler_h diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp new file mode 100644 index 0000000000..779319b7e4 --- /dev/null +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Acceptor.h" + +#include "AsynchIOHandler.h" +#include "AsynchIO.h" + +#include +#include + +namespace qpid { +namespace sys { + +class AsynchIOAcceptor : public Acceptor { + Socket listener; + const uint16_t listeningPort; + std::auto_ptr acceptor; + + public: + AsynchIOAcceptor(int16_t port, int backlog); + void run(Poller::shared_ptr, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); + + uint16_t getPort() const; + std::string getHost() const; + + private: + void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); +}; + +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog) +{ + return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog)); +} + +AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : + listeningPort(listener.listen(port, backlog)), + acceptor(0) +{} + +void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { + AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); + AsynchIO* aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + async->init(aio, 4); + aio->start(poller); +} + + +uint16_t AsynchIOAcceptor::getPort() const { + return listeningPort; // Immutable no need for lock. +} + +std::string AsynchIOAcceptor::getHost() const { + return listener.getSockname(); +} + +void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { + acceptor.reset( + new AsynchAcceptor(listener, + boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact))); + acceptor->start(poller); +} + +void AsynchIOAcceptor::connect( + Poller::shared_ptr poller, + const std::string& host, int16_t port, + ConnectionCodec::Factory* f) +{ + Socket* socket = new Socket();//Should be deleted by handle when socket closes + socket->connect(host, port); + AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f); + async->setClient(); + AsynchIO* aio = new AsynchIO(*socket, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + async->init(aio, 4); + aio->start(poller); +} + +}} // namespace qpid::sys -- cgit v1.2.1