summaryrefslogtreecommitdiff
path: root/trunk/qpid/dotnet/client-010/client/transport/Session.cs
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/dotnet/client-010/client/transport/Session.cs')
-rw-r--r--trunk/qpid/dotnet/client-010/client/transport/Session.cs522
1 files changed, 0 insertions, 522 deletions
diff --git a/trunk/qpid/dotnet/client-010/client/transport/Session.cs b/trunk/qpid/dotnet/client-010/client/transport/Session.cs
deleted file mode 100644
index 7b4aff9811..0000000000
--- a/trunk/qpid/dotnet/client-010/client/transport/Session.cs
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Text;
-using System.Threading;
-using org.apache.qpid.transport.util;
-using Frame = org.apache.qpid.transport.network.Frame;
-using Logger = org.apache.qpid.transport.util.Logger;
-
-
-namespace org.apache.qpid.transport
-{
- /// <summary>
- /// Session
- ///
- /// </summary>
- public class Session : Invoker, ISession
- {
- private static readonly Logger log = Logger.Get(typeof (Session));
- private static readonly bool ENABLE_REPLAY;
-
- static Session()
- {
- const string enableReplay = "enable_command_replay";
- try
- {
- String var = Environment.GetEnvironmentVariable(enableReplay);
- if (var != null)
- {
- ENABLE_REPLAY = bool.Parse(var);
- }
- }
- catch (Exception)
- {
- ENABLE_REPLAY = false;
- }
- }
-
- private readonly byte[] _name;
- private const long _timeout = 600000;
- private bool _autoSync = false;
-
- // channel may be null
- private Channel _channel;
-
- // incoming command count
- private int _commandsIn = 0;
- // completed incoming commands
- private readonly Object _processedLock = new Object();
- private RangeSet _processed = new RangeSet();
- private int _maxProcessed = - 1;
- private int _syncPoint = -1;
-
- // outgoing command count
- private int _commandsOut = 0;
- private readonly Dictionary<int, Method> _commands = new Dictionary<int, Method>();
- private int _maxComplete = - 1;
- private bool _needSync = false;
- private bool _closed;
- private readonly Dictionary<int, IFuture> _results = new Dictionary<int, IFuture>();
- private readonly List<ExecutionException> _exceptions = new List<ExecutionException>();
-
-
- public bool IsClosed
- {
- get
- {
- lock (this)
- {
- return _closed;
- }
- }
- set
- {
- lock (this)
- {
- _closed = value;
- }
- }
- }
-
- public string Name
- {
- get
- {
- ASCIIEncoding enc = new ASCIIEncoding();
- return enc.GetString(_name);
- }
- }
-
- public Session(byte[] name)
- {
- _name = name;
- }
-
- public byte[] GetName()
- {
- return _name;
- }
-
- public void SetAutoSync(bool value)
- {
- lock (_commands)
- {
- _autoSync = value;
- }
- }
-
- public Dictionary<int, Method> GetOutstandingCommands()
- {
- return _commands;
- }
-
- public int GetCommandsOut()
- {
- return _commandsOut;
- }
-
- public int CommandsIn
- {
- get { return _commandsIn; }
- set { _commandsIn = value; }
- }
-
- public int NextCommandId()
- {
- return _commandsIn++;
- }
-
- public void Identify(Method cmd)
- {
- int id = NextCommandId();
- cmd.Id = id;
-
- if (log.IsDebugEnabled())
- {
- log.Debug("ID: [{0}] %{1}", _channel, id);
- }
-
- //if ((id % 65536) == 0)
- if ((id & 0xff) == 0)
- {
- FlushProcessed(Option.TIMELY_REPLY);
- }
- }
-
- public void Processed(Method command)
- {
- Processed(command.Id);
- }
-
- public void Processed(int command)
- {
- Processed(new Range(command, command));
- }
-
- public void Processed(int lower, int upper)
- {
- Processed(new Range(lower, upper));
- }
-
- public void Processed(Range range)
- {
- log.Debug("{0} processed({1})", this, range);
-
- bool flush;
- lock (_processedLock)
- {
- _processed.Add(range);
- Range first = _processed.GetFirst();
- int lower = first.Lower;
- int upper = first.Upper;
- int old = _maxProcessed;
- if (Serial.Le(lower, _maxProcessed + 1))
- {
- _maxProcessed = Serial.Max(_maxProcessed, upper);
- }
- flush = Serial.Lt(old, _syncPoint) && Serial.Ge(_maxProcessed, _syncPoint);
- _syncPoint = _maxProcessed;
- }
- if (flush)
- {
- FlushProcessed();
- }
- }
-
- public void FlushProcessed(params Option[] options)
- {
- RangeSet copy;
- lock (_processedLock)
- {
- copy = _processed.Copy();
- }
- SessionCompleted(copy, options);
- }
-
- public void KnownComplete(RangeSet kc)
- {
- lock (_processedLock)
- {
- RangeSet newProcessed = new RangeSet();
- foreach (Range pr in _processed)
- {
- foreach (Range kr in kc)
- {
- foreach (Range r in pr.Subtract(kr))
- {
- newProcessed.Add(r);
- }
- }
- }
- _processed = newProcessed;
- }
- }
-
- public void SyncPoint()
- {
- int id = CommandsIn - 1;
- log.Debug("{0} synced to {1}", this, id);
- bool flush;
- lock (_processedLock)
- {
- _syncPoint = id;
- flush = Serial.Ge(_maxProcessed, _syncPoint);
- }
- if (flush)
- {
- FlushProcessed();
- }
- }
-
- public void Attach(Channel channel)
- {
- _channel = channel;
- _channel.Session = this;
- }
-
- public Method GetCommand(int id)
- {
- lock (_commands)
- {
- return _commands[id];
- }
- }
-
- public bool Complete(int lower, int upper)
- {
- //avoid autoboxing
- if (log.IsDebugEnabled())
- {
- log.Debug("{0} complete({1}, {2})", this, lower, upper);
- }
- lock (_commands)
- {
- int old = _maxComplete;
- for (int id = Serial.Max(_maxComplete, lower); Serial.Le(id, upper); id++)
- {
- _commands.Remove(id);
- }
- if (Serial.Le(lower, _maxComplete + 1))
- {
- _maxComplete = Serial.Max(_maxComplete, upper);
- }
- log.Debug("{0} commands remaining: {1}", this, _commands);
- Monitor.PulseAll(_commands);
- return Serial.Gt(_maxComplete, old);
- }
- }
-
- protected override void Invoke(Method m)
- {
- if (IsClosed)
- {
- List<ExecutionException> exc = GetExceptions();
- if (exc.Count > 0)
- {
- throw new SessionException(exc);
- }
- else if (_close != null)
- {
- throw new ConnectionException(_close);
- }
- else
- {
- throw new SessionClosedException();
- }
- }
-
- if (m.EncodedTrack == Frame.L4)
- {
- lock (_commands)
- {
- int next = _commandsOut++;
- m.Id = next;
- if (next == 0)
- {
- SessionCommandPoint(0, 0);
- }
- if (ENABLE_REPLAY)
- {
- _commands.Add(next, m);
- }
- if (_autoSync)
- {
- m.Sync = true;
- }
- _needSync = ! m.Sync;
- _channel.Method(m);
- if (_autoSync)
- {
- Sync();
- }
-
- // flush every 64K commands to avoid ambiguity on
- // wraparound
- if ((next%65536) == 0)
- {
- SessionFlush(Option.COMPLETED);
- }
- }
- }
- else
- {
- _channel.Method(m);
- }
- }
-
- public void Sync()
- {
- Sync(_timeout);
- }
-
- public void Sync(long timeout)
- {
- log.Debug("{0} sync()", this);
- lock (_commands)
- {
- int point = _commandsOut - 1;
-
- if (_needSync && Serial.Lt(_maxComplete, point))
- {
- ExecutionSync(Option.SYNC);
- }
-
- DateTime start = DateTime.Now;
- long elapsed = 0;
-
- while (!IsClosed && elapsed < timeout && Serial.Lt(_maxComplete, point))
- {
- log.Debug("{0} waiting for[{1}]: {2}, {3}", this, point,
- _maxComplete, _commands);
- Monitor.Wait(_commands, (int) (timeout - elapsed));
- elapsed = DateTime.Now.Subtract(start).Milliseconds;
- }
-
- if (Serial.Lt(_maxComplete, point))
- {
- if (IsClosed)
- {
- throw new SessionException(GetExceptions());
- }
- else
- {
- throw new Exception
- (String.Format
- ("timed out waiting for sync: complete = {0}, point = {1}", _maxComplete, point));
- }
- }
- }
- }
-
-
- public void Result(int command, Struct result)
- {
- IFuture future;
- lock (_results)
- {
- if (_results.ContainsKey(command))
- {
- future = _results[command];
- _results.Remove(command);
- }
- else
- {
- throw new Exception(String.Format("Cannot ger result {0} for {1}", command, result));
- }
- }
- future.Result = result;
- }
-
- public void AddException(ExecutionException exc)
- {
- lock (_exceptions)
- {
- _exceptions.Add(exc);
- }
- }
-
- private ConnectionClose _close = null;
-
- public void CloseCode(ConnectionClose close)
- {
- _close = close;
- }
-
- public List<ExecutionException> GetExceptions()
- {
- lock (_exceptions)
- {
- return new List<ExecutionException>(_exceptions);
- }
- }
-
- public override IFuture Invoke(Method m, IFuture future)
- {
- lock (_commands)
- {
- future.Session = this;
- int command = _commandsOut;
- lock (_results)
- {
- _results.Add(command, future);
- }
- Invoke(m);
- }
- return future;
- }
-
-
- public void MessageTransfer(String destination,
- MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode,
- Header header,
- byte[] body,
- params Option[] options)
- {
- MemoryStream mbody = new MemoryStream();
- mbody.Write(body,0, body.Length);
- MessageTransfer(destination, acceptMode, acquireMode, header,
- mbody, options);
- }
-
- public void MessageTransfer(String destination,
- MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode,
- Header header,
- String body,
- params Option[] options)
- {
- MessageTransfer(destination, acceptMode, acquireMode, header,
- new MemoryStream(Convert.ToByte(body)), options);
- }
-
- public void Close()
- {
- SessionRequestTimeout(0);
- SessionDetach(_name);
- lock (_commands)
- {
- DateTime start = DateTime.Now;
- long elapsed = 0;
-
- while (!IsClosed && elapsed < _timeout)
- {
- Monitor.Wait(_commands, (int) (_timeout - elapsed));
- elapsed = DateTime.Now.Subtract(start).Milliseconds;
- }
- }
- }
-
- public void Exception(Exception t)
- {
- log.Error(t, "Caught exception");
- }
-
- public void Closed()
- {
- IsClosed = true;
- lock (_commands)
- {
- Monitor.PulseAll(_commands);
- }
- lock (_results)
- {
- foreach (IFuture result in _results.Values)
- {
- lock (result)
- {
- Monitor.PulseAll(result);
- }
- }
- }
- _channel.Session = null;
- _channel = null;
- }
-
- public override String ToString()
- {
- return String.Format("session:{0}", _name);
- }
- }
-}