summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp147
1 files changed, 57 insertions, 90 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 2c792c2d43..96e8fe8b2d 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -548,7 +548,7 @@ void SemanticState::recover(bool requeue)
//unconfirmed messages re redelivered and therefore have their
//id adjusted, confirmed messages are not and so the ordering
//w.r.t id is lost
- sort(unacked.begin(), unacked.end());
+ unacked.sort();
}
}
@@ -779,63 +779,60 @@ namespace {
class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext
{
mutable qpid::sys::Mutex lock;
- std::map<DeliveryId, boost::shared_ptr<Queue> > pending; // for dequeue to complete
- bool ready;
+ unsigned int pending;
+ std::vector<boost::shared_ptr<Queue> > queues; // for flush()
SemanticState& state;
+ /** completes this command. Note: may run in *any* thread */
+ void complete()
+ {
+ Mutex::ScopedLock l(lock);
+ assert(pending);
+ if (--pending == 0) {
+ framing::Invoker::Result r; // message.accept does not return result data
+ Mutex::ScopedUnlock ul(lock);
+ QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
+ completed( r );
+ }
+ }
+
public:
AsyncMessageAcceptCmd(SemanticState& _state)
- : ready(false), state(_state) {}
+ : pending(1), state(_state) {}
- /** called from session to urge pending dequeues to complete ASAP */
+ /** signal this dequeue done. Note: may be run in *any* thread */
+ static void dequeueDone( boost::intrusive_ptr<RefCounted>& ctxt )
+ {
+ boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd(boost::static_pointer_cast<AsyncMessageAcceptCmd>(ctxt));
+ cmd->complete();
+ }
+
+ /** called from session to urge pending dequeues to complete ASAP, done
+ as a result of an execution.sync */
void flush()
{
QPID_LOG(trace, "Flushing pending message.accept cmd=" << getId());
- std::map<DeliveryId, boost::shared_ptr<Queue> > copy;
+ std::vector<boost::shared_ptr<Queue> > copy;
{
Mutex::ScopedLock l(lock);
- copy = pending;
+ copy.swap(queues); // no longer needed after flushing...
}
- std::set<Queue *> flushedQs; // flush each queue only once!
- for (std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = copy.begin();
+ for (std::vector<boost::shared_ptr<Queue> >::iterator i = copy.begin();
i != copy.end(); ++i) {
- Queue *queue(i->second.get());
- if (flushedQs.find(queue) == flushedQs.end()) {
- flushedQs.insert(queue);
- i->second->flush();
- }
+ (*i)->flush();
}
}
/** add a pending dequeue to track */
- void add( const DeliveryId& id, const boost::shared_ptr<Queue>& queue )
+ void add( const boost::shared_ptr<Queue>& queue )
{
- QPID_LOG(trace, "Scheduling dequeue of delivery " << id
+ QPID_LOG(trace, "Scheduling dequeue of delivery " << getId()
<< " on session " << state.getSession().getSessionId());
Mutex::ScopedLock l(lock);
- bool unique = pending.insert(std::pair<DeliveryId, boost::shared_ptr<Queue> >(id, queue)).second;
- if (!unique) {
- assert(false);
- }
+ ++pending;
+ queues.push_back(queue);
}
- /** signal this dequeue done. Note: may be run in *any* thread */
- void complete( const DeliveryId& id )
- {
- QPID_LOG(trace, "Dequeue of delivery " << id
- << " completed on session " << state.getSession().getSessionId());
- Mutex::ScopedLock l(lock);
- std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = pending.find(id);
- assert(i != pending.end());
- pending.erase(i);
-
- if (ready && pending.empty()) {
- framing::Invoker::Result r; // message.accept does not return result data
- Mutex::ScopedUnlock ul(lock);
- QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
- completed( r );
- }
- }
/** allow the Message.Accept to complete - do this only after all
* deliveryIds have been added() and this has been registered with the
@@ -844,57 +841,15 @@ namespace {
{
QPID_LOG(trace, "Dispatching async message.accept cmd=" << getId());
Mutex::ScopedLock l(lock);
- if (pending.empty()) {
+ assert(pending);
+ if (--pending == 0) {
framing::Invoker::Result r;
Mutex::ScopedUnlock ul(lock);
+ QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
completed( r );
- return;
}
- ready = true;
}
};
-
-
- /** callback to indicate a single message has completed its asynchronous
- dequeue. This object is made available to the queue when a dequeue is
- started. The queue will invoke the callback when the dequeue
- completes. */
- class DequeueDone : public Queue::DequeueDoneCallback
- {
- DeliveryId id;
- boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
- public:
- DequeueDone( const DeliveryId & _id,
- const boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
- : id(_id), cmd(_cmd) {}
- void operator()() { cmd->complete( id ); }
- };
-
-
- /** factory to create the above callback - passed to queue's dequeue
- method, only invoked if the dequeue operation is asynchronous! */
- boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state,
- const DeliveryId& id,
- const boost::shared_ptr<Queue>& queue,
- boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd )
- {
- if (!cmd->get()) { // first async dequeue creates the context
- cmd->reset(new AsyncMessageAcceptCmd(*state));
- }
- (*cmd)->add( id, queue );
- boost::shared_ptr<DequeueDone> x( new DequeueDone(id, *cmd ) );
- return x;
- }
-
- /** predicate to process unacked delivery records during Message.accept
- processing */
- bool acceptDelivery( SemanticState *state,
- boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd,
- DeliveryRecord& dr )
- {
- Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), dr.getQueue(), cmd);
- return dr.accept((TransactionContext*) 0, &f);
- }
} // namespace
@@ -929,14 +884,26 @@ void SemanticState::accepted(const SequenceSet& commands) {
which would be necessary if we remove when the dequeue
completes. Is this ok? */
boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
- DeliveryRecords::iterator removed =
- remove_if(unacked.begin(), unacked.end(),
- isInSequenceSetAnd(commands,
- bind(acceptDelivery,
- this,
- &cmd,
- _1)));
- unacked.erase(removed, unacked.end());
+ IsInSequenceSet isInSeq(commands);
+ DeliveryRecords::const_iterator end(unacked.end());
+ DeliveryRecords::iterator i = unacked.begin();
+ while (i != end) {
+ const SequenceNumber seq(i->getId());
+ if (isInSeq(seq)) {
+ boost::intrusive_ptr<Queue::DequeueCompletion> async(i->accept((TransactionContext *)0));
+ if (async) {
+ if (!cmd) cmd = boost::intrusive_ptr<AsyncMessageAcceptCmd>(new AsyncMessageAcceptCmd(*this));
+ cmd->add(i->getQueue());
+ boost::intrusive_ptr<qpid::RefCounted> rc(boost::static_pointer_cast<RefCounted>(cmd));
+ async->registerCallback(&AsyncMessageAcceptCmd::dequeueDone, rc);
+ }
+ if (i->isRedundant())
+ i = unacked.erase(i);
+ else
+ ++i;
+ } else
+ ++i;
+ }
if (cmd) {
boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd));