diff options
25 files changed, 205 insertions, 123 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index fa4219ecd1..8b36576a30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.MessageRouter; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; @@ -112,7 +113,7 @@ public class AMQChannel * A context used by the message store enabling it to track context for a given channel even across * thread boundaries */ - private final StoreContext _storeContext = new StoreContext(); + private final StoreContext _storeContext; private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); @@ -120,12 +121,16 @@ public class AMQChannel private Set<Long> _browsedAcks = new HashSet<Long>(); + private final AMQProtocolSession _session; - public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) + + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException { + _session = session; _channelId = channelId; + _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); _prefetch_HighWaterMark = DEFAULT_PREFETCH; _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; @@ -338,7 +343,8 @@ public class AMQChannel _txnContext.rollback(); unsubscribeAllConsumers(session); requeue(); - _txnContext.commit(); + _txnContext.commit(); + } private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException @@ -386,8 +392,10 @@ public class AMQChannel _txnContext.deliver(unacked.message, unacked.queue); } } + } + /** * Called to resend all outstanding unacknowledged messages to this same channel. */ @@ -403,7 +411,7 @@ public class AMQChannel AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; msg.setRedelivered(true); - if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) + if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended()) { msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); } @@ -417,6 +425,7 @@ public class AMQChannel msgToRequeue.add(message); } } + // false means continue processing return false; } @@ -430,6 +439,7 @@ public class AMQChannel { _txnContext.deliver(message.message, message.queue); _unacknowledgedMessageMap.remove(message.deliveryTag); + message.message.decrementReference(_storeContext); } } @@ -559,6 +569,8 @@ public class AMQChannel public void rollback() throws AMQException { _txnContext.rollback(); + + } public String toString() diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index bbfab8132c..c987c12154 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -100,6 +100,7 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (UnacknowledgedMessage msg : _unacked) { + msg.restoreTransientMessageData(); msg.discard(storeContext); } } @@ -112,6 +113,7 @@ public class TxAck implements TxnOp //in memory (persistent changes will be rolled back by store) for (UnacknowledgedMessage msg : _unacked) { + msg.clearTransientMessageData(); msg.message.incrementReference(); } } @@ -120,6 +122,11 @@ public class TxAck implements TxnOp { //remove the unacked messages from the channels map _map.remove(_unacked); + for (UnacknowledgedMessage msg : _unacked) + { + msg.clearTransientMessageData(); + } + } public void rollback(StoreContext storeContext) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index ff3c901be5..3f2348b71b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -50,5 +50,15 @@ public class UnacknowledgedMessage } message.decrementReference(storeContext); } + + public void restoreTransientMessageData() throws AMQException + { + message.restoreTransientMessageData(); + } + + public void clearTransientMessageData() + { + message.clearTransientMessageData(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index fb198ef4f7..03fc7a3926 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -49,7 +49,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(), + final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(), virtualHost.getExchangeRegistry()); session.addChannel(channel); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index f23ec85391..be81734ae4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,10 +20,7 @@ */ package org.apache.qpid.server.queue; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -111,7 +108,7 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(_messageId) - 1; + return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1; } catch (AMQException e) { @@ -124,7 +121,7 @@ public class AMQMessage { try { - ContentBody cb = _messageHandle.getContentBody(_messageId, ++_index); + ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index); return ContentBody.createAMQFrame(_channel, cb); } catch (AMQException e) @@ -141,6 +138,11 @@ public class AMQMessage } } + private StoreContext getStoreContext() + { + return _txnContext.getStoreContext(); + } + private class BodyContentIterator implements Iterator<ContentBody> { @@ -150,7 +152,7 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(_messageId) - 1; + return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1; } catch (AMQException e) { @@ -163,7 +165,7 @@ public class AMQMessage { try { - return _messageHandle.getContentBody(_messageId, ++_index); + return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index); } catch (AMQException e) { @@ -201,10 +203,11 @@ public class AMQMessage * @param factory * @throws AMQException */ - public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException + public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException { _messageId = messageId; _messageHandle = factory.createMessageHandle(messageId, store, true); + _txnContext = txnConext; _transientMessageData = null; } @@ -276,7 +279,7 @@ public class AMQMessage } else { - return _messageHandle.getContentHeaderBody(_messageId); + return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId); } } @@ -342,14 +345,16 @@ public class AMQMessage _referenceCount.incrementAndGet(); if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount); + + _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + } } /** * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. - * + * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ @@ -365,7 +370,9 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " is zero; removing message"); + _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + + } // must check if the handle is null since there may be cases where we decide to throw away a message @@ -386,7 +393,7 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId); + _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); if (_referenceCount.get() < 0) { Thread.dumpStack(); @@ -475,7 +482,7 @@ public class AMQMessage } else { - return _messageHandle.isPersistent(_messageId); + return _messageHandle.isPersistent(getStoreContext(),_messageId); } } @@ -504,7 +511,7 @@ public class AMQMessage } else { - pb = _messageHandle.getPublishBody(_messageId); + pb = _messageHandle.getPublishBody(getStoreContext(),_messageId); } return pb; } @@ -541,7 +548,7 @@ public class AMQMessage List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues(); if (_log.isDebugEnabled()) { - _log.debug("Delivering message " + _messageId); + _log.debug("Delivering message " + _messageId + " to " + destinationQueues); } try { @@ -575,7 +582,7 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - final int bodyCount = _messageHandle.getBodyCount(_messageId); + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId); if(bodyCount == 0) { SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, @@ -591,7 +598,7 @@ public class AMQMessage // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentBody cb = _messageHandle.getContentBody(_messageId, 0); + ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0); AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; @@ -603,7 +610,7 @@ public class AMQMessage // for(int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentBody(_messageId, i); + cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i); protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb)); } @@ -619,7 +626,7 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - final int bodyCount = _messageHandle.getBodyCount(_messageId); + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId); if(bodyCount == 0) { SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, @@ -634,7 +641,7 @@ public class AMQMessage // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentBody cb = _messageHandle.getContentBody(_messageId, 0); + ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0); AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; @@ -646,7 +653,7 @@ public class AMQMessage // for(int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentBody(_messageId, i); + cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i); protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb)); } @@ -749,13 +756,30 @@ public class AMQMessage } catch (AMQException e) { - _log.error(e); + _log.error(e.toString(),e); return 0; } } + + public void restoreTransientMessageData() throws AMQException + { + TransientMessageData transientMessageData = new TransientMessageData(); + transientMessageData.setPublishBody(getPublishBody()); + transientMessageData.setContentHeaderBody(getContentHeaderBody()); + transientMessageData.addBodyLength(getContentHeaderBody().getSize()); + _transientMessageData = transientMessageData; + } + + + public void clearTransientMessageData() + { + _transientMessageData = null; + } + + public String toString() { return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index 6aa8f98403..210c9f01a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -35,17 +35,17 @@ import org.apache.qpid.server.store.StoreContext; */ public interface AMQMessageHandle { - ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException; + ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException; /** * @return the number of body frames associated with this message */ - int getBodyCount(Long messageId) throws AMQException; + int getBodyCount(StoreContext context, Long messageId) throws AMQException; /** * @return the size of the body */ - long getBodySize(Long messageId) throws AMQException; + long getBodySize(StoreContext context, Long messageId) throws AMQException; /** * Get a particular content body @@ -53,17 +53,17 @@ public interface AMQMessageHandle * @return a content body * @throws IllegalArgumentException if the index is invalid */ - ContentBody getContentBody(Long messageId, int index) throws IllegalArgumentException, AMQException; + ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException; void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException; - BasicPublishBody getPublishBody(Long messageId) throws AMQException; + BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException; boolean isRedelivered(); void setRedelivered(boolean redelivered); - boolean isPersistent(Long messageId) throws AMQException; + boolean isPersistent(StoreContext context, Long messageId) throws AMQException; void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 5890d7b72c..79f875ce1e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -49,22 +49,22 @@ public class InMemoryMessageHandle implements AMQMessageHandle { } - public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException + public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException { return _contentHeaderBody; } - public int getBodyCount(Long messageId) + public int getBodyCount(StoreContext context, Long messageId) { return _contentBodies.size(); } - public long getBodySize(Long messageId) throws AMQException + public long getBodySize(StoreContext context, Long messageId) throws AMQException { - return getContentHeaderBody(messageId).bodySize; + return getContentHeaderBody(context, messageId).bodySize; } - public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException + public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException { if (index > _contentBodies.size() - 1) { @@ -80,7 +80,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle _contentBodies.add(contentBody); } - public BasicPublishBody getPublishBody(Long messageId) throws AMQException + public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException { return _publishBody; } @@ -96,10 +96,10 @@ public class InMemoryMessageHandle implements AMQMessageHandle _redelivered = redelivered; } - public boolean isPersistent(Long messageId) throws AMQException + public boolean isPersistent(StoreContext context, Long messageId) throws AMQException { //todo remove literal values to a constant file such as AMQConstants in common - ContentHeaderBody chb = getContentHeaderBody(messageId); + ContentHeaderBody chb = getContentHeaderBody(context, messageId); return chb.properties instanceof BasicContentHeaderProperties && ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 8e270f9772..05841ccfc0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -267,9 +267,11 @@ public class SubscriptionImpl implements Subscription if (_acks) { channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + msg.decrementReference(storeContext); } msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); + } } finally diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 161913ef15..670d895950 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -56,21 +56,21 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _messageStore = messageStore; } - public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException + public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException { ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null); if (chb == null) { - MessageMetaData mmd = loadMessageMetaData(messageId); + MessageMetaData mmd = loadMessageMetaData(context, messageId); chb = mmd.getContentHeaderBody(); } return chb; } - private MessageMetaData loadMessageMetaData(Long messageId) + private MessageMetaData loadMessageMetaData(StoreContext context, Long messageId) throws AMQException { - MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId); populateFromMessageMetaData(mmd); return mmd; } @@ -82,11 +82,11 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody()); } - public int getBodyCount(Long messageId) throws AMQException + public int getBodyCount(StoreContext context, Long messageId) throws AMQException { if (_contentBodies == null) { - MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId); int chunkCount = mmd.getContentChunkCount(); _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount); for (int i = 0; i < chunkCount; i++) @@ -97,12 +97,12 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle return _contentBodies.size(); } - public long getBodySize(Long messageId) throws AMQException + public long getBodySize(StoreContext context, Long messageId) throws AMQException { - return getContentHeaderBody(messageId).bodySize; + return getContentHeaderBody(context, messageId).bodySize; } - public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException + public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException { if (index > _contentBodies.size() - 1) { @@ -113,7 +113,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle ContentBody cb = wr.get(); if (cb == null) { - cb = _messageStore.getContentBodyChunk(messageId, index); + cb = _messageStore.getContentBodyChunk(context, messageId, index); _contentBodies.set(index, new WeakReference<ContentBody>(cb)); } return cb; @@ -145,12 +145,12 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody); } - public BasicPublishBody getPublishBody(Long messageId) throws AMQException + public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException { BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null); if (bpb == null) { - MessageMetaData mmd = loadMessageMetaData(messageId); + MessageMetaData mmd = loadMessageMetaData(context, messageId); bpb = mmd.getPublishBody(); } @@ -167,10 +167,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _redelivered = redelivered; } - public boolean isPersistent(Long messageId) throws AMQException + public boolean isPersistent(StoreContext context, Long messageId) throws AMQException { //todo remove literal values to a constant file such as AMQConstants in common - ContentHeaderBody chb = getContentHeaderBody(messageId); + ContentHeaderBody chb = getContentHeaderBody(context, messageId); return chb.properties instanceof BasicContentHeaderProperties && ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 6c4ad10429..f678cea630 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -174,12 +174,12 @@ public class MemoryMessageStore implements MessageStore _metaDataMap.put(messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException { return _metaDataMap.get(messageId); } - public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException + public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { List<ContentBody> bodyList = _contentBodyMap.get(messageId); return bodyList.get(index); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index d707ece8da..7fa46eb1ca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -84,8 +84,8 @@ public interface MessageStore void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException; - MessageMetaData getMessageMetaData(Long messageId) throws AMQException; + MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException; - ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException; + ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java index 55e5067852..2e2f2ba7d6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store; +import org.apache.log4j.Logger; + + /** * A context that the store can use to associate with a transactional context. For example, it could store * some kind of txn id. @@ -28,8 +31,22 @@ package org.apache.qpid.server.store; */ public class StoreContext { + + private static final Logger _logger = Logger.getLogger(StoreContext.class); + + private String _name; private Object _payload; + public StoreContext() + { + _name = super.toString(); + } + + public StoreContext(String name) + { + _name = name; + } + public Object getPayload() { return _payload; @@ -37,6 +54,7 @@ public class StoreContext public void setPayload(Object payload) { + _logger.debug("["+_name+"] Setting payload: " + payload); _payload = payload; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 7481a96ae4..5c915b5c84 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -168,7 +168,7 @@ public class LocalTransactionalContext implements TransactionalContext { if (_log.isDebugEnabled()) { - _log.debug("Starting transaction on message store"); + _log.debug("Starting transaction on message store: " + this); } _messageStore.beginTran(_storeContext); _inTran = true; @@ -179,7 +179,7 @@ public class LocalTransactionalContext implements TransactionalContext { if (_log.isDebugEnabled()) { - _log.debug("Committing transactional context"); + _log.debug("Committing transactional context: " + this); } if (_ackOp != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 17ce6debbd..c04380ba8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -75,7 +75,7 @@ public class AMQBrokerDetails implements BrokerDetails } else { - URLHelper.parseError(0, transport.length(), "Unknown transport", url); + throw URLHelper.parseError(0, transport.length(), "Unknown transport", url); } } } @@ -89,7 +89,7 @@ public class AMQBrokerDetails implements BrokerDetails if (transport == null) { - URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + + throw URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, ""); } @@ -144,7 +144,7 @@ public class AMQBrokerDetails implements BrokerDetails } else { - URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1, + throw URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1, "Illegal character in port number", connection.toString()); } @@ -172,7 +172,7 @@ public class AMQBrokerDetails implements BrokerDetails throw(URLSyntaxException) uris; } - URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index 928aa55ea2..fea83d3128 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -68,7 +68,7 @@ public class AMQConnectionURL implements ConnectionURL String uid = AMQConnectionFactory.getUniqueClientID(); if (uid == null) { - URLHelper.parseError(-1, "Client Name not specified", fullURL); + throw URLHelper.parseError(-1, "Client Name not specified", fullURL); } else { @@ -106,7 +106,7 @@ public class AMQConnectionURL implements ConnectionURL if (userInfo == null) { - URLHelper.parseError(AMQ_PROTOCOL.length() + 3, + throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, "User information not found on url", fullURL); } else @@ -126,11 +126,11 @@ public class AMQConnectionURL implements ConnectionURL int testIndex = start + authLength; if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?') { - URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL); + throw URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL); } else { - URLHelper.parseError(-1, "Virtual host not specified", fullURL); + throw URLHelper.parseError(-1, "Virtual host not specified", fullURL); } } @@ -155,17 +155,17 @@ public class AMQConnectionURL implements ConnectionURL if (slash == -1) { - URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); } else { if (slash != 0 && fullURL.charAt(slash - 1) == ':') { - URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); + throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); } else { - URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL); + throw URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL); } } @@ -180,7 +180,7 @@ public class AMQConnectionURL implements ConnectionURL if (colonIndex == -1) { - URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), + throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), "Null password in user information not allowed.", _url); } else diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 0698da3eba..a994dbc670 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -76,7 +76,7 @@ public abstract class AMQDestination implements Destination, Referenceable _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); - _queueName = new AMQShortString(binding.getQueueName()); + _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName()); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName) diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index bf975c426c..c05667902f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -28,6 +28,7 @@ import javax.jms.JMSException; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; +import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -42,7 +43,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text /** * This constant represents the name of a property that is set when the message payload is null. */ - private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL"); + private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName(); private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); public JMSTextMessage() throws JMSException diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java index fc635cc7ea..eea660c4f0 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -107,7 +107,7 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession public OneUseChannel(int channelId, VirtualHost virtualHost) throws AMQException { - super(channelId, + super(ClusteredProtocolSession.this,channelId, virtualHost.getMessageStore(), virtualHost.getExchangeRegistry()); } diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index 04d152acf5..2ee4ce21cb 100644 --- a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -26,9 +26,12 @@ import java.util.HashMap; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.log4j.Logger; public class AMQBindingURL implements BindingURL { + private static final Logger _logger = Logger.getLogger(AMQBindingURL.class); + String _url; AMQShortString _exchangeClass; AMQShortString _exchangeName; @@ -41,7 +44,7 @@ public class AMQBindingURL implements BindingURL { //format: // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* - + _logger.debug("Parsing URL: " + url); _url = url; _options = new HashMap<String, String>(); @@ -73,17 +76,19 @@ public class AMQBindingURL implements BindingURL if (exchangeName == null) { - URLHelper.parseError(-1, "Exchange Name not specified.", _url); + throw URLHelper.parseError(-1, "Exchange Name not specified.", _url); } else { setExchangeName(exchangeName); } + String queueName; + if (connection.getPath() == null || connection.getPath().equals("")) { - URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), + throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), "Destination or Queue requried", _url); } else @@ -91,7 +96,7 @@ public class AMQBindingURL implements BindingURL int slash = connection.getPath().indexOf("/", 1); if (slash == -1) { - URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), + throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), "Destination requried", _url); } else @@ -99,7 +104,10 @@ public class AMQBindingURL implements BindingURL String path = connection.getPath(); setDestinationName(path.substring(1, slash)); - setQueueName(path.substring(slash + 1)); + // We don't set queueName yet as the actual value we use depends on options set + // when we are dealing with durable subscriptions + + queueName = path.substring(slash + 1); } } @@ -108,14 +116,19 @@ public class AMQBindingURL implements BindingURL processOptions(); + // We can now call setQueueName as the URL is full parsed. + + setQueueName(queueName); + //Fragment is #string (not used) //System.out.println(connection.getFragment()); + _logger.debug("URL Parsed: " + this); } catch (URISyntaxException uris) { - URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); } } @@ -125,7 +138,7 @@ public class AMQBindingURL implements BindingURL setExchangeClass(new AMQShortString(exchangeClass)); } - private void setQueueName(String name) + private void setQueueName(String name) throws URLSyntaxException { setQueueName(new AMQShortString(name)); } @@ -155,8 +168,9 @@ public class AMQBindingURL implements BindingURL return _exchangeClass; } - public void setExchangeClass(AMQShortString exchangeClass) + private void setExchangeClass(AMQShortString exchangeClass) { + _exchangeClass = exchangeClass; } @@ -165,7 +179,7 @@ public class AMQBindingURL implements BindingURL return _exchangeName; } - public void setExchangeName(AMQShortString name) + private void setExchangeName(AMQShortString name) { _exchangeName = name; @@ -180,40 +194,43 @@ public class AMQBindingURL implements BindingURL return _destinationName; } - public void setDestinationName(AMQShortString name) + private void setDestinationName(AMQShortString name) { _destinationName = name; } public AMQShortString getQueueName() { + return _queueName; + } + + public void setQueueName(AMQShortString name) throws URLSyntaxException + { if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) { if (Boolean.parseBoolean(getOption(OPTION_DURABLE))) { if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) { - return new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION)); + _queueName = new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION)); } else { - return getDestinationName(); + throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url); + } } else { - return getDestinationName(); + _queueName = null; } } else { - return _queueName; + _queueName = name; } - } - public void setQueueName(AMQShortString name) - { - _queueName = name; + } public String getOption(String key) diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 86a8420d30..67be2db86f 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -40,29 +40,17 @@ public interface BindingURL AMQShortString getExchangeClass(); - void setExchangeClass(AMQShortString name); - AMQShortString getExchangeName(); - void setExchangeName(AMQShortString name); - AMQShortString getDestinationName(); - void setDestinationName(AMQShortString name); - AMQShortString getQueueName(); - void setQueueName(AMQShortString name); - String getOption(String key); - void setOption(String key, String value); - boolean containsOption(String key); AMQShortString getRoutingKey(); - void setRoutingKey(AMQShortString key); - String toString(); } diff --git a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java index 2121346c02..806f879818 100644 --- a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java +++ b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java @@ -114,11 +114,11 @@ public class URLHelper if (sepIndex >= options.length() || sepIndex == 0) { - parseError(valueIndex, "Unterminated option", options); + throw parseError(valueIndex, "Unterminated option", options); } else { - parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" + + throw parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" + options.charAt(sepIndex) + "'", options); } } @@ -136,14 +136,14 @@ public class URLHelper } - public static void parseError(int index, String error, String url) throws URLSyntaxException + public static URLSyntaxException parseError(int index, String error, String url) { - parseError(index, 1, error, url); + return parseError(index, 1, error, url); } - public static void parseError(int index, int length, String error, String url) throws URLSyntaxException + public static URLSyntaxException parseError(int index, int length, String error, String url) { - throw new URLSyntaxException(url, error, index, length); + return new URLSyntaxException(url, error, index, length); } public static String printOptions(HashMap<String, String> options) diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index dac0f06744..da1455294a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -58,7 +58,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase assertTrue(channelCount == 1); AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), false, new AMQShortString("test"), true, _virtualHost); - AMQChannel channel = new AMQChannel(2, _messageStore, null); + AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore, null); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); channelCount = _mbean.channels().size(); @@ -69,7 +69,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L); // check APIs - AMQChannel channel3 = new AMQChannel(3, _messageStore, null); + AMQChannel channel3 = new AMQChannel(_protocolSession,3, _messageStore, null); channel3.setLocalTransactional(); _protocolSession.addChannel(channel3); _mbean.rollbackTransactions(2); @@ -89,14 +89,14 @@ public class AMQProtocolSessionMBeanTest extends TestCase } // check if closing of session works - _protocolSession.addChannel(new AMQChannel(5, _messageStore, null)); + _protocolSession.addChannel(new AMQChannel(_protocolSession,5, _messageStore, null)); _mbean.closeConnection(); try { channelCount = _mbean.channels().size(); assertTrue(channelCount == 0); // session is now closed so adding another channel should throw an exception - _protocolSession.addChannel(new AMQChannel(6, _messageStore, null)); + _protocolSession.addChannel(new AMQChannel(_protocolSession,6, _messageStore, null)); fail(); } catch(AMQException ex) @@ -109,13 +109,14 @@ public class AMQProtocolSessionMBeanTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _channel = new AMQChannel(1, _messageStore, null); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); _virtualHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test"); _queueRegistry = _virtualHost.getQueueRegistry(); _exchangeRegistry = _virtualHost.getExchangeRegistry(); _mockIOSession = new MockIoSession(); _protocolSession = new AMQMinaProtocolSession(_mockIOSession, appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true)); + _channel = new AMQChannel(_protocolSession,1, _messageStore, null); _protocolSession.addChannel(_channel); _mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject(); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 84dde9dd6f..c35d38e4ab 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -78,8 +78,9 @@ public class AMQQueueMBeanTest extends TestCase assertFalse(mgr.hasActiveSubscribers()); assertTrue(_queueMBean.getActiveConsumerCount() == 0); - _channel = new AMQChannel(1, _messageStore, null); + _protocolSession = new MockProtocolSession(_messageStore); + _channel = new AMQChannel(_protocolSession, 1, _messageStore, null); _protocolSession.addChannel(_channel); _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index ccc7752fd3..93050af2b7 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -75,8 +75,9 @@ public class AckTest extends TestCase { super.setUp(); _messageStore = new TestableMemoryMessageStore(); - _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/); _protocolSession = new MockProtocolSession(_messageStore); + _channel = new AMQChannel(_protocolSession,5, _messageStore, null/*dont need exchange registry*/); + _protocolSession.addChannel(_channel); _subscriptionManager = new SubscriptionSet(); _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager); diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 42dd1a4b74..89889ca017 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -97,12 +97,12 @@ public class SkeletonMessageStore implements MessageStore } - public MessageMetaData getMessageMetaData(Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException { return null; } - public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException + public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException { return null; } |