diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 190 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 27 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 54 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 53 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ConnectionInputHandler.h | 1 |
11 files changed, 363 insertions, 33 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 3ba07a180a..03b573c30f 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -242,7 +242,7 @@ Manageable* Broker::GetVhostObject(void) const } Manageable::status_t Broker::ManagementMethod (uint32_t methodId, - Args& /*_args*/) + Args& args) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -253,6 +253,10 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case management::Broker::METHOD_ECHO : status = Manageable::STATUS_OK; break; + case management::Broker::METHOD_CONNECT : + connect(dynamic_cast<management::ArgsBrokerConnect&>(args)); + status = Manageable::STATUS_OK; + break; case management::Broker::METHOD_JOINCLUSTER : case management::Broker::METHOD_LEAVECLUSTER : @@ -263,5 +267,10 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } +void Broker::connect(management::ArgsBrokerConnect& args) +{ + getAcceptor().connect(args.i_host, args.i_port, &factory); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index fb4b9916da..a2cb3466be 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -34,6 +34,7 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qpid/management/Broker.h" +#include "qpid/management/ArgsBrokerConnect.h" #include "qpid/Options.h" #include "qpid/Plugin.h" #include "qpid/framing/FrameHandler.h" @@ -123,6 +124,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M Vhost::shared_ptr vhostObject; void declareStandardExchange(const std::string& name, const std::string& type); + void connect(management::ArgsBrokerConnect& args); }; }} diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 88761533cf..1e73a60144 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -21,14 +21,18 @@ #include "Connection.h" #include "SessionState.h" #include "BrokerAdapter.h" +#include "Bridge.h" #include "SemanticHandler.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/management/ArgsLinkBind.h" +#include "qpid/management/ArgsLinkPull.h" #include <boost/bind.hpp> +#include <boost/ptr_container/ptr_vector.hpp> #include <algorithm> #include <iostream> @@ -47,7 +51,43 @@ using qpid::management::Args; namespace qpid { namespace broker { -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId) : +class Connection::MgmtClient : public Connection::MgmtWrapper +{ + management::Client::shared_ptr mgmtClient; + +public: + MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); + ~MgmtClient(); + void received(framing::AMQFrame& frame); + management::ManagementObject::shared_ptr getManagementObject() const; + void closing(); +}; + +class Connection::MgmtLink : public Connection::MgmtWrapper +{ + typedef boost::ptr_vector<Bridge> Bridges; + + management::Link::shared_ptr mgmtLink; + Bridges created;//holds list of bridges pending creation + Bridges cancelled;//holds list of bridges pending cancellation + Bridges active;//holds active bridges + uint channelCounter; + sys::Mutex lock; + + void cancel(Bridge*); + +public: + MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); + ~MgmtLink(); + void received(framing::AMQFrame& frame); + management::ManagementObject::shared_ptr getManagementObject() const; + void closing(); + void processPending(); + void process(Connection& connection, const management::Args& args); +}; + + +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) : broker(broker_), outputTasks(*out_), out(out_), @@ -56,7 +96,11 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std client(0), stagingThreshold(broker.getStagingThreshold()), adapter(*this), - mgmtClosing(0) + mgmtClosing(0), + mgmtId(mgmtId_) +{} + +void Connection::initMgmt(bool asLink) { Manageable* parent = broker.GetVhostObject (); @@ -66,18 +110,16 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (agent.get () != 0) { - mgmtObject = management::Client::shared_ptr - (new management::Client (this, parent, mgmtId)); - agent->addObject (mgmtObject); + if (asLink) { + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId)); + } else { + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId)); + } } } } -Connection::~Connection () -{ - if (mgmtObject.get () != 0) - mgmtObject->resourceDestroy (); -} +Connection::~Connection () {} void Connection::received(framing::AMQFrame& frame){ if (mgmtClosing) @@ -88,12 +130,8 @@ void Connection::received(framing::AMQFrame& frame){ } else { getChannel(frame.getChannel()).in(frame); } - - if (mgmtObject.get () != 0) - { - mgmtObject->inc_framesFromClient (); - mgmtObject->inc_bytesFromClient (frame.size ()); - } + + if (mgmtWrapper.get()) mgmtWrapper->received(frame); } void Connection::close( @@ -107,6 +145,7 @@ void Connection::close( void Connection::initiated(const framing::ProtocolInitiation& header) { version = ProtocolVersion(header.getMajor(), header.getMinor()); adapter.init(header); + initMgmt(); } void Connection::idleOut(){} @@ -133,8 +172,12 @@ void Connection::closed(){ // Physically closed, suspend open sessions. } bool Connection::doOutput() -{ +{ try{ + //process any pending mgmt commands: + if (mgmtWrapper.get()) mgmtWrapper->processPending(); + + //then do other output as needed: return outputTasks.doOutput(); }catch(ConnectionException& e){ close(e.code, e.what(), 0, 0); @@ -159,11 +202,11 @@ SessionHandler& Connection::getChannel(ChannelId id) { ManagementObject::shared_ptr Connection::GetManagementObject (void) const { - return dynamic_pointer_cast<ManagementObject> (mgmtObject); + return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr(); } Manageable::status_t Connection::ManagementMethod (uint32_t methodId, - Args& /*args*/) + Args& args) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -173,7 +216,13 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, { case management::Client::METHOD_CLOSE : mgmtClosing = 1; - mgmtObject->set_closing (1); + if (mgmtWrapper.get()) mgmtWrapper->closing(); + status = Manageable::STATUS_OK; + break; + case management::Link::METHOD_BRIDGE : + //queue this up and request chance to do output (i.e. get connections thread of control): + mgmtWrapper->process(*this, args); + out->activateOutput(); status = Manageable::STATUS_OK; break; } @@ -192,5 +241,106 @@ const string& Connection::getUserId() const return userId; } +Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) + : channelCounter(1) +{ + mgmtLink = management::Link::shared_ptr + (new management::Link(conn, parent, mgmtId)); + agent->addObject (mgmtLink); +} + +Connection::MgmtLink::~MgmtLink() +{ + if (mgmtLink.get () != 0) + mgmtLink->resourceDestroy (); +} + +void Connection::MgmtLink::received(framing::AMQFrame& frame) +{ + if (mgmtLink.get () != 0) + { + mgmtLink->inc_framesFromPeer (); + mgmtLink->inc_bytesFromPeer (frame.size ()); + } +} + +management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const +{ + return dynamic_pointer_cast<ManagementObject>(mgmtLink); +} + +void Connection::MgmtLink::closing() +{ + if (mgmtLink) mgmtLink->set_closing (1); +} + +void Connection::MgmtLink::processPending() +{ + //process any pending creates + if (!created.empty()) { + for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { + i->create(); + } + active.transfer(active.end(), created.begin(), created.end(), created); + } + if (!cancelled.empty()) { + //process any pending cancellations + for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { + i->cancel(); + } + cancelled.clear(); + } +} + +void Connection::MgmtLink::process(Connection& connection, const management::Args& args) +{ + created.push_back(new Bridge(channelCounter++, connection, + boost::bind(&MgmtLink::cancel, this, _1), + dynamic_cast<const management::ArgsLinkBridge&>(args))); +} + +void Connection::MgmtLink::cancel(Bridge* b) +{ + //need to take this out the active map and add it to the cancelled map + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if (&(*i) == b) { + cancelled.transfer(cancelled.end(), i, active); + break; + } + } +} + +Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) +{ + mgmtClient = management::Client::shared_ptr + (new management::Client (conn, parent, mgmtId)); + agent->addObject (mgmtClient); +} + +Connection::MgmtClient::~MgmtClient() +{ + if (mgmtClient.get () != 0) + mgmtClient->resourceDestroy (); +} + +void Connection::MgmtClient::received(framing::AMQFrame& frame) +{ + if (mgmtClient.get () != 0) + { + mgmtClient->inc_framesFromClient (); + mgmtClient->inc_bytesFromClient (frame.size ()); + } +} + +management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const +{ + return dynamic_pointer_cast<ManagementObject>(mgmtClient); +} + +void Connection::MgmtClient::closing() +{ + if (mgmtClient) mgmtClient->set_closing (1); +} + }} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 781b2304ec..99b394dda0 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -21,6 +21,7 @@ #ifndef _Connection_ #define _Connection_ +#include <memory> #include <sstream> #include <vector> @@ -41,6 +42,7 @@ #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Client.h" +#include "qpid/management/Link.h" #include <boost/ptr_container/ptr_map.hpp> @@ -87,6 +89,7 @@ class Connection : public sys::ConnectionInputHandler, void idleIn(); void closed(); bool doOutput(); + framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); } void closeChannel(framing::ChannelId channel); @@ -98,10 +101,31 @@ class Connection : public sys::ConnectionInputHandler, void setUserId(const string& uid); const string& getUserId() const; + void initMgmt(bool asLink = false); + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + /** + * Connection may appear, for the purposes of management, as a + * normal client initiated connection or as an agent initiated + * inter-broker link. This wrapper abstracts the common interface + * for both. + */ + class MgmtWrapper + { + public: + virtual ~MgmtWrapper(){} + virtual void received(framing::AMQFrame& frame) = 0; + virtual management::ManagementObject::shared_ptr getManagementObject() const = 0; + virtual void closing() = 0; + virtual void processPending(){} + virtual void process(Connection&, const management::Args&){} + }; + class MgmtClient; + class MgmtLink; + framing::ProtocolVersion version; ChannelMap channels; sys::ConnectionOutputHandler* out; @@ -110,9 +134,10 @@ class Connection : public sys::ConnectionInputHandler, framing::AMQP_ClientProxy::Connection* client; uint64_t stagingThreshold; ConnectionHandler adapter; - management::Client::shared_ptr mgmtObject; + std::auto_ptr<MgmtWrapper> mgmtWrapper; bool mgmtClosing; string userId; + const std::string mgmtId; }; }} diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 45c2f29d87..e296d52214 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -23,6 +23,7 @@ #include "ConnectionHandler.h" #include "Connection.h" #include "qpid/framing/ConnectionStartBody.h" +#include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" using namespace qpid; @@ -40,6 +41,7 @@ void ConnectionHandler::init(const framing::ProtocolInitiation& header) { FieldTable properties; string mechanisms(PLAIN); string locales(en_US); + handler->serverMode = true; handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales); } @@ -52,8 +54,13 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) { AMQMethodBody* method=frame.getBody()->getMethod(); try{ - if (!invoke(*handler.get(), *method)) - throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); + if (handler->serverMode) { + if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method)) + throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); + } else { + if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method)) + throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); + } }catch(ConnectionException& e){ handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ @@ -63,9 +70,10 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {} -ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), connection(c) {} +ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), server(c.getOutput()), + connection(c), serverMode(false) {} -void ConnectionHandler::Handler::startOk(const FieldTable& /*clientProperties*/, +void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/, const string& mechanism, const string& response, const string& /*locale*/) { @@ -110,3 +118,41 @@ void ConnectionHandler::Handler::close(uint16_t /*replyCode*/, const string& /*r void ConnectionHandler::Handler::closeOk(){ connection.getOutput().close(); } + + +void ConnectionHandler::Handler::start(uint8_t /*versionMajor*/, + uint8_t /*versionMinor*/, + const FieldTable& /*serverProperties*/, + const string& /*mechanisms*/, + const string& /*locales*/) +{ + string uid = "qpidd"; + string pwd = "qpidd"; + string response = ((char)0) + uid + ((char)0) + pwd; + server.startOk(FieldTable(), PLAIN, response, en_US); + connection.initMgmt(true); +} + +void ConnectionHandler::Handler::secure(const string& /*challenge*/) +{ + server.secureOk(""); +} + +void ConnectionHandler::Handler::tune(uint16_t channelMax, + uint32_t frameMax, + uint16_t heartbeat) +{ + connection.setFrameMax(frameMax); + connection.setHeartbeat(heartbeat); + server.tuneOk(channelMax, frameMax, heartbeat); + server.open("/", "", true); +} + +void ConnectionHandler::Handler::openOk(const string& /*knownHosts*/) +{ +} + +void ConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/) +{ + +} diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h index aa8c9366cd..2a581d5675 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.h +++ b/cpp/src/qpid/broker/ConnectionHandler.h @@ -24,8 +24,10 @@ #include <memory> #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/AMQP_ClientOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/ProtocolVersion.h" @@ -39,10 +41,13 @@ class Connection; // TODO aconway 2007-09-18: Rename to ConnectionHandler class ConnectionHandler : public framing::FrameHandler { - struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler + struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, + public framing::AMQP_ClientOperations::ConnectionHandler { framing::AMQP_ClientProxy::Connection client; + framing::AMQP_ServerProxy::Connection server; Connection& connection; + bool serverMode; Handler(Connection& connection); void startOk(const qpid::framing::FieldTable& clientProperties, @@ -55,6 +60,23 @@ class ConnectionHandler : public framing::FrameHandler void close(uint16_t replyCode, const std::string& replyText, uint16_t classId, uint16_t methodId); void closeOk(); + + + void start(uint8_t versionMajor, + uint8_t versionMinor, + const qpid::framing::FieldTable& serverProperties, + const std::string& mechanisms, + const std::string& locales); + + void secure(const std::string& challenge); + + void tune(uint16_t channelMax, + uint32_t frameMax, + uint16_t heartbeat); + + void openOk(const std::string& knownHosts); + + void redirect(const std::string& host, const std::string& knownHosts); }; std::auto_ptr<Handler> handler; public: diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index bbdbccad7d..fb46cb522d 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -23,6 +23,7 @@ #include "Connection.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" +#include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" @@ -57,17 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m = f.getBody()->getMethod(); try { - if (m && invoke(*this, *m)) + if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { return; - else if (session.get()) { + } else if (session.get()) { boost::optional<SequenceNumber> ack=session->received(f); session->in.handle(f); if (ack) peerSession.ack(*ack, SequenceNumberSet()); - } - else if (!ignoring) + } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { + return; + } else if (!ignoring) { throw ChannelErrorException( QPID_MSG("Channel " << channel.get() << " is not open")); + } } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. session->detach(); @@ -188,4 +191,17 @@ void SessionHandler::solicitAck() { peerSession.ack(session->sendingAck(), SequenceNumberSet()); } +void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) +{ + std::auto_ptr<SessionState> state( + connection.broker.getSessionManager().open(*this, detachedLifetime)); + session.reset(state.release()); +} + +void SessionHandler::detached() +{ + connection.broker.getSessionManager().suspend(session); + session.reset(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 9a68ddb46f..6f6f5e941f 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -23,6 +23,7 @@ */ #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/AMQP_ClientOperations.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" @@ -43,6 +44,7 @@ class SessionState; */ class SessionHandler : public framing::FrameHandler::InOutHandler, public framing::AMQP_ServerOperations::SessionHandler, + public framing::AMQP_ClientOperations::SessionHandler, private boost::noncopyable { public: @@ -81,12 +83,17 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, const framing::SequenceNumberSet& seenFrameSet); void highWaterMark(uint32_t lastSentMark); void solicitAck(); + + //extra methods required for assuming client role + void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); + void detached(); void assertAttached(const char* method) const; void assertActive(const char* method) const; void assertClosed(const char* method) const; + Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 2eee8b4abd..1e87b76e04 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -38,6 +38,7 @@ class Acceptor : public qpid::SharedObject<Acceptor> virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; virtual void run(ConnectionInputHandlerFactory* factory) = 0; + virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0; virtual void shutdown() = 0; }; diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 485f8c20f4..650bb31a68 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -30,6 +30,7 @@ #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/AMQDataBlock.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" @@ -53,6 +54,7 @@ class AsynchIOAcceptor : public Acceptor { AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} void run(ConnectionInputHandlerFactory* factory); + void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory); void shutdown(); uint16_t getPort() const; @@ -92,13 +94,17 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { bool initiated; bool readError; std::string identifier; + bool isClient; + + void write(const framing::AMQDataBlock&); public: AsynchIOHandler() : inputHandler(0), frameQueueClosed(false), initiated(false), - readError(false) + readError(false), + isClient(false) {} ~AsynchIOHandler() { @@ -107,6 +113,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { delete inputHandler; } + void setClient() { isClient = true; } + void init(AsynchIO* a, ConnectionInputHandler* h) { aio = a; inputHandler = h; @@ -179,11 +187,48 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { t[i].join(); } } + +void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f) +{ + Socket* socket = new Socket();//Should be deleted by handle when socket closes + socket->connect(host, port); + AsynchIOHandler* async = new AsynchIOHandler; + async->setClient(); + ConnectionInputHandler* handler = f->create(async, *socket); + AsynchIO* aio = new AsynchIO(*socket, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + async->init(aio, handler); + + // Give connection some buffers to use + for (int i = 0; i < 4; i++) { + aio->queueReadBuffer(new Buff); + } + aio->start(poller); + +} + void AsynchIOAcceptor::shutdown() { poller->shutdown(); } + +void AsynchIOHandler::write(const framing::AMQDataBlock& data) +{ + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (!buff) + buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + data.encode(out); + buff->dataCount = data.size(); + aio->queueWrite(buff); +} + // Output side void AsynchIOHandler::send(framing::AMQFrame& frame) { // TODO: Need to find out if we are in the callback context, @@ -274,6 +319,12 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ + if (isClient && !initiated) { + //get & write protocol header from upper layers + write(inputHandler->getInitiation()); + initiated = true; + return; + } ScopedLock<Mutex> l(frameQueueLock); if (frameQueue.empty()) { diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h index 226096c5ef..1936b5ec50 100644 --- a/cpp/src/qpid/sys/ConnectionInputHandler.h +++ b/cpp/src/qpid/sys/ConnectionInputHandler.h @@ -36,6 +36,7 @@ namespace sys { public TimeoutHandler, public OutputTask { public: + virtual qpid::framing::ProtocolInitiation getInitiation() = 0; virtual void closed() = 0; }; |