summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ExecutionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp87
1 files changed, 63 insertions, 24 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index d10b3d3fe8..1520ba2272 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -97,8 +97,7 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra
void ExecutionHandler::flush()
{
- //send completion
- incoming.lwm = incoming.hwm;
+ sendCompletion();
}
void ExecutionHandler::noop()
@@ -106,48 +105,88 @@ void ExecutionHandler::noop()
//do nothing
}
-void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+void ExecutionHandler::result(uint32_t command, const std::string& data)
{
- //TODO: need to signal the result to the appropriate listener
+ completion.received(command, data);
}
void ExecutionHandler::sync()
{
- //TODO: implement (the application is in charge of completion of
- //some commands, so need to track completion for them).
+ //TODO: implement - need to note the mark requested and then
+ //remember to send a response when that point is reached
+}
- //This shouldn't ever need to be called by the server (in my
- //opinion) as the server never needs to synchronise with the
- //clients execution
+void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
+{
+ if (point > outgoing.lwm) {
+ sendFlushRequest();
+ }
}
-void ExecutionHandler::sendFlush()
+void ExecutionHandler::sendFlushRequest()
{
AMQFrame frame(0, ExecutionFlushBody());
- out(frame);
+ out(frame);
}
-void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g)
+void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
{
- //allocate id:
- ++outgoing.hwm;
- //register listeners if necessary:
- if (f) {
- completion.listen(outgoing.hwm, f);
- }
- if (g) {
- correlation.listen(g);
+ if (point > outgoing.lwm) {
+ sendSyncRequest();
+ }
+}
+
+
+void ExecutionHandler::sendSyncRequest()
+{
+ AMQFrame frame(0, ExecutionSyncBody());
+ out(frame);
+}
+
+void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
+{
+ if (id > completionStatus.mark) {
+ if (cumulative) {
+ completionStatus.update(completionStatus.mark, id);
+ } else {
+ completionStatus.update(id, id);
+ }
}
+ if (send) {
+ sendCompletion();
+ }
+}
- AMQFrame frame(0/*id will be filled in be channel handler*/, command);
+
+void ExecutionHandler::sendCompletion()
+{
+ SequenceNumberSet range;
+ completionStatus.collectRanges(range);
+ AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range));
+ out(frame);
+}
+
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
+{
+ SequenceNumber id = ++outgoing.hwm;
+ if(l) {
+ completion.listenForResult(id, l);
+ }
+ AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
out(frame);
+ return id;
}
-void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data,
- CompletionTracker::Listener f, Correlator::Listener g)
+SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
+ CompletionTracker::ResultListener l)
{
- send(command, f, g);
+ SequenceNumber id = send(command, l);
+ sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData());
+ return id;
+}
+void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data)
+{
AMQHeaderBody header;
BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
header.get<BasicHeaderProperties>(true)->setContentLength(data.size());