summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/dotnet/Qpid.Buffer/FixedByteBuffer.cs380
-rw-r--r--qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs473
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs6
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj4
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs211
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs188
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/log4net.config15
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AMQConnection.cs36
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AmqChannel.cs10
-rw-r--r--qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs2
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs2
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs2
-rw-r--r--qpid/dotnet/Qpid.Messaging/IHeaders.cs11
-rw-r--r--qpid/dotnet/Qpid.NET.sln12
-rw-r--r--qpid/dotnet/TopicListener/Program.cs10
-rw-r--r--qpid/dotnet/TopicListener/Properties/AssemblyInfo.cs33
-rw-r--r--qpid/dotnet/TopicListener/TopicListener.csproj89
-rw-r--r--qpid/dotnet/TopicPublisher/Program.cs10
-rw-r--r--qpid/dotnet/TopicPublisher/Properties/AssemblyInfo.cs33
-rw-r--r--qpid/dotnet/TopicPublisher/TopicPublisher.csproj85
20 files changed, 1579 insertions, 33 deletions
diff --git a/qpid/dotnet/Qpid.Buffer/FixedByteBuffer.cs b/qpid/dotnet/Qpid.Buffer/FixedByteBuffer.cs
new file mode 100644
index 0000000000..30d51b6d9e
--- /dev/null
+++ b/qpid/dotnet/Qpid.Buffer/FixedByteBuffer.cs
@@ -0,0 +1,380 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Buffer
+{
+ /// <summary>
+ /// A FixedByteBuffer is for ....
+ ///
+ /// <para/>Todo: Seems to wrap a HeapByteBuffer and add more its method signature, other than that, seems pointless. Push down the extra methods
+ /// onto HeapByteBuffer and get rid of this altoghether? (Or pull heap byte buffer up into this class and get rid of it).
+ ///
+ /// <para/>Todo: Name methods using C# conventions.
+ ///
+ /// </summary>
+ public class FixedByteBuffer
+ {
+ private HeapByteBuffer _buf;
+
+ /// <summary>
+ /// Creates a fixed
+ /// </summary>
+ /// <param name="capacity"></param>
+ public FixedByteBuffer(int capacity)
+ {
+ _buf = new HeapByteBuffer(capacity);
+ }
+
+ public FixedByteBuffer(byte[] bytes)
+ {
+ _buf = HeapByteBuffer.wrap(bytes);
+ }
+
+ public static FixedByteBuffer wrap(byte[] array)
+ {
+ return new FixedByteBuffer(array);
+ }
+
+ public static FixedByteBuffer wrap(byte[] array, int offset, int length)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ByteOrder order()
+ {
+ return ByteOrder.LittleEndian;
+ }
+
+ public void order(ByteOrder bo)
+ {
+ // Ignore endianess.
+ }
+
+ public void compact()
+ {
+ _buf.Compact();
+ }
+
+ public char getChar()
+ {
+ throw new NotImplementedException();
+ }
+
+ public char getChar(int index)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void putChar(char value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void putChar(int index, char value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public bool isDirect()
+ {
+ return false;
+ }
+
+ public bool isReadOnly()
+ {
+ throw new NotImplementedException();
+ }
+
+ public int capacity()
+ {
+ return _buf.Capacity;
+ }
+
+ public int limit()
+ {
+ return _buf.Limit;
+ }
+
+ public int limit(int limit)
+ {
+ int previousLimit = _buf.Limit;
+ _buf.Limit = limit;
+ return previousLimit;
+ }
+
+ public int position()
+ {
+ return _buf.Position;
+ }
+
+ public int position(int newPosition)
+ {
+ int prev = _buf.Position;
+ _buf.Position = newPosition;
+ return prev;
+ }
+
+ public void mark()
+ {
+ throw new NotImplementedException();
+ }
+
+ public static FixedByteBuffer allocateDirect(int capacity)
+ {
+ throw new NotImplementedException();
+ }
+
+ public static FixedByteBuffer allocate(int capacity)
+ {
+ return new FixedByteBuffer(capacity);
+ }
+
+ public void clear()
+ {
+ _buf.Clear();
+ }
+
+ public void put(byte b)
+ {
+ _buf.Put(b);
+ }
+
+ public void put(int index, byte b)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void put(FixedByteBuffer buf)
+ {
+ _buf.Put(buf.array(), buf.position(), buf.limit() - buf.position());
+ }
+
+ public FixedByteBuffer duplicate()
+ {
+ throw new NotImplementedException();
+ }
+
+ public FixedByteBuffer slice()
+ {
+ throw new NotImplementedException();
+ }
+
+ public FixedByteBuffer asReadOnlyBuffer()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Returns backing array.
+ /// </summary>
+ /// <returns></returns>
+ public byte[] array()
+ {
+ return _buf.array();
+ }
+
+ public int arrayOffset()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void reset()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void flip()
+ {
+ _buf.Flip();
+ }
+
+ public void rewind()
+ {
+ _buf.Rewind();
+ }
+
+ public byte get()
+ {
+ return _buf.Get();
+ }
+
+ public byte get(int index)
+ {
+ throw new NotImplementedException();
+ }
+
+ public short getShort()
+ {
+ return _buf.GetShort();
+ }
+
+ public short getShort(int index)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void putShort(short value)
+ {
+ _buf.Put(value);
+ }
+
+ public void putShort(int index, short value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public int getInt()
+ {
+ return _buf.GetInt();
+ }
+
+ public int getInt(int index)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void putInt(int value)
+ {
+ _buf.Put(value);
+ }
+
+ public void putInt(int index, int value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ByteBuffer get(byte[] dst, int offset, int length)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ByteBuffer put(byte[] src, int offset, int length)
+ {
+ throw new NotImplementedException();
+ }
+
+ public long getLong()
+ {
+ return _buf.GetLong();
+ }
+
+ public long getLong(int index)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void putLong(long value)
+ {
+ _buf.Put(value);
+ }
+
+ public void putLong(int index, long value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public int remaining()
+ {
+ return _buf.Remaining;
+ }
+
+ public float getFloat()
+ {
+ return _buf.GetFloat();
+ }
+
+ public float getFloat(int index)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void putFloat(float value)
+ {
+ _buf.Put(value);
+ }
+
+ public void putFloat(int index, float value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public double getDouble()
+ {
+ return _buf.GetDouble();
+ }
+
+ public double getDouble(int index)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void putDouble(double value)
+ {
+ _buf.Put(value);
+ }
+
+ public void putDouble(int index, double value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ushort getUnsignedShort()
+ {
+ return _buf.GetUnsignedShort();
+ }
+
+ public uint getUnsignedInt()
+ {
+ return _buf.GetUnsignedInt();
+ }
+
+ public void get(byte[] dst)
+ {
+ _buf.Get(dst);
+ }
+
+ public void put(ushort value)
+ {
+ _buf.Put(value);
+ }
+
+ public void put(uint max)
+ {
+ _buf.Put(max);
+ }
+
+ public void put(ulong tag)
+ {
+ _buf.Put(tag);
+ }
+
+ public void put(byte[] src)
+ {
+ _buf.Put(src);
+ }
+
+ public ulong getUnsignedLong()
+ {
+ return _buf.GetUnsignedLong();
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs b/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs
new file mode 100644
index 0000000000..add8e2e80c
--- /dev/null
+++ b/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs
@@ -0,0 +1,473 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+
+namespace Qpid.Buffer
+{
+ /// <summary>
+ ///
+ /// </summary>
+ public class HeapByteBuffer //: ByteBuffer
+ {
+ private byte[] _underlyingData;
+
+ /// <summary> The current position within the buffer where the next value to be read or written will occurr. </summary>
+ private int _position;
+
+ /// <summary> The index of the first element that should not be read or written. </summary>
+ private int _limit;
+
+ public HeapByteBuffer(int size) : this(new byte[size], 0)
+ {
+ }
+
+ private HeapByteBuffer(byte[] bytes, int length)
+ {
+ _underlyingData = bytes;
+ _limit = bytes.Length;
+ _position = length;
+ }
+
+ public /*override*/ int Capacity
+ {
+ get
+ {
+ return _underlyingData.Length;
+ }
+ }
+
+ public /*override*/ int Position
+ {
+ get
+ {
+ return _position;
+ }
+ set
+ {
+ _position = value;
+ }
+ }
+
+ /// <summary>
+ /// Sets this buffer's limit. If the position is larger than the new limit then it is set to the new limit.
+ /// </summary>
+ /// <value>The new limit value; must be non-negative and no larger than this buffer's capacity</value>
+ public /*override*/ int Limit
+ {
+ get
+ {
+ return _limit;
+ }
+ set
+ {
+ if (value < 0)
+ {
+ throw new ArgumentException("Limit must not be negative");
+ }
+ if (value > Capacity)
+ {
+ throw new ArgumentException("Limit must not be greater than Capacity");
+ }
+ _limit = value;
+ if (_position > value)
+ {
+ _position = value;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Returns the number of elements between the current position and the limit
+ /// </summary>
+ /// <value>The number of elements remaining in this buffer</value>
+ public /*override*/ int Remaining
+ {
+ get
+ {
+ return (_limit - _position);
+ }
+ }
+
+ public /*override*/ void Clear()
+ {
+ _position = 0;
+ _limit = Capacity;
+ }
+
+ public /*override*/ void Flip()
+ {
+ _limit = _position;
+ _position = 0;
+ }
+
+ public /*override*/ void Rewind()
+ {
+ _position = 0;
+ }
+
+ public byte[] array()
+ {
+ return _underlyingData;
+ }
+
+ public /*override*/ byte[] ToByteArray()
+ {
+ // Return copy of bytes remaining.
+ byte[] result = new byte[Remaining];
+ Array.Copy(_underlyingData, _position, result, 0, Remaining);
+ return result;
+ }
+
+ private void CheckSpace(int size)
+ {
+ if (_position + size > _limit)
+ {
+ throw new BufferOverflowException("Attempt to write " + size + " byte(s) to buffer where position is " + _position +
+ " and limit is " + _limit);
+ }
+ }
+
+ private void CheckSpaceForReading(int size)
+ {
+ if (_position + size > _limit)
+ {
+ throw new BufferUnderflowException("Attempt to read " + size + " byte(s) to buffer where position is " + _position +
+ " and limit is " + _limit);
+ }
+ }
+
+ /// <summary>
+ /// Writes the given byte into this buffer at the current position, and then increments the position.
+ /// </summary>
+ /// <param name="data">The byte to be written</param>
+ /// <exception cref="BufferOverflowException">If this buffer's current position is not smaller than its limit</exception>
+ public /*override*/ void Put(byte data)
+ {
+ CheckSpace(1);
+ _underlyingData[_position++] = data;
+ }
+
+ /// <summary>
+ /// Writes all the data in the given byte array into this buffer at the current
+ /// position and then increments the position.
+ /// </summary>
+ /// <param name="data">The data to copy.</param>
+ /// <exception cref="BufferOverflowException">If this buffer's current position plus the array length is not smaller than its limit</exception>
+ public /*override*/ void Put(byte[] data)
+ {
+ Put(data, 0, data.Length);
+ }
+
+ public /*override*/ void Put(byte[] data, int offset, int size)
+ {
+ if (data == null)
+ {
+ throw new ArgumentNullException("data");
+ }
+ CheckSpace(size);
+ Array.Copy(data, offset, _underlyingData, _position, size);
+ _position += size;
+ }
+
+ /// <summary>
+ /// Writes the given ushort into this buffer at the current position, and then increments the position.
+ /// </summary>
+ /// <param name="data">The ushort to be written</param>
+ public /*override*/ void Put(ushort data)
+ {
+ CheckSpace(2);
+ _underlyingData[_position++] = (byte) (data >> 8);
+ _underlyingData[_position++] = (byte) data;
+ }
+
+ public /*override*/ void Put(uint data)
+ {
+ CheckSpace(4);
+ _underlyingData[_position++] = (byte) (data >> 24);
+ _underlyingData[_position++] = (byte) (data >> 16);
+ _underlyingData[_position++] = (byte) (data >> 8);
+ _underlyingData[_position++] = (byte) data;
+ }
+
+ public /*override*/ void Put(ulong data)
+ {
+ CheckSpace(8);
+ _underlyingData[_position++] = (byte) (data >> 56);
+ _underlyingData[_position++] = (byte) (data >> 48);
+ _underlyingData[_position++] = (byte) (data >> 40);
+ _underlyingData[_position++] = (byte) (data >> 32);
+ _underlyingData[_position++] = (byte) (data >> 24);
+ _underlyingData[_position++] = (byte) (data >> 16);
+ _underlyingData[_position++] = (byte) (data >> 8);
+ _underlyingData[_position++] = (byte) data;
+ }
+
+ public void Put(short data)
+ {
+ Put((ushort)data);
+ }
+
+ public void Put(int data)
+ {
+ Put((uint)data);
+ }
+
+ public void Put(long data)
+ {
+ Put((ulong)data);
+ }
+
+ public void Put(float data)
+ {
+ unsafe
+ {
+ uint val = *((uint*)&data);
+ Put(val);
+ }
+ }
+
+ public void Put(double data)
+ {
+ unsafe
+ {
+ ulong val = *((ulong*)&data);
+ Put(val);
+ }
+ }
+
+
+ /// <summary>
+ /// Read the byte at the current position and increment the position
+ /// </summary>
+ /// <returns>a byte</returns>
+ /// <exception cref="BufferUnderflowException">if there are no bytes left to read</exception>
+ public /*override*/ byte Get()
+ {
+ CheckSpaceForReading(1);
+ return _underlyingData[_position++];
+ }
+
+ /// <summary>
+ /// Reads bytes from the buffer into the supplied array
+ /// </summary>
+ /// <param name="destination">The destination array. The array must not
+ /// be bigger than the remaining space in the buffer, nor can it be null.</param>
+ public /*override*/ void Get(byte[] destination)
+ {
+ if (destination == null)
+ {
+ throw new ArgumentNullException("destination");
+ }
+ int len = destination.Length;
+ CheckSpaceForReading(len);
+ Array.Copy(_underlyingData, _position, destination, 0, len);
+ _position += len;
+ }
+
+ /// <summary>
+ /// Reads and returns an unsigned short (two bytes, big endian) from this buffer
+ /// </summary>
+ /// <returns>an unsigned short</returns>
+ /// <exception cref="BufferUnderflowException">If there are fewer than two bytes remaining in this buffer</exception>
+ public /*override*/ ushort GetUnsignedShort()
+ {
+ CheckSpaceForReading(2);
+ byte upper = _underlyingData[_position++];
+ byte lower = _underlyingData[_position++];
+ return (ushort) ((upper << 8) + lower);
+ }
+
+ /// <summary>
+ /// Reads and returns an unsigned int (four bytes, big endian) from this buffer
+ /// </summary>
+ /// <returns>an unsigned integer</returns>
+ /// <exception cref="BufferUnderflowException">If there are fewer than four bytes remaining in this buffer</exception>
+ public /*override*/ uint GetUnsignedInt()
+ {
+ CheckSpaceForReading(4);
+ byte b1 = _underlyingData[_position++];
+ byte b2 = _underlyingData[_position++];
+ byte b3 = _underlyingData[_position++];
+ byte b4 = _underlyingData[_position++];
+ return (uint) ((b1 << 24) + (b2 << 16) + (b3 << 8) + b4);
+ }
+
+ public /*override*/ ulong GetUnsignedLong()
+ {
+ CheckSpaceForReading(8);
+ byte b1 = _underlyingData[_position++];
+ byte b2 = _underlyingData[_position++];
+ byte b3 = _underlyingData[_position++];
+ byte b4 = _underlyingData[_position++];
+ byte b5 = _underlyingData[_position++];
+ byte b6 = _underlyingData[_position++];
+ byte b7 = _underlyingData[_position++];
+ byte b8 = _underlyingData[_position++];
+ // all the casts necessary because otherwise each subexpression
+ // only gets promoted to uint and cause incorrect results
+ return (((ulong)b1 << 56) + ((ulong)b2 << 48) + ((ulong)b3 << 40) +
+ ((ulong)b4 << 32) + ((ulong)b5 << 24) +
+ ((ulong)b6 << 16) + ((ulong)b7 << 8) + b8);
+ }
+
+ public short GetShort()
+ {
+ return (short) GetUnsignedShort();
+ }
+
+ public int GetInt()
+ {
+ return (int) GetUnsignedInt();
+ }
+
+ public long GetLong()
+ {
+ return (long) GetUnsignedLong();
+ }
+
+ public float GetFloat()
+ {
+ unsafe
+ {
+ uint val = GetUnsignedInt();
+ return *((float*)&val);
+ }
+ }
+
+ public double GetDouble()
+ {
+ unsafe
+ {
+ ulong val = GetUnsignedLong();
+ return *((double*)&val);
+ }
+ }
+
+ public /*override*/ string GetString(uint length, Encoding encoder)
+ {
+ CheckSpaceForReading((int)length);
+ string result = encoder.GetString(_underlyingData, _position, (int)length);
+ _position += (int)length;
+ return result;
+ }
+
+ public /*override*/ void Acquire()
+ {
+ }
+
+ public /*override*/ void Release()
+ {
+ }
+
+ public /*override*/ bool IsAutoExpand
+ {
+ get { return false; }
+ set { }
+ }
+
+ public /*override*/ void Expand(int expectedRemaining)
+ {
+ throw new NotImplementedException();
+ }
+
+ public /*override*/ void Expand(int pos, int expectedRemaining)
+ {
+ throw new NotImplementedException();
+ }
+
+ public /*override*/ bool Pooled
+ {
+ get { return false; }
+ set { }
+ }
+
+ public void Mark()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Reset()
+ {
+ throw new NotImplementedException();
+ }
+
+ public /*override*/ byte Get(int index)
+ {
+ throw new NotImplementedException();
+ }
+
+// public /*override*/ void Put(ByteBuffer src)
+// {
+// if (src == this)
+// {
+// throw new ArgumentException("Cannot copy self into self!");
+// }
+//
+// HeapByteBuffer sb;
+// if (src is HeapByteBuffer)
+// {
+// sb = (HeapByteBuffer) src;
+// }
+// else
+// {
+// sb = (HeapByteBuffer)((RefCountingByteBuffer) src).Buf;
+// }
+// int n = sb.Remaining;
+// if (n > Remaining)
+// {
+// throw new BufferOverflowException("Not enought capacity in this buffer for " + n + " elements - only " + Remaining + " remaining");
+// }
+// Array.Copy(sb._underlyingData, sb._position, _underlyingData, _position, n);
+// sb._position += n;
+// _position += n;
+// }
+
+ public /*override*/ void Compact()
+ {
+ if (Remaining > 0)
+ {
+ if (_position > 0)
+ {
+ Array.Copy(_underlyingData, _position, _underlyingData, 0, Remaining);
+ }
+ _position = Remaining;
+ }
+ else
+ {
+ _position = 0;
+ }
+ _limit = Capacity;
+ }
+
+ public static HeapByteBuffer wrap(byte[] bytes, int length)
+ {
+ return new HeapByteBuffer(bytes, length);
+ }
+
+ public static HeapByteBuffer wrap(byte[] bytes)
+ {
+ return new HeapByteBuffer(bytes, bytes.Length);
+ }
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs b/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs
index fe44bc8639..2700c4afb2 100644
--- a/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs
+++ b/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs
@@ -251,11 +251,9 @@ namespace Qpid.Client.Tests
}
}
- /// <summary>
- /// Returns a field table containing patterns to match the test header exchange against.
- /// </summary>
+ /// <summary> Returns a field table containing patterns to match the test header exchange against. </summary>
///
- /// <returns>A field table containing test patterns.</returns>
+ /// <returns> A field table containing test patterns. </returns>
private FieldTable CreatePatternAsFieldTable()
{
FieldTable matchTable = new FieldTable();
diff --git a/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj b/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
index 7fb96536a7..3d5af00887 100644
--- a/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
+++ b/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
@@ -49,6 +49,8 @@
<Compile Include="failover\FailoverTest.cs" />
<Compile Include="failover\FailoverTxTest.cs" />
<Compile Include="HeadersExchange\HeadersExchangeTest.cs" />
+ <Compile Include="interop\TopicListener.cs" />
+ <Compile Include="interop\TopicPublisher.cs" />
<Compile Include="MultiConsumer\ProducerMultiConsumer.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Common\BaseMessagingTestFixture.cs" />
@@ -96,4 +98,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project> \ No newline at end of file
diff --git a/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs b/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs
new file mode 100644
index 0000000000..db9b1a4650
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs
@@ -0,0 +1,211 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+using Qpid.Messaging;
+using Qpid.Client.Qms;
+
+namespace Qpid.Client.Tests.interop
+{
+ public class TopicListener
+ {
+ private static ILog log = LogManager.GetLogger(typeof(TopicListener));
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /// <summary> Holds the routing key for the topic to receive test messages on. </summary>
+ public static string CONTROL_ROUTING_KEY = "topic_control";
+
+ /// <summary> Holds the routing key for the queue to send reports to. </summary>
+ public static string RESPONSE_ROUTING_KEY = "response";
+
+ /// <summary> Holds the connection to listen on. </summary>
+ private IConnection connection;
+
+ /// <summary> Holds the channel for all test messages.</summary>
+ private IChannel channel;
+
+ /// <summary> Holds the producer to send report messages on. </summary>
+ private IMessagePublisher publisher;
+
+ /// <summary> Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. </summary> */
+ private bool init;
+
+ /// <summary> Holds the count of messages received by this listener. </summary> */
+ private int count;
+
+ /// <summary> Creates a topic listener using the specified broker URL. </summary>
+ ///
+ /// <param name="connectionUri">The broker URL to listen on.</param>
+ TopicListener(string connectionUri)
+ {
+ log.Debug("TopicListener(string connectionUri = " + connectionUri + "): called");
+
+ // Create a connection to the broker.
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
+ connection = new AMQConnection(connectionInfo);
+
+ // Establish a session on the broker.
+ channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
+
+ // Set up a queue to listen for test messages on.
+ string topicQueueName = channel.GenerateUniqueName();
+ channel.DeclareQueue(topicQueueName, false, true, true);
+
+ // Set this listener up to listen for incoming messages on the test topic queue.
+ channel.Bind(topicQueueName, ExchangeNameDefaults.TOPIC, CONTROL_ROUTING_KEY);
+ IMessageConsumer consumer = channel.CreateConsumerBuilder(topicQueueName)
+ .Create();
+ consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
+
+ // Set up this listener with a producer to send the reports on.
+ publisher = channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.DIRECT)
+ .WithRoutingKey(RESPONSE_ROUTING_KEY)
+ .Create();
+
+ connection.Start();
+ Console.WriteLine("Waiting for messages...");
+ }
+
+ public static void Main(String[] argv)
+ {
+ // Create an instance of this listener with the command line parameters.
+ new TopicListener(DEFAULT_URI);
+ }
+
+ /// <summary>
+ /// Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and
+ /// shutdown messages result in this listener being terminated.
+ /// </summary>
+ ///
+ /// <param name="message">The received message.</param>
+ public void OnMessage(IMessage message)
+ {
+ log.Debug("public void onMessage(Message message = " + message + "): called");
+
+ // Take the start time of the first message if this is the first message.
+ if (!init)
+ {
+ count = 0;
+ init = true;
+ }
+
+ // Check if the message is a control message telling this listener to shut down.
+ if (IsShutdown(message))
+ {
+ log.Debug("Got a shutdown message.");
+ Shutdown();
+ }
+ // Check if the message is a report request message asking this listener to respond with the message count.
+ else if (IsReport(message))
+ {
+ log.Debug("Got a report request message.");
+
+ // Send the message count report.
+ SendReport();
+
+ // Reset the initialization flag so that the next message is considered to be the first.
+ init = false;
+ }
+ // Otherwise it is an ordinary test message, so increment the message count.
+ else
+ {
+ count++;
+ }
+ }
+
+ /// <summary> Checks a message to see if it is a shutdown control message. </summary>
+ ///
+ /// <param name="m">The message to check.</param>
+ ///
+ /// <returns><tt>true</tt> if it is a shutdown control message, <tt>false</tt> otherwise.</returns>
+ private bool IsShutdown(IMessage m)
+ {
+ bool result = CheckTextField(m, "TYPE", "TERMINATION_REQUEST");
+
+ //log.Debug("isShutdown = " + result);
+
+ return result;
+ }
+
+ /// <summary> Checks a message to see if it is a report request control message. </summary>
+ ///
+ /// <param name="m">The message to check.</param>
+ ///
+ /// <returns><tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise.</returns>
+ private bool IsReport(IMessage m)
+ {
+ bool result = CheckTextField(m, "TYPE", "REPORT_REQUEST");
+
+ //log.Debug("isReport = " + result);
+
+ return result;
+ }
+
+ /// <summary> Checks whether or not a text field on a message has the specified value. </summary>
+ ///
+ /// <param name="e">The message to check.</param>
+ /// <param name="e">The name of the field to check.</param>
+ /// <param name="e">The expected value of the field to compare with.</param>
+ ///
+ /// <returns> <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. </returns>
+ private static bool CheckTextField(IMessage m, string fieldName, string value)
+ {
+ /*log.Debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+ + ", String value = " + value + "): called");*/
+
+ string comp = m.Headers.GetString(fieldName);
+
+ return (comp != null) && comp == value;
+ }
+
+ /// <summary> Stops the message consumer and closes the connection. </summary>
+ private void Shutdown()
+ {
+ connection.Stop();
+ channel.Dispose();
+ connection.Dispose();
+ }
+
+ /// <summary> Sends the report message to the response location. </summary>
+ private void SendReport()
+ {
+ string report = "Received " + count + ".";
+
+ IMessage reportMessage = channel.CreateTextMessage(report);
+
+ reportMessage.Headers.SetBoolean("BOOLEAN", false);
+ //reportMessage.Headers.SetByte("BYTE", 5);
+ reportMessage.Headers.SetDouble("DOUBLE", 3.141);
+ reportMessage.Headers.SetFloat("FLOAT", 1.0f);
+ reportMessage.Headers.SetInt("INT", 1);
+ reportMessage.Headers.SetLong("LONG", 1);
+ reportMessage.Headers.SetString("STRING", "hello");
+ reportMessage.Headers.SetShort("SHORT", 2);
+
+ publisher.Send(reportMessage);
+
+ Console.WriteLine("Sent report: " + report);
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs b/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs
new file mode 100644
index 0000000000..5004bb28d2
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs
@@ -0,0 +1,188 @@
+using System;
+using System.Threading;
+using log4net;
+using Qpid.Messaging;
+using Qpid.Client.Qms;
+
+namespace Qpid.Client.Tests.interop
+{
+ public class TopicPublisher
+ {
+ private static ILog log = LogManager.GetLogger(typeof(TopicPublisher));
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
+ const int TIMEOUT = 10000;
+
+ /// <summary> Holds the routing key for the topic to receive test messages on. </summary>
+ const string CONTROL_ROUTING_KEY = "topic_control";
+
+ /// <summary> Holds the routing key for the queue to send reports to. </summary>
+ const string RESPONSE_ROUTING_KEY = "response";
+
+ /// <summary> Holds the number of messages to send in each test run. </summary>
+ private int numMessages;
+
+ /// <summary> Holds the number of subscribers listening to the messsages. </summary>
+ private int numSubscribers;
+
+ /// <summary> A monitor used to wait for all reports to arrive back from consumers on. </summary>
+ private AutoResetEvent allReportsReceivedEvt = new AutoResetEvent(false);
+
+ /// <summary> Holds the connection to listen on. </summary>
+ private IConnection connection;
+
+ /// <summary> Holds the channel for all test messages.</summary>
+ private IChannel channel;
+
+ /// <summary> Holds the producer to send test messages on. </summary>
+ private IMessagePublisher publisher;
+
+ /// <summary>
+ /// Creates a topic publisher that will send the specifed number of messages and expect the specifed number of report back from test
+ /// subscribers.
+ /// </summary>
+ ///
+ /// <param name="connectionUri">The broker URL.</param>
+ /// <param name="numMessages">The number of messages to send in each test.</param>
+ /// <param name="numSubscribers">The number of subscribes that are expected to reply with a report.</param>
+ TopicPublisher(string connectionUri, int numMessages, int numSubscribers)
+ {
+ log.Debug("TopicPublisher(string connectionUri = " + connectionUri + ", int numMessages = "+ numMessages +
+ ", int numSubscribers = " + numSubscribers + "): called");
+
+ // Create a connection to the broker.
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
+ connection = new AMQConnection(connectionInfo);
+
+ // Establish a session on the broker.
+ channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
+
+ // Set up a queue to listen for reports on.
+ string responseQueueName = channel.GenerateUniqueName();
+ channel.DeclareQueue(responseQueueName, false, true, true);
+
+ // Set this listener up to listen for reports on the response queue.
+ channel.Bind(responseQueueName, ExchangeNameDefaults.DIRECT, RESPONSE_ROUTING_KEY);
+ //channel.Bind(responseQueueName, "<<default>>", RESPONSE_ROUTING_KEY);
+ IMessageConsumer consumer = channel.CreateConsumerBuilder(responseQueueName)
+ .Create();
+ consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
+
+ // Set up this listener with a producer to send the test messages and report requests on.
+ publisher = channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(CONTROL_ROUTING_KEY)
+ .Create();
+
+ // Keep the test parameters.
+ this.numMessages = numMessages;
+ this.numSubscribers = numSubscribers;
+
+ connection.Start();
+ Console.WriteLine("Sending messages and waiting for reports...");
+ }
+
+ /// <summary>
+ /// Start a test subscriber. The broker URL must be specified as the first command line argument.
+ /// </summary>
+ ///
+ /// <param name="argv">The command line arguments, broker URL first.</param>
+ public static void Main(String[] argv)
+ {
+ // Create an instance of this publisher with the command line parameters.
+ TopicPublisher publisher = new TopicPublisher(DEFAULT_URI, 1, 1);
+
+ // Publish the test messages.
+ publisher.DoTest();
+ }
+
+ /// <summary>
+ /// Sends the test messages and waits for all subscribers to reply with a report.
+ /// </summary>
+ public void DoTest()
+ {
+ log.Debug("public void DoTest(): called");
+
+ // Create a test message to send.
+ IMessage testMessage = channel.CreateTextMessage("test");
+
+ // Send the desired number of test messages.
+ for (int i = 0; i < numMessages; i++)
+ {
+ publisher.Send(testMessage);
+ }
+
+ log.Debug("Sent " + numMessages + " test messages.");
+
+ // Send the report request.
+ IMessage reportRequestMessage = channel.CreateTextMessage("Report request message.");
+ reportRequestMessage.Headers["TYPE"] = "REPORT_REQUEST";
+
+ reportRequestMessage.Headers.SetBoolean("BOOLEAN", false);
+ //reportRequestMessage.Headers.SetByte("BYTE", 5);
+ reportRequestMessage.Headers.SetDouble("DOUBLE", 3.141);
+ reportRequestMessage.Headers.SetFloat("FLOAT", 1.0f);
+ reportRequestMessage.Headers.SetInt("INT", 1);
+ reportRequestMessage.Headers.SetLong("LONG", 1);
+ reportRequestMessage.Headers.SetString("STRING", "hello");
+ reportRequestMessage.Headers.SetShort("SHORT", 2);
+
+ publisher.Send(reportRequestMessage);
+
+ log.Debug("Sent the report request message, waiting for all replies...");
+
+ // Wait until all the reports come in.
+ allReportsReceivedEvt.WaitOne(TIMEOUT, true);
+
+ // Check if all reports were really received or if the timeout occurred.
+ if (numSubscribers == 0)
+ {
+ log.Debug("Got all reports.");
+ }
+ else
+ {
+ log.Debug("Waiting for reports timed out, still waiting for " + numSubscribers + ".");
+ }
+
+ // Send the termination request.
+ IMessage terminationRequestMessage = channel.CreateTextMessage("Termination request message.");
+ terminationRequestMessage.Headers["TYPE"] = "TERMINATION_REQUEST";
+ publisher.Send(terminationRequestMessage);
+
+ log.Debug("Sent the termination request message.");
+
+ // Close all message producers and consumers and the connection to the broker.
+ Shutdown();
+ }
+
+ /// <summary>
+ /// Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes
+ /// zero, at which time waiting threads are notified of this event.
+ /// </summary>
+ ///
+ /// <param name="message">The received report message.</param>
+ public void OnMessage(IMessage message)
+ {
+ log.Debug("public void OnMessage(IMessage message = " + message + "): called");
+
+ // Decrement the count of expected messages and release the wait monitor when this becomes zero.
+ if (--numSubscribers == 0)
+ {
+ log.Debug("Got reports from all subscribers.");
+ allReportsReceivedEvt.Set();
+ }
+ }
+
+ /// <summary> Stops the message consumers and closes the connection. </summary>
+ private void Shutdown()
+ {
+ connection.Stop();
+ publisher.Dispose();
+ channel.Dispose();
+ connection.Dispose();
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client.Tests/log4net.config b/qpid/dotnet/Qpid.Client.Tests/log4net.config
index 71096de248..e5340a1500 100644
--- a/qpid/dotnet/Qpid.Client.Tests/log4net.config
+++ b/qpid/dotnet/Qpid.Client.Tests/log4net.config
@@ -3,6 +3,7 @@
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="%d [%t] %-5p %c:%M(%L) - %m%n" />
</layout>
+ <threshold value="info"/>
</appender>
<appender name="filelog" type="log4net.Appender.FileAppender">
@@ -27,6 +28,16 @@
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="%date - %message%newline"/>
</layout>
+ <threshold value="info"/>
+ </appender>
+
+ <appender name="UdpAppender" type="log4net.Appender.UdpAppender">
+ <remoteAddress value="127.0.0.1" />
+ <remotePort value="4445" />
+ <layout type="log4net.Layout.XmlLayoutSchemaLog4j">
+ <locationInfo value="true" />
+ </layout>
+ <threshold value="debug"/>
</appender>
<logger name="Qpid.Client.ProtocolChannel.Tracing" additivity="false">
@@ -38,10 +49,10 @@
<level value="info" />
<appender-ref ref="ioLog"/>
</logger>
-
+
<root>
- <level value="info"/>
<appender-ref ref="console"/>
+ <appender-ref ref="UdpAppender"/>
<appender-ref ref="filelog"/>
</root>
</log4net> \ No newline at end of file
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
index 1a342f0b15..a59670ef5a 100644
--- a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -127,9 +127,9 @@ namespace Qpid.Client
{
throw new ArgumentException("ConnectionInfo must be specified");
}
- _log.Info("ConnectionInfo: " + connectionInfo);
+ _log.Debug("ConnectionInfo: " + connectionInfo);
_connectionInfo = connectionInfo;
- _log.Info("password = " + _connectionInfo.Password);
+ _log.Debug("password = " + _connectionInfo.Password);
_failoverPolicy = new FailoverPolicy(connectionInfo);
// We are not currently connected.
@@ -141,7 +141,7 @@ namespace Qpid.Client
try
{
IBrokerInfo brokerInfo = _failoverPolicy.GetNextBrokerInfo();
- _log.Info("Connecting to " + brokerInfo);
+ _log.Debug("Connecting to " + brokerInfo);
MakeBrokerConnection(brokerInfo);
break;
}
@@ -185,7 +185,7 @@ namespace Qpid.Client
foreach (Type type in assembly.GetTypes())
{
- _log.Info(String.Format("type = {0}", type));
+ _log.Debug(String.Format("type = {0}", type));
}
Type transport = assembly.GetType(transportType);
@@ -197,13 +197,13 @@ namespace Qpid.Client
}
- _log.Info("transport = " + transport);
- _log.Info("ctors = " + transport.GetConstructors());
+ _log.Debug("transport = " + transport);
+ _log.Debug("ctors = " + transport.GetConstructors());
ConstructorInfo info = transport.GetConstructors()[0];
ITransport result = (ITransport)info.Invoke(new object[] { host, port, this });
- _log.Info("transport = " + result);
+ _log.Debug("transport = " + result);
return result;
}*/
@@ -441,11 +441,11 @@ namespace Qpid.Client
/// </summary>
private void CloseAllSessions(Exception cause)
{
- _log.Info("Closing all session in connection " + this);
+ _log.Debug("Closing all session in connection " + this);
ICollection sessions = new ArrayList(_sessions.Values);
foreach (AmqChannel channel in sessions)
{
- _log.Info("Closing channel " + channel);
+ _log.Debug("Closing channel " + channel);
if (cause != null)
{
channel.ClosedWithException(cause);
@@ -462,7 +462,7 @@ namespace Qpid.Client
}
}
}
- _log.Info("Done closing all sessions in connection " + this);
+ _log.Debug("Done closing all sessions in connection " + this);
}
public int MaximumChannelCount
@@ -685,7 +685,7 @@ namespace Qpid.Client
}
catch (Exception e)
{
- _log.Info("Unable to connect to broker at " + bd, e);
+ _log.Debug("Unable to connect to broker at " + bd, e);
AttemptReconnection();
}
return false;
@@ -747,11 +747,11 @@ namespace Qpid.Client
{
if (!(e is AMQException))
{
- _log.Info("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e);
+ _log.Debug("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e);
}
else
{
- _log.Info(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo());
+ _log.Debug(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo());
}
}
}
@@ -767,7 +767,7 @@ namespace Qpid.Client
public void ResubscribeChannels()
{
ArrayList channels = new ArrayList(_sessions.Values);
- _log.Info(String.Format("Resubscribing sessions = {0} sessions.size={1}", channels, channels.Count));
+ _log.Debug(String.Format("Resubscribing sessions = {0} sessions.size={1}", channels, channels.Count));
foreach (AmqChannel channel in channels)
{
_protocolSession.AddSessionByChannel(channel.ChannelId, channel);
@@ -778,7 +778,7 @@ namespace Qpid.Client
private void ReopenChannel(ushort channelId, ushort prefetch, bool transacted)
{
- _log.Info(string.Format("Reopening channel id={0} prefetch={1} transacted={2}",
+ _log.Debug(string.Format("Reopening channel id={0} prefetch={1} transacted={2}",
channelId, prefetch, transacted));
try
{
@@ -843,7 +843,7 @@ namespace Qpid.Client
// TODO: Can we optimise this so that heartbeats are only written when we haven't sent anything recently to the broker?
_protocolWriter.Write(HeartbeatBody.FRAME);
}
- _log.Info("Heatbeat thread stopped");
+ _log.Debug("Heatbeat thread stopped");
}
public void Stop()
@@ -854,7 +854,7 @@ namespace Qpid.Client
public void StartHeartBeatThread(int heartbeatSeconds)
{
- _log.Info("Starting new heartbeat thread");
+ _log.Debug("Starting new heartbeat thread");
_heartBeatRunner = new HeartBeatThread(ProtocolWriter, heartbeatSeconds * 1000);
_heartBeatThread = new Thread(new ThreadStart(_heartBeatRunner.Run));
_heartBeatThread.Name = "HeartBeat";
@@ -865,7 +865,7 @@ namespace Qpid.Client
{
if (_heartBeatRunner != null)
{
- _log.Info("Stopping old heartbeat thread");
+ _log.Debug("Stopping old heartbeat thread");
_heartBeatRunner.Stop();
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
index 9855416db5..07650c170b 100644
--- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -33,9 +33,9 @@ namespace Qpid.Client
{
public class AmqChannel : Closeable, IChannel
{
- private const int BASIC_CONTENT_TYPE = 60;
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel));
- private static readonly ILog _logger = LogManager.GetLogger(typeof (AmqChannel));
+ private const int BASIC_CONTENT_TYPE = 60;
private static int _nextSessionNumber = 0;
@@ -110,7 +110,7 @@ namespace Qpid.Client
DispatchMessage(message);
}
- _logger.Info("Dispatcher thread terminating for channel " + _containingChannel._channelId);
+ _logger.Debug("Dispatcher thread terminating for channel " + _containingChannel._channelId);
}
private void DispatchMessage(UnprocessedMessage message)
@@ -405,14 +405,14 @@ namespace Qpid.Client
/// </summary>
private void CloseProducers()
{
- _logger.Info("Closing producers on session " + this);
+ _logger.Debug("Closing producers on session " + this);
// we need to clone the list of producers since the close() method updates the _producers collection
// which would result in a concurrent modification exception
ArrayList clonedProducers = new ArrayList(_producers.Values);
foreach (BasicMessageProducer prod in clonedProducers)
{
- _logger.Info("Closing producer " + prod);
+ _logger.Debug("Closing producer " + prod);
prod.Close();
}
// at this point the _producers map is empty
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
index a93d2db675..759ffd62e3 100644
--- a/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
+++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
@@ -196,7 +196,7 @@ namespace Qpid.Client
public override void Close()
{
- _logger.Info("Closing producer " + this);
+ _logger.Debug("Closing producer " + this);
Interlocked.Exchange(ref _closed, CLOSED);
_channel.DeregisterProducer(_producerId);
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
index 8b276c09e9..41e9d2240c 100644
--- a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
@@ -42,7 +42,7 @@ namespace Qpid.Client.Handler
parameters = new ConnectionTuneParameters();
}
- _logger.Info(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat));
+ _logger.Debug(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat));
parameters.FrameMax = frame.FrameMax;
parameters.FrameMax = 65535;
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
index 05f673d520..7be17a1080 100644
--- a/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
+++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
@@ -122,7 +122,7 @@ namespace Qpid.Client.State
{
if (InfoLoggingHack)
{
- _logger.Info("State changing to " + newState + " from old state " + _currentState);
+ _logger.Debug("State changing to " + newState + " from old state " + _currentState);
}
_logger.Debug("State changing to " + newState + " from old state " + _currentState);
AMQState oldState = _currentState;
diff --git a/qpid/dotnet/Qpid.Messaging/IHeaders.cs b/qpid/dotnet/Qpid.Messaging/IHeaders.cs
index 1a0d187c67..aa2d0278f7 100644
--- a/qpid/dotnet/Qpid.Messaging/IHeaders.cs
+++ b/qpid/dotnet/Qpid.Messaging/IHeaders.cs
@@ -20,6 +20,17 @@
*/
namespace Qpid.Messaging
{
+ /// <summary>
+ /// IHeaders represents the header fields of an AMQ message and provides methods to access those fields. There are accessor methods to
+ /// get and set each header field for each supported header field data type.
+ ///
+ /// <para/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th>Responsibilities</th></tr>
+ /// <tr><td>Provide accessors for all supported header field types.</td></tr>
+ /// <tr><td>Check if a set of headers contains a named property.</td></tr>
+ /// </table>
+ ///
+ /// </summary>
public interface IHeaders
{
bool Contains(string name);
diff --git a/qpid/dotnet/Qpid.NET.sln b/qpid/dotnet/Qpid.NET.sln
index 5d302a5a68..70ae637b23 100644
--- a/qpid/dotnet/Qpid.NET.sln
+++ b/qpid/dotnet/Qpid.NET.sln
@@ -20,6 +20,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Qpid.Sasl.Tests", "Qpid.Sas
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Qpid.Buffer.Tests", "Qpid.Buffer.Tests\Qpid.Buffer.Tests.csproj", "{74640962-99D0-4D06-B57A-9CD66517CF52}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TopicPublisher", "TopicPublisher\TopicPublisher.csproj", "{A06C9FFD-22FF-4654-856D-897C230978AF}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TopicListener", "TopicListener\TopicListener.csproj", "{9A112DF2-146F-4CF4-919B-9D3BE7D088E9}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -66,6 +70,14 @@ Global
{74640962-99D0-4D06-B57A-9CD66517CF52}.Debug|Any CPU.Build.0 = Debug|Any CPU
{74640962-99D0-4D06-B57A-9CD66517CF52}.Release|Any CPU.ActiveCfg = Release|Any CPU
{74640962-99D0-4D06-B57A-9CD66517CF52}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A06C9FFD-22FF-4654-856D-897C230978AF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A06C9FFD-22FF-4654-856D-897C230978AF}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A06C9FFD-22FF-4654-856D-897C230978AF}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A06C9FFD-22FF-4654-856D-897C230978AF}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9A112DF2-146F-4CF4-919B-9D3BE7D088E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9A112DF2-146F-4CF4-919B-9D3BE7D088E9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9A112DF2-146F-4CF4-919B-9D3BE7D088E9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9A112DF2-146F-4CF4-919B-9D3BE7D088E9}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/qpid/dotnet/TopicListener/Program.cs b/qpid/dotnet/TopicListener/Program.cs
new file mode 100644
index 0000000000..c969f4d736
--- /dev/null
+++ b/qpid/dotnet/TopicListener/Program.cs
@@ -0,0 +1,10 @@
+namespace TopicListener
+{
+ class Program
+ {
+ static void Main(string[] args)
+ {
+ Qpid.Client.Tests.interop.TopicListener.Main(args);
+ }
+ }
+}
diff --git a/qpid/dotnet/TopicListener/Properties/AssemblyInfo.cs b/qpid/dotnet/TopicListener/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000000..8672df7059
--- /dev/null
+++ b/qpid/dotnet/TopicListener/Properties/AssemblyInfo.cs
@@ -0,0 +1,33 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("TopicListener")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("J.P. Morgan Chase & Co.")]
+[assembly: AssemblyProduct("TopicListener")]
+[assembly: AssemblyCopyright("Copyright © J.P. Morgan Chase & Co. 2007")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("1c2db1cd-239f-495a-b6b4-c815ea534489")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
diff --git a/qpid/dotnet/TopicListener/TopicListener.csproj b/qpid/dotnet/TopicListener/TopicListener.csproj
new file mode 100644
index 0000000000..e049688b77
--- /dev/null
+++ b/qpid/dotnet/TopicListener/TopicListener.csproj
@@ -0,0 +1,89 @@
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.50727</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{9A112DF2-146F-4CF4-919B-9D3BE7D088E9}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>TopicListener</RootNamespace>
+ <AssemblyName>TopicListener</AssemblyName>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Program.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Qpid.Buffer.Tests\Qpid.Buffer.Tests.csproj">
+ <Project>{74640962-99D0-4D06-B57A-9CD66517CF52}</Project>
+ <Name>Qpid.Buffer.Tests</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Buffer\Qpid.Buffer.csproj">
+ <Project>{44384DF2-B0A4-4580-BDBC-EE4BAA87D995}</Project>
+ <Name>Qpid.Buffer</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Client.Tests\Qpid.Client.Tests.csproj">
+ <Project>{BA1B0032-4CE6-40DD-A2DC-119F0FFA0A1D}</Project>
+ <Name>Qpid.Client.Tests</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Client\Qpid.Client.csproj">
+ <Project>{68987C05-3768-452C-A6FC-6BA1D372852F}</Project>
+ <Name>Qpid.Client</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Codec\Qpid.Codec.csproj">
+ <Project>{22D0D0C2-77AF-4DE3-B456-7FF3893F9F88}</Project>
+ <Name>Qpid.Codec</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Common.Tests\Qpid.Common.Tests.csproj">
+ <Project>{F83624B0-762B-4D82-900D-FF4C1B36E36E}</Project>
+ <Name>Qpid.Common.Tests</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Common\Qpid.Common.csproj">
+ <Project>{77064C42-24D2-4CEB-9EA2-0EF481A43205}</Project>
+ <Name>Qpid.Common</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Messaging\Qpid.Messaging.csproj">
+ <Project>{6688F826-C58E-4C1B-AA1F-22AFAB4B7D07}</Project>
+ <Name>Qpid.Messaging</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Sasl.Tests\Qpid.Sasl.Tests.csproj">
+ <Project>{587B3520-EBB9-41ED-B019-E96116B651CE}</Project>
+ <Name>Qpid.Sasl.Tests</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Sasl\Qpid.Sasl.csproj">
+ <Project>{1465B0EE-6452-42A6-AB73-B2F9EABEEE75}</Project>
+ <Name>Qpid.Sasl</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project> \ No newline at end of file
diff --git a/qpid/dotnet/TopicPublisher/Program.cs b/qpid/dotnet/TopicPublisher/Program.cs
new file mode 100644
index 0000000000..9206466cf7
--- /dev/null
+++ b/qpid/dotnet/TopicPublisher/Program.cs
@@ -0,0 +1,10 @@
+namespace TopicPublisher
+{
+ class Program
+ {
+ static void Main(string[] args)
+ {
+ Qpid.Client.Tests.interop.TopicPublisher.Main(args);
+ }
+ }
+}
diff --git a/qpid/dotnet/TopicPublisher/Properties/AssemblyInfo.cs b/qpid/dotnet/TopicPublisher/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000000..85442960ed
--- /dev/null
+++ b/qpid/dotnet/TopicPublisher/Properties/AssemblyInfo.cs
@@ -0,0 +1,33 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("TopicPublisher")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("J.P. Morgan Chase & Co.")]
+[assembly: AssemblyProduct("TopicPublisher")]
+[assembly: AssemblyCopyright("Copyright © J.P. Morgan Chase & Co. 2007")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("93fa1c32-c0f8-47e5-b167-dc581e33eb9b")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
diff --git a/qpid/dotnet/TopicPublisher/TopicPublisher.csproj b/qpid/dotnet/TopicPublisher/TopicPublisher.csproj
new file mode 100644
index 0000000000..3d5350ca27
--- /dev/null
+++ b/qpid/dotnet/TopicPublisher/TopicPublisher.csproj
@@ -0,0 +1,85 @@
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.50727</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{A06C9FFD-22FF-4654-856D-897C230978AF}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>TopicPublisher</RootNamespace>
+ <AssemblyName>TopicPublisher</AssemblyName>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Program.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Qpid.Buffer\Qpid.Buffer.csproj">
+ <Project>{44384DF2-B0A4-4580-BDBC-EE4BAA87D995}</Project>
+ <Name>Qpid.Buffer</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Client.Tests\Qpid.Client.Tests.csproj">
+ <Project>{BA1B0032-4CE6-40DD-A2DC-119F0FFA0A1D}</Project>
+ <Name>Qpid.Client.Tests</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Client\Qpid.Client.csproj">
+ <Project>{68987C05-3768-452C-A6FC-6BA1D372852F}</Project>
+ <Name>Qpid.Client</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Codec\Qpid.Codec.csproj">
+ <Project>{22D0D0C2-77AF-4DE3-B456-7FF3893F9F88}</Project>
+ <Name>Qpid.Codec</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Common.Tests\Qpid.Common.Tests.csproj">
+ <Project>{F83624B0-762B-4D82-900D-FF4C1B36E36E}</Project>
+ <Name>Qpid.Common.Tests</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Common\Qpid.Common.csproj">
+ <Project>{77064C42-24D2-4CEB-9EA2-0EF481A43205}</Project>
+ <Name>Qpid.Common</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Messaging\Qpid.Messaging.csproj">
+ <Project>{6688F826-C58E-4C1B-AA1F-22AFAB4B7D07}</Project>
+ <Name>Qpid.Messaging</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Sasl.Tests\Qpid.Sasl.Tests.csproj">
+ <Project>{587B3520-EBB9-41ED-B019-E96116B651CE}</Project>
+ <Name>Qpid.Sasl.Tests</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Sasl\Qpid.Sasl.csproj">
+ <Project>{1465B0EE-6452-42A6-AB73-B2F9EABEEE75}</Project>
+ <Name>Qpid.Sasl</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project> \ No newline at end of file