summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-08-16 00:37:33 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-08-16 00:37:33 +0000
commit7762a152b3dabd859089abb388268101b2c72ea0 (patch)
treedd8eb0e638e04857b4d5fe56a6a70b3974c38935
parent0ed156cc8320a0f6baa2ed255831147b5973c247 (diff)
downloadqpid-python-7762a152b3dabd859089abb388268101b2c72ea0.tar.gz
QPID-3346: checkpoint - added mgmt interfaces, fleshed out sub-optimal implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1158073 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp58
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h10
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.h1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.h1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Messages.h8
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp19
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp438
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h20
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp81
-rw-r--r--qpid/specs/management-schema.xml10
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/management.py30
16 files changed, 559 insertions, 134 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index f80e0f1e61..a7a1370801 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -37,6 +37,7 @@
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
@@ -452,7 +453,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
- if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter))
status = Manageable::STATUS_OK;
else
return Manageable::STATUS_PARAMETER_INVALID;
@@ -482,6 +483,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
status = Manageable::STATUS_OK;
break;
}
+ case _qmf::Broker::METHOD_QUERY :
+ {
+ _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
+ status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
+ status = Manageable::STATUS_OK;
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -654,6 +662,49 @@ void Broker::deleteObject(const std::string& type, const std::string& name,
}
+Manageable::status_t Broker::queryObject(const std::string& type,
+ const std::string& name,
+ Variant::Map& results,
+ const ConnectionState* context)
+{
+ std::string userId;
+ std::string connectionId;
+ if (context) {
+ userId = context->getUserId();
+ connectionId = context->getUrl();
+ }
+ QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")");
+
+ if (type == TYPE_QUEUE)
+ return queryQueue( name, userId, connectionId, results );
+
+ if (type == TYPE_EXCHANGE ||
+ type == TYPE_TOPIC ||
+ type == TYPE_BINDING)
+ return Manageable::STATUS_NOT_IMPLEMENTED;
+
+ throw UnknownObjectType(type);
+}
+
+Manageable::status_t Broker::queryQueue( const std::string& name,
+ const std::string& userId,
+ const std::string& /*connectionId*/,
+ Variant::Map& results )
+{
+ (void) results;
+ if (acl) {
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId));
+ }
+
+ boost::shared_ptr<Queue> q(queues.find(name));
+ if (!q) {
+ QPID_LOG(error, "Query failed: queue not found, name=" << name);
+ return Manageable::STATUS_UNKNOWN_OBJECT;
+ }
+ return Manageable::STATUS_OK;;
+}
+
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
@@ -723,7 +774,8 @@ void Broker::connect(
uint32_t Broker::queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
- uint32_t qty)
+ uint32_t qty,
+ const Variant::Map& filter)
{
Queue::shared_ptr src_queue = queues.find(srcQueue);
if (!src_queue)
@@ -732,7 +784,7 @@ uint32_t Broker::queueMoveMessages(
if (!dest_queue)
return 0;
- return src_queue->move(dest_queue, qty);
+ return src_queue->move(dest_queue, qty, &filter);
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 40f7b6273f..1612726693 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -157,7 +157,12 @@ public:
const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
void deleteObject(const std::string& type, const std::string& name,
const qpid::types::Variant::Map& options, const ConnectionState* context);
-
+ Manageable::status_t queryObject(const std::string& type, const std::string& name,
+ qpid::types::Variant::Map& results, const ConnectionState* context);
+ Manageable::status_t queryQueue( const std::string& name,
+ const std::string& userId,
+ const std::string& connectionId,
+ qpid::types::Variant::Map& results);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
std::auto_ptr<sys::Timer> clusterTimer;
@@ -258,7 +263,8 @@ public:
*/
uint32_t queueMoveMessages( const std::string& srcQueue,
const std::string& destQueue,
- uint32_t qty);
+ uint32_t qty,
+ const qpid::types::Variant::Map& filter);
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
index a811a86492..bc009f14d1 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -44,6 +44,11 @@ bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& m
}
}
+bool LegacyLVQ::next(const QueuedMessage& message, QueuedMessage& next)
+{
+ return this->next(message.position, next);
+}
+
bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message)
{
if (MessageMap::next(position, message)) {
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
index dd0fd7aaec..33d4117845 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
@@ -42,6 +42,7 @@ class LegacyLVQ : public MessageMap
LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
bool remove(const framing::SequenceNumber&, QueuedMessage&);
bool next(const framing::SequenceNumber&, QueuedMessage&);
+ bool next(const QueuedMessage&, QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void removeIf(Predicate);
void setNoBrowse(bool);
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
index 24b8f6f895..b4fdfb3d6a 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
@@ -74,6 +74,11 @@ bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage&
return find(position, message, false);
}
+bool MessageDeque::next(const QueuedMessage& message, QueuedMessage& next)
+{
+ return this->next(message.position, next);
+}
+
bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message)
{
if (messages.empty()) {
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h
index 0e1aef2986..0726e1dee6 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.h
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.h
@@ -41,6 +41,7 @@ class MessageDeque : public Messages
bool remove(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
bool next(const framing::SequenceNumber&, QueuedMessage&);
+ bool next(const QueuedMessage&, QueuedMessage&);
QueuedMessage& front();
void pop();
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp
index 39e23df533..504b2e033c 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp
@@ -77,6 +77,11 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me
}
}
+bool MessageMap::next(const QueuedMessage& message, QueuedMessage& next)
+{
+ return this->next(message.position, next);
+}
+
bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message)
{
if (!messages.empty() && position < front().position) {
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h
index 1128a1d54a..9cfd9ea9af 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.h
+++ b/qpid/cpp/src/qpid/broker/MessageMap.h
@@ -47,6 +47,7 @@ class MessageMap : public Messages
virtual bool remove(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
virtual bool next(const framing::SequenceNumber&, QueuedMessage&);
+ virtual bool next(const QueuedMessage&, QueuedMessage&);
QueuedMessage& front();
void pop();
diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h
index c535fd1936..4b3a6d764f 100644
--- a/qpid/cpp/src/qpid/broker/Messages.h
+++ b/qpid/cpp/src/qpid/broker/Messages.h
@@ -78,6 +78,14 @@ class Messages
virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
/**
+ * Return the next message based on the supplied queued message.
+ * The next messages is passed back via the second parameter.
+ * @todo replace with queue iterator
+ * @return true if there is another message, false otherwise.
+ */
+ virtual bool next(const QueuedMessage&, QueuedMessage&) = 0;
+
+ /**
* Note: Caller is responsible for ensuring that there is a front
* (e.g. empty() returns false)
*
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
index e07e73d323..3305bb4f79 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -88,6 +88,25 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage&
return find(position, message, false);
}
+bool PriorityQueue::next(const QueuedMessage& message, QueuedMessage& next)
+{
+ uint p = getPriorityLevel(message);
+ QueuedMessage match;
+ match.position = message.position+1;
+ Deque::iterator m = lower_bound(messages[p].begin(), messages[p].end(), match);
+ if (m != messages[p].end()) {
+ next = *m;
+ return true;
+ }
+ while (p-- > 0) {
+ if (!messages[p].empty()) {
+ next = messages[p].front();
+ return true;
+ }
+ }
+ return false;
+}
+
bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message)
{
QueuedMessage match;
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h
index 4bf9d26a9d..7464040e17 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.h
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h
@@ -46,6 +46,7 @@ class PriorityQueue : public Messages
bool remove(const framing::SequenceNumber&, QueuedMessage&);
bool find(const framing::SequenceNumber&, QueuedMessage&);
bool next(const framing::SequenceNumber&, QueuedMessage&);
+ bool next(const QueuedMessage&, QueuedMessage&);
QueuedMessage& front();
void pop();
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 68dd2ae125..551584f9d9 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -42,6 +42,7 @@
#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
@@ -111,8 +112,70 @@ class MessageAllocator
*/
virtual bool canAcquire( const std::string& consumer, const QueuedMessage& qm,
const Mutex::ScopedLock&);
+
+ /** hook to add any interesting management state to the status map (lock held) */
+ virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {};
+
+ /** for move, purge, reroute - check if message matches against a filter,
+ * return true if message matches.
+ */
+ virtual bool match(const qpid::types::Variant::Map* filter,
+ const QueuedMessage& message) const;
};
+
+
+class MessageGroupManager : public QueueObserver, public MessageAllocator
+{
+ const std::string groupIdHeader; // msg header holding group identifier
+ const unsigned int timestamp; // mark messages with timestamp if set
+
+ struct GroupState {
+ //const std::string group; // group identifier
+ //Consumer::shared_ptr owner; // consumer with outstanding acquired messages
+ std::string owner; // consumer with outstanding acquired messages
+ uint32_t acquired; // count of outstanding acquired messages
+ uint32_t total; // count of enqueued messages in this group
+ GroupState() : acquired(0), total(0) {}
+ };
+ typedef std::map<std::string, struct GroupState> GroupMap;
+ typedef std::set<std::string> Consumers;
+
+ GroupMap messageGroups;
+ Consumers consumers;
+
+ static const std::string qpidMessageGroupKey;
+ static const std::string qpidMessageGroupTimestamp;
+ static const std::string qpidMessageGroupDefault;
+ static const std::string qpidMessageGroupFilter; // key for move/purge filter map
+
+ const std::string getGroupId( const QueuedMessage& qm ) const;
+
+ public:
+
+ static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings );
+
+ MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 )
+ : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {}
+ void enqueued( const QueuedMessage& qm );
+ void acquired( const QueuedMessage& qm );
+ void requeued( const QueuedMessage& qm );
+ void dequeued( const QueuedMessage& qm );
+ void consumerAdded( const Consumer& );
+ void consumerRemoved( const Consumer& );
+ bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+ const Mutex::ScopedLock&);
+ bool canAcquire(const std::string& consumer, const QueuedMessage& msg,
+ const Mutex::ScopedLock&);
+ void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const;
+ bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
+};
+
+const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
+const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */
+const std::string MessageGroupManager::qpidMessageGroupFilter("qpid.group_id");
+
}}
Queue::Queue(const string& _name, bool _autodelete,
@@ -140,7 +203,8 @@ Queue::Queue(const string& _name, bool _autodelete,
deleted(false),
barrier(*this),
autoDeleteTimeout(0),
- allocator(new MessageAllocator( this )) // KAG TODO: FIX!!
+ allocator(new MessageAllocator( this )),
+ type(FIFO)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -275,7 +339,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
}
}
-bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
+bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -535,6 +599,29 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
}
}
+
+namespace {
+ // for use with purge/move below - collect messages that match a given filter
+ struct Collector {
+ const uint32_t maxMatches;
+ const qpid::types::Variant::Map *filter;
+ std::deque<QueuedMessage> matches;
+ boost::shared_ptr<MessageAllocator> allocator;
+ Collector(boost::shared_ptr<MessageAllocator> a, uint32_t m,
+ const qpid::types::Variant::Map *f)
+ : maxMatches(m), filter(f), allocator(a) {}
+ void operator() (QueuedMessage& qm)
+ {
+ if (maxMatches == 0 || matches.size() < maxMatches) {
+ if (allocator->match( filter, qm )) {
+ matches.push_back(qm);
+ }
+ }
+ }
+ };
+}
+
+
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
@@ -546,52 +633,61 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
* The dest exchange may be supplied to re-route messages through the exchange.
* It is safe to re-route messages such that they arrive back on the same queue,
* even if the queue is ordered by priority.
+ *
+ * An optional filter can be supplied that will be applied against each message. The
+ * message is purged only if the filter matches. See MessageAllocator for more detail.
*/
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
+ const qpid::types::Variant::Map *filter)
{
- Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
- std::deque<DeliverableMessage> rerouteQueue;
+ Collector c(allocator, purge_request, filter);
- uint32_t count = 0;
- // Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages->empty()) {
- if (dest.get()) {
- //
- // If there is a destination exchange, stage the messages onto a reroute queue
- // so they don't wind up getting purged more than once.
- //
- DeliverableMessage msg(messages->front().payload);
- rerouteQueue.push_back(msg);
+ Mutex::ScopedLock locker(messageLock);
+ messages->foreach( boost::bind<void>(boost::ref(c), _1) );
+
+ uint32_t count = c.matches.size();
+
+ // first remove all matches
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); qmsg++) {
+ /** @todo KAG: need a direct remove method here */
+ bool ok = acquire(qmsg->position, *qmsg);
+ (void) ok; assert(ok);
+ dequeue(0, *qmsg);
+ }
+
+ // now reroute if necessary
+ if (dest.get()) {
+ while (!c.matches.empty()) {
+ QueuedMessage msg = c.matches.front();
+ c.matches.pop_front();
+ assert(msg.payload);
+ DeliverableMessage dmsg(msg.payload);
+ dest->routeWithAlternate(dmsg);
}
- popAndDequeue();
- count++;
- }
-
- //
- // Re-route purged messages into the destination exchange. Note that there's no need
- // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
- //
- while (!rerouteQueue.empty()) {
- DeliverableMessage msg(rerouteQueue.front());
- rerouteQueue.pop_front();
- dest->routeWithAlternate(msg);
}
-
return count;
}
-uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
+ const qpid::types::Variant::Map *filter)
+{
+ Collector c(allocator, qty, filter);
+
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
- uint32_t count = 0; // count how many were moved for returning
+ messages->foreach( boost::bind<void>(boost::ref(c), _1) );
+
+ uint32_t count = c.matches.size();
- while((!qty || move_count--) && !messages->empty()) {
- QueuedMessage qmsg = messages->front();
- boost::intrusive_ptr<Message> msg = qmsg.payload;
- destq->deliver(msg); // deliver message to the destination queue
- popAndDequeue();
- count++;
+ while (!c.matches.empty()) {
+ QueuedMessage qmsg = c.matches.front();
+ c.matches.pop_front();
+ /** @todo KAG: need a direct remove method here */
+ bool ok = acquire(qmsg.position, qmsg);
+ (void) ok; assert(ok);
+ dequeue(0, qmsg);
+ assert(qmsg.payload);
+ destq->deliver(qmsg.payload);
}
return count;
}
@@ -614,6 +710,7 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage
{
if (messages->remove(position, msg)) {
acquired( msg );
+ ++dequeueSincePurge;
return true;
}
return false;
@@ -912,17 +1009,29 @@ void Queue::configureImpl(const FieldTable& _settings)
if (lvqKey.size()) {
QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ type = LVQ;
} else if (_settings.get(qpidLastValueQueueNoBrowse)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ type = LVQ;
} else if (_settings.get(qpidLastValueQueue)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ type = LVQ;
} else {
std::auto_ptr<Messages> m = Fairshare::create(_settings);
if (m.get()) {
messages = m;
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
+ type = PRIORITY;
+ }
+ }
+
+ { // override default message allocator if message groups configured.
+ boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings ));
+ if (ma) {
+ allocator = ma;
+ type = GROUP;
}
}
@@ -1181,7 +1290,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
case _qmf::Queue::METHOD_PURGE :
{
_qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
- purge(purgeArgs.i_request);
+ purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
@@ -1202,7 +1311,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
}
}
- purge(rerouteArgs.i_request, dest);
+ purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
@@ -1211,6 +1320,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
return status;
}
+
+void Queue::query(qpid::types::Variant::Map& results) const
+{
+ Mutex::ScopedLock locker(messageLock);
+ /** @todo add any interesting queue state into results */
+ if (allocator) allocator->query( results, messageLock );
+}
+
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
@@ -1357,145 +1474,258 @@ void Queue::UsageBarrier::destroy()
// KAG TBD: flesh out...
-class MessageGroupManager : public QueueObserver, public MessageAllocator
-{
- const std::string groupIdHeader; // msg header holding group identifier
- struct GroupState {
- const std::string group; // group identifier
- //Consumer::shared_ptr owner; // consumer with outstanding acquired messages
- std::string owner; // consumer with outstanding acquired messages
- uint32_t acquired; // count of outstanding acquired messages
- uint32_t total; // count of enqueued messages in this group
- GroupState() : acquired(0), total(0) {}
- };
- std::map<std::string, struct GroupState> messageGroups;
- std::set<std::string> consumers;
-
- public:
-
- MessageGroupManager(const std::string& header, Queue *q )
- : QueueObserver(), MessageAllocator(q), groupIdHeader( header ) {}
- void enqueued( const QueuedMessage& qm );
- void removed( const QueuedMessage& qm );
- void requeued( const QueuedMessage& qm );
- void dequeued( const QueuedMessage& qm );
- void consumerAdded( const Consumer& );
- void consumerRemoved( const Consumer& );
- bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
- const Mutex::ScopedLock&);
- bool canAcquire(const std::string& consumer, const QueuedMessage& msg,
- const Mutex::ScopedLock&);
-};
-namespace {
- const std::string NO_GROUP("");
- const std::string getGroupId( const QueuedMessage& qm, const std::string& key )
- {
- const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
- if (!headers) return NO_GROUP;
- return headers->getAsString(key);
- }
+const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
+{
+ const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
+ if (!headers) return qpidMessageGroupDefault;
+ FieldTable::ValuePtr id = headers->get( groupIdHeader );
+ if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault;
+ return id->get<std::string>();
}
void MessageGroupManager::enqueued( const QueuedMessage& qm )
{
- std::string group( getGroupId(qm, groupIdHeader) );
- messageGroups[group].total++;
+ std::string group( getGroupId(qm) );
+ uint32_t total = ++messageGroups[group].total;
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": added message to group id=" << group << " total=" << total );
}
-void MessageGroupManager::removed( const QueuedMessage& qm )
+void MessageGroupManager::acquired( const QueuedMessage& qm )
{
- std::string group( getGroupId(qm, groupIdHeader) );
- std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
- gs->second.acquired += 1;
+ GroupState& state( gs->second );
+ state.acquired += 1;
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": acquired message in group id=" << group << " acquired=" << state.acquired );
}
void MessageGroupManager::requeued( const QueuedMessage& qm )
{
- std::string group( getGroupId(qm, groupIdHeader) );
- std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
assert( state.acquired != 0 );
state.acquired -= 1;
if (state.acquired == 0 && !state.owner.empty()) {
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
}
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": requeued message to group id=" << group << " acquired=" << state.acquired );
}
void MessageGroupManager::dequeued( const QueuedMessage& qm )
{
- std::string group( getGroupId(qm, groupIdHeader) );
- std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
assert( state.total != 0 );
- state.total -= 1;
+ uint32_t total = state.total -= 1;
assert( state.acquired != 0 );
state.acquired -= 1;
- if (state.total == 0) messageGroups.erase( gs );
- else if (state.acquired == 0 && !state.owner.empty()) {
- state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
+ if (state.total == 0) {
+ QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first);
+ messageGroups.erase( gs );
+ } else {
+ if (state.acquired == 0 && !state.owner.empty()) {
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
+ state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
+ }
}
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": dequeued message from group id=" << group << " total=" << total );
}
void MessageGroupManager::consumerAdded( const Consumer& c )
{
- bool unique = consumers.insert( c.getName() ).second;
+ const std::string& name(c.getName());
+ bool unique = consumers.insert( name ).second;
(void) unique; assert( unique );
+ QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer name=" << name );
}
void MessageGroupManager::consumerRemoved( const Consumer& c )
{
- size_t count = consumers.erase( c.getName() );
+ const std::string& name(c.getName());
+ size_t count = consumers.erase( name );
(void) count; assert( count == 1 );
bool needReset = false;
- for (std::map<std::string, struct GroupState>::iterator gs = messageGroups.begin();
+ for (GroupMap::iterator gs = messageGroups.begin();
gs != messageGroups.end(); ++gs) {
GroupState& state( gs->second );
- if (state.owner == c.getName()) {
+ if (state.owner == name) {
state.owner.clear();
needReset = true;
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << name << " released group id=" << gs->first);
}
}
if (needReset) {
// KAG TODO: How do I invalidate all consumers that need invalidating????
}
+ QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name );
}
bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
- const Mutex::ScopedLock& l)
+ const Mutex::ScopedLock& )
{
- // KAG TODO: FIX!!!
- return MessageAllocator::nextMessage( c, next, l );
+ Messages& messages(queue->getMessages());
+
+ if (messages.empty())
+ return false;
+
+ if (c->preAcquires()) { // not browsing
+ next = messages.front();
+ QueuedMessage current;
+ do {
+ current = next;
+ /** @todo KAG: horrifingly suboptimal - optimize */
+ std::string group( getGroupId( current ) );
+ GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ if (state.owner.empty()) {
+ state.owner = c->getName();
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << c->getName() << " has acquired group id=" << group);
+ return true;
+ }
+ if (state.owner == c->getName()) {
+ return true;
+ }
+ } while (messages.next( current, next )); /** @todo: .next() is a linear search from front - optimize */
+ return false;
+ } else if (messages.next(c->position, next))
+ return true;
+ return false;
}
bool MessageGroupManager::canAcquire(const std::string& consumer, const QueuedMessage& qm,
const Mutex::ScopedLock&)
{
- std::string group( getGroupId(qm, groupIdHeader) );
- std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
if (state.owner.empty()) {
state.owner = consumer;
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << consumer << " has acquired group id=" << gs->first);
return true;
}
return state.owner == consumer;
}
+namespace {
+ const std::string GroupQueryKey("qpid.message_group_queue");
+ const std::string GroupHeaderKey("group_header_key");
+ const std::string GroupStateKey("group_state");
+ const std::string GroupIdKey("group_id");
+ const std::string GroupMsgCount("msg_count");
+ const std::string GroupTimestamp("timestamp");
+ const std::string GroupConsumer("consumer");
+}
+
+void MessageGroupManager::query(qpid::types::Variant::Map& status,
+ const Mutex::ScopedLock&) const
+{
+ /** Add a description of the current state of the message groups for this queue.
+ FORMAT:
+ { "qpid.message_group_queue":
+ { "group_header_key" : "<KEY>",
+ "group_state" :
+ [ { "group_id" : "<name>",
+ "msg_count" : <int>,
+ "timestamp" : <absTime>,
+ "consumer" : <consumer name> },
+ {...} // one for each known group
+ ]
+ }
+ }
+ **/
+
+ assert(status.find(GroupQueryKey) == status.end());
+ qpid::types::Variant::Map state;
+ qpid::types::Variant::List groups;
+
+ state[GroupHeaderKey] = groupIdHeader;
+ for (GroupMap::const_iterator g = messageGroups.begin();
+ g != messageGroups.end(); ++g) {
+ qpid::types::Variant::Map info;
+ info[GroupIdKey] = g->first;
+ info[GroupMsgCount] = g->second.total;
+ info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
+ info[GroupConsumer] = g->second.owner;
+ groups.push_back(info);
+ }
+ state[GroupStateKey] = groups;
+ status[GroupQueryKey] = state;
+}
+
+bool MessageGroupManager::match(const qpid::types::Variant::Map* filter,
+ const QueuedMessage& message) const
+{
+ if (!filter) return true;
+ qpid::types::Variant::Map::const_iterator i = filter->find( qpidMessageGroupFilter );
+ if (i == filter->end()) return true;
+ if (i->second.asString() == getGroupId(message)) return true;
+ return false;
+}
+
+boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
+ const qpid::framing::FieldTable& settings )
+{
+ boost::shared_ptr<MessageGroupManager> empty;
+
+ if (settings.isSet(qpidMessageGroupKey)) {
+
+ Queue::Disposition qt = q->getDisposition();
+
+ if (qt == Queue::LVQ) {
+ QPID_LOG( error, "Message Groups cannot be enabled on LVQ Queues, queue=" << q->getName());
+ return empty;
+ }
+ if (qt == Queue::PRIORITY) {
+ QPID_LOG( error, "Message Groups cannot be enabled for Priority Queues, queue=" << q->getName());
+ return empty;
+ }
+ std::string headerKey = settings.getAsString(qpidMessageGroupKey);
+ if (headerKey.empty()) {
+ QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName());
+ return empty;
+ }
+ unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
+
+ boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, q, timestamp ) );
+
+ q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) );
+
+ QPID_LOG( debug, "Configured Queue '" << q->getName() <<
+ "' for message grouping using header key '" << headerKey << "'" <<
+ " (timestamp=" << timestamp << ")");
+ return manager;
+ }
+ return empty;
+}
@@ -1526,5 +1756,13 @@ bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&,
}
+// default match - ignore filter and always match.
+bool MessageAllocator::match(const qpid::types::Variant::Map*,
+ const QueuedMessage&) const
+{
+ return true;
+}
+
+
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 5c895ceb1b..67da0a1403 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -60,7 +60,7 @@ class QueueEvents;
class QueueRegistry;
class TransactionContext;
class MessageAllocator;
-
+
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
@@ -70,6 +70,10 @@ class MessageAllocator;
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
+ public:
+ enum Disposition {FIFO, LVQ, PRIORITY, GROUP};
+
+ private:
struct UsageBarrier
{
Queue& parent;
@@ -129,7 +133,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
- std::auto_ptr<MessageAllocator> allocator;
+ boost::shared_ptr<MessageAllocator> allocator;
+ Disposition type;
+
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -262,11 +268,14 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
+ boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
+ const ::qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty,
+ const qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -353,6 +362,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
+ void query(::qpid::types::Variant::Map&) const;
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
@@ -402,6 +412,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
+
+ Disposition getDisposition() const { return type; }
};
}
}
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 0eb3c9d194..e3dfa1452d 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -56,12 +56,12 @@ class TestConsumer : public virtual Consumer{
public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
- intrusive_ptr<Message> last;
+ QueuedMessage last;
bool received;
- TestConsumer(bool acquire = true):Consumer("test", acquire), received(false) {};
+ TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
virtual bool deliver(QueuedMessage& msg){
- last = msg.payload;
+ last = msg;
received = true;
return true;
};
@@ -148,16 +148,16 @@ QPID_AUTO_TEST_CASE(testConsumers){
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
+ BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
queue->deliver(msg2);
BOOST_CHECK(queue->dispatch(c2));
- BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
+ BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
c1->received = false;
queue->deliver(msg3);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
//Test cancellation:
queue->cancel(c1);
@@ -213,7 +213,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
if (!consumer->received)
sleep(2);
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->get().payload;
@@ -297,14 +297,14 @@ QPID_AUTO_TEST_CASE(testSeek){
queue->deliver(msg2);
queue->deliver(msg3);
- TestConsumer::shared_ptr consumer(new TestConsumer(false));
+ TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
SequenceNumber seq(2);
consumer->position = seq;
QueuedMessage qm;
queue->dispatch(consumer);
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
queue->dispatch(consumer);
queue->dispatch(consumer); // make sure over-run is safe
@@ -571,7 +571,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
// set mode to no browse and check
args.setOrdering(client::LVQ_NO_BROWSE);
queue->configure(args);
- TestConsumer::shared_ptr c1(new TestConsumer(false));
+ TestConsumer::shared_ptr c1(new TestConsumer("test", false));
queue->dispatch(c1);
queue->dispatch(c1);
@@ -705,6 +705,67 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
}
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
+ FieldTable args;
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+ args.setString("qpid.group_header_key", "GROUP-ID");
+ queue->configure(args);
+
+ for (int i = 0; i < 3; ++i) {
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","a");
+ queue->deliver(msg1);
+
+ intrusive_ptr<Message> msg2 = create_message("e", "A");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","b");
+ queue->deliver(msg2);
+ }
+
+ BOOST_CHECK_EQUAL(uint32_t(6), queue->getMessageCount());
+
+ TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+ TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+ queue->consume(c1);
+ queue->consume(c2);
+
+ std::deque<QueuedMessage> dequeMe;
+
+ queue->dispatch(c1); // now owns group "a"
+ dequeMe.push_back(c1->last);
+ queue->dispatch(c2); // now owns group "b"
+ dequeMe.push_back(c2->last);
+
+ queue->dispatch(c2); // should skip next "a", get last "b"
+ std::string group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+ dequeMe.push_back(c2->last);
+ BOOST_CHECK_EQUAL( group, std::string("b") );
+
+ queue->dispatch(c1); // should get last "a"
+ group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+ dequeMe.push_back(c1->last);
+ BOOST_CHECK_EQUAL( group, std::string("a") );
+
+ // now "free up" the groups
+ while (!dequeMe.empty()) {
+ queue->dequeue( 0, dequeMe.front() );
+ dequeMe.pop_front();
+ }
+
+ // now c2 should be able to acquire group "a", and c1 group "b"
+
+ queue->dispatch(c2);
+ group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+ BOOST_CHECK_EQUAL( group, std::string("a") );
+
+ queue->dispatch(c1);
+ group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+ BOOST_CHECK_EQUAL( group, std::string("b") );
+
+ queue->cancel(c1);
+ queue->cancel(c2);
+}
+
QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
TestMessageStoreOC testStore;
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 9f54b0cd31..f6b195f7dd 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -92,6 +92,7 @@
<arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/>
<arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/>
<arg name="qty" dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/>
+ <arg name="filter" dir="I" type="map" default="{}" desc="if specified, move only those messages matching this filter"/>
</method>
<method name="setLogLevel" desc="Set the log level">
@@ -115,6 +116,13 @@
<arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
</method>
+ <method name="query" desc="Query the current state of an object.">
+ <arg name="type" dir="I" type="sstr" desc="The type of object to query."/>
+ <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/>
+ <arg name="results" dir="O" type="map" desc="A snapshot of the object's state."/>
+ </method>
+
+
</class>
<!--
@@ -180,12 +188,14 @@
<method name="purge" desc="Discard all or some messages on a queue">
<arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+ <arg name="filter" dir="I" type="map" default="{}" desc="if specified, purge only those messages matching this filter"/>
</method>
<method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange">
<arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
<arg name="useAltExchange" dir="I" type="bool" desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/>
<arg name="exchange" dir="I" type="sstr" desc="Name of the exchange to route the messages through"/>
+ <arg name="filter" dir="I" type="map" default="{}" desc="if specified, reroute only those messages matching this filter"/>
</method>
</class>
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
index 952878e0b7..5aaa1a7c7d 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
@@ -156,7 +156,7 @@ class ManagementTest (TestBase010):
queues = self.qmf.getObjects(_class="queue")
"Move 10 messages from src-queue to dest-queue"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10, {})
self.assertEqual (result.status, 0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -166,7 +166,7 @@ class ManagementTest (TestBase010):
self.assertEqual (dq.msgDepth,10)
"Move all remaining messages to destination"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0, {})
self.assertEqual (result.status,0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -176,16 +176,16 @@ class ManagementTest (TestBase010):
self.assertEqual (dq.msgDepth,20)
"Use a bad source queue name"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0, {})
self.assertEqual (result.status,4)
"Use a bad destination queue name"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0, {})
self.assertEqual (result.status,4)
" Use a large qty (40) to move from dest-queue back to "
" src-queue- should move all "
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40, {})
self.assertEqual (result.status,0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -225,19 +225,19 @@ class ManagementTest (TestBase010):
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
"Purge top message from purge-queue"
- result = pq.purge(1)
+ result = pq.purge(1, {})
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,19)
"Purge top 9 messages from purge-queue"
- result = pq.purge(9)
+ result = pq.purge(9, {})
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,10)
"Purge all messages from purge-queue"
- result = pq.purge(0)
+ result = pq.purge(0, {})
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)
@@ -263,7 +263,7 @@ class ManagementTest (TestBase010):
#reroute messages from test queue to amq.fanout (and hence to
#rerouted queue):
pq = self.qmf.getObjects(_class="queue", name="test-queue")[0]
- result = pq.reroute(0, False, "amq.fanout")
+ result = pq.reroute(0, False, "amq.fanout", {})
self.assertEqual(result.status, 0)
#verify messages are all rerouted:
@@ -301,7 +301,7 @@ class ManagementTest (TestBase010):
pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
"Reroute top message from reroute-queue to alternate exchange"
- result = pq.reroute(1, True, "")
+ result = pq.reroute(1, True, "", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
@@ -309,7 +309,7 @@ class ManagementTest (TestBase010):
self.assertEqual(aq.msgDepth,1)
"Reroute top 9 messages from reroute-queue to alt.direct2"
- result = pq.reroute(9, False, "alt.direct2")
+ result = pq.reroute(9, False, "alt.direct2", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -317,11 +317,11 @@ class ManagementTest (TestBase010):
self.assertEqual(aq.msgDepth,9)
"Reroute using a non-existent exchange"
- result = pq.reroute(0, False, "amq.nosuchexchange")
+ result = pq.reroute(0, False, "amq.nosuchexchange", {})
self.assertEqual(result.status, 4)
"Reroute all messages from reroute-queue"
- result = pq.reroute(0, False, "alt.direct2")
+ result = pq.reroute(0, False, "alt.direct2", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -337,7 +337,7 @@ class ManagementTest (TestBase010):
session.message_transfer(destination="amq.direct", message=msg)
"Reroute onto the same queue"
- result = pq.reroute(0, False, "amq.direct")
+ result = pq.reroute(0, False, "amq.direct", {})
self.assertEqual(result.status, 0)
pq.update()
self.assertEqual(pq.msgDepth,20)
@@ -365,7 +365,7 @@ class ManagementTest (TestBase010):
# 4. Call reroute on queue Y and specify that messages should
# be sent to exchange A
y = self.qmf.getObjects(_class="queue", name="Y")[0]
- result = y.reroute(1, False, "A")
+ result = y.reroute(1, False, "A", {})
self.assertEqual(result.status, 0)
# 5. verify that the message is rerouted through B (as A has