summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp215
1 files changed, 120 insertions, 95 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 551584f9d9..853bf09a9c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -115,12 +115,6 @@ class MessageAllocator
/** hook to add any interesting management state to the status map (lock held) */
virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {};
-
- /** for move, purge, reroute - check if message matches against a filter,
- * return true if message matches.
- */
- virtual bool match(const qpid::types::Variant::Map* filter,
- const QueuedMessage& message) const;
};
@@ -147,7 +141,6 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator
static const std::string qpidMessageGroupKey;
static const std::string qpidMessageGroupTimestamp;
static const std::string qpidMessageGroupDefault;
- static const std::string qpidMessageGroupFilter; // key for move/purge filter map
const std::string getGroupId( const QueuedMessage& qm ) const;
@@ -174,9 +167,11 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator
const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */
-const std::string MessageGroupManager::qpidMessageGroupFilter("qpid.group_id");
}}
+// KAG TBD: END find me a home....
+
+
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
@@ -203,8 +198,7 @@ Queue::Queue(const string& _name, bool _autodelete,
deleted(false),
barrier(*this),
autoDeleteTimeout(0),
- allocator(new MessageAllocator( this )),
- type(FIFO)
+ allocator(new MessageAllocator( this ))
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -602,24 +596,101 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
namespace {
// for use with purge/move below - collect messages that match a given filter
+ //
+ class MessageFilter
+ {
+ public:
+ static const std::string typeKey;
+ static const std::string paramsKey;
+ static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
+ virtual bool match( const QueuedMessage& ) const { return true; }
+ virtual ~MessageFilter() {}
+ protected:
+ MessageFilter() {};
+ };
+ const std::string MessageFilter::typeKey("filter_type");
+ const std::string MessageFilter::paramsKey("filter_params");
+
+ // filter by message header string value exact match
+ class HeaderMatchFilter : public MessageFilter
+ {
+ public:
+ /* Config:
+ { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "<header name>",
+ 'header_value' : "<value to match>"
+ }
+ }
+ */
+ static const std::string typeKey;
+ static const std::string headerKey;
+ static const std::string valueKey;
+ HeaderMatchFilter( const std::string& _header, const std::string& _value )
+ : MessageFilter (), header(_header), value(_value) {}
+ bool match( const QueuedMessage& msg ) const
+ {
+ const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
+ if (!headers) return false;
+ FieldTable::ValuePtr h = headers->get(header);
+ if (!h || !h->convertsTo<std::string>()) return false;
+ return h->get<std::string>() == value;
+ }
+ private:
+ const std::string header;
+ const std::string value;
+ };
+ const std::string HeaderMatchFilter::typeKey("header_match_str");
+ const std::string HeaderMatchFilter::headerKey("header_key");
+ const std::string HeaderMatchFilter::valueKey("header_value");
+
+ // factory to create correct filter based on map
+ MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
+ {
+ using namespace qpid::types;
+ if (filter) {
+ Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
+ if (i != filter->end()) {
+
+ if (i->second.asString() == HeaderMatchFilter::typeKey) {
+ Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey);
+ if (p != filter->end() && p->second.getType() == VAR_MAP) {
+ Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey);
+ Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey);
+ if (k != p->second.asMap().end() && v != p->second.asMap().end()) {
+ std::string headerKey(k->second.asString());
+ std::string value(v->second.asString());
+ QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value );
+ return new HeaderMatchFilter( headerKey, value );
+ }
+ }
+ }
+ }
+ QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'");
+ }
+ return new MessageFilter();
+ }
+
+ // used by removeIf() to collect all messages matching a filter, maximum match count is
+ // optional.
struct Collector {
const uint32_t maxMatches;
- const qpid::types::Variant::Map *filter;
+ MessageFilter& filter;
std::deque<QueuedMessage> matches;
- boost::shared_ptr<MessageAllocator> allocator;
- Collector(boost::shared_ptr<MessageAllocator> a, uint32_t m,
- const qpid::types::Variant::Map *f)
- : maxMatches(m), filter(f), allocator(a) {}
- void operator() (QueuedMessage& qm)
+ Collector(MessageFilter& filter, uint32_t max)
+ : maxMatches(max), filter(filter) {}
+ bool operator() (QueuedMessage& qm)
{
if (maxMatches == 0 || matches.size() < maxMatches) {
- if (allocator->match( filter, qm )) {
+ if (filter.match( qm )) {
matches.push_back(qm);
+ return true;
}
}
+ return false;
}
};
-}
+
+} // end namespace
/**
@@ -640,56 +711,45 @@ namespace {
uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
const qpid::types::Variant::Map *filter)
{
- Collector c(allocator, purge_request, filter);
+ std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+ Collector c(*mf.get(), purge_request);
Mutex::ScopedLock locker(messageLock);
- messages->foreach( boost::bind<void>(boost::ref(c), _1) );
-
- uint32_t count = c.matches.size();
-
- // first remove all matches
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
- qmsg != c.matches.end(); qmsg++) {
- /** @todo KAG: need a direct remove method here */
- bool ok = acquire(qmsg->position, *qmsg);
- (void) ok; assert(ok);
+ qmsg != c.matches.end(); ++qmsg) {
+ // Update observers and message state:
+ acquired(*qmsg);
dequeue(0, *qmsg);
- }
-
- // now reroute if necessary
- if (dest.get()) {
- while (!c.matches.empty()) {
- QueuedMessage msg = c.matches.front();
- c.matches.pop_front();
- assert(msg.payload);
- DeliverableMessage dmsg(msg.payload);
+ // now reroute if necessary
+ if (dest.get()) {
+ assert(qmsg->payload);
+ DeliverableMessage dmsg(qmsg->payload);
dest->routeWithAlternate(dmsg);
}
}
- return count;
+ return c.matches.size();
}
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
const qpid::types::Variant::Map *filter)
{
- Collector c(allocator, qty, filter);
+ std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+ Collector c(*mf.get(), qty);
Mutex::ScopedLock locker(messageLock);
- messages->foreach( boost::bind<void>(boost::ref(c), _1) );
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
- uint32_t count = c.matches.size();
-
- while (!c.matches.empty()) {
- QueuedMessage qmsg = c.matches.front();
- c.matches.pop_front();
- /** @todo KAG: need a direct remove method here */
- bool ok = acquire(qmsg.position, qmsg);
- (void) ok; assert(ok);
- dequeue(0, qmsg);
- assert(qmsg.payload);
- destq->deliver(qmsg.payload);
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+ // Update observers and message state:
+ acquired(*qmsg);
+ dequeue(0, *qmsg);
+ // and move to destination Queue.
+ assert(qmsg->payload);
+ destq->deliver(qmsg->payload);
}
- return count;
+ return c.matches.size();
}
/** Acquire the front (oldest) message from the in-memory queue.
@@ -939,7 +999,6 @@ void Queue::acquired(const QueuedMessage& msg)
}
}
-
void Queue::create(const FieldTable& _settings)
{
settings = _settings;
@@ -949,7 +1008,6 @@ void Queue::create(const FieldTable& _settings)
configureImpl(_settings);
}
-
int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
{
qpid::framing::FieldTable::ValuePtr v = settings.get(key);
@@ -1009,29 +1067,23 @@ void Queue::configureImpl(const FieldTable& _settings)
if (lvqKey.size()) {
QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
- type = LVQ;
} else if (_settings.get(qpidLastValueQueueNoBrowse)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
- type = LVQ;
} else if (_settings.get(qpidLastValueQueue)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
- type = LVQ;
} else {
std::auto_ptr<Messages> m = Fairshare::create(_settings);
if (m.get()) {
messages = m;
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
- type = PRIORITY;
- }
- }
-
- { // override default message allocator if message groups configured.
- boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings ));
- if (ma) {
- allocator = ma;
- type = GROUP;
+ } else { // default (FIFO) queue type
+ // override default message allocator if message groups configured.
+ boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings ));
+ if (ma) {
+ allocator = ma;
+ }
}
}
@@ -1593,11 +1645,9 @@ bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& ne
if (c->preAcquires()) { // not browsing
next = messages.front();
- QueuedMessage current;
do {
- current = next;
/** @todo KAG: horrifingly suboptimal - optimize */
- std::string group( getGroupId( current ) );
+ std::string group( getGroupId( next ) );
GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
@@ -1610,7 +1660,7 @@ bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& ne
if (state.owner == c->getName()) {
return true;
}
- } while (messages.next( current, next )); /** @todo: .next() is a linear search from front - optimize */
+ } while (messages.next( next.position, next )); /** @todo: .next() is a linear search from front - optimize */
return false;
} else if (messages.next(c->position, next))
return true;
@@ -1681,15 +1731,6 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status,
status[GroupQueryKey] = state;
}
-bool MessageGroupManager::match(const qpid::types::Variant::Map* filter,
- const QueuedMessage& message) const
-{
- if (!filter) return true;
- qpid::types::Variant::Map::const_iterator i = filter->find( qpidMessageGroupFilter );
- if (i == filter->end()) return true;
- if (i->second.asString() == getGroupId(message)) return true;
- return false;
-}
boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
const qpid::framing::FieldTable& settings )
@@ -1698,16 +1739,6 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
if (settings.isSet(qpidMessageGroupKey)) {
- Queue::Disposition qt = q->getDisposition();
-
- if (qt == Queue::LVQ) {
- QPID_LOG( error, "Message Groups cannot be enabled on LVQ Queues, queue=" << q->getName());
- return empty;
- }
- if (qt == Queue::PRIORITY) {
- QPID_LOG( error, "Message Groups cannot be enabled for Priority Queues, queue=" << q->getName());
- return empty;
- }
std::string headerKey = settings.getAsString(qpidMessageGroupKey);
if (headerKey.empty()) {
QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName());
@@ -1756,12 +1787,6 @@ bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&,
}
-// default match - ignore filter and always match.
-bool MessageAllocator::match(const qpid::types::Variant::Map*,
- const QueuedMessage&) const
-{
- return true;
-}