/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using org.apache.qpid.transport.util; using Frame = org.apache.qpid.transport.network.Frame; using Logger = org.apache.qpid.transport.util.Logger; namespace org.apache.qpid.transport { /// /// Session /// /// public class Session : Invoker, ISession { private static readonly Logger log = Logger.Get(typeof (Session)); private static readonly bool ENABLE_REPLAY; static Session() { const string enableReplay = "enable_command_replay"; try { String var = Environment.GetEnvironmentVariable(enableReplay); if (var != null) { ENABLE_REPLAY = bool.Parse(var); } } catch (Exception) { ENABLE_REPLAY = false; } } private readonly byte[] _name; private const long _timeout = 600000; private bool _autoSync = false; // channel may be null private Channel _channel; // incoming command count private int _commandsIn = 0; // completed incoming commands private readonly Object _processedLock = new Object(); private RangeSet _processed = new RangeSet(); private int _maxProcessed = - 1; private int _syncPoint = -1; // outgoing command count private int _commandsOut = 0; private readonly Dictionary _commands = new Dictionary(); private int _maxComplete = - 1; private bool _needSync = false; private bool _closed; private readonly Dictionary _results = new Dictionary(); private readonly List _exceptions = new List(); public bool IsClosed { get { lock (this) { return _closed; } } set { lock (this) { _closed = value; } } } public string Name { get { ASCIIEncoding enc = new ASCIIEncoding(); return enc.GetString(_name); } } public Session(byte[] name) { _name = name; } public byte[] GetName() { return _name; } public void SetAutoSync(bool value) { lock (_commands) { _autoSync = value; } } public Dictionary GetOutstandingCommands() { return _commands; } public int GetCommandsOut() { return _commandsOut; } public int CommandsIn { get { return _commandsIn; } set { _commandsIn = value; } } public int NextCommandId() { return _commandsIn++; } public void Identify(Method cmd) { int id = NextCommandId(); cmd.Id = id; if (log.IsDebugEnabled()) { log.Debug("ID: [{0}] %{1}", _channel, id); } //if ((id % 65536) == 0) if ((id & 0xff) == 0) { FlushProcessed(Option.TIMELY_REPLY); } } public void Processed(Method command) { Processed(command.Id); } public void Processed(int command) { Processed(new Range(command, command)); } public void Processed(int lower, int upper) { Processed(new Range(lower, upper)); } public void Processed(Range range) { log.Debug("{0} processed({1})", this, range); bool flush; lock (_processedLock) { _processed.Add(range); Range first = _processed.GetFirst(); int lower = first.Lower; int upper = first.Upper; int old = _maxProcessed; if (Serial.Le(lower, _maxProcessed + 1)) { _maxProcessed = Serial.Max(_maxProcessed, upper); } flush = Serial.Lt(old, _syncPoint) && Serial.Ge(_maxProcessed, _syncPoint); _syncPoint = _maxProcessed; } if (flush) { FlushProcessed(); } } public void FlushProcessed(params Option[] options) { RangeSet copy; lock (_processedLock) { copy = _processed.Copy(); } SessionCompleted(copy, options); } public void KnownComplete(RangeSet kc) { lock (_processedLock) { RangeSet newProcessed = new RangeSet(); foreach (Range pr in _processed) { foreach (Range kr in kc) { foreach (Range r in pr.Subtract(kr)) { newProcessed.Add(r); } } } _processed = newProcessed; } } public void SyncPoint() { int id = CommandsIn - 1; log.Debug("{0} synced to {1}", this, id); bool flush; lock (_processedLock) { _syncPoint = id; flush = Serial.Ge(_maxProcessed, _syncPoint); } if (flush) { FlushProcessed(); } } public void Attach(Channel channel) { _channel = channel; _channel.Session = this; } public Method GetCommand(int id) { lock (_commands) { return _commands[id]; } } public bool Complete(int lower, int upper) { //avoid autoboxing if (log.IsDebugEnabled()) { log.Debug("{0} complete({1}, {2})", this, lower, upper); } lock (_commands) { int old = _maxComplete; for (int id = Serial.Max(_maxComplete, lower); Serial.Le(id, upper); id++) { _commands.Remove(id); } if (Serial.Le(lower, _maxComplete + 1)) { _maxComplete = Serial.Max(_maxComplete, upper); } log.Debug("{0} commands remaining: {1}", this, _commands); Monitor.PulseAll(_commands); return Serial.Gt(_maxComplete, old); } } protected override void Invoke(Method m) { if (IsClosed) { List exc = GetExceptions(); if (exc.Count > 0) { throw new SessionException(exc); } else if (_close != null) { throw new ConnectionException(_close); } else { throw new SessionClosedException(); } } if (m.EncodedTrack == Frame.L4) { lock (_commands) { int next = _commandsOut++; m.Id = next; if (next == 0) { SessionCommandPoint(0, 0); } if (ENABLE_REPLAY) { _commands.Add(next, m); } if (_autoSync) { m.Sync = true; } _needSync = ! m.Sync; _channel.Method(m); if (_autoSync) { Sync(); } // flush every 64K commands to avoid ambiguity on // wraparound if ((next%65536) == 0) { SessionFlush(Option.COMPLETED); } } } else { _channel.Method(m); } } public void Sync() { Sync(_timeout); } public void Sync(long timeout) { log.Debug("{0} sync()", this); lock (_commands) { int point = _commandsOut - 1; if (_needSync && Serial.Lt(_maxComplete, point)) { ExecutionSync(Option.SYNC); } DateTime start = DateTime.Now; long elapsed = 0; while (!IsClosed && elapsed < timeout && Serial.Lt(_maxComplete, point)) { log.Debug("{0} waiting for[{1}]: {2}, {3}", this, point, _maxComplete, _commands); Monitor.Wait(_commands, (int) (timeout - elapsed)); elapsed = DateTime.Now.Subtract(start).Milliseconds; } if (Serial.Lt(_maxComplete, point)) { if (IsClosed) { throw new SessionException(GetExceptions()); } else { throw new Exception (String.Format ("timed out waiting for sync: complete = {0}, point = {1}", _maxComplete, point)); } } } } public void Result(int command, Struct result) { IFuture future; lock (_results) { if (_results.ContainsKey(command)) { future = _results[command]; _results.Remove(command); } else { throw new Exception(String.Format("Cannot ger result {0} for {1}", command, result)); } } future.Result = result; } public void AddException(ExecutionException exc) { lock (_exceptions) { _exceptions.Add(exc); } } private ConnectionClose _close = null; public void CloseCode(ConnectionClose close) { _close = close; } public List GetExceptions() { lock (_exceptions) { return new List(_exceptions); } } public override IFuture Invoke(Method m, IFuture future) { lock (_commands) { future.Session = this; int command = _commandsOut; lock (_results) { _results.Add(command, future); } Invoke(m); } return future; } public void MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, byte[] body, params Option[] options) { MemoryStream mbody = new MemoryStream(); mbody.Write(body,0, body.Length); MessageTransfer(destination, acceptMode, acquireMode, header, mbody, options); } public void MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, String body, params Option[] options) { MessageTransfer(destination, acceptMode, acquireMode, header, new MemoryStream(Convert.ToByte(body)), options); } public void Close() { SessionRequestTimeout(0); SessionDetach(_name); lock (_commands) { DateTime start = DateTime.Now; long elapsed = 0; while (!IsClosed && elapsed < _timeout) { Monitor.Wait(_commands, (int) (_timeout - elapsed)); elapsed = DateTime.Now.Subtract(start).Milliseconds; } } } public void Exception(Exception t) { log.Error(t, "Caught exception"); } public void Closed() { IsClosed = true; lock (_commands) { Monitor.PulseAll(_commands); } lock (_results) { foreach (IFuture result in _results.Values) { lock (result) { Monitor.PulseAll(result); } } } _channel.Session = null; _channel = null; } public override String ToString() { return String.Format("session:{0}", _name); } } }