summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-01-22 15:57:21 +0000
committerGordon Sim <gsim@apache.org>2014-01-22 15:57:21 +0000
commitd3c27b1a0e29fae81b7863820a77079c669b896c (patch)
tree186e1b0d7e2525b881d873dad2915c087499603a
parent4510276f7cdc7d574b29563b57d8576de9f63e88 (diff)
downloadqpid-python-d3c27b1a0e29fae81b7863820a77079c669b896c.tar.gz
QPID-5501: implement missing qmf operations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560393 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp/descriptors.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp25
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp39
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedSession.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h4
9 files changed, 122 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h
index 857231ddda..6c7aadd13e 100644
--- a/qpid/cpp/src/qpid/amqp/descriptors.h
+++ b/qpid/cpp/src/qpid/amqp/descriptors.h
@@ -109,6 +109,7 @@ const std::string NOT_FOUND("amqp:not-found");
const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access");
const std::string DECODE_ERROR("amqp:decode-error");
const std::string NOT_ALLOWED("amqp:not-allowed");
+const std::string NOT_IMPLEMENTED("amqp:not-implemented");
const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded");
const std::string RESOURCE_DELETED("amqp:resource-deleted");
const std::string PRECONDITION_FAILED("amqp:precondition-failed");
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index 514add7e44..c95a2d3537 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -70,7 +70,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker
: BrokerContext(b), ManagedConnection(getBroker(), i),
connection(pn_connection()),
transport(pn_transport()),
- out(o), id(i), haveOutput(true), closeInitiated(false)
+ out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false)
{
if (pn_transport_bind(transport, connection)) {
//error
@@ -169,9 +169,22 @@ size_t Connection::encode(char* buffer, size_t size)
bool Connection::canEncode()
{
if (!closeInitiated) {
+ if (closeRequested) {
+ close();
+ return true;
+ }
try {
- for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) {
- if (i->second->dispatch()) haveOutput = true;
+ for (Sessions::iterator i = sessions.begin();i != sessions.end();) {
+ if (i->second->endedByManagement()) {
+ pn_session_close(i->first);
+ i->second->close();
+ sessions.erase(i++);
+ haveOutput = true;
+ QPID_LOG_CAT(debug, model, id << " session ended by management");
+ } else {
+ if (i->second->dispatch()) haveOutput = true;
+ ++i;
+ }
}
process();
} catch (const Exception& e) {
@@ -372,4 +385,10 @@ void Connection::setUserId(const std::string& user)
throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit");
}
}
+
+void Connection::closedByManagement()
+{
+ closeRequested = true;
+ out.activateOutput();
+}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index 0a63045350..961c21012e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -69,12 +69,14 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
bool haveOutput;
Sessions sessions;
bool closeInitiated;
+ bool closeRequested;
virtual void process();
std::string getError();
void close();
void open();
void readPeerProperties();
+ void closedByManagement();
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
index de46165e1a..6a2f8c5698 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
@@ -19,6 +19,8 @@
*
*/
#include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/broker/amqp/Exception.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/log/Statement.h"
@@ -176,4 +178,34 @@ void ManagedConnection::incomingMessageReceived()
if (connection) connection->inc_msgsFromClient();
}
+void ManagedConnection::closedByManagement()
+{
+ throw Exception(qpid::amqp::error_conditions::NOT_IMPLEMENTED, QPID_MSG(id << "Connection close requested, but not implemented"));
+}
+
+qpid::management::Manageable::status_t ManagedConnection::ManagementMethod(uint32_t methodId, qpid::management::Args&, std::string& error)
+{
+ qpid::management::Manageable::status_t status = qpid::management::Manageable::STATUS_UNKNOWN_METHOD;
+
+ try {
+ switch (methodId)
+ {
+ case _qmf::Connection::METHOD_CLOSE :
+ closedByManagement();
+ if (connection) connection->set_closing(true);
+ status = qpid::management::Manageable::STATUS_OK;
+ break;
+ }
+ } catch (const Exception& e) {
+ if (e.symbol() == qpid::amqp::error_conditions::NOT_IMPLEMENTED) {
+ status = qpid::management::Manageable::STATUS_NOT_IMPLEMENTED;
+ } else {
+ error = e.what();
+ status = qpid::management::Manageable::STATUS_EXCEPTION;
+ }
+ }
+
+ return status;
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
index a9f90cefcf..dc97b55862 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
@@ -61,6 +61,11 @@ class ManagedConnection : public qpid::management::Manageable, public OwnershipT
const std::map<std::string, types::Variant>& getClientProperties() const;
virtual bool isLink() const;
void opened();
+
+ qpid::management::Manageable::status_t ManagementMethod(uint32_t methodId, qpid::management::Args&, std::string&);
+
+ protected:
+ virtual void closedByManagement();
private:
const std::string id;
std::string userid;
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
index dc003ebde7..5917ddb28a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
@@ -20,6 +20,8 @@
*/
#include "qpid/broker/amqp/ManagedSession.h"
#include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/broker/amqp/Exception.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/log/Statement.h"
@@ -89,4 +91,41 @@ ManagedConnection& ManagedSession::getParent()
return parent;
}
+void ManagedSession::detachedByManagement()
+{
+ throw Exception(qpid::amqp::error_conditions::NOT_IMPLEMENTED, QPID_MSG(id << "Session detach requested, but not implemented"));
+}
+
+qpid::management::Manageable::status_t ManagedSession::ManagementMethod (uint32_t methodId,
+ qpid::management::Args& /*args*/,
+ std::string& error)
+{
+ qpid::management::Manageable::status_t status = qpid::management::Manageable::STATUS_UNKNOWN_METHOD;
+
+ try {
+ switch (methodId)
+ {
+ case _qmf::Session::METHOD_DETACH :
+ detachedByManagement();
+ status = qpid::management::Manageable::STATUS_OK;
+ break;
+
+ case _qmf::Session::METHOD_CLOSE :
+ case _qmf::Session::METHOD_SOLICITACK :
+ case _qmf::Session::METHOD_RESETLIFESPAN :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+ } catch (const Exception& e) {
+ if (e.symbol() == qpid::amqp::error_conditions::NOT_IMPLEMENTED) {
+ status = qpid::management::Manageable::STATUS_NOT_IMPLEMENTED;
+ } else {
+ error = e.what();
+ status = qpid::management::Manageable::STATUS_EXCEPTION;
+ }
+ }
+
+ return status;
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
index 5264f3fd7e..0da048654c 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
@@ -48,6 +48,10 @@ class ManagedSession : public qpid::management::Manageable, public OwnershipToke
void outgoingMessageAccepted();
void outgoingMessageRejected();
ManagedConnection& getParent();
+
+ qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&, std::string&);
+ protected:
+ virtual void detachedByManagement();
private:
ManagedConnection& parent;
const std::string id;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 1749725047..fb3c220d02 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -202,7 +202,8 @@ class IncomingToExchange : public DecodingIncoming
Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
: ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
- authorise(connection.getUserId(), connection.getBroker().getAcl()) {}
+ authorise(connection.getUserId(), connection.getBroker().getAcl()),
+ detachRequested() {}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -689,6 +690,17 @@ Authorise& Session::getAuthorise()
return authorise;
}
+bool Session::endedByManagement() const
+{
+ return detachRequested;
+}
+
+void Session::detachedByManagement()
+{
+ detachRequested = true;
+ wakeup();
+}
+
void IncomingToQueue::handle(qpid::broker::Message& message)
{
if (queue->isDeleted()) {
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index 523e17948b..997ad5d87b 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -66,6 +66,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
void readable(pn_link_t*, pn_delivery_t*);
void writable(pn_link_t*, pn_delivery_t*);
bool dispatch();
+ bool endedByManagement() const;
void close();
/**
@@ -79,6 +80,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
void wakeup();
Authorise& getAuthorise();
+ protected:
+ void detachedByManagement();
private:
typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks;
typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks;
@@ -92,6 +95,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
qpid::sys::Mutex lock;
std::set< boost::shared_ptr<Queue> > exclusiveQueues;
Authorise authorise;
+ bool detachRequested;
struct ResolvedNode
{