diff options
Diffstat (limited to 'qpid/dotnet/client-010/client/transport/network/io/IoSender.cs')
-rw-r--r-- | qpid/dotnet/client-010/client/transport/network/io/IoSender.cs | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs b/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs new file mode 100644 index 0000000000..025b782a12 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs @@ -0,0 +1,137 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +using System; +using System.IO; +using System.Threading; +using common.org.apache.qpid.transport.util; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network.io +{ + public sealed class IoSender : IIoSender<MemoryStream> + { + private static readonly Logger log = Logger.Get(typeof (IoReceiver)); + private readonly IIoTransport ioTransport; + private readonly Stream bufStream; + private bool closed; + private readonly Mutex mutClosed = new Mutex(); + private readonly CircularBuffer<byte[]> queue; + private readonly Thread thread; + private readonly int timeout; + private readonly MemoryStream _tobeSent = new MemoryStream(); + public IoSender(IIoTransport transport, int queueSize, int timeout) + { + this.timeout = timeout; + ioTransport = transport; + bufStream = transport.Stream; + queue = new CircularBuffer<byte[]>(queueSize); + thread = new Thread(Go); + log.Debug("Creating IoSender thread"); + thread.Name = String.Format("IoSender - {0}", transport.Socket) ; + thread.IsBackground = true; + thread.Start(); + } + + public void Send(MemoryStream str) + { + int pos = (int) str.Position; + str.Seek(0, SeekOrigin.Begin); + Send(str, pos); + } + + public void Send(MemoryStream str, int size) + { + mutClosed.WaitOne(); + if (closed) + { + throw new TransportException("sender is Closed"); + } + mutClosed.ReleaseMutex(); + byte[] buf = new byte[size]; + str.Read(buf, 0, size); + _tobeSent.Write(buf, 0, size); + } + + public void Flush() + { + int length = (int)_tobeSent.Position; + byte[] buf = new byte[length]; + _tobeSent.Seek(0, SeekOrigin.Begin); + _tobeSent.Read(buf, 0, length); + queue.Enqueue(buf); + // bufStream.Write(buf, 0, length); + // _tobeSent = new MemoryStream(); + // _writer.Write(buf, 0, length); + // _writer.Flush(); + _tobeSent.Seek(0, SeekOrigin.Begin); + } + + public void Close() + { + log.Debug("Closing Sender"); + mutClosed.WaitOne(); + if (!closed) + { + try + { + closed = true; + queue.Close(); + thread.Join(timeout); + if (thread.IsAlive) + { + throw new TransportException("join timed out"); + } + } + catch (ThreadInterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + mutClosed.ReleaseMutex(); + } + + private void Go() + { + while (! closed) + { + //MemoryStream st = queue.Dequeue(); + byte[] st = queue.Dequeue(); + if (st != null) + { + try + { + // int length = (int) st.Length; + // byte[] buf = new byte[length]; + // st.Read(buf, 0, length); + bufStream.Write(st, 0, st.Length); + } + catch (Exception e) + { + closed = true; + ioTransport.Connection.On_ReceivedException(this, new ExceptionArgs(e)); + } + } + } + } + } +} |