summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-19 08:27:36 +0000
committerGordon Sim <gsim@apache.org>2007-07-19 08:27:36 +0000
commitb87a1e9d27755e2f98792567c29a0625b92c8654 (patch)
treecb1232987efbfa1cc0ef8ec5e62b07b6b6c918b6 /cpp/src/qpid/broker/MessageHandlerImpl.cpp
parentdfe8a370b6580446cf970e27562ad0385178922f (diff)
downloadqpid-python-b87a1e9d27755e2f98792567c29a0625b92c8654.tar.gz
removed the need to pass MethodContext/RequestId through proxy and handler/adapter interfaces
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557522 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp108
1 files changed, 35 insertions, 73 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 252b465cc5..c9fbc2b95d 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -42,72 +42,64 @@ MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
//
void
-MessageHandlerImpl::cancel(const MethodContext& context,
- const string& destination )
+MessageHandlerImpl::cancel(const string& destination )
{
channel.cancel(destination);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::open(const MethodContext& context,
- const string& reference)
+MessageHandlerImpl::open(const string& reference)
{
references.open(reference);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::append(const MethodContext& context,
- const string& reference,
- const string& /*bytes*/ )
+MessageHandlerImpl::append(const framing::MethodContext& context)
{
- references.get(reference)->append(
- boost::shared_polymorphic_downcast<MessageAppendBody>(
- context.methodBody));
- client.ok(context.getRequestId());
+ MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
+ references.get(body->getReference())->append(body);
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::close(const MethodContext& context,
- const string& reference)
+MessageHandlerImpl::close(const string& reference)
{
Reference::shared_ptr ref = references.get(reference);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
// Send any transfer messages to their correct exchanges and okay them
const Reference::Messages& msgs = ref->getMessages();
for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) {
channel.handleInlineTransfer(*m);
- client.ok((*m)->getRequestId());
+ client.setResponseTo((*m)->getRequestId());
+ client.ok();
}
ref->close();
}
void
-MessageHandlerImpl::checkpoint(const MethodContext& context,
- const string& /*reference*/,
+MessageHandlerImpl::checkpoint(const string& /*reference*/,
const string& /*identifier*/ )
{
// Initial implementation (which is conforming) is to do nothing here
// and return offset zero for the resume
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::resume(const MethodContext& context,
- const string& reference,
+MessageHandlerImpl::resume(const string& reference,
const string& /*identifier*/ )
{
// Initial (null) implementation
// open reference and return 0 offset
references.open(reference);
- client.offset(0, context.getRequestId());
+ client.offset(0);//GRS, );//GRS, context.getRequestId());
}
void
-MessageHandlerImpl::offset(const MethodContext&,
- uint64_t /*value*/ )
+MessageHandlerImpl::offset(uint64_t /*value*/ )
{
// Shouldn't ever receive this as it is reponse to resume
// which is never sent
@@ -116,8 +108,7 @@ MessageHandlerImpl::offset(const MethodContext&,
}
void
-MessageHandlerImpl::consume(const MethodContext& context,
- uint16_t /*ticket*/,
+MessageHandlerImpl::consume(uint16_t /*ticket*/,
const string& queueName,
const string& destination,
bool noLocal,
@@ -132,14 +123,13 @@ MessageHandlerImpl::consume(const MethodContext& context,
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
void
-MessageHandlerImpl::get( const MethodContext& context,
- uint16_t /*ticket*/,
+MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& queueName,
const string& destination,
bool noAck )
@@ -148,13 +138,13 @@ MessageHandlerImpl::get( const MethodContext& context,
GetAdapter out(adapter, queue, destination, connection.getFrameMax());
if(channel.get(out, queue, !noAck))
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
else
- client.empty(context.getRequestId());
+ client.empty();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::empty( const MethodContext& )
+MessageHandlerImpl::empty()
{
// Shouldn't ever receive this as it is a response to get
// which is never sent
@@ -163,34 +153,31 @@ MessageHandlerImpl::empty( const MethodContext& )
}
void
-MessageHandlerImpl::ok(const MethodContext& /*context*/)
+MessageHandlerImpl::ok()
{
channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest());
}
void
-MessageHandlerImpl::qos(const MethodContext& context,
- uint32_t prefetchSize,
+MessageHandlerImpl::qos(uint32_t prefetchSize,
uint16_t prefetchCount,
bool /*global*/ )
{
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::recover(const MethodContext& context,
- bool requeue)
+MessageHandlerImpl::recover(bool requeue)
{
channel.recover(requeue);
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
void
-MessageHandlerImpl::reject(const MethodContext& /*context*/,
- uint16_t /*code*/,
+MessageHandlerImpl::reject(uint16_t /*code*/,
const string& /*text*/ )
{
//channel.ack();
@@ -198,45 +185,20 @@ MessageHandlerImpl::reject(const MethodContext& /*context*/,
}
void
-MessageHandlerImpl::transfer(const MethodContext& context,
- uint16_t /*ticket*/,
- const string& /* destination */,
- bool /*redelivered*/,
- bool /*immediate*/,
- uint64_t /*ttl*/,
- uint8_t /*priority*/,
- uint64_t /*timestamp*/,
- uint8_t /*deliveryMode*/,
- uint64_t /*expiration*/,
- const string& /*exchangeName*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const framing::FieldTable& /*applicationHeaders*/,
- const framing::Content& body,
- bool /*mandatory*/)
+MessageHandlerImpl::transfer(const framing::MethodContext& context)
{
MessageTransferBody::shared_ptr transfer(
boost::shared_polymorphic_downcast<MessageTransferBody>(
context.methodBody));
RequestId requestId = context.getRequestId();
- if (body.isInline()) {
- MessageMessage::shared_ptr message(
- new MessageMessage(&connection, requestId, transfer));
+ if (transfer->getBody().isInline()) {
+ MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer));
channel.handleInlineTransfer(message);
- client.ok(requestId);
+ client.ok();
} else {
- Reference::shared_ptr ref(references.get(body.getValue()));
- MessageMessage::shared_ptr message(
- new MessageMessage(&connection, requestId, transfer, ref));
+ Reference::shared_ptr ref(references.get(transfer->getBody().getValue()));
+ MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref));
ref->addMessage(message);
}
}