diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java | 350 |
1 files changed, 5 insertions, 345 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 154d7e6535..e9946d1860 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.store.derby; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; @@ -41,7 +40,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -55,12 +53,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectHelper; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.Event; @@ -236,7 +231,7 @@ public class DerbyMessageStore implements MessageStore private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; - private static final String DERBY_STORE_TYPE = "DERBY"; + public static final String TYPE = "DERBY"; private final StateManager _stateManager; @@ -572,8 +567,7 @@ public class DerbyMessageStore implements MessageStore BindingRecoveryHandler brh = qrh.completeQueueRecovery(); _configuredObjectHelper.recoverBindings(brh, configuredObjects); - BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); - recoverBrokerLinks(lrh); + brh.completeBindingRecovery(); } catch (SQLException e) { @@ -581,144 +575,6 @@ public class DerbyMessageStore implements MessageStore } } - private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) - throws SQLException - { - _logger.info("Recovering broker links..."); - - Connection conn = null; - try - { - conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS); - - try - { - ResultSet rs = stmt.executeQuery(); - - try - { - - while(rs.next()) - { - UUID id = new UUID(rs.getLong(2), rs.getLong(1)); - long createTime = rs.getLong(3); - Blob argumentsAsBlob = rs.getBlob(4); - - byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); - - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes)); - int size = dis.readInt(); - - Map<String,String> arguments = new HashMap<String, String>(); - - for(int i = 0; i < size; i++) - { - arguments.put(dis.readUTF(), dis.readUTF()); - } - - ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments); - - recoverBridges(brh, id); - - } - } - catch (IOException e) - { - throw new SQLException(e.getMessage(), e); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - } - finally - { - if(conn != null) - { - conn.close(); - } - } - - } - - private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId) - throws SQLException - { - _logger.info("Recovering bridges for link " + linkId + "..."); - - Connection conn = null; - try - { - conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES); - - try - { - stmt.setLong(1, linkId.getLeastSignificantBits()); - stmt.setLong(2, linkId.getMostSignificantBits()); - - ResultSet rs = stmt.executeQuery(); - - try - { - - while(rs.next()) - { - UUID id = new UUID(rs.getLong(2), rs.getLong(1)); - long createTime = rs.getLong(3); - Blob argumentsAsBlob = rs.getBlob(6); - - byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); - - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes)); - int size = dis.readInt(); - - Map<String,String> arguments = new HashMap<String, String>(); - - for(int i = 0; i < size; i++) - { - arguments.put(dis.readUTF(), dis.readUTF()); - } - - brh.bridge(id, createTime, arguments); - - } - brh.completeBridgeRecoveryForLink(); - } - catch (IOException e) - { - throw new SQLException(e.getMessage(), e); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - } - finally - { - if(conn != null) - { - conn.close(); - } - } - - } - @Override public void close() throws Exception { @@ -975,71 +831,6 @@ public class DerbyMessageStore implements MessageStore } } - @Override - public void createBrokerLink(final BrokerLink link) throws AMQStoreException - { - _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called"); - - if (_stateManager.isInState(State.ACTIVE)) - { - try - { - Connection conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(FIND_LINK); - try - { - - stmt.setLong(1, link.getQMFId().getLeastSignificantBits()); - stmt.setLong(2, link.getQMFId().getMostSignificantBits()); - ResultSet rs = stmt.executeQuery(); - try - { - - // If we don't have any data in the result set then we can add this queue - if (!rs.next()) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS); - - try - { - - insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits()); - insertStmt.setLong(2, link.getQMFId().getMostSignificantBits()); - insertStmt.setLong(3, link.getCreateTime()); - - byte[] argumentBytes = convertStringMapToBytes(link.getArguments()); - ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes); - - insertStmt.setBinaryStream(4,bis,argumentBytes.length); - - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - conn.close(); - - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e); - } - } - } - private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException { byte[] argumentBytes; @@ -1072,139 +863,7 @@ public class DerbyMessageStore implements MessageStore return argumentBytes; } - @Override - public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException - { - _logger.debug("public void deleteBrokerLink( " + link + "): called"); - Connection conn = null; - PreparedStatement stmt = null; - try - { - conn = newAutoCommitConnection(); - stmt = conn.prepareStatement(DELETE_FROM_LINKS); - stmt.setLong(1, link.getQMFId().getLeastSignificantBits()); - stmt.setLong(2, link.getQMFId().getMostSignificantBits()); - int results = stmt.executeUpdate(); - - if (results == 0) - { - throw new AMQStoreException("Link " + link + " not found"); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e); - } - finally - { - closePreparedStatement(stmt); - closeConnection(conn); - } - - - } - - @Override - public void createBridge(final Bridge bridge) throws AMQStoreException - { - _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called"); - if (_stateManager.isInState(State.ACTIVE)) - { - try - { - Connection conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE); - try - { - - UUID id = bridge.getQMFId(); - stmt.setLong(1, id.getLeastSignificantBits()); - stmt.setLong(2, id.getMostSignificantBits()); - ResultSet rs = stmt.executeQuery(); - try - { - - // If we don't have any data in the result set then we can add this queue - if (!rs.next()) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES); - - try - { - - insertStmt.setLong(1, id.getLeastSignificantBits()); - insertStmt.setLong(2, id.getMostSignificantBits()); - - insertStmt.setLong(3, bridge.getCreateTime()); - - UUID linkId = bridge.getLink().getQMFId(); - insertStmt.setLong(4, linkId.getLeastSignificantBits()); - insertStmt.setLong(5, linkId.getMostSignificantBits()); - - byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments()); - ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes); - - insertStmt.setBinaryStream(6,bis,argumentBytes.length); - - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - conn.close(); - - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e); - } - } - } - - @Override - public void deleteBridge(final Bridge bridge) throws AMQStoreException - { - _logger.debug("public void deleteBridge( " + bridge + "): called"); - Connection conn = null; - PreparedStatement stmt = null; - try - { - conn = newAutoCommitConnection(); - stmt = conn.prepareStatement(DELETE_FROM_BRIDGES); - stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits()); - stmt.setLong(2, bridge.getQMFId().getMostSignificantBits()); - int results = stmt.executeUpdate(); - - if (results == 0) - { - throw new AMQStoreException("Bridge " + bridge + " not found"); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e); - } - finally - { - closePreparedStatement(stmt); - closeConnection(conn); - } - - } @Override public Transaction newTransaction() @@ -2134,8 +1793,9 @@ public class DerbyMessageStore implements MessageStore public ByteBuffer getContent(int offsetInMessage, int size) { ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); + int length = getContent(offsetInMessage, buf); buf.position(0); + buf.limit(length); return buf; } @@ -2673,7 +2333,7 @@ public class DerbyMessageStore implements MessageStore @Override public String getStoreType() { - return DERBY_STORE_TYPE; + return TYPE; } }
\ No newline at end of file |