summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-09-07 18:23:01 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-09-07 18:23:01 +0000
commit9a5a8e164ef91abdcf5be637035ddb3f25444b71 (patch)
tree132f4a97d2734b6a47602d5a38585b598276beb3
parentd394ed70098af4bd8dc7a2012470387d85a305ed (diff)
downloadqpid-python-9a5a8e164ef91abdcf5be637035ddb3f25444b71.tar.gz
QPID-3346: move message group code into its own files.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1166299 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am4
-rw-r--r--qpid/cpp/src/qpid/broker/MessageAllocator.cpp60
-rw-r--r--qpid/cpp/src/qpid/broker/MessageAllocator.h79
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp307
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h116
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp438
6 files changed, 571 insertions, 433 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 4e13e9ad9c..9533053473 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -671,6 +671,10 @@ libqpidbroker_la_SOURCES = \
qpid/broker/TxPublish.h \
qpid/broker/Vhost.cpp \
qpid/broker/Vhost.h \
+ qpid/broker/MessageAllocator.h \
+ qpid/broker/MessageAllocator.cpp \
+ qpid/broker/MessageGroupManager.cpp \
+ qpid/broker/MessageGroupManager.h \
qpid/management/ManagementAgent.cpp \
qpid/management/ManagementAgent.h \
qpid/management/ManagementDirectExchange.cpp \
diff --git a/qpid/cpp/src/qpid/broker/MessageAllocator.cpp b/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
new file mode 100644
index 0000000000..62d5402b0a
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Used by queues to allocate the next "most desirable" message to a consuming client */
+
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageAllocator.h"
+
+using namespace qpid::broker;
+
+bool MessageAllocator::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next,
+ const sys::Mutex::ScopedLock&)
+{
+ Messages& messages(queue->getMessages());
+ if (!messages.empty()) {
+ next = messages.front(); // by default, consume oldest msg
+ return true;
+ }
+ return false;
+}
+
+bool MessageAllocator::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
+ const sys::Mutex::ScopedLock&)
+{
+ Messages& messages(queue->getMessages());
+ if (!messages.empty() && messages.next(c->position, next))
+ return true;
+ return false;
+}
+
+
+bool MessageAllocator::acquirable( const std::string&,
+ const QueuedMessage&,
+ const sys::Mutex::ScopedLock&)
+{
+ return true;
+}
+
+void MessageAllocator::query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const
+{
+}
+
diff --git a/qpid/cpp/src/qpid/broker/MessageAllocator.h b/qpid/cpp/src/qpid/broker/MessageAllocator.h
new file mode 100644
index 0000000000..4daf7f2174
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/MessageAllocator.h
@@ -0,0 +1,79 @@
+#ifndef _broker_MessageAllocator_h
+#define _broker_MessageAllocator_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Used by queues to allocate the next "most desirable" message to a consuming client */
+
+
+#include "qpid/broker/Consumer.h"
+
+namespace qpid {
+namespace broker {
+
+class Queue;
+class QueuedMessage;
+
+class MessageAllocator
+{
+ protected:
+ Queue *queue;
+ public:
+ MessageAllocator( Queue *q ) : queue(q) {}
+ virtual ~MessageAllocator() {};
+
+ // Note: all methods taking a mutex assume the caller is holding the
+ // Queue::messageLock during the method call.
+
+ /** Determine the next message available for consumption by the consumer
+ * @param next set to the next message that the consumer may acquire.
+ * @return true if message is available
+ */
+ virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
+ QueuedMessage& next,
+ const sys::Mutex::ScopedLock& lock);
+
+ /** Determine the next message available for browsing by the consumer
+ * @param next set to the next message that the consumer may browse.
+ * @return true if a message is available
+ */
+ virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
+ QueuedMessage& next,
+ const sys::Mutex::ScopedLock& lock);
+
+ /** check if a message previously returned via next*Message() may be acquired.
+ * @param consumer name of consumer that is attempting to acquire the message
+ * @param qm the message to be acquired
+ * @param messageLock - ensures caller is holding it!
+ * @return true if acquire is permitted, false if acquire is no longer permitted.
+ */
+ virtual bool acquirable( const std::string&,
+ const QueuedMessage&,
+ const sys::Mutex::ScopedLock&);
+
+ /** hook to add any interesting management state to the status map */
+ virtual void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const;
+};
+
+}}
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
new file mode 100644
index 0000000000..4fc142f553
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -0,0 +1,307 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FieldTable.h"
+#include "qpid/types/Variant.h"
+#include "qpid/log/Statement.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageGroupManager.h"
+
+using namespace qpid::broker;
+
+namespace {
+ const std::string GroupQueryKey("qpid.message_group_queue");
+ const std::string GroupHeaderKey("group_header_key");
+ const std::string GroupStateKey("group_state");
+ const std::string GroupIdKey("group_id");
+ const std::string GroupMsgCount("msg_count");
+ const std::string GroupTimestamp("timestamp");
+ const std::string GroupConsumer("consumer");
+}
+
+
+const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
+const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */
+
+
+const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
+{
+ const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
+ if (!headers) return qpidMessageGroupDefault;
+ qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
+ if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault;
+ return id->get<std::string>();
+}
+
+
+void MessageGroupManager::enqueued( const QueuedMessage& qm )
+{
+ // @todo KAG optimization - store reference to group state in QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupState &state(messageGroups[group]);
+ state.members.push_back(qm.position);
+ uint32_t total = state.members.size();
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": added message to group id=" << group << " total=" << total );
+ if (total == 1) {
+ // newly created group, no owner
+ state.group = group;
+#ifdef NDEBUG
+ freeGroups[qm.position] = &state;
+#else
+ bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second;
+ (void) unique; assert(unique);
+#endif
+ }
+}
+
+
+void MessageGroupManager::acquired( const QueuedMessage& qm )
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ state.acquired += 1;
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": acquired message in group id=" << group << " acquired=" << state.acquired );
+}
+
+
+void MessageGroupManager::requeued( const QueuedMessage& qm )
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ // @todo KAG BUG - how to ensure requeue happens in the correct order?
+ // @todo KAG BUG - if requeue is not in correct order - what do we do? throw?
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+ if (state.acquired == 0 && state.owned()) {
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
+ disown(state);
+ }
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": requeued message to group id=" << group << " acquired=" << state.acquired );
+}
+
+
+void MessageGroupManager::dequeued( const QueuedMessage& qm )
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ assert( state.members.size() != 0 );
+
+ // likely to be at or near begin() if dequeued in order
+ {
+ GroupState::PositionFifo::iterator pos = state.members.begin();
+ GroupState::PositionFifo::iterator end = state.members.end();
+ while (pos != end) {
+ if (*pos == qm.position) {
+ state.members.erase(pos);
+ break;
+ }
+ ++pos;
+ }
+ }
+
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+ uint32_t total = state.members.size();
+ if (total == 0) {
+ if (!state.owned()) { // unlikely, but need to remove from the free list before erase
+ unFree( state );
+ }
+ QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first);
+ messageGroups.erase( gs );
+ } else {
+ if (state.acquired == 0 && state.owned()) {
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
+ disown(state);
+ }
+ }
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": dequeued message from group id=" << group << " total=" << total );
+}
+
+void MessageGroupManager::consumerAdded( const Consumer& c )
+{
+ assert(consumers.find(c.getName()) == consumers.end());
+ consumers[c.getName()] = 0; // no groups owned yet
+ QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer, name=" << c.getName() );
+}
+
+void MessageGroupManager::consumerRemoved( const Consumer& c )
+{
+ const std::string& name(c.getName());
+ Consumers::iterator consumer = consumers.find(name);
+ assert(consumer != consumers.end());
+ size_t count = consumer->second;
+
+ for (GroupMap::iterator gs = messageGroups.begin();
+ count && gs != messageGroups.end(); ++gs) {
+
+ GroupState& state( gs->second );
+ if (state.owner == name) {
+ --count;
+ disown(state);
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << name << " released group id=" << gs->first);
+ }
+ }
+ consumers.erase( consumer );
+ QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name );
+}
+
+
+bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
+ const qpid::sys::Mutex::ScopedLock& )
+{
+ Messages& messages(queue->getMessages());
+
+ if (messages.empty())
+ return false;
+
+ if (!freeGroups.empty()) {
+ framing::SequenceNumber nextFree = freeGroups.begin()->first;
+ if (nextFree < c->position) { // next free group's msg is older than current position
+ bool ok = messages.find(nextFree, next);
+ (void) ok; assert( ok );
+ } else {
+ if (!messages.next( c->position, next ))
+ return false; // shouldn't happen - should find nextFree
+ }
+ } else { // no free groups available
+ if (consumers[c->getName()] == 0) { // and none currently owned
+ return false; // so nothing available to consume
+ }
+ if (!messages.next( c->position, next ))
+ return false;
+ }
+
+ do {
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ std::string group( getGroupId( next ) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ if (!state.owned() || state.owner == c->getName()) {
+ return true;
+ }
+ } while (messages.next( next.position, next ));
+ return false;
+}
+
+
+bool MessageGroupManager::acquirable(const std::string& consumer, const QueuedMessage& qm,
+ const qpid::sys::Mutex::ScopedLock&)
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+
+ if (!state.owned()) {
+ own( state, consumer );
+ QPID_LOG( trace, "group queue " << queue->getName() <<
+ ": consumer name=" << consumer << " has acquired group id=" << gs->first);
+ return true;
+ }
+ return state.owner == consumer;
+}
+
+
+void MessageGroupManager::query(qpid::types::Variant::Map& status,
+ const qpid::sys::Mutex::ScopedLock&) const
+{
+ /** Add a description of the current state of the message groups for this queue.
+ FORMAT:
+ { "qpid.message_group_queue":
+ { "group_header_key" : "<KEY>",
+ "group_state" :
+ [ { "group_id" : "<name>",
+ "msg_count" : <int>,
+ "timestamp" : <absTime>,
+ "consumer" : <consumer name> },
+ {...} // one for each known group
+ ]
+ }
+ }
+ **/
+
+ assert(status.find(GroupQueryKey) == status.end());
+ qpid::types::Variant::Map state;
+ qpid::types::Variant::List groups;
+
+ state[GroupHeaderKey] = groupIdHeader;
+ for (GroupMap::const_iterator g = messageGroups.begin();
+ g != messageGroups.end(); ++g) {
+ qpid::types::Variant::Map info;
+ info[GroupIdKey] = g->first;
+ info[GroupMsgCount] = g->second.members.size();
+ info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
+ info[GroupConsumer] = g->second.owner;
+ groups.push_back(info);
+ }
+ state[GroupStateKey] = groups;
+ status[GroupQueryKey] = state;
+}
+
+
+boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
+ const qpid::framing::FieldTable& settings )
+{
+ boost::shared_ptr<MessageGroupManager> empty;
+
+ if (settings.isSet(qpidMessageGroupKey)) {
+
+ std::string headerKey = settings.getAsString(qpidMessageGroupKey);
+ if (headerKey.empty()) {
+ QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName());
+ return empty;
+ }
+ unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
+
+ boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, q, timestamp ) );
+
+ q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) );
+
+ QPID_LOG( debug, "Configured Queue '" << q->getName() <<
+ "' for message grouping using header key '" << headerKey << "'" <<
+ " (timestamp=" << timestamp << ")");
+ return manager;
+ }
+ return empty;
+}
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
new file mode 100644
index 0000000000..0a1551f3ba
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -0,0 +1,116 @@
+#ifndef _broker_MessageGroupManager_h
+#define _broker_MessageGroupManager_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* for managing message grouping on Queues */
+
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/MessageAllocator.h"
+
+
+namespace qpid {
+namespace broker {
+
+class QueueObserver;
+class MessageAllocator;
+
+class MessageGroupManager : public QueueObserver, public MessageAllocator
+{
+ const std::string groupIdHeader; // msg header holding group identifier
+ const unsigned int timestamp; // mark messages with timestamp if set
+
+ struct GroupState {
+ typedef std::list<framing::SequenceNumber> PositionFifo;
+
+ std::string group; // group identifier
+ std::string owner; // consumer with outstanding acquired messages
+ uint32_t acquired; // count of outstanding acquired messages
+ //uint32_t total; // count of enqueued messages in this group
+ PositionFifo members; // msgs belonging to this group
+
+ GroupState() : acquired(0) {}
+ bool owned() const {return !owner.empty();}
+ };
+ typedef std::map<std::string, struct GroupState> GroupMap;
+ typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
+ typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+
+ GroupMap messageGroups; // index: group name
+ GroupFifo freeGroups; // ordered by oldest free msg
+ Consumers consumers; // index: consumer name
+
+ static const std::string qpidMessageGroupKey;
+ static const std::string qpidMessageGroupTimestamp;
+ static const std::string qpidMessageGroupDefault;
+
+ const std::string getGroupId( const QueuedMessage& qm ) const;
+ void unFree( const GroupState& state )
+ {
+ GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ assert( pos != freeGroups.end() && pos->second == &state );
+ freeGroups.erase( pos );
+ }
+ void own( GroupState& state, const std::string& owner )
+ {
+ state.owner = owner;
+ consumers[state.owner]++;
+ unFree( state );
+ }
+ void disown( GroupState& state )
+ {
+ assert(consumers[state.owner]);
+ consumers[state.owner]--;
+ state.owner.clear();
+ assert(state.members.size());
+#ifdef NDEBUG
+ freeGroups[state.members.front()] = &state;
+#else
+ bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second;
+ (void) unique; assert(unique);
+#endif
+ }
+
+ public:
+
+ static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings );
+
+ MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 )
+ : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {}
+ void enqueued( const QueuedMessage& qm );
+ void acquired( const QueuedMessage& qm );
+ void requeued( const QueuedMessage& qm );
+ void dequeued( const QueuedMessage& qm );
+ void consumerAdded( const Consumer& );
+ void consumerRemoved( const Consumer& );
+ bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
+ const sys::Mutex::ScopedLock&);
+ // uses default nextBrowsableMessage()
+ bool acquirable(const std::string& consumer, const QueuedMessage& msg,
+ const sys::Mutex::ScopedLock&);
+ void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const;
+ bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
+};
+
+}}
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 12ecfa882e..224446adb2 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -33,6 +33,8 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/MessageAllocator.h"
+#include "qpid/broker/MessageGroupManager.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -88,152 +90,6 @@ const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
}
-
-// KAG TBD: find me a home....
-namespace qpid {
-namespace broker {
-
-class MessageAllocator
-{
- protected:
- Queue *queue;
- public:
- MessageAllocator( Queue *q ) : queue(q) {}
- virtual ~MessageAllocator() {};
-
- // Note: all methods taking a mutex assume the caller is holding the
- // Queue::messageLock during the method call.
-
- /** Determine the next message available for consumption by the consumer
- * @param next set to the next message that the consumer may acquire.
- * @return true if message is available
- */
- virtual bool nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next,
- const Mutex::ScopedLock&)
- {
- Messages& messages(queue->getMessages());
- if (!messages.empty()) {
- next = messages.front(); // by default, consume oldest msg
- return true;
- }
- return false;
- }
- /** Determine the next message available for browsing by the consumer
- * @param next set to the next message that the consumer may browse.
- * @return true if a message is available
- */
- virtual bool nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
- const Mutex::ScopedLock&)
- {
- Messages& messages(queue->getMessages());
- if (!messages.empty() && messages.next(c->position, next))
- return true;
- return false;
- }
- /** acquire a message previously returned via next*Message().
- * @param consumer name of consumer that is attempting to acquire the message
- * @param qm the message to be acquired
- * @param messageLock - ensures caller is holding it!
- * @returns true if acquire is successful, false if acquire failed.
- */
- virtual bool acquireMessage( const std::string&, const QueuedMessage&,
- const Mutex::ScopedLock&)
- {
- return true;
- }
-
- /** hook to add any interesting management state to the status map */
- virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {};
-};
-
-
-
-class MessageGroupManager : public QueueObserver, public MessageAllocator
-{
- const std::string groupIdHeader; // msg header holding group identifier
- const unsigned int timestamp; // mark messages with timestamp if set
-
- struct GroupState {
- typedef std::list<framing::SequenceNumber> PositionFifo;
-
- std::string group; // group identifier
- std::string owner; // consumer with outstanding acquired messages
- uint32_t acquired; // count of outstanding acquired messages
- //uint32_t total; // count of enqueued messages in this group
- PositionFifo members; // msgs belonging to this group
-
- GroupState() : acquired(0) {}
- bool owned() const {return !owner.empty();}
- };
- typedef std::map<std::string, struct GroupState> GroupMap;
- typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
- typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
-
- GroupMap messageGroups; // index: group name
- GroupFifo freeGroups; // ordered by oldest free msg
- Consumers consumers; // index: consumer name
-
- static const std::string qpidMessageGroupKey;
- static const std::string qpidMessageGroupTimestamp;
- static const std::string qpidMessageGroupDefault;
-
- const std::string getGroupId( const QueuedMessage& qm ) const;
- void unFree( const GroupState& state )
- {
- GroupFifo::iterator pos = freeGroups.find( state.members.front() );
- assert( pos != freeGroups.end() && pos->second == &state );
- freeGroups.erase( pos );
- }
- void own( GroupState& state, const std::string& owner )
- {
- state.owner = owner;
- consumers[state.owner]++;
- unFree( state );
- }
- void disown( GroupState& state )
- {
- assert(consumers[state.owner]);
- consumers[state.owner]--;
- state.owner.clear();
- assert(state.members.size());
-#ifdef NDEBUG
- freeGroups[state.members.front()] = &state;
-#else
- bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second;
- (void) unique; assert(unique);
-#endif
- }
-
- public:
-
- static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings );
-
- MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 )
- : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {}
- void enqueued( const QueuedMessage& qm );
- void acquired( const QueuedMessage& qm );
- void requeued( const QueuedMessage& qm );
- void dequeued( const QueuedMessage& qm );
- void consumerAdded( const Consumer& );
- void consumerRemoved( const Consumer& );
- bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
- const Mutex::ScopedLock&);
- // uses default nextBrowsableMessage()
- bool acquireMessage(const std::string& consumer, const QueuedMessage& msg,
- const Mutex::ScopedLock&);
- void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const;
- bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
-};
-
-const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
-const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
-const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */
-
-}}
-// KAG TBD: END find me a home....
-
-
-
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
@@ -400,7 +256,7 @@ bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
assertClusterSafe();
QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
- if (!allocator->acquireMessage( consumer, msg, locker )) {
+ if (!allocator->acquirable( consumer, msg, locker )) {
QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
return false;
}
@@ -447,7 +303,6 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
-
while (true) {
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
@@ -471,7 +326,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
- bool ok = allocator->acquireMessage( c->getName(), msg, locker ); // inform allocator
+ bool ok = allocator->acquirable( c->getName(), msg, locker ); // inform allocator
(void) ok; assert(ok);
ok = acquire( msg.position, msg );
(void) ok; assert(ok);
@@ -1071,6 +926,7 @@ void Queue::create(const FieldTable& _settings)
configureImpl(_settings);
}
+
int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
{
qpid::framing::FieldTable::ValuePtr v = settings.get(key);
@@ -1584,287 +1440,3 @@ void Queue::UsageBarrier::destroy()
parent.deleted = true;
while (count) parent.messageLock.wait();
}
-
-
-// KAG TBD: flesh out...
-
-
-
-
-const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
-{
- const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
- if (!headers) return qpidMessageGroupDefault;
- FieldTable::ValuePtr id = headers->get( groupIdHeader );
- if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault;
- return id->get<std::string>();
-}
-
-
-void MessageGroupManager::enqueued( const QueuedMessage& qm )
-{
- // @todo KAG optimization - store reference to group state in QueuedMessage
- // issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupState &state(messageGroups[group]);
- state.members.push_back(qm.position);
- uint32_t total = state.members.size();
- QPID_LOG( trace, "group queue " << queue->getName() <<
- ": added message to group id=" << group << " total=" << total );
- if (total == 1) {
- // newly created group, no owner
- state.group = group;
-#ifdef NDEBUG
- freeGroups[qm.position] = &state;
-#else
- bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second;
- (void) unique; assert(unique);
-#endif
- }
-}
-
-
-void MessageGroupManager::acquired( const QueuedMessage& qm )
-{
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- // issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
- state.acquired += 1;
- QPID_LOG( trace, "group queue " << queue->getName() <<
- ": acquired message in group id=" << group << " acquired=" << state.acquired );
-}
-
-
-void MessageGroupManager::requeued( const QueuedMessage& qm )
-{
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- // issue: const-ness??
- // @todo KAG BUG - how to ensure requeue happens in the correct order?
- // @todo KAG BUG - if requeue is not in correct order - what do we do? throw?
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
- assert( state.acquired != 0 );
- state.acquired -= 1;
- if (state.acquired == 0 && state.owned()) {
- QPID_LOG( trace, "group queue " << queue->getName() <<
- ": consumer name=" << state.owner << " released group id=" << gs->first);
- disown(state);
- }
- QPID_LOG( trace, "group queue " << queue->getName() <<
- ": requeued message to group id=" << group << " acquired=" << state.acquired );
-}
-
-
-void MessageGroupManager::dequeued( const QueuedMessage& qm )
-{
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- // issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
- assert( state.members.size() != 0 );
-
- // likely to be at or near begin() if dequeued in order
- {
- GroupState::PositionFifo::iterator pos = state.members.begin();
- GroupState::PositionFifo::iterator end = state.members.end();
- while (pos != end) {
- if (*pos == qm.position) {
- state.members.erase(pos);
- break;
- }
- ++pos;
- }
- }
-
- assert( state.acquired != 0 );
- state.acquired -= 1;
- uint32_t total = state.members.size();
- if (total == 0) {
- if (!state.owned()) { // unlikely, but need to remove from the free list before erase
- unFree( state );
- }
- QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first);
- messageGroups.erase( gs );
- } else {
- if (state.acquired == 0 && state.owned()) {
- QPID_LOG( trace, "group queue " << queue->getName() <<
- ": consumer name=" << state.owner << " released group id=" << gs->first);
- disown(state);
- }
- }
- QPID_LOG( trace, "group queue " << queue->getName() <<
- ": dequeued message from group id=" << group << " total=" << total );
-}
-
-void MessageGroupManager::consumerAdded( const Consumer& c )
-{
- assert(consumers.find(c.getName()) == consumers.end());
- consumers[c.getName()] = 0; // no groups owned yet
- QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer, name=" << c.getName() );
-}
-
-void MessageGroupManager::consumerRemoved( const Consumer& c )
-{
- const std::string& name(c.getName());
- Consumers::iterator consumer = consumers.find(name);
- assert(consumer != consumers.end());
- size_t count = consumer->second;
-
- for (GroupMap::iterator gs = messageGroups.begin();
- count && gs != messageGroups.end(); ++gs) {
-
- GroupState& state( gs->second );
- if (state.owner == name) {
- --count;
- disown(state);
- QPID_LOG( trace, "group queue " << queue->getName() <<
- ": consumer name=" << name << " released group id=" << gs->first);
- }
- }
- consumers.erase( consumer );
- QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name );
-}
-
-
-bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
- const Mutex::ScopedLock& )
-{
- Messages& messages(queue->getMessages());
-
- if (messages.empty())
- return false;
-
- if (!freeGroups.empty()) {
- framing::SequenceNumber nextFree = freeGroups.begin()->first;
- if (nextFree < c->position) { // next free group's msg is older than current position
- bool ok = messages.find(nextFree, next);
- (void) ok; assert( ok );
- } else {
- if (!messages.next( c->position, next ))
- return false; // shouldn't happen - should find nextFree
- }
- } else { // no free groups available
- if (consumers[c->getName()] == 0) { // and none currently owned
- return false; // so nothing available to consume
- }
- if (!messages.next( c->position, next ))
- return false;
- }
-
- do {
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- std::string group( getGroupId( next ) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
- if (!state.owned() || state.owner == c->getName()) {
- return true;
- }
- } while (messages.next( next.position, next ));
- return false;
-}
-
-
-bool MessageGroupManager::acquireMessage(const std::string& consumer, const QueuedMessage& qm,
- const Mutex::ScopedLock&)
-{
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
-
- if (!state.owned()) {
- own( state, consumer );
- QPID_LOG( trace, "group queue " << queue->getName() <<
- ": consumer name=" << consumer << " has acquired group id=" << gs->first);
- return true;
- }
- return state.owner == consumer;
-}
-
-namespace {
- const std::string GroupQueryKey("qpid.message_group_queue");
- const std::string GroupHeaderKey("group_header_key");
- const std::string GroupStateKey("group_state");
- const std::string GroupIdKey("group_id");
- const std::string GroupMsgCount("msg_count");
- const std::string GroupTimestamp("timestamp");
- const std::string GroupConsumer("consumer");
-}
-
-void MessageGroupManager::query(qpid::types::Variant::Map& status,
- const Mutex::ScopedLock&) const
-{
- /** Add a description of the current state of the message groups for this queue.
- FORMAT:
- { "qpid.message_group_queue":
- { "group_header_key" : "<KEY>",
- "group_state" :
- [ { "group_id" : "<name>",
- "msg_count" : <int>,
- "timestamp" : <absTime>,
- "consumer" : <consumer name> },
- {...} // one for each known group
- ]
- }
- }
- **/
-
- assert(status.find(GroupQueryKey) == status.end());
- qpid::types::Variant::Map state;
- qpid::types::Variant::List groups;
-
- state[GroupHeaderKey] = groupIdHeader;
- for (GroupMap::const_iterator g = messageGroups.begin();
- g != messageGroups.end(); ++g) {
- qpid::types::Variant::Map info;
- info[GroupIdKey] = g->first;
- info[GroupMsgCount] = g->second.members.size();
- info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
- info[GroupConsumer] = g->second.owner;
- groups.push_back(info);
- }
- state[GroupStateKey] = groups;
- status[GroupQueryKey] = state;
-}
-
-
-boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
- const qpid::framing::FieldTable& settings )
-{
- boost::shared_ptr<MessageGroupManager> empty;
-
- if (settings.isSet(qpidMessageGroupKey)) {
-
- std::string headerKey = settings.getAsString(qpidMessageGroupKey);
- if (headerKey.empty()) {
- QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName());
- return empty;
- }
- unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
-
- boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, q, timestamp ) );
-
- q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) );
-
- QPID_LOG( debug, "Configured Queue '" << q->getName() <<
- "' for message grouping using header key '" << headerKey << "'" <<
- " (timestamp=" << timestamp << ")");
- return manager;
- }
- return empty;
-}
-
-
-
-
-
-