diff options
-rw-r--r-- | qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java | 46 | ||||
-rw-r--r-- | qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java | 29 | ||||
-rw-r--r-- | qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java | 285 | ||||
-rw-r--r-- | qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java (renamed from qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java) | 0 | ||||
-rw-r--r-- | qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java | 477 |
5 files changed, 837 insertions, 0 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java new file mode 100644 index 0000000000..557c3c0fe7 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java @@ -0,0 +1,46 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.ext; + +import java.util.List; + +import org.apache.qpid.messaging.Connection; +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Session; + +/** + * An extended interface meant for API implementors. + */ +public interface ConnectionExt extends Connection +{ + public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException; + + public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException; + + public List<Session> getSessions() throws ConnectionException; + + public void exception(ConnectionException e); + + /** + * The per connection lock that is used by the connection + * and it's child objects. A single lock is used to prevent + * deadlocks that could occur with having multiple locks, + * perhaps at the cost of a minor perf degradation. + */ + public Object getConnectionLock(); +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java new file mode 100644 index 0000000000..e1af2479c5 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionStateListener.java @@ -0,0 +1,29 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.ext; + +import org.apache.qpid.messaging.ConnectionException; + +public interface ConnectionStateListener +{ + public void exception(ConnectionException e); + + public void opened(); + + public void closed(); +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java new file mode 100644 index 0000000000..5200e5081d --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java @@ -0,0 +1,285 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.messaging.Connection; +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.ext.ConnectionExt; +import org.apache.qpid.messaging.ext.ConnectionStateListener; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Decorator that adds basic housekeeping tasks to a connection. + * This allows the various implementations to reuse basic functions. + * This class adds, + * 1. Basic session mgt (tracking, default name generation ..etc) + * 2. Connection state management. + * 3. Error handling. + * + * <i> <b>Close() can be called by,</b> + * <ol> + * <li>The application (normal close)</li> + * <li>By the parent if it's not null (error)</li> + * <li>By this object if parent is null (error)</li> + * </ol> + * </i> + * + * <u>Failover</u> + * This Decorator does not handle any failover. + * + * If failover is handled at a layer above then it will take appropriate action. + * @see ConnectionFailoverDecorator for an example. + * If failover is handled at a layer below (or no failover at all) then an exception means the connection is no longer usable. + * Therefore this class will attempt to close the connection if the parent is null. + */ +public class ConnectionManagementDecorator implements ConnectionExt +{ + private static Logger _logger = LoggerFactory.getLogger(ConnectionManagementDecorator.class); + + public enum ConnectionState { UNDEFINED, OPENED, CLOSED, ERROR} + + private ConnectionExt _parent; + private Connection _delegate; + private ConnectionState _state = ConnectionState.UNDEFINED; + private UUIDGen _ssnNameGenerator = UUIDs.newGenerator(); + private Map<String, Session> _sessions = new ConcurrentHashMap<String,Session>(); + private ConnectionException _lastException; + private List<ConnectionStateListener> _stateListeners = new ArrayList<ConnectionStateListener>(); + + private final Object _connectionLock; + + public ConnectionManagementDecorator(Connection delegate) + { + this(null,delegate); + } + + public ConnectionManagementDecorator(ConnectionExt parent, Connection delegate) + { + _delegate = delegate; + _parent = parent; + _connectionLock = (_parent == null) ? new Object() : _parent.getConnectionLock(); + } + + @Override + public void open() throws MessagingException + { + // return without exception denotes success + _delegate.open(); + synchronized (_connectionLock) + { + _state = ConnectionState.OPENED; + for (ConnectionStateListener l: _stateListeners) + { + l.opened(); + } + } + } + + @Override + public boolean isOpen() throws MessagingException + { + return _delegate.isOpen(); + } + + @Override + public void close() throws MessagingException + { + checkClosedAndThrowException("Connection is already closed"); + synchronized(_connectionLock) + { + _state = ConnectionState.CLOSED; + for (Session ssn : _sessions.values()) + { + ssn.close(); + } + _sessions.clear(); + + for (ConnectionStateListener l: _stateListeners) + { + l.closed(); + } + } + _delegate.close(); + } + + @Override + public Session createSession(String name) throws MessagingException + { + checkClosedAndThrowException(); + try + { + if (name == null || name.isEmpty()) { name = generateSessionName(); } + Session ssn = new SessionManagementDecorator(this,_delegate.createSession(name)); + _sessions.put(name, ssn); + return ssn; + } + catch(ConnectionException e) + { + exception(e); + // If there is a failover handler above this it will handle it. + // Otherwise the application gets this. + throw new ConnectionException("Connection closed",e); + } + } + + @Override + public Session createTransactionalSession(String name) + throws MessagingException + { + checkClosedAndThrowException(); + try + { + if (name == null || name.isEmpty()) { name = generateSessionName(); } + Session ssn = new SessionManagementDecorator(this,_delegate.createTransactionalSession(name)); + _sessions.put(name, ssn); + return ssn; + } + catch(ConnectionException e) + { + exception(e); + // If there is a failover handler above this it will handle it. + // Otherwise the application gets this. + throw new ConnectionException("Connection closed",e); + } + } + + @Override + public String getAuthenticatedUsername() throws MessagingException + { + checkClosedAndThrowException(); + return _delegate.getAuthenticatedUsername(); + } + + @Override + public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException + { + checkClosedAndThrowException(); + synchronized (_connectionLock) + { + _stateListeners.add(l); + } + } + + @Override + public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException + { + checkClosedAndThrowException(); + synchronized (_connectionLock) + { + _stateListeners.remove(l); + } + } + + @Override + public List<Session> getSessions() throws ConnectionException + { + checkClosedAndThrowException(); + return new ArrayList<Session>(_sessions.values()); + } + + @Override // Called by the delegate or a a session created by this connection. + public void exception(ConnectionException e) + { + synchronized(_connectionLock) + { + _state = ConnectionState.ERROR; + if (_lastException != null) + { + _logger.warn("Last exception was not notified to the application", _lastException); + } + _lastException = e; + + for (ConnectionStateListener l: _stateListeners) + { + l.exception(_lastException); + } + + if (_parent != null) + { + _parent.exception(e); + } + else + { + try + { + close(); + } + catch(MessagingException ex) + { + //ignore + } + } + } + // should we clean lastException if we notify it via a listener? + } + + @Override + public Object getConnectionLock() + { + return _connectionLock; + } + + private void checkClosedAndThrowException() throws ConnectionException + { + checkClosedAndThrowException("Connection is closed. You cannot invoke methods on a closed connection"); + } + + private void checkClosedAndThrowException(String msg) throws ConnectionException + { + switch (_state) + { + case UNDEFINED: + case ERROR: + throw new ConnectionException("Connection is in an error state. The connection may or may not recover from this"); + case CLOSED: + synchronized(_connectionLock) + { + if(_lastException != null) + { + Throwable cause = _lastException; + _lastException = null; + throw new ConnectionException(msg, cause); + } + else + { + throw new ConnectionException(msg); + } + } + default: + break; + } + } + + private String generateSessionName() + { + // TODO add local IP and pid to the beginning; + return _ssnNameGenerator.generate().toString(); + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java index 0dfd6e735d..0dfd6e735d 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ReadOnlyMessageAdapter.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java new file mode 100644 index 0000000000..027e9b9605 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java @@ -0,0 +1,477 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.Connection; +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Receiver; +import org.apache.qpid.messaging.Sender; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.ext.ConnectionExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Decorator that adds basic housekeeping tasks to a session. + * This class adds, + * 1. Management of receivers and senders created by this session. + * 2. State management. + * 3. Exception handling. + * + * <b>Exception Handling</b> + * This class will wrap each method call to it's delegate to handle error situations. + * First it will check if the session is already CLOSED or in an ERROR situation. + * Then it will look for connection and session errors and handle as follows. + * + * <b>Connection Exceptions</b> + * This class intercepts ConnectionException's and are passed onto the connection. + * The Session will be marked as ERROR and a session exception will be thrown with an appropriate message. + * Any further use of the session is prevented until it moves to OPENED. + * + * If failover is handled at a layer above, there will be a Session Decorator that + * would handle the session exception and retry when the connection is available. + * This handler may block the call until the state moves into either OPENED or CLOSED. + * Ex @see SessionFailoverDecorator. + * + * If failover is handled at a layer below, then a connection exception means it has failed already. + * Therefore when passed to the connection,the exception will be thrown directly to the application. + * The connection object will be responsible for calling close on this session for the above case. + * + * <i> <b>Close() can be called by,</b> + * <ol> + * <li>The application (normal close)</li> + * <li>By the parent via failover (error)</li> + * <li>By the connection object, if not failover(error)</li> + * </ol> + * </i> + * + * <b>Session Exceptions</b> + * For the time being, anytime a session exception is received, the session will be marked CLOSED. + * We need to revisit this. + */ +public class SessionManagementDecorator implements Session +{ + private static Logger _logger = LoggerFactory.getLogger(SessionManagementDecorator.class); + + public enum SessionState { UNDEFINED, OPENED, CLOSED, ERROR} + + private ConnectionExt _conn; + private Session _delegate; + SessionState _state = SessionState.UNDEFINED; + private List<Receiver> _receivers = new ArrayList<Receiver>(); + private List<Sender> _senders = new ArrayList<Sender>(); + private final Object _connectionLock; // global per connection lock + + public SessionManagementDecorator(ConnectionExt conn, Session delegate) + { + _conn = conn; + _delegate = delegate; + _connectionLock = conn.getConnectionLock(); + } + + @Override + public boolean isClosed() + { + return _state == SessionState.CLOSED; + } + + @Override + public void close() throws MessagingException + { + checkClosedAndThrowException("Session is already closed"); + synchronized(_connectionLock) + { + _state = SessionState.CLOSED; + for (Sender sender: _senders) + { + sender.close(); + } + _senders.clear(); + + for (Receiver receiver: _receivers) + { + receiver.close(); + } + _receivers.clear(); + _delegate.close(); + } + } + + @Override + public void commit() throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.commit(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void rollback() throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.rollback(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void acknowledge(boolean sync) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.acknowledge(sync); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void acknowledge(Message message, boolean sync) + throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.acknowledge(message, sync); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void reject(Message message) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.reject(message); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void release(Message message) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.release(message); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void sync(boolean block) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.sync(block); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getReceivable() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getReceivable(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getUnsettledAcks() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getUnsettledAcks(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Receiver nextReceiver(long timeout) throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.nextReceiver(timeout); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Sender createSender(Address address) throws MessagingException + { + checkClosedAndThrowException(); + try + { + Sender sender = _delegate.createSender(address); + _senders.add(sender); + return sender; + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Sender createSender(String address) throws MessagingException + { + checkClosedAndThrowException(); + try + { + Sender sender = _delegate.createSender(address); + _senders.add(sender); + return sender; + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Receiver createReceiver(Address address) throws MessagingException + { + checkClosedAndThrowException(); + try + { + Receiver receiver = _delegate.createReceiver(address); + _receivers.add(receiver); + return receiver; + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Receiver createReceiver(String address) throws MessagingException + { + checkClosedAndThrowException(); + try + { + Receiver receiver = _delegate.createReceiver(address); + _receivers.add(receiver); + return receiver; + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Connection getConnection() throws MessagingException + { + checkError(); + return _conn; // always return your peer (not your delegate's peer) + } + + @Override + public boolean hasError() + { + return _delegate.hasError(); + } + + @Override + public void checkError() throws MessagingException + { + checkClosedAndThrowException(); // check if we already have the info. + try + { + // Asking the delegate. + _delegate.checkError(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + private void checkClosedAndThrowException() throws SessionException + { + checkClosedAndThrowException("Session is closed. You cannot invoke methods on a closed sesion"); + } + + private void checkClosedAndThrowException(String closedMessage) throws SessionException + { + switch (_state) + { + case ERROR: + case UNDEFINED: + throw new SessionException("Session is in a temporary error state. The session may or may not recover from this"); + case CLOSED: + throw new SessionException(closedMessage); + } + } + + /** + * A ConnectionException will cause the Session to go into a temporary error state, + * which prevents it from being used further. + * From there the Session can be moved into OPENED (if failover works) or + * CLOSED if there is no failover or if failover has failed. + * @param e + * @throws MessagingException + */ + private SessionException handleConnectionException(ConnectionException e) + { + synchronized (_connectionLock) + { + _state = SessionState.ERROR; + _conn.exception(e); // This might trigger failover in a layer above. + if (_state == SessionState.CLOSED) + { + // The connection has instructed the session to be closed. + // Either there was no failover, or failover has failed. + return new SessionException("Session is closed due to connection error",e); + } + else + { + // Asking the application or the Parent handler to retry the operation. + // The Session should be in OPENED state at this time. + return new SessionException("Session was in a temporary error state due to connection error." + + "Plase retry your operation",e); + } + } + } + + /** + * Session Exceptions will generally invalidate the Session. + * TODO this needs to be revisited again. + * A new session will need to be created in that case. + * @param e + * @throws MessagingException + */ + private SessionException handleSessionException(SessionException e) + { + synchronized (_connectionLock) + { + _state = SessionState.CLOSED; + } + return new SessionException("Session has been closed",e); + } +} |