/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include "Connection.h" #include "FanOutExchange.h" #include "HeadersExchange.h" #include "Requester.h" #include "Responder.h" using namespace boost; using namespace qpid::sys; using namespace qpid::framing; using namespace qpid::sys; namespace qpid { namespace broker { Connection::Connection( SessionContext* _context, Broker& broker) : context(_context), client(0), queues(broker.getQueues()), exchanges(broker.getExchanges()), cleaner(broker.getCleaner()), settings(broker.getTimeout(), broker.getStagingThreshold()), requester(broker.getRequester()), responder(broker.getResponder()), basicHandler(new BasicHandlerImpl(this)), channelHandler(new ChannelHandlerImpl(this)), connectionHandler(new ConnectionHandlerImpl(this)), exchangeHandler(new ExchangeHandlerImpl(this)), queueHandler(new QueueHandlerImpl(this)), txHandler(new TxHandlerImpl(this)), messageHandler(new MessageHandlerImpl(this)), framemax(65536), heartbeat(0) {} Connection::~Connection(){ if (client != NULL) delete client; } Channel* Connection::getChannel(u_int16_t channel){ channel_iterator i = channels.find(channel); if(i == channels.end()){ throw ConnectionException(504, "Unknown channel: " + channel); } return i->second; } Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { queue = getChannel(channel)->getDefaultQueue(); if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); } else { queue = queues.find(name); if (queue == 0) { throw ChannelException( 404, "Queue not found: " + name); } } return queue; } Exchange::shared_ptr Connection::findExchange(const string& name){ return exchanges.get(name); } void Connection::handleMethod( u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) { AMQMethodBody::shared_ptr method = shared_polymorphic_cast(body); try{ method->invoke(*this, channel); }catch(ChannelException& e){ channels[channel]->close(); channels.erase(channel); client->getChannel().close( channel, e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(ConnectionException& e){ client->getConnection().close( 0, e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ client->getConnection().close( 0, 541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } void Connection::received(qpid::framing::AMQFrame* frame){ u_int16_t channel = frame->getChannel(); AMQBody::shared_ptr body = frame->getBody(); switch(body->type()) { case REQUEST_BODY: responder.received(AMQRequestBody::getData(body)); handleMethod(channel, body); break; case RESPONSE_BODY: // Must process responses before marking them received. handleMethod(channel, body); requester.processed(AMQResponseBody::getData(body)); break; // TODO aconway 2007-01-15: Leftover from 0-8 support, remove. case METHOD_BODY: handleMethod(channel, body); break; case HEADER_BODY: handleHeader( channel, shared_polymorphic_cast(body)); break; case CONTENT_BODY: handleContent( channel, shared_polymorphic_cast(body)); break; case HEARTBEAT_BODY: assert(channel == 0); handleHeartbeat( shared_polymorphic_cast(body)); break; } } /** * An OutputHandler that does request/response procssing before * delgating to another OutputHandler. */ Connection::Sender::Sender( OutputHandler& oh, Requester& req, Responder& resp) : out(oh), requester(req), responder(resp) {} void Connection::Sender::send(AMQFrame* frame) { AMQBody::shared_ptr body = frame->getBody(); u_int16_t type = body->type(); if (type == REQUEST_BODY) requester.sending(AMQRequestBody::getData(body)); else if (type == RESPONSE_BODY) responder.sending(AMQResponseBody::getData(body)); out.send(frame); } void Connection::initiated(qpid::framing::ProtocolInitiation* header){ if (client == 0) { client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); std::cout << "---------------" << this << std::endl; //send connection start FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); // channel, majour, minor client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales); } } void Connection::idleOut(){ } void Connection::idleIn(){ } void Connection::closed(){ try { for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ Channel* c = i->second; channels.erase(i); c->close(); delete c; } for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ string name = (*i)->getName(); queues.destroy(name); exclusiveQueues.erase(i); } } catch(std::exception& e) { std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl; } } void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ getChannel(channel)->handleHeader(body); } void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ getChannel(channel)->handleContent(body); } void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ std::cout << "Connection::handleHeartbeat()" << std::endl; } void Connection::ConnectionHandlerImpl::startOk( u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat); } void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ parent->framemax = framemax; parent->heartbeat = heartbeat; } void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; parent->client->getConnection().openOk(0, knownhosts); } void Connection::ConnectionHandlerImpl::close( u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { parent->client->getConnection().closeOk(0); parent->context->close(); } void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ parent->context->close(); } void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ parent->channels[channel] = new Channel( parent->client->getProtocolVersion() , parent->context, channel, parent->framemax, parent->queues.getStore(), parent->settings.stagingThreshold); // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 parent->client->getChannel().openOk(channel, std::string()/* ID */); } void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/){ Channel* c = parent->getChannel(channel); if(c){ parent->channels.erase(channel); c->close(); delete c; parent->client->getChannel().closeOk(channel); } } void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, const FieldTable& /*arguments*/){ if(passive){ if(!parent->exchanges.get(exchange)){ throw ChannelException(404, "Exchange not found: " + exchange); } }else{ try{ std::pair response = parent->exchanges.declare(exchange, type); if(!response.second && response.first->getType() != type){ throw ConnectionException(507, "Exchange already declared to be of type " + response.first->getType() + ", requested " + type); } }catch(UnknownExchangeTypeException& e){ throw ConnectionException(503, "Exchange type not implemented: " + type); } } if(!nowait){ parent->client->getExchange().declareOk(channel); } } void Connection::ExchangeHandlerImpl::unbind( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, const string& /*exchange*/, const string& /*routingKey*/, const qpid::framing::FieldTable& /*arguments*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused parent->exchanges.destroy(exchange); if(!nowait) parent->client->getExchange().deleteOk(channel); } void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = parent->getQueue(name, channel); } else { std::pair queue_created = parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue parent->getChannel(channel)->setDefaultQueue(queue); //apply settings & create persistent record if required queue_created.first->create(arguments); //add default binding: parent->exchanges.getDefault()->bind(queue, name, 0); if (exclusive) { parent->exclusiveQueues.push_back(queue); } else if(autoDelete){ parent->cleaner.add(queue); } } } if (exclusive && !queue->isExclusiveOwner(parent)) { throw ChannelException(405, "Cannot grant exclusive access to queue"); } if (!nowait) { string queueName = queue->getName(); parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); } } void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); if(exchange){ // kpvdr - cannot use this any longer as routingKey is now const // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); // exchange->bind(queue, routingKey, &arguments); string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); if(!nowait) parent->client->getQueue().bindOk(channel); }else{ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); } } void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); int count = queue->purge(); if(!nowait) parent->client->getQueue().purgeOk(channel, count); } void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); Queue::shared_ptr q = parent->getQueue(queue, channel); if(ifEmpty && q->getMessageCount() > 0){ throw ChannelException(406, "Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ throw ChannelException(406, "Queue in use."); }else{ //remove the queue from the list of exclusive queues if necessary if(q->isExclusiveOwner(parent)){ queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); } count = q->getMessageCount(); q->destroy(); parent->queues.destroy(queue); } if(!nowait) parent->client->getQueue().deleteOk(channel, count); } void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global parent->getChannel(channel)->setPrefetchSize(prefetchSize); parent->getChannel(channel)->setPrefetchCount(prefetchCount); parent->client->getBasic().qosOk(channel); } void Connection::BasicHandlerImpl::consume( u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const FieldTable& fields) { Queue::shared_ptr queue = parent->getQueue(queueName, channelId); Channel* channel = parent->channels[channelId]; if(!consumerTag.empty() && channel->exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } try{ string newTag = consumerTag; channel->consume( newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields); if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); }catch(ExclusiveAccessException& e){ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); else throw ChannelException(403, "Access would violate previously granted exclusivity"); } } void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ parent->getChannel(channel)->cancel(consumerTag); if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag); } void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate){ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName); if(exchange){ Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); parent->getChannel(channel)->handlePublish(msg, exchange); }else{ throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); } } void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = parent->getQueue(queueName, channelId); if(!parent->getChannel(channelId)->get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack parent->client->getBasic().getEmpty(channelId, clusterId); } } void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ try{ parent->getChannel(channel)->ack(deliveryTag, multiple); }catch(InvalidAckException& e){ throw ConnectionException(530, "Received ack for unrecognised delivery tag"); } } void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ parent->getChannel(channel)->recover(requeue); } void Connection::TxHandlerImpl::select(u_int16_t channel){ parent->getChannel(channel)->begin(); parent->client->getTx().selectOk(channel); } void Connection::TxHandlerImpl::commit(u_int16_t channel){ parent->getChannel(channel)->commit(); parent->client->getTx().commitOk(channel); } void Connection::TxHandlerImpl::rollback(u_int16_t channel){ parent->getChannel(channel)->rollback(); parent->client->getTx().rollbackOk(channel); parent->getChannel(channel)->recover(false); } void Connection::QueueHandlerImpl::unbind( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, const string& /*exchange*/, const string& /*routingKey*/, const qpid::framing::FieldTable& /*arguments*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void Connection::ChannelHandlerImpl::resume( u_int16_t /*channel*/, const string& /*channelId*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } // Message class method handlers void Connection::MessageHandlerImpl::append( u_int16_t /*channel*/, const string& /*reference*/, const string& /*bytes*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/, const string& /*destination*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, const string& /*reference*/, const string& /*identifier*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::close( u_int16_t /*channel*/, const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, const string& /*destination*/, bool /*noLocal*/, bool /*noAck*/, bool /*exclusive*/, const qpid::framing::FieldTable& /*filter*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::get( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, const string& /*destination*/, bool /*noAck*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/, u_int64_t /*value*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::open( u_int16_t /*channel*/, const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/, u_int32_t /*prefetchSize*/, u_int16_t /*prefetchCount*/, bool /*global*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/, bool /*requeue*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/, u_int16_t /*code*/, const string& /*text*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/, const string& /*reference*/, const string& /*identifier*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, bool /*immediate*/, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& /*exchange*/, const string& /*routingKey*/, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, const string& /*contentType*/, const string& /*contentEncoding*/, const string& /*userId*/, const string& /*appId*/, const string& /*transactionId*/, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content /*body*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } }}