summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2015-03-27 15:50:50 +0000
committerAlex Rudyy <orudyy@apache.org>2015-03-27 15:50:50 +0000
commitbe7e028eda0b4880686e8b069e36a1946164fb67 (patch)
tree199730645a80c1925113593b8f61c1ab78219caa
parentd9f4fdcab179600c43d86fd07949d1c5ecbb1767 (diff)
downloadqpid-python-be7e028eda0b4880686e8b069e36a1946164fb67.tar.gz
QPID-6469: Improve AMQProtocolEngine#exception to guard against an exception whilst trying to send response to client from the method and re-throw unexpected exception
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1669612 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java63
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java109
2 files changed, 153 insertions, 19 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 0a64008a69..2cdb425bb5 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -1161,8 +1160,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
if (throwable instanceof AMQProtocolHeaderException)
{
- writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
- _sender.close();
+ sendResponseAndCloseSender(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
_logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
}
@@ -1181,30 +1179,57 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
throwable.getMessage()),
_currentClassId,
_currentMethodId);
-
- try
+ sendResponseAndCloseSender(closeBody.generateFrame(0));
+ }
+ finally
+ {
+ if (!(throwable instanceof TransportException
+ || throwable instanceof ConnectionScopedRuntimeException))
{
- writeFrame(closeBody.generateFrame(0));
+ if (throwable instanceof Error)
+ {
+ throw (Error) throwable;
+ }
- _sender.close();
- }
- catch(SenderException e)
- {
- // ignore
+ if (throwable instanceof RuntimeException)
+ {
+ throw (RuntimeException) throwable;
+ }
+
+ if (throwable instanceof Throwable)
+ {
+ throw new ServerScopedRuntimeException("Unexpected exception", throwable);
+ }
}
+ }
+ }
+ }
+ private void sendResponseAndCloseSender(AMQDataBlock dataBlock)
+ {
+ try
+ {
+ writeFrame(dataBlock);
+ }
+ catch(SenderException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Exception occurred on sending response", e);
}
- finally
+ }
+ finally
+ {
+ try
{
- if(throwable instanceof Error)
- {
- throw (Error) throwable;
- }
- if(throwable instanceof ServerScopedRuntimeException)
+ _sender.close();
+ }
+ catch(SenderException e)
+ {
+ if (_logger.isDebugEnabled())
{
- throw (ServerScopedRuntimeException) throwable;
+ _logger.debug("Exception occurred on sender close", e);
}
-
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
index a9eb2b1680..9121600dcd 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
@@ -20,20 +20,30 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.NetworkConnection;
public class AMQProtocolEngineTest extends QpidTestCase
@@ -92,4 +102,103 @@ public class AMQProtocolEngineTest extends QpidTestCase
assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute());
}
+
+ public void testThrownExceptionOnSendingResponseFromExceptionHandler()
+ {
+ ByteBufferSender sender = mock(ByteBufferSender.class);
+ when(_network.getSender()).thenReturn(sender);
+ doThrow(new SenderException("exception on close")).when(sender).close();
+ doThrow(new SenderException("exception on send")).when(sender).send(any(ByteBuffer.class));
+
+ AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport);
+
+ try
+ {
+ engine.exception(new ConnectionScopedRuntimeException("test"));
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is thrown " + e);
+ }
+
+ doThrow(new NullPointerException("unexpected exception")).when(sender).send(any(ByteBuffer.class));
+ try
+ {
+ engine.exception(new ConnectionScopedRuntimeException("test"));
+ fail("Unexpected exception should be reported");
+ }
+ catch (NullPointerException e)
+ {
+ // pass
+ }
+
+ }
+
+ public void testExceptionHandling()
+ {
+ ByteBufferSender sender = mock(ByteBufferSender.class);
+ when(_network.getSender()).thenReturn(sender);
+
+ AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport);
+
+ try
+ {
+ engine.exception(new ConnectionScopedRuntimeException("test"));
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is thrown " + e);
+ }
+
+
+ try
+ {
+ engine.exception(new SenderException("test"));
+ }
+ catch (NullPointerException e)
+ {
+ fail("Unexpected exception should be reported");
+ }
+
+ try
+ {
+ engine.exception(new NullPointerException("test"));
+ fail("NullPointerException should be re-thrown");
+ }
+ catch (NullPointerException e)
+ {
+ //pass
+ }
+
+ try
+ {
+ engine.exception(new ServerScopedRuntimeException("test"));
+ fail("ServerScopedRuntimeException should be re-thrown");
+ }
+ catch (ServerScopedRuntimeException e)
+ {
+ //pass
+ }
+
+ try
+ {
+ engine.exception(new AMQException(AMQConstant.INTERNAL_ERROR, "test"));
+ fail("AMQException should be re-thrown as ServerScopedRuntimeException");
+ }
+ catch (ServerScopedRuntimeException e)
+ {
+ //pass
+ }
+
+ try
+ {
+ engine.exception(new Error("test"));
+ fail("Error should be re-thrown");
+ }
+ catch (Error e)
+ {
+ //pass
+ }
+ }
+
}