summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java467
1 files changed, 219 insertions, 248 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index ad3e685004..28ac79075e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -21,11 +21,7 @@
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -46,6 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
@@ -53,7 +50,6 @@ import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.transport.ConnectionOpen;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
@@ -180,24 +176,19 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected final EventManager _eventManager = new EventManager();
- protected final StateManager _messageStoreStateManager;
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
- private StateManager _configurationStoreStateManager;
private boolean _initialized;
- public AbstractJDBCMessageStore()
- {
- _messageStoreStateManager = new StateManager(_eventManager);
- _configurationStoreStateManager = new StateManager(new EventManager());
- }
-
@Override
public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- _configurationStoreStateManager.attainState(State.INITIALISING);
- initialiseIfNecessary(virtualHostName, storeSettings);
- _configurationStoreStateManager.attainState(State.INITIALISED);
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ initialiseIfNecessary(virtualHostName, storeSettings);
+ }
}
private void initialiseIfNecessary(String virtualHostName, Map<String, Object> storeSettings)
@@ -216,14 +207,15 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
throw new StoreException("Unexpected exception occured", e);
}
- _initialized =true;
+ _initialized = true;
}
}
@Override
public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
{
- _configurationStoreStateManager.attainState(State.ACTIVATING);
+ checkConfigurationStoreOpen();
+
try
{
createOrOpenConfigurationStoreDatabase();
@@ -236,7 +228,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
- _configurationStoreStateManager.attainState(State.ACTIVE);
+ }
+
+ private void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ private void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
}
private void upgradeIfVersionTableExists(ConfiguredObject<?> parent)
@@ -261,15 +268,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
{
- _messageStoreStateManager.attainState(State.INITIALISING);
- initialiseIfNecessary(virtualHostName, messageStoreSettings);
- _messageStoreStateManager.attainState(State.INITIALISED);
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
+ initialiseIfNecessary(virtualHostName, messageStoreSettings);
+ }
}
@Override
public void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
- _messageStoreStateManager.attainState(State.ACTIVATING);
+ checkMessageStoreOpen();
try
{
createOrOpenMessageStoreDatabase();
@@ -305,8 +313,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
-
- _messageStoreStateManager.attainState(State.ACTIVE);
}
protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException
@@ -634,8 +640,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
-
-
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -700,8 +704,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
-
-
private void createXidTable(final Connection conn) throws SQLException
{
if(!tableExists(XID_TABLE_NAME, conn))
@@ -836,35 +838,34 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void closeMessageStore()
{
- _messageStoreStateManager.attainState(State.CLOSING);
-
- if (_configurationStoreStateManager.isInState(State.CLOSED) || _configurationStoreStateManager.isInState(State.INITIAL))
+ if (_messageStoreOpen.compareAndSet(true, false))
{
- doClose();
+ if (!_configurationStoreOpen.get())
+ {
+ doClose();
+ }
}
-
- _messageStoreStateManager.attainState(State.CLOSED);
}
@Override
public void closeConfigurationStore()
{
- _configurationStoreStateManager.attainState(State.CLOSING);
-
- if (_messageStoreStateManager.isInState(State.CLOSED) || _messageStoreStateManager.isInState(State.INITIAL))
+ if (_configurationStoreOpen.compareAndSet(true, false))
{
- doClose();
+ if (!_messageStoreOpen.get())
+ {
+ doClose();
+ }
}
-
- _configurationStoreStateManager.attainState(State.CLOSED);
}
-
protected abstract void doClose();
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
+ checkMessageStoreOpen();
+
if(metaData.isPersistent())
{
return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData);
@@ -875,12 +876,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- public StoredMessage getMessage(long messageNumber)
- {
- return null;
- }
-
- public void removeMessage(long messageId)
+ private void removeMessage(long messageId)
{
try
{
@@ -944,26 +940,24 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void create(ConfiguredObjectRecord object) throws StoreException
{
- if (_configurationStoreStateManager.isInState(State.ACTIVE))
+ checkConfigurationStoreOpen();
+ try
{
+ Connection conn = newConnection();
try
{
- Connection conn = newConnection();
- try
- {
- insertConfiguredObject(object, conn);
- conn.commit();
- }
- finally
- {
- conn.close();
- }
+ insertConfiguredObject(object, conn);
+ conn.commit();
}
- catch (SQLException e)
+ finally
{
- throw new StoreException("Error creating ConfiguredObject " + object);
+ conn.close();
}
}
+ catch (SQLException e)
+ {
+ throw new StoreException("Error creating ConfiguredObject " + object);
+ }
}
/**
@@ -1021,46 +1015,15 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract Connection getConnection() throws SQLException;
- private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws StoreException
- {
- byte[] argumentBytes;
- if(arguments == null)
- {
- argumentBytes = new byte[0];
- }
- else
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
-
- try
- {
- dos.writeInt(arguments.size());
- for(Map.Entry<String,String> arg : arguments.entrySet())
- {
- dos.writeUTF(arg.getKey());
- dos.writeUTF(arg.getValue());
- }
- }
- catch (IOException e)
- {
- // This should never happen
- throw new StoreException(e.getMessage(), e);
- }
- argumentBytes = bos.toByteArray();
- }
- return argumentBytes;
- }
-
@Override
public Transaction newTransaction()
{
+ checkMessageStoreOpen();
+
return new JDBCTransaction();
}
- public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws
- StoreException
+ private void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
{
Connection conn = connWrapper.getConnection();
@@ -1103,8 +1066,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws
- StoreException
+ private void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
{
Connection conn = connWrapper.getConnection();
@@ -1284,7 +1246,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- public void commitTran(ConnectionWrapper connWrapper) throws StoreException
+ private void commitTran(ConnectionWrapper connWrapper) throws StoreException
{
try
@@ -1309,13 +1271,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+ private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
{
commitTran(connWrapper);
return StoreFuture.IMMEDIATE_FUTURE;
}
- public void abortTran(ConnectionWrapper connWrapper) throws StoreException
+ private void abortTran(ConnectionWrapper connWrapper) throws StoreException
{
if (connWrapper == null)
{
@@ -1340,11 +1302,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- public Long getNewMessageId()
- {
- return _messageId.incrementAndGet();
- }
-
private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
throws SQLException
{
@@ -1398,7 +1355,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- protected void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+ private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1455,7 +1412,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- protected TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1585,7 +1542,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
+ private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1672,7 +1629,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- StorableMessageMetaData getMetaData(long messageId) throws SQLException
+ private StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
Connection conn = newAutoCommitConnection();
@@ -1754,7 +1711,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- public int getContent(long messageId, int offset, ByteBuffer dst)
+ private int getContent(long messageId, int offset, ByteBuffer dst)
{
Connection conn = null;
PreparedStatement stmt = null;
@@ -1835,6 +1792,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ checkMessageStoreOpen();
+
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
@@ -1855,12 +1814,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
}
@Override
public void commitTran()
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
}
@@ -1868,6 +1831,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public StoreFuture commitTranAsync()
{
+ checkMessageStoreOpen();
+
StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
return storeFuture;
@@ -1876,18 +1841,24 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void abortTran()
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
}
@Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
}
}
@@ -1929,6 +1900,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
if(metaData == null)
{
+ checkMessageStoreOpen();
try
{
metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
@@ -1984,6 +1956,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
else
{
+ checkMessageStoreOpen();
return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
}
@@ -2002,6 +1975,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public synchronized StoreFuture flushToStore()
{
+ checkMessageStoreOpen();
+
Connection conn = null;
try
{
@@ -2033,6 +2008,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void remove()
{
+ checkMessageStoreOpen();
+
int delta = getMetaData().getContentSize();
AbstractJDBCMessageStore.this.removeMessage(_messageId);
storedSizeChange(-delta);
@@ -2105,87 +2082,85 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException
{
- if (_configurationStoreStateManager.isInState(State.ACTIVE))
+ try
{
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
try
{
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ boolean exists;
try
{
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- boolean exists;
+ exists = rs.next();
+
+ }
+ finally
+ {
+ rs.close();
+ }
+ // If we don't have any data in the result set then we can add this configured object
+ if (!exists)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
try
{
- exists = rs.next();
-
- }
- finally
- {
- rs.close();
- }
- // If we don't have any data in the result set then we can add this configured object
- if (!exists)
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
{
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
+ insertStmt.setNull(3, Types.BLOB);
}
- finally
+ else
{
- insertStmt.close();
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
-
- writeHierarchy(configuredObject, conn);
+ insertStmt.execute();
}
-
- }
- finally
- {
- stmt.close();
+ finally
+ {
+ insertStmt.close();
+ }
+
+ writeHierarchy(configuredObject, conn);
}
-
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+
}
- catch (SQLException e)
+ finally
{
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ stmt.close();
}
- }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
}
@Override
public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
{
+ checkConfigurationStoreOpen();
+
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
try
{
@@ -2242,31 +2217,27 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
- if (_configurationStoreStateManager.isInState(State.ACTIVE) || _configurationStoreStateManager.isInState(State.ACTIVATING))
+ checkConfigurationStoreOpen();
+ try
{
+ Connection conn = newConnection();
try
{
- Connection conn = newConnection();
- try
- {
- for(ConfiguredObjectRecord record : records)
- {
- updateConfiguredObject(record, createIfNecessary, conn);
- }
- conn.commit();
- }
- finally
+ for(ConfiguredObjectRecord record : records)
{
- conn.close();
+ updateConfiguredObject(record, createIfNecessary, conn);
}
+ conn.commit();
}
- catch (SQLException e)
+ finally
{
- throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ conn.close();
}
-
}
-
+ catch (SQLException e)
+ {
+ throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ }
}
private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
@@ -2274,89 +2245,88 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
Connection conn)
throws SQLException, StoreException
{
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
try
{
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- try
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ if (rs.next())
{
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- if (rs.next())
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
{
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
{
- stmt2.setString(1, configuredObject.getType());
- if (configuredObject.getAttributes() != null)
- {
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
- configuredObject.getAttributes());
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
- }
- else
- {
- stmt2.setNull(2, Types.BLOB);
- }
- stmt2.setString(3, configuredObject.getId().toString());
- stmt2.execute();
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
+ configuredObject.getAttributes());
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
}
- finally
+ else
{
- stmt2.close();
+ stmt2.setNull(2, Types.BLOB);
}
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
}
- else if(createIfNecessary)
+ finally
+ {
+ stmt2.close();
+ }
+ }
+ else if(createIfNecessary)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
{
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
{
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
+ insertStmt.setNull(3, Types.BLOB);
}
- finally
+ else
{
- insertStmt.close();
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
- writeHierarchy(configuredObject, conn);
+ insertStmt.execute();
}
+ finally
+ {
+ insertStmt.close();
+ }
+ writeHierarchy(configuredObject, conn);
}
- finally
- {
- rs.close();
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
}
finally
{
- stmt.close();
+ rs.close();
}
-
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ stmt.close();
+ }
}
private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException
@@ -2483,6 +2453,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void onDelete()
{
+ // TODO should probably check we are closed
try
{
Connection conn = newAutoCommitConnection();