diff options
author | Tomas Restrepo <tomasr@apache.org> | 2007-05-10 22:25:01 +0000 |
---|---|---|
committer | Tomas Restrepo <tomasr@apache.org> | 2007-05-10 22:25:01 +0000 |
commit | bb9ecb50f9b1a184e7ea59c933ceb56facd2fd3f (patch) | |
tree | ceedfed77e541625e0460e8a6577d334ecffa43e | |
parent | 0e528b07a48edcb69d5833d1dd90f12f70403fa3 (diff) | |
download | qpid-python-bb9ecb50f9b1a184e7ea59c933ceb56facd2fd3f.tar.gz |
QPID-441 Fix handling of bounced messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@537019 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs | 142 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 48 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs | 2 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs | 15 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Qpid.Client.csproj | 4 | ||||
-rw-r--r-- | dotnet/Qpid.Common/Protocol/AMQConstant.cs | 9 | ||||
-rw-r--r-- | dotnet/Qpid.Common/Qpid.Common.csproj | 4 |
7 files changed, 159 insertions, 65 deletions
diff --git a/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs index 84ae2c92c1..bae5b6d8f9 100644 --- a/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs +++ b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs @@ -26,69 +26,103 @@ using Qpid.Messaging; namespace Qpid.Client.Tests { - [TestFixture] - public class UndeliverableTest : BaseMessagingTestFixture - { - private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest)); + /// <summary> + /// Tests that when sending undeliverable messages with the + /// mandatory flag set, an exception is raised on the connection + /// as the message is bounced back by the broker + /// </summary> + [TestFixture] + public class UndeliverableTest : BaseMessagingTestFixture + { + private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest)); + private ManualResetEvent _event; + public const int TIMEOUT = 1000; + private Exception _lastException; - [SetUp] - public override void Init() - { - base.Init(); + [SetUp] + public override void Init() + { + base.Init(); + _event = new ManualResetEvent(false); + _lastException = null; - try - { - _connection.ExceptionListener = new ExceptionListenerDelegate(OnException); - } - catch (QpidException e) + try + { + _connection.ExceptionListener = new ExceptionListenerDelegate(OnException); + } catch ( QpidException e ) + { + _logger.Error("Could not add ExceptionListener", e); + } + } + + public void OnException(Exception e) + { + // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message. + + _lastException = e; + _logger.Error("OnException handler received connection-level exception", e); + if ( e is QpidException ) + { + QpidException qe = (QpidException)e; + if ( qe.InnerException is AMQUndeliveredException ) { - _logger.Error("Could not add ExceptionListener", e); + AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException; + _logger.Error("inner exception is AMQUndeliveredException", ue); + _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage())); } - } + } + _event.Set(); + } - public static void OnException(Exception e) - { - // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message. + [Test] + public void SendUndeliverableMessageOnDefaultExchange() + { + SendOne("default exchange", null); + } + [Test] + public void SendUndeliverableMessageOnDirectExchange() + { + SendOne("direct exchange", ExchangeNameDefaults.DIRECT); + } + [Test] + public void SendUndeliverableMessageOnTopicExchange() + { + SendOne("topic exchange", ExchangeNameDefaults.TOPIC); + } + [Test] + public void SendUndeliverableMessageOnHeadersExchange() + { + SendOne("headers exchange", ExchangeNameDefaults.HEADERS); + } - _logger.Error("OnException handler received connection-level exception", e); - if (e is QpidException) - { - QpidException qe = (QpidException)e; - if (qe.InnerException is AMQUndeliveredException) - { - AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException; - _logger.Error("inner exception is AMQUndeliveredException", ue); - _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage())); + private void SendOne(string exchangeNameFriendly, string exchangeName) + { + _logger.Info("Sending undeliverable message to " + exchangeNameFriendly); - } - } - } + // Send a test message to a non-existant queue + // on the specified exchange. See if message is returned! + MessagePublisherBuilder builder = _channel.CreatePublisherBuilder() + .WithRoutingKey("Non-existant route key!") + .WithMandatory(true); // necessary so that the server bounces the message back + if ( exchangeName != null ) + { + builder.WithExchangeName(exchangeName); + } + IMessagePublisher publisher = builder.Create(); + publisher.Send(_channel.CreateTextMessage("Hiya!")); - [Test] - public void SendUndeliverableMessage() - { - SendOne("default exchange", null); - SendOne("direct exchange", ExchangeNameDefaults.DIRECT); - SendOne("topic exchange", ExchangeNameDefaults.TOPIC); - SendOne("headers exchange", ExchangeNameDefaults.HEADERS); + // check we received an exception on the connection + // and that it is of the right type + _event.WaitOne(TIMEOUT, true); - Thread.Sleep(1000); // Wait for message returns! - } + Type expectedException = typeof(AMQUndeliveredException); + Exception ex = _lastException; + Assert.IsNotNull(ex, "No exception was thrown by the test. Expected " + expectedException); - private void SendOne(string exchangeNameFriendly, string exchangeName) - { - _logger.Info("Sending undeliverable message to " + exchangeNameFriendly); + if ( ex.InnerException != null ) + ex = ex.InnerException; - // Send a test message to a non-existant queue on the default exchange. See if message is returned! - MessagePublisherBuilder builder = _channel.CreatePublisherBuilder() - .WithRoutingKey("Non-existant route key!") - .WithMandatory(true); - if (exchangeName != null) - { - builder.WithExchangeName(exchangeName); - } - IMessagePublisher publisher = builder.Create(); - publisher.Send(_channel.CreateTextMessage("Hiya!")); - } - } + Assert.IsInstanceOfType(expectedException, ex); + } + } } diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 07650c170b..3471ac3640 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -28,6 +28,7 @@ using Qpid.Client.Message; using Qpid.Collections; using Qpid.Framing; using Qpid.Messaging; +using Qpid.Protocol; namespace Qpid.Client { @@ -568,8 +569,14 @@ namespace Qpid.Client if (_logger.IsDebugEnabled) { _logger.Debug("Message received in session with channel id " + _channelId); - } - _queue.EnqueueBlocking(message); + } + if ( message.DeliverBody == null ) + { + ReturnBouncedMessage(message); + } else + { + _queue.EnqueueBlocking(message); + } } public int DefaultPrefetch @@ -986,5 +993,42 @@ namespace Qpid.Client // FIXME: lock FailoverMutex here? _connection.ProtocolWriter.Write(ackFrame); } + + /// <summary> + /// Handle a message that bounced from the server, creating + /// the corresponding exception and notifying the connection about it + /// </summary> + /// <param name="message">Unprocessed message</param> + private void ReturnBouncedMessage(UnprocessedMessage message) + { + try + { + AbstractQmsMessage bouncedMessage = + _messageFactoryRegistry.CreateMessage( + 0, false, message.ContentHeader, + message.Bodies + ); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + AMQException exception; + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + { + exception = new AMQNoConsumersException(reason, bouncedMessage); + } else if ( errorCode == AMQConstant.NO_ROUTE.Code ) + { + exception = new AMQNoRouteException(reason, bouncedMessage); + } else + { + exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); + } + _connection.ExceptionReceived(exception); + } catch ( Exception ex ) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); + } + + } } } diff --git a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs index 78526f906f..0bd65a1ace 100644 --- a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs @@ -32,7 +32,7 @@ namespace Qpid.Client.Handler public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) { - _logger.Debug("New JmsBounce method received"); + _logger.Debug("New Basic.Return method received"); UnprocessedMessage msg = new UnprocessedMessage(); msg.DeliverBody = null; msg.BounceBody = (BasicReturnBody) evt.Method; diff --git a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs index 0ce8a393c9..7f88dd8219 100644 --- a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs @@ -44,11 +44,20 @@ namespace Qpid.Client.Handler AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId); evt.ProtocolSession.WriteFrame(frame); - // HACK + if ( errorCode != AMQConstant.REPLY_SUCCESS.Code ) { - _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); - evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason)); + _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + throw new AMQNoConsumersException(reason); + if ( errorCode == AMQConstant.NO_ROUTE.Code ) + throw new AMQNoRouteException(reason); + if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code ) + throw new AMQInvalidArgumentException(reason); + if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code ) + throw new AMQInvalidRoutingKeyException(reason); + // any other + throw new AMQChannelClosedException(errorCode, "Error: " + reason); } evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason); } diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj index 19d2180a09..31cd5e03ae 100644 --- a/dotnet/Qpid.Client/Qpid.Client.csproj +++ b/dotnet/Qpid.Client/Qpid.Client.csproj @@ -43,6 +43,8 @@ <Compile Include="Client\AMQDestination.cs" />
<Compile Include="Client\AmqChannel.cs" />
<Compile Include="Client\AMQAuthenticationException.cs" />
+ <Compile Include="Client\AMQNoConsumersException.cs" />
+ <Compile Include="Client\AMQNoRouteException.cs" />
<Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
@@ -144,4 +146,4 @@ <Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file diff --git a/dotnet/Qpid.Common/Protocol/AMQConstant.cs b/dotnet/Qpid.Common/Protocol/AMQConstant.cs index 560ac97122..b85dd2ff4c 100644 --- a/dotnet/Qpid.Common/Protocol/AMQConstant.cs +++ b/dotnet/Qpid.Common/Protocol/AMQConstant.cs @@ -77,17 +77,20 @@ namespace Qpid.Protocol public static readonly AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true);
public static readonly AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true);
public static readonly AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
- public static readonly AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true);
- public static readonly AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true);
public static readonly AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
public static readonly AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true);
public static readonly AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
+ public static readonly AMQConstant ALREADY_EXISTS = new AMQConstant(405, "already exists", true);
+ public static readonly AMQConstant IN_USE = new AMQConstant(406, "in use", true);
+ public static readonly AMQConstant INVALID_ROUTING_KEY = new AMQConstant(407, "routing key invalid", true);
+ public static readonly AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "request timeout", true);
+ public static readonly AMQConstant INVALID_ARGUMENT = new AMQConstant(409, "argument invalid", true);
public static readonly AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
public static readonly AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);
public static readonly AMQConstant COMMAND_INVALID = new AMQConstant(503, "command invalid", true);
public static readonly AMQConstant CHANNEL_ERROR = new AMQConstant(504, "channel error", true);
public static readonly AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
- public static readonly AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true);
+ public static readonly AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
public static readonly AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);
public static readonly AMQConstant INTERNAL_ERROR = new AMQConstant(541, "internal error", true);
diff --git a/dotnet/Qpid.Common/Qpid.Common.csproj b/dotnet/Qpid.Common/Qpid.Common.csproj index ec85567a25..944fe24677 100644 --- a/dotnet/Qpid.Common/Qpid.Common.csproj +++ b/dotnet/Qpid.Common/Qpid.Common.csproj @@ -44,6 +44,8 @@ <Compile Include="AMQConnectionClosedException.cs" />
<Compile Include="AMQDisconnectedException.cs" />
<Compile Include="AMQException.cs" />
+ <Compile Include="AMQInvalidArgumentException.cs" />
+ <Compile Include="AMQInvalidRoutingKeyException.cs" />
<Compile Include="AMQUndeliveredException.cs" />
<Compile Include="AssemblySettings.cs" />
<Compile Include="Collections\LinkedHashtable.cs" />
@@ -208,4 +210,4 @@ <Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file |