diff options
author | Gordon Sim <gsim@apache.org> | 2006-11-28 15:25:35 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-11-28 15:25:35 +0000 |
commit | 4ad5fc50f0894e219c37118252d6a618419ea212 (patch) | |
tree | b48ff062cd48b7de6b606330c5cefecf15b7691a /cpp | |
parent | e2665f7339231eb2d85506c86a96f0859016fa89 (diff) | |
download | qpid-python-4ad5fc50f0894e219c37118252d6a618419ea212.tar.gz |
Modifications to allow loading of message data in chunks, refragmentation of messages, plus some related refactoring and tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Content.h | 42 | ||||
-rw-r--r-- | cpp/src/qpid/broker/InMemoryContent.cpp | 69 | ||||
-rw-r--r-- | cpp/src/qpid/broker/InMemoryContent.h | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LazyLoadedContent.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LazyLoadedContent.h | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 91 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStore.h | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 24 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/InMemoryContentTest.cpp | 97 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp | 122 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/MessageBuilderTest.cpp | 18 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/MessageTest.cpp | 11 |
18 files changed, 639 insertions, 86 deletions
diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h new file mode 100644 index 0000000000..917222fb5a --- /dev/null +++ b/cpp/src/qpid/broker/Content.h @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Content_ +#define _Content_ + +#include <qpid/framing/AMQContentBody.h> +#include <qpid/framing/Buffer.h> +#include <qpid/framing/OutputHandler.h> + +namespace qpid { + namespace broker { + class Content{ + public: + virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0; + virtual u_int32_t size() = 0; + virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0; + virtual void encode(qpid::framing::Buffer& buffer) = 0; + virtual ~Content(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp new file mode 100644 index 0000000000..fe15def5c8 --- /dev/null +++ b/cpp/src/qpid/broker/InMemoryContent.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/InMemoryContent.h" + +using namespace qpid::broker; +using namespace qpid::framing; +using boost::static_pointer_cast; + +void InMemoryContent::add(AMQContentBody::shared_ptr data) +{ + content.push_back(data); +} + +u_int32_t InMemoryContent::size() +{ + int sum(0); + for (content_iterator i = content.begin(); i != content.end(); i++) { + sum += (*i)->size() + 8;//8 extra bytes for the frame + //TODO: have to get rid of the frame stuff from encoded data + } + return sum; +} + +void InMemoryContent::send(OutputHandler* out, int channel, u_int32_t framesize) +{ + for (content_iterator i = content.begin(); i != content.end(); i++) { + if ((*i)->size() > framesize) { + u_int32_t offset = 0; + for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { + string data = (*i)->getData().substr(offset, framesize); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + offset += framesize; + } + u_int32_t remainder = (*i)->size() % framesize; + if (remainder) { + string data = (*i)->getData().substr(offset, remainder); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + } + } else { + AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); + out->send(new AMQFrame(channel, contentBody)); + } + } +} + +void InMemoryContent::encode(Buffer& buffer) +{ + for (content_iterator i = content.begin(); i != content.end(); i++) { + (*i)->encode(buffer); + } +} diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/broker/InMemoryContent.h new file mode 100644 index 0000000000..5e851722f2 --- /dev/null +++ b/cpp/src/qpid/broker/InMemoryContent.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _InMemoryContent_ +#define _InMemoryContent_ + +#include <qpid/broker/Content.h> +#include <vector> + +namespace qpid { + namespace broker { + class InMemoryContent : public Content{ + typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; + typedef content_list::iterator content_iterator; + + content_list content; + public: + void add(qpid::framing::AMQContentBody::shared_ptr data); + u_int32_t size(); + void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void encode(qpid::framing::Buffer& buffer); + ~InMemoryContent(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp new file mode 100644 index 0000000000..eb7536dde3 --- /dev/null +++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/LazyLoadedContent.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, u_int64_t _msgId, u_int64_t _expectedSize) : + store(_store), msgId(_msgId), expectedSize(_expectedSize) {} + +void LazyLoadedContent::add(AMQContentBody::shared_ptr data) +{ + store->appendContent(msgId, data->getData()); +} + +u_int32_t LazyLoadedContent::size() +{ + return 0;//all content is written as soon as it is added +} + +void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesize) +{ + if (expectedSize > framesize) { + for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) { + u_int64_t remaining = expectedSize - offset; + string data; + store->loadContent(msgId, data, offset, remaining > framesize ? framesize : remaining); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + } + } else { + string data; + store->loadContent(msgId, data, 0, expectedSize); + out->send(new AMQFrame(channel, new AMQContentBody(data))); + } +} + +void LazyLoadedContent::encode(Buffer&) +{ + //do nothing as all content is written as soon as it is added +} diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h new file mode 100644 index 0000000000..5a406e3131 --- /dev/null +++ b/cpp/src/qpid/broker/LazyLoadedContent.h @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _LazyLoadedContent_ +#define _LazyLoadedContent_ + +#include <qpid/broker/Content.h> +#include <qpid/broker/MessageStore.h> + +namespace qpid { + namespace broker { + class LazyLoadedContent : public Content{ + MessageStore* const store; + const u_int64_t msgId; + const u_int64_t expectedSize; + public: + LazyLoadedContent(MessageStore* const store, u_int64_t msgId, u_int64_t expectedSize); + void add(qpid::framing::AMQContentBody::shared_ptr data); + u_int32_t size(); + void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize); + void encode(qpid::framing::Buffer& buffer); + ~LazyLoadedContent(){} + }; + } +} + + +#endif diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index b0b5a85031..64e66c4a30 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -20,6 +20,10 @@ */ #include <qpid/broker/Message.h> #include <iostream> + +#include <qpid/broker/InMemoryContent.h> +#include <qpid/broker/LazyLoadedContent.h> +#include <qpid/broker/MessageStore.h> // AMQP version change - kpvdr 2006-11-17 #include <qpid/framing/ProtocolVersion.h> #include <qpid/framing/BasicDeliverBody.h> @@ -40,8 +44,10 @@ Message::Message(const ConnectionToken* const _publisher, size(0), persistenceId(0) {} -Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ - decode(buffer); +Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : + publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ + + decode(buffer, headersOnly, contentChunkSize); } Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} @@ -53,7 +59,10 @@ void Message::setHeader(AMQHeaderBody::shared_ptr _header){ } void Message::addContent(AMQContentBody::shared_ptr data){ - content.push_back(data); + if (!content.get()) { + content = std::auto_ptr<Content>(new InMemoryContent()); + } + content->add(data); size += data->size(); } @@ -68,8 +77,9 @@ void Message::redeliver(){ void Message::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize){ - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version + + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey))); sendContent(out, channel, framesize); } @@ -80,8 +90,8 @@ void Message::sendGetOk(OutputHandler* out, u_int64_t deliveryTag, u_int32_t framesize){ - // AMQP version change - kpvdr 2006-11-17 - // TODO: Make this class version-aware and link these hard-wired numbers to that version + // AMQP version change - kpvdr 2006-11-17 + // TODO: Make this class version-aware and link these hard-wired numbers to that version out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount))); sendContent(out, channel, framesize); } @@ -89,15 +99,8 @@ void Message::sendGetOk(OutputHandler* out, void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(channel, headerBody)); - for(content_iterator i = content.begin(); i != content.end(); i++){ - if((*i)->size() > framesize){ - //TODO: need to split it - std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl; - }else{ - AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); - out->send(new AMQFrame(channel, contentBody)); - } - } + + if (content.get()) content->send(out, channel, framesize); } BasicHeaderProperties* Message::getHeaderProperties(){ @@ -115,10 +118,10 @@ bool Message::isPersistent() return props && props->getDeliveryMode() == PERSISTENT; } -void Message::decode(Buffer& buffer) +void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) { decodeHeader(buffer); - decodeContent(buffer); + if (!headersOnly) decodeContent(buffer, contentChunkSize); } void Message::decodeHeader(Buffer& buffer) @@ -132,15 +135,25 @@ void Message::decodeHeader(Buffer& buffer) setHeader(headerBody); } -void Message::decodeContent(Buffer& buffer) +void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) { - AMQContentBody::shared_ptr contentBody; - while (buffer.available()) { - AMQFrame contentFrame; - contentFrame.decode(buffer); - contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody()); + u_int64_t expected = expectedContentSize(); + if (expected != buffer.available()) { + std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; + } + + if (!chunkSize || chunkSize > expected) { + chunkSize = expected; + } + + u_int64_t total = 0; + while (total < expectedContentSize()) { + u_int64_t remaining = expected - total; + AMQContentBody::shared_ptr contentBody(new AMQContentBody()); + contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); addContent(contentBody); - } + total += chunkSize; + } } void Message::encode(Buffer& buffer) @@ -159,15 +172,7 @@ void Message::encodeHeader(Buffer& buffer) void Message::encodeContent(Buffer& buffer) { - //Use a frame around each content block. Not really required but - //gives some error checking at little expense. Could change in the - //future... - AMQBody::shared_ptr body; - for (content_iterator i = content.begin(); i != content.end(); i++) { - body = static_pointer_cast<AMQBody, AMQContentBody>(*i); - AMQFrame contentFrame(0, body); - contentFrame.encode(buffer); - } + if (content.get()) content->encode(buffer); } u_int32_t Message::encodedSize() @@ -177,11 +182,7 @@ u_int32_t Message::encodedSize() u_int32_t Message::encodedContentSize() { - int encodedContentSize(0); - for (content_iterator i = content.begin(); i != content.end(); i++) { - encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame - } - return encodedContentSize; + return content.get() ? content->size() : 0; } u_int32_t Message::encodedHeaderSize() @@ -196,7 +197,15 @@ u_int64_t Message::expectedContentSize() return header.get() ? header->getContentSize() : 0; } -void Message::releaseContent() +void Message::releaseContent(MessageStore* store) +{ + if (!content.get() || content->size() > 0) { + //set content to lazy loading mode (but only if there is stored content): + content = std::auto_ptr<Content>(new LazyLoadedContent(store, getPersistenceId(), expectedContentSize())); + } +} + +void Message::setContent(std::auto_ptr<Content>& _content) { - content.clear(); + content = _content; } diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 2c56c845ac..eec929c742 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -21,8 +21,10 @@ #ifndef _Message_ #define _Message_ +#include <memory> #include <boost/shared_ptr.hpp> #include <qpid/broker/ConnectionToken.h> +#include <qpid/broker/Content.h> #include <qpid/broker/TxBuffer.h> #include <qpid/framing/AMQContentBody.h> #include <qpid/framing/AMQHeaderBody.h> @@ -32,6 +34,7 @@ namespace qpid { namespace broker { + class MessageStore; using qpid::framing::string; /** * Represents an AMQP message, i.e. a header body, a list of @@ -39,9 +42,6 @@ namespace qpid { * request. */ class Message{ - typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; - typedef content_list::iterator content_iterator; - const ConnectionToken* const publisher; string exchange; string routingKey; @@ -49,7 +49,7 @@ namespace qpid { const bool immediate; bool redelivered; qpid::framing::AMQHeaderBody::shared_ptr header; - content_list content; + std::auto_ptr<Content> content; u_int64_t size; u_int64_t persistenceId; @@ -62,7 +62,7 @@ namespace qpid { Message(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, bool mandatory, bool immediate); - Message(qpid::framing::Buffer& buffer); + Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); Message(); ~Message(); void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); @@ -90,9 +90,9 @@ namespace qpid { u_int64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; } - void decode(qpid::framing::Buffer& buffer); + void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0); void decodeHeader(qpid::framing::Buffer& buffer); - void decodeContent(qpid::framing::Buffer& buffer); + void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0); void encode(qpid::framing::Buffer& buffer); void encodeHeader(qpid::framing::Buffer& buffer); @@ -114,14 +114,22 @@ namespace qpid { */ u_int32_t encodedContentSize(); /** - * Releases the in-memory content data held by this message. + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. */ - void releaseContent(); + void releaseContent(MessageStore* store); /** * If headers have been received, returns the expected * content size else returns 0. */ u_int64_t expectedContentSize(); + /** + * Sets the 'content' implementation of this message (the + * message controls the lifecycle of the content instance + * it uses). + */ + void setContent(std::auto_ptr<Content>& content); }; } diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index b4efd3d001..1a58523c08 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -20,25 +20,23 @@ */ #include <qpid/broker/MessageBuilder.h> +#include <qpid/broker/InMemoryContent.h> +#include <qpid/broker/LazyLoadedContent.h> + using namespace qpid::broker; using namespace qpid::framing; +using std::auto_ptr; MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) : handler(_handler), store(_store), - stagingThreshold(_stagingThreshold), - staging(false) + stagingThreshold(_stagingThreshold) {} void MessageBuilder::route(){ - if (staging && store) { - store->stage(message); - message->releaseContent(); - } if (message->isComplete()) { if (handler) handler->complete(message); message.reset(); - staging = false; } } @@ -54,7 +52,14 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); } message->setHeader(header); - staging = stagingThreshold && header->getContentSize() >= stagingThreshold; + if (stagingThreshold && header->getContentSize() >= stagingThreshold) { + store->stage(message); + auto_ptr<Content> content(new LazyLoadedContent(store, message->getPersistenceId(), message->expectedContentSize())); + message->setContent(content); + } else { + auto_ptr<Content> content(new InMemoryContent()); + message->setContent(content); + } route(); } diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h index a533a4da6f..982601f037 100644 --- a/cpp/src/qpid/broker/MessageBuilder.h +++ b/cpp/src/qpid/broker/MessageBuilder.h @@ -21,6 +21,7 @@ #ifndef _MessageBuilder_ #define _MessageBuilder_ +#include <memory> #include <qpid/QpidError.h> #include <qpid/broker/Exchange.h> #include <qpid/broker/Message.h> @@ -47,7 +48,6 @@ namespace qpid { CompletionHandler* handler; MessageStore* const store; const u_int64_t stagingThreshold; - bool staging; void route(); }; diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index 322b03e67c..1c5a16c50d 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -51,9 +51,9 @@ namespace qpid { * (enqueueing automatically stores the message so this is * only required if storage is required prior to that * point). If the message has not yet been stored it will - * store the headers and any available content. If the - * message has already been stored it will append any - * currently held content. + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. */ virtual void stage(Message::shared_ptr& msg) = 0; @@ -66,6 +66,21 @@ namespace qpid { virtual void destroy(Message::shared_ptr& msg) = 0; /** + * Appends content to a previously staged message + */ + virtual void appendContent(u_int64_t msgId, const std::string& data) = 0; + + /** + * Loads (a section) of content data for the specified + * message id (previously set on the message through a + * call to stage or enqueue) into data. The offset refers + * to the content only (i.e. an offset of 0 implies that + * the start of the content should be loaded, not the + * headers or related meta-data). + */ + virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length) = 0; + + /** * Enqueues a message, storing the message if it has not * been previously stored and recording that the given * message is on the given queue. diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index 3e58a329de..168cb3d5bb 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -53,6 +53,16 @@ void MessageStoreModule::destroy(Message::shared_ptr& msg) store->destroy(msg); } +void MessageStoreModule::appendContent(u_int64_t msgId, const std::string& data) +{ + store->appendContent(msgId, data); +} + +void MessageStoreModule::loadContent(u_int64_t msgId, string& data, u_int64_t offset, u_int32_t length) +{ + store->loadContent(msgId, data, offset, length); +} + void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) { store->enqueue(ctxt, msg, queue, xid); diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 0afb7c7186..306e1aa3ea 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -41,6 +41,8 @@ namespace qpid { void recover(RecoveryManager& queues); void stage(Message::shared_ptr& msg); void destroy(Message::shared_ptr& msg); + void appendContent(u_int64_t msgId, const std::string& data); + void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length); void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); void committed(const string * const xid); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index ffa444f1a2..5a2837509d 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -34,45 +34,66 @@ void NullMessageStore::create(const Queue& queue) { if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; } + void NullMessageStore::destroy(const Queue& queue) { if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; } + void NullMessageStore::recover(RecoveryManager&) { if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl; } + void NullMessageStore::stage(Message::shared_ptr&) { if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl; } + void NullMessageStore::destroy(Message::shared_ptr&) { if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl; } + +void NullMessageStore::appendContent(u_int64_t, const string&) +{ + if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl; +} + +void NullMessageStore::loadContent(u_int64_t, string&, u_int64_t, u_int32_t) +{ + if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl; +} + void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) { if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl; } + void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) { if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl; } + void NullMessageStore::committed(const string * const) { if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; } + void NullMessageStore::aborted(const string * const) { if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; } + std::auto_ptr<TransactionContext> NullMessageStore::begin() { return std::auto_ptr<TransactionContext>(); } + void NullMessageStore::commit(TransactionContext*) { } + void NullMessageStore::abort(TransactionContext*) { } diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index 5b363db662..c13a6c9f72 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -35,18 +35,20 @@ namespace qpid { const bool warn; public: NullMessageStore(bool warn = true); - void virtual create(const Queue& queue); - void virtual destroy(const Queue& queue); - void virtual recover(RecoveryManager& queues); - void virtual stage(Message::shared_ptr& msg); - void virtual destroy(Message::shared_ptr& msg); - void virtual enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); - void virtual dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); - void virtual committed(const string * const xid); - void virtual aborted(const string * const xid); + virtual void create(const Queue& queue); + virtual void destroy(const Queue& queue); + virtual void recover(RecoveryManager& queues); + virtual void stage(Message::shared_ptr& msg); + virtual void destroy(Message::shared_ptr& msg); + virtual void appendContent(u_int64_t msgId, const std::string& data); + virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length); + virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid); + virtual void committed(const string * const xid); + virtual void aborted(const string * const xid); virtual std::auto_ptr<TransactionContext> begin(); - void virtual commit(TransactionContext* ctxt); - void virtual abort(TransactionContext* ctxt); + virtual void commit(TransactionContext* ctxt); + virtual void abort(TransactionContext* ctxt); ~NullMessageStore(){} }; } diff --git a/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp b/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp new file mode 100644 index 0000000000..175ef0cf27 --- /dev/null +++ b/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/broker/InMemoryContent.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> + +using std::list; +using std::string; +using boost::dynamic_pointer_cast; +using namespace qpid::broker; +using namespace qpid::framing; + +struct DummyHandler : OutputHandler{ + std::vector<AMQFrame*> frames; + + virtual void send(AMQFrame* frame){ + frames.push_back(frame); + } +}; + +class InMemoryContentTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(InMemoryContentTest); + CPPUNIT_TEST(testRefragmentation); + CPPUNIT_TEST_SUITE_END(); + +public: + void testRefragmentation() + { + {//no remainder + string out[] = {"abcde", "fghij", "klmno", "pqrst"}; + string in[] = {out[0] + out[1], out[2] + out[3]}; + refragment(2, in, 4, out); + } + {//remainder for last frame + string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"}; + string in[] = {out[0] + out[1], out[2] + out[3] + out[4]}; + refragment(2, in, 5, out); + } + } + + + void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5) + { + InMemoryContent content; + DummyHandler handler; + u_int16_t channel = 3; + + addframes(content, inCount, in); + content.send(&handler, channel, framesize); + check(handler, channel, outCount, out); + } + + void addframes(InMemoryContent& content, size_t frameCount, string* frameData) + { + for (unsigned int i = 0; i < frameCount; i++) { + AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i])); + content.add(frame); + } + } + + void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks) + { + CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size()); + + for (unsigned int i = 0; i < expectedChunkCount; i++) { + AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody())); + CPPUNIT_ASSERT(chunk); + CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData()); + CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel()); + } + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest); + diff --git a/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp b/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp new file mode 100644 index 0000000000..4d267887ba --- /dev/null +++ b/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/broker/LazyLoadedContent.h> +#include <qpid/broker/NullMessageStore.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> +#include <sstream> + +using std::list; +using std::string; +using boost::dynamic_pointer_cast; +using namespace qpid::broker; +using namespace qpid::framing; + +struct DummyHandler : OutputHandler{ + std::vector<AMQFrame*> frames; + + virtual void send(AMQFrame* frame){ + frames.push_back(frame); + } +}; + + +class LazyLoadedContentTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(LazyLoadedContentTest); + CPPUNIT_TEST(testFragmented); + CPPUNIT_TEST(testWhole); + CPPUNIT_TEST(testHalved); + CPPUNIT_TEST_SUITE_END(); + + class TestMessageStore : public NullMessageStore + { + const string content; + + public: + TestMessageStore(const string& _content) : content(_content) {} + + void loadContent(u_int64_t, string& data, u_int64_t offset, u_int32_t length) + { + if (offset + length <= content.size()) { + data = content.substr(offset, length); + } else{ + std::stringstream error; + error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size(); + throw qpid::Exception(error.str()); + } + } + }; + + +public: + void testFragmented() + { + string data = "abcdefghijklmnopqrstuvwxyz"; + u_int32_t framesize = 5; + string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"}; + load(data, 6, out, framesize); + } + + void testWhole() + { + string data = "abcdefghijklmnopqrstuvwxyz"; + u_int32_t framesize = 50; + string out[] = {data}; + load(data, 1, out, framesize); + } + + void testHalved() + { + string data = "abcdefghijklmnopqrstuvwxyz"; + u_int32_t framesize = 13; + string out[] = {"abcdefghijklm", "nopqrstuvwxyz"}; + load(data, 2, out, framesize); + } + + void load(string& in, size_t outCount, string* out, u_int32_t framesize) + { + TestMessageStore store(in); + LazyLoadedContent content(&store, 1, in.size()); + DummyHandler handler; + u_int16_t channel = 3; + content.send(&handler, channel, framesize); + check(handler, channel, outCount, out); + } + + void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks) + { + CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size()); + + for (unsigned int i = 0; i < expectedChunkCount; i++) { + AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody())); + CPPUNIT_ASSERT(chunk); + CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData()); + CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel()); + } + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest); + diff --git a/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp index a5f7911fc8..fa80f8f939 100644 --- a/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp +++ b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp @@ -56,13 +56,19 @@ class MessageBuilderTest : public CppUnit::TestCase header = new Buffer(msg->encodedHeaderSize()); msg->encodeHeader(*header); content = new Buffer(contentBufferSize); - msg->encodeContent(*content); - } else if (!header || !content) { - throw qpid::Exception("Buffers not initialised!"); + msg->setPersistenceId(1); } else { - msg->encodeContent(*content); + throw qpid::Exception("Message already staged!"); + } + } + + void appendContent(u_int64_t msgId, const string& data) + { + if (msgId == 1) { + content->putRawData(data); + } else { + throw qpid::Exception("Invalid message id!"); } - msg->setPersistenceId(1); } Message::shared_ptr getRestoredMessage() @@ -159,7 +165,7 @@ class MessageBuilderTest : public CppUnit::TestCase void testStaging(){ DummyHandler handler; - TestMessageStore store(50);//more than enough for two frames of 14 bytes + TestMessageStore store(14); MessageBuilder builder(&handler, &store, 5); string data1("abcdefg"); diff --git a/cpp/test/unit/qpid/broker/MessageTest.cpp b/cpp/test/unit/qpid/broker/MessageTest.cpp index ec724894a5..b497588c6c 100644 --- a/cpp/test/unit/qpid/broker/MessageTest.cpp +++ b/cpp/test/unit/qpid/broker/MessageTest.cpp @@ -77,13 +77,10 @@ class MessageTest : public CppUnit::TestCase DummyHandler handler; msg->deliver(&handler, 0, "ignore", 0, 100); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - AMQContentBody::shared_ptr contentBody1(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); - AMQContentBody::shared_ptr contentBody2(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3]->getBody())); - CPPUNIT_ASSERT(contentBody1); - CPPUNIT_ASSERT(contentBody2); - CPPUNIT_ASSERT_EQUAL(data1, contentBody1->getData()); - CPPUNIT_ASSERT_EQUAL(data2, contentBody2->getData()); + CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData()); } }; |