diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-19 08:27:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-19 08:27:36 +0000 |
commit | b87a1e9d27755e2f98792567c29a0625b92c8654 (patch) | |
tree | cb1232987efbfa1cc0ef8ec5e62b07b6b6c918b6 /cpp/src/qpid/broker/MessageHandlerImpl.cpp | |
parent | dfe8a370b6580446cf970e27562ad0385178922f (diff) | |
download | qpid-python-b87a1e9d27755e2f98792567c29a0625b92c8654.tar.gz |
removed the need to pass MethodContext/RequestId through proxy and handler/adapter interfaces
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557522 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 108 |
1 files changed, 35 insertions, 73 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 252b465cc5..c9fbc2b95d 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -42,72 +42,64 @@ MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) // void -MessageHandlerImpl::cancel(const MethodContext& context, - const string& destination ) +MessageHandlerImpl::cancel(const string& destination ) { channel.cancel(destination); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::open(const MethodContext& context, - const string& reference) +MessageHandlerImpl::open(const string& reference) { references.open(reference); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::append(const MethodContext& context, - const string& reference, - const string& /*bytes*/ ) +MessageHandlerImpl::append(const framing::MethodContext& context) { - references.get(reference)->append( - boost::shared_polymorphic_downcast<MessageAppendBody>( - context.methodBody)); - client.ok(context.getRequestId()); + MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody)); + references.get(body->getReference())->append(body); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::close(const MethodContext& context, - const string& reference) +MessageHandlerImpl::close(const string& reference) { Reference::shared_ptr ref = references.get(reference); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); // Send any transfer messages to their correct exchanges and okay them const Reference::Messages& msgs = ref->getMessages(); for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) { channel.handleInlineTransfer(*m); - client.ok((*m)->getRequestId()); + client.setResponseTo((*m)->getRequestId()); + client.ok(); } ref->close(); } void -MessageHandlerImpl::checkpoint(const MethodContext& context, - const string& /*reference*/, +MessageHandlerImpl::checkpoint(const string& /*reference*/, const string& /*identifier*/ ) { // Initial implementation (which is conforming) is to do nothing here // and return offset zero for the resume - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::resume(const MethodContext& context, - const string& reference, +MessageHandlerImpl::resume(const string& reference, const string& /*identifier*/ ) { // Initial (null) implementation // open reference and return 0 offset references.open(reference); - client.offset(0, context.getRequestId()); + client.offset(0);//GRS, );//GRS, context.getRequestId()); } void -MessageHandlerImpl::offset(const MethodContext&, - uint64_t /*value*/ ) +MessageHandlerImpl::offset(uint64_t /*value*/ ) { // Shouldn't ever receive this as it is reponse to resume // which is never sent @@ -116,8 +108,7 @@ MessageHandlerImpl::offset(const MethodContext&, } void -MessageHandlerImpl::consume(const MethodContext& context, - uint16_t /*ticket*/, +MessageHandlerImpl::consume(uint16_t /*ticket*/, const string& queueName, const string& destination, bool noLocal, @@ -132,14 +123,13 @@ MessageHandlerImpl::consume(const MethodContext& context, channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } void -MessageHandlerImpl::get( const MethodContext& context, - uint16_t /*ticket*/, +MessageHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, const string& destination, bool noAck ) @@ -148,13 +138,13 @@ MessageHandlerImpl::get( const MethodContext& context, GetAdapter out(adapter, queue, destination, connection.getFrameMax()); if(channel.get(out, queue, !noAck)) - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); else - client.empty(context.getRequestId()); + client.empty();//GRS context.getRequestId()); } void -MessageHandlerImpl::empty( const MethodContext& ) +MessageHandlerImpl::empty() { // Shouldn't ever receive this as it is a response to get // which is never sent @@ -163,34 +153,31 @@ MessageHandlerImpl::empty( const MethodContext& ) } void -MessageHandlerImpl::ok(const MethodContext& /*context*/) +MessageHandlerImpl::ok() { channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest()); } void -MessageHandlerImpl::qos(const MethodContext& context, - uint32_t prefetchSize, +MessageHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/ ) { //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::recover(const MethodContext& context, - bool requeue) +MessageHandlerImpl::recover(bool requeue) { channel.recover(requeue); - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } void -MessageHandlerImpl::reject(const MethodContext& /*context*/, - uint16_t /*code*/, +MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ ) { //channel.ack(); @@ -198,45 +185,20 @@ MessageHandlerImpl::reject(const MethodContext& /*context*/, } void -MessageHandlerImpl::transfer(const MethodContext& context, - uint16_t /*ticket*/, - const string& /* destination */, - bool /*redelivered*/, - bool /*immediate*/, - uint64_t /*ttl*/, - uint8_t /*priority*/, - uint64_t /*timestamp*/, - uint8_t /*deliveryMode*/, - uint64_t /*expiration*/, - const string& /*exchangeName*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const framing::FieldTable& /*applicationHeaders*/, - const framing::Content& body, - bool /*mandatory*/) +MessageHandlerImpl::transfer(const framing::MethodContext& context) { MessageTransferBody::shared_ptr transfer( boost::shared_polymorphic_downcast<MessageTransferBody>( context.methodBody)); RequestId requestId = context.getRequestId(); - if (body.isInline()) { - MessageMessage::shared_ptr message( - new MessageMessage(&connection, requestId, transfer)); + if (transfer->getBody().isInline()) { + MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer)); channel.handleInlineTransfer(message); - client.ok(requestId); + client.ok(); } else { - Reference::shared_ptr ref(references.get(body.getValue())); - MessageMessage::shared_ptr message( - new MessageMessage(&connection, requestId, transfer, ref)); + Reference::shared_ptr ref(references.get(transfer->getBody().getValue())); + MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref)); ref->addMessage(message); } } |