summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-03-05 12:17:54 +0000
committerRobert Gemmell <robbie@apache.org>2012-03-05 12:17:54 +0000
commit2637290ed5226aadc31c44ea0a8cfb31e3a3843b (patch)
treee5b23745cc28a1201ef17198d8bcd3f01a4f5987
parentbb2137a1492b9f16cd9e2f94da58bf03e142d27d (diff)
downloadqpid-python-2637290ed5226aadc31c44ea0a8cfb31e3a3843b.tar.gz
QPID-3881: Ensure we only put 0-8/0-9/0-9-1 messages in the store if they are actually routable. Remove some unused and test-only methods.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1297026 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java29
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java39
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java19
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java2
9 files changed, 74 insertions, 85 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1724b0e4ec..588b2079f2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -295,26 +296,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_currentMessage.setExpiration();
+ _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
- MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
- final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd);
- _currentMessage.setStoredMessage(handle);
-
- routeCurrentMessage();
-
-
- _transaction.addPostTransactionAction(new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- }
-
- public void onRollback()
- {
- handle.remove();
- }
- });
+ _currentMessage.route();
deliverCurrentMessageIfComplete();
}
@@ -346,17 +330,41 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
{
_actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
}
-
}
else
{
+ final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData());
+ _currentMessage.setStoredMessage(handle);
+ int bodyCount = _currentMessage.getBodyCount();
+ if(bodyCount > 0)
+ {
+ long bodyLengthReceived = 0;
+ for(int i = 0 ; i < bodyCount ; i++)
+ {
+ ContentChunk contentChunk = _currentMessage.getContentChunk(i);
+ handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
+ bodyLengthReceived += contentChunk.getSize();
+ }
+ }
+
+ _transaction.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ }
+
+ public void onRollback()
+ {
+ handle.remove();
+ }
+ });
+
_transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
+ updateTransactionalActivity();
+ _currentMessage.getStoredMessage().flushToStore();
}
}
- _currentMessage.getStoredMessage().flushToStore();
-
}
finally
{
@@ -383,9 +391,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
try
{
-
- // returns true iff the message was delivered (i.e. if all data was
- // received
final ContentChunk contentChunk =
_session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
@@ -409,11 +414,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
- protected void routeCurrentMessage() throws AMQException
- {
- _currentMessage.route();
- }
-
public long getNextDeliveryTag()
{
return ++_deliveryTag;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index cb018f1772..c5a610c7b6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -70,8 +70,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private Exchange _exchange;
-
- private int _receivedChunkCount = 0;
private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
// we keep both the original meta data object and the store reference to it just in case the
@@ -132,12 +130,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
}
- public MessageMetaData headersReceived()
- {
-
- return headersReceived(System.currentTimeMillis());
- }
-
public MessageMetaData headersReceived(long currentTime)
{
_messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
@@ -150,16 +142,10 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _destinationQueues;
}
- public int addContentBodyFrame(final ContentChunk contentChunk)
- throws AMQException
+ public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
-
-
-
- return _receivedChunkCount++;
}
public boolean allContentReceived()
@@ -259,18 +245,12 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _expiration;
}
- public int getReceivedChunkCount()
- {
- return _receivedChunkCount;
- }
-
-
public int getBodyCount() throws AMQException
{
return _contentChunks.size();
}
- public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+ public ContentChunk getContentChunk(int index)
{
return _contentChunks.get(index);
}
@@ -330,4 +310,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
{
return _connectionReference;
}
+
+ public MessageMetaData getMessageMetaData()
+ {
+ return _messageMetaData;
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 27a0462e09..7c7645e9e6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -123,7 +123,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
protected int route(Message m) throws AMQException
{
- m.getIncomingMessage().headersReceived();
+ m.getIncomingMessage().headersReceived(System.currentTimeMillis());
m.route(exchange);
if(m.getIncomingMessage().allContentReceived())
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index dfa31e131f..00c8a18d9f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -350,7 +350,7 @@ public class TopicExchangeTest extends InternalBrokerBaseCase
private int routeMessage(final IncomingMessage message)
throws AMQException
{
- MessageMetaData mmd = message.headersReceived();
+ MessageMetaData mmd = message.headersReceived(System.currentTimeMillis());
message.setStoredMessage(_store.addMessage(mmd));
message.enqueue(_exchange.route(message));
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 91e35f07de..25d35aab16 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -35,6 +35,8 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import javax.management.Notification;
+
+import java.nio.ByteBuffer;
import java.util.ArrayList;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
@@ -300,7 +302,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase
messages[i] = message(false, size);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(getQueue());
- metaData[i] = messages[i].headersReceived();
+ metaData[i] = messages[i].headersReceived(System.currentTimeMillis());
messages[i].setStoredMessage(getMessageStore().addMessage(metaData[i]));
messages[i].enqueue(qs);
@@ -309,30 +311,29 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase
for (int i = 0; i < messageCount; i++)
{
- messages[i].addContentBodyFrame(
- new ContentChunk()
- {
-
- private byte[] _data = new byte[(int)size];
+ ContentChunk contentChunk = new ContentChunk()
+ {
+ private byte[] _data = new byte[(int)size];
- public int getSize()
- {
- return (int) size;
- }
+ public int getSize()
+ {
+ return (int) size;
+ }
- public byte[] getData()
- {
- return _data;
- }
+ public byte[] getData()
+ {
+ return _data;
+ }
- public void reduceToFit()
- {
+ public void reduceToFit()
+ {
+ }
+ };
- }
- });
+ messages[i].addContentBodyFrame(contentChunk);
+ messages[i].getStoredMessage().addContent(0, ByteBuffer.wrap(contentChunk.getData()));
getQueue().enqueue(new AMQMessage(messages[i].getStoredMessage()));
-
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 933eb3b84f..45933e7064 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.AMQChannel;
@@ -45,6 +46,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularData;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -457,15 +459,16 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
currentMessage.enqueue(qs);
// route header
- MessageMetaData mmd = currentMessage.headersReceived();
- currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
+ MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis());
- // Add the body so we have something to test later
- currentMessage.addContentBodyFrame(
- getSession().getMethodRegistry()
- .getProtocolVersionMethodConverter()
- .convertToContentChunk(
- new ContentBody(new byte[(int) MESSAGE_SIZE])));
+ // Add the message to the store so we have something to test later
+ currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
+ ContentChunk chunk = getSession().getMethodRegistry()
+ .getProtocolVersionMethodConverter()
+ .convertToContentChunk(
+ new ContentBody(new byte[(int) MESSAGE_SIZE]));
+ currentMessage.addContentBodyFrame(chunk);
+ currentMessage.getStoredMessage().addContent(0, ByteBuffer.wrap(chunk.getData()));
AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
for(BaseQueue q : currentMessage.getDestinationQueues())
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 1b35dfe34f..273f0dc018 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -143,7 +143,7 @@ public class AckTest extends InternalBrokerBaseCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- MessageMetaData mmd = msg.headersReceived();
+ MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis());
final StoredMessage storedMessage = _messageStore.addMessage(mmd);
msg.setStoredMessage(storedMessage);
final AMQMessage message = new AMQMessage(storedMessage);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 9f022dcdde..79c744902d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -633,7 +633,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Send persistent message
qs.add(_queue);
- MessageMetaData metaData = msg.headersReceived();
+ MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis());
StoredMessage handle = _store.addMessage(metaData);
msg.setStoredMessage(handle);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 5fc074d877..d49f0586ba 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -600,7 +600,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
currentMessage.setExpiration();
- MessageMetaData mmd = currentMessage.headersReceived();
+ MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis());
currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd));
currentMessage.getStoredMessage().flushToStore();
currentMessage.route();