diff options
author | Kim van der Riet <kpvdr@apache.org> | 2010-04-15 15:57:46 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2010-04-15 15:57:46 +0000 |
commit | 0d368a574d1898a1dc7ec04847728d63ce74ee5e (patch) | |
tree | 8d4b3b05faa0c1d4456031423911c3cb974beff1 /cpp/src | |
parent | 60da39342e42ebd137853034d786220edcd88b0b (diff) | |
download | qpid-python-0d368a574d1898a1dc7ec04847728d63ce74ee5e.tar.gz |
Implementation of QPID-2509 (Remove message staging from C++ broker)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@934463 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/MessageBuilderTest.cpp | 67 |
9 files changed, 17 insertions, 110 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 24c5a0c049..82d38a2d0b 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -81,7 +81,6 @@ Broker::Options::Options(const std::string& name) : workerThreads(5), maxConnections(500), connectionBacklog(10), - stagingThreshold(5000000), enableMgmt(1), mgmtPubInterval(10), queueCleanInterval(60*10),//10 minutes @@ -113,7 +112,6 @@ Broker::Options::Options(const std::string& name) : ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") - ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") @@ -178,7 +176,6 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_workerThreads(conf.workerThreads); mgmtObject->set_maxConns(conf.maxConnections); mgmtObject->set_connBacklog(conf.connectionBacklog); - mgmtObject->set_stagingThreshold(conf.stagingThreshold); mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); mgmtObject->set_version(qpid::version); if (dataDir.isEnabled()) @@ -223,8 +220,7 @@ Broker::Broker(const Broker::Options& conf) : // The cluster plug-in will setRecovery(false) on all but the first // broker to join a cluster. if (getRecovery()) { - RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, - conf.stagingThreshold); + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager); store->recover(recoverer); } else { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index f9be992f0c..f55f94bc8e 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -99,7 +99,6 @@ public: int workerThreads; int maxConnections; int connectionBacklog; - uint64_t stagingThreshold; bool enableMgmt; uint16_t mgmtPubInterval; uint16_t queueCleanInterval; @@ -205,7 +204,6 @@ public: QueueRegistry& getQueues() { return queues; } ExchangeRegistry& getExchanges() { return exchanges; } LinkRegistry& getLinks() { return links; } - uint64_t getStagingThreshold() { return config.stagingThreshold; } DtxManager& getDtxManager() { return dtxManager; } DataDir& getDataDir() { return dataDir; } Options& getOptions() { return config; } diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 77ac5a59b0..19caacb595 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -46,7 +46,6 @@ class ConnectionState : public ConnectionToken, public management::Manageable framemax(65535), heartbeat(0), heartbeatmax(120), - stagingThreshold(broker.getStagingThreshold()), federationLink(true), clientSupportsThrottling(false), clusterOrderOut(0) @@ -57,12 +56,10 @@ class ConnectionState : public ConnectionToken, public management::Manageable uint32_t getFrameMax() const { return framemax; } uint16_t getHeartbeat() const { return heartbeat; } uint16_t getHeartbeatMax() const { return heartbeatmax; } - uint64_t getStagingThreshold() const { return stagingThreshold; } void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); } void setHeartbeat(uint16_t hb) { heartbeat = hb; } void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; } - void setStagingThreshold(uint64_t st) { stagingThreshold = st; } virtual void setUserId(const string& uid) { userId = uid; } const string& getUserId() const { return userId; } @@ -107,7 +104,6 @@ class ConnectionState : public ConnectionToken, public management::Manageable uint32_t framemax; uint16_t heartbeat; uint16_t heartbeatmax; - uint64_t stagingThreshold; string userId; string url; bool federationLink; diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index b1a2b77b05..a6d605c296 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -36,8 +36,8 @@ namespace const std::string QPID_MANAGEMENT("qpid.management"); } -MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) : - state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {} +MessageBuilder::MessageBuilder(MessageStore* const _store) : + state(DORMANT), store(_store) {} void MessageBuilder::handle(AMQFrame& frame) { @@ -68,29 +68,13 @@ void MessageBuilder::handle(AMQFrame& frame) default: throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")")); } - if (staging) { - intrusive_ptr<const PersistableMessage> cpmsg = boost::static_pointer_cast<const PersistableMessage>(message); - store->appendContent(cpmsg, frame.castBody<AMQContentBody>()->getData()); - } else { - message->getFrames().append(frame); - //have we reached the staging limit? if so stage message and release content - if (state == CONTENT - && stagingThreshold - && message->getFrames().getContentSize() >= stagingThreshold - && !NullMessageStore::isNullStore(store) - && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */) - { - message->releaseContent(); - staging = true; - } - } + message->getFrames().append(frame); } void MessageBuilder::end() { message = 0; state = DORMANT; - staging = false; } void MessageBuilder::start(const SequenceNumber& id) @@ -98,7 +82,6 @@ void MessageBuilder::start(const SequenceNumber& id) message = intrusive_ptr<Message>(new Message(id)); message->setStore(store); state = METHOD; - staging = false; } namespace { diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h index e63c108097..75dfd6781d 100644 --- a/cpp/src/qpid/broker/MessageBuilder.h +++ b/cpp/src/qpid/broker/MessageBuilder.h @@ -35,8 +35,7 @@ namespace qpid { class MessageBuilder : public framing::FrameHandler{ public: - QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store, - uint64_t stagingThreshold); + QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store); QPID_BROKER_EXTERN void handle(framing::AMQFrame& frame); boost::intrusive_ptr<Message> getMessage() { return message; } QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id); @@ -46,8 +45,6 @@ namespace qpid { State state; boost::intrusive_ptr<Message> message; MessageStore* const store; - const uint64_t stagingThreshold; - bool staging; void checkType(uint8_t expected, uint8_t actual); }; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 12ac2d2bfd..dd4b7543af 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -35,17 +35,16 @@ namespace qpid { namespace broker { RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, - DtxManager& _dtxMgr, uint64_t _stagingThreshold) - : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} + DtxManager& _dtxMgr) + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} class RecoverableMessageImpl : public RecoverableMessage { intrusive_ptr<Message> msg; - const uint64_t stagingThreshold; public: - RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold); + RecoverableMessageImpl(const intrusive_ptr<Message>& _msg); ~RecoverableMessageImpl() {}; void setPersistenceId(uint64_t id); void setRedelivered(); @@ -130,7 +129,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff { boost::intrusive_ptr<Message> message(new Message()); message->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message)); } RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, @@ -161,16 +160,16 @@ void RecoveryManagerImpl::recoveryComplete() exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges))); } -RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold) +RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg) { if (!msg->isPersistent()) { msg->forcePersistent(); // set so that message will get dequeued from store. } } -bool RecoverableMessageImpl::loadContent(uint64_t available) +bool RecoverableMessageImpl::loadContent(uint64_t /*available*/) { - return !stagingThreshold || available < stagingThreshold; + return true; } void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer) diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h index 6fbbfc4a6c..1ad7892b13 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -36,10 +36,9 @@ namespace broker { ExchangeRegistry& exchanges; LinkRegistry& links; DtxManager& dtxMgr; - const uint64_t stagingThreshold; public: RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links, - DtxManager& dtxMgr, uint64_t stagingThreshold); + DtxManager& dtxMgr); ~RecoveryManagerImpl(); RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 4c5aaf7fc4..4d5fe14690 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -58,7 +58,7 @@ SessionState::SessionState( broker(b), handler(&h), semanticState(*this, *this), adapter(semanticState), - msgBuilder(&broker.getStore(), broker.getStagingThreshold()), + msgBuilder(&broker.getStore()), enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), mgmtObject(0), rateFlowcontrol(0) diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index c2fb8ad32e..c3d40ed88a 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -97,7 +97,7 @@ QPID_AUTO_TEST_SUITE(MessageBuilderTestSuite) QPID_AUTO_TEST_CASE(testHeaderOnly) { - MessageBuilder builder(0, 0); + MessageBuilder builder(0); builder.start(SequenceNumber()); std::string exchange("builder-exchange"); @@ -120,7 +120,7 @@ QPID_AUTO_TEST_CASE(testHeaderOnly) QPID_AUTO_TEST_CASE(test1ContentFrame) { - MessageBuilder builder(0, 0); + MessageBuilder builder(0); builder.start(SequenceNumber()); std::string data("abcdefg"); @@ -153,7 +153,7 @@ QPID_AUTO_TEST_CASE(test1ContentFrame) QPID_AUTO_TEST_CASE(test2ContentFrames) { - MessageBuilder builder(0, 0); + MessageBuilder builder(0); builder.start(SequenceNumber()); std::string data1("abcdefg"); @@ -185,67 +185,6 @@ QPID_AUTO_TEST_CASE(test2ContentFrames) BOOST_CHECK(builder.getMessage()); BOOST_CHECK(builder.getMessage()->getFrames().isComplete()); } - -QPID_AUTO_TEST_CASE(testStaging) -{ - MockMessageStore store; - MessageBuilder builder(&store, 5); - builder.start(SequenceNumber()); - - std::string data1("abcdefg"); - std::string data2("hijklmn"); - std::string exchange("builder-exchange"); - std::string key("builder-exchange"); - - AMQFrame method(MessageTransferBody(ProtocolVersion(), exchange, 0, 0)); - AMQFrame header((AMQHeaderBody())); - AMQFrame content1((AMQContentBody(data1))); - AMQFrame content2((AMQContentBody(data2))); - - header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size()); - header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); - - builder.handle(method); - builder.handle(header); - - store.expectStage(*builder.getMessage()); - builder.handle(content1); - BOOST_CHECK(store.expectationsMet()); - BOOST_CHECK_EQUAL((uint64_t) 1, builder.getMessage()->getPersistenceId()); - - store.expectAppendContent(*builder.getMessage(), data2); - builder.handle(content2); - BOOST_CHECK(store.expectationsMet()); - //were the content frames dropped? - BOOST_CHECK(!builder.getMessage()->isContentLoaded()); -} - -QPID_AUTO_TEST_CASE(testNoManagementStaging) -{ - // Make sure management messages don't stage - MockMessageStore store; - MessageBuilder builder(&store, 5); - builder.start(SequenceNumber()); - - std::string data1("abcdefg"); - std::string exchange("qpid.management"); - std::string key("builder-exchange"); - - AMQFrame method(MessageTransferBody(ProtocolVersion(), exchange, 0, 0)); - AMQFrame header((AMQHeaderBody())); - AMQFrame content1((AMQContentBody(data1))); - - header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size()); - header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); - - builder.handle(method); - builder.handle(header); - - builder.handle(content1); - BOOST_CHECK(store.expectationsMet()); - BOOST_CHECK_EQUAL((uint64_t) 0, builder.getMessage()->getPersistenceId()); -} - QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |