summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-16 20:17:50 +0000
committerAlan Conway <aconway@apache.org>2007-01-16 20:17:50 +0000
commitbc84e62cc549ac2d751a45d61a867354c84c60d6 (patch)
tree160824086ea1edfd2d28f153626d378d69d0f516 /cpp/lib/broker/Connection.cpp
parent0df54842626c3cc065cad1a2595458f54253a178 (diff)
downloadqpid-python-bc84e62cc549ac2d751a45d61a867354c84c60d6.tar.gz
* Renamed Session* classes to Connection* to align with AMQP spec
- broker::SessionHandlerImpl -> broker::Connection - broker::SessionHandlerImplFactory -> broker::ConnectionFactory - sys::SessionHandler -> ConnectionInputHandler - sys::SessionHandlerFactory -> ConnectionInputHandlerFactory git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496848 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/Connection.cpp')
-rw-r--r--cpp/lib/broker/Connection.cpp700
1 files changed, 700 insertions, 0 deletions
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
new file mode 100644
index 0000000000..c391ff6db5
--- /dev/null
+++ b/cpp/lib/broker/Connection.cpp
@@ -0,0 +1,700 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <assert.h>
+
+#include "Connection.h"
+
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+
+#include "Requester.h"
+#include "Responder.h"
+
+using namespace boost;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace qpid {
+namespace broker {
+
+Connection::Connection(
+ SessionContext* _context, Broker& broker) :
+
+ context(_context),
+ client(0),
+ queues(broker.getQueues()),
+ exchanges(broker.getExchanges()),
+ cleaner(broker.getCleaner()),
+ settings(broker.getTimeout(), broker.getStagingThreshold()),
+ requester(broker.getRequester()),
+ responder(broker.getResponder()),
+ basicHandler(new BasicHandlerImpl(this)),
+ channelHandler(new ChannelHandlerImpl(this)),
+ connectionHandler(new ConnectionHandlerImpl(this)),
+ exchangeHandler(new ExchangeHandlerImpl(this)),
+ queueHandler(new QueueHandlerImpl(this)),
+ txHandler(new TxHandlerImpl(this)),
+ messageHandler(new MessageHandlerImpl(this)),
+ framemax(65536),
+ heartbeat(0)
+{}
+
+Connection::~Connection(){
+
+ if (client != NULL)
+ delete client;
+
+}
+
+Channel* Connection::getChannel(u_int16_t channel){
+ channel_iterator i = channels.find(channel);
+ if(i == channels.end()){
+ throw ConnectionException(504, "Unknown channel: " + channel);
+ }
+ return i->second;
+}
+
+Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ queue = getChannel(channel)->getDefaultQueue();
+ if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
+ } else {
+ queue = queues.find(name);
+ if (queue == 0) {
+ throw ChannelException( 404, "Queue not found: " + name);
+ }
+ }
+ return queue;
+}
+
+
+Exchange::shared_ptr Connection::findExchange(const string& name){
+ return exchanges.get(name);
+}
+
+void Connection::handleMethod(
+ u_int16_t channel, qpid::framing::AMQBody::shared_ptr body)
+{
+ AMQMethodBody::shared_ptr method =
+ shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
+ try{
+ method->invoke(*this, channel);
+ }catch(ChannelException& e){
+ channels[channel]->close();
+ channels.erase(channel);
+ client->getChannel().close(
+ channel, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(ConnectionException& e){
+ client->getConnection().close(
+ 0, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ client->getConnection().close(
+ 0, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+void Connection::received(qpid::framing::AMQFrame* frame){
+ u_int16_t channel = frame->getChannel();
+ AMQBody::shared_ptr body = frame->getBody();
+ switch(body->type())
+ {
+ case REQUEST_BODY:
+ responder.received(AMQRequestBody::getData(body));
+ handleMethod(channel, body);
+ break;
+ case RESPONSE_BODY:
+ // Must process responses before marking them received.
+ handleMethod(channel, body);
+ requester.processed(AMQResponseBody::getData(body));
+ break;
+ // TODO aconway 2007-01-15: Leftover from 0-8 support, remove.
+ case METHOD_BODY:
+ handleMethod(channel, body);
+ break;
+ case HEADER_BODY:
+ handleHeader(
+ channel, shared_polymorphic_cast<AMQHeaderBody>(body));
+ break;
+
+ case CONTENT_BODY:
+ handleContent(
+ channel, shared_polymorphic_cast<AMQContentBody>(body));
+ break;
+
+ case HEARTBEAT_BODY:
+ assert(channel == 0);
+ handleHeartbeat(
+ shared_polymorphic_cast<AMQHeartbeatBody>(body));
+ break;
+ }
+}
+
+/**
+ * An OutputHandler that does request/response procssing before
+ * delgating to another OutputHandler.
+ */
+Connection::Sender::Sender(
+ OutputHandler& oh, Requester& req, Responder& resp)
+ : out(oh), requester(req), responder(resp)
+{}
+
+void Connection::Sender::send(AMQFrame* frame) {
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int16_t type = body->type();
+ if (type == REQUEST_BODY)
+ requester.sending(AMQRequestBody::getData(body));
+ else if (type == RESPONSE_BODY)
+ responder.sending(AMQResponseBody::getData(body));
+ out.send(frame);
+}
+
+void Connection::initiated(qpid::framing::ProtocolInitiation* header){
+
+ if (client == 0)
+ {
+ client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
+
+
+ std::cout << "---------------" << this << std::endl;
+
+ //send connection start
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US"); // channel, majour, minor
+ client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales);
+ }
+}
+
+
+void Connection::idleOut(){
+
+}
+
+void Connection::idleIn(){
+
+}
+
+void Connection::closed(){
+ try {
+ for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+ Channel* c = i->second;
+ channels.erase(i);
+ c->close();
+ delete c;
+ }
+ for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+ string name = (*i)->getName();
+ queues.destroy(name);
+ exclusiveQueues.erase(i);
+ }
+ } catch(std::exception& e) {
+ std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
+ }
+}
+
+void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
+ getChannel(channel)->handleHeader(body);
+}
+
+void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
+ getChannel(channel)->handleContent(body);
+}
+
+void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+ std::cout << "Connection::handleHeartbeat()" << std::endl;
+}
+
+void Connection::ConnectionHandlerImpl::startOk(
+ u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+ const string& /*response*/, const string& /*locale*/){
+ parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
+}
+
+void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
+
+void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
+ parent->framemax = framemax;
+ parent->heartbeat = heartbeat;
+}
+
+void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+ string knownhosts;
+ parent->client->getConnection().openOk(0, knownhosts);
+}
+
+void Connection::ConnectionHandlerImpl::close(
+ u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
+ u_int16_t /*classId*/, u_int16_t /*methodId*/)
+{
+ parent->client->getConnection().closeOk(0);
+ parent->context->close();
+}
+
+void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
+ parent->context->close();
+}
+
+
+
+void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
+
+
+ parent->channels[channel] = new Channel(
+ parent->client->getProtocolVersion() , parent->context, channel,
+ parent->framemax, parent->queues.getStore(),
+ parent->settings.stagingThreshold);
+
+ // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
+ parent->client->getChannel().openOk(channel, std::string()/* ID */);
+}
+
+void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
+void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
+
+void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
+ u_int16_t /*classId*/, u_int16_t /*methodId*/){
+ Channel* c = parent->getChannel(channel);
+ if(c){
+ parent->channels.erase(channel);
+ c->close();
+ delete c;
+ parent->client->getChannel().closeOk(channel);
+ }
+}
+
+void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
+
+
+
+void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
+
+ if(passive){
+ if(!parent->exchanges.get(exchange)){
+ throw ChannelException(404, "Exchange not found: " + exchange);
+ }
+ }else{
+ try{
+ std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type);
+ if(!response.second && response.first->getType() != type){
+ throw ConnectionException(507, "Exchange already declared to be of type "
+ + response.first->getType() + ", requested " + type);
+ }
+ }catch(UnknownExchangeTypeException& e){
+ throw ConnectionException(503, "Exchange type not implemented: " + type);
+ }
+ }
+ if(!nowait){
+ parent->client->getExchange().declareOk(channel);
+ }
+}
+
+
+void Connection::ExchangeHandlerImpl::unbind(
+ u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*exchange*/,
+ const string& /*routingKey*/,
+ const qpid::framing::FieldTable& /*arguments*/ )
+{
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+
+
+void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
+
+ //TODO: implement unused
+ parent->exchanges.destroy(exchange);
+ if(!nowait) parent->client->getExchange().deleteOk(channel);
+}
+
+void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ Queue::shared_ptr queue;
+ if (passive && !name.empty()) {
+ queue = parent->getQueue(name, channel);
+ } else {
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
+ parent->getChannel(channel)->setDefaultQueue(queue);
+
+ //apply settings & create persistent record if required
+ queue_created.first->create(arguments);
+
+ //add default binding:
+ parent->exchanges.getDefault()->bind(queue, name, 0);
+ if (exclusive) {
+ parent->exclusiveQueues.push_back(queue);
+ } else if(autoDelete){
+ parent->cleaner.add(queue);
+ }
+ }
+ }
+ if (exclusive && !queue->isExclusiveOwner(parent)) {
+ throw ChannelException(405, "Cannot grant exclusive access to queue");
+ }
+ if (!nowait) {
+ string queueName = queue->getName();
+ parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ }
+}
+
+void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
+ if(exchange){
+ // kpvdr - cannot use this any longer as routingKey is now const
+ // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
+ // exchange->bind(queue, routingKey, &arguments);
+ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
+ exchange->bind(queue, exchangeRoutingKey, &arguments);
+ if(!nowait) parent->client->getQueue().bindOk(channel);
+ }else{
+ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+ }
+}
+
+void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ int count = queue->purge();
+ if(!nowait) parent->client->getQueue().purgeOk(channel, count);
+}
+
+void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
+ ChannelException error(0, "");
+ int count(0);
+ Queue::shared_ptr q = parent->getQueue(queue, channel);
+ if(ifEmpty && q->getMessageCount() > 0){
+ throw ChannelException(406, "Queue not empty.");
+ }else if(ifUnused && q->getConsumerCount() > 0){
+ throw ChannelException(406, "Queue in use.");
+ }else{
+ //remove the queue from the list of exclusive queues if necessary
+ if(q->isExclusiveOwner(parent)){
+ queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
+ if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
+ }
+ count = q->getMessageCount();
+ q->destroy();
+ parent->queues.destroy(queue);
+ }
+
+ if(!nowait) parent->client->getQueue().deleteOk(channel, count);
+}
+
+
+
+
+void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+ //TODO: handle global
+ parent->getChannel(channel)->setPrefetchSize(prefetchSize);
+ parent->getChannel(channel)->setPrefetchCount(prefetchCount);
+ parent->client->getBasic().qosOk(channel);
+}
+
+void Connection::BasicHandlerImpl::consume(
+ u_int16_t channelId, u_int16_t /*ticket*/,
+ const string& queueName, const string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive,
+ bool nowait, const FieldTable& fields)
+{
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ Channel* channel = parent->channels[channelId];
+ if(!consumerTag.empty() && channel->exists(consumerTag)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ string newTag = consumerTag;
+ channel->consume(
+ newTag, queue, !noAck, exclusive, noLocal ? parent : 0, &fields);
+
+ if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag);
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
+
+}
+
+void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
+ parent->getChannel(channel)->cancel(consumerTag);
+
+ if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag);
+}
+
+void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
+ const string& exchangeName, const string& routingKey,
+ bool mandatory, bool immediate){
+
+ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName);
+ if(exchange){
+ Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
+ parent->getChannel(channel)->handlePublish(msg, exchange);
+ }else{
+ throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ }
+}
+
+void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ if(!parent->getChannel(channelId)->get(queue, !noAck)){
+ string clusterId;//not used, part of an imatix hack
+
+ parent->client->getBasic().getEmpty(channelId, clusterId);
+ }
+}
+
+void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+ try{
+ parent->getChannel(channel)->ack(deliveryTag, multiple);
+ }catch(InvalidAckException& e){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }
+}
+
+void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+
+void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
+ parent->getChannel(channel)->recover(requeue);
+}
+
+void Connection::TxHandlerImpl::select(u_int16_t channel){
+ parent->getChannel(channel)->begin();
+ parent->client->getTx().selectOk(channel);
+}
+
+void Connection::TxHandlerImpl::commit(u_int16_t channel){
+ parent->getChannel(channel)->commit();
+ parent->client->getTx().commitOk(channel);
+}
+
+void Connection::TxHandlerImpl::rollback(u_int16_t channel){
+
+ parent->getChannel(channel)->rollback();
+ parent->client->getTx().rollbackOk(channel);
+ parent->getChannel(channel)->recover(false);
+}
+
+void
+Connection::QueueHandlerImpl::unbind(
+ u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*exchange*/,
+ const string& /*routingKey*/,
+ const qpid::framing::FieldTable& /*arguments*/ )
+{
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+void
+Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
+{
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+void
+Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
+{
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+void
+Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
+{
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+void
+Connection::ChannelHandlerImpl::resume(
+ u_int16_t /*channel*/,
+ const string& /*channelId*/ )
+{
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+// Message class method handlers
+void
+Connection::MessageHandlerImpl::append( u_int16_t /*channel*/,
+ const string& /*reference*/,
+ const string& /*bytes*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+
+void
+Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
+ const string& /*destination*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
+ const string& /*reference*/,
+ const string& /*identifier*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::close( u_int16_t /*channel*/,
+ const string& /*reference*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noLocal*/,
+ bool /*noAck*/,
+ bool /*exclusive*/,
+ const qpid::framing::FieldTable& /*filter*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::get( u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noAck*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/,
+ u_int64_t /*value*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::open( u_int16_t /*channel*/,
+ const string& /*reference*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/,
+ u_int32_t /*prefetchSize*/,
+ u_int16_t /*prefetchCount*/,
+ bool /*global*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/,
+ bool /*requeue*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/,
+ u_int16_t /*code*/,
+ const string& /*text*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/,
+ const string& /*reference*/,
+ const string& /*identifier*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& /*destination*/,
+ bool /*redelivered*/,
+ bool /*immediate*/,
+ u_int64_t /*ttl*/,
+ u_int8_t /*priority*/,
+ u_int64_t /*timestamp*/,
+ u_int8_t /*deliveryMode*/,
+ u_int64_t /*expiration*/,
+ const string& /*exchange*/,
+ const string& /*routingKey*/,
+ const string& /*messageId*/,
+ const string& /*correlationId*/,
+ const string& /*replyTo*/,
+ const string& /*contentType*/,
+ const string& /*contentEncoding*/,
+ const string& /*userId*/,
+ const string& /*appId*/,
+ const string& /*transactionId*/,
+ const string& /*securityToken*/,
+ const qpid::framing::FieldTable& /*applicationHeaders*/,
+ qpid::framing::Content /*body*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+}}
+