diff options
author | Alan Conway <aconway@apache.org> | 2010-08-03 15:33:12 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-08-03 15:33:12 +0000 |
commit | 9e7d3b38762eeb806941ae3fe0e8bdc70561fbe3 (patch) | |
tree | fd55d91c8e2fd3281707a54f56dcb0de2813bf52 | |
parent | 3af9c51bef0768a75ecd417076154fe6a0ae11ce (diff) | |
download | qpid-python-9e7d3b38762eeb806941ae3fe0e8bdc70561fbe3.tar.gz |
Disable non-0 session timeouts.
Since session resume is not fully implemented, non-0 session timeouts
are of no use. Moreover the partial implementation causes problems in
a cluster as stale sessions kept alive by a timeout can interfere with
failover and updates.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@981933 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/SessionState.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 8 | ||||
-rw-r--r-- | cpp/src/tests/ClusterFailover.cpp | 53 | ||||
-rw-r--r-- | cpp/src/tests/ClusterFixture.h | 2 |
8 files changed, 66 insertions, 15 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 4f370c6765..e5019604d2 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -95,6 +95,9 @@ SessionState::SendState::SendState() : unflushedSize(), replaySize(), bytesSince SessionState::ReceiveState::ReceiveState() : bytesSinceKnownCompleted() {} +uint32_t SessionState::getTimeout() const { return timeout; } +void SessionState::setTimeout(uint32_t seconds) { timeout = seconds; } + SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; } SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; } SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; } @@ -240,7 +243,7 @@ SessionState::Configuration::Configuration(size_t flush, size_t hard) : replayFlushLimit(flush), replayHardLimit(hard) {} SessionState::SessionState(const SessionId& i, const Configuration& c) - : id(i), timeout(), config(c), stateful(), receiverTrackingDisabled(false) + : id(i), timeout(0), config(c), stateful(false), receiverTrackingDisabled(false) { QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); } diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index da28738546..02853b1143 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -92,8 +92,8 @@ class SessionState { const SessionId& getId() const { return id; } - uint32_t getTimeout() const { return timeout; } - void setTimeout(uint32_t seconds) { timeout = seconds; } + QPID_COMMON_EXTERN virtual uint32_t getTimeout() const; + QPID_COMMON_EXTERN virtual void setTimeout(uint32_t seconds); bool operator==(const SessionId& other) const { return id == other; } bool operator==(const SessionState& other) const { return id == other.id; } diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 2448e9ef26..b113d49a73 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -205,7 +205,7 @@ void SessionHandler::handleDetach() { void SessionHandler::requestTimeout(uint32_t t) { checkAttached(); getState()->setTimeout(t); - peer.timeout(t); + peer.timeout(getState()->getTimeout()); } void SessionHandler::timeout(uint32_t t) { diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index be4f8c7b40..426ef190dd 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -380,6 +380,11 @@ void SessionState::readyToSend() { Broker& SessionState::getBroker() { return broker; } +// Session resume is not fully implemented so it is useless to set a +// non-0 timeout. Moreover it creates problems in a cluster because +// dead sessions are kept and interfere with failover. +void SessionState::setTimeout(uint32_t) { } + framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() { return handler->getClusterOrderProxy(); } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index eade93ddaa..3dcb0a62d4 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -92,6 +92,8 @@ class SessionState : public qpid::SessionState, Broker& getBroker(); + void setTimeout(uint32_t seconds); + /** OutputControl **/ void abort(); void activateOutput(); diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index 566fbda406..29920bb71a 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -122,10 +122,10 @@ struct ClientT { qpid::client::LocalQueue lq; std::string name; - ClientT(uint16_t port, const std::string& name_=std::string()) - : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {} - ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string()) - : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {} + ClientT(uint16_t port, const std::string& name_=std::string(), int timeout=0) + : connection(port), session(connection.newSession(name_,timeout)), subs(session), name(name_) {} + ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string(), int timeout=0) + : connection(settings), session(connection.newSession(name_, timeout)), subs(session), name(name_) {} ~ClientT() { close(); } void close() { session.close(); connection.close(); } diff --git a/cpp/src/tests/ClusterFailover.cpp b/cpp/src/tests/ClusterFailover.cpp index 6b1ef99807..06fc1c06be 100644 --- a/cpp/src/tests/ClusterFailover.cpp +++ b/cpp/src/tests/ClusterFailover.cpp @@ -50,19 +50,60 @@ using boost::shared_ptr; // Timeout for tests that wait for messages const sys::Duration TIMEOUT=sys::TIME_SEC/4; +ClusterFixture::Args getArgs() { + ClusterFixture::Args args; + args += "--auth", "no", "--no-module-dir", "--load-module", getLibPath("CLUSTER_LIB"); + return args; +} // Test re-connecting with same session name after a failure. QPID_AUTO_TEST_CASE(testReconnectSameSessionName) { - ostringstream clusterLib; - clusterLib << getLibPath("CLUSTER_LIB"); - ClusterFixture::Args args = list_of<string>("--auth")("no")("--no-module-dir")("--no-data-dir")("--load-module")(clusterLib.str()); - ClusterFixture cluster(2, args, -1); - Client c0(cluster[0], "foo"); + ClusterFixture cluster(2, getArgs(), -1); + // Specify a timeout to make sure it is ignored, session resume is + // not implemented so sessions belonging to dead brokers should + // not be kept. + Client c0(cluster[0], "foo", 5); BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); // wait for both. + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("sendme", "q")); + BOOST_CHECK_EQUAL(c0.subs.get("q").getData(), "sendme"); cluster.killWithSilencer(0, c0.connection, 9); - Client c1(cluster[1], "foo"); // Using same name, should be cleaned up. + Client c1(cluster[1], "foo", 5); + c1.session.queueQuery(); // Try to use the session. } +QPID_AUTO_TEST_CASE(testReconnectExclusiveQueue) { + // Regresion test. Session timeouts should be ignored + // by the broker as session resume is not implemented. + ClusterFixture cluster(2, getArgs(), -1); + Client c0(cluster[0], "foo", 5); + c0.session.queueDeclare("exq", arg::exclusive=true); + SubscriptionSettings settings; + settings.exclusive = true; + settings.autoAck = 0; + Subscription s0 = c0.subs.subscribe(c0.lq, "exq", settings, "exsub"); + c0.session.messageTransfer(arg::content=Message("sendme", "exq")); + BOOST_CHECK_EQUAL(c0.lq.get().getData(), "sendme"); + + // Regression: core dump on exit if unacked messages were left in + // a session with a timeout. + cluster.kill(0); + + // Regression: session timeouts prevented re-connecting to + // exclusive queue. + Client c1(cluster[1]); + c1.session.queueDeclare("exq", arg::exclusive=true); + Subscription s1 = c1.subs.subscribe(c1.lq, "exq", settings, "exsub"); + s1.cancel(); + + // Regression: session timeouts prevented new member joining + // cluster with exclusive queues. + cluster.add(); + Client c2(cluster[2]); + c2.session.queueQuery(); +} + + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/cpp/src/tests/ClusterFixture.h b/cpp/src/tests/ClusterFixture.h index 1eee32b9a4..f548ff9376 100644 --- a/cpp/src/tests/ClusterFixture.h +++ b/cpp/src/tests/ClusterFixture.h @@ -89,7 +89,7 @@ class ClusterFixture : public vector<uint16_t> { /** Kill a forked broker with sig, or shutdown localBroker. */ void kill(size_t n, int sig=SIGINT); - /** Kill a broker and suppressing errors from closing connection c. */ + /** Kill a broker and suppress errors from closing connection c. */ void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT); private: |