summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageBuilder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageBuilder.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp17
1 files changed, 14 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index a04c6def41..b4efd3d001 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -23,12 +23,22 @@
using namespace qpid::broker;
using namespace qpid::framing;
-MessageBuilder::MessageBuilder(CompletionHandler* _handler) : handler(_handler) {}
+MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) :
+ handler(_handler),
+ store(_store),
+ stagingThreshold(_stagingThreshold),
+ staging(false)
+{}
void MessageBuilder::route(){
- if(message->isComplete()){
- if(handler) handler->complete(message);
+ if (staging && store) {
+ store->stage(message);
+ message->releaseContent();
+ }
+ if (message->isComplete()) {
+ if (handler) handler->complete(message);
message.reset();
+ staging = false;
}
}
@@ -44,6 +54,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
}
message->setHeader(header);
+ staging = stagingThreshold && header->getContentSize() >= stagingThreshold;
route();
}