summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-29 20:01:33 +0000
committerGordon Sim <gsim@apache.org>2008-04-29 20:01:33 +0000
commite2c3c63774918a303ea495b3c01a1601fde78bed (patch)
treeec2a91389821653b1270b9d8493e6e6114c961b8 /cpp/src
parent9f153bc328112ed2ee25a801eff1f6a277c7bb19 (diff)
downloadqpid-python-e2c3c63774918a303ea495b3c01a1601fde78bed.tar.gz
QPID-977: shutdown mgmt client cleanly in federation tests (patch from tross@redhat.com)
QPID-981: added custom options to queue declare to tag each message as it goes through a bridge queue and allow loop prevention through specifying exclusions git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652075 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp11
-rw-r--r--cpp/src/qpid/broker/Message.cpp46
-rw-r--r--cpp/src/qpid/broker/Message.h6
-rw-r--r--cpp/src/qpid/broker/Queue.cpp27
-rw-r--r--cpp/src/qpid/broker/Queue.h6
-rwxr-xr-xcpp/src/tests/federation.py90
6 files changed, 178 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index ea9a41ac9d..8c4b5d6faf 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -59,7 +59,16 @@ void Bridge::create()
} else {
string queue = "bridge_queue_";
queue += Uuid(true).str();
- peer.getQueue().declare(queue, "", false, false, true, true, FieldTable());
+ FieldTable queueSettings;
+ if (args.i_id.size()) {
+ queueSettings.setString("qpid.trace.id", args.i_id);
+ }
+ if (args.i_excludes.size()) {
+ queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ }
+ bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues?
+ bool autoDelete = !durable;//auto delete transient queues?
+ peer.getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
peer.getMessage().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable());
peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index dd013843f9..27076ccad8 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -28,6 +28,8 @@
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
#include "qpid/log/Statement.h"
+#include <boost/algorithm/string/classification.hpp>
+#include <boost/algorithm/string/split.hpp>
using boost::intrusive_ptr;
using namespace qpid::broker;
@@ -214,6 +216,7 @@ void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t max
void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const
{
+ sys::Mutex::ScopedLock l(lock);
Relay f(out);
frames.map_if(f, TypeFilter<HEADER_BODY>());
}
@@ -243,3 +246,46 @@ bool Message::isContentLoaded() const
{
return loaded;
}
+
+
+namespace
+{
+ const std::string X_QPID_TRACE("x-qpid.trace");
+}
+
+bool Message::isExcluded(const std::vector<std::string>& excludes) const
+{
+ const FieldTable* headers = getApplicationHeaders();
+ if (headers) {
+ std::string traceStr = headers->getString(X_QPID_TRACE);
+ if (traceStr.size()) {
+ std::vector<std::string> trace;
+ boost::split(trace, traceStr, boost::is_any_of(", ") );
+
+ for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
+ for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
+ if (*i == *j) {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+}
+
+void Message::addTraceId(const std::string& id)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (isA<MessageTransferBody>()) {
+ FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
+ std::string trace = headers.getString(X_QPID_TRACE);
+ if (trace.empty()) {
+ headers.setString(X_QPID_TRACE, id);
+ } else if (trace.find(id) == std::string::npos) {
+ trace += ",";
+ trace += id;
+ headers.setString(X_QPID_TRACE, trace);
+ }
+ }
+}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 87c7a9c43e..e3a214dce1 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -23,11 +23,13 @@
*/
#include <string>
+#include <vector>
#include <boost/shared_ptr.hpp>
#include <boost/variant.hpp>
#include "PersistableMessage.h"
#include "MessageAdapter.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Mutex.h"
namespace qpid {
@@ -125,7 +127,11 @@ public:
bool isContentLoaded() const;
+ bool isExcluded(const std::vector<std::string>& excludes) const;
+ void addTraceId(const std::string& id);
+
private:
+ mutable sys::Mutex lock;
framing::FrameSet frames;
mutable boost::shared_ptr<Exchange> exchange;
mutable uint64_t persistenceId;
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e799cde2b9..06009a208d 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -37,6 +37,8 @@
#include <boost/bind.hpp>
#include <boost/intrusive_ptr.hpp>
+#include <boost/algorithm/string/classification.hpp>
+#include <boost/algorithm/string/split.hpp>
using namespace qpid::broker;
using namespace qpid::sys;
@@ -105,6 +107,11 @@ bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
}
+bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
+{
+ return traceExclude.size() && msg->isExcluded(traceExclude);
+}
+
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
@@ -113,7 +120,10 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
}
} else if (isLocal(msg)) {
//drop message
- QPID_LOG(debug, "Dropping 'local' message from " << getName());
+ QPID_LOG(info, "Dropping 'local' message from " << getName());
+ } else if (isExcluded(msg)) {
+ //drop message
+ QPID_LOG(info, "Dropping excluded message from " << getName());
} else {
// if no store then mark as enqueued
if (!enqueue(0, msg)){
@@ -448,6 +458,10 @@ bool Queue::canAutoDelete() const{
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
+ if (traceId.size()) {
+ msg->addTraceId(traceId);
+ }
+
if (msg->isPersistent() && store) {
msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
@@ -477,6 +491,8 @@ namespace
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
const std::string qpidNoLocal("no-local");
+ const std::string qpidTraceIdentity("qpid.trace.id");
+ const std::string qpidTraceExclude("qpid.trace.exclude");
}
void Queue::create(const FieldTable& _settings)
@@ -497,6 +513,15 @@ void Queue::configure(const FieldTable& _settings)
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+
+ traceId = _settings.getString(qpidTraceIdentity);
+ std::string excludeList = _settings.getString(qpidTraceExclude);
+ if (excludeList.size()) {
+ boost::split(traceExclude, excludeList, boost::is_any_of(", ") );
+ }
+ QPID_LOG(info, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+ << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
+
if (mgmtObject.get() != 0)
mgmtObject->set_arguments (_settings);
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 18d28d32fb..724f5b049c 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -72,6 +72,8 @@ namespace qpid {
uint32_t consumerCount;
OwnershipToken* exclusive;
bool noLocal;
+ std::string traceId;
+ std::vector<std::string> traceExclude;
Listeners listeners;
Messages messages;
mutable qpid::sys::Mutex consumerLock;
@@ -98,6 +100,8 @@ namespace qpid {
void removeListener(Consumer&);
void addListener(Consumer&);
+ bool isExcluded(boost::intrusive_ptr<Message>& msg);
+
public:
virtual void notifyDurableIOComplete();
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -120,7 +124,6 @@ namespace qpid {
bool acquire(const QueuedMessage& msg);
- bool isLocal(boost::intrusive_ptr<Message>& msg);
/**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
@@ -174,6 +177,7 @@ namespace qpid {
void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
boost::shared_ptr<Exchange> getAlternateExchange();
+ bool isLocal(boost::intrusive_ptr<Message>& msg);
//PersistableQueue support:
uint64_t getPersistenceId() const;
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index 6a022c81ba..ce9c4a8757 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -24,6 +24,11 @@ from qpid.management import managementChannel, managementClient
from qpid.datatypes import Message
from qpid.queue import Empty
+def add_module(args=sys.argv[1:]):
+ for a in args:
+ if a.startswith("federation"):
+ return False
+ return True
def scan_args(name, default=None, args=sys.argv[1:]):
if (name in args):
@@ -56,6 +61,9 @@ class Helper:
self.mch = self.mc.addChannel(self.session)
self.mc.syncWaitForStable(self.mch)
+ def shutdown (self):
+ self.mc.removeChannel (self.mch)
+
def get_objects(self, type):
return self.mc.syncGetObjects(self.mch, type)
@@ -94,6 +102,8 @@ class FederationTests(TestBase010):
mgmt.call_method(link, "close")
self.assertEqual(len(mgmt.get_objects("link")), 0)
+ mgmt.shutdown ()
+
def test_pull_from_exchange(self):
session = self.session
@@ -135,6 +145,8 @@ class FederationTests(TestBase010):
mgmt.call_method(link, "close")
self.assertEqual(len(mgmt.get_objects("link")), 0)
+ mgmt.shutdown()
+
def test_pull_from_queue(self):
session = self.session
@@ -158,7 +170,7 @@ class FederationTests(TestBase010):
mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
link = mgmt.get_object("link")
- mgmt.call_method(link, "bridge", {"src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "src_is_queue":1})
+ mgmt.call_method(link, "bridge", {"src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "id":"", "excludes":"", "src_is_queue":1})
bridge = mgmt.get_object("bridge")
#add some more messages (i.e. after bridge was created)
@@ -167,8 +179,11 @@ class FederationTests(TestBase010):
r_session.message_transfer(message=Message(dp, "Message %d" % i))
for i in range(1, 11):
- msg = queue.get(timeout=5)
- self.assertEqual("Message %d" % i, msg.body)
+ try:
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ except Empty:
+ self.fail("Failed to find expected message containing 'Message %d'" % i)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected message in queue: " + extra.body)
@@ -181,12 +196,77 @@ class FederationTests(TestBase010):
mgmt.call_method(link, "close")
self.assertEqual(len(mgmt.get_objects("link")), 0)
+ mgmt.shutdown ()
+
+ def test_tracing(self):
+ session = self.session
+
+ mgmt = Helper(self)
+ broker = mgmt.get_object("broker")
+
+ mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
+ link = mgmt.get_object("link")
+
+ mgmt.call_method(link, "bridge", {"src":"amq.direct", "dest":"amq.fanout", "key":"my-key",
+ "id":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"})
+ bridge = mgmt.get_object("bridge")
+
+ #setup queue to receive messages from local broker
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="amq.fanout")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ #send messages to remote broker and confirm it is routed to local broker
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("1")
+
+ trace = [None, "exclude-me", "a,exclude-me,b", "also-exclude-me,c", "dont-exclude-me"]
+ body = ["yes", "first-bad", "second-bad", "third-bad", "yes"]
+ for b, t in zip(body, trace):
+ headers = {}
+ if (t): headers["x-qpid.trace"]=t
+ dp = r_session.delivery_properties(routing_key="my-key")
+ mp = r_session.message_properties(application_headers=headers)
+ r_session.message_transfer(destination="amq.direct", message=Message(dp, mp, b))
+
+ for e in ["my-bridge-id", "dont-exclude-me,my-bridge-id"]:
+ msg = queue.get(timeout=5)
+ self.assertEqual("yes", msg.body)
+ self.assertEqual(e, self.getAppHeader(msg, "x-qpid.trace"))
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ mgmt.call_method(bridge, "close")
+ self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+
+ mgmt.call_method(link, "close")
+ self.assertEqual(len(mgmt.get_objects("link")), 0)
+
+ mgmt.shutdown ()
+
+ def getProperty(self, msg, name):
+ for h in msg.headers:
+ if hasattr(h, name): return getattr(h, name)
+ return None
+
+ def getAppHeader(self, msg, name):
+ headers = self.getProperty(msg, "application_headers")
+ if headers:
+ return headers[name]
+ return None
+
if __name__ == '__main__':
args = sys.argv[1:]
#need to remove the extra options from args as test runner doesn't recognise them
extract_args("--remote-port", args)
extract_args("--remote-host", args)
- #add module(s) to run to testrunners args
- args.append("federation")
+
+ if add_module():
+ #add module(s) to run to testrunners args
+ args.append("federation")
if not testrunner.run(args): sys.exit(1)