diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-06-08 19:55:02 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-06-08 19:55:02 +0000 |
commit | e11fb93fdbffee2e74d6c1c06d645de2b144dc4a (patch) | |
tree | b862fcb881f00cb4f08fd849ab60b5d34a07a0db | |
parent | 69efc23e4a31d88c95f5457a1a0a9caa577c4e64 (diff) | |
download | qpid-python-e11fb93fdbffee2e74d6c1c06d645de2b144dc4a.tar.gz |
QPID-3079: fixes that allow persistence unit tests to pass
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1133535 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/PersistableQueue.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/RecoverableQueue.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 75 | ||||
-rw-r--r-- | qpid/cpp/src/tests/AsyncCompletion.cpp | 49 |
7 files changed, 112 insertions, 51 deletions
diff --git a/qpid/cpp/src/qpid/broker/PersistableQueue.h b/qpid/cpp/src/qpid/broker/PersistableQueue.h index 3514c7eb53..7b0c313f50 100644 --- a/qpid/cpp/src/qpid/broker/PersistableQueue.h +++ b/qpid/cpp/src/qpid/broker/PersistableQueue.h @@ -56,24 +56,20 @@ public: typedef boost::shared_ptr<PersistableQueue> shared_ptr; virtual const std::string& getName() const = 0; - virtual ~PersistableQueue() { - if (externalQueueStore) - delete externalQueueStore; - }; + virtual ~PersistableQueue() {}; - virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0; + virtual void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst) = 0; virtual void flush() = 0; - inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;}; + inline boost::shared_ptr<ExternalQueueStore> getExternalQueueStore() const {return externalQueueStore;}; - PersistableQueue():externalQueueStore(NULL){ - }; + PersistableQueue() {}; /** the message has finished being dequeued from the store */ virtual void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>&) = 0; protected: - ExternalQueueStore* externalQueueStore; + boost::shared_ptr<ExternalQueueStore> externalQueueStore; }; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 04c496cbd5..bde539f9ec 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -684,6 +684,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg, Mutex::ScopedLock locker(messageLock); pendingDequeueCallbacks[pmsg.get()] = callback; } + QPID_LOG(debug, "Message " << pmsg << " async dequeue started on queue " << name); store->dequeue(ctxt, pmsg, shared_from_this()); // invokes Queue::dequeueComplete() when done return true; } @@ -1041,9 +1042,8 @@ bool Queue::hasExclusiveConsumer() const return exclusive; } -void Queue::setExternalQueueStore(ExternalQueueStore* inst) { - if (externalQueueStore!=inst && externalQueueStore) - delete externalQueueStore; +void Queue::setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst) +{ externalQueueStore = inst; if (inst) { @@ -1215,6 +1215,10 @@ const Broker* Queue::getBroker() * message has completed. */ void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg ) { + ScopedUse u(barrier); + if (!u.acquired) return; + + QPID_LOG(debug, "Message " << msg << " dequeue complete on queue " << name); msg->dequeueComplete(shared_from_this(), store); boost::shared_ptr<DequeueDoneCallback> cb; @@ -1228,7 +1232,6 @@ void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg } } if (cb) (*cb)(); - QPID_LOG(error, "dequeueComplete:=" << cb); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 1c849e28fa..0bb0903a71 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -351,7 +351,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer); static void tryAutoDelete(Broker& broker, Queue::shared_ptr); - virtual void setExternalQueueStore(ExternalQueueStore* inst); + virtual void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst); // Manageable entry points management::ManagementObject* GetManagementObject (void) const; diff --git a/qpid/cpp/src/qpid/broker/RecoverableQueue.h b/qpid/cpp/src/qpid/broker/RecoverableQueue.h index 49f05f97a1..82f066a318 100644 --- a/qpid/cpp/src/qpid/broker/RecoverableQueue.h +++ b/qpid/cpp/src/qpid/broker/RecoverableQueue.h @@ -48,8 +48,8 @@ public: virtual ~RecoverableQueue() {}; virtual const std::string& getName() const = 0; - virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0; - virtual ExternalQueueStore* getExternalQueueStore() const = 0; + virtual void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst) = 0; + virtual boost::shared_ptr<ExternalQueueStore> getExternalQueueStore() const = 0; }; diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index d08409695e..69813e6bc4 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -65,8 +65,8 @@ public: void setPersistenceId(uint64_t id); uint64_t getPersistenceId() const; const std::string& getName() const; - void setExternalQueueStore(ExternalQueueStore* inst); - ExternalQueueStore* getExternalQueueStore() const; + void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst); + boost::shared_ptr<ExternalQueueStore> getExternalQueueStore() const; void recover(RecoverableMessage::shared_ptr msg); void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); @@ -213,12 +213,12 @@ const std::string& RecoverableQueueImpl::getName() const return queue->getName(); } -void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst) +void RecoverableQueueImpl::setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst) { queue->setExternalQueueStore(inst); } -ExternalQueueStore* RecoverableQueueImpl::getExternalQueueStore() const +boost::shared_ptr<ExternalQueueStore> RecoverableQueueImpl::getExternalQueueStore() const { return queue->getExternalQueueStore(); } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 734ef81918..2c792c2d43 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -763,8 +763,8 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) { /** Design notes for asychronous completion of Message.accept ** * Message.Accept command cannot be considered "complete" until all messages - * identified by the sequence set that accompanys the command have been - * completely dequeued. A message dequeue may not be able to complet + * identified by the sequence set that accompanies the command have been + * completely dequeued. A message dequeue operation may not be able to complete * synchronously - specifically when the message is durable. Therefore, the * Message.Accept command handling must be able to track asynchronous dequeues, * and notify the Session when all dequeues are done (at which point the @@ -772,12 +772,14 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) { */ namespace { - /** Manage a Message.accept that requires async completion of one or more - * message dequeue operations */ + /** Manage a Message.accept command that requires async completion of one + * or more message dequeue operations. An instance is registered with the + * SessionContext for each asynchronous Message.accept that is "in + * flight" */ class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext { mutable qpid::sys::Mutex lock; - std::set<DeliveryId> pending; // for dequeue to complete + std::map<DeliveryId, boost::shared_ptr<Queue> > pending; // for dequeue to complete bool ready; SemanticState& state; @@ -788,28 +790,30 @@ namespace { /** called from session to urge pending dequeues to complete ASAP */ void flush() { - std::set<DeliveryId> copy; + QPID_LOG(trace, "Flushing pending message.accept cmd=" << getId()); + std::map<DeliveryId, boost::shared_ptr<Queue> > copy; { Mutex::ScopedLock l(lock); copy = pending; } - for (std::set<DeliveryId>::iterator i = copy.begin(); + std::set<Queue *> flushedQs; // flush each queue only once! + for (std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = copy.begin(); i != copy.end(); ++i) { - for (DeliveryRecords::iterator r = state.getUnacked().begin(); - r != state.getUnacked().end(); ++r) { - if (r->getId() == *i) { - r->getQueue()->flush(); - break; - } + Queue *queue(i->second.get()); + if (flushedQs.find(queue) == flushedQs.end()) { + flushedQs.insert(queue); + i->second->flush(); } } } /** add a pending dequeue to track */ - void add( const DeliveryId& id ) + void add( const DeliveryId& id, const boost::shared_ptr<Queue>& queue ) { + QPID_LOG(trace, "Scheduling dequeue of delivery " << id + << " on session " << state.getSession().getSessionId()); Mutex::ScopedLock l(lock); - bool unique = pending.insert(id).second; + bool unique = pending.insert(std::pair<DeliveryId, boost::shared_ptr<Queue> >(id, queue)).second; if (!unique) { assert(false); } @@ -818,22 +822,27 @@ namespace { /** signal this dequeue done. Note: may be run in *any* thread */ void complete( const DeliveryId& id ) { + QPID_LOG(trace, "Dequeue of delivery " << id + << " completed on session " << state.getSession().getSessionId()); Mutex::ScopedLock l(lock); - std::set<DeliveryId>::iterator i = pending.find(id); + std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = pending.find(id); assert(i != pending.end()); pending.erase(i); if (ready && pending.empty()) { framing::Invoker::Result r; // message.accept does not return result data Mutex::ScopedUnlock ul(lock); + QPID_LOG(trace, "Completing async message.accept cmd=" << getId()); completed( r ); } } /** allow the Message.Accept to complete - do this only after all - * deliveryIds have been added() */ + * deliveryIds have been added() and this has been registered with the + * SessionContext */ void enable() { + QPID_LOG(trace, "Dispatching async message.accept cmd=" << getId()); Mutex::ScopedLock l(lock); if (pending.empty()) { framing::Invoker::Result r; @@ -846,9 +855,10 @@ namespace { }; - /** callback to indicate a single message has completed its dequeue. This - object is made available to the queue, which will invoke it when the - dequeue completes. */ + /** callback to indicate a single message has completed its asynchronous + dequeue. This object is made available to the queue when a dequeue is + started. The queue will invoke the callback when the dequeue + completes. */ class DequeueDone : public Queue::DequeueDoneCallback { DeliveryId id; @@ -862,28 +872,31 @@ namespace { /** factory to create the above callback - passed to queue's dequeue - method, only used if dequeue is asynchronous! */ + method, only invoked if the dequeue operation is asynchronous! */ boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state, const DeliveryId& id, - boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd ) + const boost::shared_ptr<Queue>& queue, + boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd ) { - if (!cmd) { // first async dequeue creates the context - cmd.reset(new AsyncMessageAcceptCmd(*state)); + if (!cmd->get()) { // first async dequeue creates the context + cmd->reset(new AsyncMessageAcceptCmd(*state)); } - cmd->add( id ); - boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) ); + (*cmd)->add( id, queue ); + boost::shared_ptr<DequeueDone> x( new DequeueDone(id, *cmd ) ); return x; } - /** predicate to process unacked delivery records */ + /** predicate to process unacked delivery records during Message.accept + processing */ bool acceptDelivery( SemanticState *state, - boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd, + boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd, DeliveryRecord& dr ) { - Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), cmd); + Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), dr.getQueue(), cmd); return dr.accept((TransactionContext*) 0, &f); } -} +} // namespace + void SemanticState::accepted(const SequenceSet& commands) { assertClusterSafe(); @@ -921,7 +934,7 @@ void SemanticState::accepted(const SequenceSet& commands) { isInSequenceSetAnd(commands, bind(acceptDelivery, this, - cmd, + &cmd, _1))); unacked.erase(removed, unacked.end()); diff --git a/qpid/cpp/src/tests/AsyncCompletion.cpp b/qpid/cpp/src/tests/AsyncCompletion.cpp index e32097106f..519e9d367b 100644 --- a/qpid/cpp/src/tests/AsyncCompletion.cpp +++ b/qpid/cpp/src/tests/AsyncCompletion.cpp @@ -56,6 +56,10 @@ class AsyncCompletionMessageStore : public NullMessageStore { public: sys::BlockingQueue<boost::intrusive_ptr<PersistableMessage> > enqueued; + typedef std::pair<boost::intrusive_ptr<PersistableMessage>, + boost::shared_ptr<PersistableQueue> > DequeueRecord; + sys::BlockingQueue<DequeueRecord> dequeued; + AsyncCompletionMessageStore() : NullMessageStore() {} ~AsyncCompletionMessageStore(){} @@ -65,6 +69,13 @@ class AsyncCompletionMessageStore : public NullMessageStore { { enqueued.push(msg); } + + void dequeue(TransactionContext*, + const boost::intrusive_ptr<PersistableMessage>& msg, + const boost::shared_ptr<PersistableQueue>& queue) + { + dequeued.push(DequeueRecord(msg, queue)); + } }; QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite) @@ -102,6 +113,44 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete) { enqueued[k]->enqueueComplete(); } sync.wait(); // Should complete now, all messages are completed. + + // now test the dequeue: messageAccept should not complete until all pending + // dequeues complete + SubscriptionSettings settings; + settings.acceptMode = ACCEPT_MODE_EXPLICIT; + settings.autoAck = 0; + settings.completionMode = COMPLETE_ON_ACCEPT; + + LocalQueue q; + Subscription sub = fix.subs.subscribe(q, "q", settings); + s.messageFlush(arg::destination=sub.getName()); + SequenceSet accepted; + for (int x = 0; x < count; x++) { + Message m; + BOOST_CHECK(q.get(m, TIME_SEC * 3)); + accepted.add(m.getId()); + } + + Completion accept = s.messageAccept(accepted, arg::sync=true); + sync = s.executionSync(arg::sync=true); + + std::vector<AsyncCompletionMessageStore::DequeueRecord> dequeued; + for (int y = 0; y < count; y++) { + dequeued.push_back(store->dequeued.pop(TIME_SEC * 3)); + } + + sleep( 1 ); // even with this, accept should NOT complete! + + for (int z = count-1; z >= 0; --z) { + BOOST_CHECK(!accept.isComplete()); // Should not be complete yet. + BOOST_CHECK(!sync.isComplete()); // Should not be complete yet. + // now complete the dequeue of the message: + dequeued[z].second->dequeueComplete(dequeued[z].first); + } + + // now both should complete. + accept.wait(); + sync.wait(); } QPID_AUTO_TEST_CASE(testGetResult) { |