summaryrefslogtreecommitdiff
path: root/qpid/dotnet
diff options
context:
space:
mode:
authorTomas Restrepo <tomasr@apache.org>2007-05-18 00:51:12 +0000
committerTomas Restrepo <tomasr@apache.org>2007-05-18 00:51:12 +0000
commitad680e710c07c5ebfc0d391be98dd60ed0ccbaf9 (patch)
tree3ed0e02f1a2d42b82011c9a612c337570dd5beb9 /qpid/dotnet
parent297cbc84e777d48c2111baa688ba46383c068067 (diff)
downloadqpid-python-ad680e710c07c5ebfc0d391be98dd60ed0ccbaf9.tar.gz
Merged revisions 537954-538078,538080-538083,538085-538097,538099-538108,538110-538239,538241-538881,538883-538906,538908-538911,538913-538921,538923-539191 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r537954 | tomasr | 2007-05-14 14:10:59 -0500 (Mon, 14 May 2007) | 4 lines * QPID-487 (Contributed by Carlos Medina) Fix QpidConnectionInfo.ToString() * QPID-485 (Contributed by Carlos Medina) Fix AmqBrokerInfo.Equals() * QPID-456 Enforce virtual host names start with '/' ........ r538035 | tomasr | 2007-05-14 20:33:00 -0500 (Mon, 14 May 2007) | 6 lines * QPID-452 Improve message classes API * Add XML documentation to IChannel and IMessage * Add missing BrokerDetailTests * Add new tests for message creation and message factories * Fix wrong default encoding for text messages ........ r539178 | tomasr | 2007-05-17 18:50:50 -0500 (Thu, 17 May 2007) | 6 lines * QPID-492 Fix Race condition in message decoding * QPID-249 Make ServiceRequestingClient and ServiceProvidingClient a single, self contained test * Fix incorrect exception message in Qpid.Buffers, improve tests * Make ContentBody use an sliced buffer to avoid extra data copy * Remove useless tests in Qpid.Client (Blocking IO tests) ........ r539191 | tomasr | 2007-05-17 19:18:26 -0500 (Thu, 17 May 2007) | 1 line QPID-490 (Contributed by Carlos Medina) Implement PurgeQueue and DeleteQueue ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@539198 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet')
-rw-r--r--qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs4
-rw-r--r--qpid/dotnet/Qpid.Buffer/ByteBuffer.cs6
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/BrokerDetails/BrokerDetailsTest.cs65
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/Channel/ChannelMessageCreationTests.cs78
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs251
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/Messages/MessageFactoryRegistryTests.cs113
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs8
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj5
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/bio/BlockingIo.cs73
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/log4net.config6
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs66
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs64
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs38
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs37
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AmqChannel.cs286
-rw-r--r--qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs578
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs4
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs44
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs44
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs4
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs318
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs16
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs173
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs12
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs6
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs24
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs21
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs6
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs2
-rw-r--r--qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs66
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs4
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs61
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs4
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs59
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs31
-rw-r--r--qpid/dotnet/Qpid.Client/Qpid.Client.csproj3
-rw-r--r--qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs213
-rw-r--r--qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs364
-rw-r--r--qpid/dotnet/Qpid.Common/Framing/ContentBody.cs36
-rw-r--r--qpid/dotnet/Qpid.Common/Framing/FieldTable.cs29
-rw-r--r--qpid/dotnet/Qpid.Messaging/IChannel.cs150
-rw-r--r--qpid/dotnet/Qpid.Messaging/IHeaders.cs2
-rw-r--r--qpid/dotnet/Qpid.Messaging/IMessage.cs76
43 files changed, 2197 insertions, 1253 deletions
diff --git a/qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs b/qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs
index 290e908a0d..aa675e3014 100644
--- a/qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs
+++ b/qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs
@@ -120,6 +120,9 @@ namespace Qpid.Buffer.Tests
Assert.AreEqual(10, buffer.Limit);
Assert.AreEqual(2, buffer.Position);
Assert.AreEqual(8, buffer.Remaining);
+ buffer.Rewind();
+ Assert.AreEqual((byte)0x02, buffer.GetByte());
+ Assert.AreEqual((byte)0x03, buffer.GetByte());
}
[Test]
@@ -326,3 +329,4 @@ namespace Qpid.Buffer.Tests
} // class SimpleByteBufferTests
}
+
diff --git a/qpid/dotnet/Qpid.Buffer/ByteBuffer.cs b/qpid/dotnet/Qpid.Buffer/ByteBuffer.cs
index 1bfc749746..b341fa7632 100644
--- a/qpid/dotnet/Qpid.Buffer/ByteBuffer.cs
+++ b/qpid/dotnet/Qpid.Buffer/ByteBuffer.cs
@@ -809,7 +809,7 @@ namespace Qpid.Buffer
/// </summary>
/// <param name="position">Position to read from</param>
/// <returns>The value at the position</returns>
- public char getChar(int position)
+ public char GetChar(int position)
{
return (char)GetUInt16(position);
}
@@ -941,7 +941,7 @@ namespace Qpid.Buffer
{
if ( position + length > Limit )
{
- throw new BufferUnderflowException("Attempt to read " + length + " byte(s) to buffer where position is " + _position +
+ throw new BufferUnderflowException("Attempt to read " + length + " byte(s) to buffer where position is " + position +
" and limit is " + Limit);
}
}
@@ -954,7 +954,7 @@ namespace Qpid.Buffer
}
if ( position + length > Limit )
{
- throw new BufferOverflowException("Attempt to write " + length + " byte(s) to buffer where position is " + _position +
+ throw new BufferOverflowException("Attempt to write " + length + " byte(s) to buffer where position is " + position +
" and limit is " + Limit);
}
}
diff --git a/qpid/dotnet/Qpid.Client.Tests/BrokerDetails/BrokerDetailsTest.cs b/qpid/dotnet/Qpid.Client.Tests/BrokerDetails/BrokerDetailsTest.cs
new file mode 100644
index 0000000000..8bc615bd20
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client.Tests/BrokerDetails/BrokerDetailsTest.cs
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Net;
+using NUnit.Framework;
+using Qpid.Client.Qms;
+
+namespace Qpid.Client.Tests.BrokerDetails
+{
+ [TestFixture]
+ public class BrokerDetailsTest
+ {
+
+ [Test]
+ public void ValidateBrokerInfoEqualsMethod()
+ {
+ AmqBrokerInfo broker = new AmqBrokerInfo("amqp", "localhost", 5672, true);
+ AmqBrokerInfo broker1 = new AmqBrokerInfo("Amqp", "localhost", 5672, true);
+
+ Assert.IsTrue(broker.Equals(broker1),"The two AmqBrokerInfo objects are not equals");
+ Console.WriteLine(string.Format("The object broker: {0} and broker1: {1} are equals", broker, broker1));
+ }
+
+ [Test]
+ public void ValidateBrokerInfoWithDifferentSSL()
+ {
+ AmqBrokerInfo broker = new AmqBrokerInfo("amqp", "localhost", 5672, true);
+ AmqBrokerInfo broker1 = new AmqBrokerInfo("amqp", "localhost", 5672, false);
+
+ Assert.IsFalse(broker.Equals(broker1), "The two AmqBrokerInfo objects are equals");
+ Console.WriteLine(string.Format("The object broker: {0} and broker1: {1} are not equals", broker, broker1));
+ }
+
+ [Test]
+ public void ValidateBrokerInfoFromToString()
+ {
+ String url = "tcp://localhost:5672?timeout='200',immediatedelivery='true'";
+
+ AmqBrokerInfo broker = new AmqBrokerInfo(url);
+ AmqBrokerInfo broker1 = new AmqBrokerInfo(broker.ToString());
+
+ Assert.AreEqual(broker.GetOption("timeout"), broker1.GetOption("timeout"));
+ Assert.AreEqual(broker.GetOption("immediatedelivery"), broker1.GetOption("immediatedelivery"));
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client.Tests/Channel/ChannelMessageCreationTests.cs b/qpid/dotnet/Qpid.Client.Tests/Channel/ChannelMessageCreationTests.cs
new file mode 100644
index 0000000000..40ba1dd25a
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client.Tests/Channel/ChannelMessageCreationTests.cs
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+
+using log4net;
+using NUnit.Framework;
+using Qpid.Client;
+using Qpid.Client.Message;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Tests.Channel
+{
+ /// <summary>
+ /// Test that channels can create messages correctly
+ /// </summary>
+ [TestFixture]
+ public class ChannelMessageCreationTests
+ {
+ [Test]
+ public void CanCreateTextMessage()
+ {
+ IChannel channel = AmqChannel.CreateDisconnectedChannel();
+ ITextMessage msg = channel.CreateTextMessage();
+ Assert.IsNotNull(msg);
+ }
+ [Test]
+ public void CanCreateTextMessageWithContent()
+ {
+ IChannel channel = AmqChannel.CreateDisconnectedChannel();
+ const string CONTENT = "1234567890";
+ ITextMessage msg = channel.CreateTextMessage(CONTENT);
+ Assert.IsNotNull(msg);
+ Assert.AreEqual(CONTENT, msg.Text);
+ }
+ [Test]
+ public void CanCreateBytesMessage()
+ {
+ IChannel channel = AmqChannel.CreateDisconnectedChannel();
+ IBytesMessage msg = channel.CreateBytesMessage();
+ Assert.IsNotNull(msg);
+ }
+ [Test]
+ public void CanCreateMessage()
+ {
+ IChannel channel = AmqChannel.CreateDisconnectedChannel();
+ IMessage msg = channel.CreateMessage();
+ Assert.IsNotNull(msg);
+ }
+ [Test]
+ public void CanCreateMessageFromMimeType()
+ {
+ IChannel channel = AmqChannel.CreateDisconnectedChannel();
+ IMessage msg = channel.CreateMessage("text/xml");
+ Assert.IsNotNull(msg);
+ Assert.IsInstanceOfType(typeof(ITextMessage), msg);
+ }
+ }
+} // namespace Qpid.Client.Tests.Channel
+
diff --git a/qpid/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs b/qpid/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs
new file mode 100644
index 0000000000..88a056a245
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Net;
+using System.Threading;
+using log4net;
+using Qpid.Client.Qms;
+using Qpid.Messaging;
+using NUnit.Framework;
+
+namespace Qpid.Client.Tests.Channel
+{
+ /// <summary>
+ /// Test the queue methods
+ /// </summary>
+ [TestFixture]
+ public class ChannelQueueTest
+ {
+
+ private static ILog _logger = LogManager.GetLogger(typeof(ChannelQueueTest));
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+ const string _routingKey = "ServiceQ1";
+
+ private ExceptionListenerDelegate _exceptionDelegate;
+ private AutoResetEvent _evt = new AutoResetEvent(false);
+ private Exception _lastException = null;
+
+ private IMessageConsumer _consumer;
+ private IMessagePublisher _publisher;
+ private IChannel _channel;
+ private IConnection _connection;
+
+ private string _queueName;
+
+ [SetUp]
+ public virtual void Init()
+ {
+ _logger.Info("public virtual void Init(): called");
+
+ // Create a connection to the broker.
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(DEFAULT_URI);
+ _connection = new AMQConnection(connectionInfo);
+ _logger.Info("Starting...");
+
+ // Register this to listen for exceptions on the test connection.
+ _exceptionDelegate = new ExceptionListenerDelegate(OnException);
+ _connection.ExceptionListener += _exceptionDelegate;
+
+ // Establish a session on the broker.
+ _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
+
+ // Create a durable, non-temporary, non-exclusive queue.
+ _queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(_queueName, true, false, false);
+
+ _channel.Bind(_queueName, ExchangeNameDefaults.TOPIC, _routingKey);
+
+ // Clear the most recent message and exception.
+ _lastException = null;
+ }
+
+ [TearDown]
+ public virtual void ShutDown()
+ {
+ _logger.Info("public virtual void Shutdown(): called");
+
+ if (_connection != null)
+ {
+ _logger.Info("Disposing connection.");
+ _connection.Dispose();
+ _logger.Info("Connection disposed.");
+ }
+ }
+
+ [Test]
+ public void DeleteInExistentQueue()
+ {
+ try
+ {
+ _channel.DeleteQueue("Q1", false, false, true);
+ _logger.Info("queue deleted");
+ }
+ catch (AMQException e)
+ {
+ _logger.Info(e.ToString());
+ }
+ }
+
+ [Test]
+ public void DeleteUsedQueue()
+ {
+ // Create the consumer
+ _consumer = _channel.CreateConsumerBuilder(_queueName)
+ .WithPrefetchLow(100)
+ .Create();
+ _logger.Info("Consumer was created...");
+
+ // delete the queue
+ _channel.DeleteQueue(_queueName, false, true, true);
+ _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteUnUsedQueue()
+ {
+ // delete the queue
+ _channel.DeleteQueue(_queueName, true, true, true);
+ _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteNonEmptyQueue()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+ SendTestMessage("Message 1");
+
+ try
+ {
+ _channel.DeleteQueue(_queueName, true, false, true);
+ }
+ catch (AMQException)
+ {
+ Assert.Fail("The test fails");
+ }
+ }
+
+ [Test]
+ public void DeleteEmptyQueue()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+
+ // delete an empty queue with ifEmpty = true
+ _channel.DeleteQueue(_queueName, false, true, true);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteQueueWithResponse()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ // delete the queue, the server must respond
+ _channel.DeleteQueue(_queueName, false, false, false);
+ }
+
+ [Test]
+ public void PurgeQueueWithResponse()
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Pubisher created");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ _channel.PurgeQueue(_queueName, false);
+ }
+
+ [Test]
+ public void PurgeQueueWithOutResponse()
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Pubisher created");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ _channel.PurgeQueue(_queueName, true);
+ }
+
+
+ /// <summary>
+ /// Callback method to handle any exceptions raised by the test connection.</summary> ///
+ /// <param name="e">The connection exception.</param>
+ public void OnException(Exception e)
+ {
+ // Preserve the most recent exception in case test cases need to examine it.
+ _lastException = e;
+
+ // Notify any waiting threads that an exception event has occurred.
+ _evt.Set();
+ }
+
+ /// <summary>
+ /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
+ /// depending on whether or not the message should be received by the consumer.
+ ///
+ /// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
+ /// </summary>
+ ///
+ /// <param name="msgSend">The message to send.</param>
+ private void SendTestMessage(string msg)
+ {
+ // create the IMessage object
+ IMessage msgSend = _channel.CreateTextMessage(msg);
+
+ // send the message
+ _publisher.Send(msgSend);
+ _logger.InfoFormat("The messages \"{0}\" was sent", msg);
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client.Tests/Messages/MessageFactoryRegistryTests.cs b/qpid/dotnet/Qpid.Client.Tests/Messages/MessageFactoryRegistryTests.cs
new file mode 100644
index 0000000000..421d0d4e02
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client.Tests/Messages/MessageFactoryRegistryTests.cs
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+
+using log4net;
+using NUnit.Framework;
+using Qpid.Messaging;
+using Qpid.Client.Message;
+
+namespace Qpid.Client.Tests.Messages
+{
+ /// <summary>
+ /// Ensure a factory creates messages correctly
+ /// </summary>
+ [TestFixture]
+ public class MessageFactoryRegistryTests
+ {
+ const string TEXT_PLAIN = "text/plain";
+ const string TEXT_XML = "text/xml";
+ const string OCTET_STREAM = "application/octet-stream";
+
+ /// <summary>
+ /// Check default registry can create text/plain messages
+ /// </summary>
+ [Test]
+ public void CanCreateTextPlain()
+ {
+ MessageFactoryRegistry registry =
+ MessageFactoryRegistry.NewDefaultRegistry();
+
+ IMessage message = registry.CreateMessage(TEXT_PLAIN);
+ Assert.IsNotNull(message);
+ Assert.AreEqual(TEXT_PLAIN, message.ContentType);
+ Assert.IsInstanceOfType(typeof(QpidTextMessage), message);
+ }
+ /// <summary>
+ /// Check default registry can create text/xml messages
+ /// </summary>
+ [Test]
+ public void CanCreateTextXml()
+ {
+ MessageFactoryRegistry registry =
+ MessageFactoryRegistry.NewDefaultRegistry();
+
+ IMessage message = registry.CreateMessage(TEXT_XML);
+ Assert.IsNotNull(message);
+ Assert.AreEqual(TEXT_XML, message.ContentType);
+ Assert.IsInstanceOfType(typeof(QpidTextMessage), message);
+ }
+ /// <summary>
+ /// Check default registry can create application/octet-stream messages
+ /// </summary>
+ [Test]
+ public void CanCreateBinary()
+ {
+ MessageFactoryRegistry registry =
+ MessageFactoryRegistry.NewDefaultRegistry();
+
+ IMessage message = registry.CreateMessage(OCTET_STREAM);
+ Assert.IsNotNull(message);
+ Assert.AreEqual(OCTET_STREAM, message.ContentType);
+ Assert.IsInstanceOfType(typeof(QpidBytesMessage), message);
+ }
+ /// <summary>
+ /// Check default registry can create messages for unknown types
+ /// </summary>
+ [Test]
+ public void CanCreateUnknownType()
+ {
+ MessageFactoryRegistry registry =
+ MessageFactoryRegistry.NewDefaultRegistry();
+
+ const string OTHER = "application/unknown";
+ IMessage message = registry.CreateMessage(OTHER);
+ Assert.IsNotNull(message);
+ Assert.AreEqual(OTHER, message.ContentType);
+ Assert.IsInstanceOfType(typeof(QpidBytesMessage), message);
+ }
+ /// <summary>
+ /// Check that text messages default to UTF-8 encoding
+ /// </summary>
+ [Test]
+ public void TextMessagesDefaultToUTF8Encoding()
+ {
+ MessageFactoryRegistry registry =
+ MessageFactoryRegistry.NewDefaultRegistry();
+
+ IMessage message = registry.CreateMessage(TEXT_PLAIN);
+ Assert.AreEqual("utf-8", message.ContentEncoding.ToLower());
+ }
+
+ }
+} // namespace Qpid.Client.Tests.Messages
+
diff --git a/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs b/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
index 687f08eeef..fd1400d9d8 100644
--- a/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
+++ b/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
@@ -38,7 +38,7 @@ namespace Qpid.Client.Tests
private const int MESSAGE_COUNT = 1000;
- private const string MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+ private const string MESSAGE_DATA_BYTES = "****jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
AutoResetEvent _finishedEvent = new AutoResetEvent(false);
@@ -100,7 +100,9 @@ namespace Qpid.Client.Tests
_logger.Info("All messages received");
_finishedEvent.Set();
}
- }
+ if ( newCount % 100 == 0 )
+ System.Diagnostics.Debug.WriteLine(((ITextMessage)m).Text);
+ }
[Test]
public void RunTest()
@@ -110,7 +112,7 @@ namespace Qpid.Client.Tests
ITextMessage msg;
try
{
- msg = _channel.CreateTextMessage(GetData(512 + 8*i));
+ msg = _channel.CreateTextMessage(GetData(512 + 8*i));
}
catch (Exception e)
{
diff --git a/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj b/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
index 819d43b5b0..cbe632ace9 100644
--- a/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
+++ b/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
@@ -44,7 +44,10 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
- <Compile Include="bio\BlockingIo.cs" />
+ <Compile Include="BrokerDetails\BrokerDetailsTest.cs" />
+ <Compile Include="Channel\ChannelMessageCreationTests.cs" />
+ <Compile Include="Channel\ChannelQueueTest.cs" />
+ <Compile Include="Messages\MessageFactoryRegistryTests.cs" />
<Compile Include="connection\ConnectionTest.cs" />
<Compile Include="connection\SslConnectionTest.cs" />
<Compile Include="failover\FailoverTest.cs" />
diff --git a/qpid/dotnet/Qpid.Client.Tests/bio/BlockingIo.cs b/qpid/dotnet/Qpid.Client.Tests/bio/BlockingIo.cs
deleted file mode 100644
index 24f3299dae..0000000000
--- a/qpid/dotnet/Qpid.Client.Tests/bio/BlockingIo.cs
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using NUnit.Framework;
-using Qpid.Client.Protocol;
-using Qpid.Framing;
-using Qpid.Messaging;
-
-namespace Qpid.Client.Transport
-{
- [TestFixture]
- public class BlockingIo
- {
- [Test]
- public void connectFromOutside()
- {
- QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
- AMQConnection connection = new AMQConnection(connectionInfo);
- ProtocolWriter protocolWriter = connection.ConvenientProtocolWriter;
-
- // TODO: Open channels and handle them simultaneously.
- // Need more thread here?
- // Send ChannelOpen.
- ushort channelId = 1;
- protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody));
-
- connection.Close();
- }
-
- [Test]
- public void regularConnection()
- {
- QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
- using (IConnection connection = new AMQConnection(connectionInfo)) {
- Console.WriteLine("connection = {0}", connection);
- Thread.Sleep(2000);
- }
- }
-
- [Test]
- public void connectionAndSleepForHeartbeats()
- {
- QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
- using (IConnection connection = new AMQConnection(connectionInfo))
- {
- Console.WriteLine("connection = {0}", connection);
- Thread.Sleep(60000);
- }
- }
- }
-}
diff --git a/qpid/dotnet/Qpid.Client.Tests/log4net.config b/qpid/dotnet/Qpid.Client.Tests/log4net.config
index e5340a1500..4346e0eaeb 100644
--- a/qpid/dotnet/Qpid.Client.Tests/log4net.config
+++ b/qpid/dotnet/Qpid.Client.Tests/log4net.config
@@ -49,8 +49,12 @@
<level value="info" />
<appender-ref ref="ioLog"/>
</logger>
+ <logger name="Qpid.Framing.FieldTable" additivity="false">
+ <level value="debug" />
+ <appender-ref ref="console"/>
+ </logger>
- <root>
+ <root>
<appender-ref ref="console"/>
<appender-ref ref="UdpAppender"/>
<appender-ref ref="filelog"/>
diff --git a/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs b/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
index 515ae41e1c..ad5981a5c5 100644
--- a/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
+++ b/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
@@ -35,16 +35,15 @@ namespace Qpid.Client.Tests
private string _replyToExchangeName;
private string _replyToRoutingKey;
+ const int PACK = 100;
private IMessagePublisher _destinationPublisher;
+ private IMessageConsumer _consumer;
private string _serviceName = "ServiceQ1";
private string _selector = null;
- //private EventWaitHandle _event = new ManualResetEvent(false);
- private AutoResetEvent _event = new AutoResetEvent(false);
-
[SetUp]
public override void Init()
{
@@ -59,36 +58,38 @@ namespace Qpid.Client.Tests
_channel.DeclareQueue(_serviceName, false, false, false);
- IMessageConsumer consumer = _channel.CreateConsumerBuilder(_serviceName)
+ _consumer = _channel.CreateConsumerBuilder(_serviceName)
.WithPrefetchLow(100)
.WithPrefetchHigh(500)
.WithNoLocal(true)
.Create();
- consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ _consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ }
+
+ public override void Shutdown()
+ {
+ _consumer.Dispose();
+ base.Shutdown();
}
private void OnConnectionException(Exception e)
{
_logger.Info("Connection exception occurred", e);
- _event.Set(); // Shutdown test on error
// XXX: Test still doesn't shutdown when broker terminates. Is there no heartbeat?
}
[Test]
- public void TestFail()
- {
- Assert.Fail("Tests in this class do not run on autopilot, but hang forever, so commented out until can be fixed.");
- }
-
- /*[Test]
public void Test()
{
_connection.Start();
_logger.Info("Waiting...");
- _event.WaitOne();
- }*/
- public void OnMessage(IMessage message)
+ ServiceRequestingClient client = new ServiceRequestingClient();
+ client.Init();
+ client.SendMessages();
+ }
+
+ private void OnMessage(IMessage message)
{
// _logger.Info("Got message '" + message + "'");
@@ -109,9 +110,9 @@ namespace Qpid.Client.Tests
_destinationPublisher = _channel.CreatePublisherBuilder()
.WithExchangeName(_replyToExchangeName)
.WithRoutingKey(_replyToRoutingKey)
+ .WithDeliveryMode(DeliveryMode.NonPersistent)
.Create();
_destinationPublisher.DisableMessageTimestamp = true;
- _destinationPublisher.DeliveryMode = DeliveryMode.NonPersistent;
_logger.Debug("After create a producer");
}
catch (QpidException e)
@@ -120,7 +121,7 @@ namespace Qpid.Client.Tests
throw e;
}
_messageCount++;
- if (_messageCount % 1000 == 0)
+ if (_messageCount % PACK == 0)
{
_logger.Info("Received message total: " + _messageCount);
_logger.Info(string.Format("Sending response to '{0}:{1}'",
@@ -129,25 +130,20 @@ namespace Qpid.Client.Tests
try
{
- String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text;
- ITextMessage msg = _channel.CreateTextMessage(payload);
- if (tm.Headers.Contains("timeSent"))
- {
-// _logger.Info("timeSent property set on message");
-// _logger.Info("timeSent value is: " + tm.Headers["timeSent"]);
- msg.Headers["timeSent"] = tm.Headers["timeSent"];
- }
- _destinationPublisher.Send(msg);
- if (_messageCount % 1000 == 0)
- {
- _logger.Info(string.Format("Sending response to '{0}:{1}'",
- _replyToExchangeName, _replyToRoutingKey));
- }
- }
- catch (QpidException e)
+ String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text;
+ ITextMessage msg = _channel.CreateTextMessage(payload);
+ if ( tm.Headers.Contains("timeSent") )
+ {
+ msg.Headers["timeSent"] = tm.Headers["timeSent"];
+ }
+ _destinationPublisher.Send(msg);
+ } catch ( QpidException e )
{
- _logger.Error("Error sending message: " + e, e);
- throw e;
+ _logger.Error("Error sending message: " + e, e);
+ throw e;
+ } finally
+ {
+ _destinationPublisher.Dispose();
}
}
}
diff --git a/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs b/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
index 4479d767ea..8264879c1f 100644
--- a/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
+++ b/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
@@ -26,18 +26,18 @@ using Qpid.Messaging;
namespace Qpid.Client.Tests
{
- [TestFixture]
public class ServiceRequestingClient : BaseMessagingTestFixture
{
private const int MESSAGE_SIZE = 1024;
private static string MESSAGE_DATA = new string('x', MESSAGE_SIZE);
- private const int NUM_MESSAGES = 10000;
+ private const int PACK = 100;
+ private const int NUM_MESSAGES = PACK*10; // increase when in standalone
private static ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient));
- AutoResetEvent _finishedEvent = new AutoResetEvent(false);
-
+ ManualResetEvent _finishedEvent = new ManualResetEvent(false);
+
private int _expectedMessageCount = NUM_MESSAGES;
private long _startTime = 0;
@@ -54,9 +54,9 @@ namespace Qpid.Client.Tests
{
_publisher = _channel.CreatePublisherBuilder()
.WithRoutingKey(_commandQueueName)
+ .WithDeliveryMode(DeliveryMode.NonPersistent)
.Create();
_publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder?
- _publisher.DeliveryMode = DeliveryMode.NonPersistent; // XXX: need a "with" for this in builder?
}
catch (QpidException e)
{
@@ -64,7 +64,7 @@ namespace Qpid.Client.Tests
}
}
- /*[Test]
+ [Test]
public void SendMessages()
{
InitialiseProducer();
@@ -100,47 +100,18 @@ namespace Qpid.Client.Tests
// Added timestamp.
long timeNow = DateTime.Now.Ticks;
string timeSentString = String.Format("{0:G}", timeNow);
-// _log.Info(String.Format("timeSent={0} timeSentString={1}", timeNow, timeSentString));
- msg.Headers.SetString("timeSent", timeSentString);
- //msg.Headers.SetLong("sentAt", timeNow);
+ msg.Headers.SetLong("timeSent", timeNow);
- try
- {
- _publisher.Send(msg);
- }
- catch (Exception e)
- {
- _log.Error("Error sending message: " + e, e);
- //base._port = 5673;
- _log.Info("Reconnecting but on port 5673");
- try
- {
- base.Init();
- InitialiseProducer();
- // cheesy but a quick test
- _log.Info("Calling SendMessages again");
- SendMessages();
- }
- catch (Exception ex)
- {
- _log.Error("Totally busted: failed to reconnect: " + ex, ex);
- }
- }
+ _publisher.Send(msg);
}
// Assert that the test finishes within a reasonable amount of time.
- const int waitSeconds = 10;
+ const int waitSeconds = 40;
const int waitMilliseconds = waitSeconds * 1000;
_log.Info("Finished sending " + _expectedMessageCount + " messages");
_log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds));
Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false),
String.Format("Expected to finish in {0} seconds", waitSeconds));
- }*/
-
- [Test]
- public void TestFail()
- {
- Assert.Fail("Tests in this class do not run on autopilot, but hang forever, so commented out until can be fixed.");
}
public void OnMessage(IMessage m)
@@ -150,23 +121,19 @@ namespace Qpid.Client.Tests
_log.Debug("Message received: " + m);
}
- //if (m.Headers.Contains("sentAt"))
if (!m.Headers.Contains("timeSent"))
{
throw new Exception("Set timeSent!");
}
- //long sentAt = m.Headers.GetLong("sentAt");
- long sentAt = Int64.Parse(m.Headers.GetString("timeSent"));
+
+ long sentAt = m.Headers.GetLong("timeSent");
long now = DateTime.Now.Ticks;
long latencyTicks = now - sentAt;
-// _log.Info(String.Format("latency = {0} ticks ", latencyTicks));
long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond;
-// _log.Info(String.Format("latency = {0} ms", latencyMilliseconds));
averager.Add(latencyMilliseconds);
- // Output average every 1000 messages.
- if (averager.Num % 1000 == 0)
+ if (averager.Num % PACK == 0)
{
_log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond);
_log.Info(String.Format("Average latency (ms) = {0}", averager));
@@ -185,13 +152,6 @@ namespace Qpid.Client.Tests
_finishedEvent.Set(); // Notify main thread to quit.
}
}
-
- /*public static void Main(String[] args)
- {
- ServiceRequestingClient c = new ServiceRequestingClient();
- c.Init();
- c.SendMessages();
- }*/
}
class Avergager
diff --git a/qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs b/qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs
index 4ab6dd5736..b3ee0272b4 100644
--- a/qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs
+++ b/qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs
@@ -404,5 +404,43 @@ namespace Qpid.Client.Tests.url
Assert.AreEqual("Unterminated option", e.Message);
}
}
+
+ [Test]
+ public void ValidateQpidConnectionInfoFromToString()
+ {
+ String url = "amqp://ritchiem:bob@default/temp?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
+
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(url);
+ IConnectionInfo connectionInfo1 = QpidConnectionInfo.FromUrl(connectionInfo.ToString());
+
+ Console.WriteLine(connectionInfo.ToString());
+ Console.WriteLine(connectionInfo1.ToString());
+
+ Assert.AreEqual(connectionInfo.Username, connectionInfo1.Username);
+ Assert.AreEqual(connectionInfo.Password, connectionInfo1.Password);
+ Assert.AreEqual(connectionInfo.VirtualHost, connectionInfo1.VirtualHost);
+
+ Assert.IsTrue((connectionInfo1.GetAllBrokerInfos().Count == 2));
+ Assert.IsTrue(connectionInfo.GetBrokerInfo(0).Equals(connectionInfo1.GetBrokerInfo(0)));
+ Assert.IsTrue(connectionInfo.GetBrokerInfo(1).Equals(connectionInfo1.GetBrokerInfo(1)));
+
+ }
+
+ [Test]
+ public void EnsureVirtualHostStartsWithSlash()
+ {
+ IConnectionInfo connection = new QpidConnectionInfo();
+ connection.VirtualHost = "test";
+ Assert.AreEqual("/test", connection.VirtualHost);
+
+ connection.VirtualHost = "/mytest";
+ Assert.AreEqual("/mytest", connection.VirtualHost);
+
+ connection.VirtualHost = "";
+ Assert.AreEqual("/", connection.VirtualHost);
+
+ connection.VirtualHost = null;
+ Assert.AreEqual("/", connection.VirtualHost);
+ }
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs b/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
index 9ae1a49473..90e3788f5a 100644
--- a/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
@@ -260,7 +260,7 @@ namespace Qpid.Client
sb.Append(_transport);
sb.Append("://");
- if (!(_transport.ToLower().Equals("vm")))
+ if (!(StringEqualsIgnoreCase(_transport, "vm")))
{
sb.Append(_host);
}
@@ -268,8 +268,7 @@ namespace Qpid.Client
sb.Append(':');
sb.Append(_port);
- // XXX
-// sb.Append(printOptionsURL());
+ sb.Append(URLHelper.printOptions(_options));
return sb.ToString();
}
@@ -284,7 +283,8 @@ namespace Qpid.Client
IBrokerInfo bd = (IBrokerInfo) obj;
return StringEqualsIgnoreCase(_host, bd.Host) &&
_port == bd.Port &&
- _transport == bd.Transport;
+ StringEqualsIgnoreCase(_transport, bd.Transport) &&
+ UseSSL == bd.UseSSL;
}
public override int GetHashCode()
@@ -298,35 +298,6 @@ namespace Qpid.Client
return one.ToLower().Equals(two.ToLower());
}
-// private string printOptionsURL()
-// {
-// stringBuffer optionsURL = new stringBuffer();
-//
-// optionsURL.Append('?');
-//
-// if (!(_options.isEmpty()))
-// {
-//
-// for (string key : _options.keySet())
-// {
-// optionsURL.Append(key);
-//
-// optionsURL.Append("='");
-//
-// optionsURL.Append(_options.get(key));
-//
-// optionsURL.Append("'");
-//
-// optionsURL.Append(URLHelper.DEFAULT_OPTION_SEPERATOR);
-// }
-// }
-//
-// //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
-// optionsURL.deleteCharAt(optionsURL.length() - 1);
-//
-// return optionsURL.tostring();
-// }
-
public bool UseSSL
{
get
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
index 3471ac3640..9a8b9f787a 100644
--- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -36,7 +36,7 @@ namespace Qpid.Client
{
private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel));
- private const int BASIC_CONTENT_TYPE = 60;
+ internal const int BASIC_CONTENT_TYPE = 60;
private static int _nextSessionNumber = 0;
@@ -122,7 +122,7 @@ namespace Qpid.Client
if (consumer == null)
{
- _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a handler - ignoring...");
+ _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring...");
}
else
{
@@ -156,103 +156,72 @@ namespace Qpid.Client
}
}
- internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) :
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.NewDefaultRegistry(), defaultPrefetch)
- {
- }
-
/// <summary>
/// Initializes a new instance of the <see cref="AmqChannel"/> class.
/// </summary>
- /// <param name="con">The con.</param>
+ /// <param name="con">The connection.</param>
/// <param name="channelId">The channel id.</param>
/// <param name="transacted">if set to <c>true</c> [transacted].</param>
/// <param name="acknowledgeMode">The acknowledge mode.</param>
- /// <param name="messageFactoryRegistry">The message factory registry.</param>
- internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
+ /// <param name="defaultPrefetch">Default prefetch value</param>
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch)
+ : this()
+ {
+ _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
+ _connection = con;
+ _transacted = transacted;
+ if ( transacted )
+ {
+ _acknowledgeMode = AcknowledgeMode.SessionTransacted;
+ } else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
+ _channelId = channelId;
+ }
+
+ private AmqChannel()
{
- _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
- _connection = con;
- _transacted = transacted;
- if (transacted)
- {
- _acknowledgeMode = AcknowledgeMode.SessionTransacted;
- }
- else
- {
- _acknowledgeMode = acknowledgeMode;
- }
- _channelId = channelId;
- _messageFactoryRegistry = messageFactoryRegistry;
+ _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
+ }
+
+ /// <summary>
+ /// Create a disconnected channel that will fault
+ /// for most things, but is useful for testing
+ /// </summary>
+ /// <returns>A new disconnected channel</returns>
+ public static IChannel CreateDisconnectedChannel()
+ {
+ return new AmqChannel();
}
+
public IBytesMessage CreateBytesMessage()
{
- lock (_connection.FailoverMutex)
- {
- CheckNotClosed();
- try
- {
- return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new QpidException("Unable to create message: " + e);
- }
- }
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
}
public IMessage CreateMessage()
{
- lock (_connection.FailoverMutex)
- {
- CheckNotClosed();
- try
- {
- // TODO: this is supposed to create a message consisting only of message headers
- return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new QpidException("Unable to create message: " + e);
- }
- }
+ // TODO: this is supposed to create a message consisting only of message headers
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ }
+
+ public IMessage CreateMessage(string mimeType)
+ {
+ return _messageFactoryRegistry.CreateMessage(mimeType);
}
public ITextMessage CreateTextMessage()
{
- lock (_connection.FailoverMutex)
- {
- CheckNotClosed();
-
- try
- {
- return (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
- }
- catch (AMQException e)
- {
- throw new QpidException("Unable to create message: " + e);
- }
- }
+ return CreateTextMessage(String.Empty);
}
public ITextMessage CreateTextMessage(string text)
{
- lock (_connection.FailoverMutex)
- {
- CheckNotClosed();
- try
- {
- ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
- msg.Text = text;
- return msg;
- }
- catch (AMQException e)
- {
- throw new QpidException("Unable to create message: " + e);
- }
- }
+ ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
+ msg.Text = text;
+ return msg;
}
public bool Transacted
@@ -538,11 +507,6 @@ namespace Qpid.Client
}
}
- public IFieldTable CreateFieldTable()
- {
- return new FieldTable();
- }
-
public void Unsubscribe(String name)
{
throw new NotImplementedException(); // FIXME
@@ -709,6 +673,30 @@ namespace Qpid.Client
// at this point the _consumers map will be empty
}
+ public void PurgeQueue(string queueName, bool noWait)
+ {
+ DoPurgeQueue(queueName, noWait);
+ }
+
+ private void DoPurgeQueue(string queueName, bool noWait)
+ {
+ try
+ {
+ _logger.DebugFormat("PurgeQueue {0}", queueName);
+
+ AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait);
+
+ if (noWait)
+ _connection.ProtocolWriter.Write(purgeQueue);
+ else
+ _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody));
+ }
+ catch (AMQException)
+ {
+ throw;
+ }
+ }
+
/**
* Replays frame on fail over.
*
@@ -784,132 +772,44 @@ namespace Qpid.Client
throw new NotImplementedException(); // FIXME
}
- public void DeleteQueue()
+ public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
{
- throw new NotImplementedException(); // FIXME
+ DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
}
- public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
+ private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
{
- return new MessageConsumerBuilder(this, queueName);
- }
-
- public MessagePublisherBuilder CreatePublisherBuilder()
- {
- return new MessagePublisherBuilder(this);
- }
-
- internal void BasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate,
- AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
- bool disableTimestamps)
- {
- DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
- }
-
- private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
- {
- AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(_channelId, 0, exchangeName,
- routingKey, mandatory, immediate);
-
- long currentTime = 0;
- if (!disableTimestamps)
+ try
{
- currentTime = DateTime.UtcNow.Ticks;
- message.Timestamp = currentTime;
- }
+ _logger.Debug(string.Format("DeleteQueue name={0}", queueName));
+
+ AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0,
+ queueName, // queueName
+ ifUnused, // IfUnUsed
+ ifEmpty, // IfEmpty
+ noWait); // NoWait
- ByteBuffer buf = message.Data;
- byte[] payload = null;
- if (buf != null)
- {
- payload = new byte[buf.Remaining];
- buf.GetBytes(payload);
- }
- BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties;
+ _replayFrames.Add(queueDelete);
- if (timeToLive > 0)
- {
- if (!disableTimestamps)
- {
- contentHeaderProperties.Expiration = currentTime + timeToLive;
- }
- }
- else
- {
- contentHeaderProperties.Expiration = 0;
- }
- contentHeaderProperties.SetDeliveryMode(deliveryMode);
- contentHeaderProperties.Priority = (byte)priority;
-
- ContentBody[] contentBodies = CreateContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
- for (int i = 0; i < contentBodies.Length; i++)
- {
- frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
- }
- if (contentBodies.Length > 0 && _logger.IsDebugEnabled)
- {
- _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ if (noWait)
+ _connection.ProtocolWriter.Write(queueDelete);
+ else
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
}
-
- // weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(_channelId, BASIC_CONTENT_TYPE, 0, contentHeaderProperties,
- (uint)payload.Length);
- if (_logger.IsDebugEnabled)
+ catch (AMQException)
{
- _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ throw;
}
+ }
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-
- lock (_connection.FailoverMutex) {
- _connection.ProtocolWriter.Write(compositeFrame);
- }
+ public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
+ {
+ return new MessageConsumerBuilder(this, queueName);
}
- /// <summary>
- /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- /// maximum frame size.
- /// </summary>
- /// <param name="payload"></param>
- /// <returns>return the array of content bodies</returns>
- private ContentBody[] CreateContentBodies(byte[] payload)
+ public MessagePublisherBuilder CreatePublisherBuilder()
{
- if (payload == null)
- {
- return null;
- }
- else if (payload.Length == 0)
- {
- return new ContentBody[0];
- }
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- long framePayloadMax = Connection.MaximumFrameSize - 1;
- int lastFrame = (payload.Length % framePayloadMax) > 0 ? 1 : 0;
- int frameCount = (int)(payload.Length / framePayloadMax) + lastFrame;
- ContentBody[] bodies = new ContentBody[frameCount];
-
- if (frameCount == 1)
- {
- bodies[0] = new ContentBody();
- bodies[0].Payload = payload;
- }
- else
- {
- long remaining = payload.Length;
- for (int i = 0; i < bodies.Length; i++)
- {
- bodies[i] = new ContentBody();
- byte[] framePayload = new byte[(remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining];
- Array.Copy(payload, (int)framePayloadMax * i, framePayload, 0, framePayload.Length);
- bodies[i].Payload = framePayload;
- remaining -= framePayload.Length;
- }
- }
- return bodies;
+ return new MessagePublisherBuilder(this);
}
public string GenerateUniqueName()
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
index 759ffd62e3..fd430694df 100644
--- a/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
+++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
@@ -21,255 +21,359 @@
using System;
using System.Threading;
using log4net;
+using Qpid.Buffer;
using Qpid.Client.Message;
using Qpid.Messaging;
+using Qpid.Framing;
namespace Qpid.Client
{
- public class BasicMessageProducer : Closeable, IMessagePublisher
- {
- protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
-
- /// <summary>
- /// If true, messages will not get a timestamp.
- /// </summary>
- private bool _disableTimestamps;
-
- /// <summary>
- /// Priority of messages created by this producer.
- /// </summary>
- private int _messagePriority;
-
- /// <summary>
- /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
- ///
- private long _timeToLive;
-
- /// <summary>
- /// Delivery mode used for this producer.
- /// </summary>
- private DeliveryMode _deliveryMode;
-
- private bool _immediate;
- private bool _mandatory;
-
- string _exchangeName;
- string _routingKey;
-
- /// <summary>
- /// Default encoding used for messages produced by this producer.
- /// </summary>
- private string _encoding;
-
- /// <summary>
- /// Default encoding used for message produced by this producer.
- /// </summary>
- private string _mimeType;
-
- /// <summary>
- /// True if this producer was created from a transacted session
- /// </summary>
- private bool _transacted;
-
- private ushort _channelId;
-
- /// <summary>
- /// This is an id generated by the session and is used to tie individual producers to the session. This means we
- /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
- /// to the session so that when an error is propagated to the session it can close the producer (meaning that
- /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
- /// </summary>
- private long _producerId;
-
- /// <summary>
- /// The session used to create this producer
- /// </summary>
- private AmqChannel _channel;
-
- /// <summary>
- /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
- /// </summary>
- protected const bool DEFAULT_IMMEDIATE = false;
-
- /// <summary>
- /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
- /// connected to the exchange for the message
- /// </summary>
- protected const bool DEFAULT_MANDATORY = true;
-
- public BasicMessageProducer(string exchangeName, string routingKey,
- bool transacted,
- ushort channelId,
- AmqChannel channel,
- long producerId,
- DeliveryMode deliveryMode,
- long timeToLive,
- bool immediate,
- bool mandatory,
- int priority)
- {
- _exchangeName = exchangeName;
- _routingKey = routingKey;
- _transacted = transacted;
- _channelId = channelId;
- _channel = channel;
- _producerId = producerId;
- _deliveryMode = deliveryMode;
- _timeToLive = timeToLive;
- _immediate = immediate;
- _mandatory = mandatory;
- _messagePriority = priority;
-
- _channel.RegisterProducer(producerId, this);
- }
-
-
- #region IMessagePublisher Members
-
- public DeliveryMode DeliveryMode
- {
- get
- {
- CheckNotClosed();
- return _deliveryMode;
- }
- set
- {
- CheckNotClosed();
- _deliveryMode = value;
- }
- }
-
- public string ExchangeName
- {
- get { return _exchangeName; }
- }
-
- public string RoutingKey
- {
- get { return _routingKey; }
- }
-
- public bool DisableMessageID
- {
- get
- {
- throw new Exception("The method or operation is not implemented.");
- }
- set
- {
- throw new Exception("The method or operation is not implemented.");
- }
- }
-
- public bool DisableMessageTimestamp
- {
- get
- {
- CheckNotClosed();
- return _disableTimestamps;
- }
- set
- {
- CheckNotClosed();
- _disableTimestamps = value;
- }
- }
-
- public int Priority
- {
- get
- {
- CheckNotClosed();
- return _messagePriority;
- }
- set
- {
- CheckNotClosed();
- if (value < 0 || value > 9)
- {
- throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
- }
- _messagePriority = value;
- }
- }
-
- public override void Close()
- {
- _logger.Debug("Closing producer " + this);
- Interlocked.Exchange(ref _closed, CLOSED);
- _channel.DeregisterProducer(_producerId);
- }
-
- public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
- {
+ public class BasicMessageProducer : Closeable, IMessagePublisher
+ {
+ protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
+
+ /// <summary>
+ /// If true, messages will not get a timestamp.
+ /// </summary>
+ private bool _disableTimestamps;
+
+ /// <summary>
+ /// Priority of messages created by this producer.
+ /// </summary>
+ private int _messagePriority;
+
+ /// <summary>
+ /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+ ///
+ private long _timeToLive;
+
+ /// <summary>
+ /// Delivery mode used for this producer.
+ /// </summary>
+ private DeliveryMode _deliveryMode;
+
+ private bool _immediate;
+ private bool _mandatory;
+
+ string _exchangeName;
+ string _routingKey;
+
+ /// <summary>
+ /// Default encoding used for messages produced by this producer.
+ /// </summary>
+ private string _encoding;
+
+ /// <summary>
+ /// Default encoding used for message produced by this producer.
+ /// </summary>
+ private string _mimeType;
+
+ /// <summary>
+ /// True if this producer was created from a transacted session
+ /// </summary>
+ private bool _transacted;
+
+ private ushort _channelId;
+
+ /// <summary>
+ /// This is an id generated by the session and is used to tie individual producers to the session. This means we
+ /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+ /// to the session so that when an error is propagated to the session it can close the producer (meaning that
+ /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+ /// </summary>
+ private long _producerId;
+
+ /// <summary>
+ /// The session used to create this producer
+ /// </summary>
+ private AmqChannel _channel;
+
+ /// <summary>
+ /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
+ /// </summary>
+ protected const bool DEFAULT_IMMEDIATE = false;
+
+ /// <summary>
+ /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
+ /// connected to the exchange for the message
+ /// </summary>
+ protected const bool DEFAULT_MANDATORY = true;
+
+ public BasicMessageProducer(string exchangeName, string routingKey,
+ bool transacted,
+ ushort channelId,
+ AmqChannel channel,
+ long producerId,
+ DeliveryMode deliveryMode,
+ long timeToLive,
+ bool immediate,
+ bool mandatory,
+ int priority)
+ {
+ _exchangeName = exchangeName;
+ _routingKey = routingKey;
+ _transacted = transacted;
+ _channelId = channelId;
+ _channel = channel;
+ _producerId = producerId;
+ _deliveryMode = deliveryMode;
+ _timeToLive = timeToLive;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _messagePriority = priority;
+
+ _channel.RegisterProducer(producerId, this);
+ }
+
+
+ #region IMessagePublisher Members
+
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
CheckNotClosed();
- SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
- DEFAULT_IMMEDIATE);
- }
-
- public void Send(IMessage msg)
- {
+ return _deliveryMode;
+ }
+ set
+ {
CheckNotClosed();
- SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
- DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
- }
-
- // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
- // to facilitate publishing messages to potentially non-existent recipients.
- public void Send(IMessage msg, bool mandatory)
- {
+ _deliveryMode = value;
+ }
+ }
+
+ public string ExchangeName
+ {
+ get { return _exchangeName; }
+ }
+
+ public string RoutingKey
+ {
+ get { return _routingKey; }
+ }
+
+ public bool DisableMessageID
+ {
+ get
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ set
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ }
+
+ public bool DisableMessageTimestamp
+ {
+ get
+ {
CheckNotClosed();
- SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
- mandatory, DEFAULT_IMMEDIATE);
- }
-
- public long TimeToLive
- {
- get
- {
- CheckNotClosed();
- return _timeToLive;
- }
- set
+ return _disableTimestamps;
+ }
+ set
+ {
+ CheckNotClosed();
+ _disableTimestamps = value;
+ }
+ }
+
+ public int Priority
+ {
+ get
+ {
+ CheckNotClosed();
+ return _messagePriority;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 || value > 9 )
{
- CheckNotClosed();
- if (value < 0)
- {
- throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
- }
- _timeToLive = value;
+ throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
}
- }
-
- #endregion
-
- private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
- {
- _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps);
- }
-
- public string MimeType
- {
- set
+ _messagePriority = value;
+ }
+ }
+
+ public override void Close()
+ {
+ _logger.Debug("Closing producer " + this);
+ Interlocked.Exchange(ref _closed, CLOSED);
+ _channel.DeregisterProducer(_producerId);
+ }
+
+ public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
+ DEFAULT_IMMEDIATE);
+ }
+
+ public void Send(IMessage msg)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+ DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+ }
+
+ // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
+ // to facilitate publishing messages to potentially non-existent recipients.
+ public void Send(IMessage msg, bool mandatory)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+ mandatory, DEFAULT_IMMEDIATE);
+ }
+
+ public long TimeToLive
+ {
+ get
+ {
+ CheckNotClosed();
+ return _timeToLive;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 )
{
- CheckNotClosed();
- _mimeType = value;
+ throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
}
- }
+ _timeToLive = value;
+ }
+ }
- public string Encoding
- {
- set
- {
- CheckNotClosed();
- _encoding = value;
- }
- }
+ #endregion
- public void Dispose()
- {
- Close();
- }
- }
+ public string MimeType
+ {
+ set
+ {
+ CheckNotClosed();
+ _mimeType = value;
+ }
+ }
+
+ public string Encoding
+ {
+ set
+ {
+ CheckNotClosed();
+ _encoding = value;
+ }
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ #region Message Publishing
+
+ private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
+ {
+ // todo: handle session access ticket
+ AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
+ _channel.ChannelId, 0, exchangeName,
+ routingKey, mandatory, immediate
+ );
+
+ // fix message properties
+ if ( !_disableTimestamps )
+ {
+ message.Timestamp = DateTime.UtcNow.Ticks;
+ message.Expiration = message.Timestamp + timeToLive;
+ } else
+ {
+ message.Expiration = 0;
+ }
+ message.DeliveryMode = deliveryMode;
+ message.Priority = (byte)priority;
+
+ ByteBuffer payload = message.Data;
+ int payloadLength = payload.Limit;
+
+ ContentBody[] contentBodies = CreateContentBodies(payload);
+ AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+ for ( int i = 0; i < contentBodies.Length; i++ )
+ {
+ frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+ }
+ if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ // weight argument of zero indicates no child content headers, just bodies
+ AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
+ _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0,
+ message.ContentHeaderProperties, (uint)payloadLength
+ );
+ if ( _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+ lock ( _channel.Connection.FailoverMutex )
+ {
+ _channel.Connection.ProtocolWriter.Write(compositeFrame);
+ }
+ }
+
+
+ /// <summary>
+ /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ /// maximum frame size.
+ /// </summary>
+ /// <param name="payload"></param>
+ /// <returns>return the array of content bodies</returns>
+ private ContentBody[] CreateContentBodies(ByteBuffer payload)
+ {
+ if ( payload == null )
+ {
+ return null;
+ } else if ( payload.Remaining == 0 )
+ {
+ return new ContentBody[0];
+ }
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
+ int frameCount = CalculateContentBodyFrames(payload);
+ ContentBody[] bodies = new ContentBody[frameCount];
+ for ( int i = 0; i < frameCount; i++ )
+ {
+ int length = (payload.Remaining >= framePayloadMax)
+ ? framePayloadMax : payload.Remaining;
+ bodies[i] = new ContentBody(payload, (uint)length);
+ }
+ return bodies;
+ }
+
+ private int CalculateContentBodyFrames(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account
+ // for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ( (payload == null) || (payload.Remaining == 0) )
+ {
+ frameCount = 0;
+ } else
+ {
+ int dataLength = payload.Remaining;
+ int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
+ }
+
+ return frameCount;
+ }
+ #endregion // Message Publishing
+ }
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
index 41e9d2240c..afcbd26781 100644
--- a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
@@ -45,14 +45,12 @@ namespace Qpid.Client.Handler
_logger.Debug(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat));
parameters.FrameMax = frame.FrameMax;
- parameters.FrameMax = 65535;
- //params.setChannelMax(frame.channelMax);
parameters.Heartbeat = frame.Heartbeat;
session.ConnectionTuneParameters = parameters;
stateManager.ChangeState(AMQState.CONNECTION_NOT_OPENED);
session.WriteFrame(ConnectionTuneOkBody.CreateAMQFrame(
- evt.ChannelId, frame.ChannelMax, 65535, frame.Heartbeat));
+ evt.ChannelId, frame.ChannelMax, frame.FrameMax, frame.Heartbeat));
session.WriteFrame(ConnectionOpenBody.CreateAMQFrame(
evt.ChannelId, session.AMQConnection.VirtualHost, null, true));
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
new file mode 100644
index 0000000000..9a67b69834
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class QueueDeleteOkMethodHandler : IStateAwareMethodListener
+ {
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(QueueDeleteOkMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ QueueDeleteOkBody body = (QueueDeleteOkBody)evt.Method;
+ if (body != null)
+ {
+ _logger.InfoFormat("Received Queue.Delete-Ok message, message count {0}", body.MessageCount);
+ }
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
new file mode 100644
index 0000000000..59e7db34ab
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class QueuePurgeOkMethodHandler : IStateAwareMethodListener
+ {
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(QueuePurgeOkMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ QueuePurgeOkBody body = (QueuePurgeOkBody)evt.Method;
+ if (body != null)
+ {
+ _logger.InfoFormat("Received Queue.Purge-Ok message, message count {0}", body.MessageCount);
+ }
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
index 75c4edd67d..a7ee085a04 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
@@ -27,7 +27,7 @@ namespace Qpid.Client.Message
{
public abstract class AbstractQmsMessageFactory : IMessageFactory
{
- public abstract AbstractQmsMessage CreateMessage();
+ public abstract AbstractQmsMessage CreateMessage(string mimeType);
private static readonly ILog _logger = LogManager.GetLogger(typeof (AbstractQmsMessageFactory));
@@ -43,7 +43,7 @@ namespace Qpid.Client.Message
if (bodies != null && bodies.Count == 1)
{
_logger.Debug("Non-fragmented message body (bodySize=" + contentHeader.BodySize +")");
- data = ByteBuffer.Wrap(((ContentBody)bodies[0]).Payload);
+ data = ((ContentBody)bodies[0]).Payload;
}
else
{
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
index 1d2b2db3ca..8e90e852dd 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
@@ -72,24 +72,29 @@ namespace Qpid.Client.Message
#endregion
+ #region Properties
+ //
+ // Properties
+ //
+
+ /// <summary>
+ /// The application message identifier
+ /// </summary>
public string MessageId
{
- get
- {
+ get {
if (ContentHeaderProperties.MessageId == null)
{
ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
}
return ContentHeaderProperties.MessageId;
}
- set
- {
- ContentHeaderProperties.MessageId = value;
- }
-
-
- }
+ set { ContentHeaderProperties.MessageId = value; }
+ }
+ /// <summary>
+ /// The message timestamp
+ /// </summary>
public long Timestamp
{
get
@@ -103,36 +108,22 @@ namespace Qpid.Client.Message
}
}
- protected void CheckReadable()
- {
- if (!_readableMessage)
- {
- throw new MessageNotReadableException("You need to call reset() to make the message readable");
- }
- }
-
+ /// <summary>
+ /// The <see cref="CorrelationId"/> as a byte array.
+ /// </summary>
public byte[] CorrelationIdAsBytes
{
- get
- {
- return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId);
- }
- set
- {
- ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value);
- }
+ get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); }
+ set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); }
}
+ /// <summary>
+ /// The application correlation identifier
+ /// </summary>
public string CorrelationId
{
- get
- {
- return ContentHeaderProperties.CorrelationId;
- }
- set
- {
- ContentHeaderProperties.ContentType = value;
- }
+ get { return ContentHeaderProperties.CorrelationId; }
+ set { ContentHeaderProperties.CorrelationId = value; }
}
struct Dest
@@ -147,6 +138,9 @@ namespace Qpid.Client.Message
}
}
+ /// <summary>
+ /// Exchange name of the reply-to address
+ /// </summary>
public string ReplyToExchangeName
{
get
@@ -162,6 +156,9 @@ namespace Qpid.Client.Message
}
}
+ /// <summary>
+ /// Routing key of the reply-to address
+ /// </summary>
public string ReplyToRoutingKey
{
get
@@ -177,50 +174,11 @@ namespace Qpid.Client.Message
}
}
- /// <summary>
- /// Decodes the replyto field if one is set.
- ///
- /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
- /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
- /// empty the replyto field is expected to being with ':'.
- ///
- /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
- /// </summary>
- ///
- /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
- private Dest ReadReplyToHeader()
- {
- string replyToEncoding = ContentHeaderProperties.ReplyTo;
-
- if (replyToEncoding == null)
- {
- return new Dest();
- }
- else
- {
- // Split the replyto field on a ':'
- string[] split = replyToEncoding.Split(':');
- // Ensure that the replyto field argument only consisted of two parts.
- if (split.Length != 2)
- {
- throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
- }
-
- // Extract the exchange name and routing key from the split replyto field.
- string exchangeName = split[0];
- string routingKey = split[1];
-
- return new Dest(exchangeName, routingKey);
- }
- }
-
- private void WriteReplyToHeader(Dest dest)
- {
- string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
- ContentHeaderProperties.ReplyTo = encodedDestination;
- }
+ /// <summary>
+ /// Non-persistent (1) or persistent (2)
+ /// </summary>
public DeliveryMode DeliveryMode
{
get
@@ -242,100 +200,94 @@ namespace Qpid.Client.Message
}
}
+ /// <summary>
+ /// True, if this is a redelivered message
+ /// </summary>
public bool Redelivered
{
- get
- {
- return _redelivered;
- }
- set
- {
- _redelivered = value;
- }
- }
+ get { return _redelivered; }
+ set { _redelivered = value; }
+ }
+ /// <summary>
+ /// The message type name
+ /// </summary>
public string Type
{
- get
- {
- return MimeType;
- }
- set
- {
- //MimeType = value;
- }
+ get { return ContentHeaderProperties.Type; }
+ set { ContentHeaderProperties.Type = value; }
}
-
+
+ /// <summary>
+ /// Message expiration specification
+ /// </summary>
public long Expiration
{
- get
- {
- return ContentHeaderProperties.Expiration;
- }
- set
- {
- ContentHeaderProperties.Expiration = value;
- }
+ get { return ContentHeaderProperties.Expiration; }
+ set { ContentHeaderProperties.Expiration = value; }
}
- public int Priority
+ /// <summary>
+ /// The message priority, 0 to 9
+ /// </summary>
+ public byte Priority
{
- get
- {
- return ContentHeaderProperties.Priority;
- }
- set
- {
- ContentHeaderProperties.Priority = (byte) value;
- }
+ get { return ContentHeaderProperties.Priority; }
+ set { ContentHeaderProperties.Priority = (byte) value; }
}
- // FIXME: implement
+ /// <summary>
+ /// The MIME Content Type
+ /// </summary>
public string ContentType
{
- get { throw new NotImplementedException(); }
- set { throw new NotImplementedException(); }
+ get { return ContentHeaderProperties.ContentType; }
+ set { ContentHeaderProperties.ContentType = value; }
}
- // FIXME: implement
+ /// <summary>
+ /// The MIME Content Encoding
+ /// </summary>
public string ContentEncoding
{
- get { throw new NotImplementedException(); }
- set { throw new NotImplementedException(); }
- }
-
- public void Acknowledge()
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_channel != null)
- {
- // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
- // received on the session
- _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
- }
-
+ get { return ContentHeaderProperties.Encoding; }
+ set { ContentHeaderProperties.Encoding = value; }
}
+ /// <summary>
+ /// Headers of this message
+ /// </summary>
public IHeaders Headers
{
get { return _headers; }
}
- public abstract void ClearBodyImpl();
+ /// <summary>
+ /// The creating user id
+ /// </summary>
+ public string UserId
+ {
+ get { return ContentHeaderProperties.UserId; }
+ set { ContentHeaderProperties.UserId = value; }
+ }
- public void ClearBody()
+ /// <summary>
+ /// The creating application id
+ /// </summary>
+ public string AppId
{
- ClearBodyImpl();
- _readableMessage = false;
+ get { return ContentHeaderProperties.AppId; }
+ set { ContentHeaderProperties.AppId = value; }
}
/// <summary>
- /// Get a String representation of the body of the message. Used in the
- /// toString() method which outputs this before message properties.
+ /// Intra-cluster routing identifier
/// </summary>
- /// <exception cref="QpidException"></exception>
- public abstract string ToBodyString();
+ public string ClusterId
+ {
+ get { return ContentHeaderProperties.ClusterId; }
+ set { ContentHeaderProperties.ClusterId = value; }
+ }
/// <summary>
/// Return the raw byte array that is used to populate the frame when sending
@@ -367,12 +319,37 @@ namespace Qpid.Client.Message
_data = value;
}
}
+ #endregion // Properties
+
- public abstract string MimeType
+ public void Acknowledge()
{
- get;
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_channel != null)
+ {
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
+ }
+
}
+ public abstract void ClearBodyImpl();
+
+ public void ClearBody()
+ {
+ ClearBodyImpl();
+ _readableMessage = false;
+ }
+
+ /// <summary>
+ /// Get a String representation of the body of the message. Used in the
+ /// toString() method which outputs this before message properties.
+ /// </summary>
+ /// <exception cref="QpidException"></exception>
+ public abstract string ToBodyString();
+
public override string ToString()
{
try
@@ -403,18 +380,6 @@ namespace Qpid.Client.Message
}
}
- public IFieldTable UnderlyingMessagePropertiesMap
- {
- get
- {
- return ContentHeaderProperties.Headers;
- }
- set
- {
- ContentHeaderProperties.Headers = (FieldTable)value;
- }
- }
-
public FieldTable PopulateHeadersFromMessageProperties()
{
if (ContentHeaderProperties.Headers == null)
@@ -466,5 +431,56 @@ namespace Qpid.Client.Message
{
get { return !_readableMessage; }
}
+
+ protected void CheckReadable()
+ {
+ if ( !_readableMessage )
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
+
+ /// <summary>
+ /// Decodes the replyto field if one is set.
+ ///
+ /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
+ /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
+ /// empty the replyto field is expected to being with ':'.
+ ///
+ /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
+ /// </summary>
+ ///
+ /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
+ private Dest ReadReplyToHeader()
+ {
+ string replyToEncoding = ContentHeaderProperties.ReplyTo;
+
+ if ( replyToEncoding == null )
+ {
+ return new Dest();
+ } else
+ {
+ // Split the replyto field on a ':'
+ string[] split = replyToEncoding.Split(':');
+
+ // Ensure that the replyto field argument only consisted of two parts.
+ if ( split.Length != 2 )
+ {
+ throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
+ }
+
+ // Extract the exchange name and routing key from the split replyto field.
+ string exchangeName = split[0];
+ string routingKey = split[1];
+
+ return new Dest(exchangeName, routingKey);
+ }
+ }
+
+ private void WriteReplyToHeader(Dest dest)
+ {
+ string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
+ ContentHeaderProperties.ReplyTo = encodedDestination;
+ }
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
index 4a109b128e..cffc585067 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
@@ -28,11 +28,12 @@ namespace Qpid.Client.Message
/// <summary>
/// Create a message
/// </summary>
- /// <param name="messageNbr"></param>
- /// <param name="redelivered"></param>
- /// <param name="contentHeader"></param>
- /// <param name="bodies"></param>
- /// <returns></returns>
+ /// <param name="deliverTag">Delivery Tag</param>
+ /// <param name="messageNbr">Message Sequence Number</param>
+ /// <param name="redelivered">True if this is a redelivered message</param>
+ /// <param name="contentHeader">Content headers</param>
+ /// <param name="bodies">Message bodies</param>
+ /// <returns>The new message</returns>
/// <exception cref="QpidMessagingException">if the message cannot be created</exception>
AbstractQmsMessage CreateMessage(long deliverTag, bool redelivered,
ContentHeaderBody contentHeader,
@@ -41,9 +42,10 @@ namespace Qpid.Client.Message
/// <summary>
/// Creates the message.
/// </summary>
- /// <returns></returns>
+ /// <param name="mimeType">Mime type to associate the new message with</param>
+ /// <returns>The new message</returns>
/// <exception cref="QpidMessagingException">if the message cannot be created</exception>
- AbstractQmsMessage CreateMessage();
+ AbstractQmsMessage CreateMessage(string mimeType);
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
index 95257cef8a..f854a541fc 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
@@ -25,93 +25,104 @@ using Qpid.Messaging;
namespace Qpid.Client.Message
{
- public class MessageFactoryRegistry
- {
- private readonly Hashtable _mimeToFactoryMap = new Hashtable();
+ public class MessageFactoryRegistry
+ {
+ private readonly Hashtable _mimeToFactoryMap = new Hashtable();
+ private IMessageFactory _defaultFactory;
- public void RegisterFactory(string mimeType, IMessageFactory mf)
- {
- if (mf == null)
- {
- throw new ArgumentNullException("Message factory");
- }
- if (mimeType == null)
- {
- throw new ArgumentNullException("mf");
- }
- _mimeToFactoryMap[mimeType] = mf;
- }
+ /// <summary>
+ /// Default factory to use for unknown message types
+ /// </summary>
+ public IMessageFactory DefaultFactory
+ {
+ get { return _defaultFactory; }
+ set { _defaultFactory = value; }
+ }
- public void DeregisterFactory(string mimeType)
- {
- _mimeToFactoryMap.Remove(mimeType);
- }
+ /// <summary>
+ /// Register a new message factory for a MIME type
+ /// </summary>
+ /// <param name="mimeType">Mime type to register</param>
+ /// <param name="mf"></param>
+ public void RegisterFactory(string mimeType, IMessageFactory mf)
+ {
+ if ( mf == null )
+ throw new ArgumentNullException("mf");
+ if ( mimeType == null || mimeType.Length == 0 )
+ throw new ArgumentNullException("mimeType");
- /// <summary>
- /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate
- /// concrete message type.
- /// </summary>
- /// <param name="messageNbr">the AMQ message id</param>
- /// <param name="redelivered">true if redelivered</param>
- /// <param name="contentHeader">the content header that was received</param>
- /// <param name="bodies">a list of ContentBody instances</param>
- /// <returns>the message.</returns>
- /// <exception cref="AMQException"/>
- /// <exception cref="QpidException"/>
- public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
- ContentHeaderBody contentHeader,
- IList bodies)
- {
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.Properties;
+ _mimeToFactoryMap[mimeType] = mf;
+ }
- if (properties.ContentType == null)
- {
- properties.ContentType = "";
- }
+ /// <summary>
+ /// Remove a message factory
+ /// </summary>
+ /// <param name="mimeType">MIME type to unregister</param>
+ public void DeregisterFactory(string mimeType)
+ {
+ _mimeToFactoryMap.Remove(mimeType);
+ }
- IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[properties.ContentType];
- if (mf == null)
- {
- throw new AMQException("Unsupport MIME type of " + properties.ContentType);
- }
- else
- {
- return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies);
- }
- }
+ /// <summary>
+ /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+ /// concrete message type.
+ /// </summary>
+ /// <param name="messageNbr">the AMQ message id</param>
+ /// <param name="redelivered">true if redelivered</param>
+ /// <param name="contentHeader">the content header that was received</param>
+ /// <param name="bodies">a list of ContentBody instances</param>
+ /// <returns>the message.</returns>
+ /// <exception cref="AMQException"/>
+ /// <exception cref="QpidException"/>
+ public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties)contentHeader.Properties;
- public AbstractQmsMessage CreateMessage(string mimeType)
- {
- if (mimeType == null)
- {
- throw new ArgumentNullException("Mime type must not be null");
- }
- IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[mimeType];
- if (mf == null)
- {
- throw new AMQException("Unsupport MIME type of " + mimeType);
- }
- else
- {
- return mf.CreateMessage();
- }
- }
+ if ( properties.ContentType == null )
+ {
+ properties.ContentType = "";
+ }
- /// <summary>
- /// Construct a new registry with the default message factories registered
- /// </summary>
- /// <returns>a message factory registry</returns>
- public static MessageFactoryRegistry NewDefaultRegistry()
- {
- MessageFactoryRegistry mf = new MessageFactoryRegistry();
- mf.RegisterFactory("text/plain", new QpidTextMessageFactory());
- mf.RegisterFactory("text/xml", new QpidTextMessageFactory());
- mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory());
- // TODO: use bytes message for default message factory
- // MJA - just added this bit back in...
- mf.RegisterFactory("", new QpidBytesMessageFactory());
- return mf;
- }
- }
+ IMessageFactory mf = GetFactory(properties.ContentType);
+ return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies);
+ }
+
+ /// <summary>
+ /// Create a new message of the specified type
+ /// </summary>
+ /// <param name="mimeType">The Mime type</param>
+ /// <returns>The new message</returns>
+ public AbstractQmsMessage CreateMessage(string mimeType)
+ {
+ if ( mimeType == null || mimeType.Length == 0 )
+ throw new ArgumentNullException("mimeType");
+
+ IMessageFactory mf = GetFactory(mimeType);
+ return mf.CreateMessage(mimeType);
+ }
+
+ /// <summary>
+ /// Construct a new registry with the default message factories registered
+ /// </summary>
+ /// <returns>a message factory registry</returns>
+ public static MessageFactoryRegistry NewDefaultRegistry()
+ {
+ MessageFactoryRegistry mf = new MessageFactoryRegistry();
+ mf.RegisterFactory("text/plain", new QpidTextMessageFactory());
+ mf.RegisterFactory("text/xml", new QpidTextMessageFactory());
+ mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory());
+
+ mf.DefaultFactory = new QpidBytesMessageFactory();
+ return mf;
+ }
+
+ private IMessageFactory GetFactory(string mimeType)
+ {
+ IMessageFactory mf = (IMessageFactory)_mimeToFactoryMap[mimeType];
+ return mf != null ? mf : _defaultFactory;
+ }
+ }
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
index 32e47d852a..cb504d1378 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
@@ -43,8 +43,6 @@ namespace Qpid.Client.Message
public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage
{
- private const string MIME_TYPE = "application/octet-stream";
-
private const int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
public QpidBytesMessage() : this(null)
@@ -59,7 +57,6 @@ namespace Qpid.Client.Message
QpidBytesMessage(ByteBuffer data) : base(data)
{
// superclass constructor has instantiated a content header at this point
- ContentHeaderProperties.ContentType = MIME_TYPE;
if (data == null)
{
_data = ByteBuffer.Allocate(DEFAULT_BUFFER_INITIAL_SIZE);
@@ -71,7 +68,6 @@ namespace Qpid.Client.Message
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
: base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data)
{
- ContentHeaderProperties.ContentType = MIME_TYPE;
}
public override void ClearBodyImpl()
@@ -116,14 +112,6 @@ namespace Qpid.Client.Message
}
}
- public override string MimeType
- {
- get
- {
- return MIME_TYPE;
- }
- }
-
public long BodyLength
{
get
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
index de4c6675c7..e96c38cbac 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
@@ -62,9 +62,11 @@ namespace Qpid.Client.Message
return new QpidBytesMessage(deliveryTag, contentHeader, data);
}
- public override AbstractQmsMessage CreateMessage()
+ public override AbstractQmsMessage CreateMessage(string mimeType)
{
- return new QpidBytesMessage();
+ QpidBytesMessage msg = new QpidBytesMessage();
+ msg.ContentType = mimeType;
+ return msg;
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
index 6538fcbefc..a258c82d15 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
@@ -28,16 +28,10 @@ namespace Qpid.Client.Message
_headers.Clear();
}
- public string this[string name]
+ public object this[string name]
{
- get
- {
- return GetString(name);
- }
- set
- {
- SetString(name, value);
- }
+ get { return GetObject(name); }
+ set { SetObject(name, value); }
}
public bool GetBoolean(string name)
@@ -167,6 +161,18 @@ namespace Qpid.Client.Message
_headers.SetString(propertyName, value);
}
+ public object GetObject(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ return _headers[propertyName];
+ }
+
+ public void SetObject(string propertyName, object value)
+ {
+ CheckPropertyName(propertyName);
+ _headers[propertyName] = value;
+ }
+
private static void CheckPropertyName(string propertyName)
{
if ( propertyName == null )
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
index cff42f1df5..ae8bdb2074 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
@@ -28,25 +28,22 @@ namespace Qpid.Client.Message
{
public class QpidTextMessage : AbstractQmsMessage, ITextMessage
{
- private const string MIME_TYPE = "text/plain";
-
private string _decodedValue = null;
+ private static Encoding DEFAULT_ENCODING = Encoding.UTF8;
internal QpidTextMessage() : this(null, null)
{
+ ContentEncoding = DEFAULT_ENCODING.BodyName;
}
- QpidTextMessage(ByteBuffer data, String encoding) : base(data)
+ internal QpidTextMessage(ByteBuffer data, String encoding) : base(data)
{
- ContentHeaderProperties.ContentType = MIME_TYPE;
- ContentHeaderProperties.Encoding = encoding;
+ ContentEncoding = encoding;
}
internal QpidTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
:base(deliveryTag, contentHeader, data)
{
- contentHeader.ContentType = MIME_TYPE;
- _data = data; // FIXME: Unnecessary - done in base class ctor.
}
public override void ClearBodyImpl()
@@ -64,14 +61,6 @@ namespace Qpid.Client.Message
return Text;
}
- public override string MimeType
- {
- get
- {
- return MIME_TYPE;
- }
- }
-
public string Text
{
get
@@ -100,7 +89,7 @@ namespace Qpid.Client.Message
}
else
{
- _decodedValue = Encoding.Default.GetString(bytes);
+ _decodedValue = DEFAULT_ENCODING.GetString(bytes);
}
return _decodedValue;
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
index cc4f6dafe1..4730fa56ad 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
@@ -25,9 +25,11 @@ namespace Qpid.Client.Message
{
public class QpidTextMessageFactory : AbstractQmsMessageFactory
{
- public override AbstractQmsMessage CreateMessage()
+ public override AbstractQmsMessage CreateMessage(string mimeType)
{
- return new QpidTextMessage();
+ QpidTextMessage msg = new QpidTextMessage();
+ msg.ContentType = mimeType;
+ return msg;
}
protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader)
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
index cb4e64718b..b64c8e1c27 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
@@ -43,7 +43,7 @@ namespace Qpid.Client.Message
Bodies.Add(body);
if (body.Payload != null)
{
- _bytesReceived += (uint)body.Payload.Length;
+ _bytesReceived += (uint)body.Payload.Remaining;
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs b/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
index 914170467a..d88683f7d5 100644
--- a/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
+++ b/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
@@ -171,19 +171,10 @@ namespace Qpid.Client
sb.Append('?');
foreach (String key in options.Keys)
{
- sb.Append(key);
-
- sb.Append("='");
-
- sb.Append(options[key]);
-
- sb.Append("'");
- sb.Append(DEFAULT_OPTION_SEPERATOR);
+ sb.AppendFormat("{0}='{1}'{2}", key, options[key], DEFAULT_OPTION_SEPERATOR);
}
sb.Remove(sb.Length - 1, 1);
- // sb.deleteCharAt(sb.length() - 1);
-
return sb.ToString();
}
}
@@ -358,9 +349,10 @@ namespace Qpid.Client
public class QpidConnectionInfo : IConnectionInfo
{
+ const string DEFAULT_VHOST = "/";
string _username = "guest";
string _password = "guest";
- string _virtualHost = "/";
+ string _virtualHost = DEFAULT_VHOST;
string _failoverMethod = null;
IDictionary _failoverOptions = new Hashtable();
@@ -385,15 +377,51 @@ namespace Qpid.Client
public string AsUrl()
{
- string result = "amqp://";
- foreach (IBrokerInfo info in _brokerInfos)
+ StringBuilder sb = new StringBuilder();
+ sb.AppendFormat("{0}://", ConnectionUrlConstants.AMQ_PROTOCOL);
+
+ if (_username != null)
{
- result += info.ToString();
+ sb.Append(_username);
+ if (_password != null)
+ {
+ sb.AppendFormat(":{0}", _password);
+ }
+ sb.Append("@");
}
- return result;
+ sb.Append(_clientName);
+ sb.Append(_virtualHost);
+ sb.Append(OptionsToString());
+
+ return sb.ToString();
}
+ private String OptionsToString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.AppendFormat("?{0}='", ConnectionUrlConstants.OPTIONS_BROKERLIST);
+
+ foreach (IBrokerInfo broker in _brokerInfos)
+ {
+ sb.AppendFormat("{0};", broker);
+ }
+
+ sb.Remove(sb.Length - 1, 1);
+ sb.Append("'");
+
+ if (_failoverMethod != null)
+ {
+ sb.AppendFormat("{0}{1}='{2}{3}'", URLHelper.DEFAULT_OPTION_SEPERATOR,
+ ConnectionUrlConstants.OPTIONS_FAILOVER,
+ _failoverMethod,
+ URLHelper.printOptions((Hashtable)_failoverOptions));
+ }
+
+ return sb.ToString();
+ }
+
+
public string FailoverMethod
{
get { return _failoverMethod; }
@@ -449,7 +477,13 @@ namespace Qpid.Client
public string VirtualHost
{
get { return _virtualHost; }
- set { _virtualHost = value; }
+ set {
+ _virtualHost = value;
+ if ( _virtualHost == null || _virtualHost.Length == 0 )
+ _virtualHost = DEFAULT_VHOST;
+ if ( _virtualHost[0] != '/' )
+ _virtualHost = '/' + _virtualHost;
+ }
}
public string GetOption(string key)
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
index 7be17a1080..1233f9d836 100644
--- a/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
+++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
@@ -65,6 +65,8 @@ namespace Qpid.Client.State
IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler();
IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler();
IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler();
+ IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler();
+ IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler();
// We need to register a map for the null (i.e. all state) handlers otherwise you get
// a stack overflow in the handler searching code when you present it with a frame for which
@@ -96,6 +98,8 @@ namespace Qpid.Client.State
open[typeof(ConnectionCloseBody)] = connectionClose;
open[typeof(BasicDeliverBody)] = basicDeliver;
open[typeof(BasicReturnBody)] = basicReturn;
+ open[typeof(QueueDeleteOkBody)] = queueDeleteOk;
+ open[typeof(QueuePurgeOkBody)] = queuePurgeOk;
_state2HandlersMap[AMQState.CONNECTION_OPEN] = open;
}
{
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
index ca7ffad8b3..4e4ca03322 100644
--- a/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
@@ -33,35 +33,39 @@ namespace Qpid.Client.Transport
// Warning: don't use this log for regular logging.
static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
- IByteChannel byteChannel;
- IProtocolEncoder encoder;
- IProtocolDecoder decoder;
+ IByteChannel _byteChannel;
+ IProtocolEncoder _encoder;
+ IProtocolDecoder _decoder;
+ IProtocolDecoderOutput _decoderOutput;
+ private object _syncLock;
- public AmqpChannel(IByteChannel byteChannel)
+ public AmqpChannel(IByteChannel byteChannel, IProtocolDecoderOutput decoderOutput)
{
- this.byteChannel = byteChannel;
+ _byteChannel = byteChannel;
+ _decoderOutput = decoderOutput;
+ _syncLock = new object();
AMQProtocolProvider protocolProvider = new AMQProtocolProvider();
IProtocolCodecFactory factory = protocolProvider.CodecFactory;
- encoder = factory.Encoder;
- decoder = factory.Decoder;
+ _encoder = factory.Encoder;
+ _decoder = factory.Decoder;
}
- public Queue Read()
+ public void Read()
{
- ByteBuffer buffer = byteChannel.Read();
- return DecodeAndTrace(buffer);
+ ByteBuffer buffer = _byteChannel.Read();
+ Decode(buffer);
}
public IAsyncResult BeginRead(AsyncCallback callback, object state)
{
- return byteChannel.BeginRead(callback, state);
+ return _byteChannel.BeginRead(callback, state);
}
- public Queue EndRead(IAsyncResult result)
+ public void EndRead(IAsyncResult result)
{
- ByteBuffer buffer = byteChannel.EndRead(result);
- return DecodeAndTrace(buffer);
+ ByteBuffer buffer = _byteChannel.EndRead(result);
+ Decode(buffer);
}
public void Write(IDataBlock o)
@@ -74,43 +78,32 @@ namespace Qpid.Client.Transport
// we should be doing an async write, but apparently
// the mentalis library doesn't queue async read/writes
// correctly and throws random IOException's. Stay sync for a while
- //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
- byteChannel.Write(Encode(o));
+ //_byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
+ _byteChannel.Write(Encode(o));
}
private void OnAsyncWriteDone(IAsyncResult result)
{
- byteChannel.EndWrite(result);
+ _byteChannel.EndWrite(result);
}
- private Queue DecodeAndTrace(ByteBuffer buffer)
+ private void Decode(ByteBuffer buffer)
{
- Queue frames = Decode(buffer);
-
- // TODO: Refactor to decorator.
- if ( _protocolTraceLog.IsDebugEnabled )
+ // make sure we don't try to decode more than
+ // one buffer at the same time
+ lock ( _syncLock )
{
- foreach ( object o in frames )
- {
- _protocolTraceLog.Debug(String.Format("READ {0}", o));
- }
+ _decoder.Decode(buffer, _decoderOutput);
}
- return frames;
}
private ByteBuffer Encode(object o)
{
SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput();
- encoder.Encode(o, output);
+ _encoder.Encode(o, output);
return output.buffer;
}
- private Queue Decode(ByteBuffer byteBuffer)
- {
- SimpleProtocolDecoderOutput outx = new SimpleProtocolDecoderOutput();
- decoder.Decode(byteBuffer, outx);
- return outx.MessageQueue;
- }
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
index 0379e582d6..e4d4d2ed29 100644
--- a/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
@@ -25,8 +25,8 @@ namespace Qpid.Client.Transport
{
public interface IProtocolChannel : IProtocolWriter
{
- Queue Read();
+ void Read();
IAsyncResult BeginRead(AsyncCallback callback, object state);
- Queue EndRead(IAsyncResult result);
+ void EndRead(IAsyncResult result);
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs b/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs
new file mode 100644
index 0000000000..07df62ea84
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Qpid.Client.Protocol;
+using Qpid.Codec;
+using Qpid.Framing;
+using log4net;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// <see cref="IProtocolDecoderOutput"/> implementation that forwards
+ /// each <see cref="IDataBlock"/> as it is decoded to the
+ /// protocol listener
+ /// </summary>
+ internal class ProtocolDecoderOutput : IProtocolDecoderOutput
+ {
+ private IProtocolListener _protocolListener;
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
+
+ public ProtocolDecoderOutput(IProtocolListener protocolListener)
+ {
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ _protocolListener = protocolListener;
+ }
+
+ public void Write(object message)
+ {
+ IDataBlock block = message as IDataBlock;
+ if ( block != null )
+ {
+ _protocolTraceLog.Debug(String.Format("READ {0}", block));
+ _protocolListener.OnMessage(block);
+ }
+ }
+ }
+} // namespace Qpid.Client.Transport
+
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
index 1fb07fb245..2895c75431 100644
--- a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
@@ -24,6 +24,7 @@ using System.IO;
using System.Threading;
using Qpid.Client.Qms;
using Qpid.Client.Protocol;
+using Qpid.Codec;
using Qpid.Framing;
namespace Qpid.Client.Transport.Socket.Blocking
@@ -66,7 +67,11 @@ namespace Qpid.Client.Transport.Socket.Blocking
_ioHandler = MakeBrokerConnection(broker, connection);
// todo: get default read size from config!
- _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+ IProtocolDecoderOutput decoderOutput =
+ new ProtocolDecoderOutput(_protocolListener);
+ _amqpChannel =
+ new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput);
+
// post an initial async read
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
}
@@ -117,22 +122,28 @@ namespace Qpid.Client.Transport.Socket.Blocking
{
try
{
- Queue frames = _amqpChannel.EndRead(result);
-
- // process results
- foreach ( IDataBlock dataBlock in frames )
- {
- _protocolListener.OnMessage(dataBlock);
- }
- // if we're not stopping, post a read again
+ _amqpChannel.EndRead(result);
+
bool stopping = _stopEvent.WaitOne(0, false);
if ( !stopping )
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
} catch ( Exception e )
{
- _protocolListener.OnException(e);
+ // ignore any errors during closing
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _protocolListener.OnException(e);
}
}
+
+ #region IProtocolDecoderOutput Members
+
+ public void Write(object message)
+ {
+ _protocolListener.OnMessage((IDataBlock)message);
+ }
+
+ #endregion
}
}
diff --git a/qpid/dotnet/Qpid.Client/Qpid.Client.csproj b/qpid/dotnet/Qpid.Client/Qpid.Client.csproj
index 2bafeb23f6..7ac6f94800 100644
--- a/qpid/dotnet/Qpid.Client/Qpid.Client.csproj
+++ b/qpid/dotnet/Qpid.Client/Qpid.Client.csproj
@@ -50,6 +50,8 @@
<Compile Include="Client\AMQNoConsumersException.cs" />
<Compile Include="Client\AMQNoRouteException.cs" />
<Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
+ <Compile Include="Client\Handler\QueueDeleteOkMethodHandler.cs" />
+ <Compile Include="Client\Handler\QueuePurgeOkMethodHandler.cs" />
<Compile Include="Client\SslOptions.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
@@ -110,6 +112,7 @@
<Compile Include="Client\Transport\IProtocolChannel.cs" />
<Compile Include="Client\Transport\IProtocolWriter.cs" />
<Compile Include="Client\Transport\ITransport.cs" />
+ <Compile Include="Client\Transport\ProtocolDecoderOutput.cs" />
<Compile Include="Client\Transport\SingleProtocolEncoderOutput.cs" />
<Compile Include="Client\Transport\Socket\Blocking\BlockingSocketTransport.cs" />
<Compile Include="Client\Transport\Socket\Blocking\ByteChannel.cs" />
diff --git a/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs b/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs
index 72c56e0b17..3d454b284b 100644
--- a/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs
+++ b/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs
@@ -19,121 +19,134 @@
*
*/
using System;
+using log4net;
using Qpid.Buffer;
namespace Qpid.Codec
{
- public abstract class CumulativeProtocolDecoder : IProtocolDecoder
- {
- ByteBuffer _remaining;
+ public abstract class CumulativeProtocolDecoder : IProtocolDecoder
+ {
+ static ILog _logger = LogManager.GetLogger(typeof(CumulativeProtocolDecoder));
- /// <summary>
- /// Creates a new instance with the 4096 bytes initial capacity of
- /// cumulative buffer.
- /// </summary>
- protected CumulativeProtocolDecoder()
- {
- _remaining = ByteBuffer.Allocate(4096);
- _remaining.IsAutoExpand = true;
- }
+ ByteBuffer _remaining;
- /// <summary>
- /// Cumulates content of <tt>in</tt> into internal buffer and forwards
- /// decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
- /// <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
- /// and the cumulative buffer is compacted after decoding ends.
- /// </summary>
- /// <exception cref="Exception">
- /// if your <tt>doDecode()</tt> returned <tt>true</tt> not consuming the cumulative buffer.
- /// </exception>
- public void Decode(ByteBuffer input, IProtocolDecoderOutput output)
- {
- if (_remaining.Position != 0) // If there were remaining undecoded bytes
- {
- DecodeRemainingAndInput(input, output);
- }
- else
- {
- DecodeInput(input, output);
- }
- }
+ /// <summary>
+ /// Creates a new instance with the 4096 bytes initial capacity of
+ /// cumulative buffer.
+ /// </summary>
+ protected CumulativeProtocolDecoder()
+ {
+ _remaining = AllocateBuffer();
+ }
- private void DecodeInput(ByteBuffer input, IProtocolDecoderOutput output)
- {
- // Just decode the input buffer and remember any remaining undecoded bytes.
- try
- {
- DecodeAll(input, output);
- }
- finally
+ /// <summary>
+ /// Cumulates content of <tt>in</tt> into internal buffer and forwards
+ /// decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ /// <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ /// and the cumulative buffer is compacted after decoding ends.
+ /// </summary>
+ /// <exception cref="Exception">
+ /// if your <tt>doDecode()</tt> returned <tt>true</tt> not consuming the cumulative buffer.
+ /// </exception>
+ public void Decode(ByteBuffer input, IProtocolDecoderOutput output)
+ {
+ if ( _remaining.Position != 0 ) // If there were remaining undecoded bytes
+ {
+ DecodeRemainingAndInput(input, output);
+ } else
+ {
+ DecodeInput(input, output);
+ }
+ }
+
+ private void DecodeInput(ByteBuffer input, IProtocolDecoderOutput output)
+ {
+ _logger.Debug(string.Format("DecodeInput: input {0}", input.Remaining));
+ // Just decode the input buffer and remember any remaining undecoded bytes.
+ try
+ {
+ DecodeAll(input, output);
+ } finally
+ {
+ if ( input.HasRemaining )
{
- if (input.HasRemaining)
- {
- _remaining.Put(input);
- }
+ _remaining.Put(input);
}
- }
+ }
+ }
- private void DecodeRemainingAndInput(ByteBuffer input, IProtocolDecoderOutput output)
- {
- // Concatenate input buffer with left-over bytes.
- _remaining.Put(input);
- _remaining.Flip();
+ private void DecodeRemainingAndInput(ByteBuffer input, IProtocolDecoderOutput output)
+ {
+ _logger.Debug(string.Format("DecodeRemainingAndInput: input {0}, remaining {1}", input.Remaining, _remaining.Position));
+ // replace the _remainder buffer, so that we can leave the
+ // original one alone. Necessary because some consumer splice
+ // the buffer and only consume it until later, causing
+ // a race condition if we compact it too soon.
+ ByteBuffer newRemainding = AllocateBuffer();
+ ByteBuffer temp = _remaining;
+ _remaining = newRemainding;
+ temp.Put(input);
+ temp.Flip();
+ try
+ {
+ DecodeAll(temp, output);
+ } finally
+ {
+ if ( temp.Remaining > 0 )
+ _remaining.Put(temp);
+ }
+ }
- try
- {
- DecodeAll(_remaining, output);
- }
- finally
+ private void DecodeAll(ByteBuffer buf, IProtocolDecoderOutput output)
+ {
+ for ( ; ; )
+ {
+ int oldPos = buf.Position;
+ bool decoded = DoDecode(buf, output);
+ if ( decoded )
{
- _remaining.Compact();
- }
- }
+ if ( buf.Position == oldPos )
+ {
+ throw new Exception(
+ "doDecode() can't return true when buffer is not consumed.");
+ }
- private void DecodeAll(ByteBuffer buf, IProtocolDecoderOutput output)
- {
- for (;;)
+ if ( !buf.HasRemaining )
+ {
+ break;
+ }
+ } else
{
- int oldPos = buf.Position;
- bool decoded = DoDecode(buf, output);
- if (decoded)
- {
- if (buf.Position == oldPos)
- {
- throw new Exception(
- "doDecode() can't return true when buffer is not consumed.");
- }
-
- if (!buf.HasRemaining)
- {
- break;
- }
- }
- else
- {
- break;
- }
+ break;
}
- }
+ }
+ }
+
+ /// <summary>
+ /// Implement this method to consume the specified cumulative buffer and
+ /// decode its content into message(s).
+ /// </summary>
+ /// <param name="input">the cumulative buffer</param>
+ /// <param name="output">decoder output</param>
+ /// <returns>
+ /// <tt>true</tt> if and only if there's more to decode in the buffer
+ /// and you want to have <tt>doDecode</tt> method invoked again.
+ /// Return <tt>false</tt> if remaining data is not enough to decode,
+ /// then this method will be invoked again when more data is cumulated.
+ /// </returns>
+ /// <exception cref="Exception">If cannot decode</exception>
+ protected abstract bool DoDecode(ByteBuffer input, IProtocolDecoderOutput output);
- /// <summary>
- /// Implement this method to consume the specified cumulative buffer and
- /// decode its content into message(s).
- /// </summary>
- /// <param name="input">the cumulative buffer</param>
- /// <param name="output">decoder output</param>
- /// <returns>
- /// <tt>true</tt> if and only if there's more to decode in the buffer
- /// and you want to have <tt>doDecode</tt> method invoked again.
- /// Return <tt>false</tt> if remaining data is not enough to decode,
- /// then this method will be invoked again when more data is cumulated.
- /// </returns>
- /// <exception cref="Exception">If cannot decode</exception>
- protected abstract bool DoDecode(ByteBuffer input, IProtocolDecoderOutput output);
+ public void Dispose()
+ {
+ _remaining = null;
+ }
- public void Dispose()
- {
- _remaining = null;
- }
- }
+ private ByteBuffer AllocateBuffer()
+ {
+ ByteBuffer buffer = ByteBuffer.Allocate(4096);
+ buffer.IsAutoExpand = true;
+ return buffer;
+ }
+ }
}
diff --git a/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs b/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs
index 75d67fdfb8..0c06a01eb4 100644
--- a/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs
+++ b/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs
@@ -25,144 +25,266 @@ using Qpid.Messaging;
namespace Qpid.Framing
{
- public class BasicContentHeaderProperties : IContentHeaderProperties
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(BasicContentHeaderProperties));
+ public class BasicContentHeaderProperties : IContentHeaderProperties
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(BasicContentHeaderProperties));
- public string ContentType;
+ private string _contentType;
+ private string _encoding;
+ private FieldTable _headers;
+ private byte _deliveryMode;
+ private byte _priority;
+ private string _correlationId;
+ private long _expiration;
+ private string _replyTo;
+ private string _messageId;
+ private ulong _timestamp;
+ private string _type;
+ private string _userId;
+ private string _appId;
+ private string _clusterId;
- public string Encoding;
- public FieldTable Headers;
+ #region Properties
+ //
+ // Properties
+ //
- public byte DeliveryMode;
+ /// <summary>
+ /// The MIME Content Type
+ /// </summary>
+ public string ContentType
+ {
+ get { return _contentType; }
+ set { _contentType = value; }
+ }
- public byte Priority;
+ /// <summary>
+ /// The MIME Content Encoding
+ /// </summary>
+ public string Encoding
+ {
+ get { return _encoding; }
+ set { _encoding = value; }
+ }
- public string CorrelationId;
+ /// <summary>
+ /// Message headers
+ /// </summary>
+ public FieldTable Headers
+ {
+ get { return _headers; }
+ set { _headers = value; }
+ }
- public long Expiration;
+ /// <summary>
+ /// Non-persistent (1) or persistent (2)
+ /// </summary>
+ public byte DeliveryMode
+ {
+ get { return _deliveryMode; }
+ set { _deliveryMode = value; }
+ }
- public string ReplyTo;
+ /// <summary>
+ /// The message priority, 0 to 9
+ /// </summary>
+ public byte Priority
+ {
+ get { return _priority; }
+ set { _priority = value; }
+ }
- public string MessageId;
+ /// <summary>
+ /// The application correlation identifier
+ /// </summary>
+ public string CorrelationId
+ {
+ get { return _correlationId; }
+ set { _correlationId = value; }
+ }
- public ulong Timestamp;
+ /// <summary>
+ /// Message expiration specification
+ /// </summary>
+ // TODO: Should be string according to spec
+ public long Expiration
+ {
+ get { return _expiration; }
+ set { _expiration = value; }
+ }
- public string Type;
+ /// <summary>
+ /// The destination to reply to
+ /// </summary>
+ public string ReplyTo
+ {
+ get { return _replyTo; }
+ set { _replyTo = value; }
+ }
- public string UserId;
+ /// <summary>
+ /// The application message identifier
+ /// </summary>
+ public string MessageId
+ {
+ get { return _messageId; }
+ set { _messageId = value; }
+ }
- public string AppId;
+ /// <summary>
+ /// The message timestamp
+ /// </summary>
+ public ulong Timestamp
+ {
+ get { return _timestamp; }
+ set { _timestamp = value; }
+ }
- public string ClusterId;
-
- public BasicContentHeaderProperties()
- {
- }
+ /// <summary>
+ /// The message type name
+ /// </summary>
+ public string Type
+ {
+ get { return _type; }
+ set { _type = value; }
+ }
- public uint PropertyListSize
- {
- get
- {
- return (uint)(EncodingUtils.EncodedShortStringLength(ContentType) +
- EncodingUtils.EncodedShortStringLength(Encoding) +
- EncodingUtils.EncodedFieldTableLength(Headers) +
- 1 + 1 +
- EncodingUtils.EncodedShortStringLength(CorrelationId) +
- EncodingUtils.EncodedShortStringLength(ReplyTo) +
- EncodingUtils.EncodedShortStringLength(String.Format("{0:D}", Expiration)) +
- EncodingUtils.EncodedShortStringLength(MessageId) +
- 8 +
- EncodingUtils.EncodedShortStringLength(Type) +
- EncodingUtils.EncodedShortStringLength(UserId) +
- EncodingUtils.EncodedShortStringLength(AppId) +
- EncodingUtils.EncodedShortStringLength(ClusterId));
-
- }
- }
+ /// <summary>
+ /// The creating user id
+ /// </summary>
+ public string UserId
+ {
+ get { return _userId; }
+ set { _userId = value; }
+ }
- public ushort PropertyFlags
- {
- get
- {
- int value = 0;
-
- // for now we just blast in all properties
- for (int i = 0; i < 14; i++)
- {
- value += (1 << (15-i));
- }
- return (ushort) value;
- }
- }
-
- public void WritePropertyListPayload(ByteBuffer buffer)
- {
- EncodingUtils.WriteShortStringBytes(buffer, ContentType);
- EncodingUtils.WriteShortStringBytes(buffer, Encoding);
- EncodingUtils.WriteFieldTableBytes(buffer, Headers);
- buffer.Put(DeliveryMode);
- buffer.Put(Priority);
- EncodingUtils.WriteShortStringBytes(buffer, CorrelationId);
- EncodingUtils.WriteShortStringBytes(buffer, ReplyTo);
- EncodingUtils.WriteShortStringBytes(buffer, String.Format("{0:D}", Expiration));
- EncodingUtils.WriteShortStringBytes(buffer, MessageId);
- buffer.Put(Timestamp);
- EncodingUtils.WriteShortStringBytes(buffer, Type);
- EncodingUtils.WriteShortStringBytes(buffer, UserId);
- EncodingUtils.WriteShortStringBytes(buffer, AppId);
- EncodingUtils.WriteShortStringBytes(buffer, ClusterId);
- }
-
- public void PopulatePropertiesFromBuffer(ByteBuffer buffer, ushort propertyFlags)
- {
- _log.Debug("Property flags: " + propertyFlags);
- if ((propertyFlags & (1 << 15)) > 0)
- ContentType = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 14)) > 0)
- Encoding = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 13)) > 0)
- Headers = EncodingUtils.ReadFieldTable(buffer);
- if ((propertyFlags & (1 << 12)) > 0)
- DeliveryMode = buffer.GetByte();
- if ((propertyFlags & (1 << 11)) > 0)
- Priority = buffer.GetByte();
- if ((propertyFlags & (1 << 10)) > 0)
- CorrelationId = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 9)) > 0)
- ReplyTo = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 8)) > 0)
- Expiration = EncodingUtils.ReadLongAsShortString(buffer);
- if ((propertyFlags & (1 << 7)) > 0)
- MessageId = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 6)) > 0)
- Timestamp = buffer.GetUInt64();
- if ((propertyFlags & (1 << 5)) > 0)
- Type = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 4)) > 0)
- UserId = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 3)) > 0)
- AppId = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 2)) > 0)
- ClusterId = EncodingUtils.ReadShortString(buffer);
- }
-
- public void SetDeliveryMode(DeliveryMode deliveryMode)
- {
- if (deliveryMode == Messaging.DeliveryMode.NonPersistent)
- {
- DeliveryMode = 1;
- }
- else
+ /// <summary>
+ /// The creating application id
+ /// </summary>
+ public string AppId
+ {
+ get { return _appId; }
+ set { _appId = value; }
+ }
+
+ /// <summary>
+ /// Intra-cluster routing identifier
+ /// </summary>
+ public string ClusterId
+ {
+ get { return _clusterId; }
+ set { _clusterId = value; }
+ }
+
+ #endregion // Properties
+
+
+ public BasicContentHeaderProperties()
+ {
+ }
+
+ public uint PropertyListSize
+ {
+ get
+ {
+ return (uint)(EncodingUtils.EncodedShortStringLength(ContentType) +
+ EncodingUtils.EncodedShortStringLength(Encoding) +
+ EncodingUtils.EncodedFieldTableLength(Headers) +
+ 1 + 1 +
+ EncodingUtils.EncodedShortStringLength(CorrelationId) +
+ EncodingUtils.EncodedShortStringLength(ReplyTo) +
+ EncodingUtils.EncodedShortStringLength(String.Format("{0:D}", Expiration)) +
+ EncodingUtils.EncodedShortStringLength(MessageId) +
+ 8 +
+ EncodingUtils.EncodedShortStringLength(Type) +
+ EncodingUtils.EncodedShortStringLength(UserId) +
+ EncodingUtils.EncodedShortStringLength(AppId) +
+ EncodingUtils.EncodedShortStringLength(ClusterId));
+
+ }
+ }
+
+ public ushort PropertyFlags
+ {
+ get
+ {
+ int value = 0;
+
+ // for now we just blast in all properties
+ for ( int i = 0; i < 14; i++ )
{
- DeliveryMode = 2;
+ value += (1 << (15 - i));
}
- }
+ return (ushort)value;
+ }
+ }
+
+ public void WritePropertyListPayload(ByteBuffer buffer)
+ {
+ EncodingUtils.WriteShortStringBytes(buffer, ContentType);
+ EncodingUtils.WriteShortStringBytes(buffer, Encoding);
+ EncodingUtils.WriteFieldTableBytes(buffer, Headers);
+ buffer.Put(DeliveryMode);
+ buffer.Put(Priority);
+ EncodingUtils.WriteShortStringBytes(buffer, CorrelationId);
+ EncodingUtils.WriteShortStringBytes(buffer, ReplyTo);
+ EncodingUtils.WriteShortStringBytes(buffer, String.Format("{0:D}", Expiration));
+ EncodingUtils.WriteShortStringBytes(buffer, MessageId);
+ buffer.Put(Timestamp);
+ EncodingUtils.WriteShortStringBytes(buffer, Type);
+ EncodingUtils.WriteShortStringBytes(buffer, UserId);
+ EncodingUtils.WriteShortStringBytes(buffer, AppId);
+ EncodingUtils.WriteShortStringBytes(buffer, ClusterId);
+ }
+
+ public void PopulatePropertiesFromBuffer(ByteBuffer buffer, ushort propertyFlags)
+ {
+ _log.Debug("Property flags: " + propertyFlags);
+ if ( (propertyFlags & (1 << 15)) > 0 )
+ ContentType = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 14)) > 0 )
+ Encoding = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 13)) > 0 )
+ Headers = EncodingUtils.ReadFieldTable(buffer);
+ if ( (propertyFlags & (1 << 12)) > 0 )
+ DeliveryMode = buffer.GetByte();
+ if ( (propertyFlags & (1 << 11)) > 0 )
+ Priority = buffer.GetByte();
+ if ( (propertyFlags & (1 << 10)) > 0 )
+ CorrelationId = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 9)) > 0 )
+ ReplyTo = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 8)) > 0 )
+ Expiration = EncodingUtils.ReadLongAsShortString(buffer);
+ if ( (propertyFlags & (1 << 7)) > 0 )
+ MessageId = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 6)) > 0 )
+ Timestamp = buffer.GetUInt64();
+ if ( (propertyFlags & (1 << 5)) > 0 )
+ Type = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 4)) > 0 )
+ UserId = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 3)) > 0 )
+ AppId = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 2)) > 0 )
+ ClusterId = EncodingUtils.ReadShortString(buffer);
+ }
+
+ public void SetDeliveryMode(DeliveryMode deliveryMode)
+ {
+ if ( deliveryMode == Messaging.DeliveryMode.NonPersistent )
+ {
+ DeliveryMode = 1;
+ } else
+ {
+ DeliveryMode = 2;
+ }
+ }
- public override string ToString()
- {
- return "Properties: " + ContentType + " " + Encoding + " " + Timestamp + " " + Type;
- }
- }
+ public override string ToString()
+ {
+ return "Properties: " + ContentType + " " + Encoding + " " + Timestamp + " " + Type;
+ }
+ }
}
diff --git a/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs b/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs
index b63df22339..617086f2e8 100644
--- a/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs
+++ b/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs
@@ -26,11 +26,24 @@ namespace Qpid.Framing
{
public const byte TYPE = 3;
- /// <summary>
- ///
- /// </summary>
- /// TODO: consider whether this should be a pointer into the ByteBuffer to avoid copying */
- public byte[] Payload;
+ private ByteBuffer _payload;
+
+ public ByteBuffer Payload
+ {
+ get { return _payload; }
+ }
+
+ public ContentBody()
+ {
+ }
+ public ContentBody(ByteBuffer payload)
+ {
+ PopulateFromBuffer(payload, (uint)payload.Remaining);
+ }
+ public ContentBody(ByteBuffer payload, uint length)
+ {
+ PopulateFromBuffer(payload, length);
+ }
#region IBody Members
@@ -46,7 +59,7 @@ namespace Qpid.Framing
{
get
{
- return (ushort)(Payload == null ? 0 : Payload.Length);
+ return (ushort)(Payload == null ? 0 : Payload.Remaining);
}
}
@@ -55,6 +68,7 @@ namespace Qpid.Framing
if (Payload != null)
{
buffer.Put(Payload);
+ Payload.Rewind();
}
}
@@ -62,8 +76,9 @@ namespace Qpid.Framing
{
if (size > 0)
{
- Payload = new byte[size];
- buffer.GetBytes(Payload);
+ _payload = buffer.Slice();
+ _payload.Limit = (int)size;
+ buffer.Skip((int)size);
}
}
@@ -76,5 +91,10 @@ namespace Qpid.Framing
frame.BodyFrame = body;
return frame;
}
+
+ public override string ToString()
+ {
+ return string.Format("ContentBody [ Size: {0} ]", Size);
+ }
}
}
diff --git a/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs b/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs
index fe83fff721..adff817923 100644
--- a/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs
+++ b/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs
@@ -47,7 +47,6 @@ namespace Qpid.Framing
/// </summary>
/// <param name="buffer">the buffer from which to read data. The length byte must be read already</param>
/// <param name="length">the length of the field table. Must be > 0.</param>
- /// <exception cref="AMQFrameDecodingException">if there is an error decoding the table</exception>
public FieldTable(ByteBuffer buffer, uint length) : this()
{
_encodedForm = buffer.Slice();
@@ -497,27 +496,18 @@ namespace Qpid.Framing
private AMQTypedValue GetProperty(string name)
{
- lock ( _syncLock )
- {
- if ( _properties == null )
- {
- if ( _encodedForm == null )
- {
- return null;
- } else
- {
- PopulateFromBuffer();
- }
- }
- return (AMQTypedValue) _properties[name];
- }
+ InitMapIfNecessary();
+ return (AMQTypedValue) _properties[name];
}
private void PopulateFromBuffer()
{
try
{
- SetFromBuffer(_encodedForm, _encodedSize);
+ ByteBuffer buffer = _encodedForm;
+ _encodedForm = null;
+ if ( buffer != null )
+ SetFromBuffer(buffer, _encodedSize);
} catch ( AMQFrameDecodingException e )
{
_log.Error("Error decoding FieldTable in deferred decoding mode ", e);
@@ -598,7 +588,11 @@ namespace Qpid.Framing
{
if ( _encodedForm != null )
{
- buffer.Put(_encodedForm);
+ lock ( _syncLock )
+ {
+ buffer.Put(_encodedForm);
+ _encodedForm.Flip();
+ }
} else if ( _properties != null )
{
foreach ( DictionaryEntry de in _properties )
@@ -629,6 +623,7 @@ namespace Qpid.Framing
_log.Debug("Buffer Position:" + buffer.Position +
" Remaining:" + buffer.Remaining);
}
+ throw;
}
}
}
diff --git a/qpid/dotnet/Qpid.Messaging/IChannel.cs b/qpid/dotnet/Qpid.Messaging/IChannel.cs
index 7fceb1a532..7ff1e4b82d 100644
--- a/qpid/dotnet/Qpid.Messaging/IChannel.cs
+++ b/qpid/dotnet/Qpid.Messaging/IChannel.cs
@@ -23,42 +23,135 @@ using System;
namespace Qpid.Messaging
{
public delegate void MessageReceivedDelegate(IMessage msg);
-
+
+ /// <summary>
+ /// Interface used to manipulate an AMQP channel.
+ /// </summary>
+ /// <remarks>
+ /// You can create a channel by using the CreateChannel() method
+ /// of the connection object.
+ /// </remarks>
public interface IChannel : IDisposable
{
+ /// <summary>
+ /// Acknowledge mode for messages received
+ /// </summary>
AcknowledgeMode AcknowledgeMode { get; }
+ /// <summary>
+ /// True if the channel should use transactions
+ /// </summary>
bool Transacted { get; }
/// <summary>
/// Prefetch value to be used as the default for consumers created on this channel.
/// </summary>
- int DefaultPrefetch
- {
- get;
- set;
- }
-
+ int DefaultPrefetch { get; set; }
+
+ /// <summary>
+ /// Declare a new exchange
+ /// </summary>
+ /// <param name="exchangeName">Name of the exchange</param>
+ /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param>
void DeclareExchange(string exchangeName, string exchangeClass);
+ /// <summary>
+ /// Declare a new exchange using the default exchange class
+ /// </summary>
+ /// <param name="exchangeName">Name of the exchange</param>
void DeleteExchange(string exchangeName);
+ /// <summary>
+ /// Declare a new queue with the specified set of arguments
+ /// </summary>
+ /// <param name="queueName">Name of the queue</param>
+ /// <param name="isDurable">True if the queue should be durable</param>
+ /// <param name="isExclusive">True if the queue should be exclusive to this channel</param>
+ /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param>
void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete);
- void DeleteQueue();
-
+ /// <summary>
+ /// Delete a queue with the specifies arguments
+ /// </summary>
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param>
+ /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait);
+ /// <summary>
+ /// Generate a new Unique name to use for a queue
+ /// </summary>
+ /// <returns>A unique name to this channel</returns>
string GenerateUniqueName();
- IFieldTable CreateFieldTable();
+ /// <summary>
+ /// Removes all messages from a queue
+ /// </summary>
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ void PurgeQueue(string queueName, bool noWait);
+
+ /// <summary>
+ /// Bind a queue to the specified exchange
+ /// </summary>
+ /// <param name="queueName">Name of queue to bind</param>
+ /// <param name="exchangeName">Name of exchange to bind to</param>
+ /// <param name="routingKey">Routing key</param>
void Bind(string queueName, string exchangeName, string routingKey);
+ /// <summary>
+ /// Bind a queue to the specified exchange
+ /// </summary>
+ /// <param name="queueName">Name of queue to bind</param>
+ /// <param name="exchangeName">Name of exchange to bind to</param>
+ /// <param name="routingKey">Routing key</param>
+ /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param>
void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args);
+ /// <summary>
+ /// Create a new empty message with no body
+ /// </summary>
+ /// <returns>The new message</returns>
IMessage CreateMessage();
+ /// <summary>
+ /// Create a new message of the specified MIME type
+ /// </summary>
+ /// <param name="mimeType">The mime type to create</param>
+ /// <returns>The new message</returns>
+ IMessage CreateMessage(string mimeType);
+ /// <summary>
+ /// Creates a new message for bytes (application/octet-stream)
+ /// </summary>
+ /// <returns>The new message</returns>
IBytesMessage CreateBytesMessage();
+ /// <summary>
+ /// Creates a new text message (text/plain) with empty content
+ /// </summary>
+ /// <returns>The new message</returns>
ITextMessage CreateTextMessage();
+ /// <summary>
+ /// Creates a new text message (text/plain) with a body
+ /// </summary>
+ /// <param name="initialValue">Initial body of the message</param>
+ /// <returns>The new message</returns>
ITextMessage CreateTextMessage(string initialValue);
#region Consuming
-
+
+ /// <summary>
+ /// Creates a new Consumer using the builder pattern
+ /// </summary>
+ /// <param name="queueName">Name of queue to receive messages from</param>
+ /// <returns>The builder object</returns>
MessageConsumerBuilder CreateConsumerBuilder(string queueName);
+ /// <summary>
+ /// Creates a new consumer
+ /// </summary>
+ /// <param name="queueName">Name of queue to receive messages from</param>
+ /// <param name="prefetchLow">Low prefetch value</param>
+ /// <param name="prefetchHigh">High prefetch value</param>
+ /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param>
+ /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param>
+ /// <param name="durable">If true, create a durable subscription</param>
+ /// <param name="subscriptionName">Subscription name</param>
+ /// <returns>The new consumer</returns>
IMessageConsumer CreateConsumer(string queueName,
int prefetchLow,
int prefetchHigh,
@@ -66,15 +159,35 @@ namespace Qpid.Messaging
bool exclusive,
bool durable,
string subscriptionName);
-
+
+ /// <summary>
+ /// Unsubscribe from a queue
+ /// </summary>
+ /// <param name="subscriptionName">Subscription name</param>
void Unsubscribe(string subscriptionName);
#endregion
#region Publishing
+ /// <summary>
+ /// Create a new message publisher using the builder pattern
+ /// </summary>
+ /// <returns>The builder object</returns>
MessagePublisherBuilder CreatePublisherBuilder();
-
+
+ /// <summary>
+ /// Create a new message publisher
+ /// </summary>
+ /// <param name="exchangeName">Name of exchange to publish to</param>
+ /// <param name="routingKey">Routing key</param>
+ /// <param name="deliveryMode">Default delivery mode</param>
+ /// <param name="timeToLive">Default TTL time of messages</param>
+ /// <param name="immediate">If true, sent immediately</param>
+ /// <param name="mandatory">If true, the broker will return an error
+ /// (as a connection exception) if the message cannot be delivered</param>
+ /// <param name="priority">Default message priority</param>
+ /// <returns>The new message publisher</returns>
IMessagePublisher CreatePublisher(string exchangeName,
string routingKey,
DeliveryMode deliveryMode,
@@ -86,9 +199,18 @@ namespace Qpid.Messaging
#endregion
#region Transactions
-
+
+ /// <summary>
+ /// Recover after transaction failure
+ /// </summary>
void Recover();
+ /// <summary>
+ /// Commit the transaction
+ /// </summary>
void Commit();
+ /// <summary>
+ /// Rollback the transaction
+ /// </summary>
void Rollback();
#endregion
diff --git a/qpid/dotnet/Qpid.Messaging/IHeaders.cs b/qpid/dotnet/Qpid.Messaging/IHeaders.cs
index aa2d0278f7..edd0ef989b 100644
--- a/qpid/dotnet/Qpid.Messaging/IHeaders.cs
+++ b/qpid/dotnet/Qpid.Messaging/IHeaders.cs
@@ -35,7 +35,7 @@ namespace Qpid.Messaging
{
bool Contains(string name);
- string this[string name] { get; set; }
+ object this[string name] { get; set; }
bool GetBoolean(string name);
void SetBoolean(string name, bool value);
diff --git a/qpid/dotnet/Qpid.Messaging/IMessage.cs b/qpid/dotnet/Qpid.Messaging/IMessage.cs
index 60febc942a..d63662f7e3 100644
--- a/qpid/dotnet/Qpid.Messaging/IMessage.cs
+++ b/qpid/dotnet/Qpid.Messaging/IMessage.cs
@@ -22,25 +22,75 @@ namespace Qpid.Messaging
{
public interface IMessage
{
- string ContentType { get; set;}
- string ContentEncoding { get; set; }
- string CorrelationId { get; set; }
- byte[] CorrelationIdAsBytes { get; set; }
- DeliveryMode DeliveryMode { get; set; }
- long Expiration { get; set; }
- string MessageId { get; set; }
- int Priority { get; set; }
+ /// <summary>
+ /// The MIME Content Type
+ /// </summary>
+ string ContentType { get; set;}
+ /// <summary>
+ /// The MIME Content Encoding
+ /// </summary>
+ string ContentEncoding { get; set; }
+ /// <summary>
+ /// The application correlation identifier
+ /// </summary>
+ string CorrelationId { get; set; }
+ /// <summary>
+ /// The application correlation identifier, as an array of bytes
+ /// </summary>
+ byte[] CorrelationIdAsBytes { get; set; }
+ /// <summary>
+ /// Non-persistent (1) or persistent (2)
+ /// </summary>
+ DeliveryMode DeliveryMode { get; set; }
+ /// <summary>
+ /// Message expiration specification
+ /// </summary>
+ long Expiration { get; set; }
+ /// <summary>
+ /// The application message identifier
+ /// </summary>
+ string MessageId { get; set; }
+ /// <summary>
+ /// The message priority, 0 to 9
+ /// </summary>
+ byte Priority { get; set; }
+ /// <summary>
+ /// True if the message has been redelivered
+ /// </summary>
bool Redelivered { get; set; }
+ /// <summary>
+ /// Exchange name of the reply-to address
+ /// </summary>
string ReplyToExchangeName { get; set; }
+ /// <summary>
+ /// Routing key of the reply-to address
+ /// </summary>
string ReplyToRoutingKey { get; set; }
+ /// <summary>
+ /// The message timestamp
+ /// </summary>
long Timestamp { get; set; }
+ /// <summary>
+ /// The message type name
+ /// </summary>
string Type { get; set; }
+ /// <summary>
+ /// Message headers
+ /// </summary>
IHeaders Headers { get; }
-
- // XXX: UserId?
- // XXX: AppId?
- // XXX: ClusterId?
-
+ /// <summary>
+ /// The creating user id
+ /// </summary>
+ string UserId { get; set; }
+ /// <summary>
+ /// The creating application id
+ /// </summary>
+ string AppId { get; set; }
+ /// <summary>
+ /// Intra-cluster routing identifier
+ /// </summary>
+ string ClusterId { get; set; }
+
void Acknowledge();
void ClearBody();
}