summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-04-18 14:59:31 +0000
committerAlan Conway <aconway@apache.org>2012-04-18 14:59:31 +0000
commit372976bc6de5a176f16069ed552b4c2fec9ee8fc (patch)
treea8422e1983b93e3d9714ec08bf55d0da70e47515
parent5cd161cf2311d743aee1d3b0c724064043080e38 (diff)
downloadqpid-python-372976bc6de5a176f16069ed552b4c2fec9ee8fc.tar.gz
QPID-3352: Fix test for failed session to avoid confusion with as yet uninitialised session
Previously, Link was using sessionHandler::isReady() to determine if a bridge had failed and needed recovery. However this incorrectcly recovers bridges that are not yet initialized as well as those that have failed. This was causing sporadic core dumps in serveral of the ha_tests.py tests, in particular test_backup_acquired. This patch adds a callback to notify the bridge when the session is detached, and use this as the criteria for recovering a bridge. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327532 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h10
-rwxr-xr-xqpid/cpp/src/tests/run_federation_tests15
6 files changed, 37 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 9a1f4be468..5b531e4636 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -62,7 +62,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
- initialize(init)
+ initialize(init), detached(false)
{
std::stringstream title;
title << id << "_" << name;
@@ -85,11 +85,14 @@ Bridge::~Bridge()
void Bridge::create(Connection& c)
{
+ detached = false; // Reset detached in case we are recovering.
connState = &c;
conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
SessionHandler& sessionHandler = c.getChannel(id);
+ sessionHandler.setDetachedCallback(
+ boost::bind(&Bridge::sessionDetached, shared_from_this()));
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
@@ -179,12 +182,6 @@ void Bridge::destroy()
listener(this);
}
-bool Bridge::isSessionReady() const
-{
- SessionHandler& sessionHandler = conn->getChannel(id);
- return sessionHandler.ready();
-}
-
void Bridge::setPersistenceId(uint64_t pId) const
{
persistenceId = pId;
@@ -336,4 +333,8 @@ const string& Bridge::getLocalTag() const
return link->getBroker()->getFederationTag();
}
+void Bridge::sessionDetached() {
+ detached = true;
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index b849b11ba8..32b9fd1781 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -33,6 +33,7 @@
#include "qmf/org/apache/qpid/broker/Bridge.h"
#include <boost/function.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <memory>
namespace qpid {
@@ -44,7 +45,10 @@ class Link;
class LinkRegistry;
class SessionHandler;
-class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
+class Bridge : public PersistableConfig,
+ public management::Manageable,
+ public Exchange::DynamicBridge,
+ public boost::enable_shared_from_this<Bridge>
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
@@ -63,7 +67,7 @@ public:
void destroy();
bool isDurable() { return args.i_durable; }
- bool isSessionReady() const;
+ bool isDetached() const { return detached; }
management::ManagementObject* GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId,
@@ -90,6 +94,9 @@ public:
const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
private:
+ // Callback when the bridge's session is detached.
+ void sessionDetached();
+
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
void handle(framing::AMQFrame& frame);
@@ -112,7 +119,7 @@ private:
ConnectionState* connState;
Connection* conn;
InitializeCallback initialize;
-
+ bool detached; // Set when session is detached.
bool resetProxy();
};
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index c90e748077..855063a6ad 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -312,7 +312,7 @@ void Link::ioThreadProcessing()
// check for bridge session errors and recover
if (!active.empty()) {
Bridges::iterator removed = std::remove_if(
- active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
+ active.begin(), active.end(), boost::bind(&Bridge::isDetached, _1));
for (Bridges::iterator i = removed; i != active.end(); ++i) {
Bridge::shared_ptr bridge = *i;
bridge->closed();
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 752fa55535..b58c7c01c5 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -64,6 +64,7 @@ void SessionHandler::handleDetach() {
if (session.get())
connection.getBroker().getSessionManager().detach(session);
assert(!session.get());
+ if (detachedCallback) detachedCallback();
connection.closeChannel(channel.get());
}
@@ -117,4 +118,8 @@ void SessionHandler::attached(const std::string& name)
}
}
+void SessionHandler::setDetachedCallback(boost::function<void()> cb) {
+ detachedCallback = cb;
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index 8cd5072574..4e2cfaa963 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,6 +25,7 @@
#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include <boost/function.hpp>
namespace qpid {
class SessionState;
@@ -61,7 +62,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
* This proxy is for sending such commands. In a clustered broker it will take steps
* to synchronize command order across the cluster. In a stand-alone broker
* it is just a synonym for getProxy()
- */
+ */
framing::AMQP_ClientProxy& getClusterOrderProxy() {
return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
}
@@ -70,6 +71,8 @@ class SessionHandler : public amqp_0_10::SessionHandler {
void attached(const std::string& name);//used by 'pushing' inter-broker bridges
void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
+ void setDetachedCallback(boost::function<void()> cb);
+
protected:
virtual void setState(const std::string& sessionName, bool force);
virtual qpid::SessionState* getState();
@@ -91,6 +94,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
std::auto_ptr<SetChannelProxy> clusterOrderProxy;
+ boost::function<void ()> detachedCallback;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests
index b71fa14c47..7735b559cf 100755
--- a/qpid/cpp/src/tests/run_federation_tests
+++ b/qpid/cpp/src/tests/run_federation_tests
@@ -33,16 +33,13 @@ else
SKIPTESTS='-i *_xml' # note: single quotes prevent expansion of *
fi
+QPIDD_CMD="../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no --log-enable=info+ --log-enable=debug+:Bridge --log-to-file"
start_brokers() {
- ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
- LOCAL_PORT=`cat qpidd.port`
- ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
- REMOTE_PORT=`cat qpidd.port`
-
- ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
- REMOTE_B1=`cat qpidd.port`
- ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
- REMOTE_B2=`cat qpidd.port`
+ rm -f fed_local.log fed_remote.log fed_b1.log fed_b2.log
+ LOCAL_PORT=$($QPIDD_CMD fed_local.log)
+ REMOTE_PORT=$($QPIDD_CMD fed_remote.log)
+ REMOTE_B1=$($QPIDD_CMD fed_b1.log)
+ REMOTE_B2=$($QPIDD_CMD fed_b2.log)
}
stop_brokers() {