summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-19 21:33:27 +0000
committerAlan Conway <aconway@apache.org>2007-01-19 21:33:27 +0000
commite861284318186f8d9cd64a7ddcc28b8d20b98721 (patch)
tree6dac612d65297dc5f104350884fc01385c69ecda /cpp/lib/broker/BrokerAdapter.cpp
parent226be67c91b25a5ba8efdd9ba88566033ec97718 (diff)
downloadqpid-python-e861284318186f8d9cd64a7ddcc28b8d20b98721.tar.gz
Last big refactoring for 0-9 framing. Still need additional tests &
debugging but the overall structure is all in place. * configure.ac: Added -Wno_virtual_overload warning * ChannelTest.cpp, MessageBuilderTest.cpp: Fixed virtual overload warnings. * ChannelAdapter.cpp: Common base for client/broker adapters. Creates invocation context, handles request/resposne IDs. * CppGenerator.java: - Proxies send methods using MethodContext. * Various .h files: removed unnecessary #includes, added to requred .cpp files. * ConnectionContext: renamed from SessionContext. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497963 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp94
1 files changed, 48 insertions, 46 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 0d34868710..fda7d15784 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -181,9 +181,9 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
};
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
- const MethodContext& , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+ const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
- connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat);
+ connection.client->getConnection().tune(context, 100, connection.framemax, connection.heartbeat);
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){}
@@ -193,40 +193,40 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(const MethodContext
connection.heartbeat = heartbeat;
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext&, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
- connection.client->getConnection().openOk(0, knownhosts);
+ connection.client->getConnection().openOk(context, knownhosts);
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
- const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/,
+ const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getConnection().closeOk(0);
- connection.context->close();
+ connection.client->getConnection().closeOk(context);
+ connection.getOutput().close();
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){
- connection.context->close();
+ connection.getOutput().close();
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
- const MethodContext&, const string& /*outOfBand*/){
+ const MethodContext& context, const string& /*outOfBand*/){
// FIXME aconway 2007-01-17: Assertions on all channel methods,
assertChannelNonZero(channel.getId());
if (channel.isOpen())
throw ConnectionException(504, "Channel already open");
channel.open();
// FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
- connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */);
+ connection.client->getChannel().openOk(context, std::string()/* ID */);
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/,
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/){
- connection.client->getChannel().closeOk(channel.getId());
+ connection.client->getChannel().closeOk(context);
// FIXME aconway 2007-01-18: Following line destroys this. Ugly.
connection.closeChannel(channel.getId());
}
@@ -235,7 +235,7 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&)
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& exchange, const string& type,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
const FieldTable& /*arguments*/){
@@ -258,19 +258,19 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&
}
}
if(!nowait){
- connection.client->getExchange().declareOk(channel.getId());
+ connection.client->getExchange().declareOk(context);
}
}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
- if(!nowait) connection.client->getExchange().deleteOk(channel.getId());
+ if(!nowait) connection.client->getExchange().deleteOk(context);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& name,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
@@ -301,11 +301,11 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u
}
if (!nowait) {
string queueName = queue->getName();
- connection.client->getQueue().declareOk(channel.getId(), queueName, queue->getMessageCount(), queue->getConsumerCount());
+ connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_int16_t /*ticket*/, const string& queueName,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
@@ -314,7 +314,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_in
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) connection.client->getQueue().bindOk(channel.getId());
+ if(!nowait) connection.client->getQueue().bindOk(context);
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -341,14 +341,14 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
connection.client->getQueue().unbindOk(channel.getId());
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
- if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count);
+ if(!nowait) connection.client->getQueue().purgeOk(context, count);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, const string& queue,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
@@ -368,21 +368,21 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u
broker.getQueues().destroy(queue);
}
- if(!nowait) connection.client->getQueue().deleteOk(channel.getId(), count);
+ if(!nowait) connection.client->getQueue().deleteOk(context, count);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getBasic().qosOk(channel.getId());
+ connection.client->getBasic().qosOk(context);
}
void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
- const MethodContext&, u_int16_t /*ticket*/,
+ const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
bool nowait, const FieldTable& fields)
@@ -398,7 +398,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
channel.consume(
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(channel.getId(), newTag);
+ if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -409,10 +409,10 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext&, const string& consumerTag, bool nowait){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag);
+ if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
}
void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/,
@@ -429,12 +429,12 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- connection.client->getBasic().getEmpty(channel.getId(), clusterId);
+ connection.client->getBasic().getEmpty(context, clusterId);
}
}
@@ -452,20 +452,20 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, b
channel.recover(requeue);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext&){
+void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
- connection.client->getTx().selectOk(channel.getId());
+ connection.client->getTx().selectOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext&){
+void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
- connection.client->getTx().commitOk(channel.getId());
+ connection.client->getTx().commitOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext&){
+void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
- connection.client->getTx().rollbackOk(channel.getId());
+ connection.client->getTx().rollbackOk(context);
channel.recover(false);
}
@@ -499,6 +499,7 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
BrokerAdapter::BrokerAdapter(
Channel* ch, Connection& c, Broker& b
) :
+ ChannelAdapter(c.getOutput(), ch->getId()),
channel(ch),
connection(c),
broker(b),
@@ -507,24 +508,25 @@ BrokerAdapter::BrokerAdapter(
assert(ch);
}
-void BrokerAdapter::handleMethod(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void BrokerAdapter::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
{
try{
- // FIXME aconway 2007-01-17: invoke to take Channel&?
- method->invoke(*serverOps, channel->getId());
+ method->invoke(*serverOps, context);
}catch(ChannelException& e){
- connection.closeChannel(channel->getId());
+ connection.closeChannel(getId());
connection.client->getChannel().close(
- channel->getId(), e.code, e.toString(),
+ context, e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
connection.client->getConnection().close(
- 0, e.code, e.toString(),
+ context, e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
connection.client->getConnection().close(
- 0, 541/*internal error*/, e.what(),
+ context, 541/*internal error*/, e.what(),
method->amqpClassId(), method->amqpMethodId());
}
}