summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-07-09 13:26:54 +0000
committerRafael H. Schloming <rhs@apache.org>2008-07-09 13:26:54 +0000
commitfa6532edf09b45201f90beaeef62702b00d35947 (patch)
tree1063db9bc554758657d6d45da2107c856ae0d804 /java/client
parent6cddd1d794278e7e68163e88851f09553dd5123f (diff)
downloadqpid-python-fa6532edf09b45201f90beaeef62702b00d35947.tar.gz
Primarily profiling driven changes:
- added batched writes of commands/controls issued on a session - copy fragmented frames and segments rather than trying to decode them piecemeal, removed FragmentDecoder - added caching for str8 encode/decode - compute sizes as we encode by going back and filling in the amount of bytes written rather than computing it up front - added SYNC option to commands - renamed NO_OPTION argument to NONE - added a timeout to Client.java - removed use of UUID.fromString in BasicMessageProducer_0_10.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@675165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/Client.java7
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java12
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/Session.java60
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java6
11 files changed, 81 insertions, 76 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
index f26e5418b4..100dcf52bf 100755
--- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
@@ -55,7 +55,7 @@ public class TopicListener implements MessageListener
Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
new MessagePartListenerAdapter(this),
- null, Option.NO_OPTION);
+ null, Option.NONE);
// issue credits
// XXX: need to be able to set to null
session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 499fca3833..2d98fd0dcd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -250,9 +250,9 @@ public class AMQSession_0_10 extends AMQSession
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
{
- getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION,
- autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
- exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
+ getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE,
+ autoDelete ? Option.AUTO_DELETE : Option.NONE,
+ exclusive ? Option.EXCLUSIVE : Option.NONE);
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -387,7 +387,7 @@ public class AMQSession_0_10 extends AMQSession
getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
catch (JMSException e)
{
@@ -477,9 +477,9 @@ public class AMQSession_0_10 extends AMQSession
arguments.put("no-local", true);
}
getQpidSession().queueDeclare(res.toString(), null, arguments,
- amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION,
- amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION,
- !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+ amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ amqd.isDurable() ? Option.DURABLE : Option.NONE,
+ !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
// We need to sync so that we get notify of an error.
getQpidSession().sync();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 9d01fbfaa2..c0cfc21ee2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -453,19 +453,21 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
+ UUID messageId = null;
if (_disableMessageId)
{
message.setJMSMessageID(null);
}
else
{
+ messageId = UUID.randomUUID();
StringBuilder b = new StringBuilder(39);
b.append("ID:");
- b.append(UUID.randomUUID());
+ b.append(messageId);
message.setJMSMessageID(b.toString());
}
- sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
if (message != origMessage)
{
@@ -484,8 +486,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
- int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean wait)throws JMSException;
+ UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 5eb066ec36..5e6a5b5f40 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -68,8 +68,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
* Sends a message to a given destination
*/
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
- int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
- boolean wait) throws JMSException
+ UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait) throws JMSException
{
message.prepareForSending();
if (message.get010Message() == null)
@@ -84,7 +84,16 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties();
MessageProperties messageProps = message.get010Message().getMessageProperties();
- // set the delivery properties
+
+ if (messageId != null)
+ {
+ messageProps.setMessageId(messageId);
+ }
+ else if (messageProps.hasMessageId())
+ {
+ messageProps.clearMessageId();
+ }
+
if (!_disableTimestamps)
{
final long currentTime = System.currentTimeMillis();
@@ -142,13 +151,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
messageProps.setContentType(contentHeaderProperties.getContentType().toString());
messageProps.setContentLength(message.getContentLength());
- // XXX: fixme
- String mid = message.getJMSMessageID();
- if( mid != null )
- {
- messageProps.setMessageId(UUID.fromString(mid.substring(3)));
- }
-
AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
if (correlationID != null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index ff991b1a03..8ca68850eb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import java.util.UUID;
+
import javax.jms.JMSException;
import javax.jms.Message;
@@ -65,9 +67,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
_protocolHandler.writeFrame(declare);
}
- void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message,
- int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate,
- boolean wait) throws JMSException
+ void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+ UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
+ boolean immediate, boolean wait) throws JMSException
{
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
destination.getExchangeName(),
diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index 7a107d748b..ce2f2180b1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -76,7 +76,7 @@ public class XAResourceImpl implements XAResource
_logger.debug("commit tx branch with xid: ", xid);
}
Future<XaResult> future =
- _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+ _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
// now wait on the future for the result
XaResult result = null;
@@ -129,8 +129,8 @@ public class XAResourceImpl implements XAResource
}
Future<XaResult> future = _xaSession.getQpidSession()
.dtxEnd(convertXid(xid),
- flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
- flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+ flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,
+ flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE);
// now wait on the future for the result
XaResult result = null;
try
@@ -400,8 +400,8 @@ public class XAResourceImpl implements XAResource
}
Future<XaResult> future = _xaSession.getQpidSession()
.dtxStart(convertXid(xid),
- flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
- flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+ flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE,
+ flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE);
// now wait on the future for the result
XaResult result = null;
try
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index bc88160137..1125d9d5cb 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.TimeUnit;
import org.apache.qpid.client.url.URLParser_0_10;
import org.apache.qpid.jms.BrokerDetails;
@@ -60,6 +59,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
private static Logger _logger = LoggerFactory.getLogger(Client.class);
private Condition closeOk;
private boolean closed = false;
+ private long timeout = 60000;
/**
*
@@ -191,7 +191,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
try
{
- negotiationComplete.await();
+ negotiationComplete.await(timeout, TimeUnit.MILLISECONDS);
if( connectionDelegate.getUnsupportedProtocol() != null )
{
_conn.close();
@@ -202,7 +202,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
}
catch (InterruptedException e)
{
- //
+ throw new RuntimeException(e);
}
finally
{
@@ -257,7 +257,6 @@ public class Client implements org.apache.qpidity.nclient.Connection
{
try
{
- long timeout = 60000;
long start = System.currentTimeMillis();
long elapsed = 0;
while (!closed && elapsed < timeout)
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
index 6f15f16470..1d9c63df4f 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
@@ -83,7 +83,7 @@ public interface DtxSession extends Session
*
* @param xid Specifies the xid of the transaction branch to be forgotten.
*/
- public void dtxForget(Xid xid);
+ public void dtxForget(Xid xid, Option ... options);
/**
* This method obtains the current transaction timeout value in seconds. If set-timeout was not
@@ -93,7 +93,7 @@ public interface DtxSession extends Session
* @param xid Specifies the xid of the transaction branch used for getting the timeout.
* @return The current transaction timeout value in seconds.
*/
- public Future<GetTimeoutResult> dtxGetTimeout(Xid xid);
+ public Future<GetTimeoutResult> dtxGetTimeout(Xid xid, Option ... options);
/**
* This method prepares any message produced or consumed on behalf of xid, ready for commitment.
@@ -109,14 +109,14 @@ public interface DtxSession extends Session
* <p/>
* xa-rbtimeout: The work represented by this transaction branch took too long.
*/
- public Future<XaResult> dtxPrepare(Xid xid);
+ public Future<XaResult> dtxPrepare(Xid xid, Option ... options);
/**
* This method is called to obtain a list of transaction branches that are in a prepared or
* heuristically completed state.
* @return a array of xids to be recovered.
*/
- public Future<RecoverResult> dtxRecover();
+ public Future<RecoverResult> dtxRecover(Option ... options);
/**
* This method rolls back the work associated with xid. Any produced messages are discarded and
@@ -125,7 +125,7 @@ public interface DtxSession extends Session
* @param xid Specifies the xid of the transaction branch to be rolled back.
* @return Confirms to the client that the transaction branch is rolled back or specifies the error condition.
*/
- public Future<XaResult> dtxRollback(Xid xid);
+ public Future<XaResult> dtxRollback(Xid xid, Option ... options);
/**
* Sets the specified transaction branch timeout value in seconds.
@@ -133,5 +133,5 @@ public interface DtxSession extends Session
* @param xid Specifies the xid of the transaction branch for setting the timeout.
* @param timeout The transaction timeout value in seconds.
*/
- public void dtxSetTimeout(Xid xid, long timeout);
+ public void dtxSetTimeout(Xid xid, long timeout, Option ... options);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
index 717ea43654..5e18ebb026 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
@@ -65,9 +65,9 @@ public interface Session
public void close();
- public void sessionDetach(byte[] name);
+ public void sessionDetach(byte[] name, Option ... options);
- public void sessionRequestTimeout(long expiry);
+ public void sessionRequestTimeout(long expiry, Option ... options);
public byte[] getName();
@@ -153,7 +153,8 @@ public interface Session
*
* @param acquireMode Indicates whether or not the transferred message has been acquired.
*/
- public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode);
+ public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+ Option ... options);
/**
* Make a set of headers to be sent together with a message
@@ -207,7 +208,7 @@ public interface Session
* <ul>
* <li>{@link Option#EXCLUSIVE}: <p> Requests exclusive subscription access, so that only this
* subscription can access the queue.
- * <li>{@link Option#NO_OPTION}: <p> This is an empty option, and has no effect.
+ * <li>{@link Option#NONE}: <p> This is an empty option, and has no effect.
* </ul>
*
* @param queue The queue that the receiver is receiving messages from.
@@ -230,7 +231,7 @@ public interface Session
* @param listener The listener for this destination. To transfer large messages
* use a {@link org.apache.qpidity.nclient.MessagePartListener}.
* @param options Set of options. Valid options are {{@link Option#EXCLUSIVE}
- * and {@link Option#NO_OPTION}.
+ * and {@link Option#NONE}.
* @param filter A set of filters for the subscription. The syntax and semantics of these filters varies
* according to the provider's implementation.
*/
@@ -246,7 +247,7 @@ public interface Session
*
* @param destination The destination to be cancelled.
*/
- public void messageCancel(String destination);
+ public void messageCancel(String destination, Option ... options);
/**
* Associate a message listener with a destination.
@@ -274,7 +275,7 @@ public interface Session
* @param mode <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control
* <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul>
*/
- public void messageSetFlowMode(String destination, MessageFlowMode mode);
+ public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options);
/**
@@ -295,7 +296,7 @@ public interface Session
* </ul>
* @param value Number of credits, a value of 0 indicates an infinite amount of credit.
*/
- public void messageFlow(String destination, MessageCreditUnit unit, long value);
+ public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options);
/**
* Forces the broker to exhaust its credit supply.
@@ -304,7 +305,7 @@ public interface Session
*
* @param destination The destination on which the credit supply is to be exhausted.
*/
- public void messageFlush(String destination);
+ public void messageFlush(String destination, Option ... options);
/**
* On receipt of this method, the brokers set credit to zero for a given
@@ -314,7 +315,7 @@ public interface Session
*
* @param destination The destination on which to reset credit.
*/
- public void messageStop(String destination);
+ public void messageStop(String destination, Option ... options);
/**
* Acknowledge the receipt of a range of messages.
@@ -338,7 +339,7 @@ public interface Session
* failed).
* @param text String describing the reason for a message transfer rejection.
*/
- public void messageReject(RangeSet ranges, MessageRejectCode code, String text);
+ public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options);
/**
* As it is possible that the broker does not manage to reject some messages, after completion of
@@ -367,7 +368,7 @@ public interface Session
* @param ranges Ranges of messages to be acquired.
* @return Indicates the acquired messages
*/
- public Future<Acquired> messageAcquire(RangeSet ranges);
+ public Future<Acquired> messageAcquire(RangeSet ranges, Option ... options);
/**
* Give up responsibility for processing ranges of messages.
@@ -384,21 +385,21 @@ public interface Session
/**
* Selects the session for local transaction support.
*/
- public void txSelect();
+ public void txSelect(Option ... options);
/**
* Commit the receipt and delivery of all messages exchanged by this session's resources.
*
* @throws IllegalStateException If this session is not transacted, an exception will be thrown.
*/
- public void txCommit() throws IllegalStateException;
+ public void txCommit(Option ... options) throws IllegalStateException;
/**
* Roll back the receipt and delivery of all messages exchanged by this session's resources.
*
* @throws IllegalStateException If this session is not transacted, an exception will be thrown.
*/
- public void txRollback() throws IllegalStateException;
+ public void txRollback(Option ... options) throws IllegalStateException;
//---------------------------------------------
// Queue methods
@@ -423,7 +424,7 @@ public interface Session
* declaring connection closes.
* <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
* This field allows the client to assert the presence of a queue without modifying the server state.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
+ * <li>{@link Option#NONE}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option
*
@@ -435,7 +436,7 @@ public interface Session
* the queue. </ol>
* @param arguments Used for backward compatibility
* @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
- * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NO_OPTION})
+ * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE})
* @see Option
*/
public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments,
@@ -456,7 +457,8 @@ public interface Session
* routing keys depends on the exchange implementation.
* @param arguments Used for backward compatibility
*/
- public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
+ public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments,
+ Option ... options);
/**
* Unbind a queue from an exchange.
@@ -465,7 +467,7 @@ public interface Session
* @param exchangeName The name of the exchange to unbind from.
* @param routingKey Specifies the routing key of the binding to unbind.
*/
- public void exchangeUnbind(String queueName, String exchangeName, String routingKey);
+ public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options);
/**
* This method removes all messages from a queue. It does not cancel consumers. Purged messages
@@ -474,7 +476,7 @@ public interface Session
* @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the
* current queue for the session, which is the last declared queue.
*/
- public void queuePurge(String queueName);
+ public void queuePurge(String queueName, Option ... options);
/**
* This method deletes a queue. When a queue is deleted any pending messages are sent to a
@@ -485,7 +487,7 @@ public interface Session
* <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages.
* <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
* If the queue has consumers the server does does not delete it but raises a channel exception instead.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
+ * <li>{@link Option#NONE}: <p> Has no effect as it represents an �empty� option.
* </ul>
* </p>
* <p/>
@@ -494,7 +496,7 @@ public interface Session
* @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the
* current queue for the session, which is the last declared queue.
* @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED}
- * and {@link Option#NO_OPTION})
+ * and {@link Option#NONE})
* @see Option
*/
public void queueDelete(String queueName, Option... options);
@@ -506,7 +508,7 @@ public interface Session
* @param queueName The name of the queue for which information is requested.
* @return Information on the specified queue.
*/
- public Future<QueueQueryResult> queueQuery(String queueName);
+ public Future<QueueQueryResult> queueQuery(String queueName, Option ... options);
/**
@@ -519,7 +521,7 @@ public interface Session
* @return Information on the specified binding.
*/
public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey,
- Map<String, Object> arguments);
+ Map<String, Object> arguments, Option ... options);
// --------------------------------------
// exhcange methods
@@ -536,7 +538,7 @@ public interface Session
* exchanges) are purged when a server restarts.
* <li>{@link Option#PASSIVE}: <p>If set, the server will not create the exchange.
* The client can use this to check whether an exchange exists without modifying the server state.
- * <li> {@link Option#NO_OPTION}: <p>This option is an empty option, and has no effect.
+ * <li> {@link Option#NONE}: <p>This option is an empty option, and has no effect.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option</p>
*
@@ -548,7 +550,7 @@ public interface Session
* @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
* the message will be sent.
* @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
- * {@link Option#PASSIVE}, {@link Option#NO_OPTION})
+ * {@link Option#PASSIVE}, {@link Option#NONE})
* @param arguments Used for backward compatibility
* @see Option
*/
@@ -563,12 +565,12 @@ public interface Session
* <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
* exchange has queue bindings the server does not delete it but raises a channel exception
* instead.
- * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option.
+ * <li> {@link Option#NONE}: <p> Has no effect as it represents an empty option.
* </ul>
* <p>Note that if an option is not set, it will default to false.
*
* @param exchangeName The name of exchange to be deleted.
- * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NO_OPTION}.
+ * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}.
* @see Option
*/
public void exchangeDelete(String exchangeName, Option... options);
@@ -581,7 +583,7 @@ public interface Session
* return information about the default exchange.
* @return Information on the specified exchange.
*/
- public Future<ExchangeQueryResult> exchangeQuery(String exchangeName);
+ public Future<ExchangeQueryResult> exchangeQuery(String exchangeName, Option ... options);
/**
* If the session receives a sessionClosed with an error code it
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
index f7d54a681f..f7978d0d98 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
@@ -17,6 +17,8 @@ import org.apache.qpidity.transport.Option;
import org.apache.qpidity.transport.Range;
import org.apache.qpidity.transport.RangeSet;
+import static org.apache.qpidity.transport.Option.*;
+
/**
* Implements a Qpid Sesion.
*/
@@ -66,8 +68,8 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
{
super.processed(range);
}
- super.flushProcessed();
- if( accept )
+ super.flushProcessed(accept ? BATCH : NONE);
+ if (accept)
{
messageAccept(ranges);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
index e57dd08448..da6f5e7d45 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
@@ -32,15 +32,11 @@ public class ClientSessionDelegate extends SessionDelegate
// --------------------------------------------
@Override public void data(Session ssn, Data data)
{
- for (ByteBuffer b : data.getFragments())
- {
- _currentMessageListener.data(b);
- }
+ _currentMessageListener.data(data.getData());
if (data.isLast())
{
_currentMessageListener.messageReceived();
}
-
}
@Override public void header(Session ssn, Header header)