summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java149
1 files changed, 96 insertions, 53 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 4c3d768020..aa0ff66545 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,17 +27,18 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.util.Serial;
-import org.apache.qpidity.nclient.Session;
-import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.transport.MessageCreditUnit;
-import org.apache.qpidity.transport.MessageFlowMode;
-import org.apache.qpidity.transport.RangeSet;
-import org.apache.qpidity.transport.Option;
-import org.apache.qpidity.transport.ExchangeBoundResult;
-import org.apache.qpidity.transport.Future;
+import org.apache.qpid.nclient.Session;
+import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.ErrorCode;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ import java.util.Map;
/**
* This is a 0.10 Session
*/
-public class AMQSession_0_10 extends AMQSession
+public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
{
/**
@@ -71,8 +72,11 @@ public class AMQSession_0_10 extends AMQSession
private Object _currentExceptionLock = new Object();
private QpidException _currentException;
- // a ref on the qpidity connection
- protected org.apache.qpidity.nclient.Connection _qpidConnection;
+ // a ref on the qpid connection
+ protected org.apache.qpid.nclient.Connection _qpidConnection;
+
+ private RangeSet unacked = new RangeSet();
+ private int unackedCount = 0;
/**
* USed to store the range of in tx messages
@@ -93,7 +97,7 @@ public class AMQSession_0_10 extends AMQSession
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
* @param qpidConnection The qpid connection
*/
- AMQSession_0_10(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+ AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
@@ -123,7 +127,7 @@ public class AMQSession_0_10 extends AMQSession
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
* @param qpidConnection The connection
*/
- AMQSession_0_10(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+ AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
@@ -131,6 +135,18 @@ public class AMQSession_0_10 extends AMQSession
defaultPrefetchHigh, defaultPrefetchLow);
}
+ private void addUnacked(int id)
+ {
+ unacked.add(id);
+ unackedCount++;
+ }
+
+ private void clearUnacked()
+ {
+ unacked.clear();
+ unackedCount = 0;
+ }
+
//------- overwritten methods of class AMQSession
/**
@@ -140,6 +156,7 @@ public class AMQSession_0_10 extends AMQSession
* @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the
* delivery tag, <tt>false</tt> to just acknowledge that message.
*/
+
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
if (_logger.isDebugEnabled())
@@ -147,14 +164,13 @@ public class AMQSession_0_10 extends AMQSession
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
}
// acknowledge this message
- RangeSet ranges = new RangeSet();
if (multiple)
{
for (Long messageTag : _unacknowledgedMessageTags)
{
if( messageTag <= deliveryTag )
{
- ranges.add((int) (long) messageTag);
+ addUnacked(messageTag.intValue());
_unacknowledgedMessageTags.remove(messageTag);
}
}
@@ -163,10 +179,26 @@ public class AMQSession_0_10 extends AMQSession
}
else
{
- ranges.add((int) deliveryTag);
+ addUnacked((int) deliveryTag);
_unacknowledgedMessageTags.remove(deliveryTag);
}
- getQpidSession().messageAcknowledge(ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+
+ long prefetch = getAMQConnection().getMaxPrefetch();
+
+ if (unackedCount >= prefetch/2)
+ {
+ flushAcknowledgments();
+ }
+ }
+
+ void flushAcknowledgments()
+ {
+ if (unackedCount > 0)
+ {
+ getQpidSession().messageAcknowledge
+ (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ clearUnacked();
+ }
}
/**
@@ -210,6 +242,7 @@ public class AMQSession_0_10 extends AMQSession
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
+ flushAcknowledgments();
getQpidSession().sync();
getQpidSession().close();
getCurrentException();
@@ -243,15 +276,16 @@ public class AMQSession_0_10 extends AMQSession
* @param durable If set when creating a new queue,
* the queue will be marked as durable.
* @param exclusive Exclusive queues can only be used from one connection at a time.
+ * @param arguments Exclusive queues can only be used from one connection at a time.
* @throws AMQException
* @throws FailoverException
*/
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
- final boolean exclusive) throws AMQException, FailoverException
+ final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
{
- getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION,
- autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
- exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
+ getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE,
+ autoDelete ? Option.AUTO_DELETE : Option.NONE,
+ exclusive ? Option.EXCLUSIVE : Option.NONE);
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -311,7 +345,7 @@ public class AMQSession_0_10 extends AMQSession
/**
* Create an 0_10 message consumer
*/
- public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
final FieldTable ft, final boolean noConsume,
@@ -372,8 +406,8 @@ public class AMQSession_0_10 extends AMQSession
* This method is invoked when a consumer is creted
* Registers the consumer with the broker
*/
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector, AMQShortString tag)
+ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+ boolean nowait, String messageSelector, int tag)
throws AMQException, FailoverException
{
boolean preAcquire;
@@ -382,32 +416,34 @@ public class AMQSession_0_10 extends AMQSession
preAcquire = ( ! consumer.isNoConsume() &&
(consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
|| !(consumer.getDestination() instanceof AMQQueue);
- getQpidSession().messageSubscribe(queueName.toString(), tag.toString(),
+ getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
- new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+ (BasicMessageConsumer_0_10) consumer, null,
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
catch (JMSException e)
{
throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
}
+ String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+
if (! prefetch())
{
- getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT);
+ getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
}
else
{
- getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW);
+ getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
}
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
{
// set the flow
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
getAMQConnection().getMaxPrefetch());
}
@@ -418,7 +454,7 @@ public class AMQSession_0_10 extends AMQSession
/**
* Create an 0_10 message producer
*/
- public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent,
long producerId)
{
@@ -476,9 +512,9 @@ public class AMQSession_0_10 extends AMQSession
arguments.put("no-local", true);
}
getQpidSession().queueDeclare(res.toString(), null, arguments,
- amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION,
- amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION,
- !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+ amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ amqd.isDurable() ? Option.DURABLE : Option.NONE,
+ !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
// We need to sync so that we get notify of an error.
getQpidSession().sync();
@@ -508,13 +544,14 @@ public class AMQSession_0_10 extends AMQSession
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageStop(consumer.getConsumerTag().toString());
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
}
}
else
{
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer_0_10 consumer : _consumers.values())
{
+ String consumerTag = String.valueOf(consumer.getConsumerTag());
//only set if msg list is null
try
{
@@ -522,18 +559,18 @@ public class AMQSession_0_10 extends AMQSession
{
if (consumer.getMessageListener() != null)
{
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE, 1);
}
}
else
{
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE,
+ .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
getAMQConnection().getMaxPrefetch());
}
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
}
catch (Exception e)
{
@@ -561,7 +598,7 @@ public class AMQSession_0_10 extends AMQSession
*
* @return The associated Qpid Session.
*/
- protected org.apache.qpidity.nclient.Session getQpidSession()
+ protected org.apache.qpid.nclient.Session getQpidSession()
{
return _qpidSession;
}
@@ -594,7 +631,7 @@ public class AMQSession_0_10 extends AMQSession
try
{
// this is done so that we can produce to a temporary queue beofre we create a consumer
- sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
+ sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive(),null);
sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
result.setQueueName(result.getRoutingKey());
}
@@ -612,7 +649,7 @@ public class AMQSession_0_10 extends AMQSession
/**
* Lstener for qpid protocol exceptions
*/
- private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
+ private class QpidSessionExceptionListener implements org.apache.qpid.nclient.ClosedListener
{
public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
@@ -681,7 +718,7 @@ public class AMQSession_0_10 extends AMQSession
AMQTopic origTopic=checkValidTopic(topic);
AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+ TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
if (subscriber != null)
{
if (subscriber.getTopic().equals(topic))
@@ -732,7 +769,7 @@ public class AMQSession_0_10 extends AMQSession
}
}
- subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+ subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
@@ -740,7 +777,7 @@ public class AMQSession_0_10 extends AMQSession
return subscriber;
}
- Long requestQueueDepth(AMQDestination amqd)
+ protected Long requestQueueDepth(AMQDestination amqd)
{
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
}
@@ -757,10 +794,11 @@ public class AMQSession_0_10 extends AMQSession
_txRangeSet.add((int) id);
_txSize++;
// this is a heuristic, we may want to have that configurable
- if( _txSize > _connection.getMaxPrefetch() / 2 )
+ if (_connection.getMaxPrefetch() == 1 ||
+ _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
{
- // send completed so consumer credits don't dry up
- getQpidSession().messageAcknowledge(_txRangeSet, false);
+ // send completed so consumer credits don't dry up
+ getQpidSession().messageAcknowledge(_txRangeSet, false);
}
}
@@ -787,14 +825,19 @@ public class AMQSession_0_10 extends AMQSession
}
}
- final boolean tagLE(long tag1, long tag2)
+ protected final boolean tagLE(long tag1, long tag2)
{
return Serial.le((int) tag1, (int) tag2);
}
- final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return Serial.lt((int) currentMark, (int) deliveryTag);
}
+ public AMQMessageDelegateFactory getMessageDelegateFactory()
+ {
+ return AMQMessageDelegateFactory.FACTORY_0_10;
+ }
+
}