diff options
author | Alan Conway <aconway@apache.org> | 2007-01-31 23:28:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-31 23:28:38 +0000 |
commit | d71bc6e3f85c90d5f22d186aceadd1894c55383b (patch) | |
tree | d4210b428cd57c1cea4a86db175ac44566bf84dd | |
parent | ee4d5ffba8167348ea2751202c6065e8de0fc92c (diff) | |
download | qpid-python-d71bc6e3f85c90d5f22d186aceadd1894c55383b.tar.gz |
From Andrew Stitcher <astitcher@redhat.com>
r723@fuschia: andrew | 2007-01-12 00:35:16 +0000
Branch for my work on Qpid.0-9
r724@fuschia: andrew | 2007-01-12 00:59:28 +0000
Added in empty implementation of handler class for protocol Message class
r768@fuschia: andrew | 2007-01-17 01:25:16 +0000
* Added Test for new MessageHandlerImpl (but no actual tests yet)
* Filled in lots of the blanks in the MessageHandlerImpl with code
stolen from the BasicHandlerImpl
r800@fuschia: andrew | 2007-01-17 17:34:13 +0000
Updated to latest upstream changes
r840@fuschia: andrew | 2007-01-19 00:31:59 +0000
Fixed merge errors
r841@fuschia: andrew | 2007-01-19 00:47:29 +0000
Another merge problem fixed
r878@fuschia: andrew | 2007-01-24 11:27:48 +0000
Started work on the Message class handler implementation
r976@fuschia: andrew | 2007-01-30 17:05:05 +0000
Working again after broker Message refactor
r980@fuschia: andrew | 2007-01-30 18:39:18 +0000
Fix for extra parameter to transfer
r992@fuschia: andrew | 2007-01-31 18:29:57 +0000
Checkpoint of work on broker MessageMessage
r1001@fuschia: andrew | 2007-01-31 22:02:27 +0000
MessageMessage work now compiles
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502038 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 49 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 13 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageBase.h | 181 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 94 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 86 | ||||
-rw-r--r-- | cpp/lib/broker/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 12 |
7 files changed, 182 insertions, 255 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index a5192beede..b738040470 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -35,22 +35,23 @@ using namespace qpid::sys; BasicMessage::BasicMessage(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate) : publisher(_publisher), - exchange(_exchange), - routingKey(_routingKey), - mandatory(_mandatory), - immediate(_immediate), - redelivered(false), - size(0), - persistenceId(0) {} + bool _mandatory, bool _immediate) : + Message(_exchange, _routingKey, _mandatory, _immediate), + publisher(_publisher), + size(0) +{ +} BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : - publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ + publisher(0), size(0) +{ decode(buffer, headersOnly, contentChunkSize); } -BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} +BasicMessage::BasicMessage() : publisher(0), size(0) +{ +} BasicMessage::~BasicMessage(){ if (content.get()) content->destroy(); @@ -72,16 +73,13 @@ bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } -void BasicMessage::redeliver(){ - redelivered = true; -} - void BasicMessage::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize, ProtocolVersion* version){ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey))); + out->send(new AMQFrame(*version, channel, + new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey()))); sendContent(out, channel, framesize, version); } @@ -90,9 +88,10 @@ void BasicMessage::sendGetOk(OutputHandler* out, u_int32_t messageCount, u_int64_t deliveryTag, u_int32_t framesize, - ProtocolVersion* version){ - // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount))); + ProtocolVersion* version){ + // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction + out->send(new AMQFrame(*version, channel, + new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount))); sendContent(out, channel, framesize, version); } @@ -127,8 +126,12 @@ void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChu void BasicMessage::decodeHeader(Buffer& buffer) { + string exchange; + string routingKey; + buffer.getShortString(exchange); buffer.getShortString(routingKey); + setRouting(exchange, routingKey); u_int32_t headerSize = buffer.getLong(); AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); @@ -166,8 +169,8 @@ void BasicMessage::encode(Buffer& buffer) void BasicMessage::encodeHeader(Buffer& buffer) { - buffer.putShortString(exchange); - buffer.putShortString(routingKey); + buffer.putShortString(getExchange()); + buffer.putShortString(getRoutingKey()); buffer.putLong(header->size()); header->encode(buffer); } @@ -191,8 +194,8 @@ u_int32_t BasicMessage::encodedContentSize() u_int32_t BasicMessage::encodedHeaderSize() { - return exchange.size() + 1 - + routingKey.size() + 1 + return getExchange().size() + 1 + + getRoutingKey().size() + 1 + header->size() + 4;//4 extra bytes for size } @@ -204,7 +207,7 @@ u_int64_t BasicMessage::expectedContentSize() void BasicMessage::releaseContent(MessageStore* store) { Mutex::ScopedLock locker(contentLock); - if (!isPersistent() && persistenceId == 0) { + if (!isPersistent() && getPersistenceId() == 0) { store->stage(this); } if (!content.get() || content->size() > 0) { diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index d9ab9b7220..4001af97a5 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -48,16 +48,10 @@ namespace qpid { */ class BasicMessage : public Message{ const ConnectionToken* const publisher; - string exchange; - string routingKey; - const bool mandatory; - const bool immediate; - bool redelivered; qpid::framing::AMQHeaderBody::shared_ptr header; std::auto_ptr<Content> content; - u_int64_t size; - u_int64_t persistenceId; qpid::sys::Mutex contentLock; + u_int64_t size; void sendContent(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version); @@ -88,15 +82,10 @@ namespace qpid { u_int64_t deliveryTag, u_int32_t framesize, qpid::framing::ProtocolVersion* version); - void redeliver(); qpid::framing::BasicHeaderProperties* getHeaderProperties(); bool isPersistent(); - const string& getRoutingKey() const { return routingKey; } - const string& getExchange() const { return exchange; } u_int64_t contentSize() const { return size; } - u_int64_t getPersistenceId() const { return persistenceId; } - void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); void decodeHeader(qpid::framing::Buffer& buffer); diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index e0139519ae..53fcf66aac 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -48,38 +48,73 @@ namespace qpid { * TODO; AMS: for the moment this is mostly a placeholder */ class Message{ + std::string exchange; + std::string routingKey; + const bool mandatory; + const bool immediate; + u_int64_t persistenceId; + + bool redelivered; public: typedef boost::shared_ptr<Message> shared_ptr; + Message(const std::string& _exchange, const std::string& _routingKey, + bool _mandatory, bool _immediate) : + exchange(_exchange), + routingKey(_routingKey), + mandatory(_mandatory), + immediate(_immediate), + persistenceId(0), + redelivered(false) + {} + + Message() : + mandatory(false), + immediate(false), + persistenceId(0), + redelivered(false) + {} + virtual ~Message() {}; + // Accessors + const std::string& getRoutingKey() const { return routingKey; } + const std::string& getExchange() const { return exchange; } + u_int64_t getPersistenceId() const { return persistenceId; } + bool getRedelivered() const { return redelivered; } + + void setRouting(const std::string& _exchange, const std::string& _routingKey) + { exchange = _exchange; routingKey = _routingKey; } + void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests? + void redeliver() { redelivered = true; } + + /** + * Used to deliver the message from the queue + */ virtual void deliver(qpid::framing::OutputHandler* out, int channel, const std::string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize, qpid::framing::ProtocolVersion* version) = 0; + /** + * Used to return a message in response to a get from a queue + */ virtual void sendGetOk(qpid::framing::OutputHandler* out, int channel, u_int32_t messageCount, u_int64_t deliveryTag, u_int32_t framesize, qpid::framing::ProtocolVersion* version) = 0; - virtual void redeliver() = 0; virtual bool isComplete() = 0; virtual u_int64_t contentSize() const = 0; virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0; virtual bool isPersistent() = 0; - virtual const std::string& getRoutingKey() const = 0; virtual const ConnectionToken* const getPublisher() = 0; - virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests? - virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests? - virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests? - virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? @@ -108,12 +143,6 @@ namespace qpid { * content size else returns 0. */ virtual u_int64_t expectedContentSize() = 0; - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - virtual void releaseContent(MessageStore* /*store*/) {}; // TODO: AMS 29/1/2007 Don't think these are really part of base class @@ -125,140 +154,12 @@ namespace qpid { virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {}; virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {}; - }; - - } -} - - -#endif /*!_broker_BrokerMessage_h*/ -#ifndef _broker_BrokerMessageBase_h -#define _broker_BrokerMessageBase_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "AMQContentBody.h" -#include "AMQHeaderBody.h" -#include "Content.h" - -#include <string> -#include <boost/shared_ptr.hpp> - -namespace qpid { - - namespace framing { - class OutputHandler; - class ProtocolVersion; - class BasicHeaderProperties; - } - - namespace broker { - - class MessageStore; - class ConnectionToken; - - /** - * Base class for all types of internal broker messages - * abstracting away the operations - * TODO; AMS: for the moment this is mostly a placeholder - */ - class Message{ - - public: - typedef boost::shared_ptr<Message> shared_ptr; - - virtual ~Message() {}; - - virtual void deliver(qpid::framing::OutputHandler* out, - int channel, - const std::string& consumerTag, - u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version) = 0; - virtual void sendGetOk(qpid::framing::OutputHandler* out, - int channel, - u_int32_t messageCount, - u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version) = 0; - virtual void redeliver() = 0; - - virtual bool isComplete() = 0; - - virtual u_int64_t contentSize() const = 0; - virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0; - virtual bool isPersistent() = 0; - virtual const std::string& getRoutingKey() const = 0; - virtual const ConnectionToken* const getPublisher() = 0; - virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests? - virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests? - - virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests? - - virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? - virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? - - /** - * @returns the size of the buffer needed to encode this - * message in its entirety - * - * XXXX: Only used in tests? - */ - virtual u_int32_t encodedSize() = 0; - /** - * @returns the size of the buffer needed to encode the - * 'header' of this message (not just the header frame, - * but other meta data e.g.routing key and exchange) - * - * XXXX: Only used in tests? - */ - virtual u_int32_t encodedHeaderSize() = 0; - /** - * @returns the size of the buffer needed to encode the - * (possibly partial) content held by this message - */ - virtual u_int32_t encodedContentSize() = 0; - /** - * If headers have been received, returns the expected - * content size else returns 0. - */ - virtual u_int64_t expectedContentSize() = 0; /** * Releases the in-memory content data held by this * message. Must pass in a store from which the data can * be reloaded. */ virtual void releaseContent(MessageStore* /*store*/) {}; - - // TODO: AMS 29/1/2007 Don't think these are really part of base class - - /** - * Sets the 'content' implementation of this message (the - * message controls the lifecycle of the content instance - * it uses). - */ - virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; - virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {}; - virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {}; }; } diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp new file mode 100644 index 0000000000..46f583b978 --- /dev/null +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "BrokerMessageMessage.h" + +using namespace qpid::broker; + +MessageMessage::MessageMessage(const qpid::framing::AMQMethodBody& _methodBody, + const std::string& _exchange, const std::string& _routingKey, + bool _mandatory, bool _immediate) : + Message(_exchange, _routingKey, _mandatory, _immediate), + methodBody(_methodBody) +{ +} + +void MessageMessage::deliver(qpid::framing::OutputHandler* /*out*/, + int /*channel*/, + const std::string& /*consumerTag*/, + u_int64_t /*deliveryTag*/, + u_int32_t /*framesize*/, + qpid::framing::ProtocolVersion* /*version*/) +{ +} + +void MessageMessage::sendGetOk(qpid::framing::OutputHandler* /*out*/, + int /*channel*/, + u_int32_t /*messageCount*/, + u_int64_t /*deliveryTag*/, + u_int32_t /*framesize*/, + qpid::framing::ProtocolVersion* /*version*/) +{ +} + +bool MessageMessage::isComplete() +{ + return true; +} + +u_int64_t MessageMessage::contentSize() const +{ + return 0; +} + +qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() +{ + return 0; +} +bool MessageMessage::isPersistent() +{ + return false; +} + +const ConnectionToken* const MessageMessage::getPublisher() +{ + return 0; +} + +u_int32_t MessageMessage::encodedSize() +{ + return 0; +} + +u_int32_t MessageMessage::encodedHeaderSize() +{ + return 0; +} + +u_int32_t MessageMessage::encodedContentSize() +{ + return 0; +} + +u_int64_t MessageMessage::expectedContentSize() +{ + return 0; +} + diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index f25405db72..b49f60f5df 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -25,12 +25,21 @@ #include "BrokerMessageBase.h" namespace qpid { + namespace framing { + class AMQMethodBody; + } + namespace broker { class MessageMessage: public Message{ + const qpid::framing::AMQMethodBody& methodBody; public: - ~MessageMessage(); + MessageMessage(const qpid::framing::AMQMethodBody& methodBody, + const std::string& exchange, const std::string& routingKey, + bool mandatory, bool immediate); + // Default destructor okay + void deliver(qpid::framing::OutputHandler* out, int channel, const std::string& consumerTag, @@ -43,88 +52,17 @@ namespace qpid { u_int64_t deliveryTag, u_int32_t framesize, qpid::framing::ProtocolVersion* version); - void redeliver(); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); - void addContent(qpid::framing::AMQContentBody::shared_ptr data); bool isComplete(); - void setContent(std::auto_ptr<Content>& content); u_int64_t contentSize() const; qpid::framing::BasicHeaderProperties* getHeaderProperties(); bool isPersistent(); - const std::string& getRoutingKey() const; const ConnectionToken* const getPublisher(); + u_int32_t encodedSize(); + u_int32_t encodedHeaderSize(); u_int32_t encodedContentSize(); u_int64_t expectedContentSize(); - void releaseContent(MessageStore* store); - }; - - } -} - - -#endif /*!_broker_BrokerMessage_h*/ -#ifndef _broker_BrokerMessageMessage_h -#define _broker_BrokerMessageMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "BrokerMessageBase.h" - -namespace qpid { - namespace broker { - class MessageMessage: public Message{ - - public: - ~MessageMessage(); - - void deliver(qpid::framing::OutputHandler* out, - int channel, - const std::string& consumerTag, - u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version); - void sendGetOk(qpid::framing::OutputHandler* out, - int channel, - u_int32_t messageCount, - u_int64_t deliveryTag, - u_int32_t framesize, - qpid::framing::ProtocolVersion* version); - void redeliver(); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); - void addContent(qpid::framing::AMQContentBody::shared_ptr data); - bool isComplete(); - void setContent(std::auto_ptr<Content>& content); - - u_int64_t contentSize() const; - qpid::framing::BasicHeaderProperties* getHeaderProperties(); - bool isPersistent(); - const std::string& getRoutingKey() const; - const ConnectionToken* const getPublisher(); - - u_int32_t encodedContentSize(); - u_int64_t expectedContentSize(); - void releaseContent(MessageStore* store); }; } diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index 2366069128..064b592124 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -23,6 +23,8 @@ libqpidbroker_la_SOURCES = \ BrokerExchange.h \ BrokerMessage.cpp \ BrokerMessage.h \ + BrokerMessageMessage.cpp \ + BrokerMessageMessage.h \ BrokerQueue.cpp \ BrokerQueue.h \ Configuration.cpp \ diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 7361d8827a..f04b749996 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -189,14 +189,14 @@ MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool /*immediate*/, + bool immediate, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& /*routingKey*/, + const string& routingKey, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -208,7 +208,7 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content body, - bool /*mandatory*/ ) + bool mandatory ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature @@ -216,9 +216,9 @@ MessageHandlerImpl::transfer(const MethodContext& context, broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ if (body.isInline()) { -// MessageMessage* msg = -// new MessageMessage(&connection, exchangeName, routingKey, immediate); -// channel.handlePublish(msg, exchange); + MessageMessage* msg = + new MessageMessage(*(context.methodBody), exchangeName, routingKey, mandatory, immediate); + channel.handlePublish(msg, exchange); connection.client->getMessageHandler()->ok(context); } else { |