summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-07-31 13:08:37 +0000
committerRobert Gemmell <robbie@apache.org>2012-07-31 13:08:37 +0000
commit660b3224fa533e87828dba175b95835cf5eba9e2 (patch)
tree37162a1ce405406d0dd24afdff1bfe36793782cb
parent974579e2632ec4e82346c35cbb85ac5bf7f39cc7 (diff)
downloadqpid-python-660b3224fa533e87828dba175b95835cf5eba9e2.tar.gz
QPID-4164: Prevent the erroneous re-storing of recovered messages during move/copyMessage management functions.
merged from trunk r1365832 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1367529 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java45
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java96
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java50
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes2
4 files changed, 138 insertions, 55 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index a812436f34..755cb9c89e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -546,7 +546,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long messageId = LongBinding.entryToLong(key);
StorableMessageMetaData metaData = valueBinding.entryToObject(value);
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
mrh.message(message);
@@ -1576,34 +1576,26 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
private final long _messageId;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private final boolean _isRecovered;
private StorableMessageMetaData _metaData;
- private volatile SoftReference<byte[]> _dataRef;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+
private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
{
- this(messageId, metaData, true);
+ this(messageId, metaData, false);
}
-
- StoredBDBMessage(long messageId,
- StorableMessageMetaData metaData, boolean persist)
+ StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered)
{
- try
- {
- _messageId = messageId;
- _metaData = metaData;
-
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
-
- }
- catch (DatabaseException e)
- {
- throw new RuntimeException(e);
- }
+ _messageId = messageId;
+ _isRecovered = isRecovered;
+ _metaData = metaData;
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
public StorableMessageMetaData getMetaData()
@@ -1693,8 +1685,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
synchronized void store(com.sleepycat.je.Transaction txn)
{
-
- if(unstored())
+ if (!stored())
{
try
{
@@ -1724,14 +1715,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- private boolean unstored()
- {
- return _metaData != null;
- }
-
public synchronized StoreFuture flushToStore()
{
- if(unstored())
+ if(!stored())
{
com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
store(txn);
@@ -1755,6 +1741,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
throw new RuntimeException(e);
}
}
+
+ private boolean stored()
+ {
+ return _metaData == null || _isRecovered;
+ }
}
private class BDBTransaction implements org.apache.qpid.server.store.Transaction
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
index 8ae4fec975..79d04b239e 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
@@ -41,10 +41,12 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
+import javax.naming.NamingException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -83,7 +85,6 @@ public class QueueManagementTest extends QpidBrokerTestCase
private ManagedQueue _managedSourceQueue;
private ManagedQueue _managedDestinationQueue;
-
public void setUp() throws Exception
{
_jmxUtils = new JMXTestUtils(this);
@@ -93,10 +94,8 @@ public class QueueManagementTest extends QpidBrokerTestCase
_sourceQueueName = getTestQueueName() + "_src";
_destinationQueueName = getTestQueueName() + "_dest";
- _connection = getConnection();
- _connection.start();
+ createConnectionAndSession();
- _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
_sourceQueue = _session.createQueue(_sourceQueueName);
_destinationQueue = _session.createQueue(_destinationQueueName);
createQueueOnBroker(_sourceQueue);
@@ -104,8 +103,7 @@ public class QueueManagementTest extends QpidBrokerTestCase
_jmxUtils.open();
- _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName);
- _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName);
+ createManagementInterfacesForQueues();
}
public void tearDown() throws Exception
@@ -498,6 +496,70 @@ public class QueueManagementTest extends QpidBrokerTestCase
assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue());
}
+ /**
+ * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface.
+ */
+ public void testMoveMessageBetweenQueuesWithBrokerRestart() throws Exception
+ {
+ final int numberOfMessagesToSend = 1;
+
+ sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
+ syncSession(_session);
+ assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
+
+ restartBroker();
+
+ createManagementInterfacesForQueues();
+ createConnectionAndSession();
+
+ List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
+
+ // Move messages to destination
+ long messageId = amqMessagesIds.get(0);
+ _managedSourceQueue.moveMessages(messageId, messageId, _destinationQueueName);
+
+ assertEquals("Unexpected queue depth on destination queue after move", 1, _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after move", 0, _managedSourceQueue.getMessageCount().intValue());
+
+ assertMessageIndicesOn(_destinationQueue, 0);
+ }
+
+ /**
+ * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface.
+ */
+ public void testCopyMessageBetweenQueuesWithBrokerRestart() throws Exception
+ {
+ final int numberOfMessagesToSend = 1;
+
+ sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
+ syncSession(_session);
+ assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
+
+ restartBroker();
+
+ createManagementInterfacesForQueues();
+ createConnectionAndSession();
+
+ List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
+
+ // Move messages to destination
+ long messageId = amqMessagesIds.get(0);
+ _managedSourceQueue.copyMessages(messageId, messageId, _destinationQueueName);
+
+ assertEquals("Unexpected queue depth on destination queue after copy", 1, _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after copy", 1, _managedSourceQueue.getMessageCount().intValue());
+
+ assertMessageIndicesOn(_destinationQueue, 0);
+ }
+
+ @Override
+ public Message createNextMessage(Session session, int messageNumber) throws JMSException
+ {
+ Message message = session.createTextMessage(getContentForMessageNumber(messageNumber));
+ message.setIntProperty(INDEX, messageNumber);
+ return message;
+ }
+
private void startAsyncConsumerOn(Destination queue, Connection asyncConnection,
final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception
{
@@ -521,9 +583,10 @@ public class QueueManagementTest extends QpidBrokerTestCase
for (int i : expectedIndices)
{
- Message message = consumer.receive(1000);
+ TextMessage message = (TextMessage)consumer.receive(1000);
assertNotNull("Expected message with index " + i, message);
assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX));
+ assertEquals("Expected message content", getContentForMessageNumber(i), message.getText());
}
assertNull("Unexpected message encountered", consumer.receive(1000));
@@ -574,6 +637,25 @@ public class QueueManagementTest extends QpidBrokerTestCase
((AMQSession<?,?>)session).sync();
}
+ private void createConnectionAndSession() throws JMSException,
+ NamingException
+ {
+ _connection = getConnection();
+ _connection.start();
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ private void createManagementInterfacesForQueues()
+ {
+ _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName);
+ _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName);
+ }
+
+ private String getContentForMessageNumber(int msgCount)
+ {
+ return "Message count " + msgCount;
+ }
+
private final class RecordingNotificationListener implements NotificationListener
{
private final CountDownLatch _notificationReceivedLatch;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 36ac8b3d40..bc9cda7f71 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -1575,7 +1575,7 @@ public class DerbyMessageStore implements MessageStore
buf = buf.slice();
MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
- StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);
+ StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true);
messageHandler.message(message);
}
@@ -2037,6 +2037,8 @@ public class DerbyMessageStore implements MessageStore
{
private final long _messageId;
+ private final boolean _isRecovered;
+
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
@@ -2045,21 +2047,18 @@ public class DerbyMessageStore implements MessageStore
StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
{
- this(messageId, metaData, true);
+ this(messageId, metaData, false);
}
StoredDerbyMessage(long messageId,
- StorableMessageMetaData metaData, boolean persist)
+ StorableMessageMetaData metaData, boolean isRecovered)
{
_messageId = messageId;
-
+ _isRecovered = isRecovered;
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
- {
- _metaData = metaData;
- }
+ _metaData = metaData;
}
@Override
@@ -2140,16 +2139,16 @@ public class DerbyMessageStore implements MessageStore
@Override
public synchronized StoreFuture flushToStore()
{
+ Connection conn = null;
try
{
- if(_metaData != null)
+ if(!stored())
{
- Connection conn = newConnection();
+ conn = newConnection();
store(conn);
conn.commit();
- conn.close();
storedSizeChange(getMetaData().getContentSize());
}
}
@@ -2161,12 +2160,24 @@ public class DerbyMessageStore implements MessageStore
}
throw new RuntimeException(e);
}
+ finally
+ {
+ closeConnection(conn);
+ }
return StoreFuture.IMMEDIATE_FUTURE;
}
+ @Override
+ public void remove()
+ {
+ int delta = getMetaData().getContentSize();
+ DerbyMessageStore.this.removeMessage(_messageId);
+ storedSizeChange(-delta);
+ }
+
private synchronized void store(final Connection conn) throws SQLException
{
- if(_metaData != null)
+ if (!stored())
{
try
{
@@ -2179,20 +2190,17 @@ public class DerbyMessageStore implements MessageStore
_metaData = null;
_data = null;
}
- }
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Storing message " + _messageId + " to store");
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Storing message " + _messageId + " to store");
+ }
}
}
- @Override
- public void remove()
+ private boolean stored()
{
- int delta = getMetaData().getContentSize();
- DerbyMessageStore.this.removeMessage(_messageId);
- storedSizeChange(-delta);
+ return _metaData == null || _isRecovered;
}
}
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index 3e42b79d99..c49653051a 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -45,3 +45,5 @@ org.apache.qpid.server.store.DurableConfigurationStoreTest#*
org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart
org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart
+org.apache.qpid.systest.management.jmx.QueueManagementTest#testMoveMessageBetweenQueuesWithBrokerRestart
+org.apache.qpid.systest.management.jmx.QueueManagementTest#testCopyMessageBetweenQueuesWithBrokerRestart