summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-02-01 18:21:01 +0000
committerGordon Sim <gsim@apache.org>2008-02-01 18:21:01 +0000
commit5891c19a838bd8987fbc04d23923f4f5f2ca4636 (patch)
tree1b8b75e076ebded9b57c84b547b8cf9b80a71427 /cpp/src
parent4db96f7ad47c69982cdc6cf7b5e5c47b00f1144b (diff)
downloadqpid-python-5891c19a838bd8987fbc04d23923f4f5f2ca4636.tar.gz
Initial cut of inter-broker bridging
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@617590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp11
-rw-r--r--cpp/src/qpid/broker/Broker.h2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp190
-rw-r--r--cpp/src/qpid/broker/Connection.h27
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp54
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.h26
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp24
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h7
-rw-r--r--cpp/src/qpid/sys/Acceptor.h1
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp53
-rw-r--r--cpp/src/qpid/sys/ConnectionInputHandler.h1
11 files changed, 363 insertions, 33 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 3ba07a180a..03b573c30f 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -242,7 +242,7 @@ Manageable* Broker::GetVhostObject(void) const
}
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
- Args& /*_args*/)
+ Args& args)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -253,6 +253,10 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case management::Broker::METHOD_ECHO :
status = Manageable::STATUS_OK;
break;
+ case management::Broker::METHOD_CONNECT :
+ connect(dynamic_cast<management::ArgsBrokerConnect&>(args));
+ status = Manageable::STATUS_OK;
+ break;
case management::Broker::METHOD_JOINCLUSTER :
case management::Broker::METHOD_LEAVECLUSTER :
@@ -263,5 +267,10 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
+void Broker::connect(management::ArgsBrokerConnect& args)
+{
+ getAcceptor().connect(args.i_host, args.i_port, &factory);
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index fb4b9916da..a2cb3466be 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -34,6 +34,7 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/management/Broker.h"
+#include "qpid/management/ArgsBrokerConnect.h"
#include "qpid/Options.h"
#include "qpid/Plugin.h"
#include "qpid/framing/FrameHandler.h"
@@ -123,6 +124,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
Vhost::shared_ptr vhostObject;
void declareStandardExchange(const std::string& name, const std::string& type);
+ void connect(management::ArgsBrokerConnect& args);
};
}}
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 88761533cf..1e73a60144 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -21,14 +21,18 @@
#include "Connection.h"
#include "SessionState.h"
#include "BrokerAdapter.h"
+#include "Bridge.h"
#include "SemanticHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/ArgsLinkBind.h"
+#include "qpid/management/ArgsLinkPull.h"
#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
#include <algorithm>
#include <iostream>
@@ -47,7 +51,43 @@ using qpid::management::Args;
namespace qpid {
namespace broker {
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId) :
+class Connection::MgmtClient : public Connection::MgmtWrapper
+{
+ management::Client::shared_ptr mgmtClient;
+
+public:
+ MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtClient();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+};
+
+class Connection::MgmtLink : public Connection::MgmtWrapper
+{
+ typedef boost::ptr_vector<Bridge> Bridges;
+
+ management::Link::shared_ptr mgmtLink;
+ Bridges created;//holds list of bridges pending creation
+ Bridges cancelled;//holds list of bridges pending cancellation
+ Bridges active;//holds active bridges
+ uint channelCounter;
+ sys::Mutex lock;
+
+ void cancel(Bridge*);
+
+public:
+ MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtLink();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+ void processPending();
+ void process(Connection& connection, const management::Args& args);
+};
+
+
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
broker(broker_),
outputTasks(*out_),
out(out_),
@@ -56,7 +96,11 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
client(0),
stagingThreshold(broker.getStagingThreshold()),
adapter(*this),
- mgmtClosing(0)
+ mgmtClosing(0),
+ mgmtId(mgmtId_)
+{}
+
+void Connection::initMgmt(bool asLink)
{
Manageable* parent = broker.GetVhostObject ();
@@ -66,18 +110,16 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
if (agent.get () != 0)
{
- mgmtObject = management::Client::shared_ptr
- (new management::Client (this, parent, mgmtId));
- agent->addObject (mgmtObject);
+ if (asLink) {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ } else {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ }
}
}
}
-Connection::~Connection ()
-{
- if (mgmtObject.get () != 0)
- mgmtObject->resourceDestroy ();
-}
+Connection::~Connection () {}
void Connection::received(framing::AMQFrame& frame){
if (mgmtClosing)
@@ -88,12 +130,8 @@ void Connection::received(framing::AMQFrame& frame){
} else {
getChannel(frame.getChannel()).in(frame);
}
-
- if (mgmtObject.get () != 0)
- {
- mgmtObject->inc_framesFromClient ();
- mgmtObject->inc_bytesFromClient (frame.size ());
- }
+
+ if (mgmtWrapper.get()) mgmtWrapper->received(frame);
}
void Connection::close(
@@ -107,6 +145,7 @@ void Connection::close(
void Connection::initiated(const framing::ProtocolInitiation& header) {
version = ProtocolVersion(header.getMajor(), header.getMinor());
adapter.init(header);
+ initMgmt();
}
void Connection::idleOut(){}
@@ -133,8 +172,12 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
}
bool Connection::doOutput()
-{
+{
try{
+ //process any pending mgmt commands:
+ if (mgmtWrapper.get()) mgmtWrapper->processPending();
+
+ //then do other output as needed:
return outputTasks.doOutput();
}catch(ConnectionException& e){
close(e.code, e.what(), 0, 0);
@@ -159,11 +202,11 @@ SessionHandler& Connection::getChannel(ChannelId id) {
ManagementObject::shared_ptr Connection::GetManagementObject (void) const
{
- return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+ return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
}
Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
- Args& /*args*/)
+ Args& args)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -173,7 +216,13 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
{
case management::Client::METHOD_CLOSE :
mgmtClosing = 1;
- mgmtObject->set_closing (1);
+ if (mgmtWrapper.get()) mgmtWrapper->closing();
+ status = Manageable::STATUS_OK;
+ break;
+ case management::Link::METHOD_BRIDGE :
+ //queue this up and request chance to do output (i.e. get connections thread of control):
+ mgmtWrapper->process(*this, args);
+ out->activateOutput();
status = Manageable::STATUS_OK;
break;
}
@@ -192,5 +241,106 @@ const string& Connection::getUserId() const
return userId;
}
+Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+ : channelCounter(1)
+{
+ mgmtLink = management::Link::shared_ptr
+ (new management::Link(conn, parent, mgmtId));
+ agent->addObject (mgmtLink);
+}
+
+Connection::MgmtLink::~MgmtLink()
+{
+ if (mgmtLink.get () != 0)
+ mgmtLink->resourceDestroy ();
+}
+
+void Connection::MgmtLink::received(framing::AMQFrame& frame)
+{
+ if (mgmtLink.get () != 0)
+ {
+ mgmtLink->inc_framesFromPeer ();
+ mgmtLink->inc_bytesFromPeer (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtLink);
+}
+
+void Connection::MgmtLink::closing()
+{
+ if (mgmtLink) mgmtLink->set_closing (1);
+}
+
+void Connection::MgmtLink::processPending()
+{
+ //process any pending creates
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ i->create();
+ }
+ active.transfer(active.end(), created.begin(), created.end(), created);
+ }
+ if (!cancelled.empty()) {
+ //process any pending cancellations
+ for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+ i->cancel();
+ }
+ cancelled.clear();
+ }
+}
+
+void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
+{
+ created.push_back(new Bridge(channelCounter++, connection,
+ boost::bind(&MgmtLink::cancel, this, _1),
+ dynamic_cast<const management::ArgsLinkBridge&>(args)));
+}
+
+void Connection::MgmtLink::cancel(Bridge* b)
+{
+ //need to take this out the active map and add it to the cancelled map
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if (&(*i) == b) {
+ cancelled.transfer(cancelled.end(), i, active);
+ break;
+ }
+ }
+}
+
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+{
+ mgmtClient = management::Client::shared_ptr
+ (new management::Client (conn, parent, mgmtId));
+ agent->addObject (mgmtClient);
+}
+
+Connection::MgmtClient::~MgmtClient()
+{
+ if (mgmtClient.get () != 0)
+ mgmtClient->resourceDestroy ();
+}
+
+void Connection::MgmtClient::received(framing::AMQFrame& frame)
+{
+ if (mgmtClient.get () != 0)
+ {
+ mgmtClient->inc_framesFromClient ();
+ mgmtClient->inc_bytesFromClient (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtClient);
+}
+
+void Connection::MgmtClient::closing()
+{
+ if (mgmtClient) mgmtClient->set_closing (1);
+}
+
}}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 781b2304ec..99b394dda0 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -21,6 +21,7 @@
#ifndef _Connection_
#define _Connection_
+#include <memory>
#include <sstream>
#include <vector>
@@ -41,6 +42,7 @@
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Client.h"
+#include "qpid/management/Link.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -87,6 +89,7 @@ class Connection : public sys::ConnectionInputHandler,
void idleIn();
void closed();
bool doOutput();
+ framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
void closeChannel(framing::ChannelId channel);
@@ -98,10 +101,31 @@ class Connection : public sys::ConnectionInputHandler,
void setUserId(const string& uid);
const string& getUserId() const;
+ void initMgmt(bool asLink = false);
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+ /**
+ * Connection may appear, for the purposes of management, as a
+ * normal client initiated connection or as an agent initiated
+ * inter-broker link. This wrapper abstracts the common interface
+ * for both.
+ */
+ class MgmtWrapper
+ {
+ public:
+ virtual ~MgmtWrapper(){}
+ virtual void received(framing::AMQFrame& frame) = 0;
+ virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
+ virtual void closing() = 0;
+ virtual void processPending(){}
+ virtual void process(Connection&, const management::Args&){}
+ };
+ class MgmtClient;
+ class MgmtLink;
+
framing::ProtocolVersion version;
ChannelMap channels;
sys::ConnectionOutputHandler* out;
@@ -110,9 +134,10 @@ class Connection : public sys::ConnectionInputHandler,
framing::AMQP_ClientProxy::Connection* client;
uint64_t stagingThreshold;
ConnectionHandler adapter;
- management::Client::shared_ptr mgmtObject;
+ std::auto_ptr<MgmtWrapper> mgmtWrapper;
bool mgmtClosing;
string userId;
+ const std::string mgmtId;
};
}}
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 45c2f29d87..e296d52214 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -23,6 +23,7 @@
#include "ConnectionHandler.h"
#include "Connection.h"
#include "qpid/framing/ConnectionStartBody.h"
+#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
using namespace qpid;
@@ -40,6 +41,7 @@ void ConnectionHandler::init(const framing::ProtocolInitiation& header) {
FieldTable properties;
string mechanisms(PLAIN);
string locales(en_US);
+ handler->serverMode = true;
handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
}
@@ -52,8 +54,13 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
{
AMQMethodBody* method=frame.getBody()->getMethod();
try{
- if (!invoke(*handler.get(), *method))
- throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+ if (handler->serverMode) {
+ if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
+ throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+ } else {
+ if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
+ throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
+ }
}catch(ConnectionException& e){
handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
@@ -63,9 +70,10 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {}
-ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), connection(c) {}
+ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), server(c.getOutput()),
+ connection(c), serverMode(false) {}
-void ConnectionHandler::Handler::startOk(const FieldTable& /*clientProperties*/,
+void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
const string& mechanism,
const string& response, const string& /*locale*/)
{
@@ -110,3 +118,41 @@ void ConnectionHandler::Handler::close(uint16_t /*replyCode*/, const string& /*r
void ConnectionHandler::Handler::closeOk(){
connection.getOutput().close();
}
+
+
+void ConnectionHandler::Handler::start(uint8_t /*versionMajor*/,
+ uint8_t /*versionMinor*/,
+ const FieldTable& /*serverProperties*/,
+ const string& /*mechanisms*/,
+ const string& /*locales*/)
+{
+ string uid = "qpidd";
+ string pwd = "qpidd";
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ server.startOk(FieldTable(), PLAIN, response, en_US);
+ connection.initMgmt(true);
+}
+
+void ConnectionHandler::Handler::secure(const string& /*challenge*/)
+{
+ server.secureOk("");
+}
+
+void ConnectionHandler::Handler::tune(uint16_t channelMax,
+ uint32_t frameMax,
+ uint16_t heartbeat)
+{
+ connection.setFrameMax(frameMax);
+ connection.setHeartbeat(heartbeat);
+ server.tuneOk(channelMax, frameMax, heartbeat);
+ server.open("/", "", true);
+}
+
+void ConnectionHandler::Handler::openOk(const string& /*knownHosts*/)
+{
+}
+
+void ConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/)
+{
+
+}
diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h
index aa8c9366cd..2a581d5675 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/cpp/src/qpid/broker/ConnectionHandler.h
@@ -24,8 +24,10 @@
#include <memory>
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
@@ -39,10 +41,13 @@ class Connection;
// TODO aconway 2007-09-18: Rename to ConnectionHandler
class ConnectionHandler : public framing::FrameHandler
{
- struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
+ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ public framing::AMQP_ClientOperations::ConnectionHandler
{
framing::AMQP_ClientProxy::Connection client;
+ framing::AMQP_ServerProxy::Connection server;
Connection& connection;
+ bool serverMode;
Handler(Connection& connection);
void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -55,6 +60,23 @@ class ConnectionHandler : public framing::FrameHandler
void close(uint16_t replyCode, const std::string& replyText,
uint16_t classId, uint16_t methodId);
void closeOk();
+
+
+ void start(uint8_t versionMajor,
+ uint8_t versionMinor,
+ const qpid::framing::FieldTable& serverProperties,
+ const std::string& mechanisms,
+ const std::string& locales);
+
+ void secure(const std::string& challenge);
+
+ void tune(uint16_t channelMax,
+ uint32_t frameMax,
+ uint16_t heartbeat);
+
+ void openOk(const std::string& knownHosts);
+
+ void redirect(const std::string& host, const std::string& knownHosts);
};
std::auto_ptr<Handler> handler;
public:
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index bbdbccad7d..fb46cb522d 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -23,6 +23,7 @@
#include "Connection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
@@ -57,17 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && invoke(*this, *m))
+ if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
return;
- else if (session.get()) {
+ } else if (session.get()) {
boost::optional<SequenceNumber> ack=session->received(f);
session->in.handle(f);
if (ack)
peerSession.ack(*ack, SequenceNumberSet());
- }
- else if (!ignoring)
+ } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else if (!ignoring) {
throw ChannelErrorException(
QPID_MSG("Channel " << channel.get() << " is not open"));
+ }
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
session->detach();
@@ -188,4 +191,17 @@ void SessionHandler::solicitAck() {
peerSession.ack(session->sendingAck(), SequenceNumberSet());
}
+void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+{
+ std::auto_ptr<SessionState> state(
+ connection.broker.getSessionManager().open(*this, detachedLifetime));
+ session.reset(state.release());
+}
+
+void SessionHandler::detached()
+{
+ connection.broker.getSessionManager().suspend(session);
+ session.reset();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 9a68ddb46f..6f6f5e941f 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -23,6 +23,7 @@
*/
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
@@ -43,6 +44,7 @@ class SessionState;
*/
class SessionHandler : public framing::FrameHandler::InOutHandler,
public framing::AMQP_ServerOperations::SessionHandler,
+ public framing::AMQP_ClientOperations::SessionHandler,
private boost::noncopyable
{
public:
@@ -81,12 +83,17 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
const framing::SequenceNumberSet& seenFrameSet);
void highWaterMark(uint32_t lastSentMark);
void solicitAck();
+
+ //extra methods required for assuming client role
+ void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+ void detached();
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
void assertClosed(const char* method) const;
+
Connection& connection;
framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
index 2eee8b4abd..1e87b76e04 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -38,6 +38,7 @@ class Acceptor : public qpid::SharedObject<Acceptor>
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
virtual void run(ConnectionInputHandlerFactory* factory) = 0;
+ virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
virtual void shutdown() = 0;
};
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 485f8c20f4..650bb31a68 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -30,6 +30,7 @@
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/AMQDataBlock.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
@@ -53,6 +54,7 @@ class AsynchIOAcceptor : public Acceptor {
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
+ void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
void shutdown();
uint16_t getPort() const;
@@ -92,13 +94,17 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
bool initiated;
bool readError;
std::string identifier;
+ bool isClient;
+
+ void write(const framing::AMQDataBlock&);
public:
AsynchIOHandler() :
inputHandler(0),
frameQueueClosed(false),
initiated(false),
- readError(false)
+ readError(false),
+ isClient(false)
{}
~AsynchIOHandler() {
@@ -107,6 +113,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
delete inputHandler;
}
+ void setClient() { isClient = true; }
+
void init(AsynchIO* a, ConnectionInputHandler* h) {
aio = a;
inputHandler = h;
@@ -179,11 +187,48 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
t[i].join();
}
}
+
+void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+{
+ Socket* socket = new Socket();//Should be deleted by handle when socket closes
+ socket->connect(host, port);
+ AsynchIOHandler* async = new AsynchIOHandler;
+ async->setClient();
+ ConnectionInputHandler* handler = f->create(async, *socket);
+ AsynchIO* aio = new AsynchIO(*socket,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, handler);
+
+ // Give connection some buffers to use
+ for (int i = 0; i < 4; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
+ aio->start(poller);
+
+}
+
void AsynchIOAcceptor::shutdown() {
poller->shutdown();
}
+
+void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+{
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.size();
+ aio->queueWrite(buff);
+}
+
// Output side
void AsynchIOHandler::send(framing::AMQFrame& frame) {
// TODO: Need to find out if we are in the callback context,
@@ -274,6 +319,12 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
}
void AsynchIOHandler::idle(AsynchIO&){
+ if (isClient && !initiated) {
+ //get & write protocol header from upper layers
+ write(inputHandler->getInitiation());
+ initiated = true;
+ return;
+ }
ScopedLock<Mutex> l(frameQueueLock);
if (frameQueue.empty()) {
diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h
index 226096c5ef..1936b5ec50 100644
--- a/cpp/src/qpid/sys/ConnectionInputHandler.h
+++ b/cpp/src/qpid/sys/ConnectionInputHandler.h
@@ -36,6 +36,7 @@ namespace sys {
public TimeoutHandler, public OutputTask
{
public:
+ virtual qpid::framing::ProtocolInitiation getInitiation() = 0;
virtual void closed() = 0;
};