summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java396
1 files changed, 396 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
new file mode 100644
index 0000000000..2c4b0e8119
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -0,0 +1,396 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.sql.Blob;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.AbstractJDBCMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.Transaction;
+
+/**
+ * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence
+ * mechanism.
+ *
+ */
+public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStore
+{
+
+ private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class);
+
+
+ public static final String TYPE = "JDBC";
+
+
+ private static class JDBCDetails
+ {
+ private final String _vendor;
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigintType;
+ private boolean _useBytesMethodsForBlob;
+
+ private JDBCDetails(String vendor,
+ String blobType,
+ String varBinaryType,
+ String bigIntType,
+ boolean useBytesMethodsForBlob)
+ {
+ _vendor = vendor;
+ setBlobType(blobType);
+ setVarBinaryType(varBinaryType);
+ setBigintType(bigIntType);
+ setUseBytesMethodsForBlob(useBytesMethodsForBlob);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ JDBCDetails that = (JDBCDetails) o;
+
+ if (!getVendor().equals(that.getVendor()))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return getVendor().hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JDBCDetails{" +
+ "vendor='" + getVendor() + '\'' +
+ ", blobType='" + getBlobType() + '\'' +
+ ", varBinaryType='" + getVarBinaryType() + '\'' +
+ ", bigIntType='" + getBigintType() + '\'' +
+ ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() +
+ '}';
+ }
+
+ public String getVendor()
+ {
+ return _vendor;
+ }
+
+ public String getBlobType()
+ {
+ return _blobType;
+ }
+
+ public void setBlobType(String blobType)
+ {
+ _blobType = blobType;
+ }
+
+ public String getVarBinaryType()
+ {
+ return _varBinaryType;
+ }
+
+ public void setVarBinaryType(String varBinaryType)
+ {
+ _varBinaryType = varBinaryType;
+ }
+
+ public boolean isUseBytesMethodsForBlob()
+ {
+ return _useBytesMethodsForBlob;
+ }
+
+ public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob)
+ {
+ _useBytesMethodsForBlob = useBytesMethodsForBlob;
+ }
+
+ public String getBigintType()
+ {
+ return _bigintType;
+ }
+
+ public void setBigintType(String bigintType)
+ {
+ _bigintType = bigintType;
+ }
+ }
+
+ private static JDBCDetails DERBY_DETAILS =
+ new JDBCDetails("derby",
+ "blob",
+ "varchar(%d) for bit data",
+ "bigint",
+ false);
+
+ private static JDBCDetails POSTGRESQL_DETAILS =
+ new JDBCDetails("postgresql",
+ "bytea",
+ "bytea",
+ "bigint",
+ true);
+
+ private static JDBCDetails MYSQL_DETAILS =
+ new JDBCDetails("mysql",
+ "blob",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails SYBASE_DETAILS =
+ new JDBCDetails("sybase",
+ "image",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails ORACLE_DETAILS =
+ new JDBCDetails("oracle",
+ "blob",
+ "raw(%d)",
+ "number",
+ false);
+
+
+ private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>();
+
+ static
+ {
+
+ addDetails(DERBY_DETAILS);
+ addDetails(POSTGRESQL_DETAILS);
+ addDetails(MYSQL_DETAILS);
+ addDetails(SYBASE_DETAILS);
+ addDetails(ORACLE_DETAILS);
+ }
+
+ private static void addDetails(JDBCDetails details)
+ {
+ VENDOR_DETAILS.put(details.getVendor(), details);
+ }
+
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigIntType;
+ private boolean _useBytesMethodsForBlob;
+
+ private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>();
+
+
+ public JDBCMessageStore()
+ {
+ }
+
+ protected Logger getLogger()
+ {
+ return _logger;
+ }
+
+ protected String getSqlBlobType()
+ {
+ return _blobType;
+ }
+
+ protected String getSqlVarBinaryType(int size)
+ {
+ return String.format(_varBinaryType, size);
+ }
+
+ public String getSqlBigIntType()
+ {
+ return _bigIntType;
+ }
+
+ @Override
+ protected void doClose() throws AMQStoreException
+ {
+ while(!_transactions.isEmpty())
+ {
+ RecordedJDBCTransaction txn = _transactions.get(0);
+ txn.abortTran();
+ }
+ }
+
+ protected void implementationSpecificConfiguration(String name, Configuration storeConfiguration)
+ throws ClassNotFoundException
+ {
+
+ _connectionURL = storeConfiguration.getString("connectionUrl",
+ storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
+
+ JDBCDetails details = null;
+
+ String[] components = _connectionURL.split(":",3);
+ if(components.length >= 2)
+ {
+ String vendor = components[1];
+ details = VENDOR_DETAILS.get(vendor);
+ }
+
+ if(details == null)
+ {
+ getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
+
+ // TODO - is there a better default than derby
+ details = DERBY_DETAILS;
+ }
+
+ _blobType = storeConfiguration.getString("sqlBlobType",details.getBlobType());
+ _varBinaryType = storeConfiguration.getString("sqlVarbinaryType",details.getVarBinaryType());
+ _useBytesMethodsForBlob = storeConfiguration.getBoolean("useBytesForBlob",details.isUseBytesMethodsForBlob());
+ _bigIntType = storeConfiguration.getString("sqlBigIntType", details.getBigintType());
+ }
+
+ protected void storedSizeChange(int contentSize)
+ {
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return "";
+ }
+
+ @Override
+ public String getStoreType()
+ {
+ return TYPE;
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ if(_useBytesMethodsForBlob)
+ {
+ return rs.getBytes(col);
+ }
+ else
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+
+ }
+ }
+
+ @Override
+ protected String getBlobAsString(ResultSet rs, int col) throws SQLException
+ {
+ byte[] bytes;
+ if(_useBytesMethodsForBlob)
+ {
+ bytes = rs.getBytes(col);
+ return new String(bytes,UTF8_CHARSET);
+ }
+ else
+ {
+ Blob blob = rs.getBlob(col);
+ if(blob == null)
+ {
+ return null;
+ }
+ bytes = blob.getBytes(1, (int)blob.length());
+ }
+ return new String(bytes, UTF8_CHARSET);
+
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new RecordedJDBCTransaction();
+ }
+
+ private class RecordedJDBCTransaction extends JDBCTransaction
+ {
+ private RecordedJDBCTransaction()
+ {
+ super();
+ JDBCMessageStore.this._transactions.add(this);
+ }
+
+ @Override
+ public void commitTran() throws AMQStoreException
+ {
+ try
+ {
+ super.commitTran();
+ }
+ finally
+ {
+ JDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public StoreFuture commitTranAsync() throws AMQStoreException
+ {
+ try
+ {
+ return super.commitTranAsync();
+ }
+ finally
+ {
+ JDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public void abortTran() throws AMQStoreException
+ {
+ try
+ {
+ super.abortTran();
+ }
+ finally
+ {
+ JDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+ }
+
+}