diff options
5 files changed, 304 insertions, 73 deletions
diff --git a/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase2BasicP2P.cs b/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase2BasicP2P.cs index 42fa0c5be9..657c512a67 100644 --- a/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase2BasicP2P.cs +++ b/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase2BasicP2P.cs @@ -159,7 +159,7 @@ namespace Apache.Qpid.Client.Tests.interop.TestCases log.Debug("public Message GetReport(IChannel channel): called");
// Close the test connection.
- connection.Stop();
+ //connection.Stop();
// Generate a report message containing the count of the number of messages passed.
IMessage report = channel.CreateMessage();
diff --git a/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase3BasicPubSub.cs b/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase3BasicPubSub.cs index 2f5c63ca9c..937730576d 100644 --- a/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase3BasicPubSub.cs +++ b/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase3BasicPubSub.cs @@ -121,6 +121,8 @@ namespace Apache.Qpid.Client.Tests.interop.TestCases publisher = channel[0].CreatePublisherBuilder()
.WithExchangeName(ExchangeNameDefaults.TOPIC)
.WithRoutingKey(sendDestination)
+ .WithMandatory(false)
+ .WithImmediate(false)
.Create();
break;
@@ -187,7 +189,15 @@ namespace Apache.Qpid.Client.Tests.interop.TestCases // Close the test connections.
/*foreach (IConnection con in connection)
{
- con.Close();
+ try
+ {
+ con.Stop();
+ }
+ catch (AMQConnectionClosedException e)
+ {
+ // The connection has already died due to an error. Log this as a warning.
+ log.Warn("Connection already closed.");
+ }
}*/
// Generate a report message containing the count of the number of messages passed.
diff --git a/dotnet/Qpid.Client.Tests/interop/TestClient.cs b/dotnet/Qpid.Client.Tests/interop/TestClient.cs index e8d37ce5b3..3edd28a892 100644 --- a/dotnet/Qpid.Client.Tests/interop/TestClient.cs +++ b/dotnet/Qpid.Client.Tests/interop/TestClient.cs @@ -32,13 +32,13 @@ namespace Apache.Qpid.Client.Tests.interop public class TestClient
{
private static ILog log = LogManager.GetLogger(typeof(TestClient));
-
+
/// <summary> Defines the default broker for the tests, localhost, default port. </summary>
public static string DEFAULT_BROKER_URL = "amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'";
-
+
/// <summary> Defines the default virtual host to use for the tests, none. </summary>
public static string DEFAULT_VIRTUAL_HOST = "";
-
+
/// <summary> Defines the default identifying name of this test client. </summary>
public static string DEFAULT_CLIENT_NAME = "dotnet";
@@ -74,7 +74,7 @@ namespace Apache.Qpid.Client.Tests.interop public TestClient(string brokerUrl, string virtualHost, string clientName)
{
log.Info("public TestClient(string brokerUrl = " + brokerUrl + ", string virtualHost = " + virtualHost
- + ", string clientName = " + clientName + "): called");
+ + ", string clientName = " + clientName + "): called");
// Retain the connection parameters.
TestClient.brokerUrl = brokerUrl;
@@ -117,6 +117,8 @@ namespace Apache.Qpid.Client.Tests.interop clientName = nextArg.Substring(2);
}
}
+
+ NDC.Push(clientName);
// Create a test client and start it running.
TestClient client = new TestClient(brokerUrl, virtualHost, clientName);
@@ -136,6 +138,8 @@ namespace Apache.Qpid.Client.Tests.interop {
Monitor.Wait(terminationMonitor);
}
+
+ NDC.Pop();
}
/// <summary>
@@ -202,7 +206,7 @@ namespace Apache.Qpid.Client.Tests.interop public static IConnection CreateConnection(string brokerUrl, string virtualHost)
{
log.Info("public static Connection createConnection(string brokerUrl = " + brokerUrl + ", string virtualHost = "
- + virtualHost + "): called");
+ + virtualHost + "): called");
// Create a connection to the broker.
IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(brokerUrl);
diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs index f8c44261f2..34b47137e5 100644 --- a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs +++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs @@ -38,41 +38,36 @@ namespace Apache.Qpid.Client.Message protected bool _readableMessage = false; private QpidHeaders _headers; -#region new_java_ctrs - protected AbstractQmsMessage(ByteBuffer data) : base(new BasicContentHeaderProperties()) { - Init(data); + Init(data); } protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) : this(contentHeader, deliveryTag) { - Init(data); + Init(data); } protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag) { - Init(null); + Init(null); } private void Init(ByteBuffer data) { - _data = data; - if ( _data != null ) - { - _data.Acquire(); - } - _readableMessage = (data != null); - if ( ContentHeaderProperties.Headers == null ) - ContentHeaderProperties.Headers = new FieldTable(); - _headers = new QpidHeaders(ContentHeaderProperties.Headers); + _data = data; + if ( _data != null ) + { + _data.Acquire(); + } + _readableMessage = (data != null); + if ( ContentHeaderProperties.Headers == null ) + ContentHeaderProperties.Headers = new FieldTable(); + _headers = new QpidHeaders(ContentHeaderProperties.Headers); } -#endregion - - #region Properties // // Properties // @@ -82,7 +77,8 @@ namespace Apache.Qpid.Client.Message /// </summary> public string MessageId { - get { + get + { if (ContentHeaderProperties.MessageId == null) { ContentHeaderProperties.MessageId = "ID:" + DeliveryTag; @@ -145,12 +141,11 @@ namespace Apache.Qpid.Client.Message { get { - Dest dest = ReadReplyToHeader(); - return dest.ExchangeName; + return ReadReplyToHeader().ExchangeName; } set { - Dest dest = ReadReplyToHeader(); + BindingURL dest = ReadReplyToHeader(); dest.ExchangeName = value; WriteReplyToHeader(dest); } @@ -163,19 +158,16 @@ namespace Apache.Qpid.Client.Message { get { - Dest dest = ReadReplyToHeader(); - return dest.RoutingKey; + return ReadReplyToHeader().RoutingKey; } set { - Dest dest = ReadReplyToHeader(); + BindingURL dest = ReadReplyToHeader(); dest.RoutingKey = value; WriteReplyToHeader(dest); } } - - /// <summary> /// Non-persistent (1) or persistent (2) /// </summary> @@ -186,12 +178,12 @@ namespace Apache.Qpid.Client.Message byte b = ContentHeaderProperties.DeliveryMode; switch (b) { - case 1: - return DeliveryMode.NonPersistent; - case 2: - return DeliveryMode.Persistent; - default: - throw new QpidException("Illegal value for delivery mode in content header properties"); + case 1: + return DeliveryMode.NonPersistent; + case 2: + return DeliveryMode.Persistent; + default: + throw new QpidException("Illegal value for delivery mode in content header properties"); } } set @@ -200,9 +192,9 @@ namespace Apache.Qpid.Client.Message } } - /// <summary> - /// True, if this is a redelivered message - /// </summary> + /// <summary> + /// True, if this is a redelivered message + /// </summary> public bool Redelivered { get { return _redelivered; } @@ -319,8 +311,6 @@ namespace Apache.Qpid.Client.Message _data = value; } } - #endregion // Properties - public void Acknowledge() { @@ -434,10 +424,10 @@ namespace Apache.Qpid.Client.Message protected void CheckReadable() { - if ( !_readableMessage ) - { - throw new MessageNotReadableException("You need to call reset() to make the message readable"); - } + if ( !_readableMessage ) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } } /// <summary> @@ -451,40 +441,254 @@ namespace Apache.Qpid.Client.Message /// </summary> /// /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns> - private Dest ReadReplyToHeader() + private BindingURL ReadReplyToHeader() { - string replyToEncoding = ContentHeaderProperties.ReplyTo; + string replyToEncoding = ContentHeaderProperties.ReplyTo; + //log.Debug("replyToEncoding = " + replyToEncoding); + + BindingURL bindingUrl = new BindingURL(replyToEncoding); + //log.Debug("bindingUrl = " + bindingUrl.ToString()); + + return bindingUrl; - //log.Info("replyToEncoding = " + replyToEncoding); + //log.Info("replyToEncoding = " + replyToEncoding); + +// if ( replyToEncoding == null ) +// { +// return new Dest(); +// } else +// { +// // Split the replyto field on a ':' +// string[] split = replyToEncoding.Split(':'); + +// // Ensure that the replyto field argument only consisted of two parts. +// if ( split.Length != 2 ) +// { +// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding); +// } + +// // Extract the exchange name and routing key from the split replyto field. +// string exchangeName = split[0]; + +// string[] split2 = split[1].Split('/'); +// string routingKey = split2[3]; + +// return new Dest(exchangeName, routingKey); +// } + } + + private void WriteReplyToHeader(BindingURL dest) + { + string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey); + ContentHeaderProperties.ReplyTo = encodedDestination; + } + } + + public class BindingURL + { + public readonly static string OPTION_EXCLUSIVE = "exclusive"; + public readonly static string OPTION_AUTODELETE = "autodelete"; + public readonly static string OPTION_DURABLE = "durable"; + public readonly static string OPTION_CLIENTID = "clientid"; + public readonly static string OPTION_SUBSCRIPTION = "subscription"; + public readonly static string OPTION_ROUTING_KEY = "routingkey"; + + /// <summary> Holds the undecoded URL </summary> + string url; + + /// <summary> Holds the decoded options. </summary> + IDictionary options = new Hashtable(); + + /// <summary> Holds the decoded exchange class. </summary> + string exchangeClass; + + /// <summary> Holds the decoded exchange name. </summary> + string exchangeName; + + /// <summary> Holds the destination name. </summary> + string destination; + + /// <summary> Holds the decoded queue name. </summary> + string queueName; + + /// <summary> + /// The binding URL has the format: + /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + /// </summary> + public BindingURL(string url) + { + this.url = url; + Parse(); + } + + public string Url { get { return url; } } + + public string ExchangeClass + { + get { return exchangeClass; } + set { exchangeClass = value; } + } - if ( replyToEncoding == null ) - { - return new Dest(); - } else - { - // Split the replyto field on a ':' - string[] split = replyToEncoding.Split(':'); + public string ExchangeName + { + get { return exchangeName; } + set { exchangeName = value; } + } - // Ensure that the replyto field argument only consisted of two parts. - if ( split.Length != 2 ) - { - throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding); - } + public string QueueName + { + get { return queueName; } + set { queueName = value; } + } - // Extract the exchange name and routing key from the split replyto field. - string exchangeName = split[0]; + public string DestinationName + { + get { return destination; } + set { destination = value; } + } - string[] split2 = split[1].Split('/'); - string routingKey = split2[3]; + public string RoutingKey { + get { return (string)options[OPTION_ROUTING_KEY]; } + set { options[OPTION_ROUTING_KEY] = value; } + } - return new Dest(exchangeName, routingKey); - } + public bool ContainsOption(string key) { return options.Contains(key); } + + public string ToString() + { + return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName + + ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] "; } - private void WriteReplyToHeader(Dest dest) + private void Parse() { - string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey); - ContentHeaderProperties.ReplyTo = encodedDestination; + Uri binding = new Uri(url); + + // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified. + string exchangeClass = binding.Scheme; + + if (exchangeClass == null) + { + url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url; + Parse(); + + return; + } + else + { + this.exchangeClass = exchangeClass; + } + + // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified. + string exchangeName = binding.Host; + + if (exchangeName == null) + { + if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS)) + { + this.exchangeName = ""; + } + } + else + { + this.exchangeName = exchangeName; + } + + // Extract the destination and queue name. + if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals("")) + { + throw new UriFormatException("Destination or Queue required"); + } + else + { + int slashOffset = binding.AbsolutePath.IndexOf("/", 1); + if (slashOffset == -1) + { + throw new UriFormatException("Destination required"); + } + else + { + String path = binding.AbsolutePath; + + this.destination = path.Substring(1, slashOffset - 1); + this.queueName = path.Substring(slashOffset + 1); + } + } + + ParseOptions(options, binding.Query); + + // If the routing key is not set as an option, set it to the destination name. + if (!ContainsOption(OPTION_ROUTING_KEY)) + { + options[OPTION_ROUTING_KEY] = destination; + } } + + /// <summary> + /// options looks like this + /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value' + /// </summary> + public static void ParseOptions(IDictionary optionMap, string options) + { + // Check that there really are some options to parse. + if ((options == null) || (options.IndexOf('=') == -1)) + { + return; + } + + int optionIndex = options.IndexOf('='); + string option = options.Substring(0, optionIndex); + int length = options.Length; + int nestedQuotes = 0; + + // Holds the index of the final "'". + int valueIndex = optionIndex; + + // Loop over all the options.Dest + while ((nestedQuotes > 0) || (valueIndex < length)) + { + valueIndex++; + + if (valueIndex >= length) + { + break; + } + + if (options[valueIndex] == '\'') + { + if ((valueIndex + 1) < options.Length) + { + if ((options[valueIndex + 1] == '&') || + (options[valueIndex + 1] == ',') || + (options[valueIndex + 1] == ';') || + (options[valueIndex + 1] == '\'')) + { + nestedQuotes--; + + if (nestedQuotes == 0) + { + // We've found the value of an option + break; + } + } + else + { + nestedQuotes++; + } + } + else + { + // We are at the end of the string + // Check to see if we are corectly closing quotes + if (options[valueIndex] == '\'') + { + nestedQuotes--; + } + + break; + } + } + } + } } } diff --git a/dotnet/Qpid.Messaging/ExchangeNameDefaults.cs b/dotnet/Qpid.Messaging/ExchangeNameDefaults.cs index cda92d6b55..2689fb5e46 100644 --- a/dotnet/Qpid.Messaging/ExchangeNameDefaults.cs +++ b/dotnet/Qpid.Messaging/ExchangeNameDefaults.cs @@ -24,6 +24,19 @@ namespace Apache.Qpid.Messaging { public readonly static string TOPIC = "amq.topic"; public readonly static string DIRECT = "amq.direct"; - public readonly static string HEADERS = "amq.match"; + public readonly static string HEADERS = "amq.match"; + public readonly static string FANOUT = "amq.fanout"; + + /// <summary> Defines the identifying type name of topic exchanges. </summary> + public readonly static string TOPIC_EXCHANGE_CLASS = "topic"; + + /// <summary> Defines the identifying type name of direct exchanges. </summary> + public readonly static string DIRECT_EXCHANGE_CLASS = "direct"; + + /// <summary> Defines the identifying type name of headers exchanges. </summary> + public readonly static string HEADERS_EXCHANGE_CLASS = "headers"; + + /// <summary> Defines the identifying type name of fanout exchanges. </summary> + public readonly static string FANOUT_EXCHANGE_CLASS = "fanout"; } } |