diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-02-21 00:23:25 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-02-21 00:23:25 +0000 |
commit | be9a95607d8e831e8f7c5802828afe677c798b93 (patch) | |
tree | 1cbb1db9ad9532bb333079ba33753ac919bff383 /cpp/lib | |
parent | 5059a83a83b4c7e16e374c539f8ced77811f7e51 (diff) | |
download | qpid-python-be9a95607d8e831e8f7c5802828afe677c798b93.tar.gz |
r1152@fuschia: andrew | 2007-02-17 21:14:42 +0000
More support for references (and transfers of reference content)
r1220@fuschia: andrew | 2007-02-21 00:22:53 +0000
Working version of delivering Message Transfers by reference
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509834 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 45 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 18 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 115 | ||||
-rw-r--r-- | cpp/lib/broker/Reference.cpp | 19 | ||||
-rw-r--r-- | cpp/lib/broker/Reference.h | 23 |
5 files changed, 135 insertions, 85 deletions
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index f51722f2da..e1be57fad7 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -22,12 +22,14 @@ #include "BrokerMessageMessage.h" #include "ChannelAdapter.h" #include "MessageTransferBody.h" +#include "MessageOpenBody.h" +#include "MessageCloseBody.h" #include "MessageAppendBody.h" #include "Reference.h" #include "framing/FieldTable.h" #include "framing/BasicHeaderProperties.h" -#include <iostream> +#include <algorithm> using namespace std; using namespace qpid::framing; @@ -36,21 +38,50 @@ namespace qpid { namespace broker { MessageMessage::MessageMessage( - ConnectionToken* publisher, TransferPtr transfer_ + ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), transfer_->getMandatory(), transfer_->getImmediate(), transfer_), + requestId(requestId_), transfer(transfer_) {} +MessageMessage::MessageMessage( + ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_, + ReferencePtr reference_ +) : Message(publisher, transfer_->getDestination(), + transfer_->getRoutingKey(), + transfer_->getMandatory(), + transfer_->getImmediate(), + transfer_), + requestId(requestId_), + transfer(transfer_), + reference(reference_) +{} + void MessageMessage::deliver( framing::ChannelAdapter& channel, const std::string& consumerTag, u_int64_t /*deliveryTag*/, u_int32_t /*framesize*/) { + const framing::Content& body = transfer->getBody(); + + // Send any reference data + if (!body.isInline()){ + // Open + channel.send(new MessageOpenBody(channel.getVersion(), reference->getId())); + // Appends + for(Reference::Appends::const_iterator a = reference->getAppends().begin(); + a != reference->getAppends().end(); + ++a) { + channel.send(new MessageAppendBody(*a->get())); + } + } + + // The the transfer channel.send( new MessageTransferBody(channel.getVersion(), transfer->getTicket(), @@ -74,8 +105,13 @@ void MessageMessage::deliver( transfer->getTransactionId(), transfer->getSecurityToken(), transfer->getApplicationHeaders(), - transfer->getBody(), + body, transfer->getMandatory())); + // Close any reference data + if (!body.isInline()){ + // Close + channel.send(new MessageCloseBody(channel.getVersion(), reference->getId())); + } } void MessageMessage::sendGetOk( @@ -120,11 +156,10 @@ bool MessageMessage::isComplete() u_int64_t MessageMessage::contentSize() const { - // FIXME astitcher 2007-2-7 only works for inline content if (transfer->getBody().isInline()) return transfer->getBody().getValue().size(); else - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); + return reference->getSize(); } qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index 89289a7fd0..3673fde9ed 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -23,7 +23,7 @@ */ #include "BrokerMessageBase.h" #include "MessageTransferBody.h" -#include "Reference.h" +#include "amqp_types.h" #include <vector> @@ -31,7 +31,6 @@ namespace qpid { namespace framing { class MessageTransferBody; -class MessageApppendBody; } namespace broker { @@ -42,17 +41,16 @@ class MessageMessage: public Message{ public: typedef boost::shared_ptr<MessageMessage> shared_ptr; typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr; - typedef Reference::AppendPtr AppendPtr; - typedef Reference::Appends Appends; + typedef boost::shared_ptr<Reference> ReferencePtr; - MessageMessage(ConnectionToken* publisher, TransferPtr transfer); + MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer); + MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference); // Default destructor okay + framing::RequestId getRequestId() {return requestId; } TransferPtr getTransfer() { return transfer; } - - const Appends& getAppends() { return appends; } - void setAppends(const Appends& appends_) { appends = appends_; } + ReferencePtr getReference() { return reference; } void deliver(framing::ChannelAdapter& channel, const std::string& consumerTag, @@ -78,9 +76,9 @@ class MessageMessage: public Message{ u_int64_t expectedContentSize(); private: - + framing::RequestId requestId; const TransferPtr transfer; - Appends appends; + const ReferencePtr reference; }; }} diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 0853aebcb1..784f180d5c 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -33,46 +33,84 @@ namespace broker { using namespace framing; MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) - : HandlerImplType(parent), references(channel) {} + : HandlerImplType(parent) {} // // Message class method handlers // + +void +MessageHandlerImpl::cancel(const MethodContext& context, + const string& destination ) +{ + channel.cancel(destination); + client.ok(context.getRequestId()); +} + +void +MessageHandlerImpl::open(const MethodContext& context, + const string& reference) +{ + references.open(reference); + client.ok(context.getRequestId()); +} + void MessageHandlerImpl::append(const MethodContext& context, const string& reference, const string& /*bytes*/ ) { - references.get(reference).append( + references.get(reference)->append( boost::shared_polymorphic_downcast<MessageAppendBody>( context.methodBody)); client.ok(context.getRequestId()); } - void -MessageHandlerImpl::cancel(const MethodContext& context, - const string& destination ) +MessageHandlerImpl::close(const MethodContext& context, + const string& reference) { - channel.cancel(destination); + Reference::shared_ptr ref = references.get(reference); client.ok(context.getRequestId()); + + // Send any transfer messages to their correct exchanges and okay them + const Reference::Messages& msgs = ref->getMessages(); + for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) { + channel.handleInlineTransfer(*m); + client.ok((*m)->getRequestId()); + } + ref->close(); } void -MessageHandlerImpl::checkpoint(const MethodContext&, +MessageHandlerImpl::checkpoint(const MethodContext& context, const string& /*reference*/, const string& /*identifier*/ ) { - // FIXME astitcher 2007-01-11: 0-9 feature - THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); + // Initial implementation (which is conforming) is to do nothing here + // and return offset zero for the resume + client.ok(context.getRequestId()); } void -MessageHandlerImpl::close(const MethodContext& context, - const string& reference) +MessageHandlerImpl::resume(const MethodContext& context, + const string& reference, + const string& /*identifier*/ ) { - references.get(reference).close(); - client.ok(context.getRequestId()); + // Initial (null) implementation + // open reference and return 0 offset + references.open(reference); + client.offset(0, context.getRequestId()); +} + +void +MessageHandlerImpl::offset(const MethodContext&, + u_int64_t /*value*/ ) +{ + // Shouldn't ever receive this as it is reponse to resume + // which is never sent + // TODO astitcher 2007-02-16 What is the correct exception to throw here? + THROW_QPID_ERROR(INTERNAL_ERROR, "impossible"); } void @@ -98,14 +136,6 @@ MessageHandlerImpl::consume(const MethodContext& context, } void -MessageHandlerImpl::empty( const MethodContext& ) -{ - // Shouldn't ever receive this as it is a response to get - // TODO astitcher 2007-02-09 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible"); -} - -void MessageHandlerImpl::get( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, @@ -122,11 +152,12 @@ MessageHandlerImpl::get( const MethodContext& context, } void -MessageHandlerImpl::offset(const MethodContext&, - u_int64_t /*value*/ ) +MessageHandlerImpl::empty( const MethodContext& ) { - // FIXME astitcher 2007-01-11: 0-9 feature - THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); + // Shouldn't ever receive this as it is a response to get + // which is never sent + // TODO astitcher 2007-02-09 What is the correct exception to throw here? + THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible"); } void @@ -136,14 +167,6 @@ MessageHandlerImpl::ok(const MethodContext& /*context*/) } void -MessageHandlerImpl::open(const MethodContext& context, - const string& reference) -{ - references.open(reference); - client.ok(context.getRequestId()); -} - -void MessageHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, @@ -173,15 +196,6 @@ MessageHandlerImpl::reject(const MethodContext&, } void -MessageHandlerImpl::resume(const MethodContext&, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - // FIXME astitcher 2007-01-11: 0-9 feature - THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented "); -} - -void MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /* destination */, @@ -210,14 +224,19 @@ MessageHandlerImpl::transfer(const MethodContext& context, MessageTransferBody::shared_ptr transfer( boost::shared_polymorphic_downcast<MessageTransferBody>( context.methodBody)); - MessageMessage::shared_ptr message( - new MessageMessage(&connection, transfer)); + RequestId requestId = context.getRequestId(); - if (body.isInline()) + if (body.isInline()) { + MessageMessage::shared_ptr message( + new MessageMessage(&connection, requestId, transfer)); channel.handleInlineTransfer(message); - else - references.get(body.getValue()).addMessage(message); - client.ok(context.getRequestId()); + client.ok(requestId); + } else { + Reference::shared_ptr ref(references.get(body.getValue())); + MessageMessage::shared_ptr message( + new MessageMessage(&connection, requestId, transfer, ref)); + ref->addMessage(message); + } } diff --git a/cpp/lib/broker/Reference.cpp b/cpp/lib/broker/Reference.cpp index 0aedef2bef..c4c33e6363 100644 --- a/cpp/lib/broker/Reference.cpp +++ b/cpp/lib/broker/Reference.cpp @@ -20,36 +20,35 @@ #include "Reference.h" #include "BrokerMessageMessage.h" #include "QpidError.h" +#include "MessageAppendBody.h" #include "CompletionHandler.h" namespace qpid { namespace broker { -Reference& ReferenceRegistry::open(const Reference::Id& id) { +Reference::shared_ptr ReferenceRegistry::open(const Reference::Id& id) { ReferenceMap::iterator i = references.find(id); // TODO aconway 2007-02-05: should we throw Channel or Connection // exceptions here? if (i != references.end()) throw ConnectionException(503, "Attempt to re-open reference " +id); - return references[id] = Reference(id, this); + return references[id] = Reference::shared_ptr(new Reference(id, this)); } -Reference& ReferenceRegistry::get(const Reference::Id& id) { +Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) { ReferenceMap::iterator i = references.find(id); if (i == references.end()) throw ConnectionException(503, "Attempt to use non-existent reference "+id); return i->second; } -void Reference::close() { - for_each(messages.begin(), messages.end(), - boost::bind(&Reference::complete, this, _1)); - registry->references.erase(getId()); +void Reference::append(AppendPtr ptr) { + appends.push_back(ptr); + size += ptr->getBytes().length(); } -void Reference::complete(MessagePtr message) { - message->setAppends(appends); - registry->handler.complete(message); +void Reference::close() { + registry->references.erase(getId()); } }} // namespace qpid::broker diff --git a/cpp/lib/broker/Reference.h b/cpp/lib/broker/Reference.h index 77c315bbc5..7b3a63fca2 100644 --- a/cpp/lib/broker/Reference.h +++ b/cpp/lib/broker/Reference.h @@ -34,7 +34,6 @@ class MessageAppendBody; namespace broker { class MessageMessage; -class CompletionHandler; class ReferenceRegistry; /** @@ -51,21 +50,23 @@ class Reference { public: typedef std::string Id; + typedef boost::shared_ptr<Reference> shared_ptr; typedef boost::shared_ptr<MessageMessage> MessagePtr; typedef std::vector<MessagePtr> Messages; typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr; typedef std::vector<AppendPtr> Appends; Reference(const Id& id_=Id(), ReferenceRegistry* reg=0) - : id(id_), registry(reg) {} + : id(id_), size(0), registry(reg) {} const std::string& getId() const { return id; } + u_int64_t getSize() const { return size; } /** Add a message to be completed with this reference */ void addMessage(MessagePtr message) { messages.push_back(message); } /** Append more data to the reference */ - void append(AppendPtr ptr) { appends.push_back(ptr); } + void append(AppendPtr ptr); /** Close the reference, complete each associated message */ void close(); @@ -74,9 +75,8 @@ class Reference const Messages& getMessages() const { return messages; } private: - void complete(MessagePtr message); - Id id; + u_int64_t size; ReferenceRegistry* registry; Messages messages; Appends appends; @@ -91,17 +91,16 @@ class Reference */ class ReferenceRegistry { public: - ReferenceRegistry(CompletionHandler& handler_) : handler(handler_) {}; - Reference& open(const Reference::Id& id); - Reference& get(const Reference::Id& id); + ReferenceRegistry() {}; + Reference::shared_ptr open(const Reference::Id& id); + Reference::shared_ptr get(const Reference::Id& id); private: - typedef std::map<Reference::Id, Reference> ReferenceMap; - CompletionHandler& handler; + typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap; ReferenceMap references; - // Reference calls references.erase() and uses handler. - friend class Reference; + // Reference calls references.erase(). + friend class Reference; }; |