summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTomas Restrepo <tomasr@apache.org>2007-05-10 22:25:01 +0000
committerTomas Restrepo <tomasr@apache.org>2007-05-10 22:25:01 +0000
commitbb9ecb50f9b1a184e7ea59c933ceb56facd2fd3f (patch)
treeceedfed77e541625e0460e8a6577d334ecffa43e
parent0e528b07a48edcb69d5833d1dd90f12f70403fa3 (diff)
downloadqpid-python-bb9ecb50f9b1a184e7ea59c933ceb56facd2fd3f.tar.gz
QPID-441 Fix handling of bounced messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@537019 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs142
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs48
-rw-r--r--dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs2
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs15
-rw-r--r--dotnet/Qpid.Client/Qpid.Client.csproj4
-rw-r--r--dotnet/Qpid.Common/Protocol/AMQConstant.cs9
-rw-r--r--dotnet/Qpid.Common/Qpid.Common.csproj4
7 files changed, 159 insertions, 65 deletions
diff --git a/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
index 84ae2c92c1..bae5b6d8f9 100644
--- a/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
+++ b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
@@ -26,69 +26,103 @@ using Qpid.Messaging;
namespace Qpid.Client.Tests
{
- [TestFixture]
- public class UndeliverableTest : BaseMessagingTestFixture
- {
- private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest));
+ /// <summary>
+ /// Tests that when sending undeliverable messages with the
+ /// mandatory flag set, an exception is raised on the connection
+ /// as the message is bounced back by the broker
+ /// </summary>
+ [TestFixture]
+ public class UndeliverableTest : BaseMessagingTestFixture
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest));
+ private ManualResetEvent _event;
+ public const int TIMEOUT = 1000;
+ private Exception _lastException;
- [SetUp]
- public override void Init()
- {
- base.Init();
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _event = new ManualResetEvent(false);
+ _lastException = null;
- try
- {
- _connection.ExceptionListener = new ExceptionListenerDelegate(OnException);
- }
- catch (QpidException e)
+ try
+ {
+ _connection.ExceptionListener = new ExceptionListenerDelegate(OnException);
+ } catch ( QpidException e )
+ {
+ _logger.Error("Could not add ExceptionListener", e);
+ }
+ }
+
+ public void OnException(Exception e)
+ {
+ // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message.
+
+ _lastException = e;
+ _logger.Error("OnException handler received connection-level exception", e);
+ if ( e is QpidException )
+ {
+ QpidException qe = (QpidException)e;
+ if ( qe.InnerException is AMQUndeliveredException )
{
- _logger.Error("Could not add ExceptionListener", e);
+ AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException;
+ _logger.Error("inner exception is AMQUndeliveredException", ue);
+ _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage()));
}
- }
+ }
+ _event.Set();
+ }
- public static void OnException(Exception e)
- {
- // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message.
+ [Test]
+ public void SendUndeliverableMessageOnDefaultExchange()
+ {
+ SendOne("default exchange", null);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnDirectExchange()
+ {
+ SendOne("direct exchange", ExchangeNameDefaults.DIRECT);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnTopicExchange()
+ {
+ SendOne("topic exchange", ExchangeNameDefaults.TOPIC);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnHeadersExchange()
+ {
+ SendOne("headers exchange", ExchangeNameDefaults.HEADERS);
+ }
- _logger.Error("OnException handler received connection-level exception", e);
- if (e is QpidException)
- {
- QpidException qe = (QpidException)e;
- if (qe.InnerException is AMQUndeliveredException)
- {
- AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException;
- _logger.Error("inner exception is AMQUndeliveredException", ue);
- _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage()));
+ private void SendOne(string exchangeNameFriendly, string exchangeName)
+ {
+ _logger.Info("Sending undeliverable message to " + exchangeNameFriendly);
- }
- }
- }
+ // Send a test message to a non-existant queue
+ // on the specified exchange. See if message is returned!
+ MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
+ .WithRoutingKey("Non-existant route key!")
+ .WithMandatory(true); // necessary so that the server bounces the message back
+ if ( exchangeName != null )
+ {
+ builder.WithExchangeName(exchangeName);
+ }
+ IMessagePublisher publisher = builder.Create();
+ publisher.Send(_channel.CreateTextMessage("Hiya!"));
- [Test]
- public void SendUndeliverableMessage()
- {
- SendOne("default exchange", null);
- SendOne("direct exchange", ExchangeNameDefaults.DIRECT);
- SendOne("topic exchange", ExchangeNameDefaults.TOPIC);
- SendOne("headers exchange", ExchangeNameDefaults.HEADERS);
+ // check we received an exception on the connection
+ // and that it is of the right type
+ _event.WaitOne(TIMEOUT, true);
- Thread.Sleep(1000); // Wait for message returns!
- }
+ Type expectedException = typeof(AMQUndeliveredException);
+ Exception ex = _lastException;
+ Assert.IsNotNull(ex, "No exception was thrown by the test. Expected " + expectedException);
- private void SendOne(string exchangeNameFriendly, string exchangeName)
- {
- _logger.Info("Sending undeliverable message to " + exchangeNameFriendly);
+ if ( ex.InnerException != null )
+ ex = ex.InnerException;
- // Send a test message to a non-existant queue on the default exchange. See if message is returned!
- MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
- .WithRoutingKey("Non-existant route key!")
- .WithMandatory(true);
- if (exchangeName != null)
- {
- builder.WithExchangeName(exchangeName);
- }
- IMessagePublisher publisher = builder.Create();
- publisher.Send(_channel.CreateTextMessage("Hiya!"));
- }
- }
+ Assert.IsInstanceOfType(expectedException, ex);
+ }
+ }
}
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 07650c170b..3471ac3640 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -28,6 +28,7 @@ using Qpid.Client.Message;
using Qpid.Collections;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Protocol;
namespace Qpid.Client
{
@@ -568,8 +569,14 @@ namespace Qpid.Client
if (_logger.IsDebugEnabled)
{
_logger.Debug("Message received in session with channel id " + _channelId);
- }
- _queue.EnqueueBlocking(message);
+ }
+ if ( message.DeliverBody == null )
+ {
+ ReturnBouncedMessage(message);
+ } else
+ {
+ _queue.EnqueueBlocking(message);
+ }
}
public int DefaultPrefetch
@@ -986,5 +993,42 @@ namespace Qpid.Client
// FIXME: lock FailoverMutex here?
_connection.ProtocolWriter.Write(ackFrame);
}
+
+ /// <summary>
+ /// Handle a message that bounced from the server, creating
+ /// the corresponding exception and notifying the connection about it
+ /// </summary>
+ /// <param name="message">Unprocessed message</param>
+ private void ReturnBouncedMessage(UnprocessedMessage message)
+ {
+ try
+ {
+ AbstractQmsMessage bouncedMessage =
+ _messageFactoryRegistry.CreateMessage(
+ 0, false, message.ContentHeader,
+ message.Bodies
+ );
+
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ AMQException exception;
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ {
+ exception = new AMQNoConsumersException(reason, bouncedMessage);
+ } else if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ {
+ exception = new AMQNoRouteException(reason, bouncedMessage);
+ } else
+ {
+ exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage);
+ }
+ _connection.ExceptionReceived(exception);
+ } catch ( Exception ex )
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex);
+ }
+
+ }
}
}
diff --git a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
index 78526f906f..0bd65a1ace 100644
--- a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
+++ b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
@@ -32,7 +32,7 @@ namespace Qpid.Client.Handler
public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
{
- _logger.Debug("New JmsBounce method received");
+ _logger.Debug("New Basic.Return method received");
UnprocessedMessage msg = new UnprocessedMessage();
msg.DeliverBody = null;
msg.BounceBody = (BasicReturnBody) evt.Method;
diff --git a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
index 0ce8a393c9..7f88dd8219 100644
--- a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
+++ b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
@@ -44,11 +44,20 @@ namespace Qpid.Client.Handler
AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId);
evt.ProtocolSession.WriteFrame(frame);
- // HACK
+
if ( errorCode != AMQConstant.REPLY_SUCCESS.Code )
{
- _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
- evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason));
+ _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ throw new AMQNoConsumersException(reason);
+ if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ throw new AMQNoRouteException(reason);
+ if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code )
+ throw new AMQInvalidArgumentException(reason);
+ if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code )
+ throw new AMQInvalidRoutingKeyException(reason);
+ // any other
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
}
evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason);
}
diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj
index 19d2180a09..31cd5e03ae 100644
--- a/dotnet/Qpid.Client/Qpid.Client.csproj
+++ b/dotnet/Qpid.Client/Qpid.Client.csproj
@@ -43,6 +43,8 @@
<Compile Include="Client\AMQDestination.cs" />
<Compile Include="Client\AmqChannel.cs" />
<Compile Include="Client\AMQAuthenticationException.cs" />
+ <Compile Include="Client\AMQNoConsumersException.cs" />
+ <Compile Include="Client\AMQNoRouteException.cs" />
<Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
@@ -144,4 +146,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project> \ No newline at end of file
diff --git a/dotnet/Qpid.Common/Protocol/AMQConstant.cs b/dotnet/Qpid.Common/Protocol/AMQConstant.cs
index 560ac97122..b85dd2ff4c 100644
--- a/dotnet/Qpid.Common/Protocol/AMQConstant.cs
+++ b/dotnet/Qpid.Common/Protocol/AMQConstant.cs
@@ -77,17 +77,20 @@ namespace Qpid.Protocol
public static readonly AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true);
public static readonly AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true);
public static readonly AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
- public static readonly AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true);
- public static readonly AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true);
public static readonly AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
public static readonly AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true);
public static readonly AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
+ public static readonly AMQConstant ALREADY_EXISTS = new AMQConstant(405, "already exists", true);
+ public static readonly AMQConstant IN_USE = new AMQConstant(406, "in use", true);
+ public static readonly AMQConstant INVALID_ROUTING_KEY = new AMQConstant(407, "routing key invalid", true);
+ public static readonly AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "request timeout", true);
+ public static readonly AMQConstant INVALID_ARGUMENT = new AMQConstant(409, "argument invalid", true);
public static readonly AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
public static readonly AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);
public static readonly AMQConstant COMMAND_INVALID = new AMQConstant(503, "command invalid", true);
public static readonly AMQConstant CHANNEL_ERROR = new AMQConstant(504, "channel error", true);
public static readonly AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
- public static readonly AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true);
+ public static readonly AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
public static readonly AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);
public static readonly AMQConstant INTERNAL_ERROR = new AMQConstant(541, "internal error", true);
diff --git a/dotnet/Qpid.Common/Qpid.Common.csproj b/dotnet/Qpid.Common/Qpid.Common.csproj
index ec85567a25..944fe24677 100644
--- a/dotnet/Qpid.Common/Qpid.Common.csproj
+++ b/dotnet/Qpid.Common/Qpid.Common.csproj
@@ -44,6 +44,8 @@
<Compile Include="AMQConnectionClosedException.cs" />
<Compile Include="AMQDisconnectedException.cs" />
<Compile Include="AMQException.cs" />
+ <Compile Include="AMQInvalidArgumentException.cs" />
+ <Compile Include="AMQInvalidRoutingKeyException.cs" />
<Compile Include="AMQUndeliveredException.cs" />
<Compile Include="AssemblySettings.cs" />
<Compile Include="Collections\LinkedHashtable.cs" />
@@ -208,4 +210,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project> \ No newline at end of file