diff options
author | Gordon Sim <gsim@apache.org> | 2006-11-28 15:25:35 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-11-28 15:25:35 +0000 |
commit | 4ad5fc50f0894e219c37118252d6a618419ea212 (patch) | |
tree | b48ff062cd48b7de6b606330c5cefecf15b7691a /cpp/src | |
parent | e2665f7339231eb2d85506c86a96f0859016fa89 (diff) | |
download | qpid-python-4ad5fc50f0894e219c37118252d6a618419ea212.tar.gz |
Modifications to allow loading of message data in chunks, refragmentation of messages, plus some related refactoring and tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Content.h | 42 | ||||
-rw-r--r-- | cpp/src/qpid/broker/InMemoryContent.cpp | 69 | ||||
-rw-r--r-- | cpp/src/qpid/broker/InMemoryContent.h | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LazyLoadedContent.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LazyLoadedContent.h | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 91 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStore.h | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 24 |
14 files changed, 404 insertions, 73 deletions
diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h new file mode 100644 index 0000000000..917222fb5a --- /dev/null +++ b/cpp/src/qpid/broker/Content.h @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Content_ +#define _Content_ + +#include <qpid/framing/AMQContentBody.h> +#include <qpid/framing/Buffer.h> +#include <qpid/framing/OutputHandler.h> + +namespace qpid { + namespace broker { + class Content{ + public: + virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0; + virtual u_int32_t size() = 0; + virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0; + virtual void encode(qpid::framing::Buffer& buffer) = 0; + virtual ~Content(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp new file mode 100644 index 0000000000..fe15def5c8 --- /dev/null +++ b/cpp/src/qpid/broker/InMemoryContent.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/InMemoryContent.h" + +using namespace qpid::broker; +using namespace qpid::framing; +using boost::static_pointer_cast; + +void InMemoryContent::add(AMQContentBody::shared_ptr data) +{ + content.push_back(data); +} + +u_int32_t InMemoryContent::size() +{ + int sum(0); + for (content_iterator i = content.begin(); i != content.end(); i++) { + sum += (*i)->size() + 8;//8 extra bytes for the frame + //TODO: have to get rid of the frame stuff from encoded data + } + return sum; +} + +void InMemoryContent::send(OutputHandler* out, int channel, u_int32_t framesize) +{ + for (content_iterator i = content.begin(); i != content.end(); i++) { + if ((*i)->size() > framesize) { + u_int32_t offset = 0; + for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { + string data = (*i)->getData().substr(offset, framesize); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + offset += framesize; + } + u_int32_t remainder = (*i)->size() % framesize; + if (remainder) { + string data = (*i)->getData().substr(offset, remainder); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + } + } else { + AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); + out->send(new AMQFrame(channel, contentBody)); + } + } +} + +void InMemoryContent::encode(Buffer& buffer) +{ + for (content_iterator i = content.begin(); i != content.end(); i++) { + (*i)->encode(buffer); + } +} diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/broker/InMemoryContent.h new file mode 100644 index 0000000000..5e851722f2 --- /dev/null +++ b/cpp/src/qpid/broker/InMemoryContent.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _InMemoryContent_ +#define _InMemoryContent_ + +#include <qpid/broker/Content.h> +#include <vector> + +namespace qpid { + namespace broker { + class InMemoryContent : public Content{ + typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; + typedef content_list::iterator content_iterator; + + content_list content; + public: + void add(qpid::framing::AMQContentBody::shared_ptr data); + u_int32_t size(); + void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void encode(qpid::framing::Buffer& buffer); + ~InMemoryContent(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp new file mode 100644 index 0000000000..eb7536dde3 --- /dev/null +++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/LazyLoadedContent.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, u_int64_t _msgId, u_int64_t _expectedSize) : + store(_store), msgId(_msgId), expectedSize(_expectedSize) {} + +void LazyLoadedContent::add(AMQContentBody::shared_ptr data) +{ + store->appendContent(msgId, data->getData()); +} + +u_int32_t LazyLoadedContent::size() +{ + return 0;//all content is written as soon as it is added +} + +void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesize) +{ + if (expectedSize > framesize) { + for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) { + u_int64_t remaining = expectedSize - offset; + string data; + store->loadContent(msgId, data, offset, remaining > framesize ? framesize : remaining); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + } + } else { + string data; + store->loadContent(msgId, data, 0, expectedSize); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + } +} + +void LazyLoadedContent::encode(Buffer&) +{ + //do nothing as all content is written as soon as it is added +} diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h new file mode 100644 index 0000000000..5a406e3131 --- /dev/null +++ b/cpp/src/qpid/broker/LazyLoadedContent.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _LazyLoadedContent_ +#define _LazyLoadedContent_ + +#include <qpid/broker/Content.h> +#include <qpid/broker/MessageStore.h> + +namespace qpid { + namespace broker { + class LazyLoadedContent : public Content{ + MessageStore* const store; + const u_int64_t msgId; + const u_int64_t expectedSize; + public: + LazyLoadedContent(MessageStore* const store, u_int64_t msgId, u_int64_t expectedSize); + void add(qpid::framing::AMQContentBody::shared_ptr data); + u_int32_t size(); + void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void encode(qpid::framing::Buffer& buffer); + ~LazyLoadedContent(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index b0b5a85031..64e66c4a30 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -20,6 +20,10 @@ */ #include <qpid/broker/Message.h> #include <iostream> + +#include <qpid/broker/InMemoryContent.h> +#include <qpid/broker/LazyLoadedContent.h> +#include <qpid/broker/MessageStore.h> // AMQP version change - kpvdr 2006-11-17 #include <qpid/framing/ProtocolVersion.h> #include <qpid/framing/BasicDeliverBody.h> @@ -40,8 +44,10 @@ Message::Message(const ConnectionToken* const _publisher, size(0), persistenceId(0) {} -Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ - decode(buffer); +Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : + publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ + + decode(buffer, headersOnly, contentChunkSize); } Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} @@ -53,7 +59,10 @@ void Message::setHeader(AMQHeaderBody::shared_ptr _header){ } void Message::addContent(AMQContentBody::shared_ptr data){ - content.push_back(data); + if (!content.get()) { + content = std::auto_ptr<Content>(new InMemoryContent()); + } + content->add(data); size += data->size(); } @@ -68,8 +77,9 @@ void Message::redeliver(){ void Message::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize){ - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version + + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); sendContent(out, channel, framesize); } @@ -80,8 +90,8 @@ void Message::sendGetOk(OutputHandler* out, u_int64_t deliveryTag, u_int32_t framesize){ - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount))); sendContent(out, channel, framesize); } @@ -89,15 +99,8 @@ void Message::sendGetOk(OutputHandler* out, void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(channel, headerBody)); - for(content_iterator i = content.begin(); i != content.end(); i++){ - if((*i)->size() > framesize){ - //TODO: need to split it - std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl; - }else{ - AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); - out->send(new AMQFrame(channel, contentBody)); - } - } + + if (content.get()) content->send(out, channel, framesize); } BasicHeaderProperties* Message::getHeaderProperties(){ @@ -115,10 +118,10 @@ bool Message::isPersistent() return props && props->getDeliveryMode() == PERSISTENT; } -void Message::decode(Buffer& buffer) +void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) { decodeHeader(buffer); - decodeContent(buffer); + if (!headersOnly) decodeContent(buffer, contentChunkSize); } void Message::decodeHeader(Buffer& buffer) @@ -132,15 +135,25 @@ void Message::decodeHeader(Buffer& buffer) setHeader(headerBody); } -void Message::decodeContent(Buffer& buffer) +void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) { - AMQContentBody::shared_ptr contentBody; - while (buffer.available()) { - AMQFrame contentFrame; - contentFrame.decode(buffer); - contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody()); + u_int64_t expected = expectedContentSize(); + if (expected != buffer.available()) { + std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; + } + + if (!chunkSize || chunkSize > expected) { + chunkSize = expected; + } + + u_int64_t total = 0; + while (total < expectedContentSize()) { + u_int64_t remaining = expected - total; + AMQContentBody::shared_ptr contentBody(new AMQContentBody()); + contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); addContent(contentBody); - } + total += chunkSize; + } } void Message::encode(Buffer& buffer) @@ -159,15 +172,7 @@ void Message::encodeHeader(Buffer& buffer) void Message::encodeContent(Buffer& buffer) { - //Use a frame around each content block. Not really required but - //gives some error checking at little expense. Could change in the - //future... - AMQBody::shared_ptr body; - for (content_iterator i = content.begin(); i != content.end(); i++) { - body = static_pointer_cast<AMQBody, AMQContentBody>(*i); - AMQFrame contentFrame(0, body); - contentFrame.encode(buffer); - } + if (content.get()) content->encode(buffer); } u_int32_t Message::encodedSize() @@ -177,11 +182,7 @@ u_int32_t Message::encodedSize() u_int32_t Message::encodedContentSize() { - int encodedContentSize(0); - for (content_iterator i = content.begin(); i != content.end(); i++) { - encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame - } - return encodedContentSize; + return content.get() ? content->size() : 0; } u_int32_t Message::encodedHeaderSize() @@ -196,7 +197,15 @@ u_int64_t Message::expectedContentSize() return header.get() ? header->getContentSize() : 0; } -void Message::releaseContent() +void Message::releaseContent(MessageStore* store) +{ + if (!content.get() || content->size() > 0) { + //set content to lazy loading mode (but only if there is stored content): + content = std::auto_ptr<Content>(new LazyLoadedContent(store, getPersistenceId(), expectedContentSize())); + } +} + +void Message::setContent(std::auto_ptr<Content>& _content) { - content.clear(); + content = _content; } diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 2c56c845ac..eec929c742 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -21,8 +21,10 @@ #ifndef _Message_ #define _Message_ +#include <memory> #include <boost/shared_ptr.hpp> #include <qpid/broker/ConnectionToken.h> +#include <qpid/broker/Content.h> #include <qpid/broker/TxBuffer.h> #include <qpid/framing/AMQContentBody.h> #include <qpid/framing/AMQHeaderBody.h> @@ -32,6 +34,7 @@ namespace qpid { namespace broker { + class MessageStore; using qpid::framing::string; /** * Represents an AMQP message, i.e. a header body, a list of @@ -39,9 +42,6 @@ namespace qpid { * request. */ class Message{ - typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; - typedef content_list::iterator content_iterator; - const ConnectionToken* const publisher; string exchange; string routingKey; @@ -49,7 +49,7 @@ namespace qpid { const bool immediate; bool redelivered; qpid::framing::AMQHeaderBody::shared_ptr header; - content_list content; + std::auto_ptr<Content> content; u_int64_t size; u_int64_t persistenceId; @@ -62,7 +62,7 @@ namespace qpid { Message(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, bool mandatory, bool immediate); - Message(qpid::framing::Buffer& buffer); + Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); Message(); ~Message(); void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); @@ -90,9 +90,9 @@ namespace qpid { u_int64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } - void decode(qpid::framing::Buffer& buffer); + void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); void decodeHeader(qpid::framing::Buffer& buffer); - void decodeContent(qpid::framing::Buffer& buffer); + void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0); void encode(qpid::framing::Buffer& buffer); void encodeHeader(qpid::framing::Buffer& buffer); @@ -114,14 +114,22 @@ namespace qpid { */ u_int32_t encodedContentSize(); /** - * Releases the in-memory content data held by this message. + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. */ - void releaseContent(); + void releaseContent(MessageStore* store); /** * If headers have been received, returns the expected * content size else returns 0. */ u_int64_t expectedContentSize(); + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + void setContent(std::auto_ptr<Content>& content); }; } diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index b4efd3d001..1a58523c08 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -20,25 +20,23 @@ */ #include <qpid/broker/MessageBuilder.h> +#include <qpid/broker/InMemoryContent.h> +#include <qpid/broker/LazyLoadedContent.h> + using namespace qpid::broker; using namespace qpid::framing; +using std::auto_ptr; MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) : handler(_handler), store(_store), - stagingThreshold(_stagingThreshold), - staging(false) + stagingThreshold(_stagingThreshold) {} void MessageBuilder::route(){ - if (staging && store) { - store->stage(message); - message->releaseContent(); - } if (message->isComplete()) { if (handler) handler->complete(message); message.reset(); - staging = false; } } @@ -54,7 +52,14 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); } message->setHeader(header); - staging = stagingThreshold && header->getContentSize() >= stagingThreshold; + if (stagingThreshold && header->getContentSize() >= stagingThreshold) { + store->stage(message); + auto_ptr<Content> content(new LazyLoadedContent(store, message->getPersistenceId(), message->expectedContentSize())); + message->setContent(content); + } else { + auto_ptr<Content> content(new InMemoryContent()); + message->setContent(content); + } route(); } diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h index a533a4da6f..982601f037 100644 --- a/cpp/src/qpid/broker/MessageBuilder.h +++ b/cpp/src/qpid/broker/MessageBuilder.h @@ -21,6 +21,7 @@ #ifndef _MessageBuilder_ #define _MessageBuilder_ +#include <memory> #include <qpid/QpidError.h> #include <qpid/broker/Exchange.h> #include <qpid/broker/Message.h> @@ -47,7 +48,6 @@ namespace qpid { CompletionHandler* handler; MessageStore* const store; const u_int64_t stagingThreshold; - bool staging; void route(); }; diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index 322b03e67c..1c5a16c50d 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -51,9 +51,9 @@ namespace qpid { * (enqueueing automatically stores the message so this is * only required if storage is required prior to that * point). If the message has not yet been stored it will - * store the headers and any available content. If the - * message has already been stored it will append any - * currently held content. + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. */ virtual void stage(Message::shared_ptr& msg) = 0; @@ -66,6 +66,21 @@ namespace qpid { virtual void destroy(Message::shared_ptr& msg) = 0; /** + * Appends content to a previously staged message + */ + virtual void appendContent(u_int64_t msgId, const std::string& data) = 0; + + /** + * Loads (a section) of content data for the specified + * message id (previously set on the message through a + * call to stage or enqueue) into data. The offset refers + * to the content only (i.e. an offset of 0 implies that + * the start of the content should be loaded, not the + * headers or related meta-data). + */ + virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length) = 0; + + /** * Enqueues a message, storing the message if it has not * been previously stored and recording that the given * message is on the given queue. diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index 3e58a329de..168cb3d5bb 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -53,6 +53,16 @@ void MessageStoreModule::destroy(Message::shared_ptr& msg) store->destroy(msg); } +void MessageStoreModule::appendContent(u_int64_t msgId, const std::string& data) +{ + store->appendContent(msgId, data); +} + +void MessageStoreModule::loadContent(u_int64_t msgId, string& data, u_int64_t offset, u_int32_t length) +{ + store->loadContent(msgId, data, offset, length); +} + void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) { store->enqueue(ctxt, msg, queue, xid); diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 0afb7c7186..306e1aa3ea 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -41,6 +41,8 @@ namespace qpid { void recover(RecoveryManager& queues); void stage(Message::shared_ptr& msg); void destroy(Message::shared_ptr& msg); + void appendContent(u_int64_t msgId, const std::string& data); + void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length); void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void committed(const string * const xid); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index ffa444f1a2..5a2837509d 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -34,45 +34,66 @@ void NullMessageStore::create(const Queue& queue) { if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; } + void NullMessageStore::destroy(const Queue& queue) { if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; } + void NullMessageStore::recover(RecoveryManager&) { if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl; } + void NullMessageStore::stage(Message::shared_ptr&) { if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl; } + void NullMessageStore::destroy(Message::shared_ptr&) { if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl; } + +void NullMessageStore::appendContent(u_int64_t, const string&) +{ + if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl; +} + +void NullMessageStore::loadContent(u_int64_t, string&, u_int64_t, u_int32_t) +{ + if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl; +} + void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) { if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl; } + void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) { if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl; } + void NullMessageStore::committed(const string * const) { if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; } + void NullMessageStore::aborted(const string * const) { if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; } + std::auto_ptr<TransactionContext> NullMessageStore::begin() { return std::auto_ptr<TransactionContext>(); } + void NullMessageStore::commit(TransactionContext*) { } + void NullMessageStore::abort(TransactionContext*) { } diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index 5b363db662..c13a6c9f72 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -35,18 +35,20 @@ namespace qpid { const bool warn; public: NullMessageStore(bool warn = true); - void virtual create(const Queue& queue); - void virtual destroy(const Queue& queue); - void virtual recover(RecoveryManager& queues); - void virtual stage(Message::shared_ptr& msg); - void virtual destroy(Message::shared_ptr& msg); - void virtual enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); - void virtual dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); - void virtual committed(const string * const xid); - void virtual aborted(const string * const xid); + virtual void create(const Queue& queue); + virtual void destroy(const Queue& queue); + virtual void recover(RecoveryManager& queues); + virtual void stage(Message::shared_ptr& msg); + virtual void destroy(Message::shared_ptr& msg); + virtual void appendContent(u_int64_t msgId, const std::string& data); + virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length); + virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + virtual void committed(const string * const xid); + virtual void aborted(const string * const xid); virtual std::auto_ptr<TransactionContext> begin(); - void virtual commit(TransactionContext* ctxt); - void virtual abort(TransactionContext* ctxt); + virtual void commit(TransactionContext* ctxt); + virtual void abort(TransactionContext* ctxt); ~NullMessageStore(){} }; } |