diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java | 177 |
1 files changed, 78 insertions, 99 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 758945d6a1..fdb36c9013 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -43,16 +43,13 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore { @@ -164,9 +161,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC _stateManager = new StateManager(_eventManager); } - private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); - - @Override public void configureConfigStore(String name, ConfigurationRecoveryHandler configRecoveryHandler, @@ -462,18 +456,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { try { - List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - - ExchangeRecoveryHandler erh = recoveryHandler.begin(this); - _configuredObjectHelper.recoverExchanges(erh, configuredObjects); + recoveryHandler.beginConfigurationRecovery(this); + loadConfiguredObjects(recoveryHandler); - QueueRecoveryHandler qrh = erh.completeExchangeRecovery(); - _configuredObjectHelper.recoverQueues(qrh, configuredObjects); - - BindingRecoveryHandler brh = qrh.completeQueueRecovery(); - _configuredObjectHelper.recoverBindings(brh, configuredObjects); - - brh.completeBindingRecovery(); + recoveryHandler.completeConfigurationRecovery(); } catch (SQLException e) { @@ -573,92 +559,42 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - @Override - public void createExchange(Exchange exchange) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange); - insertConfiguredObject(configuredObject); - } - - } - - @Override - public void removeExchange(Exchange exchange) throws AMQStoreException - { - int results = removeConfiguredObject(exchange.getId()); - if (results == 0) - { - throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found"); - } - } @Override - public void bindQueue(Binding binding) - throws AMQStoreException + public void create(UUID id, String type, Map<String,Object> attributes) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding); - insertConfiguredObject(configuredObject); + insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes)); } - } - @Override - public void unbindQueue(Binding binding) - throws AMQStoreException - { - int results = removeConfiguredObject(binding.getId()); - if (results == 0) - { - throw new AMQStoreException("Binding " + binding + " not found"); - } - } - - @Override - public void createQueue(AMQQueue queue) throws AMQStoreException - { - createQueue(queue, null); } @Override - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException + public void remove(UUID id, String type) throws AMQStoreException { - getLogger().debug("public void createQueue(AMQQueue queue = " + queue + "): called"); - - if (_stateManager.isInState(State.ACTIVE)) + int results = removeConfiguredObject(id); + if (results == 0) { - ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments); - insertConfiguredObject(queueConfiguredObject); + throw new AMQStoreException(type + " with id " + id + " not found"); } } - /** - * Updates the specified queue in the persistent store, IF it is already present. If the queue - * is not present in the store, it will not be added. - * - * NOTE: Currently only updates the exclusivity. - * - * @param queue The queue to update the entry for. - * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason. - */ @Override - public void updateQueue(final AMQQueue queue) throws AMQStoreException + public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId()); + ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id); if (queueConfiguredObject != null) { - ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject); + ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); updateConfiguredObject(newQueueRecord); } } } - /** * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED * isolation and with auto-commit transactions enabled. @@ -714,18 +650,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC protected abstract Connection getConnection() throws SQLException; - @Override - public void removeQueue(final AMQQueue queue) throws AMQStoreException - { - AMQShortString name = queue.getNameShortString(); - getLogger().debug("public void removeQueue(AMQShortString name = " + name + "): called"); - int results = removeConfiguredObject(queue.getId()); - if (results == 0) - { - throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found"); - } - } - private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException { byte[] argumentBytes; @@ -1825,7 +1749,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } else { - byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET); + final Map<String, Object> attributes = configuredObject.getAttributes(); + byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes); ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); } @@ -1852,6 +1777,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC conn.close(); } } + catch (JsonMappingException e) + { + throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } catch (SQLException e) { throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); @@ -1914,7 +1851,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC stmt2.setString(1, configuredObject.getType()); if (configuredObject.getAttributes() != null) { - byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET); + byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes( + configuredObject.getAttributes()); ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); stmt2.setBinaryStream(2, bis, attributesAsBytes.length); } @@ -1946,6 +1884,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC conn.close(); } } + catch (JsonMappingException e) + { + throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } catch (SQLException e) { throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); @@ -1972,7 +1922,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { String type = rs.getString(1); String attributes = getBlobAsString(rs, 2); - result = new ConfiguredObjectRecord(id, type, attributes); + result = new ConfiguredObjectRecord(id, type, + (new ObjectMapper()).readValue(attributes,Map.class)); } } finally @@ -1990,6 +1941,21 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC conn.close(); } } + catch (JsonMappingException e) + { + throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " + + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " + + e.getMessage(), e); + } + catch (IOException e) + { + throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " + + e.getMessage(), e); + } catch (SQLException e) { throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " @@ -1998,10 +1964,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC return result; } - private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException + private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, AMQStoreException { - ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>(); Connection conn = newAutoCommitConnection(); + + final ObjectMapper objectMapper = new ObjectMapper(); try { PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); @@ -2015,9 +1982,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC String id = rs.getString(1); String objectType = rs.getString(2); String attributes = getBlobAsString(rs, 3); - results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes)); + recoveryHandler.configuredObject(UUID.fromString(id), objectType, + objectMapper.readValue(attributes,Map.class)); } } + catch (JsonMappingException e) + { + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); + } finally { rs.close(); @@ -2032,7 +2012,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { conn.close(); } - return results; } protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; |