diff options
author | Robert Gemmell <robbie@apache.org> | 2012-07-31 13:08:37 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-07-31 13:08:37 +0000 |
commit | 660b3224fa533e87828dba175b95835cf5eba9e2 (patch) | |
tree | 37162a1ce405406d0dd24afdff1bfe36793782cb | |
parent | 974579e2632ec4e82346c35cbb85ac5bf7f39cc7 (diff) | |
download | qpid-python-660b3224fa533e87828dba175b95835cf5eba9e2.tar.gz |
QPID-4164: Prevent the erroneous re-storing of recovered messages during move/copyMessage management functions.
merged from trunk r1365832
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1367529 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 138 insertions, 55 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index a812436f34..755cb9c89e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -546,7 +546,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore long messageId = LongBinding.entryToLong(key); StorableMessageMetaData metaData = valueBinding.entryToObject(value); - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false); + StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); mrh.message(message); @@ -1576,34 +1576,26 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private final long _messageId; - private volatile SoftReference<StorableMessageMetaData> _metaDataRef; + private final boolean _isRecovered; private StorableMessageMetaData _metaData; - private volatile SoftReference<byte[]> _dataRef; + private volatile SoftReference<StorableMessageMetaData> _metaDataRef; + private byte[] _data; + private volatile SoftReference<byte[]> _dataRef; StoredBDBMessage(long messageId, StorableMessageMetaData metaData) { - this(messageId, metaData, true); + this(messageId, metaData, false); } - - StoredBDBMessage(long messageId, - StorableMessageMetaData metaData, boolean persist) + StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered) { - try - { - _messageId = messageId; - _metaData = metaData; - - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - - } - catch (DatabaseException e) - { - throw new RuntimeException(e); - } + _messageId = messageId; + _isRecovered = isRecovered; + _metaData = metaData; + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } public StorableMessageMetaData getMetaData() @@ -1693,8 +1685,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore synchronized void store(com.sleepycat.je.Transaction txn) { - - if(unstored()) + if (!stored()) { try { @@ -1724,14 +1715,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - private boolean unstored() - { - return _metaData != null; - } - public synchronized StoreFuture flushToStore() { - if(unstored()) + if(!stored()) { com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); store(txn); @@ -1755,6 +1741,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore throw new RuntimeException(e); } } + + private boolean stored() + { + return _metaData == null || _isRecovered; + } } private class BDBTransaction implements org.apache.qpid.server.store.Transaction diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index 8ae4fec975..79d04b239e 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -41,10 +41,12 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; import javax.management.Notification; import javax.management.NotificationListener; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; +import javax.naming.NamingException; import java.util.ArrayList; import java.util.Arrays; @@ -83,7 +85,6 @@ public class QueueManagementTest extends QpidBrokerTestCase private ManagedQueue _managedSourceQueue; private ManagedQueue _managedDestinationQueue; - public void setUp() throws Exception { _jmxUtils = new JMXTestUtils(this); @@ -93,10 +94,8 @@ public class QueueManagementTest extends QpidBrokerTestCase _sourceQueueName = getTestQueueName() + "_src"; _destinationQueueName = getTestQueueName() + "_dest"; - _connection = getConnection(); - _connection.start(); + createConnectionAndSession(); - _session = _connection.createSession(true, Session.SESSION_TRANSACTED); _sourceQueue = _session.createQueue(_sourceQueueName); _destinationQueue = _session.createQueue(_destinationQueueName); createQueueOnBroker(_sourceQueue); @@ -104,8 +103,7 @@ public class QueueManagementTest extends QpidBrokerTestCase _jmxUtils.open(); - _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); - _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); + createManagementInterfacesForQueues(); } public void tearDown() throws Exception @@ -498,6 +496,70 @@ public class QueueManagementTest extends QpidBrokerTestCase assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue()); } + /** + * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. + */ + public void testMoveMessageBetweenQueuesWithBrokerRestart() throws Exception + { + final int numberOfMessagesToSend = 1; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + restartBroker(); + + createManagementInterfacesForQueues(); + createConnectionAndSession(); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move messages to destination + long messageId = amqMessagesIds.get(0); + _managedSourceQueue.moveMessages(messageId, messageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after move", 1, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after move", 0, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0); + } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessageBetweenQueuesWithBrokerRestart() throws Exception + { + final int numberOfMessagesToSend = 1; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + restartBroker(); + + createManagementInterfacesForQueues(); + createConnectionAndSession(); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move messages to destination + long messageId = amqMessagesIds.get(0); + _managedSourceQueue.copyMessages(messageId, messageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after copy", 1, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after copy", 1, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0); + } + + @Override + public Message createNextMessage(Session session, int messageNumber) throws JMSException + { + Message message = session.createTextMessage(getContentForMessageNumber(messageNumber)); + message.setIntProperty(INDEX, messageNumber); + return message; + } + private void startAsyncConsumerOn(Destination queue, Connection asyncConnection, final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception { @@ -521,9 +583,10 @@ public class QueueManagementTest extends QpidBrokerTestCase for (int i : expectedIndices) { - Message message = consumer.receive(1000); + TextMessage message = (TextMessage)consumer.receive(1000); assertNotNull("Expected message with index " + i, message); assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX)); + assertEquals("Expected message content", getContentForMessageNumber(i), message.getText()); } assertNull("Unexpected message encountered", consumer.receive(1000)); @@ -574,6 +637,25 @@ public class QueueManagementTest extends QpidBrokerTestCase ((AMQSession<?,?>)session).sync(); } + private void createConnectionAndSession() throws JMSException, + NamingException + { + _connection = getConnection(); + _connection.start(); + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + } + + private void createManagementInterfacesForQueues() + { + _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); + _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); + } + + private String getContentForMessageNumber(int msgCount) + { + return "Message count " + msgCount; + } + private final class RecordingNotificationListener implements NotificationListener { private final CountDownLatch _notificationReceivedLatch; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 36ac8b3d40..bc9cda7f71 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -1575,7 +1575,7 @@ public class DerbyMessageStore implements MessageStore buf = buf.slice(); MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); - StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false); + StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true); messageHandler.message(message); } @@ -2037,6 +2037,8 @@ public class DerbyMessageStore implements MessageStore { private final long _messageId; + private final boolean _isRecovered; + private StorableMessageMetaData _metaData; private volatile SoftReference<StorableMessageMetaData> _metaDataRef; private byte[] _data; @@ -2045,21 +2047,18 @@ public class DerbyMessageStore implements MessageStore StoredDerbyMessage(long messageId, StorableMessageMetaData metaData) { - this(messageId, metaData, true); + this(messageId, metaData, false); } StoredDerbyMessage(long messageId, - StorableMessageMetaData metaData, boolean persist) + StorableMessageMetaData metaData, boolean isRecovered) { _messageId = messageId; - + _isRecovered = isRecovered; _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - if(persist) - { - _metaData = metaData; - } + _metaData = metaData; } @Override @@ -2140,16 +2139,16 @@ public class DerbyMessageStore implements MessageStore @Override public synchronized StoreFuture flushToStore() { + Connection conn = null; try { - if(_metaData != null) + if(!stored()) { - Connection conn = newConnection(); + conn = newConnection(); store(conn); conn.commit(); - conn.close(); storedSizeChange(getMetaData().getContentSize()); } } @@ -2161,12 +2160,24 @@ public class DerbyMessageStore implements MessageStore } throw new RuntimeException(e); } + finally + { + closeConnection(conn); + } return StoreFuture.IMMEDIATE_FUTURE; } + @Override + public void remove() + { + int delta = getMetaData().getContentSize(); + DerbyMessageStore.this.removeMessage(_messageId); + storedSizeChange(-delta); + } + private synchronized void store(final Connection conn) throws SQLException { - if(_metaData != null) + if (!stored()) { try { @@ -2179,20 +2190,17 @@ public class DerbyMessageStore implements MessageStore _metaData = null; _data = null; } - } - if(_logger.isDebugEnabled()) - { - _logger.debug("Storing message " + _messageId + " to store"); + if(_logger.isDebugEnabled()) + { + _logger.debug("Storing message " + _messageId + " to store"); + } } } - @Override - public void remove() + private boolean stored() { - int delta = getMetaData().getContentSize(); - DerbyMessageStore.this.removeMessage(_messageId); - storedSizeChange(-delta); + return _metaData == null || _isRecovered; } } diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index 3e42b79d99..c49653051a 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -45,3 +45,5 @@ org.apache.qpid.server.store.DurableConfigurationStoreTest#* org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart +org.apache.qpid.systest.management.jmx.QueueManagementTest#testMoveMessageBetweenQueuesWithBrokerRestart +org.apache.qpid.systest.management.jmx.QueueManagementTest#testCopyMessageBetweenQueuesWithBrokerRestart |