summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-30 20:07:41 +0000
committerAlan Conway <aconway@apache.org>2007-01-30 20:07:41 +0000
commitf9f848394de0662248cf62d4ec5e4818949403b2 (patch)
tree4f13105e2223b704d7850300116dcc56116acae2
parent98ccae7574a18f8d0a1f9e28e86ccfde4541c81f (diff)
downloadqpid-python-f9f848394de0662248cf62d4ec5e4818949403b2.tar.gz
Andrew Stitcher <astitcher@redhat.com>
r723@fuschia: andrew | 2007-01-12 00:35:16 +0000 Branch for my work on Qpid.0-9 r724@fuschia: andrew | 2007-01-12 00:59:28 +0000 Added in empty implementation of handler class for protocol Message class r768@fuschia: andrew | 2007-01-17 01:25:16 +0000 * Added Test for new MessageHandlerImpl (but no actual tests yet) * Filled in lots of the blanks in the MessageHandlerImpl with code stolen from the BasicHandlerImpl r800@fuschia: andrew | 2007-01-17 17:34:13 +0000 Updated to latest upstream changes r840@fuschia: andrew | 2007-01-19 00:31:59 +0000 Fixed merge errors r841@fuschia: andrew | 2007-01-19 00:47:29 +0000 Another merge problem fixed r878@fuschia: andrew | 2007-01-24 11:27:48 +0000 Started work on the Message class handler implementation r976@fuschia: andrew | 2007-01-30 17:05:05 +0000 Working again after broker Message refactor r980@fuschia: andrew | 2007-01-30 18:39:18 +0000 Fix for extra parameter to transfer git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501534 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp16
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp52
-rw-r--r--cpp/lib/broker/BrokerMessage.h13
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h268
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h134
-rw-r--r--cpp/lib/broker/Connection.cpp2
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp38
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.h3
-rw-r--r--cpp/lib/client/Connection.cpp7
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.cpp6
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.h2
-rw-r--r--cpp/lib/common/framing/MethodContext.h9
-rw-r--r--cpp/tests/ChannelTest.cpp4
-rw-r--r--cpp/tests/ExchangeTest.cpp2
-rw-r--r--cpp/tests/InProcessBroker.h153
-rw-r--r--cpp/tests/MessageBuilderTest.cpp16
-rw-r--r--cpp/tests/MessageTest.cpp4
-rw-r--r--cpp/tests/QueueTest.cpp12
-rw-r--r--cpp/tests/TxAckTest.cpp2
-rw-r--r--cpp/tests/TxPublishTest.cpp2
20 files changed, 661 insertions, 84 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 5cf767a8e1..abf0b3852d 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -322,7 +322,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& conte
void
BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
- const MethodContext&,
+ const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
const string& exchangeName,
@@ -337,7 +337,7 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
exchange->unbind(queue, routingKey, &arguments);
- connection.client->getQueue().unbindOk(channel.getId());
+ connection.client->getQueue().unbindOk(context);
}
void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
@@ -420,7 +420,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate);
+ BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
channel.handlePublish(msg, exchange);
}else{
throw ChannelException(
@@ -475,16 +475,16 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& )
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context)
{
- connection.client->getChannel().ok(channel.getId());
- connection.client->getChannel().pong(channel.getId());
+ connection.client->getChannel().ok(context);
+ connection.client->getChannel().pong(context);
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& )
+BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context)
{
- connection.client->getChannel().ok(channel.getId());
+ connection.client->getChannel().ok(context);
}
void
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 07b14a4eff..a5192beede 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -33,7 +33,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-Message::Message(const ConnectionToken* const _publisher,
+BasicMessage::BasicMessage(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
bool _mandatory, bool _immediate) : publisher(_publisher),
exchange(_exchange),
@@ -44,23 +44,23 @@ Message::Message(const ConnectionToken* const _publisher,
size(0),
persistenceId(0) {}
-Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
+BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
decode(buffer, headersOnly, contentChunkSize);
}
-Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
+BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
-Message::~Message(){
+BasicMessage::~BasicMessage(){
if (content.get()) content->destroy();
}
-void Message::setHeader(AMQHeaderBody::shared_ptr _header){
+void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){
this->header = _header;
}
-void Message::addContent(AMQContentBody::shared_ptr data){
+void BasicMessage::addContent(AMQContentBody::shared_ptr data){
if (!content.get()) {
content = std::auto_ptr<Content>(new InMemoryContent());
}
@@ -68,15 +68,15 @@ void Message::addContent(AMQContentBody::shared_ptr data){
size += data->size();
}
-bool Message::isComplete(){
+bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
-void Message::redeliver(){
+void BasicMessage::redeliver(){
redelivered = true;
}
-void Message::deliver(OutputHandler* out, int channel,
+void BasicMessage::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize,
ProtocolVersion* version){
@@ -85,7 +85,7 @@ void Message::deliver(OutputHandler* out, int channel,
sendContent(out, channel, framesize, version);
}
-void Message::sendGetOk(OutputHandler* out,
+void BasicMessage::sendGetOk(OutputHandler* out,
int channel,
u_int32_t messageCount,
u_int64_t deliveryTag,
@@ -96,7 +96,7 @@ void Message::sendGetOk(OutputHandler* out,
sendContent(out, channel, framesize, version);
}
-void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
+void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(*version, channel, headerBody));
@@ -104,28 +104,28 @@ void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize,
if (content.get()) content->send(*version, out, channel, framesize);
}
-BasicHeaderProperties* Message::getHeaderProperties(){
+BasicHeaderProperties* BasicMessage::getHeaderProperties(){
return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
}
-const ConnectionToken* const Message::getPublisher(){
+const ConnectionToken* const BasicMessage::getPublisher(){
return publisher;
}
-bool Message::isPersistent()
+bool BasicMessage::isPersistent()
{
if(!header) return false;
BasicHeaderProperties* props = getHeaderProperties();
return props && props->getDeliveryMode() == PERSISTENT;
}
-void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
+void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
{
decodeHeader(buffer);
if (!headersOnly) decodeContent(buffer, contentChunkSize);
}
-void Message::decodeHeader(Buffer& buffer)
+void BasicMessage::decodeHeader(Buffer& buffer)
{
buffer.getShortString(exchange);
buffer.getShortString(routingKey);
@@ -136,7 +136,7 @@ void Message::decodeHeader(Buffer& buffer)
setHeader(headerBody);
}
-void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
+void BasicMessage::decodeContent(Buffer& buffer, u_int32_t chunkSize)
{
u_int64_t expected = expectedContentSize();
if (expected != buffer.available()) {
@@ -158,13 +158,13 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
}
}
-void Message::encode(Buffer& buffer)
+void BasicMessage::encode(Buffer& buffer)
{
encodeHeader(buffer);
encodeContent(buffer);
}
-void Message::encodeHeader(Buffer& buffer)
+void BasicMessage::encodeHeader(Buffer& buffer)
{
buffer.putShortString(exchange);
buffer.putShortString(routingKey);
@@ -172,36 +172,36 @@ void Message::encodeHeader(Buffer& buffer)
header->encode(buffer);
}
-void Message::encodeContent(Buffer& buffer)
+void BasicMessage::encodeContent(Buffer& buffer)
{
Mutex::ScopedLock locker(contentLock);
if (content.get()) content->encode(buffer);
}
-u_int32_t Message::encodedSize()
+u_int32_t BasicMessage::encodedSize()
{
return encodedHeaderSize() + encodedContentSize();
}
-u_int32_t Message::encodedContentSize()
+u_int32_t BasicMessage::encodedContentSize()
{
Mutex::ScopedLock locker(contentLock);
return content.get() ? content->size() : 0;
}
-u_int32_t Message::encodedHeaderSize()
+u_int32_t BasicMessage::encodedHeaderSize()
{
return exchange.size() + 1
+ routingKey.size() + 1
+ header->size() + 4;//4 extra bytes for size
}
-u_int64_t Message::expectedContentSize()
+u_int64_t BasicMessage::expectedContentSize()
{
return header.get() ? header->getContentSize() : 0;
}
-void Message::releaseContent(MessageStore* store)
+void BasicMessage::releaseContent(MessageStore* store)
{
Mutex::ScopedLock locker(contentLock);
if (!isPersistent() && persistenceId == 0) {
@@ -217,7 +217,7 @@ void Message::releaseContent(MessageStore* store)
}
}
-void Message::setContent(std::auto_ptr<Content>& _content)
+void BasicMessage::setContent(std::auto_ptr<Content>& _content)
{
Mutex::ScopedLock locker(contentLock);
content = _content;
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 388bfba51e..d9ab9b7220 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -22,6 +22,7 @@
*
*/
+#include <BrokerMessageBase.h>
#include <memory>
#include <boost/shared_ptr.hpp>
#include <AMQContentBody.h>
@@ -45,7 +46,7 @@ namespace qpid {
* content bodies and some details about the publication
* request.
*/
- class Message{
+ class BasicMessage : public Message{
const ConnectionToken* const publisher;
string exchange;
string routingKey;
@@ -62,14 +63,14 @@ namespace qpid {
int channel, u_int32_t framesize, qpid::framing::ProtocolVersion* version);
public:
- typedef boost::shared_ptr<Message> shared_ptr;
+ typedef boost::shared_ptr<BasicMessage> shared_ptr;
- Message(const ConnectionToken* const publisher,
+ BasicMessage(const ConnectionToken* const publisher,
const string& exchange, const string& routingKey,
bool mandatory, bool immediate);
- Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
- Message();
- ~Message();
+ BasicMessage(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
+ BasicMessage();
+ ~BasicMessage();
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
void addContent(qpid::framing::AMQContentBody::shared_ptr data);
bool isComplete();
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
new file mode 100644
index 0000000000..e0139519ae
--- /dev/null
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -0,0 +1,268 @@
+#ifndef _broker_BrokerMessageBase_h
+#define _broker_BrokerMessageBase_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "Content.h"
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+ namespace framing {
+ class OutputHandler;
+ class ProtocolVersion;
+ class BasicHeaderProperties;
+ }
+
+ namespace broker {
+
+ class MessageStore;
+ class ConnectionToken;
+
+ /**
+ * Base class for all types of internal broker messages
+ * abstracting away the operations
+ * TODO; AMS: for the moment this is mostly a placeholder
+ */
+ class Message{
+
+ public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+
+ virtual ~Message() {};
+
+ virtual void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version) = 0;
+ virtual void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version) = 0;
+ virtual void redeliver() = 0;
+
+ virtual bool isComplete() = 0;
+
+ virtual u_int64_t contentSize() const = 0;
+ virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0;
+ virtual bool isPersistent() = 0;
+ virtual const std::string& getRoutingKey() const = 0;
+ virtual const ConnectionToken* const getPublisher() = 0;
+ virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests?
+ virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests?
+
+ virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests?
+
+ virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+ virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedHeaderSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ virtual u_int32_t encodedContentSize() = 0;
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ virtual u_int64_t expectedContentSize() = 0;
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ virtual void releaseContent(MessageStore* /*store*/) {};
+
+ // TODO: AMS 29/1/2007 Don't think these are really part of base class
+
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
+ virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {};
+ virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {};
+ };
+
+ }
+}
+
+
+#endif /*!_broker_BrokerMessage_h*/
+#ifndef _broker_BrokerMessageBase_h
+#define _broker_BrokerMessageBase_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "Content.h"
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+ namespace framing {
+ class OutputHandler;
+ class ProtocolVersion;
+ class BasicHeaderProperties;
+ }
+
+ namespace broker {
+
+ class MessageStore;
+ class ConnectionToken;
+
+ /**
+ * Base class for all types of internal broker messages
+ * abstracting away the operations
+ * TODO; AMS: for the moment this is mostly a placeholder
+ */
+ class Message{
+
+ public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+
+ virtual ~Message() {};
+
+ virtual void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version) = 0;
+ virtual void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version) = 0;
+ virtual void redeliver() = 0;
+
+ virtual bool isComplete() = 0;
+
+ virtual u_int64_t contentSize() const = 0;
+ virtual qpid::framing::BasicHeaderProperties* getHeaderProperties() = 0;
+ virtual bool isPersistent() = 0;
+ virtual const std::string& getRoutingKey() const = 0;
+ virtual const ConnectionToken* const getPublisher() = 0;
+ virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used in tests?
+ virtual const std::string& getExchange() const = 0; // XXXX: Only used in tests?
+
+ virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; // XXXX: Only used in tests?
+
+ virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+ virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual u_int32_t encodedHeaderSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ virtual u_int32_t encodedContentSize() = 0;
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ virtual u_int64_t expectedContentSize() = 0;
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ virtual void releaseContent(MessageStore* /*store*/) {};
+
+ // TODO: AMS 29/1/2007 Don't think these are really part of base class
+
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
+ virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr /*header*/) {};
+ virtual void addContent(qpid::framing::AMQContentBody::shared_ptr /*data*/) {};
+ };
+
+ }
+}
+
+
+#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
new file mode 100644
index 0000000000..f25405db72
--- /dev/null
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -0,0 +1,134 @@
+#ifndef _broker_BrokerMessageMessage_h
+#define _broker_BrokerMessageMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerMessageBase.h"
+
+namespace qpid {
+ namespace broker {
+ class MessageMessage: public Message{
+
+ public:
+ ~MessageMessage();
+
+ void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void redeliver();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ void setContent(std::auto_ptr<Content>& content);
+
+ u_int64_t contentSize() const;
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
+ const std::string& getRoutingKey() const;
+ const ConnectionToken* const getPublisher();
+
+ u_int32_t encodedContentSize();
+ u_int64_t expectedContentSize();
+ void releaseContent(MessageStore* store);
+ };
+
+ }
+}
+
+
+#endif /*!_broker_BrokerMessage_h*/
+#ifndef _broker_BrokerMessageMessage_h
+#define _broker_BrokerMessageMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerMessageBase.h"
+
+namespace qpid {
+ namespace broker {
+ class MessageMessage: public Message{
+
+ public:
+ ~MessageMessage();
+
+ void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const std::string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize,
+ qpid::framing::ProtocolVersion* version);
+ void redeliver();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ void setContent(std::auto_ptr<Content>& content);
+
+ u_int64_t contentSize() const;
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ bool isPersistent();
+ const std::string& getRoutingKey() const;
+ const ConnectionToken* const getPublisher();
+
+ u_int32_t encodedContentSize();
+ u_int64_t expectedContentSize();
+ void releaseContent(MessageStore* store);
+ };
+
+ }
+}
+
+
+#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
index d34422c93d..0f58278a5a 100644
--- a/cpp/lib/broker/Connection.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -74,7 +74,7 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
string locales("en_US");
// TODO aconway 2007-01-16: Client call, move to adapter.
client->getConnection().start(
- MethodContext(0, &getAdapter(0)),
+ MethodContext(0, 0, &getAdapter(0)),
header->getMajor(), header->getMinor(),
properties, mechanisms, locales);
getAdapter(0).init(0, *out, client->getProtocolVersion());
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 33f7a63d45..7361d8827a 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -1,3 +1,4 @@
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -18,8 +19,11 @@
#include "MessageHandlerImpl.h"
#include "BrokerChannel.h"
+#include "FramingContent.h"
#include "Connection.h"
#include "Broker.h"
+#include "BrokerMessageMessage.h"
+
namespace qpid {
namespace broker {
@@ -41,7 +45,7 @@ void
MessageHandlerImpl::cancel( const MethodContext& context,
const string& destination )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
channel.cancel(destination);
@@ -73,10 +77,9 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool exclusive,
const qpid::framing::FieldTable& filter )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- Channel& channel = connection.getChannel(channel.getId());
if(!destination.empty() && channel.exists(destination)){
throw ConnectionException(530, "Consumer tags must be unique");
}
@@ -108,7 +111,7 @@ MessageHandlerImpl::get( const MethodContext& context,
const string& /*destination*/,
bool noAck )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Queue::shared_ptr queue =
connection.getQueue(queueName, context.channelId);
@@ -146,7 +149,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
u_int16_t prefetchCount,
bool /*global*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
@@ -159,7 +162,7 @@ void
MessageHandlerImpl::recover(const MethodContext&,
bool requeue )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
channel.recover(requeue);
@@ -182,18 +185,18 @@ MessageHandlerImpl::resume(const MethodContext&,
}
void
-MessageHandlerImpl::transfer(const MethodContext&,
+MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool immediate,
+ bool /*immediate*/,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& routingKey,
+ const string& /*routingKey*/,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -204,15 +207,24 @@ MessageHandlerImpl::transfer(const MethodContext&,
const string& /*transactionId*/,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
+ qpid::framing::Content body,
+ bool /*mandatory*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
Exchange::shared_ptr exchange = exchangeName.empty() ?
broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate);
- channel.handlePublish(msg, exchange);
+ if (body.isInline()) {
+// MessageMessage* msg =
+// new MessageMessage(&connection, exchangeName, routingKey, immediate);
+// channel.handlePublish(msg, exchange);
+
+ connection.client->getMessageHandler()->ok(context);
+ } else {
+ // Don't handle reference content yet
+ assert(body.isInline());
+ }
}else{
throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
}
diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h
index 0eb9e119f5..985efe3847 100644
--- a/cpp/lib/broker/MessageHandlerImpl.h
+++ b/cpp/lib/broker/MessageHandlerImpl.h
@@ -114,7 +114,8 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH
const std::string& transactionId,
const std::string& securityToken,
const framing::FieldTable& applicationHeaders,
- framing::Content body );
+ framing::Content body,
+ bool mandatory );
};
}} // namespace qpid::broker
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index bf6c44570d..78e340eb11 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -46,10 +46,11 @@ Connection::Connection(
bool _debug, u_int32_t _max_frame_size,
const framing::ProtocolVersion& _version
) : version(_version), max_frame_size(_max_frame_size),
- defaultConnector(version, debug, max_frame_size),
- connector(&defaultConnector),
+ defaultConnector(version, _debug, _max_frame_size),
isOpen(false), debug(_debug)
-{}
+{
+ setConnector(defaultConnector);
+}
Connection::~Connection(){
close();
diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp
index 1fdb8d6691..653e47048e 100644
--- a/cpp/lib/common/framing/ChannelAdapter.cpp
+++ b/cpp/lib/common/framing/ChannelAdapter.cpp
@@ -29,7 +29,7 @@ void ChannelAdapter::init(
id = i;
out = &o;
version = v;
- context = MethodContext(id, this);
+ context = MethodContext(0, id, this);
}
void ChannelAdapter::send(AMQFrame* frame) {
@@ -59,7 +59,7 @@ void ChannelAdapter::send(AMQBody::shared_ptr body) {
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
assertMethodOk(*request);
responder.received(request->getData());
- context =MethodContext(id, this, request->getRequestId());
+ context =MethodContext(request.get(), id, this, request->getRequestId());
handleMethodInContext(request, context);
}
@@ -73,7 +73,7 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) {
assertMethodOk(*method);
- context = MethodContext(id, this);
+ context = MethodContext(method.get(), id, this);
handleMethodInContext(method, context);
}
diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h
index b2a5ef6ff5..f0b3d2469a 100644
--- a/cpp/lib/common/framing/ChannelAdapter.h
+++ b/cpp/lib/common/framing/ChannelAdapter.h
@@ -54,7 +54,7 @@ class ChannelAdapter : public BodyHandler, public OutputHandler {
/**
*@param output Processed frames are forwarded to this handler.
*/
- ChannelAdapter() : context(0), id(0), out(0) {}
+ ChannelAdapter() : context(0, 0), id(0), out(0) {}
/** Initialize the channel adapter. */
void init(ChannelId, OutputHandler&, const ProtocolVersion&);
diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h
index 46d2e064b5..54e05f0fb2 100644
--- a/cpp/lib/common/framing/MethodContext.h
+++ b/cpp/lib/common/framing/MethodContext.h
@@ -26,6 +26,7 @@ namespace qpid {
namespace framing {
class BodyHandler;
+class AMQMethodBody;
/**
* Invocation context for an AMQP method.
@@ -46,8 +47,10 @@ struct MethodContext
* will automatically construct the MethodContext.
*/
MethodContext(
+ const AMQMethodBody* method,
ChannelId channel, OutputHandler* output=0, RequestId request=0)
- : channelId(channel), out(output), requestId(request){}
+ : channelId(channel), out(output), requestId(request),
+ methodBody(method) {}
/** \internal Channel on which the method is sent. */
ChannelId channelId;
@@ -60,6 +63,10 @@ struct MethodContext
*/
RequestId requestId;
+ /** \internal This is the Method Body itself
+ * It's useful for passing around instead of unpacking all its parameters
+ */
+ const AMQMethodBody* methodBody;
};
}} // namespace qpid::framing
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp
index b31ff6a321..760a4d3344 100644
--- a/cpp/tests/ChannelTest.cpp
+++ b/cpp/tests/ChannelTest.cpp
@@ -221,7 +221,7 @@ class ChannelTest : public CppUnit::TestCase
Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
const string data[] = {"abcde", "fghij", "klmno"};
- Message* msg = new Message(0, "my_exchange", "my_routing_key", false, false);
+ Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
store.expect();
store.stage(msg);
@@ -309,7 +309,7 @@ class ChannelTest : public CppUnit::TestCase
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize)
{
- Message* msg = new Message(0, exchange, routingKey, false, false);
+ BasicMessage* msg = new BasicMessage(0, exchange, routingKey, false, false);
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(contentSize);
msg->setHeader(header);
diff --git a/cpp/tests/ExchangeTest.cpp b/cpp/tests/ExchangeTest.cpp
index 8fef4ccaac..a31c369fe1 100644
--- a/cpp/tests/ExchangeTest.cpp
+++ b/cpp/tests/ExchangeTest.cpp
@@ -54,7 +54,7 @@ class ExchangeTest : public CppUnit::TestCase
queue.reset();
queue2.reset();
- Message::shared_ptr msgPtr(new Message(0, "e", "A", true, true));
+ Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h
index 4ef352e677..af0f4e84fe 100644
--- a/cpp/tests/InProcessBroker.h
+++ b/cpp/tests/InProcessBroker.h
@@ -151,3 +151,156 @@ std::ostream& operator<<(
}} // namespace qpid::broker
#endif /*!_tests_InProcessBroker_h*/
+#ifndef _tests_InProcessBroker_h
+#define _tests_InProcessBroker_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <vector>
+#include <iostream>
+#include <algorithm>
+
+#include "framing/AMQFrame.h"
+#include "broker/Broker.h"
+#include "broker/Connection.h"
+#include "client/Connector.h"
+
+namespace qpid {
+namespace broker {
+
+/** Make a copy of a frame body. Inefficient, only intended for tests. */
+// TODO aconway 2007-01-29: from should be const, need to fix
+// AMQPFrame::encode as const.
+framing::AMQFrame copy(framing::AMQFrame& from) {
+ framing::Buffer buffer(from.size());
+ from.encode(buffer);
+ buffer.flip();
+ framing::AMQFrame result;
+ result.decode(buffer);
+ return result;
+}
+
+/**
+ * A broker that implements client::Connector allowing direct
+ * in-process connection of client to broker. Used to write round-trip
+ * tests without requiring an external broker process.
+ *
+ * Also allows you to "snoop" on frames exchanged between client & broker.
+ *
+ * Use as follows:
+ *
+ \code
+ broker::InProcessBroker ibroker(version);
+ client::Connection clientConnection;
+ clientConnection.setConnector(ibroker);
+ clientConnection.open("");
+ ... use as normal
+ \endcode
+ *
+ */
+class InProcessBroker : public client::Connector {
+ public:
+ enum Sender {CLIENT,BROKER};
+ struct Frame : public framing::AMQFrame {
+ Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
+ bool fromBroker() const { return from == BROKER; }
+ bool fromClient() const { return from == CLIENT; }
+
+ template <class MethodType>
+ MethodType* asMethod() {
+ return dynamic_cast<MethodType*>(getBody().get());
+ }
+
+ Sender from;
+ };
+ typedef std::vector<Frame> Conversation;
+
+ InProcessBroker(const framing::ProtocolVersion& ver) :
+ Connector(ver),
+ protocolInit(ver),
+ broker(broker::Broker::create()),
+ brokerOut(BROKER, conversation),
+ brokerConnection(&brokerOut, *broker),
+ clientOut(CLIENT, conversation, &brokerConnection)
+ {}
+
+ void connect(const std::string& /*host*/, int /*port*/) {}
+ void init() { brokerConnection.initiated(&protocolInit); }
+ void close() {}
+
+ /** Client's input handler. */
+ void setInputHandler(framing::InputHandler* handler) {
+ brokerOut.in = handler;
+ }
+
+ /** Called by client to send a frame */
+ void send(framing::AMQFrame* frame) {
+ clientOut.send(frame);
+ }
+
+ /** Entire client-broker conversation is recorded here */
+ Conversation conversation;
+
+ private:
+ /** OutputHandler that forwards data to an InputHandler */
+ struct OutputToInputHandler : public sys::ConnectionOutputHandler {
+ OutputToInputHandler(
+ Sender from_, Conversation& conversation_,
+ framing::InputHandler* ih=0
+ ) : from(from_), conversation(conversation_), in(ih) {}
+
+ void send(framing::AMQFrame* frame) {
+ conversation.push_back(Frame(from, copy(*frame)));
+ in->received(frame);
+ }
+
+ void close() {}
+
+ Sender from;
+ Conversation& conversation;
+ framing::InputHandler* in;
+ };
+
+ framing::ProtocolInitiation protocolInit;
+ Broker::shared_ptr broker;
+ OutputToInputHandler brokerOut;
+ broker::Connection brokerConnection;
+ OutputToInputHandler clientOut;
+};
+
+std::ostream& operator<<(
+ std::ostream& out, const InProcessBroker::Frame& frame)
+{
+ return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") <<
+ static_cast<const framing::AMQFrame&>(frame);
+}
+std::ostream& operator<<(
+ std::ostream& out, const InProcessBroker::Conversation& conv)
+{
+ for (InProcessBroker::Conversation::const_iterator i = conv.begin();
+ i != conv.end(); ++i)
+ {
+ out << *i << std::endl;
+ }
+ return out;
+}
+
+
+}} // namespace qpid::broker
+
+#endif /*!_tests_InProcessBroker_h*/
diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp
index 21f5935218..e84d1df0e7 100644
--- a/cpp/tests/MessageBuilderTest.cpp
+++ b/cpp/tests/MessageBuilderTest.cpp
@@ -74,14 +74,14 @@ class MessageBuilderTest : public CppUnit::TestCase
// Don't hide overloads.
using NullMessageStore::destroy;
- void destroy(Message* msg)
+ void destroy(BasicMessage* msg)
{
CPPUNIT_ASSERT(msg->getPersistenceId());
}
- Message::shared_ptr getRestoredMessage()
+ BasicMessage::shared_ptr getRestoredMessage()
{
- Message::shared_ptr msg(new Message());
+ BasicMessage::shared_ptr msg(new BasicMessage());
if (header) {
header->flip();
msg->decodeHeader(*header);
@@ -116,7 +116,7 @@ class MessageBuilderTest : public CppUnit::TestCase
DummyHandler handler;
MessageBuilder builder(&handler);
- Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(0);
@@ -133,7 +133,7 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
- Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(7);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -154,7 +154,7 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -183,7 +183,7 @@ class MessageBuilderTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false));
+ Message::shared_ptr message(new BasicMessage(0, "test", "my_routing_key", false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties());
@@ -200,7 +200,7 @@ class MessageBuilderTest : public CppUnit::TestCase
CPPUNIT_ASSERT(handler.msg);
CPPUNIT_ASSERT_EQUAL(message, handler.msg);
- Message::shared_ptr restored = store.getRestoredMessage();
+ BasicMessage::shared_ptr restored = store.getRestoredMessage();
CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange());
CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey());
CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId());
diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp
index 8bb570e598..62249e1f5f 100644
--- a/cpp/tests/MessageTest.cpp
+++ b/cpp/tests/MessageTest.cpp
@@ -52,7 +52,7 @@ class MessageTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr msg = Message::shared_ptr(new Message(0, exchange, routingKey, false, false));
+ Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0, exchange, routingKey, false, false));
AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
header->setContentSize(14);
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
@@ -69,7 +69,7 @@ class MessageTest : public CppUnit::TestCase
msg->encode(buffer);
buffer.flip();
- msg = Message::shared_ptr(new Message(buffer));
+ msg = Message::shared_ptr(new BasicMessage(buffer));
CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId());
diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp
index 9d655781c1..e156efc507 100644
--- a/cpp/tests/QueueTest.cpp
+++ b/cpp/tests/QueueTest.cpp
@@ -66,9 +66,9 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount());
//Test basic delivery:
- Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+ Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true));
+ Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true));
+ Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true));
queue->deliver(msg1);
CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
@@ -123,9 +123,9 @@ class QueueTest : public CppUnit::TestCase
void testDequeue(){
Queue::shared_ptr queue(new Queue("my_queue", true));
- Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+ Message::shared_ptr msg1 = Message::shared_ptr(new BasicMessage(0, "e", "A", true, true));
+ Message::shared_ptr msg2 = Message::shared_ptr(new BasicMessage(0, "e", "B", true, true));
+ Message::shared_ptr msg3 = Message::shared_ptr(new BasicMessage(0, "e", "C", true, true));
Message::shared_ptr received;
queue->deliver(msg1);
diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp
index 0ffe984ded..a6fe0f1010 100644
--- a/cpp/tests/TxAckTest.cpp
+++ b/cpp/tests/TxAckTest.cpp
@@ -69,7 +69,7 @@ public:
TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid)
{
for(int i = 0; i < 10; i++){
- Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false));
+ Message::shared_ptr msg(new BasicMessage(0, "exchange", "routing_key", false, false));
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp
index 3542e08f45..87658af62e 100644
--- a/cpp/tests/TxPublishTest.cpp
+++ b/cpp/tests/TxPublishTest.cpp
@@ -75,7 +75,7 @@ public:
TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
- msg(new Message(0, "exchange", "routing_key", false, false)),
+ msg(new BasicMessage(0, "exchange", "routing_key", false, false)),
op(msg, &xid)
{
msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));