summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-08-03 15:33:12 +0000
committerAlan Conway <aconway@apache.org>2010-08-03 15:33:12 +0000
commit9e7d3b38762eeb806941ae3fe0e8bdc70561fbe3 (patch)
treefd55d91c8e2fd3281707a54f56dcb0de2813bf52
parent3af9c51bef0768a75ecd417076154fe6a0ae11ce (diff)
downloadqpid-python-9e7d3b38762eeb806941ae3fe0e8bdc70561fbe3.tar.gz
Disable non-0 session timeouts.
Since session resume is not fully implemented, non-0 session timeouts are of no use. Moreover the partial implementation causes problems in a cluster as stale sessions kept alive by a timeout can interfere with failover and updates. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@981933 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/SessionState.cpp5
-rw-r--r--cpp/src/qpid/SessionState.h4
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp5
-rw-r--r--cpp/src/qpid/broker/SessionState.h2
-rw-r--r--cpp/src/tests/BrokerFixture.h8
-rw-r--r--cpp/src/tests/ClusterFailover.cpp53
-rw-r--r--cpp/src/tests/ClusterFixture.h2
8 files changed, 66 insertions, 15 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 4f370c6765..e5019604d2 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -95,6 +95,9 @@ SessionState::SendState::SendState() : unflushedSize(), replaySize(), bytesSince
SessionState::ReceiveState::ReceiveState() : bytesSinceKnownCompleted() {}
+uint32_t SessionState::getTimeout() const { return timeout; }
+void SessionState::setTimeout(uint32_t seconds) { timeout = seconds; }
+
SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; }
SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; }
@@ -240,7 +243,7 @@ SessionState::Configuration::Configuration(size_t flush, size_t hard) :
replayFlushLimit(flush), replayHardLimit(hard) {}
SessionState::SessionState(const SessionId& i, const Configuration& c)
- : id(i), timeout(), config(c), stateful(), receiverTrackingDisabled(false)
+ : id(i), timeout(0), config(c), stateful(false), receiverTrackingDisabled(false)
{
QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
}
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index da28738546..02853b1143 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -92,8 +92,8 @@ class SessionState {
const SessionId& getId() const { return id; }
- uint32_t getTimeout() const { return timeout; }
- void setTimeout(uint32_t seconds) { timeout = seconds; }
+ QPID_COMMON_EXTERN virtual uint32_t getTimeout() const;
+ QPID_COMMON_EXTERN virtual void setTimeout(uint32_t seconds);
bool operator==(const SessionId& other) const { return id == other; }
bool operator==(const SessionState& other) const { return id == other.id; }
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 2448e9ef26..b113d49a73 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -205,7 +205,7 @@ void SessionHandler::handleDetach() {
void SessionHandler::requestTimeout(uint32_t t) {
checkAttached();
getState()->setTimeout(t);
- peer.timeout(t);
+ peer.timeout(getState()->getTimeout());
}
void SessionHandler::timeout(uint32_t t) {
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index be4f8c7b40..426ef190dd 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -380,6 +380,11 @@ void SessionState::readyToSend() {
Broker& SessionState::getBroker() { return broker; }
+// Session resume is not fully implemented so it is useless to set a
+// non-0 timeout. Moreover it creates problems in a cluster because
+// dead sessions are kept and interfere with failover.
+void SessionState::setTimeout(uint32_t) { }
+
framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
return handler->getClusterOrderProxy();
}
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index eade93ddaa..3dcb0a62d4 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -92,6 +92,8 @@ class SessionState : public qpid::SessionState,
Broker& getBroker();
+ void setTimeout(uint32_t seconds);
+
/** OutputControl **/
void abort();
void activateOutput();
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index 566fbda406..29920bb71a 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -122,10 +122,10 @@ struct ClientT {
qpid::client::LocalQueue lq;
std::string name;
- ClientT(uint16_t port, const std::string& name_=std::string())
- : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {}
- ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string())
- : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
+ ClientT(uint16_t port, const std::string& name_=std::string(), int timeout=0)
+ : connection(port), session(connection.newSession(name_,timeout)), subs(session), name(name_) {}
+ ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string(), int timeout=0)
+ : connection(settings), session(connection.newSession(name_, timeout)), subs(session), name(name_) {}
~ClientT() { close(); }
void close() { session.close(); connection.close(); }
diff --git a/cpp/src/tests/ClusterFailover.cpp b/cpp/src/tests/ClusterFailover.cpp
index 6b1ef99807..06fc1c06be 100644
--- a/cpp/src/tests/ClusterFailover.cpp
+++ b/cpp/src/tests/ClusterFailover.cpp
@@ -50,19 +50,60 @@ using boost::shared_ptr;
// Timeout for tests that wait for messages
const sys::Duration TIMEOUT=sys::TIME_SEC/4;
+ClusterFixture::Args getArgs() {
+ ClusterFixture::Args args;
+ args += "--auth", "no", "--no-module-dir", "--load-module", getLibPath("CLUSTER_LIB");
+ return args;
+}
// Test re-connecting with same session name after a failure.
QPID_AUTO_TEST_CASE(testReconnectSameSessionName) {
- ostringstream clusterLib;
- clusterLib << getLibPath("CLUSTER_LIB");
- ClusterFixture::Args args = list_of<string>("--auth")("no")("--no-module-dir")("--no-data-dir")("--load-module")(clusterLib.str());
- ClusterFixture cluster(2, args, -1);
- Client c0(cluster[0], "foo");
+ ClusterFixture cluster(2, getArgs(), -1);
+ // Specify a timeout to make sure it is ignored, session resume is
+ // not implemented so sessions belonging to dead brokers should
+ // not be kept.
+ Client c0(cluster[0], "foo", 5);
BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); // wait for both.
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("sendme", "q"));
+ BOOST_CHECK_EQUAL(c0.subs.get("q").getData(), "sendme");
cluster.killWithSilencer(0, c0.connection, 9);
- Client c1(cluster[1], "foo"); // Using same name, should be cleaned up.
+ Client c1(cluster[1], "foo", 5);
+ c1.session.queueQuery(); // Try to use the session.
}
+QPID_AUTO_TEST_CASE(testReconnectExclusiveQueue) {
+ // Regresion test. Session timeouts should be ignored
+ // by the broker as session resume is not implemented.
+ ClusterFixture cluster(2, getArgs(), -1);
+ Client c0(cluster[0], "foo", 5);
+ c0.session.queueDeclare("exq", arg::exclusive=true);
+ SubscriptionSettings settings;
+ settings.exclusive = true;
+ settings.autoAck = 0;
+ Subscription s0 = c0.subs.subscribe(c0.lq, "exq", settings, "exsub");
+ c0.session.messageTransfer(arg::content=Message("sendme", "exq"));
+ BOOST_CHECK_EQUAL(c0.lq.get().getData(), "sendme");
+
+ // Regression: core dump on exit if unacked messages were left in
+ // a session with a timeout.
+ cluster.kill(0);
+
+ // Regression: session timeouts prevented re-connecting to
+ // exclusive queue.
+ Client c1(cluster[1]);
+ c1.session.queueDeclare("exq", arg::exclusive=true);
+ Subscription s1 = c1.subs.subscribe(c1.lq, "exq", settings, "exsub");
+ s1.cancel();
+
+ // Regression: session timeouts prevented new member joining
+ // cluster with exclusive queues.
+ cluster.add();
+ Client c2(cluster[2]);
+ c2.session.queueQuery();
+}
+
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/cpp/src/tests/ClusterFixture.h b/cpp/src/tests/ClusterFixture.h
index 1eee32b9a4..f548ff9376 100644
--- a/cpp/src/tests/ClusterFixture.h
+++ b/cpp/src/tests/ClusterFixture.h
@@ -89,7 +89,7 @@ class ClusterFixture : public vector<uint16_t> {
/** Kill a forked broker with sig, or shutdown localBroker. */
void kill(size_t n, int sig=SIGINT);
- /** Kill a broker and suppressing errors from closing connection c. */
+ /** Kill a broker and suppress errors from closing connection c. */
void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT);
private: