diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-07-09 13:26:54 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-07-09 13:26:54 +0000 |
commit | fa6532edf09b45201f90beaeef62702b00d35947 (patch) | |
tree | 1063db9bc554758657d6d45da2107c856ae0d804 | |
parent | 6cddd1d794278e7e68163e88851f09553dd5123f (diff) | |
download | qpid-python-fa6532edf09b45201f90beaeef62702b00d35947.tar.gz |
Primarily profiling driven changes:
- added batched writes of commands/controls issued on a session
- copy fragmented frames and segments rather than trying to decode
them piecemeal, removed FragmentDecoder
- added caching for str8 encode/decode
- compute sizes as we encode by going back and filling in the amount
of bytes written rather than computing it up front
- added SYNC option to commands
- renamed NO_OPTION argument to NONE
- added a timeout to Client.java
- removed use of UUID.fromString in BasicMessageProducer_0_10.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@675165 13f79535-47bb-0310-9956-ffa450edef68
40 files changed, 774 insertions, 694 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java index f26e5418b4..100dcf52bf 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java @@ -55,7 +55,7 @@ public class TopicListener implements MessageListener Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, new MessagePartListenerAdapter(this), - null, Option.NO_OPTION); + null, Option.NONE); // issue credits // XXX: need to be able to set to null session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 499fca3833..2d98fd0dcd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -250,9 +250,9 @@ public class AMQSession_0_10 extends AMQSession public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException { - getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION, - autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION, - exclusive ? Option.EXCLUSIVE : Option.NO_OPTION); + getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE, + autoDelete ? Option.AUTO_DELETE : Option.NONE, + exclusive ? Option.EXCLUSIVE : Option.NONE); // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); @@ -387,7 +387,7 @@ public class AMQSession_0_10 extends AMQSession getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED, preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) { @@ -477,9 +477,9 @@ public class AMQSession_0_10 extends AMQSession arguments.put("no-local", true); } getQpidSession().queueDeclare(res.toString(), null, arguments, - amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION, - amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION, - !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + amqd.isDurable() ? Option.DURABLE : Option.NONE, + !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false // We need to sync so that we get notify of an error. getQpidSession().sync(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 9d01fbfaa2..c0cfc21ee2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -453,19 +453,21 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } + UUID messageId = null; if (_disableMessageId) { message.setJMSMessageID(null); } else { + messageId = UUID.randomUUID(); StringBuilder b = new StringBuilder(39); b.append("ID:"); - b.append(UUID.randomUUID()); + b.append(messageId); message.setJMSMessageID(b.toString()); } - sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait); + sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait); if (message != origMessage) { @@ -484,8 +486,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, - int deliveryMode, int priority, long timeToLive, boolean mandatory, - boolean immediate, boolean wait)throws JMSException; + UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean wait) throws JMSException; private void checkTemporaryDestination(AMQDestination destination) throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 5eb066ec36..5e6a5b5f40 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -68,8 +68,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer * Sends a message to a given destination */ void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, - int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, - boolean wait) throws JMSException + UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean wait) throws JMSException { message.prepareForSending(); if (message.get010Message() == null) @@ -84,7 +84,16 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties(); MessageProperties messageProps = message.get010Message().getMessageProperties(); - // set the delivery properties + + if (messageId != null) + { + messageProps.setMessageId(messageId); + } + else if (messageProps.hasMessageId()) + { + messageProps.clearMessageId(); + } + if (!_disableTimestamps) { final long currentTime = System.currentTimeMillis(); @@ -142,13 +151,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer messageProps.setContentType(contentHeaderProperties.getContentType().toString()); messageProps.setContentLength(message.getContentLength()); - // XXX: fixme - String mid = message.getJMSMessageID(); - if( mid != null ) - { - messageProps.setMessageId(UUID.fromString(mid.substring(3))); - } - AMQShortString correlationID = contentHeaderProperties.getCorrelationId(); if (correlationID != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index ff991b1a03..8ca68850eb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import java.util.UUID; + import javax.jms.JMSException; import javax.jms.Message; @@ -65,9 +67,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer _protocolHandler.writeFrame(declare); } - void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message, - int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate, - boolean wait) throws JMSException + void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, + UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, + boolean immediate, boolean wait) throws JMSException { BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(), destination.getExchangeName(), diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 7a107d748b..ce2f2180b1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -76,7 +76,7 @@ public class XAResourceImpl implements XAResource _logger.debug("commit tx branch with xid: ", xid); } Future<XaResult> future = - _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NO_OPTION); + _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE); // now wait on the future for the result XaResult result = null; @@ -129,8 +129,8 @@ public class XAResourceImpl implements XAResource } Future<XaResult> future = _xaSession.getQpidSession() .dtxEnd(convertXid(xid), - flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION, - flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION); + flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE, + flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE); // now wait on the future for the result XaResult result = null; try @@ -400,8 +400,8 @@ public class XAResourceImpl implements XAResource } Future<XaResult> future = _xaSession.getQpidSession() .dtxStart(convertXid(xid), - flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION, - flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION); + flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE, + flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE); // now wait on the future for the result XaResult result = null; try diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index bc88160137..1125d9d5cb 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.TimeUnit; import org.apache.qpid.client.url.URLParser_0_10; import org.apache.qpid.jms.BrokerDetails; @@ -60,6 +59,7 @@ public class Client implements org.apache.qpidity.nclient.Connection private static Logger _logger = LoggerFactory.getLogger(Client.class); private Condition closeOk; private boolean closed = false; + private long timeout = 60000; /** * @@ -191,7 +191,7 @@ public class Client implements org.apache.qpidity.nclient.Connection try { - negotiationComplete.await(); + negotiationComplete.await(timeout, TimeUnit.MILLISECONDS); if( connectionDelegate.getUnsupportedProtocol() != null ) { _conn.close(); @@ -202,7 +202,7 @@ public class Client implements org.apache.qpidity.nclient.Connection } catch (InterruptedException e) { - // + throw new RuntimeException(e); } finally { @@ -257,7 +257,6 @@ public class Client implements org.apache.qpidity.nclient.Connection { try { - long timeout = 60000; long start = System.currentTimeMillis(); long elapsed = 0; while (!closed && elapsed < timeout) diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java index 6f15f16470..1d9c63df4f 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java @@ -83,7 +83,7 @@ public interface DtxSession extends Session * * @param xid Specifies the xid of the transaction branch to be forgotten. */ - public void dtxForget(Xid xid); + public void dtxForget(Xid xid, Option ... options); /** * This method obtains the current transaction timeout value in seconds. If set-timeout was not @@ -93,7 +93,7 @@ public interface DtxSession extends Session * @param xid Specifies the xid of the transaction branch used for getting the timeout. * @return The current transaction timeout value in seconds. */ - public Future<GetTimeoutResult> dtxGetTimeout(Xid xid); + public Future<GetTimeoutResult> dtxGetTimeout(Xid xid, Option ... options); /** * This method prepares any message produced or consumed on behalf of xid, ready for commitment. @@ -109,14 +109,14 @@ public interface DtxSession extends Session * <p/> * xa-rbtimeout: The work represented by this transaction branch took too long. */ - public Future<XaResult> dtxPrepare(Xid xid); + public Future<XaResult> dtxPrepare(Xid xid, Option ... options); /** * This method is called to obtain a list of transaction branches that are in a prepared or * heuristically completed state. * @return a array of xids to be recovered. */ - public Future<RecoverResult> dtxRecover(); + public Future<RecoverResult> dtxRecover(Option ... options); /** * This method rolls back the work associated with xid. Any produced messages are discarded and @@ -125,7 +125,7 @@ public interface DtxSession extends Session * @param xid Specifies the xid of the transaction branch to be rolled back. * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition. */ - public Future<XaResult> dtxRollback(Xid xid); + public Future<XaResult> dtxRollback(Xid xid, Option ... options); /** * Sets the specified transaction branch timeout value in seconds. @@ -133,5 +133,5 @@ public interface DtxSession extends Session * @param xid Specifies the xid of the transaction branch for setting the timeout. * @param timeout The transaction timeout value in seconds. */ - public void dtxSetTimeout(Xid xid, long timeout); + public void dtxSetTimeout(Xid xid, long timeout, Option ... options); } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java index 717ea43654..5e18ebb026 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java @@ -65,9 +65,9 @@ public interface Session public void close(); - public void sessionDetach(byte[] name); + public void sessionDetach(byte[] name, Option ... options); - public void sessionRequestTimeout(long expiry); + public void sessionRequestTimeout(long expiry, Option ... options); public byte[] getName(); @@ -153,7 +153,8 @@ public interface Session * * @param acquireMode Indicates whether or not the transferred message has been acquired. */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode); + public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, + Option ... options); /** * Make a set of headers to be sent together with a message @@ -207,7 +208,7 @@ public interface Session * <ul> * <li>{@link Option#EXCLUSIVE}: <p> Requests exclusive subscription access, so that only this * subscription can access the queue. - * <li>{@link Option#NO_OPTION}: <p> This is an empty option, and has no effect. + * <li>{@link Option#NONE}: <p> This is an empty option, and has no effect. * </ul> * * @param queue The queue that the receiver is receiving messages from. @@ -230,7 +231,7 @@ public interface Session * @param listener The listener for this destination. To transfer large messages * use a {@link org.apache.qpidity.nclient.MessagePartListener}. * @param options Set of options. Valid options are {{@link Option#EXCLUSIVE} - * and {@link Option#NO_OPTION}. + * and {@link Option#NONE}. * @param filter A set of filters for the subscription. The syntax and semantics of these filters varies * according to the provider's implementation. */ @@ -246,7 +247,7 @@ public interface Session * * @param destination The destination to be cancelled. */ - public void messageCancel(String destination); + public void messageCancel(String destination, Option ... options); /** * Associate a message listener with a destination. @@ -274,7 +275,7 @@ public interface Session * @param mode <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control * <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul> */ - public void messageSetFlowMode(String destination, MessageFlowMode mode); + public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options); /** @@ -295,7 +296,7 @@ public interface Session * </ul> * @param value Number of credits, a value of 0 indicates an infinite amount of credit. */ - public void messageFlow(String destination, MessageCreditUnit unit, long value); + public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options); /** * Forces the broker to exhaust its credit supply. @@ -304,7 +305,7 @@ public interface Session * * @param destination The destination on which the credit supply is to be exhausted. */ - public void messageFlush(String destination); + public void messageFlush(String destination, Option ... options); /** * On receipt of this method, the brokers set credit to zero for a given @@ -314,7 +315,7 @@ public interface Session * * @param destination The destination on which to reset credit. */ - public void messageStop(String destination); + public void messageStop(String destination, Option ... options); /** * Acknowledge the receipt of a range of messages. @@ -338,7 +339,7 @@ public interface Session * failed). * @param text String describing the reason for a message transfer rejection. */ - public void messageReject(RangeSet ranges, MessageRejectCode code, String text); + public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options); /** * As it is possible that the broker does not manage to reject some messages, after completion of @@ -367,7 +368,7 @@ public interface Session * @param ranges Ranges of messages to be acquired. * @return Indicates the acquired messages */ - public Future<Acquired> messageAcquire(RangeSet ranges); + public Future<Acquired> messageAcquire(RangeSet ranges, Option ... options); /** * Give up responsibility for processing ranges of messages. @@ -384,21 +385,21 @@ public interface Session /** * Selects the session for local transaction support. */ - public void txSelect(); + public void txSelect(Option ... options); /** * Commit the receipt and delivery of all messages exchanged by this session's resources. * * @throws IllegalStateException If this session is not transacted, an exception will be thrown. */ - public void txCommit() throws IllegalStateException; + public void txCommit(Option ... options) throws IllegalStateException; /** * Roll back the receipt and delivery of all messages exchanged by this session's resources. * * @throws IllegalStateException If this session is not transacted, an exception will be thrown. */ - public void txRollback() throws IllegalStateException; + public void txRollback(Option ... options) throws IllegalStateException; //--------------------------------------------- // Queue methods @@ -423,7 +424,7 @@ public interface Session * declaring connection closes. * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue. * This field allows the client to assert the presence of a queue without modifying the server state. - * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option. + * <li>{@link Option#NONE}: <p> Has no effect as it represents an �empty� option. * </ul> * <p>In the absence of a particular option, the defaul value is false for each option * @@ -435,7 +436,7 @@ public interface Session * the queue. </ol> * @param arguments Used for backward compatibility * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NO_OPTION}) + * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE}) * @see Option */ public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments, @@ -456,7 +457,8 @@ public interface Session * routing keys depends on the exchange implementation. * @param arguments Used for backward compatibility */ - public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments); + public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments, + Option ... options); /** * Unbind a queue from an exchange. @@ -465,7 +467,7 @@ public interface Session * @param exchangeName The name of the exchange to unbind from. * @param routingKey Specifies the routing key of the binding to unbind. */ - public void exchangeUnbind(String queueName, String exchangeName, String routingKey); + public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options); /** * This method removes all messages from a queue. It does not cancel consumers. Purged messages @@ -474,7 +476,7 @@ public interface Session * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the * current queue for the session, which is the last declared queue. */ - public void queuePurge(String queueName); + public void queuePurge(String queueName, Option ... options); /** * This method deletes a queue. When a queue is deleted any pending messages are sent to a @@ -485,7 +487,7 @@ public interface Session * <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages. * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers. * If the queue has consumers the server does does not delete it but raises a channel exception instead. - * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option. + * <li>{@link Option#NONE}: <p> Has no effect as it represents an �empty� option. * </ul> * </p> * <p/> @@ -494,7 +496,7 @@ public interface Session * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the * current queue for the session, which is the last declared queue. * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} - * and {@link Option#NO_OPTION}) + * and {@link Option#NONE}) * @see Option */ public void queueDelete(String queueName, Option... options); @@ -506,7 +508,7 @@ public interface Session * @param queueName The name of the queue for which information is requested. * @return Information on the specified queue. */ - public Future<QueueQueryResult> queueQuery(String queueName); + public Future<QueueQueryResult> queueQuery(String queueName, Option ... options); /** @@ -519,7 +521,7 @@ public interface Session * @return Information on the specified binding. */ public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey, - Map<String, Object> arguments); + Map<String, Object> arguments, Option ... options); // -------------------------------------- // exhcange methods @@ -536,7 +538,7 @@ public interface Session * exchanges) are purged when a server restarts. * <li>{@link Option#PASSIVE}: <p>If set, the server will not create the exchange. * The client can use this to check whether an exchange exists without modifying the server state. - * <li> {@link Option#NO_OPTION}: <p>This option is an empty option, and has no effect. + * <li> {@link Option#NONE}: <p>This option is an empty option, and has no effect. * </ul> * <p>In the absence of a particular option, the defaul value is false for each option</p> * @@ -548,7 +550,7 @@ public interface Session * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which * the message will be sent. * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#PASSIVE}, {@link Option#NO_OPTION}) + * {@link Option#PASSIVE}, {@link Option#NONE}) * @param arguments Used for backward compatibility * @see Option */ @@ -563,12 +565,12 @@ public interface Session * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the * exchange has queue bindings the server does not delete it but raises a channel exception * instead. - * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option. + * <li> {@link Option#NONE}: <p> Has no effect as it represents an empty option. * </ul> * <p>Note that if an option is not set, it will default to false. * * @param exchangeName The name of exchange to be deleted. - * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NO_OPTION}. + * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}. * @see Option */ public void exchangeDelete(String exchangeName, Option... options); @@ -581,7 +583,7 @@ public interface Session * return information about the default exchange. * @return Information on the specified exchange. */ - public Future<ExchangeQueryResult> exchangeQuery(String exchangeName); + public Future<ExchangeQueryResult> exchangeQuery(String exchangeName, Option ... options); /** * If the session receives a sessionClosed with an error code it diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index f7d54a681f..f7978d0d98 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -17,6 +17,8 @@ import org.apache.qpidity.transport.Option; import org.apache.qpidity.transport.Range; import org.apache.qpidity.transport.RangeSet; +import static org.apache.qpidity.transport.Option.*; + /** * Implements a Qpid Sesion. */ @@ -66,8 +68,8 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen { super.processed(range); } - super.flushProcessed(); - if( accept ) + super.flushProcessed(accept ? BATCH : NONE); + if (accept) { messageAccept(ranges); } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java index e57dd08448..da6f5e7d45 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java @@ -32,15 +32,11 @@ public class ClientSessionDelegate extends SessionDelegate // -------------------------------------------- @Override public void data(Session ssn, Data data) { - for (ByteBuffer b : data.getFragments()) - { - _currentMessageListener.data(b); - } + _currentMessageListener.data(data.getData()); if (data.isLast()) { _currentMessageListener.messageReceived(); } - } @Override public void header(Session ssn, Header header) diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl index 46a45b0b91..5df1ef44fb 100644 --- a/java/common/Composite.tpl +++ b/java/common/Composite.tpl @@ -80,7 +80,7 @@ if pack > 0: out(" private $(PACK_TYPES[pack]) packing_flags = 0;\n"); fields = get_fields(type) -params = get_parameters(fields) +params = get_parameters(type, fields) options = get_options(fields) for f in fields: @@ -99,7 +99,7 @@ for f in fields: if f.option: continue out(" $(f.set)($(f.name));\n") -if options: +if options or base == "Method": out(""" for (int i=0; i < _options.length; i++) { switch (_options[i]) { @@ -108,7 +108,11 @@ if options: for f in options: out(" case $(f.option): packing_flags |= $(f.flag_mask(pack)); break;\n") - out(""" case NO_OPTION: break; + if base == "Method": + out(""" case SYNC: this.setSync(true); break; + case BATCH: this.setBatch(true); break; +""") + out(""" case NONE: break; default: throw new IllegalArgumentException("invalid option: " + _options[i]); } } diff --git a/java/common/Invoker.tpl b/java/common/Invoker.tpl index d9905c71a0..4e174619f0 100644 --- a/java/common/Invoker.tpl +++ b/java/common/Invoker.tpl @@ -15,8 +15,8 @@ from genutil import * for c in composites: name = cname(c) fields = get_fields(c) - params = get_parameters(fields) - args = get_arguments(fields) + params = get_parameters(c, fields) + args = get_arguments(c, fields) result = c["result"] if result: if not result["@type"]: @@ -32,7 +32,7 @@ for c in composites: jclass = "" out(""" - public $jresult $(dromedary(name))($(", ".join(params))) { + public final $jresult $(dromedary(name))($(", ".join(params))) { $(jreturn)invoke(new $name($(", ".join(args)))$jclass); } """) diff --git a/java/common/Option.tpl b/java/common/Option.tpl index 5fa2b95b9f..3228949d87 100644 --- a/java/common/Option.tpl +++ b/java/common/Option.tpl @@ -15,5 +15,6 @@ for c in composites: if not options.has_key(option): options[option] = None out(" $option,\n")} - NO_OPTION + BATCH, + NONE } diff --git a/java/common/genutil.py b/java/common/genutil.py index 9636a91cc3..2f1caa41c4 100644 --- a/java/common/genutil.py +++ b/java/common/genutil.py @@ -206,7 +206,7 @@ def get_fields(nd): index += 1 return fields -def get_parameters(fields): +def get_parameters(type, fields): params = [] options = False for f in fields: @@ -214,11 +214,11 @@ def get_parameters(fields): options = True else: params.append("%s %s" % (f.type, f.name)) - if options: + if options or type.name in ("control", "command"): params.append("Option ... _options") return params -def get_arguments(fields): +def get_arguments(type, fields): args = [] options = False for f in fields: @@ -226,7 +226,7 @@ def get_arguments(fields): options = True else: args.append(f.name) - if options: + if options or type.name in ("control", "command"): args.append("_options") return args diff --git a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java index 4e05aa574c..6262bd25c6 100644 --- a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java +++ b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java @@ -41,6 +41,11 @@ public class ConsoleOutput implements Sender<ByteBuffer> System.out.println(str(buf)); } + public void flush() + { + // pass + } + public void close() { System.out.println("CLOSED"); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 5f9917e30a..0055855c0a 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -188,10 +188,7 @@ class ToyBroker extends SessionDelegate ssn.header(m.header); for (Data d : m.body) { - for (ByteBuffer b : d.getFragments()) - { - ssn.data(b); - } + ssn.data(d.getData()); } ssn.endData(); } @@ -245,11 +242,8 @@ class ToyBroker extends SessionDelegate for (Data d : body) { - for (ByteBuffer b : d.getFragments()) - { - sb.append(" | "); - sb.append(str(b)); - } + sb.append(" | "); + sb.append(d); } return sb.toString(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Binary.java b/java/common/src/main/java/org/apache/qpidity/transport/Binary.java new file mode 100644 index 0000000000..1a1112d424 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/Binary.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * Binary + * + */ + +public final class Binary +{ + + private byte[] bytes; + private int offset; + private int size; + private int hash = 0; + + public Binary(byte[] bytes, int offset, int size) + { + if (offset + size > bytes.length) + { + throw new ArrayIndexOutOfBoundsException(); + } + + this.bytes = bytes; + this.offset = offset; + this.size = size; + } + + public Binary(byte[] bytes) + { + this(bytes, 0, bytes.length); + } + + public final byte[] array() + { + return bytes; + } + + public final int offset() + { + return offset; + } + + public final int size() + { + return size; + } + + public final Binary slice(int low, int high) + { + int sz; + + if (high < 0) + { + sz = size + high; + } + else + { + sz = high - low; + } + + if (sz < 0) + { + sz = 0; + } + + return new Binary(bytes, offset + low, sz); + } + + public final int hashCode() + { + if (hash == 0) + { + int hc = 0; + for (int i = 0; i < size; i++) + { + hc = 31*hc + (0xFF & bytes[offset + i]); + } + hash = hc; + } + + return hash; + } + + public final boolean equals(Object o) + { + if (!(o instanceof Binary)) + { + return false; + } + + Binary buf = (Binary) o; + if (this.size != buf.size) + { + return false; + } + + for (int i = 0; i < size; i++) + { + if (bytes[offset + i] != buf.bytes[buf.offset + i]) + { + return false; + } + } + + return true; + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java index fb8918eb7b..eb37ce1590 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java @@ -56,6 +56,7 @@ public class Channel extends Invoker private Lock commandLock = new ReentrantLock(); private boolean first = true; private ByteBuffer data = null; + private boolean batch = false; public Channel(Connection connection, int channel, SessionDelegate delegate) { @@ -162,6 +163,13 @@ public class Channel extends Invoker emit(m); + if (!m.isBatch() && !m.hasPayload()) + { + connection.flush(); + } + + batch = m.isBatch(); + if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload()) { commandLock.unlock(); @@ -199,6 +207,10 @@ public class Channel extends Invoker emit(new Data(data, first, true)); first = true; data = null; + if (!batch) + { + connection.flush(); + } commandLock.unlock(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java index 9829343491..15116be1c3 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java @@ -40,7 +40,6 @@ import java.nio.ByteBuffer; * short instead of Short */ -// RA making this public until we sort out the package issues public class Connection implements Receiver<ConnectionEvent>, Sender<ConnectionEvent> { @@ -90,6 +89,12 @@ public class Connection sender.send(event); } + public void flush() + { + log.debug("FLUSH: [%s]", this); + sender.flush(); + } + public int getChannelMax() { return channelMax; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java index 4f61380809..8792518834 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Data.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Data.java @@ -37,25 +37,20 @@ import static org.apache.qpidity.transport.util.Functions.*; public class Data implements ProtocolEvent { - private final Iterable<ByteBuffer> fragments; + private final ByteBuffer data; private final boolean first; private final boolean last; - public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last) + public Data(ByteBuffer data, boolean first, boolean last) { - this.fragments = fragments; + this.data = data; this.first = first; this.last = last; } - public Data(ByteBuffer buf, boolean first, boolean last) + public ByteBuffer getData() { - this(Collections.singletonList(buf), first, last); - } - - public Iterable<ByteBuffer> getFragments() - { - return fragments; + return data.slice(); } public boolean isFirst() @@ -82,25 +77,7 @@ public class Data implements ProtocolEvent { StringBuffer str = new StringBuffer(); str.append("Data("); - boolean first = true; - int left = 64; - for (ByteBuffer buf : getFragments()) - { - if (first) - { - first = false; - } - else - { - str.append(" | "); - } - str.append(str(buf, left)); - left -= buf.remaining(); - if (left < 0) - { - break; - } - } + str.append(str(data, 64)); str.append(")"); return str.toString(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Echo.java b/java/common/src/main/java/org/apache/qpidity/transport/Echo.java index 03d0d3e161..ed323c7eac 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Echo.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Echo.java @@ -49,10 +49,7 @@ public class Echo extends SessionDelegate public void data(Session ssn, Data data) { - for (ByteBuffer buf : data.getFragments()) - { - ssn.data(buf); - } + ssn.data(data.getData()); if (data.isLast()) { ssn.endData(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java index f72ebd570c..a0605e6e66 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java @@ -42,6 +42,7 @@ public abstract class Method extends Struct implements ProtocolEvent private int id; private boolean idSet = false; private boolean sync = false; + private boolean batch = false; public final int getId() { @@ -59,11 +60,21 @@ public abstract class Method extends Struct implements ProtocolEvent return sync; } - void setSync(boolean value) + final void setSync(boolean value) { this.sync = value; } + public final boolean isBatch() + { + return batch; + } + + final void setBatch(boolean value) + { + this.batch = value; + } + public abstract boolean hasPayload(); public abstract byte getEncodedTrack(); @@ -84,26 +95,30 @@ public abstract class Method extends Struct implements ProtocolEvent public String toString() { - if (getEncodedTrack() != Frame.L4) - { - return super.toString(); - } - StringBuilder str = new StringBuilder(); - if (idSet) + if (getEncodedTrack() == Frame.L4 && idSet) { str.append("id="); str.append(id); } - if (sync) + if (sync || batch) { if (str.length() > 0) { str.append(" "); } - str.append(" [sync]"); + str.append("["); + if (sync) + { + str.append("S"); + } + if (batch) + { + str.append("B"); + } + str.append("]"); } if (str.length() > 0) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Sender.java b/java/common/src/main/java/org/apache/qpidity/transport/Sender.java index 6da8358bd6..ba3e67e578 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Sender.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Sender.java @@ -31,6 +31,8 @@ public interface Sender<T> void send(T msg); + void flush(); + void close(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 988ac4788f..ca572119d9 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -130,7 +130,7 @@ public class Session extends Invoker log.debug("ID: [%s] %s", this.channel, id); if ((id % 65536) == 0) { - flushProcessed(true); + flushProcessed(TIMELY_REPLY); } } @@ -166,19 +166,14 @@ public class Session extends Invoker } } - public void flushProcessed() - { - flushProcessed(false); - } - - private void flushProcessed(boolean timely_reply) + public void flushProcessed(Option ... options) { RangeSet copy; synchronized (processedLock) { copy = processed.copy(); } - sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION); + sessionCompleted(copy, options); } void knownComplete(RangeSet kc) @@ -353,9 +348,7 @@ public class Session extends Invoker if (needSync && lt(maxComplete, point)) { - ExecutionSync sync = new ExecutionSync(); - sync.setSync(true); - invoke(sync); + executionSync(SYNC); } long start = System.currentTimeMillis(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java index ebfc6b120f..77899ad712 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.qpidity.transport.Binary; import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.transport.Struct; import org.apache.qpidity.transport.Type; @@ -45,6 +46,14 @@ import static org.apache.qpidity.transport.util.Functions.*; abstract class AbstractDecoder implements Decoder { + private final Map<Binary,String> str8cache = new LinkedHashMap<Binary,String>() + { + @Override protected boolean removeEldestEntry(Map.Entry<Binary,String> me) + { + return size() > 4*1024; + } + }; + protected abstract byte doGet(); protected abstract void doGet(byte[] bytes); @@ -59,6 +68,13 @@ abstract class AbstractDecoder implements Decoder doGet(bytes); } + protected Binary get(int size) + { + byte[] bytes = new byte[size]; + get(bytes); + return new Binary(bytes); + } + protected short uget() { return (short) (0xFF & get()); @@ -105,11 +121,11 @@ abstract class AbstractDecoder implements Decoder return readUint64(); } - private static final String decode(byte[] bytes, String charset) + private static final String decode(byte[] bytes, int offset, int length, String charset) { try { - return new String(bytes, charset); + return new String(bytes, offset, length, charset); } catch (UnsupportedEncodingException e) { @@ -117,13 +133,22 @@ abstract class AbstractDecoder implements Decoder } } + private static final String decode(byte[] bytes, String charset) + { + return decode(bytes, 0, bytes.length, charset); + } public String readStr8() { short size = readUint8(); - byte[] bytes = new byte[size]; - get(bytes); - return decode(bytes, "UTF-8"); + Binary bin = get(size); + String str = str8cache.get(bin); + if (str == null) + { + str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8"); + str8cache.put(bin, str); + } + return str; } public String readStr16() @@ -233,7 +258,19 @@ abstract class AbstractDecoder implements Decoder public Map<String,Object> readMap() { long size = readUint32(); + + if (size == 0) + { + return null; + } + long count = readUint32(); + + if (count == 0) + { + return Collections.EMPTY_MAP; + } + Map<String,Object> result = new LinkedHashMap(); for (int i = 0; i < count; i++) { @@ -243,13 +280,26 @@ abstract class AbstractDecoder implements Decoder Object value = read(t); result.put(key, value); } + return result; } public List<Object> readList() { long size = readUint32(); + + if (size == 0) + { + return null; + } + long count = readUint32(); + + if (count == 0) + { + return Collections.EMPTY_LIST; + } + List<Object> result = new ArrayList(); for (int i = 0; i < count; i++) { @@ -264,15 +314,21 @@ abstract class AbstractDecoder implements Decoder public List<Object> readArray() { long size = readUint32(); + if (size == 0) { - return Collections.EMPTY_LIST; + return null; } byte code = get(); Type t = getType(code); long count = readUint32(); + if (count == 0) + { + return Collections.EMPTY_LIST; + } + List<Object> result = new ArrayList<Object>(); for (int i = 0; i < count; i++) { diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java index aa90627943..8908b94ed3 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -64,10 +65,13 @@ abstract class AbstractEncoder implements Encoder ENCODINGS.put(byte[].class, Type.VBIN32); } - protected Sizer sizer() + private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>() { - return new SizeEncoder(); - } + @Override protected boolean removeEldestEntry(Map.Entry<String,byte[]> me) + { + return size() > 4*1024; + } + }; protected abstract void doPut(byte b); @@ -88,6 +92,15 @@ abstract class AbstractEncoder implements Encoder put(ByteBuffer.wrap(bytes)); } + protected abstract int beginSize8(); + protected abstract void endSize8(int pos); + + protected abstract int beginSize16(); + protected abstract void endSize16(int pos); + + protected abstract int beginSize32(); + protected abstract void endSize32(int pos); + public void writeUint8(short b) { assert b < 0x100; @@ -132,23 +145,6 @@ abstract class AbstractEncoder implements Encoder writeUint64(l); } - private static final String checkLength(String s, int n) - { - if (s == null) - { - return ""; - } - - if (s.length() > n) - { - throw new IllegalArgumentException("string too long: " + s); - } - else - { - return s; - } - } - private static final byte[] encode(String s, String charset) { try @@ -163,16 +159,31 @@ abstract class AbstractEncoder implements Encoder public void writeStr8(String s) { - s = checkLength(s, 255); - writeUint8((short) s.length()); - put(ByteBuffer.wrap(encode(s, "UTF-8"))); + if (s == null) + { + s = ""; + } + + byte[] bytes = str8cache.get(s); + if (bytes == null) + { + bytes = encode(s, "UTF-8"); + str8cache.put(s, bytes); + } + writeUint8((short) bytes.length); + put(bytes); } public void writeStr16(String s) { - s = checkLength(s, 65535); - writeUint16(s.length()); - put(ByteBuffer.wrap(encode(s, "UTF-8"))); + if (s == null) + { + s = ""; + } + + byte[] bytes = encode(s, "UTF-8"); + writeUint16(bytes.length); + put(bytes); } public void writeVbin8(byte[] bytes) @@ -245,18 +256,10 @@ abstract class AbstractEncoder implements Encoder } int width = s.getSizeWidth(); + int pos = -1; if (width > 0) { - if (empty) - { - writeSize(width, 0); - } - else - { - Sizer sizer = sizer(); - s.write(sizer); - writeSize(width, sizer.size()); - } + pos = beginSize(width); } if (type > 0) @@ -265,6 +268,11 @@ abstract class AbstractEncoder implements Encoder } s.write(this); + + if (width > 0) + { + endSize(width, pos); + } } public void writeStruct32(Struct s) @@ -275,12 +283,10 @@ abstract class AbstractEncoder implements Encoder } else { - Sizer sizer = sizer(); - sizer.writeUint16(s.getEncodedType()); - s.write(sizer); - writeUint32(sizer.size()); + int pos = beginSize32(); writeUint16(s.getEncodedType()); s.write(this); + endSize32(pos); } } @@ -338,18 +344,13 @@ abstract class AbstractEncoder implements Encoder public void writeMap(Map<String,Object> map) { - if (map == null) + int pos = beginSize32(); + if (map != null) { - writeUint32(0); - return; + writeUint32(map.size()); + writeMapEntries(map); } - - Sizer sizer = sizer(); - sizer.writeMap(map); - // XXX: - 4 - writeUint32(sizer.size() - 4); - writeUint32(map.size()); - writeMapEntries(map); + endSize32(pos); } protected void writeMapEntries(Map<String,Object> map) @@ -367,12 +368,13 @@ abstract class AbstractEncoder implements Encoder public void writeList(List<Object> list) { - Sizer sizer = sizer(); - sizer.writeList(list); - // XXX: - 4 - writeUint32(sizer.size() - 4); - writeUint32(list.size()); - writeListEntries(list); + int pos = beginSize32(); + if (list != null) + { + writeUint32(list.size()); + writeListEntries(list); + } + endSize32(pos); } protected void writeListEntries(List<Object> list) @@ -387,16 +389,12 @@ abstract class AbstractEncoder implements Encoder public void writeArray(List<Object> array) { - if (array == null) + int pos = beginSize32(); + if (array != null) { - array = Collections.EMPTY_LIST; + writeArrayEntries(array); } - - Sizer sizer = sizer(); - sizer.writeArray(array); - // XXX: -4 - writeUint32(sizer.size() - 4); - writeArrayEntries(array); + endSize32(pos); } protected void writeArrayEntries(List<Object> array) @@ -458,6 +456,39 @@ abstract class AbstractEncoder implements Encoder } } + private int beginSize(int width) + { + switch (width) + { + case 1: + return beginSize8(); + case 2: + return beginSize16(); + case 4: + return beginSize32(); + default: + throw new IllegalStateException("illegal width: " + width); + } + } + + private void endSize(int width, int pos) + { + switch (width) + { + case 1: + endSize8(pos); + break; + case 2: + endSize16(pos); + break; + case 4: + endSize32(pos); + break; + default: + throw new IllegalStateException("illegal width: " + width); + } + } + private void writeBytes(Type t, byte[] bytes) { writeSize(t, bytes.length); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java index cf40cef8bf..f036fd8dee 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java @@ -23,6 +23,8 @@ package org.apache.qpidity.transport.codec; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpidity.transport.Binary; + /** * BBDecoder @@ -33,9 +35,9 @@ import java.nio.ByteOrder; public final class BBDecoder extends AbstractDecoder { - private final ByteBuffer in; + private ByteBuffer in; - public BBDecoder(ByteBuffer in) + public void init(ByteBuffer in) { this.in = in; this.in.order(ByteOrder.BIG_ENDIAN); @@ -51,6 +53,21 @@ public final class BBDecoder extends AbstractDecoder in.get(bytes); } + protected Binary get(int size) + { + if (in.hasArray()) + { + byte[] bytes = in.array(); + Binary bin = new Binary(bytes, in.arrayOffset() + in.position(), size); + in.position(in.position() + size); + return bin; + } + else + { + return super.get(size); + } + } + public boolean hasRemaining() { return in.hasRemaining(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java index 2e7b41bf42..a976d38efc 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java @@ -20,6 +20,7 @@ */ package org.apache.qpidity.transport.codec; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -33,47 +34,194 @@ import java.nio.ByteOrder; public final class BBEncoder extends AbstractEncoder { - private final ByteBuffer out; + private ByteBuffer out; - public BBEncoder(ByteBuffer out) { - this.out = out; - this.out.order(ByteOrder.BIG_ENDIAN); + public BBEncoder(int capacity) { + out = ByteBuffer.allocate(capacity); + out.order(ByteOrder.BIG_ENDIAN); + } + + public void init() + { + out.clear(); + } + + public ByteBuffer done() + { + out.flip(); + ByteBuffer encoded = ByteBuffer.allocate(out.remaining()); + encoded.put(out); + encoded.flip(); + return encoded; + } + + private void grow(int size) + { + ByteBuffer old = out; + int capacity = old.capacity(); + out = ByteBuffer.allocate(Math.max(capacity + size, 2*capacity)); + out.order(ByteOrder.BIG_ENDIAN); + out.put(old); } protected void doPut(byte b) { - out.put(b); + try + { + out.put(b); + } + catch (BufferOverflowException e) + { + grow(1); + out.put(b); + } } protected void doPut(ByteBuffer src) { - out.put(src); + try + { + out.put(src); + } + catch (BufferOverflowException e) + { + grow(src.remaining()); + out.put(src); + } + } + + protected void put(byte[] bytes) + { + try + { + out.put(bytes); + } + catch (BufferOverflowException e) + { + grow(bytes.length); + out.put(bytes); + } } public void writeUint8(short b) { assert b < 0x100; - out.put((byte) b); + try + { + out.put((byte) b); + } + catch (BufferOverflowException e) + { + grow(1); + out.put((byte) b); + } } public void writeUint16(int s) { assert s < 0x10000; - out.putShort((short) s); + try + { + out.putShort((short) s); + } + catch (BufferOverflowException e) + { + grow(2); + out.putShort((short) s); + } } public void writeUint32(long i) { assert i < 0x100000000L; - out.putInt((int) i); + try + { + out.putInt((int) i); + } + catch (BufferOverflowException e) + { + grow(4); + out.putInt((int) i); + } } public void writeUint64(long l) { - out.putLong(l); + try + { + out.putLong(l); + } + catch (BufferOverflowException e) + { + grow(8); + out.putLong(l); + } + } + + public int beginSize8() + { + int pos = out.position(); + try + { + out.put((byte) 0); + } + catch (BufferOverflowException e) + { + grow(1); + out.put((byte) 0); + } + return pos; + } + + public void endSize8(int pos) + { + int cur = out.position(); + out.put(pos, (byte) (cur - pos - 1)); + } + + public int beginSize16() + { + int pos = out.position(); + try + { + out.putShort((short) 0); + } + catch (BufferOverflowException e) + { + grow(2); + out.putShort((short) 0); + } + return pos; + } + + public void endSize16(int pos) + { + int cur = out.position(); + out.putShort(pos, (short) (cur - pos - 2)); + } + + public int beginSize32() + { + int pos = out.position(); + try + { + out.putInt(0); + } + catch (BufferOverflowException e) + { + grow(4); + out.putInt(0); + } + return pos; + } + + public void endSize32(int pos) + { + int cur = out.position(); + out.putInt(pos, (cur - pos - 4)); } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java deleted file mode 100644 index 474211ced2..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.codec; - -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; - -import java.util.Iterator; - -import static java.lang.Math.*; - - -/** - * FragmentDecoder - * - * @author Rafael H. Schloming - */ - -public class FragmentDecoder extends AbstractDecoder -{ - - private final Iterator<ByteBuffer> fragments; - private ByteBuffer current; - - public FragmentDecoder(Iterator<ByteBuffer> fragments) - { - this.fragments = fragments; - this.current = null; - } - - public boolean hasRemaining() - { - advance(); - return current != null || fragments.hasNext(); - } - - private void advance() - { - while (current == null && fragments.hasNext()) - { - current = fragments.next(); - if (current.hasRemaining()) - { - break; - } - else - { - current = null; - } - } - } - - private void preRead() - { - advance(); - - if (current == null) - { - throw new BufferUnderflowException(); - } - } - - private void postRead() - { - if (current.remaining() == 0) - { - current = null; - } - } - - protected byte doGet() - { - preRead(); - byte b = current.get(); - postRead(); - return b; - } - - protected void doGet(byte[] bytes) - { - int remaining = bytes.length; - int offset = 0; - while (remaining > 0) - { - preRead(); - int size = min(remaining, current.remaining()); - current.get(bytes, offset, size); - offset += size; - remaining -= size; - postRead(); - } - } - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java deleted file mode 100644 index 2e7e883a0b..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.codec; - -import java.nio.ByteBuffer; - -import java.util.Map; -import java.util.UUID; - -import org.apache.qpidity.transport.RangeSet; - - -/** - * SizeEncoder - * - * @author Rafael H. Schloming - */ - -public class SizeEncoder extends AbstractEncoder implements Sizer -{ - - private int size; - - public SizeEncoder() { - this(0); - } - - public SizeEncoder(int size) { - this.size = size; - } - - protected Sizer sizer() - { - return Sizer.NULL; - } - - public int getSize() { - return size; - } - - public void setSize(int size) { - this.size = size; - } - - public int size() - { - return getSize(); - } - - protected void doPut(byte b) - { - size += 1; - } - - protected void doPut(ByteBuffer src) - { - size += src.remaining(); - } - - public void writeUint8(short b) - { - size += 1; - } - - public void writeUint16(int s) - { - size += 2; - } - - public void writeUint32(long i) - { - size += 4; - } - - public void writeUint64(long l) - { - size += 8; - } - - public void writeDatetime(long l) - { - size += 8; - } - - public void writeUuid(UUID uuid) - { - size += 16; - } - - public void writeSequenceNo(int s) - { - size += 4; - } - - public void writeSequenceSet(RangeSet ranges) - { - size += 2 + 8*ranges.size(); - } - - //void writeByteRanges(RangeSet ranges); // XXX - - //void writeStr8(String s); - //void writeStr16(String s); - - //void writeVbin8(byte[] bytes); - //void writeVbin16(byte[] bytes); - //void writeVbin32(byte[] bytes); - - //void writeStruct32(Struct s); - //void writeMap(Map<String,Object> map); - //void writeList(List<Object> list); - //void writeArray(List<Object> array); - - //void writeStruct(int type, Struct s); - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java deleted file mode 100644 index d386987d64..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity.transport.codec; - -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.qpidity.transport.RangeSet; -import org.apache.qpidity.transport.Struct; - - -/** - * Sizer - * - */ - -public interface Sizer extends Encoder -{ - - public static final Sizer NULL = new Sizer() - { - public void writeUint8(short b) {} - public void writeUint16(int s) {} - public void writeUint32(long i) {} - public void writeUint64(long l) {} - - public void writeDatetime(long l) {} - public void writeUuid(UUID uuid) {} - - public void writeSequenceNo(int s) {} - public void writeSequenceSet(RangeSet ranges) {} // XXX - public void writeByteRanges(RangeSet ranges) {} // XXX - - public void writeStr8(String s) {} - public void writeStr16(String s) {} - - public void writeVbin8(byte[] bytes) {} - public void writeVbin16(byte[] bytes) {} - public void writeVbin32(byte[] bytes) {} - - public void writeStruct32(Struct s) {} - public void writeMap(Map<String,Object> map) {} - public void writeList(List<Object> list) {} - public void writeArray(List<Object> array) {} - - public void writeStruct(int type, Struct s) {} - - public int getSize() { return 0; } - - public int size() { return 0; } - }; - - int getSize(); - - int size(); - -} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java index 3a7a550573..e188c15b35 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java @@ -29,7 +29,6 @@ import java.nio.ByteBuffer; import org.apache.qpidity.transport.codec.BBDecoder; import org.apache.qpidity.transport.codec.Decoder; -import org.apache.qpidity.transport.codec.FragmentDecoder; import org.apache.qpidity.transport.ConnectionEvent; import org.apache.qpidity.transport.Data; @@ -52,26 +51,32 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { private final Receiver<ConnectionEvent> receiver; - private final Map<Integer,List<ByteBuffer>> segments; + private final Map<Integer,List<Frame>> segments; + private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() + { + public BBDecoder initialValue() + { + return new BBDecoder(); + } + }; public Assembler(Receiver<ConnectionEvent> receiver) { this.receiver = receiver; - segments = new HashMap<Integer,List<ByteBuffer>>(); + segments = new HashMap<Integer,List<Frame>>(); } private int segmentKey(Frame frame) { - // XXX: can this overflow? return (frame.getTrack() + 1) * frame.getChannel(); } - private List<ByteBuffer> getSegment(Frame frame) + private List<Frame> getSegment(Frame frame) { return segments.get(segmentKey(frame)); } - private void setSegment(Frame frame, List<ByteBuffer> segment) + private void setSegment(Frame frame, List<Frame> segment) { int key = segmentKey(frame); if (segments.containsKey(key)) @@ -122,7 +127,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate switch (frame.getType()) { case BODY: - emit(frame, new Data(frame, frame.isFirstFrame(), + emit(frame, new Data(frame.getBody(), frame.isFirstFrame(), frame.isLastFrame())); break; default: @@ -138,42 +143,54 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate private void assemble(Frame frame) { - List<ByteBuffer> segment; - if (frame.isFirstFrame()) + ByteBuffer segment; + if (frame.isFirstFrame() && frame.isLastFrame()) { - segment = new ArrayList<ByteBuffer>(); - setSegment(frame, segment); + segment = frame.getBody(); + emit(frame, decode(frame, segment)); } else { - segment = getSegment(frame); - } + List<Frame> frames; + if (frame.isFirstFrame()) + { + frames = new ArrayList<Frame>(); + setSegment(frame, frames); + } + else + { + frames = getSegment(frame); + } - for (ByteBuffer buf : frame) - { - segment.add(buf); - } + frames.add(frame); - if (frame.isLastFrame()) - { - clearSegment(frame); - emit(frame, decode(frame, frame.getType(), segment)); + if (frame.isLastFrame()) + { + clearSegment(frame); + + int size = 0; + for (Frame f : frames) + { + size += f.getSize(); + } + segment = ByteBuffer.allocate(size); + for (Frame f : frames) + { + segment.put(f.getBody()); + } + segment.flip(); + emit(frame, decode(frame, segment)); + } } + } - private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment) + private ProtocolEvent decode(Frame frame, ByteBuffer segment) { - Decoder dec; - if (segment.size() == 1) - { - dec = new BBDecoder(segment.get(0)); - } - else - { - dec = new FragmentDecoder(segment.iterator()); - } + BBDecoder dec = decoder.get(); + dec.init(segment); - switch (type) + switch (frame.getType()) { case CONTROL: int controlType = dec.readUint16(); @@ -193,9 +210,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { structs.add(dec.readStruct32()); } - return new Header(structs,frame.isLastFrame() && frame.isLastSegment()); + return new Header(structs, frame.isLastFrame() && frame.isLastSegment()); default: - throw new IllegalStateException("unknown frame type: " + type); + throw new IllegalStateException("unknown frame type: " + frame.getType()); } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java index da9ba84ab0..074057df56 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java @@ -21,7 +21,6 @@ package org.apache.qpidity.transport.network; import org.apache.qpidity.transport.codec.BBEncoder; -import org.apache.qpidity.transport.codec.SizeEncoder; import org.apache.qpidity.transport.ConnectionEvent; import org.apache.qpidity.transport.Data; @@ -54,6 +53,13 @@ public class Disassembler implements Sender<ConnectionEvent>, private final Sender<NetworkEvent> sender; private final int maxPayload; + private final ThreadLocal<BBEncoder> encoder = new ThreadLocal() + { + public BBEncoder initialValue() + { + return new BBEncoder(4*1024); + } + }; public Disassembler(Sender<NetworkEvent> sender, int maxFrame) { @@ -72,6 +78,11 @@ public class Disassembler implements Sender<ConnectionEvent>, event.getProtocolEvent().delegate(event, this); } + public void flush() + { + sender.flush(); + } + public void close() { sender.close(); @@ -92,8 +103,7 @@ public class Disassembler implements Sender<ConnectionEvent>, first = false; } nflags |= LAST_FRAME; - Frame frame = new Frame(nflags, type, track, event.getChannel()); - // frame.addFragment(buf); + Frame frame = new Frame(nflags, type, track, event.getChannel(), buf.slice()); sender.send(frame); } else @@ -115,8 +125,7 @@ public class Disassembler implements Sender<ConnectionEvent>, newflags |= LAST_FRAME; } - Frame frame = new Frame(newflags, type, track, event.getChannel()); - frame.addFragment(slice); + Frame frame = new Frame(newflags, type, track, event.getChannel(), slice); sender.send(frame); } } @@ -137,18 +146,18 @@ public class Disassembler implements Sender<ConnectionEvent>, method(event, method, SegmentType.COMMAND); } - private void method(ConnectionEvent event, Method method, SegmentType type) + private ByteBuffer copy(ByteBuffer src) { - SizeEncoder sizer = new SizeEncoder(); - sizer.writeUint16(method.getEncodedType()); - if (type == SegmentType.COMMAND) - { - sizer.writeUint16(0); - } - method.write(sizer); + ByteBuffer buf = ByteBuffer.allocate(src.remaining()); + buf.put(src); + buf.flip(); + return buf; + } - ByteBuffer buf = ByteBuffer.allocate(sizer.size()); - BBEncoder enc = new BBEncoder(buf); + private void method(ConnectionEvent event, Method method, SegmentType type) + { + BBEncoder enc = encoder.get(); + enc.init(); enc.writeUint16(method.getEncodedType()); if (type == SegmentType.COMMAND) { @@ -162,7 +171,7 @@ public class Disassembler implements Sender<ConnectionEvent>, } } method.write(enc); - buf.flip(); + ByteBuffer buf = enc.done(); byte flags = FIRST_SEG; @@ -176,42 +185,29 @@ public class Disassembler implements Sender<ConnectionEvent>, public void header(ConnectionEvent event, Header header) { - ByteBuffer buf; - if( header.getBuf() == null) + ByteBuffer buf; + if (header.getBuf() == null) { - SizeEncoder sizer = new SizeEncoder(); - for (Struct st : header.getStructs()) - { - sizer.writeStruct32(st); - } - - buf = ByteBuffer.allocate(sizer.size()); - BBEncoder enc = new BBEncoder(buf); + BBEncoder enc = encoder.get(); + enc.init(); for (Struct st : header.getStructs()) { enc.writeStruct32(st); } + buf = enc.done(); header.setBuf(buf); } else { buf = header.getBuf(); + buf.flip(); } - buf.flip(); fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true); } public void data(ConnectionEvent event, Data data) { - boolean first = data.isFirst(); - for (Iterator<ByteBuffer> it = data.getFragments().iterator(); - it.hasNext(); ) - { - ByteBuffer buf = it.next(); - boolean last = data.isLast() && !it.hasNext(); - fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last); - first = false; - } + fragment(LAST_SEG, SegmentType.BODY, event, data.getData(), data.isFirst(), data.isLast()); } public void error(ConnectionEvent event, ProtocolError error) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java index 2abac382e6..7b8675b39d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java @@ -38,7 +38,7 @@ import static org.apache.qpidity.transport.util.Functions.*; * @author Rafael H. Schloming */ -public final class Frame implements NetworkEvent, Iterable<ByteBuffer> +public final class Frame implements NetworkEvent { public static final int HEADER_SIZE = 12; @@ -61,23 +61,21 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer> final private SegmentType type; final private byte track; final private int channel; - final private List<ByteBuffer> fragments; - private int size; + final private ByteBuffer body; - public Frame(byte flags, SegmentType type, byte track, int channel) + public Frame(byte flags, SegmentType type, byte track, int channel, + ByteBuffer body) { this.flags = flags; this.type = type; this.track = track; this.channel = channel; - this.size = 0; - this.fragments = new ArrayList<ByteBuffer>(); + this.body = body; } - public void addFragment(ByteBuffer fragment) + public ByteBuffer getBody() { - fragments.add(fragment); - size += fragment.remaining(); + return body.slice(); } public byte getFlags() @@ -92,7 +90,7 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer> public int getSize() { - return size; + return body.remaining(); } public SegmentType getType() @@ -130,16 +128,6 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer> return flag(LAST_FRAME); } - public Iterator<ByteBuffer> getFragments() - { - return new SliceIterator(fragments.iterator()); - } - - public Iterator<ByteBuffer> iterator() - { - return getFragments(); - } - public void delegate(NetworkDelegate delegate) { delegate.frame(this); @@ -148,26 +136,14 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer> public String toString() { StringBuilder str = new StringBuilder(); + str.append(String.format ("[%05d %05d %1d %s %d%d%d%d] ", getChannel(), getSize(), getTrack(), getType(), isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0, isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0)); - boolean first = true; - for (ByteBuffer buf : this) - { - if (first) - { - first = false; - } - else - { - str.append(" | "); - } - - str.append(str(buf)); - } + str.append(str(body)); return str.toString(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java index d1c03348b4..48f68e9020 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java @@ -79,7 +79,7 @@ public class InputHandler implements Receiver<ByteBuffer> private byte track; private int channel; private int size; - private Frame frame; + private ByteBuffer body; public InputHandler(Receiver<NetworkEvent> receiver, State state) { @@ -99,9 +99,9 @@ public class InputHandler implements Receiver<ByteBuffer> private void frame() { + Frame frame = new Frame(flags, type, track, channel, body); assert size == frame.getSize(); receiver.received(frame); - frame = null; } private void error(String fmt, Object ... args) @@ -191,30 +191,28 @@ public class InputHandler implements Receiver<ByteBuffer> return ERROR; } - frame = new Frame(flags, type, track, channel); if (size > buf.remaining()) { - frame.addFragment(buf.slice()); - buf.position(buf.limit()); + body = ByteBuffer.allocate(size); + body.put(buf); return FRAME_FRAGMENT; } else { - ByteBuffer payload = buf.slice(); - payload.limit(size); + body = buf.slice(); + body.limit(size); buf.position(buf.position() + size); - frame.addFragment(payload); frame(); return FRAME_HDR; } case FRAME_FRAGMENT: - int delta = size - frame.getSize(); + int delta = body.remaining(); if (delta > buf.remaining()) { - frame.addFragment(buf.slice()); - buf.position(buf.limit()); + body.put(buf); return FRAME_FRAGMENT; } else { ByteBuffer fragment = buf.slice(); fragment.limit(delta); buf.position(buf.position() + delta); - frame.addFragment(fragment); + body.put(fragment); + body.flip(); frame(); return FRAME_HDR; } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java index b749332fa3..e2ef8ca0d5 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java @@ -69,6 +69,7 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate synchronized (lock) { sender.send(header.toByteBuffer()); + sender.flush(); } } @@ -79,35 +80,43 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate frames.add(frame); bytes += HEADER_SIZE + frame.getSize(); - if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024) + if (bytes > 64*1024) { - ByteBuffer buf = ByteBuffer.allocate(bytes); - for (Frame f : frames) - { - buf.put(f.getFlags()); - buf.put((byte) f.getType().getValue()); - buf.putShort((short) (f.getSize() + HEADER_SIZE)); - // RESERVED - buf.put(RESERVED); - buf.put(f.getTrack()); - buf.putShort((short) f.getChannel()); - // RESERVED - buf.putInt(0); - for(ByteBuffer frg : f) - { - buf.put(frg); - } - } - buf.flip(); - - frames.clear(); - bytes = 0; - - sender.send(buf); + flush(); } } } + public void flush() + { + synchronized (lock) + { + ByteBuffer buf = ByteBuffer.allocate(bytes); + int nframes = frames.size(); + for (int i = 0; i < nframes; i++) + { + Frame frame = frames.get(i); + buf.put(frame.getFlags()); + buf.put((byte) frame.getType().getValue()); + buf.putShort((short) (frame.getSize() + HEADER_SIZE)); + // RESERVED + buf.put(RESERVED); + buf.put(frame.getTrack()); + buf.putShort((short) frame.getChannel()); + // RESERVED + buf.putInt(0); + buf.put(frame.getBody()); + } + buf.flip(); + + frames.clear(); + bytes = 0; + + sender.send(buf); + sender.flush(); + } + } + public void error(ProtocolError error) { throw new IllegalStateException("XXX"); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java index 1adde531a6..d50442be5c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java @@ -55,6 +55,11 @@ public class IoSender implements Sender<java.nio.ByteBuffer> write(buf); } + public void flush() + { + // pass + } + /* The extra copying sucks. * If I know for sure that the buf is backed * by an array then I could do buf.array() diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java index f0f5731037..a53c36ae2e 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java @@ -58,6 +58,11 @@ public class MinaSender implements Sender<java.nio.ByteBuffer> } } + public void flush() + { + // pass + } + public synchronized void close() { // MINA will sometimes throw away in-progress writes when you diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java index 2cfe6c2089..798f24b528 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java @@ -47,6 +47,11 @@ public class NioSender implements Sender<java.nio.ByteBuffer> } } + public void flush() + { + // pass + } + private void write(java.nio.ByteBuffer buf) { synchronized (lock) |