summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/transport/Session.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/Session.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java22
1 files changed, 17 insertions, 5 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 95c3e4669f..8b29d6e424 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -25,7 +25,6 @@ import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.transport.network.Frame;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
-
import static org.apache.qpid.transport.Option.COMPLETED;
import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.TIMELY_REPLY;
@@ -132,19 +131,31 @@ public class Session extends SessionInvoker
private final Object stateLock = new Object();
private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+ private boolean _isNoReplay = false;
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
}
+ protected Session(Connection connection, Binary name, long expiry, boolean noReplay)
+ {
+ this(connection, new SessionDelegate(), name, expiry, noReplay);
+ }
+
protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
+ this(connection, delegate, name, expiry,false);
+ }
+
+ protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry, boolean noReplay)
+ {
this.connection = connection;
this.delegate = delegate;
this.name = name;
this.expiry = expiry;
this.closing = false;
+ this._isNoReplay = noReplay;
initReceiver();
}
@@ -282,6 +293,7 @@ public class Session extends SessionInvoker
void resume()
{
_failoverRequired.set(false);
+
synchronized (commandsLock)
{
attach();
@@ -414,7 +426,7 @@ public class Session extends SessionInvoker
if(log.isDebugEnabled())
{
- log.debug("ID: [%s] %s", this.channel, id);
+ log.debug("identify: ch=%s, commandId=%s", this.channel, id);
}
if ((id & 0xff) == 0)
@@ -443,7 +455,7 @@ public class Session extends SessionInvoker
{
if(log.isDebugEnabled())
{
- log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, syncPoint, maxProcessed);
+ log.debug("%s ch=%s processed([%d,%d]) %s %s", this, channel, lower, upper, syncPoint, maxProcessed);
}
boolean flush;
@@ -451,7 +463,7 @@ public class Session extends SessionInvoker
{
if(log.isDebugEnabled())
{
- log.debug("%s", processed);
+ log.debug("%s processed: %s", this, processed);
}
if (ge(upper, commandsIn))
@@ -740,7 +752,7 @@ public class Session extends SessionInvoker
sessionCommandPoint(0, 0);
}
- boolean replayTransfer = !closing && !transacted &&
+ boolean replayTransfer = !_isNoReplay && !closing && !transacted &&
m instanceof MessageTransfer &&
! m.isUnreliable();