diff options
author | Alex Rudyy <orudyy@apache.org> | 2015-03-27 15:50:50 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2015-03-27 15:50:50 +0000 |
commit | be7e028eda0b4880686e8b069e36a1946164fb67 (patch) | |
tree | 199730645a80c1925113593b8f61c1ab78219caa | |
parent | d9f4fdcab179600c43d86fd07949d1c5ecbb1767 (diff) | |
download | qpid-python-be7e028eda0b4880686e8b069e36a1946164fb67.tar.gz |
QPID-6469: Improve AMQProtocolEngine#exception to guard against an exception whilst trying to send response to client from the method and re-throw unexpected exception
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1669612 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 153 insertions, 19 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 0a64008a69..2cdb425bb5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -42,7 +42,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -1161,8 +1160,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { if (throwable instanceof AMQProtocolHeaderException) { - writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - _sender.close(); + sendResponseAndCloseSender(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); } @@ -1181,30 +1179,57 @@ public class AMQProtocolEngine implements ServerProtocolEngine, throwable.getMessage()), _currentClassId, _currentMethodId); - - try + sendResponseAndCloseSender(closeBody.generateFrame(0)); + } + finally + { + if (!(throwable instanceof TransportException + || throwable instanceof ConnectionScopedRuntimeException)) { - writeFrame(closeBody.generateFrame(0)); + if (throwable instanceof Error) + { + throw (Error) throwable; + } - _sender.close(); - } - catch(SenderException e) - { - // ignore + if (throwable instanceof RuntimeException) + { + throw (RuntimeException) throwable; + } + + if (throwable instanceof Throwable) + { + throw new ServerScopedRuntimeException("Unexpected exception", throwable); + } } + } + } + } + private void sendResponseAndCloseSender(AMQDataBlock dataBlock) + { + try + { + writeFrame(dataBlock); + } + catch(SenderException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Exception occurred on sending response", e); } - finally + } + finally + { + try { - if(throwable instanceof Error) - { - throw (Error) throwable; - } - if(throwable instanceof ServerScopedRuntimeException) + _sender.close(); + } + catch(SenderException e) + { + if (_logger.isDebugEnabled()) { - throw (ServerScopedRuntimeException) throwable; + _logger.debug("Exception occurred on sender close", e); } - } } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java index a9eb2b1680..9121600dcd 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java @@ -20,20 +20,30 @@ */ package org.apache.qpid.server.protocol.v0_8; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolEngineTest extends QpidTestCase @@ -92,4 +102,103 @@ public class AMQProtocolEngineTest extends QpidTestCase assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); } + + public void testThrownExceptionOnSendingResponseFromExceptionHandler() + { + ByteBufferSender sender = mock(ByteBufferSender.class); + when(_network.getSender()).thenReturn(sender); + doThrow(new SenderException("exception on close")).when(sender).close(); + doThrow(new SenderException("exception on send")).when(sender).send(any(ByteBuffer.class)); + + AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); + + try + { + engine.exception(new ConnectionScopedRuntimeException("test")); + } + catch (Exception e) + { + fail("Unexpected exception is thrown " + e); + } + + doThrow(new NullPointerException("unexpected exception")).when(sender).send(any(ByteBuffer.class)); + try + { + engine.exception(new ConnectionScopedRuntimeException("test")); + fail("Unexpected exception should be reported"); + } + catch (NullPointerException e) + { + // pass + } + + } + + public void testExceptionHandling() + { + ByteBufferSender sender = mock(ByteBufferSender.class); + when(_network.getSender()).thenReturn(sender); + + AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); + + try + { + engine.exception(new ConnectionScopedRuntimeException("test")); + } + catch (Exception e) + { + fail("Unexpected exception is thrown " + e); + } + + + try + { + engine.exception(new SenderException("test")); + } + catch (NullPointerException e) + { + fail("Unexpected exception should be reported"); + } + + try + { + engine.exception(new NullPointerException("test")); + fail("NullPointerException should be re-thrown"); + } + catch (NullPointerException e) + { + //pass + } + + try + { + engine.exception(new ServerScopedRuntimeException("test")); + fail("ServerScopedRuntimeException should be re-thrown"); + } + catch (ServerScopedRuntimeException e) + { + //pass + } + + try + { + engine.exception(new AMQException(AMQConstant.INTERNAL_ERROR, "test")); + fail("AMQException should be re-thrown as ServerScopedRuntimeException"); + } + catch (ServerScopedRuntimeException e) + { + //pass + } + + try + { + engine.exception(new Error("test")); + fail("Error should be re-thrown"); + } + catch (Error e) + { + //pass + } + } + } |