diff options
author | Robert Gemmell <robbie@apache.org> | 2012-03-05 12:17:54 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-03-05 12:17:54 +0000 |
commit | 2637290ed5226aadc31c44ea0a8cfb31e3a3843b (patch) | |
tree | e5b23745cc28a1201ef17198d8bcd3f01a4f5987 | |
parent | bb2137a1492b9f16cd9e2f94da58bf03e142d27d (diff) | |
download | qpid-python-2637290ed5226aadc31c44ea0a8cfb31e3a3843b.tar.gz |
QPID-3881: Ensure we only put 0-8/0-9/0-9-1 messages in the store if they are actually routable. Remove some unused and test-only methods.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1297026 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 74 insertions, 85 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1724b0e4ec..588b2079f2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -295,26 +296,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _currentMessage.setExpiration(); + _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime()); - MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime()); - final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd); - _currentMessage.setStoredMessage(handle); - - routeCurrentMessage(); - - - _transaction.addPostTransactionAction(new ServerTransaction.Action() - { - - public void postCommit() - { - } - - public void onRollback() - { - handle.remove(); - } - }); + _currentMessage.route(); deliverCurrentMessageIfComplete(); } @@ -346,17 +330,41 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm { _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); } - } else { + final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData()); + _currentMessage.setStoredMessage(handle); + int bodyCount = _currentMessage.getBodyCount(); + if(bodyCount > 0) + { + long bodyLengthReceived = 0; + for(int i = 0 ; i < bodyCount ; i++) + { + ContentChunk contentChunk = _currentMessage.getContentChunk(i); + handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); + bodyLengthReceived += contentChunk.getSize(); + } + } + + _transaction.addPostTransactionAction(new ServerTransaction.Action() + { + public void postCommit() + { + } + + public void onRollback() + { + handle.remove(); + } + }); + _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime()); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); + updateTransactionalActivity(); + _currentMessage.getStoredMessage().flushToStore(); } } - _currentMessage.getStoredMessage().flushToStore(); - } finally { @@ -383,9 +391,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm try { - - // returns true iff the message was delivered (i.e. if all data was - // received final ContentChunk contentChunk = _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody); @@ -409,11 +414,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } - protected void routeCurrentMessage() throws AMQException - { - _currentMessage.route(); - } - public long getNextDeliveryTag() { return ++_deliveryTag; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index cb018f1772..c5a610c7b6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -70,8 +70,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes private Exchange _exchange; - - private int _receivedChunkCount = 0; private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>(); // we keep both the original meta data object and the store reference to it just in case the @@ -132,12 +130,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes } - public MessageMetaData headersReceived() - { - - return headersReceived(System.currentTimeMillis()); - } - public MessageMetaData headersReceived(long currentTime) { _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime); @@ -150,16 +142,10 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return _destinationQueues; } - public int addContentBodyFrame(final ContentChunk contentChunk) - throws AMQException + public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException { - _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); - - - - return _receivedChunkCount++; } public boolean allContentReceived() @@ -259,18 +245,12 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return _expiration; } - public int getReceivedChunkCount() - { - return _receivedChunkCount; - } - - public int getBodyCount() throws AMQException { return _contentChunks.size(); } - public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException + public ContentChunk getContentChunk(int index) { return _contentChunks.get(index); } @@ -330,4 +310,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes { return _connectionReference; } + + public MessageMetaData getMessageMetaData() + { + return _messageMetaData; + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 27a0462e09..7c7645e9e6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -123,7 +123,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase protected int route(Message m) throws AMQException { - m.getIncomingMessage().headersReceived(); + m.getIncomingMessage().headersReceived(System.currentTimeMillis()); m.route(exchange); if(m.getIncomingMessage().allContentReceived()) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index dfa31e131f..00c8a18d9f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -350,7 +350,7 @@ public class TopicExchangeTest extends InternalBrokerBaseCase private int routeMessage(final IncomingMessage message) throws AMQException { - MessageMetaData mmd = message.headersReceived(); + MessageMetaData mmd = message.headersReceived(System.currentTimeMillis()); message.setStoredMessage(_store.addMessage(mmd)); message.enqueue(_exchange.route(message)); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 91e35f07de..25d35aab16 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -35,6 +35,8 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.util.InternalBrokerBaseCase; import javax.management.Notification; + +import java.nio.ByteBuffer; import java.util.ArrayList; /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ @@ -300,7 +302,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase messages[i] = message(false, size); ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(getQueue()); - metaData[i] = messages[i].headersReceived(); + metaData[i] = messages[i].headersReceived(System.currentTimeMillis()); messages[i].setStoredMessage(getMessageStore().addMessage(metaData[i])); messages[i].enqueue(qs); @@ -309,30 +311,29 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase for (int i = 0; i < messageCount; i++) { - messages[i].addContentBodyFrame( - new ContentChunk() - { - - private byte[] _data = new byte[(int)size]; + ContentChunk contentChunk = new ContentChunk() + { + private byte[] _data = new byte[(int)size]; - public int getSize() - { - return (int) size; - } + public int getSize() + { + return (int) size; + } - public byte[] getData() - { - return _data; - } + public byte[] getData() + { + return _data; + } - public void reduceToFit() - { + public void reduceToFit() + { + } + }; - } - }); + messages[i].addContentBodyFrame(contentChunk); + messages[i].getStoredMessage().addContent(0, ByteBuffer.wrap(contentChunk.getData())); getQueue().enqueue(new AMQMessage(messages[i].getStoredMessage())); - } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 933eb3b84f..45933e7064 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.AMQChannel; @@ -45,6 +46,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularData; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -457,15 +459,16 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase currentMessage.enqueue(qs); // route header - MessageMetaData mmd = currentMessage.headersReceived(); - currentMessage.setStoredMessage(getMessageStore().addMessage(mmd)); + MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis()); - // Add the body so we have something to test later - currentMessage.addContentBodyFrame( - getSession().getMethodRegistry() - .getProtocolVersionMethodConverter() - .convertToContentChunk( - new ContentBody(new byte[(int) MESSAGE_SIZE]))); + // Add the message to the store so we have something to test later + currentMessage.setStoredMessage(getMessageStore().addMessage(mmd)); + ContentChunk chunk = getSession().getMethodRegistry() + .getProtocolVersionMethodConverter() + .convertToContentChunk( + new ContentBody(new byte[(int) MESSAGE_SIZE])); + currentMessage.addContentBodyFrame(chunk); + currentMessage.getStoredMessage().addContent(0, ByteBuffer.wrap(chunk.getData())); AMQMessage m = new AMQMessage(currentMessage.getStoredMessage()); for(BaseQueue q : currentMessage.getDestinationQueues()) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 1b35dfe34f..273f0dc018 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -143,7 +143,7 @@ public class AckTest extends InternalBrokerBaseCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - MessageMetaData mmd = msg.headersReceived(); + MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis()); final StoredMessage storedMessage = _messageStore.addMessage(mmd); msg.setStoredMessage(storedMessage); final AMQMessage message = new AMQMessage(storedMessage); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 9f022dcdde..79c744902d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -633,7 +633,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Send persistent message qs.add(_queue); - MessageMetaData metaData = msg.headersReceived(); + MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis()); StoredMessage handle = _store.addMessage(metaData); msg.setStoredMessage(handle); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 5fc074d877..d49f0586ba 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -600,7 +600,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase currentMessage.setExpiration(); - MessageMetaData mmd = currentMessage.headersReceived(); + MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis()); currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd)); currentMessage.getStoredMessage().flushToStore(); currentMessage.route(); |