summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-13 17:33:08 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-13 17:33:08 +0000
commitd9ef71a5813ea9f54928392997e48e5205b4c4f6 (patch)
treec79149cc8c6f6608638ec353448efe0c953d7e3c
parent927b8dfc86563f97e2ea7498f4ff8439d1c3387a (diff)
downloadqpid-python-d9ef71a5813ea9f54928392997e48e5205b4c4f6.tar.gz
QPID-3346: code cleanup - move private code out of headers, delete dead code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1183001 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp76
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h34
2 files changed, 30 insertions, 80 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index d4ca6af1d5..07b05f3b92 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -43,6 +43,27 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+void MessageGroupManager::unFree( const GroupState& state )
+{
+ GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ assert( pos != freeGroups.end() && pos->second == &state );
+ freeGroups.erase( pos );
+}
+
+void MessageGroupManager::own( GroupState& state, const std::string& owner )
+{
+ state.owner = owner;
+ unFree( state );
+}
+
+void MessageGroupManager::disown( GroupState& state )
+{
+ state.owner.clear();
+ assert(state.members.size());
+ assert(freeGroups.find(state.members.front()) == freeGroups.end());
+ freeGroups[state.members.front()] = &state;
+}
+
const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
{
const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
@@ -155,51 +176,6 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
": dequeued message from group id=" << group << " total=" << total );
}
-void MessageGroupManager::consumerAdded( const Consumer& /*c*/ )
-{
-#if 0
- // allow a re-subscribing consumer
- if (consumers.find(c.getName()) == consumers.end()) {
- consumers[c.getName()] = 0; // no groups owned yet
- QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() );
- } else {
- QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() );
- }
-#endif
-}
-
-void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ )
-{
-#if 0
- const std::string& name(c.getName());
- Consumers::iterator consumer = consumers.find(name);
- assert(consumer != consumers.end());
- size_t count = consumer->second;
-
- for (GroupMap::iterator gs = messageGroups.begin();
- count && gs != messageGroups.end(); ++gs) {
-
- GroupState& state( gs->second );
- if (state.owner == name) {
- if (state.acquired == 0) {
- --count;
- disown(state);
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << name << " released group id=" << gs->first);
- }
- }
- }
- if (count == 0) {
- consumers.erase( consumer );
- QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name );
- } else {
- // don't release groups with outstanding acquired msgs - consumer may re-subscribe!
- QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages.");
- }
-#endif
-}
-
-
bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
if (messages.empty())
@@ -215,11 +191,6 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
return false; // shouldn't happen - should find nextFree
}
} else { // no free groups available
-#if 0
- if (consumers[c->getName()] == 0) { // and none currently owned
- return false; // so nothing available to consume
- }
-#endif
if (!messages.next( c->position, next ))
return false;
}
@@ -430,10 +401,7 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
messageGroups[state.group] = state;
- if (state.owned())
- //consumers[state.owner]++;
- ;
- else {
+ if (!state.owned()) {
assert(state.members.size());
freeGroups[state.members.front()] = &messageGroups[state.group];
}
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index 35bdda94d5..6c81ec14d1 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -44,22 +44,20 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
const std::string qName; // name of parent queue (for logs)
struct GroupState {
+ // note: update getState()/setState() when changing this object's state implementation
typedef std::deque<framing::SequenceNumber> PositionFifo;
std::string group; // group identifier
std::string owner; // consumer with outstanding acquired messages
uint32_t acquired; // count of outstanding acquired messages
- //uint32_t total; // count of enqueued messages in this group
PositionFifo members; // msgs belonging to this group
GroupState() : acquired(0) {}
bool owned() const {return !owner.empty();}
};
typedef std::map<std::string, struct GroupState> GroupMap;
- //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
- // note: update getState()/setState() when changing this object's state implementation
GroupMap messageGroups; // index: group name
GroupFifo freeGroups; // ordered by oldest free msg
//Consumers consumers; // index: consumer name
@@ -69,27 +67,9 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
static const std::string qpidMessageGroupTimestamp;
const std::string getGroupId( const QueuedMessage& qm ) const;
- void unFree( const GroupState& state )
- {
- GroupFifo::iterator pos = freeGroups.find( state.members.front() );
- assert( pos != freeGroups.end() && pos->second == &state );
- freeGroups.erase( pos );
- }
- void own( GroupState& state, const std::string& owner )
- {
- state.owner = owner;
- //consumers[state.owner]++;
- unFree( state );
- }
- void disown( GroupState& state )
- {
- //assert(consumers[state.owner]);
- //consumers[state.owner]--;
- state.owner.clear();
- assert(state.members.size());
- assert(freeGroups.find(state.members.front()) == freeGroups.end());
- freeGroups[state.members.front()] = &state;
- }
+ void unFree( const GroupState& state );
+ void own( GroupState& state, const std::string& owner );
+ void disown( GroupState& state );
public:
@@ -102,12 +82,14 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
Messages& container, unsigned int _timestamp=0 )
: StatefulQueueObserver(std::string("MessageGroupManager:") + header),
groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {}
+
+ // QueueObserver iface
void enqueued( const QueuedMessage& qm );
void acquired( const QueuedMessage& qm );
void requeued( const QueuedMessage& qm );
void dequeued( const QueuedMessage& qm );
- void consumerAdded( const Consumer& );
- void consumerRemoved( const Consumer& );
+ void consumerAdded( const Consumer& ) {};
+ void consumerRemoved( const Consumer& ) {};
void getState(qpid::framing::FieldTable& state ) const;
void setState(const qpid::framing::FieldTable&);