diff options
Diffstat (limited to 'trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java')
-rw-r--r-- | trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java | 289 |
1 files changed, 0 insertions, 289 deletions
diff --git a/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java deleted file mode 100644 index f520a21ba0..0000000000 --- a/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import junit.framework.TestCase; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.TestNetworkDriver; -import org.apache.qpid.client.MockAMQConnection; -import org.apache.qpid.client.AMQAuthenticationException; -import org.apache.qpid.client.state.AMQState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception. - * - * Currently we do checks at the Session level to ensure that the connection/session are open. However, it is possible - * for the connection to close AFTER this check has been performed. - * - * Performing a similar check at the frameListener level in AMQProtocolHandler makes most sence as this will prevent - * listening when there can be no returning frames. - * - * With the correction in place it also means that the new listener will either make it on to the list for notification - * or it will be notified of any existing exception due to the connection being closed. - * - * There may still be an issue in this space if the client utilises a second thread to close the session as there will - * be no exception set to throw and so the wait will occur. That said when the session is closed the framelisteners - * should be notified. Not sure this is tested. - */ -public class AMQProtocolHandlerTest extends TestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandlerTest.class); - - // The handler to test - AMQProtocolHandler _handler; - - // A frame to block upon whilst waiting the exception - AMQFrame _blockFrame; - - // Latch to know when the listener receives an exception - private CountDownLatch _handleCountDown; - // The listener that will receive an exception - BlockToAccessFrameListener _listener; - - @Override - public void setUp() throws Exception - { - //Create a new ProtocolHandler with a fake connection. - _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'")); - _handler.setNetworkDriver(new TestNetworkDriver()); - AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); - _blockFrame = new AMQFrame(0, body); - - _handleCountDown = new CountDownLatch(1); - - _logger.info("Creating _Listener that should also receive the thrown exception."); - _listener = new BlockToAccessFrameListener(1); - } - - /** - * There are two paths based on the type of exception thrown. - * - * This tests that when an AMQException is thrown we get the same type of AMQException back with the real exception - * wrapped as the cause. - * - * @throws InterruptedException - if we are unable to wait for the test signals - */ - public void testFrameListenerUpdateWithAMQException() throws InterruptedException - { - AMQException trigger = new AMQAuthenticationException(AMQConstant.ACCESS_REFUSED, - "AMQPHTest", new RuntimeException()); - - performWithException(trigger); - - - AMQException receivedException = (AMQException) _listener.getReceivedException(); - - assertEquals("Return exception was not the expected type", - AMQAuthenticationException.class, receivedException.getClass()); - - assertEquals("The _Listener did not receive the correct error code", - trigger.getErrorCode(), receivedException.getErrorCode()); - } - - /** - * There are two paths based on the type of exception thrown. - * - * This tests that when a generic Exception is thrown we get the exception back wrapped in a AMQException - * as the cause. - * @throws InterruptedException - if we are unable to wait for the test signals - */ - public void testFrameListenerUpdateWithException() throws InterruptedException - { - - Exception trigger = new Exception(new RuntimeException()); - - performWithException(trigger); - - assertEquals("The _Listener did not receive the correct error code", - AMQConstant.INTERNAL_ERROR, ((AMQException)_listener.getReceivedException()).getErrorCode()); - } - - /** - * This is the main test method for both test cases. - * - * What occurs is that we create a new thread that will block (<30s[DEFAULT]) for a frame or exception to occur . - * - * We use a CountDownLatch to ensure that the new thread is running before we then yield and sleep to help ensure - * the new thread has entered the synchronized block in the writeCommandFrameAndWaitForReply. - * - * We can then ack like an the incomming exception handler in (ConnectionCloseMethodHandler). - * - * We fire the error to the stateManager, which in this case will recored the error as there are no state listeners. - * - * We then set the connection to be closed, as we would normally close the socket at this point. - * - * Then fire the exception in to any frameListeners. - * - * The blocked listener (created above) when receiving the error simulates the user by creating a new request to - * block for a frame. - * - * This request should fail. Prior to the fix this will fail with a NPE as we are attempting to use a null listener - * in the writeCommand.... call L:268. - * - * This highlights that the listener would be added dispite there being a pending error state that the listener will - * miss as it is not currently part of the _frameListeners set that is being notified by the iterator. - * - * The method waits to ensure that an exception is received before returning. - * - * The calling methods validate that exception that was received based on the one they sent in. - * - * @param trigger The exception to throw through the handler - */ - private void performWithException(Exception trigger) throws InterruptedException - { - - final CountDownLatch callingWriteCommand = new CountDownLatch(1); - - //Set an initial listener that will allow us to create a new blocking method - new Thread(new Runnable() - { - public void run() - { - - try - { - - _logger.info("At initial block, signalling to fire new exception"); - callingWriteCommand.countDown(); - - _handler.writeCommandFrameAndWaitForReply(_blockFrame, _listener); - } - catch (Exception e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - } - }).start(); - - _logger.info("Waiting for 'initial block' to start "); - if (!callingWriteCommand.await(1000, TimeUnit.MILLISECONDS)) - { - fail("Failed to start new thread to block for frame"); - } - - // Do what we can to ensure that this thread does not continue before the above thread has hit the synchronized - // block in the writeCommandFrameAndWaitForReply - Thread.yield(); - Thread.sleep(1000); - - _logger.info("Firing Erorr through state manager. There should be not state waiters here."); - _handler.getStateManager().error(trigger); - - _logger.info("Setting state to be CONNECTION_CLOSED."); - - _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - - _logger.info("Firing exception"); - _handler.propagateExceptionToFrameListeners(trigger); - - _logger.info("Awaiting notifcation from handler that exception arrived."); - - if (!_handleCountDown.await(2000, TimeUnit.MILLISECONDS)) - { - fail("Failed to handle exception and timeout has not occured"); - } - - - assertNotNull("The _Listener did not receive the exception", _listener.getReceivedException()); - - assertTrue("Received exception not an AMQException", - _listener.getReceivedException() instanceof AMQException); - - AMQException receivedException = (AMQException) _listener.getReceivedException(); - - assertTrue("The _Listener did not receive the correct message", - receivedException.getMessage().startsWith(trigger.getMessage())); - - - assertEquals("The _Listener did not receive the correct cause", - trigger, receivedException.getCause()); - - assertEquals("The _Listener did not receive the correct sub cause", - trigger.getCause(), receivedException.getCause().getCause()); - - } - - class BlockToAccessFrameListener extends BlockingMethodFrameListener - { - private Exception _receivedException = null; - - /** - * Creates a new method listener, that filters incoming method to just those that match the specified channel id. - * - * @param channelId The channel id to filter incoming methods with. - */ - public BlockToAccessFrameListener(int channelId) - { - super(channelId); - _logger.info("Creating a listener:" + this); - } - - public boolean processMethod(int channelId, AMQMethodBody frame) - { - return true; - } - - @Override - public void error(Exception e) - { - _logger.info("Exception(" + e + ") Received by:" + this); - // Create a new Thread to start the blocking registration. - new Thread(new Runnable() - { - - public void run() - { - //Set an initial listener that will allow us to create a new blocking method - try - { - _handler.writeCommandFrameAndWaitForReply(_blockFrame, null, 2000L); - _logger.info("listener(" + this + ") Wait completed"); - } - catch (Exception e) - { - _logger.info("listener(" + this + ") threw exception:" + e.getMessage()); - _receivedException = e; - } - - _logger.info("listener(" + this + ") completed"); - _handleCountDown.countDown(); - } - }).start(); - } - - public Exception getReceivedException() - { - return _receivedException; - } - } - -} |