diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2009-11-13 19:30:07 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2009-11-13 19:30:07 +0000 |
commit | ab7955c6747f6377c2d36fe8686c3cec39d003b2 (patch) | |
tree | c0f381ecf9ff76aed702f43f05a40f7d6cbbdb31 /cpp/src | |
parent | 9877ca6cfbf36a34d46b42bc89a9f1fa0ef08b4b (diff) | |
download | qpid-python-ab7955c6747f6377c2d36fe8686c3cec39d003b2.tar.gz |
Add management subscription object
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@835962 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 53 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 8 |
2 files changed, 57 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 7e3090bf17..3e23af99c0 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -58,6 +58,11 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; using qpid::ptr_map_ptr; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +namespace _qmf = qmf::org::apache::qpid::broker; SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) : session(ss), @@ -261,8 +266,38 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), - deliveryCount(0) -{} + deliveryCount(0), + mgmtObject(0) +{ + if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) + { + ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); + qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session)); + + if (agent != 0) + { + mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name ,arguments, + acquire, ackExpected, syncFrequency, resumeId, resumeTtl, exclusive); + agent->addObject (mgmtObject, agent->allocateId(this)); + mgmtObject->set_mode("WINDOW"); + } + } +} + +ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const +{ + return (ManagementObject*) mgmtObject; +} + +Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + + return status; +} + OwnershipToken* SemanticState::ConsumerImpl::getSession() { @@ -283,6 +318,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) if (acquire && !ackExpected) { queue->dequeue(0, msg); } + if (mgmtObject) { mgmtObject->inc_delivered(); } return true; } @@ -299,6 +335,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) // in future. // blocked = !(filter(msg) && checkCredit(msg)); + if (mgmtObject && !blocked && acquire) { mgmtObject->inc_accepted(); } return !blocked; } @@ -341,7 +378,11 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) return enoughCredit; } -SemanticState::ConsumerImpl::~ConsumerImpl() {} +SemanticState::ConsumerImpl::~ConsumerImpl() +{ + if (mgmtObject != 0) + mgmtObject->resourceDestroy (); +} void SemanticState::cancel(ConsumerImpl::shared_ptr c) { @@ -524,11 +565,17 @@ void SemanticState::stop(const std::string& destination) void SemanticState::ConsumerImpl::setWindowMode() { windowing = true; + if (mgmtObject){ + mgmtObject->set_mode("WINDOW"); + } } void SemanticState::ConsumerImpl::setCreditMode() { windowing = false; + if (mgmtObject){ + mgmtObject->set_mode("CREDIT"); + } } void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 89fe7b83dd..99f793c1fc 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -38,6 +38,7 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/AtomicValue.h" #include "qpid/broker/AclModule.h" +#include "qmf/org/apache/qpid/broker/Subscription.h" #include <list> #include <map> @@ -58,7 +59,8 @@ class SessionContext; class SemanticState : private boost::noncopyable { public: class ConsumerImpl : public Consumer, public sys::OutputTask, - public boost::enable_shared_from_this<ConsumerImpl> + public boost::enable_shared_from_this<ConsumerImpl>, + public management::Manageable { mutable qpid::sys::Mutex lock; SemanticState* const parent; @@ -77,6 +79,7 @@ class SemanticState : private boost::noncopyable { bool notifyEnabled; const int syncFrequency; int deliveryCount; + qmf::org::apache::qpid::broker::Subscription* mgmtObject; bool checkCredit(boost::intrusive_ptr<Message>& msg); void allocateCredit(boost::intrusive_ptr<Message>& msg); @@ -130,6 +133,9 @@ class SemanticState : private boost::noncopyable { SemanticState& getParent() { return *parent; } const SemanticState& getParent() const { return *parent; } + // Manageable entry points + management::ManagementObject* GetManagementObject (void) const; + management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; private: |