summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-11-24 17:21:47 +0000
committerGordon Sim <gsim@apache.org>2006-11-24 17:21:47 +0000
commitd6befaeb77df8a09845e4c11070afe8ab4d5052d (patch)
treeb5f9ec40dedf2053d04c87f0117f0953a3026180 /cpp/src
parentb442c78351bf330cf23b67e86aa17424d5a78966 (diff)
downloadqpid-python-d6befaeb77df8a09845e4c11070afe8ab4d5052d.tar.gz
Initial sketching out of staging functionality for large messages (i.e. allowing content to be stored as it arrives, rather than collecting it in memory until complete).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@478923 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Channel.cpp2
-rw-r--r--cpp/src/qpid/broker/Message.cpp95
-rw-r--r--cpp/src/qpid/broker/Message.h32
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp17
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h6
-rw-r--r--cpp/src/qpid/broker/MessageStore.h19
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp10
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp54
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h24
10 files changed, 201 insertions, 60 deletions
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
index 2894e294e0..8b26099f1f 100644
--- a/cpp/src/qpid/broker/Channel.cpp
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -167,6 +167,8 @@ void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
messageBuilder.setHeader(header);
+ //at this point, decide based on the size of the message whether we want
+ //to stage it by saving content directly to disk as it arrives
}
void Channel::handleContent(AMQContentBody::shared_ptr content){
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index f71324f3fa..b0b5a85031 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -41,23 +41,11 @@ Message::Message(const ConnectionToken* const _publisher,
persistenceId(0) {}
Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
- buffer.getShortString(exchange);
- buffer.getShortString(routingKey);
-
- AMQFrame headerFrame;
- headerFrame.decode(buffer);
- AMQHeaderBody::shared_ptr headerBody = dynamic_pointer_cast<AMQHeaderBody, AMQBody>(headerFrame.getBody());
- setHeader(headerBody);
-
- AMQContentBody::shared_ptr contentBody;
- while (buffer.available()) {
- AMQFrame contentFrame;
- contentFrame.decode(buffer);
- contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody());
- addContent(contentBody);
- }
+ decode(buffer);
}
+Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
+
Message::~Message(){}
void Message::setHeader(AMQHeaderBody::shared_ptr _header){
@@ -83,7 +71,6 @@ void Message::deliver(OutputHandler* out, int channel,
// AMQP version change - kpvdr 2006-11-17
// TODO: Make this class version-aware and link these hard-wired numbers to that version
out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
-// out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
sendContent(out, channel, framesize);
}
@@ -128,18 +115,54 @@ bool Message::isPersistent()
return props && props->getDeliveryMode() == PERSISTENT;
}
-void Message::encode(Buffer& buffer)
+void Message::decode(Buffer& buffer)
{
- buffer.putShortString(exchange);
- buffer.putShortString(routingKey);
+ decodeHeader(buffer);
+ decodeContent(buffer);
+}
+
+void Message::decodeHeader(Buffer& buffer)
+{
+ buffer.getShortString(exchange);
+ buffer.getShortString(routingKey);
- AMQBody::shared_ptr body;
+ u_int32_t headerSize = buffer.getLong();
+ AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
+ headerBody->decode(buffer, headerSize);
+ setHeader(headerBody);
+}
- body = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+void Message::decodeContent(Buffer& buffer)
+{
+ AMQContentBody::shared_ptr contentBody;
+ while (buffer.available()) {
+ AMQFrame contentFrame;
+ contentFrame.decode(buffer);
+ contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody());
+ addContent(contentBody);
+ }
+}
- AMQFrame headerFrame(0, body);
- headerFrame.encode(buffer);
-
+void Message::encode(Buffer& buffer)
+{
+ encodeHeader(buffer);
+ encodeContent(buffer);
+}
+
+void Message::encodeHeader(Buffer& buffer)
+{
+ buffer.putShortString(exchange);
+ buffer.putShortString(routingKey);
+ buffer.putLong(header->size());
+ header->encode(buffer);
+}
+
+void Message::encodeContent(Buffer& buffer)
+{
+ //Use a frame around each content block. Not really required but
+ //gives some error checking at little expense. Could change in the
+ //future...
+ AMQBody::shared_ptr body;
for (content_iterator i = content.begin(); i != content.end(); i++) {
body = static_pointer_cast<AMQBody, AMQContentBody>(*i);
AMQFrame contentFrame(0, body);
@@ -149,13 +172,31 @@ void Message::encode(Buffer& buffer)
u_int32_t Message::encodedSize()
{
+ return encodedHeaderSize() + encodedContentSize();
+}
+
+u_int32_t Message::encodedContentSize()
+{
int encodedContentSize(0);
for (content_iterator i = content.begin(); i != content.end(); i++) {
- encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame (TODO, could replace frame by simple size)
+ encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame
}
+ return encodedContentSize;
+}
+u_int32_t Message::encodedHeaderSize()
+{
return exchange.size() + 1
+ routingKey.size() + 1
- + header->size() + 8 //8 extra bytes for frame (TODO, could actually remove the frame)
- + encodedContentSize;
+ + header->size() + 4;//4 extra bytes for size
+}
+
+u_int64_t Message::expectedContentSize()
+{
+ return header.get() ? header->getContentSize() : 0;
+}
+
+void Message::releaseContent()
+{
+ content.clear();
}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 3d0a0d358b..2c56c845ac 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -63,6 +63,7 @@ namespace qpid {
const string& exchange, const string& routingKey,
bool mandatory, bool immediate);
Message(qpid::framing::Buffer& buffer);
+ Message();
~Message();
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
void addContent(qpid::framing::AMQContentBody::shared_ptr data);
@@ -88,12 +89,39 @@ namespace qpid {
u_int64_t contentSize() const { return size; }
u_int64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
+
+ void decode(qpid::framing::Buffer& buffer);
+ void decodeHeader(qpid::framing::Buffer& buffer);
+ void decodeContent(qpid::framing::Buffer& buffer);
+
void encode(qpid::framing::Buffer& buffer);
+ void encodeHeader(qpid::framing::Buffer& buffer);
+ void encodeContent(qpid::framing::Buffer& buffer);
/**
- * @returns the size of the buffer needed to encode this message
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
*/
u_int32_t encodedSize();
-
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ */
+ u_int32_t encodedHeaderSize();
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ u_int32_t encodedContentSize();
+ /**
+ * Releases the in-memory content data held by this message.
+ */
+ void releaseContent();
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ u_int64_t expectedContentSize();
};
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index a04c6def41..b4efd3d001 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -23,12 +23,22 @@
using namespace qpid::broker;
using namespace qpid::framing;
-MessageBuilder::MessageBuilder(CompletionHandler* _handler) : handler(_handler) {}
+MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) :
+ handler(_handler),
+ store(_store),
+ stagingThreshold(_stagingThreshold),
+ staging(false)
+{}
void MessageBuilder::route(){
- if(message->isComplete()){
- if(handler) handler->complete(message);
+ if (staging && store) {
+ store->stage(message);
+ message->releaseContent();
+ }
+ if (message->isComplete()) {
+ if (handler) handler->complete(message);
message.reset();
+ staging = false;
}
}
@@ -44,6 +54,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
}
message->setHeader(header);
+ staging = stagingThreshold && header->getContentSize() >= stagingThreshold;
route();
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
index f4a9240b74..a533a4da6f 100644
--- a/cpp/src/qpid/broker/MessageBuilder.h
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -24,6 +24,7 @@
#include <qpid/QpidError.h>
#include <qpid/broker/Exchange.h>
#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageStore.h>
#include <qpid/framing/AMQContentBody.h>
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/BasicPublishBody.h>
@@ -37,13 +38,16 @@ namespace qpid {
virtual void complete(Message::shared_ptr&) = 0;
virtual ~CompletionHandler(){}
};
- MessageBuilder(CompletionHandler* _handler);
+ MessageBuilder(CompletionHandler* _handler, MessageStore* const store = 0, u_int64_t stagingThreshold = 0);
void initialise(Message::shared_ptr& msg);
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
void addContent(qpid::framing::AMQContentBody::shared_ptr& content);
private:
Message::shared_ptr message;
CompletionHandler* handler;
+ MessageStore* const store;
+ const u_int64_t stagingThreshold;
+ bool staging;
void route();
};
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index 13b5ba1152..322b03e67c 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -47,6 +47,25 @@ namespace qpid {
virtual void recover(RecoveryManager& queues) = 0;
/**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers and any available content. If the
+ * message has already been stored it will append any
+ * currently held content.
+ */
+ virtual void stage(Message::shared_ptr& msg) = 0;
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(Message::shared_ptr& msg) = 0;
+
+ /**
* Enqueues a message, storing the message if it has not
* been previously stored and recording that the given
* message is on the given queue.
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index 1f26807f54..3e58a329de 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -43,6 +43,16 @@ void MessageStoreModule::recover(RecoveryManager& registry)
store->recover(registry);
}
+void MessageStoreModule::stage(Message::shared_ptr& msg)
+{
+ store->stage(msg);
+}
+
+void MessageStoreModule::destroy(Message::shared_ptr& msg)
+{
+ store->destroy(msg);
+}
+
void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid)
{
store->enqueue(ctxt, msg, queue, xid);
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 29b62ccfa2..0afb7c7186 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -39,6 +39,8 @@ namespace qpid {
void create(const Queue& queue);
void destroy(const Queue& queue);
void recover(RecoveryManager& queues);
+ void stage(Message::shared_ptr& msg);
+ void destroy(Message::shared_ptr& msg);
void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void committed(const string * const xid);
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 7bc95225b4..ffa444f1a2 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -28,31 +28,51 @@
using namespace qpid::broker;
-void NullMessageStore::create(const Queue& queue){
- std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
+
+void NullMessageStore::create(const Queue& queue)
+{
+ if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::destroy(const Queue& queue)
+{
+ if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::recover(RecoveryManager&)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
}
-void NullMessageStore::destroy(const Queue& queue){
- std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+void NullMessageStore::stage(Message::shared_ptr&)
+{
+ if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
}
-void NullMessageStore::recover(RecoveryManager&){
- std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
+void NullMessageStore::destroy(Message::shared_ptr&)
+{
+ if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
}
-void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const){
- std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const)
+{
+ if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const){
- std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const)
+{
+ if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::committed(const string * const){
- std::cout << "WARNING: Persistence not enabled." << std::endl;
+void NullMessageStore::committed(const string * const)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
}
-void NullMessageStore::aborted(const string * const){
- std::cout << "WARNING: Persistence not enabled." << std::endl;
+void NullMessageStore::aborted(const string * const)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
}
-std::auto_ptr<TransactionContext> NullMessageStore::begin(){
+std::auto_ptr<TransactionContext> NullMessageStore::begin()
+{
return std::auto_ptr<TransactionContext>();
}
-void NullMessageStore::commit(TransactionContext*){
+void NullMessageStore::commit(TransactionContext*)
+{
}
-void NullMessageStore::abort(TransactionContext*){
+void NullMessageStore::abort(TransactionContext*)
+{
}
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index 7916467091..5b363db662 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -32,17 +32,21 @@ namespace qpid {
* A null implementation of the MessageStore interface
*/
class NullMessageStore : public MessageStore{
+ const bool warn;
public:
- void create(const Queue& queue);
- void destroy(const Queue& queue);
- void recover(RecoveryManager& queues);
- void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
- void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
- void committed(const string * const xid);
- void aborted(const string * const xid);
- std::auto_ptr<TransactionContext> begin();
- void commit(TransactionContext* ctxt);
- void abort(TransactionContext* ctxt);
+ NullMessageStore(bool warn = true);
+ void virtual create(const Queue& queue);
+ void virtual destroy(const Queue& queue);
+ void virtual recover(RecoveryManager& queues);
+ void virtual stage(Message::shared_ptr& msg);
+ void virtual destroy(Message::shared_ptr& msg);
+ void virtual enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ void virtual dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ void virtual committed(const string * const xid);
+ void virtual aborted(const string * const xid);
+ virtual std::auto_ptr<TransactionContext> begin();
+ void virtual commit(TransactionContext* ctxt);
+ void virtual abort(TransactionContext* ctxt);
~NullMessageStore(){}
};
}