summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-31 23:28:38 +0000
committerAlan Conway <aconway@apache.org>2007-01-31 23:28:38 +0000
commitd71bc6e3f85c90d5f22d186aceadd1894c55383b (patch)
treed4210b428cd57c1cea4a86db175ac44566bf84dd
parentee4d5ffba8167348ea2751202c6065e8de0fc92c (diff)
downloadqpid-python-d71bc6e3f85c90d5f22d186aceadd1894c55383b.tar.gz
From Andrew Stitcher <astitcher@redhat.com>
r723@fuschia: andrew | 2007-01-12 00:35:16 +0000 Branch for my work on Qpid.0-9 r724@fuschia: andrew | 2007-01-12 00:59:28 +0000 Added in empty implementation of handler class for protocol Message class r768@fuschia: andrew | 2007-01-17 01:25:16 +0000 * Added Test for new MessageHandlerImpl (but no actual tests yet) * Filled in lots of the blanks in the MessageHandlerImpl with code stolen from the BasicHandlerImpl r800@fuschia: andrew | 2007-01-17 17:34:13 +0000 Updated to latest upstream changes r840@fuschia: andrew | 2007-01-19 00:31:59 +0000 Fixed merge errors r841@fuschia: andrew | 2007-01-19 00:47:29 +0000 Another merge problem fixed r878@fuschia: andrew | 2007-01-24 11:27:48 +0000 Started work on the Message class handler implementation r976@fuschia: andrew | 2007-01-30 17:05:05 +0000 Working again after broker Message refactor r980@fuschia: andrew | 2007-01-30 18:39:18 +0000 Fix for extra parameter to transfer r992@fuschia: andrew | 2007-01-31 18:29:57 +0000 Checkpoint of work on broker MessageMessage r1001@fuschia: andrew | 2007-01-31 22:02:27 +0000 MessageMessage work now compiles git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502038 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp49
-rw-r--r--cpp/lib/broker/BrokerMessage.h13
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h181
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp94
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h86
-rw-r--r--cpp/lib/broker/Makefile.am2
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp12
7 files changed, 182 insertions, 255 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index a5192beede..b738040470 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -35,22 +35,23 @@ using namespace qpid::sys;
BasicMessage::BasicMessage(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate) : publisher(_publisher),
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- redelivered(false),
- size(0),
- persistenceId(0) {}
+ bool _mandatory, bool _immediate) :
+ Message(_exchange, _routingKey, _mandatory, _immediate),
+ publisher(_publisher),
+ size(0)
+{
+}
BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
- publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
+ publisher(0), size(0)
+{
decode(buffer, headersOnly, contentChunkSize);
}
-BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
+BasicMessage::BasicMessage() : publisher(0), size(0)
+{
+}
BasicMessage::~BasicMessage(){
if (content.get()) content->destroy();
@@ -72,16 +73,13 @@ bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
-void BasicMessage::redeliver(){
- redelivered = true;
-}
-
void BasicMessage::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize,
ProtocolVersion* version){
// CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ out->send(new AMQFrame(*version, channel,
+ new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey())));
sendContent(out, channel, framesize, version);
}
@@ -90,9 +88,10 @@ void BasicMessage::sendGetOk(OutputHandler* out,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ ProtocolVersion* version){
+ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
+ out->send(new AMQFrame(*version, channel,
+ new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount)));
sendContent(out, channel, framesize, version);
}
@@ -127,8 +126,12 @@ void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChu
void BasicMessage::decodeHeader(Buffer& buffer)
{
+ string exchange;
+ string routingKey;
+
buffer.getShortString(exchange);
buffer.getShortString(routingKey);
+ setRouting(exchange, routingKey);
u_int32_t headerSize = buffer.getLong();
AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
@@ -166,8 +169,8 @@ void BasicMessage::encode(Buffer& buffer)
void BasicMessage::encodeHeader(Buffer& buffer)
{
- buffer.putShortString(exchange);
- buffer.putShortString(routingKey);
+ buffer.putShortString(getExchange());
+ buffer.putShortString(getRoutingKey());
buffer.putLong(header->size());
header->encode(buffer);
}
@@ -191,8 +194,8 @@ u_int32_t BasicMessage::encodedContentSize()
u_int32_t BasicMessage::encodedHeaderSize()
{
- return exchange.size() + 1
- + routingKey.size() + 1
+ return getExchange().size() + 1
+ + getRoutingKey().size() + 1
+ header->size() + 4;//4 extra bytes for size
}
@@ -204,7 +207,7 @@ u_int64_t BasicMessage::expectedContentSize()
void BasicMessage::releaseContent(MessageStore* store)
{
Mutex::ScopedLock locker(contentLock);
- if (!isPersistent() && persistenceId == 0) {
+ if (!isPersistent() && getPersistenceId() == 0) {
store->stage(this);
}
if (!content.get() || content->size() > 0) {
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index d9ab9b7220..4001af97a5 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -48,16 +48,10 @@ namespace qpid {
*/
class BasicMessage : public Message{
const ConnectionToken* const publisher;
- string exchange;
- string routingKey;
- const bool mandatory;
- const bool immediate;
- bool redelivered;
qpid::framing::AMQHeaderBody::shared_ptr header;
std::auto_ptr<Content> content;
- u_int64_t size;
- u_int64_t persistenceId;
qpid::sys::Mutex contentLock;
+ u_int64_t size;
void sendContent(qpid::framing::OutputHandler* out,
int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version);
@@ -88,15 +82,10 @@ namespace qpid {
u_int64_t deliveryTag,
u_int32_t framesize,
qpid::framing::ProtocolVersion* version);
- void redeliver();
qpid::framing::BasicHeaderProperties* getHeaderProperties();
bool isPersistent();
- const string& getRoutingKey() const { return routingKey; }
- const string& getExchange() const { return exchange; }
u_int64_t contentSize() const { return size; }
- u_int64_t getPersistenceId() const { return persistenceId; }
- void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
void decodeHeader(qpid::framing::Buffer& buffer);
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
index e0139519ae..53fcf66aac 100644
--- a/cpp/lib/broker/BrokerMessageBase.h
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -48,38 +48,73 @@ namespace qpid {
* TODO; AMS: for the moment this is mostly a placeholder
*/
class Message{
+ std::string exchange;
+ std::string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ u_int64_t persistenceId;
+
+ bool redelivered;
public:
typedef boost::shared_ptr<Message> shared_ptr;
+ Message(const std::string& _exchange, const std::string& _routingKey,
+ bool _mandatory, bool _immediate) :
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate),
+ persistenceId(0),
+ redelivered(false)
+ {}
+
+ Message() :
+ mandatory(false),
+ immediate(false),
+ persistenceId(0),
+ redelivered(false)
+ {}
+
virtual ~Message() {};
+ // Accessors
+ const std::string& getRoutingKey() const { return routingKey; }
+ const std::string& getExchange() const { return exchange; }
+ u_int64_t getPersistenceId() const { return persistenceId; }
+ bool getRedelivered() const { return redelivered; }
+
+ void setRouting(const std::string& _exchange, const std::string& _routingKey)
+ { exchange = _exchange; routingKey = _routingKey; }
+ void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
+ void redeliver() { redelivered = true; }
+
+ /**
+ * Used to deliver the message from the queue
+ */
virtual void deliver(qpid::framing::OutputHandler* out,
int channel,
const std::string& consumerTag,
u_int64_t deliveryTag,
u_int32_t framesize,
qpid::framing::ProtocolVersion* version) = 0;
+ /**
+ * Used to return a message in response to a get from a queue
+ */
virtual void sendGetOk(qpid::framing::OutputHandler* out,
int channel,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize,
qpid::framing::ProtocolVersion* version) = 0;
- virtual void redeliver() = 0;
virtual bool isComplete() = 0;
virtual u_int64_t contentSize() const = 0;
virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0;
virtual bool isPersistent() = 0;
- virtual const std::string& getRoutingKey() const = 0;
virtual const ConnectionToken* const getPublisher() = 0;
- virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests?
- virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests?
- virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests?
-
virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
@@ -108,12 +143,6 @@ namespace qpid {
* content size else returns 0.
*/
virtual u_int64_t expectedContentSize() = 0;
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- virtual void releaseContent(MessageStore* /*store*/) {};
// TODO: AMS 29/1/2007 Don't think these are really part of base class
@@ -125,140 +154,12 @@ namespace qpid {
virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {};
virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {};
- };
-
- }
-}
-
-
-#endif /*!_broker_BrokerMessage_h*/
-#ifndef _broker_BrokerMessageBase_h
-#define _broker_BrokerMessageBase_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "AMQContentBody.h"
-#include "AMQHeaderBody.h"
-#include "Content.h"
-
-#include <string>
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-
- namespace framing {
- class OutputHandler;
- class ProtocolVersion;
- class BasicHeaderProperties;
- }
-
- namespace broker {
-
- class MessageStore;
- class ConnectionToken;
-
- /**
- * Base class for all types of internal broker messages
- * abstracting away the operations
- * TODO; AMS: for the moment this is mostly a placeholder
- */
- class Message{
-
- public:
- typedef boost::shared_ptr<Message> shared_ptr;
-
- virtual ~Message() {};
-
- virtual void deliver(qpid::framing::OutputHandler* out,
- int channel,
- const std::string& consumerTag,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version) = 0;
- virtual void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version) = 0;
- virtual void redeliver() = 0;
-
- virtual bool isComplete() = 0;
-
- virtual u_int64_t contentSize() const = 0;
- virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0;
- virtual bool isPersistent() = 0;
- virtual const std::string& getRoutingKey() const = 0;
- virtual const ConnectionToken* const getPublisher() = 0;
- virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests?
- virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests?
-
- virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests?
-
- virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
- virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
-
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- *
- * XXXX: Only used in tests?
- */
- virtual u_int32_t encodedSize() = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- *
- * XXXX: Only used in tests?
- */
- virtual u_int32_t encodedHeaderSize() = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- virtual u_int32_t encodedContentSize() = 0;
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- virtual u_int64_t expectedContentSize() = 0;
/**
* Releases the in-memory content data held by this
* message. Must pass in a store from which the data can
* be reloaded.
*/
virtual void releaseContent(MessageStore* /*store*/) {};
-
- // TODO: AMS 29/1/2007 Don't think these are really part of base class
-
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {};
- virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {};
};
}
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
new file mode 100644
index 0000000000..46f583b978
--- /dev/null
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "BrokerMessageMessage.h"
+
+using namespace qpid::broker;
+
+MessageMessage::MessageMessage(const qpid::framing::AMQMethodBody& _methodBody,
+ const std::string& _exchange, const std::string& _routingKey,
+ bool _mandatory, bool _immediate) :
+ Message(_exchange, _routingKey, _mandatory, _immediate),
+ methodBody(_methodBody)
+{
+}
+
+void MessageMessage::deliver(qpid::framing::OutputHandler* /*out*/,
+ int /*channel*/,
+ const std::string& /*consumerTag*/,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t /*framesize*/,
+ qpid::framing::ProtocolVersion* /*version*/)
+{
+}
+
+void MessageMessage::sendGetOk(qpid::framing::OutputHandler* /*out*/,
+ int /*channel*/,
+ u_int32_t /*messageCount*/,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t /*framesize*/,
+ qpid::framing::ProtocolVersion* /*version*/)
+{
+}
+
+bool MessageMessage::isComplete()
+{
+ return true;
+}
+
+u_int64_t MessageMessage::contentSize() const
+{
+ return 0;
+}
+
+qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
+{
+ return 0;
+}
+bool MessageMessage::isPersistent()
+{
+ return false;
+}
+
+const ConnectionToken* const MessageMessage::getPublisher()
+{
+ return 0;
+}
+
+u_int32_t MessageMessage::encodedSize()
+{
+ return 0;
+}
+
+u_int32_t MessageMessage::encodedHeaderSize()
+{
+ return 0;
+}
+
+u_int32_t MessageMessage::encodedContentSize()
+{
+ return 0;
+}
+
+u_int64_t MessageMessage::expectedContentSize()
+{
+ return 0;
+}
+
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
index f25405db72..b49f60f5df 100644
--- a/cpp/lib/broker/BrokerMessageMessage.h
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -25,12 +25,21 @@
#include "BrokerMessageBase.h"
namespace qpid {
+ namespace framing {
+ class AMQMethodBody;
+ }
+
namespace broker {
class MessageMessage: public Message{
+ const qpid::framing::AMQMethodBody& methodBody;
public:
- ~MessageMessage();
+ MessageMessage(const qpid::framing::AMQMethodBody& methodBody,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
+ // Default destructor okay
+
void deliver(qpid::framing::OutputHandler* out,
int channel,
const std::string& consumerTag,
@@ -43,88 +52,17 @@ namespace qpid {
u_int64_t deliveryTag,
u_int32_t framesize,
qpid::framing::ProtocolVersion* version);
- void redeliver();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr data);
bool isComplete();
- void setContent(std::auto_ptr<Content>& content);
u_int64_t contentSize() const;
qpid::framing::BasicHeaderProperties* getHeaderProperties();
bool isPersistent();
- const std::string& getRoutingKey() const;
const ConnectionToken* const getPublisher();
+ u_int32_t encodedSize();
+ u_int32_t encodedHeaderSize();
u_int32_t encodedContentSize();
u_int64_t expectedContentSize();
- void releaseContent(MessageStore* store);
- };
-
- }
-}
-
-
-#endif /*!_broker_BrokerMessage_h*/
-#ifndef _broker_BrokerMessageMessage_h
-#define _broker_BrokerMessageMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "BrokerMessageBase.h"
-
-namespace qpid {
- namespace broker {
- class MessageMessage: public Message{
-
- public:
- ~MessageMessage();
-
- void deliver(qpid::framing::OutputHandler* out,
- int channel,
- const std::string& consumerTag,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
- void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- qpid::framing::ProtocolVersion* version);
- void redeliver();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr data);
- bool isComplete();
- void setContent(std::auto_ptr<Content>& content);
-
- u_int64_t contentSize() const;
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- bool isPersistent();
- const std::string& getRoutingKey() const;
- const ConnectionToken* const getPublisher();
-
- u_int32_t encodedContentSize();
- u_int64_t expectedContentSize();
- void releaseContent(MessageStore* store);
};
}
diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am
index 2366069128..064b592124 100644
--- a/cpp/lib/broker/Makefile.am
+++ b/cpp/lib/broker/Makefile.am
@@ -23,6 +23,8 @@ libqpidbroker_la_SOURCES = \
BrokerExchange.h \
BrokerMessage.cpp \
BrokerMessage.h \
+ BrokerMessageMessage.cpp \
+ BrokerMessageMessage.h \
BrokerQueue.cpp \
BrokerQueue.h \
Configuration.cpp \
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 7361d8827a..f04b749996 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -189,14 +189,14 @@ MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool /*immediate*/,
+ bool immediate,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& /*routingKey*/,
+ const string& routingKey,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -208,7 +208,7 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
qpid::framing::Content body,
- bool /*mandatory*/ )
+ bool mandatory )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -216,9 +216,9 @@ MessageHandlerImpl::transfer(const MethodContext& context,
broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
if (body.isInline()) {
-// MessageMessage* msg =
-// new MessageMessage(&connection, exchangeName, routingKey, immediate);
-// channel.handlePublish(msg, exchange);
+ MessageMessage* msg =
+ new MessageMessage(*(context.methodBody), exchangeName, routingKey, mandatory, immediate);
+ channel.handlePublish(msg, exchange);
connection.client->getMessageHandler()->ok(context);
} else {