summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-02-19 15:09:26 +0000
committerRafael H. Schloming <rhs@apache.org>2009-02-19 15:09:26 +0000
commit3452ccc779803371e6ba04a1a3b9bec0d973d11d (patch)
tree2e386ff54b83830477a70e694e8f5c914da13316
parentf824b8b88b8065716006380306f0a6274ec85f2e (diff)
downloadqpid-python-3452ccc779803371e6ba04a1a3b9bec0d973d11d.tar.gz
QPID-1665: add a timer to ensure message acking is never delayed more than 1000 ms by default, this is configurable by qpid.session.max_ack_delay
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@745892 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java60
-rw-r--r--qpid/java/common/Composite.tpl1
-rw-r--r--qpid/java/common/Option.tpl1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java15
5 files changed, 72 insertions, 16 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 45b74f317e..8ab8110dd4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -52,9 +52,12 @@ import static org.apache.qpid.transport.Option.*;
import javax.jms.*;
import javax.jms.IllegalStateException;
+import java.util.Date;
import java.util.HashMap;
import java.util.UUID;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
/**
* This is a 0.10 Session
@@ -68,6 +71,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
+ private static Timer timer = new Timer("ack-flusher", true);
+
/**
* The underlying QpidSession
@@ -83,6 +88,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// a ref on the qpid connection
protected org.apache.qpid.transport.Connection _qpidConnection;
+ private TimerTask flushTask = new TimerTask()
+ {
+ public void run()
+ {
+ try
+ {
+ flushAcknowledgments();
+ }
+ catch (Throwable t)
+ {
+ _logger.error("error flushing acks", t);
+ }
+ }
+ };
private RangeSet unacked = new RangeSet();
private int unackedCount = 0;
@@ -119,6 +138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
_qpidSession.txSelect();
}
+ timer.schedule(flushTask, new Date(), Long.getLong("qpid.session.max_ack_delay", 1000));
}
/**
@@ -142,14 +162,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private void addUnacked(int id)
{
- unacked.add(id);
- unackedCount++;
+ synchronized (unacked)
+ {
+ unacked.add(id);
+ unackedCount++;
+ }
}
private void clearUnacked()
{
- unacked.clear();
- unackedCount = 0;
+ synchronized (unacked)
+ {
+ unacked.clear();
+ unackedCount = 0;
+ }
}
//------- overwritten methods of class AMQSession
@@ -196,19 +222,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
long prefetch = getAMQConnection().getMaxPrefetch();
- if (unackedCount >= prefetch/2 || _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+ if (unackedCount >= prefetch/2)
{
flushAcknowledgments();
- }
+ }
}
void flushAcknowledgments()
{
- if (unackedCount > 0)
+ synchronized (unacked)
{
- messageAcknowledge
- (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- clearUnacked();
+ if (unackedCount > 0)
+ {
+ messageAcknowledge
+ (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ clearUnacked();
+ }
}
}
@@ -222,7 +251,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
ssn.flushProcessed(accept ? BATCH : NONE);
if (accept)
{
- ssn.messageAccept(ranges);
+ ssn.messageAccept(ranges, UNRELIABLE);
}
}
@@ -267,6 +296,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
+ flushTask.cancel();
flushAcknowledgments();
getQpidSession().sync();
getQpidSession().close();
@@ -692,7 +722,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
String binddingKey = "";
for(AMQShortString key : amqd.getBindingKeys())
{
- binddingKey = binddingKey + "_" + key.toString();
+ binddingKey = binddingKey + "_" + key.toString();
}
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
@@ -722,7 +752,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
-
+
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
@@ -800,14 +830,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Store non committed messages for this session
* With 0.10 messages are consumed with window mode, we must send a completion
- * before the window size is reached so credits don't dry up.
+ * before the window size is reached so credits don't dry up.
* @param id
*/
@Override protected void addDeliveredMessage(long id)
{
_txRangeSet.add((int) id);
_txSize++;
- // this is a heuristic, we may want to have that configurable
+ // this is a heuristic, we may want to have that configurable
if (_connection.getMaxPrefetch() == 1 ||
_connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
{
diff --git a/qpid/java/common/Composite.tpl b/qpid/java/common/Composite.tpl
index 17cf846d8c..c46d0a12cc 100644
--- a/qpid/java/common/Composite.tpl
+++ b/qpid/java/common/Composite.tpl
@@ -145,6 +145,7 @@ if options or base == "Method":
if base == "Method":
out(""" case SYNC: this.setSync(true); break;
case BATCH: this.setBatch(true); break;
+ case UNRELIABLE: this.setUnreliable(true); break;
""")
out(""" case NONE: break;
default: throw new IllegalArgumentException("invalid option: " + _options[i]);
diff --git a/qpid/java/common/Option.tpl b/qpid/java/common/Option.tpl
index 776b211ad5..c22b35b999 100644
--- a/qpid/java/common/Option.tpl
+++ b/qpid/java/common/Option.tpl
@@ -37,5 +37,6 @@ for c in composites:
options[option] = None
out(" $option,\n")}
BATCH,
+ UNRELIABLE,
NONE
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
index 09cfd119be..611c742fb1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -48,6 +48,7 @@ public abstract class Method extends Struct implements ProtocolEvent
private boolean idSet = false;
private boolean sync = false;
private boolean batch = false;
+ private boolean unreliable = false;
public final int getId()
{
@@ -90,6 +91,16 @@ public abstract class Method extends Struct implements ProtocolEvent
this.batch = value;
}
+ public final boolean isUnreliable()
+ {
+ return unreliable;
+ }
+
+ final void setUnreliable(boolean value)
+ {
+ this.unreliable = value;
+ }
+
public abstract boolean hasPayload();
public Header getHeader()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index f94edcc655..4079097f96 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -384,7 +384,15 @@ public class Session extends SessionInvoker
{
copy = processed.copy();
}
- sessionCompleted(copy, options);
+
+ synchronized (commands)
+ {
+ if (state == DETACHED)
+ {
+ return;
+ }
+ sessionCompleted(copy, options);
+ }
}
void knownComplete(RangeSet kc)
@@ -484,6 +492,11 @@ public class Session extends SessionInvoker
synchronized (commands)
{
+ if (state == DETACHED && m.isUnreliable())
+ {
+ return;
+ }
+
if (state != OPEN && state != CLOSED)
{
Waiter w = new Waiter(commands, timeout);