diff options
author | Kim van der Riet <kpvdr@apache.org> | 2006-11-22 16:57:35 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2006-11-22 16:57:35 +0000 |
commit | d46ac2955c4871c9f22067f47490095e2c5f1806 (patch) | |
tree | 7e76ef7e4ca47e4cc57c83f7950bf97c3eceb210 /cpp/src/qpid/broker/SessionHandlerImpl.cpp | |
parent | 018723f3889e9a1f63585dddba8eecff1d168501 (diff) | |
download | qpid-python-d46ac2955c4871c9f22067f47490095e2c5f1806.tar.gz |
Merged AMQP version-sensitive generated files with C++ trunk. Phase 1 of merge complete - all locations where version info is required in the framing, broker and client code, the version has been hard-coded to mahor=8, minor=0. Next step: make broker and client version-aware.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@478237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerImpl.cpp | 64 |
1 files changed, 35 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp index bbb5d22c8d..c8c7b440aa 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp @@ -37,7 +37,9 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, AutoDelete* _cleaner, const u_int32_t _timeout) : context(_context), - client(context), +// AMQP version management change - kpvdr 2006-11-17 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + client(context, 8, 0), queues(_queues), exchanges(_exchanges), cleaner(_cleaner), @@ -165,26 +167,26 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void SessionHandlerImpl::ConnectionHandlerImpl::startOk( - u_int16_t /*channel*/, FieldTable& /*clientProperties*/, string& /*mechanism*/, - string& /*response*/, string& /*locale*/){ + u_int16_t /*channel*/, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const string& /*response*/, const string& /*locale*/){ parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat); } -void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, string& /*response*/){} +void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/){} void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ parent->framemax = framemax; parent->heartbeat = heartbeat; } -void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, string& /*virtualHost*/, string& /*capabilities*/, bool /*insist*/){ +void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; parent->client.getConnection().openOk(0, knownhosts); } void SessionHandlerImpl::ConnectionHandlerImpl::close( - u_int16_t /*channel*/, u_int16_t /*replyCode*/, string& /*replyText*/, + u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { parent->client.getConnection().closeOk(0); @@ -197,7 +199,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ -void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*outOfBand*/){ +void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); parent->client.getChannel().openOk(channel); } @@ -205,7 +207,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*o void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} -void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, string& /*replyText*/, +void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/){ Channel* c = parent->getChannel(channel); if(c){ @@ -220,9 +222,9 @@ void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} -void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, string& type, +void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - FieldTable& /*arguments*/){ + const FieldTable& /*arguments*/){ if(passive){ if(!parent->exchanges->get(exchange)){ @@ -244,17 +246,17 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 parent->client.getExchange().declareOk(channel); } } - + void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, - string& exchange, bool /*ifUnused*/, bool nowait){ + const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused parent->exchanges->destroy(exchange); if(!nowait) parent->client.getExchange().deleteOk(channel); } -void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& name, +void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, FieldTable& /*arguments*/){ + bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = parent->getQueue(name, channel); @@ -282,34 +284,37 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t throw ChannelException(405, "Cannot grant exclusive access to queue"); } if (!nowait) { - name = queue->getName(); - parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount()); + string queueName = queue->getName(); + parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); } } -void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, - string& exchangeName, string& routingKey, bool nowait, - FieldTable& arguments){ +void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName); if(exchange){ - if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); - exchange->bind(queue, routingKey, &arguments); +// kpvdr - cannot use this any longer as routingKey is now const +// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); +// exchange->bind(queue, routingKey, &arguments); + string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; + exchange->bind(queue, exchangeRoutingKey, &arguments); if(!nowait) parent->client.getQueue().bindOk(channel); }else{ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); } } -void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, bool nowait){ +void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); int count = queue->purge(); if(!nowait) parent->client.getQueue().purgeOk(channel, count); } -void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& queue, +void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); @@ -342,7 +347,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref } void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/, - string& queueName, string& consumerTag, + const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait){ @@ -353,8 +358,9 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ } try{ - channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0); - if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag); + string newTag = consumerTag; + channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0); + if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -365,13 +371,13 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ } -void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ +void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){ parent->getChannel(channel)->cancel(consumerTag); if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); } void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - string& exchangeName, string& routingKey, + const string& exchangeName, const string& routingKey, bool mandatory, bool immediate){ Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName); @@ -383,7 +389,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t } } -void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){ +void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = parent->getQueue(queueName, channelId); if(!parent->getChannel(channelId)->get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack |