summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-06-20 18:04:05 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-06-20 18:04:05 +0000
commit15c6e1552997bc618b01faac3a2ceabbeeba7946 (patch)
tree73a911cb756fc7302b01db6455019b6c1851a9d8
parentff13c8408b23ebaf9bbb09b1100def0066a4c7cc (diff)
downloadqpid-python-15c6e1552997bc618b01faac3a2ceabbeeba7946.tar.gz
QPID-3079: update with review feedback
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1137726 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h13
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp23
3 files changed, 13 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 04d61c1add..cf538aaaa7 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1263,14 +1263,12 @@ void Queue::DequeueCompletion::dequeueDone()
assert(completionsNeeded.get() > 0);
if (--completionsNeeded == 0) {
assert(cb);
- (*cb)(ctxt);
- ctxt.reset();
+ cb();
}
}
-void Queue::DequeueCompletion::registerCallback( callback *f, boost::intrusive_ptr<RefCounted>& _ctxt )
+void Queue::DequeueCompletion::registerCallback( boost::function<void()> f )
{
cb = f;
- ctxt = _ctxt;
dequeueDone(); // invoke callback if dequeue already done.
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 3bcaf0f473..86c184676f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -289,21 +289,14 @@ class Queue : public boost::enable_shared_from_this<Queue>,
class DequeueCompletion : public RefCounted
{
public:
- typedef void callback( boost::intrusive_ptr<RefCounted>& ctxt );
-
DequeueCompletion()
- : completionsNeeded(2), // one for register call, another for done call
- cb(0) {}
-
+ : completionsNeeded(2) {}// one for register call, another for done call
void dequeueDone();
- void registerCallback( callback *f, boost::intrusive_ptr<RefCounted>& _ctxt );
+ void registerCallback(boost::function<void()> f);
private:
mutable qpid::sys::AtomicValue<int> completionsNeeded;
- callback *cb;
- boost::intrusive_ptr<RefCounted> ctxt;
- friend class Queue;
-
+ boost::function<void()> cb;
};
QPID_BROKER_EXTERN boost::intrusive_ptr<DequeueCompletion> dequeue(TransactionContext* ctxt, const QueuedMessage& msg);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 96e8fe8b2d..2383978276 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -783,30 +783,24 @@ namespace {
std::vector<boost::shared_ptr<Queue> > queues; // for flush()
SemanticState& state;
- /** completes this command. Note: may run in *any* thread */
- void complete()
+ public:
+ AsyncMessageAcceptCmd(SemanticState& _state)
+ : pending(1), state(_state) {}
+
+ /** signal a dequeue is done. Note: may be run in *any* thread */
+ void dequeueDone()
{
Mutex::ScopedLock l(lock);
assert(pending);
if (--pending == 0) {
framing::Invoker::Result r; // message.accept does not return result data
+ queues.clear();
Mutex::ScopedUnlock ul(lock);
QPID_LOG(trace, "Completing async message.accept cmd=" << getId());
completed( r );
}
}
- public:
- AsyncMessageAcceptCmd(SemanticState& _state)
- : pending(1), state(_state) {}
-
- /** signal this dequeue done. Note: may be run in *any* thread */
- static void dequeueDone( boost::intrusive_ptr<RefCounted>& ctxt )
- {
- boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd(boost::static_pointer_cast<AsyncMessageAcceptCmd>(ctxt));
- cmd->complete();
- }
-
/** called from session to urge pending dequeues to complete ASAP, done
as a result of an execution.sync */
void flush()
@@ -894,8 +888,7 @@ void SemanticState::accepted(const SequenceSet& commands) {
if (async) {
if (!cmd) cmd = boost::intrusive_ptr<AsyncMessageAcceptCmd>(new AsyncMessageAcceptCmd(*this));
cmd->add(i->getQueue());
- boost::intrusive_ptr<qpid::RefCounted> rc(boost::static_pointer_cast<RefCounted>(cmd));
- async->registerCallback(&AsyncMessageAcceptCmd::dequeueDone, rc);
+ async->registerCallback(boost::bind(&AsyncMessageAcceptCmd::dequeueDone, cmd));
}
if (i->isRedundant())
i = unacked.erase(i);