diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-16 00:37:33 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-16 00:37:33 +0000 |
commit | 7762a152b3dabd859089abb388268101b2c72ea0 (patch) | |
tree | dd8eb0e638e04857b4d5fe56a6a70b3974c38935 | |
parent | 0ed156cc8320a0f6baa2ed255831147b5973c247 (diff) | |
download | qpid-python-7762a152b3dabd859089abb388268101b2c72ea0.tar.gz |
QPID-3346: checkpoint - added mgmt interfaces, fleshed out sub-optimal implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1158073 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 58 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LegacyLVQ.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LegacyLVQ.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageDeque.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageDeque.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageMap.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageMap.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Messages.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PriorityQueue.cpp | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PriorityQueue.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 438 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 20 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 81 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 10 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/management.py | 30 |
16 files changed, 559 insertions, 134 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index f80e0f1e61..a7a1370801 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -37,6 +37,7 @@ #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" @@ -452,7 +453,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerQueueMoveMessages& moveArgs= dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); QPID_LOG (debug, "Broker::queueMoveMessages()"); - if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) + if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter)) status = Manageable::STATUS_OK; else return Manageable::STATUS_PARAMETER_INVALID; @@ -482,6 +483,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; } + case _qmf::Broker::METHOD_QUERY : + { + _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args); + status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext()); + status = Manageable::STATUS_OK; + break; + } default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -654,6 +662,49 @@ void Broker::deleteObject(const std::string& type, const std::string& name, } +Manageable::status_t Broker::queryObject(const std::string& type, + const std::string& name, + Variant::Map& results, + const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")"); + + if (type == TYPE_QUEUE) + return queryQueue( name, userId, connectionId, results ); + + if (type == TYPE_EXCHANGE || + type == TYPE_TOPIC || + type == TYPE_BINDING) + return Manageable::STATUS_NOT_IMPLEMENTED; + + throw UnknownObjectType(type); +} + +Manageable::status_t Broker::queryQueue( const std::string& name, + const std::string& userId, + const std::string& /*connectionId*/, + Variant::Map& results ) +{ + (void) results; + if (acl) { + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId)); + } + + boost::shared_ptr<Queue> q(queues.find(name)); + if (!q) { + QPID_LOG(error, "Query failed: queue not found, name=" << name); + return Manageable::STATUS_UNKNOWN_OBJECT; + } + return Manageable::STATUS_OK;; +} + void Broker::setLogLevel(const std::string& level) { QPID_LOG(notice, "Changing log level to " << level); @@ -723,7 +774,8 @@ void Broker::connect( uint32_t Broker::queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, - uint32_t qty) + uint32_t qty, + const Variant::Map& filter) { Queue::shared_ptr src_queue = queues.find(srcQueue); if (!src_queue) @@ -732,7 +784,7 @@ uint32_t Broker::queueMoveMessages( if (!dest_queue) return 0; - return src_queue->move(dest_queue, qty); + return src_queue->move(dest_queue, qty, &filter); } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 40f7b6273f..1612726693 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -157,7 +157,12 @@ public: const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context); void deleteObject(const std::string& type, const std::string& name, const qpid::types::Variant::Map& options, const ConnectionState* context); - + Manageable::status_t queryObject(const std::string& type, const std::string& name, + qpid::types::Variant::Map& results, const ConnectionState* context); + Manageable::status_t queryQueue( const std::string& name, + const std::string& userId, + const std::string& connectionId, + qpid::types::Variant::Map& results); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; std::auto_ptr<sys::Timer> clusterTimer; @@ -258,7 +263,8 @@ public: */ uint32_t queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, - uint32_t qty); + uint32_t qty, + const qpid::types::Variant::Map& filter); boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const; diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp index a811a86492..bc009f14d1 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -44,6 +44,11 @@ bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& m } } +bool LegacyLVQ::next(const QueuedMessage& message, QueuedMessage& next) +{ + return this->next(message.position, next); +} + bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message) { if (MessageMap::next(position, message)) { diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h index dd0fd7aaec..33d4117845 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h @@ -42,6 +42,7 @@ class LegacyLVQ : public MessageMap LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0); bool remove(const framing::SequenceNumber&, QueuedMessage&); bool next(const framing::SequenceNumber&, QueuedMessage&); + bool next(const QueuedMessage&, QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void removeIf(Predicate); void setNoBrowse(bool); diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp index 24b8f6f895..b4fdfb3d6a 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp +++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp @@ -74,6 +74,11 @@ bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& return find(position, message, false); } +bool MessageDeque::next(const QueuedMessage& message, QueuedMessage& next) +{ + return this->next(message.position, next); +} + bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message) { if (messages.empty()) { diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h index 0e1aef2986..0726e1dee6 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.h +++ b/qpid/cpp/src/qpid/broker/MessageDeque.h @@ -41,6 +41,7 @@ class MessageDeque : public Messages bool remove(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); bool next(const framing::SequenceNumber&, QueuedMessage&); + bool next(const QueuedMessage&, QueuedMessage&); QueuedMessage& front(); void pop(); diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp index 39e23df533..504b2e033c 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.cpp +++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp @@ -77,6 +77,11 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me } } +bool MessageMap::next(const QueuedMessage& message, QueuedMessage& next) +{ + return this->next(message.position, next); +} + bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message) { if (!messages.empty() && position < front().position) { diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h index 1128a1d54a..9cfd9ea9af 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.h +++ b/qpid/cpp/src/qpid/broker/MessageMap.h @@ -47,6 +47,7 @@ class MessageMap : public Messages virtual bool remove(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); virtual bool next(const framing::SequenceNumber&, QueuedMessage&); + virtual bool next(const QueuedMessage&, QueuedMessage&); QueuedMessage& front(); void pop(); diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h index c535fd1936..4b3a6d764f 100644 --- a/qpid/cpp/src/qpid/broker/Messages.h +++ b/qpid/cpp/src/qpid/broker/Messages.h @@ -78,6 +78,14 @@ class Messages virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0; /** + * Return the next message based on the supplied queued message. + * The next messages is passed back via the second parameter. + * @todo replace with queue iterator + * @return true if there is another message, false otherwise. + */ + virtual bool next(const QueuedMessage&, QueuedMessage&) = 0; + + /** * Note: Caller is responsible for ensuring that there is a front * (e.g. empty() returns false) * diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp index e07e73d323..3305bb4f79 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -88,6 +88,25 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& return find(position, message, false); } +bool PriorityQueue::next(const QueuedMessage& message, QueuedMessage& next) +{ + uint p = getPriorityLevel(message); + QueuedMessage match; + match.position = message.position+1; + Deque::iterator m = lower_bound(messages[p].begin(), messages[p].end(), match); + if (m != messages[p].end()) { + next = *m; + return true; + } + while (p-- > 0) { + if (!messages[p].empty()) { + next = messages[p].front(); + return true; + } + } + return false; +} + bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message) { QueuedMessage match; diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h index 4bf9d26a9d..7464040e17 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.h +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h @@ -46,6 +46,7 @@ class PriorityQueue : public Messages bool remove(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); bool next(const framing::SequenceNumber&, QueuedMessage&); + bool next(const QueuedMessage&, QueuedMessage&); QueuedMessage& front(); void pop(); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 68dd2ae125..551584f9d9 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -42,6 +42,7 @@ #include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" @@ -111,8 +112,70 @@ class MessageAllocator */ virtual bool canAcquire( const std::string& consumer, const QueuedMessage& qm, const Mutex::ScopedLock&); + + /** hook to add any interesting management state to the status map (lock held) */ + virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {}; + + /** for move, purge, reroute - check if message matches against a filter, + * return true if message matches. + */ + virtual bool match(const qpid::types::Variant::Map* filter, + const QueuedMessage& message) const; }; + + +class MessageGroupManager : public QueueObserver, public MessageAllocator +{ + const std::string groupIdHeader; // msg header holding group identifier + const unsigned int timestamp; // mark messages with timestamp if set + + struct GroupState { + //const std::string group; // group identifier + //Consumer::shared_ptr owner; // consumer with outstanding acquired messages + std::string owner; // consumer with outstanding acquired messages + uint32_t acquired; // count of outstanding acquired messages + uint32_t total; // count of enqueued messages in this group + GroupState() : acquired(0), total(0) {} + }; + typedef std::map<std::string, struct GroupState> GroupMap; + typedef std::set<std::string> Consumers; + + GroupMap messageGroups; + Consumers consumers; + + static const std::string qpidMessageGroupKey; + static const std::string qpidMessageGroupTimestamp; + static const std::string qpidMessageGroupDefault; + static const std::string qpidMessageGroupFilter; // key for move/purge filter map + + const std::string getGroupId( const QueuedMessage& qm ) const; + + public: + + static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings ); + + MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 ) + : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {} + void enqueued( const QueuedMessage& qm ); + void acquired( const QueuedMessage& qm ); + void requeued( const QueuedMessage& qm ); + void dequeued( const QueuedMessage& qm ); + void consumerAdded( const Consumer& ); + void consumerRemoved( const Consumer& ); + bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next, + const Mutex::ScopedLock&); + bool canAcquire(const std::string& consumer, const QueuedMessage& msg, + const Mutex::ScopedLock&); + void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const; + bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const; +}; + +const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); +const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); +const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */ +const std::string MessageGroupManager::qpidMessageGroupFilter("qpid.group_id"); + }} Queue::Queue(const string& _name, bool _autodelete, @@ -140,7 +203,8 @@ Queue::Queue(const string& _name, bool _autodelete, deleted(false), barrier(*this), autoDeleteTimeout(0), - allocator(new MessageAllocator( this )) // KAG TODO: FIX!! + allocator(new MessageAllocator( this )), + type(FIFO) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -275,7 +339,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess } } -bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) +bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); @@ -535,6 +599,29 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) } } + +namespace { + // for use with purge/move below - collect messages that match a given filter + struct Collector { + const uint32_t maxMatches; + const qpid::types::Variant::Map *filter; + std::deque<QueuedMessage> matches; + boost::shared_ptr<MessageAllocator> allocator; + Collector(boost::shared_ptr<MessageAllocator> a, uint32_t m, + const qpid::types::Variant::Map *f) + : maxMatches(m), filter(f), allocator(a) {} + void operator() (QueuedMessage& qm) + { + if (maxMatches == 0 || matches.size() < maxMatches) { + if (allocator->match( filter, qm )) { + matches.push_back(qm); + } + } + } + }; +} + + /** * purge - for purging all or some messages on a queue * depending on the purge_request @@ -546,52 +633,61 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) * The dest exchange may be supplied to re-route messages through the exchange. * It is safe to re-route messages such that they arrive back on the same queue, * even if the queue is ordered by priority. + * + * An optional filter can be supplied that will be applied against each message. The + * message is purged only if the filter matches. See MessageAllocator for more detail. */ -uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) +uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest, + const qpid::types::Variant::Map *filter) { - Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 - std::deque<DeliverableMessage> rerouteQueue; + Collector c(allocator, purge_request, filter); - uint32_t count = 0; - // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages->empty()) { - if (dest.get()) { - // - // If there is a destination exchange, stage the messages onto a reroute queue - // so they don't wind up getting purged more than once. - // - DeliverableMessage msg(messages->front().payload); - rerouteQueue.push_back(msg); + Mutex::ScopedLock locker(messageLock); + messages->foreach( boost::bind<void>(boost::ref(c), _1) ); + + uint32_t count = c.matches.size(); + + // first remove all matches + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); qmsg++) { + /** @todo KAG: need a direct remove method here */ + bool ok = acquire(qmsg->position, *qmsg); + (void) ok; assert(ok); + dequeue(0, *qmsg); + } + + // now reroute if necessary + if (dest.get()) { + while (!c.matches.empty()) { + QueuedMessage msg = c.matches.front(); + c.matches.pop_front(); + assert(msg.payload); + DeliverableMessage dmsg(msg.payload); + dest->routeWithAlternate(dmsg); } - popAndDequeue(); - count++; - } - - // - // Re-route purged messages into the destination exchange. Note that there's no need - // to test dest.get() here because if it is NULL, the rerouteQueue will be empty. - // - while (!rerouteQueue.empty()) { - DeliverableMessage msg(rerouteQueue.front()); - rerouteQueue.pop_front(); - dest->routeWithAlternate(msg); } - return count; } -uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { +uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter) +{ + Collector c(allocator, qty, filter); + Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 - uint32_t count = 0; // count how many were moved for returning + messages->foreach( boost::bind<void>(boost::ref(c), _1) ); + + uint32_t count = c.matches.size(); - while((!qty || move_count--) && !messages->empty()) { - QueuedMessage qmsg = messages->front(); - boost::intrusive_ptr<Message> msg = qmsg.payload; - destq->deliver(msg); // deliver message to the destination queue - popAndDequeue(); - count++; + while (!c.matches.empty()) { + QueuedMessage qmsg = c.matches.front(); + c.matches.pop_front(); + /** @todo KAG: need a direct remove method here */ + bool ok = acquire(qmsg.position, qmsg); + (void) ok; assert(ok); + dequeue(0, qmsg); + assert(qmsg.payload); + destq->deliver(qmsg.payload); } return count; } @@ -614,6 +710,7 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage { if (messages->remove(position, msg)) { acquired( msg ); + ++dequeueSincePurge; return true; } return false; @@ -912,17 +1009,29 @@ void Queue::configureImpl(const FieldTable& _settings) if (lvqKey.size()) { QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); + type = LVQ; } else if (_settings.get(qpidLastValueQueueNoBrowse)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); + type = LVQ; } else if (_settings.get(qpidLastValueQueue)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + type = LVQ; } else { std::auto_ptr<Messages> m = Fairshare::create(_settings); if (m.get()) { messages = m; QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); + type = PRIORITY; + } + } + + { // override default message allocator if message groups configured. + boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings )); + if (ma) { + allocator = ma; + type = GROUP; } } @@ -1181,7 +1290,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str case _qmf::Queue::METHOD_PURGE : { _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args; - purge(purgeArgs.i_request); + purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1202,7 +1311,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str } } - purge(rerouteArgs.i_request, dest); + purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1211,6 +1320,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str return status; } + +void Queue::query(qpid::types::Variant::Map& results) const +{ + Mutex::ScopedLock locker(messageLock); + /** @todo add any interesting queue state into results */ + if (allocator) allocator->query( results, messageLock ); +} + void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; @@ -1357,145 +1474,258 @@ void Queue::UsageBarrier::destroy() // KAG TBD: flesh out... -class MessageGroupManager : public QueueObserver, public MessageAllocator -{ - const std::string groupIdHeader; // msg header holding group identifier - struct GroupState { - const std::string group; // group identifier - //Consumer::shared_ptr owner; // consumer with outstanding acquired messages - std::string owner; // consumer with outstanding acquired messages - uint32_t acquired; // count of outstanding acquired messages - uint32_t total; // count of enqueued messages in this group - GroupState() : acquired(0), total(0) {} - }; - std::map<std::string, struct GroupState> messageGroups; - std::set<std::string> consumers; - - public: - - MessageGroupManager(const std::string& header, Queue *q ) - : QueueObserver(), MessageAllocator(q), groupIdHeader( header ) {} - void enqueued( const QueuedMessage& qm ); - void removed( const QueuedMessage& qm ); - void requeued( const QueuedMessage& qm ); - void dequeued( const QueuedMessage& qm ); - void consumerAdded( const Consumer& ); - void consumerRemoved( const Consumer& ); - bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next, - const Mutex::ScopedLock&); - bool canAcquire(const std::string& consumer, const QueuedMessage& msg, - const Mutex::ScopedLock&); -}; -namespace { - const std::string NO_GROUP(""); - const std::string getGroupId( const QueuedMessage& qm, const std::string& key ) - { - const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); - if (!headers) return NO_GROUP; - return headers->getAsString(key); - } +const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const +{ + const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); + if (!headers) return qpidMessageGroupDefault; + FieldTable::ValuePtr id = headers->get( groupIdHeader ); + if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault; + return id->get<std::string>(); } void MessageGroupManager::enqueued( const QueuedMessage& qm ) { - std::string group( getGroupId(qm, groupIdHeader) ); - messageGroups[group].total++; + std::string group( getGroupId(qm) ); + uint32_t total = ++messageGroups[group].total; + QPID_LOG( trace, "group queue " << queue->getName() << + ": added message to group id=" << group << " total=" << total ); } -void MessageGroupManager::removed( const QueuedMessage& qm ) +void MessageGroupManager::acquired( const QueuedMessage& qm ) { - std::string group( getGroupId(qm, groupIdHeader) ); - std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); - gs->second.acquired += 1; + GroupState& state( gs->second ); + state.acquired += 1; + QPID_LOG( trace, "group queue " << queue->getName() << + ": acquired message in group id=" << group << " acquired=" << state.acquired ); } void MessageGroupManager::requeued( const QueuedMessage& qm ) { - std::string group( getGroupId(qm, groupIdHeader) ); - std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); assert( state.acquired != 0 ); state.acquired -= 1; if (state.acquired == 0 && !state.owner.empty()) { + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << state.owner << " released group id=" << gs->first); state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? } + QPID_LOG( trace, "group queue " << queue->getName() << + ": requeued message to group id=" << group << " acquired=" << state.acquired ); } void MessageGroupManager::dequeued( const QueuedMessage& qm ) { - std::string group( getGroupId(qm, groupIdHeader) ); - std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); assert( state.total != 0 ); - state.total -= 1; + uint32_t total = state.total -= 1; assert( state.acquired != 0 ); state.acquired -= 1; - if (state.total == 0) messageGroups.erase( gs ); - else if (state.acquired == 0 && !state.owner.empty()) { - state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? + if (state.total == 0) { + QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first); + messageGroups.erase( gs ); + } else { + if (state.acquired == 0 && !state.owner.empty()) { + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << state.owner << " released group id=" << gs->first); + state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? + } } + QPID_LOG( trace, "group queue " << queue->getName() << + ": dequeued message from group id=" << group << " total=" << total ); } void MessageGroupManager::consumerAdded( const Consumer& c ) { - bool unique = consumers.insert( c.getName() ).second; + const std::string& name(c.getName()); + bool unique = consumers.insert( name ).second; (void) unique; assert( unique ); + QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer name=" << name ); } void MessageGroupManager::consumerRemoved( const Consumer& c ) { - size_t count = consumers.erase( c.getName() ); + const std::string& name(c.getName()); + size_t count = consumers.erase( name ); (void) count; assert( count == 1 ); bool needReset = false; - for (std::map<std::string, struct GroupState>::iterator gs = messageGroups.begin(); + for (GroupMap::iterator gs = messageGroups.begin(); gs != messageGroups.end(); ++gs) { GroupState& state( gs->second ); - if (state.owner == c.getName()) { + if (state.owner == name) { state.owner.clear(); needReset = true; + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << name << " released group id=" << gs->first); } } if (needReset) { // KAG TODO: How do I invalidate all consumers that need invalidating???? } + QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name ); } bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next, - const Mutex::ScopedLock& l) + const Mutex::ScopedLock& ) { - // KAG TODO: FIX!!! - return MessageAllocator::nextMessage( c, next, l ); + Messages& messages(queue->getMessages()); + + if (messages.empty()) + return false; + + if (c->preAcquires()) { // not browsing + next = messages.front(); + QueuedMessage current; + do { + current = next; + /** @todo KAG: horrifingly suboptimal - optimize */ + std::string group( getGroupId( current ) ); + GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */ + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + if (state.owner.empty()) { + state.owner = c->getName(); + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << c->getName() << " has acquired group id=" << group); + return true; + } + if (state.owner == c->getName()) { + return true; + } + } while (messages.next( current, next )); /** @todo: .next() is a linear search from front - optimize */ + return false; + } else if (messages.next(c->position, next)) + return true; + return false; } bool MessageGroupManager::canAcquire(const std::string& consumer, const QueuedMessage& qm, const Mutex::ScopedLock&) { - std::string group( getGroupId(qm, groupIdHeader) ); - std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); if (state.owner.empty()) { state.owner = consumer; + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << consumer << " has acquired group id=" << gs->first); return true; } return state.owner == consumer; } +namespace { + const std::string GroupQueryKey("qpid.message_group_queue"); + const std::string GroupHeaderKey("group_header_key"); + const std::string GroupStateKey("group_state"); + const std::string GroupIdKey("group_id"); + const std::string GroupMsgCount("msg_count"); + const std::string GroupTimestamp("timestamp"); + const std::string GroupConsumer("consumer"); +} + +void MessageGroupManager::query(qpid::types::Variant::Map& status, + const Mutex::ScopedLock&) const +{ + /** Add a description of the current state of the message groups for this queue. + FORMAT: + { "qpid.message_group_queue": + { "group_header_key" : "<KEY>", + "group_state" : + [ { "group_id" : "<name>", + "msg_count" : <int>, + "timestamp" : <absTime>, + "consumer" : <consumer name> }, + {...} // one for each known group + ] + } + } + **/ + + assert(status.find(GroupQueryKey) == status.end()); + qpid::types::Variant::Map state; + qpid::types::Variant::List groups; + + state[GroupHeaderKey] = groupIdHeader; + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + qpid::types::Variant::Map info; + info[GroupIdKey] = g->first; + info[GroupMsgCount] = g->second.total; + info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ + info[GroupConsumer] = g->second.owner; + groups.push_back(info); + } + state[GroupStateKey] = groups; + status[GroupQueryKey] = state; +} + +bool MessageGroupManager::match(const qpid::types::Variant::Map* filter, + const QueuedMessage& message) const +{ + if (!filter) return true; + qpid::types::Variant::Map::const_iterator i = filter->find( qpidMessageGroupFilter ); + if (i == filter->end()) return true; + if (i->second.asString() == getGroupId(message)) return true; + return false; +} + +boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q, + const qpid::framing::FieldTable& settings ) +{ + boost::shared_ptr<MessageGroupManager> empty; + + if (settings.isSet(qpidMessageGroupKey)) { + + Queue::Disposition qt = q->getDisposition(); + + if (qt == Queue::LVQ) { + QPID_LOG( error, "Message Groups cannot be enabled on LVQ Queues, queue=" << q->getName()); + return empty; + } + if (qt == Queue::PRIORITY) { + QPID_LOG( error, "Message Groups cannot be enabled for Priority Queues, queue=" << q->getName()); + return empty; + } + std::string headerKey = settings.getAsString(qpidMessageGroupKey); + if (headerKey.empty()) { + QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName()); + return empty; + } + unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); + + boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, q, timestamp ) ); + + q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) ); + + QPID_LOG( debug, "Configured Queue '" << q->getName() << + "' for message grouping using header key '" << headerKey << "'" << + " (timestamp=" << timestamp << ")"); + return manager; + } + return empty; +} @@ -1526,5 +1756,13 @@ bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&, } +// default match - ignore filter and always match. +bool MessageAllocator::match(const qpid::types::Variant::Map*, + const QueuedMessage&) const +{ + return true; +} + + diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 5c895ceb1b..67da0a1403 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -60,7 +60,7 @@ class QueueEvents; class QueueRegistry; class TransactionContext; class MessageAllocator; - + /** * The brokers representation of an amqp queue. Messages are * delivered to a queue from where they can be dispatched to @@ -70,6 +70,10 @@ class MessageAllocator; class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable { + public: + enum Disposition {FIFO, LVQ, PRIORITY, GROUP}; + + private: struct UsageBarrier { Queue& parent; @@ -129,7 +133,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; - std::auto_ptr<MessageAllocator> allocator; + boost::shared_ptr<MessageAllocator> allocator; + Disposition type; + void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -262,11 +268,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages + uint32_t purge(const uint32_t purge_request=0, //defaults to all messages + boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(), + const ::qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN void purgeExpired(sys::Duration); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; @@ -353,6 +362,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + void query(::qpid::types::Variant::Map&) const; /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { @@ -402,6 +412,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } void setDequeueSincePurge(uint32_t value); + + Disposition getDisposition() const { return type; } }; } } diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 0eb3c9d194..e3dfa1452d 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -56,12 +56,12 @@ class TestConsumer : public virtual Consumer{ public: typedef boost::shared_ptr<TestConsumer> shared_ptr; - intrusive_ptr<Message> last; + QueuedMessage last; bool received; - TestConsumer(bool acquire = true):Consumer("test", acquire), received(false) {}; + TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {}; virtual bool deliver(QueuedMessage& msg){ - last = msg.payload; + last = msg; received = true; return true; }; @@ -148,16 +148,16 @@ QPID_AUTO_TEST_CASE(testConsumers){ queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg1.get(), c1->last.get()); + BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get()); queue->deliver(msg2); BOOST_CHECK(queue->dispatch(c2)); - BOOST_CHECK_EQUAL(msg2.get(), c2->last.get()); + BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get()); c1->received = false; queue->deliver(msg3); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg3.get(), c1->last.get()); + BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get()); //Test cancellation: queue->cancel(c1); @@ -213,7 +213,7 @@ QPID_AUTO_TEST_CASE(testDequeue){ if (!consumer->received) sleep(2); - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->get().payload; @@ -297,14 +297,14 @@ QPID_AUTO_TEST_CASE(testSeek){ queue->deliver(msg2); queue->deliver(msg3); - TestConsumer::shared_ptr consumer(new TestConsumer(false)); + TestConsumer::shared_ptr consumer(new TestConsumer("test", false)); SequenceNumber seq(2); consumer->position = seq; QueuedMessage qm; queue->dispatch(consumer); - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); queue->dispatch(consumer); queue->dispatch(consumer); // make sure over-run is safe @@ -571,7 +571,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ // set mode to no browse and check args.setOrdering(client::LVQ_NO_BROWSE); queue->configure(args); - TestConsumer::shared_ptr c1(new TestConsumer(false)); + TestConsumer::shared_ptr c1(new TestConsumer("test", false)); queue->dispatch(c1); queue->dispatch(c1); @@ -705,6 +705,67 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); } +QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { + FieldTable args; + Queue::shared_ptr queue(new Queue("my_queue", true)); + args.setString("qpid.group_header_key", "GROUP-ID"); + queue->configure(args); + + for (int i = 0; i < 3; ++i) { + intrusive_ptr<Message> msg1 = create_message("e", "A"); + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","a"); + queue->deliver(msg1); + + intrusive_ptr<Message> msg2 = create_message("e", "A"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","b"); + queue->deliver(msg2); + } + + BOOST_CHECK_EQUAL(uint32_t(6), queue->getMessageCount()); + + TestConsumer::shared_ptr c1(new TestConsumer("C1")); + TestConsumer::shared_ptr c2(new TestConsumer("C2")); + + queue->consume(c1); + queue->consume(c2); + + std::deque<QueuedMessage> dequeMe; + + queue->dispatch(c1); // now owns group "a" + dequeMe.push_back(c1->last); + queue->dispatch(c2); // now owns group "b" + dequeMe.push_back(c2->last); + + queue->dispatch(c2); // should skip next "a", get last "b" + std::string group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); + dequeMe.push_back(c2->last); + BOOST_CHECK_EQUAL( group, std::string("b") ); + + queue->dispatch(c1); // should get last "a" + group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); + dequeMe.push_back(c1->last); + BOOST_CHECK_EQUAL( group, std::string("a") ); + + // now "free up" the groups + while (!dequeMe.empty()) { + queue->dequeue( 0, dequeMe.front() ); + dequeMe.pop_front(); + } + + // now c2 should be able to acquire group "a", and c1 group "b" + + queue->dispatch(c2); + group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); + BOOST_CHECK_EQUAL( group, std::string("a") ); + + queue->dispatch(c1); + group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); + BOOST_CHECK_EQUAL( group, std::string("b") ); + + queue->cancel(c1); + queue->cancel(c2); +} + QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ TestMessageStoreOC testStore; diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 9f54b0cd31..f6b195f7dd 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -92,6 +92,7 @@ <arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/> <arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/> <arg name="qty" dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/> + <arg name="filter" dir="I" type="map" default="{}" desc="if specified, move only those messages matching this filter"/> </method> <method name="setLogLevel" desc="Set the log level"> @@ -115,6 +116,13 @@ <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/> </method> + <method name="query" desc="Query the current state of an object."> + <arg name="type" dir="I" type="sstr" desc="The type of object to query."/> + <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/> + <arg name="results" dir="O" type="map" desc="A snapshot of the object's state."/> + </method> + + </class> <!-- @@ -180,12 +188,14 @@ <method name="purge" desc="Discard all or some messages on a queue"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> + <arg name="filter" dir="I" type="map" default="{}" desc="if specified, purge only those messages matching this filter"/> </method> <method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> <arg name="useAltExchange" dir="I" type="bool" desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/> <arg name="exchange" dir="I" type="sstr" desc="Name of the exchange to route the messages through"/> + <arg name="filter" dir="I" type="map" default="{}" desc="if specified, reroute only those messages matching this filter"/> </method> </class> diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py index 952878e0b7..5aaa1a7c7d 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py @@ -156,7 +156,7 @@ class ManagementTest (TestBase010): queues = self.qmf.getObjects(_class="queue") "Move 10 messages from src-queue to dest-queue" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10, {}) self.assertEqual (result.status, 0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] @@ -166,7 +166,7 @@ class ManagementTest (TestBase010): self.assertEqual (dq.msgDepth,10) "Move all remaining messages to destination" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0, {}) self.assertEqual (result.status,0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] @@ -176,16 +176,16 @@ class ManagementTest (TestBase010): self.assertEqual (dq.msgDepth,20) "Use a bad source queue name" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0, {}) self.assertEqual (result.status,4) "Use a bad destination queue name" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0, {}) self.assertEqual (result.status,4) " Use a large qty (40) to move from dest-queue back to " " src-queue- should move all " - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40, {}) self.assertEqual (result.status,0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] @@ -225,19 +225,19 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] "Purge top message from purge-queue" - result = pq.purge(1) + result = pq.purge(1, {}) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,19) "Purge top 9 messages from purge-queue" - result = pq.purge(9) + result = pq.purge(9, {}) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,10) "Purge all messages from purge-queue" - result = pq.purge(0) + result = pq.purge(0, {}) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) @@ -263,7 +263,7 @@ class ManagementTest (TestBase010): #reroute messages from test queue to amq.fanout (and hence to #rerouted queue): pq = self.qmf.getObjects(_class="queue", name="test-queue")[0] - result = pq.reroute(0, False, "amq.fanout") + result = pq.reroute(0, False, "amq.fanout", {}) self.assertEqual(result.status, 0) #verify messages are all rerouted: @@ -301,7 +301,7 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0] "Reroute top message from reroute-queue to alternate exchange" - result = pq.reroute(1, True, "") + result = pq.reroute(1, True, "", {}) self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0] @@ -309,7 +309,7 @@ class ManagementTest (TestBase010): self.assertEqual(aq.msgDepth,1) "Reroute top 9 messages from reroute-queue to alt.direct2" - result = pq.reroute(9, False, "alt.direct2") + result = pq.reroute(9, False, "alt.direct2", {}) self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] @@ -317,11 +317,11 @@ class ManagementTest (TestBase010): self.assertEqual(aq.msgDepth,9) "Reroute using a non-existent exchange" - result = pq.reroute(0, False, "amq.nosuchexchange") + result = pq.reroute(0, False, "amq.nosuchexchange", {}) self.assertEqual(result.status, 4) "Reroute all messages from reroute-queue" - result = pq.reroute(0, False, "alt.direct2") + result = pq.reroute(0, False, "alt.direct2", {}) self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] @@ -337,7 +337,7 @@ class ManagementTest (TestBase010): session.message_transfer(destination="amq.direct", message=msg) "Reroute onto the same queue" - result = pq.reroute(0, False, "amq.direct") + result = pq.reroute(0, False, "amq.direct", {}) self.assertEqual(result.status, 0) pq.update() self.assertEqual(pq.msgDepth,20) @@ -365,7 +365,7 @@ class ManagementTest (TestBase010): # 4. Call reroute on queue Y and specify that messages should # be sent to exchange A y = self.qmf.getObjects(_class="queue", name="Y")[0] - result = y.reroute(1, False, "A") + result = y.reroute(1, False, "A", {}) self.assertEqual(result.status, 0) # 5. verify that the message is rerouted through B (as A has |