summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageGroupManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.cpp110
1 files changed, 0 insertions, 110 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp
index 47e40a4794..c083e4ee0f 100644
--- a/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -302,19 +302,6 @@ void MessageGroupManager::setDefaults(const std::string& groupId) // static
defaultGroupId = groupId;
}
-/** Cluster replication:
-
- state map format:
-
- { "group-state": [ {"name": <group-name>,
- "owner": <consumer-name>-or-empty,
- "acquired-ct": <acquired count>,
- "positions": [Seqnumbers, ... ]},
- {...}
- ]
- }
-*/
-
namespace {
const std::string GROUP_NAME("name");
const std::string GROUP_OWNER("owner");
@@ -324,100 +311,3 @@ namespace {
const std::string GROUP_STATE("group-state");
}
-
-/** Runs on UPDATER to snapshot current state */
-void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
-{
- using namespace qpid::framing;
- state.clear();
- framing::Array groupState(TYPE_CODE_MAP);
- for (GroupMap::const_iterator g = messageGroups.begin();
- g != messageGroups.end(); ++g) {
-
- framing::FieldTable group;
- group.setString(GROUP_NAME, g->first);
- group.setString(GROUP_OWNER, g->second.owner);
- group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
- framing::Array positions(TYPE_CODE_UINT32);
- framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
- for (GroupState::MessageFifo::const_iterator p = g->second.members.begin();
- p != g->second.members.end(); ++p) {
- positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position )));
- acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired )));
- }
- group.setArray(GROUP_POSITIONS, positions);
- group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
- groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
- }
- state.setArray(GROUP_STATE, groupState);
-
- QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
-}
-
-
-/** called on UPDATEE to set state from snapshot */
-void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
-{
- using namespace qpid::framing;
- messageGroups.clear();
- freeGroups.clear();
- cachedGroup = 0;
-
- framing::Array groupState(TYPE_CODE_MAP);
-
- bool ok = state.getArray(GROUP_STATE, groupState);
- if (!ok) {
- QPID_LOG(error, "Unable to find message group state information for queue \"" <<
- qName << "\": cluster inconsistency error!");
- return;
- }
-
- for (framing::Array::const_iterator g = groupState.begin();
- g != groupState.end(); ++g) {
- framing::FieldTable group;
- ok = framing::getEncodedValue<FieldTable>(*g, group);
- if (!ok) {
- QPID_LOG(error, "Invalid message group state information for queue \"" <<
- qName << "\": table encoding error!");
- return;
- }
- MessageGroupManager::GroupState state;
- if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
- QPID_LOG(error, "Invalid message group state information for queue \"" <<
- qName << "\": fields missing error!");
- return;
- }
- state.group = group.getAsString(GROUP_NAME);
- state.owner = group.getAsString(GROUP_OWNER);
- state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
- framing::Array positions(TYPE_CODE_UINT32);
- ok = group.getArray(GROUP_POSITIONS, positions);
- if (!ok) {
- QPID_LOG(error, "Invalid message group state information for queue \"" <<
- qName << "\": position encoding error!");
- return;
- }
- framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
- ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
- if (!ok || positions.count() != acquiredMsgs.count()) {
- QPID_LOG(error, "Invalid message group state information for queue \"" <<
- qName << "\": acquired flag encoding error!");
- return;
- }
-
- Array::const_iterator a = acquiredMsgs.begin();
- for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) {
- GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>());
- mState.acquired = (*a++)->getIntegerValue<bool>();
- state.members.push_back(mState);
- }
-
- messageGroups[state.group] = state;
- if (!state.owned()) {
- assert(state.members.size());
- freeGroups[state.members.front().position] = &messageGroups[state.group];
- }
- }
-
- QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader)
-}