diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java | 1057 |
1 files changed, 1057 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java new file mode 100644 index 0000000000..862c37283b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -0,0 +1,1057 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +import static org.apache.qpid.transport.Option.COMPLETED; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.TIMELY_REPLY; +import static org.apache.qpid.transport.Session.State.CLOSED; +import static org.apache.qpid.transport.Session.State.CLOSING; +import static org.apache.qpid.transport.Session.State.DETACHED; +import static org.apache.qpid.transport.Session.State.NEW; +import static org.apache.qpid.transport.Session.State.OPEN; +import static org.apache.qpid.transport.Session.State.RESUMING; +import org.apache.qpid.transport.network.Frame; +import static org.apache.qpid.transport.util.Functions.mod; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; +import static org.apache.qpid.util.Serial.ge; +import static org.apache.qpid.util.Serial.gt; +import static org.apache.qpid.util.Serial.le; +import static org.apache.qpid.util.Serial.lt; +import static org.apache.qpid.util.Serial.max; +import static org.apache.qpid.util.Strings.toUTF8; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * Session + * + * @author Rafael H. Schloming + */ + +public class Session extends SessionInvoker +{ + + private static final Logger log = Logger.get(Session.class); + + public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED } + + static class DefaultSessionListener implements SessionListener + { + + public void opened(Session ssn) {} + + public void resumed(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + log.info("message: %s", xfr); + } + + public void exception(Session ssn, SessionException exc) + { + log.error(exc, "session exception"); + } + + public void closed(Session ssn) {} + } + + public static final int UNLIMITED_CREDIT = 0xFFFFFFFF; + + private Connection connection; + private Binary name; + private long expiry; + private boolean closing; + private int channel; + private SessionDelegate delegate; + private SessionListener listener = new DefaultSessionListener(); + private long timeout = 60000; + private boolean autoSync = false; + + private boolean incomingInit; + // incoming command count + private int commandsIn; + // completed incoming commands + private final Object processedLock = new Object(); + private RangeSet processed; + private int maxProcessed; + private int syncPoint; + + // outgoing command count + private int commandsOut = 0; + private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)]; + private int commandBytes = 0; + private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024); + private int maxComplete = commandsOut - 1; + private boolean needSync = false; + + private State state = NEW; + + // transfer flow control + private volatile boolean flowControl = false; + private Semaphore credit = new Semaphore(0); + + private Thread resumer = null; + private boolean transacted = false; + + protected Session(Connection connection, Binary name, long expiry) + { + this(connection, new SessionDelegate(), name, expiry); + } + + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry) + { + this.connection = connection; + this.delegate = delegate; + this.name = name; + this.expiry = expiry; + this.closing = false; + initReceiver(); + } + + public Connection getConnection() + { + return connection; + } + + public Binary getName() + { + return name; + } + + void setExpiry(long expiry) + { + this.expiry = expiry; + } + + void setClose(boolean close) + { + this.closing = close; + } + + public int getChannel() + { + return channel; + } + + void setChannel(int channel) + { + this.channel = channel; + } + + public void setSessionListener(SessionListener listener) + { + if (listener == null) + { + this.listener = new DefaultSessionListener(); + } + else + { + this.listener = listener; + } + } + + public SessionListener getSessionListener() + { + return listener; + } + + public void setAutoSync(boolean value) + { + synchronized (commands) + { + this.autoSync = value; + } + } + + protected void setState(State state) + { + synchronized (commands) + { + this.state = state; + commands.notifyAll(); + } + } + + void setFlowControl(boolean value) + { + flowControl = value; + } + + void addCredit(int value) + { + credit.release(value); + } + + void drainCredit() + { + credit.drainPermits(); + } + + void acquireCredit() + { + if (flowControl) + { + try + { + if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS)) + { + throw new SessionException + ("timed out waiting for message credit"); + } + } + catch (InterruptedException e) + { + throw new SessionException + ("interrupted while waiting for credit", null, e); + } + } + } + + private void initReceiver() + { + synchronized (processedLock) + { + incomingInit = false; + processed = new RangeSet(); + } + } + + void attach() + { + initReceiver(); + sessionAttach(name.getBytes()); + sessionRequestTimeout(0);//use expiry here only if/when session resume is supported + } + + void resume() + { + synchronized (commands) + { + for (int i = maxComplete + 1; lt(i, commandsOut); i++) + { + Method m = commands[mod(i, commands.length)]; + if (m == null) + { + m = new ExecutionSync(); + m.setId(i); + } + else if (m instanceof MessageTransfer) + { + MessageTransfer xfr = (MessageTransfer)m; + + if (xfr.getHeader() != null) + { + if (xfr.getHeader().get(DeliveryProperties.class) != null) + { + xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true); + } + else + { + Struct[] structs = xfr.getHeader().getStructs(); + DeliveryProperties deliveryProps = new DeliveryProperties(); + deliveryProps.setRedelivered(true); + + List<Struct> list = Arrays.asList(structs); + list.add(deliveryProps); + xfr.setHeader(new Header(list)); + } + + } + else + { + DeliveryProperties deliveryProps = new DeliveryProperties(); + deliveryProps.setRedelivered(true); + xfr.setHeader(new Header(deliveryProps)); + } + } + sessionCommandPoint(m.getId(), 0); + send(m); + } + + sessionCommandPoint(commandsOut, 0); + sessionFlush(COMPLETED); + resumer = Thread.currentThread(); + state = RESUMING; + listener.resumed(this); + resumer = null; + } + } + + void dump() + { + synchronized (commands) + { + for (Method m : commands) + { + if (m != null) + { + log.debug("%s", m); + } + } + } + } + + final void commandPoint(int id) + { + synchronized (processedLock) + { + this.commandsIn = id; + if (!incomingInit) + { + incomingInit = true; + maxProcessed = commandsIn - 1; + syncPoint = maxProcessed; + } + } + } + + public int getCommandsOut() + { + return commandsOut; + } + + public int getCommandsIn() + { + return commandsIn; + } + + public int nextCommandId() + { + return commandsIn++; + } + + final void identify(Method cmd) + { + if (!incomingInit) + { + throw new IllegalStateException(); + } + + int id = nextCommandId(); + cmd.setId(id); + + if(log.isDebugEnabled()) + { + log.debug("ID: [%s] %s", this.channel, id); + } + + //if ((id % 65536) == 0) + if ((id & 0xff) == 0) + { + flushProcessed(TIMELY_REPLY); + } + } + + public void processed(Method command) + { + processed(command.getId()); + } + + public void processed(int command) + { + processed(new Range(command, command)); + } + + public void processed(int lower, int upper) + { + + processed(new Range(lower, upper)); + } + + public void processed(Range range) + { + log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed); + + boolean flush; + synchronized (processedLock) + { + log.debug("%s", processed); + + if (ge(range.getUpper(), commandsIn)) + { + throw new IllegalArgumentException + ("range exceeds max received command-id: " + range); + } + + processed.add(range); + Range first = processed.getFirst(); + int lower = first.getLower(); + int upper = first.getUpper(); + int old = maxProcessed; + if (le(lower, maxProcessed + 1)) + { + maxProcessed = max(maxProcessed, upper); + } + boolean synced = ge(maxProcessed, syncPoint); + flush = lt(old, syncPoint) && synced; + if (synced) + { + syncPoint = maxProcessed; + } + } + if (flush) + { + flushProcessed(); + } + } + + void flushExpected() + { + RangeSet rs = new RangeSet(); + synchronized (processedLock) + { + if (incomingInit) + { + rs.add(commandsIn); + } + } + sessionExpected(rs, null); + } + + public void flushProcessed(Option ... options) + { + RangeSet copy; + synchronized (processedLock) + { + copy = processed.copy(); + } + + synchronized (commands) + { + if (state == DETACHED || state == CLOSING) + { + return; + } + if (copy.size() > 0) + { + sessionCompleted(copy, options); + } + } + } + + void knownComplete(RangeSet kc) + { + synchronized (processedLock) + { + RangeSet newProcessed = new RangeSet(); + for (Range pr : processed) + { + for (Range kr : kc) + { + for (Range r : pr.subtract(kr)) + { + newProcessed.add(r); + } + } + } + this.processed = newProcessed; + } + } + + void syncPoint() + { + int id = getCommandsIn() - 1; + log.debug("%s synced to %d", this, id); + boolean flush; + synchronized (processedLock) + { + syncPoint = id; + flush = ge(maxProcessed, syncPoint); + } + if (flush) + { + flushProcessed(); + } + } + + protected boolean complete(int lower, int upper) + { + //avoid autoboxing + if(log.isDebugEnabled()) + { + log.debug("%s complete(%d, %d)", this, lower, upper); + } + synchronized (commands) + { + int old = maxComplete; + for (int id = max(maxComplete, lower); le(id, upper); id++) + { + int idx = mod(id, commands.length); + Method m = commands[idx]; + if (m != null) + { + commandBytes -= m.getBodySize(); + m.complete(); + commands[idx] = null; + } + } + if (le(lower, maxComplete + 1)) + { + maxComplete = max(maxComplete, upper); + } + log.debug("%s commands remaining: %s", this, commandsOut - maxComplete); + commands.notifyAll(); + return gt(maxComplete, old); + } + } + + void received(Method m) + { + m.delegate(this, delegate); + } + + private void send(Method m) + { + m.setChannel(channel); + connection.send(m); + + if (!m.isBatch()) + { + connection.flush(); + } + } + + protected boolean isFull(int id) + { + return isCommandsFull(id) || isBytesFull(); + } + + protected boolean isBytesFull() + { + return commandBytes >= byteLimit; + } + + protected boolean isCommandsFull(int id) + { + return id - maxComplete >= commands.length; + } + + public void invoke(Method m) + { + invoke(m,(Runnable)null); + } + + public void invoke(Method m, Runnable postIdSettingAction) + { + if (m.getEncodedTrack() == Frame.L4) + { + + if (state == DETACHED && transacted) + { + state = CLOSED; + delegate.closed(this); + connection.removeSession(this); + throw new SessionException( + "Session failed over, possibly in the middle of a transaction. " + + "Closing the session. Any Transaction in progress will be rolledback."); + } + + if (m.hasPayload()) + { + acquireCredit(); + } + + synchronized (commands) + { + if (state == DETACHED && m.isUnreliable()) + { + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) + { + return; + } + } + + if (state != OPEN && state != CLOSED && state != CLOSING) + { + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) + { + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && (state != OPEN && state != CLOSED)) + { + w.await(); + } + } + } + + switch (state) + { + case OPEN: + break; + case RESUMING: + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) + { + throw new SessionException + ("timed out waiting for resume to finish"); + } + break; + case CLOSING: + case CLOSED: + ExecutionException exc = getException(); + if (exc != null) + { + throw new SessionException(exc); + } + else + { + throw new SessionClosedException(); + } + default: + throw new SessionException + (String.format + ("timed out waiting for session to become open " + + "(state=%s)", state)); + } + + int next; + next = commandsOut++; + m.setId(next); + if(postIdSettingAction != null) + { + postIdSettingAction.run(); + } + + if (isFull(next)) + { + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && isFull(next) && state != CLOSED) + { + if (state == OPEN || state == RESUMING) + { + try + { + sessionFlush(COMPLETED); + } + catch (SenderException e) + { + if (!closing) + { + // if expiry is > 0 then this will + // happen again on resume + log.error(e, "error sending flush (full replay buffer)"); + } + else + { + e.rethrow(); + } + } + } + w.await(); + } + } + + if (state == CLOSED) + { + ExecutionException exc = getException(); + if (exc != null) + { + throw new SessionException(exc); + } + else + { + throw new SessionClosedException(); + } + } + + if (isFull(next)) + { + throw new SessionException("timed out waiting for completion"); + } + + if (next == 0) + { + sessionCommandPoint(0, 0); + } + + boolean replayTransfer = !closing && !transacted && + m instanceof MessageTransfer && + ! m.isUnreliable(); + + if ((replayTransfer) || m.hasCompletionListener()) + { + commands[mod(next, commands.length)] = m; + commandBytes += m.getBodySize(); + } + if (autoSync) + { + m.setSync(true); + } + needSync = !m.isSync(); + + try + { + send(m); + } + catch (SenderException e) + { + if (!closing) + { + // if we are not closing then this will happen + // again on resume + log.error(e, "error sending command"); + } + else + { + e.rethrow(); + } + } + if (autoSync) + { + sync(); + } + + // flush every 64K commands to avoid ambiguity on + // wraparound + if (shouldIssueFlush(next)) + { + try + { + sessionFlush(COMPLETED); + } + catch (SenderException e) + { + if (!closing) + { + // if expiry is > 0 then this will happen + // again on resume + log.error(e, "error sending flush (periodic)"); + } + else + { + e.rethrow(); + } + } + } + } + } + else + { + send(m); + } + } + + protected boolean shouldIssueFlush(int next) + { + return (next % 65536) == 0; + } + + public void sync() + { + sync(timeout); + } + + public void sync(long timeout) + { + log.debug("%s sync()", this); + synchronized (commands) + { + int point = commandsOut - 1; + + if (needSync && lt(maxComplete, point)) + { + executionSync(SYNC); + } + + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) + { + log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); + w.await(); + } + + if (lt(maxComplete, point)) + { + if (state != CLOSED) + { + throw new SessionException( + String.format("timed out waiting for sync: complete = %s, point = %s", + maxComplete, point)); + } + else + { + ExecutionException ee = getException(); + if (ee != null) + { + throw new SessionException(ee); + } + } + } + } + } + + private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>(); + private ExecutionException exception = null; + + void result(int command, Struct result) + { + ResultFuture<?> future; + synchronized (results) + { + future = results.remove(command); + } + + if (future != null) + { + future.set(result); + } + else + { + log.warn("Received a response to a command" + + " that's no longer valid on the client side." + + " [ command id : %s , result : %s ]",command, result); + } + } + + void setException(ExecutionException exc) + { + synchronized (results) + { + if (exception != null) + { + throw new IllegalStateException( + String.format("too many exceptions: %s, %s", exception, exc)); + } + exception = exc; + } + } + + private ConnectionClose close = null; + + void closeCode(ConnectionClose close) + { + this.close = close; + } + + ExecutionException getException() + { + synchronized (results) + { + return exception; + } + } + + protected <T> Future<T> invoke(Method m, Class<T> klass) + { + synchronized (commands) + { + int command = commandsOut; + ResultFuture<T> future = new ResultFuture<T>(klass); + synchronized (results) + { + results.put(command, future); + } + invoke(m); + return future; + } + } + + private class ResultFuture<T> implements Future<T> + { + + private final Class<T> klass; + private T result; + + private ResultFuture(Class<T> klass) + { + this.klass = klass; + } + + private void set(Struct result) + { + synchronized (this) + { + this.result = klass.cast(result); + notifyAll(); + } + } + + public T get(long timeout) + { + synchronized (this) + { + Waiter w = new Waiter(this, timeout); + while (w.hasTime() && state != CLOSED && !isDone()) + { + log.debug("%s waiting for result: %s", Session.this, this); + w.await(); + } + } + + if (isDone()) + { + return result; + } + else if (state == CLOSED) + { + throw new SessionException(getException()); + } + else + { + throw new SessionException( + String.format("%s timed out waiting for result: %s", + Session.this, this)); + } + } + + public T get() + { + return get(timeout); + } + + public boolean isDone() + { + return result != null; + } + + public String toString() + { + return String.format("Future(%s)", isDone() ? result : klass); + } + + } + + public final void messageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + byte[] body, + Option ... _options) { + messageTransfer(destination, acceptMode, acquireMode, header, + ByteBuffer.wrap(body), _options); + } + + public final void messageTransfer(String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + Header header, + String body, + Option ... _options) { + messageTransfer(destination, acceptMode, acquireMode, header, + toUTF8(body), _options); + } + + public void close() + { + synchronized (commands) + { + state = CLOSING; + setClose(true); + sessionRequestTimeout(0); + sessionDetach(name.getBytes()); + + awaitClose(); + + + } + } + + protected void awaitClose() + { + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && state != CLOSED) + { + w.await(); + } + + if (state != CLOSED) + { + throw new SessionException("close() timed out"); + } + } + + public void exception(Throwable t) + { + log.error(t, "caught exception"); + } + + public void closed() + { + synchronized (commands) + { + if (closing || getException() != null) + { + state = CLOSED; + } + else + { + state = DETACHED; + } + + commands.notifyAll(); + + synchronized (results) + { + for (ResultFuture<?> result : results.values()) + { + synchronized(result) + { + result.notifyAll(); + } + } + } + if(state == CLOSED) + { + delegate.closed(this); + } + else + { + delegate.detached(this); + } + } + + if(state == CLOSED) + { + connection.removeSession(this); + listener.closed(this); + } + } + + public boolean isClosing() + { + return state == CLOSED || state == CLOSING; + } + + public String toString() + { + return String.format("ssn:%s", name); + } + + public void setTransacted(boolean b) { + this.transacted = b; + } + + public boolean isTransacted(){ + return transacted; + } + +} |