diff options
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index abfce4f9d1..6ee6429b6b 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -23,31 +23,35 @@ #include "qpid/Exception.h" #include "qpid/framing/BasicDeliverBody.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid::client; using namespace qpid::framing; using namespace boost; -bool isMessageMethod(AMQMethodBody::shared_ptr method) +bool isMessageMethod(AMQMethodBody* method) { return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>(); } -bool isMessageMethod(AMQBody::shared_ptr body) +bool isMessageMethod(AMQBody* body) { - return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body)); + AMQMethodBody* method=body->getMethod(); + return method && isMessageMethod(method); } bool isContentFrame(AMQFrame& frame) { - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); uint8_t type = body->type(); return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); } -bool invoke(AMQBody::shared_ptr body, Invocable* target) +bool invoke(AMQBody* body, Invocable* target) { - return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target); + AMQMethodBody* method=body->getMethod(); + return method && method->invoke(target); } ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : @@ -56,7 +60,7 @@ ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : //incoming: void ExecutionHandler::handle(AMQFrame& frame) { - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); if (!invoke(body, this)) { if (isContentFrame(frame)) { if (!arriving) { @@ -69,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame) } } else { ++incoming.hwm; - correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body)); + correlation.receive(body->getMethod()); } } } @@ -95,16 +99,15 @@ void ExecutionHandler::flush() { //send completion incoming.lwm = incoming.hwm; - //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); } void ExecutionHandler::sendFlush() { - AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version))); + AMQFrame frame(version, 0, ExecutionFlushBody()); out(frame); } -void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g) +void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g) { //allocate id: ++outgoing.hwm; @@ -116,18 +119,19 @@ void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::List correlation.listen(g); } - AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command); + AMQFrame frame(version, 0/*id will be filled in be channel handler*/, + command); out(frame); } -void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data, +void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data, CompletionTracker::Listener f, Correlator::Listener g) { send(command, f, g); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers); - header->setContentSize(data.size()); + AMQHeaderBody header(BASIC); + BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers); + header.setContentSize(data.size()); AMQFrame h(version, 0, header); out(h); @@ -136,7 +140,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ - AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data))); + AMQFrame frame(version, 0, AMQContentBody(data)); out(frame); }else{ u_int32_t offset = 0; @@ -144,7 +148,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag))); + AMQFrame frame(version, 0, AMQContentBody(frag)); out(frame); offset += length; remaining = data_length - offset; |