summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-12-11 20:55:45 +0000
committerAlan Conway <aconway@apache.org>2009-12-11 20:55:45 +0000
commitd490fba74749bcde972e5a0d95f84b165f8ea05e (patch)
treeffc58006adb15ec8fa29955911f5f3a0f02dfa69
parente4aee82085958588458ba34d2bf7dd0db90a257d (diff)
downloadqpid-python-d490fba74749bcde972e5a0d95f84b165f8ea05e.tar.gz
QPID-2266: error sending update: Enqueue capacity threshold exceeded
Fix for the problem with a test to verify that messages going to the store have the same headers and content-size for an updatee or a broker that receives the publish directly. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@889813 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/cluster.cmake1
-rw-r--r--cpp/src/cluster.mk1
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/Broker.h5
-rw-r--r--cpp/src/qpid/broker/Message.cpp15
-rw-r--r--cpp/src/qpid/broker/Message.h8
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp1
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp7
-rw-r--r--cpp/src/qpid/cluster/UpdateExchange.cpp47
-rw-r--r--cpp/src/qpid/cluster/UpdateExchange.h8
-rw-r--r--cpp/src/qpid/framing/FrameSet.cpp5
-rw-r--r--cpp/src/qpid/framing/FrameSet.h6
-rwxr-xr-xcpp/src/tests/cluster_tests.py28
-rw-r--r--cpp/src/tests/test_store.cpp37
-rw-r--r--python/qpid/brokertest.py9
17 files changed, 146 insertions, 37 deletions
diff --git a/cpp/src/cluster.cmake b/cpp/src/cluster.cmake
index 2c7b108e71..6552c39f12 100644
--- a/cpp/src/cluster.cmake
+++ b/cpp/src/cluster.cmake
@@ -109,6 +109,7 @@ if (BUILD_CLUSTER)
qpid/cluster/ExpiryPolicy.cpp
qpid/cluster/FailoverExchange.cpp
qpid/cluster/FailoverExchange.h
+ qpid/cluster/UpdateExchange.cpp
qpid/cluster/UpdateExchange.h
qpid/cluster/UpdateReceiver.h
qpid/cluster/LockedConnectionMap.h
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 9da6578df0..081889130e 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -69,6 +69,7 @@ cluster_la_SOURCES = \
qpid/cluster/FailoverExchange.cpp \
qpid/cluster/FailoverExchange.h \
qpid/cluster/UpdateExchange.h \
+ qpid/cluster/UpdateExchange.cpp \
qpid/cluster/UpdateReceiver.h \
qpid/cluster/LockedConnectionMap.h \
qpid/cluster/Multicaster.cpp \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 3c67c429a0..849bf6d1f5 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -154,6 +154,7 @@ Broker::Broker(const Broker::Options& conf) :
queueCleaner(queues, timer),
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
+ clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
connectionCounter(conf.maxConnections),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 73d5860cb3..b85aa7d96c 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -168,8 +168,10 @@ public:
std::vector<Url> getKnownBrokersImpl();
std::string federationTag;
bool recovery;
+ bool clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
+
public:
virtual ~Broker();
@@ -259,6 +261,9 @@ public:
void setRecovery(bool set) { recovery = set; }
bool getRecovery() const { return recovery; }
+ void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+ bool isClusterUpdatee() const { return clusterUpdatee; }
+
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e2799b0bff..47ca7a7ae8 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -425,19 +425,4 @@ framing::FieldTable& Message::getOrInsertHeaders()
return getProperties<MessageProperties>()->getApplicationHeaders();
}
-
-void Message::setUpdateDestination(const std::string& d)
-{
- updateDestination = d;
-}
-
-
-bool Message::isUpdateMessage()
-{
- return updateDestination.size() && isA<MessageTransferBody>()
- && getMethod<MessageTransferBody>()->getDestination() == updateDestination;
-}
-
-std::string Message::updateDestination;
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 3894960c95..375fa9ce26 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -104,6 +104,10 @@ public:
return frames.as<T>();
}
+ template <class T> T* getMethod() {
+ return frames.as<T>();
+ }
+
template <class T> bool isA() const {
return frames.isA<T>();
}
@@ -157,9 +161,6 @@ public:
void setDequeueCompleteCallback(MessageCallback& cb);
void resetDequeueCompleteCallback();
- bool isUpdateMessage();
- static void setUpdateDestination(const std::string&);
-
private:
typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
@@ -190,7 +191,6 @@ public:
MessageCallback* dequeueCallback;
uint32_t requiredCredit;
- static std::string updateDestination;
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 780c254a56..ef1adaf7ec 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -598,7 +598,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
string key = ft->getAsString(qpidVQMatchProperty);
i = lvq.find(key);
- if (i == lvq.end() || msg->isUpdateMessage()){
+ if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
messages.push_back(qm);
listeners.populate(copy);
lvq[key] = msg;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f877720350..d049001eb0 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -619,6 +619,7 @@ void Cluster::initMapCompleted(Lock& l) {
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
+ broker.setClusterUpdatee(true);
state = JOINER;
}
else { // I can go ready.
@@ -813,6 +814,7 @@ void Cluster::checkUpdateIn(Lock& l) {
memberUpdate(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
+ broker.setClusterUpdatee(false);
discarding = false; // ok to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
deliverEventQueue.start();
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 7f0a2752b0..e4aee6730b 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -139,7 +139,6 @@ struct ClusterPlugin : public Plugin {
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
- broker::Message::setUpdateDestination(UpdateClient::UPDATE);
ManagementAgent* mgmt = broker->getManagementAgent();
if (mgmt) {
std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index f263577fd3..279284da2c 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -217,10 +217,11 @@ class MessageUpdater {
// Disable client code that clears the delivery-properties.exchange
sb.get()->setDoClearDeliveryPropertiesExchange(false);
framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE,
- message::ACQUIRE_MODE_PRE_ACQUIRED);
+ *message.payload->getFrames().as<framing::MessageTransferBody>());
+ transfer.setDestination(UpdateClient::UPDATE);
- sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased());
+ sb.get()->send(transfer, message.payload->getFrames(),
+ !message.payload->isContentReleased());
if (message.payload->isContentReleased()){
uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp
new file mode 100644
index 0000000000..11937f296f
--- /dev/null
+++ b/cpp/src/qpid/cluster/UpdateExchange.cpp
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/broker/Message.h"
+#include "UpdateExchange.h"
+
+namespace qpid {
+namespace cluster {
+
+using framing::MessageTransferBody;
+using framing::DeliveryProperties;
+
+UpdateExchange::UpdateExchange(management::Manageable* parent)
+ : broker::Exchange(UpdateClient::UPDATE, parent),
+ broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
+
+
+void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) {
+ MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
+ assert(transfer);
+ const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
+ assert(props);
+ if (props->hasExchange())
+ transfer->setDestination(props->getExchange());
+ else
+ transfer->clearDestinationFlag();
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h
index 00a92c7f1e..9d7d9ee5fc 100644
--- a/cpp/src/qpid/cluster/UpdateExchange.h
+++ b/cpp/src/qpid/cluster/UpdateExchange.h
@@ -30,14 +30,14 @@ namespace qpid {
namespace cluster {
/**
- * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange
- * on messages.
+ * A keyless exchange (like fanout exchange) that does not modify
+ * delivery-properties.exchange but copies it to the MessageTransfer.
*/
class UpdateExchange : public broker::FanOutExchange
{
public:
- UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
- void setProperties(const boost::intrusive_ptr<broker::Message>&) {}
+ UpdateExchange(management::Manageable* parent);
+ void setProperties(const boost::intrusive_ptr<broker::Message>&);
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp
index f50035e49a..c03dd39458 100644
--- a/cpp/src/qpid/framing/FrameSet.cpp
+++ b/cpp/src/qpid/framing/FrameSet.cpp
@@ -52,6 +52,11 @@ const AMQMethodBody* FrameSet::getMethod() const
return parts.empty() ? 0 : parts[0].getMethod();
}
+AMQMethodBody* FrameSet::getMethod()
+{
+ return parts.empty() ? 0 : parts[0].getMethod();
+}
+
const AMQHeaderBody* FrameSet::getHeaders() const
{
return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>();
diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h
index e3e8727600..398a709353 100644
--- a/cpp/src/qpid/framing/FrameSet.h
+++ b/cpp/src/qpid/framing/FrameSet.h
@@ -57,6 +57,7 @@ public:
bool isContentBearing() const;
QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const;
+ QPID_COMMON_EXTERN AMQMethodBody* getMethod();
QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const;
QPID_COMMON_EXTERN AMQHeaderBody* getHeaders();
@@ -70,6 +71,11 @@ public:
return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
}
+ template <class T> T* as() {
+ AMQMethodBody* method = getMethod();
+ return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0;
+ }
+
template <class T> const T* getHeaderProperties() const {
const AMQHeaderBody* header = getHeaders();
return header ? header->get<T>() : 0;
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index e57a826000..178271e977 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -57,6 +57,34 @@ class ShortTests(BrokerTest):
self.assertEqual("y", m.content)
s2.connection.close()
+ def test_store_direct_update_match(self):
+ """Verify that brokers stores an identical message whether they receive it
+ direct from clients or during an update, no header or other differences"""
+ cluster = self.cluster(0, args=["--load-module", self.test_store_lib])
+ cluster.start(args=["--test-store-dump", "direct.dump"])
+ # Try messages with various headers
+ cluster[0].send_message("q", Message(durable=True, content="foobar",
+ subject="subject",
+ reply_to="reply_to",
+ properties={"n":10}))
+ # Try messages of different sizes
+ for size in range(0,10000,100):
+ cluster[0].send_message("q", Message(content="x"*size, durable=True))
+ # Try sending via named exchange
+ c = cluster[0].connect_old()
+ s = c.session(str(qpid.datatypes.uuid4()))
+ s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q")
+ props = s.delivery_properties(routing_key="foo", delivery_mode=2)
+ s.message_transfer(
+ destination="amq.direct",
+ message=qpid.datatypes.Message(props, "content"))
+
+ # Now update a new member and compare their dumps.
+ cluster.start(args=["--test-store-dump", "updatee.dump"])
+ assert file("direct.dump").read() == file("updatee.dump").read()
+ os.remove("direct.dump")
+ os.remove("updatee.dump")
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp
index c675c6daa3..257e77b6b4 100644
--- a/cpp/src/tests/test_store.cpp
+++ b/cpp/src/tests/test_store.cpp
@@ -34,11 +34,14 @@
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/Broker.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include <boost/cast.hpp>
#include <boost/lexical_cast.hpp>
+#include <memory>
+#include <fstream>
using namespace qpid;
using namespace broker;
@@ -51,10 +54,13 @@ namespace tests {
struct TestStoreOptions : public Options {
string name;
+ string dump;
TestStoreOptions() : Options("Test Store Options") {
addOptions()
- ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance.");
+ ("test-store-name", optValue(name, "NAME"), "Name of test store instance.")
+ ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.")
+ ;
}
};
@@ -71,19 +77,38 @@ struct Completer : public Runnable {
class TestStore : public NullMessageStore {
public:
- TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {}
+ TestStore(const TestStoreOptions& opts, Broker& broker_)
+ : options(opts), name(opts.name), broker(broker_)
+ {
+ QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump);
+ if (!options.dump.empty())
+ dump.reset(new ofstream(options.dump.c_str()));
+ }
~TestStore() {
for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
}
+ virtual bool isNull() const { return false; }
+
void enqueue(TransactionContext* ,
- const boost::intrusive_ptr<PersistableMessage>& msg,
+ const boost::intrusive_ptr<PersistableMessage>& pmsg,
const PersistableQueue& )
{
- string data = boost::polymorphic_downcast<Message*>(msg.get())->getFrames().getContent();
+ Message* msg = dynamic_cast<Message*>(pmsg.get());
+ assert(msg);
+
+ // Dump the message if there is a dump file.
+ if (dump.get()) {
+ msg->getFrames().getMethod()->print(*dump);
+ *dump << endl << " ";
+ msg->getFrames().getHeaders()->print(*dump);
+ *dump << endl << " ";
+ *dump << msg->getFrames().getContentSize() << endl;
+ }
// Check the message for special instructions.
+ string data = msg->getFrames().getContent();
size_t i = string::npos;
size_t j = string::npos;
if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
@@ -119,9 +144,11 @@ class TestStore : public NullMessageStore {
private:
static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+ TestStoreOptions options;
string name;
Broker& broker;
vector<Thread> threads;
+ std::auto_ptr<ofstream> dump;
};
const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
@@ -139,7 +166,7 @@ struct TestStorePlugin : public Plugin {
{
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- boost::shared_ptr<MessageStore> p(new TestStore(options.name, *broker));
+ boost::shared_ptr<MessageStore> p(new TestStore(options, *broker));
broker->setStore (p);
}
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py
index 9fa79a220b..83d6c44d84 100644
--- a/python/qpid/brokertest.py
+++ b/python/qpid/brokertest.py
@@ -259,15 +259,15 @@ class Cluster:
self.args += [ "--load-module", BrokerTest.cluster_lib ]
self.start_n(count, expect=expect, wait=wait)
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True):
+ def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[]):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
log.debug("Cluster %s starting member %s" % (self.name, name))
- self._brokers.append(self.test.broker(self.args, name, expect, wait))
+ self._brokers.append(self.test.broker(self.args+args, name, expect, wait))
return self._brokers[-1]
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True):
- for i in range(count): self.start(expect=expect, wait=wait)
+ def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
+ for i in range(count): self.start(expect=expect, wait=wait, args=args)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -289,6 +289,7 @@ class BrokerTest(TestCase):
receiver_exec = os.getenv("RECEIVER_EXEC")
sender_exec = os.getenv("SENDER_EXEC")
store_lib = os.getenv("STORE_LIB")
+ test_store_lib = os.getenv("TEST_STORE_LIB")
rootdir = os.getcwd()
def configure(self, config): self.config=config