diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-10 09:10:51 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-10 09:10:51 +0000 |
commit | eaa8c11396b13c46c59c2030a23cc7763ecee9d7 (patch) | |
tree | 1035b7dd270a843436871ef4f321e956c5d220f3 /qpid/java/broker | |
parent | 934d23d90cb12c820ff71e54f2220991fd72c081 (diff) | |
download | qpid-python-eaa8c11396b13c46c59c2030a23cc7763ecee9d7.tar.gz |
QPID-4983 : [Java Broker] Move store implementations to broker plugins
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1501682 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
15 files changed, 15 insertions, 1525 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml index dbb2ee8aee..3c83715305 100644 --- a/qpid/java/broker/build.xml +++ b/qpid/java/broker/build.xml @@ -25,9 +25,9 @@ <property name="module.genpom" value="true"/> <!-- Add dependencies to the broker pom for the broker-plugins and bdbstore modules --> - <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control broker-plugins/jdbc-provider-bone bdbstore bdbstore/jmx"/> + <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control broker-plugins/jdbc-provider-bone bdbstore bdbstore/jmx broker-plugins/jdbc-store broker-plugins/derby-store"/> <!-- Make them runtime dependencies, make bdbstore modules optional --> - <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx -Sqpid-broker-plugins-jdbc-provider-bone=runtime -Oqpid-broker-plugins-jdbc-provider-bone"/> + <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx -Sqpid-broker-plugins-jdbc-provider-bone=runtime -Oqpid-broker-plugins-jdbc-provider-bone -Sqpid-broker-plugins-jdbc-store=runtime -Sqpid-broker-plugins-derby-store=runtime"/> <import file="../module.xml"/> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java deleted file mode 100644 index ac310d02c9..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ /dev/null @@ -1,466 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store.derby; - - -import java.io.File; -import java.sql.Blob; -import java.sql.CallableStatement; -import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.AbstractJDBCMessageStore; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; -import org.apache.qpid.util.FileUtils; - -/** - * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence - * mechanism. - * - */ -public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore -{ - - private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); - - private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; - - public static final String MEMORY_STORE_LOCATION = ":memory:"; - - private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; - - public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; - - public static final String TYPE = "DERBY"; - - private long _totalStoreSize; - private boolean _limitBusted; - private long _persistentSizeLowThreshold; - private long _persistentSizeHighThreshold; - - protected String _connectionURL; - - private String _storeLocation; - private Class<Driver> _driverClass; - - public DerbyMessageStore() - { - } - - protected Logger getLogger() - { - return _logger; - } - - @Override - protected String getSqlBlobType() - { - return "blob"; - } - - @Override - protected String getSqlVarBinaryType(int size) - { - return "varchar("+size+") for bit data"; - } - - @Override - protected String getSqlBigIntType() - { - return "bigint"; - } - - protected void doClose() throws SQLException - { - try - { - Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true"); - // Shouldn't reach this point - shutdown=true should throw SQLException - conn.close(); - getLogger().error("Unable to shut down the store"); - } - catch (SQLException e) - { - if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE)) - { - //expected and represents a clean shutdown of this database only, do nothing. - } - else - { - getLogger().error("Exception whilst shutting down the store: " + e); - throw e; - } - } - } - - @Override - protected void implementationSpecificConfiguration(String name, - VirtualHost virtualHost) - throws ClassNotFoundException - { - //Update to pick up QPID_WORK and use that as the default location not just derbyDB - - _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); - - String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; - String databasePath = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); - if(databasePath == null) - { - databasePath = defaultPath; - } - - if(!MEMORY_STORE_LOCATION.equals(databasePath)) - { - File environmentPath = new File(databasePath); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - } - - _storeLocation = databasePath; - - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); - Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); - - _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } - - //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created. - _connectionURL = "jdbc:derby" + (databasePath.equals(MEMORY_STORE_LOCATION) ? databasePath: ":" + databasePath+ "/") + name + ";create=true"; - - - - _eventManager.addEventListener(new EventListener() - { - @Override - public void event(Event event) - { - setInitialSize(); - } - }, Event.BEFORE_ACTIVATE); - - } - - private void setInitialSize() - { - Connection conn = null; - try - { - - - try - { - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - } - finally - { - if(conn != null) - { - conn.close(); - - - } - } - } - catch (SQLException e) - { - getLogger().error("Unable to set initial store size", e); - } - } - - protected String getBlobAsString(ResultSet rs, int col) throws SQLException - { - Blob blob = rs.getBlob(col); - if(blob == null) - { - return null; - } - byte[] bytes = blob.getBytes(1, (int)blob.length()); - return new String(bytes, UTF8_CHARSET); - } - - protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException - { - Blob dataAsBlob = rs.getBlob(col); - return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - } - - - protected boolean tableExists(final String tableName, final Connection conn) throws SQLException - { - PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); - try - { - stmt.setString(1, tableName); - ResultSet rs = stmt.executeQuery(); - try - { - return rs.next(); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - - - @Override - public String getStoreLocation() - { - return _storeLocation; - } - - protected synchronized void storedSizeChange(final int delta) - { - if(getPersistentSizeHighThreshold() > 0) - { - synchronized(this) - { - // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every - // time, so we do so only when there's been enough change that it is worth looking again. We do this by - // assuming the total size will change by less than twice the amount of the message data change. - long newSize = _totalStoreSize += 3*delta; - - Connection conn = null; - try - { - - if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) - { - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - if(_totalStoreSize > getPersistentSizeHighThreshold()) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } - } - else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) - { - long oldSize = _totalStoreSize; - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - if(oldSize <= _totalStoreSize) - { - - reduceSizeOnDisk(conn); - - _totalStoreSize = getSizeOnDisk(conn); - } - - if(_totalStoreSize < getPersistentSizeLowThreshold()) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - - } - } - catch (SQLException e) - { - closeConnection(conn); - throw new RuntimeException("Exception while processing store size change", e); - } - } - } - } - - private void reduceSizeOnDisk(Connection conn) - { - CallableStatement cs = null; - PreparedStatement stmt = null; - try - { - String tableQuery = - "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; - stmt = conn.prepareStatement(tableQuery); - ResultSet rs = null; - - List<String> schemas = new ArrayList<String>(); - List<String> tables = new ArrayList<String>(); - - try - { - rs = stmt.executeQuery(); - while(rs.next()) - { - schemas.add(rs.getString(1)); - tables.add(rs.getString(2)); - } - } - finally - { - if(rs != null) - { - rs.close(); - } - } - - - cs = conn.prepareCall - ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); - - for(int i = 0; i < schemas.size(); i++) - { - cs.setString(1, schemas.get(i)); - cs.setString(2, tables.get(i)); - cs.setShort(3, (short) 0); - cs.execute(); - } - } - catch (SQLException e) - { - closeConnection(conn); - throw new RuntimeException("Error reducing on disk size", e); - } - finally - { - closePreparedStatement(stmt); - closePreparedStatement(cs); - } - - } - - private long getSizeOnDisk(Connection conn) - { - PreparedStatement stmt = null; - try - { - String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + - " FROM " + - " SYS.SYSTABLES systabs," + - " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + - " WHERE systabs.tabletype = 'T'"; - - stmt = conn.prepareStatement(sizeQuery); - - ResultSet rs = null; - long size = 0l; - - try - { - rs = stmt.executeQuery(); - while(rs.next()) - { - size = rs.getLong(1); - } - } - finally - { - if(rs != null) - { - rs.close(); - } - } - - return size; - - } - catch (SQLException e) - { - closeConnection(conn); - throw new RuntimeException("Error establishing on disk size", e); - } - finally - { - closePreparedStatement(stmt); - } - - } - - - private long getPersistentSizeLowThreshold() - { - return _persistentSizeLowThreshold; - } - - private long getPersistentSizeHighThreshold() - { - return _persistentSizeHighThreshold; - } - - @Override - public String getStoreType() - { - return TYPE; - } - - @Override - public void onDelete() - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Deleting store " + _storeLocation); - } - - if (MEMORY_STORE_LOCATION.equals(_storeLocation)) - { - return; - } - - if (_storeLocation != null) - { - File location = new File(_storeLocation); - if (location.exists()) - { - if (!FileUtils.delete(location, true)) - { - _logger.error("Cannot delete " + _storeLocation); - } - } - } - } - - protected Connection getConnection() throws SQLException - { - return DriverManager.getConnection(_connectionURL); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java deleted file mode 100644 index 1b111ad65e..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.derby; - -import java.util.Collections; -import java.util.Map; -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.plugin.MessageStoreFactory; -import org.apache.qpid.server.store.MessageStore; - -public class DerbyMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public String getType() - { - return DerbyMessageStore.TYPE; - } - - @Override - public MessageStore createMessageStore() - { - return new DerbyMessageStore(); - } - - @Override - public Map<String, Object> convertStoreConfiguration(Configuration configuration) - { - return Collections.emptyMap(); - } - - - @Override - public void validateAttributes(Map<String, Object> attributes) - { - Object storePath = attributes.get(VirtualHost.STORE_PATH); - if(!(storePath instanceof String)) - { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH - +"' is required and must be of type String."); - - } - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java index c66fa4e869..54978776e7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java @@ -29,3 +29,4 @@ public interface ConnectionProvider void close() throws SQLException; } + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java deleted file mode 100644 index 7945ae3b46..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.jdbc; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -class DefaultConnectionProvider implements ConnectionProvider -{ - private final String _connectionUrl; - - public DefaultConnectionProvider(String connectionUrl) - { - _connectionUrl = connectionUrl; - } - - @Override - public Connection getConnection() throws SQLException - { - return DriverManager.getConnection(_connectionUrl); - } - - @Override - public void close() throws SQLException - { - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java deleted file mode 100644 index 8fc7de12d0..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.jdbc; - -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; - -public class DefaultConnectionProviderFactory implements JDBCConnectionProviderFactory -{ - - @Override - public String getType() - { - return "NONE"; - } - - @Override - public ConnectionProvider getConnectionProvider(String connectionUrl, - VirtualHost virtualHost) - { - return new DefaultConnectionProvider(connectionUrl); - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java deleted file mode 100644 index f8d93536bb..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ /dev/null @@ -1,462 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store.jdbc; - - -import java.sql.Blob; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; -import org.apache.qpid.server.store.AbstractJDBCMessageStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.Transaction; - -/** - * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence - * mechanism. - * - */ -public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStore -{ - - private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); - - - public static final String TYPE = "JDBC"; - public static final String CONNECTION_URL = "connectionURL"; - - protected String _connectionURL; - private ConnectionProvider _connectionProvider; - - - private static class JDBCDetails - { - private final String _vendor; - private String _blobType; - private String _varBinaryType; - private String _bigintType; - private boolean _useBytesMethodsForBlob; - - private JDBCDetails(String vendor, - String blobType, - String varBinaryType, - String bigIntType, - boolean useBytesMethodsForBlob) - { - _vendor = vendor; - setBlobType(blobType); - setVarBinaryType(varBinaryType); - setBigintType(bigIntType); - setUseBytesMethodsForBlob(useBytesMethodsForBlob); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - JDBCDetails that = (JDBCDetails) o; - - if (!getVendor().equals(that.getVendor())) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - return getVendor().hashCode(); - } - - @Override - public String toString() - { - return "JDBCDetails{" + - "vendor='" + getVendor() + '\'' + - ", blobType='" + getBlobType() + '\'' + - ", varBinaryType='" + getVarBinaryType() + '\'' + - ", bigIntType='" + getBigintType() + '\'' + - ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + - '}'; - } - - public String getVendor() - { - return _vendor; - } - - public String getBlobType() - { - return _blobType; - } - - public void setBlobType(String blobType) - { - _blobType = blobType; - } - - public String getVarBinaryType() - { - return _varBinaryType; - } - - public void setVarBinaryType(String varBinaryType) - { - _varBinaryType = varBinaryType; - } - - public boolean isUseBytesMethodsForBlob() - { - return _useBytesMethodsForBlob; - } - - public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) - { - _useBytesMethodsForBlob = useBytesMethodsForBlob; - } - - public String getBigintType() - { - return _bigintType; - } - - public void setBigintType(String bigintType) - { - _bigintType = bigintType; - } - } - - private static JDBCDetails DERBY_DETAILS = - new JDBCDetails("derby", - "blob", - "varchar(%d) for bit data", - "bigint", - false); - - private static JDBCDetails POSTGRESQL_DETAILS = - new JDBCDetails("postgresql", - "bytea", - "bytea", - "bigint", - true); - - private static JDBCDetails MYSQL_DETAILS = - new JDBCDetails("mysql", - "blob", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails SYBASE_DETAILS = - new JDBCDetails("sybase", - "image", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails ORACLE_DETAILS = - new JDBCDetails("oracle", - "blob", - "raw(%d)", - "number", - false); - - - private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>(); - - static - { - - addDetails(DERBY_DETAILS); - addDetails(POSTGRESQL_DETAILS); - addDetails(MYSQL_DETAILS); - addDetails(SYBASE_DETAILS); - addDetails(ORACLE_DETAILS); - } - - private static void addDetails(JDBCDetails details) - { - VENDOR_DETAILS.put(details.getVendor(), details); - } - - private String _blobType; - private String _varBinaryType; - private String _bigIntType; - private boolean _useBytesMethodsForBlob; - - private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>(); - - - public JDBCMessageStore() - { - } - - protected Logger getLogger() - { - return _logger; - } - - protected String getSqlBlobType() - { - return _blobType; - } - - protected String getSqlVarBinaryType(int size) - { - return String.format(_varBinaryType, size); - } - - public String getSqlBigIntType() - { - return _bigIntType; - } - - @Override - protected void doClose() throws AMQStoreException - { - while(!_transactions.isEmpty()) - { - RecordedJDBCTransaction txn = _transactions.get(0); - txn.abortTran(); - } - try - { - _connectionProvider.close(); - } - catch (SQLException e) - { - throw new AMQStoreException("Unable to close connection provider ", e); - } - } - - - protected Connection getConnection() throws SQLException - { - return _connectionProvider.getConnection(); - } - - - protected void implementationSpecificConfiguration(String name, - VirtualHost virtualHost) - throws ClassNotFoundException, SQLException - { - - - String connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null - ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH)) - : String.valueOf(virtualHost.getAttribute(CONNECTION_URL)); - - JDBCDetails details = null; - - String[] components = connectionURL.split(":",3); - if(components.length >= 2) - { - String vendor = components[1]; - details = VENDOR_DETAILS.get(vendor); - } - - if(details == null) - { - getLogger().info("Do not recognize vendor from connection URL: " + connectionURL); - - // TODO - is there a better default than derby - details = DERBY_DETAILS; - } - - - Object poolAttribute = virtualHost.getAttribute("connectionPool"); - String connectionPoolType = poolAttribute == null ? "DEFAULT" : String.valueOf(poolAttribute); - - JDBCConnectionProviderFactory connectionProviderFactory = - JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); - if(connectionProviderFactory == null) - { - _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); - connectionProviderFactory = new DefaultConnectionProviderFactory(); - } - - _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost); - - _blobType = getStringAttribute(virtualHost, "jdbcBlobType",details.getBlobType()); - _varBinaryType = getStringAttribute(virtualHost, "jdbcVarbinaryType",details.getVarBinaryType()); - _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, "jdbcBytesForBlob",details.isUseBytesMethodsForBlob()); - _bigIntType = getStringAttribute(virtualHost, "jdbcBigIntType", details.getBigintType()); - } - - - private String getStringAttribute(VirtualHost virtualHost, String attributeName, String defaultVal) - { - Object attrValue = virtualHost.getAttribute(attributeName); - if(attrValue != null) - { - return attrValue.toString(); - } - return defaultVal; - } - - private boolean getBooleanAttribute(VirtualHost virtualHost, String attributeName, boolean defaultVal) - { - Object attrValue = virtualHost.getAttribute(attributeName); - if(attrValue != null) - { - if(attrValue instanceof Boolean) - { - return ((Boolean) attrValue).booleanValue(); - } - else if(attrValue instanceof String) - { - return Boolean.parseBoolean((String)attrValue); - } - - } - return defaultVal; - } - - - protected void storedSizeChange(int contentSize) - { - } - - @Override - public String getStoreLocation() - { - return ""; - } - - @Override - public String getStoreType() - { - return TYPE; - } - - @Override - protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException - { - if(_useBytesMethodsForBlob) - { - return rs.getBytes(col); - } - else - { - Blob dataAsBlob = rs.getBlob(col); - return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - - } - } - - @Override - protected String getBlobAsString(ResultSet rs, int col) throws SQLException - { - byte[] bytes; - if(_useBytesMethodsForBlob) - { - bytes = rs.getBytes(col); - return new String(bytes,UTF8_CHARSET); - } - else - { - Blob blob = rs.getBlob(col); - if(blob == null) - { - return null; - } - bytes = blob.getBytes(1, (int)blob.length()); - } - return new String(bytes, UTF8_CHARSET); - - } - - @Override - public Transaction newTransaction() - { - return new RecordedJDBCTransaction(); - } - - private class RecordedJDBCTransaction extends JDBCTransaction - { - private RecordedJDBCTransaction() - { - super(); - JDBCMessageStore.this._transactions.add(this); - } - - @Override - public void commitTran() throws AMQStoreException - { - try - { - super.commitTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public StoreFuture commitTranAsync() throws AMQStoreException - { - try - { - return super.commitTranAsync(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public void abortTran() throws AMQStoreException - { - try - { - super.abortTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java deleted file mode 100644 index 82d2275156..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.jdbc; - -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.plugin.MessageStoreFactory; -import org.apache.qpid.server.store.MessageStore; - -public class JDBCMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public String getType() - { - return JDBCMessageStore.TYPE; - } - - @Override - public MessageStore createMessageStore() - { - return new JDBCMessageStore(); - } - - @Override - public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration) - { - Map<String,Object> convertedMap = new HashMap<String,Object>(); - convertedMap.put("jdbcBlobType", storeConfiguration.getString("sqlBlobType")); - convertedMap.put("jdbcVarbinaryType", storeConfiguration.getString("sqlVarbinaryType")); - if(storeConfiguration.containsKey("useBytesForBlob")) - { - convertedMap.put("jdbcUseBytesForBlob", storeConfiguration.getBoolean("useBytesForBlob")); - } - convertedMap.put("jdbcBigIntType", storeConfiguration.getString("sqlBigIntType")); - convertedMap.put("connectionPool", storeConfiguration.getString("pool.type")); - convertedMap.put("minConnectionsPerPartition", storeConfiguration.getInteger("pool.minConnectionsPerPartition", - null)); - convertedMap.put("maxConnectionsPerPartition", storeConfiguration.getInteger("pool.maxConnectionsPerPartition", - null)); - convertedMap.put("partitionCount", storeConfiguration.getInteger("pool.partitionCount", null)); - - return convertedMap; - } - - - @Override - public void validateAttributes(Map<String, Object> attributes) - { - Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL); - if(!(connectionURL instanceof String)) - { - Object storePath = attributes.get(VirtualHost.STORE_PATH); - if(!(storePath instanceof String)) - { - throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL - +"' is required and must be of type String."); - - } - } - } - -} diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory index 0edd44f5a5..02f22eb21a 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -16,6 +16,4 @@ # specific language governing permissions and limitations # under the License. # -org.apache.qpid.server.store.derby.DerbyMessageStoreFactory org.apache.qpid.server.store.MemoryMessageStoreFactory -org.apache.qpid.server.store.jdbc.JDBCMessageStoreFactory diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java index c6473d9520..042abca9c4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java @@ -78,8 +78,7 @@ public class VirtualHostRecovererTest extends TestCase attributes.put(VirtualHost.NAME, getName()); attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - attributes.put(VirtualHost.STORE_PATH, "/path/to/virtualhost/store"); - attributes.put(VirtualHost.STORE_TYPE, "DERBY"); + attributes.put(VirtualHost.STORE_TYPE, "MEMORY"); when(entry.getAttributes()).thenReturn(attributes); VirtualHost host = recoverer.create(null, entry, parent); @@ -99,8 +98,7 @@ public class VirtualHostRecovererTest extends TestCase attributes = new HashMap<String, Object>(); attributes.put(VirtualHost.NAME, getName()); - attributes.put(VirtualHost.STORE_PATH, "/path/to/store"); - attributes.put(VirtualHost.STORE_TYPE, "DERBY"); + attributes.put(VirtualHost.STORE_TYPE, "MEMORY"); mandatoryAttributes = new String[]{VirtualHost.NAME, VirtualHost.STORE_TYPE}; checkMandatoryAttributesAreValidated(mandatoryAttributes, attributes); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 4a6b3f2cad..c6d166bc4c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -42,9 +42,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -54,11 +51,10 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockStoredMessage; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.Transaction.Record; -import org.apache.qpid.server.store.derby.DerbyMessageStore; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; -public class DurableConfigurationStoreTest extends QpidTestCase +public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase { private static final String EXCHANGE_NAME = "exchangeName"; private String _storePath; @@ -371,6 +367,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase private void reopenStore() throws Exception { + onReopenStore(); if (_messageStore != null) { _messageStore.close(); @@ -383,8 +380,10 @@ public class DurableConfigurationStoreTest extends QpidTestCase _messageStore.activate(); } - protected MessageStore createMessageStore() throws Exception - { + protected abstract void onReopenStore(); + + abstract protected MessageStore createMessageStore() throws Exception; + /*{ String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); if (storeClass == null) { @@ -394,9 +393,9 @@ public class DurableConfigurationStoreTest extends QpidTestCase MessageStore messageStore = (MessageStore) Class.forName(storeClass).newInstance(); return messageStore; } - - protected DurableConfigurationStore createConfigStore() throws Exception - { +*/ + abstract protected DurableConfigurationStore createConfigStore() throws Exception; + /*{ String storeClass = System.getProperty(CONFIGURATION_STORE_CLASS_NAME_KEY); if (storeClass == null) { @@ -414,7 +413,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase } return configurationStore; } - +*/ public void testRecordXid() throws Exception { Record enqueueRecord = getTestRecord(1); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreCreatorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreCreatorTest.java deleted file mode 100644 index e74937dd1c..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreCreatorTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.store.derby.DerbyMessageStore; -import org.apache.qpid.test.utils.QpidTestCase; - -public class MessageStoreCreatorTest extends QpidTestCase -{ - private static final String[] STORE_TYPES = {MemoryMessageStore.TYPE, DerbyMessageStore.TYPE}; - - public void testMessageStoreCreator() - { - MessageStoreCreator messageStoreCreator = new MessageStoreCreator(); - for (String type : STORE_TYPES) - { - MessageStore store = messageStoreCreator.createMessageStore(type); - assertNotNull("Store of type " + type + " is not created", store); - } - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java deleted file mode 100644 index 479675dac1..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.derby; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; -import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - -public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase -{ - private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class); - - private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 10; - - /** - * Estimated using an assumption that a physical disk space occupied by a - * message is 3 times bigger then a message size - */ - private static final int OVERFULL_SIZE = (int) (MESSAGE_DATA.length * 3 * NUMBER_OF_MESSAGES_TO_OVERFILL_STORE * 0.8); - - private static final int UNDERFULL_SIZE = (int) (OVERFULL_SIZE * 0.8); - - @Override - protected int getNumberOfMessagesToFillStore() - { - return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; - } - - @Override - protected void applyStoreSpecificConfiguration(VirtualHost vhost) - { - _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); - - when(vhost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); - when(vhost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); - } - - @Override - protected MessageStore createStore() throws Exception - { - return new DerbyMessageStore(); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java deleted file mode 100644 index 859fad629b..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.derby; - -import java.io.File; - -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreTestCase; -import org.apache.qpid.util.FileUtils; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - -public class DerbyMessageStoreTest extends MessageStoreTestCase -{ - private String _storeLocation; - - @Override - public void tearDown() throws Exception - { - try - { - deleteStoreIfExists(); - } - finally - { - super.tearDown(); - } - } - - public void testOnDelete() throws Exception - { - File location = new File(_storeLocation); - assertTrue("Store does not exist at " + _storeLocation, location.exists()); - - getStore().close(); - assertTrue("Store does not exist at " + _storeLocation, location.exists()); - - getStore().onDelete(); - assertFalse("Store exists at " + _storeLocation, location.exists()); - } - - @Override - protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception - { - _storeLocation = TMP_FOLDER + File.separator + getTestName(); - when(virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation); - deleteStoreIfExists(); - } - - private void deleteStoreIfExists() - { - File location = new File(_storeLocation); - if (location.exists()) - { - FileUtils.delete(location, true); - } - } - - @Override - protected MessageStore createMessageStore() - { - return new DerbyMessageStore(); - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java deleted file mode 100644 index a8e0460cea..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.jdbc; - -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashSet; -import java.util.Set; - -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreTestCase; -import org.apache.qpid.server.store.derby.DerbyMessageStore; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - -public class JDBCMessageStoreTest extends MessageStoreTestCase -{ - private String _connectionURL; - - @Override - public void tearDown() throws Exception - { - try - { - shutdownDerby(); - } - finally - { - super.tearDown(); - } - } - - public void testOnDelete() throws Exception - { - String[] expectedTables = JDBCMessageStore.ALL_TABLES; - assertTablesExist(expectedTables, true); - getStore().close(); - assertTablesExist(expectedTables, true); - getStore().onDelete(); - assertTablesExist(expectedTables, false); - } - - @Override - protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception - { - _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; - - when(virtualHost.getAttribute(eq("connectionURL"))).thenReturn(_connectionURL); - } - - @Override - protected MessageStore createMessageStore() - { - return new JDBCMessageStore(); - } - - private void assertTablesExist(String[] expectedTables, boolean exists) throws SQLException - { - Set<String> existingTables = getTableNames(); - for (String tableName : expectedTables) - { - assertEquals("Table " + tableName + (exists ? " is not found" : " actually exist"), exists, - existingTables.contains(tableName)); - } - } - - private Set<String> getTableNames() throws SQLException - { - Set<String> tableNames = new HashSet<String>(); - Connection conn = null; - try - { - conn = openConnection(); - DatabaseMetaData metaData = conn.getMetaData(); - ResultSet tables = metaData.getTables(null, null, null, new String[] { "TABLE" }); - try - { - while (tables.next()) - { - tableNames.add(tables.getString("TABLE_NAME")); - } - } - finally - { - tables.close(); - } - } - finally - { - if (conn != null) - { - conn.close(); - } - } - return tableNames; - } - - private Connection openConnection() throws SQLException - { - return DriverManager.getConnection(_connectionURL); - } - - - private void shutdownDerby() throws SQLException - { - Connection connection = null; - try - { - connection = DriverManager.getConnection("jdbc:derby:memory:/" + getTestName() + ";shutdown=true"); - } - catch(SQLException e) - { - if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE)) - { - //expected and represents a clean shutdown of this database only, do nothing. - } - else - { - throw e; - } - } - finally - { - if (connection != null) - { - connection.close(); - } - } - } -} |