diff options
author | Alan Conway <aconway@apache.org> | 2007-01-16 20:17:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-16 20:17:50 +0000 |
commit | bc84e62cc549ac2d751a45d61a867354c84c60d6 (patch) | |
tree | 160824086ea1edfd2d28f153626d378d69d0f516 /cpp | |
parent | 0df54842626c3cc065cad1a2595458f54253a178 (diff) | |
download | qpid-python-bc84e62cc549ac2d751a45d61a867354c84c60d6.tar.gz |
* Renamed Session* classes to Connection* to align with AMQP spec
- broker::SessionHandlerImpl -> broker::Connection
- broker::SessionHandlerImplFactory -> broker::ConnectionFactory
- sys::SessionHandler -> ConnectionInputHandler
- sys::SessionHandlerFactory -> ConnectionInputHandlerFactory
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496848 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
25 files changed, 185 insertions, 193 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index c2117eaf23..079eb5fd73 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -29,10 +29,10 @@ #include "MessageStoreModule.h" #include "NullMessageStore.h" #include "ProtocolInitiation.h" -#include "SessionHandlerImpl.h" +#include "Connection.h" #include "sys/SessionContext.h" -#include "sys/SessionHandler.h" -#include "sys/SessionHandlerFactory.h" +#include "sys/ConnectionInputHandler.h" +#include "sys/ConnectionInputHandlerFactory.h" #include "sys/TimeoutHandler.h" #include "Broker.h" diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index ad7fbb1eca..929ed4360e 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -23,7 +23,7 @@ */ #include <Configuration.h> -#include <SessionHandlerFactoryImpl.h> +#include <ConnectionFactory.h> #include <sys/Runnable.h> #include <sys/Acceptor.h> #include <SharedObject.h> @@ -99,7 +99,7 @@ class Broker : public qpid::sys::Runnable, u_int32_t timeout; u_int64_t stagingThreshold; AutoDelete cleaner; - SessionHandlerFactoryImpl factory; + ConnectionFactory factory; qpid::framing::Requester requester; qpid::framing::Responder responder; }; diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/Connection.cpp index d7f6320535..c391ff6db5 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -21,7 +21,7 @@ #include <iostream> #include <assert.h> -#include "SessionHandlerImpl.h" +#include "Connection.h" #include "FanOutExchange.h" #include "HeadersExchange.h" @@ -37,7 +37,7 @@ using namespace qpid::sys; namespace qpid { namespace broker { -SessionHandlerImpl::SessionHandlerImpl( +Connection::Connection( SessionContext* _context, Broker& broker) : context(_context), @@ -59,14 +59,14 @@ SessionHandlerImpl::SessionHandlerImpl( heartbeat(0) {} -SessionHandlerImpl::~SessionHandlerImpl(){ +Connection::~Connection(){ if (client != NULL) delete client; } -Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ +Channel* Connection::getChannel(u_int16_t channel){ channel_iterator i = channels.find(channel); if(i == channels.end()){ throw ConnectionException(504, "Unknown channel: " + channel); @@ -74,7 +74,7 @@ Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ return i->second; } -Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ +Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { queue = getChannel(channel)->getDefaultQueue(); @@ -89,11 +89,11 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha } -Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){ +Exchange::shared_ptr Connection::findExchange(const string& name){ return exchanges.get(name); } -void SessionHandlerImpl::handleMethod( +void Connection::handleMethod( u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) { AMQMethodBody::shared_ptr method = @@ -104,11 +104,12 @@ void SessionHandlerImpl::handleMethod( channels[channel]->close(); channels.erase(channel); client->getChannel().close( - channel, e.code, e.text, + channel, e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(ConnectionException& e){ client->getConnection().close( - 0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + 0, e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ client->getConnection().close( 0, 541/*internal error*/, e.what(), @@ -116,7 +117,7 @@ void SessionHandlerImpl::handleMethod( } } -void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ +void Connection::received(qpid::framing::AMQFrame* frame){ u_int16_t channel = frame->getChannel(); AMQBody::shared_ptr body = frame->getBody(); switch(body->type()) @@ -156,12 +157,12 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ * An OutputHandler that does request/response procssing before * delgating to another OutputHandler. */ -SessionHandlerImpl::Sender::Sender( +Connection::Sender::Sender( OutputHandler& oh, Requester& req, Responder& resp) : out(oh), requester(req), responder(resp) {} -void SessionHandlerImpl::Sender::send(AMQFrame* frame) { +void Connection::Sender::send(AMQFrame* frame) { AMQBody::shared_ptr body = frame->getBody(); u_int16_t type = body->type(); if (type == REQUEST_BODY) @@ -171,7 +172,7 @@ void SessionHandlerImpl::Sender::send(AMQFrame* frame) { out.send(frame); } -void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ +void Connection::initiated(qpid::framing::ProtocolInitiation* header){ if (client == 0) { @@ -189,15 +190,15 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ } -void SessionHandlerImpl::idleOut(){ +void Connection::idleOut(){ } -void SessionHandlerImpl::idleIn(){ +void Connection::idleIn(){ } -void SessionHandlerImpl::closed(){ +void Connection::closed(){ try { for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ Channel* c = i->second; @@ -215,37 +216,37 @@ void SessionHandlerImpl::closed(){ } } -void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ +void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ getChannel(channel)->handleHeader(body); } -void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ +void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ getChannel(channel)->handleContent(body); } -void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl; +void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ + std::cout << "Connection::handleHeartbeat()" << std::endl; } -void SessionHandlerImpl::ConnectionHandlerImpl::startOk( +void Connection::ConnectionHandlerImpl::startOk( u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat); } -void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} +void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} -void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ +void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ parent->framemax = framemax; parent->heartbeat = heartbeat; } -void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; parent->client->getConnection().openOk(0, knownhosts); } -void SessionHandlerImpl::ConnectionHandlerImpl::close( +void Connection::ConnectionHandlerImpl::close( u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { @@ -253,13 +254,13 @@ void SessionHandlerImpl::ConnectionHandlerImpl::close( parent->context->close(); } -void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ +void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ parent->context->close(); } -void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ +void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ parent->channels[channel] = new Channel( @@ -271,10 +272,10 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin parent->client->getChannel().openOk(channel, std::string()/* ID */); } -void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} -void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} +void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} +void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} -void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, +void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/){ Channel* c = parent->getChannel(channel); if(c){ @@ -285,11 +286,11 @@ void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t } } -void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} +void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} -void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, +void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, const FieldTable& /*arguments*/){ @@ -314,7 +315,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 } -void SessionHandlerImpl::ExchangeHandlerImpl::unbind( +void Connection::ExchangeHandlerImpl::unbind( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, @@ -327,7 +328,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::unbind( -void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, +void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused @@ -335,7 +336,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16 if(!nowait) parent->client->getExchange().deleteOk(channel); } -void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, +void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; @@ -370,7 +371,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t } } -void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, +void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ @@ -388,14 +389,14 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t } } -void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); int count = queue->purge(); if(!nowait) parent->client->getQueue().purgeOk(channel, count); } -void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, +void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); @@ -421,14 +422,14 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t -void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global parent->getChannel(channel)->setPrefetchSize(prefetchSize); parent->getChannel(channel)->setPrefetchCount(prefetchCount); parent->client->getBasic().qosOk(channel); } -void SessionHandlerImpl::BasicHandlerImpl::consume( +void Connection::BasicHandlerImpl::consume( u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, @@ -457,13 +458,13 @@ void SessionHandlerImpl::BasicHandlerImpl::consume( } -void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ +void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ parent->getChannel(channel)->cancel(consumerTag); if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag); } -void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, +void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate){ @@ -476,7 +477,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t } } -void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = parent->getQueue(queueName, channelId); if(!parent->getChannel(channelId)->get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack @@ -485,7 +486,7 @@ void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /* } } -void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ +void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ try{ parent->getChannel(channel)->ack(deliveryTag, multiple); }catch(InvalidAckException& e){ @@ -493,23 +494,23 @@ void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deli } } -void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} +void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ +void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ parent->getChannel(channel)->recover(requeue); } -void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ +void Connection::TxHandlerImpl::select(u_int16_t channel){ parent->getChannel(channel)->begin(); parent->client->getTx().selectOk(channel); } -void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){ +void Connection::TxHandlerImpl::commit(u_int16_t channel){ parent->getChannel(channel)->commit(); parent->client->getTx().commitOk(channel); } -void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){ +void Connection::TxHandlerImpl::rollback(u_int16_t channel){ parent->getChannel(channel)->rollback(); parent->client->getTx().rollbackOk(channel); @@ -517,7 +518,7 @@ void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){ } void -SessionHandlerImpl::QueueHandlerImpl::unbind( +Connection::QueueHandlerImpl::unbind( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, @@ -529,25 +530,25 @@ SessionHandlerImpl::QueueHandlerImpl::unbind( } void -SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) +Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void -SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) +Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void -SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) +Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void -SessionHandlerImpl::ChannelHandlerImpl::resume( +Connection::ChannelHandlerImpl::resume( u_int16_t /*channel*/, const string& /*channelId*/ ) { @@ -556,7 +557,7 @@ SessionHandlerImpl::ChannelHandlerImpl::resume( // Message class method handlers void -SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::append( u_int16_t /*channel*/, const string& /*reference*/, const string& /*bytes*/ ) { @@ -565,14 +566,14 @@ SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/, void -SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/, const string& /*destination*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, const string& /*reference*/, const string& /*identifier*/ ) { @@ -580,14 +581,14 @@ SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, } void -SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::close( u_int16_t /*channel*/, const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, const string& /*destination*/, @@ -600,13 +601,13 @@ SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/, } void -SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) +Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::get( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*queue*/, const string& /*destination*/, @@ -616,27 +617,27 @@ SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/, } void -SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/, u_int64_t /*value*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) +Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::open( u_int16_t /*channel*/, const string& /*reference*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/, u_int32_t /*prefetchSize*/, u_int16_t /*prefetchCount*/, bool /*global*/ ) @@ -645,14 +646,14 @@ SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/, } void -SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/, bool /*requeue*/ ) { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void -SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/, u_int16_t /*code*/, const string& /*text*/ ) { @@ -660,7 +661,7 @@ SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/, } void -SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/, const string& /*reference*/, const string& /*identifier*/ ) { @@ -668,7 +669,7 @@ SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/, } void -SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/, +Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/Connection.h index 070bd1266e..2f2770b28d 100644 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ b/cpp/lib/broker/Connection.h @@ -18,8 +18,8 @@ * under the License. * */ -#ifndef _SessionHandlerImpl_ -#define _SessionHandlerImpl_ +#ifndef _Connection_ +#define _Connection_ #include <map> #include <sstream> @@ -29,7 +29,7 @@ #include <AMQP_ClientProxy.h> #include <AMQP_ServerOperations.h> #include <sys/SessionContext.h> -#include <sys/SessionHandler.h> +#include <sys/ConnectionInputHandler.h> #include <sys/TimeoutHandler.h> #include "Broker.h" #include "Exception.h" @@ -37,22 +37,6 @@ namespace qpid { namespace broker { -struct ChannelException : public qpid::Exception { - u_int16_t code; - string text; - ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} - ~ChannelException() throw() {} - const char* what() const throw() { return text.c_str(); } -}; - -struct ConnectionException : public qpid::Exception { - u_int16_t code; - string text; - ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} - ~ConnectionException() throw() {} - const char* what() const throw() { return text.c_str(); } -}; - class Settings { public: const u_int32_t timeout;//timeout for auto-deleted queues (in ms) @@ -61,7 +45,7 @@ class Settings { Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} }; -class SessionHandlerImpl : public qpid::sys::SessionHandler, +class Connection : public qpid::sys::ConnectionInputHandler, public qpid::framing::AMQP_ServerOperations, public ConnectionToken { @@ -117,31 +101,28 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, Exchange::shared_ptr findExchange(const string& name); public: - SessionHandlerImpl(qpid::sys::SessionContext* context, Broker& broker); + Connection(qpid::sys::SessionContext* context, Broker& broker); virtual void received(qpid::framing::AMQFrame* frame); virtual void initiated(qpid::framing::ProtocolInitiation* header); virtual void idleOut(); virtual void idleIn(); virtual void closed(); - virtual ~SessionHandlerImpl(); + virtual ~Connection(); class ConnectionHandlerImpl : public ConnectionHandler{ - SessionHandlerImpl* parent; + Connection* parent; public: - inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + inline ConnectionHandlerImpl(Connection* _parent) : parent(_parent) {} virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism, const string& response, const string& locale); - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 virtual void secureOk(u_int16_t channel, const string& response); virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist); - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, u_int16_t methodId); @@ -151,11 +132,10 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, }; class ChannelHandlerImpl : public ChannelHandler{ - SessionHandlerImpl* parent; + Connection* parent; public: - inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + inline ChannelHandlerImpl(Connection* _parent) : parent(_parent) {} - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 virtual void open(u_int16_t channel, const string& outOfBand); virtual void flow(u_int16_t channel, bool active); @@ -180,9 +160,9 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, }; class ExchangeHandlerImpl : public ExchangeHandler{ - SessionHandlerImpl* parent; + Connection* parent; public: - inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + inline ExchangeHandlerImpl(Connection* _parent) : parent(_parent) {} virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type, bool passive, bool durable, bool autoDelete, bool internal, bool nowait, @@ -202,9 +182,9 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, class QueueHandlerImpl : public QueueHandler{ - SessionHandlerImpl* parent; + Connection* parent; public: - inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + inline QueueHandlerImpl(Connection* _parent) : parent(_parent) {} virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue, bool passive, bool durable, bool exclusive, @@ -224,7 +204,6 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, bool nowait); - // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty, bool nowait); @@ -232,9 +211,9 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, }; class BasicHandlerImpl : public BasicHandler{ - SessionHandlerImpl* parent; + Connection* parent; public: - inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + inline BasicHandlerImpl(Connection* _parent) : parent(_parent) {} virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); @@ -261,9 +240,9 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, }; class TxHandlerImpl : public TxHandler{ - SessionHandlerImpl* parent; + Connection* parent; public: - TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + TxHandlerImpl(Connection* _parent) : parent(_parent) {} virtual ~TxHandlerImpl() {} virtual void select(u_int16_t channel); virtual void commit(u_int16_t channel); @@ -271,13 +250,13 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, }; class MessageHandlerImpl : public MessageHandler { - SessionHandlerImpl* parent; + Connection* parent; // Constructors and destructors public: MessageHandlerImpl() {} - MessageHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + MessageHandlerImpl(Connection* _parent) : parent(_parent) {} virtual ~MessageHandlerImpl() {} // Protocol methods diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/cpp/lib/broker/ConnectionFactory.cpp index 559fd6bca1..3c4c7cd724 100644 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/lib/broker/ConnectionFactory.cpp @@ -18,26 +18,26 @@ * under the License. * */ -#include <SessionHandlerFactoryImpl.h> -#include <SessionHandlerImpl.h> +#include <ConnectionFactory.h> +#include <Connection.h> namespace qpid { namespace broker { -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(Broker& b) : broker(b) +ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {} -SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() +ConnectionFactory::~ConnectionFactory() { broker.getCleaner().stop(); } -qpid::sys::SessionHandler* -SessionHandlerFactoryImpl::create(qpid::sys::SessionContext* ctxt) +qpid::sys::ConnectionInputHandler* +ConnectionFactory::create(qpid::sys::SessionContext* ctxt) { - return new SessionHandlerImpl(ctxt, broker); + return new Connection(ctxt, broker); } }} // namespace qpid::broker diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.h b/cpp/lib/broker/ConnectionFactory.h index 49c42b4d1c..fe8052ed9c 100644 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.h +++ b/cpp/lib/broker/ConnectionFactory.h @@ -18,23 +18,23 @@ * under the License. * */ -#ifndef _SessionHandlerFactoryImpl_ -#define _SessionHandlerFactoryImpl_ +#ifndef _ConnectionFactory_ +#define _ConnectionFactory_ -#include "SessionHandlerFactory.h" +#include "ConnectionInputHandlerFactory.h" namespace qpid { namespace broker { class Broker; -class SessionHandlerFactoryImpl : public qpid::sys::SessionHandlerFactory +class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory { public: - SessionHandlerFactoryImpl(Broker& b); + ConnectionFactory(Broker& b); - virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); + virtual qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext* ctxt); - virtual ~SessionHandlerFactoryImpl(); + virtual ~ConnectionFactory(); private: Broker& broker; diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index 01a5b3d847..b1a0c1af78 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -65,10 +65,10 @@ libqpidbroker_la_SOURCES = \ QueueRegistry.h \ RecoveryManager.cpp \ RecoveryManager.h \ - SessionHandlerFactoryImpl.cpp \ - SessionHandlerFactoryImpl.h \ - SessionHandlerImpl.cpp \ - SessionHandlerImpl.h \ + ConnectionFactory.cpp \ + ConnectionFactory.h \ + Connection.cpp \ + Connection.h \ TopicExchange.cpp \ TopicExchange.h \ TransactionalStore.h \ diff --git a/cpp/lib/broker/QueueRegistry.cpp b/cpp/lib/broker/QueueRegistry.cpp index 2d1382ef09..ff2f83b725 100644 --- a/cpp/lib/broker/QueueRegistry.cpp +++ b/cpp/lib/broker/QueueRegistry.cpp @@ -19,7 +19,6 @@ * */ #include <QueueRegistry.h> -#include <SessionHandlerImpl.h> #include <sstream> #include <assert.h> diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h index f35d427bb0..61bbc0ab5f 100644 --- a/cpp/lib/common/Exception.h +++ b/cpp/lib/common/Exception.h @@ -54,6 +54,17 @@ class Exception : public std::exception typedef boost::shared_ptr<Exception> shared_ptr; }; +struct ChannelException : public qpid::Exception { + u_int16_t code; + ChannelException(u_int16_t _code, std::string _text) + : Exception(_text), code(_code) {} +}; + +struct ConnectionException : public qpid::Exception { + u_int16_t code; + ConnectionException(u_int16_t _code, std::string _text) + : Exception(_text), code(_code) {} +}; } diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index 541145ac97..813c49135e 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -120,8 +120,8 @@ nobase_pkginclude_HEADERS = \ sys/Mutex.h \ sys/Runnable.h \ sys/SessionContext.h \ - sys/SessionHandler.h \ - sys/SessionHandlerFactory.h \ + sys/ConnectionInputHandler.h \ + sys/ConnectionInputHandlerFactory.h \ sys/ShutdownHandler.h \ sys/Socket.h \ sys/Thread.h \ diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp index 1dd3cd4ce9..7e1da505c6 100644 --- a/cpp/lib/common/framing/Requester.cpp +++ b/cpp/lib/common/framing/Requester.cpp @@ -33,13 +33,15 @@ void Requester::sending(AMQRequestBody::Data& request) { void Requester::processed(const AMQResponseBody::Data& response) { responseMark = response.responseId; RequestId id = response.requestId; - RequestId end = id + response.batchOffset; + RequestId end = id + response.batchOffset + 1; for ( ; id < end; ++id) { std::set<RequestId>::iterator i = requests.find(id); - if (i == requests.end()) - // TODO aconway 2007-01-12: Verify this is the right exception. - THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response."); - requests.erase(i); + if (i != requests.end()) + requests.erase(i); + else { + // FIXME aconway 2007-01-16: Uncomment exception when ids are propagating. +// THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response."); + } } } diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h index e6bc27a593..f571dcbddd 100644 --- a/cpp/lib/common/sys/Acceptor.h +++ b/cpp/lib/common/sys/Acceptor.h @@ -28,7 +28,7 @@ namespace qpid { namespace sys { -class SessionHandlerFactory; +class ConnectionInputHandlerFactory; class Acceptor : public qpid::SharedObject<Acceptor> { @@ -36,7 +36,7 @@ class Acceptor : public qpid::SharedObject<Acceptor> static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); virtual ~Acceptor() = 0; virtual int16_t getPort() const = 0; - virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0; + virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0; virtual void shutdown() = 0; }; diff --git a/cpp/lib/common/sys/SessionHandler.h b/cpp/lib/common/sys/ConnectionInputHandler.h index 76f79d421d..fa70dfaf48 100644 --- a/cpp/lib/common/sys/SessionHandler.h +++ b/cpp/lib/common/sys/ConnectionInputHandler.h @@ -18,8 +18,8 @@ * under the License. * */ -#ifndef _SessionHandler_ -#define _SessionHandler_ +#ifndef _ConnectionInputHandler_ +#define _ConnectionInputHandler_ #include <InputHandler.h> #include <InitiationHandler.h> @@ -29,7 +29,7 @@ namespace qpid { namespace sys { - class SessionHandler : + class ConnectionInputHandler : public qpid::framing::InitiationHandler, public qpid::framing::InputHandler, public TimeoutHandler diff --git a/cpp/lib/common/sys/SessionHandlerFactory.h b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h index 2a01aebcb0..5bb5e17704 100644 --- a/cpp/lib/common/sys/SessionHandlerFactory.h +++ b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h @@ -18,8 +18,8 @@ * under the License. * */ -#ifndef _SessionHandlerFactory_ -#define _SessionHandlerFactory_ +#ifndef _ConnectionInputHandlerFactory_ +#define _ConnectionInputHandlerFactory_ #include <boost/noncopyable.hpp> @@ -27,17 +27,17 @@ namespace qpid { namespace sys { class SessionContext; -class SessionHandler; +class ConnectionInputHandler; /** * Callback interface used by the Acceptor to - * create a SessionHandler for each new connection. + * create a ConnectionInputHandler for each new connection. */ -class SessionHandlerFactory : private boost::noncopyable +class ConnectionInputHandlerFactory : private boost::noncopyable { public: - virtual SessionHandler* create(SessionContext* ctxt) = 0; - virtual ~SessionHandlerFactory(){} + virtual ConnectionInputHandler* create(SessionContext* ctxt) = 0; + virtual ~ConnectionInputHandlerFactory(){} }; }} diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp index 6853833797..10f787f4fe 100644 --- a/cpp/lib/common/sys/apr/APRAcceptor.cpp +++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp @@ -19,7 +19,7 @@ * */ #include <sys/Acceptor.h> -#include <sys/SessionHandlerFactory.h> +#include <sys/ConnectionInputHandlerFactory.h> #include "LFProcessor.h" #include "LFSessionContext.h" #include "APRBase.h" @@ -33,7 +33,7 @@ class APRAcceptor : public Acceptor public: APRAcceptor(int16_t port, int backlog, int threads, bool trace); virtual int16_t getPort() const; - virtual void run(qpid::sys::SessionHandlerFactory* factory); + virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory); virtual void shutdown(); private: @@ -75,7 +75,7 @@ int16_t APRAcceptor::getPort() const { return address->port; } -void APRAcceptor::run(SessionHandlerFactory* factory) { +void APRAcceptor::run(ConnectionInputHandlerFactory* factory) { running = true; processor.start(); std::cout << "Listening on port " << getPort() << "..." << std::endl; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp index 7fb8d5a91b..43fc3de3dd 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.cpp +++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp @@ -160,7 +160,7 @@ void LFSessionContext::shutdown(){ handleClose(); } -void LFSessionContext::init(SessionHandler* _handler){ +void LFSessionContext::init(ConnectionInputHandler* _handler){ handler = _handler; processor->add(&fd); } diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h index 9483cbe590..8cf50b87ba 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.h +++ b/cpp/lib/common/sys/apr/LFSessionContext.h @@ -31,7 +31,7 @@ #include <Buffer.h> #include <sys/Monitor.h> #include <sys/SessionContext.h> -#include <sys/SessionHandler.h> +#include <sys/ConnectionInputHandler.h> #include "APRSocket.h" #include "LFProcessor.h" @@ -49,7 +49,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext qpid::framing::Buffer in; qpid::framing::Buffer out; - qpid::sys::SessionHandler* handler; + qpid::sys::ConnectionInputHandler* handler; LFProcessor* const processor; apr_pollfd_t fd; @@ -74,7 +74,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext virtual void close(); void read(); void write(); - void init(qpid::sys::SessionHandler* handler); + void init(qpid::sys::ConnectionInputHandler* handler); void startProcessing(); void stopProcessing(); void handleClose(); diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp index 7cd6f60902..787d12d6d1 100644 --- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp @@ -27,8 +27,8 @@ #include <boost/scoped_ptr.hpp> #include <sys/SessionContext.h> -#include <sys/SessionHandler.h> -#include <sys/SessionHandlerFactory.h> +#include <sys/ConnectionInputHandler.h> +#include <sys/ConnectionInputHandlerFactory.h> #include <sys/Acceptor.h> #include <sys/Socket.h> #include <framing/Buffer.h> @@ -53,7 +53,7 @@ class EventChannelAcceptor : public Acceptor { int getPort() const; - void run(SessionHandlerFactory& factory); + void run(ConnectionInputHandlerFactory& factory); void shutdown(); @@ -68,7 +68,7 @@ class EventChannelAcceptor : public Acceptor { bool isRunning; boost::ptr_vector<EventChannelConnection> connections; AcceptEvent acceptEvent; - SessionHandlerFactory* factory; + ConnectionInputHandlerFactory* factory; bool isShutdown; EventChannelThreads::shared_ptr threads; }; @@ -100,7 +100,7 @@ int EventChannelAcceptor::getPort() const { return port; // Immutable no need for lock. } -void EventChannelAcceptor::run(SessionHandlerFactory& f) { +void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) { { Mutex::ScopedLock l(lock); if (!isRunning && !isShutdown) { diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/cpp/lib/common/sys/posix/EventChannelConnection.cpp index 196dde5af8..4449dc3035 100644 --- a/cpp/lib/common/sys/posix/EventChannelConnection.cpp +++ b/cpp/lib/common/sys/posix/EventChannelConnection.cpp @@ -22,7 +22,7 @@ #include <boost/assert.hpp> #include "EventChannelConnection.h" -#include "sys/SessionHandlerFactory.h" +#include "sys/ConnectionInputHandlerFactory.h" #include "QpidError.h" using namespace std; @@ -36,7 +36,7 @@ const size_t EventChannelConnection::bufferSize = 65536; EventChannelConnection::EventChannelConnection( EventChannelThreads::shared_ptr threads_, - SessionHandlerFactory& factory_, + ConnectionInputHandlerFactory& factory_, int rfd, int wfd, bool isTrace_ diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h index bace045993..1504e92651 100644 --- a/cpp/lib/common/sys/posix/EventChannelConnection.h +++ b/cpp/lib/common/sys/posix/EventChannelConnection.h @@ -24,17 +24,17 @@ #include "EventChannelThreads.h" #include "sys/Monitor.h" #include "sys/SessionContext.h" -#include "sys/SessionHandler.h" +#include "sys/ConnectionInputHandler.h" #include "sys/AtomicCount.h" #include "framing/AMQFrame.h" namespace qpid { namespace sys { -class SessionHandlerFactory; +class ConnectionInputHandlerFactory; /** - * Implements SessionContext and delegates to a SessionHandler + * Implements SessionContext and delegates to a ConnectionInputHandler * for a connection via the EventChannel. *@param readDescriptor file descriptor for reading. *@param writeDescriptor file descriptor for writing, @@ -44,7 +44,7 @@ class EventChannelConnection : public SessionContext { public: EventChannelConnection( EventChannelThreads::shared_ptr threads, - SessionHandlerFactory& factory, + ConnectionInputHandlerFactory& factory, int readDescriptor, int writeDescriptor = 0, bool isTrace = false @@ -86,7 +86,7 @@ class EventChannelConnection : public SessionContext { AtomicCount busyThreads; EventChannelThreads::shared_ptr threads; - std::auto_ptr<SessionHandler> handler; + std::auto_ptr<ConnectionInputHandler> handler; qpid::framing::Buffer in, out; FrameQueue writeFrames; bool isTrace; diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp index 842aa76f36..a80a6c61f7 100644 --- a/cpp/lib/common/sys/posix/PosixAcceptor.cpp +++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp @@ -32,7 +32,7 @@ void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } class PosixAcceptor : public Acceptor { public: virtual int16_t getPort() const { fail(); return 0; } - virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); } + virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); } virtual void shutdown() { fail(); } }; diff --git a/cpp/tests/AcceptorTest.cpp b/cpp/tests/AcceptorTest.cpp index 394dfea463..34a51888d4 100644 --- a/cpp/tests/AcceptorTest.cpp +++ b/cpp/tests/AcceptorTest.cpp @@ -28,7 +28,7 @@ #include "framing/Buffer.h" #include "qpid_test_plugin.h" -#include "MockSessionHandler.h" +#include "MockConnectionInputHandler.h" using namespace qpid::sys; using namespace qpid::framing; @@ -45,7 +45,7 @@ class AcceptorTest : public CppUnit::TestCase, private Runnable CPPUNIT_TEST_SUITE_END(); private: - MockSessionHandlerFactory factory; + MockConnectionInputHandlerFactory factory; Acceptor::shared_ptr acceptor; public: diff --git a/cpp/tests/EventChannelConnectionTest.cpp b/cpp/tests/EventChannelConnectionTest.cpp index a6b309d771..5e94b07dbd 100644 --- a/cpp/tests/EventChannelConnectionTest.cpp +++ b/cpp/tests/EventChannelConnectionTest.cpp @@ -24,11 +24,11 @@ #include "framing/AMQHeartbeatBody.h" #include "framing/AMQFrame.h" #include "sys/posix/EventChannelConnection.h" -#include "sys/SessionHandler.h" -#include "sys/SessionHandlerFactory.h" +#include "sys/ConnectionInputHandler.h" +#include "sys/ConnectionInputHandlerFactory.h" #include "sys/Socket.h" #include "qpid_test_plugin.h" -#include "MockSessionHandler.h" +#include "MockConnectionInputHandler.h" using namespace qpid::sys; using namespace qpid::framing; @@ -100,7 +100,7 @@ class EventChannelConnectionTest : public CppUnit::TestCase EventChannelThreads::shared_ptr threads; int pipe[2]; std::auto_ptr<EventChannelConnection> connection; - MockSessionHandlerFactory factory; + MockConnectionInputHandlerFactory factory; }; // Make this test suite a plugin. diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index 4822309102..d08249e759 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -12,7 +12,7 @@ INCLUDES = \ EXTRA_DIST = \ topictest \ qpid_test_plugin.h \ - MockSessionHandler.h + MockConnectionInputHandler.h client_exe_tests = \ diff --git a/cpp/tests/MockSessionHandler.h b/cpp/tests/MockConnectionInputHandler.h index aace780ac9..522df0900f 100644 --- a/cpp/tests/MockSessionHandler.h +++ b/cpp/tests/MockConnectionInputHandler.h @@ -1,5 +1,5 @@ -#ifndef _tests_MockSessionHandler_h -#define _tests_MockSessionHandler_h +#ifndef _tests_MockConnectionInputHandler_h +#define _tests_MockConnectionInputHandler_h /* * @@ -19,16 +19,16 @@ * */ -#include "sys/SessionHandler.h" -#include "sys/SessionHandlerFactory.h" +#include "sys/ConnectionInputHandler.h" +#include "sys/ConnectionInputHandlerFactory.h" #include "sys/Monitor.h" #include "framing/ProtocolInitiation.h" -struct MockSessionHandler : public qpid::sys::SessionHandler { +struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler { - MockSessionHandler() : state(START) {} + MockConnectionInputHandler() : state(START) {} - ~MockSessionHandler() {} + ~MockConnectionInputHandler() {} void initiated(qpid::framing::ProtocolInitiation* pi) { qpid::sys::Monitor::ScopedLock l(monitor); @@ -86,12 +86,12 @@ struct MockSessionHandler : public qpid::sys::SessionHandler { }; -struct MockSessionHandlerFactory : public qpid::sys::SessionHandlerFactory { - MockSessionHandlerFactory() : handler(0) {} +struct MockConnectionInputHandlerFactory : public qpid::sys::ConnectionInputHandlerFactory { + MockConnectionInputHandlerFactory() : handler(0) {} - qpid::sys::SessionHandler* create(qpid::sys::SessionContext*) { + qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext*) { qpid::sys::Monitor::ScopedLock lock(monitor); - handler = new MockSessionHandler(); + handler = new MockConnectionInputHandler(); monitor.notifyAll(); return handler; } @@ -104,10 +104,10 @@ struct MockSessionHandlerFactory : public qpid::sys::SessionHandlerFactory { CPPUNIT_ASSERT(monitor.wait(deadline)); } - MockSessionHandler* handler; + MockConnectionInputHandler* handler; qpid::sys::Monitor monitor; }; -#endif /*!_tests_MockSessionHandler_h*/ +#endif /*!_tests_MockConnectionInputHandler_h*/ |