diff options
author | Alan Conway <aconway@apache.org> | 2009-12-11 20:55:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-12-11 20:55:45 +0000 |
commit | d490fba74749bcde972e5a0d95f84b165f8ea05e (patch) | |
tree | ffc58006adb15ec8fa29955911f5f3a0f02dfa69 | |
parent | e4aee82085958588458ba34d2bf7dd0db90a257d (diff) | |
download | qpid-python-d490fba74749bcde972e5a0d95f84b165f8ea05e.tar.gz |
QPID-2266: error sending update: Enqueue capacity threshold exceeded
Fix for the problem with a test to verify that messages going to the store
have the same headers and content-size for an updatee or a broker that
receives the publish directly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@889813 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/cluster.cmake | 1 | ||||
-rw-r--r-- | cpp/src/cluster.mk | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameSet.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameSet.h | 6 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 28 | ||||
-rw-r--r-- | cpp/src/tests/test_store.cpp | 37 | ||||
-rw-r--r-- | python/qpid/brokertest.py | 9 |
17 files changed, 146 insertions, 37 deletions
diff --git a/cpp/src/cluster.cmake b/cpp/src/cluster.cmake index 2c7b108e71..6552c39f12 100644 --- a/cpp/src/cluster.cmake +++ b/cpp/src/cluster.cmake @@ -109,6 +109,7 @@ if (BUILD_CLUSTER) qpid/cluster/ExpiryPolicy.cpp qpid/cluster/FailoverExchange.cpp qpid/cluster/FailoverExchange.h + qpid/cluster/UpdateExchange.cpp qpid/cluster/UpdateExchange.h qpid/cluster/UpdateReceiver.h qpid/cluster/LockedConnectionMap.h diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 9da6578df0..081889130e 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -69,6 +69,7 @@ cluster_la_SOURCES = \ qpid/cluster/FailoverExchange.cpp \ qpid/cluster/FailoverExchange.h \ qpid/cluster/UpdateExchange.h \ + qpid/cluster/UpdateExchange.cpp \ qpid/cluster/UpdateReceiver.h \ qpid/cluster/LockedConnectionMap.h \ qpid/cluster/Multicaster.cpp \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 3c67c429a0..849bf6d1f5 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -154,6 +154,7 @@ Broker::Broker(const Broker::Options& conf) : queueCleaner(queues, timer), queueEvents(poller,!conf.asyncQueueEvents), recovery(true), + clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), connectionCounter(conf.maxConnections), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 73d5860cb3..b85aa7d96c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -168,8 +168,10 @@ public: std::vector<Url> getKnownBrokersImpl(); std::string federationTag; bool recovery; + bool clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConnectionCounter connectionCounter; + public: virtual ~Broker(); @@ -259,6 +261,9 @@ public: void setRecovery(bool set) { recovery = set; } bool getRecovery() const { return recovery; } + void setClusterUpdatee(bool set) { clusterUpdatee = set; } + bool isClusterUpdatee() const { return clusterUpdatee; } + management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } ConnectionCounter& getConnectionCounter() {return connectionCounter;} diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e2799b0bff..47ca7a7ae8 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -425,19 +425,4 @@ framing::FieldTable& Message::getOrInsertHeaders() return getProperties<MessageProperties>()->getApplicationHeaders(); } - -void Message::setUpdateDestination(const std::string& d) -{ - updateDestination = d; -} - - -bool Message::isUpdateMessage() -{ - return updateDestination.size() && isA<MessageTransferBody>() - && getMethod<MessageTransferBody>()->getDestination() == updateDestination; -} - -std::string Message::updateDestination; - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 3894960c95..375fa9ce26 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -104,6 +104,10 @@ public: return frames.as<T>(); } + template <class T> T* getMethod() { + return frames.as<T>(); + } + template <class T> bool isA() const { return frames.isA<T>(); } @@ -157,9 +161,6 @@ public: void setDequeueCompleteCallback(MessageCallback& cb); void resetDequeueCompleteCallback(); - bool isUpdateMessage(); - static void setUpdateDestination(const std::string&); - private: typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; @@ -190,7 +191,6 @@ public: MessageCallback* dequeueCallback; uint32_t requiredCredit; - static std::string updateDestination; }; }} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 780c254a56..ef1adaf7ec 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -598,7 +598,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ string key = ft->getAsString(qpidVQMatchProperty); i = lvq.find(key); - if (i == lvq.end() || msg->isUpdateMessage()){ + if (i == lvq.end() || (broker && broker->isClusterUpdatee())) { messages.push_back(qm); listeners.populate(copy); lvq[key] = msg; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f877720350..d049001eb0 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -619,6 +619,7 @@ void Cluster::initMapCompleted(Lock& l) { if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. + broker.setClusterUpdatee(true); state = JOINER; } else { // I can go ready. @@ -813,6 +814,7 @@ void Cluster::checkUpdateIn(Lock& l) { memberUpdate(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; + broker.setClusterUpdatee(false); discarding = false; // ok to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); deliverEventQueue.start(); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 7f0a2752b0..e4aee6730b 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -139,7 +139,6 @@ struct ClusterPlugin : public Plugin { broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); - broker::Message::setUpdateDestination(UpdateClient::UPDATE); ManagementAgent* mgmt = broker->getManagementAgent(); if (mgmt) { std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator()); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index f263577fd3..279284da2c 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -217,10 +217,11 @@ class MessageUpdater { // Disable client code that clears the delivery-properties.exchange sb.get()->setDoClearDeliveryPropertiesExchange(false); framing::MessageTransferBody transfer( - framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, - message::ACQUIRE_MODE_PRE_ACQUIRED); + *message.payload->getFrames().as<framing::MessageTransferBody>()); + transfer.setDestination(UpdateClient::UPDATE); - sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); + sb.get()->send(transfer, message.payload->getFrames(), + !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp new file mode 100644 index 0000000000..11937f296f --- /dev/null +++ b/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/broker/Message.h" +#include "UpdateExchange.h" + +namespace qpid { +namespace cluster { + +using framing::MessageTransferBody; +using framing::DeliveryProperties; + +UpdateExchange::UpdateExchange(management::Manageable* parent) + : broker::Exchange(UpdateClient::UPDATE, parent), + broker::FanOutExchange(UpdateClient::UPDATE, parent) {} + + +void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) { + MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>(); + assert(transfer); + const DeliveryProperties* props = msg->getProperties<DeliveryProperties>(); + assert(props); + if (props->hasExchange()) + transfer->setDestination(props->getExchange()); + else + transfer->clearDestinationFlag(); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h index 00a92c7f1e..9d7d9ee5fc 100644 --- a/cpp/src/qpid/cluster/UpdateExchange.h +++ b/cpp/src/qpid/cluster/UpdateExchange.h @@ -30,14 +30,14 @@ namespace qpid { namespace cluster { /** - * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange - * on messages. + * A keyless exchange (like fanout exchange) that does not modify + * delivery-properties.exchange but copies it to the MessageTransfer. */ class UpdateExchange : public broker::FanOutExchange { public: - UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {} - void setProperties(const boost::intrusive_ptr<broker::Message>&) {} + UpdateExchange(management::Manageable* parent); + void setProperties(const boost::intrusive_ptr<broker::Message>&); }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp index f50035e49a..c03dd39458 100644 --- a/cpp/src/qpid/framing/FrameSet.cpp +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -52,6 +52,11 @@ const AMQMethodBody* FrameSet::getMethod() const return parts.empty() ? 0 : parts[0].getMethod(); } +AMQMethodBody* FrameSet::getMethod() +{ + return parts.empty() ? 0 : parts[0].getMethod(); +} + const AMQHeaderBody* FrameSet::getHeaders() const { return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>(); diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h index e3e8727600..398a709353 100644 --- a/cpp/src/qpid/framing/FrameSet.h +++ b/cpp/src/qpid/framing/FrameSet.h @@ -57,6 +57,7 @@ public: bool isContentBearing() const; QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const; + QPID_COMMON_EXTERN AMQMethodBody* getMethod(); QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const; QPID_COMMON_EXTERN AMQHeaderBody* getHeaders(); @@ -70,6 +71,11 @@ public: return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0; } + template <class T> T* as() { + AMQMethodBody* method = getMethod(); + return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0; + } + template <class T> const T* getHeaderProperties() const { const AMQHeaderBody* header = getHeaders(); return header ? header->get<T>() : 0; diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index e57a826000..178271e977 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -57,6 +57,34 @@ class ShortTests(BrokerTest): self.assertEqual("y", m.content) s2.connection.close() + def test_store_direct_update_match(self): + """Verify that brokers stores an identical message whether they receive it + direct from clients or during an update, no header or other differences""" + cluster = self.cluster(0, args=["--load-module", self.test_store_lib]) + cluster.start(args=["--test-store-dump", "direct.dump"]) + # Try messages with various headers + cluster[0].send_message("q", Message(durable=True, content="foobar", + subject="subject", + reply_to="reply_to", + properties={"n":10})) + # Try messages of different sizes + for size in range(0,10000,100): + cluster[0].send_message("q", Message(content="x"*size, durable=True)) + # Try sending via named exchange + c = cluster[0].connect_old() + s = c.session(str(qpid.datatypes.uuid4())) + s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q") + props = s.delivery_properties(routing_key="foo", delivery_mode=2) + s.message_transfer( + destination="amq.direct", + message=qpid.datatypes.Message(props, "content")) + + # Now update a new member and compare their dumps. + cluster.start(args=["--test-store-dump", "updatee.dump"]) + assert file("direct.dump").read() == file("updatee.dump").read() + os.remove("direct.dump") + os.remove("updatee.dump") + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp index c675c6daa3..257e77b6b4 100644 --- a/cpp/src/tests/test_store.cpp +++ b/cpp/src/tests/test_store.cpp @@ -34,11 +34,14 @@ #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/Broker.h" +#include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" #include "qpid/Plugin.h" #include "qpid/Options.h" #include <boost/cast.hpp> #include <boost/lexical_cast.hpp> +#include <memory> +#include <fstream> using namespace qpid; using namespace broker; @@ -51,10 +54,13 @@ namespace tests { struct TestStoreOptions : public Options { string name; + string dump; TestStoreOptions() : Options("Test Store Options") { addOptions() - ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance."); + ("test-store-name", optValue(name, "NAME"), "Name of test store instance.") + ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.") + ; } }; @@ -71,19 +77,38 @@ struct Completer : public Runnable { class TestStore : public NullMessageStore { public: - TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {} + TestStore(const TestStoreOptions& opts, Broker& broker_) + : options(opts), name(opts.name), broker(broker_) + { + QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump); + if (!options.dump.empty()) + dump.reset(new ofstream(options.dump.c_str())); + } ~TestStore() { for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); } + virtual bool isNull() const { return false; } + void enqueue(TransactionContext* , - const boost::intrusive_ptr<PersistableMessage>& msg, + const boost::intrusive_ptr<PersistableMessage>& pmsg, const PersistableQueue& ) { - string data = boost::polymorphic_downcast<Message*>(msg.get())->getFrames().getContent(); + Message* msg = dynamic_cast<Message*>(pmsg.get()); + assert(msg); + + // Dump the message if there is a dump file. + if (dump.get()) { + msg->getFrames().getMethod()->print(*dump); + *dump << endl << " "; + msg->getFrames().getHeaders()->print(*dump); + *dump << endl << " "; + *dump << msg->getFrames().getContentSize() << endl; + } // Check the message for special instructions. + string data = msg->getFrames().getContent(); size_t i = string::npos; size_t j = string::npos; if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 @@ -119,9 +144,11 @@ class TestStore : public NullMessageStore { private: static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; + TestStoreOptions options; string name; Broker& broker; vector<Thread> threads; + std::auto_ptr<ofstream> dump; }; const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; @@ -139,7 +166,7 @@ struct TestStorePlugin : public Plugin { { Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; - boost::shared_ptr<MessageStore> p(new TestStore(options.name, *broker)); + boost::shared_ptr<MessageStore> p(new TestStore(options, *broker)); broker->setStore (p); } diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index 9fa79a220b..83d6c44d84 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -259,15 +259,15 @@ class Cluster: self.args += [ "--load-module", BrokerTest.cluster_lib ] self.start_n(count, expect=expect, wait=wait) - def start(self, name=None, expect=EXPECT_RUNNING, wait=True): + def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[]): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) log.debug("Cluster %s starting member %s" % (self.name, name)) - self._brokers.append(self.test.broker(self.args, name, expect, wait)) + self._brokers.append(self.test.broker(self.args+args, name, expect, wait)) return self._brokers[-1] - def start_n(self, count, expect=EXPECT_RUNNING, wait=True): - for i in range(count): self.start(expect=expect, wait=wait) + def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]): + for i in range(count): self.start(expect=expect, wait=wait, args=args) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -289,6 +289,7 @@ class BrokerTest(TestCase): receiver_exec = os.getenv("RECEIVER_EXEC") sender_exec = os.getenv("SENDER_EXEC") store_lib = os.getenv("STORE_LIB") + test_store_lib = os.getenv("TEST_STORE_LIB") rootdir = os.getcwd() def configure(self, config): self.config=config |