summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dotnet/Qpid.Client.Tests/interop/TestCases/TestCase2BasicP2P.cs2
-rw-r--r--dotnet/Qpid.Client.Tests/interop/TestCases/TestCase3BasicPubSub.cs12
-rw-r--r--dotnet/Qpid.Client.Tests/interop/TestClient.cs14
-rw-r--r--dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs334
-rw-r--r--dotnet/Qpid.Messaging/ExchangeNameDefaults.cs15
5 files changed, 304 insertions, 73 deletions
diff --git a/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase2BasicP2P.cs b/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase2BasicP2P.cs
index 42fa0c5be9..657c512a67 100644
--- a/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase2BasicP2P.cs
+++ b/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase2BasicP2P.cs
@@ -159,7 +159,7 @@ namespace Apache.Qpid.Client.Tests.interop.TestCases
log.Debug("public Message GetReport(IChannel channel): called");
// Close the test connection.
- connection.Stop();
+ //connection.Stop();
// Generate a report message containing the count of the number of messages passed.
IMessage report = channel.CreateMessage();
diff --git a/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase3BasicPubSub.cs b/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase3BasicPubSub.cs
index 2f5c63ca9c..937730576d 100644
--- a/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase3BasicPubSub.cs
+++ b/dotnet/Qpid.Client.Tests/interop/TestCases/TestCase3BasicPubSub.cs
@@ -121,6 +121,8 @@ namespace Apache.Qpid.Client.Tests.interop.TestCases
publisher = channel[0].CreatePublisherBuilder()
.WithExchangeName(ExchangeNameDefaults.TOPIC)
.WithRoutingKey(sendDestination)
+ .WithMandatory(false)
+ .WithImmediate(false)
.Create();
break;
@@ -187,7 +189,15 @@ namespace Apache.Qpid.Client.Tests.interop.TestCases
// Close the test connections.
/*foreach (IConnection con in connection)
{
- con.Close();
+ try
+ {
+ con.Stop();
+ }
+ catch (AMQConnectionClosedException e)
+ {
+ // The connection has already died due to an error. Log this as a warning.
+ log.Warn("Connection already closed.");
+ }
}*/
// Generate a report message containing the count of the number of messages passed.
diff --git a/dotnet/Qpid.Client.Tests/interop/TestClient.cs b/dotnet/Qpid.Client.Tests/interop/TestClient.cs
index e8d37ce5b3..3edd28a892 100644
--- a/dotnet/Qpid.Client.Tests/interop/TestClient.cs
+++ b/dotnet/Qpid.Client.Tests/interop/TestClient.cs
@@ -32,13 +32,13 @@ namespace Apache.Qpid.Client.Tests.interop
public class TestClient
{
private static ILog log = LogManager.GetLogger(typeof(TestClient));
-
+
/// <summary> Defines the default broker for the tests, localhost, default port. </summary>
public static string DEFAULT_BROKER_URL = "amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'";
-
+
/// <summary> Defines the default virtual host to use for the tests, none. </summary>
public static string DEFAULT_VIRTUAL_HOST = "";
-
+
/// <summary> Defines the default identifying name of this test client. </summary>
public static string DEFAULT_CLIENT_NAME = "dotnet";
@@ -74,7 +74,7 @@ namespace Apache.Qpid.Client.Tests.interop
public TestClient(string brokerUrl, string virtualHost, string clientName)
{
log.Info("public TestClient(string brokerUrl = " + brokerUrl + ", string virtualHost = " + virtualHost
- + ", string clientName = " + clientName + "): called");
+ + ", string clientName = " + clientName + "): called");
// Retain the connection parameters.
TestClient.brokerUrl = brokerUrl;
@@ -117,6 +117,8 @@ namespace Apache.Qpid.Client.Tests.interop
clientName = nextArg.Substring(2);
}
}
+
+ NDC.Push(clientName);
// Create a test client and start it running.
TestClient client = new TestClient(brokerUrl, virtualHost, clientName);
@@ -136,6 +138,8 @@ namespace Apache.Qpid.Client.Tests.interop
{
Monitor.Wait(terminationMonitor);
}
+
+ NDC.Pop();
}
/// <summary>
@@ -202,7 +206,7 @@ namespace Apache.Qpid.Client.Tests.interop
public static IConnection CreateConnection(string brokerUrl, string virtualHost)
{
log.Info("public static Connection createConnection(string brokerUrl = " + brokerUrl + ", string virtualHost = "
- + virtualHost + "): called");
+ + virtualHost + "): called");
// Create a connection to the broker.
IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(brokerUrl);
diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
index f8c44261f2..34b47137e5 100644
--- a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
+++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
@@ -38,41 +38,36 @@ namespace Apache.Qpid.Client.Message
protected bool _readableMessage = false;
private QpidHeaders _headers;
-#region new_java_ctrs
-
protected AbstractQmsMessage(ByteBuffer data)
: base(new BasicContentHeaderProperties())
{
- Init(data);
+ Init(data);
}
protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
: this(contentHeader, deliveryTag)
{
- Init(data);
+ Init(data);
}
protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
{
- Init(null);
+ Init(null);
}
private void Init(ByteBuffer data)
{
- _data = data;
- if ( _data != null )
- {
- _data.Acquire();
- }
- _readableMessage = (data != null);
- if ( ContentHeaderProperties.Headers == null )
- ContentHeaderProperties.Headers = new FieldTable();
- _headers = new QpidHeaders(ContentHeaderProperties.Headers);
+ _data = data;
+ if ( _data != null )
+ {
+ _data.Acquire();
+ }
+ _readableMessage = (data != null);
+ if ( ContentHeaderProperties.Headers == null )
+ ContentHeaderProperties.Headers = new FieldTable();
+ _headers = new QpidHeaders(ContentHeaderProperties.Headers);
}
-#endregion
-
- #region Properties
//
// Properties
//
@@ -82,7 +77,8 @@ namespace Apache.Qpid.Client.Message
/// </summary>
public string MessageId
{
- get {
+ get
+ {
if (ContentHeaderProperties.MessageId == null)
{
ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
@@ -145,12 +141,11 @@ namespace Apache.Qpid.Client.Message
{
get
{
- Dest dest = ReadReplyToHeader();
- return dest.ExchangeName;
+ return ReadReplyToHeader().ExchangeName;
}
set
{
- Dest dest = ReadReplyToHeader();
+ BindingURL dest = ReadReplyToHeader();
dest.ExchangeName = value;
WriteReplyToHeader(dest);
}
@@ -163,19 +158,16 @@ namespace Apache.Qpid.Client.Message
{
get
{
- Dest dest = ReadReplyToHeader();
- return dest.RoutingKey;
+ return ReadReplyToHeader().RoutingKey;
}
set
{
- Dest dest = ReadReplyToHeader();
+ BindingURL dest = ReadReplyToHeader();
dest.RoutingKey = value;
WriteReplyToHeader(dest);
}
}
-
-
/// <summary>
/// Non-persistent (1) or persistent (2)
/// </summary>
@@ -186,12 +178,12 @@ namespace Apache.Qpid.Client.Message
byte b = ContentHeaderProperties.DeliveryMode;
switch (b)
{
- case 1:
- return DeliveryMode.NonPersistent;
- case 2:
- return DeliveryMode.Persistent;
- default:
- throw new QpidException("Illegal value for delivery mode in content header properties");
+ case 1:
+ return DeliveryMode.NonPersistent;
+ case 2:
+ return DeliveryMode.Persistent;
+ default:
+ throw new QpidException("Illegal value for delivery mode in content header properties");
}
}
set
@@ -200,9 +192,9 @@ namespace Apache.Qpid.Client.Message
}
}
- /// <summary>
- /// True, if this is a redelivered message
- /// </summary>
+ /// <summary>
+ /// True, if this is a redelivered message
+ /// </summary>
public bool Redelivered
{
get { return _redelivered; }
@@ -319,8 +311,6 @@ namespace Apache.Qpid.Client.Message
_data = value;
}
}
- #endregion // Properties
-
public void Acknowledge()
{
@@ -434,10 +424,10 @@ namespace Apache.Qpid.Client.Message
protected void CheckReadable()
{
- if ( !_readableMessage )
- {
- throw new MessageNotReadableException("You need to call reset() to make the message readable");
- }
+ if ( !_readableMessage )
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
}
/// <summary>
@@ -451,40 +441,254 @@ namespace Apache.Qpid.Client.Message
/// </summary>
///
/// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
- private Dest ReadReplyToHeader()
+ private BindingURL ReadReplyToHeader()
{
- string replyToEncoding = ContentHeaderProperties.ReplyTo;
+ string replyToEncoding = ContentHeaderProperties.ReplyTo;
+ //log.Debug("replyToEncoding = " + replyToEncoding);
+
+ BindingURL bindingUrl = new BindingURL(replyToEncoding);
+ //log.Debug("bindingUrl = " + bindingUrl.ToString());
+
+ return bindingUrl;
- //log.Info("replyToEncoding = " + replyToEncoding);
+ //log.Info("replyToEncoding = " + replyToEncoding);
+
+// if ( replyToEncoding == null )
+// {
+// return new Dest();
+// } else
+// {
+// // Split the replyto field on a ':'
+// string[] split = replyToEncoding.Split(':');
+
+// // Ensure that the replyto field argument only consisted of two parts.
+// if ( split.Length != 2 )
+// {
+// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
+// }
+
+// // Extract the exchange name and routing key from the split replyto field.
+// string exchangeName = split[0];
+
+// string[] split2 = split[1].Split('/');
+// string routingKey = split2[3];
+
+// return new Dest(exchangeName, routingKey);
+// }
+ }
+
+ private void WriteReplyToHeader(BindingURL dest)
+ {
+ string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
+ ContentHeaderProperties.ReplyTo = encodedDestination;
+ }
+ }
+
+ public class BindingURL
+ {
+ public readonly static string OPTION_EXCLUSIVE = "exclusive";
+ public readonly static string OPTION_AUTODELETE = "autodelete";
+ public readonly static string OPTION_DURABLE = "durable";
+ public readonly static string OPTION_CLIENTID = "clientid";
+ public readonly static string OPTION_SUBSCRIPTION = "subscription";
+ public readonly static string OPTION_ROUTING_KEY = "routingkey";
+
+ /// <summary> Holds the undecoded URL </summary>
+ string url;
+
+ /// <summary> Holds the decoded options. </summary>
+ IDictionary options = new Hashtable();
+
+ /// <summary> Holds the decoded exchange class. </summary>
+ string exchangeClass;
+
+ /// <summary> Holds the decoded exchange name. </summary>
+ string exchangeName;
+
+ /// <summary> Holds the destination name. </summary>
+ string destination;
+
+ /// <summary> Holds the decoded queue name. </summary>
+ string queueName;
+
+ /// <summary>
+ /// The binding URL has the format:
+ /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ /// </summary>
+ public BindingURL(string url)
+ {
+ this.url = url;
+ Parse();
+ }
+
+ public string Url { get { return url; } }
+
+ public string ExchangeClass
+ {
+ get { return exchangeClass; }
+ set { exchangeClass = value; }
+ }
- if ( replyToEncoding == null )
- {
- return new Dest();
- } else
- {
- // Split the replyto field on a ':'
- string[] split = replyToEncoding.Split(':');
+ public string ExchangeName
+ {
+ get { return exchangeName; }
+ set { exchangeName = value; }
+ }
- // Ensure that the replyto field argument only consisted of two parts.
- if ( split.Length != 2 )
- {
- throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
- }
+ public string QueueName
+ {
+ get { return queueName; }
+ set { queueName = value; }
+ }
- // Extract the exchange name and routing key from the split replyto field.
- string exchangeName = split[0];
+ public string DestinationName
+ {
+ get { return destination; }
+ set { destination = value; }
+ }
- string[] split2 = split[1].Split('/');
- string routingKey = split2[3];
+ public string RoutingKey {
+ get { return (string)options[OPTION_ROUTING_KEY]; }
+ set { options[OPTION_ROUTING_KEY] = value; }
+ }
- return new Dest(exchangeName, routingKey);
- }
+ public bool ContainsOption(string key) { return options.Contains(key); }
+
+ public string ToString()
+ {
+ return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName +
+ ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] ";
}
- private void WriteReplyToHeader(Dest dest)
+ private void Parse()
{
- string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
- ContentHeaderProperties.ReplyTo = encodedDestination;
+ Uri binding = new Uri(url);
+
+ // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified.
+ string exchangeClass = binding.Scheme;
+
+ if (exchangeClass == null)
+ {
+ url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
+ Parse();
+
+ return;
+ }
+ else
+ {
+ this.exchangeClass = exchangeClass;
+ }
+
+ // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified.
+ string exchangeName = binding.Host;
+
+ if (exchangeName == null)
+ {
+ if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ this.exchangeName = "";
+ }
+ }
+ else
+ {
+ this.exchangeName = exchangeName;
+ }
+
+ // Extract the destination and queue name.
+ if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals(""))
+ {
+ throw new UriFormatException("Destination or Queue required");
+ }
+ else
+ {
+ int slashOffset = binding.AbsolutePath.IndexOf("/", 1);
+ if (slashOffset == -1)
+ {
+ throw new UriFormatException("Destination required");
+ }
+ else
+ {
+ String path = binding.AbsolutePath;
+
+ this.destination = path.Substring(1, slashOffset - 1);
+ this.queueName = path.Substring(slashOffset + 1);
+ }
+ }
+
+ ParseOptions(options, binding.Query);
+
+ // If the routing key is not set as an option, set it to the destination name.
+ if (!ContainsOption(OPTION_ROUTING_KEY))
+ {
+ options[OPTION_ROUTING_KEY] = destination;
+ }
}
+
+ /// <summary>
+ /// options looks like this
+ /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'
+ /// </summary>
+ public static void ParseOptions(IDictionary optionMap, string options)
+ {
+ // Check that there really are some options to parse.
+ if ((options == null) || (options.IndexOf('=') == -1))
+ {
+ return;
+ }
+
+ int optionIndex = options.IndexOf('=');
+ string option = options.Substring(0, optionIndex);
+ int length = options.Length;
+ int nestedQuotes = 0;
+
+ // Holds the index of the final "'".
+ int valueIndex = optionIndex;
+
+ // Loop over all the options.Dest
+ while ((nestedQuotes > 0) || (valueIndex < length))
+ {
+ valueIndex++;
+
+ if (valueIndex >= length)
+ {
+ break;
+ }
+
+ if (options[valueIndex] == '\'')
+ {
+ if ((valueIndex + 1) < options.Length)
+ {
+ if ((options[valueIndex + 1] == '&') ||
+ (options[valueIndex + 1] == ',') ||
+ (options[valueIndex + 1] == ';') ||
+ (options[valueIndex + 1] == '\''))
+ {
+ nestedQuotes--;
+
+ if (nestedQuotes == 0)
+ {
+ // We've found the value of an option
+ break;
+ }
+ }
+ else
+ {
+ nestedQuotes++;
+ }
+ }
+ else
+ {
+ // We are at the end of the string
+ // Check to see if we are corectly closing quotes
+ if (options[valueIndex] == '\'')
+ {
+ nestedQuotes--;
+ }
+
+ break;
+ }
+ }
+ }
+ }
}
}
diff --git a/dotnet/Qpid.Messaging/ExchangeNameDefaults.cs b/dotnet/Qpid.Messaging/ExchangeNameDefaults.cs
index cda92d6b55..2689fb5e46 100644
--- a/dotnet/Qpid.Messaging/ExchangeNameDefaults.cs
+++ b/dotnet/Qpid.Messaging/ExchangeNameDefaults.cs
@@ -24,6 +24,19 @@ namespace Apache.Qpid.Messaging
{
public readonly static string TOPIC = "amq.topic";
public readonly static string DIRECT = "amq.direct";
- public readonly static string HEADERS = "amq.match";
+ public readonly static string HEADERS = "amq.match";
+ public readonly static string FANOUT = "amq.fanout";
+
+ /// <summary> Defines the identifying type name of topic exchanges. </summary>
+ public readonly static string TOPIC_EXCHANGE_CLASS = "topic";
+
+ /// <summary> Defines the identifying type name of direct exchanges. </summary>
+ public readonly static string DIRECT_EXCHANGE_CLASS = "direct";
+
+ /// <summary> Defines the identifying type name of headers exchanges. </summary>
+ public readonly static string HEADERS_EXCHANGE_CLASS = "headers";
+
+ /// <summary> Defines the identifying type name of fanout exchanges. </summary>
+ public readonly static string FANOUT_EXCHANGE_CLASS = "fanout";
}
}