diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 66 |
1 files changed, 63 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index f2cd1c11e4..fbe018e8ae 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -26,6 +26,7 @@ #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/management/ManagementAgent.h" #include <boost/bind.hpp> @@ -38,11 +39,15 @@ using namespace qpid::sys; using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::ptr_map; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; namespace qpid { namespace broker { -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : + Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const Socket& s) : broker(broker_), outputTasks(*out_), out(out_), @@ -50,15 +55,45 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : heartbeat(0), client(0), stagingThreshold(broker.getStagingThreshold()), - adapter(*this) -{} + adapter(*this), + mgmtClosing(0) +{ + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::Client::shared_ptr + (new management::Client (this, parent, s.getPeerAddress ())); + agent->addObject (mgmtObject); + } + } +} + +Connection::~Connection () +{ + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} void Connection::received(framing::AMQFrame& frame){ + if (mgmtClosing) + close (403, "Closed by Management Request", 0, 0); + if (frame.getChannel() == 0) { adapter.handle(frame); } else { getChannel(frame.getChannel()).in(frame); } + + if (mgmtObject.get () != 0) + { + mgmtObject->inc_framesFromClient (); + mgmtObject->inc_bytesFromClient (frame.size ()); + } } void Connection::close( @@ -122,5 +157,30 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *get_pointer(i); } +ManagementObject::shared_ptr Connection::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Connection::ManagementMethod (uint32_t methodId, + Args& /*args*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case management::Client::METHOD_CLOSE : + mgmtClosing = 1; + mgmtObject->set_closing (1); + status = Manageable::STATUS_OK; + break; + } + + return status; +} + + }} |