diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageBuilder.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageBuilder.cpp | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp new file mode 100644 index 0000000000..f5e9332052 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageBuilder.h" + +#include "qpid/broker/Message.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" + +using boost::intrusive_ptr; +using namespace qpid::broker; +using namespace qpid::framing; + +namespace +{ + std::string type_str(uint8_t type); + const std::string QPID_MANAGEMENT("qpid.management"); +} + +MessageBuilder::MessageBuilder() : state(DORMANT) {} + +void MessageBuilder::handle(AMQFrame& frame) +{ + uint8_t type = frame.getBody()->type(); + switch(state) { + case METHOD: + checkType(METHOD_BODY, type); + if (!frame.getMethod()->isA<qpid::framing::MessageTransferBody>()) + throw NotImplementedException(QPID_MSG("Unexpected method: " << *(frame.getMethod()))); + + exchange = frame.castBody<qpid::framing::MessageTransferBody>()->getDestination(); + state = HEADER; + break; + case HEADER: + if (type == CONTENT_BODY) { + //TODO: rethink how to handle non-existent headers(?)... + //didn't get a header: add in a dummy + AMQFrame header((AMQHeaderBody())); + header.setBof(false); + header.setEof(false); + message->getFrames().append(header); + } else if (type == HEADER_BODY) { + frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setExchange(exchange); + } else { + throw CommandInvalidException( + QPID_MSG("Invalid frame sequence for message, expected header or content got " + << type_str(type) << ")")); + } + state = CONTENT; + break; + case CONTENT: + checkType(CONTENT_BODY, type); + break; + default: + throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")")); + } + message->getFrames().append(frame); +} + +void MessageBuilder::end() +{ + message->computeRequiredCredit(); + message = 0; + state = DORMANT; +} + +void MessageBuilder::start(const SequenceNumber& id) +{ + message = intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer>(new qpid::broker::amqp_0_10::MessageTransfer(id)); + state = METHOD; +} + +namespace { + +const std::string HEADER_BODY_S = "HEADER"; +const std::string METHOD_BODY_S = "METHOD"; +const std::string CONTENT_BODY_S = "CONTENT"; +const std::string HEARTBEAT_BODY_S = "HEARTBEAT"; +const std::string UNKNOWN = "unknown"; + +std::string type_str(uint8_t type) +{ + switch(type) { + case METHOD_BODY: return METHOD_BODY_S; + case HEADER_BODY: return HEADER_BODY_S; + case CONTENT_BODY: return CONTENT_BODY_S; + case HEARTBEAT_BODY: return HEARTBEAT_BODY_S; + } + return UNKNOWN; +} + +} + +void MessageBuilder::checkType(uint8_t expected, uint8_t actual) +{ + if (expected != actual) { + throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected " + << type_str(expected) << " got " << type_str(actual) << ")")); + } +} + +boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> MessageBuilder::getMessage() { return message; } |