diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2006-12-14 21:38:28 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2006-12-14 21:38:28 +0000 |
commit | 295cdabf2cea8feb127b094cc123326404551fa4 (patch) | |
tree | ec552e78fd6ea957f270f6a2c0800de956c0cf3b | |
parent | ad806562f83ebca3bc9d246772b235eb9c696b82 (diff) | |
download | qpid-python-295cdabf2cea8feb127b094cc123326404551fa4.tar.gz |
Broker side dynamic version hook up.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487359 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 85 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.h | 2 |
2 files changed, 52 insertions, 35 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index 8757cc2fc3..ad203b4515 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -37,9 +37,6 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, AutoDelete* _cleaner, const Settings& _settings) : context(_context), -// AMQP version management change - kpvdr 2006-11-17 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - client(context, 8, 0), queues(_queues), exchanges(_exchanges), cleaner(_cleaner), @@ -51,9 +48,18 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, queueHandler(new QueueHandlerImpl(this)), txHandler(new TxHandlerImpl(this)), framemax(65536), - heartbeat(0) {} + heartbeat(0) + { + + client =NULL; +} -SessionHandlerImpl::~SessionHandlerImpl(){} +SessionHandlerImpl::~SessionHandlerImpl(){ + + if (client != NULL) + delete client; + +} Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ channel_iterator i = channels.find(channel); @@ -96,12 +102,12 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ }catch(ChannelException& e){ channels[channel]->close(); channels.erase(channel); - client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); }catch(ConnectionException& e){ - client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ string error(e.what()); - client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId()); + client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId()); } break; @@ -120,14 +126,21 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ } } -void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){ - //send connection start - FieldTable properties; - string mechanisms("PLAIN"); - string locales("en_US"); - client.getConnection().start(0, 8, 0, properties, mechanisms, locales); +void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ + + if (client == NULL) + { + client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); + + //send connection start + FieldTable properties; + string mechanisms("PLAIN"); + string locales("en_US"); // channel, majour, minor + client->getConnection().start(0, header->getMajor(), header->getMinor(), properties, mechanisms, locales); + } } + void SessionHandlerImpl::idleOut(){ } @@ -169,8 +182,7 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ void SessionHandlerImpl::ConnectionHandlerImpl::startOk( u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ - - parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat); + parent->client->getConnection().tune(0, 100, parent->framemax, parent->heartbeat); } void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} @@ -182,14 +194,14 @@ void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_ void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; - parent->client.getConnection().openOk(0, knownhosts); + parent->client->getConnection().openOk(0, knownhosts); } void SessionHandlerImpl::ConnectionHandlerImpl::close( u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - parent->client.getConnection().closeOk(0); + parent->client->getConnection().closeOk(0); parent->context->close(); } @@ -202,7 +214,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax, parent->queues->getStore(), parent->settings.stagingThreshold); - parent->client.getChannel().openOk(channel); + parent->client->getChannel().openOk(channel); } void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} @@ -215,7 +227,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t parent->channels.erase(channel); c->close(); delete c; - parent->client.getChannel().closeOk(channel); + parent->client->getChannel().closeOk(channel); } } @@ -242,17 +254,17 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 throw ConnectionException(503, "Exchange type not implemented: " + type); } } - if(!nowait){ - parent->client.getExchange().declareOk(channel); + parent->client->getExchange().declareOk(channel); } } void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, bool /*ifUnused*/, bool nowait){ + //TODO: implement unused parent->exchanges->destroy(exchange); - if(!nowait) parent->client.getExchange().deleteOk(channel); + if(!nowait) parent->client->getExchange().deleteOk(channel); } void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, @@ -286,7 +298,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t } if (!nowait) { string queueName = queue->getName(); - parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); + parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); } } @@ -302,7 +314,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t // exchange->bind(queue, routingKey, &arguments); string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) parent->client.getQueue().bindOk(channel); + if(!nowait) parent->client->getQueue().bindOk(channel); }else{ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); } @@ -312,7 +324,7 @@ void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /* Queue::shared_ptr queue = parent->getQueue(queueName, channel); int count = queue->purge(); - if(!nowait) parent->client.getQueue().purgeOk(channel, count); + if(!nowait) parent->client->getQueue().purgeOk(channel, count); } void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, @@ -334,7 +346,8 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t q->destroy(); parent->queues->destroy(queue); } - if(!nowait) parent->client.getQueue().deleteOk(channel, count); + + if(!nowait) parent->client->getQueue().deleteOk(channel, count); } @@ -344,7 +357,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref //TODO: handle global parent->getChannel(channel)->setPrefetchSize(prefetchSize); parent->getChannel(channel)->setPrefetchCount(prefetchCount); - parent->client.getBasic().qosOk(channel); + parent->client->getBasic().qosOk(channel); } void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/, @@ -361,7 +374,8 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ try{ string newTag = consumerTag; channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0); - if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag); + + if(!nowait) parent->client->getBasic().consumeOk(channelId, newTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -374,7 +388,8 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ parent->getChannel(channel)->cancel(consumerTag); - if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); + + if(!nowait) parent->client->getBasic().cancelOk(channel, consumerTag); } void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, @@ -394,7 +409,8 @@ void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /* Queue::shared_ptr queue = parent->getQueue(queueName, channelId); if(!parent->getChannel(channelId)->get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack - parent->client.getBasic().getEmpty(channelId, clusterId); + + parent->client->getBasic().getEmpty(channelId, clusterId); } } @@ -414,17 +430,18 @@ void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool reque void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ parent->getChannel(channel)->begin(); - parent->client.getTx().selectOk(channel); + parent->client->getTx().selectOk(channel); } void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){ parent->getChannel(channel)->commit(); - parent->client.getTx().commitOk(channel); + parent->client->getTx().commitOk(channel); } void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){ + parent->getChannel(channel)->rollback(); - parent->client.getTx().rollbackOk(channel); + parent->client->getTx().rollbackOk(channel); parent->getChannel(channel)->recover(false); } diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h index 98c87a7806..043ad8bf98 100644 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -76,7 +76,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; qpid::sys::SessionContext* context; - qpid::framing::AMQP_ClientProxy client; + qpid::framing::AMQP_ClientProxy* client; QueueRegistry* queues; ExchangeRegistry* const exchanges; AutoDelete* const cleaner; |