summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp23
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp24
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.h3
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp10
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.h8
6 files changed, 48 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 269fc2d423..d00a474aee 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -37,29 +37,30 @@ MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThre
void MessageBuilder::handle(AMQFrame& frame)
{
+ uint8_t type = frame.getBody()->type();
switch(state) {
case METHOD:
- checkType(METHOD_BODY, frame.getBody()->type());
+ checkType(METHOD_BODY, type);
state = HEADER;
break;
case HEADER:
- switch (frame.getBody()->type()) {
- case CONTENT_BODY:
- //TODO: rethink how to handle non-existent headers...
+ if (type == CONTENT_BODY) {
+ //TODO: rethink how to handle non-existent headers(?)...
//didn't get a header: add in a dummy
- message->getFrames().append(AMQFrame(AMQHeaderBody()));
- break;
- case HEADER_BODY:
- break;
- default:
+ AMQFrame header;
+ header.setBody(AMQHeaderBody());
+ header.setBof(false);
+ header.setEof(false);
+ message->getFrames().append(header);
+ } else if (type != HEADER_BODY) {
throw CommandInvalidException(
QPID_MSG("Invalid frame sequence for message, expected header or content got "
- << type_str(frame.getBody()->type()) << ")"));
+ << type_str(type) << ")"));
}
state = CONTENT;
break;
case CONTENT:
- checkType(CONTENT_BODY, frame.getBody()->type());
+ checkType(CONTENT_BODY, type);
break;
default:
throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index 886008c213..22053260c5 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -82,9 +82,10 @@ struct MessageDeliveryToken : BaseToken
const std::string destination;
const u_int8_t confirmMode;
const u_int8_t acquireMode;
+ const bool isPreview;
- MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) :
- destination(d), confirmMode(c), acquireMode(a) {}
+ MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a, bool p) :
+ destination(d), confirmMode(c), acquireMode(a), isPreview(p) {}
AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/)
{
@@ -92,9 +93,14 @@ struct MessageDeliveryToken : BaseToken
if (msg->getRedelivered()){
msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
- return AMQFrame(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, destination,
- confirmMode, acquireMode));
+ if (isPreview) {
+ return AMQFrame(in_place<MessageTransferBody>(
+ ProtocolVersion(), 0, destination,
+ confirmMode, acquireMode));
+ } else {
+ return AMQFrame(in_place<Message010TransferBody>(
+ ProtocolVersion(), destination, confirmMode, acquireMode));
+ }
}
};
@@ -114,7 +120,13 @@ DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& co
DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination,
u_int8_t confirmMode, u_int8_t acquireMode)
{
- return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode));
+ return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false));
+}
+
+DeliveryToken::shared_ptr MessageDelivery::getPreviewMessageDeliveryToken(const std::string& destination,
+ u_int8_t confirmMode, u_int8_t acquireMode)
+{
+ return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, true));
}
void MessageDelivery::deliver(QueuedMessage& msg,
diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h
index ac7818feed..564e1456a0 100644
--- a/cpp/src/qpid/broker/MessageDelivery.h
+++ b/cpp/src/qpid/broker/MessageDelivery.h
@@ -40,6 +40,9 @@ class MessageDelivery {
public:
static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue);
static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer);
+ static boost::shared_ptr<DeliveryToken> getPreviewMessageDeliveryToken(const std::string& destination,
+ u_int8_t confirmMode,
+ u_int8_t acquireMode);
static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination,
u_int8_t confirmMode,
u_int8_t acquireMode);
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 69cccf0ff0..c26824a8e3 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -137,7 +137,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
string tag = destination;
- state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ state.consume(MessageDelivery::getPreviewMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index e012d693fb..b56b152397 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -32,6 +32,8 @@
#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/Message010TransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -344,8 +346,12 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- std::string exchangeName = msg->getExchangeName();
- msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+ std::string exchangeName = msg->getExchangeName();
+ if (msg->isA<MessageTransferBody>()) {
+ msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+ } else if (msg->isA<Message010TransferBody>()) {
+ msg->getProperties<DeliveryProperties010>()->setExchange(exchangeName);
+ }
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h
index 96bd396330..8a3a92936e 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -26,6 +26,8 @@
#include "Buffer.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties010.h"
+#include "qpid/framing/MessageProperties010.h"
#include <iostream>
#include <boost/optional.hpp>
@@ -75,8 +77,10 @@ class AMQHeaderBody : public AMQBody
};
// Could use boost::mpl::fold to construct a larger set.
- typedef PropSet<PropSet<Empty, DeliveryProperties>,
- MessageProperties> Properties;
+ typedef PropSet< PropSet< PropSet<PropSet<Empty, DeliveryProperties>,
+ MessageProperties>,
+ DeliveryProperties010>,
+ MessageProperties010> Properties;
Properties properties;