/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.transport; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.transport.network.Frame; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; import static org.apache.qpid.transport.Option.COMPLETED; import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.TIMELY_REPLY; import static org.apache.qpid.transport.Session.State.CLOSED; import static org.apache.qpid.transport.Session.State.CLOSING; import static org.apache.qpid.transport.Session.State.DETACHED; import static org.apache.qpid.transport.Session.State.NEW; import static org.apache.qpid.transport.Session.State.OPEN; import static org.apache.qpid.transport.Session.State.RESUMING; import static org.apache.qpid.util.Serial.ge; import static org.apache.qpid.util.Serial.gt; import static org.apache.qpid.util.Serial.le; import static org.apache.qpid.util.Serial.lt; import static org.apache.qpid.util.Serial.max; import static org.apache.qpid.util.Strings.toUTF8; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * Session * * @author Rafael H. Schloming */ public class Session extends SessionInvoker { private static final Logger log = Logger.get(Session.class); public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED } static class DefaultSessionListener implements SessionListener { public void opened(Session ssn) {} public void resumed(Session ssn) {} public void message(Session ssn, MessageTransfer xfr) { log.info("message: %s", xfr); } public void exception(Session ssn, SessionException exc) { log.error(exc, "session exception"); } public void closed(Session ssn) {} } public static final int UNLIMITED_CREDIT = 0xFFFFFFFF; private Connection connection; private Binary name; private long expiry; private boolean closing; private int channel; private SessionDelegate delegate; private SessionListener listener = new DefaultSessionListener(); private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); private final long blockedSendTimeout = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE, ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE); private long blockedSendReportingPeriod = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); private boolean autoSync = false; private boolean incomingInit; // incoming command count private int commandsIn; // completed incoming commands private final Object processedLock = new Object(); private RangeSet processed; private int maxProcessed; private int syncPoint; // outgoing command count private int commandsOut = 0; private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024); private Map commands = new HashMap(); private final Object commandsLock = new Object(); private int commandBytes = 0; private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024); private int maxComplete = commandsOut - 1; private boolean needSync = false; private State state = NEW; // transfer flow control private volatile boolean flowControl = false; private Semaphore credit = new Semaphore(0); private Thread resumer = null; private boolean transacted = false; private SessionDetachCode detachCode; private final Object stateLock = new Object(); private final AtomicBoolean _failoverRequired = new AtomicBoolean(false); protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); } protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry) { this.connection = connection; this.delegate = delegate; this.name = name; this.expiry = expiry; this.closing = false; initReceiver(); } public Connection getConnection() { return connection; } public Binary getName() { return name; } void setExpiry(long expiry) { this.expiry = expiry; } protected void setClose(boolean close) { this.closing = close; } public int getChannel() { return channel; } void setChannel(int channel) { this.channel = channel; } public void setSessionListener(SessionListener listener) { if (listener == null) { this.listener = new DefaultSessionListener(); } else { this.listener = listener; } } public SessionListener getSessionListener() { return listener; } public void setAutoSync(boolean value) { synchronized (commandsLock) { this.autoSync = value; } } protected void setState(State state) { synchronized (commandsLock) { this.state = state; commandsLock.notifyAll(); } } protected State getState() { return this.state; } void setFlowControl(boolean value) { flowControl = value; } void addCredit(int value) { credit.release(value); } void drainCredit() { credit.drainPermits(); } void acquireCredit() { if (flowControl) { try { long wait = blockedSendTimeout > blockedSendReportingPeriod ? blockedSendReportingPeriod : blockedSendTimeout; long totalWait = 1L; while(totalWait <= blockedSendTimeout && !credit.tryAcquire(wait, TimeUnit.MILLISECONDS)) { totalWait+=wait; log.warn("Message send delayed by " + (totalWait)/1000 + "s due to broker enforced flow control"); } if(totalWait > blockedSendTimeout) { log.error("Message send failed due to timeout waiting on broker enforced flow control"); throw new SessionException ("timed out waiting for message credit"); } } catch (InterruptedException e) { throw new SessionException ("interrupted while waiting for credit", null, e); } } } private void initReceiver() { synchronized (processedLock) { incomingInit = false; processed = RangeSetFactory.createRangeSet(); } } void attach() { initReceiver(); sessionAttach(name.getBytes()); sessionRequestTimeout(0);//use expiry here only if/when session resume is supported } void resume() { _failoverRequired.set(false); synchronized (commandsLock) { attach(); for (int i = maxComplete + 1; lt(i, commandsOut); i++) { Method m = getCommand(i); if (m == null) { m = new ExecutionSync(); m.setId(i); } else if (m instanceof MessageTransfer) { MessageTransfer xfr = (MessageTransfer)m; Header header = xfr.getHeader(); if (header != null) { if (header.getDeliveryProperties() != null) { header.getDeliveryProperties().setRedelivered(true); } else { DeliveryProperties deliveryProps = new DeliveryProperties(); deliveryProps.setRedelivered(true); xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(), header.getNonStandardProperties())); } } else { DeliveryProperties deliveryProps = new DeliveryProperties(); deliveryProps.setRedelivered(true); xfr.setHeader(new Header(deliveryProps, null, null)); } } sessionCommandPoint(m.getId(), 0); send(m); } sessionCommandPoint(commandsOut, 0); sessionFlush(COMPLETED); resumer = Thread.currentThread(); state = RESUMING; if(isTransacted()) { txSelect(); } listener.resumed(this); resumer = null; } } private Method getCommand(int i) { return commands.get(i); } private void setCommand(int commandId, Method command) { commands.put(commandId, command); } private Method removeCommand(int id) { return commands.remove(id); } void dump() { synchronized (commandsLock) { TreeMap ordered = new TreeMap(commands); for (Method m : ordered.values()) { if (m != null) { log.debug("%s", m); } } } } final void commandPoint(int id) { synchronized (processedLock) { this.commandsIn = id; if (!incomingInit) { incomingInit = true; maxProcessed = commandsIn - 1; syncPoint = maxProcessed; } } } public int getCommandsOut() { return commandsOut; } public int getCommandsIn() { return commandsIn; } public int nextCommandId() { return commandsIn++; } final void identify(Method cmd) { if (!incomingInit) { throw new IllegalStateException(); } int id = nextCommandId(); cmd.setId(id); if(log.isDebugEnabled()) { log.debug("ID: [%s] %s", this.channel, id); } if ((id & 0xff) == 0) { flushProcessed(TIMELY_REPLY); } } public void processed(Method command) { processed(command.getId()); } public void processed(int command) { processed(command, command); } public void processed(Range range) { processed(range.getLower(), range.getUpper()); } public void processed(int lower, int upper) { if(log.isDebugEnabled()) { log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, syncPoint, maxProcessed); } boolean flush; synchronized (processedLock) { if(log.isDebugEnabled()) { log.debug("%s", processed); } if (ge(upper, commandsIn)) { throw new IllegalArgumentException ("range exceeds max received command-id: " + Range.newInstance(lower, upper)); } processed.add(lower, upper); Range first = processed.getFirst(); int flower = first.getLower(); int fupper = first.getUpper(); int old = maxProcessed; if (le(flower, maxProcessed + 1)) { maxProcessed = max(maxProcessed, fupper); } boolean synced = ge(maxProcessed, syncPoint); flush = lt(old, syncPoint) && synced; if (synced) { syncPoint = maxProcessed; } } if (flush) { flushProcessed(); } } void flushExpected() { RangeSet rs = RangeSetFactory.createRangeSet(); synchronized (processedLock) { if (incomingInit) { rs.add(commandsIn); } } sessionExpected(rs, null); } public void flushProcessed(Option ... options) { RangeSet copy; synchronized (processedLock) { copy = processed.copy(); } synchronized (commandsLock) { if (state == DETACHED || state == CLOSING || state == CLOSED) { return; } if (copy.size() > 0) { sessionCompleted(copy, options); } } } void knownComplete(RangeSet kc) { if (kc.size() > 0) { synchronized (processedLock) { processed.subtract(kc) ; } } } void syncPoint() { int id = getCommandsIn() - 1; log.debug("%s synced to %d", this, id); boolean flush; synchronized (processedLock) { syncPoint = id; flush = ge(maxProcessed, syncPoint); } if (flush) { flushProcessed(); } } protected boolean complete(int lower, int upper) { //avoid autoboxing if(log.isDebugEnabled()) { log.debug("%s complete(%d, %d)", this, lower, upper); } synchronized (commandsLock) { int old = maxComplete; for (int id = max(maxComplete, lower); le(id, upper); id++) { Method m = removeCommand(id); if (m != null) { commandBytes -= m.getBodySize(); m.complete(); } } if (le(lower, maxComplete + 1)) { maxComplete = max(maxComplete, upper); } if(log.isDebugEnabled()) { log.debug("%s commands remaining: %s", this, commandsOut - maxComplete); } commandsLock.notifyAll(); return gt(maxComplete, old); } } void received(Method m) { m.delegate(this, delegate); } private void send(Method m) { m.setChannel(channel); connection.send(m); if (!m.isBatch()) { connection.flush(); } } protected boolean isFull(int id) { return isCommandsFull(id) || isBytesFull(); } protected boolean isBytesFull() { return commandBytes >= byteLimit; } protected boolean isCommandsFull(int id) { return id - maxComplete >= commandLimit; } public void invoke(Method m) { invoke(m,(Runnable)null); } public void invoke(Method m, Runnable postIdSettingAction) { if (m.getEncodedTrack() == Frame.L4) { if (m.hasPayload()) { acquireCredit(); } synchronized (commandsLock) { if (state == DETACHED && m.isUnreliable()) { Thread current = Thread.currentThread(); if (!current.equals(resumer)) { return; } } if (state != OPEN && state != CLOSED && state != CLOSING) { Thread current = Thread.currentThread(); if (!current.equals(resumer) ) { Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && (state != OPEN && state != CLOSED)) { checkFailoverRequired("Command was interrupted because of failover, before being sent"); w.await(); } } } switch (state) { case OPEN: break; case RESUMING: Thread current = Thread.currentThread(); if (!current.equals(resumer)) { throw new SessionException ("timed out waiting for resume to finish"); } break; case CLOSING: case CLOSED: ExecutionException exc = getException(); if (exc != null) { throw new SessionException(exc); } else { throw new SessionClosedException(); } default: throw new SessionException (String.format ("timed out waiting for session to become open " + "(state=%s)", state)); } int next; next = commandsOut++; m.setId(next); if(postIdSettingAction != null) { postIdSettingAction.run(); } if (isFull(next)) { Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && isFull(next) && state != CLOSED) { if (state == OPEN || state == RESUMING) { try { sessionFlush(COMPLETED); } catch (SenderException e) { if (!closing) { // if expiry is > 0 then this will // happen again on resume log.error(e, "error sending flush (full replay buffer)"); } else { e.rethrow(); } } } checkFailoverRequired("Command was interrupted because of failover, before being sent"); w.await(); } } if (state == CLOSED) { ExecutionException exc = getException(); if (exc != null) { throw new SessionException(exc); } else { throw new SessionClosedException(); } } if (isFull(next)) { throw new SessionException("timed out waiting for completion"); } if (next == 0) { sessionCommandPoint(0, 0); } boolean replayTransfer = !closing && !transacted && m instanceof MessageTransfer && ! m.isUnreliable(); if ((replayTransfer) || m.hasCompletionListener()) { setCommand(next, m); commandBytes += m.getBodySize(); } if (autoSync) { m.setSync(true); } needSync = !m.isSync(); try { send(m); } catch (SenderException e) { if (!closing) { // if we are not closing then this will happen // again on resume log.error(e, "error sending command"); } else { e.rethrow(); } } if (autoSync) { sync(); } // flush every 64K commands to avoid ambiguity on // wraparound if (shouldIssueFlush(next)) { try { sessionFlush(COMPLETED); } catch (SenderException e) { if (!closing) { // if expiry is > 0 then this will happen // again on resume log.error(e, "error sending flush (periodic)"); } else { e.rethrow(); } } } } } else { send(m); } } private void checkFailoverRequired(String message) { if (_failoverRequired.get()) { throw new SessionException(message); } } protected boolean shouldIssueFlush(int next) { return (next % 65536) == 0; } public void sync() { sync(timeout); } public void sync(long timeout) { log.debug("%s sync()", this); synchronized (commandsLock) { int point = commandsOut - 1; if (needSync && lt(maxComplete, point)) { executionSync(SYNC); } Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { checkFailoverRequired("Session sync was interrupted by failover."); if(log.isDebugEnabled()) { log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); } w.await(); } if (lt(maxComplete, point)) { if (state != CLOSED) { throw new SessionException( String.format("timed out waiting for sync: complete = %s, point = %s", maxComplete, point)); } else { ExecutionException ee = getException(); if (ee != null) { throw new SessionException(ee); } } } } } private Map> results = new HashMap>(); private ExecutionException exception = null; void result(int command, Struct result) { ResultFuture future; synchronized (results) { future = results.remove(command); } if (future != null) { future.set(result); } else { log.warn("Received a response to a command" + " that's no longer valid on the client side." + " [ command id : %s , result : %s ]",command, result); } } void setException(ExecutionException exc) { synchronized (results) { if (exception != null) { throw new IllegalStateException( String.format("too many exceptions: %s, %s", exception, exc)); } exception = exc; } } ExecutionException getException() { synchronized (results) { return exception; } } protected Future invoke(Method m, Class klass) { synchronized (commandsLock) { int command = commandsOut; ResultFuture future = new ResultFuture(klass); synchronized (results) { results.put(command, future); } invoke(m); return future; } } private class ResultFuture implements Future { private final Class klass; private T result; private ResultFuture(Class klass) { this.klass = klass; } private void set(Struct result) { synchronized (this) { this.result = klass.cast(result); notifyAll(); } } public T get(long timeout) { synchronized (this) { Waiter w = new Waiter(this, timeout); while (w.hasTime() && state != CLOSED && !isDone()) { checkFailoverRequired("Operation was interrupted by failover."); log.debug("%s waiting for result: %s", Session.this, this); w.await(); } } if (isDone()) { return result; } else if (state == CLOSED) { ExecutionException ex = getException(); if(ex == null) { throw new SessionClosedException(); } throw new SessionException(ex); } else { throw new SessionException( String.format("%s timed out waiting for result: %s", Session.this, this)); } } public T get() { return get(timeout); } public boolean isDone() { return result != null; } public String toString() { return String.format("Future(%s)", isDone() ? result : klass); } } public final void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, byte[] body, Option ... _options) { messageTransfer(destination, acceptMode, acquireMode, header, ByteBuffer.wrap(body), _options); } public final void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, String body, Option ... _options) { messageTransfer(destination, acceptMode, acquireMode, header, toUTF8(body), _options); } public void close() { if (log.isDebugEnabled()) { log.debug("Closing [%s] in state [%s]", this, state); } synchronized (commandsLock) { switch(state) { case DETACHED: state = CLOSED; delegate.closed(this); connection.removeSession(this); listener.closed(this); break; case CLOSED: break; default: state = CLOSING; setClose(true); sessionRequestTimeout(0); sessionDetach(name.getBytes()); awaitClose(); } } } protected void awaitClose() { Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED) { checkFailoverRequired("close() was interrupted by failover."); w.await(); } if (state != CLOSED) { throw new SessionException("close() timed out"); } } public void exception(Throwable t) { log.error(t, "caught exception"); } public void closed() { synchronized (commandsLock) { if (closing || getException() != null) { state = CLOSED; } else { state = DETACHED; } commandsLock.notifyAll(); synchronized (results) { for (ResultFuture result : results.values()) { synchronized(result) { result.notifyAll(); } } } if(state == CLOSED) { delegate.closed(this); } else { delegate.detached(this); } } if(state == CLOSED) { connection.removeSession(this); listener.closed(this); } } public boolean isClosing() { return state == CLOSED || state == CLOSING; } public String toString() { return String.format("ssn:%s", name); } public void setTransacted(boolean b) { this.transacted = b; } public boolean isTransacted(){ return transacted; } public void setDetachCode(SessionDetachCode dtc) { this.detachCode = dtc; } public SessionDetachCode getDetachCode() { return this.detachCode; } public void awaitOpen() { switch (state) { case NEW: synchronized(stateLock) { Waiter w = new Waiter(stateLock, timeout); while (w.hasTime() && state == NEW) { checkFailoverRequired("Session opening was interrupted by failover."); w.await(); } } if (state != OPEN) { throw new SessionException("Timed out waiting for Session to open"); } break; case DETACHED: case CLOSING: case CLOSED: throw new SessionException("Session closed"); default : break; } } public Object getStateLock() { return stateLock; } protected void notifyFailoverRequired() { //ensure any operations waiting are aborted to //prevent them waiting for timeout for 60 seconds //and possibly preventing failover proceeding _failoverRequired.set(true); synchronized (commandsLock) { commandsLock.notifyAll(); } synchronized (results) { for (ResultFuture result : results.values()) { synchronized(result) { result.notifyAll(); } } } } /** * An auxiliary method for test purposes only */ public boolean isFlowBlocked() { return flowControl && credit.availablePermits() == 0; } }