summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-24 14:27:31 +0000
committerGordon Sim <gsim@apache.org>2007-07-24 14:27:31 +0000
commita6303894d7f9a24df4a691af3ce94647c033ebff (patch)
tree943b75df7528c9fbff6b3170c3c4b66758bf22ad
parent9f120205e0d7a0b2666b9fd21a5296db07e32fd8 (diff)
downloadqpid-python-a6303894d7f9a24df4a691af3ce94647c033ebff.tar.gz
Initial support for latest approved 0-10 xml (with some transitional hacks included).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559059 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java15
-rw-r--r--cpp/src/Makefile.am2
-rwxr-xr-xcpp/src/generate.sh2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp103
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h21
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp23
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h3
-rw-r--r--cpp/src/qpid/broker/BrokerExchange.h3
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h2
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp22
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp17
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp1
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp5
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h2
-rw-r--r--cpp/src/qpid/broker/Deliverable.h2
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp4
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h2
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp5
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp25
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp7
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h2
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp1
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.cpp10
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp33
-rw-r--r--cpp/src/qpid/client/MessageMessageChannel.cpp431
-rw-r--r--cpp/src/qpid/client/MessageMessageChannel.h84
-rw-r--r--cpp/src/qpid/client/MethodBodyInstances.h34
-rw-r--r--cpp/src/tests/FramingTest.cpp24
-rw-r--r--cpp/src/tests/QueueTest.cpp2
-rwxr-xr-xcpp/src/tests/python_tests2
-rw-r--r--cpp/xml/cluster.xml2
33 files changed, 218 insertions, 677 deletions
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
index fd3684125c..f31f9615fc 100644
--- a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
+++ b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
@@ -141,6 +141,21 @@ public class CppGenerator extends Generator
"8", // size
"buffer.putLongLong(#)", // encodeExpression
"# = buffer.getLongLong()")); // decodeExpression
+
+ //some 0-10 types:
+
+ typeMap.put("uuid", new DomainInfo(
+ "string", // type
+ "4 + #.length()", // size
+ "buffer.putLongString(#)", // encodeExpression
+ "buffer.getLongString(#)")); // decodeExpression
+
+ //NB: this is WRONG! but is here as a transitional aid
+ typeMap.put("rfc1982-long-set", new DomainInfo(
+ "u_int16_t", // type
+ "2", // size
+ "buffer.putShort(#)", // encodeExpression
+ "# = buffer.getShort()")); // decodeExpression
}
public boolean isQuietFlag()
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 0231a58217..17cb66f9ff 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -241,7 +241,6 @@ libqpidclient_la_SOURCES = \
qpid/client/ClientExchange.cpp \
qpid/client/ClientQueue.cpp \
qpid/client/BasicMessageChannel.cpp \
- qpid/client/MessageMessageChannel.cpp \
qpid/client/Connector.cpp \
qpid/client/IncomingMessage.cpp \
qpid/client/MessageListener.cpp \
@@ -329,7 +328,6 @@ nobase_include_HEADERS = \
qpid/client/IncomingMessage.h \
qpid/client/MessageChannel.h \
qpid/client/MessageListener.h \
- qpid/client/MessageMessageChannel.h \
qpid/client/MethodBodyInstances.h \
qpid/client/ResponseHandler.h \
qpid/client/ReturnedMessageHandler.h \
diff --git a/cpp/src/generate.sh b/cpp/src/generate.sh
index 4f97f72684..1d8f946ecd 100755
--- a/cpp/src/generate.sh
+++ b/cpp/src/generate.sh
@@ -7,7 +7,7 @@ set -e
gentools_dir="$srcdir/../gentools"
specs_dir="$srcdir/../../specs"
-specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml $specs_dir/amqp-dtx-preview.0-9.xml $srcdir/../xml/cluster.xml"
+specs="$specs_dir/amqp-transitional.0-10.xml $srcdir/../xml/cluster.xml"
test -z "$JAVA" && JAVA=java ;
test -z "$JAVAC" && JAVAC=javac ;
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 9bf148bcf0..376108193a 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -55,8 +55,7 @@ ProtocolVersion BrokerAdapter::getVersion() const {
void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
channel.open();
- // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- client.openOk(std::string()/* ID */);
+ client.openOk();
}
void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
@@ -80,41 +79,63 @@ void BrokerAdapter::ChannelHandlerImpl::closeOk(){}
void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& args){
+ const string& alternateExchange,
+ bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = broker.getExchanges().get(alternateExchange);
+ }
if(passive){
- if(!broker.getExchanges().get(exchange)) {
- throw ChannelException(404, "Exchange not found: " + exchange);
- }
+ Exchange::shared_ptr actual(broker.getExchanges().get(exchange));
+ checkType(actual, type);
+ checkAlternate(actual, alternate);
}else{
try{
std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
if (response.second) {
- if (durable) broker.getStore().create(*response.first);
- } else if (response.first->getType() != type) {
- throw ConnectionException(
- 530,
- "Exchange already declared to be of type "
- + response.first->getType() + ", requested " + type);
+ if (durable) {
+ broker.getStore().create(*response.first);
+ }
+ if (alternate) {
+ response.first->setAlternate(alternate);
+ alternate->incAlternateUsers();
+ }
+ } else {
+ checkType(response.first, type);
+ checkAlternate(response.first, alternate);
}
}catch(UnknownExchangeTypeException& e){
throw ConnectionException(
503, "Exchange type not implemented: " + type);
}
}
- if(!nowait){
- client.declareOk();
+}
+
+void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
+{
+ if (!type.empty() && exchange->getType() != type) {
+ throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type);
+ }
+}
+
+void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
+{
+ if (alternate && alternate != exchange->getAlternate()) {
+ throw ConnectionException(530, "Exchange declared with alternate-exchange "
+ + exchange->getAlternate()->getName() + ", requested "
+ + alternate->getName());
}
+
}
-void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/,
- const string& name, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
//TODO: implement unused
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
if (exchange->isDurable()) broker.getStore().destroy(*exchange);
+ if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
broker.getExchanges().destroy(name);
- if(!nowait) client.deleteOk();
}
void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
@@ -159,12 +180,17 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
}
}
-void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name,
+void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = broker.getExchanges().get(alternateExchange);
+ }
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = getQueue(name);
+ //TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
broker.getQueues().declare(
@@ -175,6 +201,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
assert(queue);
if (queue_created.second) { // This is a new queue
channel.setDefaultQueue(queue);
+ if (alternate) {
+ queue->setAlternateExchange(alternate);
+ alternate->incAlternateUsers();
+ }
+
//apply settings & create persistent record if required
queue_created.first->create(arguments);
@@ -201,7 +232,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
}
void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
+ const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
Queue::shared_ptr queue = getQueue(queueName);
@@ -214,7 +245,6 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu
broker.getStore().bind(*exchange, *queue, routingKey, arguments);
}
}
- if(!nowait) client.bindOk();
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -238,7 +268,6 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
}
- client.unbindOk();
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
@@ -280,7 +309,6 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.qosOk();
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -314,12 +342,12 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool now
void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate)
+ bool rejectUnroutable, bool immediate)
{
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
+ BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate);
channel.handlePublish(msg);
}else{
throw ChannelException(
@@ -351,19 +379,16 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
void BrokerAdapter::TxHandlerImpl::select()
{
channel.startTx();
- client.selectOk();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
channel.commit();
- client.commitOk();
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
channel.rollback();
- client.rollbackOk();
channel.recover(false);
}
@@ -372,28 +397,6 @@ void BrokerAdapter::ChannelHandlerImpl::ok()
//no specific action required, generic response handling should be sufficient
}
-
-//
-// Message class method handlers
-//
-void BrokerAdapter::ChannelHandlerImpl::ping()
-{
- client.ok();
- client.pong();
-}
-
-
-void
-BrokerAdapter::ChannelHandlerImpl::pong()
-{
- client.ok();
-}
-
-void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/)
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
void BrokerAdapter::setResponseTo(RequestId r)
{
basicHandler.client.setResponseTo(r);
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index a7e27a0ee6..4ae8346580 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -72,10 +72,9 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
throw ConnectionException(540, "File class not implemented"); }
StreamHandler* getStreamHandler() {
throw ConnectionException(540, "Stream class not implemented"); }
- DtxHandler* getDtxHandler() {
- throw ConnectionException(540, "Dtx class not implemented"); }
TunnelHandler* getTunnelHandler() {
throw ConnectionException(540, "Tunnel class not implemented"); }
+ SessionHandler* getSessionHandler() { throw ConnectionException(503, "Session class not implemented yet"); }
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
@@ -117,13 +116,16 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
void declare(uint16_t ticket,
- const std::string& exchange, const std::string& type,
- bool passive, bool durable, bool autoDelete,
- bool internal, bool nowait,
+ const std::string& exchange, const std::string& type,
+ const std::string& alternateExchange,
+ bool passive, bool durable, bool autoDelete,
const qpid::framing::FieldTable& arguments);
void delete_(uint16_t ticket,
- const std::string& exchange, bool ifUnused, bool nowait);
+ const std::string& exchange, bool ifUnused);
void query(u_int16_t ticket, const string& name);
+ private:
+ void checkType(Exchange::shared_ptr exchange, const std::string& type);
+ void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate);
};
class BindingHandlerImpl :
@@ -147,13 +149,14 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
- void declare(uint16_t ticket, const std::string& queue,
+ void declare(uint16_t ticket, const std::string& queue,
+ const std::string& alternateExchange,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait,
const qpid::framing::FieldTable& arguments);
void bind(uint16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
- bool nowait, const qpid::framing::FieldTable& arguments);
+ const qpid::framing::FieldTable& arguments);
void unbind(uint16_t ticket,
const std::string& queue,
const std::string& exchange,
@@ -186,7 +189,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
bool nowait);
void publish(uint16_t ticket,
const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
+ bool rejectUnroutable, bool immediate);
void get(uint16_t ticket, const std::string& queue,
bool noAck);
void ack(uint64_t deliveryTag, bool multiple);
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 9b6bdf5a2b..a598717c5d 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -280,22 +280,31 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
}
void Channel::complete(Message::shared_ptr msg) {
- Exchange::shared_ptr exchange =
- connection.broker.getExchanges().get(msg->getExchange());
- assert(exchange.get());
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
- exchange->route(*deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
+ route(msg, *deliverable);
txBuffer->enlist(op);
} else {
DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
+ route(msg, deliverable);
}
}
+void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
+ Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange());
+ assert(exchange.get());
+ exchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ if (!strategy.delivered) {
+ //TODO:if reject-unroutable, then reject
+ //else route to alternate exchange
+ if (exchange->getAlternate()) {
+ exchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ }
+ }
+
+}
+
// Used by Basic
void Channel::ack(uint64_t deliveryTag, bool multiple){
if (multiple)
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index a2b6bd3ef9..a70dce0ce8 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -33,6 +33,7 @@
#include "Consumer.h"
#include "DeliveryAdapter.h"
#include "DeliveryRecord.h"
+#include "Deliverable.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
#include "MessageBuilder.h"
@@ -102,7 +103,7 @@ class Channel : public CompletionHandler
MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
bool flowActive;
-
+ void route(Message::shared_ptr msg, Deliverable& strategy);
void complete(Message::shared_ptr msg);// completion handler for MessageBuilder
void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h
index 968775cfe5..91c295e1b7 100644
--- a/cpp/src/qpid/broker/BrokerExchange.h
+++ b/cpp/src/qpid/broker/BrokerExchange.h
@@ -48,7 +48,7 @@ namespace qpid {
explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){}
Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args)
- : name(_name), durable(_durable), args(_args), persistenceId(0){}
+ : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){}
virtual ~Exchange(){}
string getName() const { return name; }
@@ -59,6 +59,7 @@ namespace qpid {
void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; }
void incAlternateUsers() { alternateUsers++; }
void decAlternateUsers() { alternateUsers--; }
+ bool inUseAsAlternate() { return alternateUsers > 0; }
virtual string getType() const = 0;
virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index a6b039cd4d..73af3935a8 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -165,6 +165,8 @@ class Message : public PersistableMessage{
*/
virtual void releaseContent(MessageStore* /*store*/) {};
+ bool isImmediate() const { return immediate; }
+
private:
const ConnectionToken* publisher;
std::string exchange;
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index 01f8250b84..efa295e44f 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -43,7 +43,7 @@ MessageMessage::MessageMessage(
ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
- transfer_->getMandatory(),
+ transfer_->getRejectUnroutable(),
transfer_->getImmediate(),
transfer_),
requestId(requestId_),
@@ -57,7 +57,7 @@ MessageMessage::MessageMessage(
ReferencePtr reference_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
- transfer_->getMandatory(),
+ transfer_->getRejectUnroutable(),
transfer_->getImmediate(),
transfer_),
requestId(requestId_),
@@ -113,6 +113,7 @@ void MessageMessage::transferMessage(
transfer->getTicket(),
consumerTag,
getRedelivered(),
+ transfer->getRejectUnroutable(),
transfer->getImmediate(),
transfer->getTtl(),
transfer->getPriority(),
@@ -126,13 +127,14 @@ void MessageMessage::transferMessage(
transfer->getReplyTo(),
transfer->getContentType(),
transfer->getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
transfer->getUserId(),
transfer->getAppId(),
transfer->getTransactionId(),
transfer->getSecurityToken(),
transfer->getApplicationHeaders(),
- body,
- transfer->getMandatory())));
+ body)));
} else {
// Thing to do here is to construct a simple reference message then deliver that instead
// fragmentation will be taken care of in the delivery if necessary;
@@ -143,6 +145,7 @@ void MessageMessage::transferMessage(
transfer->getTicket(),
consumerTag,
getRedelivered(),
+ transfer->getRejectUnroutable(),
transfer->getImmediate(),
transfer->getTtl(),
transfer->getPriority(),
@@ -156,13 +159,14 @@ void MessageMessage::transferMessage(
transfer->getReplyTo(),
transfer->getContentType(),
transfer->getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
transfer->getUserId(),
transfer->getAppId(),
transfer->getTransactionId(),
transfer->getSecurityToken(),
transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname),
- transfer->getMandatory()));
+ framing::Content(REFERENCE, refname)));
ReferencePtr newRef(new Reference(refname));
Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
newRef->append(newAppend);
@@ -288,6 +292,7 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version
transfer->getTicket(),
destination,
getRedelivered(),
+ transfer->getRejectUnroutable(),
transfer->getImmediate(),
transfer->getTtl(),
transfer->getPriority(),
@@ -301,13 +306,14 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version
transfer->getReplyTo(),
transfer->getContentType(),
transfer->getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
transfer->getUserId(),
transfer->getAppId(),
transfer->getTransactionId(),
transfer->getSecurityToken(),
transfer->getApplicationHeaders(),
- body,
- transfer->getMandatory());
+ body);
}
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index cf6beff375..f8bffa01a3 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -56,8 +56,15 @@ Queue::Queue(const string& _name, bool _autodelete,
Queue::~Queue(){}
void Queue::deliver(Message::shared_ptr& msg){
- enqueue(0, msg);
- process(msg);
+ if (msg->isImmediate() && getConsumerCount() == 0) {
+ if (alternateExchange) {
+ DeliverableMessage deliverable(msg);
+ alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ }
+ } else {
+ enqueue(0, msg);
+ process(msg);
+ }
}
void Queue::recover(Message::shared_ptr& msg){
@@ -255,6 +262,7 @@ void Queue::destroy()
&(msg.getMessage().getApplicationHeaders()));
pop();
}
+ alternateExchange->decAlternateUsers();
}
if (store) {
@@ -318,3 +326,8 @@ void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
{
alternateExchange = exchange;
}
+
+boost::shared_ptr<Exchange> Queue::getAlternateExchange()
+{
+ return alternateExchange;
+}
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 0ed368e404..f82a7dac55 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -155,7 +155,7 @@ namespace qpid {
const QueuePolicy* const getPolicy();
void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
-
+ boost::shared_ptr<Exchange> getAlternateExchange();
//PersistableQueue support:
uint64_t getPersistenceId() const;
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 7a987f28d2..5b22167323 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -87,6 +87,7 @@ void Connection::closed(){
broker.getQueues().destroy(q->getName());
exclusiveQueues.erase(exclusiveQueues.begin());
q->unbind(broker.getExchanges(), q);
+ q->destroy();
}
} catch(std::exception& e) {
QPID_LOG(error, " Unhandled exception while closing session: " <<
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
index bb2a66bfdb..65933660f1 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -106,15 +106,14 @@ void Handler::open(const string& /*virtualHost*/,
const string& /*capabilities*/, bool /*insist*/)
{
string knownhosts;
- client.openOk(
- knownhosts);//GRS, context.getRequestId());
+ client.openOk(knownhosts);
}
void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
uint16_t /*classId*/, uint16_t /*methodId*/)
{
- client.closeOk();//GRS context.getRequestId());
+ client.closeOk();
connection.getOutput().close();
}
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index b624102cd2..6890b014a4 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -67,11 +67,11 @@ public:
AccessHandler* getAccessHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
FileHandler* getFileHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
StreamHandler* getStreamHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
- DtxHandler* getDtxHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ SessionHandler* getSessionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
framing::ProtocolVersion getVersion() const;
};
diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h
index 1570917849..cd1dbaa85d 100644
--- a/cpp/src/qpid/broker/Deliverable.h
+++ b/cpp/src/qpid/broker/Deliverable.h
@@ -27,6 +27,8 @@ namespace qpid {
namespace broker {
class Deliverable{
public:
+ bool delivered;
+ Deliverable() : delivered(false) {}
virtual void deliverTo(Queue::shared_ptr& queue) = 0;
virtual ~Deliverable(){}
};
diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp
index a713f306a8..9a3752d71c 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.cpp
+++ b/cpp/src/qpid/broker/DeliverableMessage.cpp
@@ -29,9 +29,11 @@ DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg)
void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
{
queue->deliver(msg);
+ delivered = true;
}
Message& DeliverableMessage::getMessage()
{
return *msg;
}
+
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index d1f925e40c..72d3888e37 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -52,7 +52,6 @@ const int XA_OK(8);
void DtxHandlerImpl::select()
{
channel.selectDtx();
- dClient.selectOk();
}
void DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -140,7 +139,7 @@ void DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
void DtxHandlerImpl::recover(u_int16_t /*ticket*/,
bool /*startscan*/,
- u_int32_t /*endscan*/ )
+ bool /*endscan*/ )
{
//TODO: what do startscan and endscan actually mean?
@@ -193,7 +192,6 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/,
u_int32_t timeout)
{
broker.getDtxManager().setTimeout(xid, timeout);
- cClient.setTimeoutOk();
}
void DtxHandlerImpl::setResponseTo(framing::RequestId r)
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
index e18d3c153d..6139b95bd6 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.h
@@ -48,7 +48,7 @@ public:
void prepare(u_int16_t ticket, const std::string& xid);
- void recover(u_int16_t ticket, bool startscan, u_int32_t endscan);
+ void recover(u_int16_t ticket, bool startscan, bool endscan);
void rollback(u_int16_t ticket, const std::string& xid);
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 732d45dc44..edc9a5b63b 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -72,8 +72,9 @@ void ExchangeRegistry::destroy(const string& name){
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
RWlock::ScopedRlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end())
- throw ChannelException(404, "Exchange not found:" + name);
+ if (i == exchanges.end()) {
+ throw ChannelException(404, "Exchange not found: " + name);
+ }
return i->second;
}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index de32368158..41dd8cc145 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -45,14 +45,14 @@ void
MessageHandlerImpl::cancel(const string& destination )
{
channel.cancel(destination);
- client.ok();
+ //client.ok();
}
void
MessageHandlerImpl::open(const string& reference)
{
references.open(reference);
- client.ok();
+ //client.ok();
}
void
@@ -60,14 +60,14 @@ MessageHandlerImpl::append(const framing::MethodContext& context)
{
MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
references.get(body->getReference())->append(body);
- client.ok();
+ //client.ok();
}
void
MessageHandlerImpl::close(const string& reference)
{
- Reference::shared_ptr ref = references.get(reference);
- client.ok();
+ Reference::shared_ptr ref = references.get(reference);
+ //client.ok();
// Send any transfer messages to their correct exchanges and okay them
const Reference::Messages& msgs = ref->getMessages();
@@ -85,7 +85,7 @@ MessageHandlerImpl::checkpoint(const string& /*reference*/,
{
// Initial implementation (which is conforming) is to do nothing here
// and return offset zero for the resume
- client.ok();
+ //client.ok();
}
void
@@ -123,7 +123,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/,
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
- client.ok();
+ //client.ok();
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -137,10 +137,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
Queue::shared_ptr queue = getQueue(queueName);
GetAdapter out(adapter, queue, destination, connection.getFrameMax());
- if(channel.get(out, queue, !noAck))
+ if(channel.get(out, queue, !noAck)) {
client.ok();
- else
+ } else {
client.empty();
+ }
}
void
@@ -166,14 +167,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.ok();
+ //client.ok();
}
void
MessageHandlerImpl::recover(bool requeue)
{
channel.recover(requeue);
- client.ok();
+ //client.ok();
}
void
@@ -192,7 +193,7 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context)
if (transfer->getBody().isInline()) {
MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer));
channel.handleInlineTransfer(message);
- client.ok();
+ client.ok();
} else {
Reference::shared_ptr ref(references.get(transfer->getBody().getValue()));
MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref));
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index f616ec2db8..2b1de1bbc0 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -75,7 +75,7 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ
}
}
-void SemanticHandler::complete(u_int32_t mark)
+void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/)
{
//just record it for now (will eventually need to use it to ack messages):
outgoing.lwm = SequenceNumber(mark);
@@ -85,7 +85,10 @@ void SemanticHandler::flush()
{
//flush doubles as a sync to begin with - send an execution.complete
incoming.lwm = incoming.hwm;
- send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue())));
+ if (isOpen()) {
+ /*use dummy value for range which is not yet encoded correctly*/
+ send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0)));
+ }
}
void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 6003bbec0c..a57559d043 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -60,7 +60,7 @@ public:
void handle(framing::AMQFrame& frame);
//execution class method handlers:
- void complete(u_int32_t cumulativeExecutionMark);
+ void complete(uint32_t cumulativeExecutionMark, uint16_t);
void flush();
};
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index 6e03f37dcd..db02673b1f 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -44,6 +44,7 @@ void TxPublish::rollback() throw(){
void TxPublish::deliverTo(Queue::shared_ptr& queue){
queues.push_back(queue);
+ delivered = true;
}
TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg)
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp
index a1aacdee4e..70cb473426 100644
--- a/cpp/src/qpid/client/BasicMessageChannel.cpp
+++ b/cpp/src/qpid/client/BasicMessageChannel.cpp
@@ -197,11 +197,6 @@ void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
msg.setRedelivered(deliver->getRedelivered());
return;
}
- case BasicReturnBody::METHOD_ID: {
- incoming.openReference(BASIC_REF);
- incoming.createMessage(BASIC_RETURN, BASIC_REF);
- return;
- }
case BasicConsumeOkBody::METHOD_ID: {
Mutex::ScopedLock l(lock);
BasicConsumeOkBody::shared_ptr consumeOk =
@@ -332,10 +327,9 @@ void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* hand
}
void BasicMessageChannel::setQos(){
- channel.sendAndReceive<BasicQosOkBody>(
- make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
+ channel.send(make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version)));
+ channel.send(make_shared_ptr(new TxSelectBody(channel.version)));
}
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 816ff05e85..0033cbdbe4 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -27,7 +27,6 @@
#include "MethodBodyInstances.h"
#include "Connection.h"
#include "BasicMessageChannel.h"
-#include "MessageMessageChannel.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -39,12 +38,18 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
+namespace qpid{
+namespace client{
+
+const std::string empty;
+
+}}
+
Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
switch (mode) {
case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
- case AMQP_09: messaging.reset(new MessageMessageChannel(*this)); break;
default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
}
}
@@ -138,17 +143,14 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- sendAndReceiveSync<ExchangeDeclareOkBody>(
- synch,
- make_shared_ptr(new ExchangeDeclareBody(
- version, 0, name, type, false, false, false, false, !synch, args)));
+ send(make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args)));
+ if (synch) synchWithServer();
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- sendAndReceiveSync<ExchangeDeleteOkBody>(
- synch,
- make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch)));
+ send(make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false)));
+ if (synch) synchWithServer();
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -158,7 +160,7 @@ void Channel::declareQueue(Queue& queue, bool synch){
sendAndReceiveSync<QueueDeclareOkBody>(
synch,
make_shared_ptr(new QueueDeclareBody(
- version, 0, name, false/*passive*/, queue.isDurable(),
+ version, 0, name, empty, false/*passive*/, queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
if(synch) {
if(queue.getName().length() == 0)
@@ -177,17 +179,16 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch)
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- sendAndReceiveSync<QueueBindOkBody>(
- synch,
- make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args)));
+ send(make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args)));
+ if (synch) synchWithServer();
}
void Channel::commit(){
- sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version)));
+ send(make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version)));
+ send(make_shared_ptr(new TxRollbackBody(version)));
}
void Channel::handleMethodInContext(
@@ -206,7 +207,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
}
try {
switch (method->amqpClassId()) {
- case MessageOkBody::CLASS_ID:
+ case MessageTransferBody::CLASS_ID:
case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
diff --git a/cpp/src/qpid/client/MessageMessageChannel.cpp b/cpp/src/qpid/client/MessageMessageChannel.cpp
deleted file mode 100644
index 2a8f7a01c1..0000000000
--- a/cpp/src/qpid/client/MessageMessageChannel.cpp
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include <boost/format.hpp>
-#include "MessageMessageChannel.h"
-#include "qpid/framing/AMQMethodBody.h"
-#include "ClientChannel.h"
-#include "ReturnedMessageHandler.h"
-#include "MessageListener.h"
-#include "qpid/framing/FieldTable.h"
-#include "Connection.h"
-#include "qpid/shared_ptr.h"
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace client {
-
-using namespace std;
-using namespace sys;
-using namespace framing;
-
-MessageMessageChannel::MessageMessageChannel(Channel& ch)
- : channel(ch), tagCount(0) {}
-
-string MessageMessageChannel::newTag() {
- Mutex::ScopedLock l(lock);
- return (boost::format("__tag%d")%++tagCount).str();
-}
-
-void MessageMessageChannel::consume(
- Queue& queue, std::string& tag, MessageListener* /*listener*/,
- AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields)
-{
- if (tag.empty())
- tag = newTag();
- channel.sendAndReceive<MessageOkBody>(
- make_shared_ptr(new MessageConsumeBody(
- channel.getVersion(), 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, fields ? *fields : FieldTable())));
-
-// // FIXME aconway 2007-02-20: Race condition!
-// // We could receive the first message for the consumer
-// // before we create the consumer below.
-// // Move consumer creation to handler for MessageConsumeOkBody
-// {
-// Mutex::ScopedLock l(lock);
-// ConsumerMap::iterator i = consumers.find(tag);
-// if (i != consumers.end())
-// THROW_QPID_ERROR(CLIENT_ERROR,
-// "Consumer already exists with tag="+tag);
-// Consumer& c = consumers[tag];
-// c.listener = listener;
-// c.ackMode = ackMode;
-// c.lastDeliveryTag = 0;
-// }
-}
-
-
-void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) {
- // FIXME aconway 2007-02-23:
-// Consumer c;
-// {
-// Mutex::ScopedLock l(lock);
-// ConsumerMap::iterator i = consumers.find(tag);
-// if (i == consumers.end())
-// return;
-// c = i->second;
-// consumers.erase(i);
-// }
-// if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
-// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
-// channel.sendAndReceiveSync<MessageCancelOkBody>(
-// synch, new MessageCancelBody(channel.version, tag, !synch));
-}
-
-void MessageMessageChannel::close(){
- // FIXME aconway 2007-02-23:
-// ConsumerMap consumersCopy;
-// {
-// Mutex::ScopedLock l(lock);
-// consumersCopy = consumers;
-// consumers.clear();
-// }
-// for (ConsumerMap::iterator i=consumersCopy.begin();
-// i != consumersCopy.end(); ++i)
-// {
-// Consumer& c = i->second;
-// if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
-// && c.lastDeliveryTag > 0)
-// {
-// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
-// }
-// }
-// incoming.shutdown();
-}
-
-void MessageMessageChannel::cancelAll(){
-}
-
-/** Destination ID for the current get.
- * Must not clash with a generated consumer ID.
- * TODO aconway 2007-03-06: support multiple outstanding gets?
- */
-const string getDestinationId("__get__");
-
-/**
- * A destination that provides a Correlator::Action to handle
- * MessageEmpty responses.
- */
-struct MessageGetDestination : public IncomingMessage::WaitableDestination
-{
- void response(shared_ptr<AMQResponseBody> response) {
- if (response->amqpClassId() == MessageOkBody::CLASS_ID) {
- switch (response->amqpMethodId()) {
- case MessageOkBody::METHOD_ID:
- // Nothing to do, wait for transfer.
- return;
- case MessageEmptyBody::METHOD_ID:
- empty(); // Wake up waiter with empty queue.
- return;
- }
- }
- throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response");
- }
-
- Correlator::Action action() {
- return boost::bind(&MessageGetDestination::response, this, _1);
- }
-};
-
-bool MessageMessageChannel::get(
- Message& msg, const Queue& queue, AckMode ackMode)
-{
- Mutex::ScopedLock l(lock);
- std::string destName=newTag();
- MessageGetDestination dest;
- incoming.addDestination(destName, dest);
- channel.send(
- make_shared_ptr(
- new MessageGetBody(
- channel.version, 0, queue.getName(), destName, ackMode)),
- dest.action());
- return dest.wait(msg);
-}
-
-
-/** Convert a message to a transfer command. */
-MessageTransferBody::shared_ptr makeTransfer(
- ProtocolVersion version,
- const Message& msg, const string& destination,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- return MessageTransferBody::shared_ptr(
- new MessageTransferBody(
- version,
- 0, // FIXME aconway 2007-04-03: ticket.
- destination,
- msg.isRedelivered(),
- immediate,
- 0, // FIXME aconway 2007-02-23: ttl
- msg.getPriority(),
- msg.getTimestamp(),
- static_cast<uint8_t>(msg.getDeliveryMode()),
- 0, // FIXME aconway 2007-04-03: Expiration
- string(), // Exchange: for broker use only.
- routingKey,
- msg.getMessageId(),
- msg.getCorrelationId(),
- msg.getReplyTo(),
- msg.getContentType(),
- msg.getContentEncoding(),
- msg.getUserId(),
- msg.getAppId(),
- string(), // FIXME aconway 2007-04-03: TransactionId
- string(), //FIXME aconway 2007-04-03: SecurityToken
- msg.getHeaders(),
- Content(INLINE, msg.getData()),
- mandatory
- ));
-}
-
-// FIXME aconway 2007-04-05: Generated code should provide this.
-/**
- * Calculate the size of a frame containing the given body type
- * if all variable-lengths parts are empty.
- */
-template <class T> size_t overhead() {
- static AMQFrame frame(
- ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion())));
- return frame.size();
-}
-
-void MessageMessageChannel::publish(
- const Message& msg, const Exchange& exchange,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- MessageTransferBody::shared_ptr transfer = makeTransfer(
- channel.getVersion(),
- msg, exchange.getName(), routingKey, mandatory, immediate);
- // Frame itself uses 8 bytes.
- u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8;
- if (transfer->size() <= frameMax) {
- channel.sendAndReceive<MessageOkBody>(transfer);
- }
- else {
- std::string ref = newTag();
- std::string data = transfer->getBody().getValue();
- size_t chunk =
- channel.connection->getMaxFrameSize() -
- (overhead<MessageAppendBody>() + ref.size());
- // TODO aconway 2007-04-05: cast around lack of generated setters
- const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref);
- channel.send(
- make_shared_ptr(new MessageOpenBody(channel.version, ref)));
- channel.send(transfer);
- const char* p = data.data();
- const char* end = data.data()+data.size();
- while (p+chunk <= end) {
- channel.send(
- make_shared_ptr(
- new MessageAppendBody(channel.version, ref, std::string(p, chunk))));
- p += chunk;
- }
- if (p < end) {
- channel.send(
- make_shared_ptr(
- new MessageAppendBody(channel.version, ref, std::string(p, end-p))));
- }
- channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref)));
- }
-}
-
-void copy(Message& msg, MessageTransferBody& transfer) {
- // FIXME aconway 2007-04-05: Verify all required fields
- // are copied.
- msg.setContentType(transfer.getContentType());
- msg.setContentEncoding(transfer.getContentEncoding());
- msg.setHeaders(transfer.getApplicationHeaders());
- msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode()));
- msg.setPriority(transfer.getPriority());
- msg.setCorrelationId(transfer.getCorrelationId());
- msg.setReplyTo(transfer.getReplyTo());
- // FIXME aconway 2007-04-05: TTL/Expiration
- msg.setMessageId(transfer.getMessageId());
- msg.setTimestamp(transfer.getTimestamp());
- msg.setUserId(transfer.getUserId());
- msg.setAppId(transfer.getAppId());
- msg.setDestination(transfer.getDestination());
- msg.setRedelivered(transfer.getRedelivered());
- msg.setDeliveryTag(0); // No meaning in 0-9
- if (transfer.getBody().isInline())
- msg.setData(transfer.getBody().getValue());
-}
-
-void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
- assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
- switch(method->amqpMethodId()) {
- case MessageAppendBody::METHOD_ID: {
- MessageAppendBody::shared_ptr append =
- shared_polymorphic_downcast<MessageAppendBody>(method);
- incoming.appendReference(append->getReference(), append->getBytes());
- break;
- }
- case MessageOpenBody::METHOD_ID: {
- MessageOpenBody::shared_ptr open =
- shared_polymorphic_downcast<MessageOpenBody>(method);
- incoming.openReference(open->getReference());
- break;
- }
-
- case MessageCloseBody::METHOD_ID: {
- MessageCloseBody::shared_ptr close =
- shared_polymorphic_downcast<MessageCloseBody>(method);
- incoming.closeReference(close->getReference());
- break;
- }
-
- case MessageTransferBody::METHOD_ID: {
- MessageTransferBody::shared_ptr transfer=
- shared_polymorphic_downcast<MessageTransferBody>(method);
- if (transfer->getBody().isInline()) {
- Message msg;
- copy(msg, *transfer);
- // Deliver it.
- incoming.getDestination(transfer->getDestination()).message(msg);
- }
- else {
- Message& msg=incoming.createMessage(
- transfer->getDestination(), transfer->getBody().getValue());
- copy(msg, *transfer);
- // Will be delivered when reference closes.
- }
- break;
- }
-
- case MessageEmptyBody::METHOD_ID:
- case MessageOkBody::METHOD_ID:
- // Nothing to do
- break;
-
- // FIXME aconway 2007-04-03: TODO
- case MessageCancelBody::METHOD_ID:
- case MessageCheckpointBody::METHOD_ID:
- case MessageOffsetBody::METHOD_ID:
- case MessageQosBody::METHOD_ID:
- case MessageRecoverBody::METHOD_ID:
- case MessageRejectBody::METHOD_ID:
- case MessageResumeBody::METHOD_ID:
- break;
- default:
- throw Channel::UnknownMethod();
- }
-}
-
-void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr ){
- throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
-}
-
-void MessageMessageChannel::handle(AMQContentBody::shared_ptr ){
- throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
-}
-
-// FIXME aconway 2007-02-23:
-// void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){
-// //record delivery tag:
-// consumer.lastDeliveryTag = msg.getDeliveryTag();
-
-// //allow registered listener to handle the message
-// consumer.listener->received(msg);
-
-// if(channel.isOpen()){
-// bool multiple(false);
-// switch(consumer.ackMode){
-// case LAZY_ACK:
-// multiple = true;
-// if(++(consumer.count) < channel.getPrefetch())
-// break;
-// //else drop-through
-// case AUTO_ACK:
-// consumer.lastDeliveryTag = 0;
-// channel.send(
-// new MessageAckBody(
-// channel.version, msg.getDeliveryTag(), multiple));
-// case NO_ACK: // Nothing to do
-// case CLIENT_ACK: // User code must ack.
-// break;
-// // TODO aconway 2007-02-22: Provide a way for user
-// // to ack!
-// }
-// }
-
-// //as it stands, transactionality is entirely orthogonal to ack
-// //mode, though the acks will not be processed by the broker under
-// //a transaction until it commits.
-// }
-
-
-void MessageMessageChannel::run() {
- // FIXME aconway 2007-02-23:
-// while(channel.isOpen()) {
-// try {
-// Message msg = incoming.waitDispatch();
-// if(msg.getMethod()->isA<MessageReturnBody>()) {
-// ReturnedMessageHandler* handler=0;
-// {
-// Mutex::ScopedLock l(lock);
-// handler=returnsHandler;
-// }
-// if(handler == 0) {
-// // TODO aconway 2007-02-20: proper logging.
-// QPID_LOG(warn, "No handler for message.");
-// }
-// else
-// handler->returned(msg);
-// }
-// else {
-// MessageDeliverBody::shared_ptr deliverBody =
-// boost::shared_polymorphic_downcast<MessageDeliverBody>(
-// msg.getMethod());
-// std::string tag = deliverBody->getConsumerTag();
-// Consumer consumer;
-// {
-// Mutex::ScopedLock l(lock);
-// ConsumerMap::iterator i = consumers.find(tag);
-// if(i == consumers.end())
-// THROW_QPID_ERROR(PROTOCOL_ERROR+504,
-// "Unknown consumer tag=" + tag);
-// consumer = i->second;
-// }
-// deliver(consumer, msg);
-// }
-// }
-// catch (const ShutdownException&) {
-// /* Orderly shutdown */
-// }
-// catch (const Exception& e) {
-// QPID_LOG(error, e.what());
-// }
-// }
-}
-
-void MessageMessageChannel::setReturnedMessageHandler(
- ReturnedMessageHandler* )
-{
- throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns");
-}
-
-void MessageMessageChannel::setQos(){
- channel.sendAndReceive<MessageOkBody>(
- make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)));
- if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(
- make_shared_ptr(new TxSelectBody(channel.version)));
-}
-
-}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/MessageMessageChannel.h b/cpp/src/qpid/client/MessageMessageChannel.h
deleted file mode 100644
index 44b64b3d80..0000000000
--- a/cpp/src/qpid/client/MessageMessageChannel.h
+++ /dev/null
@@ -1,84 +0,0 @@
-#ifndef _client_MessageMessageChannel_h
-#define _client_MessageMessageChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "MessageChannel.h"
-#include "IncomingMessage.h"
-#include "qpid/sys/Monitor.h"
-#include <boost/ptr_container/ptr_map.hpp>
-
-namespace qpid {
-namespace client {
-/**
- * Messaging implementation using AMQP 0-9 MessageMessageChannel class
- * to send and receiving messages.
- */
-class MessageMessageChannel : public MessageChannel
-{
- public:
- MessageMessageChannel(Channel& parent);
-
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
-
- void cancel(const std::string& tag, bool synch = true);
-
- bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
-
- void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- void run();
-
- void handle(boost::shared_ptr<framing::AMQMethodBody>);
-
- void handle(shared_ptr<framing::AMQHeaderBody>);
-
- void handle(shared_ptr<framing::AMQContentBody>);
-
- void setQos();
-
- void close();
-
- void cancelAll();
-
- private:
- typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination>
- Destinations;
-
- std::string newTag();
-
- sys::Mutex lock;
- Channel& channel;
- IncomingMessage incoming;
- long tagCount;
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_MessageMessageChannel_h*/
-
diff --git a/cpp/src/qpid/client/MethodBodyInstances.h b/cpp/src/qpid/client/MethodBodyInstances.h
index 516ba6e4e3..eb4188663d 100644
--- a/cpp/src/qpid/client/MethodBodyInstances.h
+++ b/cpp/src/qpid/client/MethodBodyInstances.h
@@ -40,8 +40,8 @@ public:
const qpid::framing::BasicDeliverBody basic_deliver;
const qpid::framing::BasicGetEmptyBody basic_get_empty;
const qpid::framing::BasicGetOkBody basic_get_ok;
- const qpid::framing::BasicQosOkBody basic_qos_ok;
- const qpid::framing::BasicReturnBody basic_return;
+ //const qpid::framing::BasicQosOkBody basic_qos_ok;
+ //const qpid::framing::BasicReturnBody basic_return;
const qpid::framing::ChannelCloseBody channel_close;
const qpid::framing::ChannelCloseOkBody channel_close_ok;
const qpid::framing::ChannelFlowBody channel_flow;
@@ -52,14 +52,14 @@ public:
const qpid::framing::ConnectionRedirectBody connection_redirect;
const qpid::framing::ConnectionStartBody connection_start;
const qpid::framing::ConnectionTuneBody connection_tune;
- const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
- const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
+ //const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
+ //const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
const qpid::framing::QueueDeclareOkBody queue_declare_ok;
const qpid::framing::QueueDeleteOkBody queue_delete_ok;
- const qpid::framing::QueueBindOkBody queue_bind_ok;
- const qpid::framing::TxCommitOkBody tx_commit_ok;
- const qpid::framing::TxRollbackOkBody tx_rollback_ok;
- const qpid::framing::TxSelectOkBody tx_select_ok;
+ //const qpid::framing::QueueBindOkBody queue_bind_ok;
+ //const qpid::framing::TxCommitOkBody tx_commit_ok;
+ //const qpid::framing::TxRollbackOkBody tx_rollback_ok;
+ //const qpid::framing::TxSelectOkBody tx_select_ok;
MethodBodyInstances(uint8_t major, uint8_t minor) :
version(major, minor),
@@ -68,8 +68,8 @@ public:
basic_deliver(version),
basic_get_empty(version),
basic_get_ok(version),
- basic_qos_ok(version),
- basic_return(version),
+ //basic_qos_ok(version),
+ //basic_return(version),
channel_close(version),
channel_close_ok(version),
channel_flow(version),
@@ -80,14 +80,14 @@ public:
connection_redirect(version),
connection_start(version),
connection_tune(version),
- exchange_declare_ok(version),
- exchange_delete_ok(version),
+ //exchange_declare_ok(version),
+ //exchange_delete_ok(version),
queue_declare_ok(version),
- queue_delete_ok(version),
- queue_bind_ok(version),
- tx_commit_ok(version),
- tx_rollback_ok(version),
- tx_select_ok(version)
+ queue_delete_ok(version)//,
+ //queue_bind_ok(version),
+ //tx_commit_ok(version),
+ //tx_rollback_ok(version),
+ //tx_select_ok(version)
{}
};
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index 9c60af7866..134acb94f9 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -182,14 +182,14 @@ class FramingTest : public CppUnit::TestCase
}
void testResponseBodyFrame() {
- AMQBody::shared_ptr response(new ChannelOkBody(version));
+ AMQBody::shared_ptr response(new ChannelOpenOkBody(version));
AMQFrame in(version, 999, response);
in.encode(buffer);
buffer.flip();
AMQFrame out;
out.decode(buffer);
- ChannelOkBody* decoded =
- dynamic_cast<ChannelOkBody*>(out.getBody().get());
+ ChannelOpenOkBody* decoded =
+ dynamic_cast<ChannelOpenOkBody*>(out.getBody().get());
CPPUNIT_ASSERT(decoded);
}
@@ -400,20 +400,22 @@ class FramingTest : public CppUnit::TestCase
c.declareQueue(queue);
c.bind(exchange, queue, "MyTopic", framing::FieldTable());
broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin();
- ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: channelId=]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; passive=0; durable=0; autoDelete=0; internal=0; nowait=0; arguments={}]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=2,batch=0): ExchangeDeclareOk: ]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=2): QueueDeclare: ticket=0; queue=MyQueue; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=3,request=3,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=3): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; nowait=0; arguments={}]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=4,request=4,batch=0): QueueBindOk: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: ]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=1): ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; request(id=1,mark=0): ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=1): QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=4,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=5,mark=2): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=6,mark=2): ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; request(id=2,mark=0): ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet=0]", *i++);
}
};
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index eb9d567b82..fad2702f38 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -69,7 +69,7 @@ class QueueTest : public CppUnit::TestCase
public:
Message::shared_ptr message(std::string exchange, std::string routingKey) {
return Message::shared_ptr(
- new BasicMessage(0, exchange, routingKey, true, true));
+ new BasicMessage(0, exchange, routingKey, false, false));
}
void testConsumers(){
diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests
index 57cbba6848..33d60fcf09 100755
--- a/cpp/src/tests/python_tests
+++ b/cpp/src/tests/python_tests
@@ -1,7 +1,7 @@
#!/bin/sh
# Run the python tests.
if test -d ../../../python ; then
- cd ../../../python && ./run-tests -v -s "0-9" -e ../specs/amqp-dtx-preview.0-9.xml -I cpp_failing_0-9.txt -b localhost:$QPID_PORT $PYTHON_TESTS
+ cd ../../../python && ./run-tests -v -s ../specs/amqp-transitional.0-10.xml -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS
else
echo Warning: python tests not found.
fi
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 2c9746e908..fffd3aafc4 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -20,7 +20,7 @@
-
-->
-<amqp major="0" minor="9" port="5672" comment="AMQ protocol 0.80">
+<amqp major="0" minor="10" port="5672">
<class name = "cluster" index = "301">