diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-18 11:11:02 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-18 11:11:02 +0000 |
commit | 03e923e600ec39f245de5c384f798692e9b38f26 (patch) | |
tree | dc7c545c29b6756ed8f62e212d1ad032490eac6a | |
parent | 657f4cceee799aa773898071eeb035c24d72bec1 (diff) | |
download | qpid-python-03e923e600ec39f245de5c384f798692e9b38f26.tar.gz |
QPID-4999 : [Java Broker] Strip selector arguments from persistent bindings to non-topic exchanges created by buggy old clients
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1504429 13f79535-47bb-0310-9956-ffa450edef68
13 files changed, 706 insertions, 71 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index f6b7e1790f..017b02ac7f 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -21,20 +21,10 @@ package org.apache.qpid.server.store.berkeleydb; import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.CheckpointConfig; -import com.sleepycat.je.Cursor; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.ExceptionEvent; -import com.sleepycat.je.ExceptionListener; -import com.sleepycat.je.LockConflictException; -import com.sleepycat.je.LockMode; -import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.*; +import com.sleepycat.je.Transaction; import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; @@ -85,15 +75,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private Environment _environment; - private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; - private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; - private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; - private String DELIVERYDB_NAME = "QUEUE_ENTRIES"; - private String BRIDGEDB_NAME = "BRIDGES"; - private String LINKDB_NAME = "LINKS"; - private String XIDDB_NAME = "XIDS"; + private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; + private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; + private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; + private static String DELIVERYDB_NAME = "QUEUE_ENTRIES"; + private static String BRIDGEDB_NAME = "BRIDGES"; + private static String LINKDB_NAME = "LINKS"; + private static String XIDDB_NAME = "XIDS"; + private static String CONFIG_VERSION_DB = "CONFIG_VERSION"; private Database _configuredObjectsDb; + private Database _configVersionDb; private Database _messageMetaDataDb; private Database _messageContentDb; private Database _deliveryDb; @@ -326,6 +318,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo dbConfig.setReadOnly(false); _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig); + _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig); _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig); _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig); _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); @@ -426,10 +419,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { try { - recoveryHandler.beginConfigurationRecovery(this); + final int configVersion = getConfigVersion(); + recoveryHandler.beginConfigurationRecovery(this, configVersion); loadConfiguredObjects(recoveryHandler); - recoveryHandler.completeConfigurationRecovery(); + final int newConfigVersion = recoveryHandler.completeConfigurationRecovery(); + if(newConfigVersion != configVersion) + { + updateConfigVersion(newConfigVersion); + } } catch (DatabaseException e) { @@ -438,6 +436,66 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } + private void updateConfigVersion(int newConfigVersion) throws AMQStoreException + { + Cursor cursor = null; + try + { + Transaction txn = _environment.beginTransaction(null, null); + cursor = _configVersionDb.openCursor(txn, null); + DatabaseEntry key = new DatabaseEntry(); + ByteBinding.byteToEntry((byte) 0,key); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + IntegerBinding.intToEntry(newConfigVersion, value); + OperationStatus status = cursor.put(key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error setting config version: " + status); + } + } + cursor.close(); + cursor = null; + txn.commit(); + } + finally + { + closeCursorSafely(cursor); + } + + } + + private int getConfigVersion() throws AMQStoreException + { + Cursor cursor = null; + try + { + cursor = _configVersionDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + return IntegerBinding.entryToInt(value); + } + + // Insert 0 as the default config version + IntegerBinding.intToEntry(0,value); + ByteBinding.byteToEntry((byte) 0,key); + OperationStatus status = _configVersionDb.put(null, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error initialising config version: " + status); + } + return 0; + } + finally + { + closeCursorSafely(cursor); + } + } + private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException { Cursor cursor = null; @@ -750,9 +808,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } + @Override public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { + update(id, type, attributes, null); + } + + public void update(ConfiguredObjectRecord... records) throws AMQStoreException + { + com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); + for(ConfiguredObjectRecord record : records) + { + update(record.getId(), record.getType(), record.getAttributes(), txn); + } + txn.commit(); + } + + private void update(UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException + { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Updating " +type + ", id: " + id); @@ -768,14 +842,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo DatabaseEntry newValue = new DatabaseEntry(); ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT); + OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); // write the updated entry to the store configuredObjectBinding.objectToEntry(newQueueRecord, newValue); - status = _configuredObjectsDb.put(null, key, newValue); + status = _configuredObjectsDb.put(txn, key, newValue); if (status != OperationStatus.SUCCESS) { throw new AMQStoreException("Error updating queue details within the store: " + status); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java index 2b9c5ad290..647e19d659 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java @@ -531,6 +531,10 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore if (fieldValues != null) { Object[] array = fieldValues.toArray(new Object[fieldValues.size()]); + if (attributes == null) + { + attributes = new HashMap<String, Object>(); + } attributes.put(fieldName, array); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java index e0d1ac8695..81a89c9b4b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java @@ -91,12 +91,20 @@ public class FilterSupport } - static boolean argumentsContainFilter(final Map<String, Object> args) + public static boolean argumentsContainFilter(final Map<String, Object> args) { return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); } + public static void removeFilters(final Map<String, Object> args) + { + args.remove(AMQPFilterTypes.JMS_SELECTOR.toString()); + args.remove(AMQPFilterTypes.NO_LOCAL.toString()); + } + + + static boolean argumentsContainNoLocal(final Map<String, Object> args) { return args != null diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 08e9c5ca5c..9e32d303fb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -119,6 +119,7 @@ public interface VirtualHost extends ConfiguredObject QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, CONFIG_PATH)); + int CURRENT_CONFIG_VERSION = 1; //children Collection<VirtualHostAlias> getAliases(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 905c83f7ed..e97c0d662d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -55,6 +55,7 @@ import org.codehaus.jackson.map.ObjectMapper; abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore { private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; + private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION"; private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES"; @@ -68,9 +69,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; + private static final int DEFAULT_CONFIG_VERSION = 0; public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME, - XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME }; + XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME }; private static final int DB_VERSION = 6; @@ -80,6 +82,12 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )"; + private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )"; + private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )"; + private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME; + private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?"; + + private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)"; private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?"; private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id"; @@ -223,6 +231,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC Connection conn = newAutoCommitConnection(); createVersionTable(conn); + createConfigVersionTable(conn); createConfiguredObjectsTable(conn); createQueueEntryTable(conn); createMetaDataTable(conn); @@ -259,7 +268,33 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC pstmt.close(); } } + } + private void createConfigVersionTable(final Connection conn) throws SQLException + { + if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_CONFIG_VERSION_TABLE); + } + finally + { + stmt.close(); + } + + PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION); + try + { + pstmt.setInt(1, DEFAULT_CONFIG_VERSION); + pstmt.execute(); + } + finally + { + pstmt.close(); + } + } } private void createConfiguredObjectsTable(final Connection conn) throws SQLException @@ -279,6 +314,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } + + private void createQueueEntryTable(final Connection conn) throws SQLException { if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) @@ -457,10 +494,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { try { - recoveryHandler.beginConfigurationRecovery(this); + recoveryHandler.beginConfigurationRecovery(this, getConfigVersion()); loadConfiguredObjects(recoveryHandler); - recoveryHandler.completeConfigurationRecovery(); + setConfigVersion(recoveryHandler.completeConfigurationRecovery()); } catch (SQLException e) { @@ -468,6 +505,67 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } + private void setConfigVersion(int version) throws SQLException + { + Connection conn = newAutoCommitConnection(); + try + { + + PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION); + try + { + stmt.setInt(1, version); + stmt.execute(); + + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + + private int getConfigVersion() throws SQLException + { + Connection conn = newAutoCommitConnection(); + try + { + + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION); + try + { + + if(rs.next()) + { + return rs.getInt(1); + } + return DEFAULT_CONFIG_VERSION; + } + finally + { + rs.close(); + } + + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + + } + @Override public void close() throws Exception { @@ -1837,52 +1935,89 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC Connection conn = newAutoCommitConnection(); try { - PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); - try + updateConfiguredObject(configuredObject, conn); + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + } + } + + @Override + public void update(ConfiguredObjectRecord... records) throws AMQStoreException + { + if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING)) + { + try + { + Connection conn = newConnection(); + try + { + for(ConfiguredObjectRecord record : records) { - stmt.setString(1, configuredObject.getId().toString()); - ResultSet rs = stmt.executeQuery(); + updateConfiguredObject(record, conn); + } + conn.commit(); + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error updating configured objects in database: " + e.getMessage(), e); + } + + } + + } + + private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, Connection conn) + throws SQLException, AMQStoreException + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + if (rs.next()) + { + PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS); try { - if (rs.next()) + stmt2.setString(1, configuredObject.getType()); + if (configuredObject.getAttributes() != null) { - PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS); - try - { - stmt2.setString(1, configuredObject.getType()); - if (configuredObject.getAttributes() != null) - { - byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes( - configuredObject.getAttributes()); - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - stmt2.setBinaryStream(2, bis, attributesAsBytes.length); - } - else - { - stmt2.setNull(2, Types.BLOB); - } - stmt2.setString(3, configuredObject.getId().toString()); - stmt2.execute(); - } - finally - { - stmt2.close(); - } + byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes( + configuredObject.getAttributes()); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + stmt2.setBinaryStream(2, bis, attributesAsBytes.length); + } + else + { + stmt2.setNull(2, Types.BLOB); } + stmt2.setString(3, configuredObject.getId().toString()); + stmt2.execute(); } finally { - rs.close(); + stmt2.close(); } } - finally - { - stmt.close(); - } } finally { - conn.close(); + rs.close(); } } catch (JsonMappingException e) @@ -1897,11 +2032,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); } - catch (SQLException e) + finally { - throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + stmt.close(); } - } + } private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index 2a4aed5373..a3534d3fa5 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -28,10 +28,14 @@ import java.util.UUID; public interface ConfigurationRecoveryHandler { - void beginConfigurationRecovery(DurableConfigurationStore store); + void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion); void configuredObject(UUID id, String type, Map<String, Object> attributes); - void completeConfigurationRecovery(); + /** + * + * @return the model version of the configuration + */ + int completeConfigurationRecovery(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java b/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java index 5c8f452dc7..44490385d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java @@ -60,4 +60,29 @@ public class ConfiguredObjectRecord return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]"; } + @Override + public boolean equals(Object o) + { + if(this == o) + { + return true; + } + if(o == null || getClass() != o.getClass()) + { + return false; + } + + ConfiguredObjectRecord that = (ConfiguredObjectRecord) o; + + return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes); + } + + @Override + public int hashCode() + { + int result = _id.hashCode(); + result = 31 * result + _type.hashCode(); + result = 31 * result + _attributes.hashCode(); + return result; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index fdde21ba89..7ce761af18 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -91,4 +91,7 @@ public interface DurableConfigurationStore void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException; + public void update(ConfiguredObjectRecord... records) throws AMQStoreException; + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 5fc6bad368..c605e1b599 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; - /** * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. * diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 77cde80af9..078a0d3752 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.util.Map; import java.util.UUID; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.model.VirtualHost; public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore @@ -38,6 +39,11 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override + public void update(ConfiguredObjectRecord... records) throws AMQStoreException + { + } + + @Override public void remove(UUID id, String type) { } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index f364d93d98..605bbe5f45 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -24,6 +24,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeMap; import java.util.UUID; @@ -35,6 +36,8 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.FilterSupport; +import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; @@ -49,6 +52,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; @@ -63,6 +67,8 @@ import org.apache.qpid.transport.Xid; import org.apache.qpid.transport.util.Functions; import org.apache.qpid.util.ByteBufferInputStream; +import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; + public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, @@ -85,6 +91,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private MessageStoreLogSubject _logSubject; private MessageStore _store; + private int _currentConfigVersion; + private DurableConfigurationStore _configStore; public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, ExchangeRegistry exchangeRegistry, @@ -96,10 +104,11 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } @Override - public void beginConfigurationRecovery(DurableConfigurationStore store) + public void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion) { _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName()); - + _configStore = store; + _currentConfigVersion = configVersion; CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START()); } @@ -482,8 +491,20 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } @Override - public void completeConfigurationRecovery() + public int completeConfigurationRecovery() { + if(CURRENT_CONFIG_VERSION !=_currentConfigVersion) + { + try + { + upgrade(); + } + catch (AMQStoreException e) + { + throw new IllegalArgumentException("Unable to upgrade configuration from version " + _currentConfigVersion + " to version " + CURRENT_CONFIG_VERSION); + } + } + Map<UUID, Map<String, Object>> exchangeObjects = _configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName()); @@ -511,6 +532,88 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); + + return CURRENT_CONFIG_VERSION; + } + + private void upgrade() throws AMQStoreException + { + + Map<UUID, String> updates = new HashMap<UUID, String>(); + + final String bindingType = Binding.class.getName(); + + switch(_currentConfigVersion) + { + case 0: + Map<UUID, Map<String, Object>> bindingObjects = + _configuredObjects.get(bindingType); + if(bindingObjects != null) + { + for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingObjects.entrySet()) + { + Map<String, Object> binding = bindingEntry.getValue(); + + if(hasSelectorArguments(binding) && !isTopicExchange(binding)) + { + binding = new LinkedHashMap<String, Object>(binding); + removeSelectorArguments(binding); + bindingEntry.setValue(binding); + + updates.put(bindingEntry.getKey(), bindingType); + } + } + } + case CURRENT_CONFIG_VERSION: + if(!updates.isEmpty()) + { + ConfiguredObjectRecord[] updateRecords = new ConfiguredObjectRecord[updates.size()]; + int i = 0; + for(Map.Entry<UUID, String> update : updates.entrySet()) + { + updateRecords[i++] = new ConfiguredObjectRecord(update.getKey(), update.getValue(), _configuredObjects.get(update.getValue()).get(update.getKey())); + } + _configStore.update(updateRecords); + } + break; + default: + throw new IllegalStateException("Unknown configuration model version: " + _currentConfigVersion + ". Are you attempting to run an older instance against an upgraded configuration?"); + } + } + + private void removeSelectorArguments(Map<String, Object> binding) + { + @SuppressWarnings("unchecked") + Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS)); + + FilterSupport.removeFilters(arguments); + binding.put(Binding.ARGUMENTS, arguments); + } + + private boolean isTopicExchange(Map<String, Object> binding) + { + UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE)); + final + Map<UUID, Map<String, Object>> exchanges = + _configuredObjects.get(org.apache.qpid.server.model.Exchange.class.getName()); + + if(exchanges != null && exchanges.containsKey(exchangeId)) + { + return "topic".equals(exchanges.get(exchangeId).get(org.apache.qpid.server.model.Exchange.TYPE)); + } + else + { + return _exchangeRegistry.getExchange(exchangeId) != null + && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE; + } + + } + + private boolean hasSelectorArguments(Map<String, Object> binding) + { + @SuppressWarnings("unchecked") + Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS); + return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); } private void recoverExchanges(Map<UUID, Map<String, Object>> exchangeObjects) diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java new file mode 100644 index 0000000000..ac81f5d625 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java @@ -0,0 +1,266 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.HeadersExchange; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; + +public class VirtualHostConfigRecoveryHandlerTest extends QpidTestCase +{ + private Exchange _directExchange; + private Exchange _topicExchange; + private VirtualHost _vhost; + private VirtualHostConfigRecoveryHandler _virtualHostConfigRecoveryHandler; + private DurableConfigurationStore _store; + + private static final UUID QUEUE_ID = new UUID(0,0); + private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1); + private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2); + + @Override + public void setUp() throws Exception + { + super.setUp(); + + + _directExchange = mock(Exchange.class); + when(_directExchange.getType()).thenReturn(DirectExchange.TYPE); + + + _topicExchange = mock(Exchange.class); + when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE); + + AMQQueue queue = mock(AMQQueue.class); + + _vhost = mock(VirtualHost.class); + + ExchangeRegistry exchangeRegistry = mock(ExchangeRegistry.class); + when(exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); + when(exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); + + QueueRegistry queueRegistry = mock(QueueRegistry.class); + when(_vhost.getQueueRegistry()).thenReturn(queueRegistry); + + when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); + + ExchangeFactory exchangeFactory = mock(ExchangeFactory.class); + _virtualHostConfigRecoveryHandler = new VirtualHostConfigRecoveryHandler(_vhost, exchangeRegistry, exchangeFactory); + + _store = mock(DurableConfigurationStore.class); + + CurrentActor.set(mock(LogActor.class)); + } + + public void testUpgradeEmptyStore() throws Exception + { + _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); + assertEquals("Did not upgrade to the expected version", CURRENT_CONFIG_VERSION, _virtualHostConfigRecoveryHandler.completeConfigurationRecovery()); + } + + public void testUpgradeNewerStoreFails() throws Exception + { + try + { + _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION+1); + _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + fail("Should not be able to start when config model is newer than current"); + } + catch (IllegalStateException e) + { + // pass + } + } + + public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception + { + + _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); + + _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); + + final ConfiguredObjectRecord[] expected = { + new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID)) + }; + + verifyCorrectUpdates(expected); + + _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + } + + + + public void testUpgradeOnlyRemovesSelectorBindings() throws Exception + { + + _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); + + _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo")); + + + UUID customExchangeId = new UUID(3,0); + + _virtualHostConfigRecoveryHandler.configuredObject(new UUID(2, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", customExchangeId, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo")); + + _virtualHostConfigRecoveryHandler.configuredObject(customExchangeId, + "org.apache.qpid.server.model.Exchange", + createExchange("customExchange", HeadersExchange.TYPE)); + + + + final ConfiguredObjectRecord[] expected = { + new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")), + new ConfiguredObjectRecord(new UUID(3, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo")) + }; + + verifyCorrectUpdates(expected); + + _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + } + + + public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception + { + + _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); + + _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); + + final ConfiguredObjectRecord[] expected = { + new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")) + }; + + verifyCorrectUpdates(expected); + + _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + } + + public void testUpgradeDoesNotRecur() throws Exception + { + + _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 1); + + _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); + + doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class)); + + _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); + } + + private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException + { + doAnswer(new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + Object[] args = invocation.getArguments(); + assertEquals("Updated records are not as expected", new HashSet(Arrays.asList( + expected)), new HashSet(Arrays.asList(args))); + + return null; + } + }).when(_store).update(any(ConfiguredObjectRecord[].class)); + } + + private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args) + { + Map<String, Object> binding = new LinkedHashMap<String, Object>(); + + binding.put("name", bindingKey); + binding.put(Binding.EXCHANGE, exchangeId.toString()); + binding.put(Binding.QUEUE, queueId.toString()); + Map<String,String> argumentMap = new LinkedHashMap<String, String>(); + if(args != null && args.length != 0) + { + String key = null; + for(String arg : args) + { + if(key == null) + { + key = arg; + } + else + { + argumentMap.put(key, arg); + key = null; + } + } + } + binding.put(Binding.ARGUMENTS, argumentMap); + return binding; + } + + + private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type) + { + Map<String, Object> exchange = new LinkedHashMap<String, Object>(); + + exchange.put(org.apache.qpid.server.model.Exchange.NAME, name); + exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType()); + + return exchange; + + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index c44d4778d4..b541fcc9c6 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -197,6 +197,14 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("update"); } + @Override + public void update(ConfiguredObjectRecord... records) throws AMQStoreException + { + doPreDelay("update"); + _durableConfigurationStore.update(records); + doPostDelay("update"); + } + public Transaction newTransaction() { doPreDelay("beginTran"); |