summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-06-15 17:20:48 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-06-15 17:20:48 +0000
commit95906fe274ca7a1b407680a23858483d5253791b (patch)
tree7d02fc67665d1b7fda60decbce85f6a8f67c1b76
parent259968ae10e95dc315f3e08506c5dae27e7099f4 (diff)
downloadqpid-python-95906fe274ca7a1b407680a23858483d5253791b.tar.gz
QPID-4027 Added extension interfaces for Session, Sender and Receiver.
Added convinience class that converts java long timeout to the C++ Duration object. Added ReceiverManagementDecorator and SenderManagementDecorator that provides state management and error handling via the decorator pattern for a Receiver and a Sender respectively. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350702 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java48
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java26
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java26
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java30
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java267
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java234
6 files changed, 631 insertions, 0 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
new file mode 100644
index 0000000000..59251dc597
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
@@ -0,0 +1,48 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.cpp;
+
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.cpp.jni.Duration;
+
+public class CppDuration
+{
+ public static Duration getDuration(long duration)
+ {
+ if (Receiver.FOREVER == duration)
+ {
+ return Duration.getFOREVER();
+ }
+ else if (Receiver.IMMEDIATE == duration)
+ {
+ return Duration.getIMMEDIATE();
+ }
+ else if (Receiver.SECOND == duration)
+ {
+ return Duration.getSECOND();
+ }
+ else if (Receiver.MINUTE == duration)
+ {
+ return Duration.getMINUTE();
+ }
+ else
+ {
+ return new Duration(duration);
+ }
+ }
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
new file mode 100644
index 0000000000..2ff574f07d
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
@@ -0,0 +1,26 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+
+public interface ReceiverExt extends Receiver
+{
+ public void recreate() throws MessagingException;
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
new file mode 100644
index 0000000000..c05fbc52c0
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
@@ -0,0 +1,26 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Sender;
+
+public interface SenderExt extends Sender
+{
+ public void recreate() throws MessagingException;
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
new file mode 100644
index 0000000000..626ef5770b
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
@@ -0,0 +1,30 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Session;
+
+public interface SessionExt extends Session
+{
+ public ConnectionExt getConnectionExt();
+
+ public void exception(MessagingException e);
+
+ public void recreate() throws MessagingException;
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
new file mode 100644
index 0000000000..ef7a0ca354
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
@@ -0,0 +1,267 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.ReceiverException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.ReceiverExt;
+import org.apache.qpid.messaging.ext.SessionExt;
+import org.apache.qpid.messaging.util.SessionManagementDecorator.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a Receiver.
+ * This class adds,
+ * 1. State management.
+ * 2. Exception handling.
+ *
+ */
+public class ReceiverManagementDecorator implements ReceiverExt
+{
+ private static Logger _logger = LoggerFactory.getLogger(ReceiverManagementDecorator.class);
+
+ public enum ReceiverState {OPENED, CLOSED, ERROR};
+
+ private Receiver _delegate;
+ private ReceiverState _state = ReceiverState.OPENED;
+ private SessionExt _ssn;
+ private final Object _connectionLock; // global per connection lock
+
+ public ReceiverManagementDecorator(SessionExt ssn, Receiver delegate)
+ {
+ _ssn = ssn;
+ _delegate = delegate;
+ _connectionLock = ssn.getConnectionExt().getConnectionLock();
+ }
+
+ @Override
+ public Message get(long timeout) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.get(timeout);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Message fetch(long timeout) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.fetch(timeout);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void setCapacity(int capacity) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.setCapacity(capacity);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getCapacity() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getCapacity();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getAvailable() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getAvailable();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getUnsettled() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getUnsettled();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ checkClosedAndThrowException("Receiver is already closed");
+ synchronized (_connectionLock)
+ {
+ _state = ReceiverState.CLOSED;
+ _delegate.close();
+ }
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return _state == ReceiverState.CLOSED;
+ }
+
+ @Override
+ public String getName() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ return _delegate.getName();
+ }
+
+ @Override
+ public Session getSession() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ _ssn.checkError();
+ return _ssn;
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ private void checkClosedAndThrowException() throws ReceiverException
+ {
+ checkClosedAndThrowException("Receiver is closed. You cannot invoke methods on a closed receiver");
+ }
+
+ private void checkClosedAndThrowException(String closedMessage) throws ReceiverException
+ {
+ switch (_state)
+ {
+ case ERROR:
+ throw new ReceiverException("Receiver is in a temporary error state. The session may or may not recover from this");
+ case CLOSED:
+ throw new ReceiverException(closedMessage);
+ }
+ }
+
+ /**
+ * A ConnectionException will cause the Session/Receiver to go into a temporary error state,
+ * which prevents it from being used further.
+ * From there the Session and Receiver can be moved into OPENED (if failover works) or
+ * CLOSED if there is no failover or if failover has failed.
+ * @param e
+ * @throws MessagingException
+ */
+ private ReceiverException handleConnectionException(ConnectionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ _state = ReceiverState.ERROR;
+ _ssn.exception(e); // This might trigger failover in a layer above.
+ if (_state == ReceiverState.CLOSED)
+ {
+ // The connection has instructed the session and it's child objects to be closed.
+ // Either there was no failover, or failover has failed.
+ return new ReceiverException("Receiver is closed due to connection error",e);
+ }
+ else
+ {
+ // Asking the application or the Parent handler to retry the operation.
+ // The Session and Receiver should be in OPENED state at this time.
+ return new ReceiverException("Receiver was in a temporary error state due to connection error." +
+ "Plase retry your operation",e);
+ }
+ }
+ }
+
+ /**
+ * Session Exceptions will generally invalidate the Session.
+ * TODO this needs to be revisited again.
+ * A new session will need to be created in that case.
+ * @param e
+ * @throws MessagingException
+ */
+ private ReceiverException handleSessionException(SessionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ // This should close all receivers (including this) and senders.
+ _ssn.exception(e);
+ }
+ return new ReceiverException("Session has been closed",e);
+ }
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
new file mode 100644
index 0000000000..2c28116da1
--- /dev/null
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
@@ -0,0 +1,234 @@
+package org.apache.qpid.messaging.util;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.ReceiverException;
+import org.apache.qpid.messaging.Sender;
+import org.apache.qpid.messaging.SenderException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.SenderExt;
+import org.apache.qpid.messaging.ext.SessionExt;
+import org.apache.qpid.messaging.util.ReceiverManagementDecorator.ReceiverState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a Sender.
+ * This class adds,
+ * 1. State management.
+ * 2. Exception handling.
+ *
+ */
+public class SenderManagementDecorator implements SenderExt
+{
+ private static Logger _logger = LoggerFactory.getLogger(SenderManagementDecorator.class);
+
+ public enum SenderState {OPENED, CLOSED, ERROR};
+
+ private Sender _delegate;
+ private SenderState _state = SenderState.OPENED;
+ private SessionExt _ssn;
+ private final Object _connectionLock; // global per connection lock
+
+ public SenderManagementDecorator(SessionExt ssn, Sender delegate)
+ {
+ _ssn = ssn;
+ _delegate = delegate;
+ _connectionLock = ssn.getConnectionExt().getConnectionLock();
+ }
+
+ @Override
+ public void send(Message message, boolean sync) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.send(message, sync);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ checkClosedAndThrowException("Sender is already closed");
+ synchronized (_connectionLock)
+ {
+ _state = SenderState.CLOSED;
+ _delegate.close();
+ }
+ }
+
+ @Override
+ public void setCapacity(int capacity) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.setCapacity(capacity);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getCapacity() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getCapacity();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getAvailable() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getAvailable();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getUnsettled() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getUnsettled();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public boolean isClosed() throws MessagingException
+ {
+ return _state == SenderState.CLOSED;
+ }
+
+ @Override
+ public String getName() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ return _delegate.getName();
+ }
+
+ @Override
+ public Session getSession() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ _ssn.checkError();
+ return _ssn;
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ private void checkClosedAndThrowException() throws ReceiverException
+ {
+ checkClosedAndThrowException("Receiver is closed. You cannot invoke methods on a closed receiver");
+ }
+
+ private void checkClosedAndThrowException(String closedMessage) throws ReceiverException
+ {
+ switch (_state)
+ {
+ case ERROR:
+ throw new ReceiverException("Receiver is in a temporary error state. The session may or may not recover from this");
+ case CLOSED:
+ throw new ReceiverException(closedMessage);
+ }
+ }
+
+ /**
+ * A ConnectionException will cause the Session/Sender to go into a temporary error state,
+ * which prevents it from being used further.
+ * From there the Session and Sender can be moved into OPENED (if failover works) or
+ * CLOSED if there is no failover or if failover has failed.
+ * @param e
+ * @throws MessagingException
+ */
+ private SenderException handleConnectionException(ConnectionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ _state = SenderState.ERROR;
+ _ssn.exception(e); // This might trigger failover in a layer above.
+ if (_state == SenderState.CLOSED)
+ {
+ // The connection has instructed the session and it's child objects to be closed.
+ // Either there was no failover, or failover has failed.
+ return new SenderException("Sender is closed due to connection error",e);
+ }
+ else
+ {
+ // Asking the application or the Parent handler to retry the operation.
+ // The Session and Receiver should be in OPENED state at this time.
+ return new SenderException("Sender was in a temporary error state due to connection error." +
+ "Plase retry your operation",e);
+ }
+ }
+ }
+
+ /**
+ * Session Exceptions will generally invalidate the Session.
+ * TODO this needs to be revisited again.
+ * A new session will need to be created in that case.
+ * @param e
+ * @throws MessagingException
+ */
+ private SenderException handleSessionException(SessionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ // This should close all senders (including this) and receivers.
+ _ssn.exception(e);
+ }
+ return new SenderException("Session has been closed",e);
+ }
+}