diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-02-21 00:23:25 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-02-21 00:23:25 +0000 |
commit | be9a95607d8e831e8f7c5802828afe677c798b93 (patch) | |
tree | 1cbb1db9ad9532bb333079ba33753ac919bff383 /cpp/lib/broker/MessageHandlerImpl.cpp | |
parent | 5059a83a83b4c7e16e374c539f8ced77811f7e51 (diff) | |
download | qpid-python-be9a95607d8e831e8f7c5802828afe677c798b93.tar.gz |
r1152@fuschia: andrew | 2007-02-17 21:14:42 +0000
More support for references (and transfers of reference content)
r1220@fuschia: andrew | 2007-02-21 00:22:53 +0000
Working version of delivering Message Transfers by reference
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509834 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 115 |
1 files changed, 67 insertions, 48 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 0853aebcb1..784f180d5c 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -33,46 +33,84 @@ namespace broker { using namespace framing; MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) - : HandlerImplType(parent), references(channel) {} + : HandlerImplType(parent) {} // // Message class method handlers // + +void +MessageHandlerImpl::cancel(const MethodContext& context, + const string& destination ) +{ + channel.cancel(destination); + client.ok(context.getRequestId()); +} + +void +MessageHandlerImpl::open(const MethodContext& context, + const string& reference) +{ + references.open(reference); + client.ok(context.getRequestId()); +} + void MessageHandlerImpl::append(const MethodContext& context, const string& reference, const string& /*bytes*/ ) { - references.get(reference).append( + references.get(reference)->append( boost::shared_polymorphic_downcast<MessageAppendBody>( context.methodBody)); client.ok(context.getRequestId()); } - void -MessageHandlerImpl::cancel(const MethodContext& context, - const string& destination ) +MessageHandlerImpl::close(const MethodContext& context, + const string& reference) { - channel.cancel(destination); + Reference::shared_ptr ref = references.get(reference); client.ok(context.getRequestId()); + + // Send any transfer messages to their correct exchanges and okay them + const Reference::Messages& msgs = ref->getMessages(); + for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) { + channel.handleInlineTransfer(*m); + client.ok((*m)->getRequestId()); + } + ref->close(); } void -MessageHandlerImpl::checkpoint(const MethodContext&, +MessageHandlerImpl::checkpoint(const MethodContext& context, const string& /*reference*/, const string& /*identifier*/ ) { - // FIXME astitcher 2007-01-11: 0-9 feature - THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); + // Initial implementation (which is conforming) is to do nothing here + // and return offset zero for the resume + client.ok(context.getRequestId()); } void -MessageHandlerImpl::close(const MethodContext& context, - const string& reference) +MessageHandlerImpl::resume(const MethodContext& context, + const string& reference, + const string& /*identifier*/ ) { - references.get(reference).close(); - client.ok(context.getRequestId()); + // Initial (null) implementation + // open reference and return 0 offset + references.open(reference); + client.offset(0, context.getRequestId()); +} + +void +MessageHandlerImpl::offset(const MethodContext&, + u_int64_t /*value*/ ) +{ + // Shouldn't ever receive this as it is reponse to resume + // which is never sent + // TODO astitcher 2007-02-16 What is the correct exception to throw here? + THROW_QPID_ERROR(INTERNAL_ERROR, "impossible"); } void @@ -98,14 +136,6 @@ MessageHandlerImpl::consume(const MethodContext& context, } void -MessageHandlerImpl::empty( const MethodContext& ) -{ - // Shouldn't ever receive this as it is a response to get - // TODO astitcher 2007-02-09 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible"); -} - -void MessageHandlerImpl::get( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, @@ -122,11 +152,12 @@ MessageHandlerImpl::get( const MethodContext& context, } void -MessageHandlerImpl::offset(const MethodContext&, - u_int64_t /*value*/ ) +MessageHandlerImpl::empty( const MethodContext& ) { - // FIXME astitcher 2007-01-11: 0-9 feature - THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); + // Shouldn't ever receive this as it is a response to get + // which is never sent + // TODO astitcher 2007-02-09 What is the correct exception to throw here? + THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible"); } void @@ -136,14 +167,6 @@ MessageHandlerImpl::ok(const MethodContext& /*context*/) } void -MessageHandlerImpl::open(const MethodContext& context, - const string& reference) -{ - references.open(reference); - client.ok(context.getRequestId()); -} - -void MessageHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, @@ -173,15 +196,6 @@ MessageHandlerImpl::reject(const MethodContext&, } void -MessageHandlerImpl::resume(const MethodContext&, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - // FIXME astitcher 2007-01-11: 0-9 feature - THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); -} - -void MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /* destination */, @@ -210,14 +224,19 @@ MessageHandlerImpl::transfer(const MethodContext& context, MessageTransferBody::shared_ptr transfer( boost::shared_polymorphic_downcast<MessageTransferBody>( context.methodBody)); - MessageMessage::shared_ptr message( - new MessageMessage(&connection, transfer)); + RequestId requestId = context.getRequestId(); - if (body.isInline()) + if (body.isInline()) { + MessageMessage::shared_ptr message( + new MessageMessage(&connection, requestId, transfer)); channel.handleInlineTransfer(message); - else - references.get(body.getValue()).addMessage(message); - client.ok(context.getRequestId()); + client.ok(requestId); + } else { + Reference::shared_ptr ref(references.get(body.getValue())); + MessageMessage::shared_ptr message( + new MessageMessage(&connection, requestId, transfer, ref)); + ref->addMessage(message); + } } |