diff options
author | Alan Conway <aconway@apache.org> | 2007-02-13 02:41:14 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-02-13 02:41:14 +0000 |
commit | 9517deedff9691dbe3429b0b917dfd4208b0b1b8 (patch) | |
tree | f8868a2fbc63e92c770b401eeff2aee3a522697a | |
parent | d26ea3376f66f69486fe214c8a7a8b96a7605c99 (diff) | |
download | qpid-python-9517deedff9691dbe3429b0b917dfd4208b0b1b8.tar.gz |
* gentools/templ.cpp/*Proxy*, CppGenerator.java: Changes to Proxy
classes to make them directly usable as an API for low-level AMQP access.
- Proxies hold reference to a ChannelAdapter not just an output handler.
- Removed MethodContext parameter, makes no sense on requester end.
- Return RequestId from request methods so caller can correlate
incoming responses.
- Add RequestId parameter to response methods so caller can provide
correlation for outgoing responses.
- No longer inherit from *Operations classes as the signatures no
longer match. Proxy is for caller (client/requester) and Operations
is for callee (server/responder)
* cpp/lib/client/ClientChannel.h: Channel provides a raw proxy to the broker.
Normal users will still use the Channel API to deal with the broker, but
advanced users (incl ourselves!) can use the raw API to directly send
and receive any AMQP message.
* cpp/lib/broker/BrokerChannel,BrokerAdapter: Refactor for new proxies.
broker::Channel is also a ClientProxy
* Sundry files:
- Pass ProtcolVersion by value, it is only two bytes.
- Misc. const correctness fixes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@506823 13f79535-47bb-0310-9956-ffa450edef68
49 files changed, 798 insertions, 563 deletions
diff --git a/cpp/lib/Makefile.am b/cpp/lib/Makefile.am index 377cd4ede4..09a689bc76 100644 --- a/cpp/lib/Makefile.am +++ b/cpp/lib/Makefile.am @@ -1 +1 @@ -SUBDIRS = client common broker +SUBDIRS = common broker client diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index 27d2fec006..e2ca88d4d0 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -30,7 +30,6 @@ #include <MessageStore.h> #include <AutoDelete.h> #include <ExchangeRegistry.h> -#include <BrokerChannel.h> #include <ConnectionToken.h> #include <DirectExchange.h> #include <OutputHandler.h> diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 8b081874fc..ec80241c66 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -18,11 +18,10 @@ #include <boost/format.hpp> #include "BrokerAdapter.h" +#include "BrokerChannel.h" #include "Connection.h" -#include "Exception.h" #include "AMQMethodBody.h" #include "Exception.h" -#include "MessageHandlerImpl.h" namespace qpid { namespace broker { @@ -33,18 +32,37 @@ using namespace qpid::framing; typedef std::vector<Queue::shared_ptr> QueueVector; -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk( - const MethodContext& context , const FieldTable& /*clientProperties*/, + +BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : + CoreRefs(ch, c, b), + connection(c), + basicHandler(*this), + channelHandler(*this), + connectionHandler(*this), + exchangeHandler(*this), + messageHandler(*this), + queueHandler(*this), + txHandler(*this) +{} + + +ProtocolVersion BrokerAdapter::getVersion() const { + return connection.getVersion(); +} + +void BrokerAdapter::ConnectionHandlerImpl::startOk( + const MethodContext&, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/){ - connection.client->getConnection().tune( - context, 100, connection.getFrameMax(), connection.getHeartbeat()); + const string& /*response*/, const string& /*locale*/) +{ + client.tune( + 100, connection.getFrameMax(), connection.getHeartbeat()); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk( +void BrokerAdapter::ConnectionHandlerImpl::secureOk( const MethodContext&, const string& /*response*/){} -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( +void BrokerAdapter::ConnectionHandlerImpl::tuneOk( const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat) { @@ -52,50 +70,55 @@ void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( connection.setHeartbeat(heartbeat); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::ConnectionHandlerImpl::open( + const MethodContext& context, const string& /*virtualHost*/, + const string& /*capabilities*/, bool /*insist*/) +{ string knownhosts; - connection.client->getConnection().openOk(context, knownhosts); + client.openOk( + knownhosts, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close( +void BrokerAdapter::ConnectionHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getConnection().closeOk(context); + client.closeOk(context.getRequestId()); connection.getOutput().close(); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ +void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ connection.getOutput().close(); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open( +void BrokerAdapter::ChannelHandlerImpl::open( const MethodContext& context, const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - connection.client->getChannel().openOk(context, std::string()/* ID */); + client.openOk( + std::string()/* ID */, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close( +void BrokerAdapter::ChannelHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getChannel().closeOk(context); + client.closeOk(context.getRequestId()); // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. connection.closeChannel(channel.getId()); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} +void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} -void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ +void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ if(passive){ if(!broker.getExchanges().get(exchange)) { @@ -116,27 +139,30 @@ void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodCont } } if(!nowait){ - connection.client->getExchange().declareOk(context); + client.declareOk(context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused broker.getExchanges().destroy(exchange); - if(!nowait) connection.client->getExchange().deleteOk(context); + if(!nowait) client.deleteOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ +void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = connection.getQueue(name, channel.getId()); } else { std::pair<Queue::shared_ptr, bool> queue_created = - broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0); + broker.getQueues().declare( + name, durable, + autoDelete ? connection.getTimeout() : 0, + exclusive ? &connection : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -161,20 +187,22 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext % queue->getName()); if (!nowait) { string queueName = queue->getName(); - connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount()); + client.declareOk( + queueName, queue->getMessageCount(), queue->getConsumerCount(), + context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ +void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) connection.client->getQueue().bindOk(context); + if(!nowait) client.bindOk(context.getRequestId()); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -182,7 +210,7 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& c } void -BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( +BrokerAdapter::QueueHandlerImpl::unbind( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, @@ -198,18 +226,18 @@ BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( exchange->unbind(queue, routingKey, &arguments); - connection.client->getQueue().unbindOk(context); + client.unbindOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); - if(!nowait) connection.client->getQueue().purgeOk(context, count); + if(!nowait) client.purgeOk( count, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); @@ -228,20 +256,21 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext broker.getQueues().destroy(queue); } - if(!nowait) connection.client->getQueue().deleteOk(context, count); + if(!nowait) + client.deleteOk(count, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getBasic().qosOk(context); + client.qosOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( +void BrokerAdapter::BasicHandlerImpl::consume( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, @@ -257,19 +286,19 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( channel.consume( newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) connection.client->getBasic().consumeOk(context, newTag); + if(!nowait) client.consumeOk(newTag, context.getRequestId()); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ +void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag); + if(!nowait) client.cancelOk(consumerTag, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( +void BrokerAdapter::BasicHandlerImpl::publish( const MethodContext& context, u_int16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate) @@ -287,16 +316,16 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){ string clusterId;//not used, part of an imatix hack - connection.client->getBasic().getEmpty(context, clusterId); + client.getEmpty(clusterId, context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ +void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ try{ channel.ack(deliveryTag, multiple); }catch(InvalidAckException& e){ @@ -304,31 +333,31 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ +void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ channel.recover(requeue); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ channel.begin(); - connection.client->getTx().selectOk(context); + client.selectOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ channel.commit(); - connection.client->getTx().commitOk(context); + client.commitOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ channel.rollback(); - connection.client->getTx().rollbackOk(context); + client.rollbackOk(context.getRequestId()); channel.recover(false); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) +BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) { //no specific action required, generic response handling should be sufficient } @@ -338,21 +367,21 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) // Message class method handlers // void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) { - connection.client->getChannel().ok(context); - connection.client->getChannel().pong(context); + client.ok(context.getRequestId()); + client.pong(); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) { - connection.client->getChannel().ok(context); + client.ok(context.getRequestId()); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume( +BrokerAdapter::ChannelHandlerImpl::resume( const MethodContext&, const string& /*channel*/ ) { diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h index cd34f17e58..166ec78ddd 100644 --- a/cpp/lib/broker/BrokerAdapter.h +++ b/cpp/lib/broker/BrokerAdapter.h @@ -19,8 +19,9 @@ * */ #include "AMQP_ServerOperations.h" +#include "HandlerImpl.h" #include "MessageHandlerImpl.h" -#include "BrokerChannel.h" +#include "Exception.h" namespace qpid { namespace broker { @@ -28,14 +29,6 @@ namespace broker { class Channel; class Connection; class Broker; - -/** - * Per-channel protocol adapter. - * - * Translates protocol bodies into calls on the core Channel, - * Connection and Broker objects. - */ - class ChannelHandler; class ConnectionHandler; class BasicHandler; @@ -48,20 +41,23 @@ class FileHandler; class StreamHandler; class DtxHandler; class TunnelHandler; +class MessageHandlerImpl; -class BrokerAdapter : public framing::AMQP_ServerOperations +/** + * Per-channel protocol adapter. + * + * A container for a collection of AMQP-class adapters that translate + * AMQP method bodies into calls on the core Channel, Connection and + * Broker objects. Each adapter class also provides a client proxy + * to send methods to the peer. + * + */ +class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations { public: - BrokerAdapter(Channel& ch, Connection& c, Broker& b) : - basicHandler(ch, c, b), - channelHandler(ch, c, b), - connectionHandler(ch, c, b), - exchangeHandler(ch, c, b), - messageHandler(ch, c, b), - queueHandler(ch, c, b), - txHandler(ch, c, b) - {} - + BrokerAdapter(Channel& ch, Connection& c, Broker& b); + + framing::ProtocolVersion getVersion() const; ChannelHandler* getChannelHandler() { return &channelHandler; } ConnectionHandler* getConnectionHandler() { return &connectionHandler; } BasicHandler* getBasicHandler() { return &basicHandler; } @@ -80,19 +76,16 @@ class BrokerAdapter : public framing::AMQP_ServerOperations TunnelHandler* getTunnelHandler() { throw ConnectionException(540, "Tunnel class not implemented"); } + framing::AMQP_ClientProxy& getProxy() { return proxy; } + private: - struct CoreRefs { - CoreRefs(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b) {} - Channel& channel; - Connection& connection; - Broker& broker; - }; - - class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler { + class ConnectionHandlerImpl : + public ConnectionHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Connection> + { public: - ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} void startOk(const framing::MethodContext& context, const qpid::framing::FieldTable& clientProperties, @@ -112,9 +105,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations void closeOk(const framing::MethodContext& context); }; - class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{ + class ChannelHandlerImpl : + public ChannelHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Channel> + { public: - ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void open(const framing::MethodContext& context, const std::string& outOfBand); void flow(const framing::MethodContext& context, bool active); void flowOk(const framing::MethodContext& context, bool active); @@ -127,9 +124,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations void closeOk(const framing::MethodContext& context); }; - class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{ + class ExchangeHandlerImpl : + public ExchangeHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Exchange> + { public: - ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& exchange, const std::string& type, bool passive, bool durable, bool autoDelete, @@ -139,9 +140,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations const std::string& exchange, bool ifUnused, bool nowait); }; - class QueueHandlerImpl : private CoreRefs, public QueueHandler{ + class QueueHandlerImpl : + public QueueHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Queue> + { public: - QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, @@ -162,9 +167,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations bool nowait); }; - class BasicHandlerImpl : private CoreRefs, public BasicHandler{ + class BasicHandlerImpl : + public BasicHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Basic> + { public: - BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void qos(const framing::MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); void consume( @@ -184,14 +193,19 @@ class BrokerAdapter : public framing::AMQP_ServerOperations void recover(const framing::MethodContext& context, bool requeue); }; - class TxHandlerImpl : private CoreRefs, public TxHandler{ + class TxHandlerImpl : + public TxHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Tx> + { public: - TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void select(const framing::MethodContext& context); void commit(const framing::MethodContext& context); void rollback(const framing::MethodContext& context); }; + Connection& connection; BasicHandlerImpl basicHandler; ChannelHandlerImpl channelHandler; ConnectionHandlerImpl connectionHandler; @@ -199,7 +213,7 @@ class BrokerAdapter : public framing::AMQP_ServerOperations MessageHandlerImpl messageHandler; QueueHandlerImpl queueHandler; TxHandlerImpl txHandler; - + }; }} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 07636216a6..74e5504f17 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -25,6 +25,8 @@ #include <algorithm> #include <functional> +#include <boost/bind.hpp> + #include "BrokerChannel.h" #include "DeletingTxOp.h" #include "framing/ChannelAdapter.h" @@ -50,7 +52,7 @@ Channel::Channel( u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold ) : - ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()), + ChannelAdapter(id, &con.getOutput(), con.getVersion()), connection(con), currentDeliveryTag(1), transactional(false), @@ -74,46 +76,32 @@ bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, +// TODO aconway 2007-02-12: Why is connection token passed in instead +// of using the channel's parent connection? +void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { - if(tag.empty()) tag = tagGenerator.generate(); - ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); - try{ - queue->consume(c, exclusive);//may throw exception - consumers[tag] = c; - } catch(...) { - // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt. - delete c; - throw; - } -} - -void Channel::cancel(consumer_iterator i){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } + if(tagInOut.empty()) + tagInOut = tagGenerator.generate(); + std::auto_ptr<ConsumerImpl> c( + new ConsumerImpl(this, tagInOut, queue, connection, acks)); + queue->consume(c.get(), exclusive);//may throw exception + consumers.insert(tagInOut, c.release()); } void Channel::cancel(const string& tag){ - consumer_iterator i = consumers.find(tag); - if(i != consumers.end()){ - cancel(i); - } + // consumers is a ptr_map so erase will delete the consumer + // which will call cancel. + ConsumerImplMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + consumers.erase(i); } void Channel::close(){ - if (isOpen()) { - opened = false; - while (!consumers.empty()) - cancel(consumers.begin()); - //requeue: - recover(true); - } + opened = false; + consumers.clear(); + recover(true); } void Channel::begin(){ @@ -160,14 +148,10 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ } Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack) : parent(_parent), - tag(_tag), - queue(_queue), - connection(_connection), - ackExpected(ack), - blocked(false){ -} + Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack +) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local @@ -182,12 +166,18 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ return false; } +Channel::ConsumerImpl::~ConsumerImpl() { + cancel(); +} + void Channel::ConsumerImpl::cancel(){ - if(queue) queue->cancel(this); + if(queue) + queue->cancel(this); } void Channel::ConsumerImpl::requestDispatch(){ - if(blocked) queue->dispatch(); + if(blocked) + queue->dispatch(); } void Channel::handleInlineTransfer(Message::shared_ptr msg) @@ -196,11 +186,15 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg) connection.broker.getExchanges().get(msg->getExchange()); if(transactional){ TxPublish* deliverable = new TxPublish(msg); - exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + *deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable)); }else{ DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); } } @@ -244,7 +238,8 @@ void Channel::ack(){ ack(getRequestInProgress(), false); } -void Channel::ack(u_int64_t deliveryTag, bool multiple){ +void Channel::ack(u_int64_t deliveryTag, bool multiple) +{ if(transactional){ accumulatedAck.update(deliveryTag, multiple); //TODO: I think the outstanding prefetch size & count should be updated at this point... @@ -271,9 +266,8 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ //if the prefetch limit had previously been reached, there may //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ - j->second->requestDispatch(); - } + std::for_each(consumers.begin(), consumers.end(), + boost::bind(&ConsumerImpl::requestDispatch, _1)); } } @@ -328,8 +322,8 @@ void Channel::handleMethodInContext( method->invoke(*adapter, context); } }catch(ChannelException& e){ - connection.client->getChannel().close( - context, e.code, e.toString(), + adapter->getProxy().getChannel().close( + e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); connection.closeChannel(getId()); }catch(ConnectionException& e){ @@ -338,4 +332,3 @@ void Channel::handleMethodInContext( connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } - diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 58c4f0a45b..538e86b0a8 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -23,10 +23,10 @@ */ #include <list> -#include <map> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> +#include <boost/ptr_container/ptr_map.hpp> #include <AccumulatedAck.h> #include <Consumer.h> @@ -56,7 +56,7 @@ using framing::string; class Channel : public framing::ChannelAdapter, public CompletionHandler { - class ConsumerImpl : public virtual Consumer + class ConsumerImpl : public Consumer { Channel* parent; const string tag; @@ -64,23 +64,25 @@ class Channel : public framing::ChannelAdapter, ConnectionToken* const connection; const bool ackExpected; bool blocked; + public: ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); + ~ConsumerImpl(); virtual bool deliver(Message::shared_ptr& msg); void cancel(); void requestDispatch(); }; - typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; Connection& connection; u_int16_t id; u_int64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; bool transactional; - std::map<string, ConsumerImpl*> consumers; + ConsumerImplMap consumers; u_int32_t prefetchSize; u_int16_t prefetchCount; Prefetch outstanding; @@ -93,18 +95,17 @@ class Channel : public framing::ChannelAdapter, MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message bool opened; - boost::scoped_ptr<BrokerAdapter> adapter; // completion handler for MessageBuilder void complete(Message::shared_ptr msg); - void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); - void cancel(consumer_iterator consumer); + void deliver(Message::shared_ptr& msg, const string& tag, + Queue::shared_ptr& queue, bool ackExpected); bool checkPrefetch(Message::shared_ptr& msg); public: - Channel(Connection& channel, + Channel(Connection& parent, framing::ChannelId id, u_int32_t framesize, MessageStore* const _store = 0, @@ -112,8 +113,8 @@ class Channel : public framing::ChannelAdapter, ~Channel(); - // For ChannelAdapter bool isOpen() const { return opened; } + BrokerAdapter& getAdatper() { return *adapter; } void open() { opened = true; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } @@ -122,7 +123,11 @@ class Channel : public framing::ChannelAdapter, u_int16_t setPrefetchCount(u_int16_t n){ return prefetchCount = n; } bool exists(const string& consumerTag); - void consume(string& tag, Queue::shared_ptr queue, bool acks, + + /** + *@param tagInOut - if empty it is updated with the generated token. + */ + void consume(string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); @@ -146,7 +151,6 @@ class Channel : public framing::ChannelAdapter, void handleMethodInContext( boost::shared_ptr<framing::AMQMethodBody> method, const framing::MethodContext& context); - }; struct InvalidAckException{}; diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index e8a993942a..bff4492a49 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -28,6 +28,9 @@ #include <MessageStore.h> #include <BasicDeliverBody.h> #include <BasicGetOkBody.h> +#include <AMQContentBody.h> +#include <AMQHeaderBody.h> +#include "AMQMethodBody.h" #include "AMQFrame.h" #include "framing/ChannelAdapter.h" diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 9e77eab446..871514e55f 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -22,12 +22,10 @@ * */ -#include <BrokerMessageBase.h> #include <memory> #include <boost/shared_ptr.hpp> -#include <AMQContentBody.h> -#include <AMQHeaderBody.h> -#include "AMQMethodBody.h" + +#include <BrokerMessageBase.h> #include <BasicHeaderProperties.h> #include <ConnectionToken.h> #include <Content.h> @@ -39,6 +37,7 @@ namespace qpid { namespace framing { class MethodContext; class ChannelAdapter; +class AMQHeaderBody; } namespace broker { @@ -52,7 +51,7 @@ using framing::string; * request. */ class BasicMessage : public Message { - framing::AMQHeaderBody::shared_ptr header; + boost::shared_ptr<framing::AMQHeaderBody> header; std::auto_ptr<Content> content; sys::Mutex contentLock; u_int64_t size; @@ -65,10 +64,10 @@ class BasicMessage : public Message { BasicMessage(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, bool mandatory, bool immediate, - framing::AMQMethodBody::shared_ptr respondTo); + boost::shared_ptr<framing::AMQMethodBody> respondTo); BasicMessage(); ~BasicMessage(); - void setHeader(framing::AMQHeaderBody::shared_ptr header); + void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header); void addContent(framing::AMQContentBody::shared_ptr data); bool isComplete(); diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index 41a0cd45fa..3bba95a5f8 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -22,14 +22,10 @@ * */ -#include "AMQContentBody.h" -#include "AMQHeaderBody.h" -#include "AMQMethodBody.h" -#include "Content.h" -#include "framing/amqp_types.h" - #include <string> #include <boost/shared_ptr.hpp> +#include "Content.h" +#include "framing/amqp_types.h" namespace qpid { @@ -38,6 +34,9 @@ class MethodContext; class ChannelAdapter; class BasicHeaderProperties; class FieldTable; +class AMQMethodBody; +class AMQContentBody; +class AMQHeaderBody; } @@ -50,24 +49,17 @@ class MessageStore; * abstracting away the operations * TODO; AMS: for the moment this is mostly a placeholder */ -class Message{ - const ConnectionToken* publisher; - std::string exchange; - std::string routingKey; - const bool mandatory; - const bool immediate; - u_int64_t persistenceId; - bool redelivered; - framing::AMQMethodBody::shared_ptr respondTo; - +class Message { public: typedef boost::shared_ptr<Message> shared_ptr; + typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr; + Message(const ConnectionToken* publisher_, const std::string& _exchange, const std::string& _routingKey, bool _mandatory, bool _immediate, - framing::AMQMethodBody::shared_ptr respondTo_) : + AMQMethodBodyPtr respondTo_) : publisher(publisher_), exchange(_exchange), routingKey(_routingKey), @@ -92,9 +84,7 @@ class Message{ const std::string& getExchange() const { return exchange; } u_int64_t getPersistenceId() const { return persistenceId; } bool getRedelivered() const { return redelivered; } - framing::AMQMethodBody::shared_ptr getRespondTo() const { - return respondTo; - } + AMQMethodBodyPtr getRespondTo() const { return respondTo; } void setRouting(const std::string& _exchange, const std::string& _routingKey) { exchange = _exchange; routingKey = _routingKey; } @@ -168,14 +158,24 @@ class Message{ * it uses). */ virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; - virtual void setHeader(framing::AMQHeaderBody::shared_ptr /*header*/) {}; - virtual void addContent(framing::AMQContentBody::shared_ptr /*data*/) {}; + virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {}; + virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {}; /** * Releases the in-memory content data held by this * message. Must pass in a store from which the data can * be reloaded. */ virtual void releaseContent(MessageStore* /*store*/) {}; + + private: + const ConnectionToken* publisher; + std::string exchange; + std::string routingKey; + const bool mandatory; + const bool immediate; + u_int64_t persistenceId; + bool redelivered; + AMQMethodBodyPtr respondTo; }; }} diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 99c045c59a..789e652947 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -149,7 +149,9 @@ void Queue::consume(Consumer* c, bool requestExclusive){ void Queue::cancel(Consumer* c){ Mutex::ScopedLock locker(lock); - consumers.erase(find(consumers.begin(), consumers.end(), c)); + Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); + if (i != consumers.end()) + consumers.erase(i); if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC; if(exclusive == c) exclusive = 0; } diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h index 40fa4bd415..015b27fe76 100644 --- a/cpp/lib/broker/BrokerQueue.h +++ b/cpp/lib/broker/BrokerQueue.h @@ -53,13 +53,17 @@ namespace qpid { * or more consumers registers. */ class Queue{ + typedef std::vector<Consumer*> Consumers; + typedef std::queue<Binding*> Bindings; + typedef std::queue<Message::shared_ptr> Messages; + const string name; const u_int32_t autodelete; MessageStore* const store; const ConnectionToken* const owner; - std::vector<Consumer*> consumers; - std::queue<Binding*> bindings; - std::queue<Message::shared_ptr> messages; + Consumers consumers; + Bindings bindings; + Messages messages; bool queueing; bool dispatching; int next; diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index 000199a65e..3d9e5cdaf8 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -22,6 +22,9 @@ #include <assert.h> #include "Connection.h" +#include "BrokerChannel.h" +#include "AMQP_ClientProxy.h" +#include "BrokerAdapter.h" using namespace boost; using namespace qpid::sys; @@ -33,12 +36,15 @@ namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : broker(broker_), - settings(broker.getTimeout(), broker.getStagingThreshold()), out(out_), framemax(65536), - heartbeat(0) + heartbeat(0), + client(0), + timeout(broker.getTimeout()), + stagingThreshold(broker.getStagingThreshold()) {} + Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { @@ -59,31 +65,27 @@ Exchange::shared_ptr Connection::findExchange(const string& name){ } -void Connection::received(qpid::framing::AMQFrame* frame){ +void Connection::received(framing::AMQFrame* frame){ getChannel(frame->getChannel()).handleBody(frame->getBody()); } -void Connection::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId){ - client->getConnection().close(MethodContext(&getChannel(0)), code, text, classId, methodId); +void Connection::close( + ReplyCode code, const string& text, ClassId classId, MethodId methodId) +{ + client->close(code, text, classId, methodId); getOutput().close(); } -// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter -// as it is part of the protocol. -void Connection::initiated(qpid::framing::ProtocolInitiation* header) { - if (client.get()) - // TODO aconway 2007-01-16: correct error code. - throw ConnectionException(0, "Connection initiated twice"); - client.reset(new qpid::framing::AMQP_ClientProxy( - out, header->getMajor(), header->getMinor())); +void Connection::initiated(framing::ProtocolInitiation* header) { + version = ProtocolVersion(header->getMajor(), header->getMinor()); FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); - client->getConnection().start( - MethodContext(&getChannel(0)), + getChannel(0).init(0, *out, getVersion()); + client = &getChannel(0).getAdatper().getProxy().getConnection(); + client->start( header->getMajor(), header->getMinor(), properties, mechanisms, locales); - getChannel(0).init(0, *out, client->getProtocolVersion()); } void Connection::idleOut(){} @@ -103,9 +105,10 @@ void Connection::closed(){ } } -void Connection::closeChannel(u_int16_t channel) { - getChannel(channel).close(); - channels.erase(channels.find(channel)); +void Connection::closeChannel(u_int16_t id) { + ChannelMap::iterator i = channels.find(id); + if (i != channels.end()) + i->close(); } @@ -115,7 +118,7 @@ Channel& Connection::getChannel(ChannelId id) { i = channels.insert( id, new Channel( *this, id, framemax, broker.getQueues().getStore(), - settings.stagingThreshold)).first; + broker.getStagingThreshold())).first; } return *i; } diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h index 4f1156dd01..27faab4967 100644 --- a/cpp/lib/broker/Connection.h +++ b/cpp/lib/broker/Connection.h @@ -27,66 +27,64 @@ #include <boost/ptr_container/ptr_map.hpp> #include <AMQFrame.h> -#include <AMQP_ClientProxy.h> #include <AMQP_ServerOperations.h> +#include <AMQP_ClientProxy.h> #include <sys/ConnectionOutputHandler.h> #include <sys/ConnectionInputHandler.h> #include <sys/TimeoutHandler.h> +#include "framing/ProtocolVersion.h" #include "Broker.h" #include "Exception.h" +#include "BrokerChannel.h" namespace qpid { namespace broker { -class Settings { - public: - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) - const u_int64_t stagingThreshold; - - Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} -}; +class Channel; class Connection : public sys::ConnectionInputHandler, public ConnectionToken { public: Connection(sys::ConnectionOutputHandler* out, Broker& broker); - // ConnectionInputHandler methods - void received(framing::AMQFrame* frame); - void initiated(framing::ProtocolInitiation* header); - void idleOut(); - void idleIn(); - void closed(); - sys::ConnectionOutputHandler& getOutput() { return *out; } + /** Get a channel. Create if it does not already exist */ + Channel& getChannel(framing::ChannelId channel); - const framing::ProtocolVersion& getVersion() { - return client->getProtocolVersion(); } + /** Close a channel */ + void closeChannel(framing::ChannelId channel); + + /** Close the connection */ + void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); + + sys::ConnectionOutputHandler& getOutput() const { return *out; } + framing::ProtocolVersion getVersion() const { return version; } u_int32_t getFrameMax() const { return framemax; } u_int16_t getHeartbeat() const { return heartbeat; } + u_int32_t getTimeout() const { return timeout; } + u_int64_t getStagingThreshold() const { return stagingThreshold; } void setFrameMax(u_int32_t fm) { framemax = fm; } void setHeartbeat(u_int16_t hb) { heartbeat = hb; } - - Broker& broker; - std::auto_ptr<framing::AMQP_ClientProxy> client; - Settings settings; - - std::vector<Queue::shared_ptr> exclusiveQueues; - + /** * Get named queue, never returns 0. * @return: named queue or default queue for channel if name="" * @exception: ChannelException if no queue of that name is found. - * @exception: ConnectionException if no queue specified and channel has not declared one. + * @exception: ConnectionException if name="" and channel has no default. */ Queue::shared_ptr getQueue(const string& name, u_int16_t channel); - Channel& newChannel(framing::ChannelId channel); - Channel& getChannel(framing::ChannelId channel); - void closeChannel(framing::ChannelId channel); - void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); + Broker& broker; + std::vector<Queue::shared_ptr> exclusiveQueues; + + // ConnectionInputHandler methods + void received(framing::AMQFrame* frame); + void initiated(framing::ProtocolInitiation* header); + void idleOut(); + void idleIn(); + void closed(); private: typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap; @@ -94,10 +92,15 @@ class Connection : public sys::ConnectionInputHandler, typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; Exchange::shared_ptr findExchange(const string& name); + framing::ProtocolVersion version; ChannelMap channels; sys::ConnectionOutputHandler* out; u_int32_t framemax; u_int16_t heartbeat; + framing::AMQP_ClientProxy::Connection* client; + const u_int32_t timeout; //timeout for auto-deleted queues (in ms) + const u_int64_t stagingThreshold; + }; }} diff --git a/cpp/lib/broker/HandlerImpl.h b/cpp/lib/broker/HandlerImpl.h new file mode 100644 index 0000000000..c55a36da45 --- /dev/null +++ b/cpp/lib/broker/HandlerImpl.h @@ -0,0 +1,71 @@ +#ifndef _broker_HandlerImpl_h +#define _broker_HandlerImpl_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "BrokerChannel.h" +#include "AMQP_ClientProxy.h" + +namespace qpid { + +namespace framing { +class AMQP_ClientProxy; +} + +namespace broker { + +class Broker; +class Channel; +class Connection; + +/** + * A collection of references to the core objects required by an adapter, + * and a client proxy. + */ +struct CoreRefs +{ + CoreRefs(Channel& ch, Connection& c, Broker& b) + : channel(ch), connection(c), broker(b), proxy(ch) {} + + Channel& channel; + Connection& connection; + Broker& broker; + framing::AMQP_ClientProxy proxy; +}; + + +/** + * Base template for protocol handler implementations. + * Provides the core references and appropriate AMQP class proxy. + */ +template <class ProxyType> +struct HandlerImpl : public CoreRefs { + typedef HandlerImpl<ProxyType> HandlerImplType; + HandlerImpl(CoreRefs& parent) + : CoreRefs(parent), client(ProxyType::get(proxy)) {} + ProxyType client; +}; + + + +}} // namespace qpid::broker + + + +#endif /*!_broker_HandlerImpl_h*/ diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 797e3fbbf9..0853aebcb1 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -25,16 +25,15 @@ #include "BrokerMessageMessage.h" #include "MessageAppendBody.h" #include "MessageTransferBody.h" +#include "BrokerAdapter.h" namespace qpid { namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b), references(ch), - client(connection.client->getMessage()) -{} +MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) + : HandlerImplType(parent), references(channel) {} // // Message class method handlers @@ -47,7 +46,7 @@ MessageHandlerImpl::append(const MethodContext& context, references.get(reference).append( boost::shared_polymorphic_downcast<MessageAppendBody>( context.methodBody)); - client.ok(context); + client.ok(context.getRequestId()); } @@ -56,7 +55,7 @@ MessageHandlerImpl::cancel(const MethodContext& context, const string& destination ) { channel.cancel(destination); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -73,7 +72,7 @@ MessageHandlerImpl::close(const MethodContext& context, const string& reference) { references.get(reference).close(); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -84,7 +83,7 @@ MessageHandlerImpl::consume(const MethodContext& context, bool noLocal, bool noAck, bool exclusive, - const qpid::framing::FieldTable& filter ) + const framing::FieldTable& filter ) { Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!destination.empty() && channel.exists(destination)) @@ -93,7 +92,7 @@ MessageHandlerImpl::consume(const MethodContext& context, channel.consume( tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok(context); + client.ok(context.getRequestId()); // Dispatch messages as there is now a consumer. queue->dispatch(); } @@ -117,9 +116,9 @@ MessageHandlerImpl::get( const MethodContext& context, connection.getQueue(queueName, context.channel->getId()); if(channel.get(queue, destination, !noAck)) - client.ok(context); + client.ok(context.getRequestId()); else - client.empty(context); + client.empty(context.getRequestId()); } void @@ -141,7 +140,7 @@ MessageHandlerImpl::open(const MethodContext& context, const string& reference) { references.open(reference); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -153,7 +152,7 @@ MessageHandlerImpl::qos(const MethodContext& context, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -161,7 +160,7 @@ MessageHandlerImpl::recover(const MethodContext& context, bool requeue) { channel.recover(requeue); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -204,8 +203,8 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*appId*/, const string& /*transactionId*/, const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content body, + const framing::FieldTable& /*applicationHeaders*/, + const framing::Content& body, bool /*mandatory*/) { MessageTransferBody::shared_ptr transfer( @@ -218,7 +217,7 @@ MessageHandlerImpl::transfer(const MethodContext& context, channel.handleInlineTransfer(message); else references.get(body.getValue()).addMessage(message); - client.ok(context); + client.ok(context.getRequestId()); } diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 0fef45bb19..cb7e7e3126 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -24,7 +24,7 @@ #include "AMQP_ServerOperations.h" #include "AMQP_ClientProxy.h" #include "Reference.h" -#include "BrokerChannel.h" +#include "HandlerImpl.h" namespace qpid { namespace broker { @@ -34,10 +34,11 @@ class Broker; class MessageMessage; class MessageHandlerImpl : - public framing::AMQP_ServerOperations::MessageHandler + public framing::AMQP_ServerOperations::MessageHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Message> { public: - MessageHandlerImpl(Channel& ch, Connection& c, Broker& b); + MessageHandlerImpl(CoreRefs& parent); void append(const framing::MethodContext&, const std::string& reference, @@ -116,14 +117,10 @@ class MessageHandlerImpl : const std::string& transactionId, const std::string& securityToken, const framing::FieldTable& applicationHeaders, - framing::Content body, + const framing::Content& body, bool mandatory ); private: - Channel& channel; - Connection& connection; - Broker& broker; ReferenceRegistry references; - framing::AMQP_ClientProxy::Message& client; }; }} // namespace qpid::broker diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index dd93c6ae8b..52910f5161 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -24,6 +24,7 @@ #include <QpidError.h> #include <MethodBodyInstances.h> #include "Connection.h" +#include "AMQP_ServerProxy.h" // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -48,12 +49,23 @@ Channel::~Channel(){ close(); } +AMQP_ServerProxy& Channel::brokerProxy() { + assert(proxy.get()); + return *proxy; +} + +AMQMethodBody::shared_ptr Channel::brokerResponse() { + // FIXME aconway 2007-02-08: implement responses. + return AMQMethodBody::shared_ptr(); +} + void Channel::open(ChannelId id, Connection& con) { if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); connection = &con; init(id, con, con.getVersion()); // ChannelAdapter initialization. + proxy.reset(new AMQP_ServerProxy(*this)); string oob; if (id != 0) sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob)); diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index a34c95d2c4..1c082f3b59 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -24,6 +24,7 @@ #include <map> #include <string> #include <queue> +#include <boost/scoped_ptr.hpp> #include "sys/types.h" #include <framing/amqp_framing.h> @@ -39,8 +40,10 @@ #include "Thread.h" namespace qpid { + namespace framing { class ChannelCloseBody; +class AMQP_ServerProxy; } namespace client { @@ -102,6 +105,7 @@ class Channel : public framing::ChannelAdapter, u_int16_t prefetch; const bool transactional; framing::ProtocolVersion version; + boost::scoped_ptr<framing::AMQP_ServerProxy> proxy; void enqueue(); void retrieve(Message& msg); @@ -151,8 +155,6 @@ class Channel : public framing::ChannelAdapter, public: - bool isOpen() const; - /** * Creates a channel object. * @@ -358,9 +360,21 @@ class Channel : public framing::ChannelAdapter, * @see publish() */ void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + bool isOpen() const; + + /** + * Returns a proxy for the "raw" AMQP broker protocol. Only for use by + * protocol experts. + */ + + framing::AMQP_ServerProxy& brokerProxy(); + /** + * Wait for the next method from the broker. + */ + framing::AMQMethodBody::shared_ptr brokerResponse(); }; -} -} +}} #endif /*!_client_ClientChannel_h*/ diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 0fafd29b90..2f91c44a22 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -43,7 +43,7 @@ const std::string Connection::OK("OK"); Connection::Connection( bool _debug, u_int32_t _max_frame_size, - const framing::ProtocolVersion& _version + framing::ProtocolVersion _version ) : version(_version), max_frame_size(_max_frame_size), defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 2f9b35d5ef..275e02a105 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -96,12 +96,12 @@ class Connection : public ConnectionForChannel Connector* connector; framing::OutputHandler* out; volatile bool isOpen; + Channel channel0; + bool debug; void erase(framing::ChannelId); void channelException( Channel&, framing::AMQMethodBody*, const QpidError&); - Channel channel0; - bool debug; // TODO aconway 2007-01-26: too many friendships, untagle these classes. friend class Channel; @@ -120,9 +120,8 @@ class Connection : public ConnectionForChannel * @param max_frame_size the maximum frame size that the * client will accept. Optional and defaults to 65536. */ - Connection( - bool debug = false, u_int32_t max_frame_size = 65536, - const framing::ProtocolVersion& = framing::highestProtocolVersion); + Connection(bool debug = false, u_int32_t max_frame_size = 65536, + framing::ProtocolVersion=framing::highestProtocolVersion); ~Connection(); /** @@ -185,7 +184,7 @@ class Connection : public ConnectionForChannel inline u_int32_t getMaxFrameSize(){ return max_frame_size; } /** @return protocol version in use on this connection. */ - const framing::ProtocolVersion& getVersion() const { return version; } + framing::ProtocolVersion getVersion() const { return version; } }; }} // namespace qpid::client diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index 425cecaf6f..657ee77f1a 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -23,17 +23,19 @@ #include <sys/Time.h> #include "Connector.h" +namespace qpid { +namespace client { + using namespace qpid::sys; -using namespace qpid::client; using namespace qpid::framing; using qpid::QpidError; -Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, - bool _debug, u_int32_t buffer_size) : - debug(_debug), +Connector::Connector( + ProtocolVersion ver, bool _debug, u_int32_t buffer_size +) : debug(_debug), receive_buffer_size(buffer_size), send_buffer_size(buffer_size), - version(pVersion), + version(ver), closed(true), lastIn(0), lastOut(0), timeout(0), @@ -180,3 +182,5 @@ void Connector::run(){ handleClosed(); } } + +}} // namespace qpid::client diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index 1126e861e0..ccac39f849 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -77,7 +77,7 @@ class Connector : public framing::OutputHandler, friend class Channel; public: - Connector(const framing::ProtocolVersion& pVersion, + Connector(framing::ProtocolVersion pVersion, bool debug = false, u_int32_t buffer_size = 1024); virtual ~Connector(); virtual void connect(const std::string& host, int port); diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index eefff79d6f..c44480bddf 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -63,6 +63,7 @@ libqpidcommon_la_SOURCES = \ $(framing)/AMQHeaderBody.cpp \ $(framing)/AMQHeartbeatBody.cpp \ $(framing)/AMQMethodBody.cpp \ + $(framing)/MethodContext.cpp \ $(framing)/BasicHeaderProperties.cpp \ $(framing)/BodyHandler.cpp \ $(framing)/ChannelAdapter.cpp \ @@ -76,8 +77,8 @@ libqpidcommon_la_SOURCES = \ $(framing)/Requester.cpp \ $(framing)/Responder.cpp \ $(framing)/Value.cpp \ + $(framing)/Proxy.cpp \ $(gen)/AMQP_ClientProxy.cpp \ - $(gen)/AMQP_HighestVersion.h \ $(gen)/AMQP_MethodVersionMap.cpp \ $(gen)/AMQP_ServerProxy.cpp \ Exception.cpp \ @@ -87,6 +88,7 @@ libqpidcommon_la_SOURCES = \ sys/Time.cpp nobase_pkginclude_HEADERS = \ + $(gen)/AMQP_HighestVersion.h \ $(platform_hdr) \ $(framing)/AMQBody.h \ $(framing)/AMQContentBody.h \ @@ -95,6 +97,7 @@ nobase_pkginclude_HEADERS = \ $(framing)/AMQHeaderBody.h \ $(framing)/AMQHeartbeatBody.h \ $(framing)/AMQMethodBody.h \ + $(framing)/MethodContext.h \ $(framing)/BasicHeaderProperties.h \ $(framing)/BodyHandler.h \ $(framing)/ChannelAdapter.h \ @@ -111,6 +114,7 @@ nobase_pkginclude_HEADERS = \ $(framing)/Value.h \ $(framing)/amqp_framing.h \ $(framing)/amqp_types.h \ + $(framing)/Proxy.h \ Exception.h \ ExceptionHolder.h \ QpidError.h \ @@ -121,9 +125,9 @@ nobase_pkginclude_HEADERS = \ sys/Monitor.h \ sys/Mutex.h \ sys/Runnable.h \ - sys/ConnectionOutputHandler.h \ - sys/ConnectionInputHandler.h \ - sys/ConnectionInputHandlerFactory.h \ + sys/ConnectionOutputHandler.h \ + sys/ConnectionInputHandler.h \ + sys/ConnectionInputHandlerFactory.h \ sys/ShutdownHandler.h \ sys/Socket.h \ sys/Thread.h \ diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index 9c5e295e22..4e061af2e1 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -26,19 +26,24 @@ #include "AMQRequestBody.h" #include "AMQResponseBody.h" -using namespace qpid::framing; + +namespace qpid { +namespace framing { + AMQP_MethodVersionMap AMQFrame::versionMap; -AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version): +AMQFrame::AMQFrame(ProtocolVersion _version): version(_version) - {} + { + assert(version != ProtocolVersion(0,0)); + } -AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) : +AMQFrame::AMQFrame(ProtocolVersion _version, u_int16_t _channel, AMQBody* _body) : version(_version), channel(_channel), body(_body) {} -AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) : +AMQFrame::AMQFrame(ProtocolVersion _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) : version(_version), channel(_channel), body(_body) {} @@ -119,7 +124,7 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) body->decode(buffer, size); } -std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) +std::ostream& operator<<(std::ostream& out, const AMQFrame& t) { out << "Frame[channel=" << t.channel << "; "; if (t.body.get() == 0) @@ -130,3 +135,5 @@ std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) return out; } + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index 15b294a373..4e49b8871f 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -41,9 +41,9 @@ namespace framing { class AMQFrame : public AMQDataBlock { public: - AMQFrame(const qpid::framing::ProtocolVersion& _version = highestProtocolVersion); - AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body); - AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body); + AMQFrame(ProtocolVersion _version = highestProtocolVersion); + AMQFrame(ProtocolVersion _version, u_int16_t channel, AMQBody* body); + AMQFrame(ProtocolVersion _version, u_int16_t channel, const AMQBody::shared_ptr& body); virtual ~AMQFrame(); virtual void encode(Buffer& buffer); virtual bool decode(Buffer& buffer); @@ -62,7 +62,7 @@ class AMQFrame : public AMQDataBlock private: static AMQP_MethodVersionMap versionMap; - qpid::framing::ProtocolVersion version; + ProtocolVersion version; u_int16_t channel; u_int8_t type; diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp index 53ab30faa0..40241660f2 100644 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -19,6 +19,7 @@ #include "ChannelAdapter.h" #include "AMQFrame.h" +#include "Exception.h" using boost::format; @@ -26,7 +27,7 @@ namespace qpid { namespace framing { void ChannelAdapter::init( - ChannelId i, OutputHandler& o, const ProtocolVersion& v) + ChannelId i, OutputHandler& o, ProtocolVersion v) { assertChannelNotOpen(); id = i; @@ -34,13 +35,15 @@ void ChannelAdapter::init( version = v; } -void ChannelAdapter::send(AMQBody::shared_ptr body) { +RequestId ChannelAdapter::send(AMQBody::shared_ptr body) { + RequestId result = 0; assertChannelOpen(); switch (body->type()) { case REQUEST_BODY: { AMQRequestBody::shared_ptr request = boost::shared_polymorphic_downcast<AMQRequestBody>(body); requester.sending(request->getData()); + result = request->getData().requestId; break; } case RESPONSE_BODY: { @@ -51,6 +54,7 @@ void ChannelAdapter::send(AMQBody::shared_ptr body) { } } out->send(new AMQFrame(getVersion(), getId(), body)); + return result; } void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h index c2eba2f4e9..9f654d9a5b 100644 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -35,9 +35,8 @@ namespace framing { class MethodContext; /** - * Base class for client and broker channel adapters. + * Base class for client and broker channels. * - * BodyHandler::handl* * - receives frame bodies from the network. * - Updates request/response data. * - Dispatches requests with a MethodContext for responses. @@ -55,21 +54,21 @@ class ChannelAdapter : public BodyHandler { *@param output Processed frames are forwarded to this handler. */ ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0, - const ProtocolVersion& ver=ProtocolVersion()) + ProtocolVersion ver=ProtocolVersion()) : id(id_), out(out_), version(ver) {} /** Initialize the channel adapter. */ - void init(ChannelId, OutputHandler&, const ProtocolVersion&); + void init(ChannelId, OutputHandler&, ProtocolVersion); ChannelId getId() const { return id; } - const ProtocolVersion& getVersion() const { return version; } + ProtocolVersion getVersion() const { return version; } /** * Wrap body in a frame and send the frame. * Takes ownership of body. */ - void send(AMQBody::shared_ptr body); - void send(AMQBody* body) { send(AMQBody::shared_ptr(body)); } + RequestId send(AMQBody::shared_ptr body); + RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); } void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>); @@ -95,7 +94,7 @@ class ChannelAdapter : public BodyHandler { ProtocolVersion version; Requester requester; Responder responder; - RequestId requestInProgress; // TODO aconway 2007-01-24: use it. + RequestId requestInProgress; }; }} diff --git a/cpp/lib/common/framing/FramingContent.h b/cpp/lib/common/framing/FramingContent.h index 0f4a4b8f64..cffd46ee24 100644 --- a/cpp/lib/common/framing/FramingContent.h +++ b/cpp/lib/common/framing/FramingContent.h @@ -27,9 +27,9 @@ class Content void encode(Buffer& buffer) const; void decode(Buffer& buffer); size_t size() const; - bool isInline() { return discriminator == INLINE; } - bool isReference() { return discriminator == REFERENCE; } - const string& getValue() { return value; } + bool isInline() const { return discriminator == INLINE; } + bool isReference() const { return discriminator == REFERENCE; } + const string& getValue() const { return value; } friend std::ostream& operator<<(std::ostream&, const Content&); }; diff --git a/cpp/lib/common/framing/MethodContext.cpp b/cpp/lib/common/framing/MethodContext.cpp new file mode 100644 index 0000000000..73af73f8e5 --- /dev/null +++ b/cpp/lib/common/framing/MethodContext.cpp @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "MethodContext.h" +#include "amqp_types.h" +#include "AMQRequestBody.h" + +namespace qpid { +namespace framing { + +RequestId MethodContext::getRequestId() const { + return boost::shared_polymorphic_downcast<AMQRequestBody>(methodBody) + ->getRequestId(); +} + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index afb499023d..3493924bf6 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -24,8 +24,6 @@ #include "OutputHandler.h" #include "ProtocolVersion.h" -#include <boost/shared_ptr.hpp> - namespace qpid { namespace framing { @@ -61,6 +59,12 @@ struct MethodContext * It's also provides the request ID when constructing a response. */ BodyPtr methodBody; + + /** + * Return methodBody's request ID. + * It is an error to call this if methodBody is not a request. + */ + RequestId getRequestId() const; }; // FIXME aconway 2007-02-01: Method context only required on Handler diff --git a/cpp/lib/common/framing/ProtocolInitiation.cpp b/cpp/lib/common/framing/ProtocolInitiation.cpp index 471f736a7d..c119b79d6d 100644 --- a/cpp/lib/common/framing/ProtocolInitiation.cpp +++ b/cpp/lib/common/framing/ProtocolInitiation.cpp @@ -20,15 +20,18 @@ */ #include <ProtocolInitiation.h> -qpid::framing::ProtocolInitiation::ProtocolInitiation(){} +namespace qpid { +namespace framing { -qpid::framing::ProtocolInitiation::ProtocolInitiation(u_int8_t _major, u_int8_t _minor) : version(_major, _minor) {} +ProtocolInitiation::ProtocolInitiation(){} -qpid::framing::ProtocolInitiation::ProtocolInitiation(const qpid::framing::ProtocolVersion& p) : version(p) {} +ProtocolInitiation::ProtocolInitiation(u_int8_t _major, u_int8_t _minor) : version(_major, _minor) {} -qpid::framing::ProtocolInitiation::~ProtocolInitiation(){} +ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {} -void qpid::framing::ProtocolInitiation::encode(Buffer& buffer){ +ProtocolInitiation::~ProtocolInitiation(){} + +void ProtocolInitiation::encode(Buffer& buffer){ buffer.putOctet('A'); buffer.putOctet('M'); buffer.putOctet('Q'); @@ -39,7 +42,7 @@ void qpid::framing::ProtocolInitiation::encode(Buffer& buffer){ buffer.putOctet(version.getMinor()); } -bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){ +bool ProtocolInitiation::decode(Buffer& buffer){ if(buffer.available() >= 8){ buffer.getOctet();//A buffer.getOctet();//M @@ -56,3 +59,5 @@ bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){ } //TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h index 6b3dbac88d..fb5cb3abed 100644 --- a/cpp/lib/common/framing/ProtocolInitiation.h +++ b/cpp/lib/common/framing/ProtocolInitiation.h @@ -37,14 +37,14 @@ private: public: ProtocolInitiation(); ProtocolInitiation(u_int8_t major, u_int8_t minor); - ProtocolInitiation(const ProtocolVersion& p); + ProtocolInitiation(ProtocolVersion p); virtual ~ProtocolInitiation(); virtual void encode(Buffer& buffer); virtual bool decode(Buffer& buffer); inline virtual u_int32_t size() const { return 8; } inline u_int8_t getMajor() const { return version.getMajor(); } inline u_int8_t getMinor() const { return version.getMinor(); } - inline const ProtocolVersion& getVersion() const { return version; } + inline ProtocolVersion getVersion() const { return version; } }; } diff --git a/cpp/lib/common/framing/ProtocolVersion.cpp b/cpp/lib/common/framing/ProtocolVersion.cpp index e65c8b79b8..fd4b1a645f 100644 --- a/cpp/lib/common/framing/ProtocolVersion.cpp +++ b/cpp/lib/common/framing/ProtocolVersion.cpp @@ -20,37 +20,9 @@ */ #include <ProtocolVersion.h> #include <sstream> -#include "AMQP_HighestVersion.h" using namespace qpid::framing; -ProtocolVersion::ProtocolVersion() { - *this = highestProtocolVersion; -} - -ProtocolVersion::ProtocolVersion(u_int8_t _major, u_int8_t _minor) : - major_(_major), - minor_(_minor) -{} - -ProtocolVersion::ProtocolVersion(const ProtocolVersion::ProtocolVersion& p): - major_(p.major_), - minor_(p.minor_) -{} - -ProtocolVersion::~ProtocolVersion() -{} - -bool ProtocolVersion::equals(u_int8_t _major, u_int8_t _minor) const -{ - return major_ == _major && minor_ == _minor; -} - -bool ProtocolVersion::equals(const ProtocolVersion::ProtocolVersion& p) const -{ - return major_ == p.major_ && minor_ == p.minor_; -} - const std::string ProtocolVersion::toString() const { std::stringstream ss; @@ -58,10 +30,15 @@ const std::string ProtocolVersion::toString() const return ss.str(); } -ProtocolVersion::ProtocolVersion ProtocolVersion::operator=(const ProtocolVersion& p) +ProtocolVersion& ProtocolVersion::operator=(ProtocolVersion p) { major_ = p.major_; minor_ = p.minor_; return *this; } +bool ProtocolVersion::operator==(ProtocolVersion p) const +{ + return major_ == p.major_ && minor_ == p.minor_; +} + diff --git a/cpp/lib/common/framing/ProtocolVersion.h b/cpp/lib/common/framing/ProtocolVersion.h index 6aba87c6f5..aa526aa293 100644 --- a/cpp/lib/common/framing/ProtocolVersion.h +++ b/cpp/lib/common/framing/ProtocolVersion.h @@ -35,19 +35,19 @@ private: u_int8_t minor_; public: - ProtocolVersion(); - ProtocolVersion(u_int8_t _major, u_int8_t _minor); - ProtocolVersion(const ProtocolVersion& p); - virtual ~ProtocolVersion(); - - inline u_int8_t getMajor() const { return major_; } - inline void setMajor(u_int8_t major) { major_ = major; } - inline u_int8_t getMinor() const { return minor_; } - inline void setMinor(u_int8_t minor) { minor_ = minor; } - virtual bool equals(u_int8_t _major, u_int8_t _minor) const; - virtual bool equals(const ProtocolVersion& p) const; - virtual const std::string toString() const; - ProtocolVersion operator=(const ProtocolVersion& p); + ProtocolVersion(u_int8_t _major=0, u_int8_t _minor=0) + : major_(_major), minor_(_minor) {} + + u_int8_t getMajor() const { return major_; } + void setMajor(u_int8_t major) { major_ = major; } + u_int8_t getMinor() const { return minor_; } + void setMinor(u_int8_t minor) { minor_ = minor; } + const std::string toString() const; + + ProtocolVersion& operator=(ProtocolVersion p); + + bool operator==(ProtocolVersion p) const; + bool operator!=(ProtocolVersion p) const { return ! (*this == p); } }; } // namespace framing diff --git a/cpp/lib/common/framing/ProtocolVersionException.h b/cpp/lib/common/framing/ProtocolVersionException.h index aff0cd91d6..8e2de8b843 100644 --- a/cpp/lib/common/framing/ProtocolVersionException.h +++ b/cpp/lib/common/framing/ProtocolVersionException.h @@ -40,7 +40,7 @@ public: template <class T> ProtocolVersionException( - const ProtocolVersion& ver, const T& msg) throw () : versionFound(ver) + ProtocolVersion ver, const T& msg) throw () : versionFound(ver) { init(boost::lexical_cast<std::string>(msg)); } template <class T> diff --git a/cpp/lib/common/framing/Proxy.cpp b/cpp/lib/common/framing/Proxy.cpp new file mode 100644 index 0000000000..0b2a882a49 --- /dev/null +++ b/cpp/lib/common/framing/Proxy.cpp @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Proxy.h" +#include "ChannelAdapter.h" +#include "ProtocolVersion.h" + +namespace qpid { +namespace framing { + +Proxy::~Proxy() {} + +ProtocolVersion Proxy::getProtocolVersion() const { + return channel.getVersion(); +} + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/Proxy.h b/cpp/lib/common/framing/Proxy.h new file mode 100644 index 0000000000..8ed46ed748 --- /dev/null +++ b/cpp/lib/common/framing/Proxy.h @@ -0,0 +1,51 @@ +#ifndef _framing_Proxy_h +#define _framing_Proxy_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "ProtocolVersion.h" + +namespace qpid { +namespace framing { + +class ChannelAdapter; +class FieldTable; +class Content; + +/** + * Base class for proxies. + */ +class Proxy +{ + + public: + Proxy(ChannelAdapter& ch) : channel(ch) {} + virtual ~Proxy(); + + ProtocolVersion getProtocolVersion() const; + + protected: + ChannelAdapter& channel; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_Proxy_h*/ diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h index 777d9e7bc5..2f56cb877e 100644 --- a/cpp/lib/common/framing/amqp_types.h +++ b/cpp/lib/common/framing/amqp_types.h @@ -20,6 +20,12 @@ * under the License. * */ + +/** \file + * Type definitions and forward declarations of all types used to + * in AMQP messages. + */ + #include <string> #ifdef _WINDOWS #include "windows.h" @@ -44,5 +50,8 @@ typedef u_int16_t ClassId; typedef u_int16_t MethodId; typedef u_int16_t ReplyCode; +// Types represented by classes. +class Content; +class FieldTable; }} // namespace qpid::framing #endif diff --git a/cpp/lib/common/framing/amqp_types_full.h b/cpp/lib/common/framing/amqp_types_full.h new file mode 100644 index 0000000000..6a24a99d38 --- /dev/null +++ b/cpp/lib/common/framing/amqp_types_full.h @@ -0,0 +1,36 @@ +#ifndef _framing_amqp_types_decl_h +#define _framing_amqp_types_decl_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** \file + * Type definitions and full declarations of all types used to + * in AMQP messages. + * + * Its better to include amqp_types.h in another header instead of this file + * unless the header actually needs the full declarations. Including + * full declarations when forward declarations would do increases compile + * times. + */ + +#include "amqp_types.h" +#include "FramingContent.h" +#include "FieldTable.h" + +#endif /*!_framing_amqp_types_decl_h*/ diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index 2f49a28b83..95f1a9b90a 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -74,7 +74,7 @@ class MessageTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize()); MockChannel channel(1); - // FIXME aconway 2007-02-02: deliver should take const ProtocolVersion& + // FIXME aconway 2007-02-02: deliver should take ProtocolVersion msg->deliver(channel, "ignore", 0, 100); CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2]->getBody())); diff --git a/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/gentools/src/org/apache/qpid/gentools/CppGenerator.java index 8b6a7eb49c..b46d6e8cfc 100644 --- a/gentools/src/org/apache/qpid/gentools/CppGenerator.java +++ b/gentools/src/org/apache/qpid/gentools/CppGenerator.java @@ -408,7 +408,7 @@ public class CppGenerator extends Generator } else if (token.compareTo("${cph_inner_class_defn}") == 0) { - codeSnippet = generateProxyInnerClassDefinitions(model, false, 4, 4); + codeSnippet = generateProxyInnerClassDeclarations(model, false, 4, 4); } else if (token.compareTo("${cpc_constructor_initializer}") == 0) { @@ -442,7 +442,7 @@ public class CppGenerator extends Generator } else if (token.compareTo("${sph_inner_class_defn}") == 0) { - codeSnippet = generateProxyInnerClassDefinitions(model, true, 4, 4); + codeSnippet = generateProxyInnerClassDeclarations(model, true, 4, 4); } else if (token.compareTo("${spc_constructor_initializer}") == 0) { @@ -707,7 +707,6 @@ public class CppGenerator extends Generator throws AmqpTypeMappingException { - String proxyClassName = "AMQP_" + (serverFlag ? "Server" : "Client") + "Proxy"; String indent = Utils.createSpaces(indentSize); String tab = Utils.createSpaces(tabSize); StringBuffer sb = new StringBuffer(); @@ -726,16 +725,9 @@ public class CppGenerator extends Generator else sb.append(cr); sb.append(indent + "{" + cr); - sb.append(indent + "private:" + cr); - sb.append(indent + tab + proxyClassName+ "* parent;" + cr); - sb.append(cr); sb.append(indent + tab + "// Constructors and destructors" + cr); - sb.append(cr); - sb.append(indent + "protected:" + cr); - sb.append(indent + tab + handlerClassName + "() {}" + cr); sb.append(indent + "public:" + cr); - sb.append(indent + tab + handlerClassName + - "(" + proxyClassName + "* _parent) {parent = _parent;}" + cr); + sb.append(indent + tab + handlerClassName + "(){};\n"); sb.append(indent + tab + "virtual ~" + handlerClassName + "() {}" + cr); sb.append(cr); sb.append(indent + tab + "// Protocol methods" + cr); @@ -758,6 +750,7 @@ public class CppGenerator extends Generator for (String thisMethodName : thisClass.methodMap.keySet()) { AmqpMethod method = thisClass.methodMap.get(thisMethodName); + String returnType = (abstractMethodFlag || method.isResponse(null))? "void" : "RequestId"; boolean clientChassisFlag = method.clientMethodFlagMap.isSet(); boolean serverChassisFlag = method.serverMethodFlagMap.isSet(); if ((serverFlag && serverChassisFlag) || (!serverFlag && clientChassisFlag)) @@ -768,10 +761,16 @@ public class CppGenerator extends Generator for (AmqpOrdinalFieldMap thisFieldMap : overloadededParameterMap.keySet()) { AmqpVersionSet versionSet = overloadededParameterMap.get(thisFieldMap); - if (!first) - sb.append(cr); - sb.append(indent + "virtual void " + methodName + "(const MethodContext& context"); - sb.append(generateMethodParameterList(thisFieldMap, indentSize + (5*tabSize), true, true, true)); + if (!first) sb.append(cr); + sb.append(indent + "virtual "+returnType+" "+ methodName + "("); + if (abstractMethodFlag) sb.append("const MethodContext& context"); + boolean leadingComma = abstractMethodFlag; + int paramIndent = indentSize + (5*tabSize); + sb.append(generateMethodParameterList(thisFieldMap, paramIndent, leadingComma, true, true)); + if (!abstractMethodFlag && method.isResponse(null)) { + if (!thisFieldMap.isEmpty()) sb.append(", \n"+Utils.createSpaces(paramIndent)); + sb.append(" RequestId responseTo"); + } sb.append(" )"); if (abstractMethodFlag) sb.append(" = 0"); @@ -818,18 +817,23 @@ public class CppGenerator extends Generator return sb.toString(); } + public String proxyInstanceName(AmqpClass inner, String outer) { + return parseForReservedWords(Utils.firstLower(inner.name), outer) + + "Proxy"; + } + protected String generateProxyInnerClassInstances(AmqpModel model, boolean serverFlag, int indentSize) { String indent = Utils.createSpaces(indentSize); StringBuffer sb = new StringBuffer(); - String outerClassName = "AMQP_" + (serverFlag ? "Server" : "Client") + "Proxy"; + String outerClassName = proxyOuterClassName(serverFlag); for (String thisClassName : model.classMap.keySet()) { AmqpClass thisClass = model.classMap.get(thisClassName); - String instanceName = parseForReservedWords(Utils.firstLower(thisClass.name), outerClassName); String className = parseForReservedWords(thisClass.name, null); - sb.append(indent + className + " " + instanceName + ";"); + sb.append(indent + className + " " + + proxyInstanceName(thisClass, outerClassName) + ";"); if (thisClass.versionSet.size() != globalVersionSet.size()) sb.append(" // AMQP Version(s) " + thisClass.versionSet + cr); else @@ -843,11 +847,10 @@ public class CppGenerator extends Generator { String indent = Utils.createSpaces(indentSize); StringBuffer sb = new StringBuffer(); - String outerClassName = "AMQP_" + (serverFlag ? "Server" : "Client") + "Proxy"; for (String thisClassName : model.classMap.keySet()) { AmqpClass thisClass = model.classMap.get(thisClassName); - String className = parseForReservedWords(thisClass.name, outerClassName); + String className = parseForReservedWords(thisClass.name, proxyOuterClassName(serverFlag)); sb.append(indent + className + "& get" + className + "();"); if (thisClass.versionSet.size() != globalVersionSet.size()) sb.append(" // AMQP Version(s) " + thisClass.versionSet + cr); @@ -856,12 +859,15 @@ public class CppGenerator extends Generator } return sb.toString(); } + + private String proxyOuterClassName(boolean serverFlag) { + return "AMQP_" + (serverFlag ? "Server" : "Client") + "Proxy"; + } - protected String generateProxyInnerClassDefinitions(AmqpModel model, boolean serverFlag, + protected String generateProxyInnerClassDeclarations(AmqpModel model, boolean serverFlag, int indentSize, int tabSize) throws AmqpTypeMappingException { - String proxyClassName = "AMQP_" + (serverFlag ? "Server" : "Client") + "Proxy"; String indent = Utils.createSpaces(indentSize); String tab = Utils.createSpaces(tabSize); StringBuffer sb = new StringBuffer(); @@ -870,29 +876,27 @@ public class CppGenerator extends Generator { AmqpClass thisClass = model.classMap.get(thisClassName); String className = thisClass.name; - String superclassName = "AMQP_" + (serverFlag ? "Server" : "Client") + "Operations::" + - thisClass.name + "Handler"; if (!first) sb.append(cr); sb.append(indent + "// ==================== class " + className + " ====================" + cr); - sb.append(indent + "class " + className + " : virtual public " + superclassName); + sb.append(indent + "class " + className); if (thisClass.versionSet.size() != globalVersionSet.size()) sb.append(" // AMQP Version(s) " + thisClass.versionSet + cr); else sb.append(cr); sb.append(indent + "{" + cr); sb.append(indent + "private:" + cr); - sb.append(indent + tab + "OutputHandler* out;" + cr); - sb.append(indent + tab + proxyClassName + "* parent;" + cr); + sb.append(indent + tab + "ChannelAdapter& channel;" + cr); sb.append(cr); sb.append(indent + "public:" + cr); sb.append(indent + tab + "// Constructors and destructors" + cr); sb.append(cr); - sb.append(indent + tab + className + "(OutputHandler* out, " + proxyClassName + "* _parent) : " + cr); - sb.append(indent + tab + tab + "out(out) {parent = _parent;}" + cr); + sb.append(indent + tab + className + "(ChannelAdapter& ch) : " + cr); + sb.append(indent + tab + tab + "channel(ch) {}" + cr); sb.append(indent + tab + "virtual ~" + className + "() {}" + cr); sb.append(cr); + sb.append(indent + tab + "static "+className+"& get(" + proxyOuterClassName(serverFlag)+"& proxy) { return proxy.get"+className+"();}\n\n"); sb.append(indent + tab + "// Protocol methods" + cr); sb.append(cr); sb.append(generateInnerClassMethods(thisClass, serverFlag, false, indentSize + tabSize, tabSize)); @@ -905,22 +909,17 @@ public class CppGenerator extends Generator protected String generateProxyConstructorInitializers(AmqpModel model, boolean serverFlag, int indentSize) { - String outerClassName = "AMQP_" + (serverFlag ? "Server" : "Client") + "Proxy"; - String superclassName = "AMQP_" + (serverFlag ? "Server" : "Client") + "Operations"; String indent = Utils.createSpaces(indentSize); - StringBuffer sb = new StringBuffer(indent + superclassName + "(major, minor)," + cr); - sb.append(indent + "version(major, minor)," + cr); - sb.append(indent + "out(out)"); + StringBuffer sb = new StringBuffer(); Iterator<String> cItr = model.classMap.keySet().iterator(); while (cItr.hasNext()) { AmqpClass thisClass = model.classMap.get(cItr.next()); - String instanceName = parseForReservedWords(Utils.firstLower(thisClass.name), outerClassName); - sb.append("," + cr); - sb.append(indent + instanceName + "(out, this)"); - if (!cItr.hasNext()) - sb.append(cr); + sb.append(",\n"); + sb.append(indent + proxyInstanceName(thisClass, proxyOuterClassName(serverFlag)) + + "(channel)"); } + sb.append("\n"); return sb.toString(); } @@ -931,13 +930,12 @@ public class CppGenerator extends Generator String indent = Utils.createSpaces(indentSize); String tab = Utils.createSpaces(tabSize); StringBuffer sb = new StringBuffer(); - String outerClassName = "AMQP_" + (serverFlag ? "Server" : "Client") + "Proxy"; + String outerClassName = proxyOuterClassName(serverFlag); Iterator<String> cItr = model.classMap.keySet().iterator(); while (cItr.hasNext()) { AmqpClass thisClass = model.classMap.get(cItr.next()); String className = thisClass.name; - String instanceName = parseForReservedWords(Utils.firstLower(thisClass.name), outerClassName); sb.append(indent + outerClassName + "::" + className + "& " + outerClassName + "::get" + className + "()" + cr); sb.append(indent + "{" + cr); @@ -946,7 +944,8 @@ public class CppGenerator extends Generator sb.append(indent + tab + "if (!" + generateVersionCheck(thisClass.versionSet) + ")" + cr); sb.append(indent + tab + tab + "throw new ProtocolVersionException();" + cr); } - sb.append(indent + tab + "return " + instanceName + ";" + cr); + sb.append(indent + tab + "return " + + proxyInstanceName(thisClass, outerClassName) + ";" + cr); sb.append(indent + "}" + cr); if (cItr.hasNext()) sb.append(cr); @@ -981,7 +980,7 @@ public class CppGenerator extends Generator { String indent = Utils.createSpaces(indentSize); StringBuffer sb = new StringBuffer(); - String outerclassName = "AMQP_" + (serverFlag ? "Server" : "Client") + "Proxy"; + String outerclassName = proxyOuterClassName(serverFlag); boolean first = true; for (String thisMethodName : thisClass.methodMap.keySet()) { @@ -1000,10 +999,15 @@ public class CppGenerator extends Generator AmqpVersionSet versionSet = overloadededParameterMap.get(thisFieldMap); if (!first) sb.append(cr); - sb.append(indent + "void " + outerclassName + "::" + thisClass.name + "::" + - methodName + "(const MethodContext& context"); - sb.append(generateMethodParameterList(thisFieldMap, indentSize + (5*tabSize), true, true, true)); - sb.append(" )"); + String returnType = method.isResponse(null) ? "void" : "RequestId"; + sb.append(indent + returnType + " " + outerclassName + "::" + thisClass.name + "::" + + methodName + "("); + sb.append(generateMethodParameterList(thisFieldMap, indentSize + (5*tabSize), false, true, true)); + if (method.isResponse(null)) { + if (!thisFieldMap.isEmpty()) sb.append(", "); + sb.append("RequestId responseTo"); + } + sb.append(")"); if (versionSet.size() != globalVersionSet.size()) sb.append(" // AMQP Version(s) " + versionSet); sb.append(cr); @@ -1029,7 +1033,7 @@ public class CppGenerator extends Generator StringBuffer sb = new StringBuffer(); if (versionConsistentFlag) { - sb.append(generateMethodBodyCall(method, fieldMap, methodBodyClassName, null, indentSize, tabSize)); + sb.append(generateProxyMethodBody(method, fieldMap, methodBodyClassName, null, indentSize, tabSize)); } else { @@ -1041,7 +1045,7 @@ public class CppGenerator extends Generator sb.append("else "); sb.append("if (" + generateVersionCheck(thisVersion) + ")" + cr); sb.append(indent + "{" + cr); - sb.append(generateMethodBodyCall(method, fieldMap, methodBodyClassName, thisVersion, + sb.append(generateProxyMethodBody(method, fieldMap, methodBodyClassName, thisVersion, indentSize + tabSize, tabSize)); sb.append(indent + "}" + cr); firstOverloadedMethodFlag = false; @@ -1058,7 +1062,7 @@ public class CppGenerator extends Generator return sb.toString(); } - protected String generateMethodBodyCall(AmqpMethod method, AmqpOrdinalFieldMap fieldMap, String methodBodyClassName, + protected String generateProxyMethodBody(AmqpMethod method, AmqpOrdinalFieldMap fieldMap, String methodBodyClassName, AmqpVersion version, int indentSize, int tabSize) throws AmqpTypeMappingException { @@ -1066,10 +1070,9 @@ public class CppGenerator extends Generator String tab = Utils.createSpaces(tabSize); String namespace = version != null ? version.namespace() + "::" : ""; StringBuffer sb = new StringBuffer(); - sb.append(indent+tab+"context.channel->send(new "); - sb.append(namespace + methodBodyClassName + "( parent->getProtocolVersion()"); - if (method.isResponse(version)) - sb.append(", context.methodBody->getRequestId()"); + sb.append(indent+tab+(method.isResponse(version) ? "" : "return ")+"channel.send(new "); + sb.append(namespace + methodBodyClassName + "( channel.getVersion()"); + if (method.isResponse(version)) sb.append(", responseTo"); sb.append(generateMethodParameterList(fieldMap, indentSize + (5*tabSize), true, false, true)); sb.append("));\n"); return sb.toString(); @@ -1429,7 +1432,7 @@ public class CppGenerator extends Generator StringBuffer sb = new StringBuffer(); if (method.fieldMap.size() > 0 || method.isResponse(version)) { - sb.append(indent + thisClass.name + Utils.firstUpper(method.name) + "Body(const ProtocolVersion& version," + cr); + sb.append(indent + thisClass.name + Utils.firstUpper(method.name) + "Body(ProtocolVersion version," + cr); if (method.isResponse(version)) { sb.append(indent+tab+"RequestId toRequest"); if (method.fieldMap.size() >0) @@ -1609,8 +1612,9 @@ public class CppGenerator extends Generator private String setRef(String codeType) { - if (codeType.compareTo("string") == 0 || - codeType.compareTo("FieldTable") == 0) + if (codeType.equals("string") || + codeType.equals("FieldTable") || + codeType.equals("Content")) return "const " + codeType + "&"; return codeType; } diff --git a/gentools/templ.cpp/AMQP_ClientOperations.h.tmpl b/gentools/templ.cpp/AMQP_ClientOperations.h.tmpl index 996c919443..d4edfddd89 100644 --- a/gentools/templ.cpp/AMQP_ClientOperations.h.tmpl +++ b/gentools/templ.cpp/AMQP_ClientOperations.h.tmpl @@ -26,48 +26,26 @@ %{VLIST} * ${major}-${minor} */ -#include <sstream> - #ifndef _AMQP_ClientOperations_ #define _AMQP_ClientOperations_ -#include <FieldTable.h> -#include <FramingContent.h> -#include <ProtocolVersion.h> -#include <ProtocolVersionException.h> -#include "MethodContext.h" +#include "ProtocolVersion.h" namespace qpid { namespace framing { -class AMQP_ClientProxy; +class MethodContext; class AMQP_ClientOperations { -protected: - ProtocolVersion version; - AMQP_ClientOperations() {} - public: - AMQP_ClientOperations(u_int8_t major, u_int8_t minor) : version(major, minor) {} - AMQP_ClientOperations(const ProtocolVersion& version) : version(version) {} virtual ~AMQP_ClientOperations() {} - inline u_int8_t getMajor() const { return version.getMajor(); } - inline u_int8_t getMinor() const { return version.getMinor(); } - inline const ProtocolVersion& getVersion() const { return version; } - inline bool isVersion(u_int8_t _major, u_int8_t _minor) const - { - return version.equals(_major, _minor); - } - inline bool isVersion(const ProtocolVersion& _version) const - { - return version.equals(_version); - } + virtual ProtocolVersion getVersion() const = 0; + + // Include framing constant declarations + #include <AMQP_Constants.h> - // Include framing constant declarations - #include <AMQP_Constants.h> - // Inner classes %{CLIST} ${coh_inner_class} diff --git a/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl b/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl index e9ca5756e2..6a0e6eedb3 100644 --- a/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl +++ b/gentools/templ.cpp/AMQP_ClientProxy.cpp.tmpl @@ -25,30 +25,28 @@ * Supported AMQP versions: %{VLIST} * ${major}-${minor} */ - #include <sstream> - -#include <AMQP_ClientProxy.h> -#include <AMQFrame.h> +#include "AMQP_ClientProxy.h" #include "framing/ChannelAdapter.h" +#include "framing/amqp_types_full.h" %{MLIST} ${cpc_method_body_include} namespace qpid { namespace framing { -AMQP_ClientProxy::AMQP_ClientProxy(OutputHandler* out, u_int8_t major, u_int8_t minor) : -%{CLIST} ${cpc_constructor_initializer} - -{} - // Inner class instance get methods +AMQP_ClientProxy::AMQP_ClientProxy(ChannelAdapter& ch) : + Proxy(ch)%{CLIST} ${cpc_constructor_initializer} + {} + + // Inner class instance get methods %{CLIST} ${cpc_inner_class_get_method} - // Inner class implementation + // Inner class implementation %{CLIST} ${cpc_inner_class_impl} -} /* namespace framing */ -} /* namespace qpid */ +}} // namespae qpid::framing + diff --git a/gentools/templ.cpp/AMQP_ClientProxy.h.tmpl b/gentools/templ.cpp/AMQP_ClientProxy.h.tmpl index f5ad0106be..4b3932d309 100644 --- a/gentools/templ.cpp/AMQP_ClientProxy.h.tmpl +++ b/gentools/templ.cpp/AMQP_ClientProxy.h.tmpl @@ -29,45 +29,28 @@ #ifndef _AMQP_ClientProxy_ #define _AMQP_ClientProxy_ -#include <AMQP_ClientOperations.h> -#include <FieldTable.h> -#include <FramingContent.h> -#include <OutputHandler.h> +#include "framing/Proxy.h" namespace qpid { namespace framing { -class AMQP_ClientProxy : public AMQP_ClientOperations +class AMQP_ClientProxy : public Proxy { -private: - - ProtocolVersion version; - OutputHandler* out; -%{CLIST} ${cph_handler_pointer_defn} - public: - AMQP_ClientProxy(OutputHandler* out, u_int8_t major, u_int8_t minor); - const ProtocolVersion& getProtocolVersion() {return version;} - virtual ~AMQP_ClientProxy() {} - - // Get methods for handlers + AMQP_ClientProxy(ChannelAdapter& ch); -%{CLIST} ${cph_handler_pointer_get_method} - - // Inner class definitions + // Inner class definitions %{CLIST} ${cph_inner_class_defn} -private: - // Inner class instances - -%{CLIST} ${cph_inner_class_instance} - -public: - // Inner class instance get methods + // Inner class instance get methods %{CLIST} ${cph_inner_class_get_method} +private: + // Inner class instances + +%{CLIST} ${cph_inner_class_instance} }; /* class AMQP_ClientProxy */ } /* namespace framing */ diff --git a/gentools/templ.cpp/AMQP_MethodVersionMap.cpp.tmpl b/gentools/templ.cpp/AMQP_MethodVersionMap.cpp.tmpl index dc2a890c88..6fc79180b2 100644 --- a/gentools/templ.cpp/AMQP_MethodVersionMap.cpp.tmpl +++ b/gentools/templ.cpp/AMQP_MethodVersionMap.cpp.tmpl @@ -27,8 +27,8 @@ */ #include <sstream> - -#include <AMQP_MethodVersionMap.h> +#include "framing/ProtocolVersionException.h" +#include "AMQP_MethodVersionMap.h" namespace qpid { diff --git a/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl b/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl index c6db6f002a..aca065c757 100644 --- a/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl +++ b/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl @@ -29,43 +29,25 @@ #ifndef _AMQP_ServerOperations_ #define _AMQP_ServerOperations_ -#include <FieldTable.h> -#include <FramingContent.h> -#include <ProtocolVersion.h> -#include <ProtocolVersionException.h> -#include "MethodContext.h" +#include "ProtocolVersion.h" namespace qpid { namespace framing { -class AMQP_ServerProxy; -class AMQP_ClientProxy; +class MethodContext; class AMQP_ServerOperations { protected: ProtocolVersion version; - AMQP_ServerOperations() {} public: - AMQP_ServerOperations(u_int8_t major, u_int8_t minor) : version(major, minor) {} - AMQP_ServerOperations(const ProtocolVersion& version) : version(version) {} virtual ~AMQP_ServerOperations() {} - inline u_int8_t getMajor() const { return version.getMajor(); } - inline u_int8_t getMinor() const { return version.getMinor(); } - inline const ProtocolVersion& getVersion() const { return version; } - inline bool isVersion(u_int8_t _major, u_int8_t _minor) const - { - return version.equals(_major, _minor); - } - inline bool isVersion(const ProtocolVersion& _version) const - { - return version.equals(_version); - } + virtual ProtocolVersion getVersion() const = 0; - // Include framing constant declarations - #include <AMQP_Constants.h> + // Include framing constant declarations + #include "AMQP_Constants.h" // Inner classes diff --git a/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl b/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl index c5c1e17bad..5575f3b1df 100644 --- a/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl +++ b/gentools/templ.cpp/AMQP_ServerProxy.cpp.tmpl @@ -27,27 +27,25 @@ */ #include <sstream> - -#include <AMQP_ServerProxy.h> -#include <AMQFrame.h> +#include "AMQP_ServerProxy.h" #include "framing/ChannelAdapter.h" +#include "framing/amqp_types_full.h" %{MLIST} ${spc_method_body_include} namespace qpid { namespace framing { -AMQP_ServerProxy::AMQP_ServerProxy(OutputHandler* out, u_int8_t major, u_int8_t minor) : -%{CLIST} ${spc_constructor_initializer} -{} +AMQP_ServerProxy::AMQP_ServerProxy(ChannelAdapter& ch) : + Proxy(ch)%{CLIST} ${spc_constructor_initializer} + {} - // Inner class instance get methods + // Inner class instance get methods %{CLIST} ${spc_inner_class_get_method} - // Inner class implementation + // Inner class implementation %{CLIST} ${spc_inner_class_impl} -} /* namespace framing */ -} /* namespace qpid */ +}} // namespae qpid::framing diff --git a/gentools/templ.cpp/AMQP_ServerProxy.h.tmpl b/gentools/templ.cpp/AMQP_ServerProxy.h.tmpl index 3206359fb6..69638e325c 100644 --- a/gentools/templ.cpp/AMQP_ServerProxy.h.tmpl +++ b/gentools/templ.cpp/AMQP_ServerProxy.h.tmpl @@ -29,44 +29,28 @@ #ifndef _AMQP_ServerProxy_ #define _AMQP_ServerProxy_ -#include <AMQP_ServerOperations.h> -#include <FieldTable.h> -#include <FramingContent.h> -#include <OutputHandler.h> +#include "framing/Proxy.h" namespace qpid { namespace framing { -class AMQP_ServerProxy : public AMQP_ServerOperations +class AMQP_ServerProxy : public Proxy { -private: - ProtocolVersion version; - OutputHandler* out; -%{CLIST} ${sph_handler_pointer_defn} - public: - AMQP_ServerProxy(OutputHandler* out, u_int8_t major, u_int8_t minor); - const ProtocolVersion& getProtocolVersion() {return version;} - virtual ~AMQP_ServerProxy() {} - - // Get methods for handlers + AMQP_ServerProxy(ChannelAdapter& ch); -%{CLIST} ${sph_handler_pointer_get_method} - - // Inner class definitions + // Inner class definitions %{CLIST} ${sph_inner_class_defn} + // Inner class instance get methods + +%{CLIST} ${sph_inner_class_get_method} + private: - // Inner class instances + // Inner class instances %{CLIST} ${sph_inner_class_instance} - -public: - // Inner class instance get methods - -%{CLIST} ${sph_inner_class_get_method} - }; /* class AMQP_ServerProxy */ } /* namespace framing */ diff --git a/gentools/templ.cpp/MethodBodyClass.h.tmpl b/gentools/templ.cpp/MethodBodyClass.h.tmpl index 91d094a440..2031f8e346 100644 --- a/gentools/templ.cpp/MethodBodyClass.h.tmpl +++ b/gentools/templ.cpp/MethodBodyClass.h.tmpl @@ -62,7 +62,7 @@ public: ${mb_constructor_with_initializers} - ${CLASS}${METHOD}Body(const ProtocolVersion& version): ${mb_base_class}(version) {} + ${CLASS}${METHOD}Body(ProtocolVersion version): ${mb_base_class}(version) {} virtual ~${CLASS}${METHOD}Body() {} // Attribute get methods |