diff options
author | Alan Conway <aconway@apache.org> | 2007-01-17 03:49:35 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-17 03:49:35 +0000 |
commit | 41ef372b003840d21339e26300faedb95e627426 (patch) | |
tree | dba5e04b56b3672ef649ec0193b8786b50c7c3d5 /cpp/lib/broker/Connection.cpp | |
parent | 0cddff7761f482fe7d59c2025f00c6103aad7c17 (diff) | |
download | qpid-python-41ef372b003840d21339e26300faedb95e627426.tar.gz |
Separated adapter code from Connection class: Extracted all
HandlerImpl classes to BrokerAdapter. The Connection is now part of the
version-invariant core, all version-dependent code is in BrokerAdapter.
The extraction exposes some ugly dependencies between adapter, Connection
and parts of the Broker. More refactoring to follow to improve encapsulation.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496926 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/Connection.cpp')
-rw-r--r-- | cpp/lib/broker/Connection.cpp | 590 |
1 files changed, 55 insertions, 535 deletions
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index c391ff6db5..c220fb9092 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -21,11 +21,9 @@ #include <iostream> #include <assert.h> -#include "Connection.h" - -#include "FanOutExchange.h" -#include "HeadersExchange.h" +#include "Connection.h" +// TODO aconway 2007-01-16: move to channel. #include "Requester.h" #include "Responder.h" @@ -37,50 +35,24 @@ using namespace qpid::sys; namespace qpid { namespace broker { -Connection::Connection( - SessionContext* _context, Broker& broker) : - - context(_context), - client(0), - queues(broker.getQueues()), - exchanges(broker.getExchanges()), - cleaner(broker.getCleaner()), - settings(broker.getTimeout(), broker.getStagingThreshold()), +Connection::Connection(SessionContext* context_, Broker& broker_) : + adapter(*this), requester(broker.getRequester()), responder(broker.getResponder()), - basicHandler(new BasicHandlerImpl(this)), - channelHandler(new ChannelHandlerImpl(this)), - connectionHandler(new ConnectionHandlerImpl(this)), - exchangeHandler(new ExchangeHandlerImpl(this)), - queueHandler(new QueueHandlerImpl(this)), - txHandler(new TxHandlerImpl(this)), - messageHandler(new MessageHandlerImpl(this)), + context(context_), framemax(65536), - heartbeat(0) + heartbeat(0), + broker(broker_), + settings(broker.getTimeout(), broker.getStagingThreshold()) {} -Connection::~Connection(){ - - if (client != NULL) - delete client; - -} - -Channel* Connection::getChannel(u_int16_t channel){ - channel_iterator i = channels.find(channel); - if(i == channels.end()){ - throw ConnectionException(504, "Unknown channel: " + channel); - } - return i->second; -} - Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { - queue = getChannel(channel)->getDefaultQueue(); + queue = getChannel(channel).getDefaultQueue(); if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); } else { - queue = queues.find(name); + queue = broker.getQueues().find(name); if (queue == 0) { throw ChannelException( 404, "Queue not found: " + name); } @@ -90,7 +62,7 @@ Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ Exchange::shared_ptr Connection::findExchange(const string& name){ - return exchanges.get(name); + return broker.getExchanges().get(name); } void Connection::handleMethod( @@ -99,10 +71,9 @@ void Connection::handleMethod( AMQMethodBody::shared_ptr method = shared_polymorphic_cast<AMQMethodBody, AMQBody>(body); try{ - method->invoke(*this, channel); + method->invoke(adapter, channel); }catch(ChannelException& e){ - channels[channel]->close(); - channels.erase(channel); + closeChannel(channel); client->getChannel().close( channel, e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); @@ -172,529 +143,78 @@ void Connection::Sender::send(AMQFrame* frame) { out.send(frame); } -void Connection::initiated(qpid::framing::ProtocolInitiation* header){ - - if (client == 0) - { - client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); - +void Connection::initiated(qpid::framing::ProtocolInitiation* header) { + if (client.get()) + // TODO aconway 2007-01-16: correct code. + throw ConnectionException(0, "Connection initiated twice"); - std::cout << "---------------" << this << std::endl; - - //send connection start - FieldTable properties; - string mechanisms("PLAIN"); - string locales("en_US"); // channel, majour, minor - client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales); - } + client.reset(new qpid::framing::AMQP_ClientProxy( + context, header->getMajor(), header->getMinor())); + FieldTable properties; + string mechanisms("PLAIN"); + string locales("en_US"); + // TODO aconway 2007-01-16: Move to adapter. + client->getConnection().start( + 0, header->getMajor(), header->getMinor(), properties, + mechanisms, locales); } -void Connection::idleOut(){ - -} +void Connection::idleOut(){} -void Connection::idleIn(){ - -} +void Connection::idleIn(){} void Connection::closed(){ try { - for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ - Channel* c = i->second; - channels.erase(i); - c->close(); - delete c; - } - for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ - string name = (*i)->getName(); - queues.destroy(name); - exclusiveQueues.erase(i); + while (!exclusiveQueues.empty()) { + broker.getQueues().destroy(exclusiveQueues.front()->getName()); + exclusiveQueues.erase(exclusiveQueues.begin()); } } catch(std::exception& e) { - std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl; + std::cout << "Caught unhandled exception while closing session: " << + e.what() << std::endl; + assert(0); } } +// TODO aconway 2007-01-16: colapse these. void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel)->handleHeader(body); + getChannel(channel).handleHeader(body); } void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel)->handleContent(body); + getChannel(channel).handleContent(body); } void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ std::cout << "Connection::handleHeartbeat()" << std::endl; } - -void Connection::ConnectionHandlerImpl::startOk( - u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/){ - parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat); -} - -void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} - -void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ - parent->framemax = framemax; - parent->heartbeat = heartbeat; -} - -void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ - string knownhosts; - parent->client->getConnection().openOk(0, knownhosts); -} - -void Connection::ConnectionHandlerImpl::close( - u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/) -{ - parent->client->getConnection().closeOk(0); - parent->context->close(); -} - -void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ - parent->context->close(); -} - - - -void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ - - parent->channels[channel] = new Channel( - parent->client->getProtocolVersion() , parent->context, channel, - parent->framemax, parent->queues.getStore(), - parent->settings.stagingThreshold); - - // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 - parent->client->getChannel().openOk(channel, std::string()/* ID */); -} - -void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} -void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} - -void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/){ - Channel* c = parent->getChannel(channel); - if(c){ - parent->channels.erase(channel); - c->close(); - delete c; - parent->client->getChannel().closeOk(channel); - } -} - -void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} - - - -void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ - - if(passive){ - if(!parent->exchanges.get(exchange)){ - throw ChannelException(404, "Exchange not found: " + exchange); - } - }else{ - try{ - std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type); - if(!response.second && response.first->getType() != type){ - throw ConnectionException(507, "Exchange already declared to be of type " - + response.first->getType() + ", requested " + type); - } - }catch(UnknownExchangeTypeException& e){ - throw ConnectionException(503, "Exchange type not implemented: " + type); - } - } - if(!nowait){ - parent->client->getExchange().declareOk(channel); - } +void Connection::openChannel(u_int16_t channel) { + if (channel == 0) + throw ConnectionException(504, "Illegal channel 0"); + if (channels.find(channel) != channels.end()) + throw ConnectionException(504, "Channel already open: " + channel); + channels.insert( + channel, + new Channel( + client->getProtocolVersion(), context, channel, framemax, + broker.getQueues().getStore(), settings.stagingThreshold)); } - -void Connection::ExchangeHandlerImpl::unbind( - u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const qpid::framing::FieldTable& /*arguments*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature +void Connection::closeChannel(u_int16_t channel) { + getChannel(channel).close(); // throws if channel does not exist. + channels.erase(channels.find(channel)); } - -void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ - - //TODO: implement unused - parent->exchanges.destroy(exchange); - if(!nowait) parent->client->getExchange().deleteOk(channel); -} - -void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ - Queue::shared_ptr queue; - if (passive && !name.empty()) { - queue = parent->getQueue(name, channel); - } else { - std::pair<Queue::shared_ptr, bool> queue_created = - parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue - parent->getChannel(channel)->setDefaultQueue(queue); - - //apply settings & create persistent record if required - queue_created.first->create(arguments); - - //add default binding: - parent->exchanges.getDefault()->bind(queue, name, 0); - if (exclusive) { - parent->exclusiveQueues.push_back(queue); - } else if(autoDelete){ - parent->cleaner.add(queue); - } - } - } - if (exclusive && !queue->isExclusiveOwner(parent)) { - throw ChannelException(405, "Cannot grant exclusive access to queue"); - } - if (!nowait) { - string queueName = queue->getName(); - parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); - } -} - -void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); - if(exchange){ - // kpvdr - cannot use this any longer as routingKey is now const - // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); - // exchange->bind(queue, routingKey, &arguments); - string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; - exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) parent->client->getQueue().bindOk(channel); - }else{ - throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); - } -} - -void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - int count = queue->purge(); - if(!nowait) parent->client->getQueue().purgeOk(channel, count); -} - -void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ - ChannelException error(0, ""); - int count(0); - Queue::shared_ptr q = parent->getQueue(queue, channel); - if(ifEmpty && q->getMessageCount() > 0){ - throw ChannelException(406, "Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw ChannelException(406, "Queue in use."); - }else{ - //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(parent)){ - queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); - if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); - } - count = q->getMessageCount(); - q->destroy(); - parent->queues.destroy(queue); - } - - if(!nowait) parent->client->getQueue().deleteOk(channel, count); -} - - - - -void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ - //TODO: handle global - parent->getChannel(channel)->setPrefetchSize(prefetchSize); - parent->getChannel(channel)->setPrefetchCount(prefetchCount); - parent->client->getBasic().qosOk(channel); -} - -void Connection::BasicHandlerImpl::consume( - u_int16_t channelId, u_int16_t /*ticket*/, - const string& queueName, const string& consumerTag, - bool noLocal, bool noAck, bool exclusive, - bool nowait, const FieldTable& fields) -{ - - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - Channel* channel = parent->channels[channelId]; - if(!consumerTag.empty() && channel->exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = consumerTag; - channel->consume( - newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields); - - if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } - -} - -void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ - parent->getChannel(channel)->cancel(consumerTag); - - if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag); -} - -void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate){ - - Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName); - if(exchange){ - Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); - parent->getChannel(channel)->handlePublish(msg, exchange); - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); - } -} - -void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - if(!parent->getChannel(channelId)->get(queue, !noAck)){ - string clusterId;//not used, part of an imatix hack - - parent->client->getBasic().getEmpty(channelId, clusterId); - } -} - -void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ - try{ - parent->getChannel(channel)->ack(deliveryTag, multiple); - }catch(InvalidAckException& e){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - } -} - -void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} - -void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ - parent->getChannel(channel)->recover(requeue); -} - -void Connection::TxHandlerImpl::select(u_int16_t channel){ - parent->getChannel(channel)->begin(); - parent->client->getTx().selectOk(channel); -} - -void Connection::TxHandlerImpl::commit(u_int16_t channel){ - parent->getChannel(channel)->commit(); - parent->client->getTx().commitOk(channel); -} - -void Connection::TxHandlerImpl::rollback(u_int16_t channel){ - - parent->getChannel(channel)->rollback(); - parent->client->getTx().rollbackOk(channel); - parent->getChannel(channel)->recover(false); -} - -void -Connection::QueueHandlerImpl::unbind( - u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const qpid::framing::FieldTable& /*arguments*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -void -Connection::ChannelHandlerImpl::resume( - u_int16_t /*channel*/, - const string& /*channelId*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -// Message class method handlers -void -Connection::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - - -void -Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/, - const string& /*destination*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noLocal*/, - bool /*noAck*/, - bool /*exclusive*/, - const qpid::framing::FieldTable& /*filter*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::get( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noAck*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/, - u_int32_t /*prefetchSize*/, - u_int16_t /*prefetchCount*/, - bool /*global*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/, - bool /*requeue*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +Channel& Connection::getChannel(u_int16_t channel){ + ChannelMap::iterator i = channels.find(channel); + if(i == channels.end()) + throw ConnectionException(504, "Unknown channel: " + channel); + return *i; } -void -Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} -void -Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool /*immediate*/, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - }} |