diff options
Diffstat (limited to 'qpid/dotnet/client-010/client/transport/network')
15 files changed, 1832 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/client/transport/network/Assembler.cs b/qpid/dotnet/client-010/client/transport/network/Assembler.cs new file mode 100644 index 0000000000..ff85f11c2f --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/Assembler.cs @@ -0,0 +1,254 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Collections.Generic; +using System.IO; +using org.apache.qpid.transport.codec; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network +{ + /// <summary> + /// Assembler + /// </summary> + public delegate void Processor(INetworkDelegate ndelegate); + + public class Assembler : INetworkDelegate, IReceiver<ReceivedPayload<IProtocolEvent>> + { + private static readonly Logger log = Logger.Get(typeof (Assembler)); + private readonly Dictionary<int, List<byte[]>> segments; + private readonly Method[] incomplete; + [ThreadStatic] static MSDecoder _decoder; + private readonly Object m_objectLock = new object(); + + // the event raised when a buffer is read from the wire + public event EventHandler<ReceivedPayload<IProtocolEvent>> ReceivedEvent; + public event EventHandler Closed; + + + // Not in use : + public event EventHandler<ExceptionArgs> Exception; + + event EventHandler<ReceivedPayload<IProtocolEvent>> IReceiver<ReceivedPayload<IProtocolEvent>>.Received + { + add + { + lock (m_objectLock) + { + ReceivedEvent += value; + } + } + remove + { + lock (m_objectLock) + { + ReceivedEvent -= value; + } + } + } + + public Assembler() + { + segments = new Dictionary<int, List<byte[]>>(); + incomplete = new Method[64*1024]; + } + + // Invoked when a network event is received + public void On_ReceivedEvent(object sender, ReceivedPayload<INetworkEvent> payload) + { + payload.Payload.ProcessNetworkEvent(this); + } + + #region Interface INetworkDelegate + + public void Init(ProtocolHeader header) + { + Emit(0, header); + } + + public void Error(ProtocolError error) + { + Emit(0, error); + } + + public void Frame(Frame frame) + { + MemoryStream segment; + if (frame.IsFirstFrame() && frame.IsLastFrame()) + { + byte[] tmp = new byte[frame.BodySize]; + frame.Body.Read(tmp, 0, tmp.Length); + segment = new MemoryStream(); + BinaryWriter w = new BinaryWriter(segment); + w.Write(tmp); + Assemble(frame, new MemoryStream(tmp)); + } + else + { + List<byte[]> frames; + if (frame.IsFirstFrame()) + { + frames = new List<byte[]>(); + SetSegment(frame, frames); + } + else + { + frames = GetSegment(frame); + } + byte[] tmp = new byte[frame.BodySize]; + frame.Body.Read(tmp, 0, tmp.Length); + frames.Add(tmp); + + if (frame.IsLastFrame()) + { + ClearSegment(frame); + segment = new MemoryStream(); + BinaryWriter w = new BinaryWriter(segment); + foreach (byte[] f in frames) + { + w.Write(f); + } + Assemble(frame, segment); + } + } + } + + #endregion + + #region Private Support Functions + + + private MSDecoder GetDecoder() + { + if( _decoder == null ) + { + _decoder = new MSDecoder(); + } + return _decoder; + } + + private void Assemble(Frame frame, MemoryStream segment) + { + MSDecoder decoder = GetDecoder(); + decoder.Init(segment); + int channel = frame.Channel; + Method command; + switch (frame.Type) + { + case SegmentType.CONTROL: + int controlType = decoder.ReadUint16(); + Method control = Method.Create(controlType); + control.Read(decoder); + Emit(channel, control); + break; + case SegmentType.COMMAND: + int commandType = decoder.ReadUint16(); + // read in the session header, right now we don't use it + decoder.ReadUint16(); + command = Method.Create(commandType); + command.Read(decoder); + if (command.HasPayload()) + { + incomplete[channel] = command; + } + else + { + Emit(channel, command); + } + break; + case SegmentType.HEADER: + command = incomplete[channel]; + List<Struct> structs = new List<Struct>(); + while (decoder.HasRemaining()) + { + structs.Add(decoder.ReadStruct32()); + } + command.Header = new Header(structs); + if (frame.IsLastSegment()) + { + incomplete[channel] = null; + Emit(channel, command); + } + break; + case SegmentType.BODY: + command = incomplete[channel]; + segment.Seek(0, SeekOrigin.Begin); + command.Body = segment; + incomplete[channel] = null; + Emit(channel, command); + break; + default: + throw new Exception("unknown frame type: " + frame.Type); + } + } + + private int SegmentKey(Frame frame) + { + return (frame.Track + 1)*frame.Channel; + } + + private List<byte[]> GetSegment(Frame frame) + { + return segments[SegmentKey(frame)]; + } + + private void SetSegment(Frame frame, List<byte[]> segment) + { + int key = SegmentKey(frame); + if (segments.ContainsKey(key)) + { + Error(new ProtocolError(network.Frame.L2, "segment in progress: %s", + frame)); + } + segments.Add(SegmentKey(frame), segment); + } + + private void ClearSegment(Frame frame) + { + segments.Remove(SegmentKey(frame)); + } + + // Emit a protocol event + private void Emit(int channel, IProtocolEvent protevent) + { + protevent.Channel = channel; + log.Debug("Assembler: protocol event:", protevent); + ReceivedPayload<IProtocolEvent> payload = new ReceivedPayload<IProtocolEvent>(); + payload.Payload = protevent; + + if (protevent is ConnectionCloseOk) + { + if (Closed != null) + Closed(this, EventArgs.Empty); + } + else + { + if (ReceivedEvent != null) + ReceivedEvent(this, payload); + else + log.Debug("No listener for event: {0}", protevent); + } + } + + #endregion + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/Disassembler.cs b/qpid/dotnet/client-010/client/transport/network/Disassembler.cs new file mode 100644 index 0000000000..3f0a6a8974 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/Disassembler.cs @@ -0,0 +1,222 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; +using org.apache.qpid.transport.codec; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network +{ + /// <summary> + /// Disassembler + /// </summary> + public sealed class Disassembler : ISender<IProtocolEvent>, IProtocolDelegate<Object> + { + private readonly IIoSender<MemoryStream> _sender; + private readonly int _maxPayload; + private readonly MemoryStream _header; + private readonly BinaryWriter _writer; + private readonly Object _sendlock = new Object(); + [ThreadStatic] static MSEncoder _encoder; + + + public Disassembler(IIoSender<MemoryStream> sender, int maxFrame) + { + if (maxFrame <= network.Frame.HEADER_SIZE || maxFrame >= 64*1024) + { + throw new Exception(String.Format("maxFrame must be > {0} and < 64K: ", network.Frame.HEADER_SIZE) + maxFrame); + } + _sender = sender; + _maxPayload = maxFrame - network.Frame.HEADER_SIZE; + _header = new MemoryStream(network.Frame.HEADER_SIZE); + _writer = new BinaryWriter(_header); + } + + #region Sender Interface + + public void Send(IProtocolEvent pevent) + { + pevent.ProcessProtocolEvent(null, this); + } + + public void Flush() + { + lock (_sendlock) + { + _sender.Flush(); + } + } + + public void Close() + { + lock (_sendlock) + { + _sender.Close(); + } + } + + #endregion + + #region ProtocolDelegate<Object> Interface + + public void Init(Object v, ProtocolHeader header) + { + lock (_sendlock) + { + _sender.Send(header.ToMemoryStream()); + _sender.Flush(); + } + } + + public void Control(Object v, Method method) + { + InvokeMethod(method, SegmentType.CONTROL); + } + + public void Command(Object v, Method method) + { + InvokeMethod(method, SegmentType.COMMAND); + } + + public void Error(Object v, ProtocolError error) + { + throw new Exception("Error: " + error); + } + + #endregion + + #region private + + private void Frame(byte flags, byte type, byte track, int channel, int size, MemoryStream buf) + { + lock (_sendlock) + { + _writer.Write(flags); + _writer.Write(type); + _writer.Write(ByteEncoder.GetBigEndian((UInt16)(size + network.Frame.HEADER_SIZE))); + _writer.Write((byte)0); + _writer.Write(track); + _writer.Write(ByteEncoder.GetBigEndian((UInt16)( channel))); + _writer.Write((byte)0); + _writer.Write((byte)0); + _writer.Write((byte)0); + _writer.Write((byte)0); + _sender.Send(_header); + _header.Seek(0, SeekOrigin.Begin); + _sender.Send(buf, size); + } + } + + private void Fragment(byte flags, SegmentType type, IProtocolEvent mevent, MemoryStream buf) + { + byte typeb = (byte) type; + byte track = mevent.EncodedTrack == network.Frame.L4 ? (byte) 1 : (byte) 0; + int remaining = (int) buf.Length; + buf.Seek(0, SeekOrigin.Begin); + bool first = true; + while (true) + { + int size = Math.Min(_maxPayload, remaining); + remaining -= size; + + byte newflags = flags; + if (first) + { + newflags |= network.Frame.FIRST_FRAME; + first = false; + } + if (remaining == 0) + { + newflags |= network.Frame.LAST_FRAME; + } + + Frame(newflags, typeb, track, mevent.Channel, size, buf); + + if (remaining == 0) + { + break; + } + } + } + + private MSEncoder GetEncoder() + { + if( _encoder == null) + { + _encoder = new MSEncoder(4 * 1024); + } + return _encoder; + } + + private void InvokeMethod(Method method, SegmentType type) + { + MSEncoder encoder = GetEncoder(); + encoder.Init(); + encoder.WriteUint16(method.GetEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.Sync) + { + encoder.WriteUint16(0x0101); + } + else + { + encoder.WriteUint16(0x0100); + } + } + method.Write(_encoder); + MemoryStream methodSeg = encoder.Segment(); + + byte flags = network.Frame.FIRST_SEG; + + bool payload = method.HasPayload(); + if (!payload) + { + flags |= network.Frame.LAST_SEG; + } + + MemoryStream headerSeg = null; + if (payload) + { + Header hdr = method.Header; + Struct[] structs = hdr.Structs; + + foreach (Struct st in structs) + { + encoder.WriteStruct32(st); + } + headerSeg = encoder.Segment(); + } + + lock (_sendlock) + { + Fragment(flags, type, method, methodSeg); + if (payload) + { + Fragment( 0x0, SegmentType.HEADER, method, headerSeg); + Fragment(network.Frame.LAST_SEG, SegmentType.BODY, method, method.Body); + } + } + } + + #endregion + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/Frame.cs b/qpid/dotnet/client-010/client/transport/network/Frame.cs new file mode 100644 index 0000000000..b8ec36d8b6 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/Frame.cs @@ -0,0 +1,143 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; + +namespace org.apache.qpid.transport.network +{ + public sealed class Frame : INetworkEvent + { + internal static int HEADER_SIZE = 12; + + // XXX: enums? + public const byte L1 = 0; + public const byte L2 = 1; + public const byte L3 = 2; + public const byte L4 = 3; + + public static byte RESERVED = 0x0; + + public static byte VERSION = 0x0; + + public static byte FIRST_SEG = 0x8; + public static byte LAST_SEG = 0x4; + public static byte FIRST_FRAME = 0x2; + public static byte LAST_FRAME = 0x1; + + private readonly byte flags; + private readonly SegmentType type; + private readonly byte track; + private readonly int channel; + private readonly MemoryStream body; + private int _bodySize; + + + public Frame(byte flags, SegmentType type, byte track, int channel, int bodySize, + MemoryStream body) + { + this.flags = flags; + this.type = type; + this.track = track; + this.channel = channel; + this.body = body; + _bodySize = bodySize; + } + + public int BodySize + { + get { return _bodySize; } + } + + public MemoryStream Body + { + get { return body; } + } + + public byte Flags + { + get { return flags; } + } + + public int Channel + { + get { return channel; } + } + + public int Size + { + get { return (int) body.Length;} + } + + public SegmentType Type + { + get { return type; } + } + + public byte Track + { + get { return track; } + } + + private bool Flag(byte mask) + { + return (flags & mask) != 0; + } + + public bool IsFirstSegment() + { + return Flag(FIRST_SEG); + } + + public bool IsLastSegment() + { + return Flag(LAST_SEG); + } + + public bool IsFirstFrame() + { + return Flag(FIRST_FRAME); + } + + public bool IsLastFrame() + { + return Flag(LAST_FRAME); + } + + #region INetworkEvent Methods + + public void ProcessNetworkEvent(INetworkDelegate ndelegate) + { + ndelegate.Frame(this); + } + + #endregion + + public override String ToString() + { + return String.Format + ("[{0:d} {1:d} {2:d} {3} {4}{5}{6}{7}] ", Channel, Size, Track, Type, + IsFirstSegment() ? 1 : 0, IsLastSegment() ? 1 : 0, + IsFirstFrame() ? 1 : 0, IsLastFrame() ? 1 : 0); + } + + + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/IIoSender.cs b/qpid/dotnet/client-010/client/transport/network/IIoSender.cs new file mode 100644 index 0000000000..747b5b9f98 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/IIoSender.cs @@ -0,0 +1,28 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport.network +{ + public interface IIOSender<T>:Sender<T> + { + void send(T body, int siz); + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/INetworkDelegate.cs b/qpid/dotnet/client-010/client/transport/network/INetworkDelegate.cs new file mode 100644 index 0000000000..9226adc2b7 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/INetworkDelegate.cs @@ -0,0 +1,40 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using ProtocolError = org.apache.qpid.transport.ProtocolError; +using ProtocolHeader = org.apache.qpid.transport.ProtocolHeader; +namespace org.apache.qpid.transport.network +{ + + + /// <summary> + /// NetworkDelegate + /// </summary> + + public interface INetworkDelegate + { + + void Init(ProtocolHeader header); + + void Frame(Frame frame); + + void Error(ProtocolError error); + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/INetworkEvent.cs b/qpid/dotnet/client-010/client/transport/network/INetworkEvent.cs new file mode 100644 index 0000000000..e6f0d6fc8a --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/INetworkEvent.cs @@ -0,0 +1,32 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport.network +{ + + /// <summary> + /// INetworkEvent + /// </summary> + + public interface INetworkEvent + { + void ProcessNetworkEvent(INetworkDelegate networkDelegate); + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/InputHandler.cs b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs new file mode 100644 index 0000000000..c5d5f13727 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs @@ -0,0 +1,266 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; +using System.Text; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network +{ + /// <summary> + /// InputHandler + /// </summary> + public sealed class InputHandler : IReceiver<ReceivedPayload<INetworkEvent>> + { + public enum State + { + PROTO_HDR, + FRAME_HDR, + FRAME_BODY, + ERROR + } + + private static readonly Logger log = Logger.Get(typeof(InputHandler)); + private readonly Object m_objectLock = new object(); + + // the event raised when a buffer is read from the wire + public event EventHandler<ReceivedPayload<INetworkEvent>> ReceivedEvent; + public event EventHandler<ExceptionArgs> ExceptionProcessing; + + // Not in used... This even is never raised in the code => the application will block on Close() until the timeout is reached + public event EventHandler Closed; + + event EventHandler<ReceivedPayload<INetworkEvent>> IReceiver<ReceivedPayload<INetworkEvent>>.Received + { + add + { + lock (m_objectLock) + { + ReceivedEvent += value; + } + } + remove + { + lock (m_objectLock) + { + ReceivedEvent -= value; + } + } + } + + event EventHandler<ExceptionArgs> IReceiver<ReceivedPayload<INetworkEvent>>.Exception + { + add + { + lock (m_objectLock) + { + ExceptionProcessing += value; + } + } + remove + { + lock (m_objectLock) + { + ExceptionProcessing -= value; + } + } + } + + private State state; + private MemoryStream input; + private int needed; + + private byte flags; + private SegmentType type; + private byte track; + private int channel; + + public InputHandler(State state) + { + this.state = state; + switch (state) + { + case State.PROTO_HDR: + needed = 8; + break; + case State.FRAME_HDR: + needed = Frame.HEADER_SIZE; + break; + } + } + + // The command listening for a buffer read. + public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload) + { + MemoryStream buf = payload.Payload; + int remaining = (int) buf.Length; + if( input != null ) + { + remaining += (int) input.Length; + } + try + { + while (remaining > 0) + { + if (remaining >= needed) + { + if (input != null) + { + byte[] tmp = new byte[buf.Length]; + buf.Read(tmp, 0, tmp.Length); + input.Write(tmp, 0, tmp.Length); + input.Seek(0, SeekOrigin.Begin); + buf = input; + } + int startPos = (int)buf.Position; + int consumed = needed; + state = Next(buf); + if ((buf.Position - startPos) < consumed) + { + buf.Seek(consumed - (buf.Position - startPos), SeekOrigin.Current); + } + remaining -= consumed; + input = null; + } + else + { + byte[] tmp; + if (input == null) + { + input = new MemoryStream(); + tmp = new byte[remaining]; + } + else + { + // this is a full buffer + tmp = new byte[buf.Length]; + } + buf.Read(tmp, 0, tmp.Length); + input.Write(tmp, 0, tmp.Length); + remaining = 0; + } + } + } + catch (Exception t) + { + Console.Write(t); + if (ExceptionProcessing != null) + { + ExceptionProcessing(this, new ExceptionArgs(t)); + } + } + } + + #region Private Support Functions + + private State Next(MemoryStream buf) + { + BinaryReader reader = new BinaryReader(buf); + + switch (state) + { + case State.PROTO_HDR: + char a = reader.ReadChar(); + char m = reader.ReadChar(); + char q = reader.ReadChar(); + char p = reader.ReadChar(); + if (a != 'A' && + m != 'M' && + q != 'Q' && + p != 'P') + { + Error("bad protocol header: {0}", buf.ToString()); + return State.ERROR; + } + reader.ReadByte(); + byte instance = reader.ReadByte(); + byte major = reader.ReadByte(); + byte minor = reader.ReadByte(); + Fire_NetworkEvent(new ProtocolHeader(instance, major, minor)); + needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + case State.FRAME_HDR: + reader = new BinaryReader(buf, Encoding.BigEndianUnicode); + flags = reader.ReadByte(); + type = SegmentTypeGetter.Get(reader.ReadByte()); // generated code + int size = reader.ReadUInt16(); + size = ByteEncoder.GetBigEndian((UInt16)size); + size -= Frame.HEADER_SIZE; + if (size < 0 || size > (64 * 1024 - 12)) + { + Error("bad frame size: {0:d}", size); + return State.ERROR; + } + reader.ReadByte(); + byte b = reader.ReadByte(); + if ((b & 0xF0) != 0) + { + Error("non-zero reserved bits in upper nibble of " + + "frame header byte 5: {0}", b); + return State.ERROR; + } + track = (byte)(b & 0xF); + channel = reader.ReadUInt16(); + channel = ByteEncoder.GetBigEndian((UInt16)channel); + if (size == 0) + { + Fire_NetworkEvent(new Frame(flags, type, track, channel, 0, new MemoryStream())); + needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + } + needed = size; + return State.FRAME_BODY; + case State.FRAME_BODY: + Fire_NetworkEvent(new Frame(flags, type, track, channel, needed, buf)); + needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + default: + if (ExceptionProcessing != null) + { + ExceptionProcessing(this, new ExceptionArgs(new Exception("Error creating frame"))); + } + throw new Exception("Error creating frame"); + } + } + + private void Error(String fmt, params Object[] args) + { + Fire_NetworkEvent(new ProtocolError(Frame.L1, fmt, args)); + } + + private void Fire_NetworkEvent(INetworkEvent netevent) + { + log.Debug("InputHandler: network event:", netevent); + ReceivedPayload<INetworkEvent> payload = new ReceivedPayload<INetworkEvent>(); + payload.Payload = netevent; + if (ReceivedEvent != null) + { + ReceivedEvent(this, payload); + } + else + { + log.Debug("Nobody listening for event: {0}"); + } + } + + #endregion + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs b/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs new file mode 100644 index 0000000000..69598a43e8 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs @@ -0,0 +1,40 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using ProtocolError = org.apache.qpid.transport.ProtocolError; +using ProtocolHeader = org.apache.qpid.transport.ProtocolHeader; +namespace org.apache.qpid.transport.network +{ + + + /// <summary> + /// NetworkDelegate + /// </summary> + + public interface NetworkDelegate + { + + void Init(ProtocolHeader header); + + void Frame(Frame frame); + + void Error(ProtocolError error); + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs b/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs new file mode 100644 index 0000000000..e5ac6de93a --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs @@ -0,0 +1,32 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport.network +{ + + /// <summary> + /// NetworkEvent + /// </summary> + + public interface NetworkEvent + { + void ProcessNetworkEvent(NetworkDelegate networkDelegate); + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/io/IIoSender.cs b/qpid/dotnet/client-010/client/transport/network/io/IIoSender.cs new file mode 100644 index 0000000000..acc7724a06 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IIoSender.cs @@ -0,0 +1,28 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport.network +{ + public interface IIoSender<T>:ISender<T> + { + void Send(T body, int siz); + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs b/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs new file mode 100644 index 0000000000..41a09e7079 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs @@ -0,0 +1,57 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +using System.IO; +using System.Net.Sockets; + +namespace org.apache.qpid.transport.network.io +{ + public interface IIoTransport + { + Connection Connection + { + get; + set; + } + + IReceiver<ReceivedPayload<MemoryStream>> Receiver + { + get; + set; + } + + IoSender Sender + { + get; + set; + } + + + Stream Stream + { + get; + set; + } + + TcpClient Socket + { + get; + set; + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs b/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs new file mode 100644 index 0000000000..b60444fa29 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs @@ -0,0 +1,185 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; +using System.Threading; +using Logger = org.apache.qpid.transport.util.Logger; + + +namespace org.apache.qpid.transport.network.io +{ + /// <summary> + /// IoReceiver + /// </summary> + public sealed class IoReceiver : IReceiver<ReceivedPayload<MemoryStream>> + { + private static readonly Logger log = Logger.Get(typeof(IoReceiver)); + private readonly int m_bufferSize; + private readonly Stream m_bufStream; + private readonly int m_timeout; + private readonly Thread m_thread; + private bool m_closed; + private readonly Object m_objectLock = new object(); + + // the event raised when a buffer is read from the wire + event EventHandler<ReceivedPayload<MemoryStream>> ReceivedBuffer; + event EventHandler<ExceptionArgs> ExceptionReading; + event EventHandler ReceiverClosed; + + event EventHandler<ReceivedPayload<MemoryStream>> IReceiver<ReceivedPayload<MemoryStream>>.Received + { + add + { + lock (m_objectLock) + { + ReceivedBuffer += value; + } + } + remove + { + lock (m_objectLock) + { + ReceivedBuffer -= value; + } + } + } + + event EventHandler<ExceptionArgs> IReceiver<ReceivedPayload<MemoryStream>>.Exception + { + add + { + lock (m_objectLock) + { + ExceptionReading += value; + } + } + remove + { + lock (m_objectLock) + { + ExceptionReading -= value; + } + } + } + + event EventHandler IReceiver<ReceivedPayload<MemoryStream>>.Closed + { + add + { + lock (m_objectLock) + { + ReceiverClosed += value; + } + } + remove + { + lock (m_objectLock) + { + ReceiverClosed -= value; + } + } + } + + public IoReceiver(Stream stream, int bufferSize, int timeout) + { + m_bufferSize = bufferSize; + m_bufStream = stream; + m_timeout = timeout; + m_thread = new Thread(Go); + m_thread.Name = String.Format("IoReceiver - {0}", stream); + m_thread.IsBackground = true; + m_thread.Start(); + } + + public void Close() + { + Mutex mut = new Mutex(); + mut.WaitOne(); + if (!m_closed) + { + m_closed = true; + try + { + log.Debug("Receiver closing"); + m_bufStream.Close(); + m_thread.Join(m_timeout); + if (m_thread.IsAlive) + { + throw new TransportException("join timed out"); + } + } + catch (ThreadInterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + mut.ReleaseMutex(); + } + + void Go() + { + // create a BufferedStream on top of the NetworkStream. + int threshold = m_bufferSize/2; + byte[] buffer = new byte[m_bufferSize]; + try + { + int read; + int offset = 0; + ReceivedPayload<MemoryStream> payload = new ReceivedPayload<MemoryStream>(); + while ((read = m_bufStream.Read(buffer, offset, m_bufferSize - offset)) > 0) + { + MemoryStream memStream = new MemoryStream(buffer, offset, read); + if (ReceivedBuffer != null) + { + // call the event + payload.Payload = memStream; + ReceivedBuffer(this, payload); + } + offset += read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[m_bufferSize]; + } + } + log.Debug("Receiver thread terminating"); + } + catch (Exception t) + { + if (ExceptionReading != null) + { + ExceptionReading(this, new ExceptionArgs(t)); + } + } + finally + { + if (ReceiverClosed != null) + { + ReceiverClosed(this, new EventArgs()); + } + } + } + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoSSLTransport.cs b/qpid/dotnet/client-010/client/transport/network/io/IoSSLTransport.cs new file mode 100644 index 0000000000..b6c7940a1d --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IoSSLTransport.cs @@ -0,0 +1,227 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +using System; +using System.IO; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; + +using org.apache.qpid.transport.util; +using org.apache.qpid.client; + +namespace org.apache.qpid.transport.network.io +{ + public sealed class IoSSLTransport : IIoTransport + { + // constants + private const int DEFAULT_READ_WRITE_BUFFER_SIZE = 64*1024; + private const int TIMEOUT = 60000; + private const int QUEUE_SIZE = 1000; + // props + private static readonly Logger log = Logger.Get(typeof (IoSSLTransport)); + private Stream m_stream; + private IoSender m_sender; + private IReceiver<ReceivedPayload<MemoryStream>> m_receiver; + private TcpClient m_socket; + private Connection m_con; + private readonly bool _rejectUntrusted; + + public static Connection Connect(String host, int port, String mechanism, X509Certificate certificate, bool rejectUntrusted, Client client) + { + ClientConnectionDelegate connectionDelegate = new ClientConnectionDelegate(client, string.Empty, string.Empty, mechanism); + ManualResetEvent negotiationComplete = new ManualResetEvent(true); + connectionDelegate.SetCondition(negotiationComplete); + connectionDelegate.VirtualHost = string.Empty; + + IIoTransport transport = new IoSSLTransport(host, port, certificate, rejectUntrusted, connectionDelegate); + + Connection _conn = transport.Connection; + _conn.Send(new ProtocolHeader(1, 0, 10)); + negotiationComplete.WaitOne(); + + if (connectionDelegate.Exception != null) + throw connectionDelegate.Exception; + + connectionDelegate.SetCondition(null); + + return _conn; + } + + public static Connection Connect(String host, int port, String virtualHost, String mechanism, string serverName, string certPath, String certPass, bool rejectUntrusted, Client client) + { + // create certificate object based on whether or not password is null + X509Certificate cert; + if (certPass != null) + { + cert = new X509Certificate2(certPath, certPass); + } + else + { + cert = X509Certificate.CreateFromCertFile(certPath); + } + + return Connect(host, port, mechanism, cert, rejectUntrusted, client); + } + + public IoSSLTransport(String host, int port, X509Certificate certificate, bool rejectUntrusted, ConnectionDelegate conndel) + { + _rejectUntrusted = rejectUntrusted; + CreateSocket(host, port); + CreateSSLStream(host, Socket, certificate); + Sender = new IoSender(this, QUEUE_SIZE, TIMEOUT); + Receiver = new IoReceiver(Stream, Socket.ReceiveBufferSize*2, TIMEOUT); + Assembler assembler = new Assembler(); + InputHandler inputHandler = new InputHandler(InputHandler.State.PROTO_HDR); + Connection = new Connection(assembler, new Disassembler(Sender, 64*1024 - 1), conndel); + // Input handler listen to Receiver events + Receiver.Received += inputHandler.On_ReceivedBuffer; + // Assembler listen to inputhandler events + inputHandler.ReceivedEvent += assembler.On_ReceivedEvent; + // Connection listen to asembler protocol event + Receiver.Closed += Connection.On_ReceivedClosed; + assembler.Closed += Connection.On_ReceivedClosed; + Receiver.Exception += Connection.On_ReceivedException; + inputHandler.ExceptionProcessing += Connection.On_ReceivedException; + assembler.ReceivedEvent += Connection.On_ReceivedEvent; + } + + public Connection Connection + { + get { return m_con; } + set { m_con = value; } + } + + public IReceiver<ReceivedPayload<MemoryStream>> Receiver + { + get { return m_receiver; } + set { m_receiver = value; } + } + + public IoSender Sender + { + get { return m_sender; } + set { m_sender = value; } + } + + + public Stream Stream + { + get { return m_stream; } + set { m_stream = value; } + } + + public TcpClient Socket + { + get { return m_socket; } + set { m_socket = value; } + } + + #region Private Support Functions + + private void CreateSocket(String host, int port) + { + TcpClient socket; + try + { + socket = new TcpClient(); + String noDelay = Environment.GetEnvironmentVariable("qpid.tcpNoDelay"); + String writeBufferSize = Environment.GetEnvironmentVariable("qpid.writeBufferSize"); + String readBufferSize = Environment.GetEnvironmentVariable("qpid.readBufferSize"); + socket.NoDelay = noDelay != null && bool.Parse(noDelay); + socket.ReceiveBufferSize = readBufferSize == null + ? DEFAULT_READ_WRITE_BUFFER_SIZE + : int.Parse(readBufferSize); + socket.SendBufferSize = writeBufferSize == null + ? DEFAULT_READ_WRITE_BUFFER_SIZE + : int.Parse(writeBufferSize); + + log.Debug("NoDelay : {0}", socket.NoDelay); + log.Debug("ReceiveBufferSize : {0}", socket.ReceiveBufferSize); + log.Debug("SendBufferSize : {0}", socket.SendBufferSize); + log.Debug("Openning connection with host : {0}; port: {1}", host, port); + + socket.Connect(host, port); + Socket = socket; + } + catch (Exception e) + { + throw new TransportException(string.Format("Error connecting to broker: {0}", e.Message)); + } + } + + private void CreateSSLStream(String host, TcpClient socket, X509Certificate certificate) + { + try + { + //Initializes a new instance of the SslStream class using the specified Stream, stream closure behavior, certificate validation delegate and certificate selection delegate + SslStream sslStream = new SslStream(socket.GetStream(), false, ValidateServerCertificate, LocalCertificateSelection); + + X509CertificateCollection certCol = new X509CertificateCollection(); + certCol.Add(certificate); + + sslStream.AuthenticateAsClient(host, certCol, SslProtocols.Default, true); + Stream = sslStream; + } + catch (AuthenticationException e) + { + log.Warn("Exception: {0}", e.Message); + if (e.InnerException != null) + { + log.Warn("Inner exception: {0}", e.InnerException.Message); + e = new AuthenticationException(e.InnerException.Message, e.InnerException); + } + socket.Close(); + throw new TransportException(string.Format("Authentication failed, closing connection to broker: {0}", e.Message)); + } + } + + // The following method is invoked by the RemoteCertificateValidationDelegate. + public bool ValidateServerCertificate( + object sender, + X509Certificate certificate, + X509Chain chain, + SslPolicyErrors sslPolicyErrors) + { + bool result = true; + if (sslPolicyErrors != SslPolicyErrors.None && _rejectUntrusted ) + { + log.Warn("Certificate error: {0}", sslPolicyErrors); + // Do not allow this client to communicate with unauthenticated servers. + result = false; + } + return result; + } + + public X509Certificate LocalCertificateSelection( + Object sender, + string targetHost, + X509CertificateCollection localCertificates, + X509Certificate remoteCertificate, + string[] acceptableIssuers + ) + { + // used to be return null; in the original version + return localCertificates[0]; + } + + #endregion + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs b/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs new file mode 100644 index 0000000000..025b782a12 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs @@ -0,0 +1,137 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +using System; +using System.IO; +using System.Threading; +using common.org.apache.qpid.transport.util; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network.io +{ + public sealed class IoSender : IIoSender<MemoryStream> + { + private static readonly Logger log = Logger.Get(typeof (IoReceiver)); + private readonly IIoTransport ioTransport; + private readonly Stream bufStream; + private bool closed; + private readonly Mutex mutClosed = new Mutex(); + private readonly CircularBuffer<byte[]> queue; + private readonly Thread thread; + private readonly int timeout; + private readonly MemoryStream _tobeSent = new MemoryStream(); + public IoSender(IIoTransport transport, int queueSize, int timeout) + { + this.timeout = timeout; + ioTransport = transport; + bufStream = transport.Stream; + queue = new CircularBuffer<byte[]>(queueSize); + thread = new Thread(Go); + log.Debug("Creating IoSender thread"); + thread.Name = String.Format("IoSender - {0}", transport.Socket) ; + thread.IsBackground = true; + thread.Start(); + } + + public void Send(MemoryStream str) + { + int pos = (int) str.Position; + str.Seek(0, SeekOrigin.Begin); + Send(str, pos); + } + + public void Send(MemoryStream str, int size) + { + mutClosed.WaitOne(); + if (closed) + { + throw new TransportException("sender is Closed"); + } + mutClosed.ReleaseMutex(); + byte[] buf = new byte[size]; + str.Read(buf, 0, size); + _tobeSent.Write(buf, 0, size); + } + + public void Flush() + { + int length = (int)_tobeSent.Position; + byte[] buf = new byte[length]; + _tobeSent.Seek(0, SeekOrigin.Begin); + _tobeSent.Read(buf, 0, length); + queue.Enqueue(buf); + // bufStream.Write(buf, 0, length); + // _tobeSent = new MemoryStream(); + // _writer.Write(buf, 0, length); + // _writer.Flush(); + _tobeSent.Seek(0, SeekOrigin.Begin); + } + + public void Close() + { + log.Debug("Closing Sender"); + mutClosed.WaitOne(); + if (!closed) + { + try + { + closed = true; + queue.Close(); + thread.Join(timeout); + if (thread.IsAlive) + { + throw new TransportException("join timed out"); + } + } + catch (ThreadInterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + mutClosed.ReleaseMutex(); + } + + private void Go() + { + while (! closed) + { + //MemoryStream st = queue.Dequeue(); + byte[] st = queue.Dequeue(); + if (st != null) + { + try + { + // int length = (int) st.Length; + // byte[] buf = new byte[length]; + // st.Read(buf, 0, length); + bufStream.Write(st, 0, st.Length); + } + catch (Exception e) + { + closed = true; + ioTransport.Connection.On_ReceivedException(this, new ExceptionArgs(e)); + } + } + } + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoTransport.cs b/qpid/dotnet/client-010/client/transport/network/io/IoTransport.cs new file mode 100644 index 0000000000..483e5428b8 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IoTransport.cs @@ -0,0 +1,141 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +using System; +using System.IO; +using System.Net.Sockets; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network.io +{ + /// <summary> + /// This class provides a socket based transport using sync io classes. + /// + /// The following params are configurable via JVM arguments + /// TCP_NO_DELAY - qpid.tcpNoDelay + /// SO_RCVBUF - qpid.readBufferSize + /// SO_SNDBUF - qpid.writeBufferSize + /// </summary> + public sealed class IoTransport : IIoTransport + { + // constants + private const int DEFAULT_READ_WRITE_BUFFER_SIZE = 64*1024; + private const int TIMEOUT = 60000; + private const int QUEUE_SIZE = 1000; + // props + private static readonly Logger log = Logger.Get(typeof (IoTransport)); + private Stream m_stream; + private IoSender m_sender; + private IReceiver<ReceivedPayload<MemoryStream>> m_receiver; + private TcpClient m_socket; + private Connection m_con; + + public static Connection Connect(String host, int port, ConnectionDelegate conndel) + { + IoTransport transport = new IoTransport(host, port, conndel); + return transport.Connection; + } + + public IoTransport(String host, int port, ConnectionDelegate conndel) + { + CreateSocket(host, port); + Sender = new IoSender(this, QUEUE_SIZE, TIMEOUT); + Receiver = new IoReceiver(Stream, Socket.ReceiveBufferSize * 2, TIMEOUT); + Assembler assembler = new Assembler(); + InputHandler inputHandler = new InputHandler(InputHandler.State.PROTO_HDR); + Connection = new Connection(assembler, new Disassembler(Sender, 64 * 1024 - 1), conndel); + // Input handler listen to Receiver events + Receiver.Received += inputHandler.On_ReceivedBuffer; + // Assembler listen to inputhandler events + inputHandler.ReceivedEvent += assembler.On_ReceivedEvent; + // Connection listen to asembler protocol event + Receiver.Closed += Connection.On_ReceivedClosed; + assembler.Closed += Connection.On_ReceivedClosed; + Receiver.Exception += Connection.On_ReceivedException; + inputHandler.ExceptionProcessing += Connection.On_ReceivedException; + assembler.ReceivedEvent += Connection.On_ReceivedEvent; + } + + public Connection Connection + { + get { return m_con; } + set { m_con = value; } + } + + public IReceiver<ReceivedPayload<MemoryStream>> Receiver + { + get { return m_receiver; } + set { m_receiver = value; } + } + + public IoSender Sender + { + get { return m_sender; } + set { m_sender = value; } + } + + + public Stream Stream + { + get { return m_stream; } + set { m_stream = value; } + } + + public TcpClient Socket + { + get { return m_socket; } + set { m_socket = value; } + } + + #region Private Support Functions + + private void CreateSocket(String host, int port) + { + try + { + TcpClient socket = new TcpClient(); + String noDelay = Environment.GetEnvironmentVariable("qpid.tcpNoDelay"); + String writeBufferSize = Environment.GetEnvironmentVariable("qpid.writeBufferSize"); + String readBufferSize = Environment.GetEnvironmentVariable("qpid.readBufferSize"); + socket.NoDelay = noDelay != null && bool.Parse(noDelay); + socket.ReceiveBufferSize = readBufferSize == null ? DEFAULT_READ_WRITE_BUFFER_SIZE : int.Parse(readBufferSize); + socket.SendBufferSize = writeBufferSize == null ? DEFAULT_READ_WRITE_BUFFER_SIZE : int.Parse(writeBufferSize); + + log.Debug("NoDelay : {0}", socket.NoDelay); + log.Debug("ReceiveBufferSize : {0}", socket.ReceiveBufferSize); + log.Debug("SendBufferSize : {0}", socket.SendBufferSize); + log.Debug("Openning connection with host : {0}; port: {1}", host, port); + + socket.Connect(host, port); + Socket = socket; + Stream = socket.GetStream(); + } + catch (SocketException e) + { + Console.WriteLine(e.StackTrace); + throw new TransportException("Error connecting to broker", e); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + } + + #endregion + } +}
\ No newline at end of file |