diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-24 17:03:19 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-24 17:03:19 +0000 |
commit | 283aaa337b4ab30cd59c321ef6abe20cb1dd53a5 (patch) | |
tree | d9c2f329223ff6a08ae20cc878bcfbfb6a2af3a3 | |
parent | 059f79ddbc64283ac4f65dd34b9c0044dec69e02 (diff) | |
download | qpid-python-283aaa337b4ab30cd59c321ef6abe20cb1dd53a5.tar.gz |
Don't failover if the client is closing the connection, check if connection is closed on send
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829414 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 33 insertions, 27 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 97d0d0516e..e1d9ae735c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -89,11 +89,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); // TODO: use system property thingy for this - if (System.getProperty("UseTransportIo", "false").equals("false")) + if (System.getProperty("UseTransportIo", "false").equals("false")) { TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); - } - else + } + else { _conn.getProtocolHandler().createIoTransportSession(brokerDetail); } @@ -197,7 +197,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate * Low = MaxPrefetch / 2 * @return XASession * @throws JMSException thrown if there is a problem creating the session. - */ + */ public XASession createXASession() throws JMSException { return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); @@ -214,7 +214,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate // todo Be aware of possible changes to parameter order as versions change. BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false); _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); - + if (transacted) { if (_logger.isDebugEnabled()) @@ -222,7 +222,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate _logger.debug("Issuing TxSelect for " + channelId); } TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody(); - + // TODO: Be aware of possible changes to parameter order as versions change. _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); } @@ -299,7 +299,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } } - + public void setIdleTimeout(long l){} public int getMaxChannelID() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 44ce59975a..df59be25d0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -46,14 +46,14 @@ import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { - enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; - + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; + protected final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQConnection _connection; /** - * If true, messages will not get a timestamp. + * If true, messages will not get a timestamp. */ protected boolean _disableTimestamps; @@ -105,7 +105,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private long _producerId; /** - * The session used to create this producer + * The session used to create this producer */ protected AMQSession _session; @@ -118,11 +118,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private boolean _disableMessageId; private UUIDGen _messageIdGenerator = UUIDs.newGenerator(); - + protected String _userID; // ref user id used in the connection. private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; - + protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, @@ -145,14 +145,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _mandatory = mandatory; _waitUntilSent = waitUntilSent; _userID = connection.getUsername(); - setPublishMode(); + setPublishMode(); } - + void setPublishMode() { // Publish mode could be configured at destination level as well. // Will add support for this when we provide a more robust binding URL - + String syncPub = _connection.getSyncPublish(); // Support for deprecated option sync_persistence if (syncPub.equals("persistent") || _connection.getSyncPersistence()) @@ -163,7 +163,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac { publishMode = PublishMode.SYNC_PUBLISH_ALL; } - + _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode); } @@ -277,6 +277,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac checkPreConditions(); checkInitialDestination(); + synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); @@ -548,6 +549,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac { throw new javax.jms.IllegalStateException("Invalid Session"); } + if(_session.getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection closed"); + } } private void checkInitialDestination() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 6b88b6a22d..f74dbba939 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -93,7 +93,6 @@ public class FailoverHandler implements Runnable * Creates a failover handler on a protocol session, for a particular MINA session (network connection). * * @param amqProtocolHandler The protocol handler that spans the failover. - * @param session The MINA session, for the failing connection. */ public FailoverHandler(AMQProtocolHandler amqProtocolHandler) { @@ -191,7 +190,7 @@ public class FailoverHandler implements Runnable } else { - // Set the new Protocol Session in the StateManager. + // Set the new Protocol Session in the StateManager. existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession()); // Now that the ProtocolHandler has been reconnected clean up @@ -199,7 +198,7 @@ public class FailoverHandler implements Runnable // it any old exception that had occured prior to failover may // prohibit reconnection. // e.g. During testing when the broker is shutdown gracefully. - // The broker + // The broker // Clear any exceptions we gathered if (existingStateManager.getCurrentState() != AMQState.CONNECTION_OPEN) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index ac7f441a13..6500a82818 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -269,12 +269,15 @@ public class AMQProtocolHandler implements ProtocolEngine /** See {@link FailoverHandler} to see rationale for separate thread. */ private void startFailoverThread() { - Thread failoverThread = new Thread(_failoverHandler); - failoverThread.setName("Failover"); - // Do not inherit daemon-ness from current thread as this can be a daemon - // thread such as a AnonymousIoService thread. - failoverThread.setDaemon(false); - failoverThread.start(); + if(!_connection.isClosed()) + { + Thread failoverThread = new Thread(_failoverHandler); + failoverThread.setName("Failover"); + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.setDaemon(false); + failoverThread.start(); + } } public void readerIdle() @@ -298,7 +301,6 @@ public class AMQProtocolHandler implements ProtocolEngine */ public void exception(Throwable cause) { - _logger.info("AS: HELLO"); if (_failoverState == FailoverState.NOT_STARTED) { // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) |