diff options
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 87 |
1 files changed, 63 insertions, 24 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index d10b3d3fe8..1520ba2272 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -97,8 +97,7 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra void ExecutionHandler::flush() { - //send completion - incoming.lwm = incoming.hwm; + sendCompletion(); } void ExecutionHandler::noop() @@ -106,48 +105,88 @@ void ExecutionHandler::noop() //do nothing } -void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +void ExecutionHandler::result(uint32_t command, const std::string& data) { - //TODO: need to signal the result to the appropriate listener + completion.received(command, data); } void ExecutionHandler::sync() { - //TODO: implement (the application is in charge of completion of - //some commands, so need to track completion for them). + //TODO: implement - need to note the mark requested and then + //remember to send a response when that point is reached +} - //This shouldn't ever need to be called by the server (in my - //opinion) as the server never needs to synchronise with the - //clients execution +void ExecutionHandler::flushTo(const framing::SequenceNumber& point) +{ + if (point > outgoing.lwm) { + sendFlushRequest(); + } } -void ExecutionHandler::sendFlush() +void ExecutionHandler::sendFlushRequest() { AMQFrame frame(0, ExecutionFlushBody()); - out(frame); + out(frame); } -void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g) +void ExecutionHandler::syncTo(const framing::SequenceNumber& point) { - //allocate id: - ++outgoing.hwm; - //register listeners if necessary: - if (f) { - completion.listen(outgoing.hwm, f); - } - if (g) { - correlation.listen(g); + if (point > outgoing.lwm) { + sendSyncRequest(); + } +} + + +void ExecutionHandler::sendSyncRequest() +{ + AMQFrame frame(0, ExecutionSyncBody()); + out(frame); +} + +void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) +{ + if (id > completionStatus.mark) { + if (cumulative) { + completionStatus.update(completionStatus.mark, id); + } else { + completionStatus.update(id, id); + } } + if (send) { + sendCompletion(); + } +} - AMQFrame frame(0/*id will be filled in be channel handler*/, command); + +void ExecutionHandler::sendCompletion() +{ + SequenceNumberSet range; + completionStatus.collectRanges(range); + AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range)); + out(frame); +} + +SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l) +{ + SequenceNumber id = ++outgoing.hwm; + if(l) { + completion.listenForResult(id, l); + } + AMQFrame frame(0/*channel will be filled in be channel handler*/, command); out(frame); + return id; } -void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data, - CompletionTracker::Listener f, Correlator::Listener g) +SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, + CompletionTracker::ResultListener l) { - send(command, f, g); + SequenceNumber id = send(command, l); + sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData()); + return id; +} +void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data) +{ AMQHeaderBody header; BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers); header.get<BasicHeaderProperties>(true)->setContentLength(data.size()); |