summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-18 11:11:02 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-18 11:11:02 +0000
commit03e923e600ec39f245de5c384f798692e9b38f26 (patch)
treedc7c545c29b6756ed8f62e212d1ad032490eac6a
parent657f4cceee799aa773898071eeb035c24d72bec1 (diff)
downloadqpid-python-03e923e600ec39f245de5c384f798692e9b38f26.tar.gz
QPID-4999 : [Java Broker] Strip selector arguments from persistent bindings to non-topic exchanges created by buggy old clients
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1504429 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java122
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java213
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java25
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java6
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java109
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java266
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java8
13 files changed, 706 insertions, 71 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index f6b7e1790f..017b02ac7f 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -21,20 +21,10 @@
package org.apache.qpid.server.store.berkeleydb;
import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.ExceptionEvent;
-import com.sleepycat.je.ExceptionListener;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.*;
+import com.sleepycat.je.Transaction;
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
@@ -85,15 +75,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private Environment _environment;
- private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
- private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
- private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
- private String DELIVERYDB_NAME = "QUEUE_ENTRIES";
- private String BRIDGEDB_NAME = "BRIDGES";
- private String LINKDB_NAME = "LINKS";
- private String XIDDB_NAME = "XIDS";
+ private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
+ private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
+ private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
+ private static String DELIVERYDB_NAME = "QUEUE_ENTRIES";
+ private static String BRIDGEDB_NAME = "BRIDGES";
+ private static String LINKDB_NAME = "LINKS";
+ private static String XIDDB_NAME = "XIDS";
+ private static String CONFIG_VERSION_DB = "CONFIG_VERSION";
private Database _configuredObjectsDb;
+ private Database _configVersionDb;
private Database _messageMetaDataDb;
private Database _messageContentDb;
private Database _deliveryDb;
@@ -326,6 +318,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
dbConfig.setReadOnly(false);
_configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig);
+ _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig);
_messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
_messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
_deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
@@ -426,10 +419,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
try
{
- recoveryHandler.beginConfigurationRecovery(this);
+ final int configVersion = getConfigVersion();
+ recoveryHandler.beginConfigurationRecovery(this, configVersion);
loadConfiguredObjects(recoveryHandler);
- recoveryHandler.completeConfigurationRecovery();
+ final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
+ if(newConfigVersion != configVersion)
+ {
+ updateConfigVersion(newConfigVersion);
+ }
}
catch (DatabaseException e)
{
@@ -438,6 +436,66 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
+ private void updateConfigVersion(int newConfigVersion) throws AMQStoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ Transaction txn = _environment.beginTransaction(null, null);
+ cursor = _configVersionDb.openCursor(txn, null);
+ DatabaseEntry key = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0,key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ IntegerBinding.intToEntry(newConfigVersion, value);
+ OperationStatus status = cursor.put(key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error setting config version: " + status);
+ }
+ }
+ cursor.close();
+ cursor = null;
+ txn.commit();
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+
+ }
+
+ private int getConfigVersion() throws AMQStoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = _configVersionDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ return IntegerBinding.entryToInt(value);
+ }
+
+ // Insert 0 as the default config version
+ IntegerBinding.intToEntry(0,value);
+ ByteBinding.byteToEntry((byte) 0,key);
+ OperationStatus status = _configVersionDb.put(null, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error initialising config version: " + status);
+ }
+ return 0;
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
{
Cursor cursor = null;
@@ -750,9 +808,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
}
+
@Override
public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
+ update(id, type, attributes, null);
+ }
+
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
+ for(ConfiguredObjectRecord record : records)
+ {
+ update(record.getId(), record.getType(), record.getAttributes(), txn);
+ }
+ txn.commit();
+ }
+
+ private void update(UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException
+ {
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Updating " +type + ", id: " + id);
@@ -768,14 +842,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
DatabaseEntry newValue = new DatabaseEntry();
ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
- OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT);
+ OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
// write the updated entry to the store
configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
- status = _configuredObjectsDb.put(null, key, newValue);
+ status = _configuredObjectsDb.put(txn, key, newValue);
if (status != OperationStatus.SUCCESS)
{
throw new AMQStoreException("Error updating queue details within the store: " + status);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
index 2b9c5ad290..647e19d659 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
@@ -531,6 +531,10 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
if (fieldValues != null)
{
Object[] array = fieldValues.toArray(new Object[fieldValues.size()]);
+ if (attributes == null)
+ {
+ attributes = new HashMap<String, Object>();
+ }
attributes.put(fieldName, array);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
index e0d1ac8695..81a89c9b4b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
@@ -91,12 +91,20 @@ public class FilterSupport
}
- static boolean argumentsContainFilter(final Map<String, Object> args)
+ public static boolean argumentsContainFilter(final Map<String, Object> args)
{
return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
}
+ public static void removeFilters(final Map<String, Object> args)
+ {
+ args.remove(AMQPFilterTypes.JMS_SELECTOR.toString());
+ args.remove(AMQPFilterTypes.NO_LOCAL.toString());
+ }
+
+
+
static boolean argumentsContainNoLocal(final Map<String, Object> args)
{
return args != null
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 08e9c5ca5c..9e32d303fb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -119,6 +119,7 @@ public interface VirtualHost extends ConfiguredObject
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
CONFIG_PATH));
+ int CURRENT_CONFIG_VERSION = 1;
//children
Collection<VirtualHostAlias> getAliases();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 905c83f7ed..e97c0d662d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -55,6 +55,7 @@ import org.codehaus.jackson.map.ObjectMapper;
abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
{
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+ private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
@@ -68,9 +69,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+ private static final int DEFAULT_CONFIG_VERSION = 0;
public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
- XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME };
+ XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME };
private static final int DB_VERSION = 6;
@@ -80,6 +82,12 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+ private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )";
+ private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+ private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
+ private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?";
+
+
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
@@ -223,6 +231,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
Connection conn = newAutoCommitConnection();
createVersionTable(conn);
+ createConfigVersionTable(conn);
createConfiguredObjectsTable(conn);
createQueueEntryTable(conn);
createMetaDataTable(conn);
@@ -259,7 +268,33 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
pstmt.close();
}
}
+ }
+ private void createConfigVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_CONFIG_VERSION_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION);
+ try
+ {
+ pstmt.setInt(1, DEFAULT_CONFIG_VERSION);
+ pstmt.execute();
+ }
+ finally
+ {
+ pstmt.close();
+ }
+ }
}
private void createConfiguredObjectsTable(final Connection conn) throws SQLException
@@ -279,6 +314,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
+
+
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -457,10 +494,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
try
{
- recoveryHandler.beginConfigurationRecovery(this);
+ recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
loadConfiguredObjects(recoveryHandler);
- recoveryHandler.completeConfigurationRecovery();
+ setConfigVersion(recoveryHandler.completeConfigurationRecovery());
}
catch (SQLException e)
{
@@ -468,6 +505,67 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
+ private void setConfigVersion(int version) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION);
+ try
+ {
+ stmt.setInt(1, version);
+ stmt.execute();
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ private int getConfigVersion() throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
+ try
+ {
+
+ if(rs.next())
+ {
+ return rs.getInt(1);
+ }
+ return DEFAULT_CONFIG_VERSION;
+ }
+ finally
+ {
+ rs.close();
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
@Override
public void close() throws Exception
{
@@ -1837,52 +1935,89 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
Connection conn = newAutoCommitConnection();
try
{
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
+ updateConfiguredObject(configuredObject, conn);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
+ {
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ for(ConfiguredObjectRecord record : records)
{
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
+ updateConfiguredObject(record, conn);
+ }
+ conn.commit();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ }
+
+ }
+
+ }
+
+ private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, Connection conn)
+ throws SQLException, AMQStoreException
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
try
{
- if (rs.next())
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
{
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
- {
- stmt2.setString(1, configuredObject.getType());
- if (configuredObject.getAttributes() != null)
- {
- byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
- configuredObject.getAttributes());
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
- }
- else
- {
- stmt2.setNull(2, Types.BLOB);
- }
- stmt2.setString(3, configuredObject.getId().toString());
- stmt2.execute();
- }
- finally
- {
- stmt2.close();
- }
+ byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+ configuredObject.getAttributes());
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+ }
+ else
+ {
+ stmt2.setNull(2, Types.BLOB);
}
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
}
finally
{
- rs.close();
+ stmt2.close();
}
}
- finally
- {
- stmt.close();
- }
}
finally
{
- conn.close();
+ rs.close();
}
}
catch (JsonMappingException e)
@@ -1897,11 +2032,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
}
- catch (SQLException e)
+ finally
{
- throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ stmt.close();
}
- }
+
}
private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
index 2a4aed5373..a3534d3fa5 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
@@ -28,10 +28,14 @@ import java.util.UUID;
public interface ConfigurationRecoveryHandler
{
- void beginConfigurationRecovery(DurableConfigurationStore store);
+ void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion);
void configuredObject(UUID id, String type, Map<String, Object> attributes);
- void completeConfigurationRecovery();
+ /**
+ *
+ * @return the model version of the configuration
+ */
+ int completeConfigurationRecovery();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java b/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
index 5c8f452dc7..44490385d9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
@@ -60,4 +60,29 @@ public class ConfiguredObjectRecord
return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]";
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if(this == o)
+ {
+ return true;
+ }
+ if(o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ ConfiguredObjectRecord that = (ConfiguredObjectRecord) o;
+
+ return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _id.hashCode();
+ result = 31 * result + _type.hashCode();
+ result = 31 * result + _attributes.hashCode();
+ return result;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index fdde21ba89..7ce761af18 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -91,4 +91,7 @@ public interface DurableConfigurationStore
void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException;
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException;
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 5fc6bad368..c605e1b599 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
-
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
*
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 77cde80af9..078a0d3752 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.store;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.model.VirtualHost;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
@@ -38,6 +39,11 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ }
+
+ @Override
public void remove(UUID id, String type)
{
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index f364d93d98..605bbe5f45 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -24,6 +24,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
@@ -35,6 +36,8 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.FilterSupport;
+import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
@@ -49,6 +52,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
@@ -63,6 +67,8 @@ import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
import org.apache.qpid.util.ByteBufferInputStream;
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
@@ -85,6 +91,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private MessageStoreLogSubject _logSubject;
private MessageStore _store;
+ private int _currentConfigVersion;
+ private DurableConfigurationStore _configStore;
public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost,
ExchangeRegistry exchangeRegistry,
@@ -96,10 +104,11 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
@Override
- public void beginConfigurationRecovery(DurableConfigurationStore store)
+ public void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion)
{
_logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
-
+ _configStore = store;
+ _currentConfigVersion = configVersion;
CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START());
}
@@ -482,8 +491,20 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
@Override
- public void completeConfigurationRecovery()
+ public int completeConfigurationRecovery()
{
+ if(CURRENT_CONFIG_VERSION !=_currentConfigVersion)
+ {
+ try
+ {
+ upgrade();
+ }
+ catch (AMQStoreException e)
+ {
+ throw new IllegalArgumentException("Unable to upgrade configuration from version " + _currentConfigVersion + " to version " + CURRENT_CONFIG_VERSION);
+ }
+ }
+
Map<UUID, Map<String, Object>> exchangeObjects =
_configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName());
@@ -511,6 +532,88 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+
+ return CURRENT_CONFIG_VERSION;
+ }
+
+ private void upgrade() throws AMQStoreException
+ {
+
+ Map<UUID, String> updates = new HashMap<UUID, String>();
+
+ final String bindingType = Binding.class.getName();
+
+ switch(_currentConfigVersion)
+ {
+ case 0:
+ Map<UUID, Map<String, Object>> bindingObjects =
+ _configuredObjects.get(bindingType);
+ if(bindingObjects != null)
+ {
+ for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingObjects.entrySet())
+ {
+ Map<String, Object> binding = bindingEntry.getValue();
+
+ if(hasSelectorArguments(binding) && !isTopicExchange(binding))
+ {
+ binding = new LinkedHashMap<String, Object>(binding);
+ removeSelectorArguments(binding);
+ bindingEntry.setValue(binding);
+
+ updates.put(bindingEntry.getKey(), bindingType);
+ }
+ }
+ }
+ case CURRENT_CONFIG_VERSION:
+ if(!updates.isEmpty())
+ {
+ ConfiguredObjectRecord[] updateRecords = new ConfiguredObjectRecord[updates.size()];
+ int i = 0;
+ for(Map.Entry<UUID, String> update : updates.entrySet())
+ {
+ updateRecords[i++] = new ConfiguredObjectRecord(update.getKey(), update.getValue(), _configuredObjects.get(update.getValue()).get(update.getKey()));
+ }
+ _configStore.update(updateRecords);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unknown configuration model version: " + _currentConfigVersion + ". Are you attempting to run an older instance against an upgraded configuration?");
+ }
+ }
+
+ private void removeSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+
+ FilterSupport.removeFilters(arguments);
+ binding.put(Binding.ARGUMENTS, arguments);
+ }
+
+ private boolean isTopicExchange(Map<String, Object> binding)
+ {
+ UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE));
+ final
+ Map<UUID, Map<String, Object>> exchanges =
+ _configuredObjects.get(org.apache.qpid.server.model.Exchange.class.getName());
+
+ if(exchanges != null && exchanges.containsKey(exchangeId))
+ {
+ return "topic".equals(exchanges.get(exchangeId).get(org.apache.qpid.server.model.Exchange.TYPE));
+ }
+ else
+ {
+ return _exchangeRegistry.getExchange(exchangeId) != null
+ && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE;
+ }
+
+ }
+
+ private boolean hasSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+ return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
}
private void recoverExchanges(Map<UUID, Map<String, Object>> exchangeObjects)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
new file mode 100644
index 0000000000..ac81f5d625
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.HeadersExchange;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
+public class VirtualHostConfigRecoveryHandlerTest extends QpidTestCase
+{
+ private Exchange _directExchange;
+ private Exchange _topicExchange;
+ private VirtualHost _vhost;
+ private VirtualHostConfigRecoveryHandler _virtualHostConfigRecoveryHandler;
+ private DurableConfigurationStore _store;
+
+ private static final UUID QUEUE_ID = new UUID(0,0);
+ private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1);
+ private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2);
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+
+ _directExchange = mock(Exchange.class);
+ when(_directExchange.getType()).thenReturn(DirectExchange.TYPE);
+
+
+ _topicExchange = mock(Exchange.class);
+ when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE);
+
+ AMQQueue queue = mock(AMQQueue.class);
+
+ _vhost = mock(VirtualHost.class);
+
+ ExchangeRegistry exchangeRegistry = mock(ExchangeRegistry.class);
+ when(exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
+ when(exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
+
+ QueueRegistry queueRegistry = mock(QueueRegistry.class);
+ when(_vhost.getQueueRegistry()).thenReturn(queueRegistry);
+
+ when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+
+ ExchangeFactory exchangeFactory = mock(ExchangeFactory.class);
+ _virtualHostConfigRecoveryHandler = new VirtualHostConfigRecoveryHandler(_vhost, exchangeRegistry, exchangeFactory);
+
+ _store = mock(DurableConfigurationStore.class);
+
+ CurrentActor.set(mock(LogActor.class));
+ }
+
+ public void testUpgradeEmptyStore() throws Exception
+ {
+ _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
+ assertEquals("Did not upgrade to the expected version", CURRENT_CONFIG_VERSION, _virtualHostConfigRecoveryHandler.completeConfigurationRecovery());
+ }
+
+ public void testUpgradeNewerStoreFails() throws Exception
+ {
+ try
+ {
+ _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION+1);
+ _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+ fail("Should not be able to start when config model is newer than current");
+ }
+ catch (IllegalStateException e)
+ {
+ // pass
+ }
+ }
+
+ public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception
+ {
+
+ _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
+
+ _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
+
+ final ConfiguredObjectRecord[] expected = {
+ new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID))
+ };
+
+ verifyCorrectUpdates(expected);
+
+ _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+ }
+
+
+
+ public void testUpgradeOnlyRemovesSelectorBindings() throws Exception
+ {
+
+ _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
+
+ _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo"));
+
+
+ UUID customExchangeId = new UUID(3,0);
+
+ _virtualHostConfigRecoveryHandler.configuredObject(new UUID(2, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key", customExchangeId, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo"));
+
+ _virtualHostConfigRecoveryHandler.configuredObject(customExchangeId,
+ "org.apache.qpid.server.model.Exchange",
+ createExchange("customExchange", HeadersExchange.TYPE));
+
+
+
+ final ConfiguredObjectRecord[] expected = {
+ new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")),
+ new ConfiguredObjectRecord(new UUID(3, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo"))
+ };
+
+ verifyCorrectUpdates(expected);
+
+ _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+ }
+
+
+ public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception
+ {
+
+ _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
+
+ _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
+
+ final ConfiguredObjectRecord[] expected = {
+ new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"))
+ };
+
+ verifyCorrectUpdates(expected);
+
+ _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+ }
+
+ public void testUpgradeDoesNotRecur() throws Exception
+ {
+
+ _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 1);
+
+ _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
+
+ doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class));
+
+ _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
+ }
+
+ private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException
+ {
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ Object[] args = invocation.getArguments();
+ assertEquals("Updated records are not as expected", new HashSet(Arrays.asList(
+ expected)), new HashSet(Arrays.asList(args)));
+
+ return null;
+ }
+ }).when(_store).update(any(ConfiguredObjectRecord[].class));
+ }
+
+ private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args)
+ {
+ Map<String, Object> binding = new LinkedHashMap<String, Object>();
+
+ binding.put("name", bindingKey);
+ binding.put(Binding.EXCHANGE, exchangeId.toString());
+ binding.put(Binding.QUEUE, queueId.toString());
+ Map<String,String> argumentMap = new LinkedHashMap<String, String>();
+ if(args != null && args.length != 0)
+ {
+ String key = null;
+ for(String arg : args)
+ {
+ if(key == null)
+ {
+ key = arg;
+ }
+ else
+ {
+ argumentMap.put(key, arg);
+ key = null;
+ }
+ }
+ }
+ binding.put(Binding.ARGUMENTS, argumentMap);
+ return binding;
+ }
+
+
+ private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type)
+ {
+ Map<String, Object> exchange = new LinkedHashMap<String, Object>();
+
+ exchange.put(org.apache.qpid.server.model.Exchange.NAME, name);
+ exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType());
+
+ return exchange;
+
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index c44d4778d4..b541fcc9c6 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -197,6 +197,14 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
doPostDelay("update");
}
+ @Override
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ doPreDelay("update");
+ _durableConfigurationStore.update(records);
+ doPostDelay("update");
+ }
+
public Transaction newTransaction()
{
doPreDelay("beginTran");