diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-06-15 17:20:48 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-06-15 17:20:48 +0000 |
commit | 95906fe274ca7a1b407680a23858483d5253791b (patch) | |
tree | 7d02fc67665d1b7fda60decbce85f6a8f67c1b76 | |
parent | 259968ae10e95dc315f3e08506c5dae27e7099f4 (diff) | |
download | qpid-python-95906fe274ca7a1b407680a23858483d5253791b.tar.gz |
QPID-4027 Added extension interfaces for Session, Sender and Receiver.
Added convinience class that converts java long timeout to the C++
Duration object.
Added ReceiverManagementDecorator and SenderManagementDecorator that
provides state management and error handling via the decorator pattern
for a Receiver and a Sender respectively.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350702 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 631 insertions, 0 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java new file mode 100644 index 0000000000..59251dc597 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java @@ -0,0 +1,48 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.cpp; + +import org.apache.qpid.messaging.Receiver; +import org.apache.qpid.messaging.cpp.jni.Duration; + +public class CppDuration +{ + public static Duration getDuration(long duration) + { + if (Receiver.FOREVER == duration) + { + return Duration.getFOREVER(); + } + else if (Receiver.IMMEDIATE == duration) + { + return Duration.getIMMEDIATE(); + } + else if (Receiver.SECOND == duration) + { + return Duration.getSECOND(); + } + else if (Receiver.MINUTE == duration) + { + return Duration.getMINUTE(); + } + else + { + return new Duration(duration); + } + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java new file mode 100644 index 0000000000..2ff574f07d --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java @@ -0,0 +1,26 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.ext; + +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Receiver; + +public interface ReceiverExt extends Receiver +{ + public void recreate() throws MessagingException; +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java new file mode 100644 index 0000000000..c05fbc52c0 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java @@ -0,0 +1,26 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.ext; + +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Sender; + +public interface SenderExt extends Sender +{ + public void recreate() throws MessagingException; +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java new file mode 100644 index 0000000000..626ef5770b --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java @@ -0,0 +1,30 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.ext; + +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Session; + +public interface SessionExt extends Session +{ + public ConnectionExt getConnectionExt(); + + public void exception(MessagingException e); + + public void recreate() throws MessagingException; +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java new file mode 100644 index 0000000000..ef7a0ca354 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java @@ -0,0 +1,267 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Receiver; +import org.apache.qpid.messaging.ReceiverException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.ext.ReceiverExt; +import org.apache.qpid.messaging.ext.SessionExt; +import org.apache.qpid.messaging.util.SessionManagementDecorator.SessionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Decorator that adds basic housekeeping tasks to a Receiver. + * This class adds, + * 1. State management. + * 2. Exception handling. + * + */ +public class ReceiverManagementDecorator implements ReceiverExt +{ + private static Logger _logger = LoggerFactory.getLogger(ReceiverManagementDecorator.class); + + public enum ReceiverState {OPENED, CLOSED, ERROR}; + + private Receiver _delegate; + private ReceiverState _state = ReceiverState.OPENED; + private SessionExt _ssn; + private final Object _connectionLock; // global per connection lock + + public ReceiverManagementDecorator(SessionExt ssn, Receiver delegate) + { + _ssn = ssn; + _delegate = delegate; + _connectionLock = ssn.getConnectionExt().getConnectionLock(); + } + + @Override + public Message get(long timeout) throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.get(timeout); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public Message fetch(long timeout) throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.fetch(timeout); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void setCapacity(int capacity) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.setCapacity(capacity); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getCapacity() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getCapacity(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getAvailable() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getAvailable(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getUnsettled() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getUnsettled(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void close() throws MessagingException + { + checkClosedAndThrowException("Receiver is already closed"); + synchronized (_connectionLock) + { + _state = ReceiverState.CLOSED; + _delegate.close(); + } + } + + @Override + public boolean isClosed() + { + return _state == ReceiverState.CLOSED; + } + + @Override + public String getName() throws MessagingException + { + checkClosedAndThrowException(); + return _delegate.getName(); + } + + @Override + public Session getSession() throws MessagingException + { + checkClosedAndThrowException(); + _ssn.checkError(); + return _ssn; + } + + @Override + public void recreate() throws MessagingException + { + // TODO Auto-generated method stub + + } + + private void checkClosedAndThrowException() throws ReceiverException + { + checkClosedAndThrowException("Receiver is closed. You cannot invoke methods on a closed receiver"); + } + + private void checkClosedAndThrowException(String closedMessage) throws ReceiverException + { + switch (_state) + { + case ERROR: + throw new ReceiverException("Receiver is in a temporary error state. The session may or may not recover from this"); + case CLOSED: + throw new ReceiverException(closedMessage); + } + } + + /** + * A ConnectionException will cause the Session/Receiver to go into a temporary error state, + * which prevents it from being used further. + * From there the Session and Receiver can be moved into OPENED (if failover works) or + * CLOSED if there is no failover or if failover has failed. + * @param e + * @throws MessagingException + */ + private ReceiverException handleConnectionException(ConnectionException e) + { + synchronized (_connectionLock) + { + _state = ReceiverState.ERROR; + _ssn.exception(e); // This might trigger failover in a layer above. + if (_state == ReceiverState.CLOSED) + { + // The connection has instructed the session and it's child objects to be closed. + // Either there was no failover, or failover has failed. + return new ReceiverException("Receiver is closed due to connection error",e); + } + else + { + // Asking the application or the Parent handler to retry the operation. + // The Session and Receiver should be in OPENED state at this time. + return new ReceiverException("Receiver was in a temporary error state due to connection error." + + "Plase retry your operation",e); + } + } + } + + /** + * Session Exceptions will generally invalidate the Session. + * TODO this needs to be revisited again. + * A new session will need to be created in that case. + * @param e + * @throws MessagingException + */ + private ReceiverException handleSessionException(SessionException e) + { + synchronized (_connectionLock) + { + // This should close all receivers (including this) and senders. + _ssn.exception(e); + } + return new ReceiverException("Session has been closed",e); + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java new file mode 100644 index 0000000000..2c28116da1 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java @@ -0,0 +1,234 @@ +package org.apache.qpid.messaging.util; + +import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.Receiver; +import org.apache.qpid.messaging.ReceiverException; +import org.apache.qpid.messaging.Sender; +import org.apache.qpid.messaging.SenderException; +import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.ext.SenderExt; +import org.apache.qpid.messaging.ext.SessionExt; +import org.apache.qpid.messaging.util.ReceiverManagementDecorator.ReceiverState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Decorator that adds basic housekeeping tasks to a Sender. + * This class adds, + * 1. State management. + * 2. Exception handling. + * + */ +public class SenderManagementDecorator implements SenderExt +{ + private static Logger _logger = LoggerFactory.getLogger(SenderManagementDecorator.class); + + public enum SenderState {OPENED, CLOSED, ERROR}; + + private Sender _delegate; + private SenderState _state = SenderState.OPENED; + private SessionExt _ssn; + private final Object _connectionLock; // global per connection lock + + public SenderManagementDecorator(SessionExt ssn, Sender delegate) + { + _ssn = ssn; + _delegate = delegate; + _connectionLock = ssn.getConnectionExt().getConnectionLock(); + } + + @Override + public void send(Message message, boolean sync) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.send(message, sync); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public void close() throws MessagingException + { + checkClosedAndThrowException("Sender is already closed"); + synchronized (_connectionLock) + { + _state = SenderState.CLOSED; + _delegate.close(); + } + } + + @Override + public void setCapacity(int capacity) throws MessagingException + { + checkClosedAndThrowException(); + try + { + _delegate.setCapacity(capacity); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getCapacity() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getCapacity(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getAvailable() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getAvailable(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public int getUnsettled() throws MessagingException + { + checkClosedAndThrowException(); + try + { + return _delegate.getUnsettled(); + } + catch (ConnectionException e) + { + throw handleConnectionException(e); + } + catch (SessionException e) + { + throw handleSessionException(e); + } + } + + @Override + public boolean isClosed() throws MessagingException + { + return _state == SenderState.CLOSED; + } + + @Override + public String getName() throws MessagingException + { + checkClosedAndThrowException(); + return _delegate.getName(); + } + + @Override + public Session getSession() throws MessagingException + { + checkClosedAndThrowException(); + _ssn.checkError(); + return _ssn; + } + + @Override + public void recreate() throws MessagingException + { + // TODO Auto-generated method stub + + } + + private void checkClosedAndThrowException() throws ReceiverException + { + checkClosedAndThrowException("Receiver is closed. You cannot invoke methods on a closed receiver"); + } + + private void checkClosedAndThrowException(String closedMessage) throws ReceiverException + { + switch (_state) + { + case ERROR: + throw new ReceiverException("Receiver is in a temporary error state. The session may or may not recover from this"); + case CLOSED: + throw new ReceiverException(closedMessage); + } + } + + /** + * A ConnectionException will cause the Session/Sender to go into a temporary error state, + * which prevents it from being used further. + * From there the Session and Sender can be moved into OPENED (if failover works) or + * CLOSED if there is no failover or if failover has failed. + * @param e + * @throws MessagingException + */ + private SenderException handleConnectionException(ConnectionException e) + { + synchronized (_connectionLock) + { + _state = SenderState.ERROR; + _ssn.exception(e); // This might trigger failover in a layer above. + if (_state == SenderState.CLOSED) + { + // The connection has instructed the session and it's child objects to be closed. + // Either there was no failover, or failover has failed. + return new SenderException("Sender is closed due to connection error",e); + } + else + { + // Asking the application or the Parent handler to retry the operation. + // The Session and Receiver should be in OPENED state at this time. + return new SenderException("Sender was in a temporary error state due to connection error." + + "Plase retry your operation",e); + } + } + } + + /** + * Session Exceptions will generally invalidate the Session. + * TODO this needs to be revisited again. + * A new session will need to be created in that case. + * @param e + * @throws MessagingException + */ + private SenderException handleSessionException(SessionException e) + { + synchronized (_connectionLock) + { + // This should close all senders (including this) and receivers. + _ssn.exception(e); + } + return new SenderException("Session has been closed",e); + } +} |