diff options
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 49 |
1 files changed, 33 insertions, 16 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 8ea2cc64e6..95cdc7032a 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -62,19 +62,16 @@ void ExecutionHandler::handle(AMQFrame& frame) { AMQBody* body = frame.getBody(); if (!invoke(body, this)) { - if (isContentFrame(frame)) { - if (!arriving) { - arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); - } - arriving->append(frame); - if (arriving->isComplete()) { + if (!arriving) { + arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); + } + arriving->append(frame); + if (arriving->isComplete()) { + if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) { demux.handle(arriving); - arriving.reset(); - } - } else { - ++incoming.hwm; - correlation.receive(body->getMethod()); - } + } + arriving.reset(); + } } } @@ -168,11 +165,19 @@ void ExecutionHandler::sendCompletion() SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l) { + return send(command, l, false); +} + +SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) +{ SequenceNumber id = ++outgoing.hwm; if(l) { completion.listenForResult(id, l); } AMQFrame frame(0/*channel will be filled in be channel handler*/, command); + if (hasContent) { + frame.setEof(false); + } out(frame); return id; } @@ -180,7 +185,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker: SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, CompletionTracker::ResultListener l) { - SequenceNumber id = send(command, l); + SequenceNumber id = send(command, l, true); sendContent(content); return id; } @@ -188,14 +193,16 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodConten void ExecutionHandler::sendContent(const MethodContent& content) { AMQFrame header(0, content.getHeader()); - out(header); - + header.setBof(false); u_int64_t data_length = content.getData().length(); if(data_length > 0){ + header.setEof(false); + out(header); //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ AMQFrame frame(0, AMQContentBody(content.getData())); + frame.setBof(false); out(frame); }else{ u_int32_t offset = 0; @@ -204,10 +211,20 @@ void ExecutionHandler::sendContent(const MethodContent& content) u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(content.getData().substr(offset, length)); AMQFrame frame(0, AMQContentBody(frag)); - out(frame); + frame.setBof(false); + if (offset > 0) { + frame.setBos(false); + } offset += length; remaining = data_length - offset; + if (remaining) { + frame.setEos(false); + frame.setEof(false); + } + out(frame); } } + } else { + out(header); } } |