summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-11-28 15:25:35 +0000
committerGordon Sim <gsim@apache.org>2006-11-28 15:25:35 +0000
commit4ad5fc50f0894e219c37118252d6a618419ea212 (patch)
treeb48ff062cd48b7de6b606330c5cefecf15b7691a /cpp
parente2665f7339231eb2d85506c86a96f0859016fa89 (diff)
downloadqpid-python-4ad5fc50f0894e219c37118252d6a618419ea212.tar.gz
Modifications to allow loading of message data in chunks, refragmentation of messages, plus some related refactoring and tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Content.h42
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.cpp69
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.h45
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.cpp58
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.h45
-rw-r--r--cpp/src/qpid/broker/Message.cpp91
-rw-r--r--cpp/src/qpid/broker/Message.h26
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp21
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h2
-rw-r--r--cpp/src/qpid/broker/MessageStore.h21
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp10
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp21
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h24
-rw-r--r--cpp/test/unit/qpid/broker/InMemoryContentTest.cpp97
-rw-r--r--cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp122
-rw-r--r--cpp/test/unit/qpid/broker/MessageBuilderTest.cpp18
-rw-r--r--cpp/test/unit/qpid/broker/MessageTest.cpp11
18 files changed, 639 insertions, 86 deletions
diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h
new file mode 100644
index 0000000000..917222fb5a
--- /dev/null
+++ b/cpp/src/qpid/broker/Content.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Content_
+#define _Content_
+
+#include <qpid/framing/AMQContentBody.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/OutputHandler.h>
+
+namespace qpid {
+ namespace broker {
+ class Content{
+ public:
+ virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
+ virtual u_int32_t size() = 0;
+ virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
+ virtual void encode(qpid::framing::Buffer& buffer) = 0;
+ virtual ~Content(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp
new file mode 100644
index 0000000000..fe15def5c8
--- /dev/null
+++ b/cpp/src/qpid/broker/InMemoryContent.cpp
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/InMemoryContent.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using boost::static_pointer_cast;
+
+void InMemoryContent::add(AMQContentBody::shared_ptr data)
+{
+ content.push_back(data);
+}
+
+u_int32_t InMemoryContent::size()
+{
+ int sum(0);
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ sum += (*i)->size() + 8;//8 extra bytes for the frame
+ //TODO: have to get rid of the frame stuff from encoded data
+ }
+ return sum;
+}
+
+void InMemoryContent::send(OutputHandler* out, int channel, u_int32_t framesize)
+{
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ if ((*i)->size() > framesize) {
+ u_int32_t offset = 0;
+ for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
+ string data = (*i)->getData().substr(offset, framesize);
+ out->send(new AMQFrame(channel, new AMQContentBody(data)));
+ offset += framesize;
+ }
+ u_int32_t remainder = (*i)->size() % framesize;
+ if (remainder) {
+ string data = (*i)->getData().substr(offset, remainder);
+ out->send(new AMQFrame(channel, new AMQContentBody(data)));
+ }
+ } else {
+ AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ out->send(new AMQFrame(channel, contentBody));
+ }
+ }
+}
+
+void InMemoryContent::encode(Buffer& buffer)
+{
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ (*i)->encode(buffer);
+ }
+}
diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/broker/InMemoryContent.h
new file mode 100644
index 0000000000..5e851722f2
--- /dev/null
+++ b/cpp/src/qpid/broker/InMemoryContent.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _InMemoryContent_
+#define _InMemoryContent_
+
+#include <qpid/broker/Content.h>
+#include <vector>
+
+namespace qpid {
+ namespace broker {
+ class InMemoryContent : public Content{
+ typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+ typedef content_list::iterator content_iterator;
+
+ content_list content;
+ public:
+ void add(qpid::framing::AMQContentBody::shared_ptr data);
+ u_int32_t size();
+ void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void encode(qpid::framing::Buffer& buffer);
+ ~InMemoryContent(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp
new file mode 100644
index 0000000000..eb7536dde3
--- /dev/null
+++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/LazyLoadedContent.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, u_int64_t _msgId, u_int64_t _expectedSize) :
+ store(_store), msgId(_msgId), expectedSize(_expectedSize) {}
+
+void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
+{
+ store->appendContent(msgId, data->getData());
+}
+
+u_int32_t LazyLoadedContent::size()
+{
+ return 0;//all content is written as soon as it is added
+}
+
+void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesize)
+{
+ if (expectedSize > framesize) {
+ for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) {
+ u_int64_t remaining = expectedSize - offset;
+ string data;
+ store->loadContent(msgId, data, offset, remaining > framesize ? framesize : remaining);
+ out->send(new AMQFrame(channel, new AMQContentBody(data)));
+ }
+ } else {
+ string data;
+ store->loadContent(msgId, data, 0, expectedSize);
+ out->send(new AMQFrame(channel, new AMQContentBody(data)));
+ }
+}
+
+void LazyLoadedContent::encode(Buffer&)
+{
+ //do nothing as all content is written as soon as it is added
+}
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h
new file mode 100644
index 0000000000..5a406e3131
--- /dev/null
+++ b/cpp/src/qpid/broker/LazyLoadedContent.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LazyLoadedContent_
+#define _LazyLoadedContent_
+
+#include <qpid/broker/Content.h>
+#include <qpid/broker/MessageStore.h>
+
+namespace qpid {
+ namespace broker {
+ class LazyLoadedContent : public Content{
+ MessageStore* const store;
+ const u_int64_t msgId;
+ const u_int64_t expectedSize;
+ public:
+ LazyLoadedContent(MessageStore* const store, u_int64_t msgId, u_int64_t expectedSize);
+ void add(qpid::framing::AMQContentBody::shared_ptr data);
+ u_int32_t size();
+ void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+ void encode(qpid::framing::Buffer& buffer);
+ ~LazyLoadedContent(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index b0b5a85031..64e66c4a30 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -20,6 +20,10 @@
*/
#include <qpid/broker/Message.h>
#include <iostream>
+
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid/broker/LazyLoadedContent.h>
+#include <qpid/broker/MessageStore.h>
// AMQP version change - kpvdr 2006-11-17
#include <qpid/framing/ProtocolVersion.h>
#include <qpid/framing/BasicDeliverBody.h>
@@ -40,8 +44,10 @@ Message::Message(const ConnectionToken* const _publisher,
size(0),
persistenceId(0) {}
-Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
- decode(buffer);
+Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
+ publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
+
+ decode(buffer, headersOnly, contentChunkSize);
}
Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
@@ -53,7 +59,10 @@ void Message::setHeader(AMQHeaderBody::shared_ptr _header){
}
void Message::addContent(AMQContentBody::shared_ptr data){
- content.push_back(data);
+ if (!content.get()) {
+ content = std::auto_ptr<Content>(new InMemoryContent());
+ }
+ content->add(data);
size += data->size();
}
@@ -68,8 +77,9 @@ void Message::redeliver(){
void Message::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize){
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired numbers to that version
+
+ // AMQP version change - kpvdr 2006-11-17
+ // TODO: Make this class version-aware and link these hard-wired numbers to that version
out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
sendContent(out, channel, framesize);
}
@@ -80,8 +90,8 @@ void Message::sendGetOk(OutputHandler* out,
u_int64_t deliveryTag,
u_int32_t framesize){
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired numbers to that version
+ // AMQP version change - kpvdr 2006-11-17
+ // TODO: Make this class version-aware and link these hard-wired numbers to that version
out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount)));
sendContent(out, channel, framesize);
}
@@ -89,15 +99,8 @@ void Message::sendGetOk(OutputHandler* out,
void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
- for(content_iterator i = content.begin(); i != content.end(); i++){
- if((*i)->size() > framesize){
- //TODO: need to split it
- std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
- }else{
- AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- out->send(new AMQFrame(channel, contentBody));
- }
- }
+
+ if (content.get()) content->send(out, channel, framesize);
}
BasicHeaderProperties* Message::getHeaderProperties(){
@@ -115,10 +118,10 @@ bool Message::isPersistent()
return props && props->getDeliveryMode() == PERSISTENT;
}
-void Message::decode(Buffer& buffer)
+void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
{
decodeHeader(buffer);
- decodeContent(buffer);
+ if (!headersOnly) decodeContent(buffer, contentChunkSize);
}
void Message::decodeHeader(Buffer& buffer)
@@ -132,15 +135,25 @@ void Message::decodeHeader(Buffer& buffer)
setHeader(headerBody);
}
-void Message::decodeContent(Buffer& buffer)
+void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
{
- AMQContentBody::shared_ptr contentBody;
- while (buffer.available()) {
- AMQFrame contentFrame;
- contentFrame.decode(buffer);
- contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody());
+ u_int64_t expected = expectedContentSize();
+ if (expected != buffer.available()) {
+ std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl;
+ }
+
+ if (!chunkSize || chunkSize > expected) {
+ chunkSize = expected;
+ }
+
+ u_int64_t total = 0;
+ while (total < expectedContentSize()) {
+ u_int64_t remaining = expected - total;
+ AMQContentBody::shared_ptr contentBody(new AMQContentBody());
+ contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
addContent(contentBody);
- }
+ total += chunkSize;
+ }
}
void Message::encode(Buffer& buffer)
@@ -159,15 +172,7 @@ void Message::encodeHeader(Buffer& buffer)
void Message::encodeContent(Buffer& buffer)
{
- //Use a frame around each content block. Not really required but
- //gives some error checking at little expense. Could change in the
- //future...
- AMQBody::shared_ptr body;
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- body = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- AMQFrame contentFrame(0, body);
- contentFrame.encode(buffer);
- }
+ if (content.get()) content->encode(buffer);
}
u_int32_t Message::encodedSize()
@@ -177,11 +182,7 @@ u_int32_t Message::encodedSize()
u_int32_t Message::encodedContentSize()
{
- int encodedContentSize(0);
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame
- }
- return encodedContentSize;
+ return content.get() ? content->size() : 0;
}
u_int32_t Message::encodedHeaderSize()
@@ -196,7 +197,15 @@ u_int64_t Message::expectedContentSize()
return header.get() ? header->getContentSize() : 0;
}
-void Message::releaseContent()
+void Message::releaseContent(MessageStore* store)
+{
+ if (!content.get() || content->size() > 0) {
+ //set content to lazy loading mode (but only if there is stored content):
+ content = std::auto_ptr<Content>(new LazyLoadedContent(store, getPersistenceId(), expectedContentSize()));
+ }
+}
+
+void Message::setContent(std::auto_ptr<Content>& _content)
{
- content.clear();
+ content = _content;
}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 2c56c845ac..eec929c742 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -21,8 +21,10 @@
#ifndef _Message_
#define _Message_
+#include <memory>
#include <boost/shared_ptr.hpp>
#include <qpid/broker/ConnectionToken.h>
+#include <qpid/broker/Content.h>
#include <qpid/broker/TxBuffer.h>
#include <qpid/framing/AMQContentBody.h>
#include <qpid/framing/AMQHeaderBody.h>
@@ -32,6 +34,7 @@
namespace qpid {
namespace broker {
+ class MessageStore;
using qpid::framing::string;
/**
* Represents an AMQP message, i.e. a header body, a list of
@@ -39,9 +42,6 @@ namespace qpid {
* request.
*/
class Message{
- typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
- typedef content_list::iterator content_iterator;
-
const ConnectionToken* const publisher;
string exchange;
string routingKey;
@@ -49,7 +49,7 @@ namespace qpid {
const bool immediate;
bool redelivered;
qpid::framing::AMQHeaderBody::shared_ptr header;
- content_list content;
+ std::auto_ptr<Content> content;
u_int64_t size;
u_int64_t persistenceId;
@@ -62,7 +62,7 @@ namespace qpid {
Message(const ConnectionToken* const publisher,
const string& exchange, const string& routingKey,
bool mandatory, bool immediate);
- Message(qpid::framing::Buffer& buffer);
+ Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
Message();
~Message();
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
@@ -90,9 +90,9 @@ namespace qpid {
u_int64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
- void decode(qpid::framing::Buffer& buffer);
+ void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
void decodeHeader(qpid::framing::Buffer& buffer);
- void decodeContent(qpid::framing::Buffer& buffer);
+ void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0);
void encode(qpid::framing::Buffer& buffer);
void encodeHeader(qpid::framing::Buffer& buffer);
@@ -114,14 +114,22 @@ namespace qpid {
*/
u_int32_t encodedContentSize();
/**
- * Releases the in-memory content data held by this message.
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
*/
- void releaseContent();
+ void releaseContent(MessageStore* store);
/**
* If headers have been received, returns the expected
* content size else returns 0.
*/
u_int64_t expectedContentSize();
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ void setContent(std::auto_ptr<Content>& content);
};
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index b4efd3d001..1a58523c08 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -20,25 +20,23 @@
*/
#include <qpid/broker/MessageBuilder.h>
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid/broker/LazyLoadedContent.h>
+
using namespace qpid::broker;
using namespace qpid::framing;
+using std::auto_ptr;
MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) :
handler(_handler),
store(_store),
- stagingThreshold(_stagingThreshold),
- staging(false)
+ stagingThreshold(_stagingThreshold)
{}
void MessageBuilder::route(){
- if (staging && store) {
- store->stage(message);
- message->releaseContent();
- }
if (message->isComplete()) {
if (handler) handler->complete(message);
message.reset();
- staging = false;
}
}
@@ -54,7 +52,14 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
}
message->setHeader(header);
- staging = stagingThreshold && header->getContentSize() >= stagingThreshold;
+ if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
+ store->stage(message);
+ auto_ptr<Content> content(new LazyLoadedContent(store, message->getPersistenceId(), message->expectedContentSize()));
+ message->setContent(content);
+ } else {
+ auto_ptr<Content> content(new InMemoryContent());
+ message->setContent(content);
+ }
route();
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
index a533a4da6f..982601f037 100644
--- a/cpp/src/qpid/broker/MessageBuilder.h
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -21,6 +21,7 @@
#ifndef _MessageBuilder_
#define _MessageBuilder_
+#include <memory>
#include <qpid/QpidError.h>
#include <qpid/broker/Exchange.h>
#include <qpid/broker/Message.h>
@@ -47,7 +48,6 @@ namespace qpid {
CompletionHandler* handler;
MessageStore* const store;
const u_int64_t stagingThreshold;
- bool staging;
void route();
};
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index 322b03e67c..1c5a16c50d 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -51,9 +51,9 @@ namespace qpid {
* (enqueueing automatically stores the message so this is
* only required if storage is required prior to that
* point). If the message has not yet been stored it will
- * store the headers and any available content. If the
- * message has already been stored it will append any
- * currently held content.
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
*/
virtual void stage(Message::shared_ptr& msg) = 0;
@@ -66,6 +66,21 @@ namespace qpid {
virtual void destroy(Message::shared_ptr& msg) = 0;
/**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(u_int64_t msgId, const std::string& data) = 0;
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message id (previously set on the message through a
+ * call to stage or enqueue) into data. The offset refers
+ * to the content only (i.e. an offset of 0 implies that
+ * the start of the content should be loaded, not the
+ * headers or related meta-data).
+ */
+ virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length) = 0;
+
+ /**
* Enqueues a message, storing the message if it has not
* been previously stored and recording that the given
* message is on the given queue.
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index 3e58a329de..168cb3d5bb 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -53,6 +53,16 @@ void MessageStoreModule::destroy(Message::shared_ptr& msg)
store->destroy(msg);
}
+void MessageStoreModule::appendContent(u_int64_t msgId, const std::string& data)
+{
+ store->appendContent(msgId, data);
+}
+
+void MessageStoreModule::loadContent(u_int64_t msgId, string& data, u_int64_t offset, u_int32_t length)
+{
+ store->loadContent(msgId, data, offset, length);
+}
+
void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid)
{
store->enqueue(ctxt, msg, queue, xid);
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 0afb7c7186..306e1aa3ea 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -41,6 +41,8 @@ namespace qpid {
void recover(RecoveryManager& queues);
void stage(Message::shared_ptr& msg);
void destroy(Message::shared_ptr& msg);
+ void appendContent(u_int64_t msgId, const std::string& data);
+ void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length);
void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
void committed(const string * const xid);
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index ffa444f1a2..5a2837509d 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -34,45 +34,66 @@ void NullMessageStore::create(const Queue& queue)
{
if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
+
void NullMessageStore::destroy(const Queue& queue)
{
if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
+
void NullMessageStore::recover(RecoveryManager&)
{
if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
}
+
void NullMessageStore::stage(Message::shared_ptr&)
{
if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
}
+
void NullMessageStore::destroy(Message::shared_ptr&)
{
if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
}
+
+void NullMessageStore::appendContent(u_int64_t, const string&)
+{
+ if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::loadContent(u_int64_t, string&, u_int64_t, u_int32_t)
+{
+ if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
+}
+
void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const)
{
if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
+
void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const)
{
if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
+
void NullMessageStore::committed(const string * const)
{
if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
}
+
void NullMessageStore::aborted(const string * const)
{
if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
}
+
std::auto_ptr<TransactionContext> NullMessageStore::begin()
{
return std::auto_ptr<TransactionContext>();
}
+
void NullMessageStore::commit(TransactionContext*)
{
}
+
void NullMessageStore::abort(TransactionContext*)
{
}
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index 5b363db662..c13a6c9f72 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -35,18 +35,20 @@ namespace qpid {
const bool warn;
public:
NullMessageStore(bool warn = true);
- void virtual create(const Queue& queue);
- void virtual destroy(const Queue& queue);
- void virtual recover(RecoveryManager& queues);
- void virtual stage(Message::shared_ptr& msg);
- void virtual destroy(Message::shared_ptr& msg);
- void virtual enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
- void virtual dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
- void virtual committed(const string * const xid);
- void virtual aborted(const string * const xid);
+ virtual void create(const Queue& queue);
+ virtual void destroy(const Queue& queue);
+ virtual void recover(RecoveryManager& queues);
+ virtual void stage(Message::shared_ptr& msg);
+ virtual void destroy(Message::shared_ptr& msg);
+ virtual void appendContent(u_int64_t msgId, const std::string& data);
+ virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length);
+ virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ virtual void committed(const string * const xid);
+ virtual void aborted(const string * const xid);
virtual std::auto_ptr<TransactionContext> begin();
- void virtual commit(TransactionContext* ctxt);
- void virtual abort(TransactionContext* ctxt);
+ virtual void commit(TransactionContext* ctxt);
+ virtual void abort(TransactionContext* ctxt);
~NullMessageStore(){}
};
}
diff --git a/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp b/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp
new file mode 100644
index 0000000000..175ef0cf27
--- /dev/null
+++ b/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+
+using std::list;
+using std::string;
+using boost::dynamic_pointer_cast;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct DummyHandler : OutputHandler{
+ std::vector<AMQFrame*> frames;
+
+ virtual void send(AMQFrame* frame){
+ frames.push_back(frame);
+ }
+};
+
+class InMemoryContentTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(InMemoryContentTest);
+ CPPUNIT_TEST(testRefragmentation);
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+ void testRefragmentation()
+ {
+ {//no remainder
+ string out[] = {"abcde", "fghij", "klmno", "pqrst"};
+ string in[] = {out[0] + out[1], out[2] + out[3]};
+ refragment(2, in, 4, out);
+ }
+ {//remainder for last frame
+ string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"};
+ string in[] = {out[0] + out[1], out[2] + out[3] + out[4]};
+ refragment(2, in, 5, out);
+ }
+ }
+
+
+ void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5)
+ {
+ InMemoryContent content;
+ DummyHandler handler;
+ u_int16_t channel = 3;
+
+ addframes(content, inCount, in);
+ content.send(&handler, channel, framesize);
+ check(handler, channel, outCount, out);
+ }
+
+ void addframes(InMemoryContent& content, size_t frameCount, string* frameData)
+ {
+ for (unsigned int i = 0; i < frameCount; i++) {
+ AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i]));
+ content.add(frame);
+ }
+ }
+
+ void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks)
+ {
+ CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size());
+
+ for (unsigned int i = 0; i < expectedChunkCount; i++) {
+ AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody()));
+ CPPUNIT_ASSERT(chunk);
+ CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
+ CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
+ }
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest);
+
diff --git a/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp b/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
new file mode 100644
index 0000000000..4d267887ba
--- /dev/null
+++ b/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/broker/LazyLoadedContent.h>
+#include <qpid/broker/NullMessageStore.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <sstream>
+
+using std::list;
+using std::string;
+using boost::dynamic_pointer_cast;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct DummyHandler : OutputHandler{
+ std::vector<AMQFrame*> frames;
+
+ virtual void send(AMQFrame* frame){
+ frames.push_back(frame);
+ }
+};
+
+
+class LazyLoadedContentTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(LazyLoadedContentTest);
+ CPPUNIT_TEST(testFragmented);
+ CPPUNIT_TEST(testWhole);
+ CPPUNIT_TEST(testHalved);
+ CPPUNIT_TEST_SUITE_END();
+
+ class TestMessageStore : public NullMessageStore
+ {
+ const string content;
+
+ public:
+ TestMessageStore(const string& _content) : content(_content) {}
+
+ void loadContent(u_int64_t, string& data, u_int64_t offset, u_int32_t length)
+ {
+ if (offset + length <= content.size()) {
+ data = content.substr(offset, length);
+ } else{
+ std::stringstream error;
+ error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size();
+ throw qpid::Exception(error.str());
+ }
+ }
+ };
+
+
+public:
+ void testFragmented()
+ {
+ string data = "abcdefghijklmnopqrstuvwxyz";
+ u_int32_t framesize = 5;
+ string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"};
+ load(data, 6, out, framesize);
+ }
+
+ void testWhole()
+ {
+ string data = "abcdefghijklmnopqrstuvwxyz";
+ u_int32_t framesize = 50;
+ string out[] = {data};
+ load(data, 1, out, framesize);
+ }
+
+ void testHalved()
+ {
+ string data = "abcdefghijklmnopqrstuvwxyz";
+ u_int32_t framesize = 13;
+ string out[] = {"abcdefghijklm", "nopqrstuvwxyz"};
+ load(data, 2, out, framesize);
+ }
+
+ void load(string& in, size_t outCount, string* out, u_int32_t framesize)
+ {
+ TestMessageStore store(in);
+ LazyLoadedContent content(&store, 1, in.size());
+ DummyHandler handler;
+ u_int16_t channel = 3;
+ content.send(&handler, channel, framesize);
+ check(handler, channel, outCount, out);
+ }
+
+ void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks)
+ {
+ CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size());
+
+ for (unsigned int i = 0; i < expectedChunkCount; i++) {
+ AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody()));
+ CPPUNIT_ASSERT(chunk);
+ CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
+ CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
+ }
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest);
+
diff --git a/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
index a5f7911fc8..fa80f8f939 100644
--- a/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
+++ b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
@@ -56,13 +56,19 @@ class MessageBuilderTest : public CppUnit::TestCase
header = new Buffer(msg->encodedHeaderSize());
msg->encodeHeader(*header);
content = new Buffer(contentBufferSize);
- msg->encodeContent(*content);
- } else if (!header || !content) {
- throw qpid::Exception("Buffers not initialised!");
+ msg->setPersistenceId(1);
} else {
- msg->encodeContent(*content);
+ throw qpid::Exception("Message already staged!");
+ }
+ }
+
+ void appendContent(u_int64_t msgId, const string& data)
+ {
+ if (msgId == 1) {
+ content->putRawData(data);
+ } else {
+ throw qpid::Exception("Invalid message id!");
}
- msg->setPersistenceId(1);
}
Message::shared_ptr getRestoredMessage()
@@ -159,7 +165,7 @@ class MessageBuilderTest : public CppUnit::TestCase
void testStaging(){
DummyHandler handler;
- TestMessageStore store(50);//more than enough for two frames of 14 bytes
+ TestMessageStore store(14);
MessageBuilder builder(&handler, &store, 5);
string data1("abcdefg");
diff --git a/cpp/test/unit/qpid/broker/MessageTest.cpp b/cpp/test/unit/qpid/broker/MessageTest.cpp
index ec724894a5..b497588c6c 100644
--- a/cpp/test/unit/qpid/broker/MessageTest.cpp
+++ b/cpp/test/unit/qpid/broker/MessageTest.cpp
@@ -77,13 +77,10 @@ class MessageTest : public CppUnit::TestCase
DummyHandler handler;
msg->deliver(&handler, 0, "ignore", 0, 100);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- AMQContentBody::shared_ptr contentBody1(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- AMQContentBody::shared_ptr contentBody2(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3]->getBody()));
- CPPUNIT_ASSERT(contentBody1);
- CPPUNIT_ASSERT(contentBody2);
- CPPUNIT_ASSERT_EQUAL(data1, contentBody1->getData());
- CPPUNIT_ASSERT_EQUAL(data2, contentBody2->getData());
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
}
};