summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp130
1 files changed, 19 insertions, 111 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 1994c4fdf5..d156b4a914 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -52,37 +52,14 @@ class Connection::MgmtClient : public Connection::MgmtWrapper
management::Client::shared_ptr mgmtClient;
public:
- MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming);
~MgmtClient();
void received(framing::AMQFrame& frame);
management::ManagementObject::shared_ptr getManagementObject() const;
void closing();
};
-class Connection::MgmtLink : public Connection::MgmtWrapper
-{
- typedef boost::ptr_vector<Bridge> Bridges;
-
- management::Link::shared_ptr mgmtLink;
- Bridges created;//holds list of bridges pending creation
- Bridges cancelled;//holds list of bridges pending cancellation
- Bridges active;//holds active bridges
- uint channelCounter;
- sys::Mutex linkLock;
-
- void cancel(Bridge*);
-
-public:
- MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
- ~MgmtLink();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
- void processPending();
- void process(Connection& connection, const management::Args& args);
-};
-
-
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
ConnectionState(out_, broker_),
adapter(*this, isLink),
@@ -103,14 +80,21 @@ void Connection::initMgmt(bool asLink)
if (agent.get () != 0)
{
if (asLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
} else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
}
}
}
}
+void Connection::requestIOProcessing (boost::function0<void> callback)
+{
+ ioCallback = callback;
+ out->activateOutput();
+}
+
+
Connection::~Connection () {}
void Connection::received(framing::AMQFrame& frame){
@@ -160,8 +144,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
bool Connection::doOutput()
{
try{
- //process any pending mgmt commands:
- if (mgmtWrapper.get()) mgmtWrapper->processPending();
+ if (ioCallback)
+ ioCallback(); // Lend the IO thread for management processing
+ ioCallback = 0;
if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
//then do other output as needed:
@@ -192,8 +177,7 @@ ManagementObject::shared_ptr Connection::GetManagementObject (void) const
return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
}
-Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
- Args& args)
+Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -207,93 +191,17 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
out->activateOutput();
status = Manageable::STATUS_OK;
break;
- case management::Link::METHOD_BRIDGE :
- //queue this up and request chance to do output (i.e. get connections thread of control):
- mgmtWrapper->process(*this, args);
- out->activateOutput();
- status = Manageable::STATUS_OK;
- break;
}
return status;
}
-Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
- : channelCounter(1)
-{
- mgmtLink = management::Link::shared_ptr
- (new management::Link(conn, parent, mgmtId));
- agent->addObject (mgmtLink);
-}
-
-Connection::MgmtLink::~MgmtLink()
-{
- if (mgmtLink.get () != 0)
- mgmtLink->resourceDestroy ();
-}
-
-void Connection::MgmtLink::received(framing::AMQFrame& frame)
-{
- if (mgmtLink.get () != 0)
- {
- mgmtLink->inc_framesFromPeer ();
- mgmtLink->inc_bytesFromPeer (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtLink);
-}
-
-void Connection::MgmtLink::closing()
-{
- if (mgmtLink) mgmtLink->set_closing (1);
-}
-
-void Connection::MgmtLink::processPending()
-{
- Mutex::ScopedLock l(linkLock);
- //process any pending creates
- if (!created.empty()) {
- for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- i->create();
- }
- active.transfer(active.end(), created.begin(), created.end(), created);
- }
- if (!cancelled.empty()) {
- //process any pending cancellations
- for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
- i->cancel();
- }
- cancelled.clear();
- }
-}
-
-void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
-{
- Mutex::ScopedLock l(linkLock);
- created.push_back(new Bridge(channelCounter++, connection,
- boost::bind(&MgmtLink::cancel, this, _1),
- dynamic_cast<const management::ArgsLinkBridge&>(args)));
-}
-
-void Connection::MgmtLink::cancel(Bridge* b)
-{
- Mutex::ScopedLock l(linkLock);
- //need to take this out the active map and add it to the cancelled map
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if (&(*i) == b) {
- cancelled.transfer(cancelled.end(), i, active);
- break;
- }
- }
-}
-
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
+ ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming)
{
mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId));
+ (new management::Client (conn, parent, mgmtId, incoming));
agent->addObject (mgmtClient);
}