summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageBuilder.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageBuilder.cpp114
1 files changed, 114 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
new file mode 100644
index 0000000000..a6d605c296
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageBuilder.h"
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/reply_exceptions.h"
+
+using boost::intrusive_ptr;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace
+{
+ std::string type_str(uint8_t type);
+ const std::string QPID_MANAGEMENT("qpid.management");
+}
+
+MessageBuilder::MessageBuilder(MessageStore* const _store) :
+ state(DORMANT), store(_store) {}
+
+void MessageBuilder::handle(AMQFrame& frame)
+{
+ uint8_t type = frame.getBody()->type();
+ switch(state) {
+ case METHOD:
+ checkType(METHOD_BODY, type);
+ state = HEADER;
+ break;
+ case HEADER:
+ if (type == CONTENT_BODY) {
+ //TODO: rethink how to handle non-existent headers(?)...
+ //didn't get a header: add in a dummy
+ AMQFrame header((AMQHeaderBody()));
+ header.setBof(false);
+ header.setEof(false);
+ message->getFrames().append(header);
+ } else if (type != HEADER_BODY) {
+ throw CommandInvalidException(
+ QPID_MSG("Invalid frame sequence for message, expected header or content got "
+ << type_str(type) << ")"));
+ }
+ state = CONTENT;
+ break;
+ case CONTENT:
+ checkType(CONTENT_BODY, type);
+ break;
+ default:
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
+ }
+ message->getFrames().append(frame);
+}
+
+void MessageBuilder::end()
+{
+ message = 0;
+ state = DORMANT;
+}
+
+void MessageBuilder::start(const SequenceNumber& id)
+{
+ message = intrusive_ptr<Message>(new Message(id));
+ message->setStore(store);
+ state = METHOD;
+}
+
+namespace {
+
+const std::string HEADER_BODY_S = "HEADER";
+const std::string METHOD_BODY_S = "METHOD";
+const std::string CONTENT_BODY_S = "CONTENT";
+const std::string HEARTBEAT_BODY_S = "HEARTBEAT";
+const std::string UNKNOWN = "unknown";
+
+std::string type_str(uint8_t type)
+{
+ switch(type) {
+ case METHOD_BODY: return METHOD_BODY_S;
+ case HEADER_BODY: return HEADER_BODY_S;
+ case CONTENT_BODY: return CONTENT_BODY_S;
+ case HEARTBEAT_BODY: return HEARTBEAT_BODY_S;
+ }
+ return UNKNOWN;
+}
+
+}
+
+void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
+{
+ if (expected != actual) {
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
+ << type_str(expected) << " got " << type_str(actual) << ")"));
+ }
+}