summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-13 02:41:14 +0000
committerAlan Conway <aconway@apache.org>2007-02-13 02:41:14 +0000
commit9517deedff9691dbe3429b0b917dfd4208b0b1b8 (patch)
treef8868a2fbc63e92c770b401eeff2aee3a522697a /cpp/lib/broker/BrokerAdapter.cpp
parentd26ea3376f66f69486fe214c8a7a8b96a7605c99 (diff)
downloadqpid-python-9517deedff9691dbe3429b0b917dfd4208b0b1b8.tar.gz
* gentools/templ.cpp/*Proxy*, CppGenerator.java: Changes to Proxy
classes to make them directly usable as an API for low-level AMQP access. - Proxies hold reference to a ChannelAdapter not just an output handler. - Removed MethodContext parameter, makes no sense on requester end. - Return RequestId from request methods so caller can correlate incoming responses. - Add RequestId parameter to response methods so caller can provide correlation for outgoing responses. - No longer inherit from *Operations classes as the signatures no longer match. Proxy is for caller (client/requester) and Operations is for callee (server/responder) * cpp/lib/client/ClientChannel.h: Channel provides a raw proxy to the broker. Normal users will still use the Channel API to deal with the broker, but advanced users (incl ourselves!) can use the raw API to directly send and receive any AMQP message. * cpp/lib/broker/BrokerChannel,BrokerAdapter: Refactor for new proxies. broker::Channel is also a ClientProxy * Sundry files: - Pass ProtcolVersion by value, it is only two bytes. - Misc. const correctness fixes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@506823 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp167
1 files changed, 98 insertions, 69 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 8b081874fc..ec80241c66 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -18,11 +18,10 @@
#include <boost/format.hpp>
#include "BrokerAdapter.h"
+#include "BrokerChannel.h"
#include "Connection.h"
-#include "Exception.h"
#include "AMQMethodBody.h"
#include "Exception.h"
-#include "MessageHandlerImpl.h"
namespace qpid {
namespace broker {
@@ -33,18 +32,37 @@ using namespace qpid::framing;
typedef std::vector<Queue::shared_ptr> QueueVector;
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk(
- const MethodContext& context , const FieldTable& /*clientProperties*/,
+
+BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
+ CoreRefs(ch, c, b),
+ connection(c),
+ basicHandler(*this),
+ channelHandler(*this),
+ connectionHandler(*this),
+ exchangeHandler(*this),
+ messageHandler(*this),
+ queueHandler(*this),
+ txHandler(*this)
+{}
+
+
+ProtocolVersion BrokerAdapter::getVersion() const {
+ return connection.getVersion();
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::startOk(
+ const MethodContext&, const FieldTable& /*clientProperties*/,
const string& /*mechanism*/,
- const string& /*response*/, const string& /*locale*/){
- connection.client->getConnection().tune(
- context, 100, connection.getFrameMax(), connection.getHeartbeat());
+ const string& /*response*/, const string& /*locale*/)
+{
+ client.tune(
+ 100, connection.getFrameMax(), connection.getHeartbeat());
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk(
+void BrokerAdapter::ConnectionHandlerImpl::secureOk(
const MethodContext&, const string& /*response*/){}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
+void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
const MethodContext&, u_int16_t /*channelmax*/,
u_int32_t framemax, u_int16_t heartbeat)
{
@@ -52,50 +70,55 @@ void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
connection.setHeartbeat(heartbeat);
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ConnectionHandlerImpl::open(
+ const MethodContext& context, const string& /*virtualHost*/,
+ const string& /*capabilities*/, bool /*insist*/)
+{
string knownhosts;
- connection.client->getConnection().openOk(context, knownhosts);
+ client.openOk(
+ knownhosts, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close(
+void BrokerAdapter::ConnectionHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getConnection().closeOk(context);
+ client.closeOk(context.getRequestId());
connection.getOutput().close();
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
+void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
connection.getOutput().close();
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open(
+void BrokerAdapter::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- connection.client->getChannel().openOk(context, std::string()/* ID */);
+ client.openOk(
+ std::string()/* ID */, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close(
+void BrokerAdapter::ChannelHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/,
const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getChannel().closeOk(context);
+ client.closeOk(context.getRequestId());
// FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
+void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
-void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -116,27 +139,30 @@ void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodCont
}
}
if(!nowait){
- connection.client->getExchange().declareOk(context);
+ client.declareOk(context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
- if(!nowait) connection.client->getExchange().deleteOk(context);
+ if(!nowait) client.deleteOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = connection.getQueue(name, channel.getId());
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
+ broker.getQueues().declare(
+ name, durable,
+ autoDelete ? connection.getTimeout() : 0,
+ exclusive ? &connection : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -161,20 +187,22 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext
% queue->getName());
if (!nowait) {
string queueName = queue->getName();
- connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ client.declareOk(
+ queueName, queue->getMessageCount(), queue->getConsumerCount(),
+ context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
+void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) connection.client->getQueue().bindOk(context);
+ if(!nowait) client.bindOk(context.getRequestId());
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -182,7 +210,7 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& c
}
void
-BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
+BrokerAdapter::QueueHandlerImpl::unbind(
const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
@@ -198,18 +226,18 @@ BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
exchange->unbind(queue, routingKey, &arguments);
- connection.client->getQueue().unbindOk(context);
+ client.unbindOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
- if(!nowait) connection.client->getQueue().purgeOk(context, count);
+ if(!nowait) client.purgeOk( count, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
@@ -228,20 +256,21 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext
broker.getQueues().destroy(queue);
}
- if(!nowait) connection.client->getQueue().deleteOk(context, count);
+ if(!nowait)
+ client.deleteOk(count, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getBasic().qosOk(context);
+ client.qosOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
+void BrokerAdapter::BasicHandlerImpl::consume(
const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
@@ -257,19 +286,19 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
channel.consume(
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
+ if(!nowait) client.consumeOk(newTag, context.getRequestId());
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
+ if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
+void BrokerAdapter::BasicHandlerImpl::publish(
const MethodContext& context, u_int16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate)
@@ -287,16 +316,16 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
string clusterId;//not used, part of an imatix hack
- connection.client->getBasic().getEmpty(context, clusterId);
+ client.getEmpty(clusterId, context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
try{
channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -304,31 +333,31 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
channel.recover(requeue);
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
- connection.client->getTx().selectOk(context);
+ client.selectOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
- connection.client->getTx().commitOk(context);
+ client.commitOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
- connection.client->getTx().rollbackOk(context);
+ client.rollbackOk(context.getRequestId());
channel.recover(false);
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
+BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
{
//no specific action required, generic response handling should be sufficient
}
@@ -338,21 +367,21 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
// Message class method handlers
//
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
{
- connection.client->getChannel().ok(context);
- connection.client->getChannel().pong(context);
+ client.ok(context.getRequestId());
+ client.pong();
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
{
- connection.client->getChannel().ok(context);
+ client.ok(context.getRequestId());
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(
+BrokerAdapter::ChannelHandlerImpl::resume(
const MethodContext&,
const string& /*channel*/ )
{