summaryrefslogtreecommitdiff
path: root/qpid/cpp/lib
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/lib')
-rw-r--r--qpid/cpp/lib/broker/Broker.cpp13
-rw-r--r--qpid/cpp/lib/broker/Broker.h4
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageBase.h12
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageMessage.cpp77
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageMessage.h14
-rw-r--r--qpid/cpp/lib/common/framing/AMQRequestBody.h2
6 files changed, 98 insertions, 24 deletions
diff --git a/qpid/cpp/lib/broker/Broker.cpp b/qpid/cpp/lib/broker/Broker.cpp
index f650452e33..335ce2b3a0 100644
--- a/qpid/cpp/lib/broker/Broker.cpp
+++ b/qpid/cpp/lib/broker/Broker.cpp
@@ -47,17 +47,13 @@ const std::string amq_match("amq.match");
Broker::Broker(const Configuration& conf) :
config(conf),
+ store(createStore(conf)),
queues(store.get()),
timeout(30000),
stagingThreshold(0),
cleaner(&queues, timeout/10),
factory(*this)
{
- if (config.getStore().empty())
- store.reset(new NullMessageStore(config.isTrace()));
- else
- store.reset(new MessageStoreModule(config.getStore()));
-
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
exchanges.declare(amq_direct, DirectExchange::typeName);
exchanges.declare(amq_topic, TopicExchange::typeName);
@@ -84,6 +80,13 @@ Broker::shared_ptr Broker::create(int16_t port)
Broker::shared_ptr Broker::create(const Configuration& config) {
return Broker::shared_ptr(new Broker(config));
}
+
+MessageStore* Broker::createStore(const Configuration& config) {
+ if (config.getStore().empty())
+ return new NullMessageStore(config.isTrace());
+ else
+ return new MessageStoreModule(config.getStore());
+}
void Broker::run() {
getAcceptor().run(&factory);
diff --git a/qpid/cpp/lib/broker/Broker.h b/qpid/cpp/lib/broker/Broker.h
index 7c21e90b18..68c04336d8 100644
--- a/qpid/cpp/lib/broker/Broker.h
+++ b/qpid/cpp/lib/broker/Broker.h
@@ -90,13 +90,15 @@ class Broker : public sys::Runnable,
Configuration config;
sys::Acceptor::shared_ptr acceptor;
- std::auto_ptr<MessageStore> store;
+ const std::auto_ptr<MessageStore> store;
QueueRegistry queues;
ExchangeRegistry exchanges;
uint32_t timeout;
uint64_t stagingThreshold;
AutoDelete cleaner;
ConnectionFactory factory;
+
+ static MessageStore* createStore(const Configuration& config);
};
}}
diff --git a/qpid/cpp/lib/broker/BrokerMessageBase.h b/qpid/cpp/lib/broker/BrokerMessageBase.h
index 709369ae2f..7739ab19e0 100644
--- a/qpid/cpp/lib/broker/BrokerMessageBase.h
+++ b/qpid/cpp/lib/broker/BrokerMessageBase.h
@@ -121,22 +121,18 @@ class Message {
return publisher;
}
- virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
- virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+ virtual void encode(framing::Buffer& buffer) = 0;
+ virtual void encodeHeader(framing::Buffer& buffer) = 0;
/**
* @returns the size of the buffer needed to encode this
* message in its entirety
- *
- * XXXX: Only used in tests?
*/
virtual uint32_t encodedSize() = 0;
/**
* @returns the size of the buffer needed to encode the
* 'header' of this message (not just the header frame,
* but other meta data e.g.routing key and exchange)
- *
- * XXXX: Only used in tests?
*/
virtual uint32_t encodedHeaderSize() = 0;
/**
@@ -149,6 +145,10 @@ class Message {
* content size else returns 0.
*/
virtual uint64_t expectedContentSize() = 0;
+
+ virtual void decodeHeader(framing::Buffer& buffer) = 0;
+ virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0;
+
// TODO: AMS 29/1/2007 Don't think these are really part of base class
diff --git a/qpid/cpp/lib/broker/BrokerMessageMessage.cpp b/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
index 3449078d70..a50375cdd3 100644
--- a/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -26,6 +26,7 @@
#include "MessageCloseBody.h"
#include "MessageAppendBody.h"
#include "Reference.h"
+#include "framing/AMQFrame.h"
#include "framing/FieldTable.h"
#include "framing/BasicHeaderProperties.h"
@@ -61,6 +62,11 @@ MessageMessage::MessageMessage(
reference(reference_)
{}
+/**
+ * Currently used by message store impls to recover messages
+ */
+MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {}
+
// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
void MessageMessage::transferMessage(
framing::ChannelAdapter& channel,
@@ -213,27 +219,82 @@ bool MessageMessage::isPersistent()
uint32_t MessageMessage::encodedSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return encodedHeaderSize() + encodedContentSize();
}
uint32_t MessageMessage::encodedHeaderSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return transfer->size() - transfer->baseSize();
}
uint32_t MessageMessage::encodedContentSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return 0;
}
uint64_t MessageMessage::expectedContentSize()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return 0;
+}
+
+void MessageMessage::encode(Buffer& buffer)
+{
+ encodeHeader(buffer);
+}
+
+void MessageMessage::encodeHeader(Buffer& buffer)
+{
+ if (transfer->getBody().isInline()) {
+ transfer->encodeContent(buffer);
+ } else {
+ string data;
+ for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) {
+ data += (*a)->getBytes();
+ }
+ framing::Content body(INLINE, data);
+ std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body));
+ copy->encodeContent(buffer);
+ }
+}
+
+void MessageMessage::decodeHeader(Buffer& buffer)
+{
+ transfer->decodeContent(buffer);
+}
+
+void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
+{
}
+MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
+ const string& destination,
+ const framing::Content& body)
+{
+ return new MessageTransferBody(version,
+ transfer->getTicket(),
+ destination,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ body,
+ transfer->getMandatory());
+
+}
}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/BrokerMessageMessage.h b/qpid/cpp/lib/broker/BrokerMessageMessage.h
index 8a2ff3a063..a13a63a416 100644
--- a/qpid/cpp/lib/broker/BrokerMessageMessage.h
+++ b/qpid/cpp/lib/broker/BrokerMessageMessage.h
@@ -45,6 +45,7 @@ class MessageMessage: public Message{
MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer);
MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference);
+ MessageMessage();
// Default destructor okay
@@ -70,15 +71,22 @@ class MessageMessage: public Message{
const framing::FieldTable& getApplicationHeaders();
bool isPersistent();
+ void encode(framing::Buffer& buffer);
+ void encodeHeader(framing::Buffer& buffer);
uint32_t encodedSize();
uint32_t encodedHeaderSize();
uint32_t encodedContentSize();
uint64_t expectedContentSize();
+ void decodeHeader(framing::Buffer& buffer);
+ void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
private:
- void transferMessage(framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint32_t framesize);
+ void transferMessage(framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ uint32_t framesize);
+ framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version,
+ const std::string& destination,
+ const framing::Content& body);
framing::RequestId requestId;
const TransferPtr transfer;
diff --git a/qpid/cpp/lib/common/framing/AMQRequestBody.h b/qpid/cpp/lib/common/framing/AMQRequestBody.h
index e184fff1d6..f21659a57a 100644
--- a/qpid/cpp/lib/common/framing/AMQRequestBody.h
+++ b/qpid/cpp/lib/common/framing/AMQRequestBody.h
@@ -63,8 +63,8 @@ class AMQRequestBody : public AMQMethodBody
void setResponseMark(ResponseId mark) { data.responseMark=mark; }
bool isRequest()const { return true; }
- protected:
static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; }
+ protected:
void printPrefix(std::ostream& out) const;
private: