diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | cpp/src/broker/BrokerMessageMessage.cpp | 221 | ||||
-rw-r--r-- | cpp/src/broker/BrokerMessageMessage.h | 27 | ||||
-rw-r--r-- | cpp/src/tests/ReferenceTest.cpp | 54 |
4 files changed, 160 insertions, 143 deletions
diff --git a/cpp/src/broker/Broker.cpp b/cpp/src/broker/Broker.cpp index 13da64915d..e0ab5e2a37 100644 --- a/cpp/src/broker/Broker.cpp +++ b/cpp/src/broker/Broker.cpp @@ -95,6 +95,7 @@ void Broker::run() { void Broker::shutdown() { if (acceptor) acceptor->shutdown(); + cleaner.stop(); } Broker::~Broker() { diff --git a/cpp/src/broker/BrokerMessageMessage.cpp b/cpp/src/broker/BrokerMessageMessage.cpp index f320a0915e..d8eb0fada0 100644 --- a/cpp/src/broker/BrokerMessageMessage.cpp +++ b/cpp/src/broker/BrokerMessageMessage.cpp @@ -48,7 +48,9 @@ MessageMessage::MessageMessage( transfer_), requestId(requestId_), transfer(transfer_) -{} +{ + assert(transfer->getBody().isInline()); +} MessageMessage::MessageMessage( ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_, @@ -61,7 +63,10 @@ MessageMessage::MessageMessage( requestId(requestId_), transfer(transfer_), reference(reference_) -{} +{ + assert(!transfer->getBody().isInline()); + assert(reference_); +} /** * Currently used by message store impls to recover messages @@ -74,101 +79,100 @@ void MessageMessage::transferMessage( const std::string& consumerTag, uint32_t framesize) { - const framing::Content& body = transfer->getBody(); + const framing::Content& body = transfer->getBody(); + // Send any reference data + ReferencePtr ref= getReference(); + if (ref){ - // Send any reference data - if (!body.isInline()){ - // Open - channel.send(new MessageOpenBody(channel.getVersion(), reference->getId())); - // Appends - for(Reference::Appends::const_iterator a = reference->getAppends().begin(); - a != reference->getAppends().end(); - ++a) { - uint32_t sizeleft = (*a)->size(); - const string& content = (*a)->getBytes(); - // Calculate overhead bytes - // Assume that the overhead is constant as the reference name doesn't change - uint32_t overhead = sizeleft - content.size(); - string::size_type contentStart = 0; - while (sizeleft) { - string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - channel.send(new MessageAppendBody(channel.getVersion(), reference->getId(), - string(content, contentStart, contentSize))); - sizeleft -= contentSize; - contentStart += contentSize; - } - } - } + // Open + channel.send(new MessageOpenBody(channel.getVersion(), ref->getId())); + // Appends + for(Reference::Appends::const_iterator a = ref->getAppends().begin(); + a != ref->getAppends().end(); + ++a) { + uint32_t sizeleft = (*a)->size(); + const string& content = (*a)->getBytes(); + // Calculate overhead bytes + // Assume that the overhead is constant as the reference name doesn't change + uint32_t overhead = sizeleft - content.size(); + string::size_type contentStart = 0; + while (sizeleft) { + string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; + channel.send(new MessageAppendBody(channel.getVersion(), ref->getId(), + string(content, contentStart, contentSize))); + sizeleft -= contentSize; + contentStart += contentSize; + } + } + } - // The transfer - if ( transfer->size()<=framesize ) { + // The transfer + if ( transfer->size()<=framesize ) { channel.send( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body, - transfer->getMandatory())); - } else { - // Thing to do here is to construct a simple reference message then deliver that instead - // fragmentation will be taken care of in the delivery if necessary; - string content = body.getValue(); - string refname = "dummy"; - TransferPtr newTransfer( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - framing::Content(REFERENCE, refname), - transfer->getMandatory())); - ReferencePtr newRef(new Reference(refname)); - Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); - newRef->append(newAppend); - MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef); - newMsg.transferMessage(channel, consumerTag, framesize); - return; - } - // Close any reference data - if (!body.isInline()){ - // Close - channel.send(new MessageCloseBody(channel.getVersion(), reference->getId())); - } + new MessageTransferBody(channel.getVersion(), + transfer->getTicket(), + consumerTag, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + body, + transfer->getMandatory())); + } else { + // Thing to do here is to construct a simple reference message then deliver that instead + // fragmentation will be taken care of in the delivery if necessary; + string content = body.getValue(); + string refname = "dummy"; + TransferPtr newTransfer( + new MessageTransferBody(channel.getVersion(), + transfer->getTicket(), + consumerTag, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + framing::Content(REFERENCE, refname), + transfer->getMandatory())); + ReferencePtr newRef(new Reference(refname)); + Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); + newRef->append(newAppend); + MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef); + newMsg.transferMessage(channel, consumerTag, framesize); + return; + } + // Close any reference data + if (ref) + channel.send(new MessageCloseBody(channel.getVersion(), ref->getId())); } void MessageMessage::deliver( @@ -177,18 +181,18 @@ void MessageMessage::deliver( uint64_t /*deliveryTag*/, uint32_t framesize) { - transferMessage(channel, consumerTag, framesize); + transferMessage(channel, consumerTag, framesize); } void MessageMessage::sendGetOk( const framing::MethodContext& context, - const std::string& destination, + const std::string& destination, uint32_t /*messageCount*/, uint64_t /*deliveryTag*/, uint32_t framesize) { - framing::ChannelAdapter* channel = context.channel; - transferMessage(*channel, destination, framesize); + framing::ChannelAdapter* channel = context.channel; + transferMessage(*channel, destination, framesize); } bool MessageMessage::isComplete() @@ -198,10 +202,12 @@ bool MessageMessage::isComplete() uint64_t MessageMessage::contentSize() const { - if (transfer->getBody().isInline()) - return transfer->getBody().getValue().size(); - else - return reference->getSize(); + if (transfer->getBody().isInline()) + return transfer->getBody().getValue().size(); + else { + assert(getReference()); + return getReference()->getSize(); + } } qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() @@ -249,8 +255,10 @@ void MessageMessage::encodeHeader(Buffer& buffer) const if (transfer->getBody().isInline()) { transfer->encodeContent(buffer); } else { + assert(getReference()); string data; - for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) { + const Reference::Appends& appends = getReference()->getAppends(); + for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) { data += (*a)->getBytes(); } framing::Content body(INLINE, data); @@ -302,4 +310,11 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version transfer->getMandatory()); } + +MessageMessage::ReferencePtr MessageMessage::getReference() const { + return reference.lock(); +} + + }} // namespace qpid::broker + diff --git a/cpp/src/broker/BrokerMessageMessage.h b/cpp/src/broker/BrokerMessageMessage.h index 88ac07ecc3..31962d5879 100644 --- a/cpp/src/broker/BrokerMessageMessage.h +++ b/cpp/src/broker/BrokerMessageMessage.h @@ -24,7 +24,7 @@ #include "BrokerMessageBase.h" #include "MessageTransferBody.h" #include "../framing/amqp_types.h" - +#include <boost/weak_ptr.hpp> #include <vector> namespace qpid { @@ -42,16 +42,16 @@ class MessageMessage: public Message{ typedef boost::shared_ptr<MessageMessage> shared_ptr; typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr; typedef boost::shared_ptr<Reference> ReferencePtr; - + MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer); MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference); MessageMessage(); // Default destructor okay - framing::RequestId getRequestId() {return requestId; } - TransferPtr getTransfer() { return transfer; } - ReferencePtr getReference() { return reference; } + framing::RequestId getRequestId() const {return requestId; } + TransferPtr getTransfer() const { return transfer; } + ReferencePtr getReference() const ; void deliver(framing::ChannelAdapter& channel, const std::string& consumerTag, @@ -81,16 +81,19 @@ class MessageMessage: public Message{ void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); private: - void transferMessage(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint32_t framesize); - framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version, - const std::string& destination, - const framing::Content& body) const; + void transferMessage( + framing::ChannelAdapter& channel, + const std::string& consumerTag, + uint32_t framesize); + + framing::MessageTransferBody* copyTransfer( + const framing::ProtocolVersion& version, + const std::string& destination, + const framing::Content& body) const; framing::RequestId requestId; const TransferPtr transfer; - const ReferencePtr reference; + const boost::weak_ptr<Reference> reference; }; }} diff --git a/cpp/src/tests/ReferenceTest.cpp b/cpp/src/tests/ReferenceTest.cpp index b179ab8fdd..223f750ef6 100644 --- a/cpp/src/tests/ReferenceTest.cpp +++ b/cpp/src/tests/ReferenceTest.cpp @@ -28,6 +28,7 @@ #include "../broker/CompletionHandler.h" using namespace boost; +using namespace qpid; using namespace qpid::broker; using namespace qpid::framing; using namespace std; @@ -41,22 +42,8 @@ class ReferenceTest : public CppUnit::TestCase ProtocolVersion v; ReferenceRegistry registry; - Reference::shared_ptr r1; - MessageTransferBody::shared_ptr t1, t2; - MessageMessage::shared_ptr m1, m2; - MessageAppendBody::shared_ptr a1, a2; - public: - - ReferenceTest() : - r1(registry.open("bar")), - t1(new MessageTransferBody(v)), - t2(new MessageTransferBody(v)), - m1(new MessageMessage(0, 1, t1, r1)), - m2(new MessageMessage(0, 2, t2, r1)), - a1(new MessageAppendBody(v)), - a2(new MessageAppendBody(v)) - {} + public: void testRegistry() { Reference::shared_ptr ref = registry.open("foo"); CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId()); @@ -69,32 +56,43 @@ class ReferenceTest : public CppUnit::TestCase registry.open("foo"); CPPUNIT_FAIL("Expected exception"); } catch(...) {} + ref->close(); + try { + registry.get("foo"); + CPPUNIT_FAIL("Expected exception"); + } catch(...) {} } void testReference() { + + Reference::shared_ptr r1(registry.open("bar")); + + MessageTransferBody::shared_ptr t1(new MessageTransferBody(v)); + // TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up. + const_cast<framing::Content&>(t1->getBody()) = framing::Content(REFERENCE,"bar"); + MessageMessage::shared_ptr m1(new MessageMessage(0, 1, t1, r1)); + + MessageTransferBody::shared_ptr t2(new MessageTransferBody(v)); + const_cast<framing::Content&>(t2->getBody()) = framing::Content(REFERENCE,"bar"); + MessageMessage::shared_ptr m2(new MessageMessage(0, 2, t2, r1)); + + MessageAppendBody::shared_ptr a1(new MessageAppendBody(v)); + MessageAppendBody::shared_ptr a2(new MessageAppendBody(v)); + r1->addMessage(m1); r1->addMessage(m2); CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size()); r1->append(a1); r1->append(a2); CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size()); - const vector<MessageMessage::shared_ptr> messages = r1->getMessages(); r1->close(); - try { - registry.open("bar"); - CPPUNIT_FAIL("Expected exception"); - } catch(...) {} - CPPUNIT_ASSERT_EQUAL(messages[0], m1); - CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[0], a1); - CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[1], a2); + CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[0], a1); + CPPUNIT_ASSERT_EQUAL(m1->getReference()->getAppends()[1], a2); - CPPUNIT_ASSERT_EQUAL(messages[1], m2); - CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[0], a1); - CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[1], a2); + CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[0], a1); + CPPUNIT_ASSERT_EQUAL(m2->getReference()->getAppends()[1], a2); } - - }; // Make this test suite a plugin. |