summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java25
3 files changed, 22 insertions, 26 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 4b4417b6ef..b0bd8f8e97 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -211,15 +212,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
- _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+ _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size()));
for (AMQSession s : sessions)
{
- ((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
s.resubscribe();
}
}
-
public void closeConnection(long timeout) throws JMSException, AMQException
{
try
@@ -257,12 +256,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
ConnectionClose close = exc.getClose();
if (close == null)
{
+ _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
+
try
{
if (_conn.firePreFailover(false) && _conn.attemptReconnection())
{
_conn.failoverPrep();
- _qpidConnection.resume();
+ _conn.resubscribeSessions();
_conn.fireFailoverComplete();
return;
}
@@ -271,6 +272,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_logger.error("error during failover", e);
}
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
+ }
}
ExceptionListener listener = _conn._exceptionListener;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 1eaccf53fc..517a7a5ce8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -892,14 +892,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void resumed(Session ssn)
{
_qpidConnection = ssn.getConnection();
- try
- {
- resubscribe();
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
}
public void message(Session ssn, MessageTransfer xfr)
@@ -942,6 +934,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
protected Long requestQueueDepth(AMQDestination amqd)
{
+ flushAcknowledgments();
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index ed2e96e83b..92e61984d2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.message;
+import java.lang.ref.SoftReference;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -38,7 +39,6 @@ import javax.jms.Session;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.collections.ReferenceMap;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
@@ -61,7 +61,7 @@ import org.apache.qpid.transport.ReplyTo;
*/
public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
- private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+ private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>());
public static final String JMS_TYPE = "x-jms-type";
@@ -229,22 +229,19 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
else
{
- Destination dest = _destinationCache.get(replyTo);
+ Destination dest = null;
+ SoftReference<Destination> ref = _destinationCache.get(replyTo);
+ if (ref != null)
+ {
+ dest = ref.get();
+ }
if (dest == null)
{
String exchange = replyTo.getExchange();
String routingKey = replyTo.getRoutingKey();
- dest = generateDestination(exchange == null ? new AMQShortString("") :
- new AMQShortString(exchange),
- routingKey == null ? new AMQShortString(""):
- new AMQShortString(routingKey));
-
-
-
-
-
- _destinationCache.put(replyTo, dest);
+ dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ _destinationCache.put(replyTo, new SoftReference<Destination>(dest));
}
return dest;
@@ -291,7 +288,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
- _destinationCache.put(replyTo, destination);
+ _destinationCache.put(replyTo, new SoftReference<Destination>(destination));
_messageProps.setReplyTo(replyTo);
}