diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java | 190 |
1 files changed, 164 insertions, 26 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 62a1e2b0f5..d63934e6be 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -21,6 +21,11 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.MessageMetaData_0_10; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.txn.RollbackOnlyDtxException; +import org.apache.qpid.server.txn.TimeoutDtxException; import static org.apache.qpid.util.Serial.gt; import java.security.Principal; @@ -30,17 +35,21 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + import javax.security.auth.Subject; + import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; @@ -64,24 +73,19 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; +import org.apache.qpid.server.txn.DistributedTransaction; +import org.apache.qpid.server.txn.DtxNotSelectedException; +import org.apache.qpid.server.txn.IncorrectDtxStateException; +import org.apache.qpid.server.txn.JoinAndResumeDtxException; import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.NotAssociatedDtxException; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.SuspendAndFailDtxException; +import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlow; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageSetFlowMode; -import org.apache.qpid.transport.MessageStop; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.RangeSetFactory; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDelegate; +import org.apache.qpid.transport.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,13 +104,12 @@ public class ServerSession extends Session private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); - private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>(); private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - public static interface MessageDispositionChangeListener { public void onAccept(); @@ -356,7 +359,15 @@ public class ServerSession extends Session public void onClose() { - _transaction.rollback(); + if(_transaction instanceof LocalTransaction) + { + _transaction.rollback(); + } + else if(_transaction instanceof DistributedTransaction) + { + getVirtualHost().getDtxRegistry().endAssociations(this); + } + for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) { listener.onRelease(true); @@ -392,6 +403,9 @@ public class ServerSession extends Session public void onRollback() { + // The client has acknowledge the message and therefore have seen it. + // In the event of rollback, the message must be marked as redelivered. + entry.setRedelivered(); entry.release(); } }); @@ -452,6 +466,95 @@ public class ServerSession extends Session _txnStarts.incrementAndGet(); } + public void selectDtx() + { + _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost()); + + } + + + public void startDtx(Xid xid, boolean join, boolean resume) + throws JoinAndResumeDtxException, + UnknownDtxBranchException, + AlreadyKnownDtxException, + DtxNotSelectedException + { + DistributedTransaction distributedTransaction = assertDtxTransaction(); + distributedTransaction.start(xid, join, resume); + } + + + public void endDtx(Xid xid, boolean fail, boolean suspend) + throws NotAssociatedDtxException, + UnknownDtxBranchException, + DtxNotSelectedException, + SuspendAndFailDtxException, TimeoutDtxException + { + DistributedTransaction distributedTransaction = assertDtxTransaction(); + distributedTransaction.end(xid, fail, suspend); + } + + + public long getTimeoutDtx(Xid xid) + throws UnknownDtxBranchException + { + return getVirtualHost().getDtxRegistry().getTimeout(xid); + } + + + public void setTimeoutDtx(Xid xid, long timeout) + throws UnknownDtxBranchException + { + getVirtualHost().getDtxRegistry().setTimeout(xid, timeout); + } + + + public void prepareDtx(Xid xid) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException + { + getVirtualHost().getDtxRegistry().prepare(xid); + } + + public void commitDtx(Xid xid, boolean onePhase) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException + { + getVirtualHost().getDtxRegistry().commit(xid, onePhase); + } + + + public void rollbackDtx(Xid xid) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, TimeoutDtxException + { + getVirtualHost().getDtxRegistry().rollback(xid); + } + + + public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException + { + getVirtualHost().getDtxRegistry().forget(xid); + } + + public List<Xid> recoverDtx() + { + return getVirtualHost().getDtxRegistry().recover(); + } + + private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException + { + if(_transaction instanceof DistributedTransaction) + { + return (DistributedTransaction) _transaction; + } + else + { + throw new DtxNotSelectedException(); + } + } + + public void commit() { _transaction.commit(); @@ -524,12 +627,12 @@ public class ServerSession extends Session public Principal getAuthorizedPrincipal() { - return ((ServerConnection) getConnection()).getAuthorizedPrincipal(); + return getConnection().getAuthorizedPrincipal(); } public Subject getAuthorizedSubject() { - return ((ServerConnection) getConnection()).getAuthorizedSubject(); + return getConnection().getAuthorizedSubject(); } public void addSessionCloseTask(Task task) @@ -544,7 +647,7 @@ public class ServerSession extends Session public Object getReference() { - return ((ServerConnection) getConnection()).getReference(); + return getConnection().getReference(); } public MessageStore getMessageStore() @@ -624,7 +727,7 @@ public class ServerSession extends Session public AMQConnectionModel getConnectionModel() { - return (ServerConnection) getConnection(); + return getConnection(); } public String getClientID() @@ -632,6 +735,12 @@ public class ServerSession extends Session return getConnection().getClientId(); } + @Override + public ServerConnection getConnection() + { + return (ServerConnection) super.getConnection(); + } + public LogActor getLogActor() { return _actor; @@ -676,7 +785,8 @@ public class ServerSession extends Session public void block(AMQQueue queue) { - if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + + if(_blockingQueues.add(queue)) { if(_blocking.compareAndSet(false,true)) @@ -694,7 +804,7 @@ public class ServerSession extends Session { if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty()) { - if(_blocking.compareAndSet(true,false)) + if(_blocking.compareAndSet(true,false) && !isClosing()) { _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); @@ -710,11 +820,19 @@ public class ServerSession extends Session } } + public boolean onSameConnection(InboundMessage inbound) + { + return ((inbound instanceof MessageTransferMessage) + && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference()) + || ((inbound instanceof MessageMetaData_0_10) + && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference()); + } + public String toLogString() { - long connectionId = getConnection() instanceof ServerConnection - ? ((ServerConnection) getConnection()).getConnectionId() + long connectionId = super.getConnection() instanceof ServerConnection + ? getConnection().getConnectionId() : -1; String remoteAddress = _connectionConfig instanceof ProtocolEngine @@ -749,6 +867,16 @@ public class ServerSession extends Session } } + void stopSubscriptions() + { + final Collection<Subscription_0_10> subscriptions = getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subscriptions) + { + subscription_0_10.stop(); + } + } + + public void receivedComplete() { final Collection<Subscription_0_10> subscriptions = getSubscriptions(); @@ -889,4 +1017,14 @@ public class ServerSession extends Session return _future.isComplete(); } } + + protected void setClose(boolean close) + { + super.setClose(close); + } + + public int compareTo(AMQSessionModel session) + { + return getId().toString().compareTo(session.getID().toString()); + } } |