summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ExecutionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp40
1 files changed, 22 insertions, 18 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index abfce4f9d1..6ee6429b6b 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -23,31 +23,35 @@
#include "qpid/Exception.h"
#include "qpid/framing/BasicDeliverBody.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
-bool isMessageMethod(AMQMethodBody::shared_ptr method)
+bool isMessageMethod(AMQMethodBody* method)
{
return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>();
}
-bool isMessageMethod(AMQBody::shared_ptr body)
+bool isMessageMethod(AMQBody* body)
{
- return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+ AMQMethodBody* method=body->getMethod();
+ return method && isMessageMethod(method);
}
bool isContentFrame(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
uint8_t type = body->type();
return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
}
-bool invoke(AMQBody::shared_ptr body, Invocable* target)
+bool invoke(AMQBody* body, Invocable* target)
{
- return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
+ AMQMethodBody* method=body->getMethod();
+ return method && method->invoke(target);
}
ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
@@ -56,7 +60,7 @@ ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
//incoming:
void ExecutionHandler::handle(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (!invoke(body, this)) {
if (isContentFrame(frame)) {
if (!arriving) {
@@ -69,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame)
}
} else {
++incoming.hwm;
- correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body));
+ correlation.receive(body->getMethod());
}
}
}
@@ -95,16 +99,15 @@ void ExecutionHandler::flush()
{
//send completion
incoming.lwm = incoming.hwm;
- //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
}
void ExecutionHandler::sendFlush()
{
- AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
+ AMQFrame frame(version, 0, ExecutionFlushBody());
out(frame);
}
-void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g)
+void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g)
{
//allocate id:
++outgoing.hwm;
@@ -116,18 +119,19 @@ void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::List
correlation.listen(g);
}
- AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command);
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/,
+ command);
out(frame);
}
-void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data,
+void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data,
CompletionTracker::Listener f, Correlator::Listener g)
{
send(command, f, g);
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers);
- header->setContentSize(data.size());
+ AMQHeaderBody header(BASIC);
+ BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers);
+ header.setContentSize(data.size());
AMQFrame h(version, 0, header);
out(h);
@@ -136,7 +140,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data)));
+ AMQFrame frame(version, 0, AMQContentBody(data));
out(frame);
}else{
u_int32_t offset = 0;
@@ -144,7 +148,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag)));
+ AMQFrame frame(version, 0, AMQContentBody(frag));
out(frame);
offset += length;
remaining = data_length - offset;