diff options
author | Alan Conway <aconway@apache.org> | 2007-01-19 21:33:27 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-19 21:33:27 +0000 |
commit | e861284318186f8d9cd64a7ddcc28b8d20b98721 (patch) | |
tree | 6dac612d65297dc5f104350884fc01385c69ecda /cpp/lib/broker/BrokerAdapter.cpp | |
parent | 226be67c91b25a5ba8efdd9ba88566033ec97718 (diff) | |
download | qpid-python-e861284318186f8d9cd64a7ddcc28b8d20b98721.tar.gz |
Last big refactoring for 0-9 framing. Still need additional tests &
debugging but the overall structure is all in place.
* configure.ac: Added -Wno_virtual_overload warning
* ChannelTest.cpp, MessageBuilderTest.cpp: Fixed virtual overload warnings.
* ChannelAdapter.cpp: Common base for client/broker adapters.
Creates invocation context, handles request/resposne IDs.
* CppGenerator.java:
- Proxies send methods using MethodContext.
* Various .h files: removed unnecessary #includes, added to requred .cpp files.
* ConnectionContext: renamed from SessionContext.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497963 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 94 |
1 files changed, 48 insertions, 46 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 0d34868710..fda7d15784 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -181,9 +181,9 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations }; void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk( - const MethodContext& , const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ - connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat); + connection.client->getConnection().tune(context, 100, connection.framemax, connection.heartbeat); } void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){} @@ -193,40 +193,40 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(const MethodContext connection.heartbeat = heartbeat; } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext&, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; - connection.client->getConnection().openOk(0, knownhosts); + connection.client->getConnection().openOk(context, knownhosts); } void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close( - const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/, + const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getConnection().closeOk(0); - connection.context->close(); + connection.client->getConnection().closeOk(context); + connection.getOutput().close(); } void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){ - connection.context->close(); + connection.getOutput().close(); } void BrokerAdapter::ServerOps::ChannelHandlerImpl::open( - const MethodContext&, const string& /*outOfBand*/){ + const MethodContext& context, const string& /*outOfBand*/){ // FIXME aconway 2007-01-17: Assertions on all channel methods, assertChannelNonZero(channel.getId()); if (channel.isOpen()) throw ConnectionException(504, "Channel already open"); channel.open(); // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 - connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */); + connection.client->getChannel().openOk(context, std::string()/* ID */); } void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} -void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/, +void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/){ - connection.client->getChannel().closeOk(channel.getId()); + connection.client->getChannel().closeOk(context); // FIXME aconway 2007-01-18: Following line destroys this. Ugly. connection.closeChannel(channel.getId()); } @@ -235,7 +235,7 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&) -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& exchange, const string& type, +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, const FieldTable& /*arguments*/){ @@ -258,19 +258,19 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& } } if(!nowait){ - connection.client->getExchange().declareOk(channel.getId()); + connection.client->getExchange().declareOk(context); } } -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused broker.getExchanges().destroy(exchange); - if(!nowait) connection.client->getExchange().deleteOk(channel.getId()); + if(!nowait) connection.client->getExchange().deleteOk(context); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& name, +void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; @@ -301,11 +301,11 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u } if (!nowait) { string queueName = queue->getName(); - connection.client->getQueue().declareOk(channel.getId(), queueName, queue->getMessageCount(), queue->getConsumerCount()); + connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount()); } } -void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, +void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ @@ -314,7 +314,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_in if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) connection.client->getQueue().bindOk(channel.getId()); + if(!nowait) connection.client->getQueue().bindOk(context); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -341,14 +341,14 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( connection.client->getQueue().unbindOk(channel.getId()); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); - if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count); + if(!nowait) connection.client->getQueue().purgeOk(context, count); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, const string& queue, +void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); @@ -368,21 +368,21 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u broker.getQueues().destroy(queue); } - if(!nowait) connection.client->getQueue().deleteOk(channel.getId(), count); + if(!nowait) connection.client->getQueue().deleteOk(context, count); } -void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getBasic().qosOk(channel.getId()); + connection.client->getBasic().qosOk(context); } void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( - const MethodContext&, u_int16_t /*ticket*/, + const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const FieldTable& fields) @@ -398,7 +398,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( channel.consume( newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) connection.client->getBasic().consumeOk(channel.getId(), newTag); + if(!nowait) connection.client->getBasic().consumeOk(context, newTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -409,10 +409,10 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( } -void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext&, const string& consumerTag, bool nowait){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag); + if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag); } void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/, @@ -429,12 +429,12 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!connection.getChannel(channel.getId()).get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack - connection.client->getBasic().getEmpty(channel.getId(), clusterId); + connection.client->getBasic().getEmpty(context, clusterId); } } @@ -452,20 +452,20 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, b channel.recover(requeue); } -void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext&){ +void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){ channel.begin(); - connection.client->getTx().selectOk(channel.getId()); + connection.client->getTx().selectOk(context); } -void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext&){ +void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){ channel.commit(); - connection.client->getTx().commitOk(channel.getId()); + connection.client->getTx().commitOk(context); } -void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext&){ +void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){ channel.rollback(); - connection.client->getTx().rollbackOk(channel.getId()); + connection.client->getTx().rollbackOk(context); channel.recover(false); } @@ -499,6 +499,7 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::resume( BrokerAdapter::BrokerAdapter( Channel* ch, Connection& c, Broker& b ) : + ChannelAdapter(c.getOutput(), ch->getId()), channel(ch), connection(c), broker(b), @@ -507,24 +508,25 @@ BrokerAdapter::BrokerAdapter( assert(ch); } -void BrokerAdapter::handleMethod( - boost::shared_ptr<qpid::framing::AMQMethodBody> method) +void BrokerAdapter::handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const MethodContext& context +) { try{ - // FIXME aconway 2007-01-17: invoke to take Channel&? - method->invoke(*serverOps, channel->getId()); + method->invoke(*serverOps, context); }catch(ChannelException& e){ - connection.closeChannel(channel->getId()); + connection.closeChannel(getId()); connection.client->getChannel().close( - channel->getId(), e.code, e.toString(), + context, e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(ConnectionException& e){ connection.client->getConnection().close( - 0, e.code, e.toString(), + context, e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ connection.client->getConnection().close( - 0, 541/*internal error*/, e.what(), + context, 541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } |