summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java46
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java29
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java285
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java (renamed from qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java)0
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java477
5 files changed, 837 insertions, 0 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
new file mode 100644
index 0000000000..557c3c0fe7
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
@@ -0,0 +1,46 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import java.util.List;
+
+import org.apache.qpid.messaging.Connection;
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Session;
+
+/**
+ * An extended interface meant for API implementors.
+ */
+public interface ConnectionExt extends Connection
+{
+ public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException;
+
+ public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException;
+
+ public List<Session> getSessions() throws ConnectionException;
+
+ public void exception(ConnectionException e);
+
+ /**
+ * The per connection lock that is used by the connection
+ * and it's child objects. A single lock is used to prevent
+ * deadlocks that could occur with having multiple locks,
+ * perhaps at the cost of a minor perf degradation.
+ */
+ public Object getConnectionLock();
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java
new file mode 100644
index 0000000000..e1af2479c5
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java
@@ -0,0 +1,29 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.ConnectionException;
+
+public interface ConnectionStateListener
+{
+ public void exception(ConnectionException e);
+
+ public void opened();
+
+ public void closed();
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
new file mode 100644
index 0000000000..5200e5081d
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
@@ -0,0 +1,285 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.messaging.Connection;
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.ConnectionExt;
+import org.apache.qpid.messaging.ext.ConnectionStateListener;
+import org.apache.qpid.util.UUIDGen;
+import org.apache.qpid.util.UUIDs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a connection.
+ * This allows the various implementations to reuse basic functions.
+ * This class adds,
+ * 1. Basic session mgt (tracking, default name generation ..etc)
+ * 2. Connection state management.
+ * 3. Error handling.
+ *
+ * <i> <b>Close() can be called by,</b>
+ * <ol>
+ * <li>The application (normal close)</li>
+ * <li>By the parent if it's not null (error)</li>
+ * <li>By this object if parent is null (error)</li>
+ * </ol>
+ * </i>
+ *
+ * <u>Failover</u>
+ * This Decorator does not handle any failover.
+ *
+ * If failover is handled at a layer above then it will take appropriate action.
+ * @see ConnectionFailoverDecorator for an example.
+ * If failover is handled at a layer below (or no failover at all) then an exception means the connection is no longer usable.
+ * Therefore this class will attempt to close the connection if the parent is null.
+ */
+public class ConnectionManagementDecorator implements ConnectionExt
+{
+ private static Logger _logger = LoggerFactory.getLogger(ConnectionManagementDecorator.class);
+
+ public enum ConnectionState { UNDEFINED, OPENED, CLOSED, ERROR}
+
+ private ConnectionExt _parent;
+ private Connection _delegate;
+ private ConnectionState _state = ConnectionState.UNDEFINED;
+ private UUIDGen _ssnNameGenerator = UUIDs.newGenerator();
+ private Map<String, Session> _sessions = new ConcurrentHashMap<String,Session>();
+ private ConnectionException _lastException;
+ private List<ConnectionStateListener> _stateListeners = new ArrayList<ConnectionStateListener>();
+
+ private final Object _connectionLock;
+
+ public ConnectionManagementDecorator(Connection delegate)
+ {
+ this(null,delegate);
+ }
+
+ public ConnectionManagementDecorator(ConnectionExt parent, Connection delegate)
+ {
+ _delegate = delegate;
+ _parent = parent;
+ _connectionLock = (_parent == null) ? new Object() : _parent.getConnectionLock();
+ }
+
+ @Override
+ public void open() throws MessagingException
+ {
+ // return without exception denotes success
+ _delegate.open();
+ synchronized (_connectionLock)
+ {
+ _state = ConnectionState.OPENED;
+ for (ConnectionStateListener l: _stateListeners)
+ {
+ l.opened();
+ }
+ }
+ }
+
+ @Override
+ public boolean isOpen() throws MessagingException
+ {
+ return _delegate.isOpen();
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ checkClosedAndThrowException("Connection is already closed");
+ synchronized(_connectionLock)
+ {
+ _state = ConnectionState.CLOSED;
+ for (Session ssn : _sessions.values())
+ {
+ ssn.close();
+ }
+ _sessions.clear();
+
+ for (ConnectionStateListener l: _stateListeners)
+ {
+ l.closed();
+ }
+ }
+ _delegate.close();
+ }
+
+ @Override
+ public Session createSession(String name) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ if (name == null || name.isEmpty()) { name = generateSessionName(); }
+ Session ssn = new SessionManagementDecorator(this,_delegate.createSession(name));
+ _sessions.put(name, ssn);
+ return ssn;
+ }
+ catch(ConnectionException e)
+ {
+ exception(e);
+ // If there is a failover handler above this it will handle it.
+ // Otherwise the application gets this.
+ throw new ConnectionException("Connection closed",e);
+ }
+ }
+
+ @Override
+ public Session createTransactionalSession(String name)
+ throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ if (name == null || name.isEmpty()) { name = generateSessionName(); }
+ Session ssn = new SessionManagementDecorator(this,_delegate.createTransactionalSession(name));
+ _sessions.put(name, ssn);
+ return ssn;
+ }
+ catch(ConnectionException e)
+ {
+ exception(e);
+ // If there is a failover handler above this it will handle it.
+ // Otherwise the application gets this.
+ throw new ConnectionException("Connection closed",e);
+ }
+ }
+
+ @Override
+ public String getAuthenticatedUsername() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ return _delegate.getAuthenticatedUsername();
+ }
+
+ @Override
+ public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException
+ {
+ checkClosedAndThrowException();
+ synchronized (_connectionLock)
+ {
+ _stateListeners.add(l);
+ }
+ }
+
+ @Override
+ public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException
+ {
+ checkClosedAndThrowException();
+ synchronized (_connectionLock)
+ {
+ _stateListeners.remove(l);
+ }
+ }
+
+ @Override
+ public List<Session> getSessions() throws ConnectionException
+ {
+ checkClosedAndThrowException();
+ return new ArrayList<Session>(_sessions.values());
+ }
+
+ @Override // Called by the delegate or a a session created by this connection.
+ public void exception(ConnectionException e)
+ {
+ synchronized(_connectionLock)
+ {
+ _state = ConnectionState.ERROR;
+ if (_lastException != null)
+ {
+ _logger.warn("Last exception was not notified to the application", _lastException);
+ }
+ _lastException = e;
+
+ for (ConnectionStateListener l: _stateListeners)
+ {
+ l.exception(_lastException);
+ }
+
+ if (_parent != null)
+ {
+ _parent.exception(e);
+ }
+ else
+ {
+ try
+ {
+ close();
+ }
+ catch(MessagingException ex)
+ {
+ //ignore
+ }
+ }
+ }
+ // should we clean lastException if we notify it via a listener?
+ }
+
+ @Override
+ public Object getConnectionLock()
+ {
+ return _connectionLock;
+ }
+
+ private void checkClosedAndThrowException() throws ConnectionException
+ {
+ checkClosedAndThrowException("Connection is closed. You cannot invoke methods on a closed connection");
+ }
+
+ private void checkClosedAndThrowException(String msg) throws ConnectionException
+ {
+ switch (_state)
+ {
+ case UNDEFINED:
+ case ERROR:
+ throw new ConnectionException("Connection is in an error state. The connection may or may not recover from this");
+ case CLOSED:
+ synchronized(_connectionLock)
+ {
+ if(_lastException != null)
+ {
+ Throwable cause = _lastException;
+ _lastException = null;
+ throw new ConnectionException(msg, cause);
+ }
+ else
+ {
+ throw new ConnectionException(msg);
+ }
+ }
+ default:
+ break;
+ }
+ }
+
+ private String generateSessionName()
+ {
+ // TODO add local IP and pid to the beginning;
+ return _ssnNameGenerator.generate().toString();
+ }
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java
index 0dfd6e735d..0dfd6e735d 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
new file mode 100644
index 0000000000..027e9b9605
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
@@ -0,0 +1,477 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Connection;
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.Sender;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.ConnectionExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a session.
+ * This class adds,
+ * 1. Management of receivers and senders created by this session.
+ * 2. State management.
+ * 3. Exception handling.
+ *
+ * <b>Exception Handling</b>
+ * This class will wrap each method call to it's delegate to handle error situations.
+ * First it will check if the session is already CLOSED or in an ERROR situation.
+ * Then it will look for connection and session errors and handle as follows.
+ *
+ * <b>Connection Exceptions</b>
+ * This class intercepts ConnectionException's and are passed onto the connection.
+ * The Session will be marked as ERROR and a session exception will be thrown with an appropriate message.
+ * Any further use of the session is prevented until it moves to OPENED.
+ *
+ * If failover is handled at a layer above, there will be a Session Decorator that
+ * would handle the session exception and retry when the connection is available.
+ * This handler may block the call until the state moves into either OPENED or CLOSED.
+ * Ex @see SessionFailoverDecorator.
+ *
+ * If failover is handled at a layer below, then a connection exception means it has failed already.
+ * Therefore when passed to the connection,the exception will be thrown directly to the application.
+ * The connection object will be responsible for calling close on this session for the above case.
+ *
+ * <i> <b>Close() can be called by,</b>
+ * <ol>
+ * <li>The application (normal close)</li>
+ * <li>By the parent via failover (error)</li>
+ * <li>By the connection object, if not failover(error)</li>
+ * </ol>
+ * </i>
+ *
+ * <b>Session Exceptions</b>
+ * For the time being, anytime a session exception is received, the session will be marked CLOSED.
+ * We need to revisit this.
+ */
+public class SessionManagementDecorator implements Session
+{
+ private static Logger _logger = LoggerFactory.getLogger(SessionManagementDecorator.class);
+
+ public enum SessionState { UNDEFINED, OPENED, CLOSED, ERROR}
+
+ private ConnectionExt _conn;
+ private Session _delegate;
+ SessionState _state = SessionState.UNDEFINED;
+ private List<Receiver> _receivers = new ArrayList<Receiver>();
+ private List<Sender> _senders = new ArrayList<Sender>();
+ private final Object _connectionLock; // global per connection lock
+
+ public SessionManagementDecorator(ConnectionExt conn, Session delegate)
+ {
+ _conn = conn;
+ _delegate = delegate;
+ _connectionLock = conn.getConnectionLock();
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return _state == SessionState.CLOSED;
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ checkClosedAndThrowException("Session is already closed");
+ synchronized(_connectionLock)
+ {
+ _state = SessionState.CLOSED;
+ for (Sender sender: _senders)
+ {
+ sender.close();
+ }
+ _senders.clear();
+
+ for (Receiver receiver: _receivers)
+ {
+ receiver.close();
+ }
+ _receivers.clear();
+ _delegate.close();
+ }
+ }
+
+ @Override
+ public void commit() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.commit();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void rollback() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.rollback();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void acknowledge(boolean sync) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.acknowledge(sync);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void acknowledge(Message message, boolean sync)
+ throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.acknowledge(message, sync);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void reject(Message message) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.reject(message);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void release(Message message) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.release(message);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void sync(boolean block) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.sync(block);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getReceivable() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getReceivable();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getUnsettledAcks() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getUnsettledAcks();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Receiver nextReceiver(long timeout) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.nextReceiver(timeout);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Sender createSender(Address address) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ Sender sender = _delegate.createSender(address);
+ _senders.add(sender);
+ return sender;
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Sender createSender(String address) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ Sender sender = _delegate.createSender(address);
+ _senders.add(sender);
+ return sender;
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Receiver createReceiver(Address address) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ Receiver receiver = _delegate.createReceiver(address);
+ _receivers.add(receiver);
+ return receiver;
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Receiver createReceiver(String address) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ Receiver receiver = _delegate.createReceiver(address);
+ _receivers.add(receiver);
+ return receiver;
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Connection getConnection() throws MessagingException
+ {
+ checkError();
+ return _conn; // always return your peer (not your delegate's peer)
+ }
+
+ @Override
+ public boolean hasError()
+ {
+ return _delegate.hasError();
+ }
+
+ @Override
+ public void checkError() throws MessagingException
+ {
+ checkClosedAndThrowException(); // check if we already have the info.
+ try
+ {
+ // Asking the delegate.
+ _delegate.checkError();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ private void checkClosedAndThrowException() throws SessionException
+ {
+ checkClosedAndThrowException("Session is closed. You cannot invoke methods on a closed sesion");
+ }
+
+ private void checkClosedAndThrowException(String closedMessage) throws SessionException
+ {
+ switch (_state)
+ {
+ case ERROR:
+ case UNDEFINED:
+ throw new SessionException("Session is in a temporary error state. The session may or may not recover from this");
+ case CLOSED:
+ throw new SessionException(closedMessage);
+ }
+ }
+
+ /**
+ * A ConnectionException will cause the Session to go into a temporary error state,
+ * which prevents it from being used further.
+ * From there the Session can be moved into OPENED (if failover works) or
+ * CLOSED if there is no failover or if failover has failed.
+ * @param e
+ * @throws MessagingException
+ */
+ private SessionException handleConnectionException(ConnectionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ _state = SessionState.ERROR;
+ _conn.exception(e); // This might trigger failover in a layer above.
+ if (_state == SessionState.CLOSED)
+ {
+ // The connection has instructed the session to be closed.
+ // Either there was no failover, or failover has failed.
+ return new SessionException("Session is closed due to connection error",e);
+ }
+ else
+ {
+ // Asking the application or the Parent handler to retry the operation.
+ // The Session should be in OPENED state at this time.
+ return new SessionException("Session was in a temporary error state due to connection error." +
+ "Plase retry your operation",e);
+ }
+ }
+ }
+
+ /**
+ * Session Exceptions will generally invalidate the Session.
+ * TODO this needs to be revisited again.
+ * A new session will need to be created in that case.
+ * @param e
+ * @throws MessagingException
+ */
+ private SessionException handleSessionException(SessionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ _state = SessionState.CLOSED;
+ }
+ return new SessionException("Session has been closed",e);
+ }
+}