/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.IO; using System.Threading; using log4net; using Apache.Qpid.Buffer; using Apache.Qpid.Client.Protocol; namespace Apache.Qpid.Client.Transport { /// /// Responsible for reading and writing /// ByteBuffers from/to network streams, and handling /// the stream filters /// public class IoHandler : IByteChannel, IDisposable { private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler)); private const int DEFAULT_BUFFER_SIZE = 32 * 1024; private Stream _topStream; private IProtocolListener _protocolListener; private int _readBufferSize; public int ReadBufferSize { get { return _readBufferSize; } set { _readBufferSize = value; } } /// /// Initialize a new instance /// /// Underlying network stream /// Protocol listener to report exceptions to public IoHandler(Stream stream, IProtocolListener protocolListener) { if ( stream == null ) throw new ArgumentNullException("stream"); if ( protocolListener == null ) throw new ArgumentNullException("protocolListener"); // initially, the stream at the top of the filter // chain is the underlying network stream _topStream = stream; _protocolListener = protocolListener; _readBufferSize = DEFAULT_BUFFER_SIZE; } /// /// Adds a new filter on the top of the chain /// /// Stream filter to put on top of the chain /// /// This should *only* be called during initialization. We don't /// support changing the filter change after the first read/write /// has been done and it's not thread-safe to boot! /// public void AddFilter(IStreamFilter filter) { _topStream = filter.CreateFilterStream(_topStream); } #region IByteChannel Implementation // // IByteChannel Implementation // /// /// Read a from the underlying /// network stream and any configured filters /// /// A ByteBuffer, if available public ByteBuffer Read() { byte[] bytes = AllocateBuffer(); int numOctets = _topStream.Read(bytes, 0, bytes.Length); return WrapByteArray(bytes, numOctets); } /// /// Begin an asynchronous read operation /// /// Callback method to call when read operation completes /// State object /// An object public IAsyncResult BeginRead(AsyncCallback callback, object state) { byte[] bytes = AllocateBuffer(); ReadData rd = new ReadData(callback, state, bytes); // only put a callback if the caller wants one. AsyncCallback myCallback = null; if ( callback != null ) myCallback = new AsyncCallback(OnAsyncReadDone); IAsyncResult result = _topStream.BeginRead( bytes, 0, bytes.Length, myCallback,rd ); return new WrappedAsyncResult(result, bytes); } /// /// End an asynchronous read operation /// /// The object returned from /// The read public ByteBuffer EndRead(IAsyncResult result) { WrappedAsyncResult theResult = (WrappedAsyncResult)result; int bytesRead = _topStream.EndRead(theResult.InnerResult); return WrapByteArray(theResult.Buffer, bytesRead); } /// /// Write a to the underlying network /// stream, going through any configured filters /// /// public void Write(ByteBuffer buffer) { try { _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME } catch (Exception e) { _log.Warn("Write caused exception", e); _protocolListener.OnException(e); } } /// /// Begin an asynchronous write operation /// /// Buffer to write /// A callback to call when the operation completes /// State object /// An object public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state) { try { return _topStream.BeginWrite( buffer.Array, buffer.Position, buffer.Limit, callback, state ); } catch ( Exception e ) { _log.Error("BeginWrite caused exception", e); // not clear if an exception here should be propagated? we still // need to propagate it upwards anyway! _protocolListener.OnException(e); throw; } } /// /// End an asynchronous write operation /// /// The object returned by public void EndWrite(IAsyncResult result) { try { _topStream.EndWrite(result); } catch ( Exception e ) { _log.Error("EndWrite caused exception", e); // not clear if an exception here should be propagated? _protocolListener.OnException(e); //throw; } } #endregion // IByteChannel Implementation #region IDisposable Implementation // // IDisposable Implementation // public void Dispose() { if ( _topStream != null ) { _topStream.Close(); } } #endregion // IDisposable Implementation #region Private and Helper Classes/Methods // // Private and Helper Classes/Methods // private byte[] AllocateBuffer() { return new byte[ReadBufferSize]; } private static ByteBuffer WrapByteArray(byte[] bytes, int size) { ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes); byteBuffer.Limit = size; byteBuffer.Flip(); return byteBuffer; } private static void OnAsyncReadDone(IAsyncResult result) { ReadData rd = (ReadData) result.AsyncState; IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer); rd.Callback(wrapped); } class ReadData { private object _state; private AsyncCallback _callback; private byte[] _buffer; public object State { get { return _state; } } public AsyncCallback Callback { get { return _callback; } } public byte[] Buffer { get { return _buffer; } } public ReadData(AsyncCallback callback, object state, byte[] buffer) { _callback = callback; _state = state; _buffer = buffer; } } class WrappedAsyncResult : IAsyncResult { private IAsyncResult _innerResult; private byte[] _buffer; #region IAsyncResult Properties // // IAsyncResult Properties // public bool IsCompleted { get { return _innerResult.IsCompleted; } } public WaitHandle AsyncWaitHandle { get { return _innerResult.AsyncWaitHandle; } } public object AsyncState { get { return _innerResult.AsyncState; } } public bool CompletedSynchronously { get { return _innerResult.CompletedSynchronously; } } #endregion // IAsyncResult Properties public IAsyncResult InnerResult { get { return _innerResult; } } public byte[] Buffer { get { return _buffer; } } public WrappedAsyncResult(IAsyncResult result, byte[] buffer) { if ( result == null ) throw new ArgumentNullException("result"); if ( buffer == null ) throw new ArgumentNullException("buffer"); _innerResult = result; _buffer = buffer; } } #endregion // Private and Helper Classes/Methods } }