diff options
author | Alan Conway <aconway@apache.org> | 2007-02-07 10:13:41 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-02-07 10:13:41 +0000 |
commit | 1977153241e86e93b237d2ed7fe02883d44646c5 (patch) | |
tree | 68ea88d60712a2459b524add42e412f4ae8ce9b6 /cpp/lib | |
parent | 877e7ae368d4320bd60ba5750be207a5cac13f43 (diff) | |
download | qpid-python-1977153241e86e93b237d2ed7fe02883d44646c5.tar.gz |
* broker/BrokerMessage.cpp: Added ConnectionToken publisher.
* cpp/lib/broker/BrokerMessageMessage.cpp:
- Added ConnectionToken publisher.
- Implemented getDeliveryMode, getApplicationHeaders
* cpp/lib/broker/Reference.cpp: Holds MessageMessage instead of just
MessageTransferBody.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504485 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 2 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 17 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 2 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageBase.h | 14 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 84 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 19 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 15 | ||||
-rw-r--r-- | cpp/lib/broker/Reference.cpp | 8 | ||||
-rw-r--r-- | cpp/lib/broker/Reference.h | 25 |
10 files changed, 94 insertions, 96 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index ba1ccb7031..84ac747846 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -188,7 +188,9 @@ void Channel::ConsumerImpl::requestDispatch(){ if(blocked) queue->dispatch(); } -void Channel::handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exch){ +void Channel::handleInlineTransfer( + Message::shared_ptr msg, Exchange::shared_ptr& exch) +{ if(transactional){ TxPublish* deliverable = new TxPublish(msg); exch->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index cbad2382a8..6e906e7615 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -139,7 +139,7 @@ class Channel : public framing::ChannelAdapter, void handleContent(boost::shared_ptr<framing::AMQContentBody>); void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); - void handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exchange); + void handleInlineTransfer(Message::shared_ptr msg, Exchange::shared_ptr& exchange); // For ChannelAdapter void handleMethodInContext( diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 43a22ab6b9..d232efff16 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -41,11 +41,10 @@ BasicMessage::BasicMessage( const string& _exchange, const string& _routingKey, bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo ) : - Message(_exchange, _routingKey, _mandatory, _immediate, respondTo), - publisher(_publisher), + Message(_publisher, _exchange, _routingKey, _mandatory, + _immediate, respondTo), size(0) -{ -} +{} // FIXME aconway 2007-02-01: remove. // BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : @@ -56,7 +55,7 @@ BasicMessage::BasicMessage( // } // For tests only. -BasicMessage::BasicMessage() : publisher(0), size(0) +BasicMessage::BasicMessage() : size(0) {} BasicMessage::~BasicMessage(){ @@ -126,10 +125,6 @@ const FieldTable& BasicMessage::getApplicationHeaders(){ return getHeaderProperties()->getHeaders(); } -const ConnectionToken* const BasicMessage::getPublisher(){ - return publisher; -} - bool BasicMessage::isPersistent() { if(!header) return false; @@ -230,12 +225,14 @@ void BasicMessage::releaseContent(MessageStore* store) store->stage(this); } if (!content.get() || content->size() > 0) { + // FIXME aconway 2007-02-07: handle MessageMessage. //set content to lazy loading mode (but only if there is stored content): //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is // then set as a member of that message so its lifetime is guaranteed to be no longer than // that of the message itself - content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize())); + content = std::auto_ptr<Content>( + new LazyLoadedContent(store, this, expectedContentSize())); } } diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index d56912ea60..308fcc1791 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -52,7 +52,6 @@ using framing::string; * request. */ class BasicMessage : public Message { - const ConnectionToken* const publisher; framing::AMQHeaderBody::shared_ptr header; std::auto_ptr<Content> content; sys::Mutex contentLock; @@ -72,7 +71,6 @@ class BasicMessage : public Message { void setHeader(framing::AMQHeaderBody::shared_ptr header); void addContent(framing::AMQContentBody::shared_ptr data); bool isComplete(); - const ConnectionToken* const getPublisher(); void deliver(framing::ChannelAdapter&, const string& consumerTag, diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index d5e37fbc7a..32767191ca 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -40,10 +40,10 @@ class BasicHeaderProperties; class FieldTable; } -namespace broker { -class MessageStore; +namespace broker { class ConnectionToken; +class MessageStore; /** * Base class for all types of internal broker messages @@ -51,6 +51,7 @@ class ConnectionToken; * TODO; AMS: for the moment this is mostly a placeholder */ class Message{ + const ConnectionToken* publisher; std::string exchange; std::string routingKey; const bool mandatory; @@ -62,9 +63,12 @@ class Message{ public: typedef boost::shared_ptr<Message> shared_ptr; - Message(const std::string& _exchange, const std::string& _routingKey, + Message(const ConnectionToken* publisher_, + const std::string& _exchange, + const std::string& _routingKey, bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo_) : + publisher(publisher_), exchange(_exchange), routingKey(_routingKey), mandatory(_mandatory), @@ -122,7 +126,9 @@ class Message{ virtual framing::BasicHeaderProperties* getHeaderProperties() = 0; virtual const framing::FieldTable& getApplicationHeaders() = 0; virtual bool isPersistent() = 0; - virtual const ConnectionToken* const getPublisher() = 0; + virtual const ConnectionToken* getPublisher() const { + return publisher; + } virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index d7020b8923..29186cc18e 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -25,26 +25,24 @@ #include "MessageAppendBody.h" #include "Reference.h" #include "framing/FieldTable.h" +#include "framing/BasicHeaderProperties.h" #include <iostream> using namespace std; -using namespace qpid::broker; using namespace qpid::framing; - -MessageMessage::MessageMessage(TransferPtr transfer_) - : Message(transfer_->getDestination(), transfer_->getRoutingKey(), - transfer_->getMandatory(), transfer_->getImmediate(), - transfer_), - transfer(transfer_) -{} -MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref) - : Message(transfer_->getDestination(), transfer_->getRoutingKey(), - transfer_->getMandatory(), transfer_->getImmediate(), - transfer_), - transfer(transfer_), - appends(ref.getAppends()) +namespace qpid { +namespace broker { + +MessageMessage::MessageMessage( + ConnectionToken* publisher, TransferPtr transfer_ +) : Message(publisher, transfer_->getDestination(), + transfer_->getRoutingKey(), + transfer_->getMandatory(), + transfer_->getImmediate(), + transfer_), + transfer(transfer_) {} void MessageMessage::deliver( @@ -55,29 +53,29 @@ void MessageMessage::deliver( { channel.send( new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - transfer->getBody(), - transfer->getMandatory())); + transfer->getTicket(), + consumerTag, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + transfer->getBody(), + transfer->getMandatory())); } void MessageMessage::sendGetOk( @@ -107,19 +105,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() const FieldTable& MessageMessage::getApplicationHeaders() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); return transfer->getApplicationHeaders(); } bool MessageMessage::isPersistent() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return false; // FIXME aconway 2007-02-05: -} - -const ConnectionToken* const MessageMessage::getPublisher() -{ - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return transfer->getDeliveryMode() == PERSISTENT; } u_int32_t MessageMessage::encodedSize() @@ -146,3 +136,5 @@ u_int64_t MessageMessage::expectedContentSize() return 0; // FIXME aconway 2007-02-05: } + +}} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index 5310ef65b3..fb0a4749d4 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -35,19 +35,25 @@ class MessageApppendBody; } namespace broker { +class ConnectionToken; class Reference; class MessageMessage: public Message{ public: - typedef Reference::TransferPtr TransferPtr; + typedef boost::shared_ptr<MessageMessage> shared_ptr; + typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr; typedef Reference::AppendPtr AppendPtr; typedef Reference::Appends Appends; - MessageMessage(TransferPtr transfer); - MessageMessage(TransferPtr transfer, const Reference&); + MessageMessage(ConnectionToken* publisher, TransferPtr transfer); // Default destructor okay - + + TransferPtr getTransfer() { return transfer; } + + const Appends& getAppends() { return appends; } + void setAppends(const Appends& appends_) { appends = appends_; } + void deliver(framing::ChannelAdapter& channel, const std::string& consumerTag, u_int64_t deliveryTag, @@ -64,19 +70,16 @@ class MessageMessage: public Message{ framing::BasicHeaderProperties* getHeaderProperties(); const framing::FieldTable& getApplicationHeaders(); bool isPersistent(); - const ConnectionToken* const getPublisher(); u_int32_t encodedSize(); u_int32_t encodedHeaderSize(); u_int32_t encodedContentSize(); u_int64_t expectedContentSize(); - TransferPtr getTransfer() { return transfer; } - const Appends& getAppends() { return appends; } private: const TransferPtr transfer; - const Appends appends; + Appends appends; }; }} diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 5f5e9b84e7..49c4153185 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -217,14 +217,13 @@ MessageHandlerImpl::transfer(const MethodContext& context, MessageTransferBody::shared_ptr transfer( boost::shared_polymorphic_downcast<MessageTransferBody>( context.methodBody)); - if (body.isInline()) { - Message::shared_ptr msg(new MessageMessage(transfer)); - channel.handleInlineTransfer(msg, exchange); - } - else { - // Add to reference. - references.get(body.getValue()).transfer(transfer); - } + MessageMessage::shared_ptr message( + new MessageMessage(&connection, transfer)); + + if (body.isInline()) + channel.handleInlineTransfer(message, exchange); + else + references.get(body.getValue()).addMessage(message); client.ok(context); } diff --git a/cpp/lib/broker/Reference.cpp b/cpp/lib/broker/Reference.cpp index a5e734d77a..dc9cb94f84 100644 --- a/cpp/lib/broker/Reference.cpp +++ b/cpp/lib/broker/Reference.cpp @@ -43,14 +43,14 @@ Reference& ReferenceRegistry::get(const Reference::Id& id) { } void Reference::close() { - for_each(transfers.begin(), transfers.end(), + for_each(messages.begin(), messages.end(), boost::bind(&Reference::complete, this, _1)); registry->references.erase(getId()); } -void Reference::complete(TransferPtr transfer) { - MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this)); - registry->handler.complete(msg); +void Reference::complete(MessagePtr message) { + message->setAppends(appends); + registry->handler.complete(message); } }} // namespace qpid::broker diff --git a/cpp/lib/broker/Reference.h b/cpp/lib/broker/Reference.h index ecaca3de41..77c315bbc5 100644 --- a/cpp/lib/broker/Reference.h +++ b/cpp/lib/broker/Reference.h @@ -28,20 +28,21 @@ namespace qpid { namespace framing { -class MessageTransferBody; class MessageAppendBody; } namespace broker { +class MessageMessage; class CompletionHandler; class ReferenceRegistry; /** * A reference is an accumulation point for data in a multi-frame - * message. A reference can be used by multiple transfer commands, so - * the reference tracks which commands are using it. When the reference - * is closed, all the associated transfers are completed. + * message. A reference can be used by multiple transfer commands to + * create multiple messages, so the reference tracks which commands + * are using it. When the reference is closed, all the associated + * transfers are completed. * * THREAD UNSAFE: per-channel resource, access to channels is * serialized. @@ -50,8 +51,8 @@ class Reference { public: typedef std::string Id; - typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr; - typedef std::vector<TransferPtr> Transfers; + typedef boost::shared_ptr<MessageMessage> MessagePtr; + typedef std::vector<MessagePtr> Messages; typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr; typedef std::vector<AppendPtr> Appends; @@ -60,24 +61,24 @@ class Reference const std::string& getId() const { return id; } - /** Add a transfer to be completed with this reference */ - void transfer(TransferPtr transfer) { transfers.push_back(transfer); } + /** Add a message to be completed with this reference */ + void addMessage(MessagePtr message) { messages.push_back(message); } /** Append more data to the reference */ void append(AppendPtr ptr) { appends.push_back(ptr); } - /** Close the reference, complete each associated transfer */ + /** Close the reference, complete each associated message */ void close(); const Appends& getAppends() const { return appends; } - const Transfers& getTransfers() const { return transfers; } + const Messages& getMessages() const { return messages; } private: - void complete(TransferPtr transfer); + void complete(MessagePtr message); Id id; ReferenceRegistry* registry; - Transfers transfers; + Messages messages; Appends appends; }; |