diff options
author | Gordon Sim <gsim@apache.org> | 2008-03-06 11:44:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-03-06 11:44:36 +0000 |
commit | b7c474ea80fce5d53236aeea9f74c9107da0152d (patch) | |
tree | 6d0a36c807c309cdebe3ba18dfe4c0bbbb75bfb8 /cpp/src | |
parent | 7e162fa97ef0d430714b9630121a055fe5adece9 (diff) | |
download | qpid-python-b7c474ea80fce5d53236aeea9f74c9107da0152d.tar.gz |
Fix message delivery for 0-10 final codepath
Convert two more python tests to use 0-10 client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634229 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQHeaderBody.h | 8 |
6 files changed, 48 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index 269fc2d423..d00a474aee 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -37,29 +37,30 @@ MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThre void MessageBuilder::handle(AMQFrame& frame) { + uint8_t type = frame.getBody()->type(); switch(state) { case METHOD: - checkType(METHOD_BODY, frame.getBody()->type()); + checkType(METHOD_BODY, type); state = HEADER; break; case HEADER: - switch (frame.getBody()->type()) { - case CONTENT_BODY: - //TODO: rethink how to handle non-existent headers... + if (type == CONTENT_BODY) { + //TODO: rethink how to handle non-existent headers(?)... //didn't get a header: add in a dummy - message->getFrames().append(AMQFrame(AMQHeaderBody())); - break; - case HEADER_BODY: - break; - default: + AMQFrame header; + header.setBody(AMQHeaderBody()); + header.setBof(false); + header.setEof(false); + message->getFrames().append(header); + } else if (type != HEADER_BODY) { throw CommandInvalidException( QPID_MSG("Invalid frame sequence for message, expected header or content got " - << type_str(frame.getBody()->type()) << ")")); + << type_str(type) << ")")); } state = CONTENT; break; case CONTENT: - checkType(CONTENT_BODY, frame.getBody()->type()); + checkType(CONTENT_BODY, type); break; default: throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")")); diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index 886008c213..22053260c5 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -82,9 +82,10 @@ struct MessageDeliveryToken : BaseToken const std::string destination; const u_int8_t confirmMode; const u_int8_t acquireMode; + const bool isPreview; - MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) : - destination(d), confirmMode(c), acquireMode(a) {} + MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a, bool p) : + destination(d), confirmMode(c), acquireMode(a), isPreview(p) {} AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/) { @@ -92,9 +93,14 @@ struct MessageDeliveryToken : BaseToken if (msg->getRedelivered()){ msg->getProperties<DeliveryProperties>()->setRedelivered(true); } - return AMQFrame(in_place<MessageTransferBody>( - ProtocolVersion(), 0, destination, - confirmMode, acquireMode)); + if (isPreview) { + return AMQFrame(in_place<MessageTransferBody>( + ProtocolVersion(), 0, destination, + confirmMode, acquireMode)); + } else { + return AMQFrame(in_place<Message010TransferBody>( + ProtocolVersion(), destination, confirmMode, acquireMode)); + } } }; @@ -114,7 +120,13 @@ DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& co DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination, u_int8_t confirmMode, u_int8_t acquireMode) { - return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode)); + return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false)); +} + +DeliveryToken::shared_ptr MessageDelivery::getPreviewMessageDeliveryToken(const std::string& destination, + u_int8_t confirmMode, u_int8_t acquireMode) +{ + return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, true)); } void MessageDelivery::deliver(QueuedMessage& msg, diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h index ac7818feed..564e1456a0 100644 --- a/cpp/src/qpid/broker/MessageDelivery.h +++ b/cpp/src/qpid/broker/MessageDelivery.h @@ -40,6 +40,9 @@ class MessageDelivery { public: static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue); static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer); + static boost::shared_ptr<DeliveryToken> getPreviewMessageDeliveryToken(const std::string& destination, + u_int8_t confirmMode, + u_int8_t acquireMode); static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination, u_int8_t confirmMode, u_int8_t acquireMode); diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 69cccf0ff0..c26824a8e3 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -137,7 +137,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); string tag = destination; - state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + state.consume(MessageDelivery::getPreviewMessageDeliveryToken(destination, confirmMode, acquireMode), tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index e012d693fb..b56b152397 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -32,6 +32,8 @@ #include "TxAck.h" #include "TxPublish.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/Message010TransferBody.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" @@ -344,8 +346,12 @@ void SemanticState::handle(intrusive_ptr<Message> msg) { } void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { - std::string exchangeName = msg->getExchangeName(); - msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); + std::string exchangeName = msg->getExchangeName(); + if (msg->isA<MessageTransferBody>()) { + msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); + } else if (msg->isA<Message010TransferBody>()) { + msg->getProperties<DeliveryProperties010>()->setExchange(exchangeName); + } if (!cacheExchange || cacheExchange->getName() != exchangeName){ cacheExchange = session.getBroker().getExchanges().get(exchangeName); } diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h index 96bd396330..8a3a92936e 100644 --- a/cpp/src/qpid/framing/AMQHeaderBody.h +++ b/cpp/src/qpid/framing/AMQHeaderBody.h @@ -26,6 +26,8 @@ #include "Buffer.h" #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/MessageProperties.h" +#include "qpid/framing/DeliveryProperties010.h" +#include "qpid/framing/MessageProperties010.h" #include <iostream> #include <boost/optional.hpp> @@ -75,8 +77,10 @@ class AMQHeaderBody : public AMQBody }; // Could use boost::mpl::fold to construct a larger set. - typedef PropSet<PropSet<Empty, DeliveryProperties>, - MessageProperties> Properties; + typedef PropSet< PropSet< PropSet<PropSet<Empty, DeliveryProperties>, + MessageProperties>, + DeliveryProperties010>, + MessageProperties010> Properties; Properties properties; |