summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client/Client/Transport/IoHandler.cs')
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IoHandler.cs644
1 files changed, 322 insertions, 322 deletions
diff --git a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
index 9ac513069e..0475236d92 100644
--- a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
+++ b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
@@ -1,322 +1,322 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.IO;
-using System.Threading;
-using log4net;
-using Apache.Qpid.Buffer;
-using Apache.Qpid.Client.Protocol;
-
-namespace Apache.Qpid.Client.Transport
-{
- /// <summary>
- /// Responsible for reading and writing
- /// ByteBuffers from/to network streams, and handling
- /// the stream filters
- /// </summary>
- public class IoHandler : IByteChannel, IDisposable
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler));
- private const int DEFAULT_BUFFER_SIZE = 32 * 1024;
-
- private Stream _topStream;
- private IProtocolListener _protocolListener;
- private int _readBufferSize;
-
- public int ReadBufferSize
- {
- get { return _readBufferSize; }
- set { _readBufferSize = value; }
- }
-
- /// <summary>
- /// Initialize a new instance
- /// </summary>
- /// <param name="stream">Underlying network stream</param>
- /// <param name="protocolListener">Protocol listener to report exceptions to</param>
- public IoHandler(Stream stream, IProtocolListener protocolListener)
- {
- if ( stream == null )
- throw new ArgumentNullException("stream");
- if ( protocolListener == null )
- throw new ArgumentNullException("protocolListener");
-
- // initially, the stream at the top of the filter
- // chain is the underlying network stream
- _topStream = stream;
- _protocolListener = protocolListener;
- _readBufferSize = DEFAULT_BUFFER_SIZE;
- }
-
- /// <summary>
- /// Adds a new filter on the top of the chain
- /// </summary>
- /// <param name="filter">Stream filter to put on top of the chain</param>
- /// <remarks>
- /// This should *only* be called during initialization. We don't
- /// support changing the filter change after the first read/write
- /// has been done and it's not thread-safe to boot!
- /// </remarks>
- public void AddFilter(IStreamFilter filter)
- {
- _topStream = filter.CreateFilterStream(_topStream);
- }
-
- #region IByteChannel Implementation
- //
- // IByteChannel Implementation
- //
-
- /// <summary>
- /// Read a <see cref="ByteBuffer"/> from the underlying
- /// network stream and any configured filters
- /// </summary>
- /// <returns>A ByteBuffer, if available</returns>
- public ByteBuffer Read()
- {
- byte[] bytes = AllocateBuffer();
-
- int numOctets = _topStream.Read(bytes, 0, bytes.Length);
-
- return WrapByteArray(bytes, numOctets);
- }
-
- /// <summary>
- /// Begin an asynchronous read operation
- /// </summary>
- /// <param name="callback">Callback method to call when read operation completes</param>
- /// <param name="state">State object</param>
- /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
- public IAsyncResult BeginRead(AsyncCallback callback, object state)
- {
- byte[] bytes = AllocateBuffer();
- ReadData rd = new ReadData(callback, state, bytes);
-
- // only put a callback if the caller wants one.
- AsyncCallback myCallback = null;
- if ( callback != null )
- myCallback = new AsyncCallback(OnAsyncReadDone);
-
- IAsyncResult result = _topStream.BeginRead(
- bytes, 0, bytes.Length, myCallback,rd
- );
- return new WrappedAsyncResult(result, bytes);
- }
-
- /// <summary>
- /// End an asynchronous read operation
- /// </summary>
- /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
- /// <returns>The <see cref="ByteBuffer"/> read</returns>
- public ByteBuffer EndRead(IAsyncResult result)
- {
- WrappedAsyncResult theResult = (WrappedAsyncResult)result;
- int bytesRead = _topStream.EndRead(theResult.InnerResult);
- return WrapByteArray(theResult.Buffer, bytesRead);
- }
-
- /// <summary>
- /// Write a <see cref="ByteBuffer"/> to the underlying network
- /// stream, going through any configured filters
- /// </summary>
- /// <param name="buffer"></param>
- public void Write(ByteBuffer buffer)
- {
- try
- {
- _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
- }
- catch (Exception e)
- {
- _log.Warn("Write caused exception", e);
- _protocolListener.OnException(e);
- }
- }
-
- /// <summary>
- /// Begin an asynchronous write operation
- /// </summary>
- /// <param name="buffer">Buffer to write</param>
- /// <param name="callback">A callback to call when the operation completes</param>
- /// <param name="state">State object</param>
- /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
- public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
- {
- try
- {
- return _topStream.BeginWrite(
- buffer.Array, buffer.Position, buffer.Limit,
- callback, state
- );
- } catch ( Exception e )
- {
- _log.Error("BeginWrite caused exception", e);
- // not clear if an exception here should be propagated? we still
- // need to propagate it upwards anyway!
- _protocolListener.OnException(e);
- throw;
- }
- }
-
- /// <summary>
- /// End an asynchronous write operation
- /// </summary>
- /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
- public void EndWrite(IAsyncResult result)
- {
- try
- {
- _topStream.EndWrite(result);
- } catch ( Exception e )
- {
- _log.Error("EndWrite caused exception", e);
- // not clear if an exception here should be propagated?
- _protocolListener.OnException(e);
- //throw;
- }
- }
- #endregion // IByteChannel Implementation
-
- #region IDisposable Implementation
- //
- // IDisposable Implementation
- //
-
- public void Dispose()
- {
- if ( _topStream != null )
- {
- _topStream.Close();
- }
- }
-
- #endregion // IDisposable Implementation
-
- #region Private and Helper Classes/Methods
- //
- // Private and Helper Classes/Methods
- //
-
- private byte[] AllocateBuffer()
- {
- return new byte[ReadBufferSize];
- }
-
- private static ByteBuffer WrapByteArray(byte[] bytes, int size)
- {
- ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
- byteBuffer.Limit = size;
- byteBuffer.Flip();
-
- return byteBuffer;
- }
-
-
- private static void OnAsyncReadDone(IAsyncResult result)
- {
- ReadData rd = (ReadData) result.AsyncState;
- IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer);
- rd.Callback(wrapped);
- }
-
- class ReadData
- {
- private object _state;
- private AsyncCallback _callback;
- private byte[] _buffer;
-
- public object State
- {
- get { return _state; }
- }
-
- public AsyncCallback Callback
- {
- get { return _callback; }
- }
-
- public byte[] Buffer
- {
- get { return _buffer; }
- }
-
- public ReadData(AsyncCallback callback, object state, byte[] buffer)
- {
- _callback = callback;
- _state = state;
- _buffer = buffer;
- }
- }
-
- class WrappedAsyncResult : IAsyncResult
- {
- private IAsyncResult _innerResult;
- private byte[] _buffer;
-
- #region IAsyncResult Properties
- //
- // IAsyncResult Properties
- //
- public bool IsCompleted
- {
- get { return _innerResult.IsCompleted; }
- }
-
- public WaitHandle AsyncWaitHandle
- {
- get { return _innerResult.AsyncWaitHandle; }
- }
-
- public object AsyncState
- {
- get { return _innerResult.AsyncState; }
- }
-
- public bool CompletedSynchronously
- {
- get { return _innerResult.CompletedSynchronously; }
- }
- #endregion // IAsyncResult Properties
-
- public IAsyncResult InnerResult
- {
- get { return _innerResult; }
- }
- public byte[] Buffer
- {
- get { return _buffer; }
- }
-
- public WrappedAsyncResult(IAsyncResult result, byte[] buffer)
- {
- if ( result == null )
- throw new ArgumentNullException("result");
- if ( buffer == null )
- throw new ArgumentNullException("buffer");
-
- _innerResult = result;
- _buffer = buffer;
- }
- }
-
- #endregion // Private and Helper Classes/Methods
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Client.Protocol;
+
+namespace Apache.Qpid.Client.Transport
+{
+ /// <summary>
+ /// Responsible for reading and writing
+ /// ByteBuffers from/to network streams, and handling
+ /// the stream filters
+ /// </summary>
+ public class IoHandler : IByteChannel, IDisposable
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler));
+ private const int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ private Stream _topStream;
+ private IProtocolListener _protocolListener;
+ private int _readBufferSize;
+
+ public int ReadBufferSize
+ {
+ get { return _readBufferSize; }
+ set { _readBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="stream">Underlying network stream</param>
+ /// <param name="protocolListener">Protocol listener to report exceptions to</param>
+ public IoHandler(Stream stream, IProtocolListener protocolListener)
+ {
+ if ( stream == null )
+ throw new ArgumentNullException("stream");
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ // initially, the stream at the top of the filter
+ // chain is the underlying network stream
+ _topStream = stream;
+ _protocolListener = protocolListener;
+ _readBufferSize = DEFAULT_BUFFER_SIZE;
+ }
+
+ /// <summary>
+ /// Adds a new filter on the top of the chain
+ /// </summary>
+ /// <param name="filter">Stream filter to put on top of the chain</param>
+ /// <remarks>
+ /// This should *only* be called during initialization. We don't
+ /// support changing the filter change after the first read/write
+ /// has been done and it's not thread-safe to boot!
+ /// </remarks>
+ public void AddFilter(IStreamFilter filter)
+ {
+ _topStream = filter.CreateFilterStream(_topStream);
+ }
+
+ #region IByteChannel Implementation
+ //
+ // IByteChannel Implementation
+ //
+
+ /// <summary>
+ /// Read a <see cref="ByteBuffer"/> from the underlying
+ /// network stream and any configured filters
+ /// </summary>
+ /// <returns>A ByteBuffer, if available</returns>
+ public ByteBuffer Read()
+ {
+ byte[] bytes = AllocateBuffer();
+
+ int numOctets = _topStream.Read(bytes, 0, bytes.Length);
+
+ return WrapByteArray(bytes, numOctets);
+ }
+
+ /// <summary>
+ /// Begin an asynchronous read operation
+ /// </summary>
+ /// <param name="callback">Callback method to call when read operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ byte[] bytes = AllocateBuffer();
+ ReadData rd = new ReadData(callback, state, bytes);
+
+ // only put a callback if the caller wants one.
+ AsyncCallback myCallback = null;
+ if ( callback != null )
+ myCallback = new AsyncCallback(OnAsyncReadDone);
+
+ IAsyncResult result = _topStream.BeginRead(
+ bytes, 0, bytes.Length, myCallback,rd
+ );
+ return new WrappedAsyncResult(result, bytes);
+ }
+
+ /// <summary>
+ /// End an asynchronous read operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
+ /// <returns>The <see cref="ByteBuffer"/> read</returns>
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ WrappedAsyncResult theResult = (WrappedAsyncResult)result;
+ int bytesRead = _topStream.EndRead(theResult.InnerResult);
+ return WrapByteArray(theResult.Buffer, bytesRead);
+ }
+
+ /// <summary>
+ /// Write a <see cref="ByteBuffer"/> to the underlying network
+ /// stream, going through any configured filters
+ /// </summary>
+ /// <param name="buffer"></param>
+ public void Write(ByteBuffer buffer)
+ {
+ try
+ {
+ _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
+ }
+ catch (Exception e)
+ {
+ _log.Warn("Write caused exception", e);
+ _protocolListener.OnException(e);
+ }
+ }
+
+ /// <summary>
+ /// Begin an asynchronous write operation
+ /// </summary>
+ /// <param name="buffer">Buffer to write</param>
+ /// <param name="callback">A callback to call when the operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ try
+ {
+ return _topStream.BeginWrite(
+ buffer.Array, buffer.Position, buffer.Limit,
+ callback, state
+ );
+ } catch ( Exception e )
+ {
+ _log.Error("BeginWrite caused exception", e);
+ // not clear if an exception here should be propagated? we still
+ // need to propagate it upwards anyway!
+ _protocolListener.OnException(e);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// End an asynchronous write operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
+ public void EndWrite(IAsyncResult result)
+ {
+ try
+ {
+ _topStream.EndWrite(result);
+ } catch ( Exception e )
+ {
+ _log.Error("EndWrite caused exception", e);
+ // not clear if an exception here should be propagated?
+ _protocolListener.OnException(e);
+ //throw;
+ }
+ }
+ #endregion // IByteChannel Implementation
+
+ #region IDisposable Implementation
+ //
+ // IDisposable Implementation
+ //
+
+ public void Dispose()
+ {
+ if ( _topStream != null )
+ {
+ _topStream.Close();
+ }
+ }
+
+ #endregion // IDisposable Implementation
+
+ #region Private and Helper Classes/Methods
+ //
+ // Private and Helper Classes/Methods
+ //
+
+ private byte[] AllocateBuffer()
+ {
+ return new byte[ReadBufferSize];
+ }
+
+ private static ByteBuffer WrapByteArray(byte[] bytes, int size)
+ {
+ ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
+ byteBuffer.Limit = size;
+ byteBuffer.Flip();
+
+ return byteBuffer;
+ }
+
+
+ private static void OnAsyncReadDone(IAsyncResult result)
+ {
+ ReadData rd = (ReadData) result.AsyncState;
+ IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer);
+ rd.Callback(wrapped);
+ }
+
+ class ReadData
+ {
+ private object _state;
+ private AsyncCallback _callback;
+ private byte[] _buffer;
+
+ public object State
+ {
+ get { return _state; }
+ }
+
+ public AsyncCallback Callback
+ {
+ get { return _callback; }
+ }
+
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public ReadData(AsyncCallback callback, object state, byte[] buffer)
+ {
+ _callback = callback;
+ _state = state;
+ _buffer = buffer;
+ }
+ }
+
+ class WrappedAsyncResult : IAsyncResult
+ {
+ private IAsyncResult _innerResult;
+ private byte[] _buffer;
+
+ #region IAsyncResult Properties
+ //
+ // IAsyncResult Properties
+ //
+ public bool IsCompleted
+ {
+ get { return _innerResult.IsCompleted; }
+ }
+
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return _innerResult.AsyncWaitHandle; }
+ }
+
+ public object AsyncState
+ {
+ get { return _innerResult.AsyncState; }
+ }
+
+ public bool CompletedSynchronously
+ {
+ get { return _innerResult.CompletedSynchronously; }
+ }
+ #endregion // IAsyncResult Properties
+
+ public IAsyncResult InnerResult
+ {
+ get { return _innerResult; }
+ }
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public WrappedAsyncResult(IAsyncResult result, byte[] buffer)
+ {
+ if ( result == null )
+ throw new ArgumentNullException("result");
+ if ( buffer == null )
+ throw new ArgumentNullException("buffer");
+
+ _innerResult = result;
+ _buffer = buffer;
+ }
+ }
+
+ #endregion // Private and Helper Classes/Methods
+ }
+}