diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-07-09 13:26:54 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-07-09 13:26:54 +0000 |
| commit | fa6532edf09b45201f90beaeef62702b00d35947 (patch) | |
| tree | 1063db9bc554758657d6d45da2107c856ae0d804 /java/client | |
| parent | 6cddd1d794278e7e68163e88851f09553dd5123f (diff) | |
| download | qpid-python-fa6532edf09b45201f90beaeef62702b00d35947.tar.gz | |
Primarily profiling driven changes:
- added batched writes of commands/controls issued on a session
- copy fragmented frames and segments rather than trying to decode
them piecemeal, removed FragmentDecoder
- added caching for str8 encode/decode
- compute sizes as we encode by going back and filling in the amount
of bytes written rather than computing it up front
- added SYNC option to commands
- renamed NO_OPTION argument to NONE
- added a timeout to Client.java
- removed use of UUID.fromString in BasicMessageProducer_0_10.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@675165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
11 files changed, 81 insertions, 76 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java index f26e5418b4..100dcf52bf 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java @@ -55,7 +55,7 @@ public class TopicListener implements MessageListener Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, new MessagePartListenerAdapter(this), - null, Option.NO_OPTION); + null, Option.NONE); // issue credits // XXX: need to be able to set to null session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 499fca3833..2d98fd0dcd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -250,9 +250,9 @@ public class AMQSession_0_10 extends AMQSession public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException { - getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION, - autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION, - exclusive ? Option.EXCLUSIVE : Option.NO_OPTION); + getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE, + autoDelete ? Option.AUTO_DELETE : Option.NONE, + exclusive ? Option.EXCLUSIVE : Option.NONE); // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); @@ -387,7 +387,7 @@ public class AMQSession_0_10 extends AMQSession getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED, preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) { @@ -477,9 +477,9 @@ public class AMQSession_0_10 extends AMQSession arguments.put("no-local", true); } getQpidSession().queueDeclare(res.toString(), null, arguments, - amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION, - amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION, - !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + amqd.isDurable() ? Option.DURABLE : Option.NONE, + !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false // We need to sync so that we get notify of an error. getQpidSession().sync(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 9d01fbfaa2..c0cfc21ee2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -453,19 +453,21 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } + UUID messageId = null; if (_disableMessageId) { message.setJMSMessageID(null); } else { + messageId = UUID.randomUUID(); StringBuilder b = new StringBuilder(39); b.append("ID:"); - b.append(UUID.randomUUID()); + b.append(messageId); message.setJMSMessageID(b.toString()); } - sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait); + sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait); if (message != origMessage) { @@ -484,8 +486,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, - int deliveryMode, int priority, long timeToLive, boolean mandatory, - boolean immediate, boolean wait)throws JMSException; + UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean wait) throws JMSException; private void checkTemporaryDestination(AMQDestination destination) throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 5eb066ec36..5e6a5b5f40 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -68,8 +68,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer * Sends a message to a given destination */ void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, - int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, - boolean wait) throws JMSException + UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean wait) throws JMSException { message.prepareForSending(); if (message.get010Message() == null) @@ -84,7 +84,16 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties(); MessageProperties messageProps = message.get010Message().getMessageProperties(); - // set the delivery properties + + if (messageId != null) + { + messageProps.setMessageId(messageId); + } + else if (messageProps.hasMessageId()) + { + messageProps.clearMessageId(); + } + if (!_disableTimestamps) { final long currentTime = System.currentTimeMillis(); @@ -142,13 +151,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer messageProps.setContentType(contentHeaderProperties.getContentType().toString()); messageProps.setContentLength(message.getContentLength()); - // XXX: fixme - String mid = message.getJMSMessageID(); - if( mid != null ) - { - messageProps.setMessageId(UUID.fromString(mid.substring(3))); - } - AMQShortString correlationID = contentHeaderProperties.getCorrelationId(); if (correlationID != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index ff991b1a03..8ca68850eb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import java.util.UUID; + import javax.jms.JMSException; import javax.jms.Message; @@ -65,9 +67,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer _protocolHandler.writeFrame(declare); } - void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message, - int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate, - boolean wait) throws JMSException + void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, + UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean wait) throws JMSException { BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(), destination.getExchangeName(), diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 7a107d748b..ce2f2180b1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -76,7 +76,7 @@ public class XAResourceImpl implements XAResource _logger.debug("commit tx branch with xid: ", xid); } Future<XaResult> future = - _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NO_OPTION); + _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE); // now wait on the future for the result XaResult result = null; @@ -129,8 +129,8 @@ public class XAResourceImpl implements XAResource } Future<XaResult> future = _xaSession.getQpidSession() .dtxEnd(convertXid(xid), - flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, - flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); + flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE, + flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE); // now wait on the future for the result XaResult result = null; try @@ -400,8 +400,8 @@ public class XAResourceImpl implements XAResource } Future<XaResult> future = _xaSession.getQpidSession() .dtxStart(convertXid(xid), - flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, - flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE, + flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE); // now wait on the future for the result XaResult result = null; try diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index bc88160137..1125d9d5cb 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.TimeUnit; import org.apache.qpid.client.url.URLParser_0_10; import org.apache.qpid.jms.BrokerDetails; @@ -60,6 +59,7 @@ public class Client implements org.apache.qpidity.nclient.Connection private static Logger _logger = LoggerFactory.getLogger(Client.class); private Condition closeOk; private boolean closed = false; + private long timeout = 60000; /** * @@ -191,7 +191,7 @@ public class Client implements org.apache.qpidity.nclient.Connection try { - negotiationComplete.await(); + negotiationComplete.await(timeout, TimeUnit.MILLISECONDS); if( connectionDelegate.getUnsupportedProtocol() != null ) { _conn.close(); @@ -202,7 +202,7 @@ public class Client implements org.apache.qpidity.nclient.Connection } catch (InterruptedException e) { - // + throw new RuntimeException(e); } finally { @@ -257,7 +257,6 @@ public class Client implements org.apache.qpidity.nclient.Connection { try { - long timeout = 60000; long start = System.currentTimeMillis(); long elapsed = 0; while (!closed && elapsed < timeout) diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java index 6f15f16470..1d9c63df4f 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java @@ -83,7 +83,7 @@ public interface DtxSession extends Session * * @param xid Specifies the xid of the transaction branch to be forgotten. */ - public void dtxForget(Xid xid); + public void dtxForget(Xid xid, Option ... options); /** * This method obtains the current transaction timeout value in seconds. If set-timeout was not @@ -93,7 +93,7 @@ public interface DtxSession extends Session * @param xid Specifies the xid of the transaction branch used for getting the timeout. * @return The current transaction timeout value in seconds. */ - public Future<GetTimeoutResult> dtxGetTimeout(Xid xid); + public Future<GetTimeoutResult> dtxGetTimeout(Xid xid, Option ... options); /** * This method prepares any message produced or consumed on behalf of xid, ready for commitment. @@ -109,14 +109,14 @@ public interface DtxSession extends Session * <p/> * xa-rbtimeout: The work represented by this transaction branch took too long. */ - public Future<XaResult> dtxPrepare(Xid xid); + public Future<XaResult> dtxPrepare(Xid xid, Option ... options); /** * This method is called to obtain a list of transaction branches that are in a prepared or * heuristically completed state. * @return a array of xids to be recovered. */ - public Future<RecoverResult> dtxRecover(); + public Future<RecoverResult> dtxRecover(Option ... options); /** * This method rolls back the work associated with xid. Any produced messages are discarded and @@ -125,7 +125,7 @@ public interface DtxSession extends Session * @param xid Specifies the xid of the transaction branch to be rolled back. * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition. */ - public Future<XaResult> dtxRollback(Xid xid); + public Future<XaResult> dtxRollback(Xid xid, Option ... options); /** * Sets the specified transaction branch timeout value in seconds. @@ -133,5 +133,5 @@ public interface DtxSession extends Session * @param xid Specifies the xid of the transaction branch for setting the timeout. * @param timeout The transaction timeout value in seconds. */ - public void dtxSetTimeout(Xid xid, long timeout); + public void dtxSetTimeout(Xid xid, long timeout, Option ... options); } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java index 717ea43654..5e18ebb026 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java @@ -65,9 +65,9 @@ public interface Session public void close(); - public void sessionDetach(byte[] name); + public void sessionDetach(byte[] name, Option ... options); - public void sessionRequestTimeout(long expiry); + public void sessionRequestTimeout(long expiry, Option ... options); public byte[] getName(); @@ -153,7 +153,8 @@ public interface Session * * @param acquireMode Indicates whether or not the transferred message has been acquired. */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode); + public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, + Option ... options); /** * Make a set of headers to be sent together with a message @@ -207,7 +208,7 @@ public interface Session * <ul> * <li>{@link Option#EXCLUSIVE}: <p> Requests exclusive subscription access, so that only this * subscription can access the queue. - * <li>{@link Option#NO_OPTION}: <p> This is an empty option, and has no effect. + * <li>{@link Option#NONE}: <p> This is an empty option, and has no effect. * </ul> * * @param queue The queue that the receiver is receiving messages from. @@ -230,7 +231,7 @@ public interface Session * @param listener The listener for this destination. To transfer large messages * use a {@link org.apache.qpidity.nclient.MessagePartListener}. * @param options Set of options. Valid options are {{@link Option#EXCLUSIVE} - * and {@link Option#NO_OPTION}. + * and {@link Option#NONE}. * @param filter A set of filters for the subscription. The syntax and semantics of these filters varies * according to the provider's implementation. */ @@ -246,7 +247,7 @@ public interface Session * * @param destination The destination to be cancelled. */ - public void messageCancel(String destination); + public void messageCancel(String destination, Option ... options); /** * Associate a message listener with a destination. @@ -274,7 +275,7 @@ public interface Session * @param mode <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control * <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul> */ - public void messageSetFlowMode(String destination, MessageFlowMode mode); + public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options); /** @@ -295,7 +296,7 @@ public interface Session * </ul> * @param value Number of credits, a value of 0 indicates an infinite amount of credit. */ - public void messageFlow(String destination, MessageCreditUnit unit, long value); + public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options); /** * Forces the broker to exhaust its credit supply. @@ -304,7 +305,7 @@ public interface Session * * @param destination The destination on which the credit supply is to be exhausted. */ - public void messageFlush(String destination); + public void messageFlush(String destination, Option ... options); /** * On receipt of this method, the brokers set credit to zero for a given @@ -314,7 +315,7 @@ public interface Session * * @param destination The destination on which to reset credit. */ - public void messageStop(String destination); + public void messageStop(String destination, Option ... options); /** * Acknowledge the receipt of a range of messages. @@ -338,7 +339,7 @@ public interface Session * failed). * @param text String describing the reason for a message transfer rejection. */ - public void messageReject(RangeSet ranges, MessageRejectCode code, String text); + public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options); /** * As it is possible that the broker does not manage to reject some messages, after completion of @@ -367,7 +368,7 @@ public interface Session * @param ranges Ranges of messages to be acquired. * @return Indicates the acquired messages */ - public Future<Acquired> messageAcquire(RangeSet ranges); + public Future<Acquired> messageAcquire(RangeSet ranges, Option ... options); /** * Give up responsibility for processing ranges of messages. @@ -384,21 +385,21 @@ public interface Session /** * Selects the session for local transaction support. */ - public void txSelect(); + public void txSelect(Option ... options); /** * Commit the receipt and delivery of all messages exchanged by this session's resources. * * @throws IllegalStateException If this session is not transacted, an exception will be thrown. */ - public void txCommit() throws IllegalStateException; + public void txCommit(Option ... options) throws IllegalStateException; /** * Roll back the receipt and delivery of all messages exchanged by this session's resources. * * @throws IllegalStateException If this session is not transacted, an exception will be thrown. */ - public void txRollback() throws IllegalStateException; + public void txRollback(Option ... options) throws IllegalStateException; //--------------------------------------------- // Queue methods @@ -423,7 +424,7 @@ public interface Session * declaring connection closes. * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue. * This field allows the client to assert the presence of a queue without modifying the server state. - * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option. + * <li>{@link Option#NONE}: <p> Has no effect as it represents an �empty� option. * </ul> * <p>In the absence of a particular option, the defaul value is false for each option * @@ -435,7 +436,7 @@ public interface Session * the queue. </ol> * @param arguments Used for backward compatibility * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NO_OPTION}) + * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE}) * @see Option */ public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments, @@ -456,7 +457,8 @@ public interface Session * routing keys depends on the exchange implementation. * @param arguments Used for backward compatibility */ - public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments); + public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments, + Option ... options); /** * Unbind a queue from an exchange. @@ -465,7 +467,7 @@ public interface Session * @param exchangeName The name of the exchange to unbind from. * @param routingKey Specifies the routing key of the binding to unbind. */ - public void exchangeUnbind(String queueName, String exchangeName, String routingKey); + public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options); /** * This method removes all messages from a queue. It does not cancel consumers. Purged messages @@ -474,7 +476,7 @@ public interface Session * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the * current queue for the session, which is the last declared queue. */ - public void queuePurge(String queueName); + public void queuePurge(String queueName, Option ... options); /** * This method deletes a queue. When a queue is deleted any pending messages are sent to a @@ -485,7 +487,7 @@ public interface Session * <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages. * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers. * If the queue has consumers the server does does not delete it but raises a channel exception instead. - * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option. + * <li>{@link Option#NONE}: <p> Has no effect as it represents an �empty� option. * </ul> * </p> * <p/> @@ -494,7 +496,7 @@ public interface Session * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the * current queue for the session, which is the last declared queue. * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} - * and {@link Option#NO_OPTION}) + * and {@link Option#NONE}) * @see Option */ public void queueDelete(String queueName, Option... options); @@ -506,7 +508,7 @@ public interface Session * @param queueName The name of the queue for which information is requested. * @return Information on the specified queue. */ - public Future<QueueQueryResult> queueQuery(String queueName); + public Future<QueueQueryResult> queueQuery(String queueName, Option ... options); /** @@ -519,7 +521,7 @@ public interface Session * @return Information on the specified binding. */ public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey, - Map<String, Object> arguments); + Map<String, Object> arguments, Option ... options); // -------------------------------------- // exhcange methods @@ -536,7 +538,7 @@ public interface Session * exchanges) are purged when a server restarts. * <li>{@link Option#PASSIVE}: <p>If set, the server will not create the exchange. * The client can use this to check whether an exchange exists without modifying the server state. - * <li> {@link Option#NO_OPTION}: <p>This option is an empty option, and has no effect. + * <li> {@link Option#NONE}: <p>This option is an empty option, and has no effect. * </ul> * <p>In the absence of a particular option, the defaul value is false for each option</p> * @@ -548,7 +550,7 @@ public interface Session * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which * the message will be sent. * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#PASSIVE}, {@link Option#NO_OPTION}) + * {@link Option#PASSIVE}, {@link Option#NONE}) * @param arguments Used for backward compatibility * @see Option */ @@ -563,12 +565,12 @@ public interface Session * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the * exchange has queue bindings the server does not delete it but raises a channel exception * instead. - * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option. + * <li> {@link Option#NONE}: <p> Has no effect as it represents an empty option. * </ul> * <p>Note that if an option is not set, it will default to false. * * @param exchangeName The name of exchange to be deleted. - * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NO_OPTION}. + * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}. * @see Option */ public void exchangeDelete(String exchangeName, Option... options); @@ -581,7 +583,7 @@ public interface Session * return information about the default exchange. * @return Information on the specified exchange. */ - public Future<ExchangeQueryResult> exchangeQuery(String exchangeName); + public Future<ExchangeQueryResult> exchangeQuery(String exchangeName, Option ... options); /** * If the session receives a sessionClosed with an error code it diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index f7d54a681f..f7978d0d98 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -17,6 +17,8 @@ import org.apache.qpidity.transport.Option; import org.apache.qpidity.transport.Range; import org.apache.qpidity.transport.RangeSet; +import static org.apache.qpidity.transport.Option.*; + /** * Implements a Qpid Sesion. */ @@ -66,8 +68,8 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen { super.processed(range); } - super.flushProcessed(); - if( accept ) + super.flushProcessed(accept ? BATCH : NONE); + if (accept) { messageAccept(ranges); } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java index e57dd08448..da6f5e7d45 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java @@ -32,15 +32,11 @@ public class ClientSessionDelegate extends SessionDelegate // -------------------------------------------- @Override public void data(Session ssn, Data data) { - for (ByteBuffer b : data.getFragments()) - { - _currentMessageListener.data(b); - } + _currentMessageListener.data(data.getData()); if (data.isLast()) { _currentMessageListener.messageReceived(); } - } @Override public void header(Session ssn, Header header) |
