summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2009-11-13 19:30:07 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2009-11-13 19:30:07 +0000
commitab7955c6747f6377c2d36fe8686c3cec39d003b2 (patch)
treec0f381ecf9ff76aed702f43f05a40f7d6cbbdb31 /cpp/src
parent9877ca6cfbf36a34d46b42bc89a9f1fa0ef08b4b (diff)
downloadqpid-python-ab7955c6747f6377c2d36fe8686c3cec39d003b2.tar.gz
Add management subscription object
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@835962 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp53
-rw-r--r--cpp/src/qpid/broker/SemanticState.h8
2 files changed, 57 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 7e3090bf17..3e23af99c0 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -58,6 +58,11 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
using qpid::ptr_map_ptr;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+namespace _qmf = qmf::org::apache::qpid::broker;
SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
: session(ss),
@@ -261,8 +266,38 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
- deliveryCount(0)
-{}
+ deliveryCount(0),
+ mgmtObject(0)
+{
+ if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
+ {
+ ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
+ qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
+
+ if (agent != 0)
+ {
+ mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name ,arguments,
+ acquire, ackExpected, syncFrequency, resumeId, resumeTtl, exclusive);
+ agent->addObject (mgmtObject, agent->allocateId(this));
+ mgmtObject->set_mode("WINDOW");
+ }
+ }
+}
+
+ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const
+{
+ return (ManagementObject*) mgmtObject;
+}
+
+Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+ return status;
+}
+
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -283,6 +318,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
if (acquire && !ackExpected) {
queue->dequeue(0, msg);
}
+ if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
}
@@ -299,6 +335,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
// in future.
//
blocked = !(filter(msg) && checkCredit(msg));
+ if (mgmtObject && !blocked && acquire) { mgmtObject->inc_accepted(); }
return !blocked;
}
@@ -341,7 +378,11 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
return enoughCredit;
}
-SemanticState::ConsumerImpl::~ConsumerImpl() {}
+SemanticState::ConsumerImpl::~ConsumerImpl()
+{
+ if (mgmtObject != 0)
+ mgmtObject->resourceDestroy ();
+}
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
@@ -524,11 +565,17 @@ void SemanticState::stop(const std::string& destination)
void SemanticState::ConsumerImpl::setWindowMode()
{
windowing = true;
+ if (mgmtObject){
+ mgmtObject->set_mode("WINDOW");
+ }
}
void SemanticState::ConsumerImpl::setCreditMode()
{
windowing = false;
+ if (mgmtObject){
+ mgmtObject->set_mode("CREDIT");
+ }
}
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 89fe7b83dd..99f793c1fc 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -38,6 +38,7 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/broker/AclModule.h"
+#include "qmf/org/apache/qpid/broker/Subscription.h"
#include <list>
#include <map>
@@ -58,7 +59,8 @@ class SessionContext;
class SemanticState : private boost::noncopyable {
public:
class ConsumerImpl : public Consumer, public sys::OutputTask,
- public boost::enable_shared_from_this<ConsumerImpl>
+ public boost::enable_shared_from_this<ConsumerImpl>,
+ public management::Manageable
{
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
@@ -77,6 +79,7 @@ class SemanticState : private boost::noncopyable {
bool notifyEnabled;
const int syncFrequency;
int deliveryCount;
+ qmf::org::apache::qpid::broker::Subscription* mgmtObject;
bool checkCredit(boost::intrusive_ptr<Message>& msg);
void allocateCredit(boost::intrusive_ptr<Message>& msg);
@@ -130,6 +133,9 @@ class SemanticState : private boost::noncopyable {
SemanticState& getParent() { return *parent; }
const SemanticState& getParent() const { return *parent; }
+ // Manageable entry points
+ management::ManagementObject* GetManagementObject (void) const;
+ management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
private: