summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-12 22:02:11 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-12 22:02:11 +0000
commitbd98b98b0dc2f15563a280967f9cd907bc7aa7c4 (patch)
treee2708db8fcd1757accbc59565f36349dea00793e /java/client
parentd6de073a34a7781977e71f89dc9f9b66921993cb (diff)
downloadqpid-python-bd98b98b0dc2f15563a280967f9cd907bc7aa7c4.tar.gz
Created common AMQMethodListener class, allowing the Request and Response managers to use a common interface to dispatch events to both the client and servers. Refactoring of bothe the client and broker AMQStateManagers and AMQProtocolSession classes was performed. The refactoring has run aground in the clustering, however, and this still needs to be resolved. As the cluster tests are currently disabled (by whom, I'm not sure), this does not disrupt the overall test result. JIRAs will be opened for this issue.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@495754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java46
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java14
6 files changed, 41 insertions, 63 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 53291a3fd0..50596d4bfc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -89,7 +89,7 @@ public class FailoverHandler implements Runnable
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
- _amqProtocolHandler.setStateManager(new AMQStateManager());
+ _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
_amqProtocolHandler.setStateManager(existingStateManager);
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
deleted file mode 100644
index 2cbd8f0e32..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-
-public interface AMQMethodListener
-{
- /**
- * Invoked when a method frame has been received
- * @param evt the event
- * @return true if the handler has processed the method frame, false otherwise. Note
- * that this does not prohibit the method event being delivered to subsequent listeners
- * but can be used to determine if nobody has dealt with an incoming method frame.
- * @throws AMQException if an error has occurred. This exception will be delivered
- * to all registered listeners using the error() method (see below) allowing them to
- * perform cleanup if necessary.
- */
- boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException;
-
- /**
- * Callback when an error has occurred. Allows listeners to clean up.
- * @param e
- */
- void error(Exception e);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 220f3c3b69..a0aa1d544b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -40,6 +40,7 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.BogusSSLContextFactory;
import java.util.Iterator;
@@ -68,7 +69,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
private volatile AMQProtocolSession _protocolSession;
- private AMQStateManager _stateManager = new AMQStateManager();
+// private AMQStateManager _stateManager = new AMQStateManager();
private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
@@ -277,7 +278,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void propagateExceptionToWaiters(Exception e)
{
- _stateManager.error(e);
+ _protocolSession.getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -316,14 +317,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
try
{
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt, _protocolSession);
+ boolean wasAnyoneInterested = _protocolSession.getStateManager().methodReceived(evt);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
while (it.hasNext())
{
final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
}
if (!wasAnyoneInterested)
@@ -333,7 +334,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
catch (AMQException e)
{
- _stateManager.error(e);
+ _protocolSession.getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
@@ -394,7 +395,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void attainState(AMQState s) throws AMQException
{
- _stateManager.attainState(s);
+ _protocolSession.getStateManager().attainState(s);
}
/**
@@ -486,7 +487,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void closeConnection() throws AMQException
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ _protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -556,12 +557,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQStateManager getStateManager()
{
- return _stateManager;
+ return _protocolSession.getStateManager();
}
public void setStateManager(AMQStateManager stateManager)
{
- _stateManager = stateManager;
+ _protocolSession.setStateManager(stateManager);
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
}
FailoverState getFailoverState()
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 718d256a52..1af7ca55e6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -33,6 +33,7 @@ import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.commons.lang.StringUtils;
import javax.jms.JMSException;
@@ -63,6 +64,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
protected final IoSession _minaProtocolSession;
+ private AMQStateManager _stateManager;
+
protected WriteFuture _lastWriteFuture;
/**
@@ -98,6 +101,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
_protocolHandler = null;
_minaProtocolSession = null;
+ _stateManager = new AMQStateManager(this);
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -106,6 +110,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_minaProtocolSession = protocolSession;
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+ _stateManager = new AMQStateManager(this);
}
public void init()
@@ -136,6 +141,16 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
getAMQConnection().setClientID(clientID);
}
+
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ public void setStateManager(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ }
public String getVirtualHost()
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 4fff4fab00..c05e6faf53 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -55,7 +56,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
* @return true if the listener has dealt with this frame
* @throws AMQException
*/
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
AMQMethodBody method = evt.getMethod();
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index c9b7593796..ea5cfce2ea 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -23,8 +23,8 @@ package org.apache.qpid.client.state;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.handler.*;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.protocol.AMQMethodListener;
import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
@@ -41,6 +41,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
+ private final AMQProtocolSession _protocolSession;
/**
* The current state
@@ -55,13 +56,14 @@ public class AMQStateManager implements AMQMethodListener
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
- public AMQStateManager()
+ public AMQStateManager(AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true);
+ this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
}
- protected AMQStateManager(AMQState state, boolean register)
+ protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
{
+ _protocolSession = protocolSession;
_currentState = state;
if(register)
{
@@ -147,12 +149,12 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+ public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, protocolSession, evt);
+ handler.methodReceived(this, _protocolSession, evt);
return true;
}
return false;