summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-04-04 14:25:25 +0000
committerGordon Sim <gsim@apache.org>2011-04-04 14:25:25 +0000
commitb24711832e66d887c4d4879cc167c3ba403e120b (patch)
treedf7a6e9c2d98b63519164ac0a8898c5491fa2010
parentc8f291a1c5b869ca812e6f33968f3697f1cf30c4 (diff)
downloadqpid-python-b24711832e66d887c4d4879cc167c3ba403e120b.tar.gz
QPID-3174: remove unnecessary enqueueComplete() calls (merge of r1087868 and r1088539)
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.10@1088634 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/TxPublish.cpp9
5 files changed, 4 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 7a266fb4e3..8efa8be3dc 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -183,7 +183,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
msg->addToSyncList(shared_from_this(), store);
}
- msg->enqueueComplete(); // mark the message as enqueued
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -210,7 +209,6 @@ void Queue::requeue(const QueuedMessage& msg){
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
- msg.payload->enqueueComplete(); // mark the message as enqueued
messages->reinsert(msg);
listeners.populate(copy);
@@ -632,7 +630,9 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
}
if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
- msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
+ // when it considers the message stored.
+ msg->enqueueAsync(shared_from_this(), store);
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
diff --git a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
index 38cb8043c9..cd6735328f 100644
--- a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
@@ -43,7 +43,6 @@ void RecoveredDequeue::commit() throw()
void RecoveredDequeue::rollback() throw()
{
- msg->enqueueComplete();
queue->process(msg);
}
diff --git a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
index 6263c63e3d..6d2eaee6c4 100644
--- a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
@@ -36,7 +36,6 @@ bool RecoveredEnqueue::prepare(TransactionContext*) throw(){
}
void RecoveredEnqueue::commit() throw(){
- msg->enqueueComplete();
queue->process(msg);
}
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 6f8b125b8b..d08409695e 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -252,7 +252,6 @@ void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared
void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
{
- msg->enqueueComplete(); // recoved nmessage to enqueued in store already
buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
}
diff --git a/qpid/cpp/src/qpid/broker/TxPublish.cpp b/qpid/cpp/src/qpid/broker/TxPublish.cpp
index 36a451e62c..9c2cf4a467 100644
--- a/qpid/cpp/src/qpid/broker/TxPublish.cpp
+++ b/qpid/cpp/src/qpid/broker/TxPublish.cpp
@@ -90,14 +90,7 @@ void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
{
- if (!queue->enqueue(ctxt, msg)){
- /**
- * if not store then mark message for ack and deleivery once
- * commit happens, as async IO will never set it when no store
- * exists
- */
- msg->enqueueComplete();
- }
+ queue->enqueue(ctxt, msg);
}
TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}