summaryrefslogtreecommitdiff
path: root/java/broker-core/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-18 20:51:43 +0000
committerKeith Wall <kwall@apache.org>2014-06-18 20:51:43 +0000
commit73d704196c485e9709b649ce5ae64497079ec0ba (patch)
treee06f5ab8fca5962f40bd9eb61a0b929294885576 /java/broker-core/src
parent553fb17bb27ad154628c11ba5b479563686dad27 (diff)
downloadqpid-python-73d704196c485e9709b649ce5ae64497079ec0ba.tar.gz
QPID-5800: [Java Broker] Refactor Derby/JDBC message store implementations to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with another persistent store implementation. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1603626 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-core/src')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java1013
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java1127
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java86
3 files changed, 1185 insertions, 1041 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java
new file mode 100644
index 0000000000..fb256abbf5
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java
@@ -0,0 +1,1013 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.Module;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.module.SimpleModule;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
+public abstract class AbstractJDBCConfigurationStore implements MessageStoreProvider, DurableConfigurationStore
+{
+ private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
+
+ private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+ private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY";
+
+ private static final int DEFAULT_CONFIG_VERSION = 0;
+
+ public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME));
+
+ private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
+ private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME;
+
+ private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id, object_type, attributes) VALUES (?,?,?)";
+ private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " set object_type =?, attributes = ? where id = ?";
+ private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
+
+
+ private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ + " ( child_id, parent_type, parent_id) VALUES (?,?,?)";
+
+ private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ + " where child_id = ?";
+ private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME;
+
+ private static final Module _module;
+ static
+ {
+ SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null));
+
+ final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>()
+ {
+ @Override
+ public void serialize(final ConfiguredObject value,
+ final JsonGenerator jgen,
+ final SerializerProvider provider)
+ throws IOException, JsonProcessingException
+ {
+ jgen.writeString(value.getId().toString());
+ }
+ };
+ module.addSerializer(ConfiguredObject.class, serializer);
+
+ _module = module;
+ }
+
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+ {
+ checkConfigurationStoreOpen();
+
+ try
+ {
+ handler.begin();
+ doVisitAllConfiguredObjectRecords(handler);
+ handler.end();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Cannot visit configured object records", e);
+ }
+ }
+
+ private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = getBlobAsString(rs, 3);
+ final ConfiguredObjectRecordImpl configuredObjectRecord =
+ new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
+ objectMapper.readValue(attributes, Map.class));
+ configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
+
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ UUID childId = UUID.fromString(rs.getString(1));
+ String parentType = rs.getString(2);
+ UUID parentId = UUID.fromString(rs.getString(3));
+
+ ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
+ ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
+
+ if(child != null && parent != null)
+ {
+ child.addParent(parentType, parent);
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ for(ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ boolean shouldContinue = handler.handle(record);
+ if (!shouldContinue)
+ {
+ break;
+ }
+ }
+ }
+
+ protected abstract void checkConfigurationStoreOpen();
+
+ protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws StoreException
+ {
+ Connection connection = null;
+ try
+ {
+ connection = newConnection();
+
+ boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection);
+ if(tableExists)
+ {
+ int configVersion = getConfigVersion(connection);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Upgrader read existing config version " + configVersion);
+ }
+
+ switch(configVersion)
+ {
+
+ case 7:
+ upgradeFromV7(parent);
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot upgrade from configuration version : "
+ + configVersion);
+ }
+ }
+ }
+ catch (SQLException se)
+ {
+ throw new StoreException("Failed to upgrade database", se);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(connection, getLogger());
+ }
+
+ }
+
+ private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
+ {
+ @SuppressWarnings("serial")
+ Map<String, String> defaultExchanges = new HashMap<String, String>()
+ {{
+ put("amq.direct", "direct");
+ put("amq.topic", "topic");
+ put("amq.fanout", "fanout");
+ put("amq.match", "headers");
+ }};
+
+ Connection connection = newConnection();
+ try
+ {
+ String virtualHostName = parent.getName();
+ UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName);
+
+ String stringifiedConfigVersion = "0." + DEFAULT_CONFIG_VERSION;
+
+ boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection);
+ if(tableExists)
+ {
+ int configVersion = getConfigVersion(connection);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Upgrader read existing config version " + configVersion);
+ }
+
+ stringifiedConfigVersion = "0." + configVersion;
+ }
+
+ Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+ virtualHostAttributes.put("modelVersion", stringifiedConfigVersion);
+ virtualHostAttributes.put("name", virtualHostName);
+
+ ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes);
+ insertConfiguredObject(virtualHostRecord, connection);
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Upgrader created VirtualHost configuration entry with config version " + stringifiedConfigVersion);
+ }
+
+ Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>();
+ List<UUID> others = new ArrayList<UUID>();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+
+ PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ UUID id = UUID.fromString(rs.getString(1));
+ String objectType = rs.getString(2);
+ if ("VirtualHost".equals(objectType))
+ {
+ continue;
+ }
+ Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class);
+
+ if(objectType.endsWith("Binding"))
+ {
+ bindingsToUpdate.put(id,attributes);
+ }
+ else
+ {
+ if (objectType.equals("Exchange"))
+ {
+ defaultExchanges.remove((String)attributes.get("name"));
+ }
+ others.add(id);
+ }
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ for (UUID id : others)
+ {
+ stmt.setString(1, id.toString());
+ stmt.setString(2, "VirtualHost");
+ stmt.setString(3, virtualHostId.toString());
+ stmt.execute();
+ }
+ for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
+ {
+ stmt.setString(1, bindingEntry.getKey().toString());
+ stmt.setString(2,"Queue");
+ stmt.setString(3, bindingEntry.getValue().remove("queue").toString());
+ stmt.execute();
+
+ stmt.setString(1, bindingEntry.getKey().toString());
+ stmt.setString(2,"Exchange");
+ stmt.setString(3, bindingEntry.getValue().remove("exchange").toString());
+ stmt.execute();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet())
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName);
+ Map<String, Object> exchangeAttributes = new HashMap<String, Object>();
+ exchangeAttributes.put("name", defaultExchangeEntry.getKey());
+ exchangeAttributes.put("type", defaultExchangeEntry.getValue());
+ exchangeAttributes.put("lifetimePolicy", "PERMANENT");
+ Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord);
+ ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents);
+ insertConfiguredObject(exchangeRecord, connection);
+ }
+
+ stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
+ {
+ for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
+ {
+ stmt.setString(1, "Binding");
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue());
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt.setBinaryStream(2, bis, attributesAsBytes.length);
+ stmt.setString(3, bindingEntry.getKey().toString());
+ stmt.execute();
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ if (tableExists)
+ {
+ dropConfigVersionTable(connection);
+ }
+
+ connection.commit();
+ }
+ catch(SQLException e)
+ {
+ try
+ {
+ connection.rollback();
+ }
+ catch(SQLException re)
+ {
+ }
+ throw e;
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ protected abstract Logger getLogger();
+
+ protected abstract String getSqlBlobType();
+
+ protected abstract String getSqlVarBinaryType(int size);
+
+ protected abstract String getSqlBigIntType();
+
+
+ protected void createOrOpenConfigurationStoreDatabase() throws StoreException
+ {
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ createConfiguredObjectsTable(conn);
+ createConfiguredObjectHierarchyTable(conn);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to open configuration tables", e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
+ }
+
+ private void dropConfigVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(DROP_CONFIG_VERSION_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createConfiguredObjectsTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ return JdbcUtils.tableExists(tableName, conn);
+ }
+
+ private int getConfigVersion(Connection conn) throws SQLException
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
+ try
+ {
+
+ if(rs.next())
+ {
+ return rs.getInt(1);
+ }
+ return DEFAULT_CONFIG_VERSION;
+ }
+ finally
+ {
+ rs.close();
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+
+ @Override
+ public void create(ConfiguredObjectRecord object) throws StoreException
+ {
+ checkConfigurationStoreOpen();
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ insertConfiguredObject(object, conn);
+ conn.commit();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error creating ConfiguredObject " + object);
+ }
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions enabled.
+ */
+ protected Connection newAutoCommitConnection() throws SQLException
+ {
+ final Connection connection = newConnection();
+ try
+ {
+ connection.setAutoCommit(true);
+ }
+ catch (SQLException sqlEx)
+ {
+
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+
+ return connection;
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions disabled.
+ */
+ protected Connection newConnection() throws SQLException
+ {
+ final Connection connection = getConnection();
+ try
+ {
+ connection.setAutoCommit(false);
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ }
+ catch (SQLException sqlEx)
+ {
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+ return connection;
+ }
+
+ protected abstract Connection getConnection() throws SQLException;
+
+ private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException
+ {
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ boolean exists;
+ try
+ {
+ exists = rs.next();
+
+ }
+ finally
+ {
+ rs.close();
+ }
+ // If we don't have any data in the result set then we can add this configured object
+ if (!exists)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+
+ writeHierarchy(configuredObject, conn);
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
+ {
+ checkConfigurationStoreOpen();
+
+ Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+ try
+ {
+
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ for(ConfiguredObjectRecord record : objects)
+ {
+ if(removeConfiguredObject(record.getId(), conn) != 0)
+ {
+ removed.add(record.getId());
+ }
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e);
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
+ {
+ final int results;
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt.setString(1, id.toString());
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ stmt.setString(1, id.toString());
+ stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ return results;
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
+ {
+ checkConfigurationStoreOpen();
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ for(ConfiguredObjectRecord record : records)
+ {
+ updateConfiguredObject(record, createIfNecessary, conn);
+ }
+ conn.commit();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ }
+ }
+
+ private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
+ boolean createIfNecessary,
+ Connection conn)
+ throws SQLException, StoreException
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
+ {
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
+ configuredObject.getAttributes());
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+ }
+ else
+ {
+ stmt2.setNull(2, Types.BLOB);
+ }
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
+ }
+ finally
+ {
+ stmt2.close();
+ }
+ }
+ else if(createIfNecessary)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ writeHierarchy(configuredObject, conn);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet())
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, parentEntry.getKey());
+ insertStmt.setString(3, parentEntry.getValue().getId().toString());
+
+ insertStmt.execute();
+ }
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+
+ protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
+
+ @Override
+ public void onDelete()
+ {
+ // TODO should probably check we are closed
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ List<String> tables = new ArrayList<String>();
+ tables.addAll(CONFIGURATION_STORE_TABLE_NAMES);
+
+ for (String tableName : tables)
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("DROP TABLE " + tableName);
+ }
+ catch(SQLException e)
+ {
+ getLogger().warn("Failed to drop table '" + tableName + "' :" + e);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch(SQLException e)
+ {
+ getLogger().error("Exception while deleting store tables", e);
+ }
+ }
+
+ private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord
+ {
+
+ private final UUID _id;
+ private final String _type;
+ private final Map<String, Object> _attributes;
+ private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>();
+
+ private ConfiguredObjectRecordImpl(final UUID id,
+ final String type,
+ final Map<String, Object> attributes)
+ {
+ _id = id;
+ _type = type;
+ _attributes = Collections.unmodifiableMap(attributes);
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public String getType()
+ {
+ return _type;
+ }
+
+ private void addParent(String parentType, ConfiguredObjectRecord parent)
+ {
+ _parents.put(parentType, parent);
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ return _attributes;
+ }
+
+ @Override
+ public Map<String, ConfiguredObjectRecord> getParents()
+ {
+ return Collections.unmodifiableMap(_parents);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ConfiguredObjectRecordImpl [_id=" + _id + ", _type=" + _type + ", _attributes=" + _attributes + ", _parents="
+ + _parents + "]";
+ }
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 24563bae61..7487315000 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -24,54 +24,32 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.Version;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.JsonSerializer;
-import org.codehaus.jackson.map.Module;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializerProvider;
-import org.codehaus.jackson.map.module.SimpleModule;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, DurableConfigurationStore
+public abstract class AbstractJDBCMessageStore implements MessageStore
{
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
- private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
@@ -82,16 +60,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
private static final String XID_TABLE_NAME = "QPID_XIDS";
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
- private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
- private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY";
-
- private static final int DEFAULT_CONFIG_VERSION = 0;
-
- public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME));
public static final Set<String> MESSAGE_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(DB_VERSION_TABLE_NAME,
META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME,
- QUEUE_ENTRY_TABLE_NAME,
- XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME));
+ QUEUE_ENTRY_TABLE_NAME,
+ XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME));
private static final int DB_VERSION = 8;
@@ -103,19 +75,15 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?";
- private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
- private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME;
-
-
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
- + "( message_id, content ) values (?, ?)";
+ + "( message_id, content ) values (?, ?)";
private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
- + " WHERE message_id = ?";
+ + " WHERE message_id = ?";
private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
- + " WHERE message_id = ?";
+ + " WHERE message_id = ?";
private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";
private static final String SELECT_FROM_META_DATA =
@@ -126,282 +94,63 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
private static final String INSERT_INTO_XIDS =
"INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)";
private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
- + " WHERE format = ? and global_id = ? and branch_id = ?";
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
private static final String INSERT_INTO_XID_ACTIONS =
"INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " +
"queue_id, message_id ) values (?,?,?,?,?,?) ";
private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
- + " WHERE format = ? and global_id = ? and branch_id = ?";
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XID_ACTIONS =
"SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
" WHERE format = ? and global_id = ? and branch_id = ?";
- private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
- + " ( id, object_type, attributes) VALUES (?,?,?)";
- private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
- + " set object_type =?, attributes = ? where id = ?";
- private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
- + " where id = ?";
- private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
- + " where id = ?";
- private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
-
-
- private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
- + " ( child_id, parent_type, parent_id) VALUES (?,?,?)";
-
- private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
- + " where child_id = ?";
- private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME;
-
- protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
-
-
- private static final Module _module;
- static
- {
- SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null));
-
- final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>()
- {
- @Override
- public void serialize(final ConfiguredObject value,
- final JsonGenerator jgen,
- final SerializerProvider provider)
- throws IOException, JsonProcessingException
- {
- jgen.writeString(value.getId().toString());
- }
- };
- module.addSerializer(ConfiguredObject.class, serializer);
-
- _module = module;
- }
protected final EventManager _eventManager = new EventManager();
- private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
- private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
+ protected abstract boolean isMessageStoreOpen();
- private boolean _initialized;
+ protected abstract void checkMessageStoreOpen();
- @Override
- public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ protected void setMaximumMessageId()
{
- if (_configurationStoreOpen.compareAndSet(false, true))
+ visitMessages(new MessageHandler()
{
- initialiseIfNecessary(parent.getName(), storeSettings);
- try
- {
- createOrOpenConfigurationStoreDatabase();
- upgradeIfVersionTableExists(parent);
- }
- catch(SQLException e)
- {
- throw new StoreException("Cannot create databases or upgrade", e);
- }
- }
- }
-
- private void initialiseIfNecessary(String virtualHostName, Map<String, Object> storeSettings)
- {
- if (!_initialized)
- {
- try
- {
- implementationSpecificConfiguration(virtualHostName, storeSettings);
- }
- catch (ClassNotFoundException e)
- {
- throw new StoreException("Cannot find driver class", e);
- }
- catch (SQLException e)
- {
- throw new StoreException("Unexpected exception occured", e);
- }
- _initialized = true;
- }
- }
-
- @Override
- public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
- {
- checkConfigurationStoreOpen();
-
- try
- {
- handler.begin();
- doVisitAllConfiguredObjectRecords(handler);
- handler.end();
- }
- catch (SQLException e)
- {
- throw new StoreException("Cannot visit configured object records", e);
- }
-
- }
-
- private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
- final ObjectMapper objectMapper = new ObjectMapper();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
- try
- {
- ResultSet rs = stmt.executeQuery();
- try
- {
- while (rs.next())
- {
- String id = rs.getString(1);
- String objectType = rs.getString(2);
- String attributes = getBlobAsString(rs, 3);
- final ConfiguredObjectRecordImpl configuredObjectRecord =
- new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
- objectMapper.readValue(attributes, Map.class));
- configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
-
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (JsonParseException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
- try
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
{
- ResultSet rs = stmt.executeQuery();
- try
- {
- while (rs.next())
- {
- UUID childId = UUID.fromString(rs.getString(1));
- String parentType = rs.getString(2);
- UUID parentId = UUID.fromString(rs.getString(3));
-
- ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
- ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
-
- if(child != null && parent != null)
- {
- child.addParent(parentType, parent);
- }
- }
- }
- finally
+ long id = storedMessage.getMessageNumber();
+ if (_messageId.get() < id)
{
- rs.close();
+ _messageId.set(id);
}
+ return true;
}
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- conn.close();
- }
-
- for(ConfiguredObjectRecord record : configuredObjects.values())
- {
- boolean shoudlContinue = handler.handle(record);
- if (!shoudlContinue)
- {
- break;
- }
- }
- }
-
- private void checkConfigurationStoreOpen()
- {
- if (!_configurationStoreOpen.get())
- {
- throw new IllegalStateException("Configuration store is not open");
- }
+ });
}
- private void checkMessageStoreOpen()
+ protected void upgrade(ConfiguredObject<?> parent) throws StoreException
{
- if (!_messageStoreOpen.get())
- {
- throw new IllegalStateException("Message store is not open");
- }
- }
-
- private void upgradeIfVersionTableExists(ConfiguredObject<?> parent)
- throws SQLException {
- Connection conn = newAutoCommitConnection();
+ Connection conn = null;
try
{
+ conn = newAutoCommitConnection();
if (tableExists(DB_VERSION_TABLE_NAME, conn))
{
upgradeIfNecessary(parent);
}
}
- finally
+ catch (SQLException e)
{
- if (conn != null)
- {
- conn.close();
- }
+ throw new StoreException("Failed to upgrade database", e);
}
- }
-
- public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
- {
- if (_messageStoreOpen.compareAndSet(false, true))
+ finally
{
- initialiseIfNecessary(parent.getName(), messageStoreSettings);
- try
- {
- createOrOpenMessageStoreDatabase();
- upgradeIfNecessary(parent);
-
- visitMessages(new MessageHandler()
- {
- @Override
- public boolean handle(StoredMessage<?> storedMessage)
- {
- long id = storedMessage.getMessageNumber();
- if (_messageId.get() < id)
- {
- _messageId.set(id);
- }
- return true;
- }
- });
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to activate message store ", e);
- }
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
- protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException
+ private void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -423,7 +172,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
case 6:
upgradeFromV6();
case 7:
- upgradeFromV7(parent);
+ upgradeFromV7();
case DB_VERSION:
return;
default:
@@ -447,214 +196,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
- private void upgradeFromV6() throws SQLException
+ private void upgradeFromV7()
{
- updateDbVersion(7);
}
-
- private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
+ private void upgradeFromV6() throws SQLException
{
- @SuppressWarnings("serial")
- Map<String, String> defaultExchanges = new HashMap<String, String>()
- {{
- put("amq.direct", "direct");
- put("amq.topic", "topic");
- put("amq.fanout", "fanout");
- put("amq.match", "headers");
- }};
-
- Connection connection = newConnection();
- try
- {
- String virtualHostName = parent.getName();
- UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName);
-
- String stringifiedConfigVersion = BrokerModel.MODEL_VERSION;
- boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection);
- if(tableExists)
- {
- int configVersion = getConfigVersion(connection);
- if (getLogger().isDebugEnabled())
- {
- getLogger().debug("Upgrader read existing config version " + configVersion);
- }
-
- stringifiedConfigVersion = "0." + configVersion;
- }
-
- Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
- virtualHostAttributes.put("modelVersion", stringifiedConfigVersion);
- virtualHostAttributes.put("name", virtualHostName);
-
- ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes);
- insertConfiguredObject(virtualHostRecord, connection);
-
- if (getLogger().isDebugEnabled())
- {
- getLogger().debug("Upgrader created VirtualHost configuration entry with config version " + stringifiedConfigVersion);
- }
-
- Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>();
- List<UUID> others = new ArrayList<UUID>();
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
-
- PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
- try
- {
- ResultSet rs = stmt.executeQuery();
- try
- {
- while (rs.next())
- {
- UUID id = UUID.fromString(rs.getString(1));
- String objectType = rs.getString(2);
- if ("VirtualHost".equals(objectType))
- {
- continue;
- }
- Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class);
-
- if(objectType.endsWith("Binding"))
- {
- bindingsToUpdate.put(id,attributes);
- }
- else
- {
- if (objectType.equals("Exchange"))
- {
- defaultExchanges.remove((String)attributes.get("name"));
- }
- others.add(id);
- }
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (JsonParseException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
- try
- {
- for (UUID id : others)
- {
- stmt.setString(1, id.toString());
- stmt.setString(2, "VirtualHost");
- stmt.setString(3, virtualHostId.toString());
- stmt.execute();
- }
- for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
- {
- stmt.setString(1, bindingEntry.getKey().toString());
- stmt.setString(2,"Queue");
- stmt.setString(3, bindingEntry.getValue().remove("queue").toString());
- stmt.execute();
-
- stmt.setString(1, bindingEntry.getKey().toString());
- stmt.setString(2,"Exchange");
- stmt.setString(3, bindingEntry.getValue().remove("exchange").toString());
- stmt.execute();
- }
- }
- finally
- {
- stmt.close();
- }
-
- for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet())
- {
- UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName);
- Map<String, Object> exchangeAttributes = new HashMap<String, Object>();
- exchangeAttributes.put("name", defaultExchangeEntry.getKey());
- exchangeAttributes.put("type", defaultExchangeEntry.getValue());
- exchangeAttributes.put("lifetimePolicy", "PERMANENT");
- Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord);
- ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents);
- insertConfiguredObject(exchangeRecord, connection);
- }
-
- stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
- {
- for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
- {
- stmt.setString(1, "Binding");
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue());
-
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt.setBinaryStream(2, bis, attributesAsBytes.length);
- stmt.setString(3, bindingEntry.getKey().toString());
- stmt.execute();
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- finally
- {
- stmt.close();
- }
- stmt = connection.prepareStatement(UPDATE_DB_VERSION);
- try
- {
- stmt.setInt(1, 8);
- stmt.execute();
- }
- finally
- {
- stmt.close();
- }
-
- if (tableExists)
- {
- dropConfigVersionTable(connection);
- }
-
- connection.commit();
- }
- catch(SQLException e)
- {
- try
- {
- connection.rollback();
- }
- catch(SQLException re)
- {
- }
- throw e;
- }
- finally
- {
- connection.close();
- }
+ updateDbVersion(7);
}
private void updateDbVersion(int newVersion) throws SQLException
@@ -680,44 +228,36 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
}
- protected abstract void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings) throws ClassNotFoundException, SQLException;
+ protected abstract Logger getLogger();
- abstract protected Logger getLogger();
+ protected abstract String getSqlBlobType();
- abstract protected String getSqlBlobType();
-
- abstract protected String getSqlVarBinaryType(int size);
-
- abstract protected String getSqlBigIntType();
-
- @Override
- public void upgradeStoreStructure() throws StoreException
- {
- // TODO acquire connection to the database using the attribute of the parents,
- // run the upgrader in a transaction, close the connection.
- }
-
- protected void createOrOpenMessageStoreDatabase() throws SQLException
- {
- Connection conn = newAutoCommitConnection();
+ protected abstract String getSqlVarBinaryType(int size);
- createVersionTable(conn);
- createQueueEntryTable(conn);
- createMetaDataTable(conn);
- createMessageContentTable(conn);
- createXidTable(conn);
- createXidActionTable(conn);
- conn.close();
- }
+ protected abstract String getSqlBigIntType();
- protected void createOrOpenConfigurationStoreDatabase() throws SQLException
+ protected void createOrOpenMessageStoreDatabase() throws StoreException
{
- Connection conn = newAutoCommitConnection();
-
- createConfiguredObjectsTable(conn);
- createConfiguredObjectHierarchyTable(conn);
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
- conn.close();
+ createVersionTable(conn);
+ createQueueEntryTable(conn);
+ createMetaDataTable(conn);
+ createMessageContentTable(conn);
+ createXidTable(conn);
+ createXidActionTable(conn);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Failed to create message store tables", e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
}
private void createVersionTable(final Connection conn) throws SQLException
@@ -747,56 +287,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
}
- private void dropConfigVersionTable(final Connection conn) throws SQLException
- {
- if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(DROP_CONFIG_VERSION_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
- private void createConfiguredObjectsTable(final Connection conn) throws SQLException
- {
- if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
- + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))");
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
- private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException
- {
- if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
- + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))");
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -805,7 +295,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
try
{
stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id "
- + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )");
+ + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )");
}
finally
{
@@ -909,81 +399,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
- DatabaseMetaData metaData = conn.getMetaData();
- ResultSet rs = metaData.getTables(null, null, "%", null);
-
- try
- {
-
- while(rs.next())
- {
- final String table = rs.getString(3);
- if(tableName.equalsIgnoreCase(table))
- {
- return true;
- }
- }
- return false;
- }
- finally
- {
- rs.close();
- }
- }
-
- private int getConfigVersion(Connection conn) throws SQLException
- {
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
- try
- {
-
- if(rs.next())
- {
- return rs.getInt(1);
- }
- return DEFAULT_CONFIG_VERSION;
- }
- finally
- {
- rs.close();
- }
-
- }
- finally
- {
- stmt.close();
- }
-
- }
-
- public void closeMessageStore()
- {
- if (_messageStoreOpen.compareAndSet(true, false))
- {
- if (!_configurationStoreOpen.get())
- {
- doClose();
- }
- }
+ return JdbcUtils.tableExists(tableName, conn);
}
@Override
- public void closeConfigurationStore()
- {
- if (_configurationStoreOpen.compareAndSet(true, false))
- {
- if (!_messageStoreOpen.get())
- {
- doClose();
- }
- }
- }
-
- protected abstract void doClose();
-
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
checkMessageStoreOpen();
@@ -1058,30 +477,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
-
- @Override
- public void create(ConfiguredObjectRecord object) throws StoreException
- {
- checkConfigurationStoreOpen();
- try
- {
- Connection conn = newConnection();
- try
- {
- insertConfiguredObject(object, conn);
- conn.commit();
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new StoreException("Error creating ConfiguredObject " + object);
- }
- }
-
/**
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
* isolation and with auto-commit transactions enabled.
@@ -1137,6 +532,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
protected abstract Connection getConnection() throws SQLException;
+ @Override
public Transaction newTransaction()
{
checkMessageStoreOpen();
@@ -1154,13 +550,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
if (getLogger().isDebugEnabled())
{
getLogger().debug("Enqueuing message "
- + messageId
- + " on queue "
- + queue.getName()
- + " with id " + queue.getId()
- + " [Connection"
- + conn
- + "]");
+ + messageId
+ + " on queue "
+ + queue.getName()
+ + " with id " + queue.getId()
+ + " [Connection"
+ + conn
+ + "]");
}
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
@@ -1180,7 +576,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
{
getLogger().error("Failed to enqueue: " + e.getMessage(), e);
throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
- + " to database", e);
+ + " to database", e);
}
}
@@ -1205,13 +601,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
if(results != 1)
{
throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
- + " with id " + queue.getId());
+ + " with id " + queue.getId());
}
if (getLogger().isDebugEnabled())
{
getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
- + " with id " + queue.getId());
+ + " with id " + queue.getId());
}
}
finally
@@ -1224,7 +620,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
{
getLogger().error("Failed to dequeue: " + e.getMessage(), e);
throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
- + " with id " + queue.getId() + " from database", e);
+ + " with id " + queue.getId() + " from database", e);
}
}
@@ -1420,7 +816,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
- throws SQLException
+ throws SQLException
{
if(getLogger().isDebugEnabled())
{
@@ -1606,12 +1002,12 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
catch (SQLException e)
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, getLogger());
throw new StoreException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
}
finally
{
- closePreparedStatement(stmt);
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
}
}
@@ -1640,7 +1036,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
if (offset > size)
{
throw new StoreException("Offset " + offset + " is greater than message size " + size
- + " for message id " + messageId + "!");
+ + " for message id " + messageId + "!");
}
@@ -1662,13 +1058,14 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closePreparedStatement(stmt);
- closeConnection(conn);
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
+ @Override
public boolean isPersistent()
{
return true;
@@ -1904,7 +1301,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger());
}
return StoreFuture.IMMEDIATE_FUTURE;
}
@@ -1927,7 +1324,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
{
storeMetaData(conn, _messageId, _metaData);
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
finally
{
@@ -1948,310 +1345,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
}
- protected void closeConnection(final Connection conn)
- {
- if(conn != null)
- {
- try
- {
- conn.close();
- }
- catch (SQLException e)
- {
- getLogger().error("Problem closing connection", e);
- }
- }
- }
-
- protected void closePreparedStatement(final PreparedStatement stmt)
- {
- if (stmt != null)
- {
- try
- {
- stmt.close();
- }
- catch(SQLException e)
- {
- getLogger().error("Problem closing prepared statement", e);
- }
- }
- }
-
+ @Override
public void addEventListener(EventListener eventListener, Event... events)
{
_eventManager.addEventListener(eventListener, events);
}
- private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException
- {
- try
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
- {
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- boolean exists;
- try
- {
- exists = rs.next();
-
- }
- finally
- {
- rs.close();
- }
- // If we don't have any data in the result set then we can add this configured object
- if (!exists)
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
- {
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
-
- writeHierarchy(configuredObject, conn);
- }
-
- }
- finally
- {
- stmt.close();
- }
-
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (SQLException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- }
-
@Override
- public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
- {
- checkConfigurationStoreOpen();
-
- Collection<UUID> removed = new ArrayList<UUID>(objects.length);
- try
- {
-
- Connection conn = newAutoCommitConnection();
- try
- {
- for(ConfiguredObjectRecord record : objects)
- {
- if(removeConfiguredObject(record.getId(), conn) != 0)
- {
- removed.add(record.getId());
- }
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e);
- }
- return removed.toArray(new UUID[removed.size()]);
- }
-
- private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
- {
- final int results;
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
- try
- {
- stmt.setString(1, id.toString());
- results = stmt.executeUpdate();
- }
- finally
- {
- stmt.close();
- }
- stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY);
- try
- {
- stmt.setString(1, id.toString());
- stmt.executeUpdate();
- }
- finally
- {
- stmt.close();
- }
-
- return results;
- }
-
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
- {
- checkConfigurationStoreOpen();
- try
- {
- Connection conn = newConnection();
- try
- {
- for(ConfiguredObjectRecord record : records)
- {
- updateConfiguredObject(record, createIfNecessary, conn);
- }
- conn.commit();
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
- }
- }
-
- private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
- boolean createIfNecessary,
- Connection conn)
- throws SQLException, StoreException
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
- {
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- if (rs.next())
- {
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
- {
- stmt2.setString(1, configuredObject.getType());
- if (configuredObject.getAttributes() != null)
- {
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
- configuredObject.getAttributes());
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
- }
- else
- {
- stmt2.setNull(2, Types.BLOB);
- }
- stmt2.setString(3, configuredObject.getId().toString());
- stmt2.execute();
- }
- finally
- {
- stmt2.close();
- }
- }
- else if(createIfNecessary)
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
- {
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- writeHierarchy(configuredObject, conn);
- }
- }
- finally
- {
- rs.close();
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- finally
- {
- stmt.close();
- }
- }
-
- private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
- try
- {
- for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet())
- {
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, parentEntry.getKey());
- insertStmt.setString(3, parentEntry.getValue().getId().toString());
-
- insertStmt.execute();
- }
- }
- finally
- {
- insertStmt.close();
- }
- }
-
public void visitMessages(MessageHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2298,10 +1398,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
+ @Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2342,10 +1443,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
+ @Override
public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2422,8 +1524,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- enqueues.toArray(new RecordImpl[enqueues.size()]),
- dequeues.toArray(new RecordImpl[dequeues.size()])))
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()])))
{
break;
}
@@ -2437,13 +1539,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
-
- protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
-
protected abstract void storedSizeChange(int storeSizeIncrease);
@Override
@@ -2456,7 +1555,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
try
{
List<String> tables = new ArrayList<String>();
- tables.addAll(CONFIGURATION_STORE_TABLE_NAMES);
tables.addAll(MESSAGE_STORE_TABLE_NAMES);
for (String tableName : tables)
@@ -2488,57 +1586,4 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
- private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord
- {
-
- private final UUID _id;
- private final String _type;
- private final Map<String, Object> _attributes;
- private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>();
-
- private ConfiguredObjectRecordImpl(final UUID id,
- final String type,
- final Map<String, Object> attributes)
- {
- _id = id;
- _type = type;
- _attributes = Collections.unmodifiableMap(attributes);
- }
-
- @Override
- public UUID getId()
- {
- return _id;
- }
-
- @Override
- public String getType()
- {
- return _type;
- }
-
- private void addParent(String parentType, ConfiguredObjectRecord parent)
- {
- _parents.put(parentType, parent);
- }
-
- @Override
- public Map<String, Object> getAttributes()
- {
- return _attributes;
- }
-
- @Override
- public Map<String, ConfiguredObjectRecord> getParents()
- {
- return Collections.unmodifiableMap(_parents);
- }
-
- @Override
- public String toString()
- {
- return "ConfiguredObjectRecordImpl [_id=" + _id + ", _type=" + _type + ", _attributes=" + _attributes + ", _parents="
- + _parents + "]";
- }
- }
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java
new file mode 100644
index 0000000000..a26e478a50
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+
+import org.apache.log4j.Logger;
+
+public class JdbcUtils
+{
+ public static void closeConnection(final Connection conn, final Logger logger)
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ logger.error("Problem closing connection", e);
+ }
+ }
+ }
+
+ public static void closePreparedStatement(final PreparedStatement stmt, final Logger logger)
+ {
+ if (stmt != null)
+ {
+ try
+ {
+ stmt.close();
+ }
+ catch(SQLException e)
+ {
+ logger.error("Problem closing prepared statement", e);
+ }
+ }
+ }
+
+ public static boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ DatabaseMetaData metaData = conn.getMetaData();
+ ResultSet rs = metaData.getTables(null, null, "%", null);
+
+ try
+ {
+
+ while(rs.next())
+ {
+ final String table = rs.getString(3);
+ if(tableName.equalsIgnoreCase(table))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+}