diff options
author | Rafael H. Schloming <rhs@apache.org> | 2007-01-23 20:31:10 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2007-01-23 20:31:10 +0000 |
commit | 4eb87d127edb3dc44512eb63318638f91a2f0f2d (patch) | |
tree | 795628bf73a23dca8e562cefed41f3926609399e | |
parent | 11870d0fb76e74b6994ee9d2cfdae9df671673b1 (diff) | |
download | qpid-python-4eb87d127edb3dc44512eb63318638f91a2f0f2d.tar.gz |
removed XXX from resend, centralized message deliver, cleaned up exception handling, added per channel max frame size
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@499121 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 215 insertions, 186 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 7c161522ca..4b2c3f3beb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,11 +20,15 @@ */ package org.apache.qpid.server; -import org.apache.qpid.framing.Content; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.Content; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MessageAppendBody; import org.apache.qpid.framing.MessageCloseBody; @@ -47,6 +51,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -74,6 +79,7 @@ public class AMQChannel private RequestManager _requestManager; private ResponseManager _responseManager; + private AMQProtocolSession _session; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -94,16 +100,9 @@ public class AMQChannel private int _consumerTag; /** - * The set of current messages - which may be partial in the sense that not all frames have been received yet - - * which has been received by this channel. As the frames are received the references get updated and once all - * frames have been received the message can then be routed. - */ - private Map<String, List<AMQMessage>> _messages = new LinkedHashMap(); - - /** * The set of open references on this channel. */ - private Map<String, List<MessageAppendBody>> _references = new LinkedHashMap(); + private Map<String, Reference> _references = new LinkedHashMap(); /** * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. @@ -129,16 +128,17 @@ public class AMQChannel private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); private Set<Long> _browsedAcks = new HashSet<Long>(); - public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolWriter, AMQMethodListener methodListener) - throws AMQException + // XXX: clean up arguments + public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener) { _channelId = channelId; + _session = session; _prefetch_HighWaterMark = DEFAULT_PREFETCH; _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; - _requestManager = new RequestManager(channelId, protocolWriter, true); - _responseManager = new ResponseManager(channelId, methodListener, protocolWriter, true); + _requestManager = new RequestManager(channelId, _session, true); + _responseManager = new ResponseManager(channelId, methodListener, _session, true); _txnBuffer = new TxnBuffer(_messageStore); } @@ -189,51 +189,54 @@ public class AMQChannel public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException { - AMQMessage message = new AMQMessage(_messageStore, transferBody); - message.setPublisher(publisher); Content body = transferBody.getBody(); + AMQMessage message; switch (body.getContentType()) { case INLINE_T: + message = new AMQMessage(_messageStore, transferBody, + Collections.singletonList(body.getContent())); + message.setPublisher(publisher); route(message); break; case REF_T: - getMessages(body.getContentAsByteArray()).add(message); + Reference ref = getReference(body.getContentAsByteArray()); + message = new AMQMessage(_messageStore, transferBody, ref.contents); + message.setPublisher(publisher); + ref.messages.add(message); break; } } - private List<AMQMessage> getMessages(byte[] reference) { - String key = new String(reference); - List<AMQMessage> result = _messages.get(key); - if (result == null) { - throw new IllegalArgumentException(key); - } - return result; + private static String key(byte[] id) { + return new String(id); } - private List<MessageAppendBody> getReference(byte[] reference) { - String key = new String(reference); - List<MessageAppendBody> result = _references.get(key); - if (result == null) { + private Reference getReference(byte[] id) { + String key = key(id); + Reference ref = _references.get(key); + if (ref == null) { throw new IllegalArgumentException(key); } - return result; + return ref; } - private void createReference(byte[] reference) { - String key = new String(reference); + private Reference createReference(byte[] id) { + String key = key(id); if (_references.containsKey(key)) { throw new IllegalArgumentException(key); - } else { - _references.put(key, new LinkedList()); - _messages.put(key, new LinkedList()); } + Reference ref = new Reference(); + _references.put(key, ref); + return ref; } - private void clearReference(byte[] reference) { - String key = new String(reference); - _references.remove(key); - _messages.remove(key); + private Reference removeReference(byte[] id) { + String key = key(id); + Reference ref = _references.remove(key); + if (ref == null) { + throw new IllegalArgumentException(key); + } + return ref; } public void addMessageOpen(MessageOpenBody open) { @@ -241,18 +244,48 @@ public class AMQChannel } public void addMessageAppend(MessageAppendBody append) { - getReference(append.reference).add(append); + Reference ref = getReference(append.reference); + ref.contents.add(ByteBuffer.wrap(append.bytes)); } public void addMessageClose(MessageCloseBody close) throws AMQException { - List<AMQMessage> messages = getMessages(close.reference); - try { - for (AMQMessage msg : messages) { - route(msg); + Reference ref = removeReference(close.reference); + for (AMQMessage msg : ref.messages) { + route(msg); + } + } + + public void deliver(AMQMessage msg, String destination, final long deliveryTag) { + deliver(msg, destination, new AMQMethodListener() { + public boolean methodReceived(AMQMethodEvent evt) throws AMQException { + AMQMethodBody method = evt.getMethod(); + if (_log.isDebugEnabled()) { + _log.debug(method + " received on channel " + _channelId); + } + // XXX: multiple? + if (method instanceof MessageOkBody) { + acknowledgeMessage(deliveryTag, false); + return true; + } else { + // TODO: implement reject + return false; + } } - } finally { - clearReference(close.reference); + public void error(Exception e) {} + }); + } + + public void deliver(AMQMessage msg, String destination, AMQMethodListener listener) { + // XXX: should reframe if necessary to conform to max frame size + MessageTransferBody mtb = msg.getTransferBody().copy(); + mtb.destination = destination; + ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize()); + for (ByteBuffer bb : msg.getContents()) { + buf.put(bb); } + buf.flip(); + mtb.body = new Content(Content.TypeEnum.INLINE_T, buf); + _session.writeRequest(_channelId, mtb, listener); } protected void route(AMQMessage msg) throws AMQException @@ -452,7 +485,7 @@ public class AMQChannel String consumerTag = entry.getValue().consumerTag; AMQMessage msg = entry.getValue().message; msg.setRedelivered(true); - session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); + deliver(msg, consumerTag, deliveryTag); } } } @@ -867,4 +900,11 @@ public class AMQChannel } } + private static class Reference { + + public List<AMQMessage> messages = new LinkedList(); + public List<ByteBuffer> contents = new LinkedList(); + + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java index d607ed859a..69bf103729 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java @@ -52,5 +52,6 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C } protocolSession.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED); protocolSession.initHeartbeats(body.heartbeat); + protocolSession.setFrameMax(body.getFrameMax()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 2c14323df8..1cfc90d8e0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -108,6 +108,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private boolean _closed; // maximum number of channels this session should have private long _maxNoOfChannels = 1000; + // XXX: is this spec or should this be set to the configurable default? + private long _maxFrameSize = 65536; /* AMQP Version for this session */ private byte _major; @@ -180,8 +182,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private AMQChannel createChannel(int id) throws AMQException { IApplicationRegistry registry = ApplicationRegistry.getInstance(); - AMQChannel channel = new AMQChannel(id, registry.getMessageStore(), - _exchangeRegistry, this, _stateManager); + AMQChannel channel = new AMQChannel(id, this, registry.getMessageStore(), + _exchangeRegistry, _stateManager); addChannel(channel); return channel; } @@ -302,10 +304,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener) - throws AMQException { - if (!checkMethodBodyVersion(methodBody)) - throw new AMQProtocolVersionException("MethodBody version did not match version of current session."); + checkMethodBodyVersion(methodBody); AMQChannel channel = getChannel(channelNum); RequestManager requestManager = channel.getRequestManager(); return requestManager.sendRequest(methodBody, methodListener); @@ -313,23 +313,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // This version uses this session's instance of AMQStateManager as the listener public long writeRequest(int channelNum, AMQMethodBody methodBody) - throws AMQException { return writeRequest(channelNum, methodBody, _stateManager); } public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) - throws AMQException { - if (!checkMethodBodyVersion(methodBody)) - throw new AMQProtocolVersionException("MethodBody version did not match version of current session."); + checkMethodBodyVersion(methodBody); AMQChannel channel = getChannel(channelNum); ResponseManager responseManager = channel.getResponseManager(); - responseManager.sendResponse(requestId, methodBody); + try { + responseManager.sendResponse(requestId, methodBody); + } catch (RequestResponseMappingException e) { + throw new RuntimeException(e); + } } public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) - throws AMQException { writeResponse(evt.getChannelId(), evt.getRequestId(), response); } @@ -361,16 +361,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return new ArrayList<AMQChannel>(_channelMap.values()); } - public AMQChannel getChannel(int channelId) throws AMQException + public AMQChannel getChannel(int channelId) { return _channelMap.get(channelId); } - public void addChannel(AMQChannel channel) throws AMQException + public void addChannel(AMQChannel channel) { if (_closed) { - throw new AMQException("Session is closed"); + throw new IllegalStateException("Session is closed"); } _channelMap.put(channel.getChannelId(), channel); @@ -538,6 +538,22 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } /** + * Set the negotiated maximum frame size for this connection. + * @param size the size in bytes + */ + public void setFrameMax(long size) { + _maxFrameSize = size; + } + + /** + * Gets the negotiaed maximum frame size for this connection. + * @return the size in bytes + */ + public long getFrameMax() { + return _maxFrameSize; + } + + /** * Closes all channels that were opened by this protocol session. This frees up all resources * used by the channel. * @@ -649,9 +665,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return _major == major && _minor == minor; } - - public boolean checkMethodBodyVersion(AMQMethodBody methodBody) - { - return versionEquals(methodBody.getMajor(), methodBody.getMinor()); + + public void checkMethodBodyVersion(AMQMethodBody methodBody) { + if (!versionEquals(methodBody.getMajor(), methodBody.getMinor())) { + throw new RuntimeException("MethodBody version did not match version of current session."); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index f9e5439890..7f7fcf20a2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -82,8 +82,7 @@ public interface AMQProtocolSession extends AMQProtocolWriter void closeSessionResponse(long requestId) throws AMQException; - void closeSession() throws AMQException; - + /** * Remove a channel from the session but do not close it. * @param channelId @@ -97,6 +96,24 @@ public interface AMQProtocolSession extends AMQProtocolWriter void initHeartbeats(int delay); /** + * Set the maximum frame size for this client. + * @param size the size in bytes + */ + void setFrameMax(long size); + + /** + * Get the maximum frame size for this client. + * @return the size in bytes + */ + long getFrameMax(); + + /** + * This must be called when the session is _closed in order to free up any resources + * managed by the session. + */ + void closeSession() throws AMQException; + + /** * @return a key that uniquely identifies this session */ Object getKey(); @@ -131,5 +148,5 @@ public interface AMQProtocolSession extends AMQProtocolWriter byte getMajor(); byte getMinor(); boolean versionEquals(byte major, byte minor); - boolean checkMethodBodyVersion(AMQMethodBody methodBody); + void checkMethodBodyVersion(AMQMethodBody methodBody); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 9a228b48d4..e80312106a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -51,7 +51,7 @@ public class AMQMessage private final MessageTransferBody _transferBody; - private List<MessageAppendBody> _contentBodies; + private List<ByteBuffer> _contents; private boolean _redelivered; @@ -101,34 +101,34 @@ public class AMQMessage _messageId = messageStore.getNewMessageId(); _transferBody = transferBody; _store = messageStore; - _contentBodies = new LinkedList<MessageAppendBody>(); + _contents = new LinkedList(); _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); _storeWhenComplete = storeWhenComplete; _taken = new AtomicBoolean(false); } public AMQMessage(MessageStore store, long messageId, MessageTransferBody transferBody, - List<MessageAppendBody> contentBodies) + List<ByteBuffer> contents) throws AMQException { _transferBody = transferBody; - _contentBodies = contentBodies; + _contents = contents; _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); _messageId = messageId; _store = store; storeMessage(); } - public AMQMessage(MessageStore store, MessageTransferBody transferBody, List<MessageAppendBody> contentBodies) + public AMQMessage(MessageStore store, MessageTransferBody transferBody, List<ByteBuffer> contents) throws AMQException { - this(store, store.getNewMessageId(), transferBody, contentBodies); + this(store, store.getNewMessageId(), transferBody, contents); } protected AMQMessage(AMQMessage msg) throws AMQException { - this(msg._store, msg._messageId, msg._transferBody, msg._contentBodies); + this(msg._store, msg._messageId, msg._transferBody, msg._contents); } public long getSize() { @@ -147,21 +147,9 @@ public class AMQMessage } public long getBodySize() { - Content body = _transferBody.getBody(); - switch (body.getContentType()) { - case INLINE_T: - return _transferBody.getBody().getContent().limit(); - case REF_T: - return getReferenceSize(); - default: - throw new IllegalStateException("unrecognized type: " + body.getContentType()); - } - } - - public long getReferenceSize() { long size = 0; - for (MessageAppendBody mab : _contentBodies) { - size += mab.getBytes().length; + for (ByteBuffer buffer : _contents) { + size += buffer.limit(); } return size; } @@ -258,56 +246,17 @@ public class AMQMessage } } - public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, int channel) - { - AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()]; - - if (true) throw new Error("XXX"); - /*allFrames[0] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); - for (int i = 1; i < allFrames.length; i++) - { - allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 1)); - }*/ - return new CompositeAMQDataBlock(encodedDeliverBody, allFrames); - } - - public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag) + public MessageTransferBody getTransferBody() { - - AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; - - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - if (true) throw new Error("XXX"); - /* - allFrames[0] = MessageTransferBody.createAMQFrame(channel, - (byte)0, (byte)9, // AMQP version (major, minor) - consumerTag, // consumerTag - deliveryTag, // deliveryTag - getExchangeName(), // exchange - _redelivered, // redelivered - getRoutingKey() // routingKey - ); - allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); - for (int i = 2; i < allFrames.length; i++) - { - allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2)); - }*/ - return new CompositeAMQDataBlock(allFrames); + return _transferBody; } - public List<AMQBody> getPayload() - { - List<AMQBody> payload = new ArrayList<AMQBody>(2 + _contentBodies.size()); - payload.add(_transferBody); - payload.addAll(_contentBodies); - return payload; + public List<ByteBuffer> getContents() { + return _contents; } - public MessageTransferBody getTransferBody() - { - return _transferBody; + public List<AMQBody> getPayload() { + throw new Error("XXX"); } public boolean isAllContentReceived() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 70074e2c65..0b61f95efe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -208,13 +208,28 @@ public class SubscriptionImpl implements Subscription { if (msg != null) { - if (_isBrowser) - { - sendToBrowser(msg, queue); - } - else - { - sendToConsumer(msg, queue); + try { + if (!_isBrowser && !_acks) { + queue.dequeue(msg); + } + + synchronized(channel) { + long deliveryTag = channel.getNextDeliveryTag(); + + if (_acks) { + if (_isBrowser) { + channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); + } else { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + } + + channel.deliver(msg, consumerTag, deliveryTag); + } + } finally { + if (!_isBrowser) { + msg.setDeliveredToConsumer(); + } } } else @@ -223,7 +238,8 @@ public class SubscriptionImpl implements Subscription } } - private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException + // XXX + /* private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -241,9 +257,7 @@ public class SubscriptionImpl implements Subscription ByteBuffer deliver = null; if (true) throw new Error("XXX"); //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); - AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - - protocolSession.writeFrame(frame); + channel.deliver(msg, consumerTag, null); } } @@ -273,33 +287,25 @@ public class SubscriptionImpl implements Subscription channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - // XXX: references - MessageTransferBody mtb = msg.getTransferBody().copy(); - mtb.destination = consumerTag; - try { - protocolSession.writeRequest - (channel.getChannelId(), - mtb, new AMQMethodListener() { - public boolean methodReceived(AMQMethodEvent evt) throws AMQException { - if (_logger.isDebugEnabled()) { - _logger.debug("Ack received on channel " + evt.getChannelId()); - } - // XXX: multiple - channel.acknowledgeMessage(deliveryTag, false); - return true; - } - public void error(Exception e) {} - }); - } catch (AMQException e) { - throw new RuntimeException(e); - } + channel.deliver(msg, consumerTag, new AMQMethodListener() { + public boolean methodReceived(AMQMethodEvent evt) throws AMQException { + if (_logger.isDebugEnabled()) { + _logger.debug("Ack received on channel " + evt.getChannelId()); + } + // XXX: reject? + // XXX: multiple + channel.acknowledgeMessage(deliveryTag, false); + return true; + } + public void error(Exception e) {} + }); } } finally { msg.setDeliveredToConsumer(); } - } + }*/ public boolean isSuspended() { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 5ea2e66b35..2980a86374 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -46,6 +46,7 @@ import org.apache.qpid.framing.MessageAppendBody; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.RequestManager; +import org.apache.qpid.framing.RequestResponseMappingException; import org.apache.qpid.framing.ResponseManager; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -314,25 +315,26 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener) - throws AMQException { RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelNum); if (requestManager == null) - throw new AMQException("Unable to find RequestManager for channel " + channelNum); + throw new IllegalArgumentException("Unable to find RequestManager for channel " + channelNum); return requestManager.sendRequest(methodBody, methodListener); } public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) - throws AMQException { ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelNum); if (responseManager == null) - throw new AMQException("Unable to find ResponseManager for channel " + channelNum); - responseManager.sendResponse(requestId, methodBody); + throw new IllegalArgumentException("Unable to find ResponseManager for channel " + channelNum); + try { + responseManager.sendResponse(requestId, methodBody); + } catch (RequestResponseMappingException e) { + throw new RuntimeException(e); + } } public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) - throws AMQException { writeResponse(evt.getChannelId(), evt.getRequestId(), response); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java index 01da4ab357..e2b388e174 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -61,7 +61,6 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession } public AMQChannel getChannel(int channelId) - throws AMQException { AMQChannel channel = super.getChannel(channelId); if (isPeerSession() && channel == null) @@ -102,21 +101,19 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession */ private class OneUseChannel extends AMQChannel { - public OneUseChannel(int channelId, AMQProtocolWriter protocolWriter, + public OneUseChannel(int channelId, AMQProtocolSession session, AMQMethodListener methodListener) - throws AMQException { - this(channelId, ApplicationRegistry.getInstance(), protocolWriter, methodListener); + this(channelId, session, ApplicationRegistry.getInstance(), methodListener); } - public OneUseChannel(int channelId, IApplicationRegistry registry, - AMQProtocolWriter protocolWriter, AMQMethodListener methodListener) - throws AMQException + public OneUseChannel(int channelId, AMQProtocolSession session, IApplicationRegistry registry, + AMQMethodListener methodListener) { super(channelId, + session, registry.getMessageStore(), registry.getExchangeRegistry(), - protocolWriter, methodListener); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/Content.java b/java/common/src/main/java/org/apache/qpid/framing/Content.java index 222500aa8a..31b947d50a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/Content.java +++ b/java/common/src/main/java/org/apache/qpid/framing/Content.java @@ -65,6 +65,7 @@ public class Content this.contentType = contentType; this.content = ByteBuffer.allocate(content.length); this.content.put(content); + this.content.flip(); } public Content(TypeEnum contentType, String contentStr) @@ -99,7 +100,7 @@ public class Content public byte[] getContentAsByteArray() { - ByteBuffer dup = content.duplicate().rewind(); + ByteBuffer dup = content.duplicate(); byte[] ba = new byte[dup.remaining()]; dup.get(ba); return ba; @@ -123,9 +124,11 @@ public class Content public void writePayload(ByteBuffer buffer) { + System.out.println("Before: " + content); EncodingUtils.writeUnsignedByte(buffer, contentType.toByte()); EncodingUtils.writeUnsignedInteger(buffer, content.remaining()); buffer.put(content); + System.out.println("After: " + content); } public void populateFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException @@ -139,6 +142,6 @@ public class Content public synchronized String toString() { - return getContent().rewind().toString(); + return getContent().toString(); } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java index 63c464e5af..d07af78544 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java @@ -35,12 +35,9 @@ public interface AMQProtocolWriter public void writeFrame(AMQDataBlock frame); public long writeRequest(int channelNum, AMQMethodBody methodBody, - AMQMethodListener methodListener) - throws AMQException; + AMQMethodListener methodListener); - public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) - throws AMQException; + public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody); - public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) - throws AMQException; + public void writeResponse(AMQMethodEvent evt, AMQMethodBody response); } |