summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java46
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java14
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java18
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java (renamed from java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java)13
16 files changed, 97 insertions, 104 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 415b7a3c68..fa43b8809d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -29,6 +29,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
@@ -99,10 +100,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
AMQCodecFactory codecFactory)
throws AMQException
{
- this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+ _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+ _minaProtocolSession = session;
+ session.setAttachment(this);
+
+ _queueRegistry = queueRegistry;
+ _exchangeRegistry = exchangeRegistry;
+ _codecFactory = codecFactory;
+ _managedObject = createMBean();
+ _managedObject.register();
+// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
AMQCodecFactory codecFactory, AMQStateManager stateManager)
throws AMQException
{
@@ -208,13 +218,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
(AMQMethodBody) frame.bodyFrame);
try
{
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt, this, _queueRegistry, _exchangeRegistry);
+ boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
if(!_frameListeners.isEmpty())
{
for (AMQMethodListener listener : _frameListeners)
{
- wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) ||
+ wasAnyoneInterested = listener.methodReceived(evt) ||
wasAnyoneInterested;
}
}
@@ -233,7 +243,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_logger.error("Closing connection due to: " + e.getMessage());
writeFrame(e.getCloseFrame(frame.channel));
}
- catch (AMQException e)
+ catch (Exception e)
{
_stateManager.error(e);
for (AMQMethodListener listener : _frameListeners)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index fb78b0d8b7..70e530699e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -25,7 +25,7 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.handler.*;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.log4j.Logger;
@@ -43,7 +43,9 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
-
+ private final QueueRegistry _queueRegistry;
+ private final ExchangeRegistry _exchangeRegistry;
+ private final AMQProtocolSession _protocolSession;
/**
* The current state
*/
@@ -58,13 +60,16 @@ public class AMQStateManager implements AMQMethodListener
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
- public AMQStateManager()
+ public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true);
+ this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession);
}
- protected AMQStateManager(AMQState initial, boolean register)
+ protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
+ _queueRegistry = queueRegistry;
+ _exchangeRegistry = exchangeRegistry;
+ _protocolSession = protocolSession;
_currentState = initial;
if (register)
{
@@ -149,7 +154,7 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public void error(AMQException e)
+ public void error(Exception e)
{
_logger.error("State manager received error notification: " + e, e);
for (StateListener l : _stateListeners)
@@ -158,15 +163,12 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
- AMQProtocolSession protocolSession,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry) throws AMQException
+ public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, queueRegistry, exchangeRegistry, protocolSession, evt);
+ handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
return true;
}
return false;
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 53291a3fd0..50596d4bfc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -89,7 +89,7 @@ public class FailoverHandler implements Runnable
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
- _amqProtocolHandler.setStateManager(new AMQStateManager());
+ _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
_amqProtocolHandler.setStateManager(existingStateManager);
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
deleted file mode 100644
index 2cbd8f0e32..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-
-public interface AMQMethodListener
-{
- /**
- * Invoked when a method frame has been received
- * @param evt the event
- * @return true if the handler has processed the method frame, false otherwise. Note
- * that this does not prohibit the method event being delivered to subsequent listeners
- * but can be used to determine if nobody has dealt with an incoming method frame.
- * @throws AMQException if an error has occurred. This exception will be delivered
- * to all registered listeners using the error() method (see below) allowing them to
- * perform cleanup if necessary.
- */
- boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException;
-
- /**
- * Callback when an error has occurred. Allows listeners to clean up.
- * @param e
- */
- void error(Exception e);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 220f3c3b69..a0aa1d544b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -40,6 +40,7 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.BogusSSLContextFactory;
import java.util.Iterator;
@@ -68,7 +69,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
private volatile AMQProtocolSession _protocolSession;
- private AMQStateManager _stateManager = new AMQStateManager();
+// private AMQStateManager _stateManager = new AMQStateManager();
private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
@@ -277,7 +278,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void propagateExceptionToWaiters(Exception e)
{
- _stateManager.error(e);
+ _protocolSession.getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -316,14 +317,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
try
{
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt, _protocolSession);
+ boolean wasAnyoneInterested = _protocolSession.getStateManager().methodReceived(evt);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
while (it.hasNext())
{
final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
}
if (!wasAnyoneInterested)
@@ -333,7 +334,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
catch (AMQException e)
{
- _stateManager.error(e);
+ _protocolSession.getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
@@ -394,7 +395,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void attainState(AMQState s) throws AMQException
{
- _stateManager.attainState(s);
+ _protocolSession.getStateManager().attainState(s);
}
/**
@@ -486,7 +487,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void closeConnection() throws AMQException
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ _protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -556,12 +557,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQStateManager getStateManager()
{
- return _stateManager;
+ return _protocolSession.getStateManager();
}
public void setStateManager(AMQStateManager stateManager)
{
- _stateManager = stateManager;
+ _protocolSession.setStateManager(stateManager);
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
}
FailoverState getFailoverState()
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 718d256a52..1af7ca55e6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -33,6 +33,7 @@ import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.commons.lang.StringUtils;
import javax.jms.JMSException;
@@ -63,6 +64,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
protected final IoSession _minaProtocolSession;
+ private AMQStateManager _stateManager;
+
protected WriteFuture _lastWriteFuture;
/**
@@ -98,6 +101,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
_protocolHandler = null;
_minaProtocolSession = null;
+ _stateManager = new AMQStateManager(this);
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -106,6 +110,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_minaProtocolSession = protocolSession;
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+ _stateManager = new AMQStateManager(this);
}
public void init()
@@ -136,6 +141,16 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
getAMQConnection().setClientID(clientID);
}
+
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ public void setStateManager(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ }
public String getVirtualHost()
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 4fff4fab00..c05e6faf53 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -55,7 +56,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
* @return true if the listener has dealt with this frame
* @throws AMQException
*/
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
AMQMethodBody method = evt.getMethod();
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index c9b7593796..ea5cfce2ea 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -23,8 +23,8 @@ package org.apache.qpid.client.state;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.handler.*;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.protocol.AMQMethodListener;
import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
@@ -41,6 +41,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
+ private final AMQProtocolSession _protocolSession;
/**
* The current state
@@ -55,13 +56,14 @@ public class AMQStateManager implements AMQMethodListener
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
- public AMQStateManager()
+ public AMQStateManager(AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true);
+ this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
}
- protected AMQStateManager(AMQState state, boolean register)
+ protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
{
+ _protocolSession = protocolSession;
_currentState = state;
if(register)
{
@@ -147,12 +149,12 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+ public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, protocolSession, evt);
+ handler.methodReceived(this, _protocolSession, evt);
return true;
}
return false;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
index be5f705ffa..1b4a3e8327 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
@@ -52,7 +52,7 @@ class ClientAdapter implements MethodHandler
public void handle(int channel, AMQMethodBody method) throws AMQException
{
AMQMethodEvent evt = new AMQMethodEvent(channel, method);
- _stateMgr.methodReceived(evt, _session);
+ _stateMgr.methodReceived(evt);
}
private class SessionAdapter extends AMQProtocolSession
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
index 3c1b50fb99..a371748b9e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
@@ -29,6 +29,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.IllegalStateTransitionException;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.*;
import java.util.HashMap;
@@ -43,9 +44,9 @@ public class ClientHandlerRegistry extends AMQStateManager
private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>();
private final MemberHandle _identity;
- protected ClientHandlerRegistry(MemberHandle local)
+ protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession);
_identity = local;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
index 89f402c1b9..352928b121 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
@@ -46,7 +46,7 @@ class ClusterBuilder
ServerHandlerRegistry getHandlerRegistry()
{
- return new ServerHandlerRegistry(getHandlerFactory());
+ return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
}
private MethodHandlerFactory getHandlerFactory()
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
index 6e7efb3659..c1306b4c13 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.ClusterMembershipBody;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -73,9 +74,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements
_handlers = handler._handlers;
}
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
{
- new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers));
+ new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
}
void connect(String join) throws Exception
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
index 1763bcd03f..04c5f7b451 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
@@ -37,10 +37,12 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
{
private MemberHandle _peer;
- public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager)
- throws AMQException
+ public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
+// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry,
+// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException
{
super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager);
+// super(session, queueRegistry, exchangeRegistry, codecFactory);
}
public boolean isPeerSession()
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index 8557fc17c7..f447895013 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -63,7 +63,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
{
super(host, port);
_local = local;
- _legacyHandler = new ClientHandlerRegistry(local);
+ _legacyHandler = new ClientHandlerRegistry(local, null);
}
private void init(IoSession session)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
index 71c53146a8..27d5629f27 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
@@ -27,6 +27,9 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.IllegalStateTransitionException;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.HashMap;
import java.util.Map;
@@ -40,20 +43,23 @@ class ServerHandlerRegistry extends AMQStateManager
private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
- ServerHandlerRegistry()
+ ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
}
- ServerHandlerRegistry(ServerHandlerRegistry s)
+ ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
_handlers.putAll(s._handlers);
}
- ServerHandlerRegistry(MethodHandlerFactory factory)
+ ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
init(factory);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
index 6596da1f8f..f77b5084f3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
@@ -18,12 +18,8 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.protocol;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.framing.AMQMethodBody;
/**
@@ -43,14 +39,11 @@ public interface AMQMethodListener
* to all registered listeners using the error() method (see below) allowing them to
* perform cleanup if necessary.
*/
- <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
- AMQProtocolSession protocolSession,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry) throws AMQException;
+ <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception;
/**
* Callback when an error has occurred. Allows listeners to clean up.
* @param e
*/
- void error(AMQException e);
+ void error(Exception e);
}