diff options
author | Robert Gemmell <robbie@apache.org> | 2011-10-21 10:14:04 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-10-21 10:14:04 +0000 |
commit | f505cdd862dbd80c2f44331c30f60fbb8432a226 (patch) | |
tree | 0ccca692f9d94867e162839168f1ab7af0e30167 /java/common/src | |
parent | 322aa81b9fa8a358529421fc30b5a7846c11eceb (diff) | |
download | qpid-python-f505cdd862dbd80c2f44331c30f60fbb8432a226.tar.gz |
QPID-3532: make the 0-10 client hold the failover mutex during the failover. Alter the Address resolution code to allow resolving addresses after failover. Add some more failover tests (inc ADDR based ones). Make the failover process notify any waiters in the session to abort and let failover proceed.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1187279 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/Connection.java | 13 | ||||
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/Session.java | 58 |
2 files changed, 57 insertions, 14 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 1c521244d0..06c5c83031 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -526,10 +526,6 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - for (Session ssn : channels.values()) - { - ssn.closeCode(close); - } ConnectionCloseCode code = close.getReplyCode(); if (code != ConnectionCloseCode.NORMAL) { @@ -705,4 +701,13 @@ public class Connection extends ConnectionInvoker { return sessions.containsKey(new Binary(name.getBytes())); } + + public void notifyFailoverRequired() + { + List<Session> values = new ArrayList<Session>(channels.values()); + for (Session ssn : values) + { + ssn.notifyFailoverRequired(); + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index b732191707..321e5256b2 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Session @@ -125,6 +126,8 @@ public class Session extends SessionInvoker private SessionDetachCode detachCode; private final Object stateLock = new Object(); + private final AtomicBoolean _failoverRequired = new AtomicBoolean(false); + protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); @@ -257,6 +260,7 @@ public class Session extends SessionInvoker void resume() { + _failoverRequired.set(false); synchronized (commands) { attach(); @@ -459,7 +463,7 @@ public class Session extends SessionInvoker synchronized (commands) { - if (state == DETACHED || state == CLOSING) + if (state == DETACHED || state == CLOSING || state == CLOSED) { return; } @@ -595,11 +599,12 @@ public class Session extends SessionInvoker if (state != OPEN && state != CLOSED && state != CLOSING) { Thread current = Thread.currentThread(); - if (!current.equals(resumer)) + if (!current.equals(resumer) ) { Waiter w = new Waiter(commands, timeout); while (w.hasTime() && (state != OPEN && state != CLOSED)) { + checkFailoverRequired("Command was interrupted because of failover, before being sent"); w.await(); } } @@ -668,6 +673,7 @@ public class Session extends SessionInvoker } } } + checkFailoverRequired("Command was interrupted because of failover, before being sent"); w.await(); } } @@ -762,6 +768,14 @@ public class Session extends SessionInvoker } } + private void checkFailoverRequired(String message) + { + if (_failoverRequired.get()) + { + throw new SessionException(message); + } + } + protected boolean shouldIssueFlush(int next) { return (next % 65536) == 0; @@ -787,6 +801,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { + checkFailoverRequired("Session sync was interrupted by failover."); log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); w.await(); } @@ -847,13 +862,6 @@ public class Session extends SessionInvoker } } - private ConnectionClose close = null; - - void closeCode(ConnectionClose close) - { - this.close = close; - } - ExecutionException getException() { synchronized (results) @@ -904,6 +912,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(this, timeout); while (w.hasTime() && state != CLOSED && !isDone()) { + checkFailoverRequired("Operation was interrupted by failover."); log.debug("%s waiting for result: %s", Session.this, this); w.await(); } @@ -915,7 +924,12 @@ public class Session extends SessionInvoker } else if (state == CLOSED) { - throw new SessionException(getException()); + ExecutionException ex = getException(); + if(ex == null) + { + throw new SessionClosedException(); + } + throw new SessionException(ex); } else { @@ -995,6 +1009,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED) { + checkFailoverRequired("close() was interrupted by failover."); w.await(); } @@ -1089,6 +1104,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(stateLock, timeout); while (w.hasTime() && state == NEW) { + checkFailoverRequired("Session opening was interrupted by failover."); w.await(); } } @@ -1111,4 +1127,26 @@ public class Session extends SessionInvoker { return stateLock; } + + protected void notifyFailoverRequired() + { + //ensure any operations waiting are aborted to + //prevent them waiting for timeout for 60 seconds + //and possibly preventing failover proceeding + _failoverRequired.set(true); + synchronized (commands) + { + commands.notifyAll(); + } + synchronized (results) + { + for (ResultFuture<?> result : results.values()) + { + synchronized(result) + { + result.notifyAll(); + } + } + } + } } |