diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-24 14:27:31 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-24 14:27:31 +0000 |
commit | a6303894d7f9a24df4a691af3ce94647c033ebff (patch) | |
tree | 943b75df7528c9fbff6b3170c3c4b66758bf22ad | |
parent | 9f120205e0d7a0b2666b9fd21a5296db07e32fd8 (diff) | |
download | qpid-python-a6303894d7f9a24df4a691af3ce94647c033ebff.tar.gz |
Initial support for latest approved 0-10 xml (with some transitional hacks included).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559059 13f79535-47bb-0310-9956-ffa450edef68
33 files changed, 218 insertions, 677 deletions
diff --git a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java index fd3684125c..f31f9615fc 100644 --- a/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java +++ b/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java @@ -141,6 +141,21 @@ public class CppGenerator extends Generator "8", // size "buffer.putLongLong(#)", // encodeExpression "# = buffer.getLongLong()")); // decodeExpression + + //some 0-10 types: + + typeMap.put("uuid", new DomainInfo( + "string", // type + "4 + #.length()", // size + "buffer.putLongString(#)", // encodeExpression + "buffer.getLongString(#)")); // decodeExpression + + //NB: this is WRONG! but is here as a transitional aid + typeMap.put("rfc1982-long-set", new DomainInfo( + "u_int16_t", // type + "2", // size + "buffer.putShort(#)", // encodeExpression + "# = buffer.getShort()")); // decodeExpression } public boolean isQuietFlag() diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 0231a58217..17cb66f9ff 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -241,7 +241,6 @@ libqpidclient_la_SOURCES = \ qpid/client/ClientExchange.cpp \ qpid/client/ClientQueue.cpp \ qpid/client/BasicMessageChannel.cpp \ - qpid/client/MessageMessageChannel.cpp \ qpid/client/Connector.cpp \ qpid/client/IncomingMessage.cpp \ qpid/client/MessageListener.cpp \ @@ -329,7 +328,6 @@ nobase_include_HEADERS = \ qpid/client/IncomingMessage.h \ qpid/client/MessageChannel.h \ qpid/client/MessageListener.h \ - qpid/client/MessageMessageChannel.h \ qpid/client/MethodBodyInstances.h \ qpid/client/ResponseHandler.h \ qpid/client/ReturnedMessageHandler.h \ diff --git a/cpp/src/generate.sh b/cpp/src/generate.sh index 4f97f72684..1d8f946ecd 100755 --- a/cpp/src/generate.sh +++ b/cpp/src/generate.sh @@ -7,7 +7,7 @@ set -e gentools_dir="$srcdir/../gentools" specs_dir="$srcdir/../../specs" -specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml $specs_dir/amqp-dtx-preview.0-9.xml $srcdir/../xml/cluster.xml" +specs="$specs_dir/amqp-transitional.0-10.xml $srcdir/../xml/cluster.xml" test -z "$JAVA" && JAVA=java ; test -z "$JAVAC" && JAVAC=javac ; diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 9bf148bcf0..376108193a 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -55,8 +55,7 @@ ProtocolVersion BrokerAdapter::getVersion() const { void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){ channel.open(); - // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - client.openOk(std::string()/* ID */); + client.openOk(); } void BrokerAdapter::ChannelHandlerImpl::flow(bool active){ @@ -80,41 +79,63 @@ void BrokerAdapter::ChannelHandlerImpl::closeOk(){} void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& args){ + const string& alternateExchange, + bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = broker.getExchanges().get(alternateExchange); + } if(passive){ - if(!broker.getExchanges().get(exchange)) { - throw ChannelException(404, "Exchange not found: " + exchange); - } + Exchange::shared_ptr actual(broker.getExchanges().get(exchange)); + checkType(actual, type); + checkAlternate(actual, alternate); }else{ try{ std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args); if (response.second) { - if (durable) broker.getStore().create(*response.first); - } else if (response.first->getType() != type) { - throw ConnectionException( - 530, - "Exchange already declared to be of type " - + response.first->getType() + ", requested " + type); + if (durable) { + broker.getStore().create(*response.first); + } + if (alternate) { + response.first->setAlternate(alternate); + alternate->incAlternateUsers(); + } + } else { + checkType(response.first, type); + checkAlternate(response.first, alternate); } }catch(UnknownExchangeTypeException& e){ throw ConnectionException( 503, "Exchange type not implemented: " + type); } } - if(!nowait){ - client.declareOk(); +} + +void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type) +{ + if (!type.empty() && exchange->getType() != type) { + throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type); + } +} + +void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) +{ + if (alternate && alternate != exchange->getAlternate()) { + throw ConnectionException(530, "Exchange declared with alternate-exchange " + + exchange->getAlternate()->getName() + ", requested " + + alternate->getName()); } + } -void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, - const string& name, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){ //TODO: implement unused Exchange::shared_ptr exchange(broker.getExchanges().get(name)); + if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange."); if (exchange->isDurable()) broker.getStore().destroy(*exchange); + if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); broker.getExchanges().destroy(name); - if(!nowait) client.deleteOk(); } void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) @@ -159,12 +180,17 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, } } -void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, +void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = broker.getExchanges().get(alternateExchange); + } Queue::shared_ptr queue; if (passive && !name.empty()) { queue = getQueue(name); + //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = broker.getQueues().declare( @@ -175,6 +201,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& assert(queue); if (queue_created.second) { // This is a new queue channel.setDefaultQueue(queue); + if (alternate) { + queue->setAlternateExchange(alternate); + alternate->incAlternateUsers(); + } + //apply settings & create persistent record if required queue_created.first->create(arguments); @@ -201,7 +232,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& } void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, + const string& exchangeName, const string& routingKey, const FieldTable& arguments){ Queue::shared_ptr queue = getQueue(queueName); @@ -214,7 +245,6 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu broker.getStore().bind(*exchange, *queue, routingKey, arguments); } } - if(!nowait) client.bindOk(); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -238,7 +268,6 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, broker.getStore().unbind(*exchange, *queue, routingKey, arguments); } - client.unbindOk(); } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){ @@ -280,7 +309,6 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.qosOk(); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -314,12 +342,12 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool now void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate) + bool rejectUnroutable, bool immediate) { Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); + BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate); channel.handlePublish(msg); }else{ throw ChannelException( @@ -351,19 +379,16 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) void BrokerAdapter::TxHandlerImpl::select() { channel.startTx(); - client.selectOk(); } void BrokerAdapter::TxHandlerImpl::commit() { channel.commit(); - client.commitOk(); } void BrokerAdapter::TxHandlerImpl::rollback() { channel.rollback(); - client.rollbackOk(); channel.recover(false); } @@ -372,28 +397,6 @@ void BrokerAdapter::ChannelHandlerImpl::ok() //no specific action required, generic response handling should be sufficient } - -// -// Message class method handlers -// -void BrokerAdapter::ChannelHandlerImpl::ping() -{ - client.ok(); - client.pong(); -} - - -void -BrokerAdapter::ChannelHandlerImpl::pong() -{ - client.ok(); -} - -void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - void BrokerAdapter::setResponseTo(RequestId r) { basicHandler.client.setResponseTo(r); diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index a7e27a0ee6..4ae8346580 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -72,10 +72,9 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations throw ConnectionException(540, "File class not implemented"); } StreamHandler* getStreamHandler() { throw ConnectionException(540, "Stream class not implemented"); } - DtxHandler* getDtxHandler() { - throw ConnectionException(540, "Dtx class not implemented"); } TunnelHandler* getTunnelHandler() { throw ConnectionException(540, "Tunnel class not implemented"); } + SessionHandler* getSessionHandler() { throw ConnectionException(503, "Session class not implemented yet"); } DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } @@ -117,13 +116,16 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} void declare(uint16_t ticket, - const std::string& exchange, const std::string& type, - bool passive, bool durable, bool autoDelete, - bool internal, bool nowait, + const std::string& exchange, const std::string& type, + const std::string& alternateExchange, + bool passive, bool durable, bool autoDelete, const qpid::framing::FieldTable& arguments); void delete_(uint16_t ticket, - const std::string& exchange, bool ifUnused, bool nowait); + const std::string& exchange, bool ifUnused); void query(u_int16_t ticket, const string& name); + private: + void checkType(Exchange::shared_ptr exchange, const std::string& type); + void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate); }; class BindingHandlerImpl : @@ -147,13 +149,14 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public: QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - void declare(uint16_t ticket, const std::string& queue, + void declare(uint16_t ticket, const std::string& queue, + const std::string& alternateExchange, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); void bind(uint16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, - bool nowait, const qpid::framing::FieldTable& arguments); + const qpid::framing::FieldTable& arguments); void unbind(uint16_t ticket, const std::string& queue, const std::string& exchange, @@ -186,7 +189,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations bool nowait); void publish(uint16_t ticket, const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); + bool rejectUnroutable, bool immediate); void get(uint16_t ticket, const std::string& queue, bool noAck); void ack(uint64_t deliveryTag, bool multiple); diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 9b6bdf5a2b..a598717c5d 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -280,22 +280,31 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { } void Channel::complete(Message::shared_ptr msg) { - Exchange::shared_ptr exchange = - connection.broker.getExchanges().get(msg->getExchange()); - assert(exchange.get()); if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); - exchange->route(*deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); + route(msg, *deliverable); txBuffer->enlist(op); } else { DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); + route(msg, deliverable); } } +void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { + Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange()); + assert(exchange.get()); + exchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + if (!strategy.delivered) { + //TODO:if reject-unroutable, then reject + //else route to alternate exchange + if (exchange->getAlternate()) { + exchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + } + } + +} + // Used by Basic void Channel::ack(uint64_t deliveryTag, bool multiple){ if (multiple) diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index a2b6bd3ef9..a70dce0ce8 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -33,6 +33,7 @@ #include "Consumer.h" #include "DeliveryAdapter.h" #include "DeliveryRecord.h" +#include "Deliverable.h" #include "DtxBuffer.h" #include "DtxManager.h" #include "MessageBuilder.h" @@ -102,7 +103,7 @@ class Channel : public CompletionHandler MessageBuilder messageBuilder;//builder for in-progress message bool opened; bool flowActive; - + void route(Message::shared_ptr msg, Deliverable& strategy); void complete(Message::shared_ptr msg);// completion handler for MessageBuilder void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h index 968775cfe5..91c295e1b7 100644 --- a/cpp/src/qpid/broker/BrokerExchange.h +++ b/cpp/src/qpid/broker/BrokerExchange.h @@ -48,7 +48,7 @@ namespace qpid { explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){} Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args) - : name(_name), durable(_durable), args(_args), persistenceId(0){} + : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){} virtual ~Exchange(){} string getName() const { return name; } @@ -59,6 +59,7 @@ namespace qpid { void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; } void incAlternateUsers() { alternateUsers++; } void decAlternateUsers() { alternateUsers--; } + bool inUseAsAlternate() { return alternateUsers > 0; } virtual string getType() const = 0; virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h index a6b039cd4d..73af3935a8 100644 --- a/cpp/src/qpid/broker/BrokerMessageBase.h +++ b/cpp/src/qpid/broker/BrokerMessageBase.h @@ -165,6 +165,8 @@ class Message : public PersistableMessage{ */ virtual void releaseContent(MessageStore* /*store*/) {}; + bool isImmediate() const { return immediate; } + private: const ConnectionToken* publisher; std::string exchange; diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index 01f8250b84..efa295e44f 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -43,7 +43,7 @@ MessageMessage::MessageMessage( ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), - transfer_->getMandatory(), + transfer_->getRejectUnroutable(), transfer_->getImmediate(), transfer_), requestId(requestId_), @@ -57,7 +57,7 @@ MessageMessage::MessageMessage( ReferencePtr reference_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), - transfer_->getMandatory(), + transfer_->getRejectUnroutable(), transfer_->getImmediate(), transfer_), requestId(requestId_), @@ -113,6 +113,7 @@ void MessageMessage::transferMessage( transfer->getTicket(), consumerTag, getRedelivered(), + transfer->getRejectUnroutable(), transfer->getImmediate(), transfer->getTtl(), transfer->getPriority(), @@ -126,13 +127,14 @@ void MessageMessage::transferMessage( transfer->getReplyTo(), transfer->getContentType(), transfer->getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ transfer->getUserId(), transfer->getAppId(), transfer->getTransactionId(), transfer->getSecurityToken(), transfer->getApplicationHeaders(), - body, - transfer->getMandatory()))); + body))); } else { // Thing to do here is to construct a simple reference message then deliver that instead // fragmentation will be taken care of in the delivery if necessary; @@ -143,6 +145,7 @@ void MessageMessage::transferMessage( transfer->getTicket(), consumerTag, getRedelivered(), + transfer->getRejectUnroutable(), transfer->getImmediate(), transfer->getTtl(), transfer->getPriority(), @@ -156,13 +159,14 @@ void MessageMessage::transferMessage( transfer->getReplyTo(), transfer->getContentType(), transfer->getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ transfer->getUserId(), transfer->getAppId(), transfer->getTransactionId(), transfer->getSecurityToken(), transfer->getApplicationHeaders(), - framing::Content(REFERENCE, refname), - transfer->getMandatory())); + framing::Content(REFERENCE, refname))); ReferencePtr newRef(new Reference(refname)); Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); newRef->append(newAppend); @@ -288,6 +292,7 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version transfer->getTicket(), destination, getRedelivered(), + transfer->getRejectUnroutable(), transfer->getImmediate(), transfer->getTtl(), transfer->getPriority(), @@ -301,13 +306,14 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version transfer->getReplyTo(), transfer->getContentType(), transfer->getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ transfer->getUserId(), transfer->getAppId(), transfer->getTransactionId(), transfer->getSecurityToken(), transfer->getApplicationHeaders(), - body, - transfer->getMandatory()); + body); } diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index cf6beff375..f8bffa01a3 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -56,8 +56,15 @@ Queue::Queue(const string& _name, bool _autodelete, Queue::~Queue(){} void Queue::deliver(Message::shared_ptr& msg){ - enqueue(0, msg); - process(msg); + if (msg->isImmediate() && getConsumerCount() == 0) { + if (alternateExchange) { + DeliverableMessage deliverable(msg); + alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + } + } else { + enqueue(0, msg); + process(msg); + } } void Queue::recover(Message::shared_ptr& msg){ @@ -255,6 +262,7 @@ void Queue::destroy() &(msg.getMessage().getApplicationHeaders())); pop(); } + alternateExchange->decAlternateUsers(); } if (store) { @@ -318,3 +326,8 @@ void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange) { alternateExchange = exchange; } + +boost::shared_ptr<Exchange> Queue::getAlternateExchange() +{ + return alternateExchange; +} diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 0ed368e404..f82a7dac55 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -155,7 +155,7 @@ namespace qpid { const QueuePolicy* const getPolicy(); void setAlternateExchange(boost::shared_ptr<Exchange> exchange); - + boost::shared_ptr<Exchange> getAlternateExchange(); //PersistableQueue support: uint64_t getPersistenceId() const; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 7a987f28d2..5b22167323 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -87,6 +87,7 @@ void Connection::closed(){ broker.getQueues().destroy(q->getName()); exclusiveQueues.erase(exclusiveQueues.begin()); q->unbind(broker.getExchanges(), q); + q->destroy(); } } catch(std::exception& e) { QPID_LOG(error, " Unhandled exception while closing session: " << diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp index bb2a66bfdb..65933660f1 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.cpp +++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp @@ -106,15 +106,14 @@ void Handler::open(const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/) { string knownhosts; - client.openOk( - knownhosts);//GRS, context.getRequestId()); + client.openOk(knownhosts); } void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { - client.closeOk();//GRS context.getRequestId()); + client.closeOk(); connection.getOutput().close(); } diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h index b624102cd2..6890b014a4 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.h +++ b/cpp/src/qpid/broker/ConnectionAdapter.h @@ -67,11 +67,11 @@ public: AccessHandler* getAccessHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } FileHandler* getFileHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } StreamHandler* getStreamHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - DtxHandler* getDtxHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + SessionHandler* getSessionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } framing::ProtocolVersion getVersion() const; }; diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h index 1570917849..cd1dbaa85d 100644 --- a/cpp/src/qpid/broker/Deliverable.h +++ b/cpp/src/qpid/broker/Deliverable.h @@ -27,6 +27,8 @@ namespace qpid { namespace broker { class Deliverable{ public: + bool delivered; + Deliverable() : delivered(false) {} virtual void deliverTo(Queue::shared_ptr& queue) = 0; virtual ~Deliverable(){} }; diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp index a713f306a8..9a3752d71c 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.cpp +++ b/cpp/src/qpid/broker/DeliverableMessage.cpp @@ -29,9 +29,11 @@ DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg) void DeliverableMessage::deliverTo(Queue::shared_ptr& queue) { queue->deliver(msg); + delivered = true; } Message& DeliverableMessage::getMessage() { return *msg; } + diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index d1f925e40c..72d3888e37 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -52,7 +52,6 @@ const int XA_OK(8); void DtxHandlerImpl::select() { channel.selectDtx(); - dClient.selectOk(); } void DtxHandlerImpl::end(u_int16_t /*ticket*/, @@ -140,7 +139,7 @@ void DtxHandlerImpl::rollback(u_int16_t /*ticket*/, void DtxHandlerImpl::recover(u_int16_t /*ticket*/, bool /*startscan*/, - u_int32_t /*endscan*/ ) + bool /*endscan*/ ) { //TODO: what do startscan and endscan actually mean? @@ -193,7 +192,6 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/, u_int32_t timeout) { broker.getDtxManager().setTimeout(xid, timeout); - cClient.setTimeoutOk(); } void DtxHandlerImpl::setResponseTo(framing::RequestId r) diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h index e18d3c153d..6139b95bd6 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -48,7 +48,7 @@ public: void prepare(u_int16_t ticket, const std::string& xid); - void recover(u_int16_t ticket, bool startscan, u_int32_t endscan); + void recover(u_int16_t ticket, bool startscan, bool endscan); void rollback(u_int16_t ticket, const std::string& xid); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 732d45dc44..edc9a5b63b 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -72,8 +72,9 @@ void ExchangeRegistry::destroy(const string& name){ Exchange::shared_ptr ExchangeRegistry::get(const string& name){ RWlock::ScopedRlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); - if (i == exchanges.end()) - throw ChannelException(404, "Exchange not found:" + name); + if (i == exchanges.end()) { + throw ChannelException(404, "Exchange not found: " + name); + } return i->second; } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index de32368158..41dd8cc145 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -45,14 +45,14 @@ void MessageHandlerImpl::cancel(const string& destination ) { channel.cancel(destination); - client.ok(); + //client.ok(); } void MessageHandlerImpl::open(const string& reference) { references.open(reference); - client.ok(); + //client.ok(); } void @@ -60,14 +60,14 @@ MessageHandlerImpl::append(const framing::MethodContext& context) { MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody)); references.get(body->getReference())->append(body); - client.ok(); + //client.ok(); } void MessageHandlerImpl::close(const string& reference) { - Reference::shared_ptr ref = references.get(reference); - client.ok(); + Reference::shared_ptr ref = references.get(reference); + //client.ok(); // Send any transfer messages to their correct exchanges and okay them const Reference::Messages& msgs = ref->getMessages(); @@ -85,7 +85,7 @@ MessageHandlerImpl::checkpoint(const string& /*reference*/, { // Initial implementation (which is conforming) is to do nothing here // and return offset zero for the resume - client.ok(); + //client.ok(); } void @@ -123,7 +123,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok(); + //client.ok(); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -137,10 +137,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, Queue::shared_ptr queue = getQueue(queueName); GetAdapter out(adapter, queue, destination, connection.getFrameMax()); - if(channel.get(out, queue, !noAck)) + if(channel.get(out, queue, !noAck)) { client.ok(); - else + } else { client.empty(); + } } void @@ -166,14 +167,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok(); + //client.ok(); } void MessageHandlerImpl::recover(bool requeue) { channel.recover(requeue); - client.ok(); + //client.ok(); } void @@ -192,7 +193,7 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context) if (transfer->getBody().isInline()) { MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer)); channel.handleInlineTransfer(message); - client.ok(); + client.ok(); } else { Reference::shared_ptr ref(references.get(transfer->getBody().getValue())); MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref)); diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index f616ec2db8..2b1de1bbc0 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -75,7 +75,7 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ } } -void SemanticHandler::complete(u_int32_t mark) +void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/) { //just record it for now (will eventually need to use it to ack messages): outgoing.lwm = SequenceNumber(mark); @@ -85,7 +85,10 @@ void SemanticHandler::flush() { //flush doubles as a sync to begin with - send an execution.complete incoming.lwm = incoming.hwm; - send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue()))); + if (isOpen()) { + /*use dummy value for range which is not yet encoded correctly*/ + send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0))); + } } void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 6003bbec0c..a57559d043 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -60,7 +60,7 @@ public: void handle(framing::AMQFrame& frame); //execution class method handlers: - void complete(u_int32_t cumulativeExecutionMark); + void complete(uint32_t cumulativeExecutionMark, uint16_t); void flush(); }; diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 6e03f37dcd..db02673b1f 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -44,6 +44,7 @@ void TxPublish::rollback() throw(){ void TxPublish::deliverTo(Queue::shared_ptr& queue){ queues.push_back(queue); + delivered = true; } TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg) diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp index a1aacdee4e..70cb473426 100644 --- a/cpp/src/qpid/client/BasicMessageChannel.cpp +++ b/cpp/src/qpid/client/BasicMessageChannel.cpp @@ -197,11 +197,6 @@ void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { msg.setRedelivered(deliver->getRedelivered()); return; } - case BasicReturnBody::METHOD_ID: { - incoming.openReference(BASIC_REF); - incoming.createMessage(BASIC_RETURN, BASIC_REF); - return; - } case BasicConsumeOkBody::METHOD_ID: { Mutex::ScopedLock l(lock); BasicConsumeOkBody::shared_ptr consumeOk = @@ -332,10 +327,9 @@ void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* hand } void BasicMessageChannel::setQos(){ - channel.sendAndReceive<BasicQosOkBody>( - make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false))); + channel.send(make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false))); if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version))); + channel.send(make_shared_ptr(new TxSelectBody(channel.version))); } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 816ff05e85..0033cbdbe4 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -27,7 +27,6 @@ #include "MethodBodyInstances.h" #include "Connection.h" #include "BasicMessageChannel.h" -#include "MessageMessageChannel.h" // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -39,12 +38,18 @@ using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; +namespace qpid{ +namespace client{ + +const std::string empty; + +}} + Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) : connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false) { switch (mode) { case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break; - case AMQP_09: messaging.reset(new MessageMessageChannel(*this)); break; default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode."); } } @@ -138,17 +143,14 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); FieldTable args; - sendAndReceiveSync<ExchangeDeclareOkBody>( - synch, - make_shared_ptr(new ExchangeDeclareBody( - version, 0, name, type, false, false, false, false, !synch, args))); + send(make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args))); + if (synch) synchWithServer(); } void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); - sendAndReceiveSync<ExchangeDeleteOkBody>( - synch, - make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch))); + send(make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false))); + if (synch) synchWithServer(); } void Channel::declareQueue(Queue& queue, bool synch){ @@ -158,7 +160,7 @@ void Channel::declareQueue(Queue& queue, bool synch){ sendAndReceiveSync<QueueDeclareOkBody>( synch, make_shared_ptr(new QueueDeclareBody( - version, 0, name, false/*passive*/, queue.isDurable(), + version, 0, name, empty, false/*passive*/, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), !synch, args))); if(synch) { if(queue.getName().length() == 0) @@ -177,17 +179,16 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch) void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - sendAndReceiveSync<QueueBindOkBody>( - synch, - make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args))); + send(make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args))); + if (synch) synchWithServer(); } void Channel::commit(){ - sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version))); + send(make_shared_ptr(new TxCommitBody(version))); } void Channel::rollback(){ - sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version))); + send(make_shared_ptr(new TxRollbackBody(version))); } void Channel::handleMethodInContext( @@ -206,7 +207,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt) } try { switch (method->amqpClassId()) { - case MessageOkBody::CLASS_ID: + case MessageTransferBody::CLASS_ID: case BasicGetOkBody::CLASS_ID: messaging->handle(method); break; case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break; case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; diff --git a/cpp/src/qpid/client/MessageMessageChannel.cpp b/cpp/src/qpid/client/MessageMessageChannel.cpp deleted file mode 100644 index 2a8f7a01c1..0000000000 --- a/cpp/src/qpid/client/MessageMessageChannel.cpp +++ /dev/null @@ -1,431 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include <boost/format.hpp> -#include "MessageMessageChannel.h" -#include "qpid/framing/AMQMethodBody.h" -#include "ClientChannel.h" -#include "ReturnedMessageHandler.h" -#include "MessageListener.h" -#include "qpid/framing/FieldTable.h" -#include "Connection.h" -#include "qpid/shared_ptr.h" -#include <boost/bind.hpp> - -namespace qpid { -namespace client { - -using namespace std; -using namespace sys; -using namespace framing; - -MessageMessageChannel::MessageMessageChannel(Channel& ch) - : channel(ch), tagCount(0) {} - -string MessageMessageChannel::newTag() { - Mutex::ScopedLock l(lock); - return (boost::format("__tag%d")%++tagCount).str(); -} - -void MessageMessageChannel::consume( - Queue& queue, std::string& tag, MessageListener* /*listener*/, - AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields) -{ - if (tag.empty()) - tag = newTag(); - channel.sendAndReceive<MessageOkBody>( - make_shared_ptr(new MessageConsumeBody( - channel.getVersion(), 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, fields ? *fields : FieldTable()))); - -// // FIXME aconway 2007-02-20: Race condition! -// // We could receive the first message for the consumer -// // before we create the consumer below. -// // Move consumer creation to handler for MessageConsumeOkBody -// { -// Mutex::ScopedLock l(lock); -// ConsumerMap::iterator i = consumers.find(tag); -// if (i != consumers.end()) -// THROW_QPID_ERROR(CLIENT_ERROR, -// "Consumer already exists with tag="+tag); -// Consumer& c = consumers[tag]; -// c.listener = listener; -// c.ackMode = ackMode; -// c.lastDeliveryTag = 0; -// } -} - - -void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) { - // FIXME aconway 2007-02-23: -// Consumer c; -// { -// Mutex::ScopedLock l(lock); -// ConsumerMap::iterator i = consumers.find(tag); -// if (i == consumers.end()) -// return; -// c = i->second; -// consumers.erase(i); -// } -// if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) -// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true)); -// channel.sendAndReceiveSync<MessageCancelOkBody>( -// synch, new MessageCancelBody(channel.version, tag, !synch)); -} - -void MessageMessageChannel::close(){ - // FIXME aconway 2007-02-23: -// ConsumerMap consumersCopy; -// { -// Mutex::ScopedLock l(lock); -// consumersCopy = consumers; -// consumers.clear(); -// } -// for (ConsumerMap::iterator i=consumersCopy.begin(); -// i != consumersCopy.end(); ++i) -// { -// Consumer& c = i->second; -// if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) -// && c.lastDeliveryTag > 0) -// { -// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true)); -// } -// } -// incoming.shutdown(); -} - -void MessageMessageChannel::cancelAll(){ -} - -/** Destination ID for the current get. - * Must not clash with a generated consumer ID. - * TODO aconway 2007-03-06: support multiple outstanding gets? - */ -const string getDestinationId("__get__"); - -/** - * A destination that provides a Correlator::Action to handle - * MessageEmpty responses. - */ -struct MessageGetDestination : public IncomingMessage::WaitableDestination -{ - void response(shared_ptr<AMQResponseBody> response) { - if (response->amqpClassId() == MessageOkBody::CLASS_ID) { - switch (response->amqpMethodId()) { - case MessageOkBody::METHOD_ID: - // Nothing to do, wait for transfer. - return; - case MessageEmptyBody::METHOD_ID: - empty(); // Wake up waiter with empty queue. - return; - } - } - throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response"); - } - - Correlator::Action action() { - return boost::bind(&MessageGetDestination::response, this, _1); - } -}; - -bool MessageMessageChannel::get( - Message& msg, const Queue& queue, AckMode ackMode) -{ - Mutex::ScopedLock l(lock); - std::string destName=newTag(); - MessageGetDestination dest; - incoming.addDestination(destName, dest); - channel.send( - make_shared_ptr( - new MessageGetBody( - channel.version, 0, queue.getName(), destName, ackMode)), - dest.action()); - return dest.wait(msg); -} - - -/** Convert a message to a transfer command. */ -MessageTransferBody::shared_ptr makeTransfer( - ProtocolVersion version, - const Message& msg, const string& destination, - const std::string& routingKey, bool mandatory, bool immediate) -{ - return MessageTransferBody::shared_ptr( - new MessageTransferBody( - version, - 0, // FIXME aconway 2007-04-03: ticket. - destination, - msg.isRedelivered(), - immediate, - 0, // FIXME aconway 2007-02-23: ttl - msg.getPriority(), - msg.getTimestamp(), - static_cast<uint8_t>(msg.getDeliveryMode()), - 0, // FIXME aconway 2007-04-03: Expiration - string(), // Exchange: for broker use only. - routingKey, - msg.getMessageId(), - msg.getCorrelationId(), - msg.getReplyTo(), - msg.getContentType(), - msg.getContentEncoding(), - msg.getUserId(), - msg.getAppId(), - string(), // FIXME aconway 2007-04-03: TransactionId - string(), //FIXME aconway 2007-04-03: SecurityToken - msg.getHeaders(), - Content(INLINE, msg.getData()), - mandatory - )); -} - -// FIXME aconway 2007-04-05: Generated code should provide this. -/** - * Calculate the size of a frame containing the given body type - * if all variable-lengths parts are empty. - */ -template <class T> size_t overhead() { - static AMQFrame frame( - ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion()))); - return frame.size(); -} - -void MessageMessageChannel::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - MessageTransferBody::shared_ptr transfer = makeTransfer( - channel.getVersion(), - msg, exchange.getName(), routingKey, mandatory, immediate); - // Frame itself uses 8 bytes. - u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8; - if (transfer->size() <= frameMax) { - channel.sendAndReceive<MessageOkBody>(transfer); - } - else { - std::string ref = newTag(); - std::string data = transfer->getBody().getValue(); - size_t chunk = - channel.connection->getMaxFrameSize() - - (overhead<MessageAppendBody>() + ref.size()); - // TODO aconway 2007-04-05: cast around lack of generated setters - const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref); - channel.send( - make_shared_ptr(new MessageOpenBody(channel.version, ref))); - channel.send(transfer); - const char* p = data.data(); - const char* end = data.data()+data.size(); - while (p+chunk <= end) { - channel.send( - make_shared_ptr( - new MessageAppendBody(channel.version, ref, std::string(p, chunk)))); - p += chunk; - } - if (p < end) { - channel.send( - make_shared_ptr( - new MessageAppendBody(channel.version, ref, std::string(p, end-p)))); - } - channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref))); - } -} - -void copy(Message& msg, MessageTransferBody& transfer) { - // FIXME aconway 2007-04-05: Verify all required fields - // are copied. - msg.setContentType(transfer.getContentType()); - msg.setContentEncoding(transfer.getContentEncoding()); - msg.setHeaders(transfer.getApplicationHeaders()); - msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode())); - msg.setPriority(transfer.getPriority()); - msg.setCorrelationId(transfer.getCorrelationId()); - msg.setReplyTo(transfer.getReplyTo()); - // FIXME aconway 2007-04-05: TTL/Expiration - msg.setMessageId(transfer.getMessageId()); - msg.setTimestamp(transfer.getTimestamp()); - msg.setUserId(transfer.getUserId()); - msg.setAppId(transfer.getAppId()); - msg.setDestination(transfer.getDestination()); - msg.setRedelivered(transfer.getRedelivered()); - msg.setDeliveryTag(0); // No meaning in 0-9 - if (transfer.getBody().isInline()) - msg.setData(transfer.getBody().getValue()); -} - -void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { - assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID); - switch(method->amqpMethodId()) { - case MessageAppendBody::METHOD_ID: { - MessageAppendBody::shared_ptr append = - shared_polymorphic_downcast<MessageAppendBody>(method); - incoming.appendReference(append->getReference(), append->getBytes()); - break; - } - case MessageOpenBody::METHOD_ID: { - MessageOpenBody::shared_ptr open = - shared_polymorphic_downcast<MessageOpenBody>(method); - incoming.openReference(open->getReference()); - break; - } - - case MessageCloseBody::METHOD_ID: { - MessageCloseBody::shared_ptr close = - shared_polymorphic_downcast<MessageCloseBody>(method); - incoming.closeReference(close->getReference()); - break; - } - - case MessageTransferBody::METHOD_ID: { - MessageTransferBody::shared_ptr transfer= - shared_polymorphic_downcast<MessageTransferBody>(method); - if (transfer->getBody().isInline()) { - Message msg; - copy(msg, *transfer); - // Deliver it. - incoming.getDestination(transfer->getDestination()).message(msg); - } - else { - Message& msg=incoming.createMessage( - transfer->getDestination(), transfer->getBody().getValue()); - copy(msg, *transfer); - // Will be delivered when reference closes. - } - break; - } - - case MessageEmptyBody::METHOD_ID: - case MessageOkBody::METHOD_ID: - // Nothing to do - break; - - // FIXME aconway 2007-04-03: TODO - case MessageCancelBody::METHOD_ID: - case MessageCheckpointBody::METHOD_ID: - case MessageOffsetBody::METHOD_ID: - case MessageQosBody::METHOD_ID: - case MessageRecoverBody::METHOD_ID: - case MessageRejectBody::METHOD_ID: - case MessageResumeBody::METHOD_ID: - break; - default: - throw Channel::UnknownMethod(); - } -} - -void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr ){ - throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported"); -} - -void MessageMessageChannel::handle(AMQContentBody::shared_ptr ){ - throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported"); -} - -// FIXME aconway 2007-02-23: -// void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){ -// //record delivery tag: -// consumer.lastDeliveryTag = msg.getDeliveryTag(); - -// //allow registered listener to handle the message -// consumer.listener->received(msg); - -// if(channel.isOpen()){ -// bool multiple(false); -// switch(consumer.ackMode){ -// case LAZY_ACK: -// multiple = true; -// if(++(consumer.count) < channel.getPrefetch()) -// break; -// //else drop-through -// case AUTO_ACK: -// consumer.lastDeliveryTag = 0; -// channel.send( -// new MessageAckBody( -// channel.version, msg.getDeliveryTag(), multiple)); -// case NO_ACK: // Nothing to do -// case CLIENT_ACK: // User code must ack. -// break; -// // TODO aconway 2007-02-22: Provide a way for user -// // to ack! -// } -// } - -// //as it stands, transactionality is entirely orthogonal to ack -// //mode, though the acks will not be processed by the broker under -// //a transaction until it commits. -// } - - -void MessageMessageChannel::run() { - // FIXME aconway 2007-02-23: -// while(channel.isOpen()) { -// try { -// Message msg = incoming.waitDispatch(); -// if(msg.getMethod()->isA<MessageReturnBody>()) { -// ReturnedMessageHandler* handler=0; -// { -// Mutex::ScopedLock l(lock); -// handler=returnsHandler; -// } -// if(handler == 0) { -// // TODO aconway 2007-02-20: proper logging. -// QPID_LOG(warn, "No handler for message."); -// } -// else -// handler->returned(msg); -// } -// else { -// MessageDeliverBody::shared_ptr deliverBody = -// boost::shared_polymorphic_downcast<MessageDeliverBody>( -// msg.getMethod()); -// std::string tag = deliverBody->getConsumerTag(); -// Consumer consumer; -// { -// Mutex::ScopedLock l(lock); -// ConsumerMap::iterator i = consumers.find(tag); -// if(i == consumers.end()) -// THROW_QPID_ERROR(PROTOCOL_ERROR+504, -// "Unknown consumer tag=" + tag); -// consumer = i->second; -// } -// deliver(consumer, msg); -// } -// } -// catch (const ShutdownException&) { -// /* Orderly shutdown */ -// } -// catch (const Exception& e) { -// QPID_LOG(error, e.what()); -// } -// } -} - -void MessageMessageChannel::setReturnedMessageHandler( - ReturnedMessageHandler* ) -{ - throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns"); -} - -void MessageMessageChannel::setQos(){ - channel.sendAndReceive<MessageOkBody>( - make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false))); - if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>( - make_shared_ptr(new TxSelectBody(channel.version))); -} - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/MessageMessageChannel.h b/cpp/src/qpid/client/MessageMessageChannel.h deleted file mode 100644 index 44b64b3d80..0000000000 --- a/cpp/src/qpid/client/MessageMessageChannel.h +++ /dev/null @@ -1,84 +0,0 @@ -#ifndef _client_MessageMessageChannel_h -#define _client_MessageMessageChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "MessageChannel.h" -#include "IncomingMessage.h" -#include "qpid/sys/Monitor.h" -#include <boost/ptr_container/ptr_map.hpp> - -namespace qpid { -namespace client { -/** - * Messaging implementation using AMQP 0-9 MessageMessageChannel class - * to send and receiving messages. - */ -class MessageMessageChannel : public MessageChannel -{ - public: - MessageMessageChannel(Channel& parent); - - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - void cancel(const std::string& tag, bool synch = true); - - bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); - - void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - void run(); - - void handle(boost::shared_ptr<framing::AMQMethodBody>); - - void handle(shared_ptr<framing::AMQHeaderBody>); - - void handle(shared_ptr<framing::AMQContentBody>); - - void setQos(); - - void close(); - - void cancelAll(); - - private: - typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination> - Destinations; - - std::string newTag(); - - sys::Mutex lock; - Channel& channel; - IncomingMessage incoming; - long tagCount; -}; - -}} // namespace qpid::client - - - -#endif /*!_client_MessageMessageChannel_h*/ - diff --git a/cpp/src/qpid/client/MethodBodyInstances.h b/cpp/src/qpid/client/MethodBodyInstances.h index 516ba6e4e3..eb4188663d 100644 --- a/cpp/src/qpid/client/MethodBodyInstances.h +++ b/cpp/src/qpid/client/MethodBodyInstances.h @@ -40,8 +40,8 @@ public: const qpid::framing::BasicDeliverBody basic_deliver; const qpid::framing::BasicGetEmptyBody basic_get_empty; const qpid::framing::BasicGetOkBody basic_get_ok; - const qpid::framing::BasicQosOkBody basic_qos_ok; - const qpid::framing::BasicReturnBody basic_return; + //const qpid::framing::BasicQosOkBody basic_qos_ok; + //const qpid::framing::BasicReturnBody basic_return; const qpid::framing::ChannelCloseBody channel_close; const qpid::framing::ChannelCloseOkBody channel_close_ok; const qpid::framing::ChannelFlowBody channel_flow; @@ -52,14 +52,14 @@ public: const qpid::framing::ConnectionRedirectBody connection_redirect; const qpid::framing::ConnectionStartBody connection_start; const qpid::framing::ConnectionTuneBody connection_tune; - const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok; - const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok; + //const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok; + //const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok; const qpid::framing::QueueDeclareOkBody queue_declare_ok; const qpid::framing::QueueDeleteOkBody queue_delete_ok; - const qpid::framing::QueueBindOkBody queue_bind_ok; - const qpid::framing::TxCommitOkBody tx_commit_ok; - const qpid::framing::TxRollbackOkBody tx_rollback_ok; - const qpid::framing::TxSelectOkBody tx_select_ok; + //const qpid::framing::QueueBindOkBody queue_bind_ok; + //const qpid::framing::TxCommitOkBody tx_commit_ok; + //const qpid::framing::TxRollbackOkBody tx_rollback_ok; + //const qpid::framing::TxSelectOkBody tx_select_ok; MethodBodyInstances(uint8_t major, uint8_t minor) : version(major, minor), @@ -68,8 +68,8 @@ public: basic_deliver(version), basic_get_empty(version), basic_get_ok(version), - basic_qos_ok(version), - basic_return(version), + //basic_qos_ok(version), + //basic_return(version), channel_close(version), channel_close_ok(version), channel_flow(version), @@ -80,14 +80,14 @@ public: connection_redirect(version), connection_start(version), connection_tune(version), - exchange_declare_ok(version), - exchange_delete_ok(version), + //exchange_declare_ok(version), + //exchange_delete_ok(version), queue_declare_ok(version), - queue_delete_ok(version), - queue_bind_ok(version), - tx_commit_ok(version), - tx_rollback_ok(version), - tx_select_ok(version) + queue_delete_ok(version)//, + //queue_bind_ok(version), + //tx_commit_ok(version), + //tx_rollback_ok(version), + //tx_select_ok(version) {} }; diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index 9c60af7866..134acb94f9 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -182,14 +182,14 @@ class FramingTest : public CppUnit::TestCase } void testResponseBodyFrame() { - AMQBody::shared_ptr response(new ChannelOkBody(version)); + AMQBody::shared_ptr response(new ChannelOpenOkBody(version)); AMQFrame in(version, 999, response); in.encode(buffer); buffer.flip(); AMQFrame out; out.decode(buffer); - ChannelOkBody* decoded = - dynamic_cast<ChannelOkBody*>(out.getBody().get()); + ChannelOpenOkBody* decoded = + dynamic_cast<ChannelOpenOkBody*>(out.getBody().get()); CPPUNIT_ASSERT(decoded); } @@ -400,20 +400,22 @@ class FramingTest : public CppUnit::TestCase c.declareQueue(queue); c.bind(exchange, queue, "MyTopic", framing::FieldTable()); broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); - ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: channelId=]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; passive=0; durable=0; autoDelete=0; internal=0; nowait=0; arguments={}]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=2,batch=0): ExchangeDeclareOk: ]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=2): QueueDeclare: ticket=0; queue=MyQueue; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=3,request=3,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=3): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; nowait=0; arguments={}]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=4,request=4,batch=0): QueueBindOk: ]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: ]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=1): ExecutionFlush: ]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; request(id=1,mark=0): ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=1): QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=4,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=5,mark=2): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=6,mark=2): ExecutionFlush: ]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; request(id=2,mark=0): ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet=0]", *i++); } }; diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index eb9d567b82..fad2702f38 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -69,7 +69,7 @@ class QueueTest : public CppUnit::TestCase public: Message::shared_ptr message(std::string exchange, std::string routingKey) { return Message::shared_ptr( - new BasicMessage(0, exchange, routingKey, true, true)); + new BasicMessage(0, exchange, routingKey, false, false)); } void testConsumers(){ diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests index 57cbba6848..33d60fcf09 100755 --- a/cpp/src/tests/python_tests +++ b/cpp/src/tests/python_tests @@ -1,7 +1,7 @@ #!/bin/sh # Run the python tests. if test -d ../../../python ; then - cd ../../../python && ./run-tests -v -s "0-9" -e ../specs/amqp-dtx-preview.0-9.xml -I cpp_failing_0-9.txt -b localhost:$QPID_PORT $PYTHON_TESTS + cd ../../../python && ./run-tests -v -s ../specs/amqp-transitional.0-10.xml -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS else echo Warning: python tests not found. fi diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 2c9746e908..fffd3aafc4 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -20,7 +20,7 @@ - --> -<amqp major="0" minor="9" port="5672" comment="AMQ protocol 0.80"> +<amqp major="0" minor="10" port="5672"> <class name = "cluster" index = "301"> |