summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-06-26 15:53:37 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-06-26 15:53:37 +0000
commitad287c07e29466dd61d444f0404848f08da3c9c6 (patch)
tree8987b8c53044897fc4873af91751cfaf4d550453
parentfa80acf33b9feaf02e6050e43c259020551c32b3 (diff)
downloadqpid-python-ad287c07e29466dd61d444f0404848f08da3c9c6.tar.gz
QPID-4027 Added an experimental Failover Decorator to experiment with
providing high level failover based on "stop-the-world" concept. Added an abstraction for Failover strategy. Renamed the ConnectionStateListener to ConnectionEventListener and added a ConnectionEvent class to notify various connection events. The experimental failover implementation makes use of this. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1354073 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEvent.java61
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java (renamed from qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionStateListener.java)0
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionString.java27
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java29
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java33
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java202
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractReceiverDecorator.java122
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java89
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java219
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java313
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java267
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java233
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java549
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java357
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java128
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java15
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java312
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java291
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java559
19 files changed, 2444 insertions, 1362 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEvent.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEvent.java
new file mode 100644
index 0000000000..8e376b712d
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEvent.java
@@ -0,0 +1,61 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.internal;
+
+public class ConnectionEvent
+{
+ public enum EventType
+ {
+ OPENED,
+ CLOSED,
+ RECONNCTED,
+ RECONNCTION_ATTEMPTED,
+ RECONNCTION_FAILED,
+ CONNECTION_LOST,
+ PRE_FAILOVER,
+ POST_FAILOVER,
+ }
+
+ private ConnectionInternal _conn;
+ private EventType _type;
+ private Object _source;
+
+ public ConnectionEvent(ConnectionInternal conn, EventType type, Object source)
+ {
+ _conn = conn;
+ _type = type;
+ _source = source;
+ }
+
+ public ConnectionInternal getConnection()
+ {
+ return _conn;
+ }
+
+ public EventType getType()
+ {
+ return _type;
+ }
+
+ public Object getSource()
+ {
+ return _source;
+ }
+
+
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionStateListener.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java
index 4055c1e904..4055c1e904 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionStateListener.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionString.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionString.java
new file mode 100644
index 0000000000..327c499366
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionString.java
@@ -0,0 +1,27 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.internal;
+
+import java.util.Map;
+
+public interface ConnectionString
+{
+ public String getUrl();
+
+ public Map<String, Object> getOptions();
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java
new file mode 100644
index 0000000000..18ad027315
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java
@@ -0,0 +1,29 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.internal;
+
+public interface FailoverStrategy
+{
+ public boolean failoverAllowed();
+
+ public ConnectionString getNextConnectionString();
+
+ public ConnectionString getCurrentConnectionString();
+
+ public void connectionAttained(ConnectionInternal conn);
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java
new file mode 100644
index 0000000000..8b27cf2ac7
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java
@@ -0,0 +1,33 @@
+package org.apache.qpid.messaging.internal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class FailoverStrategyFactory
+{
+ private final static FailoverStrategyFactory _instance;
+ private static final Logger _logger = LoggerFactory.getLogger(FailoverStrategyFactory.class);
+
+ static
+ {
+ String className = System.getProperty("qpid.failover-factory",
+ "org.apache.qpid.messaging.util.failover.DefaultFailoverStrategyFactory"); // will default to java
+ try
+ {
+ _instance = (FailoverStrategyFactory) Class.forName(className).newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error loading failover factory class",e);
+ throw new Error("Error loading failover factory class",e);
+ }
+ }
+
+ public static FailoverStrategyFactory get()
+ {
+ return _instance;
+ }
+
+ public abstract FailoverStrategy getFailoverStrategy(ConnectionInternal con);
+
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java
new file mode 100644
index 0000000000..50bc53c2b7
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java
@@ -0,0 +1,202 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.MessageFactory;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.TransportFailureException;
+import org.apache.qpid.messaging.internal.ConnectionEvent;
+import org.apache.qpid.messaging.internal.ConnectionEvent.EventType;
+import org.apache.qpid.messaging.internal.ConnectionEventListener;
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.SessionInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractConnectionDecorator implements ConnectionInternal
+{
+ private final Logger _logger = LoggerFactory.getLogger(getClass());
+
+ protected ConnectionInternal _delegate;
+ protected final Object _connectionLock;
+ protected List<ConnectionEventListener> _stateListeners = new ArrayList<ConnectionEventListener>();
+ protected Map<String, SessionInternal> _sessions = new ConcurrentHashMap<String,SessionInternal>();
+
+ protected AbstractConnectionDecorator(ConnectionInternal delegate, Object lock)
+ {
+ _delegate = delegate;
+ _connectionLock = lock;
+ }
+
+ @Override
+ public void open() throws MessagingException
+ {
+ synchronized (_connectionLock)
+ {
+ _delegate.open();
+ }
+ }
+
+ public void reconnect(String url, Map<String,Object> options) throws TransportFailureException
+ {
+ synchronized (_connectionLock)
+ {
+ _delegate.reconnect(url,options);
+ }
+ }
+
+ @Override
+ public boolean isOpen() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.isOpen();
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ synchronized (_connectionLock)
+ {
+ for (Session ssn : _sessions.values())
+ {
+ try
+ {
+ ssn.close();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Error closing session",e);
+ }
+ }
+ _sessions.clear();
+ try
+ {
+ _delegate.close();
+ }
+ catch(Exception e)
+ {
+ _logger.warn("Error closing connection",e);
+ }
+ notifyEvent(new ConnectionEvent(this,EventType.CLOSED,this));
+ }
+ }
+
+ @Override
+ public String getAuthenticatedUsername() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getAuthenticatedUsername();
+ }
+
+ @Override
+ public MessageFactory getMessageFactory() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getMessageFactory();
+ }
+
+ @Override
+ public void addConnectionEventListener(ConnectionEventListener l) throws ConnectionException
+ {
+ checkPreConditions();
+ synchronized (_connectionLock)
+ {
+ _stateListeners.add(l);
+ }
+ }
+
+ @Override
+ public void removeConnectionEventListener(ConnectionEventListener l) throws ConnectionException
+ {
+ checkPreConditions();
+ synchronized (_connectionLock)
+ {
+ _stateListeners.remove(l);
+ }
+ }
+
+
+ @Override
+ public List<SessionInternal> getSessions() throws ConnectionException
+ {
+ checkPreConditions();
+ return new ArrayList<SessionInternal>(_sessions.values());
+ }
+
+ @Override
+ public void unregisterSession(SessionInternal ssn)
+ {
+ synchronized (_connectionLock)
+ {
+ _sessions.remove(ssn.getName());
+ }
+ }
+
+ @Override
+ public void exception(TransportFailureException e, long serialNumber)
+ {
+ synchronized(_connectionLock)
+ {
+ for (ConnectionEventListener l: _stateListeners)
+ {
+ l.exception(new ConnectionException("Connection Failed",e));
+ }
+ }
+ }
+
+ @Override
+ public Object getConnectionLock()
+ {
+ return _connectionLock;
+ }
+
+ @Override
+ public String getConnectionURL()
+ {
+ return _delegate.getConnectionURL();
+ }
+
+ @Override
+ public Map<String, Object> getConnectionOptions()
+ {
+ return _delegate.getConnectionOptions();
+ }
+
+ @Override
+ public long getSerialNumber()
+ {
+ return _delegate.getSerialNumber();
+ }
+
+ protected void notifyEvent(ConnectionEvent event)
+ {
+ for (ConnectionEventListener l: _stateListeners)
+ {
+ l.eventOccured(event);
+ }
+ }
+
+ protected abstract void checkPreConditions() throws ConnectionException;
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractReceiverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractReceiverDecorator.java
new file mode 100644
index 0000000000..aa6ab50072
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractReceiverDecorator.java
@@ -0,0 +1,122 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.ReceiverException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.internal.ReceiverInternal;
+import org.apache.qpid.messaging.internal.SessionInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractReceiverDecorator implements ReceiverInternal
+{
+ private Logger _logger = LoggerFactory.getLogger(getClass());
+
+ public enum ReceiverState {OPENED, CLOSED, ERROR};
+
+ protected ReceiverInternal _delegate;
+ protected SessionInternal _ssn;
+ protected final Object _connectionLock; // global per connection lock
+
+ public AbstractReceiverDecorator(SessionInternal ssn, ReceiverInternal delegate)
+ {
+ _ssn = ssn;
+ _delegate = delegate;
+ _connectionLock = ssn.getConnectionInternal().getConnectionLock();
+ }
+
+ @Override
+ public Message get(long timeout) throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.get(timeout);
+ }
+
+ @Override
+ public Message fetch(long timeout) throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.fetch(timeout);
+ }
+
+ @Override
+ public void setCapacity(int capacity) throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.setCapacity(capacity);
+ }
+
+ @Override
+ public int getCapacity() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getCapacity();
+ }
+
+ @Override
+ public int getAvailable() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getAvailable();
+ }
+
+ @Override
+ public int getUnsettled() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getUnsettled();
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ synchronized (_connectionLock)
+ {
+ _delegate.close();
+ _ssn.unregisterReceiver(this);
+ }
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return _delegate.isClosed();
+ }
+
+ @Override
+ public String getName() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getName();
+ }
+
+ @Override
+ public Session getSession() throws MessagingException
+ {
+ checkPreConditions();
+ _ssn.checkError();
+ return _ssn;
+ }
+
+ protected abstract void checkPreConditions() throws ReceiverException;
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java
new file mode 100644
index 0000000000..776575ebd1
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java
@@ -0,0 +1,89 @@
+package org.apache.qpid.messaging.util;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.SenderException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.internal.SenderInternal;
+import org.apache.qpid.messaging.internal.SessionInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractSenderDecorator implements SenderInternal
+{
+ private Logger _logger = LoggerFactory.getLogger(getClass());
+
+ protected SenderInternal _delegate;
+ protected SessionInternal _ssn;
+ protected final Object _connectionLock; // global per connection lock
+
+ public AbstractSenderDecorator(SessionInternal ssn, SenderInternal delegate)
+ {
+ _ssn = ssn;
+ _delegate = delegate;
+ _connectionLock = ssn.getConnectionInternal().getConnectionLock();
+ }
+
+ @Override
+ public void send(Message message, boolean sync) throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.send(message, sync);
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ synchronized (_connectionLock)
+ {
+ _delegate.close();
+ _ssn.unregisterSender(this);
+ }
+ }
+
+ @Override
+ public void setCapacity(int capacity) throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.setCapacity(capacity);
+ }
+
+ @Override
+ public int getCapacity() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getCapacity();
+ }
+
+ @Override
+ public int getAvailable() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getAvailable();
+ }
+
+ @Override
+ public int getUnsettled() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getUnsettled();
+ }
+
+ @Override
+ public String getName() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getName();
+ }
+
+ @Override
+ public Session getSession() throws MessagingException
+ {
+ checkPreConditions();
+ _ssn.checkError();
+ return _ssn;
+ }
+
+ protected abstract void checkPreConditions() throws SenderException;}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java
new file mode 100644
index 0000000000..f46aae63b0
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java
@@ -0,0 +1,219 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.messaging.Connection;
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.Sender;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.TransportFailureException;
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.ReceiverInternal;
+import org.apache.qpid.messaging.internal.SenderInternal;
+import org.apache.qpid.messaging.internal.SessionInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractSessionDecorator implements SessionInternal
+{
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+ protected ConnectionInternal _conn;
+ protected SessionInternal _delegate;
+ protected List<ReceiverInternal> _receivers = new ArrayList<ReceiverInternal>();
+ protected List<SenderInternal> _senders = new ArrayList<SenderInternal>();
+ protected final Object _connectionLock; // global per connection lock
+
+ public AbstractSessionDecorator(ConnectionInternal conn, SessionInternal delegate)
+ {
+ _conn = conn;
+ _delegate = delegate;
+ _connectionLock = conn.getConnectionLock();
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ synchronized(_connectionLock)
+ {
+ Sender[] senders = new Sender[_senders.size()];
+ senders = _senders.toArray(senders);
+ for (Sender sender: senders)
+ {
+ try
+ {
+ sender.close();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Error closing sender", e);
+ }
+ }
+ _senders.clear();
+
+ Receiver[] receivers = new Receiver[_receivers.size()];
+ receivers = _receivers.toArray(receivers);
+ for (Receiver receiver: receivers)
+ {
+ try
+ {
+ receiver.close();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Error closing receiver", e);
+ }
+ }
+ _receivers.clear();
+ _delegate.close();
+ _conn.unregisterSession(this);
+ }
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return _delegate.isClosed();
+ }
+
+ @Override
+ public void commit() throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.commit();
+ }
+
+ @Override
+ public void rollback() throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.rollback();
+ }
+
+ @Override
+ public void acknowledge(boolean sync) throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.acknowledge(sync);
+ }
+
+ @Override
+ public void acknowledge(Message message, boolean sync)
+ throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.acknowledge(message, sync);
+ }
+
+ @Override
+ public void reject(Message message) throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.reject(message);
+ }
+
+ @Override
+ public void release(Message message) throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.release(message);
+ }
+
+ @Override
+ public void sync(boolean block) throws MessagingException
+ {
+ checkPreConditions();
+ _delegate.sync(block);
+ }
+
+ @Override
+ public int getReceivable() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getReceivable();
+ }
+
+ @Override
+ public int getUnsettledAcks() throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.getUnsettledAcks();
+ }
+
+ @Override
+ public Receiver nextReceiver(long timeout) throws MessagingException
+ {
+ checkPreConditions();
+ return _delegate.nextReceiver(timeout);
+ }
+
+ @Override
+ public Connection getConnection() throws MessagingException
+ {
+ checkError();
+ return _conn; // always return your peer (not your delegate's peer)
+ }
+
+ @Override
+ public boolean hasError()
+ {
+ return _delegate.hasError();
+ }
+
+ @Override
+ public void checkError() throws MessagingException
+ {
+ checkPreConditions(); // check if we already have the info.
+ // Asking the delegate.
+ _delegate.checkError();
+ }
+
+ @Override
+ public ConnectionInternal getConnectionInternal()
+ {
+ return _conn;
+ }
+
+ @Override
+ public String getName()
+ {
+ return _delegate.getName();
+ }
+
+ @Override
+ public void unregisterReceiver(ReceiverInternal receiver)
+ {
+ _receivers.remove(receiver);
+ _delegate.unregisterReceiver(receiver);
+ }
+
+ @Override
+ public void unregisterSender(SenderInternal sender)
+ {
+ _senders.remove(sender);
+ _delegate.unregisterSender(sender);
+ }
+
+ protected abstract void checkPreConditions() throws SessionException;
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
deleted file mode 100644
index ccc216e7af..0000000000
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.messaging.util;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.qpid.messaging.Connection;
-import org.apache.qpid.messaging.ConnectionException;
-import org.apache.qpid.messaging.MessageFactory;
-import org.apache.qpid.messaging.MessagingException;
-import org.apache.qpid.messaging.Session;
-import org.apache.qpid.messaging.SessionException;
-import org.apache.qpid.messaging.internal.ConnectionInternal;
-import org.apache.qpid.messaging.internal.ConnectionStateListener;
-import org.apache.qpid.messaging.internal.SessionInternal;
-import org.apache.qpid.util.UUIDGen;
-import org.apache.qpid.util.UUIDs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Decorator that adds basic housekeeping tasks to a connection.
- * This allows the various implementations to reuse basic functions.
- * This class adds,
- * 1. Basic session mgt (tracking, default name generation ..etc)
- * 2. Connection state management.
- * 3. Error handling.
- *
- * <i> <b>Close() can be called by,</b>
- * <ol>
- * <li>The application (normal close)</li>
- * <li>By the parent if it's not null (error)</li>
- * <li>By this object if parent is null (error)</li>
- * </ol>
- * </i>
- *
- * <u>Failover</u>
- * This Decorator does not handle any failover.
- *
- * If failover is handled at a layer above then it will take appropriate action.
- * @see ConnectionFailoverDecorator for an example.
- * If failover is handled at a layer below (or no failover at all) then an exception means the connection is no longer usable.
- * Therefore this class will attempt to close the connection if the parent is null.
- */
-public class ConnectionManagementDecorator implements ConnectionInternal
-{
- private static Logger _logger = LoggerFactory.getLogger(ConnectionManagementDecorator.class);
-
- public enum ConnectionState { UNDEFINED, OPENED, CLOSED, ERROR}
-
- private ConnectionInternal _parent;
- private Connection _delegate;
- private ConnectionState _state = ConnectionState.UNDEFINED;
- private UUIDGen _ssnNameGenerator = UUIDs.newGenerator();
- private Map<String, SessionInternal> _sessions = new ConcurrentHashMap<String,SessionInternal>();
- private ConnectionException _lastException;
- private List<ConnectionStateListener> _stateListeners = new ArrayList<ConnectionStateListener>();
-
- private final Object _connectionLock;
-
- public ConnectionManagementDecorator(Connection delegate)
- {
- this(null,delegate);
- }
-
- public ConnectionManagementDecorator(ConnectionInternal parent, Connection delegate)
- {
- _delegate = delegate;
- _parent = parent;
- _connectionLock = (_parent == null) ? new Object() : _parent.getConnectionLock();
- }
-
- @Override
- public void open() throws MessagingException
- {
- // return without exception denotes success
- _delegate.open();
- synchronized (_connectionLock)
- {
- _state = ConnectionState.OPENED;
- for (ConnectionStateListener l: _stateListeners)
- {
- l.opened();
- }
- }
- }
-
- @Override
- public boolean isOpen() throws MessagingException
- {
- return _delegate.isOpen();
- }
-
- @Override
- public void close() throws MessagingException
- {
- checkClosedAndThrowException("Connection is already closed");
- synchronized(_connectionLock)
- {
- _state = ConnectionState.CLOSED;
- for (Session ssn : _sessions.values())
- {
- try
- {
- ssn.close();
- }
- catch (Exception e)
- {
- _logger.warn("Error closing session",e);
- }
- }
- _sessions.clear();
-
- for (ConnectionStateListener l: _stateListeners)
- {
- l.closed();
- }
- }
- _delegate.close();
- }
-
- @Override
- public Session createSession(String name) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- if (name == null || name.isEmpty()) { name = generateSessionName(); }
- SessionInternal ssn = new SessionManagementDecorator(this,name,_delegate.createSession(name));
- _sessions.put(name, ssn);
- return ssn;
- }
- catch(ConnectionException e)
- {
- exception(e);
- // If there is a failover handler above this it will handle it.
- // Otherwise the application gets this.
- throw new ConnectionException("Connection closed",e);
- }
- }
-
- @Override
- public Session createTransactionalSession(String name)
- throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- if (name == null || name.isEmpty()) { name = generateSessionName(); }
- SessionInternal ssn = new SessionManagementDecorator(this,name,_delegate.createTransactionalSession(name));
- _sessions.put(name, ssn);
- return ssn;
- }
- catch(ConnectionException e)
- {
- exception(e);
- // If there is a failover handler above this it will handle it.
- // Otherwise the application gets this.
- throw new ConnectionException("Connection closed",e);
- }
- }
-
- @Override
- public String getAuthenticatedUsername() throws MessagingException
- {
- checkClosedAndThrowException();
- return _delegate.getAuthenticatedUsername();
- }
-
- @Override
- public MessageFactory getMessageFactory() throws MessagingException
- {
- checkClosedAndThrowException();
- return _delegate.getMessageFactory();
- }
-
- @Override
- public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException
- {
- checkClosedAndThrowException();
- synchronized (_connectionLock)
- {
- _stateListeners.add(l);
- }
- }
-
- @Override
- public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException
- {
- checkClosedAndThrowException();
- synchronized (_connectionLock)
- {
- _stateListeners.remove(l);
- }
- }
-
- @Override
- public List<SessionInternal> getSessions() throws ConnectionException
- {
- checkClosedAndThrowException();
- return new ArrayList<SessionInternal>(_sessions.values());
- }
-
- @Override
- public void unregisterSession(SessionInternal ssn)
- {
- _sessions.remove(ssn.getName());
- }
-
- @Override // Called by the delegate or a a session created by this connection.
- public void exception(ConnectionException e)
- {
- synchronized(_connectionLock)
- {
- _state = ConnectionState.ERROR;
- if (_lastException != null)
- {
- _logger.warn("Last exception was not notified to the application", _lastException);
- }
- _lastException = e;
-
- for (ConnectionStateListener l: _stateListeners)
- {
- l.exception(_lastException);
- }
-
- if (_parent != null)
- {
- _parent.exception(e);
- }
- else
- {
- try
- {
- close();
- }
- catch(MessagingException ex)
- {
- //ignore
- }
- }
- }
- // should we clean lastException if we notify it via a listener?
- }
-
- @Override
- public Object getConnectionLock()
- {
- return _connectionLock;
- }
-
- @Override
- public void recreate() throws MessagingException
- {
- // TODO Auto-generated method stub
- }
-
- private void checkClosedAndThrowException() throws ConnectionException
- {
- checkClosedAndThrowException("Connection is closed. You cannot invoke methods on a closed connection");
- }
-
- private void checkClosedAndThrowException(String msg) throws ConnectionException
- {
- switch (_state)
- {
- case UNDEFINED:
- case ERROR:
- throw new ConnectionException("Connection is in an error state. The connection may or may not recover from this");
- case CLOSED:
- synchronized(_connectionLock)
- {
- if(_lastException != null)
- {
- Throwable cause = _lastException;
- _lastException = null;
- throw new ConnectionException(msg, cause);
- }
- else
- {
- throw new ConnectionException(msg);
- }
- }
- default:
- break;
- }
- }
-
- private String generateSessionName()
- {
- // TODO add local IP and pid to the beginning;
- return _ssnNameGenerator.generate().toString();
- }
-}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
deleted file mode 100644
index a4e545e6f0..0000000000
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.messaging.util;
-
-import org.apache.qpid.messaging.ConnectionException;
-import org.apache.qpid.messaging.Message;
-import org.apache.qpid.messaging.MessagingException;
-import org.apache.qpid.messaging.Receiver;
-import org.apache.qpid.messaging.ReceiverException;
-import org.apache.qpid.messaging.Session;
-import org.apache.qpid.messaging.SessionException;
-import org.apache.qpid.messaging.internal.ReceiverInternal;
-import org.apache.qpid.messaging.internal.SessionInternal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Decorator that adds basic housekeeping tasks to a Receiver.
- * This class adds,
- * 1. State management.
- * 2. Exception handling.
- *
- */
-public class ReceiverManagementDecorator implements ReceiverInternal
-{
- private static Logger _logger = LoggerFactory.getLogger(ReceiverManagementDecorator.class);
-
- public enum ReceiverState {OPENED, CLOSED, ERROR};
-
- private Receiver _delegate;
- private ReceiverState _state = ReceiverState.OPENED;
- private SessionInternal _ssn;
- private final Object _connectionLock; // global per connection lock
-
- public ReceiverManagementDecorator(SessionInternal ssn, Receiver delegate)
- {
- _ssn = ssn;
- _delegate = delegate;
- _connectionLock = ssn.getConnectionInternal().getConnectionLock();
- }
-
- @Override
- public Message get(long timeout) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.get(timeout);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public Message fetch(long timeout) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.fetch(timeout);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void setCapacity(int capacity) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.setCapacity(capacity);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public int getCapacity() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.getCapacity();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public int getAvailable() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.getAvailable();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public int getUnsettled() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.getUnsettled();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void close() throws MessagingException
- {
- checkClosedAndThrowException("Receiver is already closed");
- synchronized (_connectionLock)
- {
- _state = ReceiverState.CLOSED;
- _delegate.close();
- _ssn.unregisterReceiver(this);
- }
- }
-
- @Override
- public boolean isClosed()
- {
- return _state == ReceiverState.CLOSED;
- }
-
- @Override
- public String getName() throws MessagingException
- {
- checkClosedAndThrowException();
- return _delegate.getName();
- }
-
- @Override
- public Session getSession() throws MessagingException
- {
- checkClosedAndThrowException();
- _ssn.checkError();
- return _ssn;
- }
-
- @Override
- public void recreate() throws MessagingException
- {
- // TODO Auto-generated method stub
-
- }
-
- private void checkClosedAndThrowException() throws ReceiverException
- {
- checkClosedAndThrowException("Receiver is closed. You cannot invoke methods on a closed receiver");
- }
-
- private void checkClosedAndThrowException(String closedMessage) throws ReceiverException
- {
- switch (_state)
- {
- case ERROR:
- throw new ReceiverException("Receiver is in a temporary error state. The session may or may not recover from this");
- case CLOSED:
- throw new ReceiverException(closedMessage);
- }
- }
-
- /**
- * A ConnectionException will cause the Session/Receiver to go into a temporary error state,
- * which prevents it from being used further.
- * From there the Session and Receiver can be moved into OPENED (if failover works) or
- * CLOSED if there is no failover or if failover has failed.
- * @param e
- * @throws MessagingException
- */
- private ReceiverException handleConnectionException(ConnectionException e)
- {
- synchronized (_connectionLock)
- {
- _state = ReceiverState.ERROR;
- _ssn.exception(e); // This might trigger failover in a layer above.
- if (_state == ReceiverState.CLOSED)
- {
- // The connection has instructed the session and it's child objects to be closed.
- // Either there was no failover, or failover has failed.
- return new ReceiverException("Receiver is closed due to connection error",e);
- }
- else
- {
- // Asking the application or the Parent handler to retry the operation.
- // The Session and Receiver should be in OPENED state at this time.
- return new ReceiverException("Receiver was in a temporary error state due to connection error." +
- "Plase retry your operation",e);
- }
- }
- }
-
- /**
- * Session Exceptions will generally invalidate the Session.
- * TODO this needs to be revisited again.
- * A new session will need to be created in that case.
- * @param e
- * @throws MessagingException
- */
- private ReceiverException handleSessionException(SessionException e)
- {
- synchronized (_connectionLock)
- {
- // This should close all receivers (including this) and senders.
- _ssn.exception(e);
- }
- return new ReceiverException("Session has been closed",e);
- }
-}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
deleted file mode 100644
index 14bd8f3b15..0000000000
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
+++ /dev/null
@@ -1,233 +0,0 @@
-package org.apache.qpid.messaging.util;
-
-import org.apache.qpid.messaging.ConnectionException;
-import org.apache.qpid.messaging.Message;
-import org.apache.qpid.messaging.MessagingException;
-import org.apache.qpid.messaging.ReceiverException;
-import org.apache.qpid.messaging.Sender;
-import org.apache.qpid.messaging.SenderException;
-import org.apache.qpid.messaging.Session;
-import org.apache.qpid.messaging.SessionException;
-import org.apache.qpid.messaging.internal.SenderInternal;
-import org.apache.qpid.messaging.internal.SessionInternal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Decorator that adds basic housekeeping tasks to a Sender.
- * This class adds,
- * 1. State management.
- * 2. Exception handling.
- *
- */
-public class SenderManagementDecorator implements SenderInternal
-{
- private static Logger _logger = LoggerFactory.getLogger(SenderManagementDecorator.class);
-
- public enum SenderState {OPENED, CLOSED, ERROR};
-
- private Sender _delegate;
- private SenderState _state = SenderState.OPENED;
- private SessionInternal _ssn;
- private final Object _connectionLock; // global per connection lock
-
- public SenderManagementDecorator(SessionInternal ssn, Sender delegate)
- {
- _ssn = ssn;
- _delegate = delegate;
- _connectionLock = ssn.getConnectionInternal().getConnectionLock();
- }
-
- @Override
- public void send(Message message, boolean sync) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.send(message, sync);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void close() throws MessagingException
- {
- checkClosedAndThrowException("Sender is already closed");
- synchronized (_connectionLock)
- {
- _state = SenderState.CLOSED;
- _delegate.close();
- _ssn.unregisterSender(this);
- }
- }
-
- @Override
- public void setCapacity(int capacity) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.setCapacity(capacity);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public int getCapacity() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.getCapacity();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public int getAvailable() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.getAvailable();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public int getUnsettled() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.getUnsettled();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public boolean isClosed() throws MessagingException
- {
- return _state == SenderState.CLOSED;
- }
-
- @Override
- public String getName() throws MessagingException
- {
- checkClosedAndThrowException();
- return _delegate.getName();
- }
-
- @Override
- public Session getSession() throws MessagingException
- {
- checkClosedAndThrowException();
- _ssn.checkError();
- return _ssn;
- }
-
- @Override
- public void recreate() throws MessagingException
- {
- // TODO Auto-generated method stub
-
- }
-
- private void checkClosedAndThrowException() throws ReceiverException
- {
- checkClosedAndThrowException("Receiver is closed. You cannot invoke methods on a closed receiver");
- }
-
- private void checkClosedAndThrowException(String closedMessage) throws ReceiverException
- {
- switch (_state)
- {
- case ERROR:
- throw new ReceiverException("Receiver is in a temporary error state. The session may or may not recover from this");
- case CLOSED:
- throw new ReceiverException(closedMessage);
- }
- }
-
- /**
- * A ConnectionException will cause the Session/Sender to go into a temporary error state,
- * which prevents it from being used further.
- * From there the Session and Sender can be moved into OPENED (if failover works) or
- * CLOSED if there is no failover or if failover has failed.
- * @param e
- * @throws MessagingException
- */
- private SenderException handleConnectionException(ConnectionException e)
- {
- synchronized (_connectionLock)
- {
- _state = SenderState.ERROR;
- _ssn.exception(e); // This might trigger failover in a layer above.
- if (_state == SenderState.CLOSED)
- {
- // The connection has instructed the session and it's child objects to be closed.
- // Either there was no failover, or failover has failed.
- return new SenderException("Sender is closed due to connection error",e);
- }
- else
- {
- // Asking the application or the Parent handler to retry the operation.
- // The Session and Receiver should be in OPENED state at this time.
- return new SenderException("Sender was in a temporary error state due to connection error." +
- "Plase retry your operation",e);
- }
- }
- }
-
- /**
- * Session Exceptions will generally invalidate the Session.
- * TODO this needs to be revisited again.
- * A new session will need to be created in that case.
- * @param e
- * @throws MessagingException
- */
- private SenderException handleSessionException(SessionException e)
- {
- synchronized (_connectionLock)
- {
- // This should close all senders (including this) and receivers.
- _ssn.exception(e);
- }
- return new SenderException("Session has been closed",e);
- }
-}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
deleted file mode 100644
index 1422be7207..0000000000
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
+++ /dev/null
@@ -1,549 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.messaging.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.messaging.Address;
-import org.apache.qpid.messaging.Connection;
-import org.apache.qpid.messaging.ConnectionException;
-import org.apache.qpid.messaging.Message;
-import org.apache.qpid.messaging.MessagingException;
-import org.apache.qpid.messaging.Receiver;
-import org.apache.qpid.messaging.Sender;
-import org.apache.qpid.messaging.Session;
-import org.apache.qpid.messaging.SessionException;
-import org.apache.qpid.messaging.internal.ConnectionInternal;
-import org.apache.qpid.messaging.internal.ReceiverInternal;
-import org.apache.qpid.messaging.internal.SenderInternal;
-import org.apache.qpid.messaging.internal.SessionInternal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Decorator that adds basic housekeeping tasks to a session.
- * This class adds,
- * 1. Management of receivers and senders created by this session.
- * 2. State management.
- * 3. Exception handling.
- *
- * <b>Exception Handling</b>
- * This class will wrap each method call to it's delegate to handle error situations.
- * First it will check if the session is already CLOSED or in an ERROR situation.
- * Then it will look for connection and session errors and handle as follows.
- *
- * <b>Connection Exceptions</b>
- * This class intercepts ConnectionException's and are passed onto the connection.
- * The Session will be marked as ERROR and a session exception will be thrown with an appropriate message.
- * Any further use of the session is prevented until it moves to OPENED.
- *
- * If failover is handled at a layer above, there will be a Session Decorator that
- * would handle the session exception and retry when the connection is available.
- * This handler may block the call until the state moves into either OPENED or CLOSED.
- * Ex @see SessionFailoverDecorator.
- *
- * If failover is handled at a layer below, then a connection exception means it has failed already.
- * Therefore when passed to the connection,the exception will be thrown directly to the application.
- * The connection object will be responsible for calling close on this session for the above case.
- *
- * <i> <b>Close() can be called by,</b>
- * <ol>
- * <li>The application (normal close)</li>
- * <li>By the parent via failover (error)</li>
- * <li>By the connection object, if no failover(error)</li>
- * <li>By itself if it receives and exception (error)</li>
- * </ol>
- * </i>
- *
- * <b>Session Exceptions</b>
- * For the time being, anytime a session exception is received, the session will be marked CLOSED.
- * We need to revisit this.
- */
-public class SessionManagementDecorator implements SessionInternal
-{
- private static Logger _logger = LoggerFactory.getLogger(SessionManagementDecorator.class);
-
- public enum SessionState {OPENED, CLOSED, ERROR}
-
- private ConnectionInternal _conn;
- private Session _delegate;
- private String _name;
- SessionState _state = SessionState.OPENED;
- private List<ReceiverInternal> _receivers = new ArrayList<ReceiverInternal>();
- private List<SenderInternal> _senders = new ArrayList<SenderInternal>();
- private final Object _connectionLock; // global per connection lock
-
- public SessionManagementDecorator(ConnectionInternal conn, String name, Session delegate)
- {
- _conn = conn;
- _delegate = delegate;
- _name = name;
- _connectionLock = conn.getConnectionLock();
- }
-
- @Override
- public boolean isClosed()
- {
- return _state == SessionState.CLOSED;
- }
-
- @Override
- public void close() throws MessagingException
- {
- checkClosedAndThrowException("Session is already closed");
- synchronized(_connectionLock)
- {
- _state = SessionState.CLOSED;
- for (Sender sender: _senders)
- {
- try
- {
- sender.close();
- }
- catch (Exception e)
- {
- _logger.warn("Error closing sender", e);
- }
- }
- _senders.clear();
-
- for (Receiver receiver: _receivers)
- {
- try
- {
- receiver.close();
- }
- catch (Exception e)
- {
- _logger.warn("Error closing receiver", e);
- }
- }
- _receivers.clear();
- _delegate.close();
- _conn.unregisterSession(this);
- }
- }
-
- @Override
- public void commit() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.commit();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void rollback() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.rollback();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void acknowledge(boolean sync) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.acknowledge(sync);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void acknowledge(Message message, boolean sync)
- throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.acknowledge(message, sync);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void reject(Message message) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.reject(message);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void release(Message message) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.release(message);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void sync(boolean block) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- _delegate.sync(block);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public int getReceivable() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.getReceivable();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public int getUnsettledAcks() throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.getUnsettledAcks();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public Receiver nextReceiver(long timeout) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- return _delegate.nextReceiver(timeout);
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public Sender createSender(Address address) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- SenderInternal sender = new SenderManagementDecorator(this,_delegate.createSender(address));
- _senders.add(sender);
- return sender;
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public Sender createSender(String address) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- SenderInternal sender = new SenderManagementDecorator(this,_delegate.createSender(address));
- _senders.add(sender);
- return sender;
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public Receiver createReceiver(Address address) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- ReceiverInternal receiver = new ReceiverManagementDecorator(this,_delegate.createReceiver(address));
- _receivers.add(receiver);
- return receiver;
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public Receiver createReceiver(String address) throws MessagingException
- {
- checkClosedAndThrowException();
- try
- {
- ReceiverInternal receiver = new ReceiverManagementDecorator(this,_delegate.createReceiver(address));
- _receivers.add(receiver);
- return receiver;
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public Connection getConnection() throws MessagingException
- {
- checkError();
- return _conn; // always return your peer (not your delegate's peer)
- }
-
- @Override
- public boolean hasError()
- {
- return _delegate.hasError();
- }
-
- @Override
- public void checkError() throws MessagingException
- {
- checkClosedAndThrowException(); // check if we already have the info.
- try
- {
- // Asking the delegate.
- _delegate.checkError();
- }
- catch (ConnectionException e)
- {
- throw handleConnectionException(e);
- }
- catch (SessionException e)
- {
- throw handleSessionException(e);
- }
- }
-
- @Override
- public void exception(MessagingException e)
- {
- if (e instanceof ConnectionException)
- {
- handleConnectionException((ConnectionException)e);
- }
- else if (e instanceof SessionException)
- {
- handleSessionException((SessionException)e);
- }
- }
-
- @Override
- public void recreate() throws MessagingException
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public ConnectionInternal getConnectionInternal()
- {
- return _conn;
- }
-
- @Override
- public String getName()
- {
- return _name;
- }
-
- @Override
- public void unregisterReceiver(ReceiverInternal receiver)
- {
- _receivers.remove(receiver);
- }
-
- @Override
- public void unregisterSender(SenderInternal sender)
- {
- _senders.remove(sender);
- }
-
- private void checkClosedAndThrowException() throws SessionException
- {
- checkClosedAndThrowException("Session is closed. You cannot invoke methods on a closed sesion");
- }
-
- private void checkClosedAndThrowException(String closedMessage) throws SessionException
- {
- switch (_state)
- {
- case ERROR:
- throw new SessionException("Session is in a temporary error state. The session may or may not recover from this");
- case CLOSED:
- throw new SessionException(closedMessage);
- }
- }
-
- /**
- * A ConnectionException will cause the Session to go into a temporary error state,
- * which prevents it from being used further.
- * From there the Session can be moved into OPENED (if failover works) or
- * CLOSED if there is no failover or if failover has failed.
- * @param e
- * @throws MessagingException
- */
- private SessionException handleConnectionException(ConnectionException e)
- {
- synchronized (_connectionLock)
- {
- _state = SessionState.ERROR;
- _conn.exception(e); // This might trigger failover in a layer above.
- if (_state == SessionState.CLOSED)
- {
- // The connection has instructed the session to be closed.
- // Either there was no failover, or failover has failed.
- return new SessionException("Session is closed due to connection error",e);
- }
- else
- {
- // Asking the application or the Parent handler to retry the operation.
- // The Session should be in OPENED state at this time.
- return new SessionException("Session was in a temporary error state due to connection error." +
- "Plase retry your operation",e);
- }
- }
- }
-
- /**
- * Session Exceptions will generally invalidate the Session.
- * TODO this needs to be revisited again.
- * A new session will need to be created in that case.
- * @param e
- * @throws MessagingException
- */
- private SessionException handleSessionException(SessionException e)
- {
- synchronized (_connectionLock)
- {
- try
- {
- close();
- }
- catch(MessagingException ex)
- {
- // Should not throw an exception here.
- // Even if it did, does't matter as are closing.
- }
- }
- return new SessionException("Session has been closed",e);
- }
-}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java
new file mode 100644
index 0000000000..4f7b09b5c2
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java
@@ -0,0 +1,357 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util.failover;
+
+import java.util.Map;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.MessageFactory;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.TransportFailureException;
+import org.apache.qpid.messaging.internal.ConnectionEvent;
+import org.apache.qpid.messaging.internal.ConnectionEvent.EventType;
+import org.apache.qpid.messaging.internal.ConnectionEventListener;
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.ConnectionString;
+import org.apache.qpid.messaging.internal.FailoverStrategy;
+import org.apache.qpid.messaging.internal.FailoverStrategyFactory;
+import org.apache.qpid.messaging.internal.SessionInternal;
+import org.apache.qpid.messaging.util.AbstractConnectionDecorator;
+import org.apache.qpid.util.UUIDGen;
+import org.apache.qpid.util.UUIDs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * Closing after unsuccessful failover is not done yet!
+ *
+ */
+public class ConnectionFailoverDecorator extends AbstractConnectionDecorator
+{
+ private static Logger _logger = LoggerFactory.getLogger(ConnectionFailoverDecorator.class);
+
+ public enum ConnectionState
+ {
+ UNDEFINED,
+ OPENED,
+ CONNECTION_LOST,
+ FAILOVER_IN_PROGRESS,
+ CLOSED
+ };
+
+ private ConnectionState _state = ConnectionState.UNDEFINED;
+ private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000);
+ private UUIDGen _ssnNameGenerator = UUIDs.newGenerator();
+ private ConnectionException _lastException;
+ private FailoverStrategy _failoverStrategy;
+ private long _serialNumber = 0;
+
+ public ConnectionFailoverDecorator(ConnectionInternal delegate, Object lock)
+ {
+ super(delegate,lock);
+ _failoverStrategy = FailoverStrategyFactory.get().getFailoverStrategy(delegate);
+ }
+
+ @Override
+ public void open() throws MessagingException
+ {
+ synchronized (_connectionLock)
+ {
+ super.open();
+ _serialNumber = getSerialNumber();
+ _state = ConnectionState.OPENED;
+ notifyEvent(new ConnectionEvent(this,EventType.OPENED,this));
+ }
+ }
+
+ @Override
+ public boolean isOpen() throws MessagingException
+ {
+ // If the state is opened, query the delegate and see.
+ if (_state == ConnectionState.OPENED)
+ {
+ return super.isOpen();
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ @Override
+ public void reconnect(String url,Map<String,Object> options) throws TransportFailureException
+ {
+ synchronized (_connectionLock)
+ {
+ super.reconnect(url,options);
+ _serialNumber = getSerialNumber();
+ _state = ConnectionState.OPENED;
+ notifyEvent(new ConnectionEvent(this,EventType.RECONNCTED,this));
+ }
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ synchronized(_connectionLock)
+ {
+ if (_state == ConnectionState.CLOSED)
+ {
+ throw new MessagingException("Connection is already closed");
+ }
+ super.close();
+ _state = ConnectionState.CLOSED;
+ }
+ }
+
+ @Override
+ public Session createSession(String name) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _serialNumber; // take a snapshot
+ try
+ {
+ if (name == null || name.isEmpty()) { name = generateSessionName(); }
+ SessionInternal ssn = new SessionFailoverDecorator(this,
+ (SessionInternal) _delegate.createSession(name));
+ _sessions.put(name, ssn);
+ return ssn;
+ }
+ catch(TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return createSession(name);
+ }
+ }
+
+ @Override
+ public Session createTransactionalSession(String name) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _serialNumber; // take a snapshot
+ try
+ {
+ if (name == null || name.isEmpty()) { name = generateSessionName(); }
+ SessionInternal ssn = new SessionFailoverDecorator(this,
+ (SessionInternal)_delegate.createTransactionalSession(name));
+ _sessions.put(name, ssn);
+ return ssn;
+ }
+ catch(TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return createTransactionalSession(name);
+ }
+ }
+
+ @Override
+ public String getAuthenticatedUsername() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _serialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getAuthenticatedUsername();
+ }
+ catch(TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getAuthenticatedUsername();
+ }
+ }
+
+ @Override
+ public MessageFactory getMessageFactory() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _serialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getMessageFactory();
+ }
+ catch(TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getMessageFactory();
+ }
+ }
+
+ @Override
+ public void exception(TransportFailureException e, long serialNumber)
+ {
+ try
+ {
+ failover(e,serialNumber);
+ }
+ catch(ConnectionException ex)
+ {
+ //ignore.
+ //failover() handles notifications
+ }
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ synchronized(_connectionLock)
+ {
+ for (SessionInternal ssn: _sessions.values())
+ {
+ ssn.recreate();
+ }
+ }
+ }
+
+ protected void checkPreConditions() throws ConnectionException
+ {
+ switch (_state)
+ {
+ case CLOSED:
+ throw new ConnectionException("Connection is closed. You cannot invoke methods on a closed connection");
+ case UNDEFINED:
+ throw new ConnectionException("Connection should be opened before it can be used");
+ case CONNECTION_LOST:
+ case FAILOVER_IN_PROGRESS:
+ waitForFailoverToComplete();
+ }
+ }
+
+ protected void failover(TransportFailureException e,long serialNumber) throws ConnectionException
+ {
+ synchronized(_connectionLock)
+ {
+ if (_serialNumber > serialNumber)
+ {
+ return; // Ignore, We have a working connection now.
+ }
+
+ _logger.warn("Connection lost!");
+ _state = ConnectionState.CONNECTION_LOST;
+ notifyEvent(new ConnectionEvent(this,EventType.CONNECTION_LOST,this));
+
+ if (_failoverStrategy.failoverAllowed())
+ {
+ // Failover is allowed at least once.
+ _state = ConnectionState.FAILOVER_IN_PROGRESS;
+ notifyEvent(new ConnectionEvent(this,EventType.PRE_FAILOVER,this));
+
+
+
+ StringBuffer errorMsg = new StringBuffer();
+ while (_failoverStrategy.failoverAllowed())
+ {
+ try
+ {
+ ConnectionString conString = _failoverStrategy.getNextConnectionString();
+ notifyEvent(new ConnectionEvent(this,EventType.RECONNCTION_ATTEMPTED,this));
+ _logger.warn("Attempting connection to " + conString.getUrl());
+ reconnect(conString.getUrl(), conString.getOptions());
+ try
+ {
+ recreate();
+ _state = ConnectionState.OPENED;
+ _lastException = null;
+ }
+ catch (MessagingException ex)
+ {
+ _lastException = new ConnectionException(
+ "Recreating the state for the connection and it's children failed",
+ ex);
+ }
+ break;
+ }
+ catch (TransportFailureException te)
+ {
+ errorMsg.append("\nUnable to connect to : " +
+ _failoverStrategy.getCurrentConnectionString().getUrl() +
+ " due to : " +
+ te.getMessage());
+ notifyEvent(new ConnectionEvent(this,EventType.RECONNCTION_FAILED,this));
+ }
+ }
+
+ if (_state != ConnectionState.OPENED)
+ {
+ closeInternal();
+ _lastException = new ConnectionException("Failover was unsuccessful." + errorMsg.toString());
+ _logger.warn("Faiolver was unsuccesful" + errorMsg.toString());
+ }
+ notifyEvent(new ConnectionEvent(this,EventType.POST_FAILOVER,this));
+ }
+ else
+ {
+ closeInternal();
+ _state = ConnectionState.CLOSED;
+ _lastException = new ConnectionException("Connection Failed!",e);
+ _logger.warn("Connection Failed!", e);
+ }
+
+ _connectionLock.notifyAll();
+
+ if (_lastException != null)
+ {
+ for (ConnectionEventListener l: _stateListeners)
+ {
+ l.exception(_lastException);
+ }
+ throw _lastException;
+ }
+ }
+ }
+
+ protected void waitForFailoverToComplete() throws ConnectionException
+ {
+ synchronized (_connectionLock)
+ {
+ try
+ {
+ _connectionLock.wait(_failoverTimeout);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore.
+ }
+ if (_state == ConnectionState.CLOSED)
+ {
+ throw new ConnectionException("Connection is closed. Failover was unsuccesfull",_lastException);
+ }
+ }
+ }
+
+ private String generateSessionName()
+ {
+ // TODO add local IP and pid to the beginning;
+ return _ssnNameGenerator.generate().toString();
+ }
+
+ // Suppresses the exceptions
+ private void closeInternal()
+ {
+ try
+ {
+ close();
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
+ }
+
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java
new file mode 100644
index 0000000000..fb6bf9e1b6
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java
@@ -0,0 +1,128 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util.failover;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.ConnectionString;
+import org.apache.qpid.messaging.internal.FailoverStrategy;
+
+public class DefaultFailoverStrategy implements FailoverStrategy
+{
+ ConnectionString[] _reconnectURLs;
+
+ /** seconds (give up and report failure after specified time) */
+ private long _reconnectTimeout = 1000;
+
+ /** n (give up and report failure after specified number of attempts) */
+ private int _reconnectLimit = 1;
+
+ /** seconds (initial delay between failed reconnection attempts) */
+ private long _reconnectIntervalMin = 1000;
+
+ /** seconds (maximum delay between failed reconnection attempts) */
+ private long _reconnectIntervalMax = 1000;
+
+ private ConnectionString _currentUrl;
+
+ /** reconnection attemps */
+ private int _attempts = 0;
+
+ /** Index for retrieving a URL from _reconnectURLs */
+ private int _index = 0;
+
+ /* quick and dirty impl for experimentation */
+ public DefaultFailoverStrategy(String url, Map<String,Object> map)
+ {
+ map = map == null ? Collections.EMPTY_MAP : map;
+ _currentUrl = new ConnectionStringImpl(url,map);
+ // read the map and fill in the above fields.
+ if (map.containsKey("reconnect_urls"))
+ {
+ String[] urls = ((String)map.get("reconnect_urls")).split(",");
+ _reconnectURLs = new ConnectionString[urls.length];
+ for(int i=0; i<urls.length; i++)
+ {
+ _reconnectURLs[i] = new ConnectionStringImpl(urls[i],map);
+ }
+ }
+ else
+ {
+ _reconnectURLs = new ConnectionString[]{_currentUrl};
+ }
+ _reconnectLimit = _reconnectURLs.length;
+ }
+
+ @Override
+ public boolean failoverAllowed()
+ {
+ return (_attempts < _reconnectLimit);
+ }
+
+ @Override
+ public ConnectionString getNextConnectionString()
+ {
+ // quick implementation for experimentation, ignoring timeouts, reconnect intervals etc..
+ // the _index will wrap around like a circular buffer
+ _attempts++;
+ if (_index == _reconnectURLs.length)
+ {
+ _index = 0;
+ }
+ _currentUrl = _reconnectURLs[_index++];
+ return _currentUrl;
+ }
+
+ @Override
+ public ConnectionString getCurrentConnectionString()
+ {
+ return _currentUrl;
+ }
+
+ @Override
+ public void connectionAttained(ConnectionInternal conn)
+ {
+ _attempts = 0;
+ }
+
+ class ConnectionStringImpl implements ConnectionString
+ {
+ private final String _url;
+ private final Map<String, Object> _options;
+
+ public ConnectionStringImpl(String url, Map<String, Object> options)
+ {
+ _url = url;
+ _options = options;
+ }
+
+ public String getUrl()
+ {
+ return _url;
+ }
+
+ public Map<String, Object> getOptions()
+ {
+ return _options;
+ }
+
+ }
+} \ No newline at end of file
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java
new file mode 100644
index 0000000000..b3e2aa02c9
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java
@@ -0,0 +1,15 @@
+package org.apache.qpid.messaging.util.failover;
+
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.FailoverStrategy;
+import org.apache.qpid.messaging.internal.FailoverStrategyFactory;
+
+public class DefaultFailoverStrategyFactory extends FailoverStrategyFactory
+{
+
+ @Override
+ public FailoverStrategy getFailoverStrategy(ConnectionInternal con)
+ {
+ return new DefaultFailoverStrategy(con.getConnectionURL(), con.getConnectionOptions());
+ }
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java
new file mode 100644
index 0000000000..5162add0ad
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java
@@ -0,0 +1,312 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util.failover;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.ReceiverException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.TransportFailureException;
+import org.apache.qpid.messaging.internal.ConnectionEvent;
+import org.apache.qpid.messaging.internal.ConnectionEventListener;
+import org.apache.qpid.messaging.internal.ReceiverInternal;
+import org.apache.qpid.messaging.internal.SessionInternal;
+import org.apache.qpid.messaging.util.AbstractReceiverDecorator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a Receiver.
+ * This class adds,
+ * 1. State management.
+ * 2. Exception handling.
+ *
+ */
+public class ReceiverFailoverDecorator extends AbstractReceiverDecorator implements ConnectionEventListener
+{
+ private static Logger _logger = LoggerFactory.getLogger(ReceiverFailoverDecorator.class);
+
+ public enum ReceiverState {OPENED, CLOSED, FAILOVER_IN_PROGRESS};
+
+ private ReceiverState _state = ReceiverState.OPENED;
+ private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000);
+ private ReceiverException _lastException;
+ private long _connSerialNumber = 0;
+
+ public ReceiverFailoverDecorator(SessionInternal ssn, ReceiverInternal delegate)
+ {
+ super(ssn,delegate);
+ synchronized(_connectionLock)
+ {
+ _connSerialNumber = ssn.getConnectionInternal().getSerialNumber();
+ }
+ }
+
+ @Override
+ public Message get(long timeout) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.get(timeout);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return get(timeout);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Message fetch(long timeout) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.fetch(timeout);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return fetch(timeout);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void setCapacity(int capacity) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.setCapacity(capacity);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ setCapacity(capacity);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getCapacity() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getCapacity();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getCapacity();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getAvailable() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getAvailable();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getAvailable();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getUnsettled() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getUnsettled();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getUnsettled();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ synchronized (_connectionLock)
+ {
+ if (_state == ReceiverState.CLOSED)
+ {
+ throw new MessagingException("Receiver is already closed");
+ }
+ _state = ReceiverState.CLOSED;
+ super.close();
+ }
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return _state == ReceiverState.CLOSED;
+ }
+
+ @Override
+ public Session getSession() throws MessagingException
+ {
+ checkPreConditions();
+ _ssn.checkError();
+ return _ssn;
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ synchronized(_connectionLock)
+ {
+ _connSerialNumber = _ssn.getConnectionInternal().getSerialNumber();
+ _delegate.recreate();
+ }
+ }
+
+ @Override
+ public void eventOccured(ConnectionEvent event)
+ {
+ synchronized (_connectionLock)
+ {
+ switch(event.getType())
+ {
+ case PRE_FAILOVER:
+ case CONNECTION_LOST:
+ _state = ReceiverState.FAILOVER_IN_PROGRESS;
+ break;
+ case RECONNCTED:
+ _state = ReceiverState.OPENED;
+ break;
+ case POST_FAILOVER:
+ try
+ {
+ if (_state != ReceiverState.OPENED)
+ {
+ close();
+ }
+ }
+ catch (MessagingException e)
+ {
+ _logger.warn("Exception when trying to close the receiver", e);
+ }
+ _connectionLock.notifyAll();
+ break;
+ default:
+ break; //ignore the rest
+ }
+ }
+ }
+
+ @Override // From ConnectionEventListener
+ public void exception(ConnectionException e)
+ {// NOOP
+ }
+
+ protected void checkPreConditions() throws ReceiverException
+ {
+ switch (_state)
+ {
+ case CLOSED:
+ throw new ReceiverException("Receiver is closed. You cannot invoke methods on a closed Receiver",_lastException);
+ case FAILOVER_IN_PROGRESS:
+ waitForFailoverToComplete();
+ }
+ }
+
+ protected void waitForFailoverToComplete() throws ReceiverException
+ {
+ synchronized (_connectionLock)
+ {
+ try
+ {
+ _connectionLock.wait(_failoverTimeout);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore.
+ }
+ if (_state == ReceiverState.CLOSED)
+ {
+ throw new ReceiverException("Receiver is closed. Failover was unsuccesfull",_lastException);
+ }
+ }
+ }
+
+ protected ReceiverException handleSessionException(SessionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ // This should close all receivers (including this) and senders.
+ _ssn.exception(e);
+ }
+ return new ReceiverException("Session has been closed",e);
+ }
+
+ protected void failover(TransportFailureException e, long serialNumber) throws ReceiverException
+ {
+ synchronized (_connectionLock)
+ {
+ if (_connSerialNumber > serialNumber)
+ {
+ return; // ignore, we already have failed over.
+ }
+ _state = ReceiverState.FAILOVER_IN_PROGRESS;
+ _ssn.exception(e, serialNumber); // This triggers failover.
+ waitForFailoverToComplete();
+ }
+ }
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
new file mode 100644
index 0000000000..8df574a74d
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
@@ -0,0 +1,291 @@
+package org.apache.qpid.messaging.util.failover;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.ReceiverException;
+import org.apache.qpid.messaging.SenderException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.TransportFailureException;
+import org.apache.qpid.messaging.internal.ConnectionEvent;
+import org.apache.qpid.messaging.internal.ConnectionEventListener;
+import org.apache.qpid.messaging.internal.SenderInternal;
+import org.apache.qpid.messaging.internal.SessionInternal;
+import org.apache.qpid.messaging.util.AbstractSenderDecorator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a Sender.
+ * This class adds,
+ * 1. State management.
+ * 2. Exception handling.
+ * 3. Failover
+ *
+ */
+public class SenderFailoverDecorator extends AbstractSenderDecorator implements ConnectionEventListener
+{
+ private static Logger _logger = LoggerFactory.getLogger(SenderFailoverDecorator.class);
+
+ public enum SenderState {OPENED, CLOSED, FAILOVER_IN_PROGRESS};
+
+ private SenderState _state = SenderState.OPENED;
+ private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000);
+ private ReceiverException _lastException;
+ private long _connSerialNumber = 0;
+
+ public SenderFailoverDecorator(SessionInternal ssn, SenderInternal delegate)
+ {
+ super(ssn,delegate);
+ synchronized(_connectionLock)
+ {
+ _connSerialNumber = ssn.getConnectionInternal().getSerialNumber();
+ }
+ }
+
+ @Override
+ public void send(Message message, boolean sync) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.send(message, sync);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ send(message, sync);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ synchronized (_connectionLock)
+ {
+ if (_state == SenderState.CLOSED)
+ {
+ throw new MessagingException("Sender is already closed");
+ }
+ _state = SenderState.CLOSED;
+ super.close();
+ }
+ }
+
+ @Override
+ public void setCapacity(int capacity) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.setCapacity(capacity);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ setCapacity(capacity);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getCapacity() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getCapacity();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getCapacity();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getAvailable() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getAvailable();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getAvailable();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getUnsettled() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getUnsettled();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getUnsettled();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public boolean isClosed() throws MessagingException
+ {
+ return _state == SenderState.CLOSED;
+ }
+
+ @Override
+ public String getName() throws MessagingException
+ {
+ checkPreConditions();
+ return getName();
+ }
+
+ @Override
+ public Session getSession() throws MessagingException
+ {
+ checkPreConditions();
+ _ssn.checkError();
+ return _ssn;
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ synchronized(_connectionLock)
+ {
+ _connSerialNumber = _ssn.getConnectionInternal().getSerialNumber();
+ _delegate.recreate();
+ }
+ }
+
+ @Override
+ public void eventOccured(ConnectionEvent event)
+ {
+ synchronized (_connectionLock)
+ {
+ switch(event.getType())
+ {
+ case PRE_FAILOVER:
+ case CONNECTION_LOST:
+ _state = SenderState.FAILOVER_IN_PROGRESS;
+ break;
+ case RECONNCTED:
+ _state = SenderState.OPENED;
+ break;
+ case POST_FAILOVER:
+ try
+ {
+ if (_state != SenderState.OPENED)
+ {
+ close();
+ }
+ }
+ catch (MessagingException e)
+ {
+ _logger.warn("Exception when trying to close the receiver", e);
+ }
+ _connectionLock.notifyAll();
+ break;
+ default:
+ break; //ignore the rest
+ }
+ }
+ }
+
+ @Override // From ConnectionEventListener
+ public void exception(ConnectionException e)
+ {// NOOP
+ }
+
+ protected void waitForFailoverToComplete() throws SenderException
+ {
+ synchronized (_connectionLock)
+ {
+ try
+ {
+ _connectionLock.wait(_failoverTimeout);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore.
+ }
+ if (_state == SenderState.CLOSED)
+ {
+ throw new SenderException("Receiver is closed. Failover was unsuccesfull",_lastException);
+ }
+ }
+ }
+
+ protected void failover(TransportFailureException e, long serialNumber) throws SenderException
+ {
+ synchronized (_connectionLock)
+ {
+ if (_connSerialNumber > serialNumber)
+ {
+ return; // ignore, we already have failed over.
+ }
+ _state = SenderState.FAILOVER_IN_PROGRESS;
+ _ssn.exception(e, serialNumber); // This triggers failover.
+ waitForFailoverToComplete();
+ }
+ }
+
+ protected void checkPreConditions() throws SenderException
+ {
+ switch (_state)
+ {
+ case CLOSED:
+ throw new SenderException("Sender is closed. You cannot invoke methods on a closed sender",_lastException);
+ case FAILOVER_IN_PROGRESS:
+ waitForFailoverToComplete();
+ }
+ }
+
+ /**
+ * Session Exceptions will generally invalidate the Session.
+ * TODO this needs to be revisited again.
+ * A new session will need to be created in that case.
+ * @param e
+ * @throws MessagingException
+ */
+ protected SenderException handleSessionException(SessionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ // This should close all senders (including this) and receivers.
+ _ssn.exception(e);
+ }
+ return new SenderException("Session has been closed",e);
+ }
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java
new file mode 100644
index 0000000000..5f2bafa1e6
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java
@@ -0,0 +1,559 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util.failover;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.Sender;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.TransportFailureException;
+import org.apache.qpid.messaging.internal.ConnectionEvent;
+import org.apache.qpid.messaging.internal.ConnectionEventListener;
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.ReceiverInternal;
+import org.apache.qpid.messaging.internal.SenderInternal;
+import org.apache.qpid.messaging.internal.SessionInternal;
+import org.apache.qpid.messaging.util.AbstractSessionDecorator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SessionFailoverDecorator extends AbstractSessionDecorator implements ConnectionEventListener
+{
+ private static Logger _logger = LoggerFactory.getLogger(SessionFailoverDecorator.class);
+
+ public enum SessionState {OPENED, CLOSED, FAILOVER_IN_PROGRESS}
+
+ private SessionState _state = SessionState.OPENED;
+ private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000);
+ private SessionException _lastException;
+ private long _connSerialNumber = 0;
+
+ public SessionFailoverDecorator(ConnectionInternal conn, SessionInternal delegate)
+ {
+ super(conn,delegate);
+ synchronized(_connectionLock)
+ {
+ _connSerialNumber = conn.getSerialNumber();
+ }
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ synchronized(_connectionLock)
+ {
+ if (_state == SessionState.CLOSED)
+ {
+ throw new MessagingException("Session is already closed");
+ }
+ _state = SessionState.CLOSED;
+ super.close();
+ }
+ }
+
+ @Override
+ public void commit() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.commit();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ commit();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void rollback() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.rollback();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ rollback();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void acknowledge(boolean sync) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.acknowledge(sync);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ acknowledge(sync);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void acknowledge(Message message, boolean sync)
+ throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.acknowledge(message,sync);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ acknowledge(message,sync);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void reject(Message message) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.reject(message);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ reject(message);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void release(Message message) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.release(message);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ release(message);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void sync(boolean block) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ _delegate.sync(block);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ sync(block);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getReceivable() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getReceivable();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getReceivable();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getUnsettledAcks() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.getUnsettledAcks();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return getUnsettledAcks();
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Receiver nextReceiver(long timeout) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ return _delegate.nextReceiver(timeout);
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return nextReceiver(timeout);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Sender createSender(Address address) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ SenderInternal sender = new SenderFailoverDecorator(this,
+ (SenderInternal) _delegate.createSender(address));
+ synchronized (_connectionLock)
+ {
+ _senders.add(sender);
+ }
+ return sender;
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return createSender(address);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Sender createSender(String address) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ SenderInternal sender = new SenderFailoverDecorator(this,
+ (SenderInternal) _delegate.createSender(address));
+ synchronized (_connectionLock)
+ {
+ _senders.add(sender);
+ }
+ return sender;
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return createSender(address);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Receiver createReceiver(Address address) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ ReceiverInternal receiver = new ReceiverFailoverDecorator(this,
+ (ReceiverInternal) _delegate.createReceiver(address));
+ synchronized (_connectionLock)
+ {
+ _receivers.add(receiver);
+ }
+ return receiver;
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return createReceiver(address);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Receiver createReceiver(String address) throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot
+ try
+ {
+ ReceiverInternal receiver = new ReceiverFailoverDecorator(this,
+ (ReceiverInternal) _delegate.createReceiver(address));
+ synchronized (_connectionLock)
+ {
+ _receivers.add(receiver);
+ }
+ return receiver;
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber);
+ return createReceiver(address);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void checkError() throws MessagingException
+ {
+ checkPreConditions();
+ long serialNumber = _connSerialNumber; // take a snapshot // check if we already have the info.
+ try
+ {
+ // Asking the delegate.
+ _delegate.checkError();
+ }
+ catch (TransportFailureException e)
+ {
+ failover(e,serialNumber); // will throw an exception
+ return;
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ if (_state == SessionState.OPENED)
+ {
+ return super.isClosed(); // ask the delegate to be sure.
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ @Override // From SessionInternal
+ public void exception(TransportFailureException e, long serialNumber)
+ {
+ try
+ {
+ failover((TransportFailureException)e, serialNumber);
+ }
+ catch (SessionException ex)
+ {
+ _lastException = ex;
+ }
+ }
+
+ public void exception(SessionException e)
+ {
+ handleSessionException(e);
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ synchronized (_connectionLock)
+ {
+ _connSerialNumber = _conn.getSerialNumber();
+ _delegate.recreate();
+ for (ReceiverInternal rec : _receivers)
+ {
+ rec.recreate();
+ }
+ for (SenderInternal sender : _senders)
+ {
+ sender.recreate();
+ }
+ }
+ }
+
+ @Override
+ public ConnectionInternal getConnectionInternal()
+ {
+ return _conn;
+ }
+
+ @Override //From ConnectionEventListener
+ public void exception(ConnectionException e)
+ {
+ // NOOP
+ }
+
+ @Override //From ConnectionEventListener
+ public void eventOccured(ConnectionEvent event)
+ {
+ synchronized (_connectionLock)
+ {
+ switch(event.getType())
+ {
+ case PRE_FAILOVER:
+ case CONNECTION_LOST:
+ _state = SessionState.FAILOVER_IN_PROGRESS;
+ break;
+ case RECONNCTED:
+ _state = SessionState.OPENED;
+ break;
+ case POST_FAILOVER:
+ try
+ {
+ if (_state != SessionState.OPENED)
+ {
+ close();
+ }
+ }
+ catch (MessagingException e)
+ {
+ _logger.warn("Exception when trying to close the session", e);
+ }
+ _connectionLock.notifyAll();
+ break;
+ default:
+ break; //ignore the rest
+ }
+ }
+ }
+
+ protected void failover(TransportFailureException e, long serialNumber) throws SessionException
+ {
+ synchronized (_connectionLock)
+ {
+ if (_connSerialNumber > serialNumber)
+ {
+ return; // ignore, we already have failed over.
+ }
+ _state = SessionState.FAILOVER_IN_PROGRESS;
+ _conn.exception(e, serialNumber); // This triggers failover.
+ waitForFailoverToComplete();
+ }
+ }
+
+ protected void checkPreConditions() throws SessionException
+ {
+ switch (_state)
+ {
+ case CLOSED:
+ throw new SessionException("Session is closed. You cannot invoke methods on a closed session",_lastException);
+ case FAILOVER_IN_PROGRESS:
+ waitForFailoverToComplete();
+ }
+ }
+
+ /**
+ * Session Exceptions will generally invalidate the Session.
+ * TODO this needs to be revisited again.
+ * A new session will need to be created in that case.
+ * @param e
+ * @throws MessagingException
+ */
+ protected SessionException handleSessionException(SessionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ try
+ {
+ close();
+ }
+ catch(MessagingException ex)
+ {
+ _logger.warn("Error when closing session : " + getName(), ex);
+ }
+ }
+ return new SessionException("Session has been closed",e);
+ }
+
+ protected void waitForFailoverToComplete() throws SessionException
+ {
+ synchronized (_connectionLock)
+ {
+ try
+ {
+ _connectionLock.wait(_failoverTimeout);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore.
+ }
+ if (_state == SessionState.CLOSED)
+ {
+ throw new SessionException("Session is closed. Failover was unsuccesfull",_lastException);
+ }
+ }
+ }
+}