summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-16 20:17:50 +0000
committerAlan Conway <aconway@apache.org>2007-01-16 20:17:50 +0000
commitbc84e62cc549ac2d751a45d61a867354c84c60d6 (patch)
tree160824086ea1edfd2d28f153626d378d69d0f516 /cpp
parent0df54842626c3cc065cad1a2595458f54253a178 (diff)
downloadqpid-python-bc84e62cc549ac2d751a45d61a867354c84c60d6.tar.gz
* Renamed Session* classes to Connection* to align with AMQP spec
- broker::SessionHandlerImpl -> broker::Connection - broker::SessionHandlerImplFactory -> broker::ConnectionFactory - sys::SessionHandler -> ConnectionInputHandler - sys::SessionHandlerFactory -> ConnectionInputHandlerFactory git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496848 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/Broker.cpp6
-rw-r--r--cpp/lib/broker/Broker.h4
-rw-r--r--cpp/lib/broker/Connection.cpp (renamed from cpp/lib/broker/SessionHandlerImpl.cpp)139
-rw-r--r--cpp/lib/broker/Connection.h (renamed from cpp/lib/broker/SessionHandlerImpl.h)61
-rw-r--r--cpp/lib/broker/ConnectionFactory.cpp (renamed from cpp/lib/broker/SessionHandlerFactoryImpl.cpp)14
-rw-r--r--cpp/lib/broker/ConnectionFactory.h (renamed from cpp/lib/broker/SessionHandlerFactoryImpl.h)14
-rw-r--r--cpp/lib/broker/Makefile.am8
-rw-r--r--cpp/lib/broker/QueueRegistry.cpp1
-rw-r--r--cpp/lib/common/Exception.h11
-rw-r--r--cpp/lib/common/Makefile.am4
-rw-r--r--cpp/lib/common/framing/Requester.cpp12
-rw-r--r--cpp/lib/common/sys/Acceptor.h4
-rw-r--r--cpp/lib/common/sys/ConnectionInputHandler.h (renamed from cpp/lib/common/sys/SessionHandler.h)6
-rw-r--r--cpp/lib/common/sys/ConnectionInputHandlerFactory.h (renamed from cpp/lib/common/sys/SessionHandlerFactory.h)14
-rw-r--r--cpp/lib/common/sys/apr/APRAcceptor.cpp6
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.cpp2
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.h6
-rw-r--r--cpp/lib/common/sys/posix/EventChannelAcceptor.cpp10
-rw-r--r--cpp/lib/common/sys/posix/EventChannelConnection.cpp4
-rw-r--r--cpp/lib/common/sys/posix/EventChannelConnection.h10
-rw-r--r--cpp/lib/common/sys/posix/PosixAcceptor.cpp2
-rw-r--r--cpp/tests/AcceptorTest.cpp4
-rw-r--r--cpp/tests/EventChannelConnectionTest.cpp8
-rw-r--r--cpp/tests/Makefile.am2
-rw-r--r--cpp/tests/MockConnectionInputHandler.h (renamed from cpp/tests/MockSessionHandler.h)26
25 files changed, 185 insertions, 193 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp
index c2117eaf23..079eb5fd73 100644
--- a/cpp/lib/broker/Broker.cpp
+++ b/cpp/lib/broker/Broker.cpp
@@ -29,10 +29,10 @@
#include "MessageStoreModule.h"
#include "NullMessageStore.h"
#include "ProtocolInitiation.h"
-#include "SessionHandlerImpl.h"
+#include "Connection.h"
#include "sys/SessionContext.h"
-#include "sys/SessionHandler.h"
-#include "sys/SessionHandlerFactory.h"
+#include "sys/ConnectionInputHandler.h"
+#include "sys/ConnectionInputHandlerFactory.h"
#include "sys/TimeoutHandler.h"
#include "Broker.h"
diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h
index ad7fbb1eca..929ed4360e 100644
--- a/cpp/lib/broker/Broker.h
+++ b/cpp/lib/broker/Broker.h
@@ -23,7 +23,7 @@
*/
#include <Configuration.h>
-#include <SessionHandlerFactoryImpl.h>
+#include <ConnectionFactory.h>
#include <sys/Runnable.h>
#include <sys/Acceptor.h>
#include <SharedObject.h>
@@ -99,7 +99,7 @@ class Broker : public qpid::sys::Runnable,
u_int32_t timeout;
u_int64_t stagingThreshold;
AutoDelete cleaner;
- SessionHandlerFactoryImpl factory;
+ ConnectionFactory factory;
qpid::framing::Requester requester;
qpid::framing::Responder responder;
};
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/Connection.cpp
index d7f6320535..c391ff6db5 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -21,7 +21,7 @@
#include <iostream>
#include <assert.h>
-#include "SessionHandlerImpl.h"
+#include "Connection.h"
#include "FanOutExchange.h"
#include "HeadersExchange.h"
@@ -37,7 +37,7 @@ using namespace qpid::sys;
namespace qpid {
namespace broker {
-SessionHandlerImpl::SessionHandlerImpl(
+Connection::Connection(
SessionContext* _context, Broker& broker) :
context(_context),
@@ -59,14 +59,14 @@ SessionHandlerImpl::SessionHandlerImpl(
heartbeat(0)
{}
-SessionHandlerImpl::~SessionHandlerImpl(){
+Connection::~Connection(){
if (client != NULL)
delete client;
}
-Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
+Channel* Connection::getChannel(u_int16_t channel){
channel_iterator i = channels.find(channel);
if(i == channels.end()){
throw ConnectionException(504, "Unknown channel: " + channel);
@@ -74,7 +74,7 @@ Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
return i->second;
}
-Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
+Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
Queue::shared_ptr queue;
if (name.empty()) {
queue = getChannel(channel)->getDefaultQueue();
@@ -89,11 +89,11 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha
}
-Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
+Exchange::shared_ptr Connection::findExchange(const string& name){
return exchanges.get(name);
}
-void SessionHandlerImpl::handleMethod(
+void Connection::handleMethod(
u_int16_t channel, qpid::framing::AMQBody::shared_ptr body)
{
AMQMethodBody::shared_ptr method =
@@ -104,11 +104,12 @@ void SessionHandlerImpl::handleMethod(
channels[channel]->close();
channels.erase(channel);
client->getChannel().close(
- channel, e.code, e.text,
+ channel, e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
client->getConnection().close(
- 0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ 0, e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
client->getConnection().close(
0, 541/*internal error*/, e.what(),
@@ -116,7 +117,7 @@ void SessionHandlerImpl::handleMethod(
}
}
-void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
+void Connection::received(qpid::framing::AMQFrame* frame){
u_int16_t channel = frame->getChannel();
AMQBody::shared_ptr body = frame->getBody();
switch(body->type())
@@ -156,12 +157,12 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
* An OutputHandler that does request/response procssing before
* delgating to another OutputHandler.
*/
-SessionHandlerImpl::Sender::Sender(
+Connection::Sender::Sender(
OutputHandler& oh, Requester& req, Responder& resp)
: out(oh), requester(req), responder(resp)
{}
-void SessionHandlerImpl::Sender::send(AMQFrame* frame) {
+void Connection::Sender::send(AMQFrame* frame) {
AMQBody::shared_ptr body = frame->getBody();
u_int16_t type = body->type();
if (type == REQUEST_BODY)
@@ -171,7 +172,7 @@ void SessionHandlerImpl::Sender::send(AMQFrame* frame) {
out.send(frame);
}
-void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
+void Connection::initiated(qpid::framing::ProtocolInitiation* header){
if (client == 0)
{
@@ -189,15 +190,15 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
}
-void SessionHandlerImpl::idleOut(){
+void Connection::idleOut(){
}
-void SessionHandlerImpl::idleIn(){
+void Connection::idleIn(){
}
-void SessionHandlerImpl::closed(){
+void Connection::closed(){
try {
for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
Channel* c = i->second;
@@ -215,37 +216,37 @@ void SessionHandlerImpl::closed(){
}
}
-void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
+void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
getChannel(channel)->handleHeader(body);
}
-void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
+void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
getChannel(channel)->handleContent(body);
}
-void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
+void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+ std::cout << "Connection::handleHeartbeat()" << std::endl;
}
-void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
+void Connection::ConnectionHandlerImpl::startOk(
u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
}
-void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
+void Connection::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
-void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
+void Connection::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
parent->framemax = framemax;
parent->heartbeat = heartbeat;
}
-void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void Connection::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
parent->client->getConnection().openOk(0, knownhosts);
}
-void SessionHandlerImpl::ConnectionHandlerImpl::close(
+void Connection::ConnectionHandlerImpl::close(
u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
@@ -253,13 +254,13 @@ void SessionHandlerImpl::ConnectionHandlerImpl::close(
parent->context->close();
}
-void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
+void Connection::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
parent->context->close();
}
-void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
+void Connection::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
parent->channels[channel] = new Channel(
@@ -271,10 +272,10 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin
parent->client->getChannel().openOk(channel, std::string()/* ID */);
}
-void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
-void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
+void Connection::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
+void Connection::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
-void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
+void Connection::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/){
Channel* c = parent->getChannel(channel);
if(c){
@@ -285,11 +286,11 @@ void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t
}
}
-void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
+void Connection::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
-void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
+void Connection::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
const FieldTable& /*arguments*/){
@@ -314,7 +315,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
}
-void SessionHandlerImpl::ExchangeHandlerImpl::unbind(
+void Connection::ExchangeHandlerImpl::unbind(
u_int16_t /*channel*/,
u_int16_t /*ticket*/,
const string& /*queue*/,
@@ -327,7 +328,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::unbind(
-void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+void Connection::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
@@ -335,7 +336,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16
if(!nowait) parent->client->getExchange().deleteOk(channel);
}
-void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
+void Connection::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
@@ -370,7 +371,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
}
}
-void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+void Connection::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
@@ -388,14 +389,14 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t
}
}
-void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void Connection::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
int count = queue->purge();
if(!nowait) parent->client->getQueue().purgeOk(channel, count);
}
-void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
+void Connection::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
@@ -421,14 +422,14 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
-void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void Connection::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
parent->getChannel(channel)->setPrefetchSize(prefetchSize);
parent->getChannel(channel)->setPrefetchCount(prefetchCount);
parent->client->getBasic().qosOk(channel);
}
-void SessionHandlerImpl::BasicHandlerImpl::consume(
+void Connection::BasicHandlerImpl::consume(
u_int16_t channelId, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
@@ -457,13 +458,13 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(
}
-void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
+void Connection::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
parent->getChannel(channel)->cancel(consumerTag);
if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag);
}
-void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
+void Connection::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate){
@@ -476,7 +477,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
}
}
-void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void Connection::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
if(!parent->getChannel(channelId)->get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
@@ -485,7 +486,7 @@ void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*
}
}
-void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+void Connection::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
try{
parent->getChannel(channel)->ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -493,23 +494,23 @@ void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deli
}
}
-void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void Connection::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
+void Connection::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
parent->getChannel(channel)->recover(requeue);
}
-void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
+void Connection::TxHandlerImpl::select(u_int16_t channel){
parent->getChannel(channel)->begin();
parent->client->getTx().selectOk(channel);
}
-void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){
+void Connection::TxHandlerImpl::commit(u_int16_t channel){
parent->getChannel(channel)->commit();
parent->client->getTx().commitOk(channel);
}
-void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
+void Connection::TxHandlerImpl::rollback(u_int16_t channel){
parent->getChannel(channel)->rollback();
parent->client->getTx().rollbackOk(channel);
@@ -517,7 +518,7 @@ void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
}
void
-SessionHandlerImpl::QueueHandlerImpl::unbind(
+Connection::QueueHandlerImpl::unbind(
u_int16_t /*channel*/,
u_int16_t /*ticket*/,
const string& /*queue*/,
@@ -529,25 +530,25 @@ SessionHandlerImpl::QueueHandlerImpl::unbind(
}
void
-SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
+Connection::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
-SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
+Connection::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
-SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
+Connection::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
-SessionHandlerImpl::ChannelHandlerImpl::resume(
+Connection::ChannelHandlerImpl::resume(
u_int16_t /*channel*/,
const string& /*channelId*/ )
{
@@ -556,7 +557,7 @@ SessionHandlerImpl::ChannelHandlerImpl::resume(
// Message class method handlers
void
-SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::append( u_int16_t /*channel*/,
const string& /*reference*/,
const string& /*bytes*/ )
{
@@ -565,14 +566,14 @@ SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/,
void
-SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
const string& /*destination*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
const string& /*reference*/,
const string& /*identifier*/ )
{
@@ -580,14 +581,14 @@ SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
}
void
-SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::close( u_int16_t /*channel*/,
const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::consume( u_int16_t /*channel*/,
u_int16_t /*ticket*/,
const string& /*queue*/,
const string& /*destination*/,
@@ -600,13 +601,13 @@ SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/,
}
void
-SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
+Connection::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::get( u_int16_t /*channel*/,
u_int16_t /*ticket*/,
const string& /*queue*/,
const string& /*destination*/,
@@ -616,27 +617,27 @@ SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/,
}
void
-SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::offset( u_int16_t /*channel*/,
u_int64_t /*value*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
+Connection::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::open( u_int16_t /*channel*/,
const string& /*reference*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::qos( u_int16_t /*channel*/,
u_int32_t /*prefetchSize*/,
u_int16_t /*prefetchCount*/,
bool /*global*/ )
@@ -645,14 +646,14 @@ SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/,
}
void
-SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::recover( u_int16_t /*channel*/,
bool /*requeue*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
-SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::reject( u_int16_t /*channel*/,
u_int16_t /*code*/,
const string& /*text*/ )
{
@@ -660,7 +661,7 @@ SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/,
}
void
-SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::resume( u_int16_t /*channel*/,
const string& /*reference*/,
const string& /*identifier*/ )
{
@@ -668,7 +669,7 @@ SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/,
}
void
-SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
+Connection::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/Connection.h
index 070bd1266e..2f2770b28d 100644
--- a/cpp/lib/broker/SessionHandlerImpl.h
+++ b/cpp/lib/broker/Connection.h
@@ -18,8 +18,8 @@
* under the License.
*
*/
-#ifndef _SessionHandlerImpl_
-#define _SessionHandlerImpl_
+#ifndef _Connection_
+#define _Connection_
#include <map>
#include <sstream>
@@ -29,7 +29,7 @@
#include <AMQP_ClientProxy.h>
#include <AMQP_ServerOperations.h>
#include <sys/SessionContext.h>
-#include <sys/SessionHandler.h>
+#include <sys/ConnectionInputHandler.h>
#include <sys/TimeoutHandler.h>
#include "Broker.h"
#include "Exception.h"
@@ -37,22 +37,6 @@
namespace qpid {
namespace broker {
-struct ChannelException : public qpid::Exception {
- u_int16_t code;
- string text;
- ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {}
- ~ChannelException() throw() {}
- const char* what() const throw() { return text.c_str(); }
-};
-
-struct ConnectionException : public qpid::Exception {
- u_int16_t code;
- string text;
- ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {}
- ~ConnectionException() throw() {}
- const char* what() const throw() { return text.c_str(); }
-};
-
class Settings {
public:
const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
@@ -61,7 +45,7 @@ class Settings {
Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {}
};
-class SessionHandlerImpl : public qpid::sys::SessionHandler,
+class Connection : public qpid::sys::ConnectionInputHandler,
public qpid::framing::AMQP_ServerOperations,
public ConnectionToken
{
@@ -117,31 +101,28 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
Exchange::shared_ptr findExchange(const string& name);
public:
- SessionHandlerImpl(qpid::sys::SessionContext* context, Broker& broker);
+ Connection(qpid::sys::SessionContext* context, Broker& broker);
virtual void received(qpid::framing::AMQFrame* frame);
virtual void initiated(qpid::framing::ProtocolInitiation* header);
virtual void idleOut();
virtual void idleIn();
virtual void closed();
- virtual ~SessionHandlerImpl();
+ virtual ~Connection();
class ConnectionHandlerImpl : public ConnectionHandler{
- SessionHandlerImpl* parent;
+ Connection* parent;
public:
- inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ inline ConnectionHandlerImpl(Connection* _parent) : parent(_parent) {}
virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism,
const string& response, const string& locale);
- // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
virtual void secureOk(u_int16_t channel, const string& response);
virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);
- // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist);
- // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId,
u_int16_t methodId);
@@ -151,11 +132,10 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
};
class ChannelHandlerImpl : public ChannelHandler{
- SessionHandlerImpl* parent;
+ Connection* parent;
public:
- inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ inline ChannelHandlerImpl(Connection* _parent) : parent(_parent) {}
- // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
virtual void open(u_int16_t channel, const string& outOfBand);
virtual void flow(u_int16_t channel, bool active);
@@ -180,9 +160,9 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
};
class ExchangeHandlerImpl : public ExchangeHandler{
- SessionHandlerImpl* parent;
+ Connection* parent;
public:
- inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ inline ExchangeHandlerImpl(Connection* _parent) : parent(_parent) {}
virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type,
bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
@@ -202,9 +182,9 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
class QueueHandlerImpl : public QueueHandler{
- SessionHandlerImpl* parent;
+ Connection* parent;
public:
- inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ inline QueueHandlerImpl(Connection* _parent) : parent(_parent) {}
virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue,
bool passive, bool durable, bool exclusive,
@@ -224,7 +204,6 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue,
bool nowait);
- // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty,
bool nowait);
@@ -232,9 +211,9 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
};
class BasicHandlerImpl : public BasicHandler{
- SessionHandlerImpl* parent;
+ Connection* parent;
public:
- inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ inline BasicHandlerImpl(Connection* _parent) : parent(_parent) {}
virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);
@@ -261,9 +240,9 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
};
class TxHandlerImpl : public TxHandler{
- SessionHandlerImpl* parent;
+ Connection* parent;
public:
- TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ TxHandlerImpl(Connection* _parent) : parent(_parent) {}
virtual ~TxHandlerImpl() {}
virtual void select(u_int16_t channel);
virtual void commit(u_int16_t channel);
@@ -271,13 +250,13 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
};
class MessageHandlerImpl : public MessageHandler {
- SessionHandlerImpl* parent;
+ Connection* parent;
// Constructors and destructors
public:
MessageHandlerImpl() {}
- MessageHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ MessageHandlerImpl(Connection* _parent) : parent(_parent) {}
virtual ~MessageHandlerImpl() {}
// Protocol methods
diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/cpp/lib/broker/ConnectionFactory.cpp
index 559fd6bca1..3c4c7cd724 100644
--- a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/lib/broker/ConnectionFactory.cpp
@@ -18,26 +18,26 @@
* under the License.
*
*/
-#include <SessionHandlerFactoryImpl.h>
-#include <SessionHandlerImpl.h>
+#include <ConnectionFactory.h>
+#include <Connection.h>
namespace qpid {
namespace broker {
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(Broker& b) : broker(b)
+ConnectionFactory::ConnectionFactory(Broker& b) : broker(b)
{}
-SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
+ConnectionFactory::~ConnectionFactory()
{
broker.getCleaner().stop();
}
-qpid::sys::SessionHandler*
-SessionHandlerFactoryImpl::create(qpid::sys::SessionContext* ctxt)
+qpid::sys::ConnectionInputHandler*
+ConnectionFactory::create(qpid::sys::SessionContext* ctxt)
{
- return new SessionHandlerImpl(ctxt, broker);
+ return new Connection(ctxt, broker);
}
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.h b/cpp/lib/broker/ConnectionFactory.h
index 49c42b4d1c..fe8052ed9c 100644
--- a/cpp/lib/broker/SessionHandlerFactoryImpl.h
+++ b/cpp/lib/broker/ConnectionFactory.h
@@ -18,23 +18,23 @@
* under the License.
*
*/
-#ifndef _SessionHandlerFactoryImpl_
-#define _SessionHandlerFactoryImpl_
+#ifndef _ConnectionFactory_
+#define _ConnectionFactory_
-#include "SessionHandlerFactory.h"
+#include "ConnectionInputHandlerFactory.h"
namespace qpid {
namespace broker {
class Broker;
-class SessionHandlerFactoryImpl : public qpid::sys::SessionHandlerFactory
+class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
{
public:
- SessionHandlerFactoryImpl(Broker& b);
+ ConnectionFactory(Broker& b);
- virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
+ virtual qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext* ctxt);
- virtual ~SessionHandlerFactoryImpl();
+ virtual ~ConnectionFactory();
private:
Broker& broker;
diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am
index 01a5b3d847..b1a0c1af78 100644
--- a/cpp/lib/broker/Makefile.am
+++ b/cpp/lib/broker/Makefile.am
@@ -65,10 +65,10 @@ libqpidbroker_la_SOURCES = \
QueueRegistry.h \
RecoveryManager.cpp \
RecoveryManager.h \
- SessionHandlerFactoryImpl.cpp \
- SessionHandlerFactoryImpl.h \
- SessionHandlerImpl.cpp \
- SessionHandlerImpl.h \
+ ConnectionFactory.cpp \
+ ConnectionFactory.h \
+ Connection.cpp \
+ Connection.h \
TopicExchange.cpp \
TopicExchange.h \
TransactionalStore.h \
diff --git a/cpp/lib/broker/QueueRegistry.cpp b/cpp/lib/broker/QueueRegistry.cpp
index 2d1382ef09..ff2f83b725 100644
--- a/cpp/lib/broker/QueueRegistry.cpp
+++ b/cpp/lib/broker/QueueRegistry.cpp
@@ -19,7 +19,6 @@
*
*/
#include <QueueRegistry.h>
-#include <SessionHandlerImpl.h>
#include <sstream>
#include <assert.h>
diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h
index f35d427bb0..61bbc0ab5f 100644
--- a/cpp/lib/common/Exception.h
+++ b/cpp/lib/common/Exception.h
@@ -54,6 +54,17 @@ class Exception : public std::exception
typedef boost::shared_ptr<Exception> shared_ptr;
};
+struct ChannelException : public qpid::Exception {
+ u_int16_t code;
+ ChannelException(u_int16_t _code, std::string _text)
+ : Exception(_text), code(_code) {}
+};
+
+struct ConnectionException : public qpid::Exception {
+ u_int16_t code;
+ ConnectionException(u_int16_t _code, std::string _text)
+ : Exception(_text), code(_code) {}
+};
}
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
index 541145ac97..813c49135e 100644
--- a/cpp/lib/common/Makefile.am
+++ b/cpp/lib/common/Makefile.am
@@ -120,8 +120,8 @@ nobase_pkginclude_HEADERS = \
sys/Mutex.h \
sys/Runnable.h \
sys/SessionContext.h \
- sys/SessionHandler.h \
- sys/SessionHandlerFactory.h \
+ sys/ConnectionInputHandler.h \
+ sys/ConnectionInputHandlerFactory.h \
sys/ShutdownHandler.h \
sys/Socket.h \
sys/Thread.h \
diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp
index 1dd3cd4ce9..7e1da505c6 100644
--- a/cpp/lib/common/framing/Requester.cpp
+++ b/cpp/lib/common/framing/Requester.cpp
@@ -33,13 +33,15 @@ void Requester::sending(AMQRequestBody::Data& request) {
void Requester::processed(const AMQResponseBody::Data& response) {
responseMark = response.responseId;
RequestId id = response.requestId;
- RequestId end = id + response.batchOffset;
+ RequestId end = id + response.batchOffset + 1;
for ( ; id < end; ++id) {
std::set<RequestId>::iterator i = requests.find(id);
- if (i == requests.end())
- // TODO aconway 2007-01-12: Verify this is the right exception.
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response.");
- requests.erase(i);
+ if (i != requests.end())
+ requests.erase(i);
+ else {
+ // FIXME aconway 2007-01-16: Uncomment exception when ids are propagating.
+// THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response.");
+ }
}
}
diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h
index e6bc27a593..f571dcbddd 100644
--- a/cpp/lib/common/sys/Acceptor.h
+++ b/cpp/lib/common/sys/Acceptor.h
@@ -28,7 +28,7 @@
namespace qpid {
namespace sys {
-class SessionHandlerFactory;
+class ConnectionInputHandlerFactory;
class Acceptor : public qpid::SharedObject<Acceptor>
{
@@ -36,7 +36,7 @@ class Acceptor : public qpid::SharedObject<Acceptor>
static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false);
virtual ~Acceptor() = 0;
virtual int16_t getPort() const = 0;
- virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0;
+ virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0;
virtual void shutdown() = 0;
};
diff --git a/cpp/lib/common/sys/SessionHandler.h b/cpp/lib/common/sys/ConnectionInputHandler.h
index 76f79d421d..fa70dfaf48 100644
--- a/cpp/lib/common/sys/SessionHandler.h
+++ b/cpp/lib/common/sys/ConnectionInputHandler.h
@@ -18,8 +18,8 @@
* under the License.
*
*/
-#ifndef _SessionHandler_
-#define _SessionHandler_
+#ifndef _ConnectionInputHandler_
+#define _ConnectionInputHandler_
#include <InputHandler.h>
#include <InitiationHandler.h>
@@ -29,7 +29,7 @@
namespace qpid {
namespace sys {
- class SessionHandler :
+ class ConnectionInputHandler :
public qpid::framing::InitiationHandler,
public qpid::framing::InputHandler,
public TimeoutHandler
diff --git a/cpp/lib/common/sys/SessionHandlerFactory.h b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h
index 2a01aebcb0..5bb5e17704 100644
--- a/cpp/lib/common/sys/SessionHandlerFactory.h
+++ b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h
@@ -18,8 +18,8 @@
* under the License.
*
*/
-#ifndef _SessionHandlerFactory_
-#define _SessionHandlerFactory_
+#ifndef _ConnectionInputHandlerFactory_
+#define _ConnectionInputHandlerFactory_
#include <boost/noncopyable.hpp>
@@ -27,17 +27,17 @@ namespace qpid {
namespace sys {
class SessionContext;
-class SessionHandler;
+class ConnectionInputHandler;
/**
* Callback interface used by the Acceptor to
- * create a SessionHandler for each new connection.
+ * create a ConnectionInputHandler for each new connection.
*/
-class SessionHandlerFactory : private boost::noncopyable
+class ConnectionInputHandlerFactory : private boost::noncopyable
{
public:
- virtual SessionHandler* create(SessionContext* ctxt) = 0;
- virtual ~SessionHandlerFactory(){}
+ virtual ConnectionInputHandler* create(SessionContext* ctxt) = 0;
+ virtual ~ConnectionInputHandlerFactory(){}
};
}}
diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp
index 6853833797..10f787f4fe 100644
--- a/cpp/lib/common/sys/apr/APRAcceptor.cpp
+++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp
@@ -19,7 +19,7 @@
*
*/
#include <sys/Acceptor.h>
-#include <sys/SessionHandlerFactory.h>
+#include <sys/ConnectionInputHandlerFactory.h>
#include "LFProcessor.h"
#include "LFSessionContext.h"
#include "APRBase.h"
@@ -33,7 +33,7 @@ class APRAcceptor : public Acceptor
public:
APRAcceptor(int16_t port, int backlog, int threads, bool trace);
virtual int16_t getPort() const;
- virtual void run(qpid::sys::SessionHandlerFactory* factory);
+ virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory);
virtual void shutdown();
private:
@@ -75,7 +75,7 @@ int16_t APRAcceptor::getPort() const {
return address->port;
}
-void APRAcceptor::run(SessionHandlerFactory* factory) {
+void APRAcceptor::run(ConnectionInputHandlerFactory* factory) {
running = true;
processor.start();
std::cout << "Listening on port " << getPort() << "..." << std::endl;
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp
index 7fb8d5a91b..43fc3de3dd 100644
--- a/cpp/lib/common/sys/apr/LFSessionContext.cpp
+++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp
@@ -160,7 +160,7 @@ void LFSessionContext::shutdown(){
handleClose();
}
-void LFSessionContext::init(SessionHandler* _handler){
+void LFSessionContext::init(ConnectionInputHandler* _handler){
handler = _handler;
processor->add(&fd);
}
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h
index 9483cbe590..8cf50b87ba 100644
--- a/cpp/lib/common/sys/apr/LFSessionContext.h
+++ b/cpp/lib/common/sys/apr/LFSessionContext.h
@@ -31,7 +31,7 @@
#include <Buffer.h>
#include <sys/Monitor.h>
#include <sys/SessionContext.h>
-#include <sys/SessionHandler.h>
+#include <sys/ConnectionInputHandler.h>
#include "APRSocket.h"
#include "LFProcessor.h"
@@ -49,7 +49,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext
qpid::framing::Buffer in;
qpid::framing::Buffer out;
- qpid::sys::SessionHandler* handler;
+ qpid::sys::ConnectionInputHandler* handler;
LFProcessor* const processor;
apr_pollfd_t fd;
@@ -74,7 +74,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext
virtual void close();
void read();
void write();
- void init(qpid::sys::SessionHandler* handler);
+ void init(qpid::sys::ConnectionInputHandler* handler);
void startProcessing();
void stopProcessing();
void handleClose();
diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
index 7cd6f60902..787d12d6d1 100644
--- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
+++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
@@ -27,8 +27,8 @@
#include <boost/scoped_ptr.hpp>
#include <sys/SessionContext.h>
-#include <sys/SessionHandler.h>
-#include <sys/SessionHandlerFactory.h>
+#include <sys/ConnectionInputHandler.h>
+#include <sys/ConnectionInputHandlerFactory.h>
#include <sys/Acceptor.h>
#include <sys/Socket.h>
#include <framing/Buffer.h>
@@ -53,7 +53,7 @@ class EventChannelAcceptor : public Acceptor {
int getPort() const;
- void run(SessionHandlerFactory& factory);
+ void run(ConnectionInputHandlerFactory& factory);
void shutdown();
@@ -68,7 +68,7 @@ class EventChannelAcceptor : public Acceptor {
bool isRunning;
boost::ptr_vector<EventChannelConnection> connections;
AcceptEvent acceptEvent;
- SessionHandlerFactory* factory;
+ ConnectionInputHandlerFactory* factory;
bool isShutdown;
EventChannelThreads::shared_ptr threads;
};
@@ -100,7 +100,7 @@ int EventChannelAcceptor::getPort() const {
return port; // Immutable no need for lock.
}
-void EventChannelAcceptor::run(SessionHandlerFactory& f) {
+void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) {
{
Mutex::ScopedLock l(lock);
if (!isRunning && !isShutdown) {
diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/cpp/lib/common/sys/posix/EventChannelConnection.cpp
index 196dde5af8..4449dc3035 100644
--- a/cpp/lib/common/sys/posix/EventChannelConnection.cpp
+++ b/cpp/lib/common/sys/posix/EventChannelConnection.cpp
@@ -22,7 +22,7 @@
#include <boost/assert.hpp>
#include "EventChannelConnection.h"
-#include "sys/SessionHandlerFactory.h"
+#include "sys/ConnectionInputHandlerFactory.h"
#include "QpidError.h"
using namespace std;
@@ -36,7 +36,7 @@ const size_t EventChannelConnection::bufferSize = 65536;
EventChannelConnection::EventChannelConnection(
EventChannelThreads::shared_ptr threads_,
- SessionHandlerFactory& factory_,
+ ConnectionInputHandlerFactory& factory_,
int rfd,
int wfd,
bool isTrace_
diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h
index bace045993..1504e92651 100644
--- a/cpp/lib/common/sys/posix/EventChannelConnection.h
+++ b/cpp/lib/common/sys/posix/EventChannelConnection.h
@@ -24,17 +24,17 @@
#include "EventChannelThreads.h"
#include "sys/Monitor.h"
#include "sys/SessionContext.h"
-#include "sys/SessionHandler.h"
+#include "sys/ConnectionInputHandler.h"
#include "sys/AtomicCount.h"
#include "framing/AMQFrame.h"
namespace qpid {
namespace sys {
-class SessionHandlerFactory;
+class ConnectionInputHandlerFactory;
/**
- * Implements SessionContext and delegates to a SessionHandler
+ * Implements SessionContext and delegates to a ConnectionInputHandler
* for a connection via the EventChannel.
*@param readDescriptor file descriptor for reading.
*@param writeDescriptor file descriptor for writing,
@@ -44,7 +44,7 @@ class EventChannelConnection : public SessionContext {
public:
EventChannelConnection(
EventChannelThreads::shared_ptr threads,
- SessionHandlerFactory& factory,
+ ConnectionInputHandlerFactory& factory,
int readDescriptor,
int writeDescriptor = 0,
bool isTrace = false
@@ -86,7 +86,7 @@ class EventChannelConnection : public SessionContext {
AtomicCount busyThreads;
EventChannelThreads::shared_ptr threads;
- std::auto_ptr<SessionHandler> handler;
+ std::auto_ptr<ConnectionInputHandler> handler;
qpid::framing::Buffer in, out;
FrameQueue writeFrames;
bool isTrace;
diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
index 842aa76f36..a80a6c61f7 100644
--- a/cpp/lib/common/sys/posix/PosixAcceptor.cpp
+++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
@@ -32,7 +32,7 @@ void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
class PosixAcceptor : public Acceptor {
public:
virtual int16_t getPort() const { fail(); return 0; }
- virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); }
+ virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); }
virtual void shutdown() { fail(); }
};
diff --git a/cpp/tests/AcceptorTest.cpp b/cpp/tests/AcceptorTest.cpp
index 394dfea463..34a51888d4 100644
--- a/cpp/tests/AcceptorTest.cpp
+++ b/cpp/tests/AcceptorTest.cpp
@@ -28,7 +28,7 @@
#include "framing/Buffer.h"
#include "qpid_test_plugin.h"
-#include "MockSessionHandler.h"
+#include "MockConnectionInputHandler.h"
using namespace qpid::sys;
using namespace qpid::framing;
@@ -45,7 +45,7 @@ class AcceptorTest : public CppUnit::TestCase, private Runnable
CPPUNIT_TEST_SUITE_END();
private:
- MockSessionHandlerFactory factory;
+ MockConnectionInputHandlerFactory factory;
Acceptor::shared_ptr acceptor;
public:
diff --git a/cpp/tests/EventChannelConnectionTest.cpp b/cpp/tests/EventChannelConnectionTest.cpp
index a6b309d771..5e94b07dbd 100644
--- a/cpp/tests/EventChannelConnectionTest.cpp
+++ b/cpp/tests/EventChannelConnectionTest.cpp
@@ -24,11 +24,11 @@
#include "framing/AMQHeartbeatBody.h"
#include "framing/AMQFrame.h"
#include "sys/posix/EventChannelConnection.h"
-#include "sys/SessionHandler.h"
-#include "sys/SessionHandlerFactory.h"
+#include "sys/ConnectionInputHandler.h"
+#include "sys/ConnectionInputHandlerFactory.h"
#include "sys/Socket.h"
#include "qpid_test_plugin.h"
-#include "MockSessionHandler.h"
+#include "MockConnectionInputHandler.h"
using namespace qpid::sys;
using namespace qpid::framing;
@@ -100,7 +100,7 @@ class EventChannelConnectionTest : public CppUnit::TestCase
EventChannelThreads::shared_ptr threads;
int pipe[2];
std::auto_ptr<EventChannelConnection> connection;
- MockSessionHandlerFactory factory;
+ MockConnectionInputHandlerFactory factory;
};
// Make this test suite a plugin.
diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am
index 4822309102..d08249e759 100644
--- a/cpp/tests/Makefile.am
+++ b/cpp/tests/Makefile.am
@@ -12,7 +12,7 @@ INCLUDES = \
EXTRA_DIST = \
topictest \
qpid_test_plugin.h \
- MockSessionHandler.h
+ MockConnectionInputHandler.h
client_exe_tests = \
diff --git a/cpp/tests/MockSessionHandler.h b/cpp/tests/MockConnectionInputHandler.h
index aace780ac9..522df0900f 100644
--- a/cpp/tests/MockSessionHandler.h
+++ b/cpp/tests/MockConnectionInputHandler.h
@@ -1,5 +1,5 @@
-#ifndef _tests_MockSessionHandler_h
-#define _tests_MockSessionHandler_h
+#ifndef _tests_MockConnectionInputHandler_h
+#define _tests_MockConnectionInputHandler_h
/*
*
@@ -19,16 +19,16 @@
*
*/
-#include "sys/SessionHandler.h"
-#include "sys/SessionHandlerFactory.h"
+#include "sys/ConnectionInputHandler.h"
+#include "sys/ConnectionInputHandlerFactory.h"
#include "sys/Monitor.h"
#include "framing/ProtocolInitiation.h"
-struct MockSessionHandler : public qpid::sys::SessionHandler {
+struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler {
- MockSessionHandler() : state(START) {}
+ MockConnectionInputHandler() : state(START) {}
- ~MockSessionHandler() {}
+ ~MockConnectionInputHandler() {}
void initiated(qpid::framing::ProtocolInitiation* pi) {
qpid::sys::Monitor::ScopedLock l(monitor);
@@ -86,12 +86,12 @@ struct MockSessionHandler : public qpid::sys::SessionHandler {
};
-struct MockSessionHandlerFactory : public qpid::sys::SessionHandlerFactory {
- MockSessionHandlerFactory() : handler(0) {}
+struct MockConnectionInputHandlerFactory : public qpid::sys::ConnectionInputHandlerFactory {
+ MockConnectionInputHandlerFactory() : handler(0) {}
- qpid::sys::SessionHandler* create(qpid::sys::SessionContext*) {
+ qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext*) {
qpid::sys::Monitor::ScopedLock lock(monitor);
- handler = new MockSessionHandler();
+ handler = new MockConnectionInputHandler();
monitor.notifyAll();
return handler;
}
@@ -104,10 +104,10 @@ struct MockSessionHandlerFactory : public qpid::sys::SessionHandlerFactory {
CPPUNIT_ASSERT(monitor.wait(deadline));
}
- MockSessionHandler* handler;
+ MockConnectionInputHandler* handler;
qpid::sys::Monitor monitor;
};
-#endif /*!_tests_MockSessionHandler_h*/
+#endif /*!_tests_MockConnectionInputHandler_h*/