diff options
author | Keith Wall <kwall@apache.org> | 2014-06-18 20:51:43 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-06-18 20:51:43 +0000 |
commit | 73d704196c485e9709b649ce5ae64497079ec0ba (patch) | |
tree | e06f5ab8fca5962f40bd9eb61a0b929294885576 /java/broker-core/src | |
parent | 553fb17bb27ad154628c11ba5b479563686dad27 (diff) | |
download | qpid-python-73d704196c485e9709b649ce5ae64497079ec0ba.tar.gz |
QPID-5800: [Java Broker] Refactor Derby/JDBC message store implementations to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with
another persistent store implementation.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1603626 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-core/src')
3 files changed, 1185 insertions, 1041 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java new file mode 100644 index 0000000000..fb256abbf5 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java @@ -0,0 +1,1013 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.Version; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.Module; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializerProvider; +import org.codehaus.jackson.map.module.SimpleModule; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; + +public abstract class AbstractJDBCConfigurationStore implements MessageStoreProvider, DurableConfigurationStore +{ + private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION"; + + private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; + private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY"; + + private static final int DEFAULT_CONFIG_VERSION = 0; + + public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME)); + + private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME; + private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME; + + private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME + + " ( id, object_type, attributes) VALUES (?,?,?)"; + private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME + + " set object_type =?, attributes = ? where id = ?"; + private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME + + " where id = ?"; + private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME + + " where id = ?"; + private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME; + + + private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + + " ( child_id, parent_type, parent_id) VALUES (?,?,?)"; + + private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + + " where child_id = ?"; + private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME; + + private static final Module _module; + static + { + SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null)); + + final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>() + { + @Override + public void serialize(final ConfiguredObject value, + final JsonGenerator jgen, + final SerializerProvider provider) + throws IOException, JsonProcessingException + { + jgen.writeString(value.getId().toString()); + } + }; + module.addSerializer(ConfiguredObject.class, serializer); + + _module = module; + } + + @Override + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) + { + checkConfigurationStoreOpen(); + + try + { + handler.begin(); + doVisitAllConfiguredObjectRecords(handler); + handler.end(); + } + catch (SQLException e) + { + throw new StoreException("Cannot visit configured object records", e); + } + } + + private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException + { + Connection conn = newAutoCommitConnection(); + Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>(); + final ObjectMapper objectMapper = new ObjectMapper(); + try + { + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + String id = rs.getString(1); + String objectType = rs.getString(2); + String attributes = getBlobAsString(rs, 3); + final ConfiguredObjectRecordImpl configuredObjectRecord = + new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, + objectMapper.readValue(attributes, Map.class)); + configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); + + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + UUID childId = UUID.fromString(rs.getString(1)); + String parentType = rs.getString(2); + UUID parentId = UUID.fromString(rs.getString(3)); + + ConfiguredObjectRecordImpl child = configuredObjects.get(childId); + ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); + + if(child != null && parent != null) + { + child.addParent(parentType, parent); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + } + finally + { + conn.close(); + } + + for(ConfiguredObjectRecord record : configuredObjects.values()) + { + boolean shouldContinue = handler.handle(record); + if (!shouldContinue) + { + break; + } + } + } + + protected abstract void checkConfigurationStoreOpen(); + + protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws StoreException + { + Connection connection = null; + try + { + connection = newConnection(); + + boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection); + if(tableExists) + { + int configVersion = getConfigVersion(connection); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Upgrader read existing config version " + configVersion); + } + + switch(configVersion) + { + + case 7: + upgradeFromV7(parent); + break; + default: + throw new UnsupportedOperationException("Cannot upgrade from configuration version : " + + configVersion); + } + } + } + catch (SQLException se) + { + throw new StoreException("Failed to upgrade database", se); + } + finally + { + JdbcUtils.closeConnection(connection, getLogger()); + } + + } + + private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException + { + @SuppressWarnings("serial") + Map<String, String> defaultExchanges = new HashMap<String, String>() + {{ + put("amq.direct", "direct"); + put("amq.topic", "topic"); + put("amq.fanout", "fanout"); + put("amq.match", "headers"); + }}; + + Connection connection = newConnection(); + try + { + String virtualHostName = parent.getName(); + UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName); + + String stringifiedConfigVersion = "0." + DEFAULT_CONFIG_VERSION; + + boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection); + if(tableExists) + { + int configVersion = getConfigVersion(connection); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Upgrader read existing config version " + configVersion); + } + + stringifiedConfigVersion = "0." + configVersion; + } + + Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); + virtualHostAttributes.put("name", virtualHostName); + + ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); + insertConfiguredObject(virtualHostRecord, connection); + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Upgrader created VirtualHost configuration entry with config version " + stringifiedConfigVersion); + } + + Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>(); + List<UUID> others = new ArrayList<UUID>(); + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(_module); + + PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + UUID id = UUID.fromString(rs.getString(1)); + String objectType = rs.getString(2); + if ("VirtualHost".equals(objectType)) + { + continue; + } + Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class); + + if(objectType.endsWith("Binding")) + { + bindingsToUpdate.put(id,attributes); + } + else + { + if (objectType.equals("Exchange")) + { + defaultExchanges.remove((String)attributes.get("name")); + } + others.add(id); + } + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); + try + { + for (UUID id : others) + { + stmt.setString(1, id.toString()); + stmt.setString(2, "VirtualHost"); + stmt.setString(3, virtualHostId.toString()); + stmt.execute(); + } + for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) + { + stmt.setString(1, bindingEntry.getKey().toString()); + stmt.setString(2,"Queue"); + stmt.setString(3, bindingEntry.getValue().remove("queue").toString()); + stmt.execute(); + + stmt.setString(1, bindingEntry.getKey().toString()); + stmt.setString(2,"Exchange"); + stmt.setString(3, bindingEntry.getValue().remove("exchange").toString()); + stmt.execute(); + } + } + finally + { + stmt.close(); + } + + for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet()) + { + UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName); + Map<String, Object> exchangeAttributes = new HashMap<String, Object>(); + exchangeAttributes.put("name", defaultExchangeEntry.getKey()); + exchangeAttributes.put("type", defaultExchangeEntry.getValue()); + exchangeAttributes.put("lifetimePolicy", "PERMANENT"); + Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord); + ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents); + insertConfiguredObject(exchangeRecord, connection); + } + + stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS); + try + { + for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) + { + stmt.setString(1, "Binding"); + byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue()); + + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + stmt.setBinaryStream(2, bis, attributesAsBytes.length); + stmt.setString(3, bindingEntry.getKey().toString()); + stmt.execute(); + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + stmt.close(); + } + + if (tableExists) + { + dropConfigVersionTable(connection); + } + + connection.commit(); + } + catch(SQLException e) + { + try + { + connection.rollback(); + } + catch(SQLException re) + { + } + throw e; + } + finally + { + connection.close(); + } + } + + protected abstract Logger getLogger(); + + protected abstract String getSqlBlobType(); + + protected abstract String getSqlVarBinaryType(int size); + + protected abstract String getSqlBigIntType(); + + + protected void createOrOpenConfigurationStoreDatabase() throws StoreException + { + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + + createConfiguredObjectsTable(conn); + createConfiguredObjectHierarchyTable(conn); + } + catch (SQLException e) + { + throw new StoreException("Unable to open configuration tables", e); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); + } + } + + private void dropConfigVersionTable(final Connection conn) throws SQLException + { + if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(DROP_CONFIG_VERSION_TABLE); + } + finally + { + stmt.close(); + } + } + } + + private void createConfiguredObjectsTable(final Connection conn) throws SQLException + { + if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME + + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))"); + } + finally + { + stmt.close(); + } + } + } + + private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException + { + if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))"); + } + finally + { + stmt.close(); + } + } + } + + protected boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + return JdbcUtils.tableExists(tableName, conn); + } + + private int getConfigVersion(Connection conn) throws SQLException + { + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION); + try + { + + if(rs.next()) + { + return rs.getInt(1); + } + return DEFAULT_CONFIG_VERSION; + } + finally + { + rs.close(); + } + + } + finally + { + stmt.close(); + } + + } + + @Override + public void create(ConfiguredObjectRecord object) throws StoreException + { + checkConfigurationStoreOpen(); + try + { + Connection conn = newConnection(); + try + { + insertConfiguredObject(object, conn); + conn.commit(); + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new StoreException("Error creating ConfiguredObject " + object); + } + } + + /** + * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED + * isolation and with auto-commit transactions enabled. + */ + protected Connection newAutoCommitConnection() throws SQLException + { + final Connection connection = newConnection(); + try + { + connection.setAutoCommit(true); + } + catch (SQLException sqlEx) + { + + try + { + connection.close(); + } + finally + { + throw sqlEx; + } + } + + return connection; + } + + /** + * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED + * isolation and with auto-commit transactions disabled. + */ + protected Connection newConnection() throws SQLException + { + final Connection connection = getConnection(); + try + { + connection.setAutoCommit(false); + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + } + catch (SQLException sqlEx) + { + try + { + connection.close(); + } + finally + { + throw sqlEx; + } + } + return connection; + } + + protected abstract Connection getConnection() throws SQLException; + + private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException + { + try + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + boolean exists; + try + { + exists = rs.next(); + + } + finally + { + rs.close(); + } + // If we don't have any data in the result set then we can add this configured object + if (!exists) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); + try + { + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, configuredObject.getType()); + if(configuredObject.getAttributes() == null) + { + insertStmt.setNull(3, Types.BLOB); + } + else + { + final Map<String, Object> attributes = configuredObject.getAttributes(); + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(_module); + byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); + + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); + } + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + + writeHierarchy(configuredObject, conn); + } + + } + finally + { + stmt.close(); + } + + } + catch (JsonMappingException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (SQLException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + } + + @Override + public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException + { + checkConfigurationStoreOpen(); + + Collection<UUID> removed = new ArrayList<UUID>(objects.length); + try + { + + Connection conn = newAutoCommitConnection(); + try + { + for(ConfiguredObjectRecord record : objects) + { + if(removeConfiguredObject(record.getId(), conn) != 0) + { + removed.add(record.getId()); + } + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e); + } + return removed.toArray(new UUID[removed.size()]); + } + + private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException + { + final int results; + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); + try + { + stmt.setString(1, id.toString()); + results = stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY); + try + { + stmt.setString(1, id.toString()); + stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + + return results; + } + + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException + { + checkConfigurationStoreOpen(); + try + { + Connection conn = newConnection(); + try + { + for(ConfiguredObjectRecord record : records) + { + updateConfiguredObject(record, createIfNecessary, conn); + } + conn.commit(); + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e); + } + } + + private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, + boolean createIfNecessary, + Connection conn) + throws SQLException, StoreException + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(_module); + if (rs.next()) + { + PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS); + try + { + stmt2.setString(1, configuredObject.getType()); + if (configuredObject.getAttributes() != null) + { + byte[] attributesAsBytes = objectMapper.writeValueAsBytes( + configuredObject.getAttributes()); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + stmt2.setBinaryStream(2, bis, attributesAsBytes.length); + } + else + { + stmt2.setNull(2, Types.BLOB); + } + stmt2.setString(3, configuredObject.getId().toString()); + stmt2.execute(); + } + finally + { + stmt2.close(); + } + } + else if(createIfNecessary) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); + try + { + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, configuredObject.getType()); + if(configuredObject.getAttributes() == null) + { + insertStmt.setNull(3, Types.BLOB); + } + else + { + final Map<String, Object> attributes = configuredObject.getAttributes(); + byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); + } + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + writeHierarchy(configuredObject, conn); + } + } + finally + { + rs.close(); + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + finally + { + stmt.close(); + } + } + + private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); + try + { + for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet()) + { + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, parentEntry.getKey()); + insertStmt.setString(3, parentEntry.getValue().getId().toString()); + + insertStmt.execute(); + } + } + finally + { + insertStmt.close(); + } + } + + protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; + + @Override + public void onDelete() + { + // TODO should probably check we are closed + try + { + Connection conn = newAutoCommitConnection(); + try + { + List<String> tables = new ArrayList<String>(); + tables.addAll(CONFIGURATION_STORE_TABLE_NAMES); + + for (String tableName : tables) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute("DROP TABLE " + tableName); + } + catch(SQLException e) + { + getLogger().warn("Failed to drop table '" + tableName + "' :" + e); + } + finally + { + stmt.close(); + } + } + } + finally + { + conn.close(); + } + } + catch(SQLException e) + { + getLogger().error("Exception while deleting store tables", e); + } + } + + private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord + { + + private final UUID _id; + private final String _type; + private final Map<String, Object> _attributes; + private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>(); + + private ConfiguredObjectRecordImpl(final UUID id, + final String type, + final Map<String, Object> attributes) + { + _id = id; + _type = type; + _attributes = Collections.unmodifiableMap(attributes); + } + + @Override + public UUID getId() + { + return _id; + } + + @Override + public String getType() + { + return _type; + } + + private void addParent(String parentType, ConfiguredObjectRecord parent) + { + _parents.put(parentType, parent); + } + + @Override + public Map<String, Object> getAttributes() + { + return _attributes; + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + return Collections.unmodifiableMap(_parents); + } + + @Override + public String toString() + { + return "ConfiguredObjectRecordImpl [_id=" + _id + ", _type=" + _type + ", _attributes=" + _attributes + ", _parents=" + + _parents + "]"; + } + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 24563bae61..7487315000 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -24,54 +24,32 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.JsonProcessingException; -import org.codehaus.jackson.Version; -import org.codehaus.jackson.map.JsonMappingException; -import org.codehaus.jackson.map.JsonSerializer; -import org.codehaus.jackson.map.Module; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.SerializerProvider; -import org.codehaus.jackson.map.module.SimpleModule; import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; -abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, DurableConfigurationStore +public abstract class AbstractJDBCMessageStore implements MessageStore { private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; - private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION"; private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES"; @@ -82,16 +60,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, private static final String XID_TABLE_NAME = "QPID_XIDS"; private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; - private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; - private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY"; - - private static final int DEFAULT_CONFIG_VERSION = 0; - - public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME)); public static final Set<String> MESSAGE_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(DB_VERSION_TABLE_NAME, META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, - QUEUE_ENTRY_TABLE_NAME, - XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME)); + QUEUE_ENTRY_TABLE_NAME, + XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME)); private static final int DB_VERSION = 8; @@ -103,19 +75,15 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?"; - private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME; - private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME; - - private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)"; private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?"; private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id"; private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME - + "( message_id, content ) values (?, ?)"; + + "( message_id, content ) values (?, ?)"; private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME - + " WHERE message_id = ?"; + + " WHERE message_id = ?"; private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME - + " WHERE message_id = ?"; + + " WHERE message_id = ?"; private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)"; private static final String SELECT_FROM_META_DATA = @@ -126,282 +94,63 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, private static final String INSERT_INTO_XIDS = "INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)"; private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME - + " WHERE format = ? and global_id = ? and branch_id = ?"; + + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME; private static final String INSERT_INTO_XID_ACTIONS = "INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " + "queue_id, message_id ) values (?,?,?,?,?,?) "; private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME - + " WHERE format = ? and global_id = ? and branch_id = ?"; + + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String SELECT_ALL_FROM_XID_ACTIONS = "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; - private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME - + " ( id, object_type, attributes) VALUES (?,?,?)"; - private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME - + " set object_type =?, attributes = ? where id = ?"; - private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME - + " where id = ?"; - private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME - + " where id = ?"; - private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME; - - - private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME - + " ( child_id, parent_type, parent_id) VALUES (?,?,?)"; - - private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME - + " where child_id = ?"; - private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME; - - protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); - - - private static final Module _module; - static - { - SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null)); - - final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>() - { - @Override - public void serialize(final ConfiguredObject value, - final JsonGenerator jgen, - final SerializerProvider provider) - throws IOException, JsonProcessingException - { - jgen.writeString(value.getId().toString()); - } - }; - module.addSerializer(ConfiguredObject.class, serializer); - - _module = module; - } protected final EventManager _eventManager = new EventManager(); - private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); - private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); + protected abstract boolean isMessageStoreOpen(); - private boolean _initialized; + protected abstract void checkMessageStoreOpen(); - @Override - public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + protected void setMaximumMessageId() { - if (_configurationStoreOpen.compareAndSet(false, true)) + visitMessages(new MessageHandler() { - initialiseIfNecessary(parent.getName(), storeSettings); - try - { - createOrOpenConfigurationStoreDatabase(); - upgradeIfVersionTableExists(parent); - } - catch(SQLException e) - { - throw new StoreException("Cannot create databases or upgrade", e); - } - } - } - - private void initialiseIfNecessary(String virtualHostName, Map<String, Object> storeSettings) - { - if (!_initialized) - { - try - { - implementationSpecificConfiguration(virtualHostName, storeSettings); - } - catch (ClassNotFoundException e) - { - throw new StoreException("Cannot find driver class", e); - } - catch (SQLException e) - { - throw new StoreException("Unexpected exception occured", e); - } - _initialized = true; - } - } - - @Override - public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) - { - checkConfigurationStoreOpen(); - - try - { - handler.begin(); - doVisitAllConfiguredObjectRecords(handler); - handler.end(); - } - catch (SQLException e) - { - throw new StoreException("Cannot visit configured object records", e); - } - - } - - private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException - { - Connection conn = newAutoCommitConnection(); - Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>(); - final ObjectMapper objectMapper = new ObjectMapper(); - try - { - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); - try - { - ResultSet rs = stmt.executeQuery(); - try - { - while (rs.next()) - { - String id = rs.getString(1); - String objectType = rs.getString(2); - String attributes = getBlobAsString(rs, 3); - final ConfiguredObjectRecordImpl configuredObjectRecord = - new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, - objectMapper.readValue(attributes, Map.class)); - configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); - - } - } - catch (JsonMappingException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (JsonParseException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); - try + @Override + public boolean handle(StoredMessage<?> storedMessage) { - ResultSet rs = stmt.executeQuery(); - try - { - while (rs.next()) - { - UUID childId = UUID.fromString(rs.getString(1)); - String parentType = rs.getString(2); - UUID parentId = UUID.fromString(rs.getString(3)); - - ConfiguredObjectRecordImpl child = configuredObjects.get(childId); - ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); - - if(child != null && parent != null) - { - child.addParent(parentType, parent); - } - } - } - finally + long id = storedMessage.getMessageNumber(); + if (_messageId.get() < id) { - rs.close(); + _messageId.set(id); } + return true; } - finally - { - stmt.close(); - } - - } - finally - { - conn.close(); - } - - for(ConfiguredObjectRecord record : configuredObjects.values()) - { - boolean shoudlContinue = handler.handle(record); - if (!shoudlContinue) - { - break; - } - } - } - - private void checkConfigurationStoreOpen() - { - if (!_configurationStoreOpen.get()) - { - throw new IllegalStateException("Configuration store is not open"); - } + }); } - private void checkMessageStoreOpen() + protected void upgrade(ConfiguredObject<?> parent) throws StoreException { - if (!_messageStoreOpen.get()) - { - throw new IllegalStateException("Message store is not open"); - } - } - - private void upgradeIfVersionTableExists(ConfiguredObject<?> parent) - throws SQLException { - Connection conn = newAutoCommitConnection(); + Connection conn = null; try { + conn = newAutoCommitConnection(); if (tableExists(DB_VERSION_TABLE_NAME, conn)) { upgradeIfNecessary(parent); } } - finally + catch (SQLException e) { - if (conn != null) - { - conn.close(); - } + throw new StoreException("Failed to upgrade database", e); } - } - - public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) - { - if (_messageStoreOpen.compareAndSet(false, true)) + finally { - initialiseIfNecessary(parent.getName(), messageStoreSettings); - try - { - createOrOpenMessageStoreDatabase(); - upgradeIfNecessary(parent); - - visitMessages(new MessageHandler() - { - @Override - public boolean handle(StoredMessage<?> storedMessage) - { - long id = storedMessage.getMessageNumber(); - if (_messageId.get() < id) - { - _messageId.set(id); - } - return true; - } - }); - } - catch (SQLException e) - { - throw new StoreException("Unable to activate message store ", e); - } + JdbcUtils.closeConnection(conn, getLogger()); } } - protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException + private void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException { Connection conn = newAutoCommitConnection(); try @@ -423,7 +172,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, case 6: upgradeFromV6(); case 7: - upgradeFromV7(parent); + upgradeFromV7(); case DB_VERSION: return; default: @@ -447,214 +196,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } - private void upgradeFromV6() throws SQLException + private void upgradeFromV7() { - updateDbVersion(7); } - - private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException + private void upgradeFromV6() throws SQLException { - @SuppressWarnings("serial") - Map<String, String> defaultExchanges = new HashMap<String, String>() - {{ - put("amq.direct", "direct"); - put("amq.topic", "topic"); - put("amq.fanout", "fanout"); - put("amq.match", "headers"); - }}; - - Connection connection = newConnection(); - try - { - String virtualHostName = parent.getName(); - UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName); - - String stringifiedConfigVersion = BrokerModel.MODEL_VERSION; - boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection); - if(tableExists) - { - int configVersion = getConfigVersion(connection); - if (getLogger().isDebugEnabled()) - { - getLogger().debug("Upgrader read existing config version " + configVersion); - } - - stringifiedConfigVersion = "0." + configVersion; - } - - Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); - virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); - virtualHostAttributes.put("name", virtualHostName); - - ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); - insertConfiguredObject(virtualHostRecord, connection); - - if (getLogger().isDebugEnabled()) - { - getLogger().debug("Upgrader created VirtualHost configuration entry with config version " + stringifiedConfigVersion); - } - - Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>(); - List<UUID> others = new ArrayList<UUID>(); - final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(_module); - - PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); - try - { - ResultSet rs = stmt.executeQuery(); - try - { - while (rs.next()) - { - UUID id = UUID.fromString(rs.getString(1)); - String objectType = rs.getString(2); - if ("VirtualHost".equals(objectType)) - { - continue; - } - Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class); - - if(objectType.endsWith("Binding")) - { - bindingsToUpdate.put(id,attributes); - } - else - { - if (objectType.equals("Exchange")) - { - defaultExchanges.remove((String)attributes.get("name")); - } - others.add(id); - } - } - } - catch (JsonMappingException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (JsonParseException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); - try - { - for (UUID id : others) - { - stmt.setString(1, id.toString()); - stmt.setString(2, "VirtualHost"); - stmt.setString(3, virtualHostId.toString()); - stmt.execute(); - } - for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) - { - stmt.setString(1, bindingEntry.getKey().toString()); - stmt.setString(2,"Queue"); - stmt.setString(3, bindingEntry.getValue().remove("queue").toString()); - stmt.execute(); - - stmt.setString(1, bindingEntry.getKey().toString()); - stmt.setString(2,"Exchange"); - stmt.setString(3, bindingEntry.getValue().remove("exchange").toString()); - stmt.execute(); - } - } - finally - { - stmt.close(); - } - - for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet()) - { - UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName); - Map<String, Object> exchangeAttributes = new HashMap<String, Object>(); - exchangeAttributes.put("name", defaultExchangeEntry.getKey()); - exchangeAttributes.put("type", defaultExchangeEntry.getValue()); - exchangeAttributes.put("lifetimePolicy", "PERMANENT"); - Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord); - ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents); - insertConfiguredObject(exchangeRecord, connection); - } - - stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS); - try - { - for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) - { - stmt.setString(1, "Binding"); - byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue()); - - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - stmt.setBinaryStream(2, bis, attributesAsBytes.length); - stmt.setString(3, bindingEntry.getKey().toString()); - stmt.execute(); - } - } - catch (JsonMappingException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (JsonGenerationException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - finally - { - stmt.close(); - } - stmt = connection.prepareStatement(UPDATE_DB_VERSION); - try - { - stmt.setInt(1, 8); - stmt.execute(); - } - finally - { - stmt.close(); - } - - if (tableExists) - { - dropConfigVersionTable(connection); - } - - connection.commit(); - } - catch(SQLException e) - { - try - { - connection.rollback(); - } - catch(SQLException re) - { - } - throw e; - } - finally - { - connection.close(); - } + updateDbVersion(7); } private void updateDbVersion(int newVersion) throws SQLException @@ -680,44 +228,36 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } } - protected abstract void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings) throws ClassNotFoundException, SQLException; + protected abstract Logger getLogger(); - abstract protected Logger getLogger(); + protected abstract String getSqlBlobType(); - abstract protected String getSqlBlobType(); - - abstract protected String getSqlVarBinaryType(int size); - - abstract protected String getSqlBigIntType(); - - @Override - public void upgradeStoreStructure() throws StoreException - { - // TODO acquire connection to the database using the attribute of the parents, - // run the upgrader in a transaction, close the connection. - } - - protected void createOrOpenMessageStoreDatabase() throws SQLException - { - Connection conn = newAutoCommitConnection(); + protected abstract String getSqlVarBinaryType(int size); - createVersionTable(conn); - createQueueEntryTable(conn); - createMetaDataTable(conn); - createMessageContentTable(conn); - createXidTable(conn); - createXidActionTable(conn); - conn.close(); - } + protected abstract String getSqlBigIntType(); - protected void createOrOpenConfigurationStoreDatabase() throws SQLException + protected void createOrOpenMessageStoreDatabase() throws StoreException { - Connection conn = newAutoCommitConnection(); - - createConfiguredObjectsTable(conn); - createConfiguredObjectHierarchyTable(conn); + Connection conn = null; + try + { + conn = newAutoCommitConnection(); - conn.close(); + createVersionTable(conn); + createQueueEntryTable(conn); + createMetaDataTable(conn); + createMessageContentTable(conn); + createXidTable(conn); + createXidActionTable(conn); + } + catch (SQLException e) + { + throw new StoreException("Failed to create message store tables", e); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); + } } private void createVersionTable(final Connection conn) throws SQLException @@ -747,56 +287,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } } - private void dropConfigVersionTable(final Connection conn) throws SQLException - { - if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute(DROP_CONFIG_VERSION_TABLE); - } - finally - { - stmt.close(); - } - } - } - - private void createConfiguredObjectsTable(final Connection conn) throws SQLException - { - if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME - + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))"); - } - finally - { - stmt.close(); - } - } - } - - private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException - { - if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME - + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))"); - } - finally - { - stmt.close(); - } - } - } - private void createQueueEntryTable(final Connection conn) throws SQLException { if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) @@ -805,7 +295,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, try { stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id " - + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )"); + + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )"); } finally { @@ -909,81 +399,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, protected boolean tableExists(final String tableName, final Connection conn) throws SQLException { - DatabaseMetaData metaData = conn.getMetaData(); - ResultSet rs = metaData.getTables(null, null, "%", null); - - try - { - - while(rs.next()) - { - final String table = rs.getString(3); - if(tableName.equalsIgnoreCase(table)) - { - return true; - } - } - return false; - } - finally - { - rs.close(); - } - } - - private int getConfigVersion(Connection conn) throws SQLException - { - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION); - try - { - - if(rs.next()) - { - return rs.getInt(1); - } - return DEFAULT_CONFIG_VERSION; - } - finally - { - rs.close(); - } - - } - finally - { - stmt.close(); - } - - } - - public void closeMessageStore() - { - if (_messageStoreOpen.compareAndSet(true, false)) - { - if (!_configurationStoreOpen.get()) - { - doClose(); - } - } + return JdbcUtils.tableExists(tableName, conn); } @Override - public void closeConfigurationStore() - { - if (_configurationStoreOpen.compareAndSet(true, false)) - { - if (!_messageStoreOpen.get()) - { - doClose(); - } - } - } - - protected abstract void doClose(); - public StoredMessage addMessage(StorableMessageMetaData metaData) { checkMessageStoreOpen(); @@ -1058,30 +477,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } - - @Override - public void create(ConfiguredObjectRecord object) throws StoreException - { - checkConfigurationStoreOpen(); - try - { - Connection conn = newConnection(); - try - { - insertConfiguredObject(object, conn); - conn.commit(); - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new StoreException("Error creating ConfiguredObject " + object); - } - } - /** * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED * isolation and with auto-commit transactions enabled. @@ -1137,6 +532,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, protected abstract Connection getConnection() throws SQLException; + @Override public Transaction newTransaction() { checkMessageStoreOpen(); @@ -1154,13 +550,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, if (getLogger().isDebugEnabled()) { getLogger().debug("Enqueuing message " - + messageId - + " on queue " - + queue.getName() - + " with id " + queue.getId() - + " [Connection" - + conn - + "]"); + + messageId + + " on queue " + + queue.getName() + + " with id " + queue.getId() + + " [Connection" + + conn + + "]"); } PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); @@ -1180,7 +576,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, { getLogger().error("Failed to enqueue: " + e.getMessage(), e); throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId() - + " to database", e); + + " to database", e); } } @@ -1205,13 +601,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, if(results != 1) { throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName() - + " with id " + queue.getId()); + + " with id " + queue.getId()); } if (getLogger().isDebugEnabled()) { getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName() - + " with id " + queue.getId()); + + " with id " + queue.getId()); } } finally @@ -1224,7 +620,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, { getLogger().error("Failed to dequeue: " + e.getMessage(), e); throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName() - + " with id " + queue.getId() + " from database", e); + + " with id " + queue.getId() + " from database", e); } } @@ -1420,7 +816,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData) - throws SQLException + throws SQLException { if(getLogger().isDebugEnabled()) { @@ -1606,12 +1002,12 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } catch (SQLException e) { - closeConnection(conn); + JdbcUtils.closeConnection(conn, getLogger()); throw new StoreException("Error adding content for message " + messageId + ": " + e.getMessage(), e); } finally { - closePreparedStatement(stmt); + JdbcUtils.closePreparedStatement(stmt, getLogger()); } } @@ -1640,7 +1036,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, if (offset > size) { throw new StoreException("Offset " + offset + " is greater than message size " + size - + " for message id " + messageId + "!"); + + " for message id " + messageId + "!"); } @@ -1662,13 +1058,14 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closePreparedStatement(stmt); - closeConnection(conn); + JdbcUtils.closePreparedStatement(stmt, getLogger()); + JdbcUtils.closeConnection(conn, getLogger()); } } + @Override public boolean isPersistent() { return true; @@ -1904,7 +1301,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closeConnection(conn); + JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger()); } return StoreFuture.IMMEDIATE_FUTURE; } @@ -1927,7 +1324,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, { storeMetaData(conn, _messageId, _metaData); AbstractJDBCMessageStore.this.addContent(conn, _messageId, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); } finally { @@ -1948,310 +1345,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } } - protected void closeConnection(final Connection conn) - { - if(conn != null) - { - try - { - conn.close(); - } - catch (SQLException e) - { - getLogger().error("Problem closing connection", e); - } - } - } - - protected void closePreparedStatement(final PreparedStatement stmt) - { - if (stmt != null) - { - try - { - stmt.close(); - } - catch(SQLException e) - { - getLogger().error("Problem closing prepared statement", e); - } - } - } - + @Override public void addEventListener(EventListener eventListener, Event... events) { _eventManager.addEventListener(eventListener, events); } - private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException - { - try - { - PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); - try - { - stmt.setString(1, configuredObject.getId().toString()); - ResultSet rs = stmt.executeQuery(); - boolean exists; - try - { - exists = rs.next(); - - } - finally - { - rs.close(); - } - // If we don't have any data in the result set then we can add this configured object - if (!exists) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); - try - { - insertStmt.setString(1, configuredObject.getId().toString()); - insertStmt.setString(2, configuredObject.getType()); - if(configuredObject.getAttributes() == null) - { - insertStmt.setNull(3, Types.BLOB); - } - else - { - final Map<String, Object> attributes = configuredObject.getAttributes(); - final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(_module); - byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); - - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); - } - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - - writeHierarchy(configuredObject, conn); - } - - } - finally - { - stmt.close(); - } - - } - catch (JsonMappingException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - catch (JsonGenerationException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - catch (SQLException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - } - @Override - public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException - { - checkConfigurationStoreOpen(); - - Collection<UUID> removed = new ArrayList<UUID>(objects.length); - try - { - - Connection conn = newAutoCommitConnection(); - try - { - for(ConfiguredObjectRecord record : objects) - { - if(removeConfiguredObject(record.getId(), conn) != 0) - { - removed.add(record.getId()); - } - } - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e); - } - return removed.toArray(new UUID[removed.size()]); - } - - private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException - { - final int results; - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); - try - { - stmt.setString(1, id.toString()); - results = stmt.executeUpdate(); - } - finally - { - stmt.close(); - } - stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY); - try - { - stmt.setString(1, id.toString()); - stmt.executeUpdate(); - } - finally - { - stmt.close(); - } - - return results; - } - - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException - { - checkConfigurationStoreOpen(); - try - { - Connection conn = newConnection(); - try - { - for(ConfiguredObjectRecord record : records) - { - updateConfiguredObject(record, createIfNecessary, conn); - } - conn.commit(); - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e); - } - } - - private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, - boolean createIfNecessary, - Connection conn) - throws SQLException, StoreException - { - PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); - try - { - stmt.setString(1, configuredObject.getId().toString()); - ResultSet rs = stmt.executeQuery(); - try - { - final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(_module); - if (rs.next()) - { - PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS); - try - { - stmt2.setString(1, configuredObject.getType()); - if (configuredObject.getAttributes() != null) - { - byte[] attributesAsBytes = objectMapper.writeValueAsBytes( - configuredObject.getAttributes()); - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - stmt2.setBinaryStream(2, bis, attributesAsBytes.length); - } - else - { - stmt2.setNull(2, Types.BLOB); - } - stmt2.setString(3, configuredObject.getId().toString()); - stmt2.execute(); - } - finally - { - stmt2.close(); - } - } - else if(createIfNecessary) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); - try - { - insertStmt.setString(1, configuredObject.getId().toString()); - insertStmt.setString(2, configuredObject.getType()); - if(configuredObject.getAttributes() == null) - { - insertStmt.setNull(3, Types.BLOB); - } - else - { - final Map<String, Object> attributes = configuredObject.getAttributes(); - byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); - } - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - writeHierarchy(configuredObject, conn); - } - } - finally - { - rs.close(); - } - } - catch (JsonMappingException e) - { - throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); - } - catch (JsonGenerationException e) - { - throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); - } - finally - { - stmt.close(); - } - } - - private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); - try - { - for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet()) - { - insertStmt.setString(1, configuredObject.getId().toString()); - insertStmt.setString(2, parentEntry.getKey()); - insertStmt.setString(3, parentEntry.getValue().getId().toString()); - - insertStmt.execute(); - } - } - finally - { - insertStmt.close(); - } - } - public void visitMessages(MessageHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2298,10 +1398,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closeConnection(conn); + JdbcUtils.closeConnection(conn, getLogger()); } } + @Override public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2342,10 +1443,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closeConnection(conn); + JdbcUtils.closeConnection(conn, getLogger()); } } + @Override public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2422,8 +1524,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - enqueues.toArray(new RecordImpl[enqueues.size()]), - dequeues.toArray(new RecordImpl[dequeues.size()]))) + enqueues.toArray(new RecordImpl[enqueues.size()]), + dequeues.toArray(new RecordImpl[dequeues.size()]))) { break; } @@ -2437,13 +1539,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closeConnection(conn); + JdbcUtils.closeConnection(conn, getLogger()); } } - - protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; - protected abstract void storedSizeChange(int storeSizeIncrease); @Override @@ -2456,7 +1555,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, try { List<String> tables = new ArrayList<String>(); - tables.addAll(CONFIGURATION_STORE_TABLE_NAMES); tables.addAll(MESSAGE_STORE_TABLE_NAMES); for (String tableName : tables) @@ -2488,57 +1586,4 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } - private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord - { - - private final UUID _id; - private final String _type; - private final Map<String, Object> _attributes; - private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>(); - - private ConfiguredObjectRecordImpl(final UUID id, - final String type, - final Map<String, Object> attributes) - { - _id = id; - _type = type; - _attributes = Collections.unmodifiableMap(attributes); - } - - @Override - public UUID getId() - { - return _id; - } - - @Override - public String getType() - { - return _type; - } - - private void addParent(String parentType, ConfiguredObjectRecord parent) - { - _parents.put(parentType, parent); - } - - @Override - public Map<String, Object> getAttributes() - { - return _attributes; - } - - @Override - public Map<String, ConfiguredObjectRecord> getParents() - { - return Collections.unmodifiableMap(_parents); - } - - @Override - public String toString() - { - return "ConfiguredObjectRecordImpl [_id=" + _id + ", _type=" + _type + ", _attributes=" + _attributes + ", _parents=" - + _parents + "]"; - } - } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java new file mode 100644 index 0000000000..a26e478a50 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.store; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + + +import org.apache.log4j.Logger; + +public class JdbcUtils +{ + public static void closeConnection(final Connection conn, final Logger logger) + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e) + { + logger.error("Problem closing connection", e); + } + } + } + + public static void closePreparedStatement(final PreparedStatement stmt, final Logger logger) + { + if (stmt != null) + { + try + { + stmt.close(); + } + catch(SQLException e) + { + logger.error("Problem closing prepared statement", e); + } + } + } + + public static boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + DatabaseMetaData metaData = conn.getMetaData(); + ResultSet rs = metaData.getTables(null, null, "%", null); + + try + { + + while(rs.next()) + { + final String table = rs.getString(3); + if(tableName.equalsIgnoreCase(table)) + { + return true; + } + } + return false; + } + finally + { + rs.close(); + } + } +} |