summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageBuilder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageBuilder.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp85
1 files changed, 47 insertions, 38 deletions
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index f19927b708..1a84aa9b65 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -20,55 +20,64 @@
*/
#include "MessageBuilder.h"
-#include "InMemoryContent.h"
-#include "LazyLoadedContent.h"
+#include "Message.h"
+#include "MessageStore.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/AMQFrame.h"
using namespace qpid::broker;
using namespace qpid::framing;
-using std::auto_ptr;
-MessageBuilder::MessageBuilder(CompletionHandler* _handler,
- MessageStore* const _store,
- uint64_t _stagingThreshold
-) :
- handler(_handler),
- store(_store),
- stagingThreshold(_stagingThreshold)
-{}
+MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
+ state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
-void MessageBuilder::route(){
- if (message->isComplete()) {
- if (handler) handler->complete(message);
- message.reset();
+void MessageBuilder::handle(AMQFrame& frame)
+{
+ switch(state) {
+ case METHOD:
+ checkType(METHOD_BODY, frame.getBody()->type());
+ state = HEADER;
+ break;
+ case HEADER:
+ checkType(HEADER_BODY, frame.getBody()->type());
+ state = CONTENT;
+ break;
+ case CONTENT:
+ checkType(CONTENT_BODY, frame.getBody()->type());
+ break;
+ default:
+ throw ConnectionException(504, "Invalid frame sequence for message.");
+ }
+ if (staging) {
+ store->appendContent(*message, frame.castBody<AMQContentBody>()->getData());
+ } else {
+ message->getFrames().append(frame);
+ //have we reached the staging limit? if so stage message and release content
+ if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) {
+ store->stage(*message);
+ message->releaseContent(store);
+ staging = true;
+ }
}
}
-void MessageBuilder::initialise(Message::shared_ptr& msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
+{
+ if (expected != actual) {
+ throw ConnectionException(504, "Invalid frame sequence for message.");
}
- message = msg;
}
-void MessageBuilder::setHeader(AMQHeaderBody* header){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
- }
- message->setHeader(header);
- if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
- store->stage(*message);
- message->releaseContent(store);
- } else {
- auto_ptr<Content> content(new InMemoryContent());
- message->setContent(content);
- }
- route();
+void MessageBuilder::end()
+{
+ message.reset();
+ state = DORMANT;
+ staging = false;
}
-void MessageBuilder::addContent(AMQContentBody* content){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
- }
- message->addContent(content);
- route();
+void MessageBuilder::start(const SequenceNumber& id)
+{
+ message = Message::shared_ptr(new Message(id));
+ state = METHOD;
+ staging = false;
}