diff options
Diffstat (limited to 'dotnet/Qpid.Client/Client/Transport')
15 files changed, 0 insertions, 1236 deletions
diff --git a/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs b/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs deleted file mode 100644 index dd0bb404cb..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using Apache.Qpid.Codec; -using Apache.Qpid.Codec.Demux; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Transport -{ - public class AMQProtocolProvider - { - private DemuxingProtocolCodecFactory _factory; - - public AMQProtocolProvider() - { - _factory = new DemuxingProtocolCodecFactory(); - _factory.Register(new AMQDataBlockEncoder()); - _factory.Register(new AMQDataBlockDecoder()); - _factory.Register(new ProtocolInitiation.Decoder()); - } - - public IProtocolCodecFactory CodecFactory - { - get - { - return _factory; - } - } - } -} diff --git a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs deleted file mode 100644 index 1e217e755b..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs +++ /dev/null @@ -1,111 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; -using log4net; -using Apache.Qpid.Buffer; -using Apache.Qpid.Codec; -using Apache.Qpid.Codec.Support; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Transport -{ - public class AmqpChannel : IProtocolChannel - { - // Warning: don't use this log for regular logging. - static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel"); - - IByteChannel _byteChannel; - IProtocolEncoder _encoder; - IProtocolDecoder _decoder; - IProtocolDecoderOutput _decoderOutput; - private object _syncLock; - - public AmqpChannel(IByteChannel byteChannel, IProtocolDecoderOutput decoderOutput) - { - _byteChannel = byteChannel; - _decoderOutput = decoderOutput; - _syncLock = new object(); - - AMQProtocolProvider protocolProvider = new AMQProtocolProvider(); - IProtocolCodecFactory factory = protocolProvider.CodecFactory; - _encoder = factory.Encoder; - _decoder = factory.Decoder; - } - - public void Read() - { - ByteBuffer buffer = _byteChannel.Read(); - Decode(buffer); - } - - public IAsyncResult BeginRead(AsyncCallback callback, object state) - { - return _byteChannel.BeginRead(callback, state); - } - - public void EndRead(IAsyncResult result) - { - ByteBuffer buffer = _byteChannel.EndRead(result); - Decode(buffer); - } - - public void Write(IDataBlock o) - { - // TODO: Refactor to decorator. - if (_protocolTraceLog.IsDebugEnabled) - { - _protocolTraceLog.Debug(String.Format("WRITE {0}", o)); - } - // we should be doing an async write, but apparently - // the mentalis library doesn't queue async read/writes - // correctly and throws random IOException's. Stay sync for a while - //_byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null); - _byteChannel.Write(Encode(o)); - } - - // not used for now - //private void OnAsyncWriteDone(IAsyncResult result) - //{ - // _byteChannel.EndWrite(result); - //} - - private void Decode(ByteBuffer buffer) - { - // make sure we don't try to decode more than - // one buffer at the same time - lock ( _syncLock ) - { - _decoder.Decode(buffer, _decoderOutput); - } - } - - private ByteBuffer Encode(object o) - { - SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput(); - _encoder.Encode(o, output); - return output.buffer; - } - - } -} - - diff --git a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs deleted file mode 100644 index 35806f2a6e..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using Apache.Qpid.Buffer; - -namespace Apache.Qpid.Client.Transport -{ - /// <summary> - /// Represents input/output channels that read - /// and write <see cref="ByteBuffer"/> instances - /// </summary> - public interface IByteChannel - { - /// <summary> - /// Read a <see cref="ByteBuffer"/> from the underlying - /// network stream and any configured filters - /// </summary> - /// <returns>A ByteBuffer, if available</returns> - ByteBuffer Read(); - /// <summary> - /// Begin an asynchronous read operation - /// </summary> - /// <param name="callback">Callback method to call when read operation completes</param> - /// <param name="state">State object</param> - /// <returns>An <see cref="System.IAsyncResult"/> object</returns> - IAsyncResult BeginRead(AsyncCallback callback, object state); - /// <summary> - /// End an asynchronous read operation - /// </summary> - /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param> - /// <returns>The <see cref="ByteBuffer"/> read</returns> - ByteBuffer EndRead(IAsyncResult result); - /// <summary> - /// Write a <see cref="ByteBuffer"/> to the underlying network - /// stream, going through any configured filters - /// </summary> - /// <param name="buffer"></param> - void Write(ByteBuffer buffer); - /// <summary> - /// Begin an asynchronous write operation - /// </summary> - /// <param name="buffer">Buffer to write</param> - /// <param name="callback">A callback to call when the operation completes</param> - /// <param name="state">State object</param> - /// <returns>An <see cref="System.IAsyncResult"/> object</returns> - IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state); - /// <summary> - /// End an asynchronous write operation - /// </summary> - /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param> - void EndWrite(IAsyncResult result); - } -} diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs deleted file mode 100644 index 0b59ee8799..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; - -namespace Apache.Qpid.Client.Transport -{ - public interface IProtocolChannel : IProtocolWriter - { - void Read(); - IAsyncResult BeginRead(AsyncCallback callback, object state); - void EndRead(IAsyncResult result); - } -} diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs deleted file mode 100644 index 592dff3a19..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Transport -{ - public interface IProtocolWriter - { - void Write(IDataBlock o); - } -} diff --git a/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs deleted file mode 100644 index e0e890fc5a..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System.IO; - -namespace Apache.Qpid.Client.Transport -{ - /// <summary> - /// Defines a way to introduce an arbitrary filtering - /// stream into the stream chain managed by <see cref="IoHandler"/> - /// </summary> - public interface IStreamFilter - { - /// <summary> - /// Creates a new filtering stream on top of another - /// </summary> - /// <param name="lowerStream">Next stream on the stack</param> - /// <returns>A new filtering stream</returns> - Stream CreateFilterStream(Stream lowerStream); - } -} diff --git a/dotnet/Qpid.Client/Client/Transport/ITransport.cs b/dotnet/Qpid.Client/Client/Transport/ITransport.cs deleted file mode 100644 index 693a9a9534..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/ITransport.cs +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using Apache.Qpid.Client.Qms; -using Apache.Qpid.Client.Protocol; - -namespace Apache.Qpid.Client.Transport -{ - public interface ITransport : IConnectionCloser - { - void Connect(IBrokerInfo broker, AMQConnection connection); - string LocalEndpoint { get; } - IProtocolWriter ProtocolWriter { get; } - } -} diff --git a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs deleted file mode 100644 index 0475236d92..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs +++ /dev/null @@ -1,322 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.IO; -using System.Threading; -using log4net; -using Apache.Qpid.Buffer; -using Apache.Qpid.Client.Protocol; - -namespace Apache.Qpid.Client.Transport -{ - /// <summary> - /// Responsible for reading and writing - /// ByteBuffers from/to network streams, and handling - /// the stream filters - /// </summary> - public class IoHandler : IByteChannel, IDisposable - { - private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler)); - private const int DEFAULT_BUFFER_SIZE = 32 * 1024; - - private Stream _topStream; - private IProtocolListener _protocolListener; - private int _readBufferSize; - - public int ReadBufferSize - { - get { return _readBufferSize; } - set { _readBufferSize = value; } - } - - /// <summary> - /// Initialize a new instance - /// </summary> - /// <param name="stream">Underlying network stream</param> - /// <param name="protocolListener">Protocol listener to report exceptions to</param> - public IoHandler(Stream stream, IProtocolListener protocolListener) - { - if ( stream == null ) - throw new ArgumentNullException("stream"); - if ( protocolListener == null ) - throw new ArgumentNullException("protocolListener"); - - // initially, the stream at the top of the filter - // chain is the underlying network stream - _topStream = stream; - _protocolListener = protocolListener; - _readBufferSize = DEFAULT_BUFFER_SIZE; - } - - /// <summary> - /// Adds a new filter on the top of the chain - /// </summary> - /// <param name="filter">Stream filter to put on top of the chain</param> - /// <remarks> - /// This should *only* be called during initialization. We don't - /// support changing the filter change after the first read/write - /// has been done and it's not thread-safe to boot! - /// </remarks> - public void AddFilter(IStreamFilter filter) - { - _topStream = filter.CreateFilterStream(_topStream); - } - - #region IByteChannel Implementation - // - // IByteChannel Implementation - // - - /// <summary> - /// Read a <see cref="ByteBuffer"/> from the underlying - /// network stream and any configured filters - /// </summary> - /// <returns>A ByteBuffer, if available</returns> - public ByteBuffer Read() - { - byte[] bytes = AllocateBuffer(); - - int numOctets = _topStream.Read(bytes, 0, bytes.Length); - - return WrapByteArray(bytes, numOctets); - } - - /// <summary> - /// Begin an asynchronous read operation - /// </summary> - /// <param name="callback">Callback method to call when read operation completes</param> - /// <param name="state">State object</param> - /// <returns>An <see cref="System.IAsyncResult"/> object</returns> - public IAsyncResult BeginRead(AsyncCallback callback, object state) - { - byte[] bytes = AllocateBuffer(); - ReadData rd = new ReadData(callback, state, bytes); - - // only put a callback if the caller wants one. - AsyncCallback myCallback = null; - if ( callback != null ) - myCallback = new AsyncCallback(OnAsyncReadDone); - - IAsyncResult result = _topStream.BeginRead( - bytes, 0, bytes.Length, myCallback,rd - ); - return new WrappedAsyncResult(result, bytes); - } - - /// <summary> - /// End an asynchronous read operation - /// </summary> - /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param> - /// <returns>The <see cref="ByteBuffer"/> read</returns> - public ByteBuffer EndRead(IAsyncResult result) - { - WrappedAsyncResult theResult = (WrappedAsyncResult)result; - int bytesRead = _topStream.EndRead(theResult.InnerResult); - return WrapByteArray(theResult.Buffer, bytesRead); - } - - /// <summary> - /// Write a <see cref="ByteBuffer"/> to the underlying network - /// stream, going through any configured filters - /// </summary> - /// <param name="buffer"></param> - public void Write(ByteBuffer buffer) - { - try - { - _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME - } - catch (Exception e) - { - _log.Warn("Write caused exception", e); - _protocolListener.OnException(e); - } - } - - /// <summary> - /// Begin an asynchronous write operation - /// </summary> - /// <param name="buffer">Buffer to write</param> - /// <param name="callback">A callback to call when the operation completes</param> - /// <param name="state">State object</param> - /// <returns>An <see cref="System.IAsyncResult"/> object</returns> - public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state) - { - try - { - return _topStream.BeginWrite( - buffer.Array, buffer.Position, buffer.Limit, - callback, state - ); - } catch ( Exception e ) - { - _log.Error("BeginWrite caused exception", e); - // not clear if an exception here should be propagated? we still - // need to propagate it upwards anyway! - _protocolListener.OnException(e); - throw; - } - } - - /// <summary> - /// End an asynchronous write operation - /// </summary> - /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param> - public void EndWrite(IAsyncResult result) - { - try - { - _topStream.EndWrite(result); - } catch ( Exception e ) - { - _log.Error("EndWrite caused exception", e); - // not clear if an exception here should be propagated? - _protocolListener.OnException(e); - //throw; - } - } - #endregion // IByteChannel Implementation - - #region IDisposable Implementation - // - // IDisposable Implementation - // - - public void Dispose() - { - if ( _topStream != null ) - { - _topStream.Close(); - } - } - - #endregion // IDisposable Implementation - - #region Private and Helper Classes/Methods - // - // Private and Helper Classes/Methods - // - - private byte[] AllocateBuffer() - { - return new byte[ReadBufferSize]; - } - - private static ByteBuffer WrapByteArray(byte[] bytes, int size) - { - ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes); - byteBuffer.Limit = size; - byteBuffer.Flip(); - - return byteBuffer; - } - - - private static void OnAsyncReadDone(IAsyncResult result) - { - ReadData rd = (ReadData) result.AsyncState; - IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer); - rd.Callback(wrapped); - } - - class ReadData - { - private object _state; - private AsyncCallback _callback; - private byte[] _buffer; - - public object State - { - get { return _state; } - } - - public AsyncCallback Callback - { - get { return _callback; } - } - - public byte[] Buffer - { - get { return _buffer; } - } - - public ReadData(AsyncCallback callback, object state, byte[] buffer) - { - _callback = callback; - _state = state; - _buffer = buffer; - } - } - - class WrappedAsyncResult : IAsyncResult - { - private IAsyncResult _innerResult; - private byte[] _buffer; - - #region IAsyncResult Properties - // - // IAsyncResult Properties - // - public bool IsCompleted - { - get { return _innerResult.IsCompleted; } - } - - public WaitHandle AsyncWaitHandle - { - get { return _innerResult.AsyncWaitHandle; } - } - - public object AsyncState - { - get { return _innerResult.AsyncState; } - } - - public bool CompletedSynchronously - { - get { return _innerResult.CompletedSynchronously; } - } - #endregion // IAsyncResult Properties - - public IAsyncResult InnerResult - { - get { return _innerResult; } - } - public byte[] Buffer - { - get { return _buffer; } - } - - public WrappedAsyncResult(IAsyncResult result, byte[] buffer) - { - if ( result == null ) - throw new ArgumentNullException("result"); - if ( buffer == null ) - throw new ArgumentNullException("buffer"); - - _innerResult = result; - _buffer = buffer; - } - } - - #endregion // Private and Helper Classes/Methods - } -} diff --git a/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs b/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs deleted file mode 100644 index 9fa313152f..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Threading; -using Apache.Qpid.Client.Protocol; -using Apache.Qpid.Codec; -using Apache.Qpid.Framing; -using log4net; - -namespace Apache.Qpid.Client.Transport -{ - /// <summary> - /// <see cref="IProtocolDecoderOutput"/> implementation that forwards - /// each <see cref="IDataBlock"/> as it is decoded to the - /// protocol listener - /// </summary> - internal class ProtocolDecoderOutput : IProtocolDecoderOutput - { - private IProtocolListener _protocolListener; - static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel"); - - public ProtocolDecoderOutput(IProtocolListener protocolListener) - { - if ( protocolListener == null ) - throw new ArgumentNullException("protocolListener"); - - _protocolListener = protocolListener; - } - - public void Write(object message) - { - IDataBlock block = message as IDataBlock; - if ( block != null ) - { - _protocolTraceLog.Debug(String.Format("READ {0}", block)); - _protocolListener.OnMessage(block); - } - } - } -} // namespace Apache.Qpid.Client.Transport - - diff --git a/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs b/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs deleted file mode 100644 index a1aa889ba0..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using Apache.Qpid.Buffer; -using Apache.Qpid.Codec; - -namespace Apache.Qpid.Client.Transport -{ - public class SingleProtocolEncoderOutput : IProtocolEncoderOutput - { - public ByteBuffer buffer; - - public void Write(ByteBuffer buf) - { - if (buffer != null) - { - throw new InvalidOperationException("{0} does not allow the writing of more than one buffer"); - } - buffer = buf; - } - } -} diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs deleted file mode 100644 index f336d8a80a..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Collections; -using System.IO; -using System.Threading; -using Apache.Qpid.Client.Qms; -using Apache.Qpid.Client.Protocol; -using Apache.Qpid.Codec; -using Apache.Qpid.Framing; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - /// <summary> - /// TCP Socket transport supporting both - /// SSL and non-SSL connections. - /// </summary> - public class BlockingSocketTransport : ITransport - { - // Configuration variables. - IProtocolListener _protocolListener; - - // Runtime variables. - private ISocketConnector _connector; - private IoHandler _ioHandler; - private AmqpChannel _amqpChannel; - private ManualResetEvent _stopEvent; - - public IProtocolWriter ProtocolWriter - { - get { return _amqpChannel; } - } - public string LocalEndpoint - { - get { return _connector.LocalEndpoint; } - } - - - /// <summary> - /// Connect to the specified broker - /// </summary> - /// <param name="broker">The broker to connect to</param> - /// <param name="connection">The AMQ connection</param> - public void Connect(IBrokerInfo broker, AMQConnection connection) - { - _stopEvent = new ManualResetEvent(false); - _protocolListener = connection.ProtocolListener; - - _ioHandler = MakeBrokerConnection(broker, connection); - // todo: get default read size from config! - - IProtocolDecoderOutput decoderOutput = - new ProtocolDecoderOutput(_protocolListener); - _amqpChannel = - new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput); - - // post an initial async read - _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this); - } - - /// <summary> - /// Close the broker connection - /// </summary> - public void Close() - { - StopReading(); - CloseBrokerConnection(); - } - - private void StopReading() - { - _stopEvent.Set(); - } - - private void CloseBrokerConnection() - { - if ( _ioHandler != null ) - { - _ioHandler.Dispose(); - _ioHandler = null; - } - if ( _connector != null ) - { - _connector.Dispose(); - _connector = null; - } - } - - private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection) - { - if ( broker.UseSSL ) - { - _connector = new SslSocketConnector(); - } else - { - _connector = new SocketConnector(); - } - - Stream stream = _connector.Connect(broker); - return new IoHandler(stream, connection.ProtocolListener); - } - - private void OnAsyncReadDone(IAsyncResult result) - { - try - { - _amqpChannel.EndRead(result); - - bool stopping = _stopEvent.WaitOne(0, false); - if ( !stopping ) - _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null); - } catch ( Exception e ) - { - // ignore any errors during closing - bool stopping = _stopEvent.WaitOne(0, false); - if ( !stopping ) - _protocolListener.OnException(e); - } - } - - #region IProtocolDecoderOutput Members - - public void Write(object message) - { - _protocolListener.OnMessage((IDataBlock)message); - } - - #endregion - } -} - - diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs deleted file mode 100644 index 4540f01f4e..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using log4net; -using Apache.Qpid.Buffer; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - class ByteChannel : IByteChannel - { - // Warning: don't use this log for regular logging. - private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel"); - - private IByteChannel _lowerChannel; - - public ByteChannel(IByteChannel lowerChannel) - { - _lowerChannel = lowerChannel; - } - - public ByteBuffer Read() - { - ByteBuffer result = _lowerChannel.Read(); - - // TODO: Move into decorator. - if (_ioTraceLog.IsDebugEnabled) - { - _ioTraceLog.Debug(String.Format("READ {0}", result)); - } - - return result; - } - - public IAsyncResult BeginRead(AsyncCallback callback, object state) - { - return _lowerChannel.BeginRead(callback, state); - } - - public ByteBuffer EndRead(IAsyncResult result) - { - ByteBuffer buffer = _lowerChannel.EndRead(result); - if ( _ioTraceLog.IsDebugEnabled ) - { - _ioTraceLog.Debug(String.Format("READ {0}", buffer)); - } - return buffer; - } - - public void Write(ByteBuffer buffer) - { - // TODO: Move into decorator. - if (_ioTraceLog.IsDebugEnabled) - { - _ioTraceLog.Debug(String.Format("WRITE {0}", buffer)); - } - - _lowerChannel.Write(buffer); - } - - public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state) - { - if ( _ioTraceLog.IsDebugEnabled ) - { - _ioTraceLog.Debug(String.Format("WRITE {0}", buffer)); - } - return _lowerChannel.BeginWrite(buffer, callback, state); - } - - public void EndWrite(IAsyncResult result) - { - _lowerChannel.EndWrite(result); - } - } -} diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs deleted file mode 100644 index 137fa19c0d..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.IO; -using Apache.Qpid.Client.Qms; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - interface ISocketConnector : IDisposable - { - string LocalEndpoint { get; } - Stream Connect(IBrokerInfo broker); - } -} - - diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs deleted file mode 100644 index b6dd8c3be1..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System.IO; -using System.Net; -using System.Net.Sockets; -using Apache.Qpid.Client.Qms; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - /// <summary> - /// Implements a TCP connection over regular sockets. - /// </summary> - class SocketConnector : ISocketConnector - { - private MyTcpClient _tcpClient; - - public string LocalEndpoint - { - get { return _tcpClient.LocalEndpoint.ToString(); } - } - - public Stream Connect(IBrokerInfo broker) - { - _tcpClient = new MyTcpClient(broker.Host, broker.Port); - return _tcpClient.GetStream(); - } - - public void Dispose() - { - if ( _tcpClient != null ) - { - _tcpClient.Close(); - _tcpClient = null; - } - } - - class MyTcpClient : TcpClient - { - public MyTcpClient(string host, int port) - : base(host, port) - { - } - - public EndPoint LocalEndpoint - { - get { return Client.LocalEndPoint; } - } - } - - } -} - - diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs deleted file mode 100644 index 8436e6fc4f..0000000000 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System.IO; -using System.Net; -using log4net; -using Apache.Qpid.Client.Qms; -using Org.Mentalis.Security.Ssl; -using MCertificate = Org.Mentalis.Security.Certificates.Certificate; -using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain; - -namespace Apache.Qpid.Client.Transport.Socket.Blocking -{ - /// <summary> - /// Implements a TLS v1.0 connection using the Mentalis.org library - /// </summary> - /// <remarks> - /// It would've been easier to implement this at the StreamFilter - /// level, but unfortunately the Mentalis library doesn't support - /// a passthrough SSL stream class and is tied directly - /// to socket-like classes. - /// </remarks> - class SslSocketConnector : ISocketConnector - { - private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector)); - private MyTcpClient _tcpClient; - - public string LocalEndpoint - { - get { return _tcpClient.LocalEndpoint.ToString(); } - } - - public Stream Connect(IBrokerInfo broker) - { - MCertificate cert = GetClientCert(broker); - SecurityOptions options = new SecurityOptions( - SecureProtocol.Tls1, cert, ConnectionEnd.Client - ); - if ( broker.SslOptions != null - && broker.SslOptions.IgnoreValidationErrors ) - { - _logger.Warn("Ignoring any certificate validation errors during SSL handshake..."); - options.VerificationType = CredentialVerification.None; - } - - _tcpClient = new MyTcpClient(broker.Host, broker.Port, options); - return _tcpClient.GetStream(); - } - - public void Dispose() - { - if ( _tcpClient != null ) - { - _tcpClient.Close(); - _tcpClient = null; - } - } - - private static MCertificate GetClientCert(IBrokerInfo broker) - { - // if a client certificate is configured, - // use that to enable mutual authentication - MCertificate cert = null; - if ( broker.SslOptions != null - && broker.SslOptions.ClientCertificate != null ) - { - cert = MCertificate.CreateFromX509Certificate( - broker.SslOptions.ClientCertificate - ); - _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true)); - } - return cert; - } - - class MyTcpClient : SecureTcpClient - { - public MyTcpClient(string host, int port, SecurityOptions options) - : base(host, port, options) - { - } - - public EndPoint LocalEndpoint - { - get { return Client.LocalEndPoint; } - } - - } - - } -} |