summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java190
1 files changed, 164 insertions, 26 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 62a1e2b0f5..d63934e6be 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -21,6 +21,11 @@
package org.apache.qpid.server.transport;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
import static org.apache.qpid.util.Serial.gt;
import java.security.Principal;
@@ -30,17 +35,21 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+
import javax.security.auth.Subject;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -64,24 +73,19 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
+import org.apache.qpid.server.txn.DistributedTransaction;
+import org.apache.qpid.server.txn.DtxNotSelectedException;
+import org.apache.qpid.server.txn.IncorrectDtxStateException;
+import org.apache.qpid.server.txn.JoinAndResumeDtxException;
import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.NotAssociatedDtxException;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlow;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageSetFlowMode;
-import org.apache.qpid.transport.MessageStop;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.RangeSetFactory;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,13 +104,12 @@ public class ServerSession extends Session
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+ private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
-
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -356,7 +359,15 @@ public class ServerSession extends Session
public void onClose()
{
- _transaction.rollback();
+ if(_transaction instanceof LocalTransaction)
+ {
+ _transaction.rollback();
+ }
+ else if(_transaction instanceof DistributedTransaction)
+ {
+ getVirtualHost().getDtxRegistry().endAssociations(this);
+ }
+
for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
{
listener.onRelease(true);
@@ -392,6 +403,9 @@ public class ServerSession extends Session
public void onRollback()
{
+ // The client has acknowledge the message and therefore have seen it.
+ // In the event of rollback, the message must be marked as redelivered.
+ entry.setRedelivered();
entry.release();
}
});
@@ -452,6 +466,95 @@ public class ServerSession extends Session
_txnStarts.incrementAndGet();
}
+ public void selectDtx()
+ {
+ _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost());
+
+ }
+
+
+ public void startDtx(Xid xid, boolean join, boolean resume)
+ throws JoinAndResumeDtxException,
+ UnknownDtxBranchException,
+ AlreadyKnownDtxException,
+ DtxNotSelectedException
+ {
+ DistributedTransaction distributedTransaction = assertDtxTransaction();
+ distributedTransaction.start(xid, join, resume);
+ }
+
+
+ public void endDtx(Xid xid, boolean fail, boolean suspend)
+ throws NotAssociatedDtxException,
+ UnknownDtxBranchException,
+ DtxNotSelectedException,
+ SuspendAndFailDtxException, TimeoutDtxException
+ {
+ DistributedTransaction distributedTransaction = assertDtxTransaction();
+ distributedTransaction.end(xid, fail, suspend);
+ }
+
+
+ public long getTimeoutDtx(Xid xid)
+ throws UnknownDtxBranchException
+ {
+ return getVirtualHost().getDtxRegistry().getTimeout(xid);
+ }
+
+
+ public void setTimeoutDtx(Xid xid, long timeout)
+ throws UnknownDtxBranchException
+ {
+ getVirtualHost().getDtxRegistry().setTimeout(xid, timeout);
+ }
+
+
+ public void prepareDtx(Xid xid)
+ throws UnknownDtxBranchException,
+ IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
+ {
+ getVirtualHost().getDtxRegistry().prepare(xid);
+ }
+
+ public void commitDtx(Xid xid, boolean onePhase)
+ throws UnknownDtxBranchException,
+ IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
+ {
+ getVirtualHost().getDtxRegistry().commit(xid, onePhase);
+ }
+
+
+ public void rollbackDtx(Xid xid)
+ throws UnknownDtxBranchException,
+ IncorrectDtxStateException, AMQStoreException, TimeoutDtxException
+ {
+ getVirtualHost().getDtxRegistry().rollback(xid);
+ }
+
+
+ public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException
+ {
+ getVirtualHost().getDtxRegistry().forget(xid);
+ }
+
+ public List<Xid> recoverDtx()
+ {
+ return getVirtualHost().getDtxRegistry().recover();
+ }
+
+ private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException
+ {
+ if(_transaction instanceof DistributedTransaction)
+ {
+ return (DistributedTransaction) _transaction;
+ }
+ else
+ {
+ throw new DtxNotSelectedException();
+ }
+ }
+
+
public void commit()
{
_transaction.commit();
@@ -524,12 +627,12 @@ public class ServerSession extends Session
public Principal getAuthorizedPrincipal()
{
- return ((ServerConnection) getConnection()).getAuthorizedPrincipal();
+ return getConnection().getAuthorizedPrincipal();
}
public Subject getAuthorizedSubject()
{
- return ((ServerConnection) getConnection()).getAuthorizedSubject();
+ return getConnection().getAuthorizedSubject();
}
public void addSessionCloseTask(Task task)
@@ -544,7 +647,7 @@ public class ServerSession extends Session
public Object getReference()
{
- return ((ServerConnection) getConnection()).getReference();
+ return getConnection().getReference();
}
public MessageStore getMessageStore()
@@ -624,7 +727,7 @@ public class ServerSession extends Session
public AMQConnectionModel getConnectionModel()
{
- return (ServerConnection) getConnection();
+ return getConnection();
}
public String getClientID()
@@ -632,6 +735,12 @@ public class ServerSession extends Session
return getConnection().getClientId();
}
+ @Override
+ public ServerConnection getConnection()
+ {
+ return (ServerConnection) super.getConnection();
+ }
+
public LogActor getLogActor()
{
return _actor;
@@ -676,7 +785,8 @@ public class ServerSession extends Session
public void block(AMQQueue queue)
{
- if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+
+ if(_blockingQueues.add(queue))
{
if(_blocking.compareAndSet(false,true))
@@ -694,7 +804,7 @@ public class ServerSession extends Session
{
if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
{
- if(_blocking.compareAndSet(true,false))
+ if(_blocking.compareAndSet(true,false) && !isClosing())
{
_actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
@@ -710,11 +820,19 @@ public class ServerSession extends Session
}
}
+ public boolean onSameConnection(InboundMessage inbound)
+ {
+ return ((inbound instanceof MessageTransferMessage)
+ && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference())
+ || ((inbound instanceof MessageMetaData_0_10)
+ && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference());
+ }
+
public String toLogString()
{
- long connectionId = getConnection() instanceof ServerConnection
- ? ((ServerConnection) getConnection()).getConnectionId()
+ long connectionId = super.getConnection() instanceof ServerConnection
+ ? getConnection().getConnectionId()
: -1;
String remoteAddress = _connectionConfig instanceof ProtocolEngine
@@ -749,6 +867,16 @@ public class ServerSession extends Session
}
}
+ void stopSubscriptions()
+ {
+ final Collection<Subscription_0_10> subscriptions = getSubscriptions();
+ for (Subscription_0_10 subscription_0_10 : subscriptions)
+ {
+ subscription_0_10.stop();
+ }
+ }
+
+
public void receivedComplete()
{
final Collection<Subscription_0_10> subscriptions = getSubscriptions();
@@ -889,4 +1017,14 @@ public class ServerSession extends Session
return _future.isComplete();
}
}
+
+ protected void setClose(boolean close)
+ {
+ super.setClose(close);
+ }
+
+ public int compareTo(AMQSessionModel session)
+ {
+ return getId().toString().compareTo(session.getID().toString());
+ }
}