summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ExecutionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp38
1 files changed, 22 insertions, 16 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 95cdc7032a..4e0ee05da2 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -63,7 +63,7 @@ void ExecutionHandler::handle(AMQFrame& frame)
AMQBody* body = frame.getBody();
if (!invoke(body, this)) {
if (!arriving) {
- arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
+ arriving = FrameSet::shared_ptr(new FrameSet(++incomingCounter));
}
arriving->append(frame);
if (arriving->isComplete()) {
@@ -77,16 +77,12 @@ void ExecutionHandler::handle(AMQFrame& frame)
void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
- SequenceNumber mark(cumulative);
- if (outgoing.lwm < mark) {
- outgoing.lwm = mark;
- completion.completed(outgoing.lwm);
- }
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
- //TODO: need to manage (record and accumulate) ranges such
- //that we can implictly move the mark when appropriate
+ SequenceNumber mark(cumulative);
+ outgoingCompletionStatus.update(mark, range);
+ completion.completed(outgoingCompletionStatus.mark);
//TODO: signal listeners of early notification?
}
@@ -115,7 +111,7 @@ void ExecutionHandler::sync()
void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
{
- if (point > outgoing.lwm) {
+ if (point > outgoingCompletionStatus.mark) {
sendFlushRequest();
}
}
@@ -128,7 +124,7 @@ void ExecutionHandler::sendFlushRequest()
void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
{
- if (point > outgoing.lwm) {
+ if (point > outgoingCompletionStatus.mark) {
sendSyncRequest();
}
}
@@ -142,11 +138,11 @@ void ExecutionHandler::sendSyncRequest()
void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
{
- if (id > completionStatus.mark) {
+ if (id > incomingCompletionStatus.mark) {
if (cumulative) {
- completionStatus.update(completionStatus.mark, id);
+ incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
} else {
- completionStatus.update(id, id);
+ incomingCompletionStatus.update(id, id);
}
}
if (send) {
@@ -158,8 +154,8 @@ void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool
void ExecutionHandler::sendCompletion()
{
SequenceNumberSet range;
- completionStatus.collectRanges(range);
- AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range));
+ incomingCompletionStatus.collectRanges(range);
+ AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range));
out(frame);
}
@@ -170,7 +166,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker:
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
{
- SequenceNumber id = ++outgoing.hwm;
+ SequenceNumber id = ++outgoingCounter;
if(l) {
completion.listenForResult(id, l);
}
@@ -228,3 +224,13 @@ void ExecutionHandler::sendContent(const MethodContent& content)
out(header);
}
}
+
+bool ExecutionHandler::isComplete(const SequenceNumber& id)
+{
+ return outgoingCompletionStatus.covers(id);
+}
+
+bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id)
+{
+ return outgoingCompletionStatus.mark >= id;
+}