diff options
-rw-r--r-- | cpp/lib/broker/Broker.cpp | 13 | ||||
-rw-r--r-- | cpp/lib/broker/Broker.h | 4 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageBase.h | 12 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 77 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 14 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.h | 2 | ||||
-rw-r--r-- | cpp/tests/.vg-supp | 412 |
7 files changed, 510 insertions, 24 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index f650452e33..335ce2b3a0 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -47,17 +47,13 @@ const std::string amq_match("amq.match"); Broker::Broker(const Configuration& conf) : config(conf), + store(createStore(conf)), queues(store.get()), timeout(30000), stagingThreshold(0), cleaner(&queues, timeout/10), factory(*this) { - if (config.getStore().empty()) - store.reset(new NullMessageStore(config.isTrace())); - else - store.reset(new MessageStoreModule(config.getStore())); - exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); exchanges.declare(amq_topic, TopicExchange::typeName); @@ -84,6 +80,13 @@ Broker::shared_ptr Broker::create(int16_t port) Broker::shared_ptr Broker::create(const Configuration& config) { return Broker::shared_ptr(new Broker(config)); } + +MessageStore* Broker::createStore(const Configuration& config) { + if (config.getStore().empty()) + return new NullMessageStore(config.isTrace()); + else + return new MessageStoreModule(config.getStore()); +} void Broker::run() { getAcceptor().run(&factory); diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index 7c21e90b18..68c04336d8 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -90,13 +90,15 @@ class Broker : public sys::Runnable, Configuration config; sys::Acceptor::shared_ptr acceptor; - std::auto_ptr<MessageStore> store; + const std::auto_ptr<MessageStore> store; QueueRegistry queues; ExchangeRegistry exchanges; uint32_t timeout; uint64_t stagingThreshold; AutoDelete cleaner; ConnectionFactory factory; + + static MessageStore* createStore(const Configuration& config); }; }} diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index 709369ae2f..7739ab19e0 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -121,22 +121,18 @@ class Message { return publisher; } - virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? - virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? + virtual void encode(framing::Buffer& buffer) = 0; + virtual void encodeHeader(framing::Buffer& buffer) = 0; /** * @returns the size of the buffer needed to encode this * message in its entirety - * - * XXXX: Only used in tests? */ virtual uint32_t encodedSize() = 0; /** * @returns the size of the buffer needed to encode the * 'header' of this message (not just the header frame, * but other meta data e.g.routing key and exchange) - * - * XXXX: Only used in tests? */ virtual uint32_t encodedHeaderSize() = 0; /** @@ -149,6 +145,10 @@ class Message { * content size else returns 0. */ virtual uint64_t expectedContentSize() = 0; + + virtual void decodeHeader(framing::Buffer& buffer) = 0; + virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0; + // TODO: AMS 29/1/2007 Don't think these are really part of base class diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index 3449078d70..a50375cdd3 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -26,6 +26,7 @@ #include "MessageCloseBody.h" #include "MessageAppendBody.h" #include "Reference.h" +#include "framing/AMQFrame.h" #include "framing/FieldTable.h" #include "framing/BasicHeaderProperties.h" @@ -61,6 +62,11 @@ MessageMessage::MessageMessage( reference(reference_) {} +/** + * Currently used by message store impls to recover messages + */ +MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {} + // TODO: astitcher 1-Mar-2007: This code desperately needs better factoring void MessageMessage::transferMessage( framing::ChannelAdapter& channel, @@ -213,27 +219,82 @@ bool MessageMessage::isPersistent() uint32_t MessageMessage::encodedSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return encodedHeaderSize() + encodedContentSize(); } uint32_t MessageMessage::encodedHeaderSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return transfer->size() - transfer->baseSize(); } uint32_t MessageMessage::encodedContentSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return 0; } uint64_t MessageMessage::expectedContentSize() { - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: + return 0; +} + +void MessageMessage::encode(Buffer& buffer) +{ + encodeHeader(buffer); +} + +void MessageMessage::encodeHeader(Buffer& buffer) +{ + if (transfer->getBody().isInline()) { + transfer->encodeContent(buffer); + } else { + string data; + for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) { + data += (*a)->getBytes(); + } + framing::Content body(INLINE, data); + std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body)); + copy->encodeContent(buffer); + } +} + +void MessageMessage::decodeHeader(Buffer& buffer) +{ + transfer->decodeContent(buffer); +} + +void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) +{ } +MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version, + const string& destination, + const framing::Content& body) +{ + return new MessageTransferBody(version, + transfer->getTicket(), + destination, + getRedelivered(), + transfer->getImmediate(), + transfer->getTtl(), + transfer->getPriority(), + transfer->getTimestamp(), + transfer->getDeliveryMode(), + transfer->getExpiration(), + getExchange(), + getRoutingKey(), + transfer->getMessageId(), + transfer->getCorrelationId(), + transfer->getReplyTo(), + transfer->getContentType(), + transfer->getContentEncoding(), + transfer->getUserId(), + transfer->getAppId(), + transfer->getTransactionId(), + transfer->getSecurityToken(), + transfer->getApplicationHeaders(), + body, + transfer->getMandatory()); + +} }} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index 8a2ff3a063..a13a63a416 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -45,6 +45,7 @@ class MessageMessage: public Message{ MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer); MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference); + MessageMessage(); // Default destructor okay @@ -70,15 +71,22 @@ class MessageMessage: public Message{ const framing::FieldTable& getApplicationHeaders(); bool isPersistent(); + void encode(framing::Buffer& buffer); + void encodeHeader(framing::Buffer& buffer); uint32_t encodedSize(); uint32_t encodedHeaderSize(); uint32_t encodedContentSize(); uint64_t expectedContentSize(); + void decodeHeader(framing::Buffer& buffer); + void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); private: - void transferMessage(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint32_t framesize); + void transferMessage(framing::ChannelAdapter& channel, + const std::string& consumerTag, + uint32_t framesize); + framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version, + const std::string& destination, + const framing::Content& body); framing::RequestId requestId; const TransferPtr transfer; diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h index e184fff1d6..f21659a57a 100644 --- a/cpp/lib/common/framing/AMQRequestBody.h +++ b/cpp/lib/common/framing/AMQRequestBody.h @@ -63,8 +63,8 @@ class AMQRequestBody : public AMQMethodBody void setResponseMark(ResponseId mark) { data.responseMark=mark; } bool isRequest()const { return true; } - protected: static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; } + protected: void printPrefix(std::ostream& out) const; private: diff --git a/cpp/tests/.vg-supp b/cpp/tests/.vg-supp index e107377aa2..66779c30e1 100644 --- a/cpp/tests/.vg-supp +++ b/cpp/tests/.vg-supp @@ -4703,3 +4703,415 @@ obj:* obj:* } +{ + x26763_1 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit21TestCaseMethodFunctorclEv + fun:_ZN7CppUnit16DefaultProtector7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZNK7CppUnit14ProtectorChain14ProtectFunctorclEv + fun:_ZN7CppUnit14ProtectorChain7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZN7CppUnit10TestResult7protectERKNS_7FunctorEPNS_4TestERKSs + fun:_ZN7CppUnit8TestCase3runEPNS_10TestResultE +} +{ + x26763_2 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit21TestCaseMethodFunctorclEv + fun:_ZN7CppUnit16DefaultProtector7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZNK7CppUnit14ProtectorChain14ProtectFunctorclEv + fun:_ZN7CppUnit14ProtectorChain7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZN7CppUnit10TestResult7protectERKNS_7FunctorEPNS_4TestERKSs + fun:_ZN7CppUnit8TestCase3runEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite15doRunChildTestsEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite3runEPNS_10TestResultE +} +{ + x26763_3 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_4 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_5 + Memcheck:Leak + fun:malloc + fun:_dl_map_object_deps + fun:dl_open_worker + fun:_dl_catch_error + fun:_dl_open + fun:do_dlopen + fun:_dl_catch_error + fun:__libc_dlopen_mode + fun:pthread_cancel_init + fun:_Unwind_ForcedUnwind + fun:__pthread_unwind + fun:pthread_exit + fun:pthread_exit + obj:* + obj:* + obj:* + fun:start_thread + fun:clone +} +{ + x26763_6 + Memcheck:Leak + fun:_Znwj + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_7 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_8 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_9 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_10 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_11 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser +} +{ + x26763_12 + Memcheck:Leak + fun:calloc + fun:_dl_allocate_tls + fun:pthread_create@@GLIBC_2.1 + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_13 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_14 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit21TestCaseMethodFunctorclEv + fun:_ZN7CppUnit16DefaultProtector7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZNK7CppUnit14ProtectorChain14ProtectFunctorclEv + fun:_ZN7CppUnit14ProtectorChain7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZN7CppUnit10TestResult7protectERKNS_7FunctorEPNS_4TestERKSs + fun:_ZN7CppUnit8TestCase3runEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite15doRunChildTestsEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite3runEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite15doRunChildTestsEPNS_10TestResultE +} +{ + x26763_15 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit21TestCaseMethodFunctorclEv + fun:_ZN7CppUnit16DefaultProtector7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZNK7CppUnit14ProtectorChain14ProtectFunctorclEv + fun:_ZN7CppUnit14ProtectorChain7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZN7CppUnit10TestResult7protectERKNS_7FunctorEPNS_4TestERKSs + fun:_ZN7CppUnit8TestCase3runEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite15doRunChildTestsEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite3runEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite15doRunChildTestsEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite3runEPNS_10TestResultE + fun:_ZN7CppUnit10TestRunner13WrappingSuite3runEPNS_10TestResultE + fun:_ZN7CppUnit10TestResult7runTestEPNS_4TestE + fun:_ZN7CppUnit10TestRunner3runERNS_10TestResultERKSs + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_16 + Memcheck:Leak + fun:_Znwj + fun:_ZNSs4_Rep9_S_createEjjRKSaIcE + obj:/usr/lib/libstdc++.so.6.0.8 + fun:_ZNSsC1EPKcRKSaIcE + obj:* + obj:* + obj:* + fun:call_init + fun:_dl_init + fun:dl_open_worker + fun:_dl_catch_error + fun:_dl_open + fun:dlopen_doit + fun:_dl_catch_error + fun:_dlerror_run + fun:dlopen@GLIBC_2.0 + fun:_ZN7CppUnit21DynamicLibraryManager13doLoadLibraryERKSs + fun:_ZN7CppUnit21DynamicLibraryManager11loadLibraryERKSs + fun:_ZN7CppUnit21DynamicLibraryManagerC1ERKSs + fun:_ZN7CppUnit13PlugInManager4loadERKSsRKNS_16PlugInParametersE + fun:_Z8runTestsRK17CommandLineParser + fun:main +} +{ + x26763_17 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit21TestCaseMethodFunctorclEv + fun:_ZN7CppUnit16DefaultProtector7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZNK7CppUnit14ProtectorChain14ProtectFunctorclEv + fun:_ZN7CppUnit14ProtectorChain7protectERKNS_7FunctorERKNS_16ProtectorContextE + fun:_ZN7CppUnit10TestResult7protectERKNS_7FunctorEPNS_4TestERKSs + fun:_ZN7CppUnit8TestCase3runEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite15doRunChildTestsEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite3runEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite15doRunChildTestsEPNS_10TestResultE + fun:_ZN7CppUnit13TestComposite3runEPNS_10TestResultE + fun:_ZN7CppUnit10TestRunner13WrappingSuite3runEPNS_10TestResultE +} +{ + x26763_18 + Memcheck:Leak + fun:_Znwj + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + obj:* + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + obj:* + obj:* + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv +} |