summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-06-08 19:55:02 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-06-08 19:55:02 +0000
commite11fb93fdbffee2e74d6c1c06d645de2b144dc4a (patch)
treeb862fcb881f00cb4f08fd849ab60b5d34a07a0db
parent69efc23e4a31d88c95f5457a1a0a9caa577c4e64 (diff)
downloadqpid-python-e11fb93fdbffee2e74d6c1c06d645de2b144dc4a.tar.gz
QPID-3079: fixes that allow persistence unit tests to pass
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1133535 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableQueue.h14
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/RecoverableQueue.h4
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp75
-rw-r--r--qpid/cpp/src/tests/AsyncCompletion.cpp49
7 files changed, 112 insertions, 51 deletions
diff --git a/qpid/cpp/src/qpid/broker/PersistableQueue.h b/qpid/cpp/src/qpid/broker/PersistableQueue.h
index 3514c7eb53..7b0c313f50 100644
--- a/qpid/cpp/src/qpid/broker/PersistableQueue.h
+++ b/qpid/cpp/src/qpid/broker/PersistableQueue.h
@@ -56,24 +56,20 @@ public:
typedef boost::shared_ptr<PersistableQueue> shared_ptr;
virtual const std::string& getName() const = 0;
- virtual ~PersistableQueue() {
- if (externalQueueStore)
- delete externalQueueStore;
- };
+ virtual ~PersistableQueue() {};
- virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
+ virtual void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst) = 0;
virtual void flush() = 0;
- inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;};
+ inline boost::shared_ptr<ExternalQueueStore> getExternalQueueStore() const {return externalQueueStore;};
- PersistableQueue():externalQueueStore(NULL){
- };
+ PersistableQueue() {};
/** the message has finished being dequeued from the store */
virtual void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>&) = 0;
protected:
- ExternalQueueStore* externalQueueStore;
+ boost::shared_ptr<ExternalQueueStore> externalQueueStore;
};
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 04c496cbd5..bde539f9ec 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -684,6 +684,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
Mutex::ScopedLock locker(messageLock);
pendingDequeueCallbacks[pmsg.get()] = callback;
}
+ QPID_LOG(debug, "Message " << pmsg << " async dequeue started on queue " << name);
store->dequeue(ctxt, pmsg, shared_from_this()); // invokes Queue::dequeueComplete() when done
return true;
}
@@ -1041,9 +1042,8 @@ bool Queue::hasExclusiveConsumer() const
return exclusive;
}
-void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
- if (externalQueueStore!=inst && externalQueueStore)
- delete externalQueueStore;
+void Queue::setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst)
+{
externalQueueStore = inst;
if (inst) {
@@ -1215,6 +1215,10 @@ const Broker* Queue::getBroker()
* message has completed. */
void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg )
{
+ ScopedUse u(barrier);
+ if (!u.acquired) return;
+
+ QPID_LOG(debug, "Message " << msg << " dequeue complete on queue " << name);
msg->dequeueComplete(shared_from_this(), store);
boost::shared_ptr<DequeueDoneCallback> cb;
@@ -1228,7 +1232,6 @@ void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg
}
}
if (cb) (*cb)();
- QPID_LOG(error, "dequeueComplete:=" << cb);
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 1c849e28fa..0bb0903a71 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -351,7 +351,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
- virtual void setExternalQueueStore(ExternalQueueStore* inst);
+ virtual void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst);
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
diff --git a/qpid/cpp/src/qpid/broker/RecoverableQueue.h b/qpid/cpp/src/qpid/broker/RecoverableQueue.h
index 49f05f97a1..82f066a318 100644
--- a/qpid/cpp/src/qpid/broker/RecoverableQueue.h
+++ b/qpid/cpp/src/qpid/broker/RecoverableQueue.h
@@ -48,8 +48,8 @@ public:
virtual ~RecoverableQueue() {};
virtual const std::string& getName() const = 0;
- virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
- virtual ExternalQueueStore* getExternalQueueStore() const = 0;
+ virtual void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst) = 0;
+ virtual boost::shared_ptr<ExternalQueueStore> getExternalQueueStore() const = 0;
};
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index d08409695e..69813e6bc4 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -65,8 +65,8 @@ public:
void setPersistenceId(uint64_t id);
uint64_t getPersistenceId() const;
const std::string& getName() const;
- void setExternalQueueStore(ExternalQueueStore* inst);
- ExternalQueueStore* getExternalQueueStore() const;
+ void setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst);
+ boost::shared_ptr<ExternalQueueStore> getExternalQueueStore() const;
void recover(RecoverableMessage::shared_ptr msg);
void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
@@ -213,12 +213,12 @@ const std::string& RecoverableQueueImpl::getName() const
return queue->getName();
}
-void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
+void RecoverableQueueImpl::setExternalQueueStore(const boost::shared_ptr<ExternalQueueStore>& inst)
{
queue->setExternalQueueStore(inst);
}
-ExternalQueueStore* RecoverableQueueImpl::getExternalQueueStore() const
+boost::shared_ptr<ExternalQueueStore> RecoverableQueueImpl::getExternalQueueStore() const
{
return queue->getExternalQueueStore();
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 734ef81918..2c792c2d43 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -763,8 +763,8 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
/** Design notes for asychronous completion of Message.accept **
* Message.Accept command cannot be considered "complete" until all messages
- * identified by the sequence set that accompanys the command have been
- * completely dequeued. A message dequeue may not be able to complet
+ * identified by the sequence set that accompanies the command have been
+ * completely dequeued. A message dequeue operation may not be able to complete
* synchronously - specifically when the message is durable. Therefore, the
* Message.Accept command handling must be able to track asynchronous dequeues,
* and notify the Session when all dequeues are done (at which point the
@@ -772,12 +772,14 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
*/
namespace {
- /** Manage a Message.accept that requires async completion of one or more
- * message dequeue operations */
+ /** Manage a Message.accept command that requires async completion of one
+ * or more message dequeue operations. An instance is registered with the
+ * SessionContext for each asynchronous Message.accept that is "in
+ * flight" */
class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext
{
mutable qpid::sys::Mutex lock;
- std::set<DeliveryId> pending; // for dequeue to complete
+ std::map<DeliveryId, boost::shared_ptr<Queue> > pending; // for dequeue to complete
bool ready;
SemanticState& state;
@@ -788,28 +790,30 @@ namespace {
/** called from session to urge pending dequeues to complete ASAP */
void flush()
{
- std::set<DeliveryId> copy;
+ QPID_LOG(trace, "Flushing pending message.accept cmd=" << getId());
+ std::map<DeliveryId, boost::shared_ptr<Queue> > copy;
{
Mutex::ScopedLock l(lock);
copy = pending;
}
- for (std::set<DeliveryId>::iterator i = copy.begin();
+ std::set<Queue *> flushedQs; // flush each queue only once!
+ for (std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = copy.begin();
i != copy.end(); ++i) {
- for (DeliveryRecords::iterator r = state.getUnacked().begin();
- r != state.getUnacked().end(); ++r) {
- if (r->getId() == *i) {
- r->getQueue()->flush();
- break;
- }
+ Queue *queue(i->second.get());
+ if (flushedQs.find(queue) == flushedQs.end()) {
+ flushedQs.insert(queue);
+ i->second->flush();
}
}
}
/** add a pending dequeue to track */
- void add( const DeliveryId& id )
+ void add( const DeliveryId& id, const boost::shared_ptr<Queue>& queue )
{
+ QPID_LOG(trace, "Scheduling dequeue of delivery " << id
+ << " on session " << state.getSession().getSessionId());
Mutex::ScopedLock l(lock);
- bool unique = pending.insert(id).second;
+ bool unique = pending.insert(std::pair<DeliveryId, boost::shared_ptr<Queue> >(id, queue)).second;
if (!unique) {
assert(false);
}
@@ -818,22 +822,27 @@ namespace {
/** signal this dequeue done. Note: may be run in *any* thread */
void complete( const DeliveryId& id )
{
+ QPID_LOG(trace, "Dequeue of delivery " << id
+ << " completed on session " << state.getSession().getSessionId());
Mutex::ScopedLock l(lock);
- std::set<DeliveryId>::iterator i = pending.find(id);
+ std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = pending.find(id);
assert(i != pending.end());
pending.erase(i);
if (ready && pending.empty()) {
framing::Invoker::Result r; // message.accept does not return result data
Mutex::ScopedUnlock ul(lock);
+ QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
completed( r );
}
}
/** allow the Message.Accept to complete - do this only after all
- * deliveryIds have been added() */
+ * deliveryIds have been added() and this has been registered with the
+ * SessionContext */
void enable()
{
+ QPID_LOG(trace, "Dispatching async message.accept cmd=" << getId());
Mutex::ScopedLock l(lock);
if (pending.empty()) {
framing::Invoker::Result r;
@@ -846,9 +855,10 @@ namespace {
};
- /** callback to indicate a single message has completed its dequeue. This
- object is made available to the queue, which will invoke it when the
- dequeue completes. */
+ /** callback to indicate a single message has completed its asynchronous
+ dequeue. This object is made available to the queue when a dequeue is
+ started. The queue will invoke the callback when the dequeue
+ completes. */
class DequeueDone : public Queue::DequeueDoneCallback
{
DeliveryId id;
@@ -862,28 +872,31 @@ namespace {
/** factory to create the above callback - passed to queue's dequeue
- method, only used if dequeue is asynchronous! */
+ method, only invoked if the dequeue operation is asynchronous! */
boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state,
const DeliveryId& id,
- boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd )
+ const boost::shared_ptr<Queue>& queue,
+ boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd )
{
- if (!cmd) { // first async dequeue creates the context
- cmd.reset(new AsyncMessageAcceptCmd(*state));
+ if (!cmd->get()) { // first async dequeue creates the context
+ cmd->reset(new AsyncMessageAcceptCmd(*state));
}
- cmd->add( id );
- boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) );
+ (*cmd)->add( id, queue );
+ boost::shared_ptr<DequeueDone> x( new DequeueDone(id, *cmd ) );
return x;
}
- /** predicate to process unacked delivery records */
+ /** predicate to process unacked delivery records during Message.accept
+ processing */
bool acceptDelivery( SemanticState *state,
- boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd,
+ boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd,
DeliveryRecord& dr )
{
- Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), cmd);
+ Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), dr.getQueue(), cmd);
return dr.accept((TransactionContext*) 0, &f);
}
-}
+} // namespace
+
void SemanticState::accepted(const SequenceSet& commands) {
assertClusterSafe();
@@ -921,7 +934,7 @@ void SemanticState::accepted(const SequenceSet& commands) {
isInSequenceSetAnd(commands,
bind(acceptDelivery,
this,
- cmd,
+ &cmd,
_1)));
unacked.erase(removed, unacked.end());
diff --git a/qpid/cpp/src/tests/AsyncCompletion.cpp b/qpid/cpp/src/tests/AsyncCompletion.cpp
index e32097106f..519e9d367b 100644
--- a/qpid/cpp/src/tests/AsyncCompletion.cpp
+++ b/qpid/cpp/src/tests/AsyncCompletion.cpp
@@ -56,6 +56,10 @@ class AsyncCompletionMessageStore : public NullMessageStore {
public:
sys::BlockingQueue<boost::intrusive_ptr<PersistableMessage> > enqueued;
+ typedef std::pair<boost::intrusive_ptr<PersistableMessage>,
+ boost::shared_ptr<PersistableQueue> > DequeueRecord;
+ sys::BlockingQueue<DequeueRecord> dequeued;
+
AsyncCompletionMessageStore() : NullMessageStore() {}
~AsyncCompletionMessageStore(){}
@@ -65,6 +69,13 @@ class AsyncCompletionMessageStore : public NullMessageStore {
{
enqueued.push(msg);
}
+
+ void dequeue(TransactionContext*,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const boost::shared_ptr<PersistableQueue>& queue)
+ {
+ dequeued.push(DequeueRecord(msg, queue));
+ }
};
QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite)
@@ -102,6 +113,44 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete) {
enqueued[k]->enqueueComplete();
}
sync.wait(); // Should complete now, all messages are completed.
+
+ // now test the dequeue: messageAccept should not complete until all pending
+ // dequeues complete
+ SubscriptionSettings settings;
+ settings.acceptMode = ACCEPT_MODE_EXPLICIT;
+ settings.autoAck = 0;
+ settings.completionMode = COMPLETE_ON_ACCEPT;
+
+ LocalQueue q;
+ Subscription sub = fix.subs.subscribe(q, "q", settings);
+ s.messageFlush(arg::destination=sub.getName());
+ SequenceSet accepted;
+ for (int x = 0; x < count; x++) {
+ Message m;
+ BOOST_CHECK(q.get(m, TIME_SEC * 3));
+ accepted.add(m.getId());
+ }
+
+ Completion accept = s.messageAccept(accepted, arg::sync=true);
+ sync = s.executionSync(arg::sync=true);
+
+ std::vector<AsyncCompletionMessageStore::DequeueRecord> dequeued;
+ for (int y = 0; y < count; y++) {
+ dequeued.push_back(store->dequeued.pop(TIME_SEC * 3));
+ }
+
+ sleep( 1 ); // even with this, accept should NOT complete!
+
+ for (int z = count-1; z >= 0; --z) {
+ BOOST_CHECK(!accept.isComplete()); // Should not be complete yet.
+ BOOST_CHECK(!sync.isComplete()); // Should not be complete yet.
+ // now complete the dequeue of the message:
+ dequeued[z].second->dequeueComplete(dequeued[z].first);
+ }
+
+ // now both should complete.
+ accept.wait();
+ sync.wait();
}
QPID_AUTO_TEST_CASE(testGetResult) {