summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-06-04 15:46:00 +0000
committerGordon Sim <gsim@apache.org>2008-06-04 15:46:00 +0000
commit41a04a45030b3d3663d3dea918b94106c4d83db3 (patch)
treeab0d8625205140e73b312c158d8cc707db29bcc6 /qpid/cpp/src
parent38dff548132dd614a2eff96903c6f57ae09d30b6 (diff)
downloadqpid-python-41a04a45030b3d3663d3dea918b94106c4d83db3.tar.gz
Change to lazy-loading to avoid relying on the content-size to be set by client.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@663243 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp3
3 files changed, 22 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 1b1cec9f85..331bb5e716 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -41,13 +41,6 @@ Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redel
Message::~Message()
{
- if (staged) {
- if (store) {
- store->destroy(*this);
- } else {
- QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
- }
- }
}
std::string Message::getRoutingKey() const
@@ -178,32 +171,43 @@ void Message::releaseContent(MessageStore* _store)
}
}
+void Message::destroy()
+{
+ if (staged) {
+ if (store) {
+ store->destroy(*this);
+ } else {
+ QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
+ }
+ }
+}
+
void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
{
if (isContentReleased()) {
//load content from store in chunks of maxContentSize
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
- uint64_t expectedSize(frames.getHeaders()->getContentLength());
intrusive_ptr<const PersistableMessage> pmsg(this);
- for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
+
+ bool done = false;
+ for (uint64_t offset = 0; !done; offset += maxContentSize)
{
- uint64_t remaining = expectedSize - offset;
AMQFrame frame(in_place<AMQContentBody>());
string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(queue, pmsg, data, offset,
- remaining > maxContentSize ? maxContentSize : remaining);
+ store->loadContent(queue, pmsg, data, offset, maxContentSize);
+ done = data.size() < maxContentSize;
frame.setBof(false);
frame.setEof(true);
if (offset > 0) {
frame.setBos(false);
}
- if (remaining > maxContentSize) {
+ if (!done) {
frame.setEos(false);
}
+ QPID_LOG(debug, "loaded frame for delivery: " << frame);
out.handle(frame);
}
-
} else {
Count c;
frames.map_if(c, TypeFilter<CONTENT_BODY>());
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index ab13529adf..0a95fedea6 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -119,6 +119,7 @@ public:
* be reloaded.
*/
void releaseContent(MessageStore* store);
+ void destroy();
void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index b3167d0377..ad617c1bc1 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -370,6 +370,9 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
}
+ if (!strategy.delivered) {
+ msg->destroy();
+ }
}
}