summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-24 17:03:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-24 17:03:19 +0000
commit283aaa337b4ab30cd59c321ef6abe20cb1dd53a5 (patch)
treed9c2f329223ff6a08ae20cc878bcfbfb6a2af3a3
parent059f79ddbc64283ac4f65dd34b9c0044dec69e02 (diff)
downloadqpid-python-283aaa337b4ab30cd59c321ef6abe20cb1dd53a5.tar.gz
Don't failover if the client is closing the connection, check if connection is closed on send
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829414 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java25
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java16
4 files changed, 33 insertions, 27 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 97d0d0516e..e1d9ae735c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -89,11 +89,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
// TODO: use system property thingy for this
- if (System.getProperty("UseTransportIo", "false").equals("false"))
+ if (System.getProperty("UseTransportIo", "false").equals("false"))
{
TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
- }
- else
+ }
+ else
{
_conn.getProtocolHandler().createIoTransportSession(brokerDetail);
}
@@ -197,7 +197,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
* Low = MaxPrefetch / 2
* @return XASession
* @throws JMSException thrown if there is a problem creating the session.
- */
+ */
public XASession createXASession() throws JMSException
{
return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
@@ -214,7 +214,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
// todo Be aware of possible changes to parameter order as versions change.
BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
_conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
-
+
if (transacted)
{
if (_logger.isDebugEnabled())
@@ -222,7 +222,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
_logger.debug("Issuing TxSelect for " + channelId);
}
TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
-
+
// TODO: Be aware of possible changes to parameter order as versions change.
_conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
}
@@ -299,7 +299,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
}
-
+
public void setIdleTimeout(long l){}
public int getMaxChannelID()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 44ce59975a..df59be25d0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -46,14 +46,14 @@ import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
- enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
-
+ enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
+
protected final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQConnection _connection;
/**
- * If true, messages will not get a timestamp.
+ * If true, messages will not get a timestamp.
*/
protected boolean _disableTimestamps;
@@ -105,7 +105,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private long _producerId;
/**
- * The session used to create this producer
+ * The session used to create this producer
*/
protected AMQSession _session;
@@ -118,11 +118,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private boolean _disableMessageId;
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
-
+
protected String _userID; // ref user id used in the connection.
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
-
+
protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
@@ -145,14 +145,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
_userID = connection.getUsername();
- setPublishMode();
+ setPublishMode();
}
-
+
void setPublishMode()
{
// Publish mode could be configured at destination level as well.
// Will add support for this when we provide a more robust binding URL
-
+
String syncPub = _connection.getSyncPublish();
// Support for deprecated option sync_persistence
if (syncPub.equals("persistent") || _connection.getSyncPersistence())
@@ -163,7 +163,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
{
publishMode = PublishMode.SYNC_PUBLISH_ALL;
}
-
+
_logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
}
@@ -277,6 +277,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
checkPreConditions();
checkInitialDestination();
+
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
@@ -548,6 +549,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
+ if(_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection closed");
+ }
}
private void checkInitialDestination()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 6b88b6a22d..f74dbba939 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -93,7 +93,6 @@ public class FailoverHandler implements Runnable
* Creates a failover handler on a protocol session, for a particular MINA session (network connection).
*
* @param amqProtocolHandler The protocol handler that spans the failover.
- * @param session The MINA session, for the failing connection.
*/
public FailoverHandler(AMQProtocolHandler amqProtocolHandler)
{
@@ -191,7 +190,7 @@ public class FailoverHandler implements Runnable
}
else
{
- // Set the new Protocol Session in the StateManager.
+ // Set the new Protocol Session in the StateManager.
existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession());
// Now that the ProtocolHandler has been reconnected clean up
@@ -199,7 +198,7 @@ public class FailoverHandler implements Runnable
// it any old exception that had occured prior to failover may
// prohibit reconnection.
// e.g. During testing when the broker is shutdown gracefully.
- // The broker
+ // The broker
// Clear any exceptions we gathered
if (existingStateManager.getCurrentState() != AMQState.CONNECTION_OPEN)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index ac7f441a13..6500a82818 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -269,12 +269,15 @@ public class AMQProtocolHandler implements ProtocolEngine
/** See {@link FailoverHandler} to see rationale for separate thread. */
private void startFailoverThread()
{
- Thread failoverThread = new Thread(_failoverHandler);
- failoverThread.setName("Failover");
- // Do not inherit daemon-ness from current thread as this can be a daemon
- // thread such as a AnonymousIoService thread.
- failoverThread.setDaemon(false);
- failoverThread.start();
+ if(!_connection.isClosed())
+ {
+ Thread failoverThread = new Thread(_failoverHandler);
+ failoverThread.setName("Failover");
+ // Do not inherit daemon-ness from current thread as this can be a daemon
+ // thread such as a AnonymousIoService thread.
+ failoverThread.setDaemon(false);
+ failoverThread.start();
+ }
}
public void readerIdle()
@@ -298,7 +301,6 @@ public class AMQProtocolHandler implements ProtocolEngine
*/
public void exception(Throwable cause)
{
- _logger.info("AS: HELLO");
if (_failoverState == FailoverState.NOT_STARTED)
{
// if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))