summaryrefslogtreecommitdiff
path: root/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs')
-rw-r--r--qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs185
1 files changed, 185 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs b/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs
new file mode 100644
index 0000000000..b60444fa29
--- /dev/null
+++ b/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs
@@ -0,0 +1,185 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.IO;
+using System.Threading;
+using Logger = org.apache.qpid.transport.util.Logger;
+
+
+namespace org.apache.qpid.transport.network.io
+{
+ /// <summary>
+ /// IoReceiver
+ /// </summary>
+ public sealed class IoReceiver : IReceiver<ReceivedPayload<MemoryStream>>
+ {
+ private static readonly Logger log = Logger.Get(typeof(IoReceiver));
+ private readonly int m_bufferSize;
+ private readonly Stream m_bufStream;
+ private readonly int m_timeout;
+ private readonly Thread m_thread;
+ private bool m_closed;
+ private readonly Object m_objectLock = new object();
+
+ // the event raised when a buffer is read from the wire
+ event EventHandler<ReceivedPayload<MemoryStream>> ReceivedBuffer;
+ event EventHandler<ExceptionArgs> ExceptionReading;
+ event EventHandler ReceiverClosed;
+
+ event EventHandler<ReceivedPayload<MemoryStream>> IReceiver<ReceivedPayload<MemoryStream>>.Received
+ {
+ add
+ {
+ lock (m_objectLock)
+ {
+ ReceivedBuffer += value;
+ }
+ }
+ remove
+ {
+ lock (m_objectLock)
+ {
+ ReceivedBuffer -= value;
+ }
+ }
+ }
+
+ event EventHandler<ExceptionArgs> IReceiver<ReceivedPayload<MemoryStream>>.Exception
+ {
+ add
+ {
+ lock (m_objectLock)
+ {
+ ExceptionReading += value;
+ }
+ }
+ remove
+ {
+ lock (m_objectLock)
+ {
+ ExceptionReading -= value;
+ }
+ }
+ }
+
+ event EventHandler IReceiver<ReceivedPayload<MemoryStream>>.Closed
+ {
+ add
+ {
+ lock (m_objectLock)
+ {
+ ReceiverClosed += value;
+ }
+ }
+ remove
+ {
+ lock (m_objectLock)
+ {
+ ReceiverClosed -= value;
+ }
+ }
+ }
+
+ public IoReceiver(Stream stream, int bufferSize, int timeout)
+ {
+ m_bufferSize = bufferSize;
+ m_bufStream = stream;
+ m_timeout = timeout;
+ m_thread = new Thread(Go);
+ m_thread.Name = String.Format("IoReceiver - {0}", stream);
+ m_thread.IsBackground = true;
+ m_thread.Start();
+ }
+
+ public void Close()
+ {
+ Mutex mut = new Mutex();
+ mut.WaitOne();
+ if (!m_closed)
+ {
+ m_closed = true;
+ try
+ {
+ log.Debug("Receiver closing");
+ m_bufStream.Close();
+ m_thread.Join(m_timeout);
+ if (m_thread.IsAlive)
+ {
+ throw new TransportException("join timed out");
+ }
+ }
+ catch (ThreadInterruptedException e)
+ {
+ throw new TransportException(e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ mut.ReleaseMutex();
+ }
+
+ void Go()
+ {
+ // create a BufferedStream on top of the NetworkStream.
+ int threshold = m_bufferSize/2;
+ byte[] buffer = new byte[m_bufferSize];
+ try
+ {
+ int read;
+ int offset = 0;
+ ReceivedPayload<MemoryStream> payload = new ReceivedPayload<MemoryStream>();
+ while ((read = m_bufStream.Read(buffer, offset, m_bufferSize - offset)) > 0)
+ {
+ MemoryStream memStream = new MemoryStream(buffer, offset, read);
+ if (ReceivedBuffer != null)
+ {
+ // call the event
+ payload.Payload = memStream;
+ ReceivedBuffer(this, payload);
+ }
+ offset += read;
+ if (offset > threshold)
+ {
+ offset = 0;
+ buffer = new byte[m_bufferSize];
+ }
+ }
+ log.Debug("Receiver thread terminating");
+ }
+ catch (Exception t)
+ {
+ if (ExceptionReading != null)
+ {
+ ExceptionReading(this, new ExceptionArgs(t));
+ }
+ }
+ finally
+ {
+ if (ReceiverClosed != null)
+ {
+ ReceiverClosed(this, new EventArgs());
+ }
+ }
+ }
+ }
+} \ No newline at end of file