summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp438
1 files changed, 338 insertions, 100 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 68dd2ae125..551584f9d9 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -42,6 +42,7 @@
#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
@@ -111,8 +112,70 @@ class MessageAllocator
*/
virtual bool canAcquire( const std::string& consumer, const QueuedMessage& qm,
const Mutex::ScopedLock&);
+
+ /** hook to add any interesting management state to the status map (lock held) */
+ virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {};
+
+ /** for move, purge, reroute - check if message matches against a filter,
+ * return true if message matches.
+ */
+ virtual bool match(const qpid::types::Variant::Map* filter,
+ const QueuedMessage& message) const;
};
+
+
+class MessageGroupManager : public QueueObserver, public MessageAllocator
+{
+ const std::string groupIdHeader; // msg header holding group identifier
+ const unsigned int timestamp; // mark messages with timestamp if set
+
+ struct GroupState {
+ //const std::string group; // group identifier
+ //Consumer::shared_ptr owner; // consumer with outstanding acquired messages
+ std::string owner; // consumer with outstanding acquired messages
+ uint32_t acquired; // count of outstanding acquired messages
+ uint32_t total; // count of enqueued messages in this group
+ GroupState() : acquired(0), total(0) {}
+ };
+ typedef std::map<std::string, struct GroupState> GroupMap;
+ typedef std::set<std::string> Consumers;
+
+ GroupMap messageGroups;
+ Consumers consumers;
+
+ static const std::string qpidMessageGroupKey;
+ static const std::string qpidMessageGroupTimestamp;
+ static const std::string qpidMessageGroupDefault;
+ static const std::string qpidMessageGroupFilter; // key for move/purge filter map
+
+ const std::string getGroupId( const QueuedMessage& qm ) const;
+
+ public:
+
+ static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings );
+
+ MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 )
+ : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {}
+ void enqueued( const QueuedMessage& qm );
+ void acquired( const QueuedMessage& qm );
+ void requeued( const QueuedMessage& qm );
+ void dequeued( const QueuedMessage& qm );
+ void consumerAdded( const Consumer& );
+ void consumerRemoved( const Consumer& );
+ bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+ const Mutex::ScopedLock&);
+ bool canAcquire(const std::string& consumer, const QueuedMessage& msg,
+ const Mutex::ScopedLock&);
+ void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const;
+ bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
+};
+
+const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
+const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */
+const std::string MessageGroupManager::qpidMessageGroupFilter("qpid.group_id");
+
}}
Queue::Queue(const string& _name, bool _autodelete,
@@ -140,7 +203,8 @@ Queue::Queue(const string& _name, bool _autodelete,
deleted(false),
barrier(*this),
autoDeleteTimeout(0),
- allocator(new MessageAllocator( this )) // KAG TODO: FIX!!
+ allocator(new MessageAllocator( this )),
+ type(FIFO)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -275,7 +339,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
}
}
-bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
+bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -535,6 +599,29 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
}
}
+
+namespace {
+ // for use with purge/move below - collect messages that match a given filter
+ struct Collector {
+ const uint32_t maxMatches;
+ const qpid::types::Variant::Map *filter;
+ std::deque<QueuedMessage> matches;
+ boost::shared_ptr<MessageAllocator> allocator;
+ Collector(boost::shared_ptr<MessageAllocator> a, uint32_t m,
+ const qpid::types::Variant::Map *f)
+ : maxMatches(m), filter(f), allocator(a) {}
+ void operator() (QueuedMessage& qm)
+ {
+ if (maxMatches == 0 || matches.size() < maxMatches) {
+ if (allocator->match( filter, qm )) {
+ matches.push_back(qm);
+ }
+ }
+ }
+ };
+}
+
+
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
@@ -546,52 +633,61 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
* The dest exchange may be supplied to re-route messages through the exchange.
* It is safe to re-route messages such that they arrive back on the same queue,
* even if the queue is ordered by priority.
+ *
+ * An optional filter can be supplied that will be applied against each message. The
+ * message is purged only if the filter matches. See MessageAllocator for more detail.
*/
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
+ const qpid::types::Variant::Map *filter)
{
- Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
- std::deque<DeliverableMessage> rerouteQueue;
+ Collector c(allocator, purge_request, filter);
- uint32_t count = 0;
- // Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages->empty()) {
- if (dest.get()) {
- //
- // If there is a destination exchange, stage the messages onto a reroute queue
- // so they don't wind up getting purged more than once.
- //
- DeliverableMessage msg(messages->front().payload);
- rerouteQueue.push_back(msg);
+ Mutex::ScopedLock locker(messageLock);
+ messages->foreach( boost::bind<void>(boost::ref(c), _1) );
+
+ uint32_t count = c.matches.size();
+
+ // first remove all matches
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); qmsg++) {
+ /** @todo KAG: need a direct remove method here */
+ bool ok = acquire(qmsg->position, *qmsg);
+ (void) ok; assert(ok);
+ dequeue(0, *qmsg);
+ }
+
+ // now reroute if necessary
+ if (dest.get()) {
+ while (!c.matches.empty()) {
+ QueuedMessage msg = c.matches.front();
+ c.matches.pop_front();
+ assert(msg.payload);
+ DeliverableMessage dmsg(msg.payload);
+ dest->routeWithAlternate(dmsg);
}
- popAndDequeue();
- count++;
- }
-
- //
- // Re-route purged messages into the destination exchange. Note that there's no need
- // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
- //
- while (!rerouteQueue.empty()) {
- DeliverableMessage msg(rerouteQueue.front());
- rerouteQueue.pop_front();
- dest->routeWithAlternate(msg);
}
-
return count;
}
-uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
+ const qpid::types::Variant::Map *filter)
+{
+ Collector c(allocator, qty, filter);
+
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
- uint32_t count = 0; // count how many were moved for returning
+ messages->foreach( boost::bind<void>(boost::ref(c), _1) );
+
+ uint32_t count = c.matches.size();
- while((!qty || move_count--) && !messages->empty()) {
- QueuedMessage qmsg = messages->front();
- boost::intrusive_ptr<Message> msg = qmsg.payload;
- destq->deliver(msg); // deliver message to the destination queue
- popAndDequeue();
- count++;
+ while (!c.matches.empty()) {
+ QueuedMessage qmsg = c.matches.front();
+ c.matches.pop_front();
+ /** @todo KAG: need a direct remove method here */
+ bool ok = acquire(qmsg.position, qmsg);
+ (void) ok; assert(ok);
+ dequeue(0, qmsg);
+ assert(qmsg.payload);
+ destq->deliver(qmsg.payload);
}
return count;
}
@@ -614,6 +710,7 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage
{
if (messages->remove(position, msg)) {
acquired( msg );
+ ++dequeueSincePurge;
return true;
}
return false;
@@ -912,17 +1009,29 @@ void Queue::configureImpl(const FieldTable& _settings)
if (lvqKey.size()) {
QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ type = LVQ;
} else if (_settings.get(qpidLastValueQueueNoBrowse)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ type = LVQ;
} else if (_settings.get(qpidLastValueQueue)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ type = LVQ;
} else {
std::auto_ptr<Messages> m = Fairshare::create(_settings);
if (m.get()) {
messages = m;
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
+ type = PRIORITY;
+ }
+ }
+
+ { // override default message allocator if message groups configured.
+ boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings ));
+ if (ma) {
+ allocator = ma;
+ type = GROUP;
}
}
@@ -1181,7 +1290,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
case _qmf::Queue::METHOD_PURGE :
{
_qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
- purge(purgeArgs.i_request);
+ purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
@@ -1202,7 +1311,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
}
}
- purge(rerouteArgs.i_request, dest);
+ purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
@@ -1211,6 +1320,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
return status;
}
+
+void Queue::query(qpid::types::Variant::Map& results) const
+{
+ Mutex::ScopedLock locker(messageLock);
+ /** @todo add any interesting queue state into results */
+ if (allocator) allocator->query( results, messageLock );
+}
+
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
@@ -1357,145 +1474,258 @@ void Queue::UsageBarrier::destroy()
// KAG TBD: flesh out...
-class MessageGroupManager : public QueueObserver, public MessageAllocator
-{
- const std::string groupIdHeader; // msg header holding group identifier
- struct GroupState {
- const std::string group; // group identifier
- //Consumer::shared_ptr owner; // consumer with outstanding acquired messages
- std::string owner; // consumer with outstanding acquired messages
- uint32_t acquired; // count of outstanding acquired messages
- uint32_t total; // count of enqueued messages in this group
- GroupState() : acquired(0), total(0) {}
- };
- std::map<std::string, struct GroupState> messageGroups;
- std::set<std::string> consumers;
-
- public:
-
- MessageGroupManager(const std::string& header, Queue *q )
- : QueueObserver(), MessageAllocator(q), groupIdHeader( header ) {}
- void enqueued( const QueuedMessage& qm );
- void removed( const QueuedMessage& qm );
- void requeued( const QueuedMessage& qm );
- void dequeued( const QueuedMessage& qm );
- void consumerAdded( const Consumer& );
- void consumerRemoved( const Consumer& );
- bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
- const Mutex::ScopedLock&);
- bool canAcquire(const std::string& consumer, const QueuedMessage& msg,
- const Mutex::ScopedLock&);
-};
-namespace {
- const std::string NO_GROUP("");
- const std::string getGroupId( const QueuedMessage& qm, const std::string& key )
- {
- const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
- if (!headers) return NO_GROUP;
- return headers->getAsString(key);
- }
+const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
+{
+ const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
+ if (!headers) return qpidMessageGroupDefault;
+ FieldTable::ValuePtr id = headers->get( groupIdHeader );
+ if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault;
+ return id->get<std::string>();
}
void MessageGroupManager::enqueued( const QueuedMessage& qm )
{
- std::string group( getGroupId(qm, groupIdHeader) );
- messageGroups[group].total++;
+ std::string group( getGroupId(qm) );
+ uint32_t total = ++messageGroups[group].total;
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": added message to group id=" << group << " total=" << total );
}
-void MessageGroupManager::removed( const QueuedMessage& qm )
+void MessageGroupManager::acquired( const QueuedMessage& qm )
{
- std::string group( getGroupId(qm, groupIdHeader) );
- std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
- gs->second.acquired += 1;
+ GroupState& state( gs->second );
+ state.acquired += 1;
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": acquired message in group id=" << group << " acquired=" << state.acquired );
}
void MessageGroupManager::requeued( const QueuedMessage& qm )
{
- std::string group( getGroupId(qm, groupIdHeader) );
- std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
assert( state.acquired != 0 );
state.acquired -= 1;
if (state.acquired == 0 && !state.owner.empty()) {
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
}
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": requeued message to group id=" << group << " acquired=" << state.acquired );
}
void MessageGroupManager::dequeued( const QueuedMessage& qm )
{
- std::string group( getGroupId(qm, groupIdHeader) );
- std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
assert( state.total != 0 );
- state.total -= 1;
+ uint32_t total = state.total -= 1;
assert( state.acquired != 0 );
state.acquired -= 1;
- if (state.total == 0) messageGroups.erase( gs );
- else if (state.acquired == 0 && !state.owner.empty()) {
- state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
+ if (state.total == 0) {
+ QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first);
+ messageGroups.erase( gs );
+ } else {
+ if (state.acquired == 0 && !state.owner.empty()) {
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
+ state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
+ }
}
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": dequeued message from group id=" << group << " total=" << total );
}
void MessageGroupManager::consumerAdded( const Consumer& c )
{
- bool unique = consumers.insert( c.getName() ).second;
+ const std::string& name(c.getName());
+ bool unique = consumers.insert( name ).second;
(void) unique; assert( unique );
+ QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer name=" << name );
}
void MessageGroupManager::consumerRemoved( const Consumer& c )
{
- size_t count = consumers.erase( c.getName() );
+ const std::string& name(c.getName());
+ size_t count = consumers.erase( name );
(void) count; assert( count == 1 );
bool needReset = false;
- for (std::map<std::string, struct GroupState>::iterator gs = messageGroups.begin();
+ for (GroupMap::iterator gs = messageGroups.begin();
gs != messageGroups.end(); ++gs) {
GroupState& state( gs->second );
- if (state.owner == c.getName()) {
+ if (state.owner == name) {
state.owner.clear();
needReset = true;
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << name << " released group id=" << gs->first);
}
}
if (needReset) {
// KAG TODO: How do I invalidate all consumers that need invalidating????
}
+ QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name );
}
bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
- const Mutex::ScopedLock& l)
+ const Mutex::ScopedLock& )
{
- // KAG TODO: FIX!!!
- return MessageAllocator::nextMessage( c, next, l );
+ Messages& messages(queue->getMessages());
+
+ if (messages.empty())
+ return false;
+
+ if (c->preAcquires()) { // not browsing
+ next = messages.front();
+ QueuedMessage current;
+ do {
+ current = next;
+ /** @todo KAG: horrifingly suboptimal - optimize */
+ std::string group( getGroupId( current ) );
+ GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ if (state.owner.empty()) {
+ state.owner = c->getName();
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << c->getName() << " has acquired group id=" << group);
+ return true;
+ }
+ if (state.owner == c->getName()) {
+ return true;
+ }
+ } while (messages.next( current, next )); /** @todo: .next() is a linear search from front - optimize */
+ return false;
+ } else if (messages.next(c->position, next))
+ return true;
+ return false;
}
bool MessageGroupManager::canAcquire(const std::string& consumer, const QueuedMessage& qm,
const Mutex::ScopedLock&)
{
- std::string group( getGroupId(qm, groupIdHeader) );
- std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
if (state.owner.empty()) {
state.owner = consumer;
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << consumer << " has acquired group id=" << gs->first);
return true;
}
return state.owner == consumer;
}
+namespace {
+ const std::string GroupQueryKey("qpid.message_group_queue");
+ const std::string GroupHeaderKey("group_header_key");
+ const std::string GroupStateKey("group_state");
+ const std::string GroupIdKey("group_id");
+ const std::string GroupMsgCount("msg_count");
+ const std::string GroupTimestamp("timestamp");
+ const std::string GroupConsumer("consumer");
+}
+
+void MessageGroupManager::query(qpid::types::Variant::Map& status,
+ const Mutex::ScopedLock&) const
+{
+ /** Add a description of the current state of the message groups for this queue.
+ FORMAT:
+ { "qpid.message_group_queue":
+ { "group_header_key" : "<KEY>",
+ "group_state" :
+ [ { "group_id" : "<name>",
+ "msg_count" : <int>,
+ "timestamp" : <absTime>,
+ "consumer" : <consumer name> },
+ {...} // one for each known group
+ ]
+ }
+ }
+ **/
+
+ assert(status.find(GroupQueryKey) == status.end());
+ qpid::types::Variant::Map state;
+ qpid::types::Variant::List groups;
+
+ state[GroupHeaderKey] = groupIdHeader;
+ for (GroupMap::const_iterator g = messageGroups.begin();
+ g != messageGroups.end(); ++g) {
+ qpid::types::Variant::Map info;
+ info[GroupIdKey] = g->first;
+ info[GroupMsgCount] = g->second.total;
+ info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
+ info[GroupConsumer] = g->second.owner;
+ groups.push_back(info);
+ }
+ state[GroupStateKey] = groups;
+ status[GroupQueryKey] = state;
+}
+
+bool MessageGroupManager::match(const qpid::types::Variant::Map* filter,
+ const QueuedMessage& message) const
+{
+ if (!filter) return true;
+ qpid::types::Variant::Map::const_iterator i = filter->find( qpidMessageGroupFilter );
+ if (i == filter->end()) return true;
+ if (i->second.asString() == getGroupId(message)) return true;
+ return false;
+}
+
+boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
+ const qpid::framing::FieldTable& settings )
+{
+ boost::shared_ptr<MessageGroupManager> empty;
+
+ if (settings.isSet(qpidMessageGroupKey)) {
+
+ Queue::Disposition qt = q->getDisposition();
+
+ if (qt == Queue::LVQ) {
+ QPID_LOG( error, "Message Groups cannot be enabled on LVQ Queues, queue=" << q->getName());
+ return empty;
+ }
+ if (qt == Queue::PRIORITY) {
+ QPID_LOG( error, "Message Groups cannot be enabled for Priority Queues, queue=" << q->getName());
+ return empty;
+ }
+ std::string headerKey = settings.getAsString(qpidMessageGroupKey);
+ if (headerKey.empty()) {
+ QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName());
+ return empty;
+ }
+ unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
+
+ boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, q, timestamp ) );
+
+ q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) );
+
+ QPID_LOG( debug, "Configured Queue '" << q->getName() <<
+ "' for message grouping using header key '" << headerKey << "'" <<
+ " (timestamp=" << timestamp << ")");
+ return manager;
+ }
+ return empty;
+}
@@ -1526,5 +1756,13 @@ bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&,
}
+// default match - ignore filter and always match.
+bool MessageAllocator::match(const qpid::types::Variant::Map*,
+ const QueuedMessage&) const
+{
+ return true;
+}
+
+