summaryrefslogtreecommitdiff
path: root/qpid/dotnet/client-010/client/transport/Session.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/client-010/client/transport/Session.cs')
-rw-r--r--qpid/dotnet/client-010/client/transport/Session.cs522
1 files changed, 522 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/client/transport/Session.cs b/qpid/dotnet/client-010/client/transport/Session.cs
new file mode 100644
index 0000000000..7b4aff9811
--- /dev/null
+++ b/qpid/dotnet/client-010/client/transport/Session.cs
@@ -0,0 +1,522 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using System.Threading;
+using org.apache.qpid.transport.util;
+using Frame = org.apache.qpid.transport.network.Frame;
+using Logger = org.apache.qpid.transport.util.Logger;
+
+
+namespace org.apache.qpid.transport
+{
+ /// <summary>
+ /// Session
+ ///
+ /// </summary>
+ public class Session : Invoker, ISession
+ {
+ private static readonly Logger log = Logger.Get(typeof (Session));
+ private static readonly bool ENABLE_REPLAY;
+
+ static Session()
+ {
+ const string enableReplay = "enable_command_replay";
+ try
+ {
+ String var = Environment.GetEnvironmentVariable(enableReplay);
+ if (var != null)
+ {
+ ENABLE_REPLAY = bool.Parse(var);
+ }
+ }
+ catch (Exception)
+ {
+ ENABLE_REPLAY = false;
+ }
+ }
+
+ private readonly byte[] _name;
+ private const long _timeout = 600000;
+ private bool _autoSync = false;
+
+ // channel may be null
+ private Channel _channel;
+
+ // incoming command count
+ private int _commandsIn = 0;
+ // completed incoming commands
+ private readonly Object _processedLock = new Object();
+ private RangeSet _processed = new RangeSet();
+ private int _maxProcessed = - 1;
+ private int _syncPoint = -1;
+
+ // outgoing command count
+ private int _commandsOut = 0;
+ private readonly Dictionary<int, Method> _commands = new Dictionary<int, Method>();
+ private int _maxComplete = - 1;
+ private bool _needSync = false;
+ private bool _closed;
+ private readonly Dictionary<int, IFuture> _results = new Dictionary<int, IFuture>();
+ private readonly List<ExecutionException> _exceptions = new List<ExecutionException>();
+
+
+ public bool IsClosed
+ {
+ get
+ {
+ lock (this)
+ {
+ return _closed;
+ }
+ }
+ set
+ {
+ lock (this)
+ {
+ _closed = value;
+ }
+ }
+ }
+
+ public string Name
+ {
+ get
+ {
+ ASCIIEncoding enc = new ASCIIEncoding();
+ return enc.GetString(_name);
+ }
+ }
+
+ public Session(byte[] name)
+ {
+ _name = name;
+ }
+
+ public byte[] GetName()
+ {
+ return _name;
+ }
+
+ public void SetAutoSync(bool value)
+ {
+ lock (_commands)
+ {
+ _autoSync = value;
+ }
+ }
+
+ public Dictionary<int, Method> GetOutstandingCommands()
+ {
+ return _commands;
+ }
+
+ public int GetCommandsOut()
+ {
+ return _commandsOut;
+ }
+
+ public int CommandsIn
+ {
+ get { return _commandsIn; }
+ set { _commandsIn = value; }
+ }
+
+ public int NextCommandId()
+ {
+ return _commandsIn++;
+ }
+
+ public void Identify(Method cmd)
+ {
+ int id = NextCommandId();
+ cmd.Id = id;
+
+ if (log.IsDebugEnabled())
+ {
+ log.Debug("ID: [{0}] %{1}", _channel, id);
+ }
+
+ //if ((id % 65536) == 0)
+ if ((id & 0xff) == 0)
+ {
+ FlushProcessed(Option.TIMELY_REPLY);
+ }
+ }
+
+ public void Processed(Method command)
+ {
+ Processed(command.Id);
+ }
+
+ public void Processed(int command)
+ {
+ Processed(new Range(command, command));
+ }
+
+ public void Processed(int lower, int upper)
+ {
+ Processed(new Range(lower, upper));
+ }
+
+ public void Processed(Range range)
+ {
+ log.Debug("{0} processed({1})", this, range);
+
+ bool flush;
+ lock (_processedLock)
+ {
+ _processed.Add(range);
+ Range first = _processed.GetFirst();
+ int lower = first.Lower;
+ int upper = first.Upper;
+ int old = _maxProcessed;
+ if (Serial.Le(lower, _maxProcessed + 1))
+ {
+ _maxProcessed = Serial.Max(_maxProcessed, upper);
+ }
+ flush = Serial.Lt(old, _syncPoint) && Serial.Ge(_maxProcessed, _syncPoint);
+ _syncPoint = _maxProcessed;
+ }
+ if (flush)
+ {
+ FlushProcessed();
+ }
+ }
+
+ public void FlushProcessed(params Option[] options)
+ {
+ RangeSet copy;
+ lock (_processedLock)
+ {
+ copy = _processed.Copy();
+ }
+ SessionCompleted(copy, options);
+ }
+
+ public void KnownComplete(RangeSet kc)
+ {
+ lock (_processedLock)
+ {
+ RangeSet newProcessed = new RangeSet();
+ foreach (Range pr in _processed)
+ {
+ foreach (Range kr in kc)
+ {
+ foreach (Range r in pr.Subtract(kr))
+ {
+ newProcessed.Add(r);
+ }
+ }
+ }
+ _processed = newProcessed;
+ }
+ }
+
+ public void SyncPoint()
+ {
+ int id = CommandsIn - 1;
+ log.Debug("{0} synced to {1}", this, id);
+ bool flush;
+ lock (_processedLock)
+ {
+ _syncPoint = id;
+ flush = Serial.Ge(_maxProcessed, _syncPoint);
+ }
+ if (flush)
+ {
+ FlushProcessed();
+ }
+ }
+
+ public void Attach(Channel channel)
+ {
+ _channel = channel;
+ _channel.Session = this;
+ }
+
+ public Method GetCommand(int id)
+ {
+ lock (_commands)
+ {
+ return _commands[id];
+ }
+ }
+
+ public bool Complete(int lower, int upper)
+ {
+ //avoid autoboxing
+ if (log.IsDebugEnabled())
+ {
+ log.Debug("{0} complete({1}, {2})", this, lower, upper);
+ }
+ lock (_commands)
+ {
+ int old = _maxComplete;
+ for (int id = Serial.Max(_maxComplete, lower); Serial.Le(id, upper); id++)
+ {
+ _commands.Remove(id);
+ }
+ if (Serial.Le(lower, _maxComplete + 1))
+ {
+ _maxComplete = Serial.Max(_maxComplete, upper);
+ }
+ log.Debug("{0} commands remaining: {1}", this, _commands);
+ Monitor.PulseAll(_commands);
+ return Serial.Gt(_maxComplete, old);
+ }
+ }
+
+ protected override void Invoke(Method m)
+ {
+ if (IsClosed)
+ {
+ List<ExecutionException> exc = GetExceptions();
+ if (exc.Count > 0)
+ {
+ throw new SessionException(exc);
+ }
+ else if (_close != null)
+ {
+ throw new ConnectionException(_close);
+ }
+ else
+ {
+ throw new SessionClosedException();
+ }
+ }
+
+ if (m.EncodedTrack == Frame.L4)
+ {
+ lock (_commands)
+ {
+ int next = _commandsOut++;
+ m.Id = next;
+ if (next == 0)
+ {
+ SessionCommandPoint(0, 0);
+ }
+ if (ENABLE_REPLAY)
+ {
+ _commands.Add(next, m);
+ }
+ if (_autoSync)
+ {
+ m.Sync = true;
+ }
+ _needSync = ! m.Sync;
+ _channel.Method(m);
+ if (_autoSync)
+ {
+ Sync();
+ }
+
+ // flush every 64K commands to avoid ambiguity on
+ // wraparound
+ if ((next%65536) == 0)
+ {
+ SessionFlush(Option.COMPLETED);
+ }
+ }
+ }
+ else
+ {
+ _channel.Method(m);
+ }
+ }
+
+ public void Sync()
+ {
+ Sync(_timeout);
+ }
+
+ public void Sync(long timeout)
+ {
+ log.Debug("{0} sync()", this);
+ lock (_commands)
+ {
+ int point = _commandsOut - 1;
+
+ if (_needSync && Serial.Lt(_maxComplete, point))
+ {
+ ExecutionSync(Option.SYNC);
+ }
+
+ DateTime start = DateTime.Now;
+ long elapsed = 0;
+
+ while (!IsClosed && elapsed < timeout && Serial.Lt(_maxComplete, point))
+ {
+ log.Debug("{0} waiting for[{1}]: {2}, {3}", this, point,
+ _maxComplete, _commands);
+ Monitor.Wait(_commands, (int) (timeout - elapsed));
+ elapsed = DateTime.Now.Subtract(start).Milliseconds;
+ }
+
+ if (Serial.Lt(_maxComplete, point))
+ {
+ if (IsClosed)
+ {
+ throw new SessionException(GetExceptions());
+ }
+ else
+ {
+ throw new Exception
+ (String.Format
+ ("timed out waiting for sync: complete = {0}, point = {1}", _maxComplete, point));
+ }
+ }
+ }
+ }
+
+
+ public void Result(int command, Struct result)
+ {
+ IFuture future;
+ lock (_results)
+ {
+ if (_results.ContainsKey(command))
+ {
+ future = _results[command];
+ _results.Remove(command);
+ }
+ else
+ {
+ throw new Exception(String.Format("Cannot ger result {0} for {1}", command, result));
+ }
+ }
+ future.Result = result;
+ }
+
+ public void AddException(ExecutionException exc)
+ {
+ lock (_exceptions)
+ {
+ _exceptions.Add(exc);
+ }
+ }
+
+ private ConnectionClose _close = null;
+
+ public void CloseCode(ConnectionClose close)
+ {
+ _close = close;
+ }
+
+ public List<ExecutionException> GetExceptions()
+ {
+ lock (_exceptions)
+ {
+ return new List<ExecutionException>(_exceptions);
+ }
+ }
+
+ public override IFuture Invoke(Method m, IFuture future)
+ {
+ lock (_commands)
+ {
+ future.Session = this;
+ int command = _commandsOut;
+ lock (_results)
+ {
+ _results.Add(command, future);
+ }
+ Invoke(m);
+ }
+ return future;
+ }
+
+
+ public void MessageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ byte[] body,
+ params Option[] options)
+ {
+ MemoryStream mbody = new MemoryStream();
+ mbody.Write(body,0, body.Length);
+ MessageTransfer(destination, acceptMode, acquireMode, header,
+ mbody, options);
+ }
+
+ public void MessageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ String body,
+ params Option[] options)
+ {
+ MessageTransfer(destination, acceptMode, acquireMode, header,
+ new MemoryStream(Convert.ToByte(body)), options);
+ }
+
+ public void Close()
+ {
+ SessionRequestTimeout(0);
+ SessionDetach(_name);
+ lock (_commands)
+ {
+ DateTime start = DateTime.Now;
+ long elapsed = 0;
+
+ while (!IsClosed && elapsed < _timeout)
+ {
+ Monitor.Wait(_commands, (int) (_timeout - elapsed));
+ elapsed = DateTime.Now.Subtract(start).Milliseconds;
+ }
+ }
+ }
+
+ public void Exception(Exception t)
+ {
+ log.Error(t, "Caught exception");
+ }
+
+ public void Closed()
+ {
+ IsClosed = true;
+ lock (_commands)
+ {
+ Monitor.PulseAll(_commands);
+ }
+ lock (_results)
+ {
+ foreach (IFuture result in _results.Values)
+ {
+ lock (result)
+ {
+ Monitor.PulseAll(result);
+ }
+ }
+ }
+ _channel.Session = null;
+ _channel = null;
+ }
+
+ public override String ToString()
+ {
+ return String.Format("session:{0}", _name);
+ }
+ }
+}