summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ExecutionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp49
1 files changed, 33 insertions, 16 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 8ea2cc64e6..95cdc7032a 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -62,19 +62,16 @@ void ExecutionHandler::handle(AMQFrame& frame)
{
AMQBody* body = frame.getBody();
if (!invoke(body, this)) {
- if (isContentFrame(frame)) {
- if (!arriving) {
- arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
- }
- arriving->append(frame);
- if (arriving->isComplete()) {
+ if (!arriving) {
+ arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
+ }
+ arriving->append(frame);
+ if (arriving->isComplete()) {
+ if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) {
demux.handle(arriving);
- arriving.reset();
- }
- } else {
- ++incoming.hwm;
- correlation.receive(body->getMethod());
- }
+ }
+ arriving.reset();
+ }
}
}
@@ -168,11 +165,19 @@ void ExecutionHandler::sendCompletion()
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
{
+ return send(command, l, false);
+}
+
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
+{
SequenceNumber id = ++outgoing.hwm;
if(l) {
completion.listenForResult(id, l);
}
AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
+ if (hasContent) {
+ frame.setEof(false);
+ }
out(frame);
return id;
}
@@ -180,7 +185,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker:
SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
CompletionTracker::ResultListener l)
{
- SequenceNumber id = send(command, l);
+ SequenceNumber id = send(command, l, true);
sendContent(content);
return id;
}
@@ -188,14 +193,16 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodConten
void ExecutionHandler::sendContent(const MethodContent& content)
{
AMQFrame header(0, content.getHeader());
- out(header);
-
+ header.setBof(false);
u_int64_t data_length = content.getData().length();
if(data_length > 0){
+ header.setEof(false);
+ out(header);
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
AMQFrame frame(0, AMQContentBody(content.getData()));
+ frame.setBof(false);
out(frame);
}else{
u_int32_t offset = 0;
@@ -204,10 +211,20 @@ void ExecutionHandler::sendContent(const MethodContent& content)
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(content.getData().substr(offset, length));
AMQFrame frame(0, AMQContentBody(frag));
- out(frame);
+ frame.setBof(false);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
offset += length;
remaining = data_length - offset;
+ if (remaining) {
+ frame.setEos(false);
+ frame.setEof(false);
+ }
+ out(frame);
}
}
+ } else {
+ out(header);
}
}