summaryrefslogtreecommitdiff
path: root/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs')
-rw-r--r--trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs134
1 files changed, 0 insertions, 134 deletions
diff --git a/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs b/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs
deleted file mode 100644
index 4491d2e98b..0000000000
--- a/trunk/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-using System;
-using System.IO;
-using System.Threading;
-using common.org.apache.qpid.transport.util;
-using org.apache.qpid.transport.util;
-
-namespace org.apache.qpid.transport.network.io
-{
- public sealed class IoSender : IIoSender<MemoryStream>
- {
- private static readonly Logger log = Logger.Get(typeof (IoReceiver));
- private readonly Stream bufStream;
- private bool closed;
- private readonly Mutex mutClosed = new Mutex();
- private readonly CircularBuffer<byte[]> queue;
- private readonly Thread thread;
- private readonly int timeout;
- private readonly MemoryStream _tobeSent = new MemoryStream();
- public IoSender(IIoTransport transport, int queueSize, int timeout)
- {
- this.timeout = timeout;
- bufStream = transport.Stream;
- queue = new CircularBuffer<byte[]>(queueSize);
- thread = new Thread(Go);
- log.Debug("Creating IoSender thread");
- thread.Name = String.Format("IoSender - {0}", transport.Socket) ;
- thread.IsBackground = true;
- thread.Start();
- }
-
- public void Send(MemoryStream str)
- {
- int pos = (int) str.Position;
- str.Seek(0, SeekOrigin.Begin);
- Send(str, pos);
- }
-
- public void Send(MemoryStream str, int size)
- {
- mutClosed.WaitOne();
- if (closed)
- {
- throw new TransportException("sender is Closed");
- }
- mutClosed.ReleaseMutex();
- byte[] buf = new byte[size];
- str.Read(buf, 0, size);
- _tobeSent.Write(buf, 0, size);
- }
-
- public void Flush()
- {
- int length = (int)_tobeSent.Position;
- byte[] buf = new byte[length];
- _tobeSent.Seek(0, SeekOrigin.Begin);
- _tobeSent.Read(buf, 0, length);
- queue.Enqueue(buf);
- // bufStream.Write(buf, 0, length);
- // _tobeSent = new MemoryStream();
- // _writer.Write(buf, 0, length);
- // _writer.Flush();
- _tobeSent.Seek(0, SeekOrigin.Begin);
- }
-
- public void Close()
- {
- log.Debug("Closing Sender");
- mutClosed.WaitOne();
- if (!closed)
- {
- try
- {
- closed = true;
- queue.Close();
- thread.Join(timeout);
- if (thread.IsAlive)
- {
- throw new TransportException("join timed out");
- }
- }
- catch (ThreadInterruptedException e)
- {
- throw new TransportException(e);
- }
- catch (IOException e)
- {
- throw new TransportException(e);
- }
- }
- mutClosed.ReleaseMutex();
- }
-
- private void Go()
- {
- while (! closed)
- {
- //MemoryStream st = queue.Dequeue();
- byte[] st = queue.Dequeue();
- if (st != null)
- {
- try
- {
- // int length = (int) st.Length;
- // byte[] buf = new byte[length];
- // st.Read(buf, 0, length);
- bufStream.Write(st, 0, st.Length);
- }
- catch (Exception e)
- {
- Console.WriteLine(e);
- }
- }
- }
- }
- }
-}