summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:17:14 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:17:14 +0000
commitc203071366928116c7bb1609091957d1526330e6 (patch)
tree6079b5ff072529de5db68ae3304c568c72fb4878
parent89966c062545b89b9d88e473d42c6e74cdd588df (diff)
downloadqpid-python-c203071366928116c7bb1609091957d1526330e6.tar.gz
QPID-3603: Fix update of acquired messages.
The changes to keep acquired messages on the queue broke replication of acquired messages. Fix this to put acquired messages into the MessageDeque correctly. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245555 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.cpp36
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Messages.h10
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/QueuedMessage.h22
5 files changed, 54 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
index 9f874e4c9a..709d99876b 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
@@ -123,17 +123,18 @@ bool MessageDeque::consume(QueuedMessage& message)
return false;
}
+namespace {
+QueuedMessage padding(uint32_t pos) {
+ return QueuedMessage(0, 0, pos, QueuedMessage::DELETED);
+}
+} // namespace
+
bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
//add padding to prevent gaps in sequence, which break the index
//calculation (needed for queue replication)
- while (messages.size() && (added.position - messages.back().position) > 1) {
- QueuedMessage dummy;
- dummy.position = messages.back().position + 1;
- dummy.status = QueuedMessage::DELETED;
- messages.push_back(dummy);
- QPID_LOG(debug, "Adding padding at " << dummy.position << ", between " << messages.back().position << " and " << added.position);
- }
+ while (messages.size() && (added.position - messages.back().position) > 1)
+ messages.push_back(padding(messages.back().position + 1));
messages.push_back(added);
messages.back().status = QueuedMessage::AVAILABLE;
if (head >= messages.size()) head = messages.size() - 1;
@@ -141,6 +142,27 @@ bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*
return false;//adding a message never causes one to be removed for deque
}
+void MessageDeque::updateAcquired(const QueuedMessage& acquired)
+{
+ // Pad the front of the queue if necessary
+ while (messages.size() && (acquired.position < messages.front().position))
+ messages.push_front(padding(uint32_t(messages.front().position) - 1));
+ size_t i = index(acquired.position);
+ if (i < messages.size()) { // Replace an existing padding message
+ assert(messages[i].status == QueuedMessage::DELETED);
+ messages[i] = acquired;
+ messages[i].status = QueuedMessage::ACQUIRED;
+ }
+ else { // Push to the back
+ // Pad the back of the queue if necessary
+ while (messages.size() && (acquired.position - messages.back().position) > 1)
+ messages.push_back(padding(messages.back().position + 1));
+ assert(!messages.size() || (acquired.position - messages.back().position) == 1);
+ messages.push_back(acquired);
+ messages.back().status = QueuedMessage::ACQUIRED;
+ }
+}
+
void MessageDeque::clean()
{
while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h
index 4d3a5dcdd5..bb5943b09b 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.h
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.h
@@ -43,6 +43,7 @@ class MessageDeque : public Messages
bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
+ void updateAcquired(const QueuedMessage& acquired);
void foreach(Functor);
void removeIf(Predicate);
diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h
index 89f6d383ae..61e9fa110a 100644
--- a/qpid/cpp/src/qpid/broker/Messages.h
+++ b/qpid/cpp/src/qpid/broker/Messages.h
@@ -93,7 +93,15 @@ class Messages
virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
/**
- * Apply the functor to each message held
+ * Add an already acquired message to the queue.
+ * Used by a cluster updatee to replicate acquired messages from the updater.
+ * Only need be implemented by subclasses that keep track of
+ * acquired messages.
+ */
+ virtual void updateAcquired(const QueuedMessage&) { }
+
+ /**
+ * Apply, the functor to each message held
*/
virtual void foreach(Functor) = 0;
/**
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 0e822d3d4a..015957927f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1519,7 +1519,8 @@ void Queue::updateEnqueued(const QueuedMessage& m)
{
if (m.payload) {
boost::intrusive_ptr<Message> payload = m.payload;
- enqueue ( 0, payload, true );
+ enqueue(0, payload, true);
+ messages->updateAcquired(m);
if (policy.get()) {
policy->recoverEnqueued(payload);
}
diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h
index 051ade41ea..806da8e720 100644
--- a/qpid/cpp/src/qpid/broker/QueuedMessage.h
+++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,16 +32,20 @@ struct QueuedMessage
{
boost::intrusive_ptr<Message> payload;
framing::SequenceNumber position;
- enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status;
+ typedef enum { AVAILABLE, ACQUIRED, DELETED, REMOVED } Status;
+ Status status;
Queue* queue;
- QueuedMessage() : queue(0) {}
- QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
- payload(msg), position(sn), queue(q) {}
- QueuedMessage(Queue* q) : queue(q) {}
-
+ QueuedMessage(Queue* q=0,
+ boost::intrusive_ptr<Message> msg=0,
+ framing::SequenceNumber sn=0,
+ Status st=AVAILABLE
+ ) : payload(msg), position(sn), status(st), queue(q) {}
};
- inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
+
+inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) {
+ return a.position < b.position;
+}
}}