diff options
author | Ted Ross <tross@apache.org> | 2008-06-02 16:01:51 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-06-02 16:01:51 +0000 |
commit | 40c8b6f844ce64fc4245e5f91e6b1eaea2fc9e94 (patch) | |
tree | 80438fbfb8739e9189fdad70d8271ae2ca8d26f4 /cpp/src/qpid | |
parent | e1c0b830b67e68be71e65ef18657e746ed6b971f (diff) | |
download | qpid-python-40c8b6f844ce64fc4245e5f91e6b1eaea2fc9e94.tar.gz |
QPID-1113 Management cleanup and performance enhancements
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@662470 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/XmlExchange.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AtomicCount.h | 52 |
18 files changed, 152 insertions, 82 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index f3e103dfaf..18b2c52dad 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -39,7 +39,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const management::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest, - args.i_key, args.i_src_is_queue, args.i_src_is_local, + args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes)), listener(l), name(Uuid(true).str()), persistenceId(0) { @@ -61,10 +61,10 @@ void Bridge::create(ConnectionState& c) session->attach(name, false); session->commandPoint(0,0); - if (args.i_src_is_local) { + if (args.i_srcIsLocal) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() } else { - if (args.i_src_is_queue) { + if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); @@ -79,7 +79,7 @@ void Bridge::create(ConnectionState& c) queueSettings.setString("qpid.trace.exclude", args.i_excludes); } - bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues? + bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? bool autoDelete = !durable;//auto delete transient queues? peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); @@ -150,8 +150,8 @@ void Bridge::encode(Buffer& buffer) const buffer.putShortString(args.i_src); buffer.putShortString(args.i_dest); buffer.putShortString(args.i_key); - buffer.putOctet(args.i_src_is_queue ? 1 : 0); - buffer.putOctet(args.i_src_is_local ? 1 : 0); + buffer.putOctet(args.i_srcIsQueue ? 1 : 0); + buffer.putOctet(args.i_srcIsLocal ? 1 : 0); buffer.putShortString(args.i_tag); buffer.putShortString(args.i_excludes); } @@ -165,8 +165,8 @@ uint32_t Bridge::encodedSize() const + args.i_src.size() + 1 + args.i_dest.size() + 1 + args.i_key.size() + 1 - + 1 // src_is_queue - + 1 // src_is_local + + 1 // srcIsQueue + + 1 // srcIsLocal + args.i_tag.size() + 1 + args.i_excludes.size() + 1; } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ea3d3547f5..61319f3c09 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -55,34 +55,34 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtId(mgmtId_), links(broker_.getLinks()) { - Manageable* parent = broker.GetVhostObject (); + Manageable* parent = broker.GetVhostObject(); if (isLink) - links.notifyConnection (mgmtId, this); + links.notifyConnection(mgmtId, this); if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); - if (agent.get () != 0) - mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink)); - agent->addObject (mgmtObject); + if (agent.get() != 0) + mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink)); + agent->addObject(mgmtObject); } } -void Connection::requestIOProcessing (boost::function0<void> callback) +void Connection::requestIOProcessing(boost::function0<void> callback) { ioCallback = callback; out->activateOutput(); } -Connection::~Connection () +Connection::~Connection() { if (mgmtObject.get() != 0) mgmtObject->resourceDestroy(); if (isLink) - links.notifyClosed (mgmtId); + links.notifyClosed(mgmtId); } void Connection::received(framing::AMQFrame& frame){ @@ -98,21 +98,21 @@ void Connection::received(framing::AMQFrame& frame){ recordFromClient(frame); } -void Connection::recordFromServer (framing::AMQFrame& frame) +void Connection::recordFromServer(framing::AMQFrame& frame) { - if (mgmtObject.get () != 0) + if (mgmtObject.get() != 0) { - mgmtObject->inc_framesToClient (); - mgmtObject->inc_bytesToClient (frame.size ()); + mgmtObject->inc_framesToClient(); + mgmtObject->inc_bytesToClient(frame.size()); } } -void Connection::recordFromClient (framing::AMQFrame& frame) +void Connection::recordFromClient(framing::AMQFrame& frame) { - if (mgmtObject.get () != 0) + if (mgmtObject.get() != 0) { - mgmtObject->inc_framesFromClient (); - mgmtObject->inc_bytesFromClient (frame.size ()); + mgmtObject->inc_framesFromClient(); + mgmtObject->inc_bytesFromClient(frame.size()); } } @@ -129,6 +129,14 @@ string Connection::getAuthCredentials() if (!isLink) return string(); + if (mgmtObject.get() != 0) + { + if (links.getAuthMechanism(mgmtId) == "ANONYMOUS") + mgmtObject->set_authIdentity("anonymous"); + else + mgmtObject->set_authIdentity(links.getAuthIdentity(mgmtId)); + } + return links.getAuthCredentials(mgmtId); } @@ -138,6 +146,12 @@ void Connection::notifyConnectionForced(const string& text) links.notifyConnectionForced(mgmtId, text); } +void Connection::setUserId(const string& userId) +{ + ConnectionState::setUserId(userId); + mgmtObject->set_authIdentity(userId); +} + void Connection::close( ReplyCode code, const string& text, ClassId classId, MethodId methodId) { @@ -177,7 +191,7 @@ bool Connection::doOutput() ioCallback = 0; if (mgmtClosing) - close (403, "Closed by Management Request", 0, 0); + close(403, "Closed by Management Request", 0, 0); else //then do other output as needed: return outputTasks.doOutput(); @@ -202,20 +216,20 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *ptr_map_ptr(i); } -ManagementObject::shared_ptr Connection::GetManagementObject (void) const +ManagementObject::shared_ptr Connection::GetManagementObject(void) const { return dynamic_pointer_cast<ManagementObject>(mgmtObject); } -Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) +Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]"); + QPID_LOG(debug, "Connection::ManagementMethod [id=" << methodId << "]"); switch (methodId) { - case management::Client::METHOD_CLOSE : + case management::Connection::METHOD_CLOSE : mgmtClosing = true; if (mgmtObject.get()) mgmtObject->set_closing(1); out->activateOutput(); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index e6e3d4d15e..9e713140dd 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -42,7 +42,7 @@ #include "ConnectionState.h" #include "SessionHandler.h" #include "qpid/management/Manageable.h" -#include "qpid/management/Client.h" +#include "qpid/management/Connection.h" #include <boost/ptr_container/ptr_map.hpp> @@ -88,6 +88,7 @@ class Connection : public sys::ConnectionInputHandler, std::string getAuthMechanism(); std::string getAuthCredentials(); void notifyConnectionForced(const std::string& text); + void setUserId(const string& uid); private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; @@ -100,7 +101,7 @@ class Connection : public sys::ConnectionInputHandler, bool mgmtClosing; const std::string mgmtId; boost::function0<void> ioCallback; - management::Client::shared_ptr mgmtObject; + management::Connection::shared_ptr mgmtObject; LinkRegistry& links; }; diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 691d47d866..698f8123e8 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -56,7 +56,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable void setHeartbeat(uint16_t hb) { heartbeat = hb; } void setStagingThreshold(uint64_t st) { stagingThreshold = st; } - void setUserId(const string& uid) { userId = uid; } + virtual void setUserId(const string& uid) { userId = uid; } const string& getUserId() const { return userId; } Broker& getBroker() { return broker; } diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 72021b8d98..84a5362766 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -54,8 +54,8 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con Binding::shared_ptr binding (new Binding (routingKey, queue, this)); bindings[routingKey].push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } else{ @@ -78,8 +78,8 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c bindings.erase(routingKey); } if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index df723d2c8f..3483562292 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -53,8 +53,8 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, Binding::shared_ptr binding (new Binding ("", queue, this)); bindings.push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } else { @@ -73,8 +73,8 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* if (i != bindings.end()) { bindings.erase(i); if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 5196099ed5..20d9617c8f 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -90,8 +90,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co bindings.push_back(headerMap); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } else { @@ -115,8 +115,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, if (i != bindings.end()) { bindings.erase(i); if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 6bcfcf77a3..08b9d8fe3e 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -341,8 +341,8 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args std::pair<Bridge::shared_ptr, bool> result = links->declare (host, port, iargs.i_durable, iargs.i_src, - iargs.i_dest, iargs.i_key, iargs.i_src_is_queue, - iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes); + iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes); if (result.second && iargs.i_durable) store->create(*result.first); diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 455cc8452e..0703c276cf 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -87,8 +87,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, std::string& src, std::string& dest, std::string& key, - bool is_queue, - bool is_local, + bool isQueue, + bool isLocal, std::string& tag, std::string& excludes) { @@ -110,14 +110,14 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, management::ArgsLinkBridge args; Bridge::shared_ptr bridge; - args.i_durable = durable; - args.i_src = src; - args.i_dest = dest; - args.i_key = key; - args.i_src_is_queue = is_queue; - args.i_src_is_local = is_local; - args.i_tag = tag; - args.i_excludes = excludes; + args.i_durable = durable; + args.i_src = src; + args.i_dest = dest; + args.i_key = key; + args.i_srcIsQueue = isQueue; + args.i_srcIsLocal = isLocal; + args.i_tag = tag; + args.i_excludes = excludes; bridge = Bridge::shared_ptr (new Bridge (l->second.get(), l->second->nextChannel(), @@ -237,4 +237,14 @@ std::string LinkRegistry::getAuthCredentials(const std::string& key) return result; } +std::string LinkRegistry::getAuthIdentity(const std::string& key) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l == links.end()) + return string(); + + return l->second->getUsername(); +} + diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index f902490ed3..242c0d58ba 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -81,8 +81,8 @@ namespace broker { std::string& src, std::string& dest, std::string& key, - bool is_queue, - bool is_local, + bool isQueue, + bool isLocal, std::string& id, std::string& excludes); @@ -113,6 +113,7 @@ namespace broker { void notifyConnectionForced (const std::string& key, const std::string& text); std::string getAuthMechanism (const std::string& key); std::string getAuthCredentials (const std::string& key); + std::string getAuthIdentity (const std::string& key); }; } } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 2c9717caa0..0b26762697 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -130,19 +130,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); } }else { if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } @@ -157,13 +153,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); } if (store && !msg->isContentLoaded()) { @@ -176,13 +170,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); if (msg->isPersistent ()) { mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); @@ -367,8 +359,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){ consumerCount++; if (mgmtObject.get() != 0){ - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); - mgmtObject->inc_consumers (); + mgmtObject->inc_consumerCount (); } } @@ -378,8 +369,7 @@ void Queue::cancel(Consumer& c){ consumerCount--; if(exclusive) exclusive = 0; if (mgmtObject.get() != 0){ - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); - mgmtObject->dec_consumers (); + mgmtObject->dec_consumerCount (); } } @@ -409,11 +399,9 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); mgmtObject->dec_msgDepth (); - mgmtObject->dec_byteDepth (msg.payload->contentSize()); if (msg.payload->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); @@ -682,7 +670,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { if (inst) { ManagementObject::shared_ptr childObj = inst->GetManagementObject(); if (childObj.get() != 0) - mgmtObject->set_storeRef(childObj->getObjectId()); + childObj->setReference(mgmtObject->getObjectId()); } } diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index dd8267a7d8..1ddb3f3026 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -111,7 +111,7 @@ void SessionState::attach(SessionHandler& h) { if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); mgmtObject->set_channelId (h.getChannel()); } } diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 1c4fa2ea7a..a16421b090 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -138,8 +138,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons Binding::shared_ptr binding (new Binding (routingKey, queue, this)); bindings[routingPattern].push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } @@ -159,8 +159,8 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co qv.erase(q); if(qv.empty()) bindings.erase(bi); if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp index 1d8f2ae8d8..8c4d4f79a4 100644 --- a/cpp/src/qpid/broker/XmlExchange.cpp +++ b/cpp/src/qpid/broker/XmlExchange.cpp @@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const QPID_LOG(trace, "Bound successfully with query: " << queryText ); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); + mgmtExchange->inc_bindingCount(); } return true; } else{ @@ -128,7 +128,7 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons bindingsMap.erase(routingKey); } if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); + mgmtExchange->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 0ddbd62350..a2802cf932 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -630,7 +630,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_sessionId (sessionId); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); - agent->mgmtObject->set_sysId (systemId); + agent->mgmtObject->set_systemId (systemId); addObject (agent->mgmtObject); remoteAgents[sessionId] = agent; diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 6af5412b99..68d7e5c886 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -37,3 +37,6 @@ void ManagementObject::writeTimestamps (Buffer& buf) buf.putLongLong (destroyTime); buf.putLongLong (objectId); } + +void ManagementObject::setReference(uint64_t) {} + diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 2661cf2d96..1d54d606a4 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -92,6 +92,7 @@ class ManagementObject virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; + virtual void setReference(uint64_t objectId); virtual std::string getClassName (void) = 0; virtual std::string getPackageName (void) = 0; diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h new file mode 100644 index 0000000000..d598b49427 --- /dev/null +++ b/cpp/src/qpid/sys/AtomicCount.h @@ -0,0 +1,52 @@ +#ifndef _posix_AtomicCount_h +#define _posix_AtomicCount_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/detail/atomic_count.hpp> +#include "ScopedIncrement.h" + +namespace qpid { +namespace sys { + +/** + * Atomic counter. + */ +class AtomicCount { + public: + typedef ::qpid::sys::ScopedDecrement<AtomicCount> ScopedDecrement; + typedef ::qpid::sys::ScopedIncrement<AtomicCount> ScopedIncrement; + + AtomicCount(long value = 0) : count(value) {} + + void operator++() { ++count ; } + + long operator--() { return --count; } + + operator long() const { return count; } + + private: + boost::detail::atomic_count count; +}; + + +}} + + +#endif // _posix_AtomicCount_h |