summaryrefslogtreecommitdiff
path: root/cpp/src/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/src/broker/BrokerAdapter.cpp388
1 files changed, 388 insertions, 0 deletions
diff --git a/cpp/src/broker/BrokerAdapter.cpp b/cpp/src/broker/BrokerAdapter.cpp
new file mode 100644
index 0000000000..d04610c877
--- /dev/null
+++ b/cpp/src/broker/BrokerAdapter.cpp
@@ -0,0 +1,388 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/format.hpp>
+
+#include "BrokerAdapter.h"
+#include "BrokerChannel.h"
+#include "Connection.h"
+#include "../framing/AMQMethodBody.h"
+#include "../Exception.h"
+
+namespace qpid {
+namespace broker {
+
+using boost::format;
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr> QueueVector;
+
+
+BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
+ CoreRefs(ch, c, b),
+ connection(c),
+ basicHandler(*this),
+ channelHandler(*this),
+ connectionHandler(*this),
+ exchangeHandler(*this),
+ messageHandler(*this),
+ queueHandler(*this),
+ txHandler(*this)
+{}
+
+
+ProtocolVersion BrokerAdapter::getVersion() const {
+ return connection.getVersion();
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::startOk(
+ const MethodContext&, const FieldTable& /*clientProperties*/,
+ const string& /*mechanism*/,
+ const string& /*response*/, const string& /*locale*/)
+{
+ client.tune(
+ 100, connection.getFrameMax(), connection.getHeartbeat());
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::secureOk(
+ const MethodContext&, const string& /*response*/){}
+
+void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
+ const MethodContext&, uint16_t /*channelmax*/,
+ uint32_t framemax, uint16_t heartbeat)
+{
+ connection.setFrameMax(framemax);
+ connection.setHeartbeat(heartbeat);
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::open(
+ const MethodContext& context, const string& /*virtualHost*/,
+ const string& /*capabilities*/, bool /*insist*/)
+{
+ string knownhosts;
+ client.openOk(
+ knownhosts, context.getRequestId());
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::close(
+ const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/,
+ uint16_t /*classId*/, uint16_t /*methodId*/)
+{
+ client.closeOk(context.getRequestId());
+ connection.getOutput().close();
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
+ connection.getOutput().close();
+}
+
+void BrokerAdapter::ChannelHandlerImpl::open(
+ const MethodContext& context, const string& /*outOfBand*/){
+ channel.open();
+ // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
+ client.openOk(
+ std::string()/* ID */, context.getRequestId());
+}
+
+void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+
+void BrokerAdapter::ChannelHandlerImpl::close(
+ const MethodContext& context, uint16_t /*replyCode*/,
+ const string& /*replyText*/,
+ uint16_t /*classId*/, uint16_t /*methodId*/)
+{
+ client.closeOk(context.getRequestId());
+ // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
+ connection.closeChannel(channel.getId());
+}
+
+void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
+
+
+
+void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
+
+ if(passive){
+ if(!broker.getExchanges().get(exchange)) {
+ throw ChannelException(404, "Exchange not found: " + exchange);
+ }
+ }else{
+ try{
+ std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
+ if(!response.second && response.first->getType() != type){
+ throw ConnectionException(
+ 530,
+ "Exchange already declared to be of type "
+ + response.first->getType() + ", requested " + type);
+ }
+ }catch(UnknownExchangeTypeException& e){
+ throw ConnectionException(
+ 503, "Exchange type not implemented: " + type);
+ }
+ }
+ if(!nowait){
+ client.declareOk(context.getRequestId());
+ }
+}
+
+void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
+
+ //TODO: implement unused
+ broker.getExchanges().destroy(exchange);
+ if(!nowait) client.deleteOk(context.getRequestId());
+}
+
+void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ Queue::shared_ptr queue;
+ if (passive && !name.empty()) {
+ queue = connection.getQueue(name, channel.getId());
+ } else {
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ broker.getQueues().declare(
+ name, durable,
+ autoDelete ? connection.getTimeout() : 0,
+ exclusive ? &connection : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
+ channel.setDefaultQueue(queue);
+
+ //apply settings & create persistent record if required
+ queue_created.first->create(arguments);
+
+ //add default binding:
+ broker.getExchanges().getDefault()->bind(queue, name, 0);
+ if (exclusive) {
+ connection.exclusiveQueues.push_back(queue);
+ } else if(autoDelete){
+ broker.getCleaner().add(queue);
+ }
+ }
+ }
+ if (exclusive && !queue->isExclusiveOwner(&connection))
+ throw ChannelException(
+ 405,
+ format("Cannot grant exclusive access to queue '%s'")
+ % queue->getName());
+ if (!nowait) {
+ string queueName = queue->getName();
+ client.declareOk(
+ queueName, queue->getMessageCount(), queue->getConsumerCount(),
+ context.getRequestId());
+ }
+}
+
+void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+ if(exchange){
+ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
+ exchange->bind(queue, exchangeRoutingKey, &arguments);
+ if(!nowait) client.bindOk(context.getRequestId());
+ }else{
+ throw ChannelException(
+ 404, "Bind failed. No such exchange: " + exchangeName);
+ }
+}
+
+void
+BrokerAdapter::QueueHandlerImpl::unbind(
+ const MethodContext& context,
+ uint16_t /*ticket*/,
+ const string& queueName,
+ const string& exchangeName,
+ const string& routingKey,
+ const qpid::framing::FieldTable& arguments )
+{
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+
+ Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+ if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+
+ exchange->unbind(queue, routingKey, &arguments);
+
+ client.unbindOk(context.getRequestId());
+}
+
+void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ int count = queue->purge();
+ if(!nowait) client.purgeOk( count, context.getRequestId());
+}
+
+void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
+ ChannelException error(0, "");
+ int count(0);
+ Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
+ if(ifEmpty && q->getMessageCount() > 0){
+ throw ChannelException(406, "Queue not empty.");
+ }else if(ifUnused && q->getConsumerCount() > 0){
+ throw ChannelException(406, "Queue in use.");
+ }else{
+ //remove the queue from the list of exclusive queues if necessary
+ if(q->isExclusiveOwner(&connection)){
+ QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
+ if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
+ }
+ count = q->getMessageCount();
+ q->destroy();
+ broker.getQueues().destroy(queue);
+ }
+
+ if(!nowait)
+ client.deleteOk(count, context.getRequestId());
+}
+
+
+
+
+void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
+ //TODO: handle global
+ channel.setPrefetchSize(prefetchSize);
+ channel.setPrefetchCount(prefetchCount);
+ client.qosOk(context.getRequestId());
+}
+
+void BrokerAdapter::BasicHandlerImpl::consume(
+ const MethodContext& context, uint16_t /*ticket*/,
+ const string& queueName, const string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive,
+ bool nowait, const FieldTable& fields)
+{
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if(!consumerTag.empty() && channel.exists(consumerTag)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ string newTag = consumerTag;
+ channel.consume(
+ newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
+
+ if(!nowait) client.consumeOk(newTag, context.getRequestId());
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+}
+
+void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+ channel.cancel(consumerTag);
+
+ if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
+}
+
+void BrokerAdapter::BasicHandlerImpl::publish(
+ const MethodContext& context, uint16_t /*ticket*/,
+ const string& exchangeName, const string& routingKey,
+ bool mandatory, bool immediate)
+{
+
+ Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
+ if(exchange){
+ BasicMessage* msg = new BasicMessage(
+ &connection, exchangeName, routingKey, mandatory, immediate,
+ context.methodBody);
+ channel.handlePublish(msg);
+ }else{
+ throw ChannelException(
+ 404, "Exchange not found '" + exchangeName + "'");
+ }
+}
+
+void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
+ string clusterId;//not used, part of an imatix hack
+
+ client.getEmpty(clusterId, context.getRequestId());
+ }
+}
+
+void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){
+ channel.ack(deliveryTag, multiple);
+}
+
+void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){}
+
+void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+ channel.recover(requeue);
+}
+
+void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
+ channel.begin();
+ client.selectOk(context.getRequestId());
+}
+
+void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
+ channel.commit();
+ client.commitOk(context.getRequestId());
+}
+
+void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
+
+ channel.rollback();
+ client.rollbackOk(context.getRequestId());
+ channel.recover(false);
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
+{
+ //no specific action required, generic response handling should be sufficient
+}
+
+
+//
+// Message class method handlers
+//
+void
+BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
+{
+ client.ok(context.getRequestId());
+ client.pong();
+}
+
+
+void
+BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
+{
+ client.ok(context.getRequestId());
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::resume(
+ const MethodContext&,
+ const string& /*channel*/ )
+{
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+}} // namespace qpid::broker
+