summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
committerGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
commit9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch)
tree26ad3b8dffa17fa665fe7a033a7c8092839df011 /cpp/src
parent6b09696b216c090b512c6af92bf7976ae3407add (diff)
downloadqpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am28
-rw-r--r--cpp/src/qpid/broker/AccumulatedAck.h7
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp19
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h3
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp52
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h25
-rw-r--r--cpp/src/qpid/broker/BrokerExchange.h2
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp299
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.h146
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h168
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp328
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.h90
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp2
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h3
-rw-r--r--cpp/src/qpid/broker/CompletionHandler.h39
-rw-r--r--cpp/src/qpid/broker/Consumer.h2
-rw-r--r--cpp/src/qpid/broker/Content.h64
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.h2
-rw-r--r--cpp/src/qpid/broker/DeliveryAdapter.h2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h1
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h1
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h1
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.cpp70
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.cpp68
-rw-r--r--cpp/src/qpid/broker/Message.cpp195
-rw-r--r--cpp/src/qpid/broker/Message.h139
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h108
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp85
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h38
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp140
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.h60
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp25
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h7
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h1
-rw-r--r--cpp/src/qpid/broker/NameGenerator.h2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h1
-rw-r--r--cpp/src/qpid/broker/PersistableExchange.h2
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h9
-rw-r--r--cpp/src/qpid/broker/RecoveredDequeue.h2
-rw-r--r--cpp/src/qpid/broker/RecoveredEnqueue.h2
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp23
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.h4
-rw-r--r--cpp/src/qpid/broker/Reference.cpp53
-rw-r--r--cpp/src/qpid/broker/Reference.h115
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp198
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h12
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h1
-rw-r--r--cpp/src/qpid/broker/TxPublish.h4
-rw-r--r--cpp/src/qpid/client/ChannelHandler.cpp4
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp8
-rw-r--r--cpp/src/qpid/client/ClientChannel.h2
-rw-r--r--cpp/src/qpid/client/ClientMessage.h16
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp2
-rw-r--r--cpp/src/qpid/client/Connector.cpp2
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp21
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h6
-rw-r--r--cpp/src/qpid/client/ReceivedContent.cpp105
-rw-r--r--cpp/src/qpid/client/ReceivedContent.h75
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp2
-rw-r--r--cpp/src/qpid/client/SessionCore.h4
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp3
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp2
-rw-r--r--cpp/src/qpid/framing/AMQContentBody.h1
-rw-r--r--cpp/src/qpid/framing/AMQDataBlock.h2
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp9
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h19
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.cpp68
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.h95
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.h1
-rw-r--r--cpp/src/qpid/framing/BasicHeaderProperties.cpp35
-rw-r--r--cpp/src/qpid/framing/BasicHeaderProperties.h10
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp2
-rw-r--r--cpp/src/qpid/framing/FrameSet.cpp83
-rw-r--r--cpp/src/qpid/framing/FrameSet.h102
-rw-r--r--cpp/src/qpid/framing/ProtocolInitiation.cpp2
-rw-r--r--cpp/src/qpid/framing/ProtocolInitiation.h2
-rw-r--r--cpp/src/qpid/framing/SendContent.cpp51
-rw-r--r--cpp/src/qpid/framing/SendContent.h (renamed from cpp/src/qpid/broker/LazyLoadedContent.h)49
-rw-r--r--cpp/src/qpid/framing/TypeFilter.h (renamed from cpp/src/qpid/broker/InMemoryContent.h)42
-rw-r--r--cpp/src/qpid/framing/frame_functors.h108
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp97
-rw-r--r--cpp/src/tests/Cluster.cpp8
-rw-r--r--cpp/src/tests/Cluster_child.cpp2
-rw-r--r--cpp/src/tests/ExchangeTest.cpp3
-rw-r--r--cpp/src/tests/FramingTest.cpp5
-rw-r--r--cpp/src/tests/HeaderTest.cpp27
-rw-r--r--cpp/src/tests/InMemoryContentTest.cpp91
-rw-r--r--cpp/src/tests/LazyLoadedContentTest.cpp113
-rw-r--r--cpp/src/tests/Makefile.am4
-rw-r--r--cpp/src/tests/MessageBuilderTest.cpp284
-rw-r--r--cpp/src/tests/MessageTest.cpp65
-rw-r--r--cpp/src/tests/MessageUtils.h53
-rw-r--r--cpp/src/tests/QueueTest.cpp9
-rw-r--r--cpp/src/tests/ReferenceTest.cpp94
-rw-r--r--cpp/src/tests/TxAckTest.cpp12
-rw-r--r--cpp/src/tests/TxPublishTest.cpp7
98 files changed, 1840 insertions, 2621 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 387d4dce91..f1fee3e61d 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -111,10 +111,12 @@ libqpidcommon_la_SOURCES = \
qpid/framing/Buffer.cpp \
qpid/framing/FieldTable.cpp \
qpid/framing/FramingContent.cpp \
+ qpid/framing/FrameSet.cpp \
qpid/framing/InitiationHandler.cpp \
qpid/framing/ProtocolInitiation.cpp \
qpid/framing/ProtocolVersion.cpp \
qpid/framing/ProtocolVersionException.cpp \
+ qpid/framing/SendContent.cpp \
qpid/framing/SequenceNumber.cpp \
qpid/framing/SequenceNumberSet.cpp \
qpid/framing/Value.cpp \
@@ -159,8 +161,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/BrokerChannel.cpp \
qpid/broker/BrokerExchange.cpp \
- qpid/broker/BrokerMessage.cpp \
- qpid/broker/BrokerMessageMessage.cpp \
qpid/broker/BrokerQueue.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionAdapter.cpp \
@@ -178,9 +178,9 @@ libqpidbroker_la_SOURCES = \
qpid/broker/ExchangeRegistry.cpp \
qpid/broker/FanOutExchange.cpp \
qpid/broker/HeadersExchange.cpp \
- qpid/broker/InMemoryContent.cpp \
- qpid/broker/LazyLoadedContent.cpp \
+ qpid/broker/Message.cpp \
qpid/broker/MessageBuilder.cpp \
+ qpid/broker/MessageDelivery.cpp \
qpid/broker/MessageHandlerImpl.cpp \
qpid/broker/MessageStoreModule.cpp \
qpid/broker/NameGenerator.cpp \
@@ -191,7 +191,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveryManagerImpl.cpp \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
- qpid/broker/Reference.cpp \
qpid/broker/SessionState.h \
qpid/broker/SuspendedSessions.h \
qpid/broker/SuspendedSessions.cpp \
@@ -222,7 +221,6 @@ libqpidclient_la_SOURCES = \
qpid/client/FutureCompletion.cpp \
qpid/client/FutureResponse.cpp \
qpid/client/FutureFactory.cpp \
- qpid/client/ReceivedContent.cpp \
qpid/client/SessionCore.cpp \
qpid/client/StateManager.cpp
@@ -232,10 +230,7 @@ nobase_include_HEADERS = \
qpid/broker/AccumulatedAck.h \
qpid/broker/BrokerChannel.h \
qpid/broker/BrokerExchange.h \
- qpid/broker/BrokerMessage.h \
- qpid/broker/BrokerMessageBase.h \
qpid/broker/BrokerQueue.h \
- qpid/broker/CompletionHandler.h \
qpid/broker/Consumer.h \
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.h \
@@ -252,8 +247,10 @@ nobase_include_HEADERS = \
qpid/broker/ExchangeRegistry.h \
qpid/broker/FanOutExchange.h \
qpid/broker/HandlerImpl.h \
- qpid/broker/InMemoryContent.h \
+ qpid/broker/Message.h \
+ qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.h \
+ qpid/broker/MessageDelivery.h \
qpid/broker/MessageHandlerImpl.h \
qpid/broker/MessageStoreModule.h \
qpid/broker/NameGenerator.h \
@@ -269,23 +266,19 @@ nobase_include_HEADERS = \
qpid/broker/RecoveryManager.h \
qpid/broker/RecoveredEnqueue.h \
qpid/broker/RecoveredDequeue.h \
- qpid/broker/Reference.h \
qpid/broker/TxBuffer.h \
qpid/broker/TxOp.h \
qpid/broker/TxPublish.h \
qpid/broker/Broker.h \
qpid/broker/BrokerAdapter.h \
- qpid/broker/BrokerMessageMessage.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Connection.h \
qpid/broker/ConnectionAdapter.h \
qpid/broker/ConnectionFactory.h \
qpid/broker/ConnectionToken.h \
- qpid/broker/Content.h \
qpid/broker/Daemon.h \
qpid/broker/DeliveryRecord.h \
qpid/broker/HeadersExchange.h \
- qpid/broker/LazyLoadedContent.h \
qpid/broker/MessageStore.h \
qpid/broker/PersistableExchange.h \
qpid/broker/PersistableMessage.h \
@@ -316,7 +309,6 @@ nobase_include_HEADERS = \
qpid/client/FutureCompletion.h \
qpid/client/FutureResponse.h \
qpid/client/FutureFactory.h \
- qpid/client/ReceivedContent.h \
qpid/client/Response.h \
qpid/client/SessionCore.h \
qpid/client/StateManager.h \
@@ -334,6 +326,8 @@ nobase_include_HEADERS = \
qpid/framing/FieldTable.h \
qpid/framing/FrameDefaultVisitor.h \
qpid/framing/FramingContent.h \
+ qpid/framing/FrameSet.h \
+ qpid/framing/frame_functors.h \
qpid/framing/HeaderProperties.h \
qpid/framing/InitiationHandler.h \
qpid/framing/InputHandler.h \
@@ -343,10 +337,12 @@ nobase_include_HEADERS = \
qpid/framing/ProtocolVersion.h \
qpid/framing/ProtocolVersionException.h \
qpid/framing/Proxy.h \
- qpid/framing/SerializeHandler.h \
+ qpid/framing/SendContent.h \
qpid/framing/SequenceNumber.h \
qpid/framing/SequenceNumberSet.h \
+ qpid/framing/SerializeHandler.h \
qpid/framing/StructHelper.h \
+ qpid/framing/TypeFilter.h \
qpid/framing/Value.h \
qpid/framing/Visitor.h \
qpid/framing/Uuid.h \
diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h
index be01c5e02c..b53f4a8ba5 100644
--- a/cpp/src/qpid/broker/AccumulatedAck.h
+++ b/cpp/src/qpid/broker/AccumulatedAck.h
@@ -48,13 +48,12 @@ namespace qpid {
class AccumulatedAck {
public:
/**
- * If not zero, then everything up to this value has been
- * acked.
+ * Everything up to this value has been acked.
*/
DeliveryId mark;
/**
- * List of individually acked messages that are not
- * included in the range marked by 'range'.
+ * List of individually acked messages greater than the
+ * 'mark'.
*/
std::list<Range> ranges;
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index b733f77390..07b7b4f638 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -21,6 +21,7 @@
#include "BrokerChannel.h"
#include "Connection.h"
#include "DeliveryToken.h"
+#include "MessageDelivery.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/Exception.h"
@@ -327,7 +328,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
//need to generate name here, so we have it for the adapter (it is
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
if(!nowait) client.consumeOk(newTag);
@@ -340,21 +341,9 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
channel.cancel(consumerTag);
}
-void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
- const string& exchangeName, const string& routingKey,
- bool rejectUnroutable, bool immediate)
-{
-
- // exeption moved to ChannelAdaptor -- TODO this code should be removed once basic is removed
-
- BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate);
- channel.handlePublish(msg);
-
-}
-
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = getQueue(queueName);
- DeliveryToken::shared_ptr token(BasicMessage::createGetToken(queue));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
if(!channel.get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
@@ -384,7 +373,7 @@ void BrokerAdapter::TxHandlerImpl::select()
void BrokerAdapter::TxHandlerImpl::commit()
{
- channel.commit();
+ channel.commit(&broker.getStore());
}
void BrokerAdapter::TxHandlerImpl::rollback()
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 99b7f14525..9e0cf64b7f 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -183,9 +183,6 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
bool noLocal, bool noAck, bool exclusive, bool nowait,
const qpid::framing::FieldTable& fields);
void cancel(const std::string& consumerTag);
- void publish(uint16_t ticket,
- const std::string& exchange, const std::string& routingKey,
- bool rejectUnroutable, bool immediate);
void get(uint16_t ticket, const std::string& queue, bool noAck);
void ack(uint64_t deliveryTag, bool multiple);
void reject(uint64_t deliveryTag, bool requeue);
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 9712b3903f..615a26beab 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -32,13 +32,12 @@
#include "BrokerAdapter.h"
#include "BrokerChannel.h"
-#include "BrokerMessage.h"
#include "BrokerQueue.h"
#include "Connection.h"
#include "DeliverableMessage.h"
#include "DtxAck.h"
#include "DtxTimeout.h"
-#include "MessageStore.h"
+#include "Message.h"
#include "TxAck.h"
#include "TxPublish.h"
@@ -49,7 +48,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) :
+Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id) :
id(_id),
connection(con),
out(_out),
@@ -58,8 +57,6 @@ Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageS
tagGenerator("sgen"),
dtxSelected(false),
accumulatedAck(0),
- store(_store),
- messageBuilder(this, _store, connection.getStagingThreshold()),
opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
flowActive(true)
{
@@ -108,7 +105,7 @@ void Channel::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void Channel::commit()
+void Channel::commit(MessageStore* const store)
{
if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
@@ -296,34 +293,7 @@ void Channel::ConsumerImpl::requestDispatch()
queue->requestDispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr msg)
-{
- complete(msg);
-}
-
-void Channel::handlePublish(Message* _message)
-{
- Message::shared_ptr message(_message);
- messageBuilder.initialise(message);
-}
-
-void Channel::handleHeader(AMQHeaderBody* header)
-{
- messageBuilder.setHeader(header);
- //at this point, decide based on the size of the message whether we want
- //to stage it by saving content directly to disk as it arrives
-}
-
-void Channel::handleContent(AMQContentBody* content)
-{
- messageBuilder.addContent(content);
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody*) {
- // TODO aconway 2007-01-17: Implement heartbeating.
-}
-
-void Channel::complete(Message::shared_ptr msg) {
+void Channel::handle(Message::shared_ptr msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
@@ -335,20 +305,12 @@ void Channel::complete(Message::shared_ptr msg) {
}
}
-
-
void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
-
- std::string routeToExchangeName = msg->getExchange();
- // cache the exchange lookup
- if (!cacheExchange.get() || cacheExchangeName != routeToExchangeName){
- cacheExchangeName = routeToExchangeName;
- cacheExchange = connection.broker.getExchanges().get(routeToExchangeName);
+ std::string exchangeName = msg->getExchangeName();
+ if (!cacheExchange || cacheExchange->getName() != exchangeName){
+ cacheExchange = connection.broker.getExchanges().get(exchangeName);
}
- if (!cacheExchange.get() )
- throw ChannelException(404, "Exchange not found '" + routeToExchangeName + "'");
-
cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
if (!strategy.delivered) {
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index fcfcd73679..cdbab37ebc 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -37,14 +37,12 @@
#include "Deliverable.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
-#include "MessageBuilder.h"
#include "NameGenerator.h"
#include "Prefetch.h"
#include "TxBuffer.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelOpenBody.h"
-#include "CompletionHandler.h"
namespace qpid {
namespace broker {
@@ -60,7 +58,7 @@ using framing::string;
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : public CompletionHandler
+class Channel
{
class ConsumerImpl : public Consumer
{
@@ -113,25 +111,22 @@ class Channel : public CompletionHandler
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
AccumulatedAck accumulatedAck;
- MessageStore* const store;
- MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
bool flowActive;
- std::string cacheExchangeName; // pair holds last exchange used for routing
- Exchange::shared_ptr cacheExchange;
+ boost::shared_ptr<Exchange> cacheExchange;
void route(Message::shared_ptr msg, Deliverable& strategy);
- void complete(Message::shared_ptr msg);// completion handler for MessageBuilder
void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
void acknowledged(const DeliveryRecord&);
-
+
+
public:
- Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0);
+ Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id);
~Channel();
bool isOpen() const { return opened; }
@@ -162,7 +157,7 @@ class Channel : public CompletionHandler
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
void close();
void startTx();
- void commit();
+ void commit(MessageStore* const store);
void rollback();
void selectDtx();
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
@@ -174,12 +169,8 @@ class Channel : public CompletionHandler
void recover(bool requeue);
void flow(bool active);
void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
- void handlePublish(Message* msg);
- void handleHeader(framing::AMQHeaderBody*);
- void handleContent(framing::AMQContentBody*);
- void handleHeartbeat(framing::AMQHeartbeatBody*);
-
- void handleInlineTransfer(Message::shared_ptr msg);
+
+ void handle(Message::shared_ptr msg);
};
}} // namespace broker
diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h
index 91c295e1b7..c3dd7b998d 100644
--- a/cpp/src/qpid/broker/BrokerExchange.h
+++ b/cpp/src/qpid/broker/BrokerExchange.h
@@ -51,7 +51,7 @@ namespace qpid {
: name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){}
virtual ~Exchange(){}
- string getName() const { return name; }
+ const string& getName() const { return name; }
bool isDurable() { return durable; }
qpid::framing::FieldTable& getArgs() { return args; }
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
deleted file mode 100644
index bddd5802cf..0000000000
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/cast.hpp>
-
-#include "BrokerMessage.h"
-#include <iostream>
-
-#include "InMemoryContent.h"
-#include "LazyLoadedContent.h"
-#include "MessageStore.h"
-#include "BrokerQueue.h"
-#include "qpid/log/Statement.h"
-#include "qpid/framing/BasicDeliverBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/framing/BasicPublishBody.h"
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/AMQMethodBody.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "RecoveryManagerImpl.h"
-
-namespace qpid{
-namespace broker{
-
-struct BasicGetToken : DeliveryToken
-{
- typedef boost::shared_ptr<BasicGetToken> shared_ptr;
-
- Queue::shared_ptr queue;
-
- BasicGetToken(Queue::shared_ptr q) : queue(q) {}
-};
-
-struct BasicConsumeToken : DeliveryToken
-{
- typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
-
- const string consumer;
-
- BasicConsumeToken(const string c) : consumer(c) {}
-};
-
-}
-}
-
-using namespace boost;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-BasicMessage::BasicMessage(
- const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate
-) :
- Message(_publisher, _exchange, _routingKey, _mandatory, _immediate),
- size(0)
-{}
-
-// For tests only.
-BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {}
-
-BasicMessage::~BasicMessage(){}
-
-void BasicMessage::setHeader(AMQHeaderBody* _header){
- if (_header) {
- this->header = *_header;
- isHeaderSet = true;
- }
- else
- isHeaderSet = false;
-}
-
-void BasicMessage::addContent(AMQContentBody* data){
- if (!content.get()) {
- content = std::auto_ptr<Content>(new InMemoryContent());
- }
- content->add(data);
- size += data->size();
-}
-
-bool BasicMessage::isComplete(){
- return isHeaderSet && (header.getContentSize() == contentSize());
-}
-
-DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
-{
- return DeliveryToken::shared_ptr(new BasicGetToken(queue));
-}
-
-DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer)
-{
- return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
-}
-
-void BasicMessage::deliver(ChannelAdapter& channel,
- const string& consumerTag, DeliveryId id,
- uint32_t framesize)
-{
- channel.send(BasicDeliverBody(
- channel.getVersion(), consumerTag, id.getValue(),
- getRedelivered(), getExchange(), getRoutingKey()));
- sendContent(channel, framesize);
-}
-
-void BasicMessage::sendGetOk(ChannelAdapter& channel,
- uint32_t messageCount,
- DeliveryId id,
- uint32_t framesize)
-{
- channel.send(
- BasicGetOkBody(
- channel.getVersion(),
- id.getValue(), getRedelivered(), getExchange(),
- getRoutingKey(), messageCount));
- sendContent(channel, framesize);
-}
-
-void BasicMessage::deliver(framing::ChannelAdapter& channel, DeliveryId id, DeliveryToken::shared_ptr token, uint32_t framesize)
-{
- BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token);
- if (consume) {
- deliver(channel, consume->consumer, id, framesize);
- } else {
- BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token);
- if (get) {
- sendGetOk(channel, get->queue->getMessageCount(), id.getValue(), framesize);
- } else {
- //TODO:
- //either need to be able to convert to a message transfer or
- //throw error of some kind to allow this to be handled higher up
- throw Exception("Conversion to BasicMessage not defined!");
- }
- }
-}
-
-void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
-{
- channel.send(header);
- Mutex::ScopedLock locker(contentLock);
- if (content.get())
- content->send(channel, framesize);
-}
-
-BasicHeaderProperties* BasicMessage::getHeaderProperties(){
- return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0;
-}
-
-const FieldTable& BasicMessage::getApplicationHeaders(){
- return getHeaderProperties()->getHeaders();
-}
-
-bool BasicMessage::isPersistent()
-{
- if(!isHeaderSet) return false;
- BasicHeaderProperties* props = getHeaderProperties();
- return props && props->getDeliveryMode() == PERSISTENT;
-}
-
-void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize)
-{
- decodeHeader(buffer);
- if (!headersOnly) decodeContent(buffer, contentChunkSize);
-}
-
-void BasicMessage::decodeHeader(Buffer& buffer)
-{
- //don't care about the type here, but want encode/decode to be symmetric
- RecoveryManagerImpl::decodeMessageType(buffer);
-
- string exchange;
- string routingKey;
-
- buffer.getShortString(exchange);
- buffer.getShortString(routingKey);
- setRouting(exchange, routingKey);
-
- uint32_t headerSize = buffer.getLong();
- AMQHeaderBody headerBody;
- headerBody.decode(buffer, headerSize);
- setHeader(&headerBody);
-}
-
-void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
-{
- uint64_t expected = expectedContentSize();
- if (expected != buffer.available()) {
- QPID_LOG(error, "Expected " << expectedContentSize() << " bytes, got " << buffer.available());
- throw Exception("Cannot decode content, buffer not large enough.");
- }
-
- if (!chunkSize || chunkSize > expected) {
- chunkSize = expected;
- }
-
- uint64_t total = 0;
- while (total < expectedContentSize()) {
- uint64_t remaining = expected - total;
- AMQContentBody contentBody;
- contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize);
- addContent(&contentBody);
- total += chunkSize;
- }
-}
-
-void BasicMessage::encode(Buffer& buffer) const
-{
- encodeHeader(buffer);
- encodeContent(buffer);
-}
-
-void BasicMessage::encodeHeader(Buffer& buffer) const
-{
- RecoveryManagerImpl::encodeMessageType(*this, buffer);
- buffer.putShortString(getExchange());
- buffer.putShortString(getRoutingKey());
- buffer.putLong(header.size());
- header.encode(buffer);
-}
-
-void BasicMessage::encodeContent(Buffer& buffer) const
-{
- Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->encode(buffer);
-}
-
-uint32_t BasicMessage::encodedSize() const
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-uint32_t BasicMessage::encodedContentSize() const
-{
- Mutex::ScopedLock locker(contentLock);
- return content.get() ? content->size() : 0;
-}
-
-uint32_t BasicMessage::encodedHeaderSize() const
-{
- return RecoveryManagerImpl::encodedMessageTypeSize()
- +getExchange().size() + 1
- + getRoutingKey().size() + 1
- + header.size() + 4;//4 extra bytes for size
-}
-
-uint64_t BasicMessage::expectedContentSize()
-{
- return isHeaderSet ? header.getContentSize() : 0;
-}
-
-void BasicMessage::releaseContent(MessageStore* store)
-{
- Mutex::ScopedLock locker(contentLock);
- if (!isPersistent() && getPersistenceId() == 0) {
- store->stage(*this);
- }
- if (!content.get() || content->size() > 0) {
- //set content to lazy loading mode (but only if there is
- //stored content):
-
- //Note: the LazyLoadedContent instance contains a raw pointer
- //to the message, however it is then set as a member of that
- //message so its lifetime is guaranteed to be no longer than
- //that of the message itself
- content = std::auto_ptr<Content>(
- new LazyLoadedContent(store, this, expectedContentSize()));
- }
-}
-
-void BasicMessage::setContent(std::auto_ptr<Content>& _content)
-{
- Mutex::ScopedLock locker(contentLock);
- content = _content;
-}
-
-
-uint32_t BasicMessage::getRequiredCredit() const
-{
- return header.size() + contentSize();
-}
diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h
deleted file mode 100644
index 0f46ff2e83..0000000000
--- a/cpp/src/qpid/broker/BrokerMessage.h
+++ /dev/null
@@ -1,146 +0,0 @@
-#ifndef _broker_BrokerMessage_h
-#define _broker_BrokerMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <memory>
-#include <boost/shared_ptr.hpp>
-
-#include "BrokerMessageBase.h"
-#include "qpid/framing/BasicHeaderProperties.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "ConnectionToken.h"
-#include "Content.h"
-#include "qpid/sys/Mutex.h"
-#include "TxBuffer.h"
-
-namespace qpid {
-
-namespace framing {
-class ChannelAdapter;
-class AMQHeaderBody;
-}
-
-namespace broker {
-
-class MessageStore;
-class Queue;
-using framing::string;
-
-/**
- * Represents an AMQP message, i.e. a header body, a list of
- * content bodies and some details about the publication
- * request.
- */
-class BasicMessage : public Message {
- framing::AMQHeaderBody header;
- bool isHeaderSet;
- std::auto_ptr<Content> content;
- mutable sys::Mutex contentLock;
- uint64_t size;
-
- void sendContent(framing::ChannelAdapter&, uint32_t framesize);
-
- public:
- typedef boost::shared_ptr<BasicMessage> shared_ptr;
-
- BasicMessage(const ConnectionToken* const publisher,
- const string& exchange, const string& routingKey,
- bool mandatory, bool immediate);
- BasicMessage();
- ~BasicMessage();
- void setHeader(framing::AMQHeaderBody* header);
- void addContent(framing::AMQContentBody* data);
- bool isComplete();
-
- static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue);
- static DeliveryToken::shared_ptr createConsumeToken(const string& consumer);
- void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
-
- void deliver(framing::ChannelAdapter&,
- const string& consumerTag,
- DeliveryId deliveryTag,
- uint32_t framesize);
-
- void sendGetOk(framing::ChannelAdapter& channel,
- uint32_t messageCount,
- DeliveryId deliveryTag,
- uint32_t framesize);
-
- framing::BasicHeaderProperties* getHeaderProperties();
- const framing::FieldTable& getApplicationHeaders();
- bool isPersistent();
- uint64_t contentSize() const { return size; }
-
- void decode(framing::Buffer& buffer, bool headersOnly = false,
- uint32_t contentChunkSize = 0);
- void decodeHeader(framing::Buffer& buffer);
- void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
-
- void encode(framing::Buffer& buffer) const;
- void encodeHeader(framing::Buffer& buffer) const;
- void encodeContent(framing::Buffer& buffer) const;
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- */
- uint32_t encodedSize() const;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- */
- uint32_t encodedHeaderSize() const;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- uint32_t encodedContentSize() const;
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- void releaseContent(MessageStore* store);
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- uint64_t expectedContentSize();
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- void setContent(std::auto_ptr<Content>& content);
-
- /**
- * Returns the byte credits required to transfer this message.
- */
- uint32_t getRequiredCredit() const;
-};
-
-}
-}
-
-
-#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
deleted file mode 100644
index bac5dc6386..0000000000
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ /dev/null
@@ -1,168 +0,0 @@
-#ifndef _broker_BrokerMessageBase_h
-#define _broker_BrokerMessageBase_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include <boost/shared_ptr.hpp>
-#include "Content.h"
-#include "DeliveryId.h"
-#include "DeliveryToken.h"
-#include "PersistableMessage.h"
-#include "qpid/framing/amqp_types.h"
-
-namespace qpid {
-
-namespace framing {
-class ChannelAdapter;
-class BasicHeaderProperties;
-class FieldTable;
-class AMQMethodBody;
-class AMQContentBody;
-class AMQHeaderBody;
-}
-
-
-namespace broker {
-class ConnectionToken;
-class MessageStore;
-
-/**
- * Base class for all types of internal broker messages
- * abstracting away the operations
- * TODO; AMS: for the moment this is mostly a placeholder
- */
-class Message : public PersistableMessage{
- public:
- typedef boost::shared_ptr<Message> shared_ptr;
-
- Message(const ConnectionToken* publisher_,
- const std::string& _exchange,
- const std::string& _routingKey,
- bool _mandatory, bool _immediate) :
- publisher(publisher_),
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- persistenceId(0),
- redelivered(false)
- {}
-
- Message() :
- mandatory(false),
- immediate(false),
- persistenceId(0),
- redelivered(false)
- {}
-
- virtual ~Message() {};
-
- // Accessors
- const std::string& getRoutingKey() const { return routingKey; }
- const std::string& getExchange() const { return exchange; }
- uint64_t getPersistenceId() const { return persistenceId; }
- bool getRedelivered() const { return redelivered; }
-
- void setRouting(const std::string& _exchange, const std::string& _routingKey)
- { exchange = _exchange; routingKey = _routingKey; }
- void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
- void redeliver() { redelivered = true; }
-
- virtual void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag/*only needed for basic class*/,
- DeliveryToken::shared_ptr token, uint32_t framesize) = 0;
-
- virtual bool isComplete() = 0;
-
- virtual uint64_t contentSize() const = 0;
- virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
- virtual const framing::FieldTable& getApplicationHeaders() = 0;
- virtual bool isPersistent() = 0;
- virtual const ConnectionToken* getPublisher() const {
- return publisher;
- }
-
- virtual uint32_t getRequiredCredit() const = 0;
-
- virtual void encode(framing::Buffer& buffer) const = 0;
- virtual void encodeHeader(framing::Buffer& buffer) const = 0;
-
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- */
- virtual uint32_t encodedSize() const = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- */
- virtual uint32_t encodedHeaderSize() const = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- virtual uint32_t encodedContentSize() const = 0;
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- virtual uint64_t expectedContentSize() = 0;
-
- virtual void decodeHeader(framing::Buffer& buffer) = 0;
- virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0;
-
- static shared_ptr decode(framing::Buffer& buffer);
-
- // TODO: AMS 29/1/2007 Don't think these are really part of base class
-
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(framing::AMQHeaderBody*) {};
- virtual void addContent(framing::AMQContentBody*) {};
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- virtual void releaseContent(MessageStore* /*store*/) {};
-
- bool isImmediate() const { return immediate; }
-
- private:
- const ConnectionToken* publisher;
- std::string exchange;
- std::string routingKey;
- const bool mandatory;
- const bool immediate;
- mutable uint64_t persistenceId;
- bool redelivered;
-};
-
-}}
-
-
-#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
deleted file mode 100644
index 1184885aeb..0000000000
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/QpidError.h"
-#include "BrokerMessageMessage.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageOpenBody.h"
-#include "qpid/framing/MessageCloseBody.h"
-#include "qpid/framing/MessageAppendBody.h"
-#include "Reference.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/BasicHeaderProperties.h"
-#include "RecoveryManagerImpl.h"
-
-#include <algorithm>
-
-using namespace std;
-using namespace boost;
-using namespace qpid::framing;
-
-namespace qpid {
-namespace broker {
-
-struct MessageDeliveryToken : public DeliveryToken
-{
- const std::string destination;
-
- MessageDeliveryToken(const std::string& d) : destination(d) {}
-};
-
-MessageMessage::MessageMessage(
- ConnectionToken* publisher, const MessageTransferBody* transfer_
-) : Message(publisher, transfer_->getDestination(),
- transfer_->getRoutingKey(),
- transfer_->getRejectUnroutable(),
- transfer_->getImmediate()),
- transfer(*transfer_)
-{
- assert(transfer.getBody().isInline());
-}
-
-MessageMessage::MessageMessage(
- ConnectionToken* publisher, const MessageTransferBody* transfer_,
- ReferencePtr reference_
-) : Message(publisher, transfer_->getDestination(),
- transfer_->getRoutingKey(),
- transfer_->getRejectUnroutable(),
- transfer_->getImmediate()),
- transfer(*transfer_),
- reference(reference_)
-{
- assert(!transfer.getBody().isInline());
- assert(reference_);
-}
-
-/**
- * Currently used by message store impls to recover messages
- */
-MessageMessage::MessageMessage() {}
-
-// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
-void MessageMessage::transferMessage(
- framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint32_t framesize)
-{
- const framing::Content& body = transfer.getBody();
- // Send any reference data
- ReferencePtr ref= getReference();
- if (ref){
-
- // Open
- channel.send(MessageOpenBody(channel.getVersion(), ref->getId()));
- // Appends
- for(Reference::Appends::const_iterator a = ref->getAppends().begin();
- a != ref->getAppends().end();
- ++a) {
- uint32_t sizeleft = a->size();
- const string& content = a->getBytes();
- // Calculate overhead bytes
- // Assume that the overhead is constant as the reference name doesn't change
- uint32_t overhead = sizeleft - content.size();
- string::size_type contentStart = 0;
- while (sizeleft) {
- string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
-
- channel.send(MessageAppendBody(channel.getVersion(), ref->getId(),
- string(content, contentStart, contentSize)));
- sizeleft -= contentSize;
- contentStart += contentSize;
- }
- }
- }
-
- // The transfer
- if ( transfer.size()<=framesize ) {
- channel.send(MessageTransferBody(ProtocolVersion(),
- transfer.getTicket(),
- consumerTag,
- getRedelivered(),
- transfer.getRejectUnroutable(),
- transfer.getImmediate(),
- transfer.getTtl(),
- transfer.getPriority(),
- transfer.getTimestamp(),
- transfer.getDeliveryMode(),
- transfer.getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer.getMessageId(),
- transfer.getCorrelationId(),
- transfer.getReplyTo(),
- transfer.getContentType(),
- transfer.getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer.getUserId(),
- transfer.getAppId(),
- transfer.getTransactionId(),
- transfer.getSecurityToken(),
- transfer.getApplicationHeaders(),
- body));
- } else {
- // Thing to do here is to construct a simple reference message then deliver that instead
- // fragmentation will be taken care of in the delivery if necessary;
- string content = body.getValue();
- string refname = "dummy";
- MessageTransferBody newTransfer(channel.getVersion(),
- transfer.getTicket(),
- consumerTag,
- getRedelivered(),
- transfer.getRejectUnroutable(),
- transfer.getImmediate(),
- transfer.getTtl(),
- transfer.getPriority(),
- transfer.getTimestamp(),
- transfer.getDeliveryMode(),
- transfer.getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer.getMessageId(),
- transfer.getCorrelationId(),
- transfer.getReplyTo(),
- transfer.getContentType(),
- transfer.getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer.getUserId(),
- transfer.getAppId(),
- transfer.getTransactionId(),
- transfer.getSecurityToken(),
- transfer.getApplicationHeaders(),
- framing::Content(REFERENCE, refname));
- ReferencePtr newRef(new Reference(refname));
- newRef->append(MessageAppendBody(channel.getVersion(), refname, content));
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef);
- newMsg.transferMessage(channel, consumerTag, framesize);
- return;
- }
- // Close any reference data
- if (ref)
- channel.send(MessageCloseBody(ProtocolVersion(), ref->getId()));
-}
-
-
-void MessageMessage::deliver(ChannelAdapter& channel, DeliveryId, DeliveryToken::shared_ptr token, uint32_t framesize)
-{
- transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize);
-}
-
-void MessageMessage::deliver(ChannelAdapter& channel, const std::string& destination, uint32_t framesize)
-{
- transferMessage(channel, destination, framesize);
-}
-
-bool MessageMessage::isComplete()
-{
- return true;
-}
-
-uint64_t MessageMessage::contentSize() const
-{
- if (transfer.getBody().isInline())
- return transfer.getBody().getValue().size();
- else {
- assert(getReference());
- return getReference()->getSize();
- }
-}
-
-qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
-{
- return 0; // FIXME aconway 2007-02-05:
-}
-
-const FieldTable& MessageMessage::getApplicationHeaders()
-{
- return transfer.getApplicationHeaders();
-}
-bool MessageMessage::isPersistent()
-{
- return transfer.getDeliveryMode() == PERSISTENT;
-}
-
-uint32_t MessageMessage::encodedSize() const
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-uint32_t MessageMessage::encodedHeaderSize() const
-{
- return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size();
-}
-
-uint32_t MessageMessage::encodedContentSize() const
-{
- return 0;
-}
-
-uint64_t MessageMessage::expectedContentSize()
-{
- return 0;
-}
-
-void MessageMessage::encode(Buffer& buffer) const
-{
- encodeHeader(buffer);
-}
-
-void MessageMessage::encodeHeader(Buffer& buffer) const
-{
- RecoveryManagerImpl::encodeMessageType(*this, buffer);
- if (transfer.getBody().isInline()) {
- transfer.encode(buffer);
- } else {
- assert(getReference());
- string data;
- const Reference::Appends& appends = getReference()->getAppends();
- for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) {
- data += a->getBytes();
- }
- framing::Content body(INLINE, data);
- copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer);
- }
-}
-
-void MessageMessage::decodeHeader(Buffer& buffer)
-{
- //don't care about the type here, but want encode/decode to be symmetric
- RecoveryManagerImpl::decodeMessageType(buffer);
- transfer.decode(buffer);
-}
-
-void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
-{
-}
-
-
-MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version,
- const string& destination,
- const framing::Content& body) const
-{
- return MessageTransferBody(version,
- transfer.getTicket(),
- destination,
- getRedelivered(),
- transfer.getRejectUnroutable(),
- transfer.getImmediate(),
- transfer.getTtl(),
- transfer.getPriority(),
- transfer.getTimestamp(),
- transfer.getDeliveryMode(),
- transfer.getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer.getMessageId(),
- transfer.getCorrelationId(),
- transfer.getReplyTo(),
- transfer.getContentType(),
- transfer.getContentEncoding(),
- 0, /*content-length*/
- string(), /*type*/
- transfer.getUserId(),
- transfer.getAppId(),
- transfer.getTransactionId(),
- transfer.getSecurityToken(),
- transfer.getApplicationHeaders(),
- body);
-}
-
-MessageMessage::ReferencePtr MessageMessage::getReference() const {
- return reference;
-}
-
-uint32_t MessageMessage::getRequiredCredit() const
-{
- //TODO: change when encoding changes. Should be the payload of any
- //header & body frames.
- return transfer.size();
-}
-
-
-DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination)
-{
- return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination));
-}
-
-}} // namespace qpid::broker
-
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h
deleted file mode 100644
index 6bfd0e045d..0000000000
--- a/cpp/src/qpid/broker/BrokerMessageMessage.h
+++ /dev/null
@@ -1,90 +0,0 @@
-#ifndef _broker_BrokerMessageMessage_h
-#define _broker_BrokerMessageMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "BrokerMessageBase.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/amqp_types.h"
-#include <boost/weak_ptr.hpp>
-#include <vector>
-
-namespace qpid {
-
-namespace broker {
-class ConnectionToken;
-class Reference;
-
-class MessageMessage: public Message{
- public:
- typedef boost::shared_ptr<MessageMessage> shared_ptr;
- typedef boost::shared_ptr<Reference> ReferencePtr;
-
- MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer);
- MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer, ReferencePtr reference);
- MessageMessage();
-
- // Default destructor okay
-
- framing::MessageTransferBody* getTransfer() const { return const_cast<framing::MessageTransferBody*>(&transfer); }
- ReferencePtr getReference() const ;
-
- void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
- void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize);
-
- bool isComplete();
-
- uint64_t contentSize() const;
- framing::BasicHeaderProperties* getHeaderProperties();
- const framing::FieldTable& getApplicationHeaders();
- bool isPersistent();
-
- void encode(framing::Buffer& buffer) const;
- void encodeHeader(framing::Buffer& buffer) const;
- uint32_t encodedSize() const;
- uint32_t encodedHeaderSize() const;
- uint32_t encodedContentSize() const;
- uint64_t expectedContentSize();
- void decodeHeader(framing::Buffer& buffer);
- void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
- uint32_t getRequiredCredit() const;
-
- static DeliveryToken::shared_ptr getToken(const std::string& destination);
-
- private:
- void transferMessage(
- framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint32_t framesize);
-
- framing::MessageTransferBody copyTransfer(
- const framing::ProtocolVersion& version,
- const std::string& destination,
- const framing::Content& body) const;
-
- framing::MessageTransferBody transfer;
- const boost::shared_ptr<Reference> reference;
-};
-
-}}
-
-
-#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 5ff9f950eb..7311d043d0 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -88,7 +88,7 @@ void Queue::deliver(Message::shared_ptr& msg){
void Queue::recover(Message::shared_ptr& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
+ if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
msg->releaseContent(store);
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 962c11d8ee..5ba103d3ed 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -28,7 +28,7 @@
#include "qpid/framing/amqp_types.h"
#include "ConnectionToken.h"
#include "Consumer.h"
-#include "BrokerMessage.h"
+#include "Message.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Serializer.h"
#include "qpid/sys/Monitor.h"
@@ -43,6 +43,7 @@ namespace qpid {
namespace broker {
class MessageStore;
class QueueRegistry;
+ class TransactionContext;
class Exchange;
/**
diff --git a/cpp/src/qpid/broker/CompletionHandler.h b/cpp/src/qpid/broker/CompletionHandler.h
deleted file mode 100644
index 9d51656282..0000000000
--- a/cpp/src/qpid/broker/CompletionHandler.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#ifndef _broker_CompletionHandler_h
-#define _broker_CompletionHandler_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-namespace qpid {
-namespace broker {
-
-/**
- * Callback interface to handle completion of a message.
- */
-class CompletionHandler
-{
- public:
- virtual ~CompletionHandler(){}
- virtual void complete(Message::shared_ptr) = 0;
-};
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_CompletionHandler_h*/
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index d0c397d184..dc229947b9 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -21,7 +21,7 @@
#ifndef _Consumer_
#define _Consumer_
-#include "BrokerMessage.h"
+#include "Message.h"
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h
deleted file mode 100644
index 97dce0d3f7..0000000000
--- a/cpp/src/qpid/broker/Content.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Content_
-#define _Content_
-
-#include <boost/function.hpp>
-
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/OutputHandler.h"
-
-namespace qpid {
-
-namespace framing {
-class ChannelAdapter;
-}
-
-namespace broker {
-class Content{
- public:
- typedef std::string DataBlock;
- typedef boost::function1<void, const DataBlock&> SendFn;
-
- virtual ~Content(){}
-
- /** Add a block of data to the content */
- virtual void add(framing::AMQContentBody* data) = 0;
-
- /** Total size of content in bytes */
- virtual uint32_t size() = 0;
-
- /**
- * Iterate over the content calling SendFn for each block.
- * Subdivide blocks if necessary to ensure each block is
- * <= framesize bytes long.
- */
- virtual void send(framing::ChannelAdapter& channel, uint32_t framesize) = 0;
-
- //FIXME aconway 2007-02-07: This is inconsistently implemented
- //find out what is needed.
- virtual void encode(qpid::framing::Buffer& buffer) = 0;
-};
-}}
-
-
-#endif
diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h
index e8c4f5ba19..9719d972fc 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.h
+++ b/cpp/src/qpid/broker/DeliverableMessage.h
@@ -22,8 +22,8 @@
#define _DeliverableMessage_
#include "Deliverable.h"
-#include "BrokerMessage.h"
#include "BrokerQueue.h"
+#include "Message.h"
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h
index d59c4769d7..f645b37c23 100644
--- a/cpp/src/qpid/broker/DeliveryAdapter.h
+++ b/cpp/src/qpid/broker/DeliveryAdapter.h
@@ -21,9 +21,9 @@
#ifndef _DeliveryAdapter_
#define _DeliveryAdapter_
-#include "BrokerMessageBase.h"
#include "DeliveryId.h"
#include "DeliveryToken.h"
+#include "Message.h"
#include "qpid/framing/amqp_types.h"
namespace qpid {
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 745a246c78..a1f82cb757 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -25,10 +25,10 @@
#include <list>
#include <ostream>
#include "AccumulatedAck.h"
-#include "BrokerMessage.h"
-#include "Prefetch.h"
#include "BrokerQueue.h"
#include "DeliveryId.h"
+#include "Message.h"
+#include "Prefetch.h"
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 554be295bf..7b20bd610c 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -25,7 +25,6 @@
#include <vector>
#include "BrokerExchange.h"
#include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
#include "qpid/sys/Monitor.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 3cbffc6f2f..070e438bcc 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -25,7 +25,6 @@
#include <vector>
#include "BrokerExchange.h"
#include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
#include "qpid/sys/Monitor.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index a99cc1c92c..48d115c1ec 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -24,7 +24,6 @@
#include <vector>
#include "BrokerExchange.h"
#include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
#include "qpid/sys/Monitor.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp
deleted file mode 100644
index d69dcfafe7..0000000000
--- a/cpp/src/qpid/broker/InMemoryContent.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "InMemoryContent.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelAdapter.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using boost::static_pointer_cast;
-
-void InMemoryContent::add(AMQContentBody* data)
-{
- content.push_back(*data);
-}
-
-uint32_t InMemoryContent::size()
-{
- int sum(0);
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- sum += i->size();
- }
- return sum;
-}
-
-void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
-{
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- if (i->size() > framesize) {
- uint32_t offset = 0;
- for (int chunk = i->size() / framesize; chunk > 0; chunk--) {
- string data = i->getData().substr(offset, framesize);
- channel.send(AMQContentBody(data));
- offset += framesize;
- }
- uint32_t remainder = i->size() % framesize;
- if (remainder) {
- string data = i->getData().substr(offset, remainder);
- channel.send(AMQContentBody(data));
- }
- } else {
- channel.send(*i);
- }
- }
-}
-
-void InMemoryContent::encode(Buffer& buffer)
-{
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- i->encode(buffer);
- }
-}
-
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp
deleted file mode 100644
index b8b5b37f45..0000000000
--- a/cpp/src/qpid/broker/LazyLoadedContent.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "LazyLoadedContent.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelAdapter.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-LazyLoadedContent::~LazyLoadedContent()
-{
- store->destroy(*msg);
-}
-
-LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) :
- store(_store), msg(_msg), expectedSize(_expectedSize) {}
-
-void LazyLoadedContent::add(AMQContentBody* data)
-{
- store->appendContent(*msg, data->getData());
-}
-
-uint32_t LazyLoadedContent::size()
-{
- return 0;//all content is written as soon as it is added
-}
-
-void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize)
-{
- if (expectedSize > framesize) {
- for (uint64_t offset = 0; offset < expectedSize; offset += framesize)
- {
- uint64_t remaining = expectedSize - offset;
- string data;
- store->loadContent(*msg, data, offset,
- remaining > framesize ? framesize : remaining);
- channel.send(AMQContentBody(data));
- }
- } else {
- string data;
- store->loadContent(*msg, data, 0, expectedSize);
- channel.send(AMQContentBody(data));
- }
-}
-
-void LazyLoadedContent::encode(Buffer&)
-{
- //do nothing as all content is written as soon as it is added
-}
-
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
new file mode 100644
index 0000000000..e5f92297b7
--- /dev/null
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Message.h"
+#include "ExchangeRegistry.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/BasicPublishBody.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/SendContent.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/TypeFilter.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using std::string;
+
+TransferAdapter Message::TRANSFER;
+PublishAdapter Message::PUBLISH;
+
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {}
+
+const std::string& Message::getRoutingKey() const
+{
+ return getAdapter().getRoutingKey(frames);
+}
+
+const std::string& Message::getExchangeName() const
+{
+ return getAdapter().getExchange(frames);
+}
+
+const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const
+{
+ if (!exchange) {
+ exchange = registry.get(getExchangeName());
+ }
+ return exchange;
+}
+
+bool Message::isImmediate() const
+{
+ return getAdapter().isImmediate(frames);
+}
+
+const FieldTable& Message::getApplicationHeaders() const
+{
+ return getAdapter().getApplicationHeaders(frames);
+}
+
+bool Message::isPersistent()
+{
+ return getAdapter().isPersistent(frames);
+}
+
+uint32_t Message::getRequiredCredit() const
+{
+ //add up payload for all header and content frames in the frameset
+ SumBodySize sum;
+ frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY));
+ return sum.getSize();
+}
+
+void Message::encode(framing::Buffer& buffer) const
+{
+ //encode method and header frames
+ EncodeFrame f1(buffer);
+ frames.map_if(f1, TypeFilter(METHOD_BODY, HEADER_BODY));
+
+ //then encode the payload of each content frame
+ EncodeBody f2(buffer);
+ frames.map_if(f2, TypeFilter(CONTENT_BODY));
+}
+
+uint32_t Message::encodedSize() const
+{
+ return encodedHeaderSize() + encodedContentSize();
+}
+
+uint32_t Message::encodedContentSize() const
+{
+ return frames.getContentSize();
+}
+
+uint32_t Message::encodedHeaderSize() const
+{
+ //add up the size for all method and header frames in the frameset
+ SumFrameSize sum;
+ frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY));
+ return sum.getSize();
+}
+
+void Message::decodeHeader(framing::Buffer& buffer)
+{
+ AMQFrame method;
+ method.decode(buffer);
+ frames.append(method);
+
+ AMQFrame header;
+ header.decode(buffer);
+ frames.append(header);
+}
+
+void Message::decodeContent(framing::Buffer& buffer)
+{
+ //get the data as a string and set that as the content
+ //body on a frame then add that frame to the frameset
+ AMQFrame frame;
+ frame.setBody(AMQContentBody());
+ frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+ frames.append(frame);
+}
+
+void Message::releaseContent(MessageStore* _store)
+{
+ store = _store;
+ if (!getPersistenceId()) {
+ store->stage(*this);
+ }
+ //remove any content frames from the frameset
+ frames.remove(TypeFilter(CONTENT_BODY));
+}
+
+void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize)
+{
+ if (isContentReleased()) {
+ //load content from store in chunks of maxContentSize
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ uint64_t expectedSize(frames.getHeaders()->getContentLength());//TODO: how do we know how much data to load?
+ for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
+ {
+ uint64_t remaining = expectedSize - offset;
+ AMQFrame frame(channel, AMQContentBody());
+ string& data = frame.castBody<AMQContentBody>()->getData();
+
+ store->loadContent(*this, data, offset,
+ remaining > maxContentSize ? maxContentSize : remaining);
+ out.handle(frame);
+ }
+
+ } else {
+ SendContent f(out, channel, maxFrameSize);
+ frames.map_if(f, TypeFilter(CONTENT_BODY));
+ }
+}
+
+void Message::sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t /*maxFrameSize*/)
+{
+ Relay f(out, channel);
+ frames.map_if(f, TypeFilter(HEADER_BODY));
+}
+
+MessageAdapter& Message::getAdapter() const
+{
+ if (!adapter) {
+ if (frames.isA<BasicPublishBody>()) {
+ adapter = &PUBLISH;
+ } else if(frames.isA<MessageTransferBody>()) {
+ adapter = &TRANSFER;
+ } else {
+ const AMQMethodBody* method = frames.getMethod();
+ if (!method) throw Exception("Can't adapt message with no method");
+ else throw Exception(QPID_MSG("Can't adapt message based on " << *method));
+ }
+ }
+ return *adapter;
+}
+
+uint64_t Message::contentSize() const
+{
+ return frames.getContentSize();
+}
+
+bool Message::isContentLoaded() const
+{
+ return contentSize() > 0;
+}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
new file mode 100644
index 0000000000..95b3f38b55
--- /dev/null
+++ b/cpp/src/qpid/broker/Message.h
@@ -0,0 +1,139 @@
+#ifndef _broker_Message_h
+#define _broker_Message_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include <boost/variant.hpp>
+#include "PersistableMessage.h"
+#include "MessageAdapter.h"
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+class SequenceNumber;
+}
+
+namespace broker {
+class ConnectionToken;
+class Exchange;
+class ExchangeRegistry;
+class MessageStore;
+
+class Message : public PersistableMessage {
+public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+
+ Message(const framing::SequenceNumber& id = framing::SequenceNumber());
+
+ uint64_t getPersistenceId() const { return persistenceId; }
+ void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
+
+ bool getRedelivered() const { return redelivered; }
+ void redeliver() { redelivered = true; }
+
+ const ConnectionToken* getPublisher() const { return publisher; }
+ void setPublisher(ConnectionToken* p) { publisher = p; }
+
+ uint64_t contentSize() const;
+
+ const std::string& getRoutingKey() const;
+ const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
+ const std::string& getExchangeName() const;
+ bool isImmediate() const;
+ const framing::FieldTable& getApplicationHeaders() const;
+ bool isPersistent();
+
+ framing::FrameSet& getFrames() { return frames; }
+ const framing::FrameSet& getFrames() const { return frames; }
+
+ template <class T> T* getProperties() {
+ return frames.getHeaders()->get<T>(true);
+ }
+
+ template <class T> const T* getProperties() const {
+ return frames.getHeaders()->get<T>();
+ }
+
+ template <class T> const T* getMethod() const {
+ return frames.as<T>();
+ }
+
+ template <class T> bool isA() const {
+ return frames.isA<T>();
+ }
+
+ uint32_t getRequiredCredit() const;
+
+ void encode(framing::Buffer& buffer) const;
+
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ */
+ uint32_t encodedSize() const;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ */
+ uint32_t encodedHeaderSize() const;
+ uint32_t encodedContentSize() const;
+
+ void decodeHeader(framing::Buffer& buffer);
+ void decodeContent(framing::Buffer& buffer);
+
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ void releaseContent(MessageStore* store);
+
+ void sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize);
+ void sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize);
+
+ bool isContentLoaded() const;
+
+ private:
+ framing::FrameSet frames;
+ mutable boost::shared_ptr<Exchange> exchange;
+ mutable uint64_t persistenceId;
+ bool redelivered;
+ ConnectionToken* publisher;
+ MessageStore* store;
+ mutable MessageAdapter* adapter;
+
+ static TransferAdapter TRANSFER;
+ static PublishAdapter PUBLISH;
+
+ MessageAdapter& getAdapter() const;
+ bool isContentReleased() { return store; }
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
new file mode 100644
index 0000000000..0b2dc6307a
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -0,0 +1,108 @@
+#ifndef _broker_MessageAdapter_h
+#define _broker_MessageAdapter_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "qpid/framing/BasicPublishBody.h"
+#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+namespace qpid {
+namespace broker {
+
+struct MessageAdapter
+{
+ virtual ~MessageAdapter() {}
+
+ virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0;
+ virtual const std::string& getExchange(const framing::FrameSet& f) = 0;
+ virtual bool isImmediate(const framing::FrameSet& f) = 0;
+ virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0;
+ virtual bool isPersistent(const framing::FrameSet& f) = 0;
+};
+
+struct PublishAdapter : MessageAdapter
+{
+ const std::string& getRoutingKey(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getRoutingKey();
+ }
+
+ const std::string& getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getExchange();
+ }
+
+ bool isImmediate(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getImmediate();
+ }
+
+ const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders();
+ }
+
+ bool isPersistent(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2;
+ }
+};
+
+struct TransferAdapter : MessageAdapter
+{
+ const std::string& getRoutingKey(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey();
+ }
+
+ const std::string& getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::MessageTransferBody>()->getDestination();
+ }
+
+ bool isImmediate(const framing::FrameSet&)
+ {
+ //TODO: we seem to have lost the immediate flag
+ return false;
+ }
+
+ const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders();
+ }
+
+ bool isPersistent(const framing::FrameSet& f)
+ {
+ return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2;
+ }
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index f19927b708..1a84aa9b65 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -20,55 +20,64 @@
*/
#include "MessageBuilder.h"
-#include "InMemoryContent.h"
-#include "LazyLoadedContent.h"
+#include "Message.h"
+#include "MessageStore.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/AMQFrame.h"
using namespace qpid::broker;
using namespace qpid::framing;
-using std::auto_ptr;
-MessageBuilder::MessageBuilder(CompletionHandler* _handler,
- MessageStore* const _store,
- uint64_t _stagingThreshold
-) :
- handler(_handler),
- store(_store),
- stagingThreshold(_stagingThreshold)
-{}
+MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
+ state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
-void MessageBuilder::route(){
- if (message->isComplete()) {
- if (handler) handler->complete(message);
- message.reset();
+void MessageBuilder::handle(AMQFrame& frame)
+{
+ switch(state) {
+ case METHOD:
+ checkType(METHOD_BODY, frame.getBody()->type());
+ state = HEADER;
+ break;
+ case HEADER:
+ checkType(HEADER_BODY, frame.getBody()->type());
+ state = CONTENT;
+ break;
+ case CONTENT:
+ checkType(CONTENT_BODY, frame.getBody()->type());
+ break;
+ default:
+ throw ConnectionException(504, "Invalid frame sequence for message.");
+ }
+ if (staging) {
+ store->appendContent(*message, frame.castBody<AMQContentBody>()->getData());
+ } else {
+ message->getFrames().append(frame);
+ //have we reached the staging limit? if so stage message and release content
+ if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) {
+ store->stage(*message);
+ message->releaseContent(store);
+ staging = true;
+ }
}
}
-void MessageBuilder::initialise(Message::shared_ptr& msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
+{
+ if (expected != actual) {
+ throw ConnectionException(504, "Invalid frame sequence for message.");
}
- message = msg;
}
-void MessageBuilder::setHeader(AMQHeaderBody* header){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
- }
- message->setHeader(header);
- if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
- store->stage(*message);
- message->releaseContent(store);
- } else {
- auto_ptr<Content> content(new InMemoryContent());
- message->setContent(content);
- }
- route();
+void MessageBuilder::end()
+{
+ message.reset();
+ state = DORMANT;
+ staging = false;
}
-void MessageBuilder::addContent(AMQContentBody* content){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
- }
- message->addContent(content);
- route();
+void MessageBuilder::start(const SequenceNumber& id)
+{
+ message = Message::shared_ptr(new Message(id));
+ state = METHOD;
+ staging = false;
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
index 18e85d7383..134f93b68f 100644
--- a/cpp/src/qpid/broker/MessageBuilder.h
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -21,37 +21,35 @@
#ifndef _MessageBuilder_
#define _MessageBuilder_
-#include <memory>
-#include "qpid/QpidError.h"
-#include "BrokerExchange.h"
-#include "BrokerMessage.h"
-#include "MessageStore.h"
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/BasicPublishBody.h"
-#include "CompletionHandler.h"
+#include "boost/shared_ptr.hpp"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SequenceNumber.h"
namespace qpid {
namespace broker {
- class MessageBuilder{
+ class Message;
+ class MessageStore;
+
+ class MessageBuilder : public framing::FrameHandler{
public:
- MessageBuilder(CompletionHandler* _handler,
- MessageStore* const store = 0,
- uint64_t stagingThreshold = 0);
- void initialise(Message::shared_ptr& msg);
- void setHeader(framing::AMQHeaderBody* header);
- void addContent(framing::AMQContentBody* content);
- Message::shared_ptr getMessage() { return message; }
+ MessageBuilder(MessageStore* const store = 0, uint64_t stagingThreshold = 0);
+ void handle(framing::AMQFrame& frame);
+ boost::shared_ptr<Message> getMessage() { return message; }
+ void start(const framing::SequenceNumber& id);
+ void end();
private:
- Message::shared_ptr message;
- CompletionHandler* handler;
+ enum State {DORMANT, METHOD, HEADER, CONTENT};
+ State state;
+ boost::shared_ptr<Message> message;
MessageStore* const store;
const uint64_t stagingThreshold;
+ bool staging;
- void route();
+ void checkType(uint8_t expected, uint8_t actual);
};
}
}
#endif
+
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
new file mode 100644
index 0000000000..09ab8ec465
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageDelivery.h"
+
+#include "DeliveryToken.h"
+#include "Message.h"
+#include "BrokerQueue.h"
+#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace qpid{
+namespace broker{
+
+struct BaseToken : DeliveryToken
+{
+ virtual ~BaseToken() {}
+ virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0;
+};
+
+struct BasicGetToken : BaseToken
+{
+ typedef boost::shared_ptr<BasicGetToken> shared_ptr;
+
+ Queue::shared_ptr queue;
+
+ BasicGetToken(Queue::shared_ptr q) : queue(q) {}
+
+ void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ {
+ channel.send(BasicGetOkBody(
+ channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
+ msg->getRoutingKey(), queue->getMessageCount()));
+
+ }
+};
+
+struct BasicConsumeToken : BaseToken
+{
+ typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
+
+ const string consumer;
+
+ BasicConsumeToken(const string c) : consumer(c) {}
+
+ void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ {
+ channel.send(BasicDeliverBody(
+ channel.getVersion(), consumer, id.getValue(),
+ msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey()));
+ }
+
+};
+
+struct MessageDeliveryToken : BaseToken
+{
+ const std::string destination;
+ const u_int8_t confirmMode;
+ const u_int8_t acquireMode;
+
+ MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) :
+ destination(d), confirmMode(c), acquireMode(a) {}
+
+ void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/)
+ {
+ //TODO; need to figure out how the acquire mode gets
+ //communicated (this is just a temporary solution)
+ channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode));
+
+ //may need to set the redelivered flag:
+ if (msg->getRedelivered()){
+ msg->getProperties<DeliveryProperties>()->setRedelivered(true);
+ }
+ }
+};
+
+}
+}
+
+DeliveryToken::shared_ptr MessageDelivery::getBasicGetToken(Queue::shared_ptr queue)
+{
+ return DeliveryToken::shared_ptr(new BasicGetToken(queue));
+}
+
+DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& consumer)
+{
+ return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
+}
+
+DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination,
+ u_int8_t confirmMode, u_int8_t acquireMode)
+{
+ return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode));
+}
+
+void MessageDelivery::deliver(Message::shared_ptr msg,
+ framing::ChannelAdapter& channel,
+ DeliveryId id,
+ DeliveryToken::shared_ptr token,
+ uint16_t framesize)
+{
+ //currently a message published from one class and delivered to
+ //another may well have the wrong headers; however we will only
+ //have one content class for 0-10 proper
+
+ //send method
+ boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
+ t->sendMethod(msg, channel, id);
+
+ boost::shared_ptr<FrameHandler> handler = channel.getHandlers().out;
+ //send header
+ msg->sendHeader(*handler, channel.getId(), framesize);
+
+ //send content
+ msg->sendContent(*handler, channel.getId(), framesize);
+}
diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h
new file mode 100644
index 0000000000..b87ef2a5ce
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageDelivery.h
@@ -0,0 +1,60 @@
+#ifndef _broker_MessageDelivery_h
+#define _broker_MessageDelivery_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include "DeliveryId.h"
+
+namespace qpid {
+
+namespace framing {
+
+class ChannelAdapter;
+
+}
+
+namespace broker {
+
+class DeliveryToken;
+class Message;
+class Queue;
+
+/**
+ * Encapsulates the different options for message delivery currently supported.
+ */
+class MessageDelivery {
+public:
+ static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue);
+ static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer);
+ static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination,
+ u_int8_t confirmMode,
+ u_int8_t acquireMode);
+
+ static void deliver(boost::shared_ptr<Message> msg, framing::ChannelAdapter& channel,
+ DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize);
+};
+
+}
+}
+
+
+#endif /*!_broker_MessageDelivery_h*/
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index ce1fa1e028..a4ceb77c12 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -22,7 +22,7 @@
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
#include "Broker.h"
-#include "BrokerMessageMessage.h"
+#include "MessageDelivery.h"
#include "qpid/framing/MessageAppendBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "BrokerAdapter.h"
@@ -55,7 +55,7 @@ MessageHandlerImpl::open(const string& /*reference*/)
}
void
-MessageHandlerImpl::append(const framing::AMQMethodBody& )
+MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/)
{
throw ConnectionException(540, "References no longer supported");
}
@@ -92,7 +92,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
const string& destination,
bool noLocal,
u_int8_t confirmMode,
- u_int8_t /*acquireMode*/,//TODO: implement acquire modes
+ u_int8_t acquireMode,//TODO: implement acquire modes
bool exclusive,
const framing::FieldTable& filter )
{
@@ -101,7 +101,8 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
+ channel.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -115,7 +116,7 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
{
Queue::shared_ptr queue = getQueue(queueName);
- if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){
+ if (channel.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
//don't send any response... rely on execution completion
} else {
//temporarily disabled:
@@ -160,20 +161,6 @@ MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*co
//TODO: implement
}
-void
-MessageHandlerImpl::transfer(const framing::AMQMethodBody& context)
-{
- const MessageTransferBody* transfer = boost::polymorphic_downcast<const MessageTransferBody*>(&context);
- if (transfer->getBody().isInline()) {
- MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer));
- channel.handleInlineTransfer(message);
- } else {
- throw ConnectionException(540, "References no longer supported");
- }
-}
-
-
-
void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
{
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
index f4d9fa0c76..35d34bf94e 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.h
@@ -23,7 +23,6 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
-#include "Reference.h"
#include "HandlerImpl.h"
namespace qpid {
@@ -40,7 +39,7 @@ class MessageHandlerImpl :
public:
MessageHandlerImpl(CoreRefs& parent);
- void append(const framing::AMQMethodBody& context);
+ void append(const std::string& reference, const std::string& bytes);
void cancel(const std::string& destination );
@@ -75,8 +74,6 @@ class MessageHandlerImpl :
void resume(const std::string& reference,
const std::string& identifier );
- void transfer(const framing::AMQMethodBody& context);
-
void flow(const std::string& destination, u_int8_t unit, u_int32_t value);
void flowMode(const std::string& destination, u_int8_t mode);
@@ -98,8 +95,6 @@ class MessageHandlerImpl :
bool exclusive,
const framing::FieldTable& filter);
- private:
- ReferenceRegistry references;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 0da12a1a75..1254c3890b 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -21,7 +21,6 @@
#ifndef _MessageStoreModule_
#define _MessageStoreModule_
-#include "BrokerMessage.h"
#include "MessageStore.h"
#include "BrokerQueue.h"
#include "RecoveryManager.h"
diff --git a/cpp/src/qpid/broker/NameGenerator.h b/cpp/src/qpid/broker/NameGenerator.h
index affcedba41..6ea25c9797 100644
--- a/cpp/src/qpid/broker/NameGenerator.h
+++ b/cpp/src/qpid/broker/NameGenerator.h
@@ -21,7 +21,7 @@
#ifndef _NameGenerator_
#define _NameGenerator_
-#include "BrokerMessage.h"
+#include <string>
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index 0d5a5b55f9..95f55f21b9 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -22,7 +22,6 @@
#define _NullMessageStore_
#include <set>
-#include "BrokerMessage.h"
#include "MessageStore.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/PersistableExchange.h b/cpp/src/qpid/broker/PersistableExchange.h
index 9ba883cec0..683b740ddc 100644
--- a/cpp/src/qpid/broker/PersistableExchange.h
+++ b/cpp/src/qpid/broker/PersistableExchange.h
@@ -35,7 +35,7 @@ namespace broker {
class PersistableExchange : public Persistable
{
public:
- virtual std::string getName() const = 0;
+ virtual const std::string& getName() const = 0;
virtual ~PersistableExchange() {};
};
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index e47ca0ae48..06fc59107e 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -34,7 +34,7 @@ namespace broker {
* The interface messages must expose to the MessageStore in order to
* be persistable.
*/
- class PersistableMessage : public Persistable
+class PersistableMessage : public Persistable
{
@@ -72,10 +72,11 @@ public:
virtual uint32_t encodedHeaderSize() const = 0;
virtual ~PersistableMessage() {};
+
PersistableMessage():
- enqueueCompleted(false),
- asyncCounter(0),
- dequeueCompleted(false){};
+ enqueueCompleted(false),
+ asyncCounter(0),
+ dequeueCompleted(false){};
inline bool isEnqueueComplete() {return enqueueCompleted;};
inline void enqueueComplete() {
diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h
index 9e0c334dc3..9dcc9d4233 100644
--- a/cpp/src/qpid/broker/RecoveredDequeue.h
+++ b/cpp/src/qpid/broker/RecoveredDequeue.h
@@ -25,7 +25,7 @@
#include <functional>
#include <list>
#include "Deliverable.h"
-#include "BrokerMessage.h"
+#include "Message.h"
#include "MessageStore.h"
#include "BrokerQueue.h"
#include "TxOp.h"
diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h
index 25c5baf364..a571343e93 100644
--- a/cpp/src/qpid/broker/RecoveredEnqueue.h
+++ b/cpp/src/qpid/broker/RecoveredEnqueue.h
@@ -25,7 +25,7 @@
#include <functional>
#include <list>
#include "Deliverable.h"
-#include "BrokerMessage.h"
+#include "Message.h"
#include "MessageStore.h"
#include "BrokerQueue.h"
#include "TxOp.h"
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 954c50faee..29390a6452 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -20,8 +20,7 @@
*/
#include "RecoveryManagerImpl.h"
-#include "BrokerMessage.h"
-#include "BrokerMessageMessage.h"
+#include "Message.h"
#include "BrokerQueue.h"
#include "RecoveredEnqueue.h"
#include "RecoveredDequeue.h"
@@ -110,10 +109,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff
{
buffer.record();
//peek at type:
- Message::shared_ptr message(decodeMessageType(buffer) == MESSAGE ?
- ((Message*) new MessageMessage()) :
- ((Message*) new BasicMessage()));
- buffer.restore();
+ Message::shared_ptr message(new Message());
message->decodeHeader(buffer);
return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
}
@@ -131,21 +127,6 @@ void RecoveryManagerImpl::recoveryComplete()
//TODO (finalise binding setup etc)
}
-uint8_t RecoveryManagerImpl::decodeMessageType(framing::Buffer& buffer)
-{
- return buffer.getOctet();
-}
-
-void RecoveryManagerImpl::encodeMessageType(const Message& msg, framing::Buffer& buffer)
-{
- buffer.putOctet(dynamic_cast<const MessageMessage*>(&msg) ? MESSAGE : BASIC);
-}
-
-uint32_t RecoveryManagerImpl::encodedMessageTypeSize()
-{
- return 1;
-}
-
bool RecoverableMessageImpl::loadContent(uint64_t available)
{
return !stagingThreshold || available < stagingThreshold;
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h
index bcd71defb1..58ec63926c 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.h
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h
@@ -45,10 +45,6 @@ namespace broker {
RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn);
void recoveryComplete();
-
- static uint8_t decodeMessageType(framing::Buffer& buffer);
- static void encodeMessageType(const Message& msg, framing::Buffer& buffer);
- static uint32_t encodedMessageTypeSize();
};
diff --git a/cpp/src/qpid/broker/Reference.cpp b/cpp/src/qpid/broker/Reference.cpp
deleted file mode 100644
index 283b231b60..0000000000
--- a/cpp/src/qpid/broker/Reference.cpp
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <boost/bind.hpp>
-#include "Reference.h"
-#include "BrokerMessageMessage.h"
-#include "qpid/QpidError.h"
-#include "qpid/framing/MessageAppendBody.h"
-#include "CompletionHandler.h"
-
-namespace qpid {
-namespace broker {
-
-Reference::shared_ptr ReferenceRegistry::open(const Reference::Id& id) {
- ReferenceMap::iterator i = references.find(id);
- if (i != references.end())
- throw ConnectionException(503, "Attempt to re-open reference " +id);
- return references[id] = Reference::shared_ptr(new Reference(id, this));
-}
-
-Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) {
- ReferenceMap::iterator i = references.find(id);
- if (i == references.end())
- throw ConnectionException(503, "Attempt to use non-existent reference "+id);
- return i->second;
-}
-
-void Reference::append(const framing::MessageAppendBody& app) {
- appends.push_back(app);
- size += app.getBytes().length();
-}
-
-void Reference::close() {
- messages.clear();
- registry->references.erase(getId());
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Reference.h b/cpp/src/qpid/broker/Reference.h
deleted file mode 100644
index 5a373fbeba..0000000000
--- a/cpp/src/qpid/broker/Reference.h
+++ /dev/null
@@ -1,115 +0,0 @@
-#ifndef _broker_Reference_h
-#define _broker_Reference_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/MessageAppendBody.h"
-
-#include <string>
-#include <vector>
-#include <map>
-#include <boost/shared_ptr.hpp>
-#include <boost/range.hpp>
-
-namespace qpid {
-
-namespace framing {
-class MessageAppendBody;
-}
-
-namespace broker {
-
-class MessageMessage;
-class ReferenceRegistry;
-
-// FIXME aconway 2007-03-27: Merge with client::IncomingMessage
-// to common reference handling code.
-
-/**
- * A reference is an accumulation point for data in a multi-frame
- * message. A reference can be used by multiple transfer commands to
- * create multiple messages, so the reference tracks which commands
- * are using it. When the reference is closed, all the associated
- * transfers are completed.
- *
- * THREAD UNSAFE: per-channel resource, access to channels is
- * serialized.
- */
-class Reference
-{
- public:
- typedef std::string Id;
- typedef boost::shared_ptr<Reference> shared_ptr;
- typedef boost::shared_ptr<MessageMessage> MessagePtr;
- typedef std::vector<MessagePtr> Messages;
- typedef std::vector<framing::MessageAppendBody> Appends;
-
- Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
- : id(id_), size(0), registry(reg) {}
-
- const std::string& getId() const { return id; }
- uint64_t getSize() const { return size; }
-
- /** Add a message to be completed with this reference */
- void addMessage(MessagePtr message) { messages.push_back(message); }
-
- /** Append more data to the reference */
- void append(const framing::MessageAppendBody&);
-
- /** Close the reference, complete each associated message */
- void close();
-
- const Appends& getAppends() const { return appends; }
- const Messages& getMessages() const { return messages; }
-
- private:
- Id id;
- uint64_t size;
- ReferenceRegistry* registry;
- Messages messages;
- Appends appends;
-};
-
-
-/**
- * A registry/factory for references.
- *
- * THREAD UNSAFE: per-channel resource, access to channels is
- * serialized.
- */
-class ReferenceRegistry {
- public:
- ReferenceRegistry() {};
- Reference::shared_ptr open(const Reference::Id& id);
- Reference::shared_ptr get(const Reference::Id& id);
-
- private:
- typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap;
- ReferenceMap references;
-
- // Reference calls references.erase().
- friend class Reference;
-};
-
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_Reference_h*/
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index f65e450e82..5e9106c1dd 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -20,7 +20,10 @@
*/
#include "SemanticHandler.h"
+
+#include "boost/format.hpp"
#include "BrokerAdapter.h"
+#include "MessageDelivery.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelCloseOkBody.h"
#include "qpid/framing/ExecutionCompleteBody.h"
@@ -32,18 +35,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
- connection(c),
- channel(c, *this, id, &c.broker.getStore())
+ connection(c), channel(c, *this, id)
{
init(id, connection.getOutput(), connection.getVersion());
adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
}
-
void SemanticHandler::handle(framing::AMQFrame& frame)
{
- //TODO: assembly etc when move to 0-10 framing
- //
+ //TODO: assembly for method and headers
+
//have potentially three separate tracks at this point:
//
// (1) execution controls
@@ -51,46 +52,43 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
// (3) data i.e. content-bearing commands
//
//framesets on each can be interleaved. framesets on the latter
- //two share a command-id sequence.
+ //two share a command-id sequence. controls on the first track are
+ //used to communicate details about that command-id sequence.
//
//need to decide what to do if a frame on the command track
//arrives while a frameset on the data track is still
//open. execute it (i.e. out-of order execution with respect to
- //the command id sequence) or queue it up.
+ //the command id sequence) or queue it up?
- //if ready to execute (i.e. if segment is complete or frame is
- //message content):
- handleBody(frame.getBody());
-}
-
-//ChannelAdapter virtual methods:
-void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
-{
- try {
- if (!method->invoke(this)) {
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++(incoming.lwm);
- }
+ try{
- //else do the usual:
- handleL4(method);
- //(if the frameset is complete) we can move the execution-mark
- //forward
-
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- //TODO: need to account for async store opreations
- //when this command is a message publication
- ++(incoming.hwm);
+ TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
+
+ switch(track) {
+ case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler
+ handleL2(frame.castBody<AMQMethodBody>());
+ break;
+ case EXECUTION_CONTROL_TRACK:
+ handleL3(frame.castBody<AMQMethodBody>());
+ break;
+ case MODEL_COMMAND_TRACK:
+ if (!isOpen()) {
+ throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
}
-
- //note: need to be more sophisticated than this if we execute
- //commands that arrive within an active message frameset (that
- //can't happen until 0-10 framing is implemented)
+ handleCommand(frame.castBody<AMQMethodBody>());
+ break;
+ case MODEL_CONTENT_TRACK:
+ handleContent(frame);
+ break;
}
+
+ }catch(const ChannelException& e){
+ adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
+ connection.closeChannel(getId());
+ }catch(const ConnectionException& e){
+ connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
}catch(const std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame));
}
}
@@ -102,7 +100,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
outgoing.lwm = mark;
//ack messages:
channel.ackCumulative(mark.getValue());
- //std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
@@ -116,7 +113,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
void SemanticHandler::flush()
{
//flush doubles as a sync to begin with - send an execution.complete
- incoming.lwm = incoming.hwm;
if (isOpen()) {
Mutex::ScopedLock l(outLock);
ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
@@ -142,52 +138,59 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
//never actually sent by client at present
}
-void SemanticHandler::handleL4(framing::AMQMethodBody* method)
+void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
- try{
- if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- if (!method->isA<ChannelCloseOkBody>()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
- }
- } else {
- InvocationVisitor v(adapter.get());
- method->accept(v);
- if (!v.wasHandled()) {
- throw ConnectionException(540, "Not implemented");
- } else if (v.hasResult()) {
- ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
- }
- }
- }catch(const ChannelException& e){
- adapter->getProxy().getChannel().close(
- e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ ++(incoming.lwm);
+ InvocationVisitor v(adapter.get());
+ method->accept(v);
+ //TODO: need to account for async store operations and interleaving
+ ++(incoming.hwm);
+
+ if (!v.wasHandled()) {
+ throw ConnectionException(540, "Not implemented");
+ } else if (v.hasResult()) {
+ ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
}
}
-bool SemanticHandler::isOpen() const
-{
- return channel.isOpen();
+void SemanticHandler::handleL2(framing::AMQMethodBody* method)
+{
+ if(!method->isA<ChannelOpenBody>() && !isOpen()) {
+ if (!method->isA<ChannelCloseOkBody>()) {
+ throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
+ }
+ } else {
+ method->invoke(adapter->getChannelHandler());
+ }
}
-void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body)
+void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
- channel.handleHeader(body);
+ if (!method->invoke(this)) {
+ throw ConnectionException(540, "Not implemented");
+ }
}
-void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body)
+void SemanticHandler::handleContent(AMQFrame& frame)
{
- channel.handleContent(body);
+ Message::shared_ptr msg(msgBuilder.getMessage());
+ if (!msg) {//start of frameset will be indicated by frame flags
+ msgBuilder.start(++(incoming.lwm));
+ msg = msgBuilder.getMessage();
+ }
+ msgBuilder.handle(frame);
+ if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
+ msg->setPublisher(&connection);
+ channel.handle(msg);
+ msgBuilder.end();
+ //TODO: need to account for async store operations and interleaving
+ ++(incoming.hwm);
+ }
}
-void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body)
-{
- channel.handleHeartbeat(body);
+bool SemanticHandler::isOpen() const
+{
+ return channel.isOpen();
}
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
@@ -195,14 +198,13 @@ DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::sha
Mutex::ScopedLock l(outLock);
SequenceNumber copy(outgoing.hwm);
++copy;
- msg->deliver(*this, copy.getValue(), token, connection.getFrameMax());
- //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl;
+ MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax());
return outgoing.hwm.getValue();
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
- msg->deliver(*this, tag, token, connection.getFrameMax());
+ MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax());
}
void SemanticHandler::send(const AMQBody& body)
@@ -214,3 +216,49 @@ void SemanticHandler::send(const AMQBody& body)
}
ChannelAdapter::send(body);
}
+
+uint16_t SemanticHandler::getClassId(const AMQFrame& frame)
+{
+ return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0;
+}
+
+uint16_t SemanticHandler::getMethodId(const AMQFrame& frame)
+{
+ return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0;
+}
+
+SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
+{
+ //will be replaced by field in 0-10 frame header
+ uint8_t type = frame.getBody()->type();
+ uint16_t classId;
+ switch(type) {
+ case METHOD_BODY:
+ if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
+ return MODEL_CONTENT_TRACK;
+ }
+
+ classId = frame.castBody<AMQMethodBody>()->amqpClassId();
+ switch (classId) {
+ case ChannelOpenBody::CLASS_ID:
+ return SESSION_CONTROL_TRACK;
+ case ExecutionCompleteBody::CLASS_ID:
+ return EXECUTION_CONTROL_TRACK;
+ }
+
+ return MODEL_COMMAND_TRACK;
+ case HEADER_BODY:
+ case CONTENT_BODY:
+ return MODEL_CONTENT_TRACK;
+ }
+ throw Exception("Could not determine track");
+}
+
+//ChannelAdapter virtual methods, no longer used:
+void SemanticHandler::handleMethod(framing::AMQMethodBody*){}
+
+void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {}
+
+void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {}
+
+void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {}
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 672c6ad929..611cd3a99b 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -25,6 +25,7 @@
#include "BrokerChannel.h"
#include "Connection.h"
#include "DeliveryAdapter.h"
+#include "MessageBuilder.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameHandler.h"
@@ -55,8 +56,17 @@ class SemanticHandler : private framing::ChannelAdapter,
framing::Window incoming;
framing::Window outgoing;
sys::Mutex outLock;
+ MessageBuilder msgBuilder;
- void handleL4(framing::AMQMethodBody* method);
+ enum TrackId {SESSION_CONTROL_TRACK, EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
+ TrackId getTrack(const framing::AMQFrame& frame);
+ uint16_t getClassId(const framing::AMQFrame& frame);
+ uint16_t getMethodId(const framing::AMQFrame& frame);
+
+ void handleL3(framing::AMQMethodBody* method);
+ void handleL2(framing::AMQMethodBody* method);
+ void handleCommand(framing::AMQMethodBody* method);
+ void handleContent(framing::AMQFrame& frame);
//ChannelAdapter virtual methods:
void handleMethod(framing::AMQMethodBody* method);
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 6536a7c4ce..c411fb1965 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -25,7 +25,6 @@
#include <vector>
#include "BrokerExchange.h"
#include "qpid/framing/FieldTable.h"
-#include "BrokerMessage.h"
#include "qpid/sys/Monitor.h"
#include "BrokerQueue.h"
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index 29b1dc38af..564e021c5a 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -24,10 +24,10 @@
#include <algorithm>
#include <functional>
#include <list>
+#include "BrokerQueue.h"
#include "Deliverable.h"
-#include "BrokerMessage.h"
+#include "Message.h"
#include "MessageStore.h"
-#include "BrokerQueue.h"
#include "TxOp.h"
namespace qpid {
diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp
index b3d720baf0..754b0544c6 100644
--- a/cpp/src/qpid/client/ChannelHandler.cpp
+++ b/cpp/src/qpid/client/ChannelHandler.cpp
@@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id)
id = _id;
setState(OPENING);
- AMQFrame f(version, id, ChannelOpenBody(version));
+ AMQFrame f(id, ChannelOpenBody(version));
out(f);
std::set<int> states;
@@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id)
void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId));
+ AMQFrame f(id, ChannelCloseBody(version, code, message, classId, methodId));
out(f);
}
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index d1cc4734eb..cc2b7aedc8 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -181,8 +181,8 @@ bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
- ReceivedContent::shared_ptr content = gets.pop();
- content->populate(msg);
+ FrameSet::shared_ptr content = gets.pop();
+ msg.populate(*content);
return true;
}
}
@@ -232,13 +232,13 @@ void Channel::join() {
void Channel::run() {
try {
while (true) {
- ReceivedContent::shared_ptr content = session->get();
+ FrameSet::shared_ptr content = session->get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
if (i != consumers.end()) {
Message msg;
- content->populate(msg);
+ msg.populate(*content);
i->second.listener->received(msg);
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index d73addc950..c355fe007a 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -83,7 +83,7 @@ class Channel : private sys::Runnable
std::auto_ptr<Session> session;
SessionCore::shared_ptr sessionCore;
framing::ChannelId channelId;
- BlockingQueue<ReceivedContent::shared_ptr> gets;
+ BlockingQueue<framing::FrameSet::shared_ptr> gets;
framing::Uuid uniqueId;
uint32_t nameCounter;
diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h
index fd33fbc830..19b0f867bc 100644
--- a/cpp/src/qpid/client/ClientMessage.h
+++ b/cpp/src/qpid/client/ClientMessage.h
@@ -23,8 +23,13 @@
*/
#include <string>
#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
namespace qpid {
namespace client {
@@ -55,6 +60,17 @@ class Message : public framing::BasicHeaderProperties, public framing::MethodCon
const HeaderProperties& getMethodHeaders() const { return *this; }
+
+ //TODO: move this elsewhere (GRS 24/08/2007)
+ void populate(framing::FrameSet& frameset)
+ {
+ const BasicHeaderProperties* properties = frameset.getHeaders()->get<BasicHeaderProperties>();
+ if (properties) {
+ BasicHeaderProperties::copy<Message, BasicHeaderProperties>(*this, *properties);
+ }
+ frameset.getContent(data);
+ }
+
private:
std::string data;
std::string destination;
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 66db9384e2..40e13593ea 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -109,7 +109,7 @@ void ConnectionHandler::close()
void ConnectionHandler::send(const framing::AMQBody& body)
{
- AMQFrame f(ProtocolVersion(), 0, body);
+ AMQFrame f(0, body);
out(f);
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index e63ac69da6..b4d2156c31 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -107,7 +107,7 @@ void ConnectionImpl::idleIn()
void ConnectionImpl::idleOut()
{
- AMQFrame frame(version, 0, new AMQHeartbeatBody());
+ AMQFrame frame(0, new AMQHeartbeatBody());
connector->send(frame);
}
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index b25f19e4ba..6e12a9c84f 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -180,7 +180,7 @@ void Connector::run(){
inbuf.move(received);
inbuf.flip();//position = 0, limit = total data read
- AMQFrame frame(version);
+ AMQFrame frame;
while(frame.decode(inbuf)){
QPID_LOG(trace, "RECV: " << frame);
input->received(frame);
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 6c2600d00b..d10b3d3fe8 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -64,9 +64,9 @@ void ExecutionHandler::handle(AMQFrame& frame)
if (!invoke(body, this)) {
if (isContentFrame(frame)) {
if (!arriving) {
- arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm));
+ arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
}
- arriving->append(body);
+ arriving->append(frame);
if (arriving->isComplete()) {
received.push(arriving);
arriving.reset();
@@ -123,7 +123,7 @@ void ExecutionHandler::sync()
void ExecutionHandler::sendFlush()
{
- AMQFrame frame(version, 0, ExecutionFlushBody());
+ AMQFrame frame(0, ExecutionFlushBody());
out(frame);
}
@@ -139,8 +139,7 @@ void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener
correlation.listen(g);
}
- AMQFrame frame(version, 0/*id will be filled in be channel handler*/,
- command);
+ AMQFrame frame(0/*id will be filled in be channel handler*/, command);
out(frame);
}
@@ -149,10 +148,10 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp
{
send(command, f, g);
- AMQHeaderBody header(BASIC);
- BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers);
- header.setContentSize(data.size());
- AMQFrame h(version, 0, header);
+ AMQHeaderBody header;
+ BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
+ header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
+ AMQFrame h(0, header);
out(h);
u_int64_t data_length = data.length();
@@ -160,7 +159,7 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(version, 0, AMQContentBody(data));
+ AMQFrame frame(0, AMQContentBody(data));
out(frame);
}else{
u_int32_t offset = 0;
@@ -168,7 +167,7 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- AMQFrame frame(version, 0, AMQContentBody(frag));
+ AMQFrame frame(0, AMQContentBody(frag));
out(frame);
offset += length;
remaining = data_length - offset;
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index b409d5df7b..f740e14185 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -23,12 +23,12 @@
#include <queue>
#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/SequenceNumber.h"
#include "BlockingQueue.h"
#include "ChainableFrameHandler.h"
#include "CompletionTracker.h"
#include "Correlator.h"
-#include "ReceivedContent.h"
namespace qpid {
namespace client {
@@ -39,7 +39,7 @@ class ExecutionHandler :
{
framing::Window incoming;
framing::Window outgoing;
- ReceivedContent::shared_ptr arriving;
+ framing::FrameSet::shared_ptr arriving;
Correlator correlation;
CompletionTracker completion;
framing::ProtocolVersion version;
@@ -52,7 +52,7 @@ class ExecutionHandler :
void sync();
public:
- BlockingQueue<ReceivedContent::shared_ptr> received;
+ BlockingQueue<framing::FrameSet::shared_ptr> received;
ExecutionHandler(uint64_t maxFrameSize = 65536);
diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp
deleted file mode 100644
index 5a1f48901a..0000000000
--- a/cpp/src/qpid/client/ReceivedContent.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ReceivedContent.h"
-#include "qpid/framing/all_method_bodies.h"
-
-using qpid::client::ReceivedContent;
-using namespace qpid::framing;
-using namespace boost;
-
-ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
-
-void ReceivedContent::append(AMQBody* part)
-{
- parts.push_back(AMQFrame(ProtocolVersion(), 0, part));
-}
-
-bool ReceivedContent::isComplete() const
-{
- if (parts.empty()) {
- return false;
- } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- const AMQHeaderBody* headers(getHeaders());
- return headers && headers->getContentSize() == getContentSize();
- } else if (isA<MessageTransferBody>()) {
- //no longer support references, headers and data are still method fields
- return true;
- } else {
- throw Exception("Unknown content class");
- }
-}
-
-
-const AMQMethodBody* ReceivedContent::getMethod() const
-{
- return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
-}
-
-const AMQHeaderBody* ReceivedContent::getHeaders() const
-{
- return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
-}
-
-uint64_t ReceivedContent::getContentSize() const
-{
- if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- uint64_t size(0);
- for (uint i = 2; i < parts.size(); i++) {
- size += parts[i].getBody()->size();
- }
- return size;
- } else if (isA<MessageTransferBody>()) {
- return as<MessageTransferBody>()->getBody().getValue().size();
- } else {
- throw Exception("Unknown content class");
- }
-}
-
-std::string ReceivedContent::getContent() const
-{
- if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- string data;
- for (uint i = 2; i < parts.size(); i++) {
- data += static_cast<const AMQContentBody*>(parts[i].getBody())->getData();
- }
- return data;
- } else if (isA<MessageTransferBody>()) {
- return as<MessageTransferBody>()->getBody().getValue();
- } else {
- throw Exception("Unknown content class");
- }
-}
-
-void ReceivedContent::populate(Message& msg)
-{
- if (!isComplete()) throw Exception("Incomplete message");
-
- if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- const BasicHeaderProperties* properties = dynamic_cast<const BasicHeaderProperties*>(getHeaders()->getProperties());
- BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
- msg.setData(getContent());
- } else if (isA<MessageTransferBody>()) {
- throw Exception("Transfer not yet supported");
- } else {
- throw Exception("Unknown content class");
- }
-}
diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h
deleted file mode 100644
index 4f84039c10..0000000000
--- a/cpp/src/qpid/client/ReceivedContent.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-#include <vector>
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "ClientMessage.h"
-
-#ifndef _ReceivedContent_
-#define _ReceivedContent_
-
-namespace qpid {
-namespace client {
-
-/**
- * Collects the frames representing some received 'content'. This
- * provides a raw interface to 'message' data and attributes.
- */
-class ReceivedContent
-{
- const framing::SequenceNumber id;
- std::vector<framing::AMQFrame> parts;
-
-public:
- typedef boost::shared_ptr<ReceivedContent> shared_ptr;
-
- ReceivedContent(const framing::SequenceNumber& id);
- void append(framing::AMQBody* part);
- bool isComplete() const;
-
- uint64_t getContentSize() const;
- std::string getContent() const;
-
- const framing::AMQMethodBody* getMethod() const;
- const framing::AMQHeaderBody* getHeaders() const;
-
- template <class T> bool isA() const {
- const framing::AMQMethodBody* method=getMethod();
- return method && method->isA<T>();
- }
-
- template <class T> const T* as() const {
- const framing::AMQMethodBody* method=getMethod();
- return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
- }
-
- const framing::SequenceNumber& getId() const { return id; }
-
- void populate(Message& msg);
-};
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index f7ed7416cd..1b04e74af4 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -77,7 +77,7 @@ Response SessionCore::send(const AMQMethodBody& method, const MethodContent& con
return Response(f);
}
-ReceivedContent::shared_ptr SessionCore::get()
+FrameSet::shared_ptr SessionCore::get()
{
return l3.received.pop();
}
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index bcbaf0028d..0febb956b9 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -25,11 +25,11 @@
#include <boost/shared_ptr.hpp>
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
#include "ChannelHandler.h"
#include "ExecutionHandler.h"
#include "FutureFactory.h"
-#include "ReceivedContent.h"
#include "Response.h"
namespace qpid {
@@ -49,7 +49,7 @@ public:
SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
- ReceivedContent::shared_ptr get();
+ framing::FrameSet::shared_ptr get();
uint16_t getId() const { return id; }
void setSync(bool);
bool isSync();
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e80912a2ea..b8cf568bf7 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -94,8 +94,7 @@ void Cluster::handle(AMQFrame& frame) {
}
void Cluster::notify() {
- AMQFrame frame(ProtocolVersion(), 0,
- ClusterNotifyBody(ProtocolVersion(), url));
+ AMQFrame frame(0, ClusterNotifyBody(ProtocolVersion(), url));
handle(frame);
}
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index 44c5ff24c5..5aa9c3dc21 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -51,7 +51,7 @@ struct BrokerHandler : public FrameHandler, private ChannelAdapter, private Deli
//
BrokerHandler(Broker& broker) :
connection(0, broker),
- channel(connection, *this, 1, 0),
+ channel(connection, *this, 1),
adapter(channel, connection, broker, *this) {}
void handle(AMQFrame& frame) {
diff --git a/cpp/src/qpid/framing/AMQContentBody.h b/cpp/src/qpid/framing/AMQContentBody.h
index dd4ab10d7d..5d530a1b9a 100644
--- a/cpp/src/qpid/framing/AMQContentBody.h
+++ b/cpp/src/qpid/framing/AMQContentBody.h
@@ -38,6 +38,7 @@ public:
inline virtual ~AMQContentBody(){}
inline uint8_t type() const { return CONTENT_BODY; };
inline const string& getData() const { return data; }
+ inline string& getData() { return data; }
uint32_t size() const;
void encode(Buffer& buffer) const;
void decode(Buffer& buffer, uint32_t size);
diff --git a/cpp/src/qpid/framing/AMQDataBlock.h b/cpp/src/qpid/framing/AMQDataBlock.h
index 6ff61b185e..9b6fdfd966 100644
--- a/cpp/src/qpid/framing/AMQDataBlock.h
+++ b/cpp/src/qpid/framing/AMQDataBlock.h
@@ -30,7 +30,7 @@ class AMQDataBlock
{
public:
virtual ~AMQDataBlock() {}
- virtual void encode(Buffer& buffer) = 0;
+ virtual void encode(Buffer& buffer) const = 0;
virtual bool decode(Buffer& buffer) = 0;
virtual uint32_t size() const = 0;
};
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index 780af71be4..a7fd068ee4 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -70,7 +70,7 @@ const AMQBody* AMQFrame::getBody() const {
return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
}
-void AMQFrame::encode(Buffer& buffer)
+void AMQFrame::encode(Buffer& buffer) const
{
buffer.putOctet(getBody()->type());
buffer.putShort(channel);
@@ -80,8 +80,11 @@ void AMQFrame::encode(Buffer& buffer)
}
uint32_t AMQFrame::size() const{
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ +
- boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/;
+ return frameOverhead() + boost::apply_visitor(SizeVisitor(), body);
+}
+
+uint32_t AMQFrame::frameOverhead() {
+ return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + 1/*0xCE*/;
}
bool AMQFrame::decode(Buffer& buffer)
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index 9e825a9936..84e7660218 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -37,14 +37,14 @@ namespace framing {
class AMQFrame : public AMQDataBlock
{
public:
- AMQFrame(ProtocolVersion=ProtocolVersion()) {}
+ AMQFrame() : channel(0) {}
/** Construct a frame with a copy of b */
- AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) {
+ AMQFrame(ChannelId c, const AMQBody* b) : channel(c) {
setBody(*b);
}
- AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) {
+ AMQFrame(ChannelId c, const AMQBody& b) : channel(c) {
setBody(b);
}
@@ -52,21 +52,26 @@ class AMQFrame : public AMQDataBlock
void setChannel(ChannelId c) { channel = c; }
AMQBody* getBody();
- const AMQBody* getBody() const;
+ const AMQBody* getBody() const;
/** Copy a body instance to the frame */
void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); }
/** Convenience template to cast the body to an expected type. */
template <class T> T* castBody() {
- boost::polymorphic_downcast<T*>(getBody());
+ return boost::polymorphic_downcast<T*>(getBody());
+ }
+
+ template <class T> const T* castBody() const {
+ return boost::polymorphic_downcast<const T*>(getBody());
}
bool empty() { return boost::get<boost::blank>(&body); }
- void encode(Buffer& buffer);
+ void encode(Buffer& buffer) const;
bool decode(Buffer& buffer);
uint32_t size() const;
+ static uint32_t frameOverhead();
private:
struct CopyVisitor : public AMQBodyConstVisitor {
@@ -77,7 +82,7 @@ class AMQFrame : public AMQDataBlock
void visit(const AMQHeartbeatBody& x) { frame.body=x; }
void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); }
};
- friend struct CopyVisitor;
+ friend struct CopyVisitor;
typedef boost::variant<boost::blank,
AMQHeaderBody,
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.cpp b/cpp/src/qpid/framing/AMQHeaderBody.cpp
index 6a3c8f27d1..7083709fde 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.cpp
+++ b/cpp/src/qpid/framing/AMQHeaderBody.cpp
@@ -19,37 +19,65 @@
*
*/
#include "AMQHeaderBody.h"
-#include "qpid/QpidError.h"
-#include "BasicHeaderProperties.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
-qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {}
+qpid::framing::AMQHeaderBody::AMQHeaderBody() {}
-qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){}
-
-qpid::framing::AMQHeaderBody::~AMQHeaderBody(){}
+qpid::framing::AMQHeaderBody::~AMQHeaderBody() {}
uint32_t qpid::framing::AMQHeaderBody::size() const{
- return 12 + properties.size();
+ CalculateSize visitor;
+ for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+ return visitor.totalSize() + (properties.size() * (2/*type codes*/ + 4/*size*/));
}
void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const {
- buffer.putShort(properties.classId());
- buffer.putShort(weight);
- buffer.putLongLong(contentSize);
- properties.encode(buffer);
+ Encode visitor(buffer);
+ for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+}
+
+void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t size){
+ uint32_t limit = buffer.available() - size;
+ while (buffer.available() > limit + 2) {
+ uint32_t len = buffer.getLong();
+ uint16_t type = buffer.getShort();
+ //The following switch could be generated as the number of options increases:
+ switch(type) {
+ case BasicHeaderProperties::TYPE:
+ decode(BasicHeaderProperties(), buffer, len - 2);
+ break;
+ case MessageProperties::TYPE:
+ decode(MessageProperties(), buffer, len - 2);
+ break;
+ case DeliveryProperties::TYPE:
+ decode(DeliveryProperties(), buffer, len - 2);
+ break;
+ default:
+ //TODO: should just skip over them keeping them for later dispatch as is
+ throw Exception(QPID_MSG("Unexpected property type: " << type));
+ }
+ }
}
-void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){
- buffer.getShort(); // Ignore classId
- weight = buffer.getShort();
- contentSize = buffer.getLongLong();
- properties.decode(buffer, bufSize - 12);
+uint64_t qpid::framing::AMQHeaderBody::getContentLength() const
+{
+ const MessageProperties* mProps = get<MessageProperties>();
+ if (mProps) {
+ return mProps->getContentLength();
+ }
+ const BasicHeaderProperties* bProps = get<BasicHeaderProperties>();
+ if (bProps) {
+ return bProps->getContentLength();
+ }
+ return 0;
}
void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
{
- out << "header (" << size() << " bytes)" << " content_size=" << getContentSize();
- out << ", message_id=" << properties.getMessageId();
- out << ", delivery_mode=" << (int) properties.getDeliveryMode();
- out << ", headers=" << properties.getHeaders();
+ out << "header (" << size() << " bytes)";
+ out << "; properties={";
+ Print visitor(out);
+ for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+ out << "}";
}
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h
index 894936060c..76bd60559e 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -22,6 +22,12 @@
#include "AMQBody.h"
#include "Buffer.h"
#include "BasicHeaderProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include <iostream>
+#include <vector>
+#include <boost/variant.hpp>
+#include <boost/variant/get.hpp>
#ifndef _AMQHeaderBody_
#define _AMQHeaderBody_
@@ -31,24 +37,85 @@ namespace framing {
class AMQHeaderBody : public AMQBody
{
- BasicHeaderProperties properties;
- uint16_t weight;
- uint64_t contentSize;
- public:
- AMQHeaderBody(int classId);
+ typedef std::vector< boost::variant<BasicHeaderProperties, DeliveryProperties, MessageProperties> > PropertyList;
+
+ PropertyList properties;
+
+ template <class T> void decode(T t, Buffer& b, uint32_t size) {
+ t.decode(b, size);
+ properties.push_back(t);
+ }
+
+ class Encode : public boost::static_visitor<> {
+ Buffer& buffer;
+ public:
+ Encode(Buffer& b) : buffer(b) {}
+
+ template <class T> void operator()(T& t) const {
+ buffer.putLong(t.size() + 2/*typecode*/);
+ buffer.putShort(T::TYPE);
+ t.encode(buffer);
+ }
+ };
+
+ class CalculateSize : public boost::static_visitor<> {
+ uint32_t size;
+ public:
+ CalculateSize() : size(0) {}
+
+ template <class T> void operator()(T& t) {
+ size += t.size();
+ }
+
+ uint32_t totalSize() {
+ return size;
+ }
+ };
+
+ class Print : public boost::static_visitor<> {
+ std::ostream& out;
+ public:
+ Print(std::ostream& o) : out(o) {}
+
+ template <class T> void operator()(T& t) {
+ out << t;
+ }
+ };
+
+public:
+
AMQHeaderBody();
+ ~AMQHeaderBody();
inline uint8_t type() const { return HEADER_BODY; }
- BasicHeaderProperties* getProperties(){ return &properties; }
- const BasicHeaderProperties* getProperties() const { return &properties; }
- inline uint64_t getContentSize() const { return contentSize; }
- inline void setContentSize(uint64_t _size) { contentSize = _size; }
- virtual ~AMQHeaderBody();
- virtual uint32_t size() const;
- virtual void encode(Buffer& buffer) const;
- virtual void decode(Buffer& buffer, uint32_t size);
- virtual void print(std::ostream& out) const;
+
+ uint32_t size() const;
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer, uint32_t size);
+ uint64_t getContentLength() const;
+ void print(std::ostream& out) const;
void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
+
+ template <class T> T* get(bool create) {
+ for (PropertyList::iterator i = properties.begin(); i != properties.end(); i++) {
+ T* p = boost::get<T>(&(*i));
+ if (p) return p;
+ }
+ if (create) {
+ properties.push_back(T());
+ return boost::get<T>(&(properties.back()));
+ } else {
+ return 0;
+ }
+ }
+
+ template <class T> const T* get() const {
+ for (PropertyList::const_iterator i = properties.begin(); i != properties.end(); i++) {
+ const T* p = boost::get<T>(&(*i));
+ if (p) return p;
+ }
+ return 0;
+ }
};
}
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h
index 5acb3a7b66..a5c14a37e9 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/cpp/src/qpid/framing/AMQMethodBody.h
@@ -49,6 +49,7 @@ class AMQMethodBody : public AMQBody {
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
+ virtual bool isContentBearing() const = 0;
void invoke(AMQP_ServerOperations&);
bool invoke(Invocable*);
diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.cpp b/cpp/src/qpid/framing/BasicHeaderProperties.cpp
index dfa5e1bc3f..7d933d0db8 100644
--- a/cpp/src/qpid/framing/BasicHeaderProperties.cpp
+++ b/cpp/src/qpid/framing/BasicHeaderProperties.cpp
@@ -22,7 +22,10 @@
//TODO: This could be easily generated from the spec
-qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){}
+qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)),
+ priority(0),
+ timestamp(0),
+ contentLength(0){}
qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){}
uint32_t qpid::framing::BasicHeaderProperties::size() const{
@@ -41,6 +44,7 @@ uint32_t qpid::framing::BasicHeaderProperties::size() const{
if(userId.length() > 0) bytes += userId.length() + 1;
if(appId.length() > 0) bytes += appId.length() + 1;
if(clusterId.length() > 0) bytes += clusterId.length() + 1;
+ if(contentLength != 0) bytes += 8;
return bytes;
}
@@ -63,6 +67,7 @@ void qpid::framing::BasicHeaderProperties::encode(qpid::framing::Buffer& buffer)
if(userId.length() > 0) buffer.putShortString(userId);
if(appId.length() > 0) buffer.putShortString(appId);
if(clusterId.length() > 0) buffer.putShortString(clusterId);
+ if(contentLength != 0) buffer.putLongLong(contentLength);
}
void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, uint32_t /*size*/){
@@ -81,6 +86,7 @@ void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer,
if(flags & (1 << 4)) buffer.getShortString(userId);
if(flags & (1 << 3)) buffer.getShortString(appId);
if(flags & (1 << 2)) buffer.getShortString(clusterId);
+ if(flags & (1 << 1)) contentLength = buffer.getLongLong();
}
uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{
@@ -99,5 +105,32 @@ uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{
if(userId.length() > 0) flags |= (1 << 4);
if(appId.length() > 0) flags |= (1 << 3);
if(clusterId.length() > 0) flags |= (1 << 2);
+ if(contentLength != 0) flags |= (1 << 1);
return flags;
}
+
+namespace qpid{
+namespace framing{
+
+ std::ostream& operator<<(std::ostream& out, const BasicHeaderProperties& props)
+ {
+ if(props.contentType.length() > 0) out << "contentType=" << props.contentType << ";";
+ if(props.contentEncoding.length() > 0) out << "contentEncoding=" << props.contentEncoding << ";";
+ if(props.headers.count() > 0) out << "headers=" << props.headers << ";";
+ if(props.deliveryMode != 0) out << "deliveryMode=" << props.deliveryMode << ";";
+ if(props.priority != 0) out << "priority=" << props.priority << ";";
+ if(props.correlationId.length() > 0) out << "correlationId=" << props.correlationId << ";";
+ if(props.replyTo.length() > 0) out << "replyTo=" << props.replyTo << ";";
+ if(props.expiration.length() > 0) out << "expiration=" << props.expiration << ";";
+ if(props.messageId.length() > 0) out << "messageId=" << props.messageId << ";";
+ if(props.timestamp != 0) out << "timestamp=" << props.timestamp << ";";
+ if(props.type.length() > 0) out << "type=" << props.type << ";";
+ if(props.userId.length() > 0) out << "userId=" << props.userId << ";";
+ if(props.appId.length() > 0) out << "appId=" << props.appId << ";";
+ if(props.clusterId.length() > 0) out << "clusterId=" << props.clusterId << ";";
+ if(props.contentLength != 0) out << "contentLength=" << props.contentLength << ";";
+
+ return out;
+ }
+
+}}
diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h
index a8ef401b50..d6c71437fb 100644
--- a/cpp/src/qpid/framing/BasicHeaderProperties.h
+++ b/cpp/src/qpid/framing/BasicHeaderProperties.h
@@ -47,15 +47,18 @@ class BasicHeaderProperties : public HeaderProperties
string userId;
string appId;
string clusterId;
+ uint64_t contentLength;
uint16_t getFlags() const;
public:
+ static const uint16_t TYPE = BASIC;
+
BasicHeaderProperties();
virtual ~BasicHeaderProperties();
virtual uint32_t size() const;
virtual void encode(Buffer& buffer) const;
- virtual void decode(Buffer& buffer, uint32_t size);
+ virtual void decode(Buffer& buffer, uint32_t size = 0);
virtual uint8_t classId() const { return BASIC; }
@@ -74,6 +77,7 @@ class BasicHeaderProperties : public HeaderProperties
string getUserId() const { return userId; }
string getAppId() const { return appId; }
string getClusterId() const { return clusterId; }
+ uint64_t getContentLength() const { return contentLength; }
void setContentType(const string& _type){ contentType = _type; }
void setContentEncoding(const string& encoding){ contentEncoding = encoding; }
@@ -89,6 +93,9 @@ class BasicHeaderProperties : public HeaderProperties
void setUserId(const string& _userId){ userId = _userId; }
void setAppId(const string& _appId){appId = _appId; }
void setClusterId(const string& _clusterId){ clusterId = _clusterId; }
+ void setContentLength(uint64_t _contentLength){ contentLength = _contentLength; }
+
+ friend std::ostream& operator<<(std::ostream&, const BasicHeaderProperties&);
/** \internal
* Template to copy between types like BasicHeaderProperties.
@@ -109,6 +116,7 @@ class BasicHeaderProperties : public HeaderProperties
to.setUserId(from.getUserId());
to.setAppId(from.getAppId());
to.setClusterId(from.getClusterId());
+ to.setContentLength(from.getContentLength());
}
};
}}
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index 25ff46acdd..86b60d896b 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -51,7 +51,7 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
void ChannelAdapter::send(const AMQBody& body)
{
assertChannelOpen();
- AMQFrame frame(getVersion(), getId(), body);
+ AMQFrame frame(getId(), body);
handlers.out->handle(frame);
}
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp
new file mode 100644
index 0000000000..434f1b3aad
--- /dev/null
+++ b/cpp/src/qpid/framing/FrameSet.cpp
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FrameSet.h"
+#include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/TypeFilter.h"
+
+using namespace qpid::framing;
+using namespace boost;
+
+FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {}
+
+void FrameSet::append(AMQFrame& part)
+{
+ parts.push_back(part);
+}
+
+bool FrameSet::isComplete() const
+{
+ //TODO: should eventually use the 0-10 frame header flags when available
+ const AMQMethodBody* method = getMethod();
+ if (!method) {
+ return false;
+ } else if (method->isContentBearing()) {
+ const AMQHeaderBody* header = getHeaders();
+ if (header) {
+ return header->getContentLength() == getContentSize();
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+}
+
+const AMQMethodBody* FrameSet::getMethod() const
+{
+ return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
+}
+
+const AMQHeaderBody* FrameSet::getHeaders() const
+{
+ return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
+}
+
+AMQHeaderBody* FrameSet::getHeaders()
+{
+ return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody());
+}
+
+uint64_t FrameSet::getContentSize() const
+{
+ SumBodySize sum;
+ map_if(sum, TypeFilter(CONTENT_BODY));
+ return sum.getSize();
+}
+
+void FrameSet::getContent(std::string& out) const
+{
+ AccumulateContent accumulator(out);
+ map_if(accumulator, TypeFilter(CONTENT_BODY));
+}
diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h
new file mode 100644
index 0000000000..d6d5cd7a13
--- /dev/null
+++ b/cpp/src/qpid/framing/FrameSet.h
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/SequenceNumber.h"
+
+#ifndef _FrameSet_
+#define _FrameSet_
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Collects the frames representing a message.
+ */
+class FrameSet
+{
+ typedef std::vector<AMQFrame> Frames;
+ const SequenceNumber id;
+ Frames parts;
+
+public:
+ typedef boost::shared_ptr<FrameSet> shared_ptr;
+
+ FrameSet(const SequenceNumber& id);
+ void append(AMQFrame& part);
+ bool isComplete() const;
+
+ uint64_t getContentSize() const;
+ void getContent(std::string&) const;
+
+ const AMQMethodBody* getMethod() const;
+ const AMQHeaderBody* getHeaders() const;
+ AMQHeaderBody* getHeaders();
+
+ template <class T> bool isA() const {
+ const AMQMethodBody* method = getMethod();
+ return method && method->isA<T>();
+ }
+
+ template <class T> const T* as() const {
+ const AMQMethodBody* method = getMethod();
+ return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
+ }
+
+ template <class T> const T* getHeaderProperties() const {
+ const AMQHeaderBody* header = getHeaders();
+ return header ? header->get<T>() : 0;
+ }
+
+ const SequenceNumber& getId() const { return id; }
+
+ template <class P> void remove(P predicate) {
+ parts.erase(remove_if(parts.begin(), parts.end(), predicate), parts.end());
+ }
+
+ template <class F> void map(F& functor) {
+ for_each(parts.begin(), parts.end(), functor);
+ }
+
+ template <class F> void map(F& functor) const {
+ for_each(parts.begin(), parts.end(), functor);
+ }
+
+ template <class F, class P> void map_if(F& functor, P predicate) {
+ for(Frames::iterator i = parts.begin(); i != parts.end(); i++) {
+ if (predicate(*i)) functor(*i);
+ }
+ }
+
+ template <class F, class P> void map_if(F& functor, P predicate) const {
+ for(Frames::const_iterator i = parts.begin(); i != parts.end(); i++) {
+ if (predicate(*i)) functor(*i);
+ }
+ }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/framing/ProtocolInitiation.cpp b/cpp/src/qpid/framing/ProtocolInitiation.cpp
index a6d1b17f6e..7164bceb12 100644
--- a/cpp/src/qpid/framing/ProtocolInitiation.cpp
+++ b/cpp/src/qpid/framing/ProtocolInitiation.cpp
@@ -31,7 +31,7 @@ ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {}
ProtocolInitiation::~ProtocolInitiation(){}
-void ProtocolInitiation::encode(Buffer& buffer){
+void ProtocolInitiation::encode(Buffer& buffer) const {
buffer.putOctet('A');
buffer.putOctet('M');
buffer.putOctet('Q');
diff --git a/cpp/src/qpid/framing/ProtocolInitiation.h b/cpp/src/qpid/framing/ProtocolInitiation.h
index adfdc8215d..31c73eb124 100644
--- a/cpp/src/qpid/framing/ProtocolInitiation.h
+++ b/cpp/src/qpid/framing/ProtocolInitiation.h
@@ -39,7 +39,7 @@ public:
ProtocolInitiation(uint8_t major, uint8_t minor);
ProtocolInitiation(ProtocolVersion p);
virtual ~ProtocolInitiation();
- virtual void encode(Buffer& buffer);
+ virtual void encode(Buffer& buffer) const;
virtual bool decode(Buffer& buffer);
inline virtual uint32_t size() const { return 8; }
inline uint8_t getMajor() const { return version.getMajor(); }
diff --git a/cpp/src/qpid/framing/SendContent.cpp b/cpp/src/qpid/framing/SendContent.cpp
new file mode 100644
index 0000000000..568cc01665
--- /dev/null
+++ b/cpp/src/qpid/framing/SendContent.cpp
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SendContent.h"
+
+qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {}
+
+void qpid::framing::SendContent::operator()(AMQFrame& f) const
+{
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ const AMQContentBody* body(f.castBody<AMQContentBody>());
+ if (body->size() > maxContentSize) {
+ uint32_t offset = 0;
+ for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) {
+ sendFragment(*body, offset, maxContentSize);
+ offset += maxContentSize;
+ }
+ uint32_t remainder = body->size() % maxContentSize;
+ if (remainder) {
+ sendFragment(*body, offset, remainder);
+ }
+ } else {
+ AMQFrame copy(f);
+ copy.setChannel(channel);
+ handler.handle(copy);
+ }
+}
+
+void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const
+{
+ AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size)));
+ handler.handle(fragment);
+}
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/framing/SendContent.h
index 79a33ed7a9..a88319e2f9 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.h
+++ b/cpp/src/qpid/framing/SendContent.h
@@ -18,32 +18,35 @@
* under the License.
*
*/
-#ifndef _LazyLoadedContent_
-#define _LazyLoadedContent_
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
-#include "Content.h"
-#include "MessageStore.h"
-#include "BrokerMessageBase.h"
+#ifndef _SendContent_
+#define _SendContent_
namespace qpid {
- namespace broker {
- class LazyLoadedContent : public Content{
- MessageStore* const store;
- Message* const msg;
- const uint64_t expectedSize;
- public:
- LazyLoadedContent(
- MessageStore* const store, Message* const msg,
- uint64_t expectedSize);
- ~LazyLoadedContent();
- void add(qpid::framing::AMQContentBody* data);
- uint32_t size();
- void send(
- framing::ChannelAdapter&,
- uint32_t framesize);
- void encode(qpid::framing::Buffer& buffer);
- };
- }
+namespace framing {
+
+/**
+ * Functor that sends frame to handler, refragmenting if
+ * necessary. Currently only works on content frames but this could be
+ * changed once we support multi-frame segments in general.
+ */
+class SendContent
+{
+ mutable FrameHandler& handler;
+ const uint16_t channel;
+ const uint16_t maxFrameSize;
+
+ void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const;
+public:
+ SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize);
+ void operator()(AMQFrame& f) const;
+};
+
+}
}
diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/framing/TypeFilter.h
index a6fca7ca98..3a607190fd 100644
--- a/cpp/src/qpid/broker/InMemoryContent.h
+++ b/cpp/src/qpid/framing/TypeFilter.h
@@ -18,28 +18,34 @@
* under the License.
*
*/
-#ifndef _InMemoryContent_
-#define _InMemoryContent_
-
-#include "Content.h"
-#include "qpid/framing/AMQContentBody.h"
-#include <vector>
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
+#ifndef _TypeFilter_
+#define _TypeFilter_
namespace qpid {
- namespace broker {
- class InMemoryContent : public Content{
- typedef std::vector<framing::AMQContentBody> content_list;
- typedef content_list::iterator content_iterator;
+namespace framing {
+
+/**
+ * Predicate that selects frames by type
+ */
+class TypeFilter
+{
+ std::vector<uint8_t> types;
+public:
+ TypeFilter(uint8_t type) { add(type); }
+ TypeFilter(uint8_t type1, uint8_t type2) { add(type1); add(type2); }
+ void add(uint8_t type) { types.push_back(type); }
+ bool operator()(const AMQFrame& f) const
+ {
+ return find(types.begin(), types.end(), f.getBody()->type()) != types.end();
+ }
+};
- content_list content;
- public:
- void add(framing::AMQContentBody* data);
- uint32_t size();
- void send(framing::ChannelAdapter&, uint32_t framesize);
- void encode(framing::Buffer& buffer);
- };
- }
+}
}
diff --git a/cpp/src/qpid/framing/frame_functors.h b/cpp/src/qpid/framing/frame_functors.h
new file mode 100644
index 0000000000..3112da8e24
--- /dev/null
+++ b/cpp/src/qpid/framing/frame_functors.h
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <ostream>
+#include <iostream>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+
+#ifndef _frame_functors_
+#define _frame_functors_
+
+namespace qpid {
+namespace framing {
+
+class SumFrameSize
+{
+ uint64_t size;
+public:
+ SumFrameSize() : size(0) {}
+ void operator()(const AMQFrame& f) { size += f.size(); }
+ uint64_t getSize() { return size; }
+};
+
+class SumBodySize
+{
+ uint64_t size;
+public:
+ SumBodySize() : size(0) {}
+ void operator()(const AMQFrame& f) { size += f.getBody()->size(); }
+ uint64_t getSize() { return size; }
+};
+
+class EncodeFrame
+{
+ Buffer& buffer;
+public:
+ EncodeFrame(Buffer& b) : buffer(b) {}
+ void operator()(const AMQFrame& f) { f.encode(buffer); }
+};
+
+class EncodeBody
+{
+ Buffer& buffer;
+public:
+ EncodeBody(Buffer& b) : buffer(b) {}
+ void operator()(const AMQFrame& f) { f.getBody()->encode(buffer); }
+};
+
+class AccumulateContent
+{
+ std::string& content;
+public:
+ AccumulateContent(std::string& c) : content(c) {}
+ void operator()(const AMQFrame& f) { content += f.castBody<AMQContentBody>()->getData(); }
+};
+
+class Relay
+{
+ FrameHandler& handler;
+ const uint16_t channel;
+
+public:
+ Relay(FrameHandler& h, uint16_t c) : handler(h), channel(c) {}
+
+ void operator()(AMQFrame& f)
+ {
+ AMQFrame copy(f);
+ copy.setChannel(channel);
+ handler.handle(copy);
+ }
+};
+
+class Print
+{
+ std::ostream& out;
+public:
+ Print(std::ostream& o) : out(o) {}
+
+ void operator()(const AMQFrame& f)
+ {
+ out << f << std::endl;
+ }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 3253a3d27a..1e5a30f157 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -19,12 +19,14 @@
*
*/
#include "qpid/broker/BrokerChannel.h"
-#include "qpid/broker/BrokerMessage.h"
#include "qpid/broker/BrokerQueue.h"
#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageDelivery.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid_test_plugin.h"
#include <iostream>
+#include <sstream>
#include <memory>
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
@@ -72,7 +74,6 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_TEST_SUITE(BrokerChannelTest);
CPPUNIT_TEST(testConsumerMgmt);;
CPPUNIT_TEST(testDeliveryNoAck);
- CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST(testFlow);
CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue);
@@ -155,7 +156,16 @@ class BrokerChannelTest : public CppUnit::TestCase
void check()
{
- CPPUNIT_ASSERT(expected.empty());
+ if (!expected.empty()) {
+ std::stringstream error;
+ error << "Expected: ";
+ while (!expected.empty()) {
+ MethodCall& m = expected.front();
+ error << m.name << "(" << m.msg << ", '" << m.data << "'); ";
+ expected.pop();
+ }
+ CPPUNIT_FAIL(error.str());
+ }
}
};
@@ -173,7 +183,7 @@ class BrokerChannelTest : public CppUnit::TestCase
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(connection, recorder, 0, 0);
+ Channel channel(connection, recorder, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
@@ -203,7 +213,7 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
Queue::shared_ptr queue(new Queue("my_queue"));
string tag("test");
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
channel.consume(token, tag, queue, false, false, 0);
queue->deliver(msg);
sleep(2);
@@ -213,48 +223,6 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
- void testStaging(){
- MockMessageStore store;
- connection.setFrameMax(1000);
- connection.setStagingThreshold(10);
- Channel channel(connection, recorder, 1, &store);
- const string data[] = {"abcde", "fghij", "klmno"};
-
- Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
-
- store.expect();
- store.stage(*msg);
- for (int i = 0; i < 3; i++) {
- store.appendContent(*msg, data[i]);
- }
- store.destroy(*msg);
- store.test();
-
- Exchange::shared_ptr exchange =
- broker->getExchanges().declare("my_exchange", "fanout").first;
- Queue::shared_ptr queue(new Queue("my_queue"));
- exchange->bind(queue, "", 0);
-
- AMQHeaderBody header(BASIC);
- uint64_t contentSize(0);
- for (int i = 0; i < 3; i++) {
- contentSize += data[i].size();
- }
- header.setContentSize(contentSize);
- channel.handlePublish(msg);
- channel.handleHeader(&header);
-
- for (int i = 0; i < 3; i++) {
- AMQContentBody body(data[i]);
- channel.handleContent(&body);
- }
- Message::shared_ptr msg2 = queue->dequeue();
- CPPUNIT_ASSERT_EQUAL(msg, msg2.get());
- msg2.reset();//should trigger destroy call
-
- store.check();
- }
-
//NOTE: strictly speaking this should/could be part of QueueTest,
//but as it can usefully use the same utility classes as this
@@ -279,7 +247,6 @@ class BrokerChannelTest : public CppUnit::TestCase
store.expect();
store.stage(*msg3);
- store.destroy(*msg3);
store.test();
Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0));
@@ -348,16 +315,17 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody()));
- const string data("abcdefghijklmn");
-
- Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
string tag("test");
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
channel.consume(token, tag, queue, false, false, 0);
channel.flow(false);
+
+ //'publish' a message
+ Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ addContent(msg, "abcdefghijklmn");
queue->deliver(msg);
+
//ensure no messages have been delivered
CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
@@ -369,21 +337,26 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
- Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
+ Message::shared_ptr createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
{
- BasicMessage* msg = new BasicMessage(
- 0, exchange, routingKey, false, false);
- AMQHeaderBody header(BASIC);
- header.setContentSize(contentSize);
- msg->setHeader(&header);
- msg->getHeaderProperties()->setMessageId(messageId);
+ Message::shared_ptr msg(new Message());
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(contentSize);
+ props->setMessageId(messageId);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
return msg;
}
void addContent(Message::shared_ptr msg, const string& data)
{
- AMQContentBody body(data);
- msg->addContent(&body);
+ AMQFrame content(0, AMQContentBody(data));
+ msg->getFrames().append(content);
}
};
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
index a9caa89321..b3a6a745b8 100644
--- a/cpp/src/tests/Cluster.cpp
+++ b/cpp/src/tests/Cluster.cpp
@@ -34,7 +34,7 @@ static const ProtocolVersion VER;
/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
- AMQFrame send(VER, 1, SessionOpenBody(VER));
+ AMQFrame send(1, SessionOpenBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) {
BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
- AMQFrame send(VER, 1, SessionOpenBody(VER));
+ AMQFrame send(1, SessionOpenBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -91,8 +91,8 @@ struct CountHandler : public FrameHandler {
/** Test the ClassifierHandler */
BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) {
- AMQFrame queueDecl(VER, 0, QueueDeclareBody(VER));
- AMQFrame messageTrans(VER, 0, MessageTransferBody(VER));
+ AMQFrame queueDecl(0, QueueDeclareBody(VER));
+ AMQFrame messageTrans(0, MessageTransferBody(VER));
shared_ptr<CountHandler> wiring(new CountHandler());
shared_ptr<CountHandler> other(new CountHandler());
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
index bd76e58127..c03d7396f0 100644
--- a/cpp/src/tests/Cluster_child.cpp
+++ b/cpp/src/tests/Cluster_child.cpp
@@ -40,7 +40,7 @@ void clusterTwo() {
BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *frame.getBody());
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
- AMQFrame send(VER, 1, SessionAttachedBody(VER));
+ AMQFrame send(1, SessionAttachedBody(VER));
cluster.handle(send);
BOOST_REQUIRE(cluster.received.waitPop(frame));
BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *frame.getBody());
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index ef2646519d..59941864e2 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -31,6 +31,7 @@
#include "qpid_test_plugin.h"
#include <iostream>
#include "qpid/framing/BasicGetBody.h"
+#include "MessageUtils.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -63,7 +64,7 @@ class ExchangeTest : public CppUnit::TestCase
queue.reset();
queue2.reset();
- Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true));
+ Message::shared_ptr msgPtr(MessageUtils::createMessage("exchange", "key", "id"));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index a0dd8d37f6..1b843defc1 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -137,8 +137,7 @@ class FramingTest : public CppUnit::TestCase
{
std::string a = "hostA";
std::string b = "hostB";
- AMQFrame in(version, 999,
- ConnectionRedirectBody(version, a, b));
+ AMQFrame in(999, ConnectionRedirectBody(version, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -149,7 +148,7 @@ class FramingTest : public CppUnit::TestCase
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, BasicConsumeOkBody(version, s));
+ AMQFrame in(999, BasicConsumeOkBody(version, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
diff --git a/cpp/src/tests/HeaderTest.cpp b/cpp/src/tests/HeaderTest.cpp
index 17381cc868..df2230342c 100644
--- a/cpp/src/tests/HeaderTest.cpp
+++ b/cpp/src/tests/HeaderTest.cpp
@@ -36,8 +36,8 @@ public:
void testGenericProperties()
{
- AMQHeaderBody body(BASIC);
- dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE");
+ AMQHeaderBody body;
+ body.get<BasicHeaderProperties>(true)->getHeaders().setString("A", "BCDE");
Buffer buffer(100);
body.encode(buffer);
@@ -45,7 +45,7 @@ public:
AMQHeaderBody body2;
body2.decode(buffer, body.size());
BasicHeaderProperties* props =
- dynamic_cast<BasicHeaderProperties*>(body2.getProperties());
+ body2.get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(std::string("BCDE"),
props->getHeaders().getString("A"));
}
@@ -64,10 +64,11 @@ public:
string userId("guest");
string appId("just testing");
string clusterId("no clustering required");
+ uint64_t contentLength(54321);
- AMQHeaderBody body(BASIC);
+ AMQFrame out(0, AMQHeaderBody());
BasicHeaderProperties* properties =
- dynamic_cast<BasicHeaderProperties*>(body.getProperties());
+ out.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
properties->setContentType(contentType);
properties->getHeaders().setString("A", "BCDE");
properties->setDeliveryMode(deliveryMode);
@@ -81,13 +82,14 @@ public:
properties->setUserId(userId);
properties->setAppId(appId);
properties->setClusterId(clusterId);
+ properties->setContentLength(contentLength);
Buffer buffer(10000);
- body.encode(buffer);
+ out.encode(buffer);
buffer.flip();
- AMQHeaderBody temp;
- temp.decode(buffer, body.size());
- properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties());
+ AMQFrame in;
+ in.decode(buffer);
+ properties = in.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A"));
@@ -102,6 +104,7 @@ public:
CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId());
CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId());
CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId());
+ CPPUNIT_ASSERT_EQUAL(contentLength, properties->getContentLength());
}
void testSomeSpecificProperties(){
@@ -111,9 +114,9 @@ public:
string expiration("Z");
uint64_t timestamp(0xabe4a34a);
- AMQHeaderBody body(BASIC);
+ AMQHeaderBody body;
BasicHeaderProperties* properties =
- dynamic_cast<BasicHeaderProperties*>(body.getProperties());
+ body.get<BasicHeaderProperties>(true);
properties->setContentType(contentType);
properties->setDeliveryMode(deliveryMode);
properties->setPriority(priority);
@@ -125,7 +128,7 @@ public:
buffer.flip();
AMQHeaderBody temp;
temp.decode(buffer, body.size());
- properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties());
+ properties = temp.get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode());
diff --git a/cpp/src/tests/InMemoryContentTest.cpp b/cpp/src/tests/InMemoryContentTest.cpp
deleted file mode 100644
index bc95548d45..0000000000
--- a/cpp/src/tests/InMemoryContentTest.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/InMemoryContent.h"
-#include "qpid_test_plugin.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include <iostream>
-#include <list>
-#include "qpid/framing/AMQFrame.h"
-#include "MockChannel.h"
-
-using std::list;
-using std::string;
-using boost::dynamic_pointer_cast;
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-
-class InMemoryContentTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(InMemoryContentTest);
- CPPUNIT_TEST(testRefragmentation);
- CPPUNIT_TEST_SUITE_END();
-
-public:
- void testRefragmentation()
- {
- {//no remainder
- string out[] = {"abcde", "fghij", "klmno", "pqrst"};
- string in[] = {out[0] + out[1], out[2] + out[3]};
- refragment(2, in, 4, out);
- }
- {//remainder for last frame
- string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"};
- string in[] = {out[0] + out[1], out[2] + out[3] + out[4]};
- refragment(2, in, 5, out);
- }
- }
-
-
- void refragment(size_t inCount, string* in, size_t outCount, string* out, uint32_t framesize = 5)
- {
- InMemoryContent content;
- MockChannel channel(3);
-
- addframes(content, inCount, in);
- content.send(channel, framesize);
- CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
-
- for (unsigned int i = 0; i < outCount; i++) {
- AMQContentBody* chunk = dynamic_cast<AMQContentBody*>(
- channel.out.frames[i].getBody());
- CPPUNIT_ASSERT(chunk);
- CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
- CPPUNIT_ASSERT_EQUAL(
- ChannelId(3), channel.out.frames[i].getChannel());
- }
- }
-
- void addframes(InMemoryContent& content, size_t frameCount, string* frameData)
- {
- for (unsigned int i = 0; i < frameCount; i++) {
- AMQContentBody frame(frameData[i]);
- content.add(&frame);
- }
- }
-
-
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest);
-
diff --git a/cpp/src/tests/LazyLoadedContentTest.cpp b/cpp/src/tests/LazyLoadedContentTest.cpp
deleted file mode 100644
index df46f6b48e..0000000000
--- a/cpp/src/tests/LazyLoadedContentTest.cpp
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/LazyLoadedContent.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/broker/NullMessageStore.h"
-#include "qpid_test_plugin.h"
-#include <iostream>
-#include <list>
-#include <sstream>
-#include "qpid/framing/AMQFrame.h"
-#include "MockChannel.h"
-using std::list;
-using std::string;
-using boost::dynamic_pointer_cast;
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-
-
-class LazyLoadedContentTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(LazyLoadedContentTest);
- CPPUNIT_TEST(testFragmented);
- CPPUNIT_TEST(testWhole);
- CPPUNIT_TEST(testHalved);
- CPPUNIT_TEST_SUITE_END();
-
- class TestMessageStore : public NullMessageStore
- {
- const string content;
-
- public:
- TestMessageStore(const string& _content) : content(_content) {}
-
- void loadContent(PersistableMessage&, string& data, uint64_t offset, uint32_t length)
- {
- if (offset + length <= content.size()) {
- data = content.substr(offset, length);
- } else{
- std::stringstream error;
- error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size();
- throw qpid::Exception(error.str());
- }
- }
- };
-
-
-public:
- void testFragmented()
- {
- string data = "abcdefghijklmnopqrstuvwxyz";
- uint32_t framesize = 5;
- string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"};
- load(data, 6, out, framesize);
- }
-
- void testWhole()
- {
- string data = "abcdefghijklmnopqrstuvwxyz";
- uint32_t framesize = 50;
- string out[] = {data};
- load(data, 1, out, framesize);
- }
-
- void testHalved()
- {
- string data = "abcdefghijklmnopqrstuvwxyz";
- uint32_t framesize = 13;
- string out[] = {"abcdefghijklm", "nopqrstuvwxyz"};
- load(data, 2, out, framesize);
- }
-
- void load(string& in, size_t outCount, string* out, uint32_t framesize)
- {
- TestMessageStore store(in);
- LazyLoadedContent content(&store, 0, in.size());
- MockChannel channel(3);
- content.send(channel, framesize);
- CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
-
- for (unsigned int i = 0; i < outCount; i++) {
- AMQContentBody* chunk(dynamic_cast<AMQContentBody*>(
- channel.out.frames[i].getBody()));
- CPPUNIT_ASSERT(chunk);
- CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
- CPPUNIT_ASSERT_EQUAL(
- ChannelId(3), channel.out.frames[i].getChannel());
- }
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest);
-
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 34e7e973ac..7ff6a843a9 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -82,11 +82,8 @@ broker_unit_tests = \
DtxWorkRecordTest \
ExchangeTest \
HeadersExchangeTest \
- InMemoryContentTest \
- LazyLoadedContentTest \
MessageBuilderTest \
MessageTest \
- ReferenceTest \
QueueRegistryTest \
QueueTest \
QueuePolicyTest \
@@ -142,6 +139,7 @@ EXTRA_DIST += \
.valgrind.supp-default \
.valgrindrc-default \
InProcessBroker.h \
+ MessageUtils.h \
MockChannel.h \
MockConnectionInputHandler.h \
TxMocks.h \
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp
index a12fc603ce..341fdf56f5 100644
--- a/cpp/src/tests/MessageBuilderTest.cpp
+++ b/cpp/src/tests/MessageBuilderTest.cpp
@@ -18,15 +18,13 @@
* under the License.
*
*/
-#include "qpid/Exception.h"
-#include "qpid/broker/BrokerMessage.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/NullMessageStore.h"
-#include "qpid/framing/Buffer.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/TypeFilter.h"
#include "qpid_test_plugin.h"
-#include <iostream>
-#include <memory>
-#include "MockChannel.h"
+#include <list>
using namespace boost;
using namespace qpid::broker;
@@ -35,72 +33,55 @@ using namespace qpid::sys;
class MessageBuilderTest : public CppUnit::TestCase
{
- struct MockHandler : CompletionHandler {
- Message::shared_ptr msg;
+ class MockMessageStore : public NullMessageStore
+ {
+ enum Op {STAGE=1, APPEND=2};
- virtual void complete(Message::shared_ptr _msg){
- msg = _msg;
+ uint64_t id;
+ PersistableMessage* expectedMsg;
+ string expectedData;
+ std::list<Op> ops;
+
+ void checkExpectation(Op actual)
+ {
+ CPPUNIT_ASSERT_EQUAL(ops.front(), actual);
+ ops.pop_front();
}
- };
- class TestMessageStore : public NullMessageStore
- {
- Buffer* header;
- Buffer* content;
- const uint32_t contentBufferSize;
-
- public:
+ public:
+ MockMessageStore() : id(0), expectedMsg(0) {}
- void stage(PersistableMessage& msg)
- {
- if (msg.getPersistenceId() == 0) {
- header = new Buffer(msg.encodedSize());
- msg.encode(*header);
- content = new Buffer(contentBufferSize);
- msg.setPersistenceId(1);
- } else {
- throw qpid::Exception("Message already staged!");
- }
+ void expectStage(PersistableMessage& msg)
+ {
+ expectedMsg = &msg;
+ ops.push_back(STAGE);
}
- void appendContent(PersistableMessage& msg, const string& data)
- {
- if (msg.getPersistenceId() == 1) {
- content->putRawData(data);
- } else {
- throw qpid::Exception("Invalid message id!");
- }
+ void expectAppendContent(PersistableMessage& msg, const string& data)
+ {
+ expectedMsg = &msg;
+ expectedData = data;
+ ops.push_back(APPEND);
}
- using NullMessageStore::destroy;
+ void stage(PersistableMessage& msg)
+ {
+ checkExpectation(STAGE);
+ CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg);
+ msg.setPersistenceId(++id);
+ }
- void destroy(PersistableMessage& msg)
+ void appendContent(PersistableMessage& msg, const string& data)
{
- CPPUNIT_ASSERT(msg.getPersistenceId());
+ checkExpectation(APPEND);
+ CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg);
+ CPPUNIT_ASSERT_EQUAL(expectedData, data);
}
- BasicMessage::shared_ptr getRestoredMessage()
+ bool expectationsMet()
{
- BasicMessage::shared_ptr msg(new BasicMessage());
- if (header) {
- header->flip();
- msg->decodeHeader(*header);
- delete header;
- header = 0;
- if (content) {
- content->flip();
- msg->decodeContent(*content);
- delete content;
- content = 0;
- }
- }
- return msg;
+ return ops.empty();
}
-
- //dont care about any of the other methods:
- TestMessageStore(uint32_t _contentBufferSize) : NullMessageStore(), header(0), content(0),
- contentBufferSize(_contentBufferSize) {}
- ~TestMessageStore(){}
};
CPPUNIT_TEST_SUITE(MessageBuilderTest);
@@ -113,106 +94,115 @@ class MessageBuilderTest : public CppUnit::TestCase
public:
void testHeaderOnly(){
- MockHandler handler;
- MessageBuilder builder(&handler);
-
- Message::shared_ptr message(
- new BasicMessage(
- 0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(0);
-
- builder.initialise(message);
- CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(&header);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ MessageBuilder builder;
+ builder.start(SequenceNumber());
+
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0);
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ builder.handle(header);
+
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT_EQUAL(exchange, builder.getMessage()->getExchangeName());
+ CPPUNIT_ASSERT_EQUAL(key, builder.getMessage()->getRoutingKey());
+ CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete());
}
void test1ContentFrame(){
- MockHandler handler;
- MessageBuilder builder(&handler);
+ MessageBuilder builder;
+ builder.start(SequenceNumber());
- string data1("abcdefg");
+ std::string data("abcdefg");
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
- Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(7);
- AMQContentBody part1(data1);
-
- builder.initialise(message);
- CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(&header);
- CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(&part1);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content(0, AMQContentBody(data));
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size());
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete());
+
+ builder.handle(header);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete());
+
+ builder.handle(content);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete());
}
void test2ContentFrames(){
- MockHandler handler;
- MessageBuilder builder(&handler);
-
- string data1("abcdefg");
- string data2("hijklmn");
-
- Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(14);
- AMQContentBody part1(data1);
- AMQContentBody part2(data2);
-
- builder.initialise(message);
- CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(&header);
- CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(&part1);
- CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(&part2);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ MessageBuilder builder;
+ builder.start(SequenceNumber());
+
+ std::string data1("abcdefg");
+ std::string data2("hijklmn");
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content1(0, AMQContentBody(data1));
+ AMQFrame content2(0, AMQContentBody(data2));
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ builder.handle(header);
+ builder.handle(content1);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete());
+
+ builder.handle(content2);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete());
}
void testStaging(){
- //store must be the last thing to be destroyed or destructor
- //of Message fails (it uses the store to call destroy if lazy
- //loaded content is in use)
- TestMessageStore store(14);
- {
- MockHandler handler;
- MessageBuilder builder(&handler, &store, 5);
-
- string data1("abcdefg");
- string data2("hijklmn");
-
- Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(14);
- BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header.getProperties());
- properties->setMessageId("MyMessage");
- properties->getHeaders().setString("abc", "xyz");
-
- AMQContentBody part1(data1);
- AMQContentBody part2(data2);
-
- builder.initialise(message);
- builder.setHeader(&header);
- builder.addContent(&part1);
- builder.addContent(&part2);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
-
- BasicMessage::shared_ptr restored = store.getRestoredMessage();
- CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange());
- CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId());
- CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"),
- restored->getHeaderProperties()->getHeaders().getString("abc"));
- CPPUNIT_ASSERT_EQUAL((uint64_t) 14, restored->contentSize());
- }
+ MockMessageStore store;
+ MessageBuilder builder(&store, 5);
+ builder.start(SequenceNumber());
+
+ std::string data1("abcdefg");
+ std::string data2("hijklmn");
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content1(0, AMQContentBody(data1));
+ AMQFrame content2(0, AMQContentBody(data2));
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ builder.handle(header);
+
+ store.expectStage(*builder.getMessage());
+ builder.handle(content1);
+ CPPUNIT_ASSERT(store.expectationsMet());
+ CPPUNIT_ASSERT_EQUAL((uint64_t) 1, builder.getMessage()->getPersistenceId());
+
+ store.expectAppendContent(*builder.getMessage(), data2);
+ builder.handle(content2);
+ CPPUNIT_ASSERT(store.expectationsMet());
+
+ //were the content frames dropped?
+ CPPUNIT_ASSERT_EQUAL((uint64_t) 0, builder.getMessage()->contentSize());
}
};
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index 1fbb18b7d3..3d080ef3dc 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -18,7 +18,7 @@
* under the License.
*
*/
-#include "qpid/broker/BrokerMessage.h"
+#include "qpid/broker/Message.h"
#include "qpid_test_plugin.h"
#include <iostream>
#include "qpid/framing/AMQP_HighestVersion.h"
@@ -45,40 +45,45 @@ class MessageTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- BasicMessage::shared_ptr msg(
- new BasicMessage(0, exchange, routingKey, false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(14);
- AMQContentBody part1(data1);
- AMQContentBody part2(data2);
- msg->setHeader(&header);
- msg->addContent(&part1);
- msg->addContent(&part2);
+ Message::shared_ptr msg(new Message());
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content1(0, AMQContentBody(data1));
+ AMQFrame content2(0, AMQContentBody(data2));
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getFrames().append(content1);
+ msg->getFrames().append(content2);
+
+ MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ mProps->setContentLength(data1.size() + data2.size());
+ mProps->setMessageId(messageId);
+ FieldTable applicationHeaders;
+ applicationHeaders.setString("abc", "xyz");
+ mProps->setApplicationHeaders(applicationHeaders);
+ DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ dProps->setRoutingKey(routingKey);
+ dProps->setDeliveryMode(PERSISTENT);
+ CPPUNIT_ASSERT(msg->isPersistent());
- msg->getHeaderProperties()->setMessageId(messageId);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
- msg->getHeaderProperties()->getHeaders().setString("abc", "xyz");
Buffer buffer(msg->encodedSize());
msg->encode(buffer);
- buffer.flip();
-
- msg.reset(new BasicMessage());
- msg->decode(buffer);
- CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
- CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId());
- CPPUNIT_ASSERT_EQUAL(PERSISTENT, msg->getHeaderProperties()->getDeliveryMode());
- CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc"));
- CPPUNIT_ASSERT_EQUAL((uint64_t) 14, msg->contentSize());
+ buffer.flip();
+ msg.reset(new Message());
+ msg->decodeHeader(buffer);
+ msg->decodeContent(buffer);
- MockChannel channel(1);
- msg->deliver(channel, "ignore", 0, 100);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size());
- AMQContentBody* contentBody(
- dynamic_cast<AMQContentBody*>(channel.out.frames[2].getBody()));
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
+ CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchangeName());
+ CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
+ CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize());
+ CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength());
+ CPPUNIT_ASSERT_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+ CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc"));
+ CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
+ CPPUNIT_ASSERT(msg->isPersistent());
}
};
diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h
new file mode 100644
index 0000000000..7fb1755c4b
--- /dev/null
+++ b/cpp/src/tests/MessageUtils.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageDelivery.h"
+#include "qpid/framing/AMQFrame.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct MessageUtils
+{
+ static Message::shared_ptr createMessage(const string& exchange, const string& routingKey,
+ const string& messageId, uint64_t contentSize = 0)
+ {
+ Message::shared_ptr msg(new Message());
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(contentSize);
+ props->setMessageId(messageId);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
+ }
+
+ static void addContent(Message::shared_ptr msg, const string& data)
+ {
+ AMQFrame content(0, AMQContentBody(data));
+ msg->getFrames().append(content);
+ }
+};
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index e7ca124631..ef1518af4c 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -70,8 +70,13 @@ class QueueTest : public CppUnit::TestCase
public:
Message::shared_ptr message(std::string exchange, std::string routingKey) {
- return Message::shared_ptr(
- new BasicMessage(0, exchange, routingKey, false, false));
+ Message::shared_ptr msg(new Message());
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
}
diff --git a/cpp/src/tests/ReferenceTest.cpp b/cpp/src/tests/ReferenceTest.cpp
deleted file mode 100644
index 411462564a..0000000000
--- a/cpp/src/tests/ReferenceTest.cpp
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include <memory>
-#include "qpid_test_plugin.h"
-#include "qpid/broker/Reference.h"
-#include "qpid/broker/BrokerMessageMessage.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageAppendBody.h"
-#include "qpid/broker/CompletionHandler.h"
-
-using namespace boost;
-using namespace qpid;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace std;
-
-class ReferenceTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(ReferenceTest);
- CPPUNIT_TEST(testRegistry);
- CPPUNIT_TEST(testReference);
- CPPUNIT_TEST_SUITE_END();
-
- ProtocolVersion v;
- ReferenceRegistry registry;
-
- public:
- void testRegistry() {
- Reference::shared_ptr ref = registry.open("foo");
- CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId());
- CPPUNIT_ASSERT(ref == registry.get("foo"));
- try {
- registry.get("none");
- CPPUNIT_FAIL("Expected exception");
- } catch (...) {}
- try {
- registry.open("foo");
- CPPUNIT_FAIL("Expected exception");
- } catch(...) {}
- ref->close();
- try {
- registry.get("foo");
- CPPUNIT_FAIL("Expected exception");
- } catch(...) {}
- }
-
- void testReference() {
-
- Reference::shared_ptr r1(registry.open("bar"));
-
- MessageTransferBody t1(v);
- // TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up.
- const_cast<framing::Content&>(t1.getBody()) = framing::Content(REFERENCE,"bar");
- MessageMessage::shared_ptr m1(new MessageMessage(0, &t1, r1));
-
- MessageTransferBody t2(v);
- const_cast<framing::Content&>(t2.getBody()) = framing::Content(REFERENCE,"bar");
- MessageMessage::shared_ptr m2(new MessageMessage(0, &t2, r1));
-
- MessageAppendBody a1(v);
- MessageAppendBody a2(v);
-
- r1->addMessage(m1);
- r1->addMessage(m2);
- CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size());
- r1->append(a1);
- r1->append(a2);
- CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size());
- r1->close();
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest);
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index 24e8aac701..89a907d495 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -68,11 +68,13 @@ public:
TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
{
for(int i = 0; i < 10; i++){
- Message::shared_ptr msg(
- new BasicMessage(0, "exchange", "routing_key", false, false));
- AMQHeaderBody body(BASIC);
- msg->setHeader(&body);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+ Message::shared_ptr msg(new Message());
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, "exchange", 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
+ msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key");
messages.push_back(msg);
deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
}
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index d009dd9112..5628cf1d1c 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -26,6 +26,7 @@
#include <list>
#include <vector>
#include "MockChannel.h"
+#include "MessageUtils.h"
using std::list;
using std::pair;
@@ -70,12 +71,10 @@ public:
TxPublishTest() :
queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
- msg(new BasicMessage(0, "exchange", "routing_key", false, false)),
+ msg(MessageUtils::createMessage("exchange", "routing_key", "id")),
op(msg)
{
- AMQHeaderBody body(BASIC);
- msg->setHeader(&body);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+ msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
op.deliverTo(queue1);
op.deliverTo(queue2);
}