summaryrefslogtreecommitdiff
path: root/qpid/dotnet
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-09-12 07:34:44 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-09-12 07:34:44 +0000
commita95b0040821ed7db6cff873f934c5153c0675e6c (patch)
treea8ffd63407b927f92e70c1a2b745572abd7e54bb /qpid/dotnet
parent2aca173111324ad5dedcd5a30ac1b8bea44b390c (diff)
downloadqpid-python-a95b0040821ed7db6cff873f934c5153c0675e6c.tar.gz
qpid-1277: fixed large messages issue; added more tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@694628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet')
-rw-r--r--qpid/dotnet/client-010/client/transport/network/Assembler.cs31
-rw-r--r--qpid/dotnet/client-010/client/transport/network/Disassembler.cs14
-rw-r--r--qpid/dotnet/client-010/client/transport/network/IIoSender.cs28
-rw-r--r--qpid/dotnet/client-010/client/transport/network/InputHandler.cs25
-rw-r--r--qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs1
-rw-r--r--qpid/dotnet/client-010/client/transport/network/io/IoSender.cs25
-rw-r--r--qpid/dotnet/client-010/test/interop/Message.cs181
-rw-r--r--qpid/dotnet/client-010/test/interop/TestCase.cs8
8 files changed, 264 insertions, 49 deletions
diff --git a/qpid/dotnet/client-010/client/transport/network/Assembler.cs b/qpid/dotnet/client-010/client/transport/network/Assembler.cs
index 89bd592f96..65d9061e0a 100644
--- a/qpid/dotnet/client-010/client/transport/network/Assembler.cs
+++ b/qpid/dotnet/client-010/client/transport/network/Assembler.cs
@@ -35,7 +35,7 @@ namespace org.apache.qpid.transport.network
public class Assembler : NetworkDelegate, Receiver<ReceivedPayload<ProtocolEvent>>
{
private static readonly Logger log = Logger.get(typeof (Assembler));
- private readonly Dictionary<int, List<Frame>> segments;
+ private readonly Dictionary<int, List<byte[]>> segments;
private readonly Method[] incomplete;
[ThreadStatic] static MSDecoder _decoder;
private readonly Object m_objectLock = new object();
@@ -101,7 +101,7 @@ namespace org.apache.qpid.transport.network
public Assembler()
{
- segments = new Dictionary<int, List<Frame>>();
+ segments = new Dictionary<int, List<byte[]>>();
incomplete = new Method[64*1024];
}
@@ -133,27 +133,28 @@ namespace org.apache.qpid.transport.network
}
else
{
- List<Frame> frames;
+ List<byte[]> frames;
if (frame.isFirstFrame())
{
- frames = new List<Frame>();
+ frames = new List<byte[]>();
setSegment(frame, frames);
}
else
{
frames = getSegment(frame);
}
-
- frames.Add(frame);
+ byte[] tmp = new byte[frame.BodySize];
+ frame.Body.Read(tmp, 0, tmp.Length);
+ frames.Add(tmp);
if (frame.isLastFrame())
{
clearSegment(frame);
segment = new MemoryStream();
BinaryWriter w = new BinaryWriter(segment);
- foreach (Frame f in frames)
+ foreach (byte[] f in frames)
{
- w.Write(f.Body.ToArray());
+ w.Write(f);
}
assemble(frame, segment);
}
@@ -219,13 +220,9 @@ namespace org.apache.qpid.transport.network
}
break;
case SegmentType.BODY:
- command = incomplete[channel];
- byte[] b = new byte[frame.BodySize];
- MemoryStream body = new MemoryStream();
- segment.Read(b, 0, b.Length);
- body.Write(b, 0, b.Length);
- body.Seek(0, SeekOrigin.Begin);
- command.Body = body;
+ command = incomplete[channel];
+ segment.Seek(0, SeekOrigin.Begin);
+ command.Body = segment;
incomplete[channel] = null;
Emit(channel, command);
break;
@@ -239,12 +236,12 @@ namespace org.apache.qpid.transport.network
return (frame.Track + 1)*frame.Channel;
}
- private List<Frame> getSegment(Frame frame)
+ private List<byte[]> getSegment(Frame frame)
{
return segments[segmentKey(frame)];
}
- private void setSegment(Frame frame, List<Frame> segment)
+ private void setSegment(Frame frame, List<byte[]> segment)
{
int key = segmentKey(frame);
if (segments.ContainsKey(key))
diff --git a/qpid/dotnet/client-010/client/transport/network/Disassembler.cs b/qpid/dotnet/client-010/client/transport/network/Disassembler.cs
index c1e6744f2c..4cf16a98fe 100644
--- a/qpid/dotnet/client-010/client/transport/network/Disassembler.cs
+++ b/qpid/dotnet/client-010/client/transport/network/Disassembler.cs
@@ -30,7 +30,7 @@ namespace org.apache.qpid.transport.network
/// </summary>
public sealed class Disassembler : Sender<ProtocolEvent>, ProtocolDelegate<Object>
{
- private readonly Sender<MemoryStream> _sender;
+ private readonly IIOSender<MemoryStream> _sender;
private readonly int _maxPayload;
private readonly MemoryStream _header;
private readonly BinaryWriter _writer;
@@ -38,7 +38,7 @@ namespace org.apache.qpid.transport.network
[ThreadStatic] static MSEncoder _encoder;
- public Disassembler(Sender<MemoryStream> sender, int maxFrame)
+ public Disassembler(IIOSender<MemoryStream> sender, int maxFrame)
{
if (maxFrame <= Frame.HEADER_SIZE || maxFrame >= 64*1024)
{
@@ -120,8 +120,8 @@ namespace org.apache.qpid.transport.network
_writer.Write((byte)0);
_writer.Write((byte)0);
_sender.send(_header);
- _header.Seek(0, SeekOrigin.Begin);
- _sender.send(buf);
+ _header.Seek(0, SeekOrigin.Begin);
+ _sender.send(buf, size);
}
}
@@ -129,13 +129,13 @@ namespace org.apache.qpid.transport.network
{
byte typeb = (byte) type;
byte track = mevent.EncodedTrack == Frame.L4 ? (byte) 1 : (byte) 0;
-
int remaining = (int) buf.Length;
+ buf.Seek(0, SeekOrigin.Begin);
bool first = true;
while (true)
{
int size = Math.Min(_maxPayload, remaining);
- remaining -= size;
+ remaining -= size;
byte newflags = flags;
if (first)
@@ -146,7 +146,7 @@ namespace org.apache.qpid.transport.network
if (remaining == 0)
{
newflags |= Frame.LAST_FRAME;
- }
+ }
frame(newflags, typeb, track, mevent.Channel, size, buf);
diff --git a/qpid/dotnet/client-010/client/transport/network/IIoSender.cs b/qpid/dotnet/client-010/client/transport/network/IIoSender.cs
new file mode 100644
index 0000000000..67a52ef707
--- /dev/null
+++ b/qpid/dotnet/client-010/client/transport/network/IIoSender.cs
@@ -0,0 +1,28 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+namespace org.apache.qpid.transport.network
+{
+ public interface IIOSender<T>:Sender<T>
+ {
+ void send(T body, int siz);
+ }
+}
diff --git a/qpid/dotnet/client-010/client/transport/network/InputHandler.cs b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs
index 244bf5a3ff..f3bffdb821 100644
--- a/qpid/dotnet/client-010/client/transport/network/InputHandler.cs
+++ b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs
@@ -127,7 +127,11 @@ namespace org.apache.qpid.transport.network
public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload)
{
MemoryStream buf = payload.Payload;
- int remaining = (int)buf.Length;
+ int remaining = (int) buf.Length;
+ if( input != null )
+ {
+ remaining += (int) input.Length;
+ }
try
{
while (remaining > 0)
@@ -136,10 +140,9 @@ namespace org.apache.qpid.transport.network
{
if (input != null)
{
- remaining += (int) input.Length;
- byte[] tmp = new byte[remaining];
- buf.Read(tmp, 0, remaining);
- input.Write(tmp, 0, remaining);
+ byte[] tmp = new byte[buf.Length];
+ buf.Read(tmp, 0, tmp.Length);
+ input.Write(tmp, 0, tmp.Length);
input.Seek(0, SeekOrigin.Begin);
buf = input;
}
@@ -155,13 +158,19 @@ namespace org.apache.qpid.transport.network
}
else
{
+ byte[] tmp;
if (input == null)
{
input = new MemoryStream();
+ tmp = new byte[remaining];
+ }
+ else
+ {
+ // this is a full buffer
+ tmp = new byte[buf.Length];
}
- byte[] tmp = new byte[remaining];
- buf.Read(tmp, 0, remaining);
- input.Write(tmp, 0, remaining);
+ buf.Read(tmp, 0, tmp.Length);
+ input.Write(tmp, 0, tmp.Length);
remaining = 0;
}
}
diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs b/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs
index d71093ea55..e080ee5899 100644
--- a/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs
+++ b/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs
@@ -21,7 +21,6 @@
using System;
using System.IO;
using System.Threading;
-using TransportException = org.apache.qpid.transport.TransportException;
using Logger = org.apache.qpid.transport.util.Logger;
diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs b/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs
index 38fda3e1e2..810e0b8cf4 100644
--- a/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs
+++ b/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs
@@ -17,7 +17,6 @@
* under the License.
*/
using System;
-using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Threading;
@@ -26,7 +25,7 @@ using org.apache.qpid.transport.util;
namespace org.apache.qpid.transport.network.io
{
- public sealed class IoSender : Sender<MemoryStream>
+ public sealed class IoSender : IIOSender<MemoryStream>
{
private static readonly Logger log = Logger.get(typeof (IoReceiver));
private readonly NetworkStream bufStream;
@@ -35,12 +34,12 @@ namespace org.apache.qpid.transport.network.io
private readonly CircularBuffer<byte[]> queue;
private readonly Thread thread;
private readonly int timeout;
- private MemoryStream _tobeSent = new MemoryStream();
+ private readonly MemoryStream _tobeSent = new MemoryStream();
public IoSender(IoTransport transport, int queueSize, int timeout)
{
this.timeout = timeout;
bufStream = transport.Stream;
- queue = new CircularBuffer<byte[]>(2);
+ queue = new CircularBuffer<byte[]>(queueSize);
thread = new Thread(Go);
log.debug("Creating IoSender thread");
thread.Name = String.Format("IoSender - {0}", transport.Socket) ;
@@ -48,20 +47,24 @@ namespace org.apache.qpid.transport.network.io
thread.Start();
}
-
public void send(MemoryStream str)
{
+ int pos = (int) str.Position;
+ str.Seek(0, SeekOrigin.Begin);
+ send(str, pos);
+ }
+
+ public void send(MemoryStream str, int size)
+ {
mutClosed.WaitOne();
if (closed)
{
throw new TransportException("sender is closed");
}
- mutClosed.ReleaseMutex();
- int length = (int)str.Position;
- str.Seek(0, SeekOrigin.Begin);
- byte[] buf = new byte[length];
- str.Read(buf, 0, length);
- _tobeSent.Write(buf, 0, length);
+ mutClosed.ReleaseMutex();
+ byte[] buf = new byte[size];
+ str.Read(buf, 0, size);
+ _tobeSent.Write(buf, 0, size);
}
public void flush()
diff --git a/qpid/dotnet/client-010/test/interop/Message.cs b/qpid/dotnet/client-010/test/interop/Message.cs
new file mode 100644
index 0000000000..79c22e0acf
--- /dev/null
+++ b/qpid/dotnet/client-010/test/interop/Message.cs
@@ -0,0 +1,181 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.IO;
+using System.Text;
+using System.Threading;
+using client.client;
+using NUnit.Framework;
+using org.apache.qpid.client;
+using org.apache.qpid.transport;
+using org.apache.qpid.transport.util;
+
+namespace test.interop
+{
+ public class Message : TestCase
+ {
+ private static readonly Logger _log = Logger.get(typeof (Message));
+
+ [Test]
+ public void sendAndPurge()
+ {
+ _log.debug("Running: exchangeBind");
+ ClientSession ssn = Client.createSession(0);
+ ssn.queueDelete("queue1");
+ QueueQueryResult result = (QueueQueryResult) ssn.queueQuery("queue1").Result;
+ Assert.IsNull(result.getQueue());
+ ssn.queueDeclare("queue1", null, null);
+ ssn.exchangeBind("queue1", "amq.direct", "queue1", null);
+ for (int i = 0; i < 10; i++)
+ {
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("queue1"),
+ new MessageProperties().setMessageId(UUID.randomUUID())),
+ Encoding.UTF8.GetBytes("test: " + i));
+ }
+ ssn.sync();
+ result = (QueueQueryResult) ssn.queueQuery("queue1").Result;
+ Assert.IsTrue(result.getMessageCount() == 10);
+ ssn.queuePurge("queue1");
+ ssn.sync();
+ result = (QueueQueryResult) ssn.queueQuery("queue1").Result;
+ Assert.IsTrue(result.getMessageCount() == 0);
+ }
+
+ [Test]
+ public void sendAndReceiveSmallMessages()
+ {
+ _log.debug("Running: sendAndReceiveSmallMessages");
+ byte[] smallMessage = Encoding.UTF8.GetBytes("test");
+ sendAndReceive(smallMessage, 100);
+ }
+
+ [Test]
+ public void sendAndReceiveLargeMessages()
+ {
+ _log.debug("Running: sendAndReceiveSmallMessages");
+ byte[] largeMessage = new byte[100 * 1024];
+ Random random = new Random();
+ random.NextBytes(largeMessage);
+ sendAndReceive(largeMessage, 10);
+ }
+
+ [Test]
+ public void sendAndReceiveVeryLargeMessages()
+ {
+ _log.debug("Running: sendAndReceiveSmallMessages");
+ byte[] verylargeMessage = new byte[1000 * 1024];
+ Random random = new Random();
+ random.NextBytes(verylargeMessage);
+ sendAndReceive(verylargeMessage, 2);
+ }
+
+ private void sendAndReceive(byte[] messageBody, int count)
+ {
+ ClientSession ssn = Client.createSession(0);
+ ssn.sync();
+ ssn.queueDeclare("queue1", null, null);
+ ssn.queueDelete("queue1");
+ QueueQueryResult result = (QueueQueryResult) ssn.queueQuery("queue1").Result;
+ Assert.IsNull(result.getQueue());
+ ssn.queueDeclare("queue1", null, null);
+ ssn.exchangeBind("queue1", "amq.direct", "queue1", null);
+ Object myLock = new Object();
+ MyListener myListener = new MyListener(myLock, count);
+ ssn.attachMessageListener(myListener, "myDest");
+
+ ssn.messageSubscribe("queue1", "myDest", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, null,
+ 0, null);
+
+
+ // issue credits
+ ssn.messageSetFlowMode("myDest", MessageFlowMode.WINDOW);
+ ssn.messageFlow("myDest", MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
+ ssn.messageFlow("myDest", MessageCreditUnit.MESSAGE, 10000);
+ ssn.sync();
+
+ for (int i = 0; i < count; i++)
+ {
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties().setRoutingKey("queue1"),
+ new MessageProperties().setMessageId(UUID.randomUUID())),
+ messageBody);
+ }
+ ssn.sync();
+
+ lock (myLock)
+ {
+ if (myListener.Count != 0)
+ {
+ Monitor.Wait(myLock, 10000000);
+ }
+ }
+ Assert.IsTrue(myListener.Count == 0);
+ ssn.messageAccept(myListener.UnAck);
+ ssn.sync();
+ // the queue should be empty
+ result = (QueueQueryResult)ssn.queueQuery("queue1").Result;
+ Assert.IsTrue(result.getMessageCount() == 0);
+ ssn.close();
+ }
+
+
+
+ private class MyListener : MessageListener
+ {
+ private static readonly Logger _log = Logger.get(typeof (MyListener));
+ private readonly Object _wl;
+ private int _count;
+ private RangeSet _rs = new RangeSet();
+
+ public MyListener(Object wl, int count)
+ {
+ _wl = wl;
+ _count = count;
+ }
+
+ public void messageTransfer(MessageTransfer m)
+ {
+ byte[] body = new byte[m.Body.Length - m.Body.Position];
+ _log.debug("Got a message of size: " + body.Length + " count = " + _count);
+ _rs.add(m.Id);
+ lock (_wl)
+ {
+ _count--;
+ if (_count == 0)
+ {
+ Monitor.PulseAll(_wl);
+ }
+ }
+ }
+
+ public int Count
+ {
+ get { return _count; }
+ }
+
+ public RangeSet UnAck
+ {
+ get { return _rs; }
+ }
+ }
+ }
+}
diff --git a/qpid/dotnet/client-010/test/interop/TestCase.cs b/qpid/dotnet/client-010/test/interop/TestCase.cs
index 02f5a327f6..9f410a9e9e 100644
--- a/qpid/dotnet/client-010/test/interop/TestCase.cs
+++ b/qpid/dotnet/client-010/test/interop/TestCase.cs
@@ -43,18 +43,16 @@ namespace test.interop
// populate default properties
_properties.Add("UserName", "guest");
_properties.Add("Password", "guest");
- _properties.Add("Host", "192.168.1.14");
- _properties.Add("Port", "5673");
+ _properties.Add("Host", "localhost");
+ _properties.Add("Port", "5672");
_properties.Add("VirtualHost", "test");
//Read the test config file
XmlTextReader reader = new XmlTextReader(Environment.CurrentDirectory + ".\\test.config");
while (reader.Read())
- {
- XmlNodeType nType = reader.NodeType;
+ {
// if node type is an element
if (reader.NodeType == XmlNodeType.Element && reader.Name.Equals("add"))
{
- Console.WriteLine("Element:" + reader.Name.ToString());
if (_properties.ContainsKey(reader.GetAttribute("key")))
{
_properties[reader.GetAttribute("key")] = reader.GetAttribute("value");