summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-06-02 16:01:51 +0000
committerTed Ross <tross@apache.org>2008-06-02 16:01:51 +0000
commit40c8b6f844ce64fc4245e5f91e6b1eaea2fc9e94 (patch)
tree80438fbfb8739e9189fdad70d8271ae2ca8d26f4 /cpp/src/qpid
parente1c0b830b67e68be71e65ef18657e746ed6b971f (diff)
downloadqpid-python-40c8b6f844ce64fc4245e5f91e6b1eaea2fc9e94.tar.gz
QPID-1113 Management cleanup and performance enhancements
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@662470 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp16
-rw-r--r--cpp/src/qpid/broker/Connection.cpp58
-rw-r--r--cpp/src/qpid/broker/Connection.h5
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h2
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/Link.cpp4
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp30
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h5
-rw-r--r--cpp/src/qpid/broker/Queue.cpp18
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/XmlExchange.cpp4
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp2
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp3
-rw-r--r--cpp/src/qpid/management/ManagementObject.h1
-rw-r--r--cpp/src/qpid/sys/AtomicCount.h52
18 files changed, 152 insertions, 82 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index f3e103dfaf..18b2c52dad 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -39,7 +39,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const management::ArgsLinkBridge& _args) :
link(_link), id(_id), args(_args),
mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest,
- args.i_key, args.i_src_is_queue, args.i_src_is_local,
+ args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes)),
listener(l), name(Uuid(true).str()), persistenceId(0)
{
@@ -61,10 +61,10 @@ void Bridge::create(ConnectionState& c)
session->attach(name, false);
session->commandPoint(0,0);
- if (args.i_src_is_local) {
+ if (args.i_srcIsLocal) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
- if (args.i_src_is_queue) {
+ if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
@@ -79,7 +79,7 @@ void Bridge::create(ConnectionState& c)
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
}
- bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues?
+ bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
@@ -150,8 +150,8 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShortString(args.i_src);
buffer.putShortString(args.i_dest);
buffer.putShortString(args.i_key);
- buffer.putOctet(args.i_src_is_queue ? 1 : 0);
- buffer.putOctet(args.i_src_is_local ? 1 : 0);
+ buffer.putOctet(args.i_srcIsQueue ? 1 : 0);
+ buffer.putOctet(args.i_srcIsLocal ? 1 : 0);
buffer.putShortString(args.i_tag);
buffer.putShortString(args.i_excludes);
}
@@ -165,8 +165,8 @@ uint32_t Bridge::encodedSize() const
+ args.i_src.size() + 1
+ args.i_dest.size() + 1
+ args.i_key.size() + 1
- + 1 // src_is_queue
- + 1 // src_is_local
+ + 1 // srcIsQueue
+ + 1 // srcIsLocal
+ args.i_tag.size() + 1
+ args.i_excludes.size() + 1;
}
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ea3d3547f5..61319f3c09 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -55,34 +55,34 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtId(mgmtId_),
links(broker_.getLinks())
{
- Manageable* parent = broker.GetVhostObject ();
+ Manageable* parent = broker.GetVhostObject();
if (isLink)
- links.notifyConnection (mgmtId, this);
+ links.notifyConnection(mgmtId, this);
if (parent != 0)
{
- ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
- if (agent.get () != 0)
- mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink));
- agent->addObject (mgmtObject);
+ if (agent.get() != 0)
+ mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink));
+ agent->addObject(mgmtObject);
}
}
-void Connection::requestIOProcessing (boost::function0<void> callback)
+void Connection::requestIOProcessing(boost::function0<void> callback)
{
ioCallback = callback;
out->activateOutput();
}
-Connection::~Connection ()
+Connection::~Connection()
{
if (mgmtObject.get() != 0)
mgmtObject->resourceDestroy();
if (isLink)
- links.notifyClosed (mgmtId);
+ links.notifyClosed(mgmtId);
}
void Connection::received(framing::AMQFrame& frame){
@@ -98,21 +98,21 @@ void Connection::received(framing::AMQFrame& frame){
recordFromClient(frame);
}
-void Connection::recordFromServer (framing::AMQFrame& frame)
+void Connection::recordFromServer(framing::AMQFrame& frame)
{
- if (mgmtObject.get () != 0)
+ if (mgmtObject.get() != 0)
{
- mgmtObject->inc_framesToClient ();
- mgmtObject->inc_bytesToClient (frame.size ());
+ mgmtObject->inc_framesToClient();
+ mgmtObject->inc_bytesToClient(frame.size());
}
}
-void Connection::recordFromClient (framing::AMQFrame& frame)
+void Connection::recordFromClient(framing::AMQFrame& frame)
{
- if (mgmtObject.get () != 0)
+ if (mgmtObject.get() != 0)
{
- mgmtObject->inc_framesFromClient ();
- mgmtObject->inc_bytesFromClient (frame.size ());
+ mgmtObject->inc_framesFromClient();
+ mgmtObject->inc_bytesFromClient(frame.size());
}
}
@@ -129,6 +129,14 @@ string Connection::getAuthCredentials()
if (!isLink)
return string();
+ if (mgmtObject.get() != 0)
+ {
+ if (links.getAuthMechanism(mgmtId) == "ANONYMOUS")
+ mgmtObject->set_authIdentity("anonymous");
+ else
+ mgmtObject->set_authIdentity(links.getAuthIdentity(mgmtId));
+ }
+
return links.getAuthCredentials(mgmtId);
}
@@ -138,6 +146,12 @@ void Connection::notifyConnectionForced(const string& text)
links.notifyConnectionForced(mgmtId, text);
}
+void Connection::setUserId(const string& userId)
+{
+ ConnectionState::setUserId(userId);
+ mgmtObject->set_authIdentity(userId);
+}
+
void Connection::close(
ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
@@ -177,7 +191,7 @@ bool Connection::doOutput()
ioCallback = 0;
if (mgmtClosing)
- close (403, "Closed by Management Request", 0, 0);
+ close(403, "Closed by Management Request", 0, 0);
else
//then do other output as needed:
return outputTasks.doOutput();
@@ -202,20 +216,20 @@ SessionHandler& Connection::getChannel(ChannelId id) {
return *ptr_map_ptr(i);
}
-ManagementObject::shared_ptr Connection::GetManagementObject (void) const
+ManagementObject::shared_ptr Connection::GetManagementObject(void) const
{
return dynamic_pointer_cast<ManagementObject>(mgmtObject);
}
-Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
+Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
- QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]");
+ QPID_LOG(debug, "Connection::ManagementMethod [id=" << methodId << "]");
switch (methodId)
{
- case management::Client::METHOD_CLOSE :
+ case management::Connection::METHOD_CLOSE :
mgmtClosing = true;
if (mgmtObject.get()) mgmtObject->set_closing(1);
out->activateOutput();
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index e6e3d4d15e..9e713140dd 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -42,7 +42,7 @@
#include "ConnectionState.h"
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/Client.h"
+#include "qpid/management/Connection.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -88,6 +88,7 @@ class Connection : public sys::ConnectionInputHandler,
std::string getAuthMechanism();
std::string getAuthCredentials();
void notifyConnectionForced(const std::string& text);
+ void setUserId(const string& uid);
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
@@ -100,7 +101,7 @@ class Connection : public sys::ConnectionInputHandler,
bool mgmtClosing;
const std::string mgmtId;
boost::function0<void> ioCallback;
- management::Client::shared_ptr mgmtObject;
+ management::Connection::shared_ptr mgmtObject;
LinkRegistry& links;
};
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 691d47d866..698f8123e8 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -56,7 +56,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
- void setUserId(const string& uid) { userId = uid; }
+ virtual void setUserId(const string& uid) { userId = uid; }
const string& getUserId() const { return userId; }
Broker& getBroker() { return broker; }
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 72021b8d98..84a5362766 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -54,8 +54,8 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
Binding::shared_ptr binding (new Binding (routingKey, queue, this));
bindings[routingKey].push_back(binding);
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings();
+ mgmtExchange->inc_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount();
}
return true;
} else{
@@ -78,8 +78,8 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
bindings.erase(routingKey);
}
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings();
+ mgmtExchange->dec_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount();
}
return true;
} else {
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index df723d2c8f..3483562292 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -53,8 +53,8 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/,
Binding::shared_ptr binding (new Binding ("", queue, this));
bindings.push_back(binding);
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings();
+ mgmtExchange->inc_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount();
}
return true;
} else {
@@ -73,8 +73,8 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
if (i != bindings.end()) {
bindings.erase(i);
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings();
+ mgmtExchange->dec_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount();
}
return true;
} else {
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 5196099ed5..20d9617c8f 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -90,8 +90,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
bindings.push_back(headerMap);
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings();
+ mgmtExchange->inc_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount();
}
return true;
} else {
@@ -115,8 +115,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
if (i != bindings.end()) {
bindings.erase(i);
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings();
+ mgmtExchange->dec_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount();
}
return true;
} else {
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 6bcfcf77a3..08b9d8fe3e 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -341,8 +341,8 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args
std::pair<Bridge::shared_ptr, bool> result =
links->declare (host, port, iargs.i_durable, iargs.i_src,
- iargs.i_dest, iargs.i_key, iargs.i_src_is_queue,
- iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes);
+ iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes);
if (result.second && iargs.i_durable)
store->create(*result.first);
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 455cc8452e..0703c276cf 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -87,8 +87,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
std::string& src,
std::string& dest,
std::string& key,
- bool is_queue,
- bool is_local,
+ bool isQueue,
+ bool isLocal,
std::string& tag,
std::string& excludes)
{
@@ -110,14 +110,14 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
management::ArgsLinkBridge args;
Bridge::shared_ptr bridge;
- args.i_durable = durable;
- args.i_src = src;
- args.i_dest = dest;
- args.i_key = key;
- args.i_src_is_queue = is_queue;
- args.i_src_is_local = is_local;
- args.i_tag = tag;
- args.i_excludes = excludes;
+ args.i_durable = durable;
+ args.i_src = src;
+ args.i_dest = dest;
+ args.i_key = key;
+ args.i_srcIsQueue = isQueue;
+ args.i_srcIsLocal = isLocal;
+ args.i_tag = tag;
+ args.i_excludes = excludes;
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
@@ -237,4 +237,14 @@ std::string LinkRegistry::getAuthCredentials(const std::string& key)
return result;
}
+std::string LinkRegistry::getAuthIdentity(const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l == links.end())
+ return string();
+
+ return l->second->getUsername();
+}
+
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index f902490ed3..242c0d58ba 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -81,8 +81,8 @@ namespace broker {
std::string& src,
std::string& dest,
std::string& key,
- bool is_queue,
- bool is_local,
+ bool isQueue,
+ bool isLocal,
std::string& id,
std::string& excludes);
@@ -113,6 +113,7 @@ namespace broker {
void notifyConnectionForced (const std::string& key, const std::string& text);
std::string getAuthMechanism (const std::string& key);
std::string getAuthCredentials (const std::string& key);
+ std::string getAuthIdentity (const std::string& key);
};
}
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 2c9717caa0..0b26762697 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -130,19 +130,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete();
if (mgmtObject.get() != 0) {
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
- mgmtObject->inc_byteDepth (msg->contentSize ());
}
}else {
if (mgmtObject.get() != 0) {
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
- mgmtObject->inc_byteDepth (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
@@ -157,13 +153,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
- mgmtObject->inc_byteDepth (msg->contentSize ());
}
if (store && !msg->isContentLoaded()) {
@@ -176,13 +170,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
- mgmtObject->inc_byteDepth (msg->contentSize ());
if (msg->isPersistent ()) {
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
@@ -367,8 +359,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){
consumerCount++;
if (mgmtObject.get() != 0){
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
- mgmtObject->inc_consumers ();
+ mgmtObject->inc_consumerCount ();
}
}
@@ -378,8 +369,7 @@ void Queue::cancel(Consumer& c){
consumerCount--;
if(exclusive) exclusive = 0;
if (mgmtObject.get() != 0){
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
- mgmtObject->dec_consumers ();
+ mgmtObject->dec_consumerCount ();
}
}
@@ -409,11 +399,9 @@ void Queue::pop(){
if (policy.get()) policy->dequeued(msg.payload->contentSize());
if (mgmtObject.get() != 0){
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
mgmtObject->dec_msgDepth ();
- mgmtObject->dec_byteDepth (msg.payload->contentSize());
if (msg.payload->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
@@ -682,7 +670,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
if (inst) {
ManagementObject::shared_ptr childObj = inst->GetManagementObject();
if (childObj.get() != 0)
- mgmtObject->set_storeRef(childObj->getObjectId());
+ childObj->setReference(mgmtObject->getObjectId());
}
}
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index dd8267a7d8..1ddb3f3026 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -111,7 +111,7 @@ void SessionState::attach(SessionHandler& h) {
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
}
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 1c4fa2ea7a..a16421b090 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -138,8 +138,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
Binding::shared_ptr binding (new Binding (routingKey, queue, this));
bindings[routingPattern].push_back(binding);
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings();
+ mgmtExchange->inc_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount();
}
return true;
}
@@ -159,8 +159,8 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co
qv.erase(q);
if(qv.empty()) bindings.erase(bi);
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings();
+ mgmtExchange->dec_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount();
}
return true;
}
diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp
index 1d8f2ae8d8..8c4d4f79a4 100644
--- a/cpp/src/qpid/broker/XmlExchange.cpp
+++ b/cpp/src/qpid/broker/XmlExchange.cpp
@@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const
QPID_LOG(trace, "Bound successfully with query: " << queryText );
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings ();
+ mgmtExchange->inc_bindingCount();
}
return true;
} else{
@@ -128,7 +128,7 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons
bindingsMap.erase(routingKey);
}
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
+ mgmtExchange->dec_bindingCount();
}
return true;
} else {
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 0ddbd62350..a2802cf932 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -630,7 +630,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
agent->mgmtObject->set_sessionId (sessionId);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
- agent->mgmtObject->set_sysId (systemId);
+ agent->mgmtObject->set_systemId (systemId);
addObject (agent->mgmtObject);
remoteAgents[sessionId] = agent;
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
index 6af5412b99..68d7e5c886 100644
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -37,3 +37,6 @@ void ManagementObject::writeTimestamps (Buffer& buf)
buf.putLongLong (destroyTime);
buf.putLongLong (objectId);
}
+
+void ManagementObject::setReference(uint64_t) {}
+
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 2661cf2d96..1d54d606a4 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -92,6 +92,7 @@ class ManagementObject
virtual void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf) = 0;
+ virtual void setReference(uint64_t objectId);
virtual std::string getClassName (void) = 0;
virtual std::string getPackageName (void) = 0;
diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h
new file mode 100644
index 0000000000..d598b49427
--- /dev/null
+++ b/cpp/src/qpid/sys/AtomicCount.h
@@ -0,0 +1,52 @@
+#ifndef _posix_AtomicCount_h
+#define _posix_AtomicCount_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/detail/atomic_count.hpp>
+#include "ScopedIncrement.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Atomic counter.
+ */
+class AtomicCount {
+ public:
+ typedef ::qpid::sys::ScopedDecrement<AtomicCount> ScopedDecrement;
+ typedef ::qpid::sys::ScopedIncrement<AtomicCount> ScopedIncrement;
+
+ AtomicCount(long value = 0) : count(value) {}
+
+ void operator++() { ++count ; }
+
+ long operator--() { return --count; }
+
+ operator long() const { return count; }
+
+ private:
+ boost::detail::atomic_count count;
+};
+
+
+}}
+
+
+#endif // _posix_AtomicCount_h