diff options
author | Gordon Sim <gsim@apache.org> | 2008-06-04 15:46:00 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-06-04 15:46:00 +0000 |
commit | 41a04a45030b3d3663d3dea918b94106c4d83db3 (patch) | |
tree | ab0d8625205140e73b312c158d8cc707db29bcc6 /qpid/cpp/src | |
parent | 38dff548132dd614a2eff96903c6f57ae09d30b6 (diff) | |
download | qpid-python-41a04a45030b3d3663d3dea918b94106c4d83db3.tar.gz |
Change to lazy-loading to avoid relying on the content-size to be set by client.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@663243 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 3 |
3 files changed, 22 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 1b1cec9f85..331bb5e716 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -41,13 +41,6 @@ Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redel Message::~Message() { - if (staged) { - if (store) { - store->destroy(*this); - } else { - QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed"); - } - } } std::string Message::getRoutingKey() const @@ -178,32 +171,43 @@ void Message::releaseContent(MessageStore* _store) } } +void Message::destroy() +{ + if (staged) { + if (store) { + store->destroy(*this); + } else { + QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed"); + } + } +} + void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { if (isContentReleased()) { //load content from store in chunks of maxContentSize uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); - uint64_t expectedSize(frames.getHeaders()->getContentLength()); intrusive_ptr<const PersistableMessage> pmsg(this); - for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize) + + bool done = false; + for (uint64_t offset = 0; !done; offset += maxContentSize) { - uint64_t remaining = expectedSize - offset; AMQFrame frame(in_place<AMQContentBody>()); string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(queue, pmsg, data, offset, - remaining > maxContentSize ? maxContentSize : remaining); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; frame.setBof(false); frame.setEof(true); if (offset > 0) { frame.setBos(false); } - if (remaining > maxContentSize) { + if (!done) { frame.setEos(false); } + QPID_LOG(debug, "loaded frame for delivery: " << frame); out.handle(frame); } - } else { Count c; frames.map_if(c, TypeFilter<CONTENT_BODY>()); diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index ab13529adf..0a95fedea6 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -119,6 +119,7 @@ public: * be reloaded. */ void releaseContent(MessageStore* store); + void destroy(); void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const; void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index b3167d0377..ad617c1bc1 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -370,6 +370,9 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); } + if (!strategy.delivered) { + msg->destroy(); + } } } |