diff options
Diffstat (limited to 'qpid/cpp/lib')
-rw-r--r-- | qpid/cpp/lib/broker/Broker.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/lib/broker/Broker.h | 4 | ||||
-rw-r--r-- | qpid/cpp/lib/broker/BrokerMessageBase.h | 12 | ||||
-rw-r--r-- | qpid/cpp/lib/broker/BrokerMessageMessage.cpp | 77 | ||||
-rw-r--r-- | qpid/cpp/lib/broker/BrokerMessageMessage.h | 14 | ||||
-rw-r--r-- | qpid/cpp/lib/common/framing/AMQRequestBody.h | 2 |
6 files changed, 98 insertions, 24 deletions
diff --git a/qpid/cpp/lib/broker/Broker.cpp b/qpid/cpp/lib/broker/Broker.cpp index f650452e33..335ce2b3a0 100644 --- a/qpid/cpp/lib/broker/Broker.cpp +++ b/qpid/cpp/lib/broker/Broker.cpp @@ -47,17 +47,13 @@ const std::string amq_match("amq.match"); Broker::Broker(const Configuration& conf) : config(conf), + store(createStore(conf)), queues(store.get()), timeout(30000), stagingThreshold(0), cleaner(&queues, timeout/10), factory(*this) { - if (config.getStore().empty()) - store.reset(new NullMessageStore(config.isTrace())); - else - store.reset(new MessageStoreModule(config.getStore())); - exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); exchanges.declare(amq_topic, TopicExchange::typeName); @@ -84,6 +80,13 @@ Broker::shared_ptr Broker::create(int16_t port) Broker::shared_ptr Broker::create(const Configuration& config) { return Broker::shared_ptr(new Broker(config)); } + +MessageStore* Broker::createStore(const Configuration& config) { + if (config.getStore().empty()) + return new NullMessageStore(config.isTrace()); + else + return new MessageStoreModule(config.getStore()); +} void Broker::run() { getAcceptor().run(&factory); diff --git a/qpid/cpp/lib/broker/Broker.h b/qpid/cpp/lib/broker/Broker.h index 7c21e90b18..68c04336d8 100644 --- a/qpid/cpp/lib/broker/Broker.h +++ b/qpid/cpp/lib/broker/Broker.h @@ -90,13 +90,15 @@ class Broker : public sys::Runnable, Configuration config; sys::Acceptor::shared_ptr acceptor; - std::auto_ptr<MessageStore> store; + const std::auto_ptr<MessageStore> store; QueueRegistry queues; ExchangeRegistry exchanges; uint32_t timeout; uint64_t stagingThreshold; AutoDelete cleaner; ConnectionFactory factory; + + static MessageStore* createStore(const Configuration& config); }; }} diff --git a/qpid/cpp/lib/broker/BrokerMessageBase.h b/qpid/cpp/lib/broker/BrokerMessageBase.h index 709369ae2f..7739ab19e0 100644 --- a/qpid/cpp/lib/broker/BrokerMessageBase.h +++ b/qpid/cpp/lib/broker/BrokerMessageBase.h @@ -121,22 +121,18 @@ class Message { return publisher; } - virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? - virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + virtual void encode(framing::Buffer& buffer) = 0; + virtual void encodeHeader(framing::Buffer& buffer) = 0; /** * @returns the size of the buffer needed to encode this * message in its entirety - * - * XXXX: Only used in tests? */ virtual uint32_t encodedSize() = 0; /** * @returns the size of the buffer needed to encode the * 'header' of this message (not just the header frame, * but other meta data e.g.routing key and exchange) - * - * XXXX: Only used in tests? */ virtual uint32_t encodedHeaderSize() = 0; /** @@ -149,6 +145,10 @@ class Message { * content size else returns 0. */ virtual uint64_t expectedContentSize() = 0; + + virtual void decodeHeader(framing::Buffer& buffer) = 0; + virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0; + // TODO: AMS 29/1/2007 Don't think these are really part of base class diff --git a/qpid/cpp/lib/broker/BrokerMessageMessage.cpp b/qpid/cpp/lib/broker/BrokerMessageMessage.cpp index 3449078d70..a50375cdd3 100644 --- a/qpid/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/qpid/cpp/lib/broker/BrokerMessageMessage.cpp @@ -26,6 +26,7 @@ #include "MessageCloseBody.h" #include "MessageAppendBody.h" #include "Reference.h" +#include "framing/AMQFrame.h" #include "framing/FieldTable.h" #include "framing/BasicHeaderProperties.h" @@ -61,6 +62,11 @@ MessageMessage::MessageMessage( reference(reference_) {} +/** + * Currently used by message store impls to recover messages + */ +MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {} + // TODO: astitcher 1-Mar-2007: This code desperately needs better factoring void MessageMessage::transferMessage( framing::ChannelAdapter& channel, @@ -213,27 +219,82 @@ bool MessageMessage::isPersistent() uint32_t MessageMessage::encodedSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return encodedHeaderSize() + encodedContentSize(); } uint32_t MessageMessage::encodedHeaderSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return transfer->size() - transfer->baseSize(); } uint32_t MessageMessage::encodedContentSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return 0; } uint64_t MessageMessage::expectedContentSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return 0; +} + +void MessageMessage::encode(Buffer& buffer) +{ + encodeHeader(buffer); +} + +void MessageMessage::encodeHeader(Buffer& buffer) +{ + if (transfer->getBody().isInline()) { + transfer->encodeContent(buffer); + } else { + string data; + for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) { + data += (*a)->getBytes(); + } + framing::Content body(INLINE, data); + std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body)); + copy->encodeContent(buffer); + } +} + +void MessageMessage::decodeHeader(Buffer& buffer) +{ + transfer->decodeContent(buffer); +} + +void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) +{ } +MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version, + const string& destination, + const framing::Content& body) +{ + return new MessageTransferBody(version, + transfer->getTicket(), + destination, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + body, + transfer->getMandatory()); + +} }} // namespace qpid::broker diff --git a/qpid/cpp/lib/broker/BrokerMessageMessage.h b/qpid/cpp/lib/broker/BrokerMessageMessage.h index 8a2ff3a063..a13a63a416 100644 --- a/qpid/cpp/lib/broker/BrokerMessageMessage.h +++ b/qpid/cpp/lib/broker/BrokerMessageMessage.h @@ -45,6 +45,7 @@ class MessageMessage: public Message{ MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer); MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference); + MessageMessage(); // Default destructor okay @@ -70,15 +71,22 @@ class MessageMessage: public Message{ const framing::FieldTable& getApplicationHeaders(); bool isPersistent(); + void encode(framing::Buffer& buffer); + void encodeHeader(framing::Buffer& buffer); uint32_t encodedSize(); uint32_t encodedHeaderSize(); uint32_t encodedContentSize(); uint64_t expectedContentSize(); + void decodeHeader(framing::Buffer& buffer); + void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); private: - void transferMessage(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint32_t framesize); + void transferMessage(framing::ChannelAdapter& channel, + const std::string& consumerTag, + uint32_t framesize); + framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version, + const std::string& destination, + const framing::Content& body); framing::RequestId requestId; const TransferPtr transfer; diff --git a/qpid/cpp/lib/common/framing/AMQRequestBody.h b/qpid/cpp/lib/common/framing/AMQRequestBody.h index e184fff1d6..f21659a57a 100644 --- a/qpid/cpp/lib/common/framing/AMQRequestBody.h +++ b/qpid/cpp/lib/common/framing/AMQRequestBody.h @@ -63,8 +63,8 @@ class AMQRequestBody : public AMQMethodBody void setResponseMark(ResponseId mark) { data.responseMark=mark; } bool isRequest()const { return true; } - protected: static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; } + protected: void printPrefix(std::ostream& out) const; private: |