summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/store
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java14
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/State.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java350
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java41
10 files changed, 184 insertions, 394 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
index ede01d247e..ab7ef3f55b 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
@@ -46,19 +46,7 @@ public interface ConfigurationRecoveryHandler
public static interface BindingRecoveryHandler
{
void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);
- BrokerLinkRecoveryHandler completeBindingRecovery();
- }
-
- public static interface BrokerLinkRecoveryHandler
- {
- BridgeRecoveryHandler brokerLink(UUID id, long createTime, Map<String,String> arguments);
- void completeBrokerLinkRecovery();
- }
-
- public static interface BridgeRecoveryHandler
- {
- void bridge(UUID id, long createTime, Map<String,String> arguments);
- void completeBridgeRecoveryForLink();
+ void completeBindingRecovery();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 655887e5c2..4e7bbf04a6 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -26,8 +26,6 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.queue.AMQQueue;
public interface DurableConfigurationStore
@@ -122,12 +120,5 @@ public interface DurableConfigurationStore
* @throws AMQStoreException If the operation fails for any reason.
*/
void updateQueue(AMQQueue queue) throws AMQStoreException;
-
- void createBrokerLink(BrokerLink link) throws AMQStoreException;
-
- void deleteBrokerLink(BrokerLink link) throws AMQStoreException;
-
- void createBridge(Bridge bridge) throws AMQStoreException;
-
- void deleteBridge(Bridge bridge) throws AMQStoreException;
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 262d7d0213..3f1d1b9530 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
public class MemoryMessageStore extends NullMessageStore
{
+ public static final String TYPE = "Memory";
private final AtomicLong _messageId = new AtomicLong(1);
private final AtomicBoolean _closed = new AtomicBoolean(false);
@@ -138,6 +139,6 @@ public class MemoryMessageStore extends NullMessageStore
@Override
public String getStoreType()
{
- return "Memory";
+ return TYPE;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
new file mode 100644
index 0000000000..20b6b7a8a6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+
+public class MemoryMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return MemoryMessageStore.TYPE;
+ }
+
+ @Override
+ public MessageStore createMessageStore()
+ {
+ return new MemoryMessageStore();
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
new file mode 100644
index 0000000000..0d5a4850f6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+
+public class MessageStoreCreator
+{
+ private Map<String, MessageStoreFactory> _factories = new HashMap<String, MessageStoreFactory>();
+
+ public MessageStoreCreator()
+ {
+ QpidServiceLoader<MessageStoreFactory> qpidServiceLoader = new QpidServiceLoader<MessageStoreFactory>();
+ Iterable<MessageStoreFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(MessageStoreFactory.class);
+ for (MessageStoreFactory messageStoreFactory : factories)
+ {
+ String type = messageStoreFactory.getType();
+ MessageStoreFactory factory = _factories.put(type.toLowerCase(), messageStoreFactory);
+ if (factory != null)
+ {
+ throw new IllegalStateException("MessageStoreFactory with type name '" + type
+ + "' is already registered using class '" + factory.getClass().getName() + "', can not register class '"
+ + messageStoreFactory.getClass().getName() + "'");
+ }
+ }
+ }
+
+ public MessageStore createMessageStore(String storeType)
+ {
+ MessageStoreFactory factory = _factories.get(storeType.toLowerCase());
+ if (factory == null)
+ {
+ throw new IllegalConfigurationException("Unknown store type: " + storeType);
+ }
+ return factory.createMessageStore();
+ }
+
+ public Collection<MessageStoreFactory> getFactories()
+ {
+ return Collections.unmodifiableCollection(_factories.values());
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java
new file mode 100644
index 0000000000..a1afd02f12
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public interface MessageStoreFactory
+{
+ String getType();
+
+ MessageStore createMessageStore();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index be08e309e6..c6bffbc1de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -24,8 +24,6 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.queue.AMQQueue;
public abstract class NullMessageStore implements MessageStore
@@ -78,26 +76,6 @@ public abstract class NullMessageStore implements MessageStore
}
@Override
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- }
-
- @Override
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- }
-
- @Override
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
- }
-
- @Override
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
- }
-
- @Override
public void configureMessageStore(String name,
MessageStoreRecoveryHandler recoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/State.java b/java/broker/src/main/java/org/apache/qpid/server/store/State.java
index 2783637b2a..1d0936cec4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/State.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/State.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-
public enum State
{
/** The initial state of the store. In practice, the store immediately transitions to the subsequent states. */
@@ -30,7 +28,7 @@ public enum State
INITIALISING,
/**
* The initial set-up of the store has completed.
- * If the store is persistent, it has not yet loaded configuration for {@link ConfiguredObject}'s from disk.
+ * If the store is persistent, it has not yet loaded configuration from disk.
*
* From the point of view of the user, the store is essentially stopped.
*/
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 154d7e6535..e9946d1860 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.store.derby;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
@@ -41,7 +40,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -55,12 +53,9 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectHelper;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.Event;
@@ -236,7 +231,7 @@ public class DerbyMessageStore implements MessageStore
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
- private static final String DERBY_STORE_TYPE = "DERBY";
+ public static final String TYPE = "DERBY";
private final StateManager _stateManager;
@@ -572,8 +567,7 @@ public class DerbyMessageStore implements MessageStore
BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
- recoverBrokerLinks(lrh);
+ brh.completeBindingRecovery();
}
catch (SQLException e)
{
@@ -581,144 +575,6 @@ public class DerbyMessageStore implements MessageStore
}
}
- private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
- throws SQLException
- {
- _logger.info("Recovering broker links...");
-
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS);
-
- try
- {
- ResultSet rs = stmt.executeQuery();
-
- try
- {
-
- while(rs.next())
- {
- UUID id = new UUID(rs.getLong(2), rs.getLong(1));
- long createTime = rs.getLong(3);
- Blob argumentsAsBlob = rs.getBlob(4);
-
- byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
-
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
- int size = dis.readInt();
-
- Map<String,String> arguments = new HashMap<String, String>();
-
- for(int i = 0; i < size; i++)
- {
- arguments.put(dis.readUTF(), dis.readUTF());
- }
-
- ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
-
- recoverBridges(brh, id);
-
- }
- }
- catch (IOException e)
- {
- throw new SQLException(e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
-
- }
-
- private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
- throws SQLException
- {
- _logger.info("Recovering bridges for link " + linkId + "...");
-
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES);
-
- try
- {
- stmt.setLong(1, linkId.getLeastSignificantBits());
- stmt.setLong(2, linkId.getMostSignificantBits());
-
- ResultSet rs = stmt.executeQuery();
-
- try
- {
-
- while(rs.next())
- {
- UUID id = new UUID(rs.getLong(2), rs.getLong(1));
- long createTime = rs.getLong(3);
- Blob argumentsAsBlob = rs.getBlob(6);
-
- byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
-
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
- int size = dis.readInt();
-
- Map<String,String> arguments = new HashMap<String, String>();
-
- for(int i = 0; i < size; i++)
- {
- arguments.put(dis.readUTF(), dis.readUTF());
- }
-
- brh.bridge(id, createTime, arguments);
-
- }
- brh.completeBridgeRecoveryForLink();
- }
- catch (IOException e)
- {
- throw new SQLException(e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
-
- }
-
@Override
public void close() throws Exception
{
@@ -975,71 +831,6 @@ public class DerbyMessageStore implements MessageStore
}
}
- @Override
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
-
- if (_stateManager.isInState(State.ACTIVE))
- {
- try
- {
- Connection conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
- try
- {
-
- stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
- stmt.setLong(2, link.getQMFId().getMostSignificantBits());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this queue
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS);
-
- try
- {
-
- insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits());
- insertStmt.setLong(2, link.getQMFId().getMostSignificantBits());
- insertStmt.setLong(3, link.getCreateTime());
-
- byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
- ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
-
- insertStmt.setBinaryStream(4,bis,argumentBytes.length);
-
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- conn.close();
-
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e);
- }
- }
- }
-
private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
{
byte[] argumentBytes;
@@ -1072,139 +863,7 @@ public class DerbyMessageStore implements MessageStore
return argumentBytes;
}
- @Override
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- _logger.debug("public void deleteBrokerLink( " + link + "): called");
- Connection conn = null;
- PreparedStatement stmt = null;
- try
- {
- conn = newAutoCommitConnection();
- stmt = conn.prepareStatement(DELETE_FROM_LINKS);
- stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
- stmt.setLong(2, link.getQMFId().getMostSignificantBits());
- int results = stmt.executeUpdate();
-
- if (results == 0)
- {
- throw new AMQStoreException("Link " + link + " not found");
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
- }
-
-
- }
-
- @Override
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
- _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called");
- if (_stateManager.isInState(State.ACTIVE))
- {
- try
- {
- Connection conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE);
- try
- {
-
- UUID id = bridge.getQMFId();
- stmt.setLong(1, id.getLeastSignificantBits());
- stmt.setLong(2, id.getMostSignificantBits());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this queue
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES);
-
- try
- {
-
- insertStmt.setLong(1, id.getLeastSignificantBits());
- insertStmt.setLong(2, id.getMostSignificantBits());
-
- insertStmt.setLong(3, bridge.getCreateTime());
-
- UUID linkId = bridge.getLink().getQMFId();
- insertStmt.setLong(4, linkId.getLeastSignificantBits());
- insertStmt.setLong(5, linkId.getMostSignificantBits());
-
- byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments());
- ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
-
- insertStmt.setBinaryStream(6,bis,argumentBytes.length);
-
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- conn.close();
-
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- @Override
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
- _logger.debug("public void deleteBridge( " + bridge + "): called");
- Connection conn = null;
- PreparedStatement stmt = null;
- try
- {
- conn = newAutoCommitConnection();
- stmt = conn.prepareStatement(DELETE_FROM_BRIDGES);
- stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits());
- stmt.setLong(2, bridge.getQMFId().getMostSignificantBits());
- int results = stmt.executeUpdate();
-
- if (results == 0)
- {
- throw new AMQStoreException("Bridge " + bridge + " not found");
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
- }
-
- }
@Override
public Transaction newTransaction()
@@ -2134,8 +1793,9 @@ public class DerbyMessageStore implements MessageStore
public ByteBuffer getContent(int offsetInMessage, int size)
{
ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
+ int length = getContent(offsetInMessage, buf);
buf.position(0);
+ buf.limit(length);
return buf;
}
@@ -2673,7 +2333,7 @@ public class DerbyMessageStore implements MessageStore
@Override
public String getStoreType()
{
- return DERBY_STORE_TYPE;
+ return TYPE;
}
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
new file mode 100644
index 0000000000..046b503d8a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.derby;
+
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreFactory;
+
+public class DerbyMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return DerbyMessageStore.TYPE;
+ }
+
+ @Override
+ public MessageStore createMessageStore()
+ {
+ return new DerbyMessageStore();
+ }
+
+}