summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2006-12-14 21:38:28 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2006-12-14 21:38:28 +0000
commit295cdabf2cea8feb127b094cc123326404551fa4 (patch)
treeec552e78fd6ea957f270f6a2c0800de956c0cf3b
parentad806562f83ebca3bc9d246772b235eb9c696b82 (diff)
downloadqpid-python-295cdabf2cea8feb127b094cc123326404551fa4.tar.gz
Broker side dynamic version hook up.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487359 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp85
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.h2
2 files changed, 52 insertions, 35 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index 8757cc2fc3..ad203b4515 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -37,9 +37,6 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
AutoDelete* _cleaner,
const Settings& _settings) :
context(_context),
-// AMQP version management change - kpvdr 2006-11-17
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- client(context, 8, 0),
queues(_queues),
exchanges(_exchanges),
cleaner(_cleaner),
@@ -51,9 +48,18 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
queueHandler(new QueueHandlerImpl(this)),
txHandler(new TxHandlerImpl(this)),
framemax(65536),
- heartbeat(0) {}
+ heartbeat(0)
+ {
+
+ client =NULL;
+}
-SessionHandlerImpl::~SessionHandlerImpl(){}
+SessionHandlerImpl::~SessionHandlerImpl(){
+
+ if (client != NULL)
+ delete client;
+
+}
Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
channel_iterator i = channels.find(channel);
@@ -96,12 +102,12 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
}catch(ChannelException& e){
channels[channel]->close();
channels.erase(channel);
- client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
- client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
string error(e.what());
- client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
+ client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
}
break;
@@ -120,14 +126,21 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
}
}
-void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){
- //send connection start
- FieldTable properties;
- string mechanisms("PLAIN");
- string locales("en_US");
- client.getConnection().start(0, 8, 0, properties, mechanisms, locales);
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
+
+ if (client == NULL)
+ {
+ client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
+
+ //send connection start
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US"); // channel, majour, minor
+ client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales);
+ }
}
+
void SessionHandlerImpl::idleOut(){
}
@@ -169,8 +182,7 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
-
- parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
+ parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
}
void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){}
@@ -182,14 +194,14 @@ void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_
void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
- parent->client.getConnection().openOk(0, knownhosts);
+ parent->client->getConnection().openOk(0, knownhosts);
}
void SessionHandlerImpl::ConnectionHandlerImpl::close(
u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- parent->client.getConnection().closeOk(0);
+ parent->client->getConnection().closeOk(0);
parent->context->close();
}
@@ -202,7 +214,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
parent->channels[channel] = new Channel(parent->context, channel, parent->framemax,
parent->queues->getStore(), parent->settings.stagingThreshold);
- parent->client.getChannel().openOk(channel);
+ parent->client->getChannel().openOk(channel);
}
void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
@@ -215,7 +227,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t
parent->channels.erase(channel);
c->close();
delete c;
- parent->client.getChannel().closeOk(channel);
+ parent->client->getChannel().closeOk(channel);
}
}
@@ -242,17 +254,17 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
throw ConnectionException(503, "Exchange type not implemented: " + type);
}
}
-
if(!nowait){
- parent->client.getExchange().declareOk(channel);
+ parent->client->getExchange().declareOk(channel);
}
}
void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
const string& exchange, bool /*ifUnused*/, bool nowait){
+
//TODO: implement unused
parent->exchanges->destroy(exchange);
- if(!nowait) parent->client.getExchange().deleteOk(channel);
+ if(!nowait) parent->client->getExchange().deleteOk(channel);
}
void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
@@ -286,7 +298,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
}
if (!nowait) {
string queueName = queue->getName();
- parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
@@ -302,7 +314,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t
// exchange->bind(queue, routingKey, &arguments);
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) parent->client.getQueue().bindOk(channel);
+ if(!nowait) parent->client->getQueue().bindOk(channel);
}else{
throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
}
@@ -312,7 +324,7 @@ void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
int count = queue->purge();
- if(!nowait) parent->client.getQueue().purgeOk(channel, count);
+ if(!nowait) parent->client->getQueue().purgeOk(channel, count);
}
void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
@@ -334,7 +346,8 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
q->destroy();
parent->queues->destroy(queue);
}
- if(!nowait) parent->client.getQueue().deleteOk(channel, count);
+
+ if(!nowait) parent->client->getQueue().deleteOk(channel, count);
}
@@ -344,7 +357,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref
//TODO: handle global
parent->getChannel(channel)->setPrefetchSize(prefetchSize);
parent->getChannel(channel)->setPrefetchCount(prefetchCount);
- parent->client.getBasic().qosOk(channel);
+ parent->client->getBasic().qosOk(channel);
}
void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/,
@@ -361,7 +374,8 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
try{
string newTag = consumerTag;
channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0);
- if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag);
+
+ if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -374,7 +388,8 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
parent->getChannel(channel)->cancel(consumerTag);
- if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
+
+ if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag);
}
void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
@@ -394,7 +409,8 @@ void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*
Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
if(!parent->getChannel(channelId)->get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- parent->client.getBasic().getEmpty(channelId, clusterId);
+
+ parent->client->getBasic().getEmpty(channelId, clusterId);
}
}
@@ -414,17 +430,18 @@ void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool reque
void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
parent->getChannel(channel)->begin();
- parent->client.getTx().selectOk(channel);
+ parent->client->getTx().selectOk(channel);
}
void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){
parent->getChannel(channel)->commit();
- parent->client.getTx().commitOk(channel);
+ parent->client->getTx().commitOk(channel);
}
void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
+
parent->getChannel(channel)->rollback();
- parent->client.getTx().rollbackOk(channel);
+ parent->client->getTx().rollbackOk(channel);
parent->getChannel(channel)->recover(false);
}
diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h
index 98c87a7806..043ad8bf98 100644
--- a/cpp/lib/broker/SessionHandlerImpl.h
+++ b/cpp/lib/broker/SessionHandlerImpl.h
@@ -76,7 +76,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
qpid::sys::SessionContext* context;
- qpid::framing::AMQP_ClientProxy client;
+ qpid::framing::AMQP_ClientProxy* client;
QueueRegistry* queues;
ExchangeRegistry* const exchanges;
AutoDelete* const cleaner;