summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-06-15 17:19:57 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-06-15 17:19:57 +0000
commitfa8babc296c51c31166e718af1cdd3b4c650142e (patch)
tree36883ee3b93bcec7bceec9c3de34b042ab22c409
parentc0a42551f21e17199abbb82fa48f4531397de862 (diff)
downloadqpid-python-fa8babc296c51c31166e718af1cdd3b4c650142e.tar.gz
QPID-4027 Experimented with adding common functionality via Decorators
as opposed to inheritence. ConnectionManagementDecorator and SessionManagementDecorator adds common session/connection management, state management, error handling etc. The decorator approach allows us to add/remove functionality easily and to isolate logic without assuming/depending on base classes like we do with our current client. Added some extension interfaces under the "ext" package that facilitates the implementation of the API. This package is not intended to be visible to the users. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350699 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java46
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java29
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java285
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java (renamed from qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java)0
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java477
5 files changed, 837 insertions, 0 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
new file mode 100644
index 0000000000..557c3c0fe7
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
@@ -0,0 +1,46 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import java.util.List;
+
+import org.apache.qpid.messaging.Connection;
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Session;
+
+/**
+ * An extended interface meant for API implementors.
+ */
+public interface ConnectionExt extends Connection
+{
+ public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException;
+
+ public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException;
+
+ public List<Session> getSessions() throws ConnectionException;
+
+ public void exception(ConnectionException e);
+
+ /**
+ * The per connection lock that is used by the connection
+ * and it's child objects. A single lock is used to prevent
+ * deadlocks that could occur with having multiple locks,
+ * perhaps at the cost of a minor perf degradation.
+ */
+ public Object getConnectionLock();
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java
new file mode 100644
index 0000000000..e1af2479c5
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java
@@ -0,0 +1,29 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.ConnectionException;
+
+public interface ConnectionStateListener
+{
+ public void exception(ConnectionException e);
+
+ public void opened();
+
+ public void closed();
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
new file mode 100644
index 0000000000..5200e5081d
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
@@ -0,0 +1,285 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.messaging.Connection;
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.ConnectionExt;
+import org.apache.qpid.messaging.ext.ConnectionStateListener;
+import org.apache.qpid.util.UUIDGen;
+import org.apache.qpid.util.UUIDs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a connection.
+ * This allows the various implementations to reuse basic functions.
+ * This class adds,
+ * 1. Basic session mgt (tracking, default name generation ..etc)
+ * 2. Connection state management.
+ * 3. Error handling.
+ *
+ * <i> <b>Close() can be called by,</b>
+ * <ol>
+ * <li>The application (normal close)</li>
+ * <li>By the parent if it's not null (error)</li>
+ * <li>By this object if parent is null (error)</li>
+ * </ol>
+ * </i>
+ *
+ * <u>Failover</u>
+ * This Decorator does not handle any failover.
+ *
+ * If failover is handled at a layer above then it will take appropriate action.
+ * @see ConnectionFailoverDecorator for an example.
+ * If failover is handled at a layer below (or no failover at all) then an exception means the connection is no longer usable.
+ * Therefore this class will attempt to close the connection if the parent is null.
+ */
+public class ConnectionManagementDecorator implements ConnectionExt
+{
+ private static Logger _logger = LoggerFactory.getLogger(ConnectionManagementDecorator.class);
+
+ public enum ConnectionState { UNDEFINED, OPENED, CLOSED, ERROR}
+
+ private ConnectionExt _parent;
+ private Connection _delegate;
+ private ConnectionState _state = ConnectionState.UNDEFINED;
+ private UUIDGen _ssnNameGenerator = UUIDs.newGenerator();
+ private Map<String, Session> _sessions = new ConcurrentHashMap<String,Session>();
+ private ConnectionException _lastException;
+ private List<ConnectionStateListener> _stateListeners = new ArrayList<ConnectionStateListener>();
+
+ private final Object _connectionLock;
+
+ public ConnectionManagementDecorator(Connection delegate)
+ {
+ this(null,delegate);
+ }
+
+ public ConnectionManagementDecorator(ConnectionExt parent, Connection delegate)
+ {
+ _delegate = delegate;
+ _parent = parent;
+ _connectionLock = (_parent == null) ? new Object() : _parent.getConnectionLock();
+ }
+
+ @Override
+ public void open() throws MessagingException
+ {
+ // return without exception denotes success
+ _delegate.open();
+ synchronized (_connectionLock)
+ {
+ _state = ConnectionState.OPENED;
+ for (ConnectionStateListener l: _stateListeners)
+ {
+ l.opened();
+ }
+ }
+ }
+
+ @Override
+ public boolean isOpen() throws MessagingException
+ {
+ return _delegate.isOpen();
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ checkClosedAndThrowException("Connection is already closed");
+ synchronized(_connectionLock)
+ {
+ _state = ConnectionState.CLOSED;
+ for (Session ssn : _sessions.values())
+ {
+ ssn.close();
+ }
+ _sessions.clear();
+
+ for (ConnectionStateListener l: _stateListeners)
+ {
+ l.closed();
+ }
+ }
+ _delegate.close();
+ }
+
+ @Override
+ public Session createSession(String name) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ if (name == null || name.isEmpty()) { name = generateSessionName(); }
+ Session ssn = new SessionManagementDecorator(this,_delegate.createSession(name));
+ _sessions.put(name, ssn);
+ return ssn;
+ }
+ catch(ConnectionException e)
+ {
+ exception(e);
+ // If there is a failover handler above this it will handle it.
+ // Otherwise the application gets this.
+ throw new ConnectionException("Connection closed",e);
+ }
+ }
+
+ @Override
+ public Session createTransactionalSession(String name)
+ throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ if (name == null || name.isEmpty()) { name = generateSessionName(); }
+ Session ssn = new SessionManagementDecorator(this,_delegate.createTransactionalSession(name));
+ _sessions.put(name, ssn);
+ return ssn;
+ }
+ catch(ConnectionException e)
+ {
+ exception(e);
+ // If there is a failover handler above this it will handle it.
+ // Otherwise the application gets this.
+ throw new ConnectionException("Connection closed",e);
+ }
+ }
+
+ @Override
+ public String getAuthenticatedUsername() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ return _delegate.getAuthenticatedUsername();
+ }
+
+ @Override
+ public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException
+ {
+ checkClosedAndThrowException();
+ synchronized (_connectionLock)
+ {
+ _stateListeners.add(l);
+ }
+ }
+
+ @Override
+ public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException
+ {
+ checkClosedAndThrowException();
+ synchronized (_connectionLock)
+ {
+ _stateListeners.remove(l);
+ }
+ }
+
+ @Override
+ public List<Session> getSessions() throws ConnectionException
+ {
+ checkClosedAndThrowException();
+ return new ArrayList<Session>(_sessions.values());
+ }
+
+ @Override // Called by the delegate or a a session created by this connection.
+ public void exception(ConnectionException e)
+ {
+ synchronized(_connectionLock)
+ {
+ _state = ConnectionState.ERROR;
+ if (_lastException != null)
+ {
+ _logger.warn("Last exception was not notified to the application", _lastException);
+ }
+ _lastException = e;
+
+ for (ConnectionStateListener l: _stateListeners)
+ {
+ l.exception(_lastException);
+ }
+
+ if (_parent != null)
+ {
+ _parent.exception(e);
+ }
+ else
+ {
+ try
+ {
+ close();
+ }
+ catch(MessagingException ex)
+ {
+ //ignore
+ }
+ }
+ }
+ // should we clean lastException if we notify it via a listener?
+ }
+
+ @Override
+ public Object getConnectionLock()
+ {
+ return _connectionLock;
+ }
+
+ private void checkClosedAndThrowException() throws ConnectionException
+ {
+ checkClosedAndThrowException("Connection is closed. You cannot invoke methods on a closed connection");
+ }
+
+ private void checkClosedAndThrowException(String msg) throws ConnectionException
+ {
+ switch (_state)
+ {
+ case UNDEFINED:
+ case ERROR:
+ throw new ConnectionException("Connection is in an error state. The connection may or may not recover from this");
+ case CLOSED:
+ synchronized(_connectionLock)
+ {
+ if(_lastException != null)
+ {
+ Throwable cause = _lastException;
+ _lastException = null;
+ throw new ConnectionException(msg, cause);
+ }
+ else
+ {
+ throw new ConnectionException(msg);
+ }
+ }
+ default:
+ break;
+ }
+ }
+
+ private String generateSessionName()
+ {
+ // TODO add local IP and pid to the beginning;
+ return _ssnNameGenerator.generate().toString();
+ }
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java
index 0dfd6e735d..0dfd6e735d 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
new file mode 100644
index 0000000000..027e9b9605
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
@@ -0,0 +1,477 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.messaging.Connection;
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.Sender;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.ConnectionExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a session.
+ * This class adds,
+ * 1. Management of receivers and senders created by this session.
+ * 2. State management.
+ * 3. Exception handling.
+ *
+ * <b>Exception Handling</b>
+ * This class will wrap each method call to it's delegate to handle error situations.
+ * First it will check if the session is already CLOSED or in an ERROR situation.
+ * Then it will look for connection and session errors and handle as follows.
+ *
+ * <b>Connection Exceptions</b>
+ * This class intercepts ConnectionException's and are passed onto the connection.
+ * The Session will be marked as ERROR and a session exception will be thrown with an appropriate message.
+ * Any further use of the session is prevented until it moves to OPENED.
+ *
+ * If failover is handled at a layer above, there will be a Session Decorator that
+ * would handle the session exception and retry when the connection is available.
+ * This handler may block the call until the state moves into either OPENED or CLOSED.
+ * Ex @see SessionFailoverDecorator.
+ *
+ * If failover is handled at a layer below, then a connection exception means it has failed already.
+ * Therefore when passed to the connection,the exception will be thrown directly to the application.
+ * The connection object will be responsible for calling close on this session for the above case.
+ *
+ * <i> <b>Close() can be called by,</b>
+ * <ol>
+ * <li>The application (normal close)</li>
+ * <li>By the parent via failover (error)</li>
+ * <li>By the connection object, if not failover(error)</li>
+ * </ol>
+ * </i>
+ *
+ * <b>Session Exceptions</b>
+ * For the time being, anytime a session exception is received, the session will be marked CLOSED.
+ * We need to revisit this.
+ */
+public class SessionManagementDecorator implements Session
+{
+ private static Logger _logger = LoggerFactory.getLogger(SessionManagementDecorator.class);
+
+ public enum SessionState { UNDEFINED, OPENED, CLOSED, ERROR}
+
+ private ConnectionExt _conn;
+ private Session _delegate;
+ SessionState _state = SessionState.UNDEFINED;
+ private List<Receiver> _receivers = new ArrayList<Receiver>();
+ private List<Sender> _senders = new ArrayList<Sender>();
+ private final Object _connectionLock; // global per connection lock
+
+ public SessionManagementDecorator(ConnectionExt conn, Session delegate)
+ {
+ _conn = conn;
+ _delegate = delegate;
+ _connectionLock = conn.getConnectionLock();
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return _state == SessionState.CLOSED;
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ checkClosedAndThrowException("Session is already closed");
+ synchronized(_connectionLock)
+ {
+ _state = SessionState.CLOSED;
+ for (Sender sender: _senders)
+ {
+ sender.close();
+ }
+ _senders.clear();
+
+ for (Receiver receiver: _receivers)
+ {
+ receiver.close();
+ }
+ _receivers.clear();
+ _delegate.close();
+ }
+ }
+
+ @Override
+ public void commit() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.commit();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void rollback() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.rollback();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void acknowledge(boolean sync) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.acknowledge(sync);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void acknowledge(Message message, boolean sync)
+ throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.acknowledge(message, sync);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void reject(Message message) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.reject(message);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void release(Message message) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.release(message);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void sync(boolean block) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.sync(block);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getReceivable() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getReceivable();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getUnsettledAcks() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getUnsettledAcks();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Receiver nextReceiver(long timeout) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.nextReceiver(timeout);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Sender createSender(Address address) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ Sender sender = _delegate.createSender(address);
+ _senders.add(sender);
+ return sender;
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Sender createSender(String address) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ Sender sender = _delegate.createSender(address);
+ _senders.add(sender);
+ return sender;
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Receiver createReceiver(Address address) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ Receiver receiver = _delegate.createReceiver(address);
+ _receivers.add(receiver);
+ return receiver;
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Receiver createReceiver(String address) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ Receiver receiver = _delegate.createReceiver(address);
+ _receivers.add(receiver);
+ return receiver;
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Connection getConnection() throws MessagingException
+ {
+ checkError();
+ return _conn; // always return your peer (not your delegate's peer)
+ }
+
+ @Override
+ public boolean hasError()
+ {
+ return _delegate.hasError();
+ }
+
+ @Override
+ public void checkError() throws MessagingException
+ {
+ checkClosedAndThrowException(); // check if we already have the info.
+ try
+ {
+ // Asking the delegate.
+ _delegate.checkError();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ private void checkClosedAndThrowException() throws SessionException
+ {
+ checkClosedAndThrowException("Session is closed. You cannot invoke methods on a closed sesion");
+ }
+
+ private void checkClosedAndThrowException(String closedMessage) throws SessionException
+ {
+ switch (_state)
+ {
+ case ERROR:
+ case UNDEFINED:
+ throw new SessionException("Session is in a temporary error state. The session may or may not recover from this");
+ case CLOSED:
+ throw new SessionException(closedMessage);
+ }
+ }
+
+ /**
+ * A ConnectionException will cause the Session to go into a temporary error state,
+ * which prevents it from being used further.
+ * From there the Session can be moved into OPENED (if failover works) or
+ * CLOSED if there is no failover or if failover has failed.
+ * @param e
+ * @throws MessagingException
+ */
+ private SessionException handleConnectionException(ConnectionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ _state = SessionState.ERROR;
+ _conn.exception(e); // This might trigger failover in a layer above.
+ if (_state == SessionState.CLOSED)
+ {
+ // The connection has instructed the session to be closed.
+ // Either there was no failover, or failover has failed.
+ return new SessionException("Session is closed due to connection error",e);
+ }
+ else
+ {
+ // Asking the application or the Parent handler to retry the operation.
+ // The Session should be in OPENED state at this time.
+ return new SessionException("Session was in a temporary error state due to connection error." +
+ "Plase retry your operation",e);
+ }
+ }
+ }
+
+ /**
+ * Session Exceptions will generally invalidate the Session.
+ * TODO this needs to be revisited again.
+ * A new session will need to be created in that case.
+ * @param e
+ * @throws MessagingException
+ */
+ private SessionException handleSessionException(SessionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ _state = SessionState.CLOSED;
+ }
+ return new SessionException("Session has been closed",e);
+ }
+}