diff options
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 7 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cluster-benchmark | 8 |
4 files changed, 27 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 34916c2d1e..befaaa31ff 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -59,7 +59,13 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { QPID_LOG(info, *this << "Created, settings: " << q->getSettings()); +} +// This must be separate from the constructor so we can call shared_from_this. +void QueueReplicator::activate() { + // Take a reference to myself to ensure not deleted before initializeBridge + // is called. + self = shared_from_this(); queue->getBroker()->getLinks().declare( link->getHost(), link->getPort(), false, // durable @@ -72,19 +78,19 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L "", // excludes false, // dynamic 0, // sync? + // Include shared_ptr to self to ensure we not deleted before initializeBridge is called. boost::bind(&QueueReplicator::initializeBridge, this, _1, _2) ); } QueueReplicator::~QueueReplicator() { - // FIXME aconway 2011-12-21: causes race condition? Restore. -// queue->getBroker()->getLinks().destroy( -// link->getHost(), link->getPort(), queue->getName(), getName(), string()); + queue->getBroker()->getLinks().destroy( + link->getHost(), link->getPort(), queue->getName(), getName(), string()); } // Called in a broker connection thread when the bridge is created. -void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) -{ +// shared_ptr to self is just to ensure we are still in memory. +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; @@ -107,6 +113,8 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest); + // Reset self reference so this will be deleted when all external refs are gone. + self.reset(); } namespace { diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 518e97f754..5bdafb83c8 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -23,6 +23,7 @@ */ #include "qpid/broker/Exchange.h" #include "qpid/framing/SequenceSet.h" +#include <boost/enable_shared_from_this.hpp> #include <iosfwd> namespace qpid { @@ -47,7 +48,8 @@ namespace ha { * * THREAD UNSAFE: Only called in the connection thread of the source queue. */ -class QueueReplicator : public broker::Exchange +class QueueReplicator : public broker::Exchange, + public boost::enable_shared_from_this<QueueReplicator> { public: static const std::string DEQUEUE_EVENT_KEY; @@ -57,6 +59,8 @@ class QueueReplicator : public broker::Exchange QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); ~QueueReplicator(); + void activate(); + std::string getType() const; bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); @@ -70,7 +74,7 @@ class QueueReplicator : public broker::Exchange sys::Mutex lock; boost::shared_ptr<broker::Queue> queue; boost::shared_ptr<broker::Link> link; - + boost::shared_ptr<QueueReplicator> self; friend std::ostream& operator<<(std::ostream&, const QueueReplicator&); }; diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 4a192cd91e..416f516f7b 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -295,9 +295,8 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) { name, values[USER].asString(), values[RHOST].asString()); - // FIXME aconway 2011-12-21: casuses race conditions? Restore. -// // Also delete the QueueReplicator exchange for this queue. -// broker.getExchanges().destroy(QueueReplicator::replicatorName(name)); + // Delete the QueueReplicator exchange for this queue. + broker.getExchanges().destroy(QueueReplicator::replicatorName(name)); } } @@ -449,9 +448,9 @@ void WiringReplicator::doResponseBind(Variant::Map& values) { } void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { - // FIXME aconway 2011-11-28: also need to remove these when queue is destroyed. if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + qr->activate(); broker.getExchanges().registerExchange(qr); } } diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark index a74487310d..d836ed709c 100755 --- a/qpid/cpp/src/tests/qpid-cluster-benchmark +++ b/qpid/cpp/src/tests/qpid-cluster-benchmark @@ -30,7 +30,7 @@ RECEIVERS="-r 3" BROKERS= # Local broker CLIENT_HOSTS= # No ssh, all clients are local -while getopts "m:f:n:b:q:s:r:c:txyv" opt; do +while getopts "m:f:n:b:q:s:r:c:txyv-" opt; do case $opt in m) MESSAGES="-m $OPTARG";; f) FLOW="--flow-control $OPTARG";; @@ -44,14 +44,16 @@ while getopts "m:f:n:b:q:s:r:c:txyv" opt; do x) SAVE_RECEIVED="--save-received";; y) NO_DELETE="--no-delete";; v) OPTS="--verbose";; + -) break ;; *) echo "Unknown option"; exit 1;; esac done +shift $(($OPTIND-1)) + REPLICATE="node:{x-declare:{arguments:{'qpid.replicate':all}}}" BROKER=$(echo $BROKERS | sed s/,.*//) run_test() { echo $*; shift; "$@"; echo; echo; echo; } OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE" OPTS="$OPTS --create-option $REPLICATE" -run_test "Benchmark:" qpid-cpp-benchmark $OPTS - +run_test "Benchmark:" qpid-cpp-benchmark $OPTS "$@" |