summaryrefslogtreecommitdiff
path: root/qpid/dotnet/client-010/client/transport/network/InputHandler.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/client-010/client/transport/network/InputHandler.cs')
-rw-r--r--qpid/dotnet/client-010/client/transport/network/InputHandler.cs266
1 files changed, 266 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/client/transport/network/InputHandler.cs b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs
new file mode 100644
index 0000000000..c5d5f13727
--- /dev/null
+++ b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs
@@ -0,0 +1,266 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.IO;
+using System.Text;
+using org.apache.qpid.transport.util;
+
+namespace org.apache.qpid.transport.network
+{
+ /// <summary>
+ /// InputHandler
+ /// </summary>
+ public sealed class InputHandler : IReceiver<ReceivedPayload<INetworkEvent>>
+ {
+ public enum State
+ {
+ PROTO_HDR,
+ FRAME_HDR,
+ FRAME_BODY,
+ ERROR
+ }
+
+ private static readonly Logger log = Logger.Get(typeof(InputHandler));
+ private readonly Object m_objectLock = new object();
+
+ // the event raised when a buffer is read from the wire
+ public event EventHandler<ReceivedPayload<INetworkEvent>> ReceivedEvent;
+ public event EventHandler<ExceptionArgs> ExceptionProcessing;
+
+ // Not in used... This even is never raised in the code => the application will block on Close() until the timeout is reached
+ public event EventHandler Closed;
+
+ event EventHandler<ReceivedPayload<INetworkEvent>> IReceiver<ReceivedPayload<INetworkEvent>>.Received
+ {
+ add
+ {
+ lock (m_objectLock)
+ {
+ ReceivedEvent += value;
+ }
+ }
+ remove
+ {
+ lock (m_objectLock)
+ {
+ ReceivedEvent -= value;
+ }
+ }
+ }
+
+ event EventHandler<ExceptionArgs> IReceiver<ReceivedPayload<INetworkEvent>>.Exception
+ {
+ add
+ {
+ lock (m_objectLock)
+ {
+ ExceptionProcessing += value;
+ }
+ }
+ remove
+ {
+ lock (m_objectLock)
+ {
+ ExceptionProcessing -= value;
+ }
+ }
+ }
+
+ private State state;
+ private MemoryStream input;
+ private int needed;
+
+ private byte flags;
+ private SegmentType type;
+ private byte track;
+ private int channel;
+
+ public InputHandler(State state)
+ {
+ this.state = state;
+ switch (state)
+ {
+ case State.PROTO_HDR:
+ needed = 8;
+ break;
+ case State.FRAME_HDR:
+ needed = Frame.HEADER_SIZE;
+ break;
+ }
+ }
+
+ // The command listening for a buffer read.
+ public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload)
+ {
+ MemoryStream buf = payload.Payload;
+ int remaining = (int) buf.Length;
+ if( input != null )
+ {
+ remaining += (int) input.Length;
+ }
+ try
+ {
+ while (remaining > 0)
+ {
+ if (remaining >= needed)
+ {
+ if (input != null)
+ {
+ byte[] tmp = new byte[buf.Length];
+ buf.Read(tmp, 0, tmp.Length);
+ input.Write(tmp, 0, tmp.Length);
+ input.Seek(0, SeekOrigin.Begin);
+ buf = input;
+ }
+ int startPos = (int)buf.Position;
+ int consumed = needed;
+ state = Next(buf);
+ if ((buf.Position - startPos) < consumed)
+ {
+ buf.Seek(consumed - (buf.Position - startPos), SeekOrigin.Current);
+ }
+ remaining -= consumed;
+ input = null;
+ }
+ else
+ {
+ byte[] tmp;
+ if (input == null)
+ {
+ input = new MemoryStream();
+ tmp = new byte[remaining];
+ }
+ else
+ {
+ // this is a full buffer
+ tmp = new byte[buf.Length];
+ }
+ buf.Read(tmp, 0, tmp.Length);
+ input.Write(tmp, 0, tmp.Length);
+ remaining = 0;
+ }
+ }
+ }
+ catch (Exception t)
+ {
+ Console.Write(t);
+ if (ExceptionProcessing != null)
+ {
+ ExceptionProcessing(this, new ExceptionArgs(t));
+ }
+ }
+ }
+
+ #region Private Support Functions
+
+ private State Next(MemoryStream buf)
+ {
+ BinaryReader reader = new BinaryReader(buf);
+
+ switch (state)
+ {
+ case State.PROTO_HDR:
+ char a = reader.ReadChar();
+ char m = reader.ReadChar();
+ char q = reader.ReadChar();
+ char p = reader.ReadChar();
+ if (a != 'A' &&
+ m != 'M' &&
+ q != 'Q' &&
+ p != 'P')
+ {
+ Error("bad protocol header: {0}", buf.ToString());
+ return State.ERROR;
+ }
+ reader.ReadByte();
+ byte instance = reader.ReadByte();
+ byte major = reader.ReadByte();
+ byte minor = reader.ReadByte();
+ Fire_NetworkEvent(new ProtocolHeader(instance, major, minor));
+ needed = Frame.HEADER_SIZE;
+ return State.FRAME_HDR;
+ case State.FRAME_HDR:
+ reader = new BinaryReader(buf, Encoding.BigEndianUnicode);
+ flags = reader.ReadByte();
+ type = SegmentTypeGetter.Get(reader.ReadByte()); // generated code
+ int size = reader.ReadUInt16();
+ size = ByteEncoder.GetBigEndian((UInt16)size);
+ size -= Frame.HEADER_SIZE;
+ if (size < 0 || size > (64 * 1024 - 12))
+ {
+ Error("bad frame size: {0:d}", size);
+ return State.ERROR;
+ }
+ reader.ReadByte();
+ byte b = reader.ReadByte();
+ if ((b & 0xF0) != 0)
+ {
+ Error("non-zero reserved bits in upper nibble of " +
+ "frame header byte 5: {0}", b);
+ return State.ERROR;
+ }
+ track = (byte)(b & 0xF);
+ channel = reader.ReadUInt16();
+ channel = ByteEncoder.GetBigEndian((UInt16)channel);
+ if (size == 0)
+ {
+ Fire_NetworkEvent(new Frame(flags, type, track, channel, 0, new MemoryStream()));
+ needed = Frame.HEADER_SIZE;
+ return State.FRAME_HDR;
+ }
+ needed = size;
+ return State.FRAME_BODY;
+ case State.FRAME_BODY:
+ Fire_NetworkEvent(new Frame(flags, type, track, channel, needed, buf));
+ needed = Frame.HEADER_SIZE;
+ return State.FRAME_HDR;
+ default:
+ if (ExceptionProcessing != null)
+ {
+ ExceptionProcessing(this, new ExceptionArgs(new Exception("Error creating frame")));
+ }
+ throw new Exception("Error creating frame");
+ }
+ }
+
+ private void Error(String fmt, params Object[] args)
+ {
+ Fire_NetworkEvent(new ProtocolError(Frame.L1, fmt, args));
+ }
+
+ private void Fire_NetworkEvent(INetworkEvent netevent)
+ {
+ log.Debug("InputHandler: network event:", netevent);
+ ReceivedPayload<INetworkEvent> payload = new ReceivedPayload<INetworkEvent>();
+ payload.Payload = netevent;
+ if (ReceivedEvent != null)
+ {
+ ReceivedEvent(this, payload);
+ }
+ else
+ {
+ log.Debug("Nobody listening for event: {0}");
+ }
+ }
+
+ #endregion
+ }
+} \ No newline at end of file