diff options
author | Alan Conway <aconway@apache.org> | 2012-02-17 14:17:14 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-17 14:17:14 +0000 |
commit | c203071366928116c7bb1609091957d1526330e6 (patch) | |
tree | 6079b5ff072529de5db68ae3304c568c72fb4878 | |
parent | 89966c062545b89b9d88e473d42c6e74cdd588df (diff) | |
download | qpid-python-c203071366928116c7bb1609091957d1526330e6.tar.gz |
QPID-3603: Fix update of acquired messages.
The changes to keep acquired messages on the queue broke replication
of acquired messages. Fix this to put acquired messages into the
MessageDeque correctly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245555 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageDeque.cpp | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageDeque.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Messages.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuedMessage.h | 22 |
5 files changed, 54 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp index 9f874e4c9a..709d99876b 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp +++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp @@ -123,17 +123,18 @@ bool MessageDeque::consume(QueuedMessage& message) return false; } +namespace { +QueuedMessage padding(uint32_t pos) { + return QueuedMessage(0, 0, pos, QueuedMessage::DELETED); +} +} // namespace + bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { //add padding to prevent gaps in sequence, which break the index //calculation (needed for queue replication) - while (messages.size() && (added.position - messages.back().position) > 1) { - QueuedMessage dummy; - dummy.position = messages.back().position + 1; - dummy.status = QueuedMessage::DELETED; - messages.push_back(dummy); - QPID_LOG(debug, "Adding padding at " << dummy.position << ", between " << messages.back().position << " and " << added.position); - } + while (messages.size() && (added.position - messages.back().position) > 1) + messages.push_back(padding(messages.back().position + 1)); messages.push_back(added); messages.back().status = QueuedMessage::AVAILABLE; if (head >= messages.size()) head = messages.size() - 1; @@ -141,6 +142,27 @@ bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed* return false;//adding a message never causes one to be removed for deque } +void MessageDeque::updateAcquired(const QueuedMessage& acquired) +{ + // Pad the front of the queue if necessary + while (messages.size() && (acquired.position < messages.front().position)) + messages.push_front(padding(uint32_t(messages.front().position) - 1)); + size_t i = index(acquired.position); + if (i < messages.size()) { // Replace an existing padding message + assert(messages[i].status == QueuedMessage::DELETED); + messages[i] = acquired; + messages[i].status = QueuedMessage::ACQUIRED; + } + else { // Push to the back + // Pad the back of the queue if necessary + while (messages.size() && (acquired.position - messages.back().position) > 1) + messages.push_back(padding(messages.back().position + 1)); + assert(!messages.size() || (acquired.position - messages.back().position) == 1); + messages.push_back(acquired); + messages.back().status = QueuedMessage::ACQUIRED; + } +} + void MessageDeque::clean() { while (messages.size() && messages.front().status == QueuedMessage::DELETED) { diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h index 4d3a5dcdd5..bb5943b09b 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.h +++ b/qpid/cpp/src/qpid/broker/MessageDeque.h @@ -43,6 +43,7 @@ class MessageDeque : public Messages bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); + void updateAcquired(const QueuedMessage& acquired); void foreach(Functor); void removeIf(Predicate); diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h index 89f6d383ae..61e9fa110a 100644 --- a/qpid/cpp/src/qpid/broker/Messages.h +++ b/qpid/cpp/src/qpid/broker/Messages.h @@ -93,7 +93,15 @@ class Messages virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0; /** - * Apply the functor to each message held + * Add an already acquired message to the queue. + * Used by a cluster updatee to replicate acquired messages from the updater. + * Only need be implemented by subclasses that keep track of + * acquired messages. + */ + virtual void updateAcquired(const QueuedMessage&) { } + + /** + * Apply, the functor to each message held */ virtual void foreach(Functor) = 0; /** diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 0e822d3d4a..015957927f 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1519,7 +1519,8 @@ void Queue::updateEnqueued(const QueuedMessage& m) { if (m.payload) { boost::intrusive_ptr<Message> payload = m.payload; - enqueue ( 0, payload, true ); + enqueue(0, payload, true); + messages->updateAcquired(m); if (policy.get()) { policy->recoverEnqueued(payload); } diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h index 051ade41ea..806da8e720 100644 --- a/qpid/cpp/src/qpid/broker/QueuedMessage.h +++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,16 +32,20 @@ struct QueuedMessage { boost::intrusive_ptr<Message> payload; framing::SequenceNumber position; - enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status; + typedef enum { AVAILABLE, ACQUIRED, DELETED, REMOVED } Status; + Status status; Queue* queue; - QueuedMessage() : queue(0) {} - QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : - payload(msg), position(sn), queue(q) {} - QueuedMessage(Queue* q) : queue(q) {} - + QueuedMessage(Queue* q=0, + boost::intrusive_ptr<Message> msg=0, + framing::SequenceNumber sn=0, + Status st=AVAILABLE + ) : payload(msg), position(sn), status(st), queue(q) {} }; - inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } + +inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { + return a.position < b.position; +} }} |