diff options
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 537 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.h | 261 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 16 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 1 | ||||
-rw-r--r-- | cpp/lib/broker/Connection.cpp | 590 | ||||
-rw-r--r-- | cpp/lib/broker/Connection.h | 324 | ||||
-rw-r--r-- | cpp/lib/broker/Makefile.am | 10 | ||||
-rw-r--r-- | cpp/lib/common/framing/amqp_types.h | 2 |
8 files changed, 914 insertions, 827 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp new file mode 100644 index 0000000000..86a7d166b0 --- /dev/null +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -0,0 +1,537 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "BrokerAdapter.h" +#include "Connection.h" +#include "Exception.h" + +namespace qpid { +namespace broker { + +using namespace qpid; +using namespace qpid::framing; + +typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + +BrokerAdapter::BrokerAdapter(Connection& c) : + connection(c), + basicHandler(c), + channelHandler(c), + connectionHandler(c), + exchangeHandler(c), + messageHandler(c), + queueHandler(c), + txHandler(c) +{} + +typedef qpid::framing::AMQP_ServerOperations Ops; + +Ops::ChannelHandler* BrokerAdapter::getChannelHandler() { + return &channelHandler; +} +Ops::ConnectionHandler* BrokerAdapter::getConnectionHandler() { + return &connectionHandler; +} +Ops::BasicHandler* BrokerAdapter::getBasicHandler() { + return &basicHandler; +} +Ops::ExchangeHandler* BrokerAdapter::getExchangeHandler() { + return &exchangeHandler; +} +Ops::QueueHandler* BrokerAdapter::getQueueHandler() { + return &queueHandler; +} +Ops::TxHandler* BrokerAdapter::getTxHandler() { + return &txHandler; +} +Ops::MessageHandler* BrokerAdapter::getMessageHandler() { + return &messageHandler; +} +Ops::AccessHandler* BrokerAdapter::getAccessHandler() { + throw ConnectionException(540, "Access class not implemented"); +} +Ops::FileHandler* BrokerAdapter::getFileHandler() { + throw ConnectionException(540, "File class not implemented"); +} +Ops::StreamHandler* BrokerAdapter::getStreamHandler() { + throw ConnectionException(540, "Stream class not implemented"); +} +Ops::DtxHandler* BrokerAdapter::getDtxHandler() { + throw ConnectionException(540, "Dtx class not implemented"); +} +Ops::TunnelHandler* BrokerAdapter::getTunnelHandler() { + throw ConnectionException(540, "Tunnel class not implemented"); +} + +void BrokerAdapter::ConnectionHandlerImpl::startOk( + u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const string& /*response*/, const string& /*locale*/){ + connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat); +} + +void BrokerAdapter::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} + +void BrokerAdapter::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ + connection.framemax = framemax; + connection.heartbeat = heartbeat; +} + +void BrokerAdapter::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ + string knownhosts; + connection.client->getConnection().openOk(0, knownhosts); +} + +void BrokerAdapter::ConnectionHandlerImpl::close( + u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/) +{ + connection.client->getConnection().closeOk(0); + connection.context->close(); +} + +void BrokerAdapter::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ + connection.context->close(); +} + +void BrokerAdapter::ChannelHandlerImpl::open( + u_int16_t channel, const string& /*outOfBand*/){ + connection.openChannel(channel); + // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 + connection.client->getChannel().openOk(channel, std::string()/* ID */); +} + +void BrokerAdapter::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} + +void BrokerAdapter::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, + u_int16_t /*classId*/, u_int16_t /*methodId*/){ + connection.closeChannel(channel); + connection.client->getChannel().closeOk(channel); +} + +void BrokerAdapter::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} + + + +void BrokerAdapter::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ + + if(passive){ + if(!connection.broker.getExchanges().get(exchange)){ + throw ChannelException(404, "Exchange not found: " + exchange); + } + }else{ + try{ + std::pair<Exchange::shared_ptr, bool> response = connection.broker.getExchanges().declare(exchange, type); + if(!response.second && response.first->getType() != type){ + throw ConnectionException(507, "Exchange already declared to be of type " + + response.first->getType() + ", requested " + type); + } + }catch(UnknownExchangeTypeException& e){ + throw ConnectionException(503, "Exchange type not implemented: " + type); + } + } + if(!nowait){ + connection.client->getExchange().declareOk(channel); + } +} + + +void BrokerAdapter::ExchangeHandlerImpl::unbind( + u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const qpid::framing::FieldTable& /*arguments*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + + + +void BrokerAdapter::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ + + //TODO: implement unused + connection.broker.getExchanges().destroy(exchange); + if(!nowait) connection.client->getExchange().deleteOk(channel); +} + +void BrokerAdapter::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ + Queue::shared_ptr queue; + if (passive && !name.empty()) { + queue = connection.getQueue(name, channel); + } else { + std::pair<Queue::shared_ptr, bool> queue_created = + connection.broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0); + queue = queue_created.first; + assert(queue); + if (queue_created.second) { // This is a new queue + connection.getChannel(channel).setDefaultQueue(queue); + + //apply settings & create persistent record if required + queue_created.first->create(arguments); + + //add default binding: + connection.broker.getExchanges().getDefault()->bind(queue, name, 0); + if (exclusive) { + connection.exclusiveQueues.push_back(queue); + } else if(autoDelete){ + connection.broker.getCleaner().add(queue); + } + } + } + if (exclusive && !queue->isExclusiveOwner(&connection)) { + throw ChannelException(405, "Cannot grant exclusive access to queue"); + } + if (!nowait) { + string queueName = queue->getName(); + connection.client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); + } +} + +void BrokerAdapter::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ + + Queue::shared_ptr queue = connection.getQueue(queueName, channel); + Exchange::shared_ptr exchange = connection.broker.getExchanges().get(exchangeName); + if(exchange){ + // kpvdr - cannot use this any longer as routingKey is now const + // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); + // exchange->bind(queue, routingKey, &arguments); + string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; + exchange->bind(queue, exchangeRoutingKey, &arguments); + if(!nowait) connection.client->getQueue().bindOk(channel); + }else{ + throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); + } +} + +void BrokerAdapter::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ + + Queue::shared_ptr queue = connection.getQueue(queueName, channel); + int count = queue->purge(); + if(!nowait) connection.client->getQueue().purgeOk(channel, count); +} + +void BrokerAdapter::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ + ChannelException error(0, ""); + int count(0); + Queue::shared_ptr q = connection.getQueue(queue, channel); + if(ifEmpty && q->getMessageCount() > 0){ + throw ChannelException(406, "Queue not empty."); + }else if(ifUnused && q->getConsumerCount() > 0){ + throw ChannelException(406, "Queue in use."); + }else{ + //remove the queue from the list of exclusive queues if necessary + if(q->isExclusiveOwner(&connection)){ + queue_iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); + if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i); + } + count = q->getMessageCount(); + q->destroy(); + connection.broker.getQueues().destroy(queue); + } + + if(!nowait) connection.client->getQueue().deleteOk(channel, count); +} + + + + +void BrokerAdapter::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ + //TODO: handle global + connection.getChannel(channel).setPrefetchSize(prefetchSize); + connection.getChannel(channel).setPrefetchCount(prefetchCount); + connection.client->getBasic().qosOk(channel); +} + +void BrokerAdapter::BasicHandlerImpl::consume( + u_int16_t channelId, u_int16_t /*ticket*/, + const string& queueName, const string& consumerTag, + bool noLocal, bool noAck, bool exclusive, + bool nowait, const FieldTable& fields) +{ + + Queue::shared_ptr queue = connection.getQueue(queueName, channelId); + Channel& channel = connection.getChannel(channelId); + if(!consumerTag.empty() && channel.exists(consumerTag)){ + throw ConnectionException(530, "Consumer tags must be unique"); + } + + try{ + string newTag = consumerTag; + channel.consume( + newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); + + if(!nowait) connection.client->getBasic().consumeOk(channelId, newTag); + + //allow messages to be dispatched if required as there is now a consumer: + queue->dispatch(); + }catch(ExclusiveAccessException& e){ + if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); + else throw ChannelException(403, "Access would violate previously granted exclusivity"); + } + +} + +void BrokerAdapter::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ + connection.getChannel(channel).cancel(consumerTag); + + if(!nowait) connection.client->getBasic().cancelOk(channel, consumerTag); +} + +void BrokerAdapter::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, + const string& exchangeName, const string& routingKey, + bool mandatory, bool immediate){ + + Exchange::shared_ptr exchange = exchangeName.empty() ? connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName); + if(exchange){ + Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate); + connection.getChannel(channel).handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } +} + +void BrokerAdapter::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ + Queue::shared_ptr queue = connection.getQueue(queueName, channelId); + if(!connection.getChannel(channelId).get(queue, !noAck)){ + string clusterId;//not used, part of an imatix hack + + connection.client->getBasic().getEmpty(channelId, clusterId); + } +} + +void BrokerAdapter::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ + try{ + connection.getChannel(channel).ack(deliveryTag, multiple); + }catch(InvalidAckException& e){ + throw ConnectionException(530, "Received ack for unrecognised delivery tag"); + } +} + +void BrokerAdapter::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} + +void BrokerAdapter::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ + connection.getChannel(channel).recover(requeue); +} + +void BrokerAdapter::TxHandlerImpl::select(u_int16_t channel){ + connection.getChannel(channel).begin(); + connection.client->getTx().selectOk(channel); +} + +void BrokerAdapter::TxHandlerImpl::commit(u_int16_t channel){ + connection.getChannel(channel).commit(); + connection.client->getTx().commitOk(channel); +} + +void BrokerAdapter::TxHandlerImpl::rollback(u_int16_t channel){ + + connection.getChannel(channel).rollback(); + connection.client->getTx().rollbackOk(channel); + connection.getChannel(channel).recover(false); +} + +void +BrokerAdapter::QueueHandlerImpl::unbind( + u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const qpid::framing::FieldTable& /*arguments*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +BrokerAdapter::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +BrokerAdapter::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +BrokerAdapter::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +void +BrokerAdapter::ChannelHandlerImpl::resume( + u_int16_t /*channel*/, + const string& /*channelId*/ ) +{ + assert(0); // FIXME aconway 2007-01-04: 0-9 feature +} + +// Message class method handlers +void +BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*bytes*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + + +void +BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t /*channel*/, + const string& /*destination*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::consume( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noLocal*/, + bool /*noAck*/, + bool /*exclusive*/, + const qpid::framing::FieldTable& /*filter*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::get( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noAck*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/, + u_int64_t /*value*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::qos( u_int16_t /*channel*/, + u_int32_t /*prefetchSize*/, + u_int16_t /*prefetchCount*/, + bool /*global*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::recover( u_int16_t /*channel*/, + bool /*requeue*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/, + u_int16_t /*code*/, + const string& /*text*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t /*channel*/, + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool /*immediate*/, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content /*body*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +}} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h new file mode 100644 index 0000000000..ec304c67d5 --- /dev/null +++ b/cpp/lib/broker/BrokerAdapter.h @@ -0,0 +1,261 @@ +#ifndef _broker_BrokerAdapter_h +#define _broker_BrokerAdapter_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "AMQP_ServerOperations.h" + +namespace qpid { +namespace broker { + +class Connection; + +/** + * Protocol adapter class for the broker. + */ +class BrokerAdapter : public qpid::framing::AMQP_ServerOperations +{ + public: + BrokerAdapter(Connection& connection); + AccessHandler* getAccessHandler(); + BasicHandler* getBasicHandler(); + ChannelHandler* getChannelHandler(); + ConnectionHandler* getConnectionHandler(); + DtxHandler* getDtxHandler(); + ExchangeHandler* getExchangeHandler(); + FileHandler* getFileHandler(); + MessageHandler* getMessageHandler(); + QueueHandler* getQueueHandler(); + StreamHandler* getStreamHandler(); + TunnelHandler* getTunnelHandler(); + TxHandler* getTxHandler(); + + private: + + class ConnectionHandlerImpl : public ConnectionHandler{ + Connection& connection; + public: + ConnectionHandlerImpl(Connection& c) : connection(c) {} + + void startOk(u_int16_t channel, + const qpid::framing::FieldTable& clientProperties, + const std::string& mechanism, const std::string& response, + const std::string& locale); + void secureOk(u_int16_t channel, const std::string& response); + void tuneOk(u_int16_t channel, u_int16_t channelMax, + u_int32_t frameMax, u_int16_t heartbeat); + void open(u_int16_t channel, const std::string& virtualHost, + const std::string& capabilities, bool insist); + void close(u_int16_t channel, u_int16_t replyCode, + const std::string& replyText, + u_int16_t classId, u_int16_t methodId); + void closeOk(u_int16_t channel); + }; + + class ChannelHandlerImpl : public ChannelHandler{ + Connection& connection; + public: + ChannelHandlerImpl(Connection& c) : connection(c) {} + void open(u_int16_t channel, const std::string& outOfBand); + void flow(u_int16_t channel, bool active); + void flowOk(u_int16_t channel, bool active); + void ok( u_int16_t channel ); + void ping( u_int16_t channel ); + void pong( u_int16_t channel ); + void resume( u_int16_t channel, const std::string& channelId ); + void close(u_int16_t channel, u_int16_t replyCode, const + std::string& replyText, u_int16_t classId, u_int16_t methodId); + void closeOk(u_int16_t channel); + }; + + class ExchangeHandlerImpl : public ExchangeHandler{ + Connection& connection; + public: + ExchangeHandlerImpl(Connection& c) : connection(c) {} + void declare(u_int16_t channel, u_int16_t ticket, + const std::string& exchange, const std::string& type, + bool passive, bool durable, bool autoDelete, + bool internal, bool nowait, + const qpid::framing::FieldTable& arguments); + void delete_(u_int16_t channel, u_int16_t ticket, + const std::string& exchange, bool ifUnused, bool nowait); + void unbind(u_int16_t channel, + u_int16_t ticket, const std::string& queue, + const std::string& exchange, const std::string& routingKey, + const qpid::framing::FieldTable& arguments ); + }; + + class QueueHandlerImpl : public QueueHandler{ + Connection& connection; + public: + QueueHandlerImpl(Connection& c) : connection(c) {} + void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, + const qpid::framing::FieldTable& arguments); + void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue, + const std::string& exchange, const std::string& routingKey, + bool nowait, const qpid::framing::FieldTable& arguments); + void unbind(u_int16_t channel, + u_int16_t ticket, + const std::string& queue, + const std::string& exchange, + const std::string& routingKey, + const qpid::framing::FieldTable& arguments ); + void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue, + bool nowait); + void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue, + bool ifUnused, bool ifEmpty, + bool nowait); + }; + + class BasicHandlerImpl : public BasicHandler{ + Connection& connection; + public: + BasicHandlerImpl(Connection& c) : connection(c) {} + void qos(u_int16_t channel, u_int32_t prefetchSize, + u_int16_t prefetchCount, bool global); + void consume( + u_int16_t channel, u_int16_t ticket, const std::string& queue, + const std::string& consumerTag, bool noLocal, bool noAck, + bool exclusive, bool nowait, + const qpid::framing::FieldTable& fields); + void cancel(u_int16_t channel, const std::string& consumerTag, + bool nowait); + void publish(u_int16_t channel, u_int16_t ticket, + const std::string& exchange, const std::string& routingKey, + bool mandatory, bool immediate); + void get(u_int16_t channel, u_int16_t ticket, const std::string& queue, + bool noAck); + void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); + void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); + void recover(u_int16_t channel, bool requeue); + }; + + class TxHandlerImpl : public TxHandler{ + Connection& connection; + public: + TxHandlerImpl(Connection& c) : connection(c) {} + void select(u_int16_t channel); + void commit(u_int16_t channel); + void rollback(u_int16_t channel); + }; + + class MessageHandlerImpl : public MessageHandler { + Connection& connection; + public: + MessageHandlerImpl(Connection& c) : connection(c) {} + + void append( u_int16_t channel, + const std::string& reference, + const std::string& bytes ); + + void cancel( u_int16_t channel, + const std::string& destination ); + + void checkpoint( u_int16_t channel, + const std::string& reference, + const std::string& identifier ); + + void close( u_int16_t channel, + const std::string& reference ); + + void consume( u_int16_t channel, + u_int16_t ticket, + const std::string& queue, + const std::string& destination, + bool noLocal, + bool noAck, + bool exclusive, + const qpid::framing::FieldTable& filter ); + + void empty( u_int16_t channel ); + + void get( u_int16_t channel, + u_int16_t ticket, + const std::string& queue, + const std::string& destination, + bool noAck ); + + void offset( u_int16_t channel, + u_int64_t value ); + + void ok( u_int16_t channel ); + + void open( u_int16_t channel, + const std::string& reference ); + + void qos( u_int16_t channel, + u_int32_t prefetchSize, + u_int16_t prefetchCount, + bool global ); + + void recover( u_int16_t channel, + bool requeue ); + + void reject( u_int16_t channel, + u_int16_t code, + const std::string& text ); + + void resume( u_int16_t channel, + const std::string& reference, + const std::string& identifier ); + + void transfer( u_int16_t channel, + u_int16_t ticket, + const std::string& destination, + bool redelivered, + bool immediate, + u_int64_t ttl, + u_int8_t priority, + u_int64_t timestamp, + u_int8_t deliveryMode, + u_int64_t expiration, + const std::string& exchange, + const std::string& routingKey, + const std::string& messageId, + const std::string& correlationId, + const std::string& replyTo, + const std::string& contentType, + const std::string& contentEncoding, + const std::string& userId, + const std::string& appId, + const std::string& transactionId, + const std::string& securityToken, + const qpid::framing::FieldTable& applicationHeaders, + qpid::framing::Content body ); + }; + + Connection& connection; + + BasicHandlerImpl basicHandler; + ChannelHandlerImpl channelHandler; + ConnectionHandlerImpl connectionHandler; + ExchangeHandlerImpl exchangeHandler; + MessageHandlerImpl messageHandler; + QueueHandlerImpl queueHandler; + TxHandlerImpl txHandler; +}; + + +}} // namespace qpid::broker + + + +#endif /*!_broker_BrokerAdapter_h*/ diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index f569872770..50b5d15b7d 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -42,12 +42,15 @@ Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out, tagGenerator("sgen"), store(_store), messageBuilder(this, _store, _stagingThreshold), - version(_version){ + version(_version), + isClosed(false) +{ outstanding.reset(); } Channel::~Channel(){ + close(); } bool Channel::exists(const string& consumerTag){ @@ -83,12 +86,13 @@ void Channel::cancel(const string& tag){ } void Channel::close(){ - //cancel all consumers - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ - cancel(i); + if (!isClosed) { + isClosed = true; + while (!consumers.empty()) + cancel(consumers.begin()); + //requeue: + recover(true); } - //requeue: - recover(true); } void Channel::begin(){ diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 2d4f97fef6..8ba56f20f1 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -90,6 +90,7 @@ namespace qpid { MessageBuilder messageBuilder;//builder for in-progress message Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to qpid::framing::ProtocolVersion version; // version used for this channel + bool isClosed; virtual void complete(Message::shared_ptr& msg); void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index c391ff6db5..c220fb9092 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -21,11 +21,9 @@ #include <iostream> #include <assert.h> -#include "Connection.h" - -#include "FanOutExchange.h" -#include "HeadersExchange.h" +#include "Connection.h" +// TODO aconway 2007-01-16: move to channel. #include "Requester.h" #include "Responder.h" @@ -37,50 +35,24 @@ using namespace qpid::sys; namespace qpid { namespace broker { -Connection::Connection( - SessionContext* _context, Broker& broker) : - - context(_context), - client(0), - queues(broker.getQueues()), - exchanges(broker.getExchanges()), - cleaner(broker.getCleaner()), - settings(broker.getTimeout(), broker.getStagingThreshold()), +Connection::Connection(SessionContext* context_, Broker& broker_) : + adapter(*this), requester(broker.getRequester()), responder(broker.getResponder()), - basicHandler(new BasicHandlerImpl(this)), - channelHandler(new ChannelHandlerImpl(this)), - connectionHandler(new ConnectionHandlerImpl(this)), - exchangeHandler(new ExchangeHandlerImpl(this)), - queueHandler(new QueueHandlerImpl(this)), - txHandler(new TxHandlerImpl(this)), - messageHandler(new MessageHandlerImpl(this)), + context(context_), framemax(65536), - heartbeat(0) + heartbeat(0), + broker(broker_), + settings(broker.getTimeout(), broker.getStagingThreshold()) {} -Connection::~Connection(){ - - if (client != NULL) - delete client; - -} - -Channel* Connection::getChannel(u_int16_t channel){ - channel_iterator i = channels.find(channel); - if(i == channels.end()){ - throw ConnectionException(504, "Unknown channel: " + channel); - } - return i->second; -} - Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { - queue = getChannel(channel)->getDefaultQueue(); + queue = getChannel(channel).getDefaultQueue(); if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); } else { - queue = queues.find(name); + queue = broker.getQueues().find(name); if (queue == 0) { throw ChannelException( 404, "Queue not found: " + name); } @@ -90,7 +62,7 @@ Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ Exchange::shared_ptr Connection::findExchange(const string& name){ - return exchanges.get(name); + return broker.getExchanges().get(name); } void Connection::handleMethod( @@ -99,10 +71,9 @@ void Connection::handleMethod( AMQMethodBody::shared_ptr method = shared_polymorphic_cast<AMQMethodBody, AMQBody>(body); try{ - method->invoke(*this, channel); + method->invoke(adapter, channel); }catch(ChannelException& e){ - channels[channel]->close(); - channels.erase(channel); + closeChannel(channel); client->getChannel().close( channel, e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); @@ -172,529 +143,78 @@ void Connection::Sender::send(AMQFrame* frame) { out.send(frame); } -void Connection::initiated(qpid::framing::ProtocolInitiation* header){ - - if (client == 0) - { - client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); - +void Connection::initiated(qpid::framing::ProtocolInitiation* header) { + if (client.get()) + // TODO aconway 2007-01-16: correct code. + throw ConnectionException(0, "Connection initiated twice"); - std::cout << "---------------" << this << std::endl; - - //send connection start - FieldTable properties; - string mechanisms("PLAIN"); - string locales("en_US"); // channel, majour, minor - client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales); - } + client.reset(new qpid::framing::AMQP_ClientProxy( + context, header->getMajor(), header->getMinor())); + FieldTable properties; + string mechanisms("PLAIN"); + string locales("en_US"); + // TODO aconway 2007-01-16: Move to adapter. + client->getConnection().start( + 0, header->getMajor(), header->getMinor(), properties, + mechanisms, locales); } -void Connection::idleOut(){ - -} +void Connection::idleOut(){} -void Connection::idleIn(){ - -} +void Connection::idleIn(){} void Connection::closed(){ try { - for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ - Channel* c = i->second; - channels.erase(i); - c->close(); - delete c; - } - for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ - string name = (*i)->getName(); - queues.destroy(name); - exclusiveQueues.erase(i); + while (!exclusiveQueues.empty()) { + broker.getQueues().destroy(exclusiveQueues.front()->getName()); + exclusiveQueues.erase(exclusiveQueues.begin()); } } catch(std::exception& e) { - std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl; + std::cout << "Caught unhandled exception while closing session: " << + e.what() << std::endl; + assert(0); } } +// TODO aconway 2007-01-16: colapse these. void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel)->handleHeader(body); + getChannel(channel).handleHeader(body); } void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel)->handleContent(body); + getChannel(channel).handleContent(body); } void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ std::cout << "Connection::handleHeartbeat()" << std::endl; } - -void Connection::ConnectionHandlerImpl::startOk( - u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/){ - parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat); -} - -void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} - -void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ - parent->framemax = framemax; - parent->heartbeat = heartbeat; -} - -void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ - string knownhosts; - parent->client->getConnection().openOk(0, knownhosts); -} - -void Connection::ConnectionHandlerImpl::close( - u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/) -{ - parent->client->getConnection().closeOk(0); - parent->context->close(); -} - -void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ - parent->context->close(); -} - - - -void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ - - parent->channels[channel] = new Channel( - parent->client->getProtocolVersion() , parent->context, channel, - parent->framemax, parent->queues.getStore(), - parent->settings.stagingThreshold); - - // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 - parent->client->getChannel().openOk(channel, std::string()/* ID */); -} - -void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} -void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} - -void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/){ - Channel* c = parent->getChannel(channel); - if(c){ - parent->channels.erase(channel); - c->close(); - delete c; - parent->client->getChannel().closeOk(channel); - } -} - -void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} - - - -void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ - - if(passive){ - if(!parent->exchanges.get(exchange)){ - throw ChannelException(404, "Exchange not found: " + exchange); - } - }else{ - try{ - std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type); - if(!response.second && response.first->getType() != type){ - throw ConnectionException(507, "Exchange already declared to be of type " - + response.first->getType() + ", requested " + type); - } - }catch(UnknownExchangeTypeException& e){ - throw ConnectionException(503, "Exchange type not implemented: " + type); - } - } - if(!nowait){ - parent->client->getExchange().declareOk(channel); - } +void Connection::openChannel(u_int16_t channel) { + if (channel == 0) + throw ConnectionException(504, "Illegal channel 0"); + if (channels.find(channel) != channels.end()) + throw ConnectionException(504, "Channel already open: " + channel); + channels.insert( + channel, + new Channel( + client->getProtocolVersion(), context, channel, framemax, + broker.getQueues().getStore(), settings.stagingThreshold)); } - -void Connection::ExchangeHandlerImpl::unbind( - u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const qpid::framing::FieldTable& /*arguments*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature +void Connection::closeChannel(u_int16_t channel) { + getChannel(channel).close(); // throws if channel does not exist. + channels.erase(channels.find(channel)); } - -void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ - - //TODO: implement unused - parent->exchanges.destroy(exchange); - if(!nowait) parent->client->getExchange().deleteOk(channel); -} - -void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ - Queue::shared_ptr queue; - if (passive && !name.empty()) { - queue = parent->getQueue(name, channel); - } else { - std::pair<Queue::shared_ptr, bool> queue_created = - parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue - parent->getChannel(channel)->setDefaultQueue(queue); - - //apply settings & create persistent record if required - queue_created.first->create(arguments); - - //add default binding: - parent->exchanges.getDefault()->bind(queue, name, 0); - if (exclusive) { - parent->exclusiveQueues.push_back(queue); - } else if(autoDelete){ - parent->cleaner.add(queue); - } - } - } - if (exclusive && !queue->isExclusiveOwner(parent)) { - throw ChannelException(405, "Cannot grant exclusive access to queue"); - } - if (!nowait) { - string queueName = queue->getName(); - parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); - } -} - -void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); - if(exchange){ - // kpvdr - cannot use this any longer as routingKey is now const - // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); - // exchange->bind(queue, routingKey, &arguments); - string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; - exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) parent->client->getQueue().bindOk(channel); - }else{ - throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); - } -} - -void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - int count = queue->purge(); - if(!nowait) parent->client->getQueue().purgeOk(channel, count); -} - -void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ - ChannelException error(0, ""); - int count(0); - Queue::shared_ptr q = parent->getQueue(queue, channel); - if(ifEmpty && q->getMessageCount() > 0){ - throw ChannelException(406, "Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw ChannelException(406, "Queue in use."); - }else{ - //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(parent)){ - queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); - if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); - } - count = q->getMessageCount(); - q->destroy(); - parent->queues.destroy(queue); - } - - if(!nowait) parent->client->getQueue().deleteOk(channel, count); -} - - - - -void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ - //TODO: handle global - parent->getChannel(channel)->setPrefetchSize(prefetchSize); - parent->getChannel(channel)->setPrefetchCount(prefetchCount); - parent->client->getBasic().qosOk(channel); -} - -void Connection::BasicHandlerImpl::consume( - u_int16_t channelId, u_int16_t /*ticket*/, - const string& queueName, const string& consumerTag, - bool noLocal, bool noAck, bool exclusive, - bool nowait, const FieldTable& fields) -{ - - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - Channel* channel = parent->channels[channelId]; - if(!consumerTag.empty() && channel->exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = consumerTag; - channel->consume( - newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields); - - if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } - -} - -void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ - parent->getChannel(channel)->cancel(consumerTag); - - if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag); -} - -void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate){ - - Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName); - if(exchange){ - Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); - parent->getChannel(channel)->handlePublish(msg, exchange); - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); - } -} - -void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - if(!parent->getChannel(channelId)->get(queue, !noAck)){ - string clusterId;//not used, part of an imatix hack - - parent->client->getBasic().getEmpty(channelId, clusterId); - } -} - -void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ - try{ - parent->getChannel(channel)->ack(deliveryTag, multiple); - }catch(InvalidAckException& e){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - } -} - -void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} - -void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ - parent->getChannel(channel)->recover(requeue); -} - -void Connection::TxHandlerImpl::select(u_int16_t channel){ - parent->getChannel(channel)->begin(); - parent->client->getTx().selectOk(channel); -} - -void Connection::TxHandlerImpl::commit(u_int16_t channel){ - parent->getChannel(channel)->commit(); - parent->client->getTx().commitOk(channel); -} - -void Connection::TxHandlerImpl::rollback(u_int16_t channel){ - - parent->getChannel(channel)->rollback(); - parent->client->getTx().rollbackOk(channel); - parent->getChannel(channel)->recover(false); -} - -void -Connection::QueueHandlerImpl::unbind( - u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const qpid::framing::FieldTable& /*arguments*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -Connection::ChannelHandlerImpl::resume( - u_int16_t /*channel*/, - const string& /*channelId*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -// Message class method handlers -void -Connection::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - - -void -Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/, - const string& /*destination*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noLocal*/, - bool /*noAck*/, - bool /*exclusive*/, - const qpid::framing::FieldTable& /*filter*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::get( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noAck*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/, - u_int32_t /*prefetchSize*/, - u_int16_t /*prefetchCount*/, - bool /*global*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/, - bool /*requeue*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +Channel& Connection::getChannel(u_int16_t channel){ + ChannelMap::iterator i = channels.find(channel); + if(i == channels.end()) + throw ConnectionException(504, "Unknown channel: " + channel); + return *i; } -void -Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} -void -Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool /*immediate*/, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - }} diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h index 2f2770b28d..1ad3353da8 100644 --- a/cpp/lib/broker/Connection.h +++ b/cpp/lib/broker/Connection.h @@ -21,10 +21,11 @@ #ifndef _Connection_ #define _Connection_ -#include <map> #include <sstream> #include <vector> +#include <boost/ptr_container/ptr_map.hpp> + #include <AMQFrame.h> #include <AMQP_ClientProxy.h> #include <AMQP_ServerOperations.h> @@ -32,6 +33,7 @@ #include <sys/ConnectionInputHandler.h> #include <sys/TimeoutHandler.h> #include "Broker.h" +#include "BrokerAdapter.h" #include "Exception.h" namespace qpid { @@ -46,11 +48,13 @@ class Settings { }; class Connection : public qpid::sys::ConnectionInputHandler, - public qpid::framing::AMQP_ServerOperations, - public ConnectionToken + public ConnectionToken { - typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; + typedef boost::ptr_map<u_int16_t, Channel> ChannelMap; + + // TODO aconway 2007-01-16: belongs on broker. typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + class Sender : public qpid::framing::OutputHandler { public: Sender(qpid::framing::OutputHandler&, @@ -61,35 +65,44 @@ class Connection : public qpid::sys::ConnectionInputHandler, qpid::framing::Requester& requester; qpid::framing::Responder& responder; }; - - qpid::sys::SessionContext* context; - qpid::framing::AMQP_ClientProxy* client; - QueueRegistry& queues; - ExchangeRegistry& exchanges; - AutoDelete& cleaner; - Settings settings; + + BrokerAdapter adapter; + // FIXME aconway 2007-01-16: On Channel qpid::framing::Requester& requester; qpid::framing::Responder& responder; - std::auto_ptr<BasicHandler> basicHandler; - std::auto_ptr<ChannelHandler> channelHandler; - std::auto_ptr<ConnectionHandler> connectionHandler; - std::auto_ptr<ExchangeHandler> exchangeHandler; - std::auto_ptr<QueueHandler> queueHandler; - std::auto_ptr<TxHandler> txHandler; - std::auto_ptr<MessageHandler> messageHandler; - - std::map<u_int16_t, Channel*> channels; - std::vector<Queue::shared_ptr> exclusiveQueues; + ChannelMap channels; + + void handleHeader(u_int16_t channel, + qpid::framing::AMQHeaderBody::shared_ptr body); + void handleContent(u_int16_t channel, + qpid::framing::AMQContentBody::shared_ptr body); + void handleMethod(u_int16_t channel, + qpid::framing::AMQBody::shared_ptr body); + void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + // FIXME aconway 2007-01-16: on broker. + Exchange::shared_ptr findExchange(const string& name); + + public: + Connection(qpid::sys::SessionContext* context, Broker& broker); + // ConnectionInputHandler methods + void received(qpid::framing::AMQFrame* frame); + void initiated(qpid::framing::ProtocolInitiation* header); + void idleOut(); + void idleIn(); + void closed(); + + // FIXME aconway 2007-01-16: encapsulate. + qpid::sys::SessionContext* context; u_int32_t framemax; u_int16_t heartbeat; + Broker& broker; + std::auto_ptr<qpid::framing::AMQP_ClientProxy> client; + Settings settings; + // FIXME aconway 2007-01-16: Belongs on broker? + std::vector<Queue::shared_ptr> exclusiveQueues; - void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); - void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); - void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr body); - void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); - - Channel* getChannel(u_int16_t channel); + // FIXME aconway 2007-01-16: move to broker. /** * Get named queue, never returns 0. * @return: named queue or default queue for channel if name="" @@ -98,261 +111,10 @@ class Connection : public qpid::sys::ConnectionInputHandler, */ Queue::shared_ptr getQueue(const string& name, u_int16_t channel); - Exchange::shared_ptr findExchange(const string& name); - - public: - Connection(qpid::sys::SessionContext* context, Broker& broker); - virtual void received(qpid::framing::AMQFrame* frame); - virtual void initiated(qpid::framing::ProtocolInitiation* header); - virtual void idleOut(); - virtual void idleIn(); - virtual void closed(); - virtual ~Connection(); - - class ConnectionHandlerImpl : public ConnectionHandler{ - Connection* parent; - public: - inline ConnectionHandlerImpl(Connection* _parent) : parent(_parent) {} - - virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism, - const string& response, const string& locale); - - virtual void secureOk(u_int16_t channel, const string& response); - - virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); - - virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist); - - virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, - u_int16_t methodId); - - virtual void closeOk(u_int16_t channel); - - virtual ~ConnectionHandlerImpl(){} - }; - - class ChannelHandlerImpl : public ChannelHandler{ - Connection* parent; - public: - inline ChannelHandlerImpl(Connection* _parent) : parent(_parent) {} - - virtual void open(u_int16_t channel, const string& outOfBand); - - virtual void flow(u_int16_t channel, bool active); - - virtual void flowOk(u_int16_t channel, bool active); - - virtual void ok( u_int16_t channel ); - - virtual void ping( u_int16_t channel ); - - virtual void pong( u_int16_t channel ); - - virtual void resume( u_int16_t channel, - const string& channelId ); - - virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, - u_int16_t classId, u_int16_t methodId); - - virtual void closeOk(u_int16_t channel); - - virtual ~ChannelHandlerImpl(){} - }; - - class ExchangeHandlerImpl : public ExchangeHandler{ - Connection* parent; - public: - inline ExchangeHandlerImpl(Connection* _parent) : parent(_parent) {} - - virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type, - bool passive, bool durable, bool autoDelete, bool internal, bool nowait, - const qpid::framing::FieldTable& arguments); - - virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); - - virtual void unbind(u_int16_t channel, - u_int16_t ticket, - const string& queue, - const string& exchange, - const string& routingKey, - const qpid::framing::FieldTable& arguments ); - - virtual ~ExchangeHandlerImpl(){} - }; - - - class QueueHandlerImpl : public QueueHandler{ - Connection* parent; - public: - inline QueueHandlerImpl(Connection* _parent) : parent(_parent) {} - - virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); - - virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue, - const string& exchange, const string& routingKey, bool nowait, - const qpid::framing::FieldTable& arguments); - - virtual void unbind(u_int16_t channel, - u_int16_t ticket, - const string& queue, - const string& exchange, - const string& routingKey, - const qpid::framing::FieldTable& arguments ); - - virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, - bool nowait); - - virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty, - bool nowait); - - virtual ~QueueHandlerImpl(){} - }; - - class BasicHandlerImpl : public BasicHandler{ - Connection* parent; - public: - inline BasicHandlerImpl(Connection* _parent) : parent(_parent) {} - - virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); - - virtual void consume( - u_int16_t channel, u_int16_t ticket, const string& queue, - const string& consumerTag, bool noLocal, bool noAck, - bool exclusive, bool nowait, - const qpid::framing::FieldTable& fields); - - virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait); - - virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey, - bool mandatory, bool immediate); - - virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck); - - virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); - - virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); - - virtual void recover(u_int16_t channel, bool requeue); - - virtual ~BasicHandlerImpl(){} - }; - - class TxHandlerImpl : public TxHandler{ - Connection* parent; - public: - TxHandlerImpl(Connection* _parent) : parent(_parent) {} - virtual ~TxHandlerImpl() {} - virtual void select(u_int16_t channel); - virtual void commit(u_int16_t channel); - virtual void rollback(u_int16_t channel); - }; - - class MessageHandlerImpl : public MessageHandler { - Connection* parent; - - // Constructors and destructors - - public: - MessageHandlerImpl() {} - MessageHandlerImpl(Connection* _parent) : parent(_parent) {} - virtual ~MessageHandlerImpl() {} - - // Protocol methods - virtual void append( u_int16_t channel, - const string& reference, - const string& bytes ); - - virtual void cancel( u_int16_t channel, - const string& destination ); - - virtual void checkpoint( u_int16_t channel, - const string& reference, - const string& identifier ); - - virtual void close( u_int16_t channel, - const string& reference ); - - virtual void consume( u_int16_t channel, - u_int16_t ticket, - const string& queue, - const string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const qpid::framing::FieldTable& filter ); - - virtual void empty( u_int16_t channel ); - - virtual void get( u_int16_t channel, - u_int16_t ticket, - const string& queue, - const string& destination, - bool noAck ); - - virtual void offset( u_int16_t channel, - u_int64_t value ); - - virtual void ok( u_int16_t channel ); - - virtual void open( u_int16_t channel, - const string& reference ); - - virtual void qos( u_int16_t channel, - u_int32_t prefetchSize, - u_int16_t prefetchCount, - bool global ); - - virtual void recover( u_int16_t channel, - bool requeue ); - - virtual void reject( u_int16_t channel, - u_int16_t code, - const string& text ); - - virtual void resume( u_int16_t channel, - const string& reference, - const string& identifier ); - - virtual void transfer( u_int16_t channel, - u_int16_t ticket, - const string& destination, - bool redelivered, - bool immediate, - u_int64_t ttl, - u_int8_t priority, - u_int64_t timestamp, - u_int8_t deliveryMode, - u_int64_t expiration, - const string& exchange, - const string& routingKey, - const string& messageId, - const string& correlationId, - const string& replyTo, - const string& contentType, - const string& contentEncoding, - const string& userId, - const string& appId, - const string& transactionId, - const string& securityToken, - const qpid::framing::FieldTable& applicationHeaders, - qpid::framing::Content body ); - }; - virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); } - virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); } - virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); } - virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); } - virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); } - virtual TxHandler* getTxHandler(){ return txHandler.get(); } - virtual MessageHandler* getMessageHandler(){ return messageHandler.get(); } - - virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); } - virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); } - virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); } - virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); } - virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } + void openChannel(u_int16_t channel); + void closeChannel(u_int16_t channel); + Channel& getChannel(u_int16_t channel); }; }} diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index b1a0c1af78..06d81d13e5 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -65,10 +65,12 @@ libqpidbroker_la_SOURCES = \ QueueRegistry.h \ RecoveryManager.cpp \ RecoveryManager.h \ - ConnectionFactory.cpp \ - ConnectionFactory.h \ - Connection.cpp \ - Connection.h \ + ConnectionFactory.cpp \ + ConnectionFactory.h \ + Connection.cpp \ + Connection.h \ + BrokerAdapter.cpp \ + BrokerAdapter.h \ TopicExchange.cpp \ TopicExchange.h \ TransactionalStore.h \ diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h index e1e3821584..4fac13e93b 100644 --- a/cpp/lib/common/framing/amqp_types.h +++ b/cpp/lib/common/framing/amqp_types.h @@ -36,7 +36,7 @@ namespace qpid { namespace framing { using std::string; - +typedef u_int16_t ChannelId; typedef u_int64_t RequestId; typedef u_int64_t ResponseId; typedef u_int32_t BatchOffset; |