diff options
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r-- | qpid/cpp/src/qpid/amqp/descriptors.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.cpp | 25 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp | 39 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedSession.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.h | 4 |
9 files changed, 122 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h index 857231ddda..6c7aadd13e 100644 --- a/qpid/cpp/src/qpid/amqp/descriptors.h +++ b/qpid/cpp/src/qpid/amqp/descriptors.h @@ -109,6 +109,7 @@ const std::string NOT_FOUND("amqp:not-found"); const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access"); const std::string DECODE_ERROR("amqp:decode-error"); const std::string NOT_ALLOWED("amqp:not-allowed"); +const std::string NOT_IMPLEMENTED("amqp:not-implemented"); const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded"); const std::string RESOURCE_DELETED("amqp:resource-deleted"); const std::string PRECONDITION_FAILED("amqp:precondition-failed"); diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 514add7e44..c95a2d3537 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -70,7 +70,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker : BrokerContext(b), ManagedConnection(getBroker(), i), connection(pn_connection()), transport(pn_transport()), - out(o), id(i), haveOutput(true), closeInitiated(false) + out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false) { if (pn_transport_bind(transport, connection)) { //error @@ -169,9 +169,22 @@ size_t Connection::encode(char* buffer, size_t size) bool Connection::canEncode() { if (!closeInitiated) { + if (closeRequested) { + close(); + return true; + } try { - for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) { - if (i->second->dispatch()) haveOutput = true; + for (Sessions::iterator i = sessions.begin();i != sessions.end();) { + if (i->second->endedByManagement()) { + pn_session_close(i->first); + i->second->close(); + sessions.erase(i++); + haveOutput = true; + QPID_LOG_CAT(debug, model, id << " session ended by management"); + } else { + if (i->second->dispatch()) haveOutput = true; + ++i; + } } process(); } catch (const Exception& e) { @@ -372,4 +385,10 @@ void Connection::setUserId(const std::string& user) throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit"); } } + +void Connection::closedByManagement() +{ + closeRequested = true; + out.activateOutput(); +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index 0a63045350..961c21012e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -69,12 +69,14 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man bool haveOutput; Sessions sessions; bool closeInitiated; + bool closeRequested; virtual void process(); std::string getError(); void close(); void open(); void readPeerProperties(); + void closedByManagement(); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp index de46165e1a..6a2f8c5698 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp @@ -19,6 +19,8 @@ * */ #include "qpid/broker/amqp/ManagedConnection.h" +#include "qpid/broker/amqp/Exception.h" +#include "qpid/amqp/descriptors.h" #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" #include "qpid/log/Statement.h" @@ -176,4 +178,34 @@ void ManagedConnection::incomingMessageReceived() if (connection) connection->inc_msgsFromClient(); } +void ManagedConnection::closedByManagement() +{ + throw Exception(qpid::amqp::error_conditions::NOT_IMPLEMENTED, QPID_MSG(id << "Connection close requested, but not implemented")); +} + +qpid::management::Manageable::status_t ManagedConnection::ManagementMethod(uint32_t methodId, qpid::management::Args&, std::string& error) +{ + qpid::management::Manageable::status_t status = qpid::management::Manageable::STATUS_UNKNOWN_METHOD; + + try { + switch (methodId) + { + case _qmf::Connection::METHOD_CLOSE : + closedByManagement(); + if (connection) connection->set_closing(true); + status = qpid::management::Manageable::STATUS_OK; + break; + } + } catch (const Exception& e) { + if (e.symbol() == qpid::amqp::error_conditions::NOT_IMPLEMENTED) { + status = qpid::management::Manageable::STATUS_NOT_IMPLEMENTED; + } else { + error = e.what(); + status = qpid::management::Manageable::STATUS_EXCEPTION; + } + } + + return status; +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h index a9f90cefcf..dc97b55862 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h @@ -61,6 +61,11 @@ class ManagedConnection : public qpid::management::Manageable, public OwnershipT const std::map<std::string, types::Variant>& getClientProperties() const; virtual bool isLink() const; void opened(); + + qpid::management::Manageable::status_t ManagementMethod(uint32_t methodId, qpid::management::Args&, std::string&); + + protected: + virtual void closedByManagement(); private: const std::string id; std::string userid; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp index dc003ebde7..5917ddb28a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp @@ -20,6 +20,8 @@ */ #include "qpid/broker/amqp/ManagedSession.h" #include "qpid/broker/amqp/ManagedConnection.h" +#include "qpid/broker/amqp/Exception.h" +#include "qpid/amqp/descriptors.h" #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" #include "qpid/log/Statement.h" @@ -89,4 +91,41 @@ ManagedConnection& ManagedSession::getParent() return parent; } +void ManagedSession::detachedByManagement() +{ + throw Exception(qpid::amqp::error_conditions::NOT_IMPLEMENTED, QPID_MSG(id << "Session detach requested, but not implemented")); +} + +qpid::management::Manageable::status_t ManagedSession::ManagementMethod (uint32_t methodId, + qpid::management::Args& /*args*/, + std::string& error) +{ + qpid::management::Manageable::status_t status = qpid::management::Manageable::STATUS_UNKNOWN_METHOD; + + try { + switch (methodId) + { + case _qmf::Session::METHOD_DETACH : + detachedByManagement(); + status = qpid::management::Manageable::STATUS_OK; + break; + + case _qmf::Session::METHOD_CLOSE : + case _qmf::Session::METHOD_SOLICITACK : + case _qmf::Session::METHOD_RESETLIFESPAN : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + } catch (const Exception& e) { + if (e.symbol() == qpid::amqp::error_conditions::NOT_IMPLEMENTED) { + status = qpid::management::Manageable::STATUS_NOT_IMPLEMENTED; + } else { + error = e.what(); + status = qpid::management::Manageable::STATUS_EXCEPTION; + } + } + + return status; +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h index 5264f3fd7e..0da048654c 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h @@ -48,6 +48,10 @@ class ManagedSession : public qpid::management::Manageable, public OwnershipToke void outgoingMessageAccepted(); void outgoingMessageRejected(); ManagedConnection& getParent(); + + qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&, std::string&); + protected: + virtual void detachedByManagement(); private: ManagedConnection& parent; const std::string id; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 1749725047..fb3c220d02 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -202,7 +202,8 @@ class IncomingToExchange : public DecodingIncoming Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false), - authorise(connection.getUserId(), connection.getBroker().getAcl()) {} + authorise(connection.getUserId(), connection.getBroker().getAcl()), + detachRequested() {} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) @@ -689,6 +690,17 @@ Authorise& Session::getAuthorise() return authorise; } +bool Session::endedByManagement() const +{ + return detachRequested; +} + +void Session::detachedByManagement() +{ + detachRequested = true; + wakeup(); +} + void IncomingToQueue::handle(qpid::broker::Message& message) { if (queue->isDeleted()) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 523e17948b..997ad5d87b 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -66,6 +66,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses void readable(pn_link_t*, pn_delivery_t*); void writable(pn_link_t*, pn_delivery_t*); bool dispatch(); + bool endedByManagement() const; void close(); /** @@ -79,6 +80,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses void wakeup(); Authorise& getAuthorise(); + protected: + void detachedByManagement(); private: typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks; typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks; @@ -92,6 +95,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses qpid::sys::Mutex lock; std::set< boost::shared_ptr<Queue> > exclusiveQueues; Authorise authorise; + bool detachRequested; struct ResolvedNode { |