/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using org.apache.qpid.transport.util; using Frame = org.apache.qpid.transport.network.Frame; using Logger = org.apache.qpid.transport.util.Logger; namespace org.apache.qpid.transport { /// /// Session /// /// public class Session : Invoker { private static readonly Logger log = Logger.get(typeof (Session)); private static readonly bool ENABLE_REPLAY; static Session() { const string enableReplay = "enable_command_replay"; try { String var = Environment.GetEnvironmentVariable(enableReplay); if (var != null) { ENABLE_REPLAY = bool.Parse(var); } } catch (Exception) { ENABLE_REPLAY = false; } } private readonly byte[] _name; private const long _timeout = 600000; private bool _autoSync = false; // channel may be null private Channel _channel; // incoming command count private int _commandsIn = 0; // completed incoming commands private readonly Object _processedLock = new Object(); private RangeSet _processed = new RangeSet(); private int _maxProcessed = - 1; private int _syncPoint = -1; // outgoing command count private int _commandsOut = 0; private readonly Dictionary _commands = new Dictionary(); private int _maxComplete = - 1; private bool _needSync = false; private bool _closed; private readonly Dictionary _results = new Dictionary(); private readonly List _exceptions = new List(); public bool Closed { get { lock (this) { return _closed; } } set { lock (this) { _closed = value; } } } public string Name { get { ASCIIEncoding enc = new ASCIIEncoding(); return enc.GetString(_name); } } public Session(byte[] name) { _name = name; } public byte[] getName() { return _name; } public void setAutoSync(bool value) { lock (_commands) { _autoSync = value; } } public Dictionary getOutstandingCommands() { return _commands; } public int getCommandsOut() { return _commandsOut; } public int CommandsIn { get { return _commandsIn; } set { _commandsIn = value; } } public int nextCommandId() { return _commandsIn++; } public void identify(Method cmd) { int id = nextCommandId(); cmd.Id = id; if (log.isDebugEnabled()) { log.debug("ID: [{0}] %{1}", _channel, id); } //if ((id % 65536) == 0) if ((id & 0xff) == 0) { flushProcessed(Option.TIMELY_REPLY); } } public void processed(Method command) { processed(command.Id); } public void processed(int command) { processed(new Range(command, command)); } public void processed(int lower, int upper) { processed(new Range(lower, upper)); } public void processed(Range range) { log.debug("{0} processed({1})", this, range); bool flush; lock (_processedLock) { _processed.add(range); Range first = _processed.getFirst(); int lower = first.Lower; int upper = first.Upper; int old = _maxProcessed; if (Serial.le(lower, _maxProcessed + 1)) { _maxProcessed = Serial.max(_maxProcessed, upper); } flush = Serial.lt(old, _syncPoint) && Serial.ge(_maxProcessed, _syncPoint); _syncPoint = _maxProcessed; } if (flush) { flushProcessed(); } } public void flushProcessed(params Option[] options) { RangeSet copy; lock (_processedLock) { copy = _processed.copy(); } sessionCompleted(copy, options); } public void knownComplete(RangeSet kc) { lock (_processedLock) { RangeSet newProcessed = new RangeSet(); foreach (Range pr in _processed) { foreach (Range kr in kc) { foreach (Range r in pr.subtract(kr)) { newProcessed.add(r); } } } _processed = newProcessed; } } public void syncPoint() { int id = CommandsIn - 1; log.debug("{0} synced to {1}", this, id); bool flush; lock (_processedLock) { _syncPoint = id; flush = Serial.ge(_maxProcessed, _syncPoint); } if (flush) { flushProcessed(); } } public void attach(Channel channel) { _channel = channel; _channel.Session = this; } public Method getCommand(int id) { lock (_commands) { return _commands[id]; } } public bool complete(int lower, int upper) { //avoid autoboxing if (log.isDebugEnabled()) { log.debug("{0} complete({1}, {2})", this, lower, upper); } lock (_commands) { int old = _maxComplete; for (int id = Serial.max(_maxComplete, lower); Serial.le(id, upper); id++) { _commands.Remove(id); } if (Serial.le(lower, _maxComplete + 1)) { _maxComplete = Serial.max(_maxComplete, upper); } log.debug("{0} commands remaining: {1}", this, _commands); Monitor.PulseAll(_commands); return Serial.gt(_maxComplete, old); } } protected override void invoke(Method m) { if (Closed) { List exc = getExceptions(); if (exc.Count > 0) { throw new SessionException(exc); } else if (_close != null) { throw new ConnectionException(_close); } else { throw new SessionClosedException(); } } if (m.EncodedTrack == Frame.L4) { lock (_commands) { int next = _commandsOut++; m.Id = next; if (next == 0) { sessionCommandPoint(0, 0); } if (ENABLE_REPLAY) { _commands.Add(next, m); } if (_autoSync) { m.Sync = true; } _needSync = ! m.Sync; _channel.method(m); if (_autoSync) { sync(); } // flush every 64K commands to avoid ambiguity on // wraparound if ((next%65536) == 0) { sessionFlush(Option.COMPLETED); } } } else { _channel.method(m); } } public void sync() { sync(_timeout); } public void sync(long timeout) { log.debug("{0} sync()", this); lock (_commands) { int point = _commandsOut - 1; if (_needSync && Serial.lt(_maxComplete, point)) { executionSync(Option.SYNC); } DateTime start = DateTime.Now; long elapsed = 0; while (! Closed && elapsed < timeout && Serial.lt(_maxComplete, point)) { log.debug("{0} waiting for[{1}]: {2}, {3}", this, point, _maxComplete, _commands); Monitor.Wait(_commands, (int) (timeout - elapsed)); elapsed = DateTime.Now.Subtract(start).Milliseconds; } if (Serial.lt(_maxComplete, point)) { if (Closed) { throw new SessionException(getExceptions()); } else { throw new Exception (String.Format ("timed out waiting for sync: complete = {0}, point = {1}", _maxComplete, point)); } } } } public void result(int command, Struct result) { Future future; lock (_results) { if (_results.ContainsKey(command)) { future = _results[command]; _results.Remove(command); } else { throw new Exception(String.Format("Cannot ger result {0} for {1}", command, result)); } } future.Result = result; } public void addException(ExecutionException exc) { lock (_exceptions) { _exceptions.Add(exc); } } private ConnectionClose _close = null; public void closeCode(ConnectionClose close) { _close = close; } public List getExceptions() { lock (_exceptions) { return new List(_exceptions); } } public override Future invoke(Method m, Future future) { lock (_commands) { future.Session = this; int command = _commandsOut; lock (_results) { _results.Add(command, future); } invoke(m); } return future; } public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, byte[] body, params Option[] options) { MemoryStream mbody = new MemoryStream(); mbody.Write(body,0, body.Length); messageTransfer(destination, acceptMode, acquireMode, header, mbody, options); } public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, String body, params Option[] options) { messageTransfer(destination, acceptMode, acquireMode, header, new MemoryStream(Convert.ToByte(body)), options); } public void close() { sessionRequestTimeout(0); sessionDetach(_name); lock (_commands) { DateTime start = DateTime.Now; long elapsed = 0; while (! Closed && elapsed < _timeout) { Monitor.Wait(_commands, (int) (_timeout - elapsed)); elapsed = DateTime.Now.Subtract(start).Milliseconds; } } } public void exception(Exception t) { log.error(t, "Caught exception"); } public void closed() { Closed = true; lock (_commands) { Monitor.PulseAll(_commands); } lock (_results) { foreach (Future result in _results.Values) { lock (result) { Monitor.PulseAll(result); } } } _channel.Session = null; _channel = null; } public String toString() { return String.Format("session:{0}", _name); } } }