diff options
Diffstat (limited to 'qpid/dotnet/client-010/client/transport/network/InputHandler.cs')
-rw-r--r-- | qpid/dotnet/client-010/client/transport/network/InputHandler.cs | 266 |
1 files changed, 266 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/client/transport/network/InputHandler.cs b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs new file mode 100644 index 0000000000..c5d5f13727 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs @@ -0,0 +1,266 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; +using System.Text; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network +{ + /// <summary> + /// InputHandler + /// </summary> + public sealed class InputHandler : IReceiver<ReceivedPayload<INetworkEvent>> + { + public enum State + { + PROTO_HDR, + FRAME_HDR, + FRAME_BODY, + ERROR + } + + private static readonly Logger log = Logger.Get(typeof(InputHandler)); + private readonly Object m_objectLock = new object(); + + // the event raised when a buffer is read from the wire + public event EventHandler<ReceivedPayload<INetworkEvent>> ReceivedEvent; + public event EventHandler<ExceptionArgs> ExceptionProcessing; + + // Not in used... This even is never raised in the code => the application will block on Close() until the timeout is reached + public event EventHandler Closed; + + event EventHandler<ReceivedPayload<INetworkEvent>> IReceiver<ReceivedPayload<INetworkEvent>>.Received + { + add + { + lock (m_objectLock) + { + ReceivedEvent += value; + } + } + remove + { + lock (m_objectLock) + { + ReceivedEvent -= value; + } + } + } + + event EventHandler<ExceptionArgs> IReceiver<ReceivedPayload<INetworkEvent>>.Exception + { + add + { + lock (m_objectLock) + { + ExceptionProcessing += value; + } + } + remove + { + lock (m_objectLock) + { + ExceptionProcessing -= value; + } + } + } + + private State state; + private MemoryStream input; + private int needed; + + private byte flags; + private SegmentType type; + private byte track; + private int channel; + + public InputHandler(State state) + { + this.state = state; + switch (state) + { + case State.PROTO_HDR: + needed = 8; + break; + case State.FRAME_HDR: + needed = Frame.HEADER_SIZE; + break; + } + } + + // The command listening for a buffer read. + public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload) + { + MemoryStream buf = payload.Payload; + int remaining = (int) buf.Length; + if( input != null ) + { + remaining += (int) input.Length; + } + try + { + while (remaining > 0) + { + if (remaining >= needed) + { + if (input != null) + { + byte[] tmp = new byte[buf.Length]; + buf.Read(tmp, 0, tmp.Length); + input.Write(tmp, 0, tmp.Length); + input.Seek(0, SeekOrigin.Begin); + buf = input; + } + int startPos = (int)buf.Position; + int consumed = needed; + state = Next(buf); + if ((buf.Position - startPos) < consumed) + { + buf.Seek(consumed - (buf.Position - startPos), SeekOrigin.Current); + } + remaining -= consumed; + input = null; + } + else + { + byte[] tmp; + if (input == null) + { + input = new MemoryStream(); + tmp = new byte[remaining]; + } + else + { + // this is a full buffer + tmp = new byte[buf.Length]; + } + buf.Read(tmp, 0, tmp.Length); + input.Write(tmp, 0, tmp.Length); + remaining = 0; + } + } + } + catch (Exception t) + { + Console.Write(t); + if (ExceptionProcessing != null) + { + ExceptionProcessing(this, new ExceptionArgs(t)); + } + } + } + + #region Private Support Functions + + private State Next(MemoryStream buf) + { + BinaryReader reader = new BinaryReader(buf); + + switch (state) + { + case State.PROTO_HDR: + char a = reader.ReadChar(); + char m = reader.ReadChar(); + char q = reader.ReadChar(); + char p = reader.ReadChar(); + if (a != 'A' && + m != 'M' && + q != 'Q' && + p != 'P') + { + Error("bad protocol header: {0}", buf.ToString()); + return State.ERROR; + } + reader.ReadByte(); + byte instance = reader.ReadByte(); + byte major = reader.ReadByte(); + byte minor = reader.ReadByte(); + Fire_NetworkEvent(new ProtocolHeader(instance, major, minor)); + needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + case State.FRAME_HDR: + reader = new BinaryReader(buf, Encoding.BigEndianUnicode); + flags = reader.ReadByte(); + type = SegmentTypeGetter.Get(reader.ReadByte()); // generated code + int size = reader.ReadUInt16(); + size = ByteEncoder.GetBigEndian((UInt16)size); + size -= Frame.HEADER_SIZE; + if (size < 0 || size > (64 * 1024 - 12)) + { + Error("bad frame size: {0:d}", size); + return State.ERROR; + } + reader.ReadByte(); + byte b = reader.ReadByte(); + if ((b & 0xF0) != 0) + { + Error("non-zero reserved bits in upper nibble of " + + "frame header byte 5: {0}", b); + return State.ERROR; + } + track = (byte)(b & 0xF); + channel = reader.ReadUInt16(); + channel = ByteEncoder.GetBigEndian((UInt16)channel); + if (size == 0) + { + Fire_NetworkEvent(new Frame(flags, type, track, channel, 0, new MemoryStream())); + needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + } + needed = size; + return State.FRAME_BODY; + case State.FRAME_BODY: + Fire_NetworkEvent(new Frame(flags, type, track, channel, needed, buf)); + needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + default: + if (ExceptionProcessing != null) + { + ExceptionProcessing(this, new ExceptionArgs(new Exception("Error creating frame"))); + } + throw new Exception("Error creating frame"); + } + } + + private void Error(String fmt, params Object[] args) + { + Fire_NetworkEvent(new ProtocolError(Frame.L1, fmt, args)); + } + + private void Fire_NetworkEvent(INetworkEvent netevent) + { + log.Debug("InputHandler: network event:", netevent); + ReceivedPayload<INetworkEvent> payload = new ReceivedPayload<INetworkEvent>(); + payload.Payload = netevent; + if (ReceivedEvent != null) + { + ReceivedEvent(this, payload); + } + else + { + log.Debug("Nobody listening for event: {0}"); + } + } + + #endregion + } +}
\ No newline at end of file |