diff options
Diffstat (limited to 'qpid/dotnet/client-010/client/transport')
68 files changed, 7312 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/client/transport/Binary.cs b/qpid/dotnet/client-010/client/transport/Binary.cs new file mode 100644 index 0000000000..f9bd3612dc --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Binary.cs @@ -0,0 +1,129 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport +{ + + + /// <summary> + /// Binary + /// </summary> + + public sealed class Binary + { + + private readonly byte[] bytes; + private readonly int offset_Renamed_Field; + private readonly int size_Renamed_Field; + private int hash = 0; + + public Binary(byte[] bytes, int offset, int size) + { + if (offset + size > bytes.Length) + { + throw new System.IndexOutOfRangeException(); + } + + this.bytes = bytes; + offset_Renamed_Field = offset; + size_Renamed_Field = size; + } + + public Binary(byte[] bytes):this(bytes, 0, bytes.Length) + { + } + + public byte[] Array() + { + return bytes; + } + + public int Offset() + { + return offset_Renamed_Field; + } + + public int Size() + { + return size_Renamed_Field; + } + + public Binary Slice(int low, int high) + { + int sz; + + if (high < 0) + { + sz = size_Renamed_Field + high; + } + else + { + sz = high - low; + } + + if (sz < 0) + { + sz = 0; + } + + return new Binary(bytes, offset_Renamed_Field + low, sz); + } + + public override int GetHashCode() + { + if (hash == 0) + { + int hc = 0; + for (int i = 0; i < size_Renamed_Field; i++) + { + hc = 31 * hc + (0xFF & bytes[offset_Renamed_Field + i]); + } + hash = hc; + } + + return hash; + } + + public override bool Equals(System.Object o) + { + if (!(o is Binary)) + { + return false; + } + + Binary buf = (Binary) o; + if (size_Renamed_Field != buf.size_Renamed_Field) + { + return false; + } + + for (int i = 0; i < size_Renamed_Field; i++) + { + if (bytes[offset_Renamed_Field + i] != buf.bytes[buf.offset_Renamed_Field + i]) + { + return false; + } + } + + return true; + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/Binding.cs b/qpid/dotnet/client-010/client/transport/Binding.cs new file mode 100644 index 0000000000..a0899c1066 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Binding.cs @@ -0,0 +1,34 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Binding + /// </summary> + internal interface Binding<E, T> + { + E endpoint(Sender<T> sender); + + Receiver<R> receiver<R>(E endpoint) where R : EventArgs; + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/Channel.cs b/qpid/dotnet/client-010/client/transport/Channel.cs new file mode 100644 index 0000000000..48ba707182 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Channel.cs @@ -0,0 +1,174 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using org.apache.qpid.transport.network; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Channel + /// </summary> + public class Channel : Invoker, IProtocolDelegate<Object> + { + private static readonly Logger log = Logger.Get(typeof (Channel)); + + private readonly Connection _connection; + private readonly int _channel; + private readonly MethodDelegate<Channel> _methoddelegate; + private readonly SessionDelegate _sessionDelegate; + // session may be null + private Session _session; + + public Channel(Connection connection, int channel, SessionDelegate sessionDelegate) + { + _connection = connection; + _channel = channel; + _methoddelegate = new ChannelDelegate(); + _sessionDelegate = sessionDelegate; + } + + public Connection Connection + { + get { return _connection; } + } + + // Invoked when a network event is received + public void On_ReceivedEvent(object sender, ReceivedPayload<IProtocolEvent> payload) + { + if (payload.Payload.Channel == _channel) + { + payload.Payload.ProcessProtocolEvent(null, this); + } + } + + #region ProtocolDelegate<T> + + public void Init(Object v, ProtocolHeader hdr) + { + _connection.ConnectionDelegate.Init(this, hdr); + } + + public void Control(Object v, Method method) + { + switch (method.EncodedTrack) + { + case Frame.L1: + method.Dispatch(this, _connection.ConnectionDelegate); + break; + case Frame.L2: + method.Dispatch(this, _methoddelegate); + break; + case Frame.L3: + method.ProcessProtocolEvent(_session, _sessionDelegate); + break; + default: + throw new Exception("unknown track: " + method.EncodedTrack); + } + } + + public void Command(Object v, Method method) + { + method.ProcessProtocolEvent(_session, _sessionDelegate); + } + + public void Error(Object v, ProtocolError error) + { + throw new Exception(error.Message); + } + + #endregion + + public void Exception(Exception t) + { + _session.Exception(t); + } + + public void ClosedFromConnection() + { + log.Debug("channel Closed: ", this); + if (_session != null) + { + _session.Closed(); + } + } + + public void Closed() + { + log.Debug("channel Closed: ", this); + if (_session != null) + { + _session.Closed(); + } + _connection.RemoveChannel(_channel); + } + + public int EncodedChannel + { + get { return _channel; } + } + + public Session Session + { + get { return _session; } + set { _session = value; } + } + + public void CloseCode(ConnectionClose close) + { + if (_session != null) + { + _session.CloseCode(close); + } + } + + private void Emit(IProtocolEvent pevent) + { + pevent.Channel = _channel; + _connection.Send(pevent); + } + + public void Method(Method m) + { + Emit(m); + + if (!m.Batch) + { + _connection.Flush(); + } + } + + protected override void Invoke(Method m) + { + Method(m); + } + + public override IFuture Invoke(Method m, IFuture future) + { + throw new Exception("UnsupportedOperation"); + } + + public override String ToString() + { + return String.Format("{0}:{1}", _connection, _channel); + } + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/ChannelDelegate.cs b/qpid/dotnet/client-010/client/transport/ChannelDelegate.cs new file mode 100644 index 0000000000..3a43d6d231 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ChannelDelegate.cs @@ -0,0 +1,41 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport +{ + /// <summary> + /// ChannelDelegate + /// + /// </summary> + internal class ChannelDelegate : MethodDelegate<Channel> + { + public override void SessionDetached(Channel channel, SessionDetached closed) + { + channel.Closed(); + } + + public override void SessionDetach(Channel channel, SessionDetach dtc) + { + channel.Session.Closed(); + channel.SessionDetached(dtc.GetName(), SessionDetachCode.NORMAL); + channel.Closed(); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/ClientDelegate.cs b/qpid/dotnet/client-010/client/transport/ClientDelegate.cs new file mode 100644 index 0000000000..957324ad41 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ClientDelegate.cs @@ -0,0 +1,35 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using org.apache.qpid.transport; + +namespace org.apache.qpid.transport +{ + abstract class ClientDelegate : ConnectionDelegate + { + public override void Init(Channel ch, ProtocolHeader hdr) + { + if (hdr.Major != 0 && hdr.Minor != 10) + { + throw new ProtocolVersionException((sbyte) hdr.Major, (sbyte) hdr.Minor); + } + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/Connection.cs b/qpid/dotnet/client-010/client/transport/Connection.cs new file mode 100644 index 0000000000..b97357a96b --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Connection.cs @@ -0,0 +1,168 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.Collections.Generic; +using Logger = org.apache.qpid.transport.util.Logger; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Connection + /// </summary> + public class Connection + { + private static readonly Logger log = Logger.Get(typeof (Connection)); + + private readonly ISender<IProtocolEvent> _sender; + private readonly ConnectionDelegate _connDdelegate; + private int _channelMax = 1; + private int _connectionId; + private readonly IReceiver<ReceivedPayload<IProtocolEvent>> _receiver; + + private readonly Dictionary<int, Channel> _channels = new Dictionary<int, Channel>(); + + public Connection(IReceiver<ReceivedPayload<IProtocolEvent>> receiver, ISender<IProtocolEvent> sender, ConnectionDelegate connDdelegate) + { + _receiver = receiver; + _sender = sender; + _connDdelegate = connDdelegate; + } + + public int ConnectionId + { + get { return _connectionId; } + set { _connectionId = value; } + } + + public ConnectionDelegate ConnectionDelegate + { + get { return _connDdelegate; } + } + + public int ChannelMax + { + get { return _channelMax; } + set { _channelMax = value; } + } + + public void Send(IProtocolEvent pevent) + { + log.Debug("SEND: [{0}] {1}", this, pevent); + _sender.Send(pevent); + } + + public void Flush() + { + log.Debug("FLUSH: [{0}]", this); + _sender.Flush(); + } + + + public Channel GetChannel() + { + lock (_channels) + { + for (int i = 0; i < ChannelMax; i++) + { + if (!_channels.ContainsKey(i)) + { + return GetChannel(i); + } + } + throw new Exception("no more _channels available"); + } + } + + public Channel GetChannel(int number) + { + lock (_channels) + { + Channel channel = null; + if (_channels.Count > 0) + { + if( _channels.ContainsKey(number)) + channel = _channels[number]; + } + if (channel == null) + { + channel = new Channel(this, number, _connDdelegate.GetSessionDelegate()); + _receiver.Received += channel.On_ReceivedEvent; + _channels.Add(number, channel); + } + return channel; + } + } + + public void RemoveChannel(int number) + { + lock (_channels) + { + _receiver.Received -= _channels[number].On_ReceivedEvent; + _channels.Remove(number); + } + } + + public void On_ReceivedEvent(object sender, ReceivedPayload<IProtocolEvent> payload) + { + log.Debug("RECV: [{0}] {1}", this, payload.Payload); + if (_channels.ContainsKey(payload.Payload.Channel)) return; + Channel channel = GetChannel(payload.Payload.Channel); + channel.On_ReceivedEvent(sender, payload); + } + + public void On_ReceivedException(Object sender, ExceptionArgs arg) + { + _connDdelegate.RaiseException(arg.Exception); + } + + public void On_ReceivedClosed(Object sender, EventArgs arg) + { + log.Debug("Connection Closed: {0}", this); + lock (_channels) + { + foreach (Channel ch in _channels.Values) + { + ch.ClosedFromConnection(); + } + } + _channels.Clear(); + _connDdelegate.Closed(); + } + + + public void CloseCode(ConnectionClose close) + { + lock (_channels) + { + foreach (Channel ch in _channels.Values) + { + ch.CloseCode(close); + } + } + } + + public void Close() + { + _sender.Close(); + } + } + +} diff --git a/qpid/dotnet/client-010/client/transport/ConnectionDelegate.cs b/qpid/dotnet/client-010/client/transport/ConnectionDelegate.cs new file mode 100644 index 0000000000..5d491bc06f --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ConnectionDelegate.cs @@ -0,0 +1,108 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.Collections.Generic; +using System.Threading; +using Logger = org.apache.qpid.transport.util.Logger; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// ConnectionDelegate + /// + /// Currently only implemented client specific methods + /// </summary> + public abstract class ConnectionDelegate : MethodDelegate<Channel> + { + private static readonly Logger log = Logger.Get(typeof(ConnectionDelegate)); + private String _virtualHost; + + protected ManualResetEvent _negotiationComplete; + + public abstract SessionDelegate GetSessionDelegate(); + + public abstract void RaiseException(Exception t); + + public abstract void Closed(); + + public void SetCondition(ManualResetEvent negotiationComplete) + { + _negotiationComplete = negotiationComplete; + } + + public virtual void Init(Channel ch, ProtocolHeader hdr) + { + ch.Connection.Send(new ProtocolHeader((byte)1, hdr.Major, hdr.Minor)); + List<Object> plain = new List<Object>(); + plain.Add("PLAIN"); + List<Object> utf8 = new List<Object>(); + utf8.Add("utf8"); + ch.ConnectionStart(null, plain, utf8); + } + + public String VirtualHost + { + get { return _virtualHost; } + set { _virtualHost = value; } + } + + // ---------------------------------------------- + // Client side + //----------------------------------------------- + public override void ConnectionStart(Channel context, ConnectionStart mstruct) + { + Dictionary<String, Object> props = new Dictionary<String, Object>(); + context.ConnectionStartOk(props, null, null, "utf8"); + } + + public override void ConnectionSecure(Channel context, ConnectionSecure mstruct) + { // todo SASL + context.ConnectionSecureOk(new byte[0]); + } + + public override void ConnectionTune(Channel context, ConnectionTune mstruct) + { + context.Connection.ChannelMax = mstruct.GetChannelMax(); + context.ConnectionTuneOk(mstruct.GetChannelMax(), mstruct.GetMaxFrameSize(), mstruct.GetHeartbeatMax()); + context.ConnectionOpen(_virtualHost, null, Option.INSIST); + } + + public override void ConnectionOpenOk(Channel context, ConnectionOpenOk mstruct) + { + List<Object> knownHosts = mstruct.GetKnownHosts(); + if (_negotiationComplete != null) + { + _negotiationComplete.Set(); + } + } + + public override void ConnectionRedirect(Channel context, ConnectionRedirect mstruct) + { + // not going to bother at the moment + } + + public override void ConnectionClose(Channel ch, ConnectionClose close) + { + ch.Connection.CloseCode(close); + ch.ConnectionCloseOk(); + } + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/Field.cs b/qpid/dotnet/client-010/client/transport/Field.cs new file mode 100644 index 0000000000..9af8c4a476 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Field.cs @@ -0,0 +1,74 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using org.apache.qpid.transport.codec; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Field + /// </summary> + public abstract class Field<C, T> + { + private C container; + private T type; + private String name; + private int index; + + protected Field(C container, T type, String name, int index) + { + this.container = container; + this.type = type; + this.name = name; + this.index = index; + } + + public C Container + { + get { return container; } + } + + public T Type + { + get { return type; } + } + + public String Name + { + get { return name; } + } + + public int Index + { + get { return index; } + } + + public abstract bool Has(Object mystruct); + + public abstract void Has(Object mystruct, bool value); + + public abstract T Get(Object mystruct); + + public abstract void Read(IDecoder dec, Object mystruct); + + public abstract void Write(IEncoder enc, Object mystruct); + } +} diff --git a/qpid/dotnet/client-010/client/transport/Future.cs b/qpid/dotnet/client-010/client/transport/Future.cs new file mode 100644 index 0000000000..c0eadfb7ae --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Future.cs @@ -0,0 +1,38 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Future + /// </summary> + public interface Future + { + Struct Result + { + get; set; + } + + Session Session + { set; + } + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/Header.cs b/qpid/dotnet/client-010/client/transport/Header.cs new file mode 100644 index 0000000000..742531cfd8 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Header.cs @@ -0,0 +1,83 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.Collections.Generic; +using System.Text; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Header + /// </summary> + public class Header + { + private readonly Struct[] _mystructs; + + public Header(List<Struct> structs) + : this(structs.ToArray()) + { + } + + public Header(params Struct[] structs) + { + _mystructs = structs; + } + + public Struct[] Structs + { + get { return _mystructs; } + } + + + public Struct Get(Struct klass) + { + foreach (Struct st in _mystructs) + { + if (Equals(st.GetType(), klass.GetType())) + { + return st; + } + } + return null; + } + + public override String ToString() + { + StringBuilder str = new StringBuilder(); + str.Append(" Header("); + bool first = true; + foreach (Struct s in _mystructs) + { + if (first) + { + first = false; + } + else + { + str.Append(", "); + } + str.Append(s); + } + str.Append(")"); + return str.ToString(); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/IBinding.cs b/qpid/dotnet/client-010/client/transport/IBinding.cs new file mode 100644 index 0000000000..607212f1fe --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/IBinding.cs @@ -0,0 +1,34 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Binding + /// </summary> + internal interface IBinding<E, T> + { + E Endpoint(ISender<T> sender); + + IReceiver<R> Receiver<R>(E endpoint) where R : EventArgs; + } +} diff --git a/qpid/dotnet/client-010/client/transport/IFuture.cs b/qpid/dotnet/client-010/client/transport/IFuture.cs new file mode 100644 index 0000000000..054b828d13 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/IFuture.cs @@ -0,0 +1,38 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Future + /// </summary> + public interface IFuture + { + Struct Result + { + get; set; + } + + Session Session + { set; + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/IProtocolDelegate.cs b/qpid/dotnet/client-010/client/transport/IProtocolDelegate.cs new file mode 100644 index 0000000000..a9875fd290 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/IProtocolDelegate.cs @@ -0,0 +1,37 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport +{ + /// <summary> + /// ProtocolDelegate + /// </summary> + public interface IProtocolDelegate<T> + { + void Init(T context, ProtocolHeader header); + + void Control(T context, Method control); + + void Command(T context, Method command); + + void Error(T context, ProtocolError error); + } +} diff --git a/qpid/dotnet/client-010/client/transport/IProtocolEvent.cs b/qpid/dotnet/client-010/client/transport/IProtocolEvent.cs new file mode 100644 index 0000000000..8f915b204a --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/IProtocolEvent.cs @@ -0,0 +1,42 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport +{ + /// <summary> + /// IProtocolEvent + /// </summary> + public interface IProtocolEvent + { + int Channel + { + get; + set; + } + + byte EncodedTrack + { + set; + get; + } + + void ProcessProtocolEvent<C>(C context, IProtocolDelegate<C> protocoldelegate); + } +} diff --git a/qpid/dotnet/client-010/client/transport/IReceiver.cs b/qpid/dotnet/client-010/client/transport/IReceiver.cs new file mode 100644 index 0000000000..4c4c9572b9 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/IReceiver.cs @@ -0,0 +1,38 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// a receiver will raise an event when: + /// - data is received + /// - an exception is thrown + /// - it is Closed + /// </summary> + public interface IReceiver <T> where T : EventArgs + { + event EventHandler<T> Received; + event EventHandler<ExceptionArgs> Exception; + event EventHandler Closed; + } +} diff --git a/qpid/dotnet/client-010/client/transport/ISender.cs b/qpid/dotnet/client-010/client/transport/ISender.cs new file mode 100644 index 0000000000..d7d1781aec --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ISender.cs @@ -0,0 +1,32 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport +{ + /// <summary> + /// Sender + /// </summary> + public interface ISender<T> + { + void Send(T msg); + void Flush(); + void Close(); + } +} diff --git a/qpid/dotnet/client-010/client/transport/ISession.cs b/qpid/dotnet/client-010/client/transport/ISession.cs new file mode 100644 index 0000000000..e843095df6 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ISession.cs @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Collections.Generic; + +namespace org.apache.qpid.transport +{ + public interface ISession : IInvoker + { + bool IsClosed { get; set; } + string Name { get; } + int CommandsIn { get; set; } + byte[] GetName(); + void SetAutoSync(bool value); + Dictionary<int, Method> GetOutstandingCommands(); + int GetCommandsOut(); + int NextCommandId(); + void Identify(Method cmd); + void Processed(Method command); + void Processed(int command); + void Processed(int lower, int upper); + void Processed(Range range); + void FlushProcessed(params Option[] options); + void KnownComplete(RangeSet kc); + void SyncPoint(); + void Attach(Channel channel); + Method GetCommand(int id); + bool Complete(int lower, int upper); + void Sync(); + void Sync(long timeout); + void Result(int command, Struct result); + void AddException(ExecutionException exc); + void CloseCode(ConnectionClose close); + List<ExecutionException> GetExceptions(); + + void MessageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + byte[] body, + params Option[] options); + + void MessageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + String body, + params Option[] options); + + void Close(); + void Exception(Exception t); + void Closed(); + } +} diff --git a/qpid/dotnet/client-010/client/transport/Method.cs b/qpid/dotnet/client-010/client/transport/Method.cs new file mode 100644 index 0000000000..8540698822 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Method.cs @@ -0,0 +1,150 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; +using System.Text; +using Frame = org.apache.qpid.transport.network.Frame; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Method + /// </summary> + public abstract class Method : Struct, IProtocolEvent + { + public new static Method Create(int type) + { + return (Method) StructFactory.createInstruction(type); + } + + // XXX: command subclass? + private int id; + private int channel; + private bool idSet; + private bool sync; + private bool batch; + + public int Id + { + get { return id; } + set + { + id = value; + idSet = true; + } + } + + + public bool Sync + { + get { return sync; } + set { sync = value; } + } + + public bool Batch + { + get { return batch; } + set { batch = value; } + } + + public abstract bool HasPayload(); + + public virtual Header Header + { + get { return null; } + set { throw new Exception(); } + } + + public virtual MemoryStream Body + { + get { return null; } + set { throw new Exception(); } + } + + + public abstract void Dispatch<C>(C context, MethodDelegate<C> mdelegate ); + + #region IProtocolEvent + + public int Channel + { + get { return channel; } + set { channel = value; } + } + + public abstract byte EncodedTrack { get; set; } + + public void ProcessProtocolEvent<C>(C context, IProtocolDelegate<C> protocoldelegate) + { + if (EncodedTrack == Frame.L4) + { + protocoldelegate.Command(context, this); + } + else + { + protocoldelegate.Control(context, this); + } + } + + #endregion + + public override String ToString() + { + StringBuilder str = new StringBuilder(); + + str.Append("ch="); + str.Append(channel); + + if (EncodedTrack == Frame.L4 && idSet) + { + str.Append(" id="); + str.Append(id); + } + + if (sync || batch) + { + str.Append(" "); + str.Append("["); + if (Sync) + { + str.Append("S"); + } + if (Batch) + { + str.Append("B"); + } + str.Append("]"); + } + str.Append(" "); + str.Append(base.ToString()); + if (Header != null) + { + str.Append(Header.ToString()); + } + if (Body != null) + { + str.Append("\n body="); + str.Append(Body.ToString()); + } + return str.ToString(); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/ProtocolDelegate.cs b/qpid/dotnet/client-010/client/transport/ProtocolDelegate.cs new file mode 100644 index 0000000000..32dbd116ff --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ProtocolDelegate.cs @@ -0,0 +1,37 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport +{ + /// <summary> + /// ProtocolDelegate + /// </summary> + public interface ProtocolDelegate<T> + { + void Init(T context, ProtocolHeader header); + + void Control(T context, Method control); + + void Command(T context, Method command); + + void Error(T context, ProtocolError error); + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/ProtocolError.cs b/qpid/dotnet/client-010/client/transport/ProtocolError.cs new file mode 100644 index 0000000000..2a5bf39565 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ProtocolError.cs @@ -0,0 +1,85 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using org.apache.qpid.transport.network; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// ProtocolError + /// </summary> + public sealed class ProtocolError : INetworkEvent, IProtocolEvent + { + private int channel; + private byte track; + private String format; + private Object[] args; + + public ProtocolError(byte track, String format, params Object[] args) + { + this.track = track; + this.format = format; + this.args = args; + } + + #region INetworkEvent Methods + + public void ProcessNetworkEvent(INetworkDelegate ndelegate) + { + ndelegate.Error(this); + } + + #endregion + + #region IProtocolEvent Methods + + public int Channel + { + get { return channel; } + set { channel = value; } + } + + public byte EncodedTrack + { + get { return track; } + set { throw new NotImplementedException(); } + } + + public void ProcessProtocolEvent<C>(C context, IProtocolDelegate<C> protocoldelegate) + { + protocoldelegate.Error(context, this); + } + + #endregion + + public String Message + { + get { return String.Format(format, args); } + } + + + public override String ToString() + { + return String.Format("protocol error: {0}", Message); + } + + } +} diff --git a/qpid/dotnet/client-010/client/transport/ProtocolEvent.cs b/qpid/dotnet/client-010/client/transport/ProtocolEvent.cs new file mode 100644 index 0000000000..990d5ecc3a --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ProtocolEvent.cs @@ -0,0 +1,42 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport +{ + /// <summary> + /// ProtocolEvent + /// </summary> + public interface ProtocolEvent + { + int Channel + { + get; + set; + } + + byte EncodedTrack + { + set; + get; + } + + void ProcessProtocolEvent<C>(C context, ProtocolDelegate<C> protocoldelegate); + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/ProtocolHeader.cs b/qpid/dotnet/client-010/client/transport/ProtocolHeader.cs new file mode 100644 index 0000000000..4adfee25df --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ProtocolHeader.cs @@ -0,0 +1,124 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; +using System.Text; +using org.apache.qpid.transport.network; +using Frame = org.apache.qpid.transport.network.Frame; + +namespace org.apache.qpid.transport +{ + /// <summary> ProtocolHeader + /// + /// </summary> + public sealed class ProtocolHeader : INetworkEvent, IProtocolEvent + { + private readonly char[] AMQP = new char[] {'A', 'M', 'Q', 'P'}; + private const byte CLASS = 1; + + private readonly byte instance; + private readonly byte major; + private readonly byte minor; + private int channel; + + public ProtocolHeader(byte instance, byte major, byte minor) + { + this.instance = instance; + this.major = major; + this.minor = minor; + } + + public ProtocolHeader(int instance, int major, int minor) : this((byte)instance, (byte)major, (byte)minor) + { + } + + #region INetworkEvent Methods + + public void ProcessNetworkEvent(INetworkDelegate ndelegate) + { + ndelegate.Init(this); + } + + #endregion + + #region IProtocolEvent Methods + + public int Channel + { + get + { + return channel; + } + set + { + channel = value; + } + } + + public byte EncodedTrack + { + get + { + return Frame.L1; + } + set { throw new NotImplementedException(); } + } + + public void ProcessProtocolEvent<C>(C context, IProtocolDelegate<C> protocoldelegate) + { + protocoldelegate.Init(context, this); + } + + #endregion + + public byte Instance + { + get { return instance; } + } + + public byte Major + { + get { return major; } + } + + public byte Minor + { + get { return minor; } + } + + public MemoryStream ToMemoryStream() + { + MemoryStream buf = new MemoryStream(8); + BinaryWriter writer = new BinaryWriter(buf); + writer.Write(AMQP); + writer.Write(CLASS); + writer.Write(instance); + writer.Write((sbyte) major); + writer.Write((sbyte) minor); + return buf; + } + + public override String ToString() + { + return String.Format("AMQP.{0:d} {1:d}-{2:d}", instance, major, minor); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/Range.cs b/qpid/dotnet/client-010/client/transport/Range.cs new file mode 100644 index 0000000000..904b1c1229 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Range.cs @@ -0,0 +1,117 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.Collections.Generic; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport +{ + + /// <summary> + /// Range + /// </summary> + + + public sealed class Range + { + private int _lower; + private int _upper; + + public Range(int lower, int upper) + { + _lower = lower; + _upper = upper; + } + + public int Lower + { + get { return _lower; } + set { _lower = value; } + } + public int Upper + { + get { return _upper; } + set { _upper = value; } + } + + public bool Includes(int value) + { + return Serial.Le(_lower, value) && Serial.Le(value, _upper); + } + + public bool Includes(Range range) + { + return Includes(range._lower) && Includes(range._upper); + } + + public bool Intersects(Range range) + { + return (Includes(range._lower) || Includes(range._upper) || + range.Includes(_lower) || range.Includes(_upper)); + } + + public bool Touches(Range range) + { + return (Intersects(range) || + Includes(range._upper + 1) || Includes(range._lower - 1) || + range.Includes(_upper + 1) || range.Includes(_lower - 1)); + } + + public Range Span(Range range) + { + return new Range(Serial.Min(_lower, range._lower), Serial.Max(_upper, range._upper)); + } + + public List<Range> Subtract(Range range) + { + List<Range> result = new List<Range>(); + + if (Includes(range._lower) && Serial.Le(_lower, range._lower - 1)) + { + result.Add(new Range(_lower, range._lower - 1)); + } + + if (Includes(range._upper) && Serial.Le(range._upper + 1, _upper)) + { + result.Add(new Range(range._upper + 1, _upper)); + } + + if (result.Count == 0 && !range.Includes(this)) + { + result.Add(this); + } + + return result; + } + + public Range Intersect(Range range) + { + int l = Serial.Max(_lower, range._lower); + int r = Serial.Min(_upper, range._upper); + return Serial.Gt(l, r) ? null : new Range(l, r); + } + + public override String ToString() + { + return "[" + _lower + ", " + _upper + "]"; + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/RangeSet.cs b/qpid/dotnet/client-010/client/transport/RangeSet.cs new file mode 100644 index 0000000000..0a856ee979 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/RangeSet.cs @@ -0,0 +1,150 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Text; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// RangeSet + /// </summary> + public sealed class RangeSet : IEnumerable<Range> + { + private readonly List<Range> _ranges = new List<Range>(); + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + public IEnumerator<Range> GetEnumerator() + { + return _ranges.GetEnumerator(); + } + + + public int Size() + { + return _ranges.Count; + } + + + public Range GetFirst() + { + return _ranges[0]; + } + + public bool Includes(Range range) + { + foreach (Range r in this) + { + if (r.Includes(range)) + { + return true; + } + } + + return false; + } + + public bool Includes(int n) + { + foreach (Range r in this) + { + if (r.Includes(n)) + { + return true; + } + } + + return false; + } + + public void Add(Range range) + { + for (int i = 0; i < _ranges.Count; i++) + { + Range r = _ranges[i]; + if (range.Touches(r)) + { + _ranges.Remove(r); + range = range.Span(r); + } + else if (Serial.Lt(range.Upper, r.Lower)) + { + _ranges.Insert(i - 1 , range); + return; + } + } + _ranges.Add(range); + } + + public void Add(int lower, int upper) + { + Add(new Range(lower, upper)); + } + + public void Add(int value) + { + Add(value, value); + } + + public void Clear() + { + _ranges.Clear(); + } + + public RangeSet Copy() + { + RangeSet copy = new RangeSet(); + foreach (Range r in _ranges) + { + copy._ranges.Add(r); + } + return copy; + } + + public override String ToString() + { + StringBuilder str = new StringBuilder(); + str.Append("{"); + bool first = true; + foreach (Range range in _ranges) + { + if (first) + { + first = false; + } + else + { + str.Append(", "); + } + str.Append(range); + } + str.Append("}"); + return str.ToString(); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/ReceivedPayload.cs b/qpid/dotnet/client-010/client/transport/ReceivedPayload.cs new file mode 100644 index 0000000000..e072ba7493 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/ReceivedPayload.cs @@ -0,0 +1,43 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; + +namespace org.apache.qpid.transport +{ + public class ReceivedPayload<T> : EventArgs + { + public ReceivedPayload() + { + } + + public ReceivedPayload(T payload) + { + m_payload = payload; + } + private T m_payload; + + public T Payload + { + get { return m_payload; } + set { m_payload = value; } + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/Receiver.cs b/qpid/dotnet/client-010/client/transport/Receiver.cs new file mode 100644 index 0000000000..f8d91c3f10 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Receiver.cs @@ -0,0 +1,38 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// a receiver will raise an event when: + /// - data is received + /// - an exception is thrown + /// - it is closed + /// </summary> + public interface Receiver <T> where T : EventArgs + { + event EventHandler<T> Received; + event EventHandler<ExceptionArgs> Exception; + event EventHandler Closed; + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/Sender.cs b/qpid/dotnet/client-010/client/transport/Sender.cs new file mode 100644 index 0000000000..f8b5bdef06 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Sender.cs @@ -0,0 +1,32 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport +{ + /// <summary> + /// Sender + /// </summary> + public interface Sender<T> + { + void send(T msg); + void flush(); + void close(); + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/Session.cs b/qpid/dotnet/client-010/client/transport/Session.cs new file mode 100644 index 0000000000..7b4aff9811 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Session.cs @@ -0,0 +1,522 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading; +using org.apache.qpid.transport.util; +using Frame = org.apache.qpid.transport.network.Frame; +using Logger = org.apache.qpid.transport.util.Logger; + + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Session + /// + /// </summary> + public class Session : Invoker, ISession + { + private static readonly Logger log = Logger.Get(typeof (Session)); + private static readonly bool ENABLE_REPLAY; + + static Session() + { + const string enableReplay = "enable_command_replay"; + try + { + String var = Environment.GetEnvironmentVariable(enableReplay); + if (var != null) + { + ENABLE_REPLAY = bool.Parse(var); + } + } + catch (Exception) + { + ENABLE_REPLAY = false; + } + } + + private readonly byte[] _name; + private const long _timeout = 600000; + private bool _autoSync = false; + + // channel may be null + private Channel _channel; + + // incoming command count + private int _commandsIn = 0; + // completed incoming commands + private readonly Object _processedLock = new Object(); + private RangeSet _processed = new RangeSet(); + private int _maxProcessed = - 1; + private int _syncPoint = -1; + + // outgoing command count + private int _commandsOut = 0; + private readonly Dictionary<int, Method> _commands = new Dictionary<int, Method>(); + private int _maxComplete = - 1; + private bool _needSync = false; + private bool _closed; + private readonly Dictionary<int, IFuture> _results = new Dictionary<int, IFuture>(); + private readonly List<ExecutionException> _exceptions = new List<ExecutionException>(); + + + public bool IsClosed + { + get + { + lock (this) + { + return _closed; + } + } + set + { + lock (this) + { + _closed = value; + } + } + } + + public string Name + { + get + { + ASCIIEncoding enc = new ASCIIEncoding(); + return enc.GetString(_name); + } + } + + public Session(byte[] name) + { + _name = name; + } + + public byte[] GetName() + { + return _name; + } + + public void SetAutoSync(bool value) + { + lock (_commands) + { + _autoSync = value; + } + } + + public Dictionary<int, Method> GetOutstandingCommands() + { + return _commands; + } + + public int GetCommandsOut() + { + return _commandsOut; + } + + public int CommandsIn + { + get { return _commandsIn; } + set { _commandsIn = value; } + } + + public int NextCommandId() + { + return _commandsIn++; + } + + public void Identify(Method cmd) + { + int id = NextCommandId(); + cmd.Id = id; + + if (log.IsDebugEnabled()) + { + log.Debug("ID: [{0}] %{1}", _channel, id); + } + + //if ((id % 65536) == 0) + if ((id & 0xff) == 0) + { + FlushProcessed(Option.TIMELY_REPLY); + } + } + + public void Processed(Method command) + { + Processed(command.Id); + } + + public void Processed(int command) + { + Processed(new Range(command, command)); + } + + public void Processed(int lower, int upper) + { + Processed(new Range(lower, upper)); + } + + public void Processed(Range range) + { + log.Debug("{0} processed({1})", this, range); + + bool flush; + lock (_processedLock) + { + _processed.Add(range); + Range first = _processed.GetFirst(); + int lower = first.Lower; + int upper = first.Upper; + int old = _maxProcessed; + if (Serial.Le(lower, _maxProcessed + 1)) + { + _maxProcessed = Serial.Max(_maxProcessed, upper); + } + flush = Serial.Lt(old, _syncPoint) && Serial.Ge(_maxProcessed, _syncPoint); + _syncPoint = _maxProcessed; + } + if (flush) + { + FlushProcessed(); + } + } + + public void FlushProcessed(params Option[] options) + { + RangeSet copy; + lock (_processedLock) + { + copy = _processed.Copy(); + } + SessionCompleted(copy, options); + } + + public void KnownComplete(RangeSet kc) + { + lock (_processedLock) + { + RangeSet newProcessed = new RangeSet(); + foreach (Range pr in _processed) + { + foreach (Range kr in kc) + { + foreach (Range r in pr.Subtract(kr)) + { + newProcessed.Add(r); + } + } + } + _processed = newProcessed; + } + } + + public void SyncPoint() + { + int id = CommandsIn - 1; + log.Debug("{0} synced to {1}", this, id); + bool flush; + lock (_processedLock) + { + _syncPoint = id; + flush = Serial.Ge(_maxProcessed, _syncPoint); + } + if (flush) + { + FlushProcessed(); + } + } + + public void Attach(Channel channel) + { + _channel = channel; + _channel.Session = this; + } + + public Method GetCommand(int id) + { + lock (_commands) + { + return _commands[id]; + } + } + + public bool Complete(int lower, int upper) + { + //avoid autoboxing + if (log.IsDebugEnabled()) + { + log.Debug("{0} complete({1}, {2})", this, lower, upper); + } + lock (_commands) + { + int old = _maxComplete; + for (int id = Serial.Max(_maxComplete, lower); Serial.Le(id, upper); id++) + { + _commands.Remove(id); + } + if (Serial.Le(lower, _maxComplete + 1)) + { + _maxComplete = Serial.Max(_maxComplete, upper); + } + log.Debug("{0} commands remaining: {1}", this, _commands); + Monitor.PulseAll(_commands); + return Serial.Gt(_maxComplete, old); + } + } + + protected override void Invoke(Method m) + { + if (IsClosed) + { + List<ExecutionException> exc = GetExceptions(); + if (exc.Count > 0) + { + throw new SessionException(exc); + } + else if (_close != null) + { + throw new ConnectionException(_close); + } + else + { + throw new SessionClosedException(); + } + } + + if (m.EncodedTrack == Frame.L4) + { + lock (_commands) + { + int next = _commandsOut++; + m.Id = next; + if (next == 0) + { + SessionCommandPoint(0, 0); + } + if (ENABLE_REPLAY) + { + _commands.Add(next, m); + } + if (_autoSync) + { + m.Sync = true; + } + _needSync = ! m.Sync; + _channel.Method(m); + if (_autoSync) + { + Sync(); + } + + // flush every 64K commands to avoid ambiguity on + // wraparound + if ((next%65536) == 0) + { + SessionFlush(Option.COMPLETED); + } + } + } + else + { + _channel.Method(m); + } + } + + public void Sync() + { + Sync(_timeout); + } + + public void Sync(long timeout) + { + log.Debug("{0} sync()", this); + lock (_commands) + { + int point = _commandsOut - 1; + + if (_needSync && Serial.Lt(_maxComplete, point)) + { + ExecutionSync(Option.SYNC); + } + + DateTime start = DateTime.Now; + long elapsed = 0; + + while (!IsClosed && elapsed < timeout && Serial.Lt(_maxComplete, point)) + { + log.Debug("{0} waiting for[{1}]: {2}, {3}", this, point, + _maxComplete, _commands); + Monitor.Wait(_commands, (int) (timeout - elapsed)); + elapsed = DateTime.Now.Subtract(start).Milliseconds; + } + + if (Serial.Lt(_maxComplete, point)) + { + if (IsClosed) + { + throw new SessionException(GetExceptions()); + } + else + { + throw new Exception + (String.Format + ("timed out waiting for sync: complete = {0}, point = {1}", _maxComplete, point)); + } + } + } + } + + + public void Result(int command, Struct result) + { + IFuture future; + lock (_results) + { + if (_results.ContainsKey(command)) + { + future = _results[command]; + _results.Remove(command); + } + else + { + throw new Exception(String.Format("Cannot ger result {0} for {1}", command, result)); + } + } + future.Result = result; + } + + public void AddException(ExecutionException exc) + { + lock (_exceptions) + { + _exceptions.Add(exc); + } + } + + private ConnectionClose _close = null; + + public void CloseCode(ConnectionClose close) + { + _close = close; + } + + public List<ExecutionException> GetExceptions() + { + lock (_exceptions) + { + return new List<ExecutionException>(_exceptions); + } + } + + public override IFuture Invoke(Method m, IFuture future) + { + lock (_commands) + { + future.Session = this; + int command = _commandsOut; + lock (_results) + { + _results.Add(command, future); + } + Invoke(m); + } + return future; + } + + + public void MessageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + byte[] body, + params Option[] options) + { + MemoryStream mbody = new MemoryStream(); + mbody.Write(body,0, body.Length); + MessageTransfer(destination, acceptMode, acquireMode, header, + mbody, options); + } + + public void MessageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + String body, + params Option[] options) + { + MessageTransfer(destination, acceptMode, acquireMode, header, + new MemoryStream(Convert.ToByte(body)), options); + } + + public void Close() + { + SessionRequestTimeout(0); + SessionDetach(_name); + lock (_commands) + { + DateTime start = DateTime.Now; + long elapsed = 0; + + while (!IsClosed && elapsed < _timeout) + { + Monitor.Wait(_commands, (int) (_timeout - elapsed)); + elapsed = DateTime.Now.Subtract(start).Milliseconds; + } + } + } + + public void Exception(Exception t) + { + log.Error(t, "Caught exception"); + } + + public void Closed() + { + IsClosed = true; + lock (_commands) + { + Monitor.PulseAll(_commands); + } + lock (_results) + { + foreach (IFuture result in _results.Values) + { + lock (result) + { + Monitor.PulseAll(result); + } + } + } + _channel.Session = null; + _channel = null; + } + + public override String ToString() + { + return String.Format("session:{0}", _name); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/SessionDelegate.cs b/qpid/dotnet/client-010/client/transport/SessionDelegate.cs new file mode 100644 index 0000000000..973e22df16 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/SessionDelegate.cs @@ -0,0 +1,126 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport +{ + /// <summary> + /// SessionDelegate + /// + /// </summary> + public abstract class SessionDelegate : MethodDelegate<Session>, IProtocolDelegate<Session> + { + public void Init(Session ssn, ProtocolHeader hdr) + { + } + + public void Control(Session ssn, Method method) + { + method.Dispatch(ssn, this); + } + + public void Command(Session ssn, Method method) + { + ssn.Identify(method); + method.Dispatch(ssn, this); + if (!method.HasPayload()) + { + ssn.Processed(method); + } + } + + public void Error(Session ssn, ProtocolError error) + { + } + + public override void ExecutionResult(Session ssn, ExecutionResult result) + { + ssn.Result(result.GetCommandId(), result.GetValue()); + } + + public override void ExecutionException(Session ssn, ExecutionException exc) + { + ssn.AddException(exc); + } + + public override void SessionCompleted(Session ssn, SessionCompleted cmp) + { + RangeSet ranges = cmp.GetCommands(); + RangeSet known = null; + if (cmp.GetTimelyReply()) + { + known = new RangeSet(); + } + + if (ranges != null) + { + foreach (Range range in ranges) + { + bool advanced = ssn.Complete(range.Lower, range.Upper); + if (advanced && known != null) + { + known.Add(range); + } + } + } + + if (known != null) + { + ssn.SessionKnownCompleted(known); + } + } + + public override void SessionKnownCompleted(Session ssn, SessionKnownCompleted kcmp) + { + RangeSet kc = kcmp.GetCommands(); + if (kc != null) + { + ssn.KnownComplete(kc); + } + } + + public override void SessionFlush(Session ssn, SessionFlush flush) + { + if (flush.GetCompleted()) + { + ssn.FlushProcessed(); + } + if (flush.GetConfirmed()) + { + ssn.FlushProcessed(); + } + if (flush.GetExpected()) + { + // to be done + //throw new Exception("not implemented"); + } + } + + public override void SessionCommandPoint(Session ssn, SessionCommandPoint scp) + { + ssn.CommandsIn = scp.GetCommandId(); + } + + public override void ExecutionSync(Session ssn, ExecutionSync sync) + { + ssn.SyncPoint(); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/Struct.cs b/qpid/dotnet/client-010/client/transport/Struct.cs new file mode 100644 index 0000000000..ff8d80fcb1 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/Struct.cs @@ -0,0 +1,121 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.Collections.Generic; +using System.Text; +using org.apache.qpid.transport.codec; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// Struct + /// </summary> + + public abstract class Struct : IEncodable + { + public static Struct Create(int type) + { + return StructFactory.create(type); + } + + bool dirty = true; + + public bool Dirty + { + get { return dirty; } + set { dirty = value; } + } + + public abstract int GetStructType(); + + public abstract int GetSizeWidth(); + + public abstract int GetPackWidth(); + + public int GetEncodedType() + { + int type = GetStructType(); + if (type < 0) + { + throw new Exception(); + } + return type; + } + + private bool IsBit<C, T>(Field<C, T> f) + { + return Equals(f.Type, typeof(Boolean)); + } + + private bool Packed() + { + return GetPackWidth() > 0; + } + + private bool Encoded<C, T>(Field<C, T> f) + { + return !Packed() || !IsBit(f) && f.Has(this); + } + + private int GetFlagWidth() + { + return (Fields.Count + 7) / 8; + } + + private int GetFlagCount() + { + return 8 * GetPackWidth(); + } + + public abstract void Read(IDecoder dec); + + public abstract void Write(IEncoder enc); + + public abstract Dictionary<String, Object> Fields + { + get; + } + + public override String ToString() + { + StringBuilder str = new StringBuilder(); + str.Append(GetType()); + str.Append("("); + bool first = true; + foreach (KeyValuePair<String, Object> me in Fields) + { + if (first) + { + first = false; + } + else + { + str.Append(", "); + } + str.Append(me.Key); + str.Append("="); + str.Append(me.Value); + } + str.Append(")"); + return str.ToString(); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/codec/AbstractDecoder.cs b/qpid/dotnet/client-010/client/transport/codec/AbstractDecoder.cs new file mode 100644 index 0000000000..2e9e587407 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/AbstractDecoder.cs @@ -0,0 +1,399 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.codec +{ + /// <summary> + /// AbstractDecoder + /// </summary> + public abstract class AbstractDecoder : IDecoder + { + private readonly Dictionary<Binary, String> str8cache = new Dictionary<Binary, String>(); + + protected abstract byte DoGet(); + + protected abstract void DoGet(byte[] bytes); + public abstract bool HasRemaining(); + + protected byte Get() + { + return DoGet(); + } + + protected void Get(byte[] bytes) + { + DoGet(bytes); + } + + protected Binary Get(int size) + { + byte[] bytes = new byte[size]; + Get(bytes); + return new Binary(bytes); + } + + protected short Uget() + { + return (short) (0xFF & Get()); + } + + public virtual short ReadUint8() + { + return Uget(); + } + + public abstract int ReadUint16(); + + + public abstract long ReadUint32(); + + + public int ReadSequenceNo() + { + return (int) ReadUint32(); + } + + public virtual long ReadUint64() + { + long l = 0; + for (int i = 0; i < 8; i++) + { + l |= ((long) (0xFF & Get())) << (56 - i*8); + } + return l; + } + + public abstract short ReadInt8(); + public abstract int ReadInt16(); + public abstract long ReadInt32() ; + public abstract long ReadInt64(); + public abstract float ReadFloat() ; + public abstract double ReadDouble() ; + + public long ReadDatetime() + { + return ReadUint64(); + } + + private static String Decode(byte[] bytes, int offset, int length, Encoding encoding) + { + return encoding.GetString(bytes, offset, length); + } + + private static String Decode(byte[] bytes, Encoding encoding) + { + return Decode(bytes, 0, bytes.Length, encoding); + } + + public String ReadStr8() + { + short size = ReadUint8(); + Binary bin = Get(size); + String str; + if (! str8cache.TryGetValue(bin, out str)) + { + str = Decode(bin.Array(), bin.Offset(), bin.Size(), Encoding.UTF8); + str8cache.Add(bin, str); + } + return str; + } + + public String ReadStr16() + { + int size = ReadUint16(); + byte[] bytes = new byte[size]; + Get(bytes); + return Decode(bytes, Encoding.UTF8); + } + + public byte[] ReadVbin8() + { + int size = ReadUint8(); + byte[] bytes = new byte[size]; + Get(bytes); + return bytes; + } + + public byte[] ReadVbin16() + { + int size = ReadUint16(); + byte[] bytes = new byte[size]; + Get(bytes); + return bytes; + } + + public byte[] ReadVbin32() + { + int size = (int) ReadUint32(); + byte[] bytes = new byte[size]; + Get(bytes); + return bytes; + } + + public RangeSet ReadSequenceSet() + { + int count = ReadUint16()/8; + if (count == 0) + { + return null; + } + RangeSet ranges = new RangeSet(); + for (int i = 0; i < count; i++) + { + ranges.Add(ReadSequenceNo(), ReadSequenceNo()); + } + return ranges; + } + + public RangeSet ReadByteRanges() + { + throw new Exception("not implemented"); + } + + public UUID ReadUuid() + { + long msb = ReadUint64(); + long lsb = ReadUint64(); + return new UUID(msb, lsb); + } + + public String ReadContent() + { + throw new Exception("Deprecated"); + } + + public Struct ReadStruct(int type) + { + Struct st = Struct.Create(type); + int width = st.GetSizeWidth(); + if (width > 0) + { + long size = ReadSize(width); + if (size == 0) + { + return null; + } + } + if (type > 0) + { + int code = ReadUint16(); + Debug.Assert(code == type); + } + st.Read(this); + return st; + } + + public Struct ReadStruct32() + { + long size = ReadUint32(); + if (size == 0) + { + return null; + } + int type = ReadUint16(); + Struct result = Struct.Create(type); + result.Read(this); + return result; + } + + public Dictionary<String, Object> ReadMap() + { + long size = ReadUint32(); + + if (size == 0) + { + return null; + } + + long count = ReadUint32(); + + Dictionary<String, Object> result = new Dictionary<String, Object>(); + for (int i = 0; i < count; i++) + { + String key = ReadStr8(); + byte code = Get(); + QpidType t = GetType(code); + Object value = Read(t); + result.Add(key, value); + } + + return result; + } + + public List<Object> ReadList() + { + long size = ReadUint32(); + + if (size == 0) + { + return null; + } + + long count = ReadUint32(); + + List<Object> result = new List<Object>(); + for (int i = 0; i < count; i++) + { + byte code = Get(); + QpidType t = GetType(code); + Object value = Read(t); + result.Add(value); + } + return result; + } + + public List<Object> ReadArray() + { + long size = ReadUint32(); + + if (size == 0) + { + return null; + } + + byte code = Get(); + QpidType t = GetType(code); + long count = ReadUint32(); + + List<Object> result = new List<Object>(); + for (int i = 0; i < count; i++) + { + Object value = Read(t); + result.Add(value); + } + return result; + } + + private QpidType GetType(byte code) + { + return QpidType.get(code); + } + + private long ReadSize(QpidType t) + { + return t.Fixed ? t.Width : ReadSize(t.Width); + } + + private long ReadSize(int width) + { + switch (width) + { + case 1: + return ReadUint8(); + case 2: + return ReadUint16(); + case 4: + return ReadUint32(); + default: + throw new Exception("illegal width: " + width); + } + } + + private byte[] ReadBytes(QpidType t) + { + long size = ReadSize(t); + byte[] result = new byte[(int) size]; + Get(result); + return result; + } + + private Object Read(QpidType t) + { + switch (t.Code) + { + case Code.BIN8: + case Code.UINT8: + return ReadUint8(); + case Code.INT8: + return Get(); + case Code.CHAR: + return (char) Get(); + case Code.BOOLEAN: + return Get() > 0; + + case Code.BIN16: + case Code.UINT16: + return ReadUint16(); + case Code.INT16: + return (short) ReadUint16(); + + case Code.BIN32: + case Code.UINT32: + return ReadUint32(); + + case Code.CHAR_UTF32: + case Code.INT32: + return (int) ReadUint32(); + + case Code.FLOAT: + return (float)BitConverter.Int64BitsToDouble(ReadUint32() << 32); + + case Code.BIN64: + case Code.UINT64: + case Code.INT64: + case Code.DATETIME: + return ReadUint64(); + + case Code.DOUBLE: + return BitConverter.Int64BitsToDouble(ReadUint64()); + case Code.UUID: + return ReadUuid(); + case Code.STR8: + return ReadStr8(); + case Code.STR16: + return ReadStr16(); + case Code.STR8_LATIN: + case Code.STR8_UTF16: + case Code.STR16_LATIN: + case Code.STR16_UTF16: + // XXX: need to do character conversion + return Encoding.UTF8.GetString(ReadBytes(t)); + + case Code.MAP: + return ReadMap(); + case Code.LIST: + return ReadList(); + case Code.ARRAY: + return ReadArray(); + case Code.STRUCT32: + return ReadStruct32(); + + case Code.BIN40: + case Code.DEC32: + case Code.BIN72: + case Code.DEC64: + // XXX: what types are we supposed to use here? + return ReadBytes(t); + + case Code.VOID: + return null; + + default: + return ReadBytes(t); + } + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/codec/AbstractEncoder.cs b/qpid/dotnet/client-010/client/transport/codec/AbstractEncoder.cs new file mode 100644 index 0000000000..eb8bdae80a --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/AbstractEncoder.cs @@ -0,0 +1,590 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.codec +{ + /// <summary> + /// AbstractEncoder + /// </summary> + public abstract class AbstractEncoder : IEncoder + { + private static readonly Dictionary<Type, Code> ENCODINGS = new Dictionary<Type, Code>(); + private readonly Dictionary<String, byte[]> str8cache = new Dictionary<String, byte[]>(); + + static AbstractEncoder() + { + ENCODINGS.Add(typeof (Boolean), Code.BOOLEAN); + ENCODINGS.Add(typeof (String), Code.STR16); + ENCODINGS.Add(typeof (long), Code.INT64); + ENCODINGS.Add(typeof (int), Code.INT32); + ENCODINGS.Add(typeof (short), Code.INT16); + ENCODINGS.Add(typeof (Byte), Code.INT8); + ENCODINGS.Add(typeof (Dictionary<String, Object>), Code.MAP); + ENCODINGS.Add(typeof (List<Object>), Code.LIST); + ENCODINGS.Add(typeof (float), Code.FLOAT); + ENCODINGS.Add(typeof (Double), Code.DOUBLE); + ENCODINGS.Add(typeof (char), Code.CHAR); + ENCODINGS.Add(typeof (byte[]), Code.VBIN32); + ENCODINGS.Add(typeof (UUID), Code.UUID); + } + + protected abstract void DoPut(byte b); + + protected abstract void DoPut(MemoryStream src); + + + protected void Put(byte b) + { + DoPut(b); + } + + protected void Put(MemoryStream src) + { + DoPut(src); + } + + protected virtual void Put(byte[] bytes) + { + Put(new MemoryStream(bytes)); + } + + protected abstract int BeginSize8(); + protected abstract void EndSize8(int pos); + + protected abstract int BeginSize16(); + protected abstract void EndSize16(int pos); + + protected abstract int BeginSize32(); + protected abstract void EndSize32(int pos); + + public virtual void WriteUint8(short b) + { + Debug.Assert(b < 0x100); + Put((byte) b); + } + + public virtual void WriteUint16(int s) + { + Debug.Assert(s < 0x10000); + Put((byte) Functions.Lsb(s >> 8)); + Put((byte) Functions.Lsb(s)); + } + + public virtual void WriteUint32(long i) + { + Debug.Assert(i < 0x100000000L); + Put((byte) Functions.Lsb(i >> 24)); + Put((byte) Functions.Lsb(i >> 16)); + Put((byte) Functions.Lsb(i >> 8)); + Put((byte) Functions.Lsb(i)); + } + + public void WriteSequenceNo(int i) + { + WriteUint32(i); + } + + public virtual void WriteUint64(long l) + { + for (int i = 0; i < 8; i++) + { + Put((byte) Functions.Lsb(l >> (56 - i*8))); + } + } + + public abstract void WriteInt8(short b) ; + public abstract void WriteInt16(int s) ; + public abstract void WriteInt32(long i) ; + public abstract void WriteInt64(long l) ; + public abstract void WriteFloat(float f) ; + public abstract void WriteDouble(double d) ; + + public void WriteDatetime(long l) + { + WriteUint64(l); + } + + private static byte[] Encode(String s, Encoding encoding) + { + return encoding.GetBytes(s); + } + + public void WriteStr8(String s) + { + if (s == null) + { + s = ""; + } + + byte[] bytes; + if (! str8cache.ContainsKey(s)) + { + bytes = Encode(s, System.Text.Encoding.UTF8); + str8cache.Add(s, bytes); + } + else + { + bytes = str8cache[s]; + } + WriteUint8((short) bytes.Length); + Put(bytes); + } + + public void WriteStr16(String s) + { + if (s == null) + { + s = ""; + } + + byte[] bytes = Encode(s, System.Text.Encoding.UTF8); + WriteUint16(bytes.Length); + Put(bytes); + } + + public void WriteVbin8(byte[] bytes) + { + if (bytes == null) + { + bytes = new byte[0]; + } + if (bytes.Length > 255) + { + throw new Exception("array too long: " + bytes.Length); + } + WriteUint8((short) bytes.Length); + Put(bytes); + } + + public void WriteVbin16(byte[] bytes) + { + if (bytes == null) + { + bytes = new byte[0]; + } + WriteUint16(bytes.Length); + Put(bytes); + } + + public void WriteVbin32(byte[] bytes) + { + if (bytes == null) + { + bytes = new byte[0]; + } + WriteUint32(bytes.Length); + Put(bytes); + } + + public void WriteSequenceSet(RangeSet ranges) + { + if (ranges == null) + { + WriteUint16(0); + } + else + { + WriteUint16(ranges.Size()*8); + foreach (Range range in ranges) + { + WriteSequenceNo(range.Lower); + WriteSequenceNo(range.Upper); + } + } + } + + public void WriteByteRanges(RangeSet ranges) + { + throw new Exception("not implemented"); + } + + public void WriteUuid(UUID uuid) + { + long msb = 0; + long lsb = 0; + if (uuid != null) + { + msb = uuid.MostSignificantBits; + lsb = uuid.LeastSignificantBits; + } + WriteUint64(msb); + WriteUint64(lsb); + } + + public void WriteStruct(int type, Struct s) + { + if (s == null) + { + s = Struct.Create(type); + } + + int width = s.GetSizeWidth(); + int pos = -1; + if (width > 0) + { + pos = BeginSize(width); + } + + if (type > 0) + { + WriteUint16(type); + } + + s.Write(this); + + if (width > 0) + { + EndSize(width, pos); + } + } + + public void WriteStruct32(Struct s) + { + if (s == null) + { + WriteUint32(0); + } + else + { + int pos = BeginSize32(); + WriteUint16(s.GetEncodedType()); + s.Write(this); + EndSize32(pos); + } + } + + private Code Encoding(Object value) + { + if (value == null) + { + return Code.VOID; + } + + Type klass = value.GetType(); + Code type = Resolve(klass); + + if (type == Code.VOID) + { + throw new Exception + ("unable to resolve type: " + klass + ", " + value); + } + else + { + return type; + } + } + + private static Code Resolve(Type klass) + { + Code type; + if(ENCODINGS.ContainsKey(klass)) + { + return ENCODINGS[klass]; + } + + Type sup = klass.BaseType; + if (sup != null) + { + type = Resolve(sup); + + if (type != Code.VOID) + { + return type; + } + } + foreach (Type iface in klass.GetInterfaces()) + { + type = Resolve(iface); + if (type != Code.VOID) + { + return type; + } + } + return Code.VOID; + } + + public void WriteMap(Dictionary<String, Object> map) + { + int pos = BeginSize32(); + if (map != null) + { + WriteUint32(map.Count); + WriteMapEntries(map); + } + EndSize32(pos); + } + + protected void WriteMapEntries(Dictionary<String, Object> map) + { + foreach (KeyValuePair<String, Object> entry in map) + { + String key = entry.Key; + Object value = entry.Value; + Code type = Encoding(value); + WriteStr8(key); + Put((byte) type); + Write(type, value); + } + } + + public void WriteList(List<Object> list) + { + int pos = BeginSize32(); + if (list != null) + { + WriteUint32(list.Count); + WriteListEntries(list); + } + EndSize32(pos); + } + + protected void WriteListEntries(List<Object> list) + { + foreach (Object value in list) + { + Code type = Encoding(value); + Put((byte) type); + Write(type, value); + } + } + + public void WriteArray(List<Object> array) + { + int pos = BeginSize32(); + if (array != null) + { + WriteArrayEntries(array); + } + EndSize32(pos); + } + + protected void WriteArrayEntries(List<Object> array) + { + Code type; + + if (array.Count == 0) + { + return; + } + else + { + type = Encoding(array[0]); + } + Put((byte) type); + WriteUint32(array.Count); + + foreach (Object value in array) + { + Write(type, value); + } + } + + private void WriteSize(QpidType t, int size) + { + if (t.Fixed) + { + if (size != t.width) + { + throw new Exception("size does not match fixed width " + t.width + ": " + size); + } + } + else + { + WriteSize(t.width, size); + } + } + + private void WriteSize(int width, int size) + { + // XXX: should check lengths + switch (width) + { + case 1: + WriteUint8((short) size); + break; + case 2: + WriteUint16(size); + break; + case 4: + WriteUint32(size); + break; + default: + throw new Exception("illegal width: " + width); + } + } + + private int BeginSize(int width) + { + switch (width) + { + case 1: + return BeginSize8(); + case 2: + return BeginSize16(); + case 4: + return BeginSize32(); + default: + throw new Exception("illegal width: " + width); + } + } + + private void EndSize(int width, int pos) + { + switch (width) + { + case 1: + EndSize8(pos); + break; + case 2: + EndSize16(pos); + break; + case 4: + EndSize32(pos); + break; + default: + throw new Exception("illegal width: " + width); + } + } + + private void WriteBytes(QpidType t, byte[] bytes) + { + WriteSize(t, bytes.Length); + Put(bytes); + } + + private void Write(Code t, Object value) + { + switch (t) + { + case Code.BIN8: + case Code.UINT8: + WriteUint8((short) value); + break; + case Code.INT8: + Put((Byte) value); + break; + case Code.CHAR: + byte[] b = BitConverter.GetBytes((char) value); + Put(b[0]); + break; + case Code.BOOLEAN: + if ((bool) value) + { + Put(1); + } + else + { + Put(0); + } + + break; + + case Code.BIN16: + case Code.UINT16: + WriteUint16((int) value); + break; + + case Code.INT16: + WriteUint16((short) value); + break; + + case Code.BIN32: + case Code.UINT32: + WriteUint32((long) value); + break; + + case Code.CHAR_UTF32: + case Code.INT32: + WriteUint32((int) value); + break; + + case Code.FLOAT: + WriteUint32(BitConverter.DoubleToInt64Bits((float) value) >> 32); + break; + + case Code.BIN64: + case Code.UINT64: + case Code.INT64: + case Code.DATETIME: + WriteUint64((long) value); + break; + + case Code.DOUBLE: + WriteUint64( BitConverter.DoubleToInt64Bits((double) value)); + break; + + case Code.UUID: + WriteUuid((UUID) value); + break; + + case Code.STR8: + WriteStr8((string) value); + break; + + case Code.STR16: + WriteStr16((string) value); + break; + + case Code.STR8_LATIN: + case Code.STR8_UTF16: + case Code.STR16_LATIN: + case Code.STR16_UTF16: + // XXX: need to do character conversion + WriteBytes(QpidType.get((byte) t), Encode((string) value, System.Text.Encoding.Unicode)); + break; + + case Code.MAP: + WriteMap((Dictionary<String, Object>) value); + break; + case Code.LIST: + WriteList((List<Object>) value); + break; + case Code.ARRAY: + WriteList((List<Object>) value); + break; + case Code.STRUCT32: + WriteStruct32((Struct) value); + break; + + case Code.BIN40: + case Code.DEC32: + case Code.BIN72: + case Code.DEC64: + // XXX: what types are we supposed to use here? + WriteBytes(QpidType.get((byte) t), (byte[]) value); + break; + + case Code.VOID: + break; + + default: + WriteBytes(QpidType.get((byte) t), (byte[]) value); + break; + } + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/codec/Decoder.cs b/qpid/dotnet/client-010/client/transport/codec/Decoder.cs new file mode 100644 index 0000000000..9afc23fd4e --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/Decoder.cs @@ -0,0 +1,72 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Collections.Generic; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.codec +{ + /// <summary> + /// Decoder + /// </summary> + + public interface Decoder + { + + bool hasRemaining(); + + short readUint8(); + int readUint16(); + long readUint32(); + long readUint64(); + + short readInt8(); + int readInt16(); + long readInt32(); + long readInt64(); + + double readDouble() ; + float readFloat() ; + long readDatetime(); + + UUID readUuid(); + + int readSequenceNo(); + RangeSet readSequenceSet(); // XXX + RangeSet readByteRanges(); // XXX + + String readStr8(); + String readStr16(); + + byte[] readVbin8(); + byte[] readVbin16(); + byte[] readVbin32(); + + Struct readStruct32(); + Dictionary<String, Object> readMap(); + List<Object> readList(); + List<Object> readArray(); + + Struct readStruct(int type); + } + +} diff --git a/qpid/dotnet/client-010/client/transport/codec/Encodable.cs b/qpid/dotnet/client-010/client/transport/codec/Encodable.cs new file mode 100644 index 0000000000..71f4f62458 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/Encodable.cs @@ -0,0 +1,37 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport.codec +{ + + + /// <summary> + /// Encodable + /// </summary> + + public interface Encodable + { + + void write(Encoder enc); + + void read(Decoder dec); + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/codec/Encoder.cs b/qpid/dotnet/client-010/client/transport/codec/Encoder.cs new file mode 100644 index 0000000000..282e3ff5b5 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/Encoder.cs @@ -0,0 +1,70 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Collections.Generic; +using org.apache.qpid.transport.util; +using RangeSet = org.apache.qpid.transport.RangeSet; +using Struct = org.apache.qpid.transport.Struct; +namespace org.apache.qpid.transport.codec +{ + /// <summary> + /// Encoder + /// </summary> + + public interface Encoder + { + + void writeUint8(short b); + void writeUint16(int s); + void writeUint32(long i); + void writeUint64(long l); + + void writeInt8(short b); + void writeInt16(int s); + void writeInt32(long i); + void writeInt64(long l); + + void writeFloat(float f) ; + void writeDouble(double d) ; + + void writeDatetime(long l); + void writeUuid(UUID uuid); + + void writeSequenceNo(int s); + void writeSequenceSet(RangeSet ranges); // XXX + void writeByteRanges(RangeSet ranges); // XXX + + void writeStr8(string s); + void writeStr16(string s); + + void writeVbin8(byte[] bytes); + void writeVbin16(byte[] bytes); + void writeVbin32(byte[] bytes); + + void writeStruct32(Struct s); + void writeMap(Dictionary<String, Object> map); + void writeList(List<Object> list); + void writeArray(List<Object> array); + + void writeStruct(int type, Struct s); + } +} diff --git a/qpid/dotnet/client-010/client/transport/codec/IDecoder.cs b/qpid/dotnet/client-010/client/transport/codec/IDecoder.cs new file mode 100644 index 0000000000..7de2e93fe7 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/IDecoder.cs @@ -0,0 +1,72 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Collections.Generic; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.codec +{ + /// <summary> + /// Decoder + /// </summary> + + public interface IDecoder + { + + bool HasRemaining(); + + short ReadUint8(); + int ReadUint16(); + long ReadUint32(); + long ReadUint64(); + + short ReadInt8(); + int ReadInt16(); + long ReadInt32(); + long ReadInt64(); + + double ReadDouble() ; + float ReadFloat() ; + long ReadDatetime(); + + UUID ReadUuid(); + + int ReadSequenceNo(); + RangeSet ReadSequenceSet(); // XXX + RangeSet ReadByteRanges(); // XXX + + String ReadStr8(); + String ReadStr16(); + + byte[] ReadVbin8(); + byte[] ReadVbin16(); + byte[] ReadVbin32(); + + Struct ReadStruct32(); + Dictionary<String, Object> ReadMap(); + List<Object> ReadList(); + List<Object> ReadArray(); + + Struct ReadStruct(int type); + } + +} diff --git a/qpid/dotnet/client-010/client/transport/codec/IEncodable.cs b/qpid/dotnet/client-010/client/transport/codec/IEncodable.cs new file mode 100644 index 0000000000..5c63e17fdd --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/IEncodable.cs @@ -0,0 +1,37 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport.codec +{ + + + /// <summary> + /// Encodable + /// </summary> + + public interface IEncodable + { + + void Write(IEncoder enc); + + void Read(IDecoder dec); + } +} diff --git a/qpid/dotnet/client-010/client/transport/codec/IEncoder.cs b/qpid/dotnet/client-010/client/transport/codec/IEncoder.cs new file mode 100644 index 0000000000..4ffc852052 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/IEncoder.cs @@ -0,0 +1,70 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Collections.Generic; +using org.apache.qpid.transport.util; +using RangeSet = org.apache.qpid.transport.RangeSet; +using Struct = org.apache.qpid.transport.Struct; +namespace org.apache.qpid.transport.codec +{ + /// <summary> + /// Encoder + /// </summary> + + public interface IEncoder + { + + void WriteUint8(short b); + void WriteUint16(int s); + void WriteUint32(long i); + void WriteUint64(long l); + + void WriteInt8(short b); + void WriteInt16(int s); + void WriteInt32(long i); + void WriteInt64(long l); + + void WriteFloat(float f) ; + void WriteDouble(double d) ; + + void WriteDatetime(long l); + void WriteUuid(UUID uuid); + + void WriteSequenceNo(int s); + void WriteSequenceSet(RangeSet ranges); // XXX + void WriteByteRanges(RangeSet ranges); // XXX + + void WriteStr8(string s); + void WriteStr16(string s); + + void WriteVbin8(byte[] bytes); + void WriteVbin16(byte[] bytes); + void WriteVbin32(byte[] bytes); + + void WriteStruct32(Struct s); + void WriteMap(Dictionary<String, Object> map); + void WriteList(List<Object> list); + void WriteArray(List<Object> array); + + void WriteStruct(int type, Struct s); + } +} diff --git a/qpid/dotnet/client-010/client/transport/codec/MSDecoder.cs b/qpid/dotnet/client-010/client/transport/codec/MSDecoder.cs new file mode 100644 index 0000000000..59731b739a --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/MSDecoder.cs @@ -0,0 +1,110 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.IO; +using System.Text; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.codec +{ + + + /// <summary> + /// MSDecoder + /// + /// </summary> + + + public sealed class MSDecoder : AbstractDecoder + { + + private BinaryReader _reader; + + public void Init(MemoryStream st) + { + _reader = new BinaryReader(st, Encoding.BigEndianUnicode); + } + + protected override byte DoGet() + { + return _reader.ReadByte(); + } + + protected override void DoGet(byte[] bytes) + { + _reader.Read(bytes, 0, bytes.Length); + } + + public override bool HasRemaining() + { + return (_reader.BaseStream.Position < _reader.BaseStream.Length); + } + + public override short ReadUint8() + { + return (short) (0xFF & _reader.ReadByte()); + } + + public override int ReadUint16() + { + return ByteEncoder.GetBigEndian((UInt16) _reader.ReadInt16()); + } + + public override long ReadUint32() + { + return ByteEncoder.GetBigEndian((UInt32) _reader.ReadInt32()); + } + + public override long ReadUint64() + { + return (long) ByteEncoder.GetBigEndian(_reader.ReadInt64()); + } + + public override short ReadInt8() + { + return (short) (0xFF & _reader.ReadByte()); + } + + public override int ReadInt16() + { + return ByteEncoder.GetBigEndian((Int16) _reader.ReadInt16()); + } + + public override long ReadInt32() + { + return ByteEncoder.GetBigEndian((Int32) _reader.ReadInt32()); + } + + public override long ReadInt64() + { + return (long) ByteEncoder.GetBigEndian(_reader.ReadInt64()); + } + + public override double ReadDouble() { + return (double) ByteEncoder.GetBigEndian(_reader.ReadDouble()) ; + } + + public override float ReadFloat() { + return (float) _reader.ReadSingle() ; + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/codec/MSEncoder.cs b/qpid/dotnet/client-010/client/transport/codec/MSEncoder.cs new file mode 100644 index 0000000000..d863c57dee --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/codec/MSEncoder.cs @@ -0,0 +1,172 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Diagnostics; +using System.IO; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.codec +{ + /// <summary> + /// MSEncoder + /// </summary> + public sealed class MSEncoder : AbstractEncoder + { + private readonly MemoryStream _out; + private readonly BinaryWriter _writer; + + public MSEncoder(int capacity) + { + _out = new MemoryStream(capacity); + _writer = new BinaryWriter(_out); + } + + public void Init() + { + _out.Seek(0, SeekOrigin.Begin); + } + + public MemoryStream Segment() + { + int length = (int) _out.Position; + MemoryStream result = new MemoryStream(_out.ToArray(), 0, length); + result.Seek(length, SeekOrigin.Begin); + _out.Seek(0, SeekOrigin.Begin); + return result; + } + + + protected override void DoPut(byte b) + { + _writer.Write(b); + } + + protected override void DoPut(MemoryStream src) + { + _writer.Write(src.ToArray()); + } + + protected override void Put(byte[] bytes) + { + _writer.Write(bytes); + } + + public override void WriteUint8(short b) + { + Debug.Assert(b < 0x100); + _writer.Write((byte) b); + } + + public override void WriteUint16(int s) + { + Debug.Assert(s < 0x10000); + _writer.Write(ByteEncoder.GetBigEndian((UInt16) s)); + } + + public override void WriteUint32(long i) + { + Debug.Assert(i < 0x100000000L); + _writer.Write(ByteEncoder.GetBigEndian((UInt32) i)); + } + + public override void WriteUint64(long l) + { + _writer.Write(ByteEncoder.GetBigEndian(l)); + } + + public override void WriteInt8(short b) + { + Debug.Assert(b < 0x100); + _writer.Write((byte) b); + } + + public override void WriteInt16(int s) + { + Debug.Assert(s < 0x10000); + _writer.Write(ByteEncoder.GetBigEndian((Int16) s)); + } + + public override void WriteInt32(long i) + { + Debug.Assert(i < 0x100000000L); + _writer.Write(ByteEncoder.GetBigEndian((Int32) i)); + } + + public override void WriteInt64(long l) + { + _writer.Write(ByteEncoder.GetBigEndian(l)); + } + + public override void WriteFloat(float f) { + _writer.Write(f) ; + } + + public override void WriteDouble(double d) { + _writer.Write(ByteEncoder.GetBigEndian(d)) ; + } + + protected override int BeginSize8() + { + int pos = (int) _out.Position; + _writer.Write((byte) 0); + return pos; + } + + protected override void EndSize8(int pos) + { + int cur = (int) _out.Position; + _out.Seek(pos, SeekOrigin.Begin); + _writer.Write((byte) (cur - pos - 1)); + _out.Seek(cur, SeekOrigin.Begin); + } + + protected override int BeginSize16() + { + int pos = (int) _out.Position; + _writer.Write((short) 0); + return pos; + } + + protected override void EndSize16(int pos) + { + int cur = (int) _out.Position; + _out.Seek(pos, SeekOrigin.Begin); + _writer.Write((short) (cur - pos - 2)); + _out.Seek(cur, SeekOrigin.Begin); + } + + protected override int BeginSize32() + { + int pos = (int) _out.Position; + _writer.Write(0); + return pos; + } + + protected override void EndSize32(int pos) + { + int cur = (int) _out.Position; + _out.Seek(pos, SeekOrigin.Begin); + _writer.Write(ByteEncoder.GetBigEndian((Int32) cur - pos - 4)); + _out.Seek(cur, SeekOrigin.Begin); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/exception/ConnectionException.cs b/qpid/dotnet/client-010/client/transport/exception/ConnectionException.cs new file mode 100644 index 0000000000..cbf5e39e52 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/exception/ConnectionException.cs @@ -0,0 +1,49 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +namespace org.apache.qpid.transport +{ + + + /// <summary> + /// ConnectionException + /// </summary> + + [Serializable] + public class ConnectionException : Exception + { + virtual public ConnectionClose Close + { + get + { + return _close; + } + + } + + private ConnectionClose _close; + + public ConnectionException(ConnectionClose close):base(close.GetReplyText()) + { + _close = close; + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/exception/ExceptionArgs.cs b/qpid/dotnet/client-010/client/transport/exception/ExceptionArgs.cs new file mode 100644 index 0000000000..01793a6ad0 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/exception/ExceptionArgs.cs @@ -0,0 +1,41 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; + +namespace org.apache.qpid.transport +{ + public class ExceptionArgs : EventArgs + { + public ExceptionArgs(Exception e) + { + _exception = e; + } + private Exception _exception; + + public Exception Exception + { + get { return _exception; } + set { _exception = value; } + } + + } +} diff --git a/qpid/dotnet/client-010/client/transport/exception/ProtocolVersionException.cs b/qpid/dotnet/client-010/client/transport/exception/ProtocolVersionException.cs new file mode 100644 index 0000000000..f18fc1173f --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/exception/ProtocolVersionException.cs @@ -0,0 +1,59 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +namespace org.apache.qpid.transport +{ + + + /// <summary> ProtocolVersionException + /// + /// </summary> + + [Serializable] + public sealed class ProtocolVersionException:TransportException + { + public sbyte Major + { + get + { + return _major; + } + + } + public sbyte Minor + { + get + { + return _minor; + } + + } + + private sbyte _major; + private sbyte _minor; + + public ProtocolVersionException(sbyte major, sbyte minor):base(String.Format("version missmatch: %{0}-{1}", major, minor)) + { + this._major = major; + this._minor = minor; + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/exception/SessionClosedException.cs b/qpid/dotnet/client-010/client/transport/exception/SessionClosedException.cs new file mode 100644 index 0000000000..89453433ee --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/exception/SessionClosedException.cs @@ -0,0 +1,38 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System.Collections.Generic; + +namespace org.apache.qpid.transport +{ + + + /// <summary> + /// SessionClosedException + /// </summary> + + public class SessionClosedException : SessionException + { + + public SessionClosedException(): base(new List<ExecutionException>()) + { + } + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/exception/SessionException.cs b/qpid/dotnet/client-010/client/transport/exception/SessionException.cs new file mode 100644 index 0000000000..f02ffa5c2f --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/exception/SessionException.cs @@ -0,0 +1,45 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.Collections.Generic; + +namespace org.apache.qpid.transport +{ + /// <summary> + /// SessionException + /// </summary> + public class SessionException : Exception + { + private readonly List<ExecutionException> _exceptions; + + public SessionException(List<ExecutionException> exceptions) + : base(exceptions.Count == 0 ? "" : exceptions.ToString()) + + { + _exceptions = exceptions; + } + + public List<ExecutionException> Exceptions + { + get { return _exceptions; } + } + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/exception/TransportException.cs b/qpid/dotnet/client-010/client/transport/exception/TransportException.cs new file mode 100644 index 0000000000..d016f90a83 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/exception/TransportException.cs @@ -0,0 +1,46 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +namespace org.apache.qpid.transport +{ + + + /// <summary> + /// TransportException + /// </summary> + + + public class TransportException : Exception + { + public TransportException(String msg) : base(msg) + { + } + + public TransportException(String msg, Exception cause) : base(msg, cause) + { + } + + public TransportException(Exception cause): base("Transport Exception", cause) + { + } + + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/Assembler.cs b/qpid/dotnet/client-010/client/transport/network/Assembler.cs new file mode 100644 index 0000000000..ff85f11c2f --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/Assembler.cs @@ -0,0 +1,254 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Collections.Generic; +using System.IO; +using org.apache.qpid.transport.codec; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network +{ + /// <summary> + /// Assembler + /// </summary> + public delegate void Processor(INetworkDelegate ndelegate); + + public class Assembler : INetworkDelegate, IReceiver<ReceivedPayload<IProtocolEvent>> + { + private static readonly Logger log = Logger.Get(typeof (Assembler)); + private readonly Dictionary<int, List<byte[]>> segments; + private readonly Method[] incomplete; + [ThreadStatic] static MSDecoder _decoder; + private readonly Object m_objectLock = new object(); + + // the event raised when a buffer is read from the wire + public event EventHandler<ReceivedPayload<IProtocolEvent>> ReceivedEvent; + public event EventHandler Closed; + + + // Not in use : + public event EventHandler<ExceptionArgs> Exception; + + event EventHandler<ReceivedPayload<IProtocolEvent>> IReceiver<ReceivedPayload<IProtocolEvent>>.Received + { + add + { + lock (m_objectLock) + { + ReceivedEvent += value; + } + } + remove + { + lock (m_objectLock) + { + ReceivedEvent -= value; + } + } + } + + public Assembler() + { + segments = new Dictionary<int, List<byte[]>>(); + incomplete = new Method[64*1024]; + } + + // Invoked when a network event is received + public void On_ReceivedEvent(object sender, ReceivedPayload<INetworkEvent> payload) + { + payload.Payload.ProcessNetworkEvent(this); + } + + #region Interface INetworkDelegate + + public void Init(ProtocolHeader header) + { + Emit(0, header); + } + + public void Error(ProtocolError error) + { + Emit(0, error); + } + + public void Frame(Frame frame) + { + MemoryStream segment; + if (frame.IsFirstFrame() && frame.IsLastFrame()) + { + byte[] tmp = new byte[frame.BodySize]; + frame.Body.Read(tmp, 0, tmp.Length); + segment = new MemoryStream(); + BinaryWriter w = new BinaryWriter(segment); + w.Write(tmp); + Assemble(frame, new MemoryStream(tmp)); + } + else + { + List<byte[]> frames; + if (frame.IsFirstFrame()) + { + frames = new List<byte[]>(); + SetSegment(frame, frames); + } + else + { + frames = GetSegment(frame); + } + byte[] tmp = new byte[frame.BodySize]; + frame.Body.Read(tmp, 0, tmp.Length); + frames.Add(tmp); + + if (frame.IsLastFrame()) + { + ClearSegment(frame); + segment = new MemoryStream(); + BinaryWriter w = new BinaryWriter(segment); + foreach (byte[] f in frames) + { + w.Write(f); + } + Assemble(frame, segment); + } + } + } + + #endregion + + #region Private Support Functions + + + private MSDecoder GetDecoder() + { + if( _decoder == null ) + { + _decoder = new MSDecoder(); + } + return _decoder; + } + + private void Assemble(Frame frame, MemoryStream segment) + { + MSDecoder decoder = GetDecoder(); + decoder.Init(segment); + int channel = frame.Channel; + Method command; + switch (frame.Type) + { + case SegmentType.CONTROL: + int controlType = decoder.ReadUint16(); + Method control = Method.Create(controlType); + control.Read(decoder); + Emit(channel, control); + break; + case SegmentType.COMMAND: + int commandType = decoder.ReadUint16(); + // read in the session header, right now we don't use it + decoder.ReadUint16(); + command = Method.Create(commandType); + command.Read(decoder); + if (command.HasPayload()) + { + incomplete[channel] = command; + } + else + { + Emit(channel, command); + } + break; + case SegmentType.HEADER: + command = incomplete[channel]; + List<Struct> structs = new List<Struct>(); + while (decoder.HasRemaining()) + { + structs.Add(decoder.ReadStruct32()); + } + command.Header = new Header(structs); + if (frame.IsLastSegment()) + { + incomplete[channel] = null; + Emit(channel, command); + } + break; + case SegmentType.BODY: + command = incomplete[channel]; + segment.Seek(0, SeekOrigin.Begin); + command.Body = segment; + incomplete[channel] = null; + Emit(channel, command); + break; + default: + throw new Exception("unknown frame type: " + frame.Type); + } + } + + private int SegmentKey(Frame frame) + { + return (frame.Track + 1)*frame.Channel; + } + + private List<byte[]> GetSegment(Frame frame) + { + return segments[SegmentKey(frame)]; + } + + private void SetSegment(Frame frame, List<byte[]> segment) + { + int key = SegmentKey(frame); + if (segments.ContainsKey(key)) + { + Error(new ProtocolError(network.Frame.L2, "segment in progress: %s", + frame)); + } + segments.Add(SegmentKey(frame), segment); + } + + private void ClearSegment(Frame frame) + { + segments.Remove(SegmentKey(frame)); + } + + // Emit a protocol event + private void Emit(int channel, IProtocolEvent protevent) + { + protevent.Channel = channel; + log.Debug("Assembler: protocol event:", protevent); + ReceivedPayload<IProtocolEvent> payload = new ReceivedPayload<IProtocolEvent>(); + payload.Payload = protevent; + + if (protevent is ConnectionCloseOk) + { + if (Closed != null) + Closed(this, EventArgs.Empty); + } + else + { + if (ReceivedEvent != null) + ReceivedEvent(this, payload); + else + log.Debug("No listener for event: {0}", protevent); + } + } + + #endregion + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/Disassembler.cs b/qpid/dotnet/client-010/client/transport/network/Disassembler.cs new file mode 100644 index 0000000000..3f0a6a8974 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/Disassembler.cs @@ -0,0 +1,222 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; +using org.apache.qpid.transport.codec; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network +{ + /// <summary> + /// Disassembler + /// </summary> + public sealed class Disassembler : ISender<IProtocolEvent>, IProtocolDelegate<Object> + { + private readonly IIoSender<MemoryStream> _sender; + private readonly int _maxPayload; + private readonly MemoryStream _header; + private readonly BinaryWriter _writer; + private readonly Object _sendlock = new Object(); + [ThreadStatic] static MSEncoder _encoder; + + + public Disassembler(IIoSender<MemoryStream> sender, int maxFrame) + { + if (maxFrame <= network.Frame.HEADER_SIZE || maxFrame >= 64*1024) + { + throw new Exception(String.Format("maxFrame must be > {0} and < 64K: ", network.Frame.HEADER_SIZE) + maxFrame); + } + _sender = sender; + _maxPayload = maxFrame - network.Frame.HEADER_SIZE; + _header = new MemoryStream(network.Frame.HEADER_SIZE); + _writer = new BinaryWriter(_header); + } + + #region Sender Interface + + public void Send(IProtocolEvent pevent) + { + pevent.ProcessProtocolEvent(null, this); + } + + public void Flush() + { + lock (_sendlock) + { + _sender.Flush(); + } + } + + public void Close() + { + lock (_sendlock) + { + _sender.Close(); + } + } + + #endregion + + #region ProtocolDelegate<Object> Interface + + public void Init(Object v, ProtocolHeader header) + { + lock (_sendlock) + { + _sender.Send(header.ToMemoryStream()); + _sender.Flush(); + } + } + + public void Control(Object v, Method method) + { + InvokeMethod(method, SegmentType.CONTROL); + } + + public void Command(Object v, Method method) + { + InvokeMethod(method, SegmentType.COMMAND); + } + + public void Error(Object v, ProtocolError error) + { + throw new Exception("Error: " + error); + } + + #endregion + + #region private + + private void Frame(byte flags, byte type, byte track, int channel, int size, MemoryStream buf) + { + lock (_sendlock) + { + _writer.Write(flags); + _writer.Write(type); + _writer.Write(ByteEncoder.GetBigEndian((UInt16)(size + network.Frame.HEADER_SIZE))); + _writer.Write((byte)0); + _writer.Write(track); + _writer.Write(ByteEncoder.GetBigEndian((UInt16)( channel))); + _writer.Write((byte)0); + _writer.Write((byte)0); + _writer.Write((byte)0); + _writer.Write((byte)0); + _sender.Send(_header); + _header.Seek(0, SeekOrigin.Begin); + _sender.Send(buf, size); + } + } + + private void Fragment(byte flags, SegmentType type, IProtocolEvent mevent, MemoryStream buf) + { + byte typeb = (byte) type; + byte track = mevent.EncodedTrack == network.Frame.L4 ? (byte) 1 : (byte) 0; + int remaining = (int) buf.Length; + buf.Seek(0, SeekOrigin.Begin); + bool first = true; + while (true) + { + int size = Math.Min(_maxPayload, remaining); + remaining -= size; + + byte newflags = flags; + if (first) + { + newflags |= network.Frame.FIRST_FRAME; + first = false; + } + if (remaining == 0) + { + newflags |= network.Frame.LAST_FRAME; + } + + Frame(newflags, typeb, track, mevent.Channel, size, buf); + + if (remaining == 0) + { + break; + } + } + } + + private MSEncoder GetEncoder() + { + if( _encoder == null) + { + _encoder = new MSEncoder(4 * 1024); + } + return _encoder; + } + + private void InvokeMethod(Method method, SegmentType type) + { + MSEncoder encoder = GetEncoder(); + encoder.Init(); + encoder.WriteUint16(method.GetEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.Sync) + { + encoder.WriteUint16(0x0101); + } + else + { + encoder.WriteUint16(0x0100); + } + } + method.Write(_encoder); + MemoryStream methodSeg = encoder.Segment(); + + byte flags = network.Frame.FIRST_SEG; + + bool payload = method.HasPayload(); + if (!payload) + { + flags |= network.Frame.LAST_SEG; + } + + MemoryStream headerSeg = null; + if (payload) + { + Header hdr = method.Header; + Struct[] structs = hdr.Structs; + + foreach (Struct st in structs) + { + encoder.WriteStruct32(st); + } + headerSeg = encoder.Segment(); + } + + lock (_sendlock) + { + Fragment(flags, type, method, methodSeg); + if (payload) + { + Fragment( 0x0, SegmentType.HEADER, method, headerSeg); + Fragment(network.Frame.LAST_SEG, SegmentType.BODY, method, method.Body); + } + } + } + + #endregion + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/Frame.cs b/qpid/dotnet/client-010/client/transport/network/Frame.cs new file mode 100644 index 0000000000..b8ec36d8b6 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/Frame.cs @@ -0,0 +1,143 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; + +namespace org.apache.qpid.transport.network +{ + public sealed class Frame : INetworkEvent + { + internal static int HEADER_SIZE = 12; + + // XXX: enums? + public const byte L1 = 0; + public const byte L2 = 1; + public const byte L3 = 2; + public const byte L4 = 3; + + public static byte RESERVED = 0x0; + + public static byte VERSION = 0x0; + + public static byte FIRST_SEG = 0x8; + public static byte LAST_SEG = 0x4; + public static byte FIRST_FRAME = 0x2; + public static byte LAST_FRAME = 0x1; + + private readonly byte flags; + private readonly SegmentType type; + private readonly byte track; + private readonly int channel; + private readonly MemoryStream body; + private int _bodySize; + + + public Frame(byte flags, SegmentType type, byte track, int channel, int bodySize, + MemoryStream body) + { + this.flags = flags; + this.type = type; + this.track = track; + this.channel = channel; + this.body = body; + _bodySize = bodySize; + } + + public int BodySize + { + get { return _bodySize; } + } + + public MemoryStream Body + { + get { return body; } + } + + public byte Flags + { + get { return flags; } + } + + public int Channel + { + get { return channel; } + } + + public int Size + { + get { return (int) body.Length;} + } + + public SegmentType Type + { + get { return type; } + } + + public byte Track + { + get { return track; } + } + + private bool Flag(byte mask) + { + return (flags & mask) != 0; + } + + public bool IsFirstSegment() + { + return Flag(FIRST_SEG); + } + + public bool IsLastSegment() + { + return Flag(LAST_SEG); + } + + public bool IsFirstFrame() + { + return Flag(FIRST_FRAME); + } + + public bool IsLastFrame() + { + return Flag(LAST_FRAME); + } + + #region INetworkEvent Methods + + public void ProcessNetworkEvent(INetworkDelegate ndelegate) + { + ndelegate.Frame(this); + } + + #endregion + + public override String ToString() + { + return String.Format + ("[{0:d} {1:d} {2:d} {3} {4}{5}{6}{7}] ", Channel, Size, Track, Type, + IsFirstSegment() ? 1 : 0, IsLastSegment() ? 1 : 0, + IsFirstFrame() ? 1 : 0, IsLastFrame() ? 1 : 0); + } + + + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/IIoSender.cs b/qpid/dotnet/client-010/client/transport/network/IIoSender.cs new file mode 100644 index 0000000000..747b5b9f98 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/IIoSender.cs @@ -0,0 +1,28 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport.network +{ + public interface IIOSender<T>:Sender<T> + { + void send(T body, int siz); + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/INetworkDelegate.cs b/qpid/dotnet/client-010/client/transport/network/INetworkDelegate.cs new file mode 100644 index 0000000000..9226adc2b7 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/INetworkDelegate.cs @@ -0,0 +1,40 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using ProtocolError = org.apache.qpid.transport.ProtocolError; +using ProtocolHeader = org.apache.qpid.transport.ProtocolHeader; +namespace org.apache.qpid.transport.network +{ + + + /// <summary> + /// NetworkDelegate + /// </summary> + + public interface INetworkDelegate + { + + void Init(ProtocolHeader header); + + void Frame(Frame frame); + + void Error(ProtocolError error); + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/INetworkEvent.cs b/qpid/dotnet/client-010/client/transport/network/INetworkEvent.cs new file mode 100644 index 0000000000..e6f0d6fc8a --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/INetworkEvent.cs @@ -0,0 +1,32 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport.network +{ + + /// <summary> + /// INetworkEvent + /// </summary> + + public interface INetworkEvent + { + void ProcessNetworkEvent(INetworkDelegate networkDelegate); + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/InputHandler.cs b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs new file mode 100644 index 0000000000..c5d5f13727 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/InputHandler.cs @@ -0,0 +1,266 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; +using System.Text; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network +{ + /// <summary> + /// InputHandler + /// </summary> + public sealed class InputHandler : IReceiver<ReceivedPayload<INetworkEvent>> + { + public enum State + { + PROTO_HDR, + FRAME_HDR, + FRAME_BODY, + ERROR + } + + private static readonly Logger log = Logger.Get(typeof(InputHandler)); + private readonly Object m_objectLock = new object(); + + // the event raised when a buffer is read from the wire + public event EventHandler<ReceivedPayload<INetworkEvent>> ReceivedEvent; + public event EventHandler<ExceptionArgs> ExceptionProcessing; + + // Not in used... This even is never raised in the code => the application will block on Close() until the timeout is reached + public event EventHandler Closed; + + event EventHandler<ReceivedPayload<INetworkEvent>> IReceiver<ReceivedPayload<INetworkEvent>>.Received + { + add + { + lock (m_objectLock) + { + ReceivedEvent += value; + } + } + remove + { + lock (m_objectLock) + { + ReceivedEvent -= value; + } + } + } + + event EventHandler<ExceptionArgs> IReceiver<ReceivedPayload<INetworkEvent>>.Exception + { + add + { + lock (m_objectLock) + { + ExceptionProcessing += value; + } + } + remove + { + lock (m_objectLock) + { + ExceptionProcessing -= value; + } + } + } + + private State state; + private MemoryStream input; + private int needed; + + private byte flags; + private SegmentType type; + private byte track; + private int channel; + + public InputHandler(State state) + { + this.state = state; + switch (state) + { + case State.PROTO_HDR: + needed = 8; + break; + case State.FRAME_HDR: + needed = Frame.HEADER_SIZE; + break; + } + } + + // The command listening for a buffer read. + public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload) + { + MemoryStream buf = payload.Payload; + int remaining = (int) buf.Length; + if( input != null ) + { + remaining += (int) input.Length; + } + try + { + while (remaining > 0) + { + if (remaining >= needed) + { + if (input != null) + { + byte[] tmp = new byte[buf.Length]; + buf.Read(tmp, 0, tmp.Length); + input.Write(tmp, 0, tmp.Length); + input.Seek(0, SeekOrigin.Begin); + buf = input; + } + int startPos = (int)buf.Position; + int consumed = needed; + state = Next(buf); + if ((buf.Position - startPos) < consumed) + { + buf.Seek(consumed - (buf.Position - startPos), SeekOrigin.Current); + } + remaining -= consumed; + input = null; + } + else + { + byte[] tmp; + if (input == null) + { + input = new MemoryStream(); + tmp = new byte[remaining]; + } + else + { + // this is a full buffer + tmp = new byte[buf.Length]; + } + buf.Read(tmp, 0, tmp.Length); + input.Write(tmp, 0, tmp.Length); + remaining = 0; + } + } + } + catch (Exception t) + { + Console.Write(t); + if (ExceptionProcessing != null) + { + ExceptionProcessing(this, new ExceptionArgs(t)); + } + } + } + + #region Private Support Functions + + private State Next(MemoryStream buf) + { + BinaryReader reader = new BinaryReader(buf); + + switch (state) + { + case State.PROTO_HDR: + char a = reader.ReadChar(); + char m = reader.ReadChar(); + char q = reader.ReadChar(); + char p = reader.ReadChar(); + if (a != 'A' && + m != 'M' && + q != 'Q' && + p != 'P') + { + Error("bad protocol header: {0}", buf.ToString()); + return State.ERROR; + } + reader.ReadByte(); + byte instance = reader.ReadByte(); + byte major = reader.ReadByte(); + byte minor = reader.ReadByte(); + Fire_NetworkEvent(new ProtocolHeader(instance, major, minor)); + needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + case State.FRAME_HDR: + reader = new BinaryReader(buf, Encoding.BigEndianUnicode); + flags = reader.ReadByte(); + type = SegmentTypeGetter.Get(reader.ReadByte()); // generated code + int size = reader.ReadUInt16(); + size = ByteEncoder.GetBigEndian((UInt16)size); + size -= Frame.HEADER_SIZE; + if (size < 0 || size > (64 * 1024 - 12)) + { + Error("bad frame size: {0:d}", size); + return State.ERROR; + } + reader.ReadByte(); + byte b = reader.ReadByte(); + if ((b & 0xF0) != 0) + { + Error("non-zero reserved bits in upper nibble of " + + "frame header byte 5: {0}", b); + return State.ERROR; + } + track = (byte)(b & 0xF); + channel = reader.ReadUInt16(); + channel = ByteEncoder.GetBigEndian((UInt16)channel); + if (size == 0) + { + Fire_NetworkEvent(new Frame(flags, type, track, channel, 0, new MemoryStream())); + needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + } + needed = size; + return State.FRAME_BODY; + case State.FRAME_BODY: + Fire_NetworkEvent(new Frame(flags, type, track, channel, needed, buf)); + needed = Frame.HEADER_SIZE; + return State.FRAME_HDR; + default: + if (ExceptionProcessing != null) + { + ExceptionProcessing(this, new ExceptionArgs(new Exception("Error creating frame"))); + } + throw new Exception("Error creating frame"); + } + } + + private void Error(String fmt, params Object[] args) + { + Fire_NetworkEvent(new ProtocolError(Frame.L1, fmt, args)); + } + + private void Fire_NetworkEvent(INetworkEvent netevent) + { + log.Debug("InputHandler: network event:", netevent); + ReceivedPayload<INetworkEvent> payload = new ReceivedPayload<INetworkEvent>(); + payload.Payload = netevent; + if (ReceivedEvent != null) + { + ReceivedEvent(this, payload); + } + else + { + log.Debug("Nobody listening for event: {0}"); + } + } + + #endregion + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs b/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs new file mode 100644 index 0000000000..69598a43e8 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs @@ -0,0 +1,40 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using ProtocolError = org.apache.qpid.transport.ProtocolError; +using ProtocolHeader = org.apache.qpid.transport.ProtocolHeader; +namespace org.apache.qpid.transport.network +{ + + + /// <summary> + /// NetworkDelegate + /// </summary> + + public interface NetworkDelegate + { + + void Init(ProtocolHeader header); + + void Frame(Frame frame); + + void Error(ProtocolError error); + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs b/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs new file mode 100644 index 0000000000..e5ac6de93a --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs @@ -0,0 +1,32 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport.network +{ + + /// <summary> + /// NetworkEvent + /// </summary> + + public interface NetworkEvent + { + void ProcessNetworkEvent(NetworkDelegate networkDelegate); + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/io/IIoSender.cs b/qpid/dotnet/client-010/client/transport/network/io/IIoSender.cs new file mode 100644 index 0000000000..acc7724a06 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IIoSender.cs @@ -0,0 +1,28 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport.network +{ + public interface IIoSender<T>:ISender<T> + { + void Send(T body, int siz); + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs b/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs new file mode 100644 index 0000000000..41a09e7079 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs @@ -0,0 +1,57 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +using System.IO; +using System.Net.Sockets; + +namespace org.apache.qpid.transport.network.io +{ + public interface IIoTransport + { + Connection Connection + { + get; + set; + } + + IReceiver<ReceivedPayload<MemoryStream>> Receiver + { + get; + set; + } + + IoSender Sender + { + get; + set; + } + + + Stream Stream + { + get; + set; + } + + TcpClient Socket + { + get; + set; + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs b/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs new file mode 100644 index 0000000000..b60444fa29 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs @@ -0,0 +1,185 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using System.IO; +using System.Threading; +using Logger = org.apache.qpid.transport.util.Logger; + + +namespace org.apache.qpid.transport.network.io +{ + /// <summary> + /// IoReceiver + /// </summary> + public sealed class IoReceiver : IReceiver<ReceivedPayload<MemoryStream>> + { + private static readonly Logger log = Logger.Get(typeof(IoReceiver)); + private readonly int m_bufferSize; + private readonly Stream m_bufStream; + private readonly int m_timeout; + private readonly Thread m_thread; + private bool m_closed; + private readonly Object m_objectLock = new object(); + + // the event raised when a buffer is read from the wire + event EventHandler<ReceivedPayload<MemoryStream>> ReceivedBuffer; + event EventHandler<ExceptionArgs> ExceptionReading; + event EventHandler ReceiverClosed; + + event EventHandler<ReceivedPayload<MemoryStream>> IReceiver<ReceivedPayload<MemoryStream>>.Received + { + add + { + lock (m_objectLock) + { + ReceivedBuffer += value; + } + } + remove + { + lock (m_objectLock) + { + ReceivedBuffer -= value; + } + } + } + + event EventHandler<ExceptionArgs> IReceiver<ReceivedPayload<MemoryStream>>.Exception + { + add + { + lock (m_objectLock) + { + ExceptionReading += value; + } + } + remove + { + lock (m_objectLock) + { + ExceptionReading -= value; + } + } + } + + event EventHandler IReceiver<ReceivedPayload<MemoryStream>>.Closed + { + add + { + lock (m_objectLock) + { + ReceiverClosed += value; + } + } + remove + { + lock (m_objectLock) + { + ReceiverClosed -= value; + } + } + } + + public IoReceiver(Stream stream, int bufferSize, int timeout) + { + m_bufferSize = bufferSize; + m_bufStream = stream; + m_timeout = timeout; + m_thread = new Thread(Go); + m_thread.Name = String.Format("IoReceiver - {0}", stream); + m_thread.IsBackground = true; + m_thread.Start(); + } + + public void Close() + { + Mutex mut = new Mutex(); + mut.WaitOne(); + if (!m_closed) + { + m_closed = true; + try + { + log.Debug("Receiver closing"); + m_bufStream.Close(); + m_thread.Join(m_timeout); + if (m_thread.IsAlive) + { + throw new TransportException("join timed out"); + } + } + catch (ThreadInterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + mut.ReleaseMutex(); + } + + void Go() + { + // create a BufferedStream on top of the NetworkStream. + int threshold = m_bufferSize/2; + byte[] buffer = new byte[m_bufferSize]; + try + { + int read; + int offset = 0; + ReceivedPayload<MemoryStream> payload = new ReceivedPayload<MemoryStream>(); + while ((read = m_bufStream.Read(buffer, offset, m_bufferSize - offset)) > 0) + { + MemoryStream memStream = new MemoryStream(buffer, offset, read); + if (ReceivedBuffer != null) + { + // call the event + payload.Payload = memStream; + ReceivedBuffer(this, payload); + } + offset += read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[m_bufferSize]; + } + } + log.Debug("Receiver thread terminating"); + } + catch (Exception t) + { + if (ExceptionReading != null) + { + ExceptionReading(this, new ExceptionArgs(t)); + } + } + finally + { + if (ReceiverClosed != null) + { + ReceiverClosed(this, new EventArgs()); + } + } + } + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoSSLTransport.cs b/qpid/dotnet/client-010/client/transport/network/io/IoSSLTransport.cs new file mode 100644 index 0000000000..b6c7940a1d --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IoSSLTransport.cs @@ -0,0 +1,227 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +using System; +using System.IO; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; + +using org.apache.qpid.transport.util; +using org.apache.qpid.client; + +namespace org.apache.qpid.transport.network.io +{ + public sealed class IoSSLTransport : IIoTransport + { + // constants + private const int DEFAULT_READ_WRITE_BUFFER_SIZE = 64*1024; + private const int TIMEOUT = 60000; + private const int QUEUE_SIZE = 1000; + // props + private static readonly Logger log = Logger.Get(typeof (IoSSLTransport)); + private Stream m_stream; + private IoSender m_sender; + private IReceiver<ReceivedPayload<MemoryStream>> m_receiver; + private TcpClient m_socket; + private Connection m_con; + private readonly bool _rejectUntrusted; + + public static Connection Connect(String host, int port, String mechanism, X509Certificate certificate, bool rejectUntrusted, Client client) + { + ClientConnectionDelegate connectionDelegate = new ClientConnectionDelegate(client, string.Empty, string.Empty, mechanism); + ManualResetEvent negotiationComplete = new ManualResetEvent(true); + connectionDelegate.SetCondition(negotiationComplete); + connectionDelegate.VirtualHost = string.Empty; + + IIoTransport transport = new IoSSLTransport(host, port, certificate, rejectUntrusted, connectionDelegate); + + Connection _conn = transport.Connection; + _conn.Send(new ProtocolHeader(1, 0, 10)); + negotiationComplete.WaitOne(); + + if (connectionDelegate.Exception != null) + throw connectionDelegate.Exception; + + connectionDelegate.SetCondition(null); + + return _conn; + } + + public static Connection Connect(String host, int port, String virtualHost, String mechanism, string serverName, string certPath, String certPass, bool rejectUntrusted, Client client) + { + // create certificate object based on whether or not password is null + X509Certificate cert; + if (certPass != null) + { + cert = new X509Certificate2(certPath, certPass); + } + else + { + cert = X509Certificate.CreateFromCertFile(certPath); + } + + return Connect(host, port, mechanism, cert, rejectUntrusted, client); + } + + public IoSSLTransport(String host, int port, X509Certificate certificate, bool rejectUntrusted, ConnectionDelegate conndel) + { + _rejectUntrusted = rejectUntrusted; + CreateSocket(host, port); + CreateSSLStream(host, Socket, certificate); + Sender = new IoSender(this, QUEUE_SIZE, TIMEOUT); + Receiver = new IoReceiver(Stream, Socket.ReceiveBufferSize*2, TIMEOUT); + Assembler assembler = new Assembler(); + InputHandler inputHandler = new InputHandler(InputHandler.State.PROTO_HDR); + Connection = new Connection(assembler, new Disassembler(Sender, 64*1024 - 1), conndel); + // Input handler listen to Receiver events + Receiver.Received += inputHandler.On_ReceivedBuffer; + // Assembler listen to inputhandler events + inputHandler.ReceivedEvent += assembler.On_ReceivedEvent; + // Connection listen to asembler protocol event + Receiver.Closed += Connection.On_ReceivedClosed; + assembler.Closed += Connection.On_ReceivedClosed; + Receiver.Exception += Connection.On_ReceivedException; + inputHandler.ExceptionProcessing += Connection.On_ReceivedException; + assembler.ReceivedEvent += Connection.On_ReceivedEvent; + } + + public Connection Connection + { + get { return m_con; } + set { m_con = value; } + } + + public IReceiver<ReceivedPayload<MemoryStream>> Receiver + { + get { return m_receiver; } + set { m_receiver = value; } + } + + public IoSender Sender + { + get { return m_sender; } + set { m_sender = value; } + } + + + public Stream Stream + { + get { return m_stream; } + set { m_stream = value; } + } + + public TcpClient Socket + { + get { return m_socket; } + set { m_socket = value; } + } + + #region Private Support Functions + + private void CreateSocket(String host, int port) + { + TcpClient socket; + try + { + socket = new TcpClient(); + String noDelay = Environment.GetEnvironmentVariable("qpid.tcpNoDelay"); + String writeBufferSize = Environment.GetEnvironmentVariable("qpid.writeBufferSize"); + String readBufferSize = Environment.GetEnvironmentVariable("qpid.readBufferSize"); + socket.NoDelay = noDelay != null && bool.Parse(noDelay); + socket.ReceiveBufferSize = readBufferSize == null + ? DEFAULT_READ_WRITE_BUFFER_SIZE + : int.Parse(readBufferSize); + socket.SendBufferSize = writeBufferSize == null + ? DEFAULT_READ_WRITE_BUFFER_SIZE + : int.Parse(writeBufferSize); + + log.Debug("NoDelay : {0}", socket.NoDelay); + log.Debug("ReceiveBufferSize : {0}", socket.ReceiveBufferSize); + log.Debug("SendBufferSize : {0}", socket.SendBufferSize); + log.Debug("Openning connection with host : {0}; port: {1}", host, port); + + socket.Connect(host, port); + Socket = socket; + } + catch (Exception e) + { + throw new TransportException(string.Format("Error connecting to broker: {0}", e.Message)); + } + } + + private void CreateSSLStream(String host, TcpClient socket, X509Certificate certificate) + { + try + { + //Initializes a new instance of the SslStream class using the specified Stream, stream closure behavior, certificate validation delegate and certificate selection delegate + SslStream sslStream = new SslStream(socket.GetStream(), false, ValidateServerCertificate, LocalCertificateSelection); + + X509CertificateCollection certCol = new X509CertificateCollection(); + certCol.Add(certificate); + + sslStream.AuthenticateAsClient(host, certCol, SslProtocols.Default, true); + Stream = sslStream; + } + catch (AuthenticationException e) + { + log.Warn("Exception: {0}", e.Message); + if (e.InnerException != null) + { + log.Warn("Inner exception: {0}", e.InnerException.Message); + e = new AuthenticationException(e.InnerException.Message, e.InnerException); + } + socket.Close(); + throw new TransportException(string.Format("Authentication failed, closing connection to broker: {0}", e.Message)); + } + } + + // The following method is invoked by the RemoteCertificateValidationDelegate. + public bool ValidateServerCertificate( + object sender, + X509Certificate certificate, + X509Chain chain, + SslPolicyErrors sslPolicyErrors) + { + bool result = true; + if (sslPolicyErrors != SslPolicyErrors.None && _rejectUntrusted ) + { + log.Warn("Certificate error: {0}", sslPolicyErrors); + // Do not allow this client to communicate with unauthenticated servers. + result = false; + } + return result; + } + + public X509Certificate LocalCertificateSelection( + Object sender, + string targetHost, + X509CertificateCollection localCertificates, + X509Certificate remoteCertificate, + string[] acceptableIssuers + ) + { + // used to be return null; in the original version + return localCertificates[0]; + } + + #endregion + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs b/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs new file mode 100644 index 0000000000..025b782a12 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IoSender.cs @@ -0,0 +1,137 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +using System; +using System.IO; +using System.Threading; +using common.org.apache.qpid.transport.util; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network.io +{ + public sealed class IoSender : IIoSender<MemoryStream> + { + private static readonly Logger log = Logger.Get(typeof (IoReceiver)); + private readonly IIoTransport ioTransport; + private readonly Stream bufStream; + private bool closed; + private readonly Mutex mutClosed = new Mutex(); + private readonly CircularBuffer<byte[]> queue; + private readonly Thread thread; + private readonly int timeout; + private readonly MemoryStream _tobeSent = new MemoryStream(); + public IoSender(IIoTransport transport, int queueSize, int timeout) + { + this.timeout = timeout; + ioTransport = transport; + bufStream = transport.Stream; + queue = new CircularBuffer<byte[]>(queueSize); + thread = new Thread(Go); + log.Debug("Creating IoSender thread"); + thread.Name = String.Format("IoSender - {0}", transport.Socket) ; + thread.IsBackground = true; + thread.Start(); + } + + public void Send(MemoryStream str) + { + int pos = (int) str.Position; + str.Seek(0, SeekOrigin.Begin); + Send(str, pos); + } + + public void Send(MemoryStream str, int size) + { + mutClosed.WaitOne(); + if (closed) + { + throw new TransportException("sender is Closed"); + } + mutClosed.ReleaseMutex(); + byte[] buf = new byte[size]; + str.Read(buf, 0, size); + _tobeSent.Write(buf, 0, size); + } + + public void Flush() + { + int length = (int)_tobeSent.Position; + byte[] buf = new byte[length]; + _tobeSent.Seek(0, SeekOrigin.Begin); + _tobeSent.Read(buf, 0, length); + queue.Enqueue(buf); + // bufStream.Write(buf, 0, length); + // _tobeSent = new MemoryStream(); + // _writer.Write(buf, 0, length); + // _writer.Flush(); + _tobeSent.Seek(0, SeekOrigin.Begin); + } + + public void Close() + { + log.Debug("Closing Sender"); + mutClosed.WaitOne(); + if (!closed) + { + try + { + closed = true; + queue.Close(); + thread.Join(timeout); + if (thread.IsAlive) + { + throw new TransportException("join timed out"); + } + } + catch (ThreadInterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + mutClosed.ReleaseMutex(); + } + + private void Go() + { + while (! closed) + { + //MemoryStream st = queue.Dequeue(); + byte[] st = queue.Dequeue(); + if (st != null) + { + try + { + // int length = (int) st.Length; + // byte[] buf = new byte[length]; + // st.Read(buf, 0, length); + bufStream.Write(st, 0, st.Length); + } + catch (Exception e) + { + closed = true; + ioTransport.Connection.On_ReceivedException(this, new ExceptionArgs(e)); + } + } + } + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/network/io/IoTransport.cs b/qpid/dotnet/client-010/client/transport/network/io/IoTransport.cs new file mode 100644 index 0000000000..483e5428b8 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/network/io/IoTransport.cs @@ -0,0 +1,141 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +using System; +using System.IO; +using System.Net.Sockets; +using org.apache.qpid.transport.util; + +namespace org.apache.qpid.transport.network.io +{ + /// <summary> + /// This class provides a socket based transport using sync io classes. + /// + /// The following params are configurable via JVM arguments + /// TCP_NO_DELAY - qpid.tcpNoDelay + /// SO_RCVBUF - qpid.readBufferSize + /// SO_SNDBUF - qpid.writeBufferSize + /// </summary> + public sealed class IoTransport : IIoTransport + { + // constants + private const int DEFAULT_READ_WRITE_BUFFER_SIZE = 64*1024; + private const int TIMEOUT = 60000; + private const int QUEUE_SIZE = 1000; + // props + private static readonly Logger log = Logger.Get(typeof (IoTransport)); + private Stream m_stream; + private IoSender m_sender; + private IReceiver<ReceivedPayload<MemoryStream>> m_receiver; + private TcpClient m_socket; + private Connection m_con; + + public static Connection Connect(String host, int port, ConnectionDelegate conndel) + { + IoTransport transport = new IoTransport(host, port, conndel); + return transport.Connection; + } + + public IoTransport(String host, int port, ConnectionDelegate conndel) + { + CreateSocket(host, port); + Sender = new IoSender(this, QUEUE_SIZE, TIMEOUT); + Receiver = new IoReceiver(Stream, Socket.ReceiveBufferSize * 2, TIMEOUT); + Assembler assembler = new Assembler(); + InputHandler inputHandler = new InputHandler(InputHandler.State.PROTO_HDR); + Connection = new Connection(assembler, new Disassembler(Sender, 64 * 1024 - 1), conndel); + // Input handler listen to Receiver events + Receiver.Received += inputHandler.On_ReceivedBuffer; + // Assembler listen to inputhandler events + inputHandler.ReceivedEvent += assembler.On_ReceivedEvent; + // Connection listen to asembler protocol event + Receiver.Closed += Connection.On_ReceivedClosed; + assembler.Closed += Connection.On_ReceivedClosed; + Receiver.Exception += Connection.On_ReceivedException; + inputHandler.ExceptionProcessing += Connection.On_ReceivedException; + assembler.ReceivedEvent += Connection.On_ReceivedEvent; + } + + public Connection Connection + { + get { return m_con; } + set { m_con = value; } + } + + public IReceiver<ReceivedPayload<MemoryStream>> Receiver + { + get { return m_receiver; } + set { m_receiver = value; } + } + + public IoSender Sender + { + get { return m_sender; } + set { m_sender = value; } + } + + + public Stream Stream + { + get { return m_stream; } + set { m_stream = value; } + } + + public TcpClient Socket + { + get { return m_socket; } + set { m_socket = value; } + } + + #region Private Support Functions + + private void CreateSocket(String host, int port) + { + try + { + TcpClient socket = new TcpClient(); + String noDelay = Environment.GetEnvironmentVariable("qpid.tcpNoDelay"); + String writeBufferSize = Environment.GetEnvironmentVariable("qpid.writeBufferSize"); + String readBufferSize = Environment.GetEnvironmentVariable("qpid.readBufferSize"); + socket.NoDelay = noDelay != null && bool.Parse(noDelay); + socket.ReceiveBufferSize = readBufferSize == null ? DEFAULT_READ_WRITE_BUFFER_SIZE : int.Parse(readBufferSize); + socket.SendBufferSize = writeBufferSize == null ? DEFAULT_READ_WRITE_BUFFER_SIZE : int.Parse(writeBufferSize); + + log.Debug("NoDelay : {0}", socket.NoDelay); + log.Debug("ReceiveBufferSize : {0}", socket.ReceiveBufferSize); + log.Debug("SendBufferSize : {0}", socket.SendBufferSize); + log.Debug("Openning connection with host : {0}; port: {1}", host, port); + + socket.Connect(host, port); + Socket = socket; + Stream = socket.GetStream(); + } + catch (SocketException e) + { + Console.WriteLine(e.StackTrace); + throw new TransportException("Error connecting to broker", e); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + } + + #endregion + } +}
\ No newline at end of file diff --git a/qpid/dotnet/client-010/client/transport/util/ByteEncoder.cs b/qpid/dotnet/client-010/client/transport/util/ByteEncoder.cs new file mode 100644 index 0000000000..873ca75688 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/util/ByteEncoder.cs @@ -0,0 +1,218 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; + +namespace org.apache.qpid.transport.util +{ + public static class ByteEncoder + { + #region Endian conversion helper routines + /// <summary> + /// Returns the value encoded in Big Endian (PPC, XDR) format. + /// </summary> + /// <param name="value">Value to encode.</param> + /// <returns>Big-endian encoded value.</returns> + public static Int32 GetBigEndian(Int32 value) + { + if (BitConverter.IsLittleEndian) + { + return SwapByteOrder(value); + } + return value; + } + + /// <summary> + /// Returns the value encoded in Big Endian (PPC, XDR) format. + /// </summary> + /// <param name="value">Value to encode.</param> + /// <returns>Big-endian encoded value.</returns> + public static UInt16 GetBigEndian(UInt16 value) + { + if (BitConverter.IsLittleEndian) + { + return SwapByteOrder(value); + } + return value; + } + + /// <summary> + /// Returns the value encoded in Big Endian (PPC, XDR) format. + /// </summary> + /// <param name="value">Value to encode.</param> + /// <returns>Big-endian encoded value.</returns> + public static UInt32 GetBigEndian(UInt32 value) + { + if (BitConverter.IsLittleEndian) + { + return SwapByteOrder(value); + } + return value; + } + + /// <summary> + /// Returns the value encoded in Big Endian (PPC, XDR) format. + /// </summary> + /// <param name="value">Value to encode.</param> + /// <returns>Big-endian encoded value.</returns> + public static long GetBigEndian(long value) + { + if (BitConverter.IsLittleEndian) + { + return SwapByteOrder(value); + } + return value; + } + + public static double GetBigEndian(double value) + { + if (BitConverter.IsLittleEndian) + { + return SwapByteOrder(value); + } + return value; + } + + /// <summary> + /// Returns the value encoded in Little Endian (x86, NDR) format. + /// </summary> + /// <param name="value">Value to encode.</param> + /// <returns>Little-endian encoded value.</returns> + public static Int32 GetLittleEndian(Int32 value) + { + if (BitConverter.IsLittleEndian) + { + return value; + } + return SwapByteOrder(value); + } + + /// <summary> + /// Returns the value encoded in Little Endian (x86, NDR) format. + /// </summary> + /// <param name="value">Value to encode.</param> + /// <returns>Little-endian encoded value.</returns> + public static UInt32 GetLittleEndian(UInt32 value) + { + if (BitConverter.IsLittleEndian) + { + return value; + } + return SwapByteOrder(value); + } + + /// <summary> + /// Returns the value encoded in Little Endian (x86, NDR) format. + /// </summary> + /// <param name="value">Value to encode.</param> + /// <returns>Little-endian encoded value.</returns> + public static UInt16 GetLittleEndian(UInt16 value) + { + if (BitConverter.IsLittleEndian) + { + return value; + } + return SwapByteOrder(value); + } + + /// <summary> + /// Returns the value encoded in Little Endian (x86, NDR) format. + /// </summary> + /// <param name="value">Value to encode.</param> + /// <returns>Little-endian encoded value.</returns> + public static long GetLittleEndian(long value) + { + if (BitConverter.IsLittleEndian) + { + return value; + } + return SwapByteOrder(value); + } + + public static double GetLittleEndian(double value) + { + if (BitConverter.IsLittleEndian) + { + return value; + } + return SwapByteOrder(value); + } + + /// <summary> + /// Swaps the Byte order of an <see cref="Int32"/>. + /// </summary> + /// <param name="value"><see cref="Int32"/> to swap the bytes of.</param> + /// <returns>Byte order swapped <see cref="Int32"/>.</returns> + private static Int32 SwapByteOrder(Int32 value) + { + Int32 swapped = (Int32)((0x000000FF) & (value >> 24) + | (0x0000FF00) & (value >> 8) + | (0x00FF0000) & (value << 8) + | (0xFF000000) & (value << 24)); + return swapped; + } + + /// <summary> + /// Swaps the byte order of a <see cref="UInt16"/>. + /// </summary> + /// <param name="value"><see cref="UInt16"/> to swap the bytes of.</param> + /// <returns>Byte order swapped <see cref="UInt16"/>.</returns> + private static UInt16 SwapByteOrder(UInt16 value) + { + return (UInt16)((0x00FF & (value >> 8)) + | (0xFF00 & (value << 8))); + } + + /// <summary> + /// Swaps the byte order of a <see cref="UInt32"/>. + /// </summary> + /// <param name="value"><see cref="UInt32"/> to swap the bytes of.</param> + /// <returns>Byte order swapped <see cref="UInt32"/>.</returns> + private static UInt32 SwapByteOrder(UInt32 value) + { + UInt32 swapped = ((0x000000FF) & (value >> 24) + | (0x0000FF00) & (value >> 8) + | (0x00FF0000) & (value << 8) + | (0xFF000000) & (value << 24)); + return swapped; + } + + /// <summary> + /// Swaps the byte order of a <see cref="Double"/> (double precision IEEE 754) + /// </summary> + /// <param name="value"><see cref="Double"/> to swap.</param> + /// <returns>Byte order swapped <see cref="Double"/> value.</returns> + private static long SwapByteOrder(long value) + { + Byte[] buffer = BitConverter.GetBytes(value); + Array.Reverse(buffer, 0, buffer.Length); + return BitConverter.ToInt64(buffer, 0); + } + + private static double SwapByteOrder(double value) + { + Byte[] buffer = BitConverter.GetBytes(value); + Array.Reverse(buffer, 0, buffer.Length); + return BitConverter.ToDouble(buffer,0) ; + } + #endregion + } + +} diff --git a/qpid/dotnet/client-010/client/transport/util/CircularBuffer.cs b/qpid/dotnet/client-010/client/transport/util/CircularBuffer.cs new file mode 100644 index 0000000000..00d7b20d4c --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/util/CircularBuffer.cs @@ -0,0 +1,132 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; +using System.Threading; + +namespace common.org.apache.qpid.transport.util +{ + public class CircularBuffer<T> + { + private readonly T[] buffer; + private Int32 nrp, nwp; + private readonly Int32 len; + private Int32 countValue; + private readonly Int32 add; + + + /// <summary> + /// Constructor creates N=len element + /// Circular Buffer that olds MemoryStream + /// </summary> + public CircularBuffer(Int32 len) + { + buffer = new T[len]; + this.len = len; + add = 1 - len; + nrp = 0; + nwp = 0; + countValue = 0; + } + + + public void Enqueue(T t) + { + lock (this) + { + if (countValue >= (len - 1)) + { + // wait for room to be available + Monitor.Wait(this); + } + bool notifyDequeue = countValue <= 0; + Load(t); + if (notifyDequeue) //notifyDequeue) + { + Monitor.PulseAll(this); + } + } + } + + + public T Dequeue() + { + lock (this) + { + if (countValue <= 0) + { + Monitor.Wait(this); + } + bool notifyEnqueue = countValue >= (len - 1); + T temp = Get(); + if (notifyEnqueue) //notifyEnqueue) + { + Monitor.PulseAll(this); + } + return temp; + } + } + + public void Close() + { + nrp = 0; + nwp = 0; + countValue = 0; + Array.Clear(buffer, 0, len); + lock (this) + { + Monitor.PulseAll(this); + } + } + + #region Private Support Functions + + private void Load(T t) + { + Int32 i = nwp; + buffer[i] = t; + i += add; + if (i < 0) i += len; + nwp = i; + UpdateCount(); + } + + private void UpdateCount() + { + countValue = nwp - nrp; + if (countValue <= 0 ) + countValue += len; // modulo buffer size + } + + private T Get() + { + Int32 i = nrp; + T temp = buffer[i]; + i += add; + if (i < 0) i += len; + nrp = i; + countValue--; + return (temp); + } + + #endregion + } +} diff --git a/qpid/dotnet/client-010/client/transport/util/Functions.cs b/qpid/dotnet/client-010/client/transport/util/Functions.cs new file mode 100644 index 0000000000..eee3848386 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/util/Functions.cs @@ -0,0 +1,41 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +namespace org.apache.qpid.transport.util +{ + + /// <summary> + /// Functions + /// </summary> + + public class Functions + { + public static sbyte Lsb(int i) + { + return (sbyte) (0xFF & i); + } + + public static sbyte Lsb(long l) + { + return (sbyte) (0xFF & l); + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/util/Logger.cs b/qpid/dotnet/client-010/client/transport/util/Logger.cs new file mode 100644 index 0000000000..f889fe2aab --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/util/Logger.cs @@ -0,0 +1,114 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +using System; +using log4net; + +namespace org.apache.qpid.transport.util +{ + + /// <summary> Logger + /// + /// </summary> + + public sealed class Logger + { + private readonly ILog log; + + public static Logger Get(Type type) + { + return new Logger(LogManager.GetLogger(type)); + } + + private Logger(ILog log) + { + this.log = log; + } + + public bool IsDebugEnabled() + { + return log.IsDebugEnabled; + } + + public void Debug(String message, params Object[] args) + { + if (log.IsDebugEnabled) + { + log.Debug(String.Format(message, args)); + } + } + + public void Debug(Exception t, String message, params Object[] args) + { + if (log.IsDebugEnabled) + { + log.Debug(String.Format(message, args), t); + } + } + + public void Error(String message, params Object[] args) + { + if (log.IsErrorEnabled) + { + log.Error(String.Format(message, args)); + } + } + + public void Error(Exception t, String message, params Object[] args) + { + if (log.IsErrorEnabled) + { + log.Error(String.Format(message, args), t); + } + } + + public void Warn(String message, params Object[] args) + { + if (log.IsWarnEnabled) + { + log.Warn(String.Format(message, args)); + } + } + + public void Warn(Exception t, String message, params Object[] args) + { + if (log.IsWarnEnabled) + { + log.Warn(String.Format(message, args), t); + } + } + + public void Info(String message, params Object[] args) + { + if (log.IsInfoEnabled) + { + log.Info(String.Format(message, args)); + } + } + + public void Info(Exception t, String message, params Object[] args) + { + if (log.IsInfoEnabled) + { + log.Info(String.Format(message, args), t); + } + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/util/ResultFuture.cs b/qpid/dotnet/client-010/client/transport/util/ResultFuture.cs new file mode 100644 index 0000000000..0de2b27656 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/util/ResultFuture.cs @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Threading; +using org.apache.qpid.transport; +using org.apache.qpid.transport.util; + +namespace common.org.apache.qpid.transport.util +{ + public class ResultFuture : IFuture + { + const long _timeout = 60000; + private Struct _result; + private Session _session; + private static readonly Logger log = Logger.Get(typeof(ResultFuture)); + + public Struct Get(long timeout) + { + lock (this) + { + DateTime start = DateTime.Now; + long elapsed = 0; + while (! _session.IsClosed && timeout - elapsed > 0 && _result == null) + { + log.Debug("{0} waiting for result: {1}", _session, this ); + Monitor.Wait(this, (int) (timeout - elapsed)); + elapsed = (long) (DateTime.Now.Subtract(start)).TotalMilliseconds; + } + } + if( _session.IsClosed ) + { + throw new SessionException(_session.GetExceptions()); + } + return _result; + } + + public Struct Result + { + get { return Get(_timeout); } + set + { + lock (this) + { + _result = value; + Monitor.PulseAll(this); + } + } + } + + public Session Session + { + set { _session = value; } + } + + public override String ToString() + { + return String.Format("Future({0})", _result); + } + + } +} diff --git a/qpid/dotnet/client-010/client/transport/util/Serial.cs b/qpid/dotnet/client-010/client/transport/util/Serial.cs new file mode 100644 index 0000000000..874097084a --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/util/Serial.cs @@ -0,0 +1,94 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +namespace org.apache.qpid.transport.util +{ + /// <summary> + /// This class provides basic serial number comparisons as defined in + /// RFC 1982. + /// </summary> + public class Serial + { + /// + /// + ///Compares two numbers using serial arithmetic. + /// + /// param s1 the first serial number + /// param s2 the second serial number + /// + /// return a negative integer, zero, or a positive integer as the + /// first argument is less than, equal to, or greater than the + /// second + /// + public static int Compare(int s1, int s2) + { + return s1 - s2; + } + + public static bool Lt(int s1, int s2) + { + return Compare(s1, s2) < 0; + } + + public static bool Le(int s1, int s2) + { + return Compare(s1, s2) <= 0; + } + + public static bool Gt(int s1, int s2) + { + return Compare(s1, s2) > 0; + } + + public static bool Ge(int s1, int s2) + { + return Compare(s1, s2) >= 0; + } + + public static bool Eq(int s1, int s2) + { + return s1 == s2; + } + + public static int Min(int s1, int s2) + { + if (Lt(s1, s2)) + { + return s1; + } + else + { + return s2; + } + } + + public static int Max(int s1, int s2) + { + if (Gt(s1, s2)) + { + return s1; + } + else + { + return s2; + } + } + } +} diff --git a/qpid/dotnet/client-010/client/transport/util/UUID.cs b/qpid/dotnet/client-010/client/transport/util/UUID.cs new file mode 100644 index 0000000000..07a3d267a5 --- /dev/null +++ b/qpid/dotnet/client-010/client/transport/util/UUID.cs @@ -0,0 +1,129 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +using System; + +namespace org.apache.qpid.transport.util +{ + public class UUID + { + private long _mostSigBits; + private long _leastSigBits; + private static readonly Random _random = new Random(); + private static readonly object _randomLock = new object(); + + + public UUID(long mostSigBits, long leastSigBits) + { + _mostSigBits = mostSigBits; + _leastSigBits = leastSigBits; + } + + public long MostSignificantBits + { + get { return _mostSigBits; } + set { _mostSigBits = value; } + } + + public long LeastSignificantBits + { + get { return _leastSigBits; } + set { _leastSigBits = value; } + } + + internal UUID(byte[] r) + { + MostSignificantBits = 0; + LeastSignificantBits = 0; + for (int i = 0; i < 8; i++) + MostSignificantBits = (MostSignificantBits << 8) | (r[i] & 0xff); + for (int i = 8; i < 16; i++) + LeastSignificantBits = (LeastSignificantBits << 8) | (r[i] & 0xff); + } + + public static UUID RandomUuid() + { + byte[] randomBytes = new byte[16]; + lock (_randomLock) + { + _random.NextBytes(randomBytes); + } + + randomBytes[6] &= 0x0f; + randomBytes[6] |= 0x40; + randomBytes[8] &= 0x3f; + randomBytes[8] |= 0x80; + + return new UUID(randomBytes); + } + + + public override String ToString() + { + return (Digits(_mostSigBits >> 32, 8) + "-" + + Digits(_mostSigBits >> 16, 4) + "-" + + Digits(_mostSigBits, 4) + "-" + + Digits(_leastSigBits >> 48, 4) + "-" + + Digits(_leastSigBits, 12)); + } + + private static String Digits(long val, int digits) + { + long hi = 1L << (digits * 4); + return Convert.ToString((hi | (val & (hi - 1))), 16); + } + + #region equality + public bool Equals(UUID other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return other._mostSigBits == _mostSigBits && other._leastSigBits == _leastSigBits; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != typeof (UUID)) return false; + return Equals((UUID) obj); + } + + public override int GetHashCode() + { + unchecked + { + return (_mostSigBits.GetHashCode()*397) ^ _leastSigBits.GetHashCode(); + } + } + + public static bool operator ==(UUID left, UUID right) + { + return Equals(left, right); + } + + public static bool operator !=(UUID left, UUID right) + { + return !Equals(left, right); + } + #endregion + } +} |