diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-11-06 09:56:56 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-11-06 09:56:56 +0000 |
commit | 3cb32fb199ccf271da1c08f355940c2550d48464 (patch) | |
tree | cba2c840fcfedb8a0069d84485f2ec372c51083d | |
parent | 340a73dbd9f9797d48b3007ab3b9fa8de21bd6ee (diff) | |
download | qpid-python-3cb32fb199ccf271da1c08f355940c2550d48464.tar.gz |
QPID-1431 : Prevent listeners being added to _frameListeners when an exception state has occured due to connection closure. This adds additional synchronisation to the error handling condition. It also removes the need for a CopyOnWriteArrayList that was perhaps the wrong data structure for this list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711822 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 739 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 116b163b3c..bf32b4df1f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -452,13 +452,16 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void propagateExceptionToFrameListeners(Exception e) { - if (!_frameListeners.isEmpty()) + synchronized (_frameListeners) { - final Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + if (!_frameListeners.isEmpty()) { - final AMQMethodListener ml = (AMQMethodListener) it.next(); - ml.error(e); + final Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener ml = (AMQMethodListener) it.next(); + ml.error(e); + } } } } @@ -558,18 +561,20 @@ public class AMQProtocolHandler extends IoHandlerAdapter { boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) + synchronized (_frameListeners) { - //This iterator is safe from the error state as the frame listeners always add before they send so their - // will be ready and waiting for this response. - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + //This iterator is safe from the error state as the frame listeners always add before they send so their + // will be ready and waiting for this response. + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } } } - if (!wasAnyoneInterested) { throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" @@ -659,6 +664,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter throw _lastFailoverException; } + if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED) + { + Exception e = _stateManager.getLastException(); + if (e != null) + { + if (e instanceof AMQException) + { + AMQException amqe = (AMQException) e; + + amqe.rethrow(); + } + else + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e); + } + } + } + _frameListeners.add(listener); //FIXME: At this point here we should check or before add we should check _stateManager is in an open // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 diff --git a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java new file mode 100644 index 0000000000..ce79080e97 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.url.URLSyntaxException; + +import java.io.IOException; + +public class MockAMQConnection extends AMQConnection +{ + public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException + { + super(broker, username, password, clientName, virtualHost); + } + + public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig) + throws AMQException, URLSyntaxException + { + super(broker, username, password, clientName, virtualHost, sslConfig); + } + + public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException + { + super(host, port, username, password, clientName, virtualHost); + } + + public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig) + throws AMQException, URLSyntaxException + { + super(host, port, username, password, clientName, virtualHost, sslConfig); + } + + public MockAMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig) + throws AMQException, URLSyntaxException + { + super(host, port, useSSL, username, password, clientName, virtualHost, sslConfig); + } + + public MockAMQConnection(String connection) + throws AMQException, URLSyntaxException + { + super(connection); + } + + public MockAMQConnection(String connection, SSLConfiguration sslConfig) + throws AMQException, URLSyntaxException + { + super(connection, sslConfig); + } + + public MockAMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) + throws AMQException + { + super(connectionURL, sslConfig); + } + + protected MockAMQConnection(String username, String password, String clientName, String virtualHost) + { + super(username, password, clientName, virtualHost); + } + + @Override + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + { + _connected = true; + _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN); + return null; + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java new file mode 100644 index 0000000000..10ec220d9e --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -0,0 +1,297 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.protocol; + +import junit.framework.TestCase; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.client.MockAMQConnection; +import org.apache.qpid.client.AMQAuthenticationException; +import org.apache.qpid.client.state.AMQState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception. + * + * Currently we do checks at the Session level to ensure that the connection/session are open. However, it is possible + * for the connection to close AFTER this check has been performed. + * + * Performing a similar check at the frameListener level in AMQProtocolHandler makes most sence as this will prevent + * listening when there can be no returning frames. + * + * With the correction in place it also means that the new listener will either make it on to the list for notification + * or it will be notified of any existing exception due to the connection being closed. + * + * There may still be an issue in this space if the client utilises a second thread to close the session as there will + * be no exception set to throw and so the wait will occur. That said when the session is closed the framelisteners + * should be notified. Not sure this is tested. + */ +public class AMQProtocolHandlerTest extends TestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandlerTest.class); + + // The handler to test + AMQProtocolHandler _handler; + + // A frame to block upon whilst waiting the exception + AMQFrame _blockFrame; + + // Latch to know when the listener receives an exception + private CountDownLatch _handleCountDown; + // The listener that will receive an exception + BlockToAccessFrameListener _listener; + + @Override + public void setUp() throws Exception + { + //Create a new ProtocolHandler with a fake connection. + _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'")); + + _handler.sessionCreated(new MockIoSession()); + + AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); + _blockFrame = new AMQFrame(0, body); + + _handleCountDown = new CountDownLatch(1); + + _logger.info("Creating _Listener that should also receive the thrown exception."); + _listener = new BlockToAccessFrameListener(1); + } + + /** + * There are two paths based on the type of exception thrown. + * + * This tests that when an AMQException is thrown we get the same type of AMQException back with the real exception + * wrapped as the cause. + * + * @throws InterruptedException - if we are unable to wait for the test signals + */ + public void testFrameListenerUpdateWithAMQException() throws InterruptedException + { + AMQException trigger = new AMQAuthenticationException(AMQConstant.ACCESS_REFUSED, + "AMQPHTest", new RuntimeException()); + + performWithException(trigger); + + + AMQException receivedException = (AMQException) _listener.getReceivedException(); + + assertEquals("Return exception was not the expected type", + AMQAuthenticationException.class, receivedException.getClass()); + + assertEquals("The _Listener did not receive the correct error code", + trigger.getErrorCode(), receivedException.getErrorCode()); + } + + /** + * There are two paths based on the type of exception thrown. + * + * This tests that when a generic Exception is thrown we get the exception back wrapped in a AMQException + * as the cause. + * @throws InterruptedException - if we are unable to wait for the test signals + */ + public void testFrameListenerUpdateWithException() throws InterruptedException + { + + Exception trigger = new Exception(new RuntimeException()); + + performWithException(trigger); + + assertEquals("The _Listener did not receive the correct error code", + AMQConstant.INTERNAL_ERROR, ((AMQException)_listener.getReceivedException()).getErrorCode()); + } + + /** + * This is the main test method for both test cases. + * + * What occurs is that we create a new thread that will block (<30s[DEFAULT]) for a frame or exception to occur . + * + * We use a CountDownLatch to ensure that the new thread is running before we then yield and sleep to help ensure + * the new thread has entered the synchronized block in the writeCommandFrameAndWaitForReply. + * + * We can then ack like an the incomming exception handler in (ConnectionCloseMethodHandler). + * + * We fire the error to the stateManager, which in this case will recored the error as there are no state listeners. + * + * We then set the connection to be closed, as we would normally close the socket at this point. + * + * Then fire the exception in to any frameListeners. + * + * The blocked listener (created above) when receiving the error simulates the user by creating a new request to + * block for a frame. + * + * This request should fail. Prior to the fix this will fail with a NPE as we are attempting to use a null listener + * in the writeCommand.... call L:268. + * + * This highlights that the listener would be added dispite there being a pending error state that the listener will + * miss as it is not currently part of the _frameListeners set that is being notified by the iterator. + * + * The method waits to ensure that an exception is received before returning. + * + * The calling methods validate that exception that was received based on the one they sent in. + * + * @param trigger The exception to throw through the handler + */ + private void performWithException(Exception trigger) throws InterruptedException + { + + final CountDownLatch callingWriteCommand = new CountDownLatch(1); + + //Set an initial listener that will allow us to create a new blocking method + new Thread(new Runnable() + { + public void run() + { + + try + { + + _logger.info("At initial block, signalling to fire new exception"); + callingWriteCommand.countDown(); + + _handler.writeCommandFrameAndWaitForReply(_blockFrame, _listener); + } + catch (Exception e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + }).start(); + + _logger.info("Waiting for 'initial block' to start "); + if (!callingWriteCommand.await(1000, TimeUnit.MILLISECONDS)) + { + fail("Failed to start new thread to block for frame"); + } + + // Do what we can to ensure that this thread does not continue before the above thread has hit the synchronized + // block in the writeCommandFrameAndWaitForReply + Thread.yield(); + Thread.sleep(1000); + + _logger.info("Firing Erorr through state manager. There should be not state waiters here."); + _handler.getStateManager().error(trigger); + + _logger.info("Setting state to be CONNECTION_CLOSED."); + try + { + _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); + } + catch (AMQException e) + { + _logger.error("Unable to change the state to closed.", e); + fail("Unable to change the state to closed due to :"+e.getMessage()); + } + + _logger.info("Firing exception"); + _handler.propagateExceptionToFrameListeners(trigger); + + _logger.info("Awaiting notifcation from handler that exception arrived."); + + if (!_handleCountDown.await(2000, TimeUnit.MILLISECONDS)) + { + fail("Failed to handle exception and timeout has not occured"); + } + + + assertNotNull("The _Listener did not receive the exception", _listener.getReceivedException()); + + assertTrue("Received exception not an AMQException", + _listener.getReceivedException() instanceof AMQException); + + AMQException receivedException = (AMQException) _listener.getReceivedException(); + + assertTrue("The _Listener did not receive the correct message", + receivedException.getMessage().startsWith(trigger.getMessage())); + + + assertEquals("The _Listener did not receive the correct cause", + trigger, receivedException.getCause()); + + assertEquals("The _Listener did not receive the correct sub cause", + trigger.getCause(), receivedException.getCause().getCause()); + + } + + class BlockToAccessFrameListener extends BlockingMethodFrameListener + { + private Exception _receivedException = null; + + /** + * Creates a new method listener, that filters incoming method to just those that match the specified channel id. + * + * @param channelId The channel id to filter incoming methods with. + */ + public BlockToAccessFrameListener(int channelId) + { + super(channelId); + _logger.info("Creating a listener:" + this); + } + + public boolean processMethod(int channelId, AMQMethodBody frame) + { + return true; + } + + @Override + public void error(Exception e) + { + _logger.info("Exception(" + e + ") Received by:" + this); + // Create a new Thread to start the blocking registration. + new Thread(new Runnable() + { + + public void run() + { + //Set an initial listener that will allow us to create a new blocking method + try + { + _handler.writeCommandFrameAndWaitForReply(_blockFrame, null, 2000L); + _logger.info("listener(" + this + ") Wait completed"); + } + catch (Exception e) + { + _logger.info("listener(" + this + ") threw exception:" + e.getMessage()); + _receivedException = e; + } + + _logger.info("listener(" + this + ") completed"); + _handleCountDown.countDown(); + } + }).start(); + } + + public Exception getReceivedException() + { + return _receivedException; + } + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java b/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java new file mode 100644 index 0000000000..f0938a4bc0 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/protocol/MockIoSession.java @@ -0,0 +1,312 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.protocol; + +import org.apache.mina.common.*; +import org.apache.mina.common.support.DefaultCloseFuture; +import org.apache.mina.common.support.DefaultWriteFuture; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.qpid.client.protocol.AMQProtocolSession; + +import java.net.SocketAddress; +import java.net.InetSocketAddress; +import java.util.Set; + +public class MockIoSession implements IoSession +{ + private AMQProtocolSession _protocolSession; + + /** + * Stores the last response written + */ + private Object _lastWrittenObject; + + private boolean _closing; + private IoFilterChain _filterChain; + + public MockIoSession() + { + _filterChain = new AbstractIoFilterChain(this) + { + protected void doWrite(IoSession ioSession, IoFilter.WriteRequest writeRequest) throws Exception + { + + } + + protected void doClose(IoSession ioSession) throws Exception + { + + } + }; + } + + public Object getLastWrittenObject() + { + return _lastWrittenObject; + } + + public IoService getService() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public IoServiceConfig getServiceConfig() + { + return null; + } + + public IoHandler getHandler() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public IoSessionConfig getConfig() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public IoFilterChain getFilterChain() + { + return _filterChain; + } + + public WriteFuture write(Object message) + { + WriteFuture wf = new DefaultWriteFuture(null); + _lastWrittenObject = message; + return wf; + } + + public CloseFuture close() + { + _closing = true; + CloseFuture cf = new DefaultCloseFuture(null); + cf.setClosed(); + return cf; + } + + public Object getAttachment() + { + return _protocolSession; + } + + public Object setAttachment(Object attachment) + { + Object current = _protocolSession; + _protocolSession = (AMQProtocolSession) attachment; + return current; + } + + public Object getAttribute(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Object setAttribute(String key, Object value) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Object setAttribute(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Object removeAttribute(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean containsAttribute(String key) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public Set getAttributeKeys() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public TransportType getTransportType() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isConnected() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isClosing() + { + return _closing; + } + + public CloseFuture getCloseFuture() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public SocketAddress getRemoteAddress() + { + return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates. + } + + public SocketAddress getLocalAddress() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public SocketAddress getServiceAddress() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getIdleTime(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public int getWriteTimeout() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getWriteTimeoutInMillis() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setWriteTimeout(int writeTimeout) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public TrafficMask getTrafficMask() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setTrafficMask(TrafficMask trafficMask) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void suspendRead() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void suspendWrite() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void resumeRead() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void resumeWrite() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public long getReadBytes() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getWrittenBytes() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getReadMessages() + { + return 0L; + } + + public long getWrittenMessages() + { + return 0L; + } + + public long getWrittenWriteRequests() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getScheduledWriteRequests() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getScheduledWriteBytes() + { + return 0; //TODO + } + + public long getCreationTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastIoTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastReadTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastWriteTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isIdle(IdleStatus status) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getIdleCount(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } +} |