diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-31 00:43:11 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-31 00:43:11 +0000 |
commit | 7dad525a5c71fe346a275646945793d36adf4c9a (patch) | |
tree | 80380e4d0286d93dca96c4b01c7704f091e07aaa /qpid/cpp | |
parent | 9fd2a66990309c75ffcc461fc010f0cf32505b71 (diff) | |
download | qpid-python-7dad525a5c71fe346a275646945793d36adf4c9a.tar.gz |
- QPID-667
- for Ted Ross
- built and tested.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@590523 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/ManagementAgent.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ManagementAgent.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ManagementExchange.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ManagementObject.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 62 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.h | 5 |
8 files changed, 84 insertions, 65 deletions
diff --git a/qpid/cpp/src/qpid/broker/ManagementAgent.cpp b/qpid/cpp/src/qpid/broker/ManagementAgent.cpp index 71b027c7df..2f6a0597f0 100644 --- a/qpid/cpp/src/qpid/broker/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/broker/ManagementAgent.cpp @@ -25,6 +25,7 @@ #include <qpid/broker/Message.h> #include <qpid/broker/MessageDelivery.h> #include <qpid/framing/AMQFrame.h> +#include <list> using namespace qpid::framing; using namespace qpid::broker; @@ -57,7 +58,7 @@ void ManagementAgent::Periodic::fire () void ManagementAgent::clientAdded (void) { - for (ManagementObjectList::iterator iter = managementObjects.begin (); + for (ManagementObjectVector::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) { @@ -74,6 +75,7 @@ void ManagementAgent::PeriodicProcessing (void) char msgChars[BUFSIZE]; Buffer msgBuffer (msgChars, BUFSIZE); uint32_t contentSize; + std::list<uint32_t> deleteList; if (managementObjects.empty ()) return; @@ -86,11 +88,9 @@ void ManagementAgent::PeriodicProcessing (void) msgBuffer.putOctet ('0'); msgBuffer.putOctet ('1'); - for (ManagementObjectList::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) + for (uint32_t idx = 0; idx < managementObjects.size (); idx++) { - ManagementObject::shared_ptr object = *iter; + ManagementObject::shared_ptr object = managementObjects[idx]; if (object->getSchemaNeeded ()) { @@ -147,10 +147,7 @@ void ManagementAgent::PeriodicProcessing (void) } if (object->isDeleted ()) - { - managementObjects.remove (object); - QPID_LOG (debug, "Management Object Removed"); - } + deleteList.push_back (idx); // Temporary protection against buffer overrun. // This needs to be replaced with frame fragmentation. @@ -189,5 +186,20 @@ void ManagementAgent::PeriodicProcessing (void) DeliverableMessage deliverable (msg); exchange->route (deliverable, "mgmt", 0); + + // Delete flagged objects + for (std::list<uint32_t>::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + { + managementObjects.erase (managementObjects.begin () + *iter); + } + deleteList.clear (); +} + +void ManagementAgent::dispatchCommand (Deliverable& /*msg*/, + const string& /*routingKey*/, + const FieldTable* /*args*/) +{ } diff --git a/qpid/cpp/src/qpid/broker/ManagementAgent.h b/qpid/cpp/src/qpid/broker/ManagementAgent.h index 1e332023a6..e1d62270db 100644 --- a/qpid/cpp/src/qpid/broker/ManagementAgent.h +++ b/qpid/cpp/src/qpid/broker/ManagementAgent.h @@ -40,9 +40,12 @@ class ManagementAgent ManagementAgent (uint16_t interval); - void setExchange (Exchange::shared_ptr exchange); - void addObject (ManagementObject::shared_ptr object); - void clientAdded (void); + void setExchange (Exchange::shared_ptr exchange); + void addObject (ManagementObject::shared_ptr object); + void clientAdded (void); + void dispatchCommand (Deliverable& msg, + const string& routingKey, + const FieldTable* args); private: @@ -55,10 +58,10 @@ class ManagementAgent void fire (); }; - ManagementObjectList managementObjects; - Timer timer; - Exchange::shared_ptr exchange; - uint16_t interval; + ManagementObjectVector managementObjects; + Timer timer; + Exchange::shared_ptr exchange; + uint16_t interval; void PeriodicProcessing (void); }; diff --git a/qpid/cpp/src/qpid/broker/ManagementExchange.cpp b/qpid/cpp/src/qpid/broker/ManagementExchange.cpp index d3de8bc8e1..5d829477ba 100644 --- a/qpid/cpp/src/qpid/broker/ManagementExchange.cpp +++ b/qpid/cpp/src/qpid/broker/ManagementExchange.cpp @@ -57,8 +57,7 @@ void ManagementExchange::route (Deliverable& msg, if (routingKey.length () > 7 && routingKey.substr (0, 7).compare ("method.") == 0) { - QPID_LOG (debug, "ManagementExchange: Intercept command " << routingKey); - // TODO: Send intercepted commands to ManagementAgent for dispatch + managementAgent->dispatchCommand (msg, routingKey, args); return; } diff --git a/qpid/cpp/src/qpid/broker/ManagementObject.h b/qpid/cpp/src/qpid/broker/ManagementObject.h index 243d853727..237f2f3d79 100644 --- a/qpid/cpp/src/qpid/broker/ManagementObject.h +++ b/qpid/cpp/src/qpid/broker/ManagementObject.h @@ -25,7 +25,7 @@ #include "qpid/sys/Time.h" #include <qpid/framing/Buffer.h> #include <boost/shared_ptr.hpp> -#include <list> +#include <vector> namespace qpid { namespace broker { @@ -108,7 +108,7 @@ class ManagementObject }; - typedef std::list<ManagementObject::shared_ptr> ManagementObjectList; + typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector; }} diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 18c1ab1056..d668ee0505 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -71,18 +71,18 @@ void Queue::deliver(Message::shared_ptr& msg){ } else { - // if no store then mark as enqueued + // if no store then mark as enqueued if (!enqueue(0, msg)){ push(msg); - msg->enqueueComplete(); - if (mgmtObjectPtr != 0) - mgmtObjectPtr->enqueue (msg->contentSize ()); - }else { - if (mgmtObjectPtr != 0) - mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + msg->enqueueComplete(); + if (mgmtObject != 0) + mgmtObject->enqueue (msg->contentSize ()); + }else { + if (mgmtObject != 0) + mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); push(msg); - } - QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); + } + QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); serializer.execute(dispatchCallback); } } @@ -91,8 +91,8 @@ void Queue::deliver(Message::shared_ptr& msg){ void Queue::recover(Message::shared_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObjectPtr != 0) - mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + if (mgmtObject != 0) + mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -108,20 +108,19 @@ void Queue::process(Message::shared_ptr& msg){ mask |= MSG_MASK_PERSIST; push(msg); - if (mgmtObjectPtr != 0) - mgmtObjectPtr->enqueue (msg->contentSize (), mask); + if (mgmtObject != 0) + mgmtObject->enqueue (msg->contentSize (), mask); serializer.execute(dispatchCallback); } void Queue::requeue(const QueuedMessage& msg){ { - Mutex::ScopedLock locker(messageLock); - msg.payload->enqueueComplete(); // mark the message as enqueued - messages.push_front(msg); + Mutex::ScopedLock locker(messageLock); + msg.payload->enqueueComplete(); // mark the message as enqueued + messages.push_front(msg); } serializer.execute(dispatchCallback); - } bool Queue::acquire(const QueuedMessage& msg) { @@ -221,7 +220,7 @@ void Queue::dispatch() QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]"); } else { break; - } + } } serviceAllBrowsers(); } @@ -290,8 +289,8 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){ browsers.push_back(c); } - if (mgmtObjectPtr != 0){ - mgmtObjectPtr->incConsumers (); + if (mgmtObject != 0){ + mgmtObject->incConsumers (); } } @@ -302,8 +301,8 @@ void Queue::cancel(Consumer::ptr c){ } else { cancel(c, browsers); } - if (mgmtObjectPtr != 0){ - mgmtObjectPtr->decConsumers (); + if (mgmtObject != 0){ + mgmtObject->decConsumers (); } if(exclusive == c) exclusive.reset(); } @@ -318,17 +317,18 @@ void Queue::cancel(Consumer::ptr c, Consumers& consumers) QueuedMessage Queue::dequeue(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg; + if(!messages.empty()){ msg = messages.front(); pop(); - if (mgmtObjectPtr != 0){ - uint32_t mask = 0; + if (mgmtObject != 0){ + uint32_t mask = 0; - if (msg.payload->isPersistent ()) - mask |= MSG_MASK_PERSIST; + if (msg.payload->isPersistent ()) + mask |= MSG_MASK_PERSIST; - mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask); - } + mgmtObject->dequeue (msg.payload->contentSize (), mask); + } } return msg; } @@ -343,7 +343,7 @@ uint32_t Queue::purge(){ void Queue::pop(){ Mutex::ScopedLock locker(messageLock); if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); - messages.pop_front(); + messages.pop_front(); } void Queue::push(Message::shared_ptr& msg){ @@ -390,7 +390,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg) if (msg->isPersistent() && store) { msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue store->enqueue(ctxt, *msg.get(), *this); - return true; + return true; } return false; } @@ -401,7 +401,7 @@ bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr msg) if (msg->isPersistent() && store) { msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue store->dequeue(ctxt, *msg.get(), *this); - return true; + return true; } return false; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 5146024b6b..17f2d8ba91 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -95,7 +95,7 @@ namespace qpid { qpid::sys::Serializer<DispatchFunctor> serializer; DispatchFunctor dispatchCallback; framing::SequenceNumber sequence; - ManagementObjectQueue::shared_ptr mgmtObjectPtr; + ManagementObjectQueue::shared_ptr mgmtObject; void pop(); void push(Message::shared_ptr& msg); @@ -103,7 +103,7 @@ namespace qpid { void setPolicy(std::auto_ptr<QueuePolicy> policy); /** * only called by serilizer - */ + */ void dispatch(); void cancel(Consumer::ptr c, Consumers& set); void serviceAllBrowsers(); @@ -115,16 +115,16 @@ namespace qpid { bool exclude(Message::shared_ptr msg); protected: - /** - * Call back from store - */ - virtual void notifyDurableIOComplete(); + /** + * Call back from store + */ + virtual void notifyDurableIOComplete(); public: typedef boost::shared_ptr<Queue> shared_ptr; typedef std::vector<shared_ptr> vector; - + Queue(const string& name, bool autodelete = false, MessageStore* const store = 0, const ConnectionToken* const owner = 0); @@ -135,8 +135,8 @@ namespace qpid { void destroy(); void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args); void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref); - void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObjectPtr = mgmt; } - ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObjectPtr; } + void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObject = mgmt; } + ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObject; } bool acquire(const QueuedMessage& msg); @@ -165,7 +165,7 @@ namespace qpid { * Request dispatch any queued messages providing there are * consumers for them. Only one thread can be dispatching * at any time, so this call schedules the despatch based on - * the serilizer policy. + * the serilizer policy. */ void requestDispatch(Consumer::ptr c = Consumer::ptr()); void flush(DispatchCompletion& callback); diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 6c87e5ff98..bc572e4238 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -59,8 +59,7 @@ QueueRegistry::declare(const string& declareName, bool durable, } } -void QueueRegistry::destroy(const string& name){ - RWlock::ScopedWlock locker(lock); +void QueueRegistry::destroyLH (const string& name){ if (managementAgent){ ManagementObjectQueue::shared_ptr mgmtObject; QueueMap::iterator i = queues.find(name); @@ -74,6 +73,11 @@ void QueueRegistry::destroy(const string& name){ queues.erase(name); } +void QueueRegistry::destroy (const string& name){ + RWlock::ScopedWlock locker(lock); + destroyLH (name); +} + Queue::shared_ptr QueueRegistry::find(const string& name){ RWlock::ScopedRlock locker(lock); QueueMap::iterator i = queues.find(name); diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index 07669bb3a1..03b4778f7a 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -62,12 +62,13 @@ class QueueRegistry{ * subsequent calls to find or declare with the same name. * */ - void destroy(const string& name); + void destroyLH (const string& name); + void destroy (const string& name); template <class Test> bool destroyIf(const string& name, Test test) { qpid::sys::RWlock::ScopedWlock locker(lock); if (test()) { - queues.erase(name); + destroyLH (name); return true; } else { return false; |