summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageBuilder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageBuilder.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp38
1 files changed, 23 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index eda71ed3da..b1a2b77b05 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,10 +18,11 @@
* under the License.
*
*/
-#include "MessageBuilder.h"
+#include "qpid/broker/MessageBuilder.h"
-#include "Message.h"
-#include "MessageStore.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/NullMessageStore.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/reply_exceptions.h"
@@ -29,11 +30,13 @@ using boost::intrusive_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
-namespace
+namespace
{
std::string type_str(uint8_t type);
+ const std::string QPID_MANAGEMENT("qpid.management");
}
-MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
+
+MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
void MessageBuilder::handle(AMQFrame& frame)
@@ -48,14 +51,13 @@ void MessageBuilder::handle(AMQFrame& frame)
if (type == CONTENT_BODY) {
//TODO: rethink how to handle non-existent headers(?)...
//didn't get a header: add in a dummy
- AMQFrame header;
- header.setBody(AMQHeaderBody());
+ AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(false);
- message->getFrames().append(header);
+ message->getFrames().append(header);
} else if (type != HEADER_BODY) {
throw CommandInvalidException(
- QPID_MSG("Invalid frame sequence for message, expected header or content got "
+ QPID_MSG("Invalid frame sequence for message, expected header or content got "
<< type_str(type) << ")"));
}
state = CONTENT;
@@ -72,8 +74,13 @@ void MessageBuilder::handle(AMQFrame& frame)
} else {
message->getFrames().append(frame);
//have we reached the staging limit? if so stage message and release content
- if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) {
- message->releaseContent(store);
+ if (state == CONTENT
+ && stagingThreshold
+ && message->getFrames().getContentSize() >= stagingThreshold
+ && !NullMessageStore::isNullStore(store)
+ && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
+ {
+ message->releaseContent();
staging = true;
}
}
@@ -89,6 +96,7 @@ void MessageBuilder::end()
void MessageBuilder::start(const SequenceNumber& id)
{
message = intrusive_ptr<Message>(new Message(id));
+ message->setStore(store);
state = METHOD;
staging = false;
}
@@ -101,7 +109,7 @@ const std::string CONTENT_BODY_S = "CONTENT";
const std::string HEARTBEAT_BODY_S = "HEARTBEAT";
const std::string UNKNOWN = "unknown";
-std::string type_str(uint8_t type)
+std::string type_str(uint8_t type)
{
switch(type) {
case METHOD_BODY: return METHOD_BODY_S;
@@ -117,7 +125,7 @@ std::string type_str(uint8_t type)
void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
{
if (expected != actual) {
- throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
<< type_str(expected) << " got " << type_str(actual) << ")"));
}
}