summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp388
1 files changed, 0 insertions, 388 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
deleted file mode 100644
index 981801c40e..0000000000
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ /dev/null
@@ -1,388 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <boost/format.hpp>
-
-#include "BrokerAdapter.h"
-#include "BrokerChannel.h"
-#include "Connection.h"
-#include "AMQMethodBody.h"
-#include "Exception.h"
-
-namespace qpid {
-namespace broker {
-
-using boost::format;
-using namespace qpid;
-using namespace qpid::framing;
-
-typedef std::vector<Queue::shared_ptr> QueueVector;
-
-
-BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
- CoreRefs(ch, c, b),
- connection(c),
- basicHandler(*this),
- channelHandler(*this),
- connectionHandler(*this),
- exchangeHandler(*this),
- messageHandler(*this),
- queueHandler(*this),
- txHandler(*this)
-{}
-
-
-ProtocolVersion BrokerAdapter::getVersion() const {
- return connection.getVersion();
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::startOk(
- const MethodContext&, const FieldTable& /*clientProperties*/,
- const string& /*mechanism*/,
- const string& /*response*/, const string& /*locale*/)
-{
- client.tune(
- 100, connection.getFrameMax(), connection.getHeartbeat());
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::secureOk(
- const MethodContext&, const string& /*response*/){}
-
-void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
- const MethodContext&, uint16_t /*channelmax*/,
- uint32_t framemax, uint16_t heartbeat)
-{
- connection.setFrameMax(framemax);
- connection.setHeartbeat(heartbeat);
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::open(
- const MethodContext& context, const string& /*virtualHost*/,
- const string& /*capabilities*/, bool /*insist*/)
-{
- string knownhosts;
- client.openOk(
- knownhosts, context.getRequestId());
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::close(
- const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk(context.getRequestId());
- connection.getOutput().close();
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
- connection.getOutput().close();
-}
-
-void BrokerAdapter::ChannelHandlerImpl::open(
- const MethodContext& context, const string& /*outOfBand*/){
- channel.open();
- // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- client.openOk(
- std::string()/* ID */, context.getRequestId());
-}
-
-void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-
-void BrokerAdapter::ChannelHandlerImpl::close(
- const MethodContext& context, uint16_t /*replyCode*/,
- const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk(context.getRequestId());
- // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
- connection.closeChannel(channel.getId());
-}
-
-void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
-
-
-
-void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
-
- if(passive){
- if(!broker.getExchanges().get(exchange)) {
- throw ChannelException(404, "Exchange not found: " + exchange);
- }
- }else{
- try{
- std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
- if(!response.second && response.first->getType() != type){
- throw ConnectionException(
- 530,
- "Exchange already declared to be of type "
- + response.first->getType() + ", requested " + type);
- }
- }catch(UnknownExchangeTypeException& e){
- throw ConnectionException(
- 503, "Exchange type not implemented: " + type);
- }
- }
- if(!nowait){
- client.declareOk(context.getRequestId());
- }
-}
-
-void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
-
- //TODO: implement unused
- broker.getExchanges().destroy(exchange);
- if(!nowait) client.deleteOk(context.getRequestId());
-}
-
-void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
- Queue::shared_ptr queue;
- if (passive && !name.empty()) {
- queue = connection.getQueue(name, channel.getId());
- } else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- broker.getQueues().declare(
- name, durable,
- autoDelete ? connection.getTimeout() : 0,
- exclusive ? &connection : 0);
- queue = queue_created.first;
- assert(queue);
- if (queue_created.second) { // This is a new queue
- channel.setDefaultQueue(queue);
-
- //apply settings & create persistent record if required
- queue_created.first->create(arguments);
-
- //add default binding:
- broker.getExchanges().getDefault()->bind(queue, name, 0);
- if (exclusive) {
- connection.exclusiveQueues.push_back(queue);
- } else if(autoDelete){
- broker.getCleaner().add(queue);
- }
- }
- }
- if (exclusive && !queue->isExclusiveOwner(&connection))
- throw ChannelException(
- 405,
- format("Cannot grant exclusive access to queue '%s'")
- % queue->getName());
- if (!nowait) {
- string queueName = queue->getName();
- client.declareOk(
- queueName, queue->getMessageCount(), queue->getConsumerCount(),
- context.getRequestId());
- }
-}
-
-void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
- if(exchange){
- string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) client.bindOk(context.getRequestId());
- }else{
- throw ChannelException(
- 404, "Bind failed. No such exchange: " + exchangeName);
- }
-}
-
-void
-BrokerAdapter::QueueHandlerImpl::unbind(
- const MethodContext& context,
- uint16_t /*ticket*/,
- const string& queueName,
- const string& exchangeName,
- const string& routingKey,
- const qpid::framing::FieldTable& arguments )
-{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
-
- Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
- if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
-
- exchange->unbind(queue, routingKey, &arguments);
-
- client.unbindOk(context.getRequestId());
-}
-
-void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- int count = queue->purge();
- if(!nowait) client.purgeOk( count, context.getRequestId());
-}
-
-void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
- ChannelException error(0, "");
- int count(0);
- Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
- if(ifEmpty && q->getMessageCount() > 0){
- throw ChannelException(406, "Queue not empty.");
- }else if(ifUnused && q->getConsumerCount() > 0){
- throw ChannelException(406, "Queue in use.");
- }else{
- //remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(&connection)){
- QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
- if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
- }
- count = q->getMessageCount();
- q->destroy();
- broker.getQueues().destroy(queue);
- }
-
- if(!nowait)
- client.deleteOk(count, context.getRequestId());
-}
-
-
-
-
-void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
- //TODO: handle global
- channel.setPrefetchSize(prefetchSize);
- channel.setPrefetchCount(prefetchCount);
- client.qosOk(context.getRequestId());
-}
-
-void BrokerAdapter::BasicHandlerImpl::consume(
- const MethodContext& context, uint16_t /*ticket*/,
- const string& queueName, const string& consumerTag,
- bool noLocal, bool noAck, bool exclusive,
- bool nowait, const FieldTable& fields)
-{
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!consumerTag.empty() && channel.exists(consumerTag)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- string newTag = consumerTag;
- channel.consume(
- newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
-
- if(!nowait) client.consumeOk(newTag, context.getRequestId());
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
-}
-
-void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
- channel.cancel(consumerTag);
-
- if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
-}
-
-void BrokerAdapter::BasicHandlerImpl::publish(
- const MethodContext& context, uint16_t /*ticket*/,
- const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate)
-{
-
- Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
- if(exchange){
- BasicMessage* msg = new BasicMessage(
- &connection, exchangeName, routingKey, mandatory, immediate,
- context.methodBody);
- channel.handlePublish(msg);
- }else{
- throw ChannelException(
- 404, "Exchange not found '" + exchangeName + "'");
- }
-}
-
-void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
- string clusterId;//not used, part of an imatix hack
-
- client.getEmpty(clusterId, context.getRequestId());
- }
-}
-
-void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){
- channel.ack(deliveryTag, multiple);
-}
-
-void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){}
-
-void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
- channel.recover(requeue);
-}
-
-void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
- channel.begin();
- client.selectOk(context.getRequestId());
-}
-
-void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
- channel.commit();
- client.commitOk(context.getRequestId());
-}
-
-void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
-
- channel.rollback();
- client.rollbackOk(context.getRequestId());
- channel.recover(false);
-}
-
-void
-BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
-{
- //no specific action required, generic response handling should be sufficient
-}
-
-
-//
-// Message class method handlers
-//
-void
-BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
-{
- client.ok(context.getRequestId());
- client.pong();
-}
-
-
-void
-BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
-{
- client.ok(context.getRequestId());
-}
-
-void
-BrokerAdapter::ChannelHandlerImpl::resume(
- const MethodContext&,
- const string& /*channel*/ )
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-}} // namespace qpid::broker
-