diff options
author | Alan Conway <aconway@apache.org> | 2007-01-18 07:54:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-18 07:54:09 +0000 |
commit | cdf7469e2688f9f52487b7968664ced2db560980 (patch) | |
tree | 3cce5e41fd7792e1b8305ca0a813664cc89ee98b /cpp | |
parent | 9e8a7c77a94a92c6cf92cf60be508817f0778040 (diff) | |
download | qpid-python-cdf7469e2688f9f52487b7968664ced2db560980.tar.gz |
From: Andrew Stitcher <astitcher@redhat.com>
r723@fuschia: andrew | 2007-01-12 00:35:16 +0000
Branch for my work on Qpid.0-9
r724@fuschia: andrew | 2007-01-12 00:59:28 +0000
Added in empty implementation of handler class for protocol Message class
r768@fuschia: andrew | 2007-01-17 01:25:16 +0000
* Added Test for new MessageHandlerImpl (but no actual tests yet)
* Filled in lots of the blanks in the MessageHandlerImpl with code
stolen from the BasicHandlerImpl
r800@fuschia: andrew | 2007-01-17 17:34:13 +0000
Updated to latest upstream changes
Alan Conway <aconway@redhat.com>
* Took the changes from Andrew's patch and separated the
MessageHandlerImpl into its own .cpp/.h file. Other handlers
should be separated also.
* BrokerAdapter inner classes ignore channel arg and use
channel member instead.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497336 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 390 | ||||
-rw-r--r-- | cpp/lib/broker/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 219 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.h | 124 | ||||
-rw-r--r-- | cpp/tests/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/tests/MessageHandlerTest.cpp | 57 | ||||
-rw-r--r-- | cpp/tests/client_test.cpp | 2 | ||||
-rwxr-xr-x | cpp/tests/start_broker | 2 |
8 files changed, 490 insertions, 309 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 2a5b136b3e..16519ec646 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -20,10 +20,14 @@ #include "Exception.h" #include "AMQMethodBody.h" #include "Exception.h" +#include "MessageHandlerImpl.h" namespace qpid { namespace broker { +// FIXME aconway 2007-01-18: Remove channel argument from signatures, +// adapter is already associated with a cahnnel. + using namespace qpid; using namespace qpid::framing; @@ -74,16 +78,16 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations public: ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void startOk(u_int16_t channel, + void startOk(u_int16_t /*channel*/, const qpid::framing::FieldTable& clientProperties, const std::string& mechanism, const std::string& response, const std::string& locale); - void secureOk(u_int16_t channel, const std::string& response); - void tuneOk(u_int16_t channel, u_int16_t channelMax, + void secureOk(u_int16_t /*channel*/, const std::string& response); + void tuneOk(u_int16_t /*channel*/, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); - void open(u_int16_t channel, const std::string& virtualHost, + void open(u_int16_t /*channel*/, const std::string& virtualHost, const std::string& capabilities, bool insist); - void close(u_int16_t channel, u_int16_t replyCode, + void close(u_int16_t /*channel*/, u_int16_t replyCode, const std::string& replyText, u_int16_t classId, u_int16_t methodId); void closeOk(u_int16_t channel); @@ -92,14 +96,14 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{ public: ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void open(u_int16_t channel, const std::string& outOfBand); - void flow(u_int16_t channel, bool active); - void flowOk(u_int16_t channel, bool active); + void open(u_int16_t /*channel*/, const std::string& outOfBand); + void flow(u_int16_t /*channel*/, bool active); + void flowOk(u_int16_t /*channel*/, bool active); void ok( u_int16_t channel ); void ping( u_int16_t channel ); void pong( u_int16_t channel ); - void resume( u_int16_t channel, const std::string& channelId ); - void close(u_int16_t channel, u_int16_t replyCode, const + void resume( u_int16_t /*channel*/, const std::string& channelId ); + void close(u_int16_t /*channel*/, u_int16_t replyCode, const std::string& replyText, u_int16_t classId, u_int16_t methodId); void closeOk(u_int16_t channel); }; @@ -107,14 +111,14 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{ public: ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void declare(u_int16_t channel, u_int16_t ticket, + void declare(u_int16_t /*channel*/, u_int16_t ticket, const std::string& exchange, const std::string& type, bool passive, bool durable, bool autoDelete, bool internal, bool nowait, const qpid::framing::FieldTable& arguments); - void delete_(u_int16_t channel, u_int16_t ticket, + void delete_(u_int16_t /*channel*/, u_int16_t ticket, const std::string& exchange, bool ifUnused, bool nowait); - void unbind(u_int16_t channel, + void unbind(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); @@ -123,22 +127,22 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations class QueueHandlerImpl : private CoreRefs, public QueueHandler{ public: QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue, + void declare(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); - void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue, + void bind(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, bool nowait, const qpid::framing::FieldTable& arguments); - void unbind(u_int16_t channel, + void unbind(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); - void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue, + void purge(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, bool nowait); - void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue, + void delete_(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, bool ifUnused, bool ifEmpty, bool nowait); }; @@ -146,23 +150,23 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations class BasicHandlerImpl : private CoreRefs, public BasicHandler{ public: BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void qos(u_int16_t channel, u_int32_t prefetchSize, + void qos(u_int16_t /*channel*/, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); void consume( - u_int16_t channel, u_int16_t ticket, const std::string& queue, + u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, const std::string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const qpid::framing::FieldTable& fields); - void cancel(u_int16_t channel, const std::string& consumerTag, + void cancel(u_int16_t /*channel*/, const std::string& consumerTag, bool nowait); - void publish(u_int16_t channel, u_int16_t ticket, + void publish(u_int16_t /*channel*/, u_int16_t ticket, const std::string& exchange, const std::string& routingKey, bool mandatory, bool immediate); - void get(u_int16_t channel, u_int16_t ticket, const std::string& queue, + void get(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue, bool noAck); - void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); - void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); - void recover(u_int16_t channel, bool requeue); + void ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple); + void reject(u_int16_t /*channel*/, u_int64_t deliveryTag, bool requeue); + void recover(u_int16_t /*channel*/, bool requeue); }; class TxHandlerImpl : private CoreRefs, public TxHandler{ @@ -173,90 +177,6 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations void rollback(u_int16_t channel); }; - class MessageHandlerImpl : private CoreRefs, public MessageHandler { - public: - MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - - void append( u_int16_t channel, - const std::string& reference, - const std::string& bytes ); - - void cancel( u_int16_t channel, - const std::string& destination ); - - void checkpoint( u_int16_t channel, - const std::string& reference, - const std::string& identifier ); - - void close( u_int16_t channel, - const std::string& reference ); - - void consume( u_int16_t channel, - u_int16_t ticket, - const std::string& queue, - const std::string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const qpid::framing::FieldTable& filter ); - - void empty( u_int16_t channel ); - - void get( u_int16_t channel, - u_int16_t ticket, - const std::string& queue, - const std::string& destination, - bool noAck ); - - void offset( u_int16_t channel, - u_int64_t value ); - - void ok( u_int16_t channel ); - - void open( u_int16_t channel, - const std::string& reference ); - - void qos( u_int16_t channel, - u_int32_t prefetchSize, - u_int16_t prefetchCount, - bool global ); - - void recover( u_int16_t channel, - bool requeue ); - - void reject( u_int16_t channel, - u_int16_t code, - const std::string& text ); - - void resume( u_int16_t channel, - const std::string& reference, - const std::string& identifier ); - - void transfer( u_int16_t channel, - u_int16_t ticket, - const std::string& destination, - bool redelivered, - bool immediate, - u_int64_t ttl, - u_int8_t priority, - u_int64_t timestamp, - u_int8_t deliveryMode, - u_int64_t expiration, - const std::string& exchange, - const std::string& routingKey, - const std::string& messageId, - const std::string& correlationId, - const std::string& replyTo, - const std::string& contentType, - const std::string& contentEncoding, - const std::string& userId, - const std::string& appId, - const std::string& transactionId, - const std::string& securityToken, - const qpid::framing::FieldTable& applicationHeaders, - qpid::framing::Content body ); - }; - BasicHandlerImpl basicHandler; ChannelHandlerImpl channelHandler; ConnectionHandlerImpl connectionHandler; @@ -298,31 +218,31 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(u_int16_t /*channe } void BrokerAdapter::ServerOps::ChannelHandlerImpl::open( - u_int16_t channelId, const string& /*outOfBand*/){ + u_int16_t /*channel*/, const string& /*outOfBand*/){ // FIXME aconway 2007-01-17: Assertions on all channel methods, - // Drop channelId param. assertChannelNonZero(channel.getId()); if (channel.isOpen()) throw ConnectionException(504, "Channel already open"); channel.open(); // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 - connection.client->getChannel().openOk(channelId, std::string()/* ID */); + connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */); } void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} -void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, +void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/){ - connection.closeChannel(channel); - connection.client->getChannel().closeOk(channel); + connection.client->getChannel().closeOk(channel.getId()); + // FIXME aconway 2007-01-18: Following line destroys this. Ugly. + connection.closeChannel(channel.getId()); } void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, const FieldTable& /*arguments*/){ @@ -345,7 +265,7 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t channel, u } } if(!nowait){ - connection.client->getExchange().declareOk(channel); + connection.client->getExchange().declareOk(channel.getId()); } } @@ -363,27 +283,27 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind( -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused broker.getExchanges().destroy(exchange); - if(!nowait) connection.client->getExchange().deleteOk(channel); + if(!nowait) connection.client->getExchange().deleteOk(channel.getId()); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, +void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = connection.getQueue(name, channel); + queue = connection.getQueue(name, channel.getId()); } else { std::pair<Queue::shared_ptr, bool> queue_created = broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - connection.getChannel(channel).setDefaultQueue(queue); + channel.setDefaultQueue(queue); //apply settings & create persistent record if required queue_created.first->create(arguments); @@ -402,15 +322,15 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t channel, u_in } if (!nowait) { string queueName = queue->getName(); - connection.client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); + connection.client->getQueue().declareOk(channel.getId(), queueName, queue->getMessageCount(), queue->getConsumerCount()); } } -void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, +void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel); + Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ // kpvdr - cannot use this any longer as routingKey is now const @@ -418,25 +338,25 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t channel, u_int16 // exchange->bind(queue, routingKey, &arguments); string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) connection.client->getQueue().bindOk(channel); + if(!nowait) connection.client->getQueue().bindOk(channel.getId()); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); } } -void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool nowait){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel); + Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); - if(!nowait) connection.client->getQueue().purgeOk(channel, count); + if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, +void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); - Queue::shared_ptr q = connection.getQueue(queue, channel); + Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); if(ifEmpty && q->getMessageCount() > 0){ throw ChannelException(406, "Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -452,28 +372,27 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t channel, u_in broker.getQueues().destroy(queue); } - if(!nowait) connection.client->getQueue().deleteOk(channel, count); + if(!nowait) connection.client->getQueue().deleteOk(channel.getId(), count); } -void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t /*channel*/, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global - connection.getChannel(channel).setPrefetchSize(prefetchSize); - connection.getChannel(channel).setPrefetchCount(prefetchCount); - connection.client->getBasic().qosOk(channel); + channel.setPrefetchSize(prefetchSize); + channel.setPrefetchCount(prefetchCount); + connection.client->getBasic().qosOk(channel.getId()); } void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( - u_int16_t channelId, u_int16_t /*ticket*/, + u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const FieldTable& fields) { - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - Channel& channel = connection.getChannel(channelId); + Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!consumerTag.empty() && channel.exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } @@ -483,7 +402,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( channel.consume( newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) connection.client->getBasic().consumeOk(channelId, newTag); + if(!nowait) connection.client->getBasic().consumeOk(channel.getId(), newTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -494,38 +413,38 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( } -void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ - connection.getChannel(channel).cancel(consumerTag); +void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t /*channel*/, const string& consumerTag, bool nowait){ + channel.cancel(consumerTag); - if(!nowait) connection.client->getBasic().cancelOk(channel, consumerTag); + if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag); } -void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, +void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate){ Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate); - connection.getChannel(channel).handlePublish(msg, exchange); + channel.handlePublish(msg, exchange); }else{ throw ChannelException( 404, "Exchange not found '" + exchangeName + "'"); } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - if(!connection.getChannel(channelId).get(queue, !noAck)){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool noAck){ + Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + if(!connection.getChannel(channel.getId()).get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack - connection.client->getBasic().getEmpty(channelId, clusterId); + connection.client->getBasic().getEmpty(channel.getId(), clusterId); } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple){ try{ - connection.getChannel(channel).ack(deliveryTag, multiple); + channel.ack(deliveryTag, multiple); }catch(InvalidAckException& e){ throw ConnectionException(530, "Received ack for unrecognised delivery tag"); } @@ -533,25 +452,25 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t channel, u_int64_ void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ - connection.getChannel(channel).recover(requeue); +void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t /*channel*/, bool requeue){ + channel.recover(requeue); } -void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t channel){ - connection.getChannel(channel).begin(); - connection.client->getTx().selectOk(channel); +void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t /*channel*/){ + channel.begin(); + connection.client->getTx().selectOk(channel.getId()); } -void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t channel){ - connection.getChannel(channel).commit(); - connection.client->getTx().commitOk(channel); +void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t /*channel*/){ + channel.commit(); + connection.client->getTx().commitOk(channel.getId()); } -void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t channel){ +void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t /*channel*/){ - connection.getChannel(channel).rollback(); - connection.client->getTx().rollbackOk(channel); - connection.getChannel(channel).recover(false); + channel.rollback(); + connection.client->getTx().rollbackOk(channel.getId()); + channel.recover(false); } void @@ -587,152 +506,11 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) void BrokerAdapter::ServerOps::ChannelHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*channelId*/ ) + const string& /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } -// Message class method handlers -void -BrokerAdapter::ServerOps::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::cancel( u_int16_t /*channel*/, - const string& /*destination*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::consume( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noLocal*/, - bool /*noAck*/, - bool /*exclusive*/, - const qpid::framing::FieldTable& /*filter*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::get( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noAck*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::qos( u_int16_t /*channel*/, - u_int32_t /*prefetchSize*/, - u_int16_t /*prefetchCount*/, - bool /*global*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::recover( u_int16_t /*channel*/, - bool /*requeue*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::ServerOps::MessageHandlerImpl::transfer( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool /*immediate*/, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - BrokerAdapter::BrokerAdapter( Channel* ch, Connection& c, Broker& b ) : diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index 06d81d13e5..105abc8ad2 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -71,6 +71,8 @@ libqpidbroker_la_SOURCES = \ Connection.h \ BrokerAdapter.cpp \ BrokerAdapter.h \ + MessageHandlerImpl.cpp \ + MessageHandlerImpl.h \ TopicExchange.cpp \ TopicExchange.h \ TransactionalStore.h \ diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp new file mode 100644 index 0000000000..0c0f9e96eb --- /dev/null +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -0,0 +1,219 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "MessageHandlerImpl.h" +#include "BrokerChannel.h" +#include "Connection.h" +#include "Broker.h" +namespace qpid { +namespace broker { + +// +// Message class method handlers +// +void +MessageHandlerImpl::append( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*bytes*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + + +void +MessageHandlerImpl::cancel( u_int16_t channel, + const string& destination ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + connection.getChannel(channel).cancel(destination); + + connection.client->getMessageHandler()->ok(channel); +} + +void +MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +MessageHandlerImpl::close( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +MessageHandlerImpl::consume( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& queueName, + const string& destination, + bool noLocal, + bool noAck, + bool exclusive, + const qpid::framing::FieldTable& filter ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Channel& channel = connection.getChannel(channel.getId()); + if(!destination.empty() && channel.exists(destination)){ + throw ConnectionException(530, "Consumer tags must be unique"); + } + + try{ + string newTag = destination; + channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); + + connection.client->getMessageHandler()->ok(channel.getId()); + + //allow messages to be dispatched if required as there is now a consumer: + queue->dispatch(); + }catch(ExclusiveAccessException& e){ + if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); + else throw ChannelException(403, "Access would violate previously granted exclusivity"); + } +} + +void +MessageHandlerImpl::empty( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +MessageHandlerImpl::get( u_int16_t /*channelId*/, + u_int16_t /*ticket*/, + const string& queueName, + const string& /*destination*/, + bool noAck ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + + // FIXME: get is probably Basic specific + if(!connection.getChannel(channel.getId()).get(queue, !noAck)){ + + connection.client->getMessageHandler()->empty(channel.getId()); + } + +} + +void +MessageHandlerImpl::offset( u_int16_t /*channel*/, + u_int64_t /*value*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +MessageHandlerImpl::ok( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +MessageHandlerImpl::open( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +MessageHandlerImpl::qos( u_int16_t /*channel*/, + u_int32_t prefetchSize, + u_int16_t prefetchCount, + bool /*global*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + //TODO: handle global + channel.setPrefetchSize(prefetchSize); + channel.setPrefetchCount(prefetchCount); + + connection.client->getMessageHandler()->ok(channel.getId()); +} + +void +MessageHandlerImpl::recover( u_int16_t /*channel*/, + bool requeue ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + channel.recover(requeue); + +} + +void +MessageHandlerImpl::reject( u_int16_t /*channel*/, + u_int16_t /*code*/, + const string& /*text*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +MessageHandlerImpl::resume( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +MessageHandlerImpl::transfer( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool immediate, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& exchangeName, + const string& routingKey, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content /*body*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + Exchange::shared_ptr exchange = exchangeName.empty() ? + broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); + if(exchange){ + Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate); + channel.handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } +} + +}} // namespace qpid::broker diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h new file mode 100644 index 0000000000..77e30abe05 --- /dev/null +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -0,0 +1,124 @@ +#ifndef _broker_MessageHandlerImpl_h +#define _broker_MessageHandlerImpl_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "AMQP_ServerOperations.h" + +namespace qpid { +namespace broker { + +class Channel; +class Connection; +class Broker; + +class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageHandler { + Channel& channel; + Connection& connection; + Broker& broker; + + public: + MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) + : channel(ch), connection(c), broker(b) {} + + void append( u_int16_t channel, + const std::string& reference, + const std::string& bytes ); + + void cancel( u_int16_t channel, + const std::string& destination ); + + void checkpoint( u_int16_t channel, + const std::string& reference, + const std::string& identifier ); + + void close( u_int16_t channel, + const std::string& reference ); + + void consume( u_int16_t channel, + u_int16_t ticket, + const std::string& queue, + const std::string& destination, + bool noLocal, + bool noAck, + bool exclusive, + const qpid::framing::FieldTable& filter ); + + void empty( u_int16_t channel ); + + void get( u_int16_t channel, + u_int16_t ticket, + const std::string& queue, + const std::string& destination, + bool noAck ); + + void offset( u_int16_t channel, + u_int64_t value ); + + void ok( u_int16_t channel ); + + void open( u_int16_t channel, + const std::string& reference ); + + void qos( u_int16_t channel, + u_int32_t prefetchSize, + u_int16_t prefetchCount, + bool global ); + + void recover( u_int16_t channel, + bool requeue ); + + void reject( u_int16_t channel, + u_int16_t code, + const std::string& text ); + + void resume( u_int16_t channel, + const std::string& reference, + const std::string& identifier ); + + void transfer( u_int16_t channel, + u_int16_t ticket, + const std::string& destination, + bool redelivered, + bool immediate, + u_int64_t ttl, + u_int8_t priority, + u_int64_t timestamp, + u_int8_t deliveryMode, + u_int64_t expiration, + const std::string& exchange, + const std::string& routingKey, + const std::string& messageId, + const std::string& correlationId, + const std::string& replyTo, + const std::string& contentType, + const std::string& contentEncoding, + const std::string& userId, + const std::string& appId, + const std::string& transactionId, + const std::string& securityToken, + const qpid::framing::FieldTable& applicationHeaders, + qpid::framing::Content body ); +}; + +}} // namespace qpid::broker + + + +#endif /*!_broker_MessageHandlerImpl_h*/ diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index 774ecafa6e..8fb0350d2b 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -38,7 +38,8 @@ broker_tests = \ TxAckTest \ TxBufferTest \ TxPublishTest \ - ValueTest + ValueTest \ + MessageHandlerTest framing_tests = \ FieldTableTest \ diff --git a/cpp/tests/MessageHandlerTest.cpp b/cpp/tests/MessageHandlerTest.cpp new file mode 100644 index 0000000000..55971355f6 --- /dev/null +++ b/cpp/tests/MessageHandlerTest.cpp @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +//#include <iostream> +//#include <AMQP_HighestVersion.h> +#include <amqp_framing.h> +#include <qpid_test_plugin.h> + +#include <BrokerAdapter.h> + +using namespace qpid::framing; +using namespace qpid::broker; + +class MessageHandlerTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(MessageHandlerTest); + CPPUNIT_TEST(testOpenMethod); + CPPUNIT_TEST_SUITE_END(); +private: + +public: + + MessageHandlerTest() + { + } + + void testOpenMethod() + { + //AMQFrame frame(highestProtocolVersion, 0, method); + //TestBodyHandler handler(method); + //handler.handleBody(frame.getBody()); + } + +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageHandlerTest); + diff --git a/cpp/tests/client_test.cpp b/cpp/tests/client_test.cpp index 869cbd33e7..aaa7f4e9ca 100644 --- a/cpp/tests/client_test.cpp +++ b/cpp/tests/client_test.cpp @@ -70,7 +70,7 @@ int main(int argc, char**) Queue queue("MyQueue", true); - Connection con(argc > 1); + Connection con(verbose); string host("localhost"); con.open(host); if (verbose) std::cout << "Opened connection." << std::endl; diff --git a/cpp/tests/start_broker b/cpp/tests/start_broker index cc95083d85..05510b17ac 100755 --- a/cpp/tests/start_broker +++ b/cpp/tests/start_broker @@ -11,4 +11,4 @@ rm -rf $LOG $PID # FIXME aconway 2007-01-18: qpidd should not return till it is accepting # connections, remove arbitrary sleep. -sleep 1 +sleep 2 |