diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store')
10 files changed, 184 insertions, 394 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index ede01d247e..ab7ef3f55b 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -46,19 +46,7 @@ public interface ConfigurationRecoveryHandler public static interface BindingRecoveryHandler { void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf); - BrokerLinkRecoveryHandler completeBindingRecovery(); - } - - public static interface BrokerLinkRecoveryHandler - { - BridgeRecoveryHandler brokerLink(UUID id, long createTime, Map<String,String> arguments); - void completeBrokerLinkRecovery(); - } - - public static interface BridgeRecoveryHandler - { - void bridge(UUID id, long createTime, Map<String,String> arguments); - void completeBridgeRecoveryForLink(); + void completeBindingRecovery(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 655887e5c2..4e7bbf04a6 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -26,8 +26,6 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.queue.AMQQueue; public interface DurableConfigurationStore @@ -122,12 +120,5 @@ public interface DurableConfigurationStore * @throws AMQStoreException If the operation fails for any reason. */ void updateQueue(AMQQueue queue) throws AMQStoreException; - - void createBrokerLink(BrokerLink link) throws AMQStoreException; - - void deleteBrokerLink(BrokerLink link) throws AMQStoreException; - - void createBridge(Bridge bridge) throws AMQStoreException; - - void deleteBridge(Bridge bridge) throws AMQStoreException; + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 262d7d0213..3f1d1b9530 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; /** A simple message store that stores the messages in a thread-safe structure in memory. */ public class MemoryMessageStore extends NullMessageStore { + public static final String TYPE = "Memory"; private final AtomicLong _messageId = new AtomicLong(1); private final AtomicBoolean _closed = new AtomicBoolean(false); @@ -138,6 +139,6 @@ public class MemoryMessageStore extends NullMessageStore @Override public String getStoreType() { - return "Memory"; + return TYPE; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java new file mode 100644 index 0000000000..20b6b7a8a6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + + +public class MemoryMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return MemoryMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new MemoryMessageStore(); + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java new file mode 100644 index 0000000000..0d5a4850f6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.plugin.QpidServiceLoader; + +public class MessageStoreCreator +{ + private Map<String, MessageStoreFactory> _factories = new HashMap<String, MessageStoreFactory>(); + + public MessageStoreCreator() + { + QpidServiceLoader<MessageStoreFactory> qpidServiceLoader = new QpidServiceLoader<MessageStoreFactory>(); + Iterable<MessageStoreFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(MessageStoreFactory.class); + for (MessageStoreFactory messageStoreFactory : factories) + { + String type = messageStoreFactory.getType(); + MessageStoreFactory factory = _factories.put(type.toLowerCase(), messageStoreFactory); + if (factory != null) + { + throw new IllegalStateException("MessageStoreFactory with type name '" + type + + "' is already registered using class '" + factory.getClass().getName() + "', can not register class '" + + messageStoreFactory.getClass().getName() + "'"); + } + } + } + + public MessageStore createMessageStore(String storeType) + { + MessageStoreFactory factory = _factories.get(storeType.toLowerCase()); + if (factory == null) + { + throw new IllegalConfigurationException("Unknown store type: " + storeType); + } + return factory.createMessageStore(); + } + + public Collection<MessageStoreFactory> getFactories() + { + return Collections.unmodifiableCollection(_factories.values()); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java new file mode 100644 index 0000000000..a1afd02f12 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public interface MessageStoreFactory +{ + String getType(); + + MessageStore createMessageStore(); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index be08e309e6..c6bffbc1de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -24,8 +24,6 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.queue.AMQQueue; public abstract class NullMessageStore implements MessageStore @@ -78,26 +76,6 @@ public abstract class NullMessageStore implements MessageStore } @Override - public void createBrokerLink(final BrokerLink link) throws AMQStoreException - { - } - - @Override - public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException - { - } - - @Override - public void createBridge(final Bridge bridge) throws AMQStoreException - { - } - - @Override - public void deleteBridge(final Bridge bridge) throws AMQStoreException - { - } - - @Override public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/State.java b/java/broker/src/main/java/org/apache/qpid/server/store/State.java index 2783637b2a..1d0936cec4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/State.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/State.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.configuration.ConfiguredObject; - public enum State { /** The initial state of the store. In practice, the store immediately transitions to the subsequent states. */ @@ -30,7 +28,7 @@ public enum State INITIALISING, /** * The initial set-up of the store has completed. - * If the store is persistent, it has not yet loaded configuration for {@link ConfiguredObject}'s from disk. + * If the store is persistent, it has not yet loaded configuration from disk. * * From the point of view of the user, the store is essentially stopped. */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 154d7e6535..e9946d1860 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.store.derby; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; @@ -41,7 +40,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -55,12 +53,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectHelper; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.Event; @@ -236,7 +231,7 @@ public class DerbyMessageStore implements MessageStore private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; - private static final String DERBY_STORE_TYPE = "DERBY"; + public static final String TYPE = "DERBY"; private final StateManager _stateManager; @@ -572,8 +567,7 @@ public class DerbyMessageStore implements MessageStore BindingRecoveryHandler brh = qrh.completeQueueRecovery(); _configuredObjectHelper.recoverBindings(brh, configuredObjects); - BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); - recoverBrokerLinks(lrh); + brh.completeBindingRecovery(); } catch (SQLException e) { @@ -581,144 +575,6 @@ public class DerbyMessageStore implements MessageStore } } - private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) - throws SQLException - { - _logger.info("Recovering broker links..."); - - Connection conn = null; - try - { - conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS); - - try - { - ResultSet rs = stmt.executeQuery(); - - try - { - - while(rs.next()) - { - UUID id = new UUID(rs.getLong(2), rs.getLong(1)); - long createTime = rs.getLong(3); - Blob argumentsAsBlob = rs.getBlob(4); - - byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); - - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes)); - int size = dis.readInt(); - - Map<String,String> arguments = new HashMap<String, String>(); - - for(int i = 0; i < size; i++) - { - arguments.put(dis.readUTF(), dis.readUTF()); - } - - ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments); - - recoverBridges(brh, id); - - } - } - catch (IOException e) - { - throw new SQLException(e.getMessage(), e); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - } - finally - { - if(conn != null) - { - conn.close(); - } - } - - } - - private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId) - throws SQLException - { - _logger.info("Recovering bridges for link " + linkId + "..."); - - Connection conn = null; - try - { - conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES); - - try - { - stmt.setLong(1, linkId.getLeastSignificantBits()); - stmt.setLong(2, linkId.getMostSignificantBits()); - - ResultSet rs = stmt.executeQuery(); - - try - { - - while(rs.next()) - { - UUID id = new UUID(rs.getLong(2), rs.getLong(1)); - long createTime = rs.getLong(3); - Blob argumentsAsBlob = rs.getBlob(6); - - byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); - - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes)); - int size = dis.readInt(); - - Map<String,String> arguments = new HashMap<String, String>(); - - for(int i = 0; i < size; i++) - { - arguments.put(dis.readUTF(), dis.readUTF()); - } - - brh.bridge(id, createTime, arguments); - - } - brh.completeBridgeRecoveryForLink(); - } - catch (IOException e) - { - throw new SQLException(e.getMessage(), e); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - } - finally - { - if(conn != null) - { - conn.close(); - } - } - - } - @Override public void close() throws Exception { @@ -975,71 +831,6 @@ public class DerbyMessageStore implements MessageStore } } - @Override - public void createBrokerLink(final BrokerLink link) throws AMQStoreException - { - _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called"); - - if (_stateManager.isInState(State.ACTIVE)) - { - try - { - Connection conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(FIND_LINK); - try - { - - stmt.setLong(1, link.getQMFId().getLeastSignificantBits()); - stmt.setLong(2, link.getQMFId().getMostSignificantBits()); - ResultSet rs = stmt.executeQuery(); - try - { - - // If we don't have any data in the result set then we can add this queue - if (!rs.next()) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS); - - try - { - - insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits()); - insertStmt.setLong(2, link.getQMFId().getMostSignificantBits()); - insertStmt.setLong(3, link.getCreateTime()); - - byte[] argumentBytes = convertStringMapToBytes(link.getArguments()); - ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes); - - insertStmt.setBinaryStream(4,bis,argumentBytes.length); - - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - conn.close(); - - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e); - } - } - } - private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException { byte[] argumentBytes; @@ -1072,139 +863,7 @@ public class DerbyMessageStore implements MessageStore return argumentBytes; } - @Override - public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException - { - _logger.debug("public void deleteBrokerLink( " + link + "): called"); - Connection conn = null; - PreparedStatement stmt = null; - try - { - conn = newAutoCommitConnection(); - stmt = conn.prepareStatement(DELETE_FROM_LINKS); - stmt.setLong(1, link.getQMFId().getLeastSignificantBits()); - stmt.setLong(2, link.getQMFId().getMostSignificantBits()); - int results = stmt.executeUpdate(); - - if (results == 0) - { - throw new AMQStoreException("Link " + link + " not found"); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e); - } - finally - { - closePreparedStatement(stmt); - closeConnection(conn); - } - - - } - - @Override - public void createBridge(final Bridge bridge) throws AMQStoreException - { - _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called"); - if (_stateManager.isInState(State.ACTIVE)) - { - try - { - Connection conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE); - try - { - - UUID id = bridge.getQMFId(); - stmt.setLong(1, id.getLeastSignificantBits()); - stmt.setLong(2, id.getMostSignificantBits()); - ResultSet rs = stmt.executeQuery(); - try - { - - // If we don't have any data in the result set then we can add this queue - if (!rs.next()) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES); - - try - { - - insertStmt.setLong(1, id.getLeastSignificantBits()); - insertStmt.setLong(2, id.getMostSignificantBits()); - - insertStmt.setLong(3, bridge.getCreateTime()); - - UUID linkId = bridge.getLink().getQMFId(); - insertStmt.setLong(4, linkId.getLeastSignificantBits()); - insertStmt.setLong(5, linkId.getMostSignificantBits()); - - byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments()); - ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes); - - insertStmt.setBinaryStream(6,bis,argumentBytes.length); - - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - conn.close(); - - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e); - } - } - } - - @Override - public void deleteBridge(final Bridge bridge) throws AMQStoreException - { - _logger.debug("public void deleteBridge( " + bridge + "): called"); - Connection conn = null; - PreparedStatement stmt = null; - try - { - conn = newAutoCommitConnection(); - stmt = conn.prepareStatement(DELETE_FROM_BRIDGES); - stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits()); - stmt.setLong(2, bridge.getQMFId().getMostSignificantBits()); - int results = stmt.executeUpdate(); - - if (results == 0) - { - throw new AMQStoreException("Bridge " + bridge + " not found"); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e); - } - finally - { - closePreparedStatement(stmt); - closeConnection(conn); - } - - } @Override public Transaction newTransaction() @@ -2134,8 +1793,9 @@ public class DerbyMessageStore implements MessageStore public ByteBuffer getContent(int offsetInMessage, int size) { ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); + int length = getContent(offsetInMessage, buf); buf.position(0); + buf.limit(length); return buf; } @@ -2673,7 +2333,7 @@ public class DerbyMessageStore implements MessageStore @Override public String getStoreType() { - return DERBY_STORE_TYPE; + return TYPE; } }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java new file mode 100644 index 0000000000..046b503d8a --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreFactory; + +public class DerbyMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return DerbyMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new DerbyMessageStore(); + } + +} |