diff options
Diffstat (limited to 'java')
16 files changed, 97 insertions, 104 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 415b7a3c68..fa43b8809d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -29,6 +29,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; @@ -99,10 +100,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQCodecFactory codecFactory) throws AMQException { - this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); + _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this); + _minaProtocolSession = session; + session.setAttachment(this); + + _queueRegistry = queueRegistry; + _exchangeRegistry = exchangeRegistry; + _codecFactory = codecFactory; + _managedObject = createMBean(); + _managedObject.register(); +// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException { @@ -208,13 +218,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, (AMQMethodBody) frame.bodyFrame); try { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt, this, _queueRegistry, _exchangeRegistry); + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); if(!_frameListeners.isEmpty()) { for (AMQMethodListener listener : _frameListeners) { - wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) || + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } @@ -233,7 +243,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _logger.error("Closing connection due to: " + e.getMessage()); writeFrame(e.getCloseFrame(frame.channel)); } - catch (AMQException e) + catch (Exception e) { _stateManager.error(e); for (AMQMethodListener listener : _frameListeners) diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index fb78b0d8b7..70e530699e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.*; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.handler.*; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQMethodListener; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.log4j.Logger; @@ -43,7 +43,9 @@ import java.util.concurrent.CopyOnWriteArraySet; public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - + private final QueueRegistry _queueRegistry; + private final ExchangeRegistry _exchangeRegistry; + private final AMQProtocolSession _protocolSession; /** * The current state */ @@ -58,13 +60,16 @@ public class AMQStateManager implements AMQMethodListener private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>(); - public AMQStateManager() + public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true); + this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession); } - protected AMQStateManager(AMQState initial, boolean register) + protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { + _queueRegistry = queueRegistry; + _exchangeRegistry = exchangeRegistry; + _protocolSession = protocolSession; _currentState = initial; if (register) { @@ -149,7 +154,7 @@ public class AMQStateManager implements AMQMethodListener } } - public void error(AMQException e) + public void error(Exception e) { _logger.error("State manager received error notification: " + e, e); for (StateListener l : _stateListeners) @@ -158,15 +163,12 @@ public class AMQStateManager implements AMQMethodListener } } - public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt, - AMQProtocolSession protocolSession, - QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry) throws AMQException + public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { - handler.methodReceived(this, queueRegistry, exchangeRegistry, protocolSession, evt); + handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt); return true; } return false; diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 53291a3fd0..50596d4bfc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -89,7 +89,7 @@ public class FailoverHandler implements Runnable // have a state waiter waiting until the connection is closed for some reason. Or in future we may have // a slightly more complex state model therefore I felt it was worthwhile doing this. AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); - _amqProtocolHandler.setStateManager(new AMQStateManager()); + _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession())); if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null)) { _amqProtocolHandler.setStateManager(existingStateManager); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java deleted file mode 100644 index 2cbd8f0e32..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.client.protocol.AMQProtocolSession; - -public interface AMQMethodListener -{ - /** - * Invoked when a method frame has been received - * @param evt the event - * @return true if the handler has processed the method frame, false otherwise. Note - * that this does not prohibit the method event being delivered to subsequent listeners - * but can be used to determine if nobody has dealt with an incoming method frame. - * @throws AMQException if an error has occurred. This exception will be delivered - * to all registered listeners using the error() method (see below) allowing them to - * perform cleanup if necessary. - */ - boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException; - - /** - * Callback when an error has occurred. Allows listeners to clean up. - * @param e - */ - void error(Exception e); -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 220f3c3b69..a0aa1d544b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -40,6 +40,7 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.BogusSSLContextFactory; import java.util.Iterator; @@ -68,7 +69,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ private volatile AMQProtocolSession _protocolSession; - private AMQStateManager _stateManager = new AMQStateManager(); +// private AMQStateManager _stateManager = new AMQStateManager(); private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); @@ -277,7 +278,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void propagateExceptionToWaiters(Exception e) { - _stateManager.error(e); + _protocolSession.getStateManager().error(e); if(!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); @@ -316,14 +317,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter try { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt, _protocolSession); + boolean wasAnyoneInterested = _protocolSession.getStateManager().methodReceived(evt); if(!_frameListeners.isEmpty()) { Iterator it = _frameListeners.iterator(); while (it.hasNext()) { final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested; + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } if (!wasAnyoneInterested) @@ -333,7 +334,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter } catch (AMQException e) { - _stateManager.error(e); + _protocolSession.getStateManager().error(e); if(!_frameListeners.isEmpty()) { Iterator it = _frameListeners.iterator(); @@ -394,7 +395,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void attainState(AMQState s) throws AMQException { - _stateManager.attainState(s); + _protocolSession.getStateManager().attainState(s); } /** @@ -486,7 +487,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void closeConnection() throws AMQException { - _stateManager.changeState(AMQState.CONNECTION_CLOSING); + _protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -556,12 +557,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter public AMQStateManager getStateManager() { - return _stateManager; + return _protocolSession.getStateManager(); } public void setStateManager(AMQStateManager stateManager) { - _stateManager = stateManager; + _protocolSession.setStateManager(stateManager); + } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; } FailoverState getFailoverState() diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 718d256a52..1af7ca55e6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -33,6 +33,7 @@ import org.apache.qpid.client.message.UnexpectedBodyReceivedException; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQProtocolWriter; +import org.apache.qpid.client.state.AMQStateManager; import org.apache.commons.lang.StringUtils; import javax.jms.JMSException; @@ -63,6 +64,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis protected final IoSession _minaProtocolSession; + private AMQStateManager _stateManager; + protected WriteFuture _lastWriteFuture; /** @@ -98,6 +101,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { _protocolHandler = null; _minaProtocolSession = null; + _stateManager = new AMQStateManager(this); } public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) @@ -106,6 +110,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _minaProtocolSession = protocolSession; // properties of the connection are made available to the event handlers _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); + _stateManager = new AMQStateManager(this); } public void init() @@ -136,6 +141,16 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { getAMQConnection().setClientID(clientID); } + + public AMQStateManager getStateManager() + { + return _stateManager; + } + + public void setStateManager(AMQStateManager stateManager) + { + _stateManager = stateManager; + } public String getVirtualHost() { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 4fff4fab00..c05e6faf53 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.client.protocol.AMQProtocolSession; public abstract class BlockingMethodFrameListener implements AMQMethodListener @@ -55,7 +56,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener * @return true if the listener has dealt with this frame * @throws AMQException */ - public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException + public boolean methodReceived(AMQMethodEvent evt) throws AMQException { AMQMethodBody method = evt.getMethod(); diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index c9b7593796..ea5cfce2ea 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -23,8 +23,8 @@ package org.apache.qpid.client.state; import org.apache.qpid.AMQException; import org.apache.qpid.client.handler.*; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.protocol.AMQMethodListener; import org.apache.qpid.framing.*; import org.apache.log4j.Logger; @@ -41,6 +41,7 @@ import java.util.concurrent.CopyOnWriteArraySet; public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); + private final AMQProtocolSession _protocolSession; /** * The current state @@ -55,13 +56,14 @@ public class AMQStateManager implements AMQMethodListener private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); - public AMQStateManager() + public AMQStateManager(AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true); + this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession); } - protected AMQStateManager(AMQState state, boolean register) + protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession) { + _protocolSession = protocolSession; _currentState = state; if(register) { @@ -147,12 +149,12 @@ public class AMQStateManager implements AMQMethodListener } } - public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException + public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { - handler.methodReceived(this, protocolSession, evt); + handler.methodReceived(this, _protocolSession, evt); return true; } return false; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java index be5f705ffa..1b4a3e8327 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java @@ -52,7 +52,7 @@ class ClientAdapter implements MethodHandler public void handle(int channel, AMQMethodBody method) throws AMQException { AMQMethodEvent evt = new AMQMethodEvent(channel, method); - _stateMgr.methodReceived(evt, _session); + _stateMgr.methodReceived(evt); } private class SessionAdapter extends AMQProtocolSession diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java index 3c1b50fb99..a371748b9e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -29,6 +29,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.IllegalStateTransitionException; import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.*; import java.util.HashMap; @@ -43,9 +44,9 @@ public class ClientHandlerRegistry extends AMQStateManager private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>(); private final MemberHandle _identity; - protected ClientHandlerRegistry(MemberHandle local) + protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false); + super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession); _identity = local; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java index 89f402c1b9..352928b121 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java @@ -46,7 +46,7 @@ class ClusterBuilder ServerHandlerRegistry getHandlerRegistry() { - return new ServerHandlerRegistry(getHandlerFactory()); + return new ServerHandlerRegistry(getHandlerFactory(), null, null, null); } private MethodHandlerFactory getHandlerFactory() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index 6e7efb3659..c1306b4c13 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.ClusterMembershipBody; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -73,9 +74,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements _handlers = handler._handlers; } - protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException + protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException { - new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers)); + new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession)); } void connect(String join) throws Exception diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java index 1763bcd03f..04c5f7b451 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -37,10 +37,12 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession { private MemberHandle _peer; - public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) - throws AMQException + public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException +// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, +// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException { super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager); +// super(session, queueRegistry, exchangeRegistry, codecFactory); } public boolean isPeerSession() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index 8557fc17c7..f447895013 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -63,7 +63,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler { super(host, port); _local = local; - _legacyHandler = new ClientHandlerRegistry(local); + _legacyHandler = new ClientHandlerRegistry(local, null); } private void init(IoSession session) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java index 71c53146a8..27d5629f27 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -27,6 +27,9 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.IllegalStateTransitionException; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.HashMap; import java.util.Map; @@ -40,20 +43,23 @@ class ServerHandlerRegistry extends AMQStateManager private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class); private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); - ServerHandlerRegistry() + ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false); + super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession); } - ServerHandlerRegistry(ServerHandlerRegistry s) + ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(); + this(queueRegistry, exchangeRegistry, protocolSession); _handlers.putAll(s._handlers); } - ServerHandlerRegistry(MethodHandlerFactory factory) + ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(); + this(queueRegistry, exchangeRegistry, protocolSession); init(factory); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 6596da1f8f..f77b5084f3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -18,12 +18,8 @@ * under the License. * */ -package org.apache.qpid.server.protocol; +package org.apache.qpid.protocol; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.framing.AMQMethodBody; /** @@ -43,14 +39,11 @@ public interface AMQMethodListener * to all registered listeners using the error() method (see below) allowing them to * perform cleanup if necessary. */ - <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt, - AMQProtocolSession protocolSession, - QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry) throws AMQException; + <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception; /** * Callback when an error has occurred. Allows listeners to clean up. * @param e */ - void error(AMQException e); + void error(Exception e); } |