diff options
Diffstat (limited to 'qpid/dotnet/client-010/client/transport/network/Assembler.cs')
-rw-r--r-- | qpid/dotnet/client-010/client/transport/network/Assembler.cs | 254 |
1 files changed, 254 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/client/transport/network/Assembler.cs b/qpid/dotnet/client-010/client/transport/network/Assembler.cs new file mode 100644 index 0000000000..ff85f11c2f --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/Assembler.cs @@ -0,0 +1,254 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Collections.Generic; +using System.IO; +using org.apache.qpid.transport.codec; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network +{ + /// <summary> + /// Assembler + /// </summary> + public delegate void Processor(INetworkDelegate ndelegate); + + public class Assembler : INetworkDelegate, IReceiver<ReceivedPayload<IProtocolEvent>> + { + private static readonly Logger log = Logger.Get(typeof (Assembler)); + private readonly Dictionary<int, List<byte[]>> segments; + private readonly Method[] incomplete; + [ThreadStatic] static MSDecoder _decoder; + private readonly Object m_objectLock = new object(); + + // the event raised when a buffer is read from the wire + public event EventHandler<ReceivedPayload<IProtocolEvent>> ReceivedEvent; + public event EventHandler Closed; + + + // Not in use : + public event EventHandler<ExceptionArgs> Exception; + + event EventHandler<ReceivedPayload<IProtocolEvent>> IReceiver<ReceivedPayload<IProtocolEvent>>.Received + { + add + { + lock (m_objectLock) + { + ReceivedEvent += value; + } + } + remove + { + lock (m_objectLock) + { + ReceivedEvent -= value; + } + } + } + + public Assembler() + { + segments = new Dictionary<int, List<byte[]>>(); + incomplete = new Method[64*1024]; + } + + // Invoked when a network event is received + public void On_ReceivedEvent(object sender, ReceivedPayload<INetworkEvent> payload) + { + payload.Payload.ProcessNetworkEvent(this); + } + + #region Interface INetworkDelegate + + public void Init(ProtocolHeader header) + { + Emit(0, header); + } + + public void Error(ProtocolError error) + { + Emit(0, error); + } + + public void Frame(Frame frame) + { + MemoryStream segment; + if (frame.IsFirstFrame() && frame.IsLastFrame()) + { + byte[] tmp = new byte[frame.BodySize]; + frame.Body.Read(tmp, 0, tmp.Length); + segment = new MemoryStream(); + BinaryWriter w = new BinaryWriter(segment); + w.Write(tmp); + Assemble(frame, new MemoryStream(tmp)); + } + else + { + List<byte[]> frames; + if (frame.IsFirstFrame()) + { + frames = new List<byte[]>(); + SetSegment(frame, frames); + } + else + { + frames = GetSegment(frame); + } + byte[] tmp = new byte[frame.BodySize]; + frame.Body.Read(tmp, 0, tmp.Length); + frames.Add(tmp); + + if (frame.IsLastFrame()) + { + ClearSegment(frame); + segment = new MemoryStream(); + BinaryWriter w = new BinaryWriter(segment); + foreach (byte[] f in frames) + { + w.Write(f); + } + Assemble(frame, segment); + } + } + } + + #endregion + + #region Private Support Functions + + + private MSDecoder GetDecoder() + { + if( _decoder == null ) + { + _decoder = new MSDecoder(); + } + return _decoder; + } + + private void Assemble(Frame frame, MemoryStream segment) + { + MSDecoder decoder = GetDecoder(); + decoder.Init(segment); + int channel = frame.Channel; + Method command; + switch (frame.Type) + { + case SegmentType.CONTROL: + int controlType = decoder.ReadUint16(); + Method control = Method.Create(controlType); + control.Read(decoder); + Emit(channel, control); + break; + case SegmentType.COMMAND: + int commandType = decoder.ReadUint16(); + // read in the session header, right now we don't use it + decoder.ReadUint16(); + command = Method.Create(commandType); + command.Read(decoder); + if (command.HasPayload()) + { + incomplete[channel] = command; + } + else + { + Emit(channel, command); + } + break; + case SegmentType.HEADER: + command = incomplete[channel]; + List<Struct> structs = new List<Struct>(); + while (decoder.HasRemaining()) + { + structs.Add(decoder.ReadStruct32()); + } + command.Header = new Header(structs); + if (frame.IsLastSegment()) + { + incomplete[channel] = null; + Emit(channel, command); + } + break; + case SegmentType.BODY: + command = incomplete[channel]; + segment.Seek(0, SeekOrigin.Begin); + command.Body = segment; + incomplete[channel] = null; + Emit(channel, command); + break; + default: + throw new Exception("unknown frame type: " + frame.Type); + } + } + + private int SegmentKey(Frame frame) + { + return (frame.Track + 1)*frame.Channel; + } + + private List<byte[]> GetSegment(Frame frame) + { + return segments[SegmentKey(frame)]; + } + + private void SetSegment(Frame frame, List<byte[]> segment) + { + int key = SegmentKey(frame); + if (segments.ContainsKey(key)) + { + Error(new ProtocolError(network.Frame.L2, "segment in progress: %s", + frame)); + } + segments.Add(SegmentKey(frame), segment); + } + + private void ClearSegment(Frame frame) + { + segments.Remove(SegmentKey(frame)); + } + + // Emit a protocol event + private void Emit(int channel, IProtocolEvent protevent) + { + protevent.Channel = channel; + log.Debug("Assembler: protocol event:", protevent); + ReceivedPayload<IProtocolEvent> payload = new ReceivedPayload<IProtocolEvent>(); + payload.Payload = protevent; + + if (protevent is ConnectionCloseOk) + { + if (Closed != null) + Closed(this, EventArgs.Empty); + } + else + { + if (ReceivedEvent != null) + ReceivedEvent(this, payload); + else + log.Debug("No listener for event: {0}", protevent); + } + } + + #endregion + } +}
\ No newline at end of file |