diff options
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 388 |
1 files changed, 0 insertions, 388 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp deleted file mode 100644 index 981801c40e..0000000000 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ /dev/null @@ -1,388 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <boost/format.hpp> - -#include "BrokerAdapter.h" -#include "BrokerChannel.h" -#include "Connection.h" -#include "AMQMethodBody.h" -#include "Exception.h" - -namespace qpid { -namespace broker { - -using boost::format; -using namespace qpid; -using namespace qpid::framing; - -typedef std::vector<Queue::shared_ptr> QueueVector; - - -BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : - CoreRefs(ch, c, b), - connection(c), - basicHandler(*this), - channelHandler(*this), - connectionHandler(*this), - exchangeHandler(*this), - messageHandler(*this), - queueHandler(*this), - txHandler(*this) -{} - - -ProtocolVersion BrokerAdapter::getVersion() const { - return connection.getVersion(); -} - -void BrokerAdapter::ConnectionHandlerImpl::startOk( - const MethodContext&, const FieldTable& /*clientProperties*/, - const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/) -{ - client.tune( - 100, connection.getFrameMax(), connection.getHeartbeat()); -} - -void BrokerAdapter::ConnectionHandlerImpl::secureOk( - const MethodContext&, const string& /*response*/){} - -void BrokerAdapter::ConnectionHandlerImpl::tuneOk( - const MethodContext&, uint16_t /*channelmax*/, - uint32_t framemax, uint16_t heartbeat) -{ - connection.setFrameMax(framemax); - connection.setHeartbeat(heartbeat); -} - -void BrokerAdapter::ConnectionHandlerImpl::open( - const MethodContext& context, const string& /*virtualHost*/, - const string& /*capabilities*/, bool /*insist*/) -{ - string knownhosts; - client.openOk( - knownhosts, context.getRequestId()); -} - -void BrokerAdapter::ConnectionHandlerImpl::close( - const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/, - uint16_t /*classId*/, uint16_t /*methodId*/) -{ - client.closeOk(context.getRequestId()); - connection.getOutput().close(); -} - -void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ - connection.getOutput().close(); -} - -void BrokerAdapter::ChannelHandlerImpl::open( - const MethodContext& context, const string& /*outOfBand*/){ - channel.open(); - // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - client.openOk( - std::string()/* ID */, context.getRequestId()); -} - -void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} -void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} - -void BrokerAdapter::ChannelHandlerImpl::close( - const MethodContext& context, uint16_t /*replyCode*/, - const string& /*replyText*/, - uint16_t /*classId*/, uint16_t /*methodId*/) -{ - client.closeOk(context.getRequestId()); - // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. - connection.closeChannel(channel.getId()); -} - -void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} - - - -void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ - - if(passive){ - if(!broker.getExchanges().get(exchange)) { - throw ChannelException(404, "Exchange not found: " + exchange); - } - }else{ - try{ - std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type); - if(!response.second && response.first->getType() != type){ - throw ConnectionException( - 530, - "Exchange already declared to be of type " - + response.first->getType() + ", requested " + type); - } - }catch(UnknownExchangeTypeException& e){ - throw ConnectionException( - 503, "Exchange type not implemented: " + type); - } - } - if(!nowait){ - client.declareOk(context.getRequestId()); - } -} - -void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ - - //TODO: implement unused - broker.getExchanges().destroy(exchange); - if(!nowait) client.deleteOk(context.getRequestId()); -} - -void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ - Queue::shared_ptr queue; - if (passive && !name.empty()) { - queue = connection.getQueue(name, channel.getId()); - } else { - std::pair<Queue::shared_ptr, bool> queue_created = - broker.getQueues().declare( - name, durable, - autoDelete ? connection.getTimeout() : 0, - exclusive ? &connection : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue - channel.setDefaultQueue(queue); - - //apply settings & create persistent record if required - queue_created.first->create(arguments); - - //add default binding: - broker.getExchanges().getDefault()->bind(queue, name, 0); - if (exclusive) { - connection.exclusiveQueues.push_back(queue); - } else if(autoDelete){ - broker.getCleaner().add(queue); - } - } - } - if (exclusive && !queue->isExclusiveOwner(&connection)) - throw ChannelException( - 405, - format("Cannot grant exclusive access to queue '%s'") - % queue->getName()); - if (!nowait) { - string queueName = queue->getName(); - client.declareOk( - queueName, queue->getMessageCount(), queue->getConsumerCount(), - context.getRequestId()); - } -} - -void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ - - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); - if(exchange){ - string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; - exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) client.bindOk(context.getRequestId()); - }else{ - throw ChannelException( - 404, "Bind failed. No such exchange: " + exchangeName); - } -} - -void -BrokerAdapter::QueueHandlerImpl::unbind( - const MethodContext& context, - uint16_t /*ticket*/, - const string& queueName, - const string& exchangeName, - const string& routingKey, - const qpid::framing::FieldTable& arguments ) -{ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); - - Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); - if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); - - exchange->unbind(queue, routingKey, &arguments); - - client.unbindOk(context.getRequestId()); -} - -void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){ - - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - int count = queue->purge(); - if(!nowait) client.purgeOk( count, context.getRequestId()); -} - -void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ - ChannelException error(0, ""); - int count(0); - Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); - if(ifEmpty && q->getMessageCount() > 0){ - throw ChannelException(406, "Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw ChannelException(406, "Queue in use."); - }else{ - //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(&connection)){ - QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); - if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i); - } - count = q->getMessageCount(); - q->destroy(); - broker.getQueues().destroy(queue); - } - - if(!nowait) - client.deleteOk(count, context.getRequestId()); -} - - - - -void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ - //TODO: handle global - channel.setPrefetchSize(prefetchSize); - channel.setPrefetchCount(prefetchCount); - client.qosOk(context.getRequestId()); -} - -void BrokerAdapter::BasicHandlerImpl::consume( - const MethodContext& context, uint16_t /*ticket*/, - const string& queueName, const string& consumerTag, - bool noLocal, bool noAck, bool exclusive, - bool nowait, const FieldTable& fields) -{ - - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!consumerTag.empty() && channel.exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - string newTag = consumerTag; - channel.consume( - newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - - if(!nowait) client.consumeOk(newTag, context.getRequestId()); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); -} - -void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ - channel.cancel(consumerTag); - - if(!nowait) client.cancelOk(consumerTag, context.getRequestId()); -} - -void BrokerAdapter::BasicHandlerImpl::publish( - const MethodContext& context, uint16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate) -{ - - Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); - if(exchange){ - BasicMessage* msg = new BasicMessage( - &connection, exchangeName, routingKey, mandatory, immediate, - context.methodBody); - channel.handlePublish(msg); - }else{ - throw ChannelException( - 404, "Exchange not found '" + exchangeName + "'"); - } -} - -void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){ - string clusterId;//not used, part of an imatix hack - - client.getEmpty(clusterId, context.getRequestId()); - } -} - -void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){ - channel.ack(deliveryTag, multiple); -} - -void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){} - -void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ - channel.recover(requeue); -} - -void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ - channel.begin(); - client.selectOk(context.getRequestId()); -} - -void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ - channel.commit(); - client.commitOk(context.getRequestId()); -} - -void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ - - channel.rollback(); - client.rollbackOk(context.getRequestId()); - channel.recover(false); -} - -void -BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) -{ - //no specific action required, generic response handling should be sufficient -} - - -// -// Message class method handlers -// -void -BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) -{ - client.ok(context.getRequestId()); - client.pong(); -} - - -void -BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) -{ - client.ok(context.getRequestId()); -} - -void -BrokerAdapter::ChannelHandlerImpl::resume( - const MethodContext&, - const string& /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -}} // namespace qpid::broker - |