summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Message.cpp15
-rw-r--r--cpp/src/qpid/broker/Message.h4
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/broker/Queue.h10
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp2
-rw-r--r--cpp/src/tests/cluster_test.cpp65
6 files changed, 93 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 2c3c444baa..b912cd3a1c 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -401,4 +401,19 @@ framing::FieldTable& Message::getOrInsertHeaders()
return getProperties<MessageProperties>()->getApplicationHeaders();
}
+
+void Message::setUpdateDestination(const std::string& d)
+{
+ updateDestination = d;
+}
+
+
+bool Message::isUpdateMessage()
+{
+ return updateDestination.size() && isA<MessageTransferBody>()
+ && getMethod<MessageTransferBody>()->getDestination() == updateDestination;
+}
+
+std::string Message::updateDestination;
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 94ce7561a7..2c75d945fa 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -160,6 +160,9 @@ public:
void setDequeueCompleteCallback(MessageCallback& cb);
void resetDequeueCompleteCallback();
+ bool isUpdateMessage();
+ static void setUpdateDestination(const std::string&);
+
private:
typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
@@ -186,6 +189,7 @@ public:
mutable boost::intrusive_ptr<Message> empty;
MessageCallback* enqueueCallback;
MessageCallback* dequeueCallback;
+ static std::string updateDestination;
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 759a38d919..30be733f89 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -570,7 +570,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
string key = ft->getAsString(qpidVQMatchProperty);
i = lvq.find(key);
- if (i == lvq.end()){
+ if (i == lvq.end() || msg->isUpdateMessage()){
messages.push_back(qm);
listeners.populate(copy);
lvq[key] = msg;
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 56e349b06b..dbad5e12ed 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -300,9 +300,15 @@ namespace qpid {
ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
/** Apply f to each Message on the queue. */
- template <class F> void eachMessage(F f) const {
+ template <class F> void eachMessage(F f) {
sys::Mutex::ScopedLock l(messageLock);
- std::for_each(messages.begin(), messages.end(), f);
+ if (lastValueQueue) {
+ for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
+ f(checkLvqReplace(*i));
+ }
+ } else {
+ std::for_each(messages.begin(), messages.end(), f);
+ }
}
/** Apply f to each QueueBinding on the queue */
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index c2c07c052a..695bd8bfeb 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -34,6 +34,7 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/management/IdAllocator.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionState.h"
#include "qpid/client/ConnectionSettings.h"
@@ -136,6 +137,7 @@ struct ClusterPlugin : public Plugin {
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
+ broker::Message::setUpdateDestination(UpdateClient::UPDATE);
ManagementAgent* mgmt = broker->getManagementAgent();
if (mgmt) {
std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index b87bbf4aa5..50ca241b5d 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -936,10 +936,13 @@ void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::ve
}
}
-void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m")
+void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m",
+ const std::string& lvqKey="")
{
for (int i = 0; i < count; i++) {
- client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag));
+ Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag);
+ if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey);
+ client.session.messageTransfer(arg::content=message);
}
}
@@ -998,6 +1001,64 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
}
}
+QPID_AUTO_TEST_CASE(testLvqUpdate) {
+ //tests that lvqs are accurately replicated on newly joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setOrdering(LVQ);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+
+ send(c1, "q", 5, 1, "a", "a");
+ send(c1, "q", 2, 1, "b", "b");
+ send(c1, "q", 1, 1, "c", "c");
+ send(c1, "q", 1, 3, "b", "b");
+
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("a_5")("b_3")("c_1"));
+ }
+}
+
+
+QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) {
+ //tests that lvqs are accurately replicated on newly joined nodes
+ //if the lvq state has been affected by browsers
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ {
+ ScopedSuppressLogging allQuiet;
+ QueueOptions options;
+ options.setOrdering(LVQ);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+
+ send(c1, "q", 1, 1, "a", "a");
+ send(c1, "q", 2, 1, "b", "b");
+ send(c1, "q", 1, 1, "c", "c");
+ checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1"));
+ send(c1, "q", 4, 2, "a", "a");
+ send(c1, "q", 1, 3, "b", "b");
+
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")("a_5")("b_3"));
+ }
+}
+
QPID_AUTO_TEST_CASE(testRelease) {
//tests that releasing a messages that was unacked when one node
//joined works correctly