summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-01-23 20:31:10 +0000
committerRafael H. Schloming <rhs@apache.org>2007-01-23 20:31:10 +0000
commit4eb87d127edb3dc44512eb63318638f91a2f0f2d (patch)
tree795628bf73a23dca8e562cefed41f3926609399e
parent11870d0fb76e74b6994ee9d2cfdae9df671673b1 (diff)
downloadqpid-python-4eb87d127edb3dc44512eb63318638f91a2f0f2d.tar.gz
removed XXX from resend, centralized message deliver, cleaned up exception handling, added per channel max frame size
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@499121 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java130
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java53
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java81
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java70
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java14
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/Content.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java9
10 files changed, 215 insertions, 186 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 7c161522ca..4b2c3f3beb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,11 +20,15 @@
*/
package org.apache.qpid.server;
-import org.apache.qpid.framing.Content;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessageAppendBody;
import org.apache.qpid.framing.MessageCloseBody;
@@ -47,6 +51,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
+import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -74,6 +79,7 @@ public class AMQChannel
private RequestManager _requestManager;
private ResponseManager _responseManager;
+ private AMQProtocolSession _session;
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -94,16 +100,9 @@ public class AMQChannel
private int _consumerTag;
/**
- * The set of current messages - which may be partial in the sense that not all frames have been received yet -
- * which has been received by this channel. As the frames are received the references get updated and once all
- * frames have been received the message can then be routed.
- */
- private Map<String, List<AMQMessage>> _messages = new LinkedHashMap();
-
- /**
* The set of open references on this channel.
*/
- private Map<String, List<MessageAppendBody>> _references = new LinkedHashMap();
+ private Map<String, Reference> _references = new LinkedHashMap();
/**
* Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
@@ -129,16 +128,17 @@ public class AMQChannel
private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
private Set<Long> _browsedAcks = new HashSet<Long>();
- public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolWriter, AMQMethodListener methodListener)
- throws AMQException
+ // XXX: clean up arguments
+ public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
{
_channelId = channelId;
+ _session = session;
_prefetch_HighWaterMark = DEFAULT_PREFETCH;
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
- _requestManager = new RequestManager(channelId, protocolWriter, true);
- _responseManager = new ResponseManager(channelId, methodListener, protocolWriter, true);
+ _requestManager = new RequestManager(channelId, _session, true);
+ _responseManager = new ResponseManager(channelId, methodListener, _session, true);
_txnBuffer = new TxnBuffer(_messageStore);
}
@@ -189,51 +189,54 @@ public class AMQChannel
public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException
{
- AMQMessage message = new AMQMessage(_messageStore, transferBody);
- message.setPublisher(publisher);
Content body = transferBody.getBody();
+ AMQMessage message;
switch (body.getContentType()) {
case INLINE_T:
+ message = new AMQMessage(_messageStore, transferBody,
+ Collections.singletonList(body.getContent()));
+ message.setPublisher(publisher);
route(message);
break;
case REF_T:
- getMessages(body.getContentAsByteArray()).add(message);
+ Reference ref = getReference(body.getContentAsByteArray());
+ message = new AMQMessage(_messageStore, transferBody, ref.contents);
+ message.setPublisher(publisher);
+ ref.messages.add(message);
break;
}
}
- private List<AMQMessage> getMessages(byte[] reference) {
- String key = new String(reference);
- List<AMQMessage> result = _messages.get(key);
- if (result == null) {
- throw new IllegalArgumentException(key);
- }
- return result;
+ private static String key(byte[] id) {
+ return new String(id);
}
- private List<MessageAppendBody> getReference(byte[] reference) {
- String key = new String(reference);
- List<MessageAppendBody> result = _references.get(key);
- if (result == null) {
+ private Reference getReference(byte[] id) {
+ String key = key(id);
+ Reference ref = _references.get(key);
+ if (ref == null) {
throw new IllegalArgumentException(key);
}
- return result;
+ return ref;
}
- private void createReference(byte[] reference) {
- String key = new String(reference);
+ private Reference createReference(byte[] id) {
+ String key = key(id);
if (_references.containsKey(key)) {
throw new IllegalArgumentException(key);
- } else {
- _references.put(key, new LinkedList());
- _messages.put(key, new LinkedList());
}
+ Reference ref = new Reference();
+ _references.put(key, ref);
+ return ref;
}
- private void clearReference(byte[] reference) {
- String key = new String(reference);
- _references.remove(key);
- _messages.remove(key);
+ private Reference removeReference(byte[] id) {
+ String key = key(id);
+ Reference ref = _references.remove(key);
+ if (ref == null) {
+ throw new IllegalArgumentException(key);
+ }
+ return ref;
}
public void addMessageOpen(MessageOpenBody open) {
@@ -241,18 +244,48 @@ public class AMQChannel
}
public void addMessageAppend(MessageAppendBody append) {
- getReference(append.reference).add(append);
+ Reference ref = getReference(append.reference);
+ ref.contents.add(ByteBuffer.wrap(append.bytes));
}
public void addMessageClose(MessageCloseBody close) throws AMQException {
- List<AMQMessage> messages = getMessages(close.reference);
- try {
- for (AMQMessage msg : messages) {
- route(msg);
+ Reference ref = removeReference(close.reference);
+ for (AMQMessage msg : ref.messages) {
+ route(msg);
+ }
+ }
+
+ public void deliver(AMQMessage msg, String destination, final long deliveryTag) {
+ deliver(msg, destination, new AMQMethodListener() {
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
+ AMQMethodBody method = evt.getMethod();
+ if (_log.isDebugEnabled()) {
+ _log.debug(method + " received on channel " + _channelId);
+ }
+ // XXX: multiple?
+ if (method instanceof MessageOkBody) {
+ acknowledgeMessage(deliveryTag, false);
+ return true;
+ } else {
+ // TODO: implement reject
+ return false;
+ }
}
- } finally {
- clearReference(close.reference);
+ public void error(Exception e) {}
+ });
+ }
+
+ public void deliver(AMQMessage msg, String destination, AMQMethodListener listener) {
+ // XXX: should reframe if necessary to conform to max frame size
+ MessageTransferBody mtb = msg.getTransferBody().copy();
+ mtb.destination = destination;
+ ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize());
+ for (ByteBuffer bb : msg.getContents()) {
+ buf.put(bb);
}
+ buf.flip();
+ mtb.body = new Content(Content.TypeEnum.INLINE_T, buf);
+ _session.writeRequest(_channelId, mtb, listener);
}
protected void route(AMQMessage msg) throws AMQException
@@ -452,7 +485,7 @@ public class AMQChannel
String consumerTag = entry.getValue().consumerTag;
AMQMessage msg = entry.getValue().message;
msg.setRedelivered(true);
- session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+ deliver(msg, consumerTag, deliveryTag);
}
}
}
@@ -867,4 +900,11 @@ public class AMQChannel
}
}
+ private static class Reference {
+
+ public List<AMQMessage> messages = new LinkedList();
+ public List<ByteBuffer> contents = new LinkedList();
+
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
index d607ed859a..69bf103729 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
@@ -52,5 +52,6 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C
}
protocolSession.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
protocolSession.initHeartbeats(body.heartbeat);
+ protocolSession.setFrameMax(body.getFrameMax());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 2c14323df8..1cfc90d8e0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -108,6 +108,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
+ // XXX: is this spec or should this be set to the configurable default?
+ private long _maxFrameSize = 65536;
/* AMQP Version for this session */
private byte _major;
@@ -180,8 +182,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private AMQChannel createChannel(int id) throws AMQException
{
IApplicationRegistry registry = ApplicationRegistry.getInstance();
- AMQChannel channel = new AMQChannel(id, registry.getMessageStore(),
- _exchangeRegistry, this, _stateManager);
+ AMQChannel channel = new AMQChannel(id, this, registry.getMessageStore(),
+ _exchangeRegistry, _stateManager);
addChannel(channel);
return channel;
}
@@ -302,10 +304,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener)
- throws AMQException
{
- if (!checkMethodBodyVersion(methodBody))
- throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
+ checkMethodBodyVersion(methodBody);
AMQChannel channel = getChannel(channelNum);
RequestManager requestManager = channel.getRequestManager();
return requestManager.sendRequest(methodBody, methodListener);
@@ -313,23 +313,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
// This version uses this session's instance of AMQStateManager as the listener
public long writeRequest(int channelNum, AMQMethodBody methodBody)
- throws AMQException
{
return writeRequest(channelNum, methodBody, _stateManager);
}
public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
- throws AMQException
{
- if (!checkMethodBodyVersion(methodBody))
- throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
+ checkMethodBodyVersion(methodBody);
AMQChannel channel = getChannel(channelNum);
ResponseManager responseManager = channel.getResponseManager();
- responseManager.sendResponse(requestId, methodBody);
+ try {
+ responseManager.sendResponse(requestId, methodBody);
+ } catch (RequestResponseMappingException e) {
+ throw new RuntimeException(e);
+ }
}
public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
- throws AMQException
{
writeResponse(evt.getChannelId(), evt.getRequestId(), response);
}
@@ -361,16 +361,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return new ArrayList<AMQChannel>(_channelMap.values());
}
- public AMQChannel getChannel(int channelId) throws AMQException
+ public AMQChannel getChannel(int channelId)
{
return _channelMap.get(channelId);
}
- public void addChannel(AMQChannel channel) throws AMQException
+ public void addChannel(AMQChannel channel)
{
if (_closed)
{
- throw new AMQException("Session is closed");
+ throw new IllegalStateException("Session is closed");
}
_channelMap.put(channel.getChannelId(), channel);
@@ -538,6 +538,22 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
/**
+ * Set the negotiated maximum frame size for this connection.
+ * @param size the size in bytes
+ */
+ public void setFrameMax(long size) {
+ _maxFrameSize = size;
+ }
+
+ /**
+ * Gets the negotiaed maximum frame size for this connection.
+ * @return the size in bytes
+ */
+ public long getFrameMax() {
+ return _maxFrameSize;
+ }
+
+ /**
* Closes all channels that were opened by this protocol session. This frees up all resources
* used by the channel.
*
@@ -649,9 +665,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return _major == major && _minor == minor;
}
-
- public boolean checkMethodBodyVersion(AMQMethodBody methodBody)
- {
- return versionEquals(methodBody.getMajor(), methodBody.getMinor());
+
+ public void checkMethodBodyVersion(AMQMethodBody methodBody) {
+ if (!versionEquals(methodBody.getMajor(), methodBody.getMinor())) {
+ throw new RuntimeException("MethodBody version did not match version of current session.");
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index f9e5439890..7f7fcf20a2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -82,8 +82,7 @@ public interface AMQProtocolSession extends AMQProtocolWriter
void closeSessionResponse(long requestId) throws AMQException;
- void closeSession() throws AMQException;
-
+
/**
* Remove a channel from the session but do not close it.
* @param channelId
@@ -97,6 +96,24 @@ public interface AMQProtocolSession extends AMQProtocolWriter
void initHeartbeats(int delay);
/**
+ * Set the maximum frame size for this client.
+ * @param size the size in bytes
+ */
+ void setFrameMax(long size);
+
+ /**
+ * Get the maximum frame size for this client.
+ * @return the size in bytes
+ */
+ long getFrameMax();
+
+ /**
+ * This must be called when the session is _closed in order to free up any resources
+ * managed by the session.
+ */
+ void closeSession() throws AMQException;
+
+ /**
* @return a key that uniquely identifies this session
*/
Object getKey();
@@ -131,5 +148,5 @@ public interface AMQProtocolSession extends AMQProtocolWriter
byte getMajor();
byte getMinor();
boolean versionEquals(byte major, byte minor);
- boolean checkMethodBodyVersion(AMQMethodBody methodBody);
+ void checkMethodBodyVersion(AMQMethodBody methodBody);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 9a228b48d4..e80312106a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -51,7 +51,7 @@ public class AMQMessage
private final MessageTransferBody _transferBody;
- private List<MessageAppendBody> _contentBodies;
+ private List<ByteBuffer> _contents;
private boolean _redelivered;
@@ -101,34 +101,34 @@ public class AMQMessage
_messageId = messageStore.getNewMessageId();
_transferBody = transferBody;
_store = messageStore;
- _contentBodies = new LinkedList<MessageAppendBody>();
+ _contents = new LinkedList();
_decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_storeWhenComplete = storeWhenComplete;
_taken = new AtomicBoolean(false);
}
public AMQMessage(MessageStore store, long messageId, MessageTransferBody transferBody,
- List<MessageAppendBody> contentBodies)
+ List<ByteBuffer> contents)
throws AMQException
{
_transferBody = transferBody;
- _contentBodies = contentBodies;
+ _contents = contents;
_decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_messageId = messageId;
_store = store;
storeMessage();
}
- public AMQMessage(MessageStore store, MessageTransferBody transferBody, List<MessageAppendBody> contentBodies)
+ public AMQMessage(MessageStore store, MessageTransferBody transferBody, List<ByteBuffer> contents)
throws AMQException
{
- this(store, store.getNewMessageId(), transferBody, contentBodies);
+ this(store, store.getNewMessageId(), transferBody, contents);
}
protected AMQMessage(AMQMessage msg) throws AMQException
{
- this(msg._store, msg._messageId, msg._transferBody, msg._contentBodies);
+ this(msg._store, msg._messageId, msg._transferBody, msg._contents);
}
public long getSize() {
@@ -147,21 +147,9 @@ public class AMQMessage
}
public long getBodySize() {
- Content body = _transferBody.getBody();
- switch (body.getContentType()) {
- case INLINE_T:
- return _transferBody.getBody().getContent().limit();
- case REF_T:
- return getReferenceSize();
- default:
- throw new IllegalStateException("unrecognized type: " + body.getContentType());
- }
- }
-
- public long getReferenceSize() {
long size = 0;
- for (MessageAppendBody mab : _contentBodies) {
- size += mab.getBytes().length;
+ for (ByteBuffer buffer : _contents) {
+ size += buffer.limit();
}
return size;
}
@@ -258,56 +246,17 @@ public class AMQMessage
}
}
- public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, int channel)
- {
- AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()];
-
- if (true) throw new Error("XXX");
- /*allFrames[0] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
- for (int i = 1; i < allFrames.length; i++)
- {
- allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 1));
- }*/
- return new CompositeAMQDataBlock(encodedDeliverBody, allFrames);
- }
-
- public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
+ public MessageTransferBody getTransferBody()
{
-
- AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
-
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- if (true) throw new Error("XXX");
- /*
- allFrames[0] = MessageTransferBody.createAMQFrame(channel,
- (byte)0, (byte)9, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- getExchangeName(), // exchange
- _redelivered, // redelivered
- getRoutingKey() // routingKey
- );
- allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
- for (int i = 2; i < allFrames.length; i++)
- {
- allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2));
- }*/
- return new CompositeAMQDataBlock(allFrames);
+ return _transferBody;
}
- public List<AMQBody> getPayload()
- {
- List<AMQBody> payload = new ArrayList<AMQBody>(2 + _contentBodies.size());
- payload.add(_transferBody);
- payload.addAll(_contentBodies);
- return payload;
+ public List<ByteBuffer> getContents() {
+ return _contents;
}
- public MessageTransferBody getTransferBody()
- {
- return _transferBody;
+ public List<AMQBody> getPayload() {
+ throw new Error("XXX");
}
public boolean isAllContentReceived()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 70074e2c65..0b61f95efe 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -208,13 +208,28 @@ public class SubscriptionImpl implements Subscription
{
if (msg != null)
{
- if (_isBrowser)
- {
- sendToBrowser(msg, queue);
- }
- else
- {
- sendToConsumer(msg, queue);
+ try {
+ if (!_isBrowser && !_acks) {
+ queue.dequeue(msg);
+ }
+
+ synchronized(channel) {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ if (_acks) {
+ if (_isBrowser) {
+ channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+ } else {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+ }
+
+ channel.deliver(msg, consumerTag, deliveryTag);
+ }
+ } finally {
+ if (!_isBrowser) {
+ msg.setDeliveredToConsumer();
+ }
}
}
else
@@ -223,7 +238,8 @@ public class SubscriptionImpl implements Subscription
}
}
- private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ // XXX
+ /* private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -241,9 +257,7 @@ public class SubscriptionImpl implements Subscription
ByteBuffer deliver = null;
if (true) throw new Error("XXX");
//createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
- protocolSession.writeFrame(frame);
+ channel.deliver(msg, consumerTag, null);
}
}
@@ -273,33 +287,25 @@ public class SubscriptionImpl implements Subscription
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- // XXX: references
- MessageTransferBody mtb = msg.getTransferBody().copy();
- mtb.destination = consumerTag;
- try {
- protocolSession.writeRequest
- (channel.getChannelId(),
- mtb, new AMQMethodListener() {
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
- if (_logger.isDebugEnabled()) {
- _logger.debug("Ack received on channel " + evt.getChannelId());
- }
- // XXX: multiple
- channel.acknowledgeMessage(deliveryTag, false);
- return true;
- }
- public void error(Exception e) {}
- });
- } catch (AMQException e) {
- throw new RuntimeException(e);
- }
+ channel.deliver(msg, consumerTag, new AMQMethodListener() {
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("Ack received on channel " + evt.getChannelId());
+ }
+ // XXX: reject?
+ // XXX: multiple
+ channel.acknowledgeMessage(deliveryTag, false);
+ return true;
+ }
+ public void error(Exception e) {}
+ });
}
}
finally
{
msg.setDeliveredToConsumer();
}
- }
+ }*/
public boolean isSuspended()
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 5ea2e66b35..2980a86374 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -46,6 +46,7 @@ import org.apache.qpid.framing.MessageAppendBody;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.RequestManager;
+import org.apache.qpid.framing.RequestResponseMappingException;
import org.apache.qpid.framing.ResponseManager;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -314,25 +315,26 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
public long writeRequest(int channelNum, AMQMethodBody methodBody,
AMQMethodListener methodListener)
- throws AMQException
{
RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelNum);
if (requestManager == null)
- throw new AMQException("Unable to find RequestManager for channel " + channelNum);
+ throw new IllegalArgumentException("Unable to find RequestManager for channel " + channelNum);
return requestManager.sendRequest(methodBody, methodListener);
}
public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
- throws AMQException
{
ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelNum);
if (responseManager == null)
- throw new AMQException("Unable to find ResponseManager for channel " + channelNum);
- responseManager.sendResponse(requestId, methodBody);
+ throw new IllegalArgumentException("Unable to find ResponseManager for channel " + channelNum);
+ try {
+ responseManager.sendResponse(requestId, methodBody);
+ } catch (RequestResponseMappingException e) {
+ throw new RuntimeException(e);
+ }
}
public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
- throws AMQException
{
writeResponse(evt.getChannelId(), evt.getRequestId(), response);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
index 01da4ab357..e2b388e174 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
@@ -61,7 +61,6 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
}
public AMQChannel getChannel(int channelId)
- throws AMQException
{
AMQChannel channel = super.getChannel(channelId);
if (isPeerSession() && channel == null)
@@ -102,21 +101,19 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
*/
private class OneUseChannel extends AMQChannel
{
- public OneUseChannel(int channelId, AMQProtocolWriter protocolWriter,
+ public OneUseChannel(int channelId, AMQProtocolSession session,
AMQMethodListener methodListener)
- throws AMQException
{
- this(channelId, ApplicationRegistry.getInstance(), protocolWriter, methodListener);
+ this(channelId, session, ApplicationRegistry.getInstance(), methodListener);
}
- public OneUseChannel(int channelId, IApplicationRegistry registry,
- AMQProtocolWriter protocolWriter, AMQMethodListener methodListener)
- throws AMQException
+ public OneUseChannel(int channelId, AMQProtocolSession session, IApplicationRegistry registry,
+ AMQMethodListener methodListener)
{
super(channelId,
+ session,
registry.getMessageStore(),
registry.getExchangeRegistry(),
- protocolWriter,
methodListener);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/Content.java b/java/common/src/main/java/org/apache/qpid/framing/Content.java
index 222500aa8a..31b947d50a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/Content.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/Content.java
@@ -65,6 +65,7 @@ public class Content
this.contentType = contentType;
this.content = ByteBuffer.allocate(content.length);
this.content.put(content);
+ this.content.flip();
}
public Content(TypeEnum contentType, String contentStr)
@@ -99,7 +100,7 @@ public class Content
public byte[] getContentAsByteArray()
{
- ByteBuffer dup = content.duplicate().rewind();
+ ByteBuffer dup = content.duplicate();
byte[] ba = new byte[dup.remaining()];
dup.get(ba);
return ba;
@@ -123,9 +124,11 @@ public class Content
public void writePayload(ByteBuffer buffer)
{
+ System.out.println("Before: " + content);
EncodingUtils.writeUnsignedByte(buffer, contentType.toByte());
EncodingUtils.writeUnsignedInteger(buffer, content.remaining());
buffer.put(content);
+ System.out.println("After: " + content);
}
public void populateFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException
@@ -139,6 +142,6 @@ public class Content
public synchronized String toString()
{
- return getContent().rewind().toString();
+ return getContent().toString();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
index 63c464e5af..d07af78544 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
@@ -35,12 +35,9 @@ public interface AMQProtocolWriter
public void writeFrame(AMQDataBlock frame);
public long writeRequest(int channelNum, AMQMethodBody methodBody,
- AMQMethodListener methodListener)
- throws AMQException;
+ AMQMethodListener methodListener);
- public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
- throws AMQException;
+ public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody);
- public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
- throws AMQException;
+ public void writeResponse(AMQMethodEvent evt, AMQMethodBody response);
}