summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-06-03 20:10:17 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-06-03 20:10:17 +0000
commit427a8c19ec9e14da39fc9fd0c1e10b2865abeec1 (patch)
tree9c55eb930ec8187490344d9a8d0782d507d7ce3a
parentc578ed1e72954def2a5904990d7a97b9756c4f35 (diff)
downloadqpid-python-427a8c19ec9e14da39fc9fd0c1e10b2865abeec1.tar.gz
checkpoint - various fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1131182 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCompletion.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp52
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h10
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp66
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h51
5 files changed, 87 insertions, 94 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
index bfd6718dcd..721e862bce 100644
--- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h
+++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
@@ -89,7 +89,7 @@ class AsyncCompletion : private boost::noncopyable
* callback object will be used by the last completer thread, and
* released when the callback returns.
*/
- class Callback : virtual public RefCounted
+ class Callback : public RefCounted
{
public:
virtual void completed(bool) = 0;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 0f670ded83..3530f3ef6f 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -824,13 +824,14 @@ namespace {
pending.erase(i);
if (ready && pending.empty()) {
- framing::Invoker::Result r;
+ framing::Invoker::Result r; // message.accept does not return result data
Mutex::ScopedUnlock ul(lock);
completed( r );
}
}
- /** allow the Message.Accept to complete */
+ /** allow the Message.Accept to complete - do this only after all
+ * deliveryIds have been added() */
void enable()
{
Mutex::ScopedLock l(lock);
@@ -854,14 +855,14 @@ namespace {
boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
public:
DequeueDone( const DeliveryId & _id,
- boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
+ const boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
: id(_id), cmd(_cmd) {}
void operator()() { cmd->complete( id ); }
};
/** factory to create the above callback - passed to queue's dequeue
- method, only called if dequeue is async! */
+ method, only used if dequeue is asynchronous! */
boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state,
const DeliveryId& id,
boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd )
@@ -873,10 +874,18 @@ namespace {
boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) );
return x;
}
+
+ /** predicate to process unacked delivery records */
+ bool acceptDelivery( SemanticState *state,
+ boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd,
+ DeliveryRecord dr )
+ {
+ Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), cmd);
+ return dr.accept((TransactionContext*) 0, &f);
+ }
}
void SemanticState::accepted(const SequenceSet& commands) {
- QPID_LOG(error, "SemanticState::accepted (" << commands << ")");
assertClusterSafe();
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -900,26 +909,21 @@ void SemanticState::accepted(const SequenceSet& commands) {
unacked.erase(removed, unacked.end());
}
} else {
- /** @todo KAG - the following code removes the command from unacked
- even if the dequeue has not completed. note that the command will
- still not complete until all dequeues complete. I'm doing this to
- avoid having to lock the unacked list, which would be necessary if
- we remove when the dequeue completes. Is this ok? */
+ /** @todo KAG - the following code removes the message from unacked
+ list even if the dequeue has not yet completed. Note that the
+ entire command will still not complete until all dequeues
+ complete. I'm doing this to avoid having to lock the unacked list,
+ which would be necessary if we remove when the dequeue
+ completes. Is this ok? */
boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
- DeliveryRecords::iterator i;
- DeliveryRecords undone;
- for (i = unacked.begin(); i < unacked.end(); ++i) {
- if (i->coveredBy(&commands)) {
- Queue::DequeueDoneCallbackFactory f = boost::bind(factory, this, i->getId(), cmd);
- if (i->accept((TransactionContext*) 0, &f) == false) {
- undone.push_back(*i);
- }
- }
- }
- if (undone.empty())
- unacked.clear();
- else
- unacked.swap(undone);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(acceptDelivery,
+ this,
+ cmd,
+ _1)));
+ unacked.erase(removed, unacked.end());
if (cmd) {
boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd));
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index 6f4a894f22..a774ab6c52 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -59,7 +59,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
// class for commands that need to complete asynchronously
friend class AsyncCommandContext;
- class AsyncCommandContext : virtual public RefCounted
+ class AsyncCommandContext : public RefCounted
{
private:
framing::SequenceNumber id;
@@ -71,11 +71,11 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
AsyncCommandContext() : id(0), requiresAccept(false), syncBitSet(false) {}
virtual ~AsyncCommandContext() {}
- framing::SequenceNumber getId() { return id; }
+ framing::SequenceNumber getId() const { return id; }
void setId(const framing::SequenceNumber seq) { id = seq; }
- bool getRequiresAccept() { return requiresAccept; }
+ bool getRequiresAccept() const { return requiresAccept; }
void setRequiresAccept(const bool a) { requiresAccept = a; }
- bool getSyncBitSet() { return syncBitSet; }
+ bool getSyncBitSet() const { return syncBitSet; }
void setSyncBitSet(const bool s) { syncBitSet = s; }
void setManager(SessionContext::AsyncCommandManager *m) { manager.reset(m); }
@@ -95,7 +95,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
class AsyncCommandManager : public RefCounted
{
public:
- virtual void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&,
+ virtual void completePendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&,
const framing::Invoker::Result&) = 0;
};
};
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index d84256b61b..4f6e7ff075 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -208,6 +208,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN
{
currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks).
syncCurrentCommand = method->isSync();
+ currentCommandId = id;
acceptRequired = false;
Invoker::Result invocation = invoke(adapter, *method);
if (!invocation.wasHandled()) {
@@ -263,6 +264,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
currentCommandComplete = true; // assumed
syncCurrentCommand = msg->getFrames().getMethod()->isSync();
acceptRequired = msg->requiresAccept();
+ currentCommandId = msg->getCommandId();
semanticState.handle(msg);
msgBuilder.end();
IncompleteIngressMsgXfer xfer(this, msg);
@@ -428,21 +430,23 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
// (called via the invoker() in handleCommand() above)
void SessionState::addPendingExecutionSync()
{
- SequenceNumber syncCommandId = receiverGetCurrent();
- if (receiverGetIncomplete().front() < syncCommandId) {
+ if (receiverGetIncomplete().front() < currentCommandId) {
currentCommandComplete = false;
- pendingExecutionSyncs.push(syncCommandId);
+ pendingExecutionSyncs.push(currentCommandId);
asyncCommandManager->flushPendingCommands();
- QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
+ QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << currentCommandId);
}
}
void SessionState::registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd)
{
- /** @todo KAG: ensure this is invoked during handleCommand() context! */
+ /** @todo KAG: ensure this is invoked during handleCommand()/handleContent() context! */
currentCommandComplete = false;
- asyncCommandManager->addPendingCommand( aCmd, receiverGetCurrent(), acceptRequired, syncCurrentCommand );
+ asyncCommandManager->addPendingCommand( aCmd, currentCommandId, acceptRequired, syncCurrentCommand );
+ if (syncCurrentCommand) { // if client wants ack, force command to complete asap.
+ aCmd->flush();
+ }
}
@@ -460,14 +464,10 @@ SessionState::IncompleteIngressMsgXfer::clone()
boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
// this routine is *only* invoked when the message needs to be asynchronously completed. Otherwise, ::completed()
- // will be invoked directly.
- pending = true;
- boost::intrusive_ptr<SessionContext::AsyncCommandContext>ctxt(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cb));
+ // will be invoked directly. Thus, let the SessionState know this command is not going to complete immediately:
+ pendingCmdCtxt = boost::intrusive_ptr<CommandContext>(new CommandContext(msg));
+ boost::intrusive_ptr<qpid::broker::SessionContext::AsyncCommandContext> ctxt(pendingCmdCtxt);
session->registerAsyncCommand(ctxt);
- if (ctxt->getSyncBitSet()) {
- // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
- msg->flush();
- }
return cb;
}
@@ -478,35 +478,27 @@ SessionState::IncompleteIngressMsgXfer::clone()
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
- if (!sync) {
- /** note well: this path may execute in any thread. It is safe to access
- * the scheduledCompleterContext, since *this has a shared pointer to it.
- * but not session!
- */
+ if (pendingCmdCtxt) {
session = 0;
- QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << getId());
- completed(framing::Invoker::Result());
+ QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << pendingCmdCtxt->getId());
+ pendingCmdCtxt->completed(framing::Invoker::Result());
+ pendingCmdCtxt.reset();
} else {
+ // Since "clone()" above was _not_ called, this -better- be sync!
+ if (!sync) assert(false);
// this path runs directly from the ac->end() call in handleContent() above,
// so *session is definately valid.
if (session->isAttached()) {
- QPID_LOG(debug, ": receive completed for msg seq=" << getId());
- session->completeCommand(getId(), framing::Invoker::Result(), getRequiresAccept(), getSyncBitSet());
- }
- if (pending) {
- boost::intrusive_ptr<AsyncCommandContext> p(this);
- session->cancelAsyncCommand(p);
+ QPID_LOG(debug, ": receive completed for msg seq=" << session->currentCommandId);
+ session->completeCommand(session->currentCommandId,
+ framing::Invoker::Result(), // dummy
+ session->acceptRequired,
+ session->syncCurrentCommand);
}
}
}
-void SessionState::IncompleteIngressMsgXfer::flush()
-{
- msg->flush();
-}
-
-
/** Scheduled from an asynchronous command's completed callback to run on
* the IO thread.
*/
@@ -516,9 +508,9 @@ void SessionState::AsyncCommandManager::schedule(boost::intrusive_ptr<AsyncComma
}
-void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
- framing::SequenceNumber seq,
- bool acceptRequired, bool syncBitSet)
+void SessionState::AsyncCommandManager::addPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>& cmd,
+ const framing::SequenceNumber& seq,
+ const bool acceptRequired, const bool syncBitSet)
{
cmd->setId(seq);
cmd->setRequiresAccept(acceptRequired);
@@ -531,7 +523,7 @@ void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<A
}
-void SessionState::AsyncCommandManager::cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd)
+void SessionState::AsyncCommandManager::cancelPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>& cmd)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
pendingCommands.erase(cmd->getId());
@@ -559,7 +551,7 @@ void SessionState::AsyncCommandManager::flushPendingCommands()
/** mark a pending command as completed.
* This method must be thread safe - it may run on any thread.
*/
-void SessionState::AsyncCommandManager::completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
+void SessionState::AsyncCommandManager::completePendingCommand(const boost::intrusive_ptr<AsyncCommandContext>& cmd,
const framing::Invoker::Result& result)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 6ee8a38f4e..cd6123688b 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -186,10 +186,11 @@ class SessionState : public qpid::SessionState,
// sequence numbers of received Execution.Sync commands that are pending completion.
std::queue<SequenceNumber> pendingExecutionSyncs;
- // true if command completes during call to handleCommand()
- bool currentCommandComplete;
- bool syncCurrentCommand;
- bool acceptRequired;
+ // flags that reflect the state of the currently executing received command:
+ bool currentCommandComplete; // true if the current command completed synchronously
+ SequenceNumber currentCommandId;
+ bool syncCurrentCommand; // true if sync-bit set in current command headers
+ bool acceptRequired; // true if current ingress message.transfer requires accept
protected:
/** This class provides a context for completing asynchronous commands in a thread
@@ -231,24 +232,16 @@ class SessionState : public qpid::SessionState,
AsyncCommandManager(SessionState *s) : session(s), isAttached(s->isAttached()) {};
~AsyncCommandManager() {};
- /** track a message pending ingress completion */
- //void addPendingMessage(boost::intrusive_ptr<Message> m);
- //void deletePendingMessage(SequenceNumber id);
- //void flushPendingMessages();
- /** schedule the processing of a completed ingress message.transfer command */
- //void scheduleMsgCompletion(SequenceNumber cmd,
- // bool requiresAccept,
- // bool requiresSync);
void cancel(); // called by SessionState destructor.
void attached(); // called by SessionState on attach()
void detached(); // called by SessionState on detach()
- /** called by async command handlers */
- void addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&,
- framing::SequenceNumber, bool, bool);
- void cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&);
+ /** for mananging asynchronous commands */
+ void addPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&,
+ const framing::SequenceNumber&, const bool, const bool);
+ void cancelPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&);
void flushPendingCommands();
- void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&);
+ void completePendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&);
};
boost::intrusive_ptr<AsyncCommandManager> asyncCommandManager;
@@ -256,28 +249,32 @@ class SessionState : public qpid::SessionState,
/** incomplete Message.transfer commands - inbound to broker from client
*/
- class IncompleteIngressMsgXfer : public AsyncCommandContext,
- public AsyncCompletion::Callback
+ friend class IncompleteIngressMsgXfer;
+ class IncompleteIngressMsgXfer : public AsyncCompletion::Callback
{
public:
IncompleteIngressMsgXfer( SessionState *ss,
boost::intrusive_ptr<Message> m )
- : session(ss),
- msg(m),
- pending(false) {}
+ : session(ss), msg(m) {};
virtual ~IncompleteIngressMsgXfer() {};
- // async completion calls
virtual void completed(bool);
virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
- // async cmd calls
- virtual void flush();
-
private:
+ /** @todo KAG COMMENT ME */
+ class CommandContext : public AsyncCommandContext
+ {
+ boost::intrusive_ptr<Message> msg;
+ public:
+ CommandContext(boost::intrusive_ptr<Message> _m)
+ : msg(_m) {}
+ virtual void flush() { msg->flush(); }
+ };
+
SessionState *session; // only valid if sync flag in callback is true
boost::intrusive_ptr<Message> msg;
- bool pending; // true if msg saved on pending list...
+ boost::intrusive_ptr<CommandContext> pendingCmdCtxt;
};
friend class SessionManager;