summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-06-16 15:09:41 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-06-16 15:09:41 +0000
commit2519eecbf08f512535aaef845c498774700948ab (patch)
tree9475b21dfc0b8a205e599ba49bc0727f6e2540f9
parent16b17128fe65f7b29b3e4d1b8e2d0f4af25f368d (diff)
downloadqpid-python-2519eecbf08f512535aaef845c498774700948ab.tar.gz
QPID-3079: restructured Queue::dequeue() interface. Performance tuning changes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1136478 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp47
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h59
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp147
6 files changed, 134 insertions, 141 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 635123d242..d0a450ce1e 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -111,13 +111,16 @@ void DeliveryRecord::complete() {
}
/** Accept msg, and optionally notify caller when dequeue completes */
-bool DeliveryRecord::accept(TransactionContext* ctxt, Queue::DequeueDoneCallbackFactory *f) {
+boost::intrusive_ptr<Queue::DequeueCompletion>
+DeliveryRecord::accept(TransactionContext* ctxt) {
+ static const boost::intrusive_ptr<Queue::DequeueCompletion> empty;
if (acquired && !ended) {
- queue->dequeue(ctxt, msg, f);
- setEnded();
QPID_LOG(debug, "Accepted " << id);
+ boost::intrusive_ptr<Queue::DequeueCompletion> dq(queue->dequeue(ctxt, msg));
+ setEnded();
+ return dq;
}
- return isRedundant();
+ return empty;
}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index 10491df94c..43d5ef7074 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -86,7 +86,7 @@ class DeliveryRecord
void redeliver(SemanticState* const);
void acquire(DeliveryIds& results);
void complete();
- bool accept(TransactionContext*, Queue::DequeueDoneCallbackFactory *); // Returns isRedundant()
+ boost::intrusive_ptr<Queue::DequeueCompletion> accept(TransactionContext*);
bool setEnded(); // Returns isRedundant()
void committed() const;
@@ -104,7 +104,7 @@ class DeliveryRecord
void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
void setId(DeliveryId _id) { id = _id; }
- typedef std::deque<DeliveryRecord> DeliveryRecords;
+ typedef std::list<DeliveryRecord> DeliveryRecords;
static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last);
const QueuedMessage& getMessage() const { return msg; }
framing::SequenceNumber getId() const { return id; }
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
index 0d5e32b40d..74f76da656 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -70,11 +70,12 @@ bool PersistableMessage::isContentReleased() const
bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
if (store && (queue->getPersistenceId()!=0)) {
sys::ScopedLock<sys::Mutex> l(storeLock);
- for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
+ syncList::iterator i = synclist.find(queue->getName());
+ if (i != synclist.end()) {
PersistableQueue::shared_ptr q(i->second.lock());
if (q && q->getPersistenceId() == queue->getPersistenceId()) return true;
- }
- }
+ }
+ }
return false;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index bde539f9ec..830589e687 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -438,8 +438,7 @@ void Queue::purgeExpired()
Mutex::ScopedLock locker(messageLock);
messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
- for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
- (DequeueDoneCallbackFactory*)0));
+ for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
}
@@ -614,8 +613,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
- (DequeueDoneCallbackFactory*)0));
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
@@ -655,19 +653,19 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
}
/**
- * returns false if the dequeue completed, otherwise the dequeue will complete
- * asynchronously. If the caller needs to know when an asynchronous dequeue
- * completes, it must provide a factory that will provide the callback.
+ * Returns a null pointer if the dequeue completed, otherwise the dequeue will complete
+ * asynchronously, and a pointer to a DequeueCompletion object is returned.
*/
-bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
- DequeueDoneCallbackFactory *factory)
+boost::intrusive_ptr<Queue::DequeueCompletion>
+Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
+ static const boost::intrusive_ptr<DequeueCompletion> empty;
ScopedUse u(barrier);
- if (!u.acquired) return false;
+ if (!u.acquired) return empty;
{
Mutex::ScopedLock locker(messageLock);
- if (!isEnqueued(msg)) return false;
+ if (!isEnqueued(msg)) return empty;
if (!ctxt) {
dequeued(msg);
}
@@ -679,17 +677,17 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
msg.payload->dequeueAsync(shared_from_this(), store);
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
- if (factory) {
- boost::shared_ptr<DequeueDoneCallback> callback((*factory)());
+ boost::intrusive_ptr<DequeueCompletion> dc(new DequeueCompletion());
+ {
Mutex::ScopedLock locker(messageLock);
- pendingDequeueCallbacks[pmsg.get()] = callback;
+ pendingDequeueCompletions[pmsg.get()] = dc;
}
QPID_LOG(debug, "Message " << pmsg << " async dequeue started on queue " << name);
store->dequeue(ctxt, pmsg, shared_from_this()); // invokes Queue::dequeueComplete() when done
- return true;
+ return dc;
}
}
- return false;
+ return empty;
}
void Queue::dequeueCommitted(const QueuedMessage& msg)
@@ -1121,8 +1119,7 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges)
}
}
//process any pending dequeues
- for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
- (DequeueDoneCallbackFactory*)0));
+ for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
pendingDequeues.clear();
}
@@ -1221,17 +1218,17 @@ void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg
QPID_LOG(debug, "Message " << msg << " dequeue complete on queue " << name);
msg->dequeueComplete(shared_from_this(), store);
- boost::shared_ptr<DequeueDoneCallback> cb;
+ boost::intrusive_ptr<DequeueCompletion> dc;
{
Mutex::ScopedLock locker(messageLock);
- std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> >::iterator i;
- i = pendingDequeueCallbacks.find(msg.get());
- if (i != pendingDequeueCallbacks.end()) {
- cb = i->second;
- pendingDequeueCallbacks.erase(i);
+ std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> >::iterator i;
+ i = pendingDequeueCompletions.find(msg.get());
+ if (i != pendingDequeueCompletions.end()) {
+ dc = i->second;
+ pendingDequeueCompletions.erase(i);
}
}
- if (cb) (*cb)();
+ if (dc) dc->dequeueDone();
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 0bb0903a71..69c07b1b78 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -40,6 +40,7 @@
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/AtomicValue.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -275,26 +276,50 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
void enqueueAborted(boost::intrusive_ptr<Message> msg);
/**
- * dequeue from store (only done once messages is acknowledged). Dequeue
- * -may- complete asynchronously. This method returns 'false' if the
- * dequeue has completed, else 'true' if the dequeue is asynchronous. If
- * the caller is interested in receiving notification when the asynchronous
- * dequeue completes, it may pass a pointer to a factory functor that
- * returns a shareable DequeueDoneCallback object. If the dequeue is
- * completed synchronously, this pointer is ignored. If the dequeue will
- * complete asynchronously, the factory is called to obtain a
- * DequeueDoneCallback. When the dequeue completes, the
- * DequeueDoneCallback is invoked. The callback should be prepared to
- * execute on any thread.
+ * dequeue from Queue (only done once messages is acknowledged). Dequeue
+ * -may- complete asynchronously. This method returns a DequeueCompletion
+ * object for use by the caller to determine when the dequeue has
+ * completed. If the dequeue is able to complete during the dequeue call,
+ * a null pointer is returned. If the caller is interested in receiving
+ * notification when an asynchronous dequeue completes, it may register a
+ * callback function and a context. The callback should be prepared to
+ * execute on any thread, and the callback object's lifecycle must ensure
+ * that it survives until the dequeue callback is made.
*/
- class DequeueDoneCallback
+ class DequeueCompletion : public RefCounted
{
public:
- virtual void operator()() = 0;
+ typedef void callback( boost::intrusive_ptr<RefCounted>& ctxt );
+
+ DequeueCompletion()
+ : completionsNeeded(2), // one for register call, another for done call
+ cb(0) {}
+
+ void dequeueDone()
+ {
+ assert(completionsNeeded.get() > 0);
+ if (--completionsNeeded == 0) {
+ assert(cb);
+ (*cb)(ctxt);
+ ctxt.reset();
+ }
+ }
+
+ void registerCallback( callback *f, boost::intrusive_ptr<RefCounted>& _ctxt )
+ {
+ cb = f;
+ ctxt = _ctxt;
+ dequeueDone(); // invoke callback if dequeue already done.
+ }
+
+ private:
+ mutable qpid::sys::AtomicValue<int> completionsNeeded;
+ callback *cb;
+ boost::intrusive_ptr<RefCounted> ctxt;
+ friend class Queue;
+
};
- typedef boost::function<boost::shared_ptr<DequeueDoneCallback>()> DequeueDoneCallbackFactory;
- QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
- DequeueDoneCallbackFactory *factory = 0);
+ QPID_BROKER_EXTERN boost::intrusive_ptr<DequeueCompletion> dequeue(TransactionContext* ctxt, const QueuedMessage& msg);
/** invoked by store to signal dequeue() has completed */
QPID_BROKER_EXTERN void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg);
@@ -405,7 +430,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
const Broker* getBroker();
private:
- std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> > pendingDequeueCallbacks;
+ std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> > pendingDequeueCompletions;
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 2c792c2d43..96e8fe8b2d 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -548,7 +548,7 @@ void SemanticState::recover(bool requeue)
//unconfirmed messages re redelivered and therefore have their
//id adjusted, confirmed messages are not and so the ordering
//w.r.t id is lost
- sort(unacked.begin(), unacked.end());
+ unacked.sort();
}
}
@@ -779,63 +779,60 @@ namespace {
class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext
{
mutable qpid::sys::Mutex lock;
- std::map<DeliveryId, boost::shared_ptr<Queue> > pending; // for dequeue to complete
- bool ready;
+ unsigned int pending;
+ std::vector<boost::shared_ptr<Queue> > queues; // for flush()
SemanticState& state;
+ /** completes this command. Note: may run in *any* thread */
+ void complete()
+ {
+ Mutex::ScopedLock l(lock);
+ assert(pending);
+ if (--pending == 0) {
+ framing::Invoker::Result r; // message.accept does not return result data
+ Mutex::ScopedUnlock ul(lock);
+ QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
+ completed( r );
+ }
+ }
+
public:
AsyncMessageAcceptCmd(SemanticState& _state)
- : ready(false), state(_state) {}
+ : pending(1), state(_state) {}
- /** called from session to urge pending dequeues to complete ASAP */
+ /** signal this dequeue done. Note: may be run in *any* thread */
+ static void dequeueDone( boost::intrusive_ptr<RefCounted>& ctxt )
+ {
+ boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd(boost::static_pointer_cast<AsyncMessageAcceptCmd>(ctxt));
+ cmd->complete();
+ }
+
+ /** called from session to urge pending dequeues to complete ASAP, done
+ as a result of an execution.sync */
void flush()
{
QPID_LOG(trace, "Flushing pending message.accept cmd=" << getId());
- std::map<DeliveryId, boost::shared_ptr<Queue> > copy;
+ std::vector<boost::shared_ptr<Queue> > copy;
{
Mutex::ScopedLock l(lock);
- copy = pending;
+ copy.swap(queues); // no longer needed after flushing...
}
- std::set<Queue *> flushedQs; // flush each queue only once!
- for (std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = copy.begin();
+ for (std::vector<boost::shared_ptr<Queue> >::iterator i = copy.begin();
i != copy.end(); ++i) {
- Queue *queue(i->second.get());
- if (flushedQs.find(queue) == flushedQs.end()) {
- flushedQs.insert(queue);
- i->second->flush();
- }
+ (*i)->flush();
}
}
/** add a pending dequeue to track */
- void add( const DeliveryId& id, const boost::shared_ptr<Queue>& queue )
+ void add( const boost::shared_ptr<Queue>& queue )
{
- QPID_LOG(trace, "Scheduling dequeue of delivery " << id
+ QPID_LOG(trace, "Scheduling dequeue of delivery " << getId()
<< " on session " << state.getSession().getSessionId());
Mutex::ScopedLock l(lock);
- bool unique = pending.insert(std::pair<DeliveryId, boost::shared_ptr<Queue> >(id, queue)).second;
- if (!unique) {
- assert(false);
- }
+ ++pending;
+ queues.push_back(queue);
}
- /** signal this dequeue done. Note: may be run in *any* thread */
- void complete( const DeliveryId& id )
- {
- QPID_LOG(trace, "Dequeue of delivery " << id
- << " completed on session " << state.getSession().getSessionId());
- Mutex::ScopedLock l(lock);
- std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = pending.find(id);
- assert(i != pending.end());
- pending.erase(i);
-
- if (ready && pending.empty()) {
- framing::Invoker::Result r; // message.accept does not return result data
- Mutex::ScopedUnlock ul(lock);
- QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
- completed( r );
- }
- }
/** allow the Message.Accept to complete - do this only after all
* deliveryIds have been added() and this has been registered with the
@@ -844,57 +841,15 @@ namespace {
{
QPID_LOG(trace, "Dispatching async message.accept cmd=" << getId());
Mutex::ScopedLock l(lock);
- if (pending.empty()) {
+ assert(pending);
+ if (--pending == 0) {
framing::Invoker::Result r;
Mutex::ScopedUnlock ul(lock);
+ QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
completed( r );
- return;
}
- ready = true;
}
};
-
-
- /** callback to indicate a single message has completed its asynchronous
- dequeue. This object is made available to the queue when a dequeue is
- started. The queue will invoke the callback when the dequeue
- completes. */
- class DequeueDone : public Queue::DequeueDoneCallback
- {
- DeliveryId id;
- boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
- public:
- DequeueDone( const DeliveryId & _id,
- const boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
- : id(_id), cmd(_cmd) {}
- void operator()() { cmd->complete( id ); }
- };
-
-
- /** factory to create the above callback - passed to queue's dequeue
- method, only invoked if the dequeue operation is asynchronous! */
- boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state,
- const DeliveryId& id,
- const boost::shared_ptr<Queue>& queue,
- boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd )
- {
- if (!cmd->get()) { // first async dequeue creates the context
- cmd->reset(new AsyncMessageAcceptCmd(*state));
- }
- (*cmd)->add( id, queue );
- boost::shared_ptr<DequeueDone> x( new DequeueDone(id, *cmd ) );
- return x;
- }
-
- /** predicate to process unacked delivery records during Message.accept
- processing */
- bool acceptDelivery( SemanticState *state,
- boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd,
- DeliveryRecord& dr )
- {
- Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), dr.getQueue(), cmd);
- return dr.accept((TransactionContext*) 0, &f);
- }
} // namespace
@@ -929,14 +884,26 @@ void SemanticState::accepted(const SequenceSet& commands) {
which would be necessary if we remove when the dequeue
completes. Is this ok? */
boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
- DeliveryRecords::iterator removed =
- remove_if(unacked.begin(), unacked.end(),
- isInSequenceSetAnd(commands,
- bind(acceptDelivery,
- this,
- &cmd,
- _1)));
- unacked.erase(removed, unacked.end());
+ IsInSequenceSet isInSeq(commands);
+ DeliveryRecords::const_iterator end(unacked.end());
+ DeliveryRecords::iterator i = unacked.begin();
+ while (i != end) {
+ const SequenceNumber seq(i->getId());
+ if (isInSeq(seq)) {
+ boost::intrusive_ptr<Queue::DequeueCompletion> async(i->accept((TransactionContext *)0));
+ if (async) {
+ if (!cmd) cmd = boost::intrusive_ptr<AsyncMessageAcceptCmd>(new AsyncMessageAcceptCmd(*this));
+ cmd->add(i->getQueue());
+ boost::intrusive_ptr<qpid::RefCounted> rc(boost::static_pointer_cast<RefCounted>(cmd));
+ async->registerCallback(&AsyncMessageAcceptCmd::dequeueDone, rc);
+ }
+ if (i->isRedundant())
+ i = unacked.erase(i);
+ else
+ ++i;
+ } else
+ ++i;
+ }
if (cmd) {
boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd));