diff options
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 38 |
1 files changed, 22 insertions, 16 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 95cdc7032a..4e0ee05da2 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -63,7 +63,7 @@ void ExecutionHandler::handle(AMQFrame& frame) AMQBody* body = frame.getBody(); if (!invoke(body, this)) { if (!arriving) { - arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); + arriving = FrameSet::shared_ptr(new FrameSet(++incomingCounter)); } arriving->append(frame); if (arriving->isComplete()) { @@ -77,16 +77,12 @@ void ExecutionHandler::handle(AMQFrame& frame) void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) { - SequenceNumber mark(cumulative); - if (outgoing.lwm < mark) { - outgoing.lwm = mark; - completion.completed(outgoing.lwm); - } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { - //TODO: need to manage (record and accumulate) ranges such - //that we can implictly move the mark when appropriate + SequenceNumber mark(cumulative); + outgoingCompletionStatus.update(mark, range); + completion.completed(outgoingCompletionStatus.mark); //TODO: signal listeners of early notification? } @@ -115,7 +111,7 @@ void ExecutionHandler::sync() void ExecutionHandler::flushTo(const framing::SequenceNumber& point) { - if (point > outgoing.lwm) { + if (point > outgoingCompletionStatus.mark) { sendFlushRequest(); } } @@ -128,7 +124,7 @@ void ExecutionHandler::sendFlushRequest() void ExecutionHandler::syncTo(const framing::SequenceNumber& point) { - if (point > outgoing.lwm) { + if (point > outgoingCompletionStatus.mark) { sendSyncRequest(); } } @@ -142,11 +138,11 @@ void ExecutionHandler::sendSyncRequest() void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) { - if (id > completionStatus.mark) { + if (id > incomingCompletionStatus.mark) { if (cumulative) { - completionStatus.update(completionStatus.mark, id); + incomingCompletionStatus.update(incomingCompletionStatus.mark, id); } else { - completionStatus.update(id, id); + incomingCompletionStatus.update(id, id); } } if (send) { @@ -158,8 +154,8 @@ void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool void ExecutionHandler::sendCompletion() { SequenceNumberSet range; - completionStatus.collectRanges(range); - AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range)); + incomingCompletionStatus.collectRanges(range); + AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range)); out(frame); } @@ -170,7 +166,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker: SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) { - SequenceNumber id = ++outgoing.hwm; + SequenceNumber id = ++outgoingCounter; if(l) { completion.listenForResult(id, l); } @@ -228,3 +224,13 @@ void ExecutionHandler::sendContent(const MethodContent& content) out(header); } } + +bool ExecutionHandler::isComplete(const SequenceNumber& id) +{ + return outgoingCompletionStatus.covers(id); +} + +bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id) +{ + return outgoingCompletionStatus.mark >= id; +} |