From ad287c07e29466dd61d444f0404848f08da3c9c6 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 26 Jun 2012 15:53:37 +0000 Subject: QPID-4027 Added an experimental Failover Decorator to experiment with providing high level failover based on "stop-the-world" concept. Added an abstraction for Failover strategy. Renamed the ConnectionStateListener to ConnectionEventListener and added a ConnectionEvent class to notify various connection events. The experimental failover implementation makes use of this. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1354073 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/messaging/internal/ConnectionEvent.java | 61 +++ .../internal/ConnectionEventListener.java | 29 ++ .../internal/ConnectionStateListener.java | 29 -- .../qpid/messaging/internal/ConnectionString.java | 27 + .../qpid/messaging/internal/FailoverStrategy.java | 29 ++ .../internal/FailoverStrategyFactory.java | 33 ++ .../util/AbstractConnectionDecorator.java | 202 ++++++++ .../messaging/util/AbstractReceiverDecorator.java | 122 +++++ .../messaging/util/AbstractSenderDecorator.java | 89 ++++ .../messaging/util/AbstractSessionDecorator.java | 219 ++++++++ .../util/ConnectionManagementDecorator.java | 313 ------------ .../util/ReceiverManagementDecorator.java | 267 ---------- .../messaging/util/SenderManagementDecorator.java | 233 --------- .../messaging/util/SessionManagementDecorator.java | 549 -------------------- .../util/failover/ConnectionFailoverDecorator.java | 357 +++++++++++++ .../util/failover/DefaultFailoverStrategy.java | 128 +++++ .../failover/DefaultFailoverStrategyFactory.java | 15 + .../util/failover/ReceiverFailoverDecorator.java | 312 ++++++++++++ .../util/failover/SenderFailoverDecorator.java | 291 +++++++++++ .../util/failover/SessionFailoverDecorator.java | 559 +++++++++++++++++++++ 20 files changed, 2473 insertions(+), 1391 deletions(-) create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEvent.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java delete mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionStateListener.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionString.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractReceiverDecorator.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java delete mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java delete mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java delete mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java delete mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java create mode 100644 qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEvent.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEvent.java new file mode 100644 index 0000000000..8e376b712d --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEvent.java @@ -0,0 +1,61 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.internal; + +public class ConnectionEvent +{ + public enum EventType + { + OPENED, + CLOSED, + RECONNCTED, + RECONNCTION_ATTEMPTED, + RECONNCTION_FAILED, + CONNECTION_LOST, + PRE_FAILOVER, + POST_FAILOVER, + } + + private ConnectionInternal _conn; + private EventType _type; + private Object _source; + + public ConnectionEvent(ConnectionInternal conn, EventType type, Object source) + { + _conn = conn; + _type = type; + _source = source; + } + + public ConnectionInternal getConnection() + { + return _conn; + } + + public EventType getType() + { + return _type; + } + + public Object getSource() + { + return _source; + } + + +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java new file mode 100644 index 0000000000..4055c1e904 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java @@ -0,0 +1,29 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.internal; + +import org.apache.qpid.messaging.ConnectionException; + +public interface ConnectionStateListener +{ + public void exception(ConnectionException e); + + public void opened(); + + public void closed(); +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionStateListener.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionStateListener.java deleted file mode 100644 index 4055c1e904..0000000000 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionStateListener.java +++ /dev/null @@ -1,29 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.messaging.internal; - -import org.apache.qpid.messaging.ConnectionException; - -public interface ConnectionStateListener -{ - public void exception(ConnectionException e); - - public void opened(); - - public void closed(); -} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionString.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionString.java new file mode 100644 index 0000000000..327c499366 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionString.java @@ -0,0 +1,27 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.internal; + +import java.util.Map; + +public interface ConnectionString +{ + public String getUrl(); + + public Map getOptions(); +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java new file mode 100644 index 0000000000..18ad027315 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java @@ -0,0 +1,29 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.internal; + +public interface FailoverStrategy +{ + public boolean failoverAllowed(); + + public ConnectionString getNextConnectionString(); + + public ConnectionString getCurrentConnectionString(); + + public void connectionAttained(ConnectionInternal conn); +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java new file mode 100644 index 0000000000..8b27cf2ac7 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java @@ -0,0 +1,33 @@ +package org.apache.qpid.messaging.internal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class FailoverStrategyFactory +{ + private final static FailoverStrategyFactory _instance; + private static final Logger _logger = LoggerFactory.getLogger(FailoverStrategyFactory.class); + + static + { + String className = System.getProperty("qpid.failover-factory", + "org.apache.qpid.messaging.util.failover.DefaultFailoverStrategyFactory"); // will default to java + try + { + _instance = (FailoverStrategyFactory) Class.forName(className).newInstance(); + } + catch (Exception e) + { + _logger.error("Error loading failover factory class",e); + throw new Error("Error loading failover factory class",e); + } + } + + public static FailoverStrategyFactory get() + { + return _instance; + } + + public abstract FailoverStrategy getFailoverStrategy(ConnectionInternal con); + +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java new file mode 100644 index 0000000000..50bc53c2b7 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java @@ -0,0 +1,202 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.MessageFactory; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.TransportFailureException; +import org.apache.qpid.messaging.internal.ConnectionEvent; +import org.apache.qpid.messaging.internal.ConnectionEvent.EventType; +import org.apache.qpid.messaging.internal.ConnectionEventListener; +import org.apache.qpid.messaging.internal.ConnectionInternal; +import org.apache.qpid.messaging.internal.SessionInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractConnectionDecorator implements ConnectionInternal +{ + private final Logger _logger = LoggerFactory.getLogger(getClass()); + + protected ConnectionInternal _delegate; + protected final Object _connectionLock; + protected List _stateListeners = new ArrayList(); + protected Map _sessions = new ConcurrentHashMap(); + + protected AbstractConnectionDecorator(ConnectionInternal delegate, Object lock) + { + _delegate = delegate; + _connectionLock = lock; + } + + @Override + public void open() throws MessagingException + { + synchronized (_connectionLock) + { + _delegate.open(); + } + } + + public void reconnect(String url, Map options) throws TransportFailureException + { + synchronized (_connectionLock) + { + _delegate.reconnect(url,options); + } + } + + @Override + public boolean isOpen() throws MessagingException + { + checkPreConditions(); + return _delegate.isOpen(); + } + + @Override + public void close() throws MessagingException + { + synchronized (_connectionLock) + { + for (Session ssn : _sessions.values()) + { + try + { + ssn.close(); + } + catch (Exception e) + { + _logger.warn("Error closing session",e); + } + } + _sessions.clear(); + try + { + _delegate.close(); + } + catch(Exception e) + { + _logger.warn("Error closing connection",e); + } + notifyEvent(new ConnectionEvent(this,EventType.CLOSED,this)); + } + } + + @Override + public String getAuthenticatedUsername() throws MessagingException + { + checkPreConditions(); + return _delegate.getAuthenticatedUsername(); + } + + @Override + public MessageFactory getMessageFactory() throws MessagingException + { + checkPreConditions(); + return _delegate.getMessageFactory(); + } + + @Override + public void addConnectionEventListener(ConnectionEventListener l) throws ConnectionException + { + checkPreConditions(); + synchronized (_connectionLock) + { + _stateListeners.add(l); + } + } + + @Override + public void removeConnectionEventListener(ConnectionEventListener l) throws ConnectionException + { + checkPreConditions(); + synchronized (_connectionLock) + { + _stateListeners.remove(l); + } + } + + + @Override + public List getSessions() throws ConnectionException + { + checkPreConditions(); + return new ArrayList(_sessions.values()); + } + + @Override + public void unregisterSession(SessionInternal ssn) + { + synchronized (_connectionLock) + { + _sessions.remove(ssn.getName()); + } + } + + @Override + public void exception(TransportFailureException e, long serialNumber) + { + synchronized(_connectionLock) + { + for (ConnectionEventListener l: _stateListeners) + { + l.exception(new ConnectionException("Connection Failed",e)); + } + } + } + + @Override + public Object getConnectionLock() + { + return _connectionLock; + } + + @Override + public String getConnectionURL() + { + return _delegate.getConnectionURL(); + } + + @Override + public Map getConnectionOptions() + { + return _delegate.getConnectionOptions(); + } + + @Override + public long getSerialNumber() + { + return _delegate.getSerialNumber(); + } + + protected void notifyEvent(ConnectionEvent event) + { + for (ConnectionEventListener l: _stateListeners) + { + l.eventOccured(event); + } + } + + protected abstract void checkPreConditions() throws ConnectionException; +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractReceiverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractReceiverDecorator.java new file mode 100644 index 0000000000..aa6ab50072 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractReceiverDecorator.java @@ -0,0 +1,122 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.ReceiverException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.internal.ReceiverInternal; +import org.apache.qpid.messaging.internal.SessionInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractReceiverDecorator implements ReceiverInternal +{ + private Logger _logger = LoggerFactory.getLogger(getClass()); + + public enum ReceiverState {OPENED, CLOSED, ERROR}; + + protected ReceiverInternal _delegate; + protected SessionInternal _ssn; + protected final Object _connectionLock; // global per connection lock + + public AbstractReceiverDecorator(SessionInternal ssn, ReceiverInternal delegate) + { + _ssn = ssn; + _delegate = delegate; + _connectionLock = ssn.getConnectionInternal().getConnectionLock(); + } + + @Override + public Message get(long timeout) throws MessagingException + { + checkPreConditions(); + return _delegate.get(timeout); + } + + @Override + public Message fetch(long timeout) throws MessagingException + { + checkPreConditions(); + return _delegate.fetch(timeout); + } + + @Override + public void setCapacity(int capacity) throws MessagingException + { + checkPreConditions(); + _delegate.setCapacity(capacity); + } + + @Override + public int getCapacity() throws MessagingException + { + checkPreConditions(); + return _delegate.getCapacity(); + } + + @Override + public int getAvailable() throws MessagingException + { + checkPreConditions(); + return _delegate.getAvailable(); + } + + @Override + public int getUnsettled() throws MessagingException + { + checkPreConditions(); + return _delegate.getUnsettled(); + } + + @Override + public void close() throws MessagingException + { + synchronized (_connectionLock) + { + _delegate.close(); + _ssn.unregisterReceiver(this); + } + } + + @Override + public boolean isClosed() + { + return _delegate.isClosed(); + } + + @Override + public String getName() throws MessagingException + { + checkPreConditions(); + return _delegate.getName(); + } + + @Override + public Session getSession() throws MessagingException + { + checkPreConditions(); + _ssn.checkError(); + return _ssn; + } + + protected abstract void checkPreConditions() throws ReceiverException; +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java new file mode 100644 index 0000000000..776575ebd1 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java @@ -0,0 +1,89 @@ +package org.apache.qpid.messaging.util; + +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.SenderException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.internal.SenderInternal; +import org.apache.qpid.messaging.internal.SessionInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractSenderDecorator implements SenderInternal +{ + private Logger _logger = LoggerFactory.getLogger(getClass()); + + protected SenderInternal _delegate; + protected SessionInternal _ssn; + protected final Object _connectionLock; // global per connection lock + + public AbstractSenderDecorator(SessionInternal ssn, SenderInternal delegate) + { + _ssn = ssn; + _delegate = delegate; + _connectionLock = ssn.getConnectionInternal().getConnectionLock(); + } + + @Override + public void send(Message message, boolean sync) throws MessagingException + { + checkPreConditions(); + _delegate.send(message, sync); + } + + @Override + public void close() throws MessagingException + { + synchronized (_connectionLock) + { + _delegate.close(); + _ssn.unregisterSender(this); + } + } + + @Override + public void setCapacity(int capacity) throws MessagingException + { + checkPreConditions(); + _delegate.setCapacity(capacity); + } + + @Override + public int getCapacity() throws MessagingException + { + checkPreConditions(); + return _delegate.getCapacity(); + } + + @Override + public int getAvailable() throws MessagingException + { + checkPreConditions(); + return _delegate.getAvailable(); + } + + @Override + public int getUnsettled() throws MessagingException + { + checkPreConditions(); + return _delegate.getUnsettled(); + } + + @Override + public String getName() throws MessagingException + { + checkPreConditions(); + return _delegate.getName(); + } + + @Override + public Session getSession() throws MessagingException + { + checkPreConditions(); + _ssn.checkError(); + return _ssn; + } + + protected abstract void checkPreConditions() throws SenderException;} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java new file mode 100644 index 0000000000..f46aae63b0 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java @@ -0,0 +1,219 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.messaging.Connection; +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Receiver; +import org.apache.qpid.messaging.Sender; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.TransportFailureException; +import org.apache.qpid.messaging.internal.ConnectionInternal; +import org.apache.qpid.messaging.internal.ReceiverInternal; +import org.apache.qpid.messaging.internal.SenderInternal; +import org.apache.qpid.messaging.internal.SessionInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractSessionDecorator implements SessionInternal +{ + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + + protected ConnectionInternal _conn; + protected SessionInternal _delegate; + protected List _receivers = new ArrayList(); + protected List _senders = new ArrayList(); + protected final Object _connectionLock; // global per connection lock + + public AbstractSessionDecorator(ConnectionInternal conn, SessionInternal delegate) + { + _conn = conn; + _delegate = delegate; + _connectionLock = conn.getConnectionLock(); + } + + @Override + public void close() throws MessagingException + { + synchronized(_connectionLock) + { + Sender[] senders = new Sender[_senders.size()]; + senders = _senders.toArray(senders); + for (Sender sender: senders) + { + try + { + sender.close(); + } + catch (Exception e) + { + _logger.warn("Error closing sender", e); + } + } + _senders.clear(); + + Receiver[] receivers = new Receiver[_receivers.size()]; + receivers = _receivers.toArray(receivers); + for (Receiver receiver: receivers) + { + try + { + receiver.close(); + } + catch (Exception e) + { + _logger.warn("Error closing receiver", e); + } + } + _receivers.clear(); + _delegate.close(); + _conn.unregisterSession(this); + } + } + + @Override + public boolean isClosed() + { + return _delegate.isClosed(); + } + + @Override + public void commit() throws MessagingException + { + checkPreConditions(); + _delegate.commit(); + } + + @Override + public void rollback() throws MessagingException + { + checkPreConditions(); + _delegate.rollback(); + } + + @Override + public void acknowledge(boolean sync) throws MessagingException + { + checkPreConditions(); + _delegate.acknowledge(sync); + } + + @Override + public void acknowledge(Message message, boolean sync) + throws MessagingException + { + checkPreConditions(); + _delegate.acknowledge(message, sync); + } + + @Override + public void reject(Message message) throws MessagingException + { + checkPreConditions(); + _delegate.reject(message); + } + + @Override + public void release(Message message) throws MessagingException + { + checkPreConditions(); + _delegate.release(message); + } + + @Override + public void sync(boolean block) throws MessagingException + { + checkPreConditions(); + _delegate.sync(block); + } + + @Override + public int getReceivable() throws MessagingException + { + checkPreConditions(); + return _delegate.getReceivable(); + } + + @Override + public int getUnsettledAcks() throws MessagingException + { + checkPreConditions(); + return _delegate.getUnsettledAcks(); + } + + @Override + public Receiver nextReceiver(long timeout) throws MessagingException + { + checkPreConditions(); + return _delegate.nextReceiver(timeout); + } + + @Override + public Connection getConnection() throws MessagingException + { + checkError(); + return _conn; // always return your peer (not your delegate's peer) + } + + @Override + public boolean hasError() + { + return _delegate.hasError(); + } + + @Override + public void checkError() throws MessagingException + { + checkPreConditions(); // check if we already have the info. + // Asking the delegate. + _delegate.checkError(); + } + + @Override + public ConnectionInternal getConnectionInternal() + { + return _conn; + } + + @Override + public String getName() + { + return _delegate.getName(); + } + + @Override + public void unregisterReceiver(ReceiverInternal receiver) + { + _receivers.remove(receiver); + _delegate.unregisterReceiver(receiver); + } + + @Override + public void unregisterSender(SenderInternal sender) + { + _senders.remove(sender); + _delegate.unregisterSender(sender); + } + + protected abstract void checkPreConditions() throws SessionException; +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java deleted file mode 100644 index ccc216e7af..0000000000 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java +++ /dev/null @@ -1,313 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.messaging.util; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.qpid.messaging.Connection; -import org.apache.qpid.messaging.ConnectionException; -import org.apache.qpid.messaging.MessageFactory; -import org.apache.qpid.messaging.MessagingException; -import org.apache.qpid.messaging.Session; -import org.apache.qpid.messaging.SessionException; -import org.apache.qpid.messaging.internal.ConnectionInternal; -import org.apache.qpid.messaging.internal.ConnectionStateListener; -import org.apache.qpid.messaging.internal.SessionInternal; -import org.apache.qpid.util.UUIDGen; -import org.apache.qpid.util.UUIDs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Decorator that adds basic housekeeping tasks to a connection. - * This allows the various implementations to reuse basic functions. - * This class adds, - * 1. Basic session mgt (tracking, default name generation ..etc) - * 2. Connection state management. - * 3. Error handling. - * - * Close() can be called by, - *
    - *
  1. The application (normal close)
  2. - *
  3. By the parent if it's not null (error)
  4. - *
  5. By this object if parent is null (error)
  6. - *
- *
- * - * Failover - * This Decorator does not handle any failover. - * - * If failover is handled at a layer above then it will take appropriate action. - * @see ConnectionFailoverDecorator for an example. - * If failover is handled at a layer below (or no failover at all) then an exception means the connection is no longer usable. - * Therefore this class will attempt to close the connection if the parent is null. - */ -public class ConnectionManagementDecorator implements ConnectionInternal -{ - private static Logger _logger = LoggerFactory.getLogger(ConnectionManagementDecorator.class); - - public enum ConnectionState { UNDEFINED, OPENED, CLOSED, ERROR} - - private ConnectionInternal _parent; - private Connection _delegate; - private ConnectionState _state = ConnectionState.UNDEFINED; - private UUIDGen _ssnNameGenerator = UUIDs.newGenerator(); - private Map _sessions = new ConcurrentHashMap(); - private ConnectionException _lastException; - private List _stateListeners = new ArrayList(); - - private final Object _connectionLock; - - public ConnectionManagementDecorator(Connection delegate) - { - this(null,delegate); - } - - public ConnectionManagementDecorator(ConnectionInternal parent, Connection delegate) - { - _delegate = delegate; - _parent = parent; - _connectionLock = (_parent == null) ? new Object() : _parent.getConnectionLock(); - } - - @Override - public void open() throws MessagingException - { - // return without exception denotes success - _delegate.open(); - synchronized (_connectionLock) - { - _state = ConnectionState.OPENED; - for (ConnectionStateListener l: _stateListeners) - { - l.opened(); - } - } - } - - @Override - public boolean isOpen() throws MessagingException - { - return _delegate.isOpen(); - } - - @Override - public void close() throws MessagingException - { - checkClosedAndThrowException("Connection is already closed"); - synchronized(_connectionLock) - { - _state = ConnectionState.CLOSED; - for (Session ssn : _sessions.values()) - { - try - { - ssn.close(); - } - catch (Exception e) - { - _logger.warn("Error closing session",e); - } - } - _sessions.clear(); - - for (ConnectionStateListener l: _stateListeners) - { - l.closed(); - } - } - _delegate.close(); - } - - @Override - public Session createSession(String name) throws MessagingException - { - checkClosedAndThrowException(); - try - { - if (name == null || name.isEmpty()) { name = generateSessionName(); } - SessionInternal ssn = new SessionManagementDecorator(this,name,_delegate.createSession(name)); - _sessions.put(name, ssn); - return ssn; - } - catch(ConnectionException e) - { - exception(e); - // If there is a failover handler above this it will handle it. - // Otherwise the application gets this. - throw new ConnectionException("Connection closed",e); - } - } - - @Override - public Session createTransactionalSession(String name) - throws MessagingException - { - checkClosedAndThrowException(); - try - { - if (name == null || name.isEmpty()) { name = generateSessionName(); } - SessionInternal ssn = new SessionManagementDecorator(this,name,_delegate.createTransactionalSession(name)); - _sessions.put(name, ssn); - return ssn; - } - catch(ConnectionException e) - { - exception(e); - // If there is a failover handler above this it will handle it. - // Otherwise the application gets this. - throw new ConnectionException("Connection closed",e); - } - } - - @Override - public String getAuthenticatedUsername() throws MessagingException - { - checkClosedAndThrowException(); - return _delegate.getAuthenticatedUsername(); - } - - @Override - public MessageFactory getMessageFactory() throws MessagingException - { - checkClosedAndThrowException(); - return _delegate.getMessageFactory(); - } - - @Override - public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException - { - checkClosedAndThrowException(); - synchronized (_connectionLock) - { - _stateListeners.add(l); - } - } - - @Override - public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException - { - checkClosedAndThrowException(); - synchronized (_connectionLock) - { - _stateListeners.remove(l); - } - } - - @Override - public List getSessions() throws ConnectionException - { - checkClosedAndThrowException(); - return new ArrayList(_sessions.values()); - } - - @Override - public void unregisterSession(SessionInternal ssn) - { - _sessions.remove(ssn.getName()); - } - - @Override // Called by the delegate or a a session created by this connection. - public void exception(ConnectionException e) - { - synchronized(_connectionLock) - { - _state = ConnectionState.ERROR; - if (_lastException != null) - { - _logger.warn("Last exception was not notified to the application", _lastException); - } - _lastException = e; - - for (ConnectionStateListener l: _stateListeners) - { - l.exception(_lastException); - } - - if (_parent != null) - { - _parent.exception(e); - } - else - { - try - { - close(); - } - catch(MessagingException ex) - { - //ignore - } - } - } - // should we clean lastException if we notify it via a listener? - } - - @Override - public Object getConnectionLock() - { - return _connectionLock; - } - - @Override - public void recreate() throws MessagingException - { - // TODO Auto-generated method stub - } - - private void checkClosedAndThrowException() throws ConnectionException - { - checkClosedAndThrowException("Connection is closed. You cannot invoke methods on a closed connection"); - } - - private void checkClosedAndThrowException(String msg) throws ConnectionException - { - switch (_state) - { - case UNDEFINED: - case ERROR: - throw new ConnectionException("Connection is in an error state. The connection may or may not recover from this"); - case CLOSED: - synchronized(_connectionLock) - { - if(_lastException != null) - { - Throwable cause = _lastException; - _lastException = null; - throw new ConnectionException(msg, cause); - } - else - { - throw new ConnectionException(msg); - } - } - default: - break; - } - } - - private String generateSessionName() - { - // TODO add local IP and pid to the beginning; - return _ssnNameGenerator.generate().toString(); - } -} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java deleted file mode 100644 index a4e545e6f0..0000000000 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java +++ /dev/null @@ -1,267 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.messaging.util; - -import org.apache.qpid.messaging.ConnectionException; -import org.apache.qpid.messaging.Message; -import org.apache.qpid.messaging.MessagingException; -import org.apache.qpid.messaging.Receiver; -import org.apache.qpid.messaging.ReceiverException; -import org.apache.qpid.messaging.Session; -import org.apache.qpid.messaging.SessionException; -import org.apache.qpid.messaging.internal.ReceiverInternal; -import org.apache.qpid.messaging.internal.SessionInternal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Decorator that adds basic housekeeping tasks to a Receiver. - * This class adds, - * 1. State management. - * 2. Exception handling. - * - */ -public class ReceiverManagementDecorator implements ReceiverInternal -{ - private static Logger _logger = LoggerFactory.getLogger(ReceiverManagementDecorator.class); - - public enum ReceiverState {OPENED, CLOSED, ERROR}; - - private Receiver _delegate; - private ReceiverState _state = ReceiverState.OPENED; - private SessionInternal _ssn; - private final Object _connectionLock; // global per connection lock - - public ReceiverManagementDecorator(SessionInternal ssn, Receiver delegate) - { - _ssn = ssn; - _delegate = delegate; - _connectionLock = ssn.getConnectionInternal().getConnectionLock(); - } - - @Override - public Message get(long timeout) throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.get(timeout); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public Message fetch(long timeout) throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.fetch(timeout); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void setCapacity(int capacity) throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.setCapacity(capacity); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public int getCapacity() throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.getCapacity(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public int getAvailable() throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.getAvailable(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public int getUnsettled() throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.getUnsettled(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void close() throws MessagingException - { - checkClosedAndThrowException("Receiver is already closed"); - synchronized (_connectionLock) - { - _state = ReceiverState.CLOSED; - _delegate.close(); - _ssn.unregisterReceiver(this); - } - } - - @Override - public boolean isClosed() - { - return _state == ReceiverState.CLOSED; - } - - @Override - public String getName() throws MessagingException - { - checkClosedAndThrowException(); - return _delegate.getName(); - } - - @Override - public Session getSession() throws MessagingException - { - checkClosedAndThrowException(); - _ssn.checkError(); - return _ssn; - } - - @Override - public void recreate() throws MessagingException - { - // TODO Auto-generated method stub - - } - - private void checkClosedAndThrowException() throws ReceiverException - { - checkClosedAndThrowException("Receiver is closed. You cannot invoke methods on a closed receiver"); - } - - private void checkClosedAndThrowException(String closedMessage) throws ReceiverException - { - switch (_state) - { - case ERROR: - throw new ReceiverException("Receiver is in a temporary error state. The session may or may not recover from this"); - case CLOSED: - throw new ReceiverException(closedMessage); - } - } - - /** - * A ConnectionException will cause the Session/Receiver to go into a temporary error state, - * which prevents it from being used further. - * From there the Session and Receiver can be moved into OPENED (if failover works) or - * CLOSED if there is no failover or if failover has failed. - * @param e - * @throws MessagingException - */ - private ReceiverException handleConnectionException(ConnectionException e) - { - synchronized (_connectionLock) - { - _state = ReceiverState.ERROR; - _ssn.exception(e); // This might trigger failover in a layer above. - if (_state == ReceiverState.CLOSED) - { - // The connection has instructed the session and it's child objects to be closed. - // Either there was no failover, or failover has failed. - return new ReceiverException("Receiver is closed due to connection error",e); - } - else - { - // Asking the application or the Parent handler to retry the operation. - // The Session and Receiver should be in OPENED state at this time. - return new ReceiverException("Receiver was in a temporary error state due to connection error." + - "Plase retry your operation",e); - } - } - } - - /** - * Session Exceptions will generally invalidate the Session. - * TODO this needs to be revisited again. - * A new session will need to be created in that case. - * @param e - * @throws MessagingException - */ - private ReceiverException handleSessionException(SessionException e) - { - synchronized (_connectionLock) - { - // This should close all receivers (including this) and senders. - _ssn.exception(e); - } - return new ReceiverException("Session has been closed",e); - } -} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java deleted file mode 100644 index 14bd8f3b15..0000000000 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java +++ /dev/null @@ -1,233 +0,0 @@ -package org.apache.qpid.messaging.util; - -import org.apache.qpid.messaging.ConnectionException; -import org.apache.qpid.messaging.Message; -import org.apache.qpid.messaging.MessagingException; -import org.apache.qpid.messaging.ReceiverException; -import org.apache.qpid.messaging.Sender; -import org.apache.qpid.messaging.SenderException; -import org.apache.qpid.messaging.Session; -import org.apache.qpid.messaging.SessionException; -import org.apache.qpid.messaging.internal.SenderInternal; -import org.apache.qpid.messaging.internal.SessionInternal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Decorator that adds basic housekeeping tasks to a Sender. - * This class adds, - * 1. State management. - * 2. Exception handling. - * - */ -public class SenderManagementDecorator implements SenderInternal -{ - private static Logger _logger = LoggerFactory.getLogger(SenderManagementDecorator.class); - - public enum SenderState {OPENED, CLOSED, ERROR}; - - private Sender _delegate; - private SenderState _state = SenderState.OPENED; - private SessionInternal _ssn; - private final Object _connectionLock; // global per connection lock - - public SenderManagementDecorator(SessionInternal ssn, Sender delegate) - { - _ssn = ssn; - _delegate = delegate; - _connectionLock = ssn.getConnectionInternal().getConnectionLock(); - } - - @Override - public void send(Message message, boolean sync) throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.send(message, sync); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void close() throws MessagingException - { - checkClosedAndThrowException("Sender is already closed"); - synchronized (_connectionLock) - { - _state = SenderState.CLOSED; - _delegate.close(); - _ssn.unregisterSender(this); - } - } - - @Override - public void setCapacity(int capacity) throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.setCapacity(capacity); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public int getCapacity() throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.getCapacity(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public int getAvailable() throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.getAvailable(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public int getUnsettled() throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.getUnsettled(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public boolean isClosed() throws MessagingException - { - return _state == SenderState.CLOSED; - } - - @Override - public String getName() throws MessagingException - { - checkClosedAndThrowException(); - return _delegate.getName(); - } - - @Override - public Session getSession() throws MessagingException - { - checkClosedAndThrowException(); - _ssn.checkError(); - return _ssn; - } - - @Override - public void recreate() throws MessagingException - { - // TODO Auto-generated method stub - - } - - private void checkClosedAndThrowException() throws ReceiverException - { - checkClosedAndThrowException("Receiver is closed. You cannot invoke methods on a closed receiver"); - } - - private void checkClosedAndThrowException(String closedMessage) throws ReceiverException - { - switch (_state) - { - case ERROR: - throw new ReceiverException("Receiver is in a temporary error state. The session may or may not recover from this"); - case CLOSED: - throw new ReceiverException(closedMessage); - } - } - - /** - * A ConnectionException will cause the Session/Sender to go into a temporary error state, - * which prevents it from being used further. - * From there the Session and Sender can be moved into OPENED (if failover works) or - * CLOSED if there is no failover or if failover has failed. - * @param e - * @throws MessagingException - */ - private SenderException handleConnectionException(ConnectionException e) - { - synchronized (_connectionLock) - { - _state = SenderState.ERROR; - _ssn.exception(e); // This might trigger failover in a layer above. - if (_state == SenderState.CLOSED) - { - // The connection has instructed the session and it's child objects to be closed. - // Either there was no failover, or failover has failed. - return new SenderException("Sender is closed due to connection error",e); - } - else - { - // Asking the application or the Parent handler to retry the operation. - // The Session and Receiver should be in OPENED state at this time. - return new SenderException("Sender was in a temporary error state due to connection error." + - "Plase retry your operation",e); - } - } - } - - /** - * Session Exceptions will generally invalidate the Session. - * TODO this needs to be revisited again. - * A new session will need to be created in that case. - * @param e - * @throws MessagingException - */ - private SenderException handleSessionException(SessionException e) - { - synchronized (_connectionLock) - { - // This should close all senders (including this) and receivers. - _ssn.exception(e); - } - return new SenderException("Session has been closed",e); - } -} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java deleted file mode 100644 index 1422be7207..0000000000 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java +++ /dev/null @@ -1,549 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.messaging.util; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.qpid.messaging.Address; -import org.apache.qpid.messaging.Connection; -import org.apache.qpid.messaging.ConnectionException; -import org.apache.qpid.messaging.Message; -import org.apache.qpid.messaging.MessagingException; -import org.apache.qpid.messaging.Receiver; -import org.apache.qpid.messaging.Sender; -import org.apache.qpid.messaging.Session; -import org.apache.qpid.messaging.SessionException; -import org.apache.qpid.messaging.internal.ConnectionInternal; -import org.apache.qpid.messaging.internal.ReceiverInternal; -import org.apache.qpid.messaging.internal.SenderInternal; -import org.apache.qpid.messaging.internal.SessionInternal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Decorator that adds basic housekeeping tasks to a session. - * This class adds, - * 1. Management of receivers and senders created by this session. - * 2. State management. - * 3. Exception handling. - * - * Exception Handling - * This class will wrap each method call to it's delegate to handle error situations. - * First it will check if the session is already CLOSED or in an ERROR situation. - * Then it will look for connection and session errors and handle as follows. - * - * Connection Exceptions - * This class intercepts ConnectionException's and are passed onto the connection. - * The Session will be marked as ERROR and a session exception will be thrown with an appropriate message. - * Any further use of the session is prevented until it moves to OPENED. - * - * If failover is handled at a layer above, there will be a Session Decorator that - * would handle the session exception and retry when the connection is available. - * This handler may block the call until the state moves into either OPENED or CLOSED. - * Ex @see SessionFailoverDecorator. - * - * If failover is handled at a layer below, then a connection exception means it has failed already. - * Therefore when passed to the connection,the exception will be thrown directly to the application. - * The connection object will be responsible for calling close on this session for the above case. - * - * Close() can be called by, - *
    - *
  1. The application (normal close)
  2. - *
  3. By the parent via failover (error)
  4. - *
  5. By the connection object, if no failover(error)
  6. - *
  7. By itself if it receives and exception (error)
  8. - *
- *
- * - * Session Exceptions - * For the time being, anytime a session exception is received, the session will be marked CLOSED. - * We need to revisit this. - */ -public class SessionManagementDecorator implements SessionInternal -{ - private static Logger _logger = LoggerFactory.getLogger(SessionManagementDecorator.class); - - public enum SessionState {OPENED, CLOSED, ERROR} - - private ConnectionInternal _conn; - private Session _delegate; - private String _name; - SessionState _state = SessionState.OPENED; - private List _receivers = new ArrayList(); - private List _senders = new ArrayList(); - private final Object _connectionLock; // global per connection lock - - public SessionManagementDecorator(ConnectionInternal conn, String name, Session delegate) - { - _conn = conn; - _delegate = delegate; - _name = name; - _connectionLock = conn.getConnectionLock(); - } - - @Override - public boolean isClosed() - { - return _state == SessionState.CLOSED; - } - - @Override - public void close() throws MessagingException - { - checkClosedAndThrowException("Session is already closed"); - synchronized(_connectionLock) - { - _state = SessionState.CLOSED; - for (Sender sender: _senders) - { - try - { - sender.close(); - } - catch (Exception e) - { - _logger.warn("Error closing sender", e); - } - } - _senders.clear(); - - for (Receiver receiver: _receivers) - { - try - { - receiver.close(); - } - catch (Exception e) - { - _logger.warn("Error closing receiver", e); - } - } - _receivers.clear(); - _delegate.close(); - _conn.unregisterSession(this); - } - } - - @Override - public void commit() throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.commit(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void rollback() throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.rollback(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void acknowledge(boolean sync) throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.acknowledge(sync); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void acknowledge(Message message, boolean sync) - throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.acknowledge(message, sync); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void reject(Message message) throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.reject(message); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void release(Message message) throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.release(message); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void sync(boolean block) throws MessagingException - { - checkClosedAndThrowException(); - try - { - _delegate.sync(block); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public int getReceivable() throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.getReceivable(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public int getUnsettledAcks() throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.getUnsettledAcks(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public Receiver nextReceiver(long timeout) throws MessagingException - { - checkClosedAndThrowException(); - try - { - return _delegate.nextReceiver(timeout); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public Sender createSender(Address address) throws MessagingException - { - checkClosedAndThrowException(); - try - { - SenderInternal sender = new SenderManagementDecorator(this,_delegate.createSender(address)); - _senders.add(sender); - return sender; - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public Sender createSender(String address) throws MessagingException - { - checkClosedAndThrowException(); - try - { - SenderInternal sender = new SenderManagementDecorator(this,_delegate.createSender(address)); - _senders.add(sender); - return sender; - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public Receiver createReceiver(Address address) throws MessagingException - { - checkClosedAndThrowException(); - try - { - ReceiverInternal receiver = new ReceiverManagementDecorator(this,_delegate.createReceiver(address)); - _receivers.add(receiver); - return receiver; - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public Receiver createReceiver(String address) throws MessagingException - { - checkClosedAndThrowException(); - try - { - ReceiverInternal receiver = new ReceiverManagementDecorator(this,_delegate.createReceiver(address)); - _receivers.add(receiver); - return receiver; - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public Connection getConnection() throws MessagingException - { - checkError(); - return _conn; // always return your peer (not your delegate's peer) - } - - @Override - public boolean hasError() - { - return _delegate.hasError(); - } - - @Override - public void checkError() throws MessagingException - { - checkClosedAndThrowException(); // check if we already have the info. - try - { - // Asking the delegate. - _delegate.checkError(); - } - catch (ConnectionException e) - { - throw handleConnectionException(e); - } - catch (SessionException e) - { - throw handleSessionException(e); - } - } - - @Override - public void exception(MessagingException e) - { - if (e instanceof ConnectionException) - { - handleConnectionException((ConnectionException)e); - } - else if (e instanceof SessionException) - { - handleSessionException((SessionException)e); - } - } - - @Override - public void recreate() throws MessagingException - { - // TODO Auto-generated method stub - - } - - @Override - public ConnectionInternal getConnectionInternal() - { - return _conn; - } - - @Override - public String getName() - { - return _name; - } - - @Override - public void unregisterReceiver(ReceiverInternal receiver) - { - _receivers.remove(receiver); - } - - @Override - public void unregisterSender(SenderInternal sender) - { - _senders.remove(sender); - } - - private void checkClosedAndThrowException() throws SessionException - { - checkClosedAndThrowException("Session is closed. You cannot invoke methods on a closed sesion"); - } - - private void checkClosedAndThrowException(String closedMessage) throws SessionException - { - switch (_state) - { - case ERROR: - throw new SessionException("Session is in a temporary error state. The session may or may not recover from this"); - case CLOSED: - throw new SessionException(closedMessage); - } - } - - /** - * A ConnectionException will cause the Session to go into a temporary error state, - * which prevents it from being used further. - * From there the Session can be moved into OPENED (if failover works) or - * CLOSED if there is no failover or if failover has failed. - * @param e - * @throws MessagingException - */ - private SessionException handleConnectionException(ConnectionException e) - { - synchronized (_connectionLock) - { - _state = SessionState.ERROR; - _conn.exception(e); // This might trigger failover in a layer above. - if (_state == SessionState.CLOSED) - { - // The connection has instructed the session to be closed. - // Either there was no failover, or failover has failed. - return new SessionException("Session is closed due to connection error",e); - } - else - { - // Asking the application or the Parent handler to retry the operation. - // The Session should be in OPENED state at this time. - return new SessionException("Session was in a temporary error state due to connection error." + - "Plase retry your operation",e); - } - } - } - - /** - * Session Exceptions will generally invalidate the Session. - * TODO this needs to be revisited again. - * A new session will need to be created in that case. - * @param e - * @throws MessagingException - */ - private SessionException handleSessionException(SessionException e) - { - synchronized (_connectionLock) - { - try - { - close(); - } - catch(MessagingException ex) - { - // Should not throw an exception here. - // Even if it did, does't matter as are closing. - } - } - return new SessionException("Session has been closed",e); - } -} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java new file mode 100644 index 0000000000..4f7b09b5c2 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java @@ -0,0 +1,357 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util.failover; + +import java.util.Map; + +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.MessageFactory; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.TransportFailureException; +import org.apache.qpid.messaging.internal.ConnectionEvent; +import org.apache.qpid.messaging.internal.ConnectionEvent.EventType; +import org.apache.qpid.messaging.internal.ConnectionEventListener; +import org.apache.qpid.messaging.internal.ConnectionInternal; +import org.apache.qpid.messaging.internal.ConnectionString; +import org.apache.qpid.messaging.internal.FailoverStrategy; +import org.apache.qpid.messaging.internal.FailoverStrategyFactory; +import org.apache.qpid.messaging.internal.SessionInternal; +import org.apache.qpid.messaging.util.AbstractConnectionDecorator; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * Closing after unsuccessful failover is not done yet! + * + */ +public class ConnectionFailoverDecorator extends AbstractConnectionDecorator +{ + private static Logger _logger = LoggerFactory.getLogger(ConnectionFailoverDecorator.class); + + public enum ConnectionState + { + UNDEFINED, + OPENED, + CONNECTION_LOST, + FAILOVER_IN_PROGRESS, + CLOSED + }; + + private ConnectionState _state = ConnectionState.UNDEFINED; + private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); + private UUIDGen _ssnNameGenerator = UUIDs.newGenerator(); + private ConnectionException _lastException; + private FailoverStrategy _failoverStrategy; + private long _serialNumber = 0; + + public ConnectionFailoverDecorator(ConnectionInternal delegate, Object lock) + { + super(delegate,lock); + _failoverStrategy = FailoverStrategyFactory.get().getFailoverStrategy(delegate); + } + + @Override + public void open() throws MessagingException + { + synchronized (_connectionLock) + { + super.open(); + _serialNumber = getSerialNumber(); + _state = ConnectionState.OPENED; + notifyEvent(new ConnectionEvent(this,EventType.OPENED,this)); + } + } + + @Override + public boolean isOpen() throws MessagingException + { + // If the state is opened, query the delegate and see. + if (_state == ConnectionState.OPENED) + { + return super.isOpen(); + } + else + { + return false; + } + } + + @Override + public void reconnect(String url,Map options) throws TransportFailureException + { + synchronized (_connectionLock) + { + super.reconnect(url,options); + _serialNumber = getSerialNumber(); + _state = ConnectionState.OPENED; + notifyEvent(new ConnectionEvent(this,EventType.RECONNCTED,this)); + } + } + + @Override + public void close() throws MessagingException + { + synchronized(_connectionLock) + { + if (_state == ConnectionState.CLOSED) + { + throw new MessagingException("Connection is already closed"); + } + super.close(); + _state = ConnectionState.CLOSED; + } + } + + @Override + public Session createSession(String name) throws MessagingException + { + checkPreConditions(); + long serialNumber = _serialNumber; // take a snapshot + try + { + if (name == null || name.isEmpty()) { name = generateSessionName(); } + SessionInternal ssn = new SessionFailoverDecorator(this, + (SessionInternal) _delegate.createSession(name)); + _sessions.put(name, ssn); + return ssn; + } + catch(TransportFailureException e) + { + failover(e,serialNumber); + return createSession(name); + } + } + + @Override + public Session createTransactionalSession(String name) throws MessagingException + { + checkPreConditions(); + long serialNumber = _serialNumber; // take a snapshot + try + { + if (name == null || name.isEmpty()) { name = generateSessionName(); } + SessionInternal ssn = new SessionFailoverDecorator(this, + (SessionInternal)_delegate.createTransactionalSession(name)); + _sessions.put(name, ssn); + return ssn; + } + catch(TransportFailureException e) + { + failover(e,serialNumber); + return createTransactionalSession(name); + } + } + + @Override + public String getAuthenticatedUsername() throws MessagingException + { + checkPreConditions(); + long serialNumber = _serialNumber; // take a snapshot + try + { + return _delegate.getAuthenticatedUsername(); + } + catch(TransportFailureException e) + { + failover(e,serialNumber); + return getAuthenticatedUsername(); + } + } + + @Override + public MessageFactory getMessageFactory() throws MessagingException + { + checkPreConditions(); + long serialNumber = _serialNumber; // take a snapshot + try + { + return _delegate.getMessageFactory(); + } + catch(TransportFailureException e) + { + failover(e,serialNumber); + return getMessageFactory(); + } + } + + @Override + public void exception(TransportFailureException e, long serialNumber) + { + try + { + failover(e,serialNumber); + } + catch(ConnectionException ex) + { + //ignore. + //failover() handles notifications + } + } + + @Override + public void recreate() throws MessagingException + { + synchronized(_connectionLock) + { + for (SessionInternal ssn: _sessions.values()) + { + ssn.recreate(); + } + } + } + + protected void checkPreConditions() throws ConnectionException + { + switch (_state) + { + case CLOSED: + throw new ConnectionException("Connection is closed. You cannot invoke methods on a closed connection"); + case UNDEFINED: + throw new ConnectionException("Connection should be opened before it can be used"); + case CONNECTION_LOST: + case FAILOVER_IN_PROGRESS: + waitForFailoverToComplete(); + } + } + + protected void failover(TransportFailureException e,long serialNumber) throws ConnectionException + { + synchronized(_connectionLock) + { + if (_serialNumber > serialNumber) + { + return; // Ignore, We have a working connection now. + } + + _logger.warn("Connection lost!"); + _state = ConnectionState.CONNECTION_LOST; + notifyEvent(new ConnectionEvent(this,EventType.CONNECTION_LOST,this)); + + if (_failoverStrategy.failoverAllowed()) + { + // Failover is allowed at least once. + _state = ConnectionState.FAILOVER_IN_PROGRESS; + notifyEvent(new ConnectionEvent(this,EventType.PRE_FAILOVER,this)); + + + + StringBuffer errorMsg = new StringBuffer(); + while (_failoverStrategy.failoverAllowed()) + { + try + { + ConnectionString conString = _failoverStrategy.getNextConnectionString(); + notifyEvent(new ConnectionEvent(this,EventType.RECONNCTION_ATTEMPTED,this)); + _logger.warn("Attempting connection to " + conString.getUrl()); + reconnect(conString.getUrl(), conString.getOptions()); + try + { + recreate(); + _state = ConnectionState.OPENED; + _lastException = null; + } + catch (MessagingException ex) + { + _lastException = new ConnectionException( + "Recreating the state for the connection and it's children failed", + ex); + } + break; + } + catch (TransportFailureException te) + { + errorMsg.append("\nUnable to connect to : " + + _failoverStrategy.getCurrentConnectionString().getUrl() + + " due to : " + + te.getMessage()); + notifyEvent(new ConnectionEvent(this,EventType.RECONNCTION_FAILED,this)); + } + } + + if (_state != ConnectionState.OPENED) + { + closeInternal(); + _lastException = new ConnectionException("Failover was unsuccessful." + errorMsg.toString()); + _logger.warn("Faiolver was unsuccesful" + errorMsg.toString()); + } + notifyEvent(new ConnectionEvent(this,EventType.POST_FAILOVER,this)); + } + else + { + closeInternal(); + _state = ConnectionState.CLOSED; + _lastException = new ConnectionException("Connection Failed!",e); + _logger.warn("Connection Failed!", e); + } + + _connectionLock.notifyAll(); + + if (_lastException != null) + { + for (ConnectionEventListener l: _stateListeners) + { + l.exception(_lastException); + } + throw _lastException; + } + } + } + + protected void waitForFailoverToComplete() throws ConnectionException + { + synchronized (_connectionLock) + { + try + { + _connectionLock.wait(_failoverTimeout); + } + catch (InterruptedException e) + { + //ignore. + } + if (_state == ConnectionState.CLOSED) + { + throw new ConnectionException("Connection is closed. Failover was unsuccesfull",_lastException); + } + } + } + + private String generateSessionName() + { + // TODO add local IP and pid to the beginning; + return _ssnNameGenerator.generate().toString(); + } + + // Suppresses the exceptions + private void closeInternal() + { + try + { + close(); + } + catch (Exception e) + { + //ignore + } + } + +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java new file mode 100644 index 0000000000..fb6bf9e1b6 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java @@ -0,0 +1,128 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util.failover; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; + +import org.apache.qpid.messaging.internal.ConnectionInternal; +import org.apache.qpid.messaging.internal.ConnectionString; +import org.apache.qpid.messaging.internal.FailoverStrategy; + +public class DefaultFailoverStrategy implements FailoverStrategy +{ + ConnectionString[] _reconnectURLs; + + /** seconds (give up and report failure after specified time) */ + private long _reconnectTimeout = 1000; + + /** n (give up and report failure after specified number of attempts) */ + private int _reconnectLimit = 1; + + /** seconds (initial delay between failed reconnection attempts) */ + private long _reconnectIntervalMin = 1000; + + /** seconds (maximum delay between failed reconnection attempts) */ + private long _reconnectIntervalMax = 1000; + + private ConnectionString _currentUrl; + + /** reconnection attemps */ + private int _attempts = 0; + + /** Index for retrieving a URL from _reconnectURLs */ + private int _index = 0; + + /* quick and dirty impl for experimentation */ + public DefaultFailoverStrategy(String url, Map map) + { + map = map == null ? Collections.EMPTY_MAP : map; + _currentUrl = new ConnectionStringImpl(url,map); + // read the map and fill in the above fields. + if (map.containsKey("reconnect_urls")) + { + String[] urls = ((String)map.get("reconnect_urls")).split(","); + _reconnectURLs = new ConnectionString[urls.length]; + for(int i=0; i _options; + + public ConnectionStringImpl(String url, Map options) + { + _url = url; + _options = options; + } + + public String getUrl() + { + return _url; + } + + public Map getOptions() + { + return _options; + } + + } +} \ No newline at end of file diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java new file mode 100644 index 0000000000..b3e2aa02c9 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java @@ -0,0 +1,15 @@ +package org.apache.qpid.messaging.util.failover; + +import org.apache.qpid.messaging.internal.ConnectionInternal; +import org.apache.qpid.messaging.internal.FailoverStrategy; +import org.apache.qpid.messaging.internal.FailoverStrategyFactory; + +public class DefaultFailoverStrategyFactory extends FailoverStrategyFactory +{ + + @Override + public FailoverStrategy getFailoverStrategy(ConnectionInternal con) + { + return new DefaultFailoverStrategy(con.getConnectionURL(), con.getConnectionOptions()); + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java new file mode 100644 index 0000000000..5162add0ad --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java @@ -0,0 +1,312 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util.failover; + +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.ReceiverException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.TransportFailureException; +import org.apache.qpid.messaging.internal.ConnectionEvent; +import org.apache.qpid.messaging.internal.ConnectionEventListener; +import org.apache.qpid.messaging.internal.ReceiverInternal; +import org.apache.qpid.messaging.internal.SessionInternal; +import org.apache.qpid.messaging.util.AbstractReceiverDecorator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Decorator that adds basic housekeeping tasks to a Receiver. + * This class adds, + * 1. State management. + * 2. Exception handling. + * + */ +public class ReceiverFailoverDecorator extends AbstractReceiverDecorator implements ConnectionEventListener +{ + private static Logger _logger = LoggerFactory.getLogger(ReceiverFailoverDecorator.class); + + public enum ReceiverState {OPENED, CLOSED, FAILOVER_IN_PROGRESS}; + + private ReceiverState _state = ReceiverState.OPENED; + private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); + private ReceiverException _lastException; + private long _connSerialNumber = 0; + + public ReceiverFailoverDecorator(SessionInternal ssn, ReceiverInternal delegate) + { + super(ssn,delegate); + synchronized(_connectionLock) + { + _connSerialNumber = ssn.getConnectionInternal().getSerialNumber(); + } + } + + @Override + public Message get(long timeout) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.get(timeout); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return get(timeout); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Message fetch(long timeout) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.fetch(timeout); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return fetch(timeout); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void setCapacity(int capacity) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.setCapacity(capacity); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + setCapacity(capacity); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getCapacity() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.getCapacity(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return getCapacity(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getAvailable() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.getAvailable(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return getAvailable(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getUnsettled() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.getUnsettled(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return getUnsettled(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void close() throws MessagingException + { + synchronized (_connectionLock) + { + if (_state == ReceiverState.CLOSED) + { + throw new MessagingException("Receiver is already closed"); + } + _state = ReceiverState.CLOSED; + super.close(); + } + } + + @Override + public boolean isClosed() + { + return _state == ReceiverState.CLOSED; + } + + @Override + public Session getSession() throws MessagingException + { + checkPreConditions(); + _ssn.checkError(); + return _ssn; + } + + @Override + public void recreate() throws MessagingException + { + synchronized(_connectionLock) + { + _connSerialNumber = _ssn.getConnectionInternal().getSerialNumber(); + _delegate.recreate(); + } + } + + @Override + public void eventOccured(ConnectionEvent event) + { + synchronized (_connectionLock) + { + switch(event.getType()) + { + case PRE_FAILOVER: + case CONNECTION_LOST: + _state = ReceiverState.FAILOVER_IN_PROGRESS; + break; + case RECONNCTED: + _state = ReceiverState.OPENED; + break; + case POST_FAILOVER: + try + { + if (_state != ReceiverState.OPENED) + { + close(); + } + } + catch (MessagingException e) + { + _logger.warn("Exception when trying to close the receiver", e); + } + _connectionLock.notifyAll(); + break; + default: + break; //ignore the rest + } + } + } + + @Override // From ConnectionEventListener + public void exception(ConnectionException e) + {// NOOP + } + + protected void checkPreConditions() throws ReceiverException + { + switch (_state) + { + case CLOSED: + throw new ReceiverException("Receiver is closed. You cannot invoke methods on a closed Receiver",_lastException); + case FAILOVER_IN_PROGRESS: + waitForFailoverToComplete(); + } + } + + protected void waitForFailoverToComplete() throws ReceiverException + { + synchronized (_connectionLock) + { + try + { + _connectionLock.wait(_failoverTimeout); + } + catch (InterruptedException e) + { + //ignore. + } + if (_state == ReceiverState.CLOSED) + { + throw new ReceiverException("Receiver is closed. Failover was unsuccesfull",_lastException); + } + } + } + + protected ReceiverException handleSessionException(SessionException e) + { + synchronized (_connectionLock) + { + // This should close all receivers (including this) and senders. + _ssn.exception(e); + } + return new ReceiverException("Session has been closed",e); + } + + protected void failover(TransportFailureException e, long serialNumber) throws ReceiverException + { + synchronized (_connectionLock) + { + if (_connSerialNumber > serialNumber) + { + return; // ignore, we already have failed over. + } + _state = ReceiverState.FAILOVER_IN_PROGRESS; + _ssn.exception(e, serialNumber); // This triggers failover. + waitForFailoverToComplete(); + } + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java new file mode 100644 index 0000000000..8df574a74d --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java @@ -0,0 +1,291 @@ +package org.apache.qpid.messaging.util.failover; + +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.ReceiverException; +import org.apache.qpid.messaging.SenderException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.TransportFailureException; +import org.apache.qpid.messaging.internal.ConnectionEvent; +import org.apache.qpid.messaging.internal.ConnectionEventListener; +import org.apache.qpid.messaging.internal.SenderInternal; +import org.apache.qpid.messaging.internal.SessionInternal; +import org.apache.qpid.messaging.util.AbstractSenderDecorator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Decorator that adds basic housekeeping tasks to a Sender. + * This class adds, + * 1. State management. + * 2. Exception handling. + * 3. Failover + * + */ +public class SenderFailoverDecorator extends AbstractSenderDecorator implements ConnectionEventListener +{ + private static Logger _logger = LoggerFactory.getLogger(SenderFailoverDecorator.class); + + public enum SenderState {OPENED, CLOSED, FAILOVER_IN_PROGRESS}; + + private SenderState _state = SenderState.OPENED; + private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); + private ReceiverException _lastException; + private long _connSerialNumber = 0; + + public SenderFailoverDecorator(SessionInternal ssn, SenderInternal delegate) + { + super(ssn,delegate); + synchronized(_connectionLock) + { + _connSerialNumber = ssn.getConnectionInternal().getSerialNumber(); + } + } + + @Override + public void send(Message message, boolean sync) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.send(message, sync); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + send(message, sync); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void close() throws MessagingException + { + synchronized (_connectionLock) + { + if (_state == SenderState.CLOSED) + { + throw new MessagingException("Sender is already closed"); + } + _state = SenderState.CLOSED; + super.close(); + } + } + + @Override + public void setCapacity(int capacity) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.setCapacity(capacity); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + setCapacity(capacity); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getCapacity() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.getCapacity(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return getCapacity(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getAvailable() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.getAvailable(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return getAvailable(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getUnsettled() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.getUnsettled(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return getUnsettled(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public boolean isClosed() throws MessagingException + { + return _state == SenderState.CLOSED; + } + + @Override + public String getName() throws MessagingException + { + checkPreConditions(); + return getName(); + } + + @Override + public Session getSession() throws MessagingException + { + checkPreConditions(); + _ssn.checkError(); + return _ssn; + } + + @Override + public void recreate() throws MessagingException + { + synchronized(_connectionLock) + { + _connSerialNumber = _ssn.getConnectionInternal().getSerialNumber(); + _delegate.recreate(); + } + } + + @Override + public void eventOccured(ConnectionEvent event) + { + synchronized (_connectionLock) + { + switch(event.getType()) + { + case PRE_FAILOVER: + case CONNECTION_LOST: + _state = SenderState.FAILOVER_IN_PROGRESS; + break; + case RECONNCTED: + _state = SenderState.OPENED; + break; + case POST_FAILOVER: + try + { + if (_state != SenderState.OPENED) + { + close(); + } + } + catch (MessagingException e) + { + _logger.warn("Exception when trying to close the receiver", e); + } + _connectionLock.notifyAll(); + break; + default: + break; //ignore the rest + } + } + } + + @Override // From ConnectionEventListener + public void exception(ConnectionException e) + {// NOOP + } + + protected void waitForFailoverToComplete() throws SenderException + { + synchronized (_connectionLock) + { + try + { + _connectionLock.wait(_failoverTimeout); + } + catch (InterruptedException e) + { + //ignore. + } + if (_state == SenderState.CLOSED) + { + throw new SenderException("Receiver is closed. Failover was unsuccesfull",_lastException); + } + } + } + + protected void failover(TransportFailureException e, long serialNumber) throws SenderException + { + synchronized (_connectionLock) + { + if (_connSerialNumber > serialNumber) + { + return; // ignore, we already have failed over. + } + _state = SenderState.FAILOVER_IN_PROGRESS; + _ssn.exception(e, serialNumber); // This triggers failover. + waitForFailoverToComplete(); + } + } + + protected void checkPreConditions() throws SenderException + { + switch (_state) + { + case CLOSED: + throw new SenderException("Sender is closed. You cannot invoke methods on a closed sender",_lastException); + case FAILOVER_IN_PROGRESS: + waitForFailoverToComplete(); + } + } + + /** + * Session Exceptions will generally invalidate the Session. + * TODO this needs to be revisited again. + * A new session will need to be created in that case. + * @param e + * @throws MessagingException + */ + protected SenderException handleSessionException(SessionException e) + { + synchronized (_connectionLock) + { + // This should close all senders (including this) and receivers. + _ssn.exception(e); + } + return new SenderException("Session has been closed",e); + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java new file mode 100644 index 0000000000..5f2bafa1e6 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java @@ -0,0 +1,559 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util.failover; + +import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Receiver; +import org.apache.qpid.messaging.Sender; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.TransportFailureException; +import org.apache.qpid.messaging.internal.ConnectionEvent; +import org.apache.qpid.messaging.internal.ConnectionEventListener; +import org.apache.qpid.messaging.internal.ConnectionInternal; +import org.apache.qpid.messaging.internal.ReceiverInternal; +import org.apache.qpid.messaging.internal.SenderInternal; +import org.apache.qpid.messaging.internal.SessionInternal; +import org.apache.qpid.messaging.util.AbstractSessionDecorator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SessionFailoverDecorator extends AbstractSessionDecorator implements ConnectionEventListener +{ + private static Logger _logger = LoggerFactory.getLogger(SessionFailoverDecorator.class); + + public enum SessionState {OPENED, CLOSED, FAILOVER_IN_PROGRESS} + + private SessionState _state = SessionState.OPENED; + private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); + private SessionException _lastException; + private long _connSerialNumber = 0; + + public SessionFailoverDecorator(ConnectionInternal conn, SessionInternal delegate) + { + super(conn,delegate); + synchronized(_connectionLock) + { + _connSerialNumber = conn.getSerialNumber(); + } + } + + @Override + public void close() throws MessagingException + { + synchronized(_connectionLock) + { + if (_state == SessionState.CLOSED) + { + throw new MessagingException("Session is already closed"); + } + _state = SessionState.CLOSED; + super.close(); + } + } + + @Override + public void commit() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.commit(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + commit(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void rollback() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.rollback(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + rollback(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void acknowledge(boolean sync) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.acknowledge(sync); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + acknowledge(sync); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void acknowledge(Message message, boolean sync) + throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.acknowledge(message,sync); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + acknowledge(message,sync); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void reject(Message message) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.reject(message); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + reject(message); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void release(Message message) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.release(message); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + release(message); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void sync(boolean block) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + _delegate.sync(block); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + sync(block); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getReceivable() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.getReceivable(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return getReceivable(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getUnsettledAcks() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.getUnsettledAcks(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return getUnsettledAcks(); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Receiver nextReceiver(long timeout) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + return _delegate.nextReceiver(timeout); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return nextReceiver(timeout); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Sender createSender(Address address) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + SenderInternal sender = new SenderFailoverDecorator(this, + (SenderInternal) _delegate.createSender(address)); + synchronized (_connectionLock) + { + _senders.add(sender); + } + return sender; + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return createSender(address); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Sender createSender(String address) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + SenderInternal sender = new SenderFailoverDecorator(this, + (SenderInternal) _delegate.createSender(address)); + synchronized (_connectionLock) + { + _senders.add(sender); + } + return sender; + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return createSender(address); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Receiver createReceiver(Address address) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + ReceiverInternal receiver = new ReceiverFailoverDecorator(this, + (ReceiverInternal) _delegate.createReceiver(address)); + synchronized (_connectionLock) + { + _receivers.add(receiver); + } + return receiver; + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return createReceiver(address); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Receiver createReceiver(String address) throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot + try + { + ReceiverInternal receiver = new ReceiverFailoverDecorator(this, + (ReceiverInternal) _delegate.createReceiver(address)); + synchronized (_connectionLock) + { + _receivers.add(receiver); + } + return receiver; + } + catch (TransportFailureException e) + { + failover(e,serialNumber); + return createReceiver(address); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void checkError() throws MessagingException + { + checkPreConditions(); + long serialNumber = _connSerialNumber; // take a snapshot // check if we already have the info. + try + { + // Asking the delegate. + _delegate.checkError(); + } + catch (TransportFailureException e) + { + failover(e,serialNumber); // will throw an exception + return; + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public boolean isClosed() + { + if (_state == SessionState.OPENED) + { + return super.isClosed(); // ask the delegate to be sure. + } + else + { + return true; + } + } + + @Override // From SessionInternal + public void exception(TransportFailureException e, long serialNumber) + { + try + { + failover((TransportFailureException)e, serialNumber); + } + catch (SessionException ex) + { + _lastException = ex; + } + } + + public void exception(SessionException e) + { + handleSessionException(e); + } + + @Override + public void recreate() throws MessagingException + { + synchronized (_connectionLock) + { + _connSerialNumber = _conn.getSerialNumber(); + _delegate.recreate(); + for (ReceiverInternal rec : _receivers) + { + rec.recreate(); + } + for (SenderInternal sender : _senders) + { + sender.recreate(); + } + } + } + + @Override + public ConnectionInternal getConnectionInternal() + { + return _conn; + } + + @Override //From ConnectionEventListener + public void exception(ConnectionException e) + { + // NOOP + } + + @Override //From ConnectionEventListener + public void eventOccured(ConnectionEvent event) + { + synchronized (_connectionLock) + { + switch(event.getType()) + { + case PRE_FAILOVER: + case CONNECTION_LOST: + _state = SessionState.FAILOVER_IN_PROGRESS; + break; + case RECONNCTED: + _state = SessionState.OPENED; + break; + case POST_FAILOVER: + try + { + if (_state != SessionState.OPENED) + { + close(); + } + } + catch (MessagingException e) + { + _logger.warn("Exception when trying to close the session", e); + } + _connectionLock.notifyAll(); + break; + default: + break; //ignore the rest + } + } + } + + protected void failover(TransportFailureException e, long serialNumber) throws SessionException + { + synchronized (_connectionLock) + { + if (_connSerialNumber > serialNumber) + { + return; // ignore, we already have failed over. + } + _state = SessionState.FAILOVER_IN_PROGRESS; + _conn.exception(e, serialNumber); // This triggers failover. + waitForFailoverToComplete(); + } + } + + protected void checkPreConditions() throws SessionException + { + switch (_state) + { + case CLOSED: + throw new SessionException("Session is closed. You cannot invoke methods on a closed session",_lastException); + case FAILOVER_IN_PROGRESS: + waitForFailoverToComplete(); + } + } + + /** + * Session Exceptions will generally invalidate the Session. + * TODO this needs to be revisited again. + * A new session will need to be created in that case. + * @param e + * @throws MessagingException + */ + protected SessionException handleSessionException(SessionException e) + { + synchronized (_connectionLock) + { + try + { + close(); + } + catch(MessagingException ex) + { + _logger.warn("Error when closing session : " + getName(), ex); + } + } + return new SessionException("Session has been closed",e); + } + + protected void waitForFailoverToComplete() throws SessionException + { + synchronized (_connectionLock) + { + try + { + _connectionLock.wait(_failoverTimeout); + } + catch (InterruptedException e) + { + //ignore. + } + if (_state == SessionState.CLOSED) + { + throw new SessionException("Session is closed. Failover was unsuccesfull",_lastException); + } + } + } +} -- cgit v1.2.1