summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java177
1 files changed, 78 insertions, 99 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 758945d6a1..fdb36c9013 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -43,16 +43,13 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
{
@@ -164,9 +161,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
_stateManager = new StateManager(_eventManager);
}
- private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
-
-
@Override
public void configureConfigStore(String name,
ConfigurationRecoveryHandler configRecoveryHandler,
@@ -462,18 +456,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
try
{
- List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
-
- ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
+ recoveryHandler.beginConfigurationRecovery(this);
+ loadConfiguredObjects(recoveryHandler);
- QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
- BindingRecoveryHandler brh = qrh.completeQueueRecovery();
- _configuredObjectHelper.recoverBindings(brh, configuredObjects);
-
- brh.completeBindingRecovery();
+ recoveryHandler.completeConfigurationRecovery();
}
catch (SQLException e)
{
@@ -573,92 +559,42 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- @Override
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
- insertConfiguredObject(configuredObject);
- }
-
- }
-
- @Override
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- int results = removeConfiguredObject(exchange.getId());
- if (results == 0)
- {
- throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found");
- }
- }
@Override
- public void bindQueue(Binding binding)
- throws AMQStoreException
+ public void create(UUID id, String type, Map<String,Object> attributes) throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
- insertConfiguredObject(configuredObject);
+ insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes));
}
- }
- @Override
- public void unbindQueue(Binding binding)
- throws AMQStoreException
- {
- int results = removeConfiguredObject(binding.getId());
- if (results == 0)
- {
- throw new AMQStoreException("Binding " + binding + " not found");
- }
- }
-
- @Override
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
}
@Override
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ public void remove(UUID id, String type) throws AMQStoreException
{
- getLogger().debug("public void createQueue(AMQQueue queue = " + queue + "): called");
-
- if (_stateManager.isInState(State.ACTIVE))
+ int results = removeConfiguredObject(id);
+ if (results == 0)
{
- ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
- insertConfiguredObject(queueConfiguredObject);
+ throw new AMQStoreException(type + " with id " + id + " not found");
}
}
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * NOTE: Currently only updates the exclusivity.
- *
- * @param queue The queue to update the entry for.
- * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
- */
@Override
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId());
+ ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id);
if (queueConfiguredObject != null)
{
- ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject);
+ ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
updateConfiguredObject(newQueueRecord);
}
}
}
-
/**
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
* isolation and with auto-commit transactions enabled.
@@ -714,18 +650,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract Connection getConnection() throws SQLException;
- @Override
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- AMQShortString name = queue.getNameShortString();
- getLogger().debug("public void removeQueue(AMQShortString name = " + name + "): called");
- int results = removeConfiguredObject(queue.getId());
- if (results == 0)
- {
- throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found");
- }
- }
-
private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
{
byte[] argumentBytes;
@@ -1825,7 +1749,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
else
{
- byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
@@ -1852,6 +1777,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
conn.close();
}
}
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
catch (SQLException e)
{
throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
@@ -1914,7 +1851,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
stmt2.setString(1, configuredObject.getType());
if (configuredObject.getAttributes() != null)
{
- byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+ configuredObject.getAttributes());
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
}
@@ -1946,6 +1884,18 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
conn.close();
}
}
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
catch (SQLException e)
{
throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
@@ -1972,7 +1922,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
String type = rs.getString(1);
String attributes = getBlobAsString(rs, 2);
- result = new ConfiguredObjectRecord(id, type, attributes);
+ result = new ConfiguredObjectRecord(id, type,
+ (new ObjectMapper()).readValue(attributes,Map.class));
}
}
finally
@@ -1990,6 +1941,21 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
conn.close();
}
}
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
catch (SQLException e)
{
throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
@@ -1998,10 +1964,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
return result;
}
- private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException
+ private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, AMQStoreException
{
- ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
Connection conn = newAutoCommitConnection();
+
+ final ObjectMapper objectMapper = new ObjectMapper();
try
{
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
@@ -2015,9 +1982,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
String id = rs.getString(1);
String objectType = rs.getString(2);
String attributes = getBlobAsString(rs, 3);
- results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes));
+ recoveryHandler.configuredObject(UUID.fromString(id), objectType,
+ objectMapper.readValue(attributes,Map.class));
}
}
+ catch (JsonMappingException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
finally
{
rs.close();
@@ -2032,7 +2012,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
conn.close();
}
- return results;
}
protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;