summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-03-26 12:04:47 +0000
committerKeith Wall <kwall@apache.org>2014-03-26 12:04:47 +0000
commit2d36d9dc8d71d3db215ee58591d875c36e54a246 (patch)
treef2ba0347432606f4cd2ad3ce5fb75db066f4c26e
parent51dc0c2df39ec85b1685845ca839b689aaa5f8b9 (diff)
downloadqpid-python-2d36d9dc8d71d3db215ee58591d875c36e54a246.tar.gz
QPID-5410: [Java Broker/BDB]. Introduce a thin facade (EnvironmentFacade) between the BDBMessage and BDB JE's Environment/ReplicatedEnvironment. The motivation behind this facade is principally HA; there are a number of cases where JE requires the ReplicatedEnvironment is recreated. The facade layer allows for this to be done transparently from the upper tiers (the BDBMessageStore). The facade has two implementations StandardFacade used in the non-HA use case, and ReplicatedEnvironmentFacade in the HA case.
Key changes: * BDBMessageStore reverts to a single implementation without knowledge of HA. * BDBMessageStore now interacts with JE via the facade. * BDBHAVirtualHost is now responsible for the creation of ReplicatedEnvironmentFacade * BDBHAMessageStoreManagerMBean interrogates the facade * ReplicatedEnvironmentFacade monitors the group for changes in state (nodes becoming uncontactable etc), if such a state change is detected, the DatabasePinger fires a single transaction to determine if quorum still exists. If quorum does not exist, the environment is restarted, thus transition the environment into the UNKNOWN state. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1581797 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java57
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java12
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java49
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java1913
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java665
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java85
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java1742
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java313
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java (renamed from qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java)40
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java58
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java)17
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java37
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java228
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java76
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java76
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java40
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java1083
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java152
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java26
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java170
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java14
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java128
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java208
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java336
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java70
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java4
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java46
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java3
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java40
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java2
34 files changed, 4799 insertions, 2913 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
index aa4ddd8181..f36c1ecc6f 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
@@ -36,19 +36,12 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObject;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
/**
* Management mbean for BDB HA.
- * <p>
- * At runtime, the classloader loading this clas must have visibility of the other Qpid JMX classes. This is
- * currently arranged through OSGI using the <b>fragment</b> feature so that this bundle shares the
- * same classloader as broker-plugins-management-jmx. See the <b>Fragment-Host:</b> header within the MANIFEST.MF
- * of this bundle.
- * </p>
*/
public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore
{
@@ -63,7 +56,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
try
{
GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
- final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT};
+ final String[] itemNames = new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT};
final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
itemNames,
@@ -71,7 +64,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
GROUP_MEMBER_ATTRIBUTE_TYPES );
GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
GROUP_MEMBER_ROW,
- new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME});
+ new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME});
}
catch (final OpenDataException ode)
{
@@ -79,44 +72,46 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
}
- private final BDBHAMessageStore _store;
+ private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+ private final String _objectName;
- protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store, ManagedObject parent) throws JMException
+ protected BDBHAMessageStoreManagerMBean(String virtualHostName, ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
{
super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry());
- LOGGER.debug("Creating BDBHAMessageStoreManagerMBean");
- _store = store;
+ LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostName);
+ _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
+ _objectName = ObjectName.quote(virtualHostName);
register();
}
@Override
public String getObjectInstanceName()
{
- return ObjectName.quote(_store.getName());
+ return _objectName;
}
@Override
public String getGroupName()
{
- return _store.getGroupName();
+ return _replicatedEnvironmentFacade.getGroupName();
}
@Override
public String getNodeName()
{
- return _store.getNodeName();
+ return _replicatedEnvironmentFacade.getNodeName();
}
@Override
public String getNodeHostPort()
{
- return _store.getNodeHostPort();
+ return _replicatedEnvironmentFacade.getHostPort();
}
@Override
public String getHelperHostPort()
{
- return _store.getHelperHostPort();
+ return _replicatedEnvironmentFacade.getHelperHostPort();
}
@Override
@@ -124,7 +119,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- return _store.getDurability();
+ return _replicatedEnvironmentFacade.getDurability();
}
catch (RuntimeException e)
{
@@ -137,7 +132,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public boolean getCoalescingSync() throws IOException, JMException
{
- return _store.isCoalescingSync();
+ return _replicatedEnvironmentFacade.isCoalescingSync();
}
@Override
@@ -145,7 +140,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- return _store.getNodeState();
+ return _replicatedEnvironmentFacade.getNodeState();
}
catch (RuntimeException e)
{
@@ -159,7 +154,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- return _store.isDesignatedPrimary();
+ return _replicatedEnvironmentFacade.isDesignatedPrimary();
}
catch (RuntimeException e)
{
@@ -172,7 +167,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
public TabularData getAllNodesInGroup() throws IOException, JMException
{
final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
- final List<Map<String, String>> members = _store.getGroupMembers();
+ final List<Map<String, String>> members = _replicatedEnvironmentFacade.getGroupMembers();
for (Map<String, String> map : members)
{
@@ -187,9 +182,9 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- _store.removeNodeFromGroup(nodeName);
+ _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
}
- catch (StoreException e)
+ catch (RuntimeException e)
{
LOGGER.error("Failed to remove node " + nodeName + " from group", e);
throw new JMException(e.getMessage());
@@ -201,11 +196,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- _store.setDesignatedPrimary(primary);
+ _replicatedEnvironmentFacade.setDesignatedPrimary(primary);
}
- catch (StoreException e)
+ catch (RuntimeException e)
{
- LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e);
+ LOGGER.error("Failed to set node " + _replicatedEnvironmentFacade.getNodeName() + " as designated primary", e);
throw new JMException(e.getMessage());
}
}
@@ -215,9 +210,9 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- _store.updateAddress(nodeName, newHostName, newPort);
+ _replicatedEnvironmentFacade.updateAddress(nodeName, newHostName, newPort);
}
- catch(StoreException e)
+ catch(RuntimeException e)
{
LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e);
throw new JMException(e.getMessage());
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
index 0492350a25..16199d30a3 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
@@ -28,11 +28,12 @@ import org.apache.qpid.server.jmx.MBeanProvider;
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
/**
* This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual
- * host and of type {@link BDBHAMessageStore#TYPE}.
+ * host and of type {@link ReplicatedEnvironmentFacade#TYPE}.
*
*/
public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@@ -48,7 +49,7 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
public boolean isChildManageableByMBean(ConfiguredObject child)
{
return (child instanceof VirtualHost
- && BDBHAMessageStore.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+ && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
}
@Override
@@ -56,14 +57,15 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
{
VirtualHost virtualHostChild = (VirtualHost) child;
- BDBHAMessageStore messageStore = (BDBHAMessageStore) virtualHostChild.getMessageStore();
+ BDBMessageStore messageStore = (BDBMessageStore) virtualHostChild.getMessageStore();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Creating mBean for child " + child);
}
- return new BDBHAMessageStoreManagerMBean(messageStore, (ManagedObject) parent);
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade();
+ return new BDBHAMessageStoreManagerMBean(virtualHostChild.getName(), replicatedEnvironmentFacade, (ManagedObject) parent);
}
@Override
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
index 0d963ebdae..fa16d1061a 100644
--- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -37,10 +37,9 @@ import javax.management.openmbean.TabularData;
import junit.framework.TestCase;
-import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
-import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
@@ -53,7 +52,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
private static final String TEST_STORE_NAME = "testStoreName";
private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
- private BDBHAMessageStore _store;
+ private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
private BDBHAMessageStoreManagerMBean _mBean;
private AMQManagedObject _mBeanParent;
@@ -62,10 +61,10 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
super.setUp();
- _store = mock(BDBHAMessageStore.class);
+ _replicatedEnvironmentFacade = mock(ReplicatedEnvironmentFacade.class);
_mBeanParent = mock(AMQManagedObject.class);
when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
- _mBean = new BDBHAMessageStoreManagerMBean(_store, _mBeanParent);
+ _mBean = new BDBHAMessageStoreManagerMBean(TEST_STORE_NAME, _replicatedEnvironmentFacade, _mBeanParent);
}
@Override
@@ -76,64 +75,62 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
public void testObjectName() throws Exception
{
- when(_store.getName()).thenReturn(TEST_STORE_NAME);
-
String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
assertEquals(expectedObjectName, _mBean.getObjectName().toString());
}
public void testGroupName() throws Exception
{
- when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(_replicatedEnvironmentFacade.getGroupName()).thenReturn(TEST_GROUP_NAME);
assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
}
public void testNodeName() throws Exception
{
- when(_store.getNodeName()).thenReturn(TEST_NODE_NAME);
+ when(_replicatedEnvironmentFacade.getNodeName()).thenReturn(TEST_NODE_NAME);
assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
}
public void testNodeHostPort() throws Exception
{
- when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+ when(_replicatedEnvironmentFacade.getHostPort()).thenReturn(TEST_NODE_HOST_PORT);
assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
}
public void testHelperHostPort() throws Exception
{
- when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+ when(_replicatedEnvironmentFacade.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
}
public void testDurability() throws Exception
{
- when(_store.getDurability()).thenReturn(TEST_DURABILITY);
+ when(_replicatedEnvironmentFacade.getDurability()).thenReturn(TEST_DURABILITY);
assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
}
public void testCoalescingSync() throws Exception
{
- when(_store.isCoalescingSync()).thenReturn(true);
+ when(_replicatedEnvironmentFacade.isCoalescingSync()).thenReturn(true);
assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
}
public void testNodeState() throws Exception
{
- when(_store.getNodeState()).thenReturn(TEST_NODE_STATE);
+ when(_replicatedEnvironmentFacade.getNodeState()).thenReturn(TEST_NODE_STATE);
assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
}
public void testDesignatedPrimaryFlag() throws Exception
{
- when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+ when(_replicatedEnvironmentFacade.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
}
@@ -141,29 +138,29 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
public void testGroupMembersForGroupWithOneNode() throws Exception
{
List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
- when(_store.getGroupMembers()).thenReturn(members);
+ when(_replicatedEnvironmentFacade.getGroupMembers()).thenReturn(members);
final TabularData resultsTable = _mBean.getAllNodesInGroup();
- assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT);
+ assertTableHasHeadingsNamed(resultsTable, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT);
final int numberOfDataRows = resultsTable.size();
assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
- assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME));
- assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ assertEquals(TEST_NODE_NAME, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME));
+ assertEquals(TEST_NODE_HOST_PORT, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
}
public void testRemoveNodeFromReplicationGroup() throws Exception
{
_mBean.removeNodeFromGroup(TEST_NODE_NAME);
- verify(_store).removeNodeFromGroup(TEST_NODE_NAME);
+ verify(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME);
}
public void testRemoveNodeFromReplicationGroupWithError() throws Exception
{
- doThrow(new StoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME);
+ doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).removeNodeFromGroup(TEST_NODE_NAME);
try
{
@@ -180,12 +177,12 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
_mBean.setDesignatedPrimary(true);
- verify(_store).setDesignatedPrimary(true);
+ verify(_replicatedEnvironmentFacade).setDesignatedPrimary(true);
}
public void testSetAsDesignatedPrimaryWithError() throws Exception
{
- doThrow(new StoreException("mocked exception")).when(_store).setDesignatedPrimary(true);
+ doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacade).setDesignatedPrimary(true);
try
{
@@ -205,7 +202,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
_mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
- verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+ verify(_replicatedEnvironmentFacade).updateAddress(TEST_NODE_NAME, newHostName, newPort);
}
private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
@@ -220,8 +217,8 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
private Map<String, String> createTestNodeResult()
{
Map<String, String> items = new HashMap<String, String>();
- items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
- items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+ items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+ items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
return items;
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
deleted file mode 100644
index 9a06f06999..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ /dev/null
@@ -1,1913 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.*;
-import com.sleepycat.je.Transaction;
-
-import java.io.File;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.*;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
-import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
-import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
-import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
-import org.apache.qpid.server.store.berkeleydb.tuple.*;
-import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
-import org.apache.qpid.util.FileUtils;
-
-public abstract class AbstractBDBMessageStore implements MessageStore, DurableConfigurationStore
-{
- private static final Logger LOGGER = Logger.getLogger(AbstractBDBMessageStore.class);
-
- private static final int LOCK_RETRY_ATTEMPTS = 5;
-
- public static final int VERSION = 8;
-
- private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
- {{
- put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
- put(EnvironmentConfig.STATS_COLLECT, "false"); // Turn off stats generation - feature introduced (and on by default) from BDB JE 5.0.84
- }});
-
- private final AtomicBoolean _closed = new AtomicBoolean(false);
-
- private Environment _environment;
-
- private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
- private static String CONFIGURED_OBJECT_HIERARCHY = "CONFIGURED_OBJECT_HIERARCHY";
-
- private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
- private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
- private static String DELIVERYDB_NAME = "QUEUE_ENTRIES";
- private static String BRIDGEDB_NAME = "BRIDGES";
- private static String LINKDB_NAME = "LINKS";
- private static String XIDDB_NAME = "XIDS";
- private static String CONFIG_VERSION_DB = "CONFIG_VERSION";
-
- private Database _configuredObjectsDb;
- private Database _configuredObjectHierarchyDb;
- private Database _configVersionDb;
- private Database _messageMetaDataDb;
- private Database _messageContentDb;
- private Database _deliveryDb;
- private Database _bridgeDb;
- private Database _linkDb;
- private Database _xidDb;
-
- /* =======
- * Schema:
- * =======
- *
- * Queue:
- * name(AMQShortString) - name(AMQShortString), owner(AMQShortString),
- * arguments(FieldTable encoded as binary), exclusive (boolean)
- *
- * Exchange:
- * name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean)
- *
- * Binding:
- * exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString),
- * arguments (FieldTable encoded as binary) - 0 (zero)
- *
- * QueueEntry:
- * queueName(AMQShortString), messageId (long) - 0 (zero)
- *
- * Message (MetaData):
- * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary)
- *
- * Message (Content):
- * messageId (long), byteOffset (integer) - dataLength(integer), data(binary)
- */
-
- private final AtomicLong _messageId = new AtomicLong(0);
-
- protected final StateManager _stateManager;
-
- private MessageStoreRecoveryHandler _messageRecoveryHandler;
-
- private TransactionLogRecoveryHandler _tlogRecoveryHandler;
-
- private ConfigurationRecoveryHandler _configRecoveryHandler;
-
- private long _totalStoreSize;
- private boolean _limitBusted;
- private long _persistentSizeLowThreshold;
- private long _persistentSizeHighThreshold;
-
- private final EventManager _eventManager = new EventManager();
- private String _storeLocation;
-
- private Map<String, String> _envConfigMap;
- private VirtualHost _virtualHost;
-
-
- public AbstractBDBMessageStore()
- {
- _stateManager = new StateManager(_eventManager);
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- _eventManager.addEventListener(eventListener, events);
- }
-
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
- {
- _stateManager.attainState(State.INITIALISING);
-
- _configRecoveryHandler = recoveryHandler;
- _virtualHost = virtualHost;
- }
-
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
- {
- if(_stateManager.isInState(State.INITIAL))
- {
- // Is acting as a message store, but not a durable config store
- _stateManager.attainState(State.INITIALISING);
- }
-
- _messageRecoveryHandler = messageRecoveryHandler;
- _tlogRecoveryHandler = tlogRecoveryHandler;
- _virtualHost = virtualHost;
-
- completeInitialisation();
- }
-
- private void completeInitialisation()
- {
- configure();
-
- _stateManager.attainState(State.INITIALISED);
- }
-
- public synchronized void activate()
- {
- // check if acting as a durable config store, but not a message store
- if(_stateManager.isInState(State.INITIALISING))
- {
- completeInitialisation();
- }
- _stateManager.attainState(State.ACTIVATING);
-
- if(_configRecoveryHandler != null)
- {
- recoverConfig(_configRecoveryHandler);
- }
- if(_messageRecoveryHandler != null)
- {
- recoverMessages(_messageRecoveryHandler);
- }
- if(_tlogRecoveryHandler != null)
- {
- recoverQueueEntries(_tlogRecoveryHandler);
- }
-
- _stateManager.attainState(State.ACTIVE);
- }
-
- public org.apache.qpid.server.store.Transaction newTransaction()
- {
- return new BDBTransaction();
- }
-
- /**
- * Called after instantiation in order to configure the message store.
- *
- * @return whether a new store environment was created or not (to indicate whether recovery is necessary)
- *
- * @throws Exception If any error occurs that means the store is unable to configure itself.
- */
- public void configure()
- {
- configure(_messageRecoveryHandler != null);
- }
-
- public void configure(boolean isMessageStore)
- {
- String name = _virtualHost.getName();
- final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name;
-
- String storeLocation;
- if(isMessageStore)
- {
- storeLocation = (String) _virtualHost.getAttribute(VirtualHost.STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
- }
- else // we are acting only as the durable config store
- {
- storeLocation = (String) _virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
- }
-
- Object overfullAttr = _virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
- Object underfullAttr = _virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
-
- _persistentSizeHighThreshold = overfullAttr == null ? -1l :
- overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
- _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
- underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
-
-
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
-
- File environmentPath = new File(storeLocation);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
-
- _storeLocation = storeLocation;
-
- _envConfigMap = new HashMap<String, String>();
- _envConfigMap.putAll(ENVCONFIG_DEFAULTS);
-
- Object bdbEnvConfigAttr = _virtualHost.getAttribute("bdbEnvironmentConfig");
- if(bdbEnvConfigAttr instanceof Map)
- {
- _envConfigMap.putAll((Map)bdbEnvConfigAttr);
- }
-
- LOGGER.info("Configuring BDB message store");
-
- setupStore(environmentPath);
- }
-
- @Override
- public String getStoreLocation()
- {
- return _storeLocation;
- }
-
- /**
- * Move the store state from INITIAL to ACTIVE without actually recovering.
- *
- * This is required if you do not want to perform recovery of the store data
- *
- * @throws org.apache.qpid.server.store.StoreException if the store is not in the correct state
- */
- void startWithNoRecover() throws StoreException
- {
- _stateManager.attainState(State.INITIALISING);
- _stateManager.attainState(State.INITIALISED);
- _stateManager.attainState(State.ACTIVATING);
- _stateManager.attainState(State.ACTIVE);
- }
-
- protected void setupStore(File storePath)
- {
- _environment = createEnvironment(storePath);
-
- new Upgrader(_environment, _virtualHost).upgradeIfNecessary();
-
- openDatabases();
-
- _totalStoreSize = getSizeOnDisk();
- }
-
- protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException;
-
- public Environment getEnvironment()
- {
- return _environment;
- }
-
- protected final VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- private void openDatabases() throws DatabaseException
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
-
- //This is required if we are wanting read only access.
- dbConfig.setReadOnly(false);
-
- _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig);
- _configuredObjectHierarchyDb = openDatabase(CONFIGURED_OBJECT_HIERARCHY, dbConfig);
- _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig);
- _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
- _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
- _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
- _linkDb = openDatabase(LINKDB_NAME, dbConfig);
- _bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig);
- _xidDb = openDatabase(XIDDB_NAME, dbConfig);
- }
-
- private Database openDatabase(final String dbName, final DatabaseConfig dbConfig)
- {
- // if opening read-only and the database doesn't exist, then you can't create it
- return dbConfig.getReadOnly() && !_environment.getDatabaseNames().contains(dbName)
- ? null
- : _environment.openDatabase(null, dbName, dbConfig);
- }
-
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws Exception If the close fails.
- */
- public void close()
- {
- if (_closed.compareAndSet(false, true))
- {
- _stateManager.attainState(State.CLOSING);
- closeInternal();
- _stateManager.attainState(State.CLOSED);
- }
- }
-
- protected void closeInternal()
- {
- if (_messageMetaDataDb != null)
- {
- LOGGER.info("Closing message metadata database");
- _messageMetaDataDb.close();
- }
-
- if (_messageContentDb != null)
- {
- LOGGER.info("Closing message content database");
- _messageContentDb.close();
- }
-
- if (_configuredObjectsDb != null)
- {
- LOGGER.info("Closing configurable objects database");
- _configuredObjectsDb.close();
- }
-
- if (_configuredObjectHierarchyDb != null)
- {
- LOGGER.info("Closing configurable object hierarchy database");
- _configuredObjectHierarchyDb.close();
- }
-
- if (_deliveryDb != null)
- {
- LOGGER.info("Close delivery database");
- _deliveryDb.close();
- }
-
- if (_bridgeDb != null)
- {
- LOGGER.info("Close bridge database");
- _bridgeDb.close();
- }
-
- if (_linkDb != null)
- {
- LOGGER.info("Close link database");
- _linkDb.close();
- }
-
-
- if (_xidDb != null)
- {
- LOGGER.info("Close xid database");
- _xidDb.close();
- }
-
-
- if (_configVersionDb != null)
- {
- LOGGER.info("Close config version database");
- _configVersionDb.close();
- }
-
- closeEnvironment();
-
- }
-
- private void closeEnvironment() throws DatabaseException
- {
- if (_environment != null)
- {
- // Clean the log before closing. This makes sure it doesn't contain
- // redundant data. Closing without doing this means the cleaner may not
- // get a chance to finish.
- try
- {
- _environment.cleanLog();
- }
- finally
- {
- _environment.close();
- }
- }
- }
-
-
- private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler)
- {
- try
- {
- final int configVersion = getConfigVersion();
- recoveryHandler.beginConfigurationRecovery(this, configVersion);
- loadConfiguredObjects(recoveryHandler);
-
- final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
- if(newConfigVersion != configVersion)
- {
- updateConfigVersion(newConfigVersion);
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
-
- }
-
- private void updateConfigVersion(int newConfigVersion) throws StoreException
- {
- Cursor cursor = null;
- try
- {
- Transaction txn = _environment.beginTransaction(null, null);
- cursor = _configVersionDb.openCursor(txn, null);
- DatabaseEntry key = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0,key);
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- IntegerBinding.intToEntry(newConfigVersion, value);
- OperationStatus status = cursor.put(key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error setting config version: " + status);
- }
- }
- cursor.close();
- cursor = null;
- txn.commit();
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
- private int getConfigVersion() throws StoreException
- {
- Cursor cursor = null;
- try
- {
- cursor = _configVersionDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- return IntegerBinding.entryToInt(value);
- }
-
- // Insert 0 as the default config version
- IntegerBinding.intToEntry(0,value);
- ByteBinding.byteToEntry((byte) 0,key);
- OperationStatus status = _configVersionDb.put(null, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error initialising config version: " + status);
- }
- return 0;
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
- {
- Cursor objectsCursor = null;
- Cursor hierarchyCursor = null;
- try
- {
- objectsCursor = _configuredObjectsDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- Map<UUID, BDBConfiguredObjectRecord> configuredObjects =
- new HashMap<UUID, BDBConfiguredObjectRecord>();
-
- while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
- BDBConfiguredObjectRecord configuredObject =
- (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
- configuredObjects.put(configuredObject.getId(), configuredObject);
- }
-
- // set parents
- hierarchyCursor = _configuredObjectHierarchyDb.openCursor(null, null);
- while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
- UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
- BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
- if(child != null)
- {
- ConfiguredObjectRecord parent = configuredObjects.get(parentId);
- if(parent != null)
- {
- child.addParent(hk.getParentType(), parent);
- }
- else if(hk.getParentType().equals("Exchange"))
- {
- // TODO - remove this hack for the pre-defined exchanges
- child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
- }
- }
- }
-
- for (ConfiguredObjectRecord record : configuredObjects.values())
- {
- crh.configuredObject(record);
- }
-
-
-
- }
- finally
- {
- closeCursorSafely(objectsCursor);
- closeCursorSafely(hierarchyCursor);
- }
- }
-
- private void closeCursorSafely(Cursor cursor)
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
-
- private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
- {
- StoredMessageRecoveryHandler mrh = msrh.begin();
-
- Cursor cursor = null;
- try
- {
- cursor = _messageMetaDataDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
- long maxId = 0;
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
- mrh.message(message);
-
- maxId = Math.max(maxId, messageId);
- }
-
- _messageId.set(maxId);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- closeCursorSafely(cursor);
- }
- }
-
- private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
- throws DatabaseException
- {
- QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
-
- ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
- Cursor cursor = null;
- try
- {
- cursor = _deliveryDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- QueueEntryKey qek = keyBinding.entryToObject(key);
-
- entries.add(qek);
- }
-
- try
- {
- cursor.close();
- }
- finally
- {
- cursor = null;
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- qerh.queueEntry(queueId, messageId);
- }
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
-
- cursor = null;
- try
- {
- cursor = _xidDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- XidBinding keyBinding = XidBinding.getInstance();
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Xid xid = keyBinding.entryToObject(key);
- PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
- dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
- preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
- }
-
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
-
- dtxrh.completeDtxRecordRecovery();
- }
-
- public void removeMessage(long messageId, boolean sync) throws StoreException
- {
-
- boolean complete = false;
- com.sleepycat.je.Transaction tx = null;
-
- Random rand = null;
- int attempts = 0;
- try
- {
- do
- {
- tx = null;
- try
- {
- tx = _environment.beginTransaction(null, null);
-
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Removing message id " + messageId);
- }
-
-
- OperationStatus status = _messageMetaDataDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
- messageId);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleted metadata for message " + messageId);
- }
-
- //now remove the content data from the store if there is any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- _messageContentDb.delete(tx, contentKeyEntry);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleted content for message " + messageId);
- }
-
- commit(tx, sync);
- complete = true;
- tx = null;
- }
- catch (LockConflictException e)
- {
- try
- {
- if(tx != null)
- {
- tx.abort();
- }
- }
- catch(DatabaseException e2)
- {
- LOGGER.warn("Unable to abort transaction after LockConflictExcption", e2);
- // rethrow the original log conflict exception, the secondary exception should already have
- // been logged.
- throw e;
- }
-
-
- LOGGER.warn("Lock timeout exception. Retrying (attempt "
- + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
-
- if(++attempts < LOCK_RETRY_ATTEMPTS)
- {
- if(rand == null)
- {
- rand = new Random();
- }
-
- try
- {
- Thread.sleep(500l + (long)(500l * rand.nextDouble()));
- }
- catch (InterruptedException e1)
- {
-
- }
- }
- else
- {
- // rethrow the lock conflict exception since we could not solve by retrying
- throw e;
- }
- }
- }
- while(!complete);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Unexpected BDB exception", e);
-
- if (tx != null)
- {
- try
- {
- tx.abort();
- tx = null;
- }
- catch (DatabaseException e1)
- {
- throw new StoreException("Error aborting transaction " + e1, e1);
- }
- }
-
- throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
- }
- finally
- {
- if (tx != null)
- {
- try
- {
- tx.abort();
- tx = null;
- }
- catch (DatabaseException e1)
- {
- throw new StoreException("Error aborting transaction " + e1, e1);
- }
- }
- }
- }
-
- @Override
- public void create(ConfiguredObjectRecord configuredObject) throws StoreException
- {
-
- if (_stateManager.isInState(State.ACTIVE))
- {
- com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
-
- storeConfiguredObjectEntry(txn, configuredObject);
-
- txn.commit();
- }
- }
-
- public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
- {
- com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
- Collection<UUID> removed = new ArrayList<UUID>(objects.length);
- for(ConfiguredObjectRecord record : objects)
- {
- if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS)
- {
- removed.add(record.getId());
- }
- }
-
- txn.commit();
- return removed.toArray(new UUID[removed.size()]);
- }
-
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
- {
- com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
- for(ConfiguredObjectRecord record : records)
- {
- update(createIfNecessary, record, txn);
- }
- txn.commit();
- }
-
- private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws
- StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId());
- }
-
- try
- {
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
- keyBinding.objectToEntry(record.getId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- DatabaseEntry newValue = new DatabaseEntry();
- ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
-
- OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT);
- final boolean isNewRecord = status == OperationStatus.NOTFOUND;
- if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord))
- {
-
- // write the updated entry to the store
- configuredObjectBinding.objectToEntry(record, newValue);
- status = _configuredObjectsDb.put(txn, key, newValue);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error updating queue details within the store: " + status);
- }
- if(isNewRecord)
- {
- writeHierarchyRecords(txn, record);
- }
- }
- else if (status != OperationStatus.NOTFOUND)
- {
- throw new StoreException("Error finding queue details within the store: " + status);
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error updating queue details within the store: " + e,e);
- }
- }
-
- /**
- * Places a message onto a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The the queue to place the message on.
- * @param messageId The message to enqueue.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
- */
- public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws StoreException
- {
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
- keyBinding.objectToEntry(dd, key);
- DatabaseEntry value = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0, value);
-
- try
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Enqueuing message " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " in transaction " + tx);
- }
- _deliveryDb.put(tx, key, value);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
- throw new StoreException("Error writing enqueued message with id " + messageId + " for queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " to database", e);
- }
- }
-
- /**
- * Extracts a message from a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The queue to take the message from.
- * @param messageId The message to dequeue.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
- long messageId) throws StoreException
- {
-
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
- UUID id = queue.getId();
- keyBinding.objectToEntry(queueEntryKey, key);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Dequeue message id " + messageId + " from queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
- }
-
- try
- {
-
- OperationStatus status = _deliveryDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Unable to find message with id " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Unable to remove message with id " + messageId + " on queue"
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Removed message " + messageId + " on queue "
- + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
- + " from delivery db");
-
- }
- }
- catch (DatabaseException e)
- {
-
- LOGGER.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e);
- LOGGER.error(tx);
-
- throw new StoreException("Error accessing database while dequeuing message: " + e.getMessage(), e);
- }
- }
-
-
- private void recordXid(com.sleepycat.je.Transaction txn,
- long format,
- byte[] globalId,
- byte[] branchId,
- org.apache.qpid.server.store.Transaction.Record[] enqueues,
- org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
- keyBinding.objectToEntry(xid,key);
-
- DatabaseEntry value = new DatabaseEntry();
- PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- valueBinding.objectToEntry(preparedTransaction, value);
-
- try
- {
- _xidDb.put(txn, key, value);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Failed to write xid: " + e.getMessage(), e);
- throw new StoreException("Error writing xid to database", e);
- }
- }
-
- private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
- throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- Xid xid = new Xid(format, globalId, branchId);
- XidBinding keyBinding = XidBinding.getInstance();
-
- keyBinding.objectToEntry(xid, key);
-
-
- try
- {
-
- OperationStatus status = _xidDb.delete(txn, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new StoreException("Unable to find xid");
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Unable to remove xid");
- }
-
- }
- catch (DatabaseException e)
- {
-
- LOGGER.error("Failed to remove xid ", e);
- LOGGER.error(txn);
-
- throw new StoreException("Error accessing database while removing xid: " + e.getMessage(), e);
- }
- }
-
- /**
- * Commits all operations performed within a given transaction.
- *
- * @param tx The transaction to commit all operations for.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
- */
- private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws
- StoreException
- {
- if (tx == null)
- {
- throw new StoreException("Fatal internal error: transactional is null at commitTran");
- }
-
- StoreFuture result;
- try
- {
- result = commit(tx, syncCommit);
-
- if (LOGGER.isDebugEnabled())
- {
- String transactionType = syncCommit ? "synchronous" : "asynchronous";
- LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error commit tx: " + e.getMessage(), e);
- }
-
- return result;
- }
-
- /**
- * Abandons all operations performed within a given transaction.
- *
- * @param tx The transaction to abandon.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
- */
- public void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("abortTran called for transaction " + tx);
- }
-
- try
- {
- tx.abort();
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error aborting transaction: " + e.getMessage(), e);
- }
- }
-
- /**
- * Primarily for testing purposes.
- *
- * @param queueId
- *
- * @return a list of message ids for messages enqueued for a particular queue
- */
- List<Long> getEnqueuedMessages(UUID queueId) throws StoreException
- {
- Cursor cursor = null;
- try
- {
- cursor = _deliveryDb.openCursor(null, null);
-
- DatabaseEntry key = new DatabaseEntry();
-
- QueueEntryKey dd = new QueueEntryKey(queueId, 0);
-
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- keyBinding.objectToEntry(dd, key);
-
- DatabaseEntry value = new DatabaseEntry();
-
- LinkedList<Long> messageIds = new LinkedList<Long>();
-
- OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
- dd = keyBinding.entryToObject(key);
-
- while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
- {
-
- messageIds.add(dd.getMessageId());
- status = cursor.getNext(key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- dd = keyBinding.entryToObject(key);
- }
- }
-
- return messageIds;
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Database error: " + e.getMessage(), e);
- }
- finally
- {
- if (cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error closing cursor: " + e.getMessage(), e);
- }
- }
- }
- }
-
- /**
- * Return a valid, currently unused message id.
- *
- * @return A fresh message id.
- */
- public long getNewMessageId()
- {
- return _messageId.incrementAndGet();
- }
-
- /**
- * Stores a chunk of message data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param offset The offset of the data chunk in the message.
- * @param contentBody The content of the data chunk.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
- ByteBuffer contentBody) throws StoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding messageBinding = ContentBinding.getInstance();
- messageBinding.objectToEntry(contentBody.array(), value);
- try
- {
- OperationStatus status = _messageContentDb.put(tx, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error adding content for message id " + messageId + ": " + status);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
-
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Stores message meta-data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param messageMetaData The message meta data to store.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
- StorableMessageMetaData messageMetaData)
- throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("storeMetaData called for transaction " + tx
- + ", messageId " + messageId
- + ", messageMetaData " + messageMetaData);
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
-
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
- messageBinding.objectToEntry(messageMetaData, value);
- try
- {
- _messageMetaDataDb.put(tx, key, value);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Retrieves message meta-data.
- *
- * @param messageId The message to get the meta-data for.
- *
- * @return The message meta data.
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = "
- + messageId + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
-
- try
- {
- OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Metadata not found for message with id " + messageId);
- }
-
- StorableMessageMetaData mdd = messageBinding.entryToObject(value);
-
- return mdd;
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
- }
- }
-
- /**
- * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
- * from the specified offset in the message.
- *
- * @param messageId The message to get the data for.
- * @param offset The offset of the data within the message.
- * @param dst The destination of the content read back
- *
- * @return The number of bytes inserted into the destination
- *
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
- {
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- LongBinding.longToEntry(messageId, contentKeyEntry);
- DatabaseEntry value = new DatabaseEntry();
- ContentBinding contentTupleBinding = ContentBinding.getInstance();
-
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
- }
-
- try
- {
-
- int written = 0;
- OperationStatus status = _messageContentDb.get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
- if (status == OperationStatus.SUCCESS)
- {
- byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
- int size = dataAsBytes.length;
- if (offset > size)
- {
- throw new StoreException("Offset " + offset + " is greater than message size " + size
- + " for message id " + messageId + "!");
-
- }
-
- written = size - offset;
- if(written > dst.remaining())
- {
- written = dst.remaining();
- }
-
- dst.put(dataAsBytes, offset, written);
- }
- return written;
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- @SuppressWarnings("unchecked")
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
- {
- if(metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
- }
- else
- {
- return new StoredMemoryMessage(getNewMessageId(), metaData);
- }
- }
-
- //Package getters for the various databases used by the Store
-
- Database getMetaDataDb()
- {
- return _messageMetaDataDb;
- }
-
- Database getContentDb()
- {
- return _messageContentDb;
- }
-
- Database getDeliveryDb()
- {
- return _deliveryDb;
- }
-
- /**
- * Makes the specified configured object persistent.
- *
- *
- * @param txn
- * @param configuredObject Details of the configured object to store.
- * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
- */
- private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- LOGGER.debug("Storing configured object: " + configuredObject);
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
- uuidBinding.objectToEntry(configuredObject.getId(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
-
- queueBinding.objectToEntry(configuredObject, value);
- try
- {
- OperationStatus status = _configuredObjectsDb.put(txn, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error writing configured object " + configuredObject + " to database: "
- + status);
- }
- writeHierarchyRecords(txn, configuredObject);
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error writing configured object " + configuredObject
- + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject)
- {
- OperationStatus status;
- HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance();
- DatabaseEntry hierarchyKey = new DatabaseEntry();
- DatabaseEntry hierarchyValue = new DatabaseEntry();
-
- for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet())
- {
-
- hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey);
- UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue);
- status = _configuredObjectHierarchyDb.put(txn, hierarchyKey, hierarchyValue);
- if (status != OperationStatus.SUCCESS)
- {
- throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: "
- + status);
- }
- }
- }
-
- private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException
- {
- UUID id = record.getId();
- Map<String, ConfiguredObjectRecord> parents = record.getParents();
- LOGGER.debug("Removing configured object: " + id);
- DatabaseEntry key = new DatabaseEntry();
- UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
- uuidBinding.objectToEntry(id, key);
- try
- {
- OperationStatus status = _configuredObjectsDb.delete(tx, key);
- if(status == OperationStatus.SUCCESS)
- {
- for(String parentType : parents.keySet())
- {
- DatabaseEntry hierarchyKey = new DatabaseEntry();
- HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance();
- keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey);
- _configuredObjectHierarchyDb.delete(tx, hierarchyKey);
- }
- }
- return status;
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Error deleting of configured object with id " + id + " from database", e);
- }
- }
-
- protected abstract StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException;
-
-
- private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData>
- {
-
- private final long _messageId;
- private final boolean _isRecovered;
-
- private StorableMessageMetaData _metaData;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
-
- private byte[] _data;
- private volatile SoftReference<byte[]> _dataRef;
-
- StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
- {
- this(messageId, metaData, false);
- }
-
- StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered)
- {
- _messageId = messageId;
- _isRecovered = isRecovered;
-
- if(!_isRecovered)
- {
- _metaData = metaData;
- }
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- public StorableMessageMetaData getMetaData()
- {
- StorableMessageMetaData metaData = _metaDataRef.get();
- if(metaData == null)
- {
- metaData = AbstractBDBMessageStore.this.getMessageMetaData(_messageId);
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- return metaData;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
- {
- src = src.slice();
-
- if(_data == null)
- {
- _data = new byte[src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
- src.duplicate().get(_data);
- }
- else
- {
- byte[] oldData = _data;
- _data = new byte[oldData.length + src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
-
- System.arraycopy(oldData,0,_data,0,oldData.length);
- src.duplicate().get(_data, oldData.length, src.remaining());
- }
-
- }
-
- public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- int length = Math.min(dst.remaining(), data.length - offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
- }
- else
- {
- return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
- }
- }
-
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- return ByteBuffer.wrap(data,offsetInMessage,size);
- }
- else
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- int length = getContent(offsetInMessage, buf);
- buf.limit(length);
- buf.position(0);
- return buf;
- }
- }
-
- synchronized void store(com.sleepycat.je.Transaction txn)
- {
- if (!stored())
- {
- try
- {
- _dataRef = new SoftReference<byte[]>(_data);
- AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
- AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
- }
- catch(DatabaseException e)
- {
- throw new StoreException(e);
- }
- finally
- {
- _metaData = null;
- _data = null;
- }
- }
- }
-
- public synchronized StoreFuture flushToStore()
- {
- if(!stored())
- {
- com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
- store(txn);
- AbstractBDBMessageStore.this.commit(txn,true);
- storedSizeChange(getMetaData().getContentSize());
- }
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- public void remove()
- {
- int delta = getMetaData().getContentSize();
- AbstractBDBMessageStore.this.removeMessage(_messageId, false);
- storedSizeChange(-delta);
- }
-
- private boolean stored()
- {
- return _metaData == null || _isRecovered;
- }
- }
-
- private class BDBTransaction implements org.apache.qpid.server.store.Transaction
- {
- private com.sleepycat.je.Transaction _txn;
- private int _storeSizeIncrease;
-
- private BDBTransaction()
- {
- try
- {
- _txn = _environment.beginTransaction(null, null);
- }
- catch (DatabaseException e)
- {
- LOGGER.error("Exception during transaction begin, closing store environment.", e);
- closeEnvironmentSafely();
-
- throw new StoreException("Exception during transaction begin, store environment closed.", e);
- }
- }
-
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- if(message.getStoredMessage() instanceof StoredBDBMessage)
- {
- final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
- }
-
- AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
- }
-
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
- }
-
- public void commitTran()
- {
- AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
- AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
- }
-
- public StoreFuture commitTranAsync()
- {
- AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
- return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
- }
-
- public void abortTran()
- {
- AbstractBDBMessageStore.this.abortTran(_txn);
- }
-
- public void removeXid(long format, byte[] globalId, byte[] branchId)
- {
- AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
-
- }
-
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
- Record[] dequeues)
- {
- AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
- }
- }
-
- private void storedSizeChange(final int delta)
- {
- if(getPersistentSizeHighThreshold() > 0)
- {
- synchronized (this)
- {
- // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
- // time, so we do so only when there's been enough change that it is worth looking again. We do this by
- // assuming the total size will change by less than twice the amount of the message data change.
- long newSize = _totalStoreSize += 2*delta;
-
- if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
- {
- _totalStoreSize = getSizeOnDisk();
-
- if(_totalStoreSize > getPersistentSizeHighThreshold())
- {
- _limitBusted = true;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- }
- }
- else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
- {
- long oldSize = _totalStoreSize;
- _totalStoreSize = getSizeOnDisk();
-
- if(oldSize <= _totalStoreSize)
- {
-
- reduceSizeOnDisk();
-
- _totalStoreSize = getSizeOnDisk();
-
- }
-
- if(_totalStoreSize < getPersistentSizeLowThreshold())
- {
- _limitBusted = false;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- }
-
-
- }
- }
- }
- }
-
- private void reduceSizeOnDisk()
- {
- _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
- boolean cleaned = false;
- while (_environment.cleanLog() > 0)
- {
- cleaned = true;
- }
- if (cleaned)
- {
- CheckpointConfig force = new CheckpointConfig();
- force.setForce(true);
- _environment.checkpoint(force);
- }
-
-
- _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
- }
-
- private long getSizeOnDisk()
- {
- return _environment.getStats(null).getTotalLogSize();
- }
-
- private long getPersistentSizeLowThreshold()
- {
- return _persistentSizeLowThreshold;
- }
-
- private long getPersistentSizeHighThreshold()
- {
- return _persistentSizeHighThreshold;
- }
-
- private void setEnvironmentConfigProperties(EnvironmentConfig envConfig)
- {
- for (Map.Entry<String, String> configItem : _envConfigMap.entrySet())
- {
- LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
- envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
- }
- }
-
- protected EnvironmentConfig createEnvironmentConfig()
- {
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
-
- setEnvironmentConfigProperties(envConfig);
-
- envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
-
- return envConfig;
- }
-
- protected void closeEnvironmentSafely()
- {
- try
- {
- _environment.close();
- }
- catch (DatabaseException ex)
- {
- LOGGER.error("Exception closing store environment", ex);
- }
- catch (IllegalStateException ex)
- {
- LOGGER.error("Exception closing store environment", ex);
- }
- }
-
-
- private class LoggingAsyncExceptionListener implements ExceptionListener
- {
- @Override
- public void exceptionThrown(ExceptionEvent event)
- {
- LOGGER.error("Asynchronous exception thrown by BDB thread '"
- + event.getThreadName() + "'", event.getException());
- }
- }
-
- @Override
- public void onDelete()
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleting store " + _storeLocation);
- }
-
- if (_storeLocation != null)
- {
- File location = new File(_storeLocation);
- if (location.exists())
- {
- if (!FileUtils.delete(location, true))
- {
- LOGGER.error("Cannot delete " + _storeLocation);
- }
- }
- }
- }
-
- void setVirtualHost(final VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
deleted file mode 100644
index bdd48e99bb..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ /dev/null
@@ -1,665 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.HAMessageStore;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.State;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Durability;
-import com.sleepycat.je.Durability.ReplicaAckPolicy;
-import com.sleepycat.je.Durability.SyncPolicy;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.OperationFailureException;
-import com.sleepycat.je.Transaction;
-import com.sleepycat.je.rep.InsufficientLogException;
-import com.sleepycat.je.rep.NetworkRestore;
-import com.sleepycat.je.rep.NetworkRestoreConfig;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-import com.sleepycat.je.rep.ReplicationMutableConfig;
-import com.sleepycat.je.rep.ReplicationNode;
-import com.sleepycat.je.rep.StateChangeEvent;
-import com.sleepycat.je.rep.StateChangeListener;
-import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
-
-import javax.security.auth.Subject;
-
-public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore
-{
- private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
-
- private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY);
-
- public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
- public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
-
- @SuppressWarnings("serial")
- private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
- {{
- /**
- * Parameter decreased as the 24h default may lead very large log files for most users.
- */
- put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
- /**
- * Parameter increased as the 5 s default may lead to spurious timeouts.
- */
- put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
- /**
- * Parameter increased as the 10 s default may lead to spurious timeouts.
- */
- put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
- /**
- * Parameter increased as the 10 h default may cause user confusion.
- */
- put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
- /**
- * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False
- * is scheduled to become default after JE 5.0.48.
- */
- put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
- /**
- * Parameter decreased as a default 5min interval may lead to bigger data losses on Node
- * with NO_SYN durability in case if such Node crushes.
- */
- put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
- }});
-
- public static final String TYPE = "BDB-HA";
-
- private String _groupName;
- private String _nodeName;
- private String _nodeHostPort;
- private String _helperHostPort;
- private Durability _durability;
-
- private String _name;
-
- private CommitThreadWrapper _commitThreadWrapper;
- private boolean _coalescingSync;
- private boolean _designatedPrimary;
- private Map<String, String> _repConfig;
-
- @Override
- public void configure()
- {
- //Mandatory configuration
- VirtualHost virtualHost = getVirtualHost();
- _groupName = getValidatedStringAttribute(virtualHost, "haGroupName");
- _nodeName = getValidatedStringAttribute(virtualHost, "haNodeName");
- _nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress");
- _helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress");
- _name = virtualHost.getName();
-
- //Optional configuration
- String durabilitySetting = getStringAttribute(virtualHost,"haDurability",null);
- if (durabilitySetting == null)
- {
- _durability = DEFAULT_DURABILITY;
- }
- else
- {
- _durability = Durability.parse(durabilitySetting);
- }
- _designatedPrimary = getBooleanAttribute(virtualHost, "haDesignatedPrimary", Boolean.FALSE);
- _coalescingSync = getBooleanAttribute(virtualHost, "haCoalescingSync", Boolean.TRUE);
-
- _repConfig = new HashMap<String, String>(REPCONFIG_DEFAULTS);
- Object repConfigAttr = virtualHost.getAttribute("haReplicationConfig");
- if(repConfigAttr instanceof Map)
- {
- _repConfig.putAll((Map)repConfigAttr);
- }
-
- if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC)
- {
- throw new StoreException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
- + "! Please set highAvailability.coalescingSync to false in store configuration.");
- }
-
- super.configure();
- }
-
-
- private String getValidatedStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName)
- {
- Object attrValue = virtualHost.getAttribute(attributeName);
- if(attrValue != null)
- {
- return attrValue.toString();
- }
- else
- {
- throw new StoreException("BDB HA configuration key not found. Please specify configuration attribute: "
- + attributeName);
- }
- }
-
- private String getStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, String defaultVal)
- {
- Object attrValue = virtualHost.getAttribute(attributeName);
- if(attrValue != null)
- {
- return attrValue.toString();
- }
- return defaultVal;
- }
-
- private boolean getBooleanAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, boolean defaultVal)
- {
- Object attrValue = virtualHost.getAttribute(attributeName);
- if(attrValue != null)
- {
- if(attrValue instanceof Boolean)
- {
- return ((Boolean) attrValue).booleanValue();
- }
- else if(attrValue instanceof String)
- {
- return Boolean.parseBoolean((String)attrValue);
- }
-
- }
- return defaultVal;
- }
-
-
- @Override
- protected void setupStore(File storePath) throws DatabaseException
- {
- super.setupStore(storePath);
-
- if(_coalescingSync)
- {
- _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + getVirtualHost().getName(), getEnvironment());
- _commitThreadWrapper.startCommitThread();
- }
- }
-
- @Override
- protected Environment createEnvironment(File environmentPath) throws DatabaseException
- {
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Environment path " + environmentPath.getAbsolutePath());
- LOGGER.info("Group name " + _groupName);
- LOGGER.info("Node name " + _nodeName);
- LOGGER.info("Node host port " + _nodeHostPort);
- LOGGER.info("Helper host port " + _helperHostPort);
- LOGGER.info("Durability " + _durability);
- LOGGER.info("Coalescing sync " + _coalescingSync);
- LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary);
- }
-
- final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
-
- replicationConfig.setHelperHosts(_helperHostPort);
- replicationConfig.setDesignatedPrimary(_designatedPrimary);
- setReplicationConfigProperties(replicationConfig);
-
- final EnvironmentConfig envConfig = createEnvironmentConfig();
- envConfig.setDurability(_durability);
-
- ReplicatedEnvironment replicatedEnvironment = null;
- try
- {
- replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
- }
- catch (final InsufficientLogException ile)
- {
- LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
- NetworkRestore restore = new NetworkRestore();
- NetworkRestoreConfig config = new NetworkRestoreConfig();
- config.setRetainLogFiles(false);
- restore.execute(ile, config);
- replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
- }
-
- return replicatedEnvironment;
- }
-
- @Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
- {
- super.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler);
-
- final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
-
- replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
- }
-
- @Override
- public synchronized void activate()
- {
- // Before proceeding, perform a log flush with an fsync
- getEnvironment().flushLog(true);
-
- super.activate();
- }
-
- @Override
- public synchronized void passivate()
- {
- if (_stateManager.isNotInState(State.INITIALISED))
- {
- LOGGER.debug("Store becoming passive");
- _stateManager.attainState(State.INITIALISED);
- }
- }
-
- public String getName()
- {
- return _name;
- }
-
- public String getGroupName()
- {
- return _groupName;
- }
-
- public String getNodeName()
- {
- return _nodeName;
- }
-
- public String getNodeHostPort()
- {
- return _nodeHostPort;
- }
-
- public String getHelperHostPort()
- {
- return _helperHostPort;
- }
-
- public String getDurability()
- {
- return _durability.toString();
- }
-
- public boolean isCoalescingSync()
- {
- return _coalescingSync;
- }
-
- public String getNodeState()
- {
- ReplicatedEnvironment.State state = getReplicatedEnvironment().getState();
- return state.toString();
- }
-
- public Boolean isDesignatedPrimary()
- {
- return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary();
- }
-
- public List<Map<String, String>> getGroupMembers()
- {
- List<Map<String, String>> members = new ArrayList<Map<String,String>>();
-
- for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes())
- {
- Map<String, String> nodeMap = new HashMap<String, String>();
- nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName());
- nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort());
- members.add(nodeMap);
- }
-
- return members;
- }
-
- public void removeNodeFromGroup(String nodeName) throws StoreException
- {
- try
- {
- createReplicationGroupAdmin().removeMember(nodeName);
- }
- catch (OperationFailureException ofe)
- {
- throw new StoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe);
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e);
- }
- }
-
- public void setDesignatedPrimary(boolean isPrimary) throws StoreException
- {
- try
- {
- final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
- synchronized(replicatedEnvironment)
- {
- final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig();
- final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
- replicatedEnvironment.setRepMutableConfig(newConfig);
- }
-
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group");
- }
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e);
- }
- }
-
- ReplicatedEnvironment getReplicatedEnvironment()
- {
- return (ReplicatedEnvironment)getEnvironment();
- }
-
- public void updateAddress(String nodeName, String newHostName, int newPort) throws StoreException
- {
- try
- {
- createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
- }
- catch (OperationFailureException ofe)
- {
- throw new StoreException("Failed to update address for '" + nodeName +
- "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe);
- }
- catch (DatabaseException e)
- {
- throw new StoreException("Failed to update address for '" + nodeName +
- "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e);
- }
- }
-
- @Override
- protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException
- {
- // Using commit() instead of commitNoSync() for the HA store to allow
- // the HA durability configuration to influence resulting behaviour.
- try
- {
- tx.commit();
- }
- catch (DatabaseException de)
- {
- LOGGER.error("Got DatabaseException on commit, closing environment", de);
-
- closeEnvironmentSafely();
-
- throw de;
- }
-
- if(_coalescingSync)
- {
- return _commitThreadWrapper.commit(tx, syncCommit);
- }
- else
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
- }
-
- @Override
- protected void closeInternal()
- {
- substituteNoOpStateChangeListenerOn(getReplicatedEnvironment());
-
- try
- {
- if(_coalescingSync)
- {
- try
- {
- _commitThreadWrapper.stopCommitThread();
- }
- catch (InterruptedException e)
- {
- throw new StoreException(e);
- }
- }
- }
- finally
- {
- super.closeInternal();
- }
- }
-
- /**
- * Replicas emit a state change event {@link com.sleepycat.je.rep.ReplicatedEnvironment.State#DETACHED} during
- * {@link Environment#close()}. We replace the StateChangeListener so we silently ignore this state change.
- */
- private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment)
- {
- LOGGER.debug("Substituting no-op state change listener for environment close");
- replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener());
- }
-
- private ReplicationGroupAdmin createReplicationGroupAdmin()
- {
- final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
- helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets());
-
- final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig();
- helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
-
- return new ReplicationGroupAdmin(_groupName, helpers);
- }
-
-
- private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
- {
- for (Map.Entry<String, String> configItem : _repConfig.entrySet())
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
- }
- replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
- }
- }
-
- private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException
- {
- if (!config.containsKey(key))
- {
- throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: "
- + key.replace('.', '/'));
- }
- return config.getString(key);
- }
-
- private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
- {
- private final Executor _executor = Executors.newSingleThreadExecutor();
-
- @Override
- public void stateChange(StateChangeEvent stateChangeEvent)
- {
- com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
-
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Received BDB event indicating transition to state " + state);
- }
-
- switch (state)
- {
- case MASTER:
- activateStoreAsync();
- break;
- case REPLICA:
- passivateStoreAsync();
- break;
- case DETACHED:
- LOGGER.error("BDB replicated node in detached state, therefore passivating.");
- passivateStoreAsync();
- break;
- case UNKNOWN:
- LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
- break;
- default:
- LOGGER.error("Unexpected state change: " + state);
- throw new IllegalStateException("Unexpected state change: " + state);
- }
- }
-
- /**
- * Calls {@link MessageStore#activate()}.
- *
- * <p/>
- *
- * This is done a background thread, in line with
- * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
- * activate may execute transactions, which can't complete until
- * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned.
- */
- private void activateStoreAsync()
- {
- String threadName = "BDBHANodeActivationThread-" + _name;
- executeStateChangeAsync(new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
- {
- try
- {
- activate();
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to activate on hearing MASTER change event",e);
- throw e;
- }
- return null;
- }
- }, threadName);
- }
-
- /**
- * Calls {@link #passivate()}.
- *
- * <p/>
- * This is done a background thread, in line with
- * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
- * passivation due to the effect of state change listeners.
- */
- private void passivateStoreAsync()
- {
- String threadName = "BDBHANodePassivationThread-" + _name;
- executeStateChangeAsync(new Callable<Void>()
- {
-
- @Override
- public Void call() throws Exception
- {
- try
- {
- passivate();
- }
- catch (Exception e)
- {
- LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e);
- throw e;
- }
- return null;
- }
- }, threadName);
- }
-
- private void executeStateChangeAsync(final Callable<Void> callable, final String threadName)
- {
-
- _executor.execute(new Runnable()
- {
-
- @Override
- public void run()
- {
- final String originalThreadName = Thread.currentThread().getName();
- Thread.currentThread().setName(threadName);
-
- try
- {
- Subject.doAs(SecurityManager.getSystemTaskSubject("BDB HA State Change"), new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
-
- try
- {
- callable.call();
- }
- catch (Exception e)
- {
- LOGGER.error("Exception during state change", e);
- }
- return null;
- }
- });
- }
- finally
- {
- Thread.currentThread().setName(originalThreadName);
- }
- }
- });
- }
- }
-
- private class NoOpStateChangeListener implements StateChangeListener
- {
- @Override
- public void stateChange(StateChangeEvent stateChangeEvent)
- {
- }
- }
-
- @Override
- public String getStoreType()
- {
- return TYPE;
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index b055f8bd90..3fdc12ba31 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.store.berkeleydb;
*
*/
+import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
@@ -31,15 +32,22 @@ import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.OperationalLoggingListener;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
public class BDBHAVirtualHost extends AbstractVirtualHost
{
- private BDBHAMessageStore _messageStore;
+ private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHost.class);
+
+ private BDBMessageStore _messageStore;
private boolean _inVhostInitiatedClose;
@@ -52,11 +60,9 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost);
}
-
-
protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost)
{
- _messageStore = new BDBHAMessageStore();
+ _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
final MessageStoreLogSubject storeLogSubject =
new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
@@ -84,6 +90,11 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
virtualHost, recoveryHandler,
recoveryHandler
);
+
+ // Make the virtualhost model object a replication group listener
+ ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade();
+ environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+
}
@@ -194,4 +205,70 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
}
}
+ private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
+ {
+
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Received BDB event indicating transition to state " + state
+ + " when current message store state is " + _messageStore._stateManager.getState());
+ }
+
+ switch (state)
+ {
+ case MASTER:
+ activate();
+ break;
+ case REPLICA:
+ passivate();
+ break;
+ case DETACHED:
+ LOGGER.error("BDB replicated node in detached state, therefore passivating.");
+ passivate();
+ break;
+ case UNKNOWN:
+ LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
+ break;
+ default:
+ LOGGER.error("Unexpected state change: " + state);
+ throw new IllegalStateException("Unexpected state change: " + state);
+ }
+ }
+
+ private void activate()
+ {
+ try
+ {
+ _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
+ _messageStore.activate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to activate on hearing MASTER change event", e);
+ }
+ }
+
+ private void passivate()
+ {
+ try
+ {
+ //TODO: move this this into the store method passivate()
+ if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
+ {
+ _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
+ }
+ }
+
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 0fd5060397..16255eb5ed 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -21,15 +21,73 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.State;
+import org.apache.qpid.server.store.StateManager;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
+import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.util.FileUtils;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -39,83 +97,1703 @@ import com.sleepycat.je.EnvironmentConfig;
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
-public class BDBMessageStore extends AbstractBDBMessageStore
+public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
- public static final String TYPE = "BDB";
- private CommitThreadWrapper _commitThreadWrapper;
+
+ public static final int VERSION = 8;
+ public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
+ private static final int LOCK_RETRY_ATTEMPTS = 5;
+ private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
+ private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY";
+
+ private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
+ private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
+ private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
+ private static String BRIDGEDB_NAME = "BRIDGES";
+ private static String LINKDB_NAME = "LINKS";
+ private static String XID_DB_NAME = "XIDS";
+ private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION";
+ private static final String[] DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME, MESSAGE_META_DATA_DB_NAME,
+ MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME, CONFIG_VERSION_DB_NAME };
+
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private EnvironmentFacade _environmentFacade;
+ private final AtomicLong _messageId = new AtomicLong(0);
+
+ protected final StateManager _stateManager;
+
+ private MessageStoreRecoveryHandler _messageRecoveryHandler;
+
+ private TransactionLogRecoveryHandler _tlogRecoveryHandler;
+
+ private ConfigurationRecoveryHandler _configRecoveryHandler;
+
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
+ private final EventManager _eventManager = new EventManager();
+ private final String _type;
+ private VirtualHost _virtualHost;
+
+ private final EnvironmentFacadeFactory _environmentFacadeFactory;
+
+ private volatile Committer _committer;
+
+ public BDBMessageStore()
+ {
+ this(new StandardEnvironmentFacadeFactory());
+ }
+
+ public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
+ {
+ _type = environmentFacadeFactory.getType();
+ _environmentFacadeFactory = environmentFacadeFactory;
+ _stateManager = new StateManager(_eventManager);
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
@Override
- protected void setupStore(File storePath) throws DatabaseException
+ public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
{
- super.setupStore(storePath);
+ _stateManager.attainState(State.INITIALISING);
- _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + getVirtualHost().getName(), getEnvironment());
- _commitThreadWrapper.startCommitThread();
+ _configRecoveryHandler = recoveryHandler;
+ _virtualHost = virtualHost;
}
- protected Environment createEnvironment(File environmentPath) throws DatabaseException
+ @Override
+ public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws StoreException
{
- LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
- EnvironmentConfig envConfig = createEnvironmentConfig();
+ if(_stateManager.isInState(State.INITIAL))
+ {
+ // Is acting as a message store, but not a durable config store
+ _stateManager.attainState(State.INITIALISING);
+ }
+
+ _messageRecoveryHandler = messageRecoveryHandler;
+ _tlogRecoveryHandler = tlogRecoveryHandler;
+ _virtualHost = virtualHost;
+
+ completeInitialisation();
+ }
+ private void completeInitialisation() throws StoreException
+ {
+ configure(_virtualHost, _messageRecoveryHandler != null);
+
+ _stateManager.attainState(State.INITIALISED);
+ }
+
+ private void startActivation() throws StoreException
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
try
{
- return new Environment(environmentPath, envConfig);
+ new Upgrader(_environmentFacade.getEnvironment(), _virtualHost).upgradeIfNecessary();
+ _environmentFacade.openDatabases(dbConfig, DATABASE_NAMES);
+ _totalStoreSize = getSizeOnDisk();
}
- catch (DatabaseException de)
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot configure store", e);
+ }
+
+ }
+
+ @Override
+ public synchronized void activate() throws StoreException
+ {
+ // check if acting as a durable config store, but not a message store
+ if(_stateManager.isInState(State.INITIALISING))
+ {
+ completeInitialisation();
+ }
+
+ _stateManager.attainState(State.ACTIVATING);
+ startActivation();
+
+ if(_configRecoveryHandler != null)
{
- if (de.getMessage().contains("Environment.setAllowCreate is false"))
+ recoverConfig(_configRecoveryHandler);
+ }
+ if(_messageRecoveryHandler != null)
+ {
+ recoverMessages(_messageRecoveryHandler);
+ }
+ if(_tlogRecoveryHandler != null)
+ {
+ recoverQueueEntries(_tlogRecoveryHandler);
+ }
+
+ _stateManager.attainState(State.ACTIVE);
+ }
+
+ @Override
+ public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
+ {
+ return new BDBTransaction();
+ }
+
+ private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException
+ {
+ Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
+ Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
+
+
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore);
+
+ _committer = _environmentFacade.createCommitter(virtualHost.getName());
+ _committer.start();
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ if (_environmentFacade == null)
+ {
+ return null;
+ }
+ return _environmentFacade.getStoreLocation();
+ }
+
+ public EnvironmentFacade getEnvironmentFacade()
+ {
+ return _environmentFacade;
+ }
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws Exception If the close fails.
+ */
+ @Override
+ public void close() throws StoreException
+ {
+ if (_closed.compareAndSet(false, true))
+ {
+ _stateManager.attainState(State.CLOSING);
+ try
{
- //Allow the creation this time
- envConfig.setAllowCreate(true);
- return new Environment(environmentPath, envConfig);
+ try
+ {
+ _committer.stop();
+ }
+ finally
+ {
+ closeEnvironment();
+ }
}
- else
+ catch(DatabaseException e)
{
- throw de;
+ throw new StoreException("Exception occured on message store close", e);
+ }
+ _stateManager.attainState(State.CLOSED);
+ }
+ }
+
+ private void closeEnvironment()
+ {
+ if (_environmentFacade != null)
+ {
+ try
+ {
+ _environmentFacade.close();
+ }
+ catch(DatabaseException e)
+ {
+ throw new StoreException("Exception occured on message store close", e);
+ }
+ }
+ }
+
+ private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException
+ {
+ try
+ {
+ final int configVersion = getConfigVersion();
+ recoveryHandler.beginConfigurationRecovery(this, configVersion);
+ loadConfiguredObjects(recoveryHandler);
+
+ final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
+ if(newConfigVersion != configVersion)
+ {
+ updateConfigVersion(newConfigVersion);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+
+ }
+
+ @SuppressWarnings("resource")
+ private void updateConfigVersion(int newConfigVersion) throws StoreException
+ {
+ Transaction txn = null;
+ Cursor cursor = null;
+ try
+ {
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ cursor = getConfigVersionDb().openCursor(txn, null);
+ DatabaseEntry key = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0,key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ IntegerBinding.intToEntry(newConfigVersion, value);
+ OperationStatus status = cursor.put(key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error setting config version: " + status);
+ }
+ }
+ cursor.close();
+ cursor = null;
+ txn.commit();
+ txn = null;
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ abortTransactionIgnoringException("Error setting config version", txn);;
+ }
+
+ }
+
+ private int getConfigVersion() throws StoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = getConfigVersionDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ return IntegerBinding.entryToInt(value);
+ }
+
+ // Insert 0 as the default config version
+ IntegerBinding.intToEntry(0,value);
+ ByteBinding.byteToEntry((byte) 0,key);
+ OperationStatus status = getConfigVersionDb().put(null, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error initialising config version: " + status);
+ }
+ return 0;
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException
+ {
+ Cursor objectsCursor = null;
+ Cursor hierarchyCursor = null;
+ try
+ {
+ objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ Map<UUID, BDBConfiguredObjectRecord> configuredObjects =
+ new HashMap<UUID, BDBConfiguredObjectRecord>();
+
+ while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+ BDBConfiguredObjectRecord configuredObject =
+ (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
+ configuredObjects.put(configuredObject.getId(), configuredObject);
+ }
+
+ // set parents
+ hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
+ while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
+ UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+ BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
+ if(child != null)
+ {
+ ConfiguredObjectRecord parent = configuredObjects.get(parentId);
+ if(parent != null)
+ {
+ child.addParent(hk.getParentType(), parent);
+ }
+ else if(hk.getParentType().equals("Exchange"))
+ {
+ // TODO - remove this hack for the pre-defined exchanges
+ child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
+ }
+ }
+ }
+
+ for (ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ crh.configuredObject(record);
+ }
+ }
+ finally
+ {
+ closeCursorSafely(objectsCursor);
+ closeCursorSafely(hierarchyCursor);
+ }
+ }
+
+ private void closeCursorSafely(Cursor cursor) throws StoreException
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot close cursor", e);
+ }
+ }
+ }
+
+ private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException
+ {
+ StoredMessageRecoveryHandler mrh = msrh.begin();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getMessageMetaDataDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+ long maxId = 0;
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = LongBinding.entryToLong(key);
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
+ mrh.message(message);
+
+ maxId = Math.max(maxId, messageId);
+ }
+
+ _messageId.set(maxId);
+ mrh.completeMessageRecovery();
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot recover messages", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
+ throws StoreException
+ {
+ QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
+
+ ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey qek = keyBinding.entryToObject(key);
+
+ entries.add(qek);
+ }
+
+ try
+ {
+ cursor.close();
+ }
+ finally
+ {
+ cursor = null;
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ qerh.queueEntry(queueId, messageId);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
+
+ cursor = null;
+ try
+ {
+ cursor = getXidDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ Xid xid = keyBinding.entryToObject(key);
+ PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+ dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
+ preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+
+
+ dtxrh.completeDtxRecordRecovery();
+ }
+
+ void removeMessage(long messageId, boolean sync) throws StoreException
+ {
+ boolean complete = false;
+ com.sleepycat.je.Transaction tx = null;
+
+ Random rand = null;
+ int attempts = 0;
+ try
+ {
+ do
+ {
+ tx = null;
+ try
+ {
+ tx = _environmentFacade.getEnvironment().beginTransaction(null, null);
+
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Removing message id " + messageId);
+ }
+
+
+ OperationStatus status = getMessageMetaDataDb().delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ LOGGER.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+ messageId);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleted metadata for message " + messageId);
+ }
+
+ //now remove the content data from the store if there is any.
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ getMessageContentDb().delete(tx, contentKeyEntry);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleted content for message " + messageId);
+ }
+
+ _environmentFacade.commit(tx);
+ _committer.commit(tx, sync);
+
+ complete = true;
+ tx = null;
+ }
+ catch (LockConflictException e)
+ {
+ try
+ {
+ if(tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch(DatabaseException e2)
+ {
+ LOGGER.warn("Unable to abort transaction after LockConflictExcption on removal of message with id " + messageId, e2);
+ // rethrow the original log conflict exception, the secondary exception should already have
+ // been logged.
+ throw _environmentFacade.handleDatabaseException("Cannot remove message with id " + messageId, e);
+ }
+
+
+ LOGGER.warn("Lock timeout exception. Retrying (attempt "
+ + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
+
+ if(++attempts < LOCK_RETRY_ATTEMPTS)
+ {
+ if(rand == null)
+ {
+ rand = new Random();
+ }
+
+ try
+ {
+ Thread.sleep(500l + (long)(500l * rand.nextDouble()));
+ }
+ catch (InterruptedException e1)
+ {
+
+ }
+ }
+ else
+ {
+ // rethrow the lock conflict exception since we could not solve by retrying
+ throw _environmentFacade.handleDatabaseException("Cannot remove messages", e);
+ }
+ }
+ }
+ while(!complete);
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.error("Unexpected BDB exception", e);
+
+ try
+ {
+ abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
+ }
+ finally
+ {
+ tx = null;
+ }
+
+ throw _environmentFacade.handleDatabaseException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ try
+ {
+ abortTransactionIgnoringException("Error aborting transaction on removal of message with id " + messageId, tx);
+ }
+ finally
+ {
+ tx = null;
+ }
+ }
+ }
+
+ private void abortTransactionIgnoringException(String errorMessage, com.sleepycat.je.Transaction tx)
+ {
+ try
+ {
+ if (tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch (DatabaseException e1)
+ {
+ // We need the possible side effect of the handler restarting the environment but don't care about the exception
+ _environmentFacade.handleDatabaseException(null, e1);
+ LOGGER.warn(errorMessage, e1);
+ }
+ }
+
+ @Override
+ public void create(ConfiguredObjectRecord configuredObject) throws StoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ com.sleepycat.je.Transaction txn = null;
+ try
+ {
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ storeConfiguredObjectEntry(txn, configuredObject);
+ txn.commit();
+ txn = null;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject
+ + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ if (txn != null)
+ {
+ abortTransactionIgnoringException("Error creating configured object", txn);
+ }
}
}
}
@Override
- protected void closeInternal()
+ public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException
{
+ com.sleepycat.je.Transaction txn = null;
try
{
- _commitThreadWrapper.stopCommitThread();
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+
+ Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+ for(ConfiguredObjectRecord record : objects)
+ {
+ if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS)
+ {
+ removed.add(record.getId());
+ }
+ }
+
+ txn.commit();
+ txn = null;
+ return removed.toArray(new UUID[removed.size()]);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error deleting configured objects from database", e);
}
- catch (InterruptedException e)
+ finally
{
- throw new StoreException(e);
+ if (txn != null)
+ {
+ abortTransactionIgnoringException("Error deleting configured objects", txn);
+ }
}
- super.closeInternal();
}
@Override
- protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
+ {
+ com.sleepycat.je.Transaction txn = null;
+ try
+ {
+ txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ for(ConfiguredObjectRecord record : records)
+ {
+ update(createIfNecessary, record, txn);
+ }
+ txn.commit();
+ txn = null;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e);
+ }
+ finally
+ {
+ if (txn != null)
+ {
+ abortTransactionIgnoringException("Error updating configuration details within the store", txn);
+ }
+ }
+
+ }
+
+ private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId());
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
+ keyBinding.objectToEntry(record.getId(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ DatabaseEntry newValue = new DatabaseEntry();
+ ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
+
+ OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT);
+ final boolean isNewRecord = status == OperationStatus.NOTFOUND;
+ if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord))
+ {
+ // write the updated entry to the store
+ configuredObjectBinding.objectToEntry(record, newValue);
+ status = getConfiguredObjectsDb().put(txn, key, newValue);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error updating configuration details within the store: " + status);
+ }
+ if(isNewRecord)
+ {
+ writeHierarchyRecords(txn, record);
+ }
+ }
+ else if (status != OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Error finding configuration details within the store: " + status);
+ }
+ }
+
+ /**
+ * Places a message onto a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The the queue to place the message on.
+ * @param messageId The message to enqueue.
+ *
+ * @throws StoreException If the operation fails for any reason.
+ */
+ public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws StoreException
{
+
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
+ keyBinding.objectToEntry(dd, key);
+ DatabaseEntry value = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0, value);
+
try
{
- tx.commitNoSync();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Enqueuing message " + messageId + " on queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
+ + " in transaction " + tx);
+ }
+ getDeliveryDb().put(tx, key, value);
}
- catch(DatabaseException de)
+ catch (DatabaseException e)
{
- LOGGER.error("Got DatabaseException on commit, closing environment", de);
+ LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
+ throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
+ + " to database", e);
+ }
+ }
- closeEnvironmentSafely();
+ /**
+ * Extracts a message from a specified queue, in a given transaction.
+ *
+ * @param tx The transaction for the operation.
+ * @param queue The queue to take the message from.
+ * @param messageId The message to dequeue.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws StoreException
+ {
- throw de;
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
+ UUID id = queue.getId();
+ keyBinding.objectToEntry(queueEntryKey, key);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Dequeue message id " + messageId + " from queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
}
- return _commitThreadWrapper.commit(tx, syncCommit);
+ try
+ {
+
+ OperationStatus status = getDeliveryDb().delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Unable to find message with id " + messageId + " on queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Unable to remove message with id " + messageId + " on queue"
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Removed message " + messageId + " on queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
+ + " from delivery db");
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+
+ LOGGER.error("Failed to dequeue message " + messageId + " in transaction " + tx , e);
+
+ throw _environmentFacade.handleDatabaseException("Error accessing database while dequeuing message: " + e.getMessage(), e);
+ }
+ }
+
+
+ private void recordXid(com.sleepycat.je.Transaction txn,
+ long format,
+ byte[] globalId,
+ byte[] branchId,
+ org.apache.qpid.server.store.Transaction.Record[] enqueues,
+ org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidBinding keyBinding = XidBinding.getInstance();
+ keyBinding.objectToEntry(xid,key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ valueBinding.objectToEntry(preparedTransaction, value);
+
+ try
+ {
+ getXidDb().put(txn, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.error("Failed to write xid: " + e.getMessage(), e);
+ throw _environmentFacade.handleDatabaseException("Error writing xid to database", e);
+ }
+ }
+
+ private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId)
+ throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ Xid xid = new Xid(format, globalId, branchId);
+ XidBinding keyBinding = XidBinding.getInstance();
+
+ keyBinding.objectToEntry(xid, key);
+
+
+ try
+ {
+
+ OperationStatus status = getXidDb().delete(txn, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new StoreException("Unable to find xid");
+ }
+ else if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Unable to remove xid");
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+
+ LOGGER.error("Failed to remove xid in transaction " + txn, e);
+
+ throw _environmentFacade.handleDatabaseException("Error accessing database while removing xid: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Commits all operations performed within a given transaction.
+ *
+ * @param tx The transaction to commit all operations for.
+ *
+ * @throws StoreException If the operation fails for any reason.
+ */
+ private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws StoreException
+ {
+ if (tx == null)
+ {
+ throw new StoreException("Fatal internal error: transactional is null at commitTran");
+ }
+
+ _environmentFacade.commit(tx);
+ StoreFuture result = _committer.commit(tx, syncCommit);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ String transactionType = syncCommit ? "synchronous" : "asynchronous";
+ LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
+ }
+
+ return result;
+ }
+
+ /**
+ * Abandons all operations performed within a given transaction.
+ *
+ * @param tx The transaction to abandon.
+ *
+ * @throws StoreException If the operation fails for any reason.
+ */
+ private void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("abortTran called for transaction " + tx);
+ }
+
+ try
+ {
+ tx.abort();
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error aborting transaction: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Primarily for testing purposes.
+ *
+ * @param queueId
+ *
+ * @return a list of message ids for messages enqueued for a particular queue
+ */
+ List<Long> getEnqueuedMessages(UUID queueId) throws StoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+
+ DatabaseEntry key = new DatabaseEntry();
+
+ QueueEntryKey dd = new QueueEntryKey(queueId, 0);
+
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ keyBinding.objectToEntry(dd, key);
+
+ DatabaseEntry value = new DatabaseEntry();
+
+ LinkedList<Long> messageIds = new LinkedList<Long>();
+
+ OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
+ dd = keyBinding.entryToObject(key);
+
+ while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
+ {
+
+ messageIds.add(dd.getMessageId());
+ status = cursor.getNext(key, value, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
+ {
+ dd = keyBinding.entryToObject(key);
+ }
+ }
+
+ return messageIds;
+ }
+ catch (DatabaseException e)
+ {
+ throw new StoreException("Database error: " + e.getMessage(), e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
+ /**
+ * Return a valid, currently unused message id.
+ *
+ * @return A fresh message id.
+ */
+ private long getNewMessageId()
+ {
+ return _messageId.incrementAndGet();
+ }
+
+ /**
+ * Stores a chunk of message data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param offset The offset of the data chunk in the message.
+ * @param contentBody The content of the data chunk.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
+ ByteBuffer contentBody) throws StoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+ ContentBinding messageBinding = ContentBinding.getInstance();
+ messageBinding.objectToEntry(contentBody.array(), value);
+ try
+ {
+ OperationStatus status = getMessageContentDb().put(tx, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error adding content for message id " + messageId + ": " + status);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Stores message meta-data.
+ *
+ * @param tx The transaction for the operation.
+ * @param messageId The message to store the data for.
+ * @param messageMetaData The message meta data to store.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
+ StorableMessageMetaData messageMetaData)
+ throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("storeMetaData called for transaction " + tx
+ + ", messageId " + messageId
+ + ", messageMetaData " + messageMetaData);
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
+ messageBinding.objectToEntry(messageMetaData, value);
+ try
+ {
+ getMessageMetaDataDb().put(tx, key, value);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Retrieves message meta-data.
+ *
+ * @param messageId The message to get the meta-data for.
+ *
+ * @return The message meta data.
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("public MessageMetaData getMessageMetaData(Long messageId = "
+ + messageId + "): called");
+ }
+
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
+
+ try
+ {
+ OperationStatus status = getMessageMetaDataDb().get(null, key, value, LockMode.READ_UNCOMMITTED);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Metadata not found for message with id " + messageId);
+ }
+
+ StorableMessageMetaData mdd = messageBinding.entryToObject(value);
+
+ return mdd;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
+ * from the specified offset in the message.
+ *
+ * @param messageId The message to get the data for.
+ * @param offset The offset of the data within the message.
+ * @param dst The destination of the content read back
+ *
+ * @return The number of bytes inserted into the destination
+ *
+ * @throws StoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException
+ {
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ DatabaseEntry value = new DatabaseEntry();
+ ContentBinding contentTupleBinding = ContentBinding.getInstance();
+
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
+ }
+
+ try
+ {
+
+ int written = 0;
+ OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+ if (status == OperationStatus.SUCCESS)
+ {
+ byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
+ int size = dataAsBytes.length;
+ if (offset > size)
+ {
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
+
+ }
+
+ written = size - offset;
+ if(written > dst.remaining())
+ {
+ written = dst.remaining();
+ }
+
+ dst.put(dataAsBytes, offset, written);
+ }
+ return written;
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ {
+ if(metaData.isPersistent())
+ {
+ return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
+ }
+ else
+ {
+ return new StoredMemoryMessage(getNewMessageId(), metaData);
+ }
+ }
+
+ private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Storing configured object: " + configuredObject);
+ }
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
+ uuidBinding.objectToEntry(configuredObject.getId(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
+
+ queueBinding.objectToEntry(configuredObject, value);
+ try
+ {
+ OperationStatus status = getConfiguredObjectsDb().put(txn, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error writing configured object " + configuredObject + " to database: "
+ + status);
+ }
+ writeHierarchyRecords(txn, configuredObject);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Error writing configured object " + configuredObject
+ + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject)
+ {
+ OperationStatus status;
+ HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance();
+ DatabaseEntry hierarchyKey = new DatabaseEntry();
+ DatabaseEntry hierarchyValue = new DatabaseEntry();
+
+ for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet())
+ {
+
+ hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey);
+ UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue);
+ status = getConfiguredObjectHierarchyDb().put(txn, hierarchyKey, hierarchyValue);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: "
+ + status);
+ }
+ }
+ }
+
+ private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException
+ {
+ UUID id = record.getId();
+ Map<String, ConfiguredObjectRecord> parents = record.getParents();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Removing configured object: " + id);
+ }
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
+ uuidBinding.objectToEntry(id, key);
+ OperationStatus status = getConfiguredObjectsDb().delete(tx, key);
+ if(status == OperationStatus.SUCCESS)
+ {
+ for(String parentType : parents.keySet())
+ {
+ DatabaseEntry hierarchyKey = new DatabaseEntry();
+ HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance();
+ keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey);
+ getConfiguredObjectHierarchyDb().delete(tx, hierarchyKey);
+ }
+ }
+ return status;
+ }
+
+ private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData>
+ {
+
+ private final long _messageId;
+ private final boolean _isRecovered;
+
+ private StorableMessageMetaData _metaData;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+
+ private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
+
+ StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
+ {
+ this(messageId, metaData, false);
+ }
+
+ StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered)
+ {
+ _messageId = messageId;
+ _isRecovered = isRecovered;
+
+ if(!_isRecovered)
+ {
+ _metaData = metaData;
+ }
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ StorableMessageMetaData metaData = _metaDataRef.get();
+ if(metaData == null)
+ {
+ metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ return metaData;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+ {
+ src = src.slice();
+
+ if(_data == null)
+ {
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
+ }
+ else
+ {
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
+ }
+
+ }
+
+ public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+ }
+ else
+ {
+ return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ }
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ return ByteBuffer.wrap(data,offsetInMessage,size);
+ }
+ else
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ int length = getContent(offsetInMessage, buf);
+ buf.limit(length);
+ buf.position(0);
+ return buf;
+ }
+ }
+
+ synchronized void store(com.sleepycat.je.Transaction txn)
+ {
+ if (!stored())
+ {
+ try
+ {
+ _dataRef = new SoftReference<byte[]>(_data);
+ BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
+ BDBMessageStore.this.addContent(txn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+ }
+
+ public synchronized StoreFuture flushToStore()
+ {
+ if(!stored())
+ {
+ com.sleepycat.je.Transaction txn;
+ try
+ {
+ txn = _environmentFacade.getEnvironment().beginTransaction(
+ null, null);
+ }
+ catch (DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("failed to begin transaction", e);
+ }
+ store(txn);
+ _environmentFacade.commit(txn);
+ _committer.commit(txn, true);
+
+ storedSizeChangeOccured(getMetaData().getContentSize());
+ }
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ public void remove()
+ {
+ int delta = getMetaData().getContentSize();
+ BDBMessageStore.this.removeMessage(_messageId, false);
+ storedSizeChangeOccured(-delta);
+ }
+
+ private boolean stored()
+ {
+ return _metaData == null || _isRecovered;
+ }
+ }
+
+ private class BDBTransaction implements org.apache.qpid.server.store.Transaction
+ {
+ private com.sleepycat.je.Transaction _txn;
+ private int _storeSizeIncrease;
+
+ private BDBTransaction() throws StoreException
+ {
+ try
+ {
+ _txn = _environmentFacade.getEnvironment().beginTransaction(null, null);
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot create store transaction", e);
+ }
+ }
+
+ public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ {
+ if(message.getStoredMessage() instanceof StoredBDBMessage)
+ {
+ final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+
+ BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ {
+ BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void commitTran() throws StoreException
+ {
+ BDBMessageStore.this.commitTranImpl(_txn, true);
+ BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
+ }
+
+ public StoreFuture commitTranAsync() throws StoreException
+ {
+ BDBMessageStore.this.storedSizeChangeOccured(_storeSizeIncrease);
+ return BDBMessageStore.this.commitTranImpl(_txn, false);
+ }
+
+ public void abortTran() throws StoreException
+ {
+ BDBMessageStore.this.abortTran(_txn);
+ }
+
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
+ {
+ BDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
+ }
+
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
+ Record[] dequeues) throws StoreException
+ {
+ BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
+ }
+ }
+
+ private void storedSizeChangeOccured(final int delta) throws StoreException
+ {
+ try
+ {
+ storedSizeChange(delta);
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Stored size change exception", e);
+ }
+ }
+
+ private void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized (this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 2*delta;
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ _totalStoreSize = getSizeOnDisk();
+
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ _totalStoreSize = getSizeOnDisk();
+
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk();
+
+ _totalStoreSize = getSizeOnDisk();
+
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ }
+ }
+
+ private void reduceSizeOnDisk()
+ {
+ _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
+ boolean cleaned = false;
+ while (_environmentFacade.getEnvironment().cleanLog() > 0)
+ {
+ cleaned = true;
+ }
+ if (cleaned)
+ {
+ CheckpointConfig force = new CheckpointConfig();
+ force.setForce(true);
+ _environmentFacade.getEnvironment().checkpoint(force);
+ }
+
+
+ _environmentFacade.getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
+ }
+
+ private long getSizeOnDisk()
+ {
+ return _environmentFacade.getEnvironment().getStats(null).getTotalLogSize();
+ }
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
+
+
+ @Override
+ public void onDelete()
+ {
+ String storeLocation = getStoreLocation();
+
+ if (storeLocation != null)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting store " + storeLocation);
+ }
+
+ File location = new File(storeLocation);
+ if (location.exists())
+ {
+ if (!FileUtils.delete(location, true))
+ {
+ LOGGER.error("Cannot delete " + storeLocation);
+ }
+ }
+ }
}
@Override
public String getStoreType()
{
- return TYPE;
+ return _type;
+ }
+
+ private Database getConfiguredObjectsDb()
+ {
+ return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
+ }
+
+ private Database getConfiguredObjectHierarchyDb()
+ {
+ return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME);
+ }
+
+ private Database getMessageContentDb()
+ {
+ return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME);
+ }
+
+ private Database getConfigVersionDb()
+ {
+ return _environmentFacade.getOpenDatabase(CONFIG_VERSION_DB_NAME);
+ }
+
+ private Database getMessageMetaDataDb()
+ {
+ return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME);
+ }
+
+ private Database getDeliveryDb()
+ {
+ return _environmentFacade.getOpenDatabase(DELIVERY_DB_NAME);
+ }
+
+ private Database getXidDb()
+ {
+ return _environmentFacade.getOpenDatabase(XID_DB_NAME);
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index d7c8b23d39..4abe81c56c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
@@ -37,7 +38,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
@Override
public String getType()
{
- return BDBMessageStore.TYPE;
+ return StandardEnvironmentFacade.TYPE;
}
@Override
@@ -71,7 +72,7 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
if(initialSize != 0)
{
- return Collections.singletonMap("bdbEnvironmentConfig", (Object)attributes);
+ return Collections.singletonMap(BDBMessageStore.ENVIRONMENT_CONFIGURATION, (Object)attributes);
}
else
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
new file mode 100644
index 0000000000..a137e38baf
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -0,0 +1,313 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreFuture;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+public class CoalescingCommiter implements Committer
+{
+ private final CommitThread _commitThread;
+
+ public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
+ {
+ _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade);
+ }
+
+ @Override
+ public void start()
+ {
+ _commitThread.start();
+ }
+
+ @Override
+ public void stop()
+ {
+ _commitThread.close();
+ try
+ {
+ _commitThread.join();
+ }
+ catch (InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Commit thread has not shutdown", ie);
+ }
+ }
+
+ @Override
+ public StoreFuture commit(Transaction tx, boolean syncCommit)
+ {
+ BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ commitFuture.commit();
+ return commitFuture;
+ }
+
+ private static final class BDBCommitFuture implements StoreFuture
+ {
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+
+ private final CommitThread _commitThread;
+ private final Transaction _tx;
+ private final boolean _syncCommit;
+ private RuntimeException _databaseException;
+ private boolean _complete;
+
+ public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ {
+ _commitThread = commitThread;
+ _tx = tx;
+ _syncCommit = syncCommit;
+ }
+
+ public synchronized void complete()
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("complete() called for transaction " + _tx);
+ }
+ _complete = true;
+
+ notifyAll();
+ }
+
+ public synchronized void abort(RuntimeException databaseException)
+ {
+ _complete = true;
+ _databaseException = databaseException;
+
+ notifyAll();
+ }
+
+ public void commit() throws DatabaseException
+ {
+ _commitThread.addJob(this, _syncCommit);
+
+ if(!_syncCommit)
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("CommitAsync was requested, returning immediately.");
+ }
+ return;
+ }
+
+ waitForCompletion();
+
+ if (_databaseException != null)
+ {
+ throw _databaseException;
+ }
+
+ }
+
+ public synchronized boolean isComplete()
+ {
+ return _complete;
+ }
+
+ public synchronized void waitForCompletion()
+ {
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
+ while (!isComplete())
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(250);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
+ }
+
+ /**
+ * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
+ * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
+ * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
+ */
+ private static class CommitThread extends Thread
+ {
+ private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
+
+ private final AtomicBoolean _stopped = new AtomicBoolean(false);
+ private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Object _lock = new Object();
+ private final EnvironmentFacade _environmentFacade;
+
+ public CommitThread(String name, EnvironmentFacade environmentFacade)
+ {
+ super(name);
+ _environmentFacade = environmentFacade;
+ }
+
+ public void explicitNotify()
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+
+ public void run()
+ {
+ while (!_stopped.get())
+ {
+ synchronized (_lock)
+ {
+ while (!_stopped.get() && !hasJobs())
+ {
+ try
+ {
+ // Periodically wake up and check, just in case we
+ // missed a notification. Don't want to lock the broker hard.
+ _lock.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ processJobs();
+ }
+ }
+
+ private void processJobs()
+ {
+ int size = _jobQueue.size();
+
+ try
+ {
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
+ Environment environment = _environmentFacade.getEnvironment();
+ if (environment != null && environment.isValid())
+ {
+ environment.flushLog(true);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("flushLog completed in " + duration + " ms");
+ }
+
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.complete();
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ try
+ {
+ LOGGER.error("Exception during environment log flush", e);
+
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.abort(e);
+ }
+ }
+ finally
+ {
+ LOGGER.error("Closing store environment", e);
+
+ try
+ {
+ _environmentFacade.close();
+ }
+ catch (DatabaseException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ }
+ }
+ }
+
+ private boolean hasJobs()
+ {
+ return !_jobQueue.isEmpty();
+ }
+
+ public void addJob(BDBCommitFuture commit, final boolean sync)
+ {
+ if (_stopped.get())
+ {
+ throw new IllegalStateException("Commit thread is stopped");
+ }
+ _jobQueue.add(commit);
+ if(sync)
+ {
+ synchronized (_lock)
+ {
+ _lock.notifyAll();
+ }
+ }
+ }
+
+ public void close()
+ {
+ RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
+ synchronized (_lock)
+ {
+ _stopped.set(true);
+ BDBCommitFuture commit = null;
+ while ((commit = _jobQueue.poll()) != null)
+ {
+ commit.abort(e);
+ }
+ _lock.notifyAll();
+ }
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index 6a42b2691f..36ee2ad306 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,27 +20,36 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.server.store.StoreFuture;
-import static org.mockito.Mockito.mock;
+import com.sleepycat.je.Transaction;
-public class HAMessageStoreSmokeTest extends QpidTestCase
+public interface Committer
{
- private final BDBHAMessageStore _store = new BDBHAMessageStore();
+ void start();
+
+ StoreFuture commit(Transaction tx, boolean syncCommit);
+
+ void stop();
- public void testMissingHAConfigThrowsException() throws Exception
+ Committer IMMEDIATE_FUTURE_COMMITTER = new Committer()
{
- try
+
+ @Override
+ public void start()
{
- _store.setVirtualHost(mock(VirtualHost.class));
- _store.configure();
- fail("Expected an exception to be thrown");
}
- catch (ServerScopedRuntimeException ce)
+
+ @Override
+ public StoreFuture commit(Transaction tx, boolean syncCommit)
{
- assertTrue(ce.getMessage().contains("BDB HA configuration key not found"));
+ return StoreFuture.IMMEDIATE_FUTURE;
}
- }
-}
+
+ @Override
+ public void stop()
+ {
+ }
+ };
+
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
new file mode 100644
index 0000000000..144ab83238
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+
+public interface EnvironmentFacade
+{
+ @SuppressWarnings("serial")
+ final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
+ // Turn off stats generation - feature introduced (and on by default) from BDB JE 5.0.84
+ put(EnvironmentConfig.STATS_COLLECT, "false");
+ }});
+
+ Environment getEnvironment();
+
+ Committer createCommitter(String name);
+
+ void openDatabases(DatabaseConfig dbConfig, String... databaseNames);
+
+ Database getOpenDatabase(String name);
+
+ void commit(com.sleepycat.je.Transaction tx);
+
+ DatabaseException handleDatabaseException(String contextMessage, DatabaseException e);
+
+ String getStoreLocation();
+
+ void close();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index 59483751ca..b784e436b9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,13 +18,15 @@
* under the License.
*
*/
-package org.apache.qpid.server.store;
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.model.VirtualHost;
-public interface HAMessageStore extends MessageStore
+public interface EnvironmentFacadeFactory
{
- /**
- * Used to indicate that a store requires to make itself unavailable for read and read/write
- * operations.
- */
- void passivate();
+
+ EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore);
+
+ String getType();
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java
new file mode 100644
index 0000000000..b13766a136
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/LoggingAsyncExceptionListener.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.log4j.Logger;
+
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
+
+public class LoggingAsyncExceptionListener implements ExceptionListener
+{
+ private static final Logger LOGGER = Logger.getLogger(LoggingAsyncExceptionListener.class);
+
+ @Override
+ public void exceptionThrown(ExceptionEvent event)
+ {
+ LOGGER.error("Asynchronous exception thrown by BDB thread '" + event.getThreadName() + "'", event.getException());
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
new file mode 100644
index 0000000000..8117ca1a9a
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -0,0 +1,228 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+
+public class StandardEnvironmentFacade implements EnvironmentFacade
+{
+ private static final Logger LOGGER = Logger.getLogger(StandardEnvironmentFacade.class);
+ public static final String TYPE = "BDB";
+
+ private final String _storePath;
+ private final Map<String, Database> _databases = new HashMap<String, Database>();
+
+ private Environment _environment;
+
+ public StandardEnvironmentFacade(String storePath, Map<String, String> attributes)
+ {
+ _storePath = storePath;
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Creating environment at environment path " + _storePath);
+ }
+
+ File environmentPath = new File(storePath);
+ if (!environmentPath.exists())
+ {
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+
+ for (Map.Entry<String, String> configItem : attributes.entrySet())
+ {
+ LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+
+ envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+
+ _environment = new Environment(environmentPath, envConfig);
+ }
+
+ @Override
+ public void commit(com.sleepycat.je.Transaction tx)
+ {
+ try
+ {
+ tx.commitNoSync();
+ }
+ catch (DatabaseException de)
+ {
+ LOGGER.error("Got DatabaseException on commit, closing environment", de);
+
+ closeEnvironmentSafely();
+
+ throw handleDatabaseException("Got DatabaseException on commit", de);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ closeDatabases();
+ closeEnvironment();
+ }
+
+ private void closeDatabases()
+ {
+ RuntimeException firstThrownException = null;
+ for (Database database : _databases.values())
+ {
+ try
+ {
+ database.close();
+ }
+ catch(RuntimeException e)
+ {
+ if (firstThrownException == null)
+ {
+ firstThrownException = e;
+ }
+ }
+ }
+ if (firstThrownException != null)
+ {
+ throw firstThrownException;
+ }
+ }
+
+ private void closeEnvironmentSafely()
+ {
+ if (_environment != null)
+ {
+ if (_environment.isValid())
+ {
+ try
+ {
+ closeDatabases();
+ }
+ catch(Exception e)
+ {
+ LOGGER.error("Exception closing environment databases", e);
+ }
+ }
+ try
+ {
+ _environment.close();
+ }
+ catch (DatabaseException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ catch (IllegalStateException ex)
+ {
+ LOGGER.error("Exception closing store environment", ex);
+ }
+ finally
+ {
+ _environment = null;
+ }
+ }
+ }
+
+ @Override
+ public Environment getEnvironment()
+ {
+ return _environment;
+ }
+
+ private void closeEnvironment()
+ {
+ if (_environment != null)
+ {
+ // Clean the log before closing. This makes sure it doesn't contain
+ // redundant data. Closing without doing this means the cleaner may
+ // not get a chance to finish.
+ try
+ {
+ _environment.cleanLog();
+ }
+ finally
+ {
+ _environment.close();
+ _environment = null;
+ }
+ }
+ }
+
+ @Override
+ public DatabaseException handleDatabaseException(String contextMessage, DatabaseException e)
+ {
+ if (_environment != null && !_environment.isValid())
+ {
+ closeEnvironmentSafely();
+ }
+ return e;
+ }
+
+ @Override
+ public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
+ {
+ for (String databaseName : databaseNames)
+ {
+ Database database = _environment.openDatabase(null, databaseName, dbConfig);
+ _databases .put(databaseName, database);
+ }
+ }
+
+ @Override
+ public Database getOpenDatabase(String name)
+ {
+ Database database = _databases.get(name);
+ if (database == null)
+ {
+ throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+ }
+ return database;
+ }
+
+ @Override
+ public Committer createCommitter(String name)
+ {
+ return new CoalescingCommiter(name, this);
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _storePath;
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
new file mode 100644
index 0000000000..384ceba98a
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.model.VirtualHost;
+
+public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
+{
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore)
+ {
+ Map<String, String> envConfigMap = new HashMap<String, String>();
+ envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+
+ Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION);
+ if (environmentConfigurationAttributes instanceof Map)
+ {
+ envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
+ }
+
+ String name = virtualHost.getName();
+ final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name;
+
+ String storeLocation;
+ if(isMessageStore)
+ {
+ storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ if(storeLocation == null)
+ {
+ storeLocation = defaultPath;
+ }
+ }
+ else // we are acting only as the durable config store
+ {
+ storeLocation = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
+ if(storeLocation == null)
+ {
+ storeLocation = defaultPath;
+ }
+ }
+
+ return new StandardEnvironmentFacade(storeLocation, envConfigMap);
+ }
+
+ @Override
+ public String getType()
+ {
+ return StandardEnvironmentFacade.TYPE;
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
new file mode 100644
index 0000000000..38fdf34196
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DatabasePinger.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Transaction;
+
+public class DatabasePinger
+{
+ public static final String PING_DATABASE_NAME = "PINGDB";
+ private static final int ID = 0;
+
+ public void pingDb(EnvironmentFacade facade)
+ {
+ try
+ {
+ final Database db = facade.getOpenDatabase(PING_DATABASE_NAME);
+
+ DatabaseEntry key = new DatabaseEntry();
+ IntegerBinding.intToEntry(ID, key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ LongBinding.longToEntry(System.currentTimeMillis(), value);
+ Transaction txn = null;
+ try
+ {
+ txn = facade.getEnvironment().beginTransaction(null, null);
+ db.put(txn, key, value);
+ txn.commit();
+ txn = null;
+ }
+ finally
+ {
+ try
+ {
+ if (txn != null)
+ {
+ txn.abort();
+ }
+ }
+ finally
+ {
+ db.close();
+ }
+ }
+ }
+ catch (DatabaseException de)
+ {
+ facade.handleDatabaseException("DatabaseException from DatabasePinger ", de);
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
new file mode 100644
index 0000000000..76a48c189e
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Map;
+
+public interface ReplicatedEnvironmentConfiguration
+{
+ String getName();
+ String getGroupName();
+ String getHostPort();
+ String getHelperHostPort();
+ String getDurability();
+ boolean isCoalescingSync();
+ boolean isDesignatedPrimary();
+ int getPriority();
+ int getQuorumOverride();
+ String getStorePath();
+ Map<String, String> getParameters();
+ Map<String, String> getReplicationParameters();
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
new file mode 100644
index 0000000000..3e15e9bdcc
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -0,0 +1,1083 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
+import org.apache.qpid.server.store.berkeleydb.Committer;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
+import org.apache.qpid.server.util.DaemonThreadFactory;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.RepInternal;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.RestartRequiredException;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
+import com.sleepycat.je.rep.vlsn.VLSNRange;
+import com.sleepycat.je.utilint.PropUtil;
+import com.sleepycat.je.utilint.VLSN;
+
+public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
+{
+ public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
+ public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval";
+
+ private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
+
+ private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
+ private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000;
+
+ private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
+ private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
+
+ @SuppressWarnings("serial")
+ private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ /**
+ * Parameter decreased as the 24h default may lead very large log files for most users.
+ */
+ put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
+ /**
+ * Parameter increased as the 5 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
+ /**
+ * Parameter increased as the 10 s default may lead to spurious timeouts.
+ */
+ put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
+ /**
+ * Parameter decreased as the 10 h default may cause user confusion.
+ */
+ put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
+ /**
+ * Parameter changed from default (off) to allow the Environment to start in the
+ * UNKNOWN state when the majority is not available.
+ */
+ put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
+ /**
+ * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False
+ * is scheduled to become default after JE 5.1.
+ */
+ put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
+ /**
+ * Parameter decreased as a default 5min interval may lead to bigger data losses on Node
+ * with NO_SYN durability in case if such Node crushes.
+ */
+ put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+ }});
+
+ public static final String TYPE = "BDB-HA";
+
+ // TODO: JMX will change to observe the model, at that point these names will disappear
+ public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+ public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
+ private final ReplicatedEnvironmentConfiguration _configuration;
+ private final Durability _durability;
+ private final Boolean _coalescingSync;
+ private final String _prettyGroupNodeName;
+ private final File _environmentDirectory;
+
+ private final ExecutorService _environmentJobExecutor;
+ private final ScheduledExecutorService _groupChangeExecutor;
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
+ private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
+ private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
+
+ private volatile ReplicatedEnvironment _environment;
+ private volatile long _joinTime;
+ private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+
+ public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
+ {
+ _environmentDirectory = new File(configuration.getStorePath());
+ if (!_environmentDirectory.exists())
+ {
+ if (!_environmentDirectory.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + _environmentDirectory + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+
+ _configuration = configuration;
+
+ _durability = Durability.parse(_configuration.getDurability());
+ _coalescingSync = _configuration.isCoalescingSync();
+ _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();
+
+ // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
+ _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
+ _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
+ _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), 100, TimeUnit.MILLISECONDS); // TODO make configurable
+
+ // create environment in a separate thread to avoid renaming of the current thread by JE
+ _environment = createEnvironment(true);
+ }
+
+ @Override
+ public void commit(final Transaction tx)
+ {
+ try
+ {
+ // Using commit() instead of commitNoSync() for the HA store to allow
+ // the HA durability configuration to influence resulting behaviour.
+ tx.commit();
+ }
+ catch (DatabaseException de)
+ {
+ throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ if (_state.compareAndSet(State.OPENING, State.CLOSING) ||
+ _state.compareAndSet(State.OPEN, State.CLOSING) ||
+ _state.compareAndSet(State.RESTARTING, State.CLOSING) )
+ {
+ try
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName + " current state is " + _state.get());
+ }
+
+ shutdownAndAwaitExecutorService(_environmentJobExecutor);
+ shutdownAndAwaitExecutorService(_groupChangeExecutor);
+
+ closeDatabases();
+ closeEnvironment();
+ }
+ finally
+ {
+ _state.compareAndSet(State.CLOSING, State.CLOSED);
+ }
+ }
+ }
+
+ private void shutdownAndAwaitExecutorService(ExecutorService executorService)
+ {
+ executorService.shutdown();
+ try
+ {
+ boolean wasShutdown = executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
+ if (!wasShutdown)
+ {
+ LOGGER.warn("Executor service " + executorService + " did not shutdown within allowed time period, ignoring");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Shutdown of executor service " + executorService + " was interrupted");
+ }
+ }
+
+ @Override
+ public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe)
+ {
+ boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException);
+ if (restart)
+ {
+ tryToRestartEnvironment(dbe);
+ }
+ return dbe;
+ }
+
+ private void tryToRestartEnvironment(final DatabaseException dbe)
+ {
+ if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+ {
+ if (dbe != null && LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
+ }
+
+ _environmentJobExecutor.execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ restartEnvironment();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Exception on environment restart", e);
+ }
+ }
+ });
+
+ }
+ else
+ {
+ LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
+ }
+ }
+
+ @Override
+ public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not in opened state");
+ }
+
+ if (!_environment.isValid())
+ {
+ throw new IllegalStateException("Environment is not valid");
+ }
+
+ if (_environment.getState() != ReplicatedEnvironment.State.MASTER)
+ {
+ throw new IllegalStateException("Databases can only be opened on Master node");
+ }
+
+ for (String databaseName : databaseNames)
+ {
+ _databases.put(databaseName, new DatabaseHolder(dbConfig));
+ }
+ for (String databaseName : databaseNames)
+ {
+ DatabaseHolder holder = _databases.get(databaseName);
+ openDatabaseInternally(databaseName, holder);
+ }
+ }
+
+ private void openDatabaseInternally(String databaseName, DatabaseHolder holder)
+ {
+ if (_state.get() == State.OPEN)
+ {
+ Database database = _environment.openDatabase(null, databaseName, holder.getConfig());
+ holder.setDatabase(database);
+ }
+ }
+
+ @Override
+ public Database getOpenDatabase(String name)
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not in opened state");
+ }
+
+ if (!_environment.isValid())
+ {
+ throw new IllegalStateException("Environment is not valid");
+ }
+ DatabaseHolder databaseHolder = _databases.get(name);
+ if (databaseHolder == null)
+ {
+ throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened");
+ }
+ Database database = databaseHolder.getDatabase();
+ if (database == null)
+ {
+ throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
+ }
+ return database;
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _environmentDirectory.getAbsolutePath();
+ }
+
+ @Override
+ public void stateChange(final StateChangeEvent stateChangeEvent)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + stateChangeEvent.getState());
+ }
+
+ if (_state.get() != State.CLOSING && _state.get() != State.CLOSED)
+ {
+ _groupChangeExecutor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ stateChanged(stateChangeEvent);
+ }
+ });
+ }
+ else
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Ignoring the state environment change event as the environment facade for node '" + _prettyGroupNodeName
+ + "' is in state " + _state.get());
+ }
+ }
+ }
+
+ private void stateChanged(StateChangeEvent stateChangeEvent)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Received BDB event, new BDB state " + stateChangeEvent.getState() + " Facade state : " + _state.get());
+ }
+ ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+ if ( _state.get() != State.CLOSED && _state.get() != State.CLOSING)
+ {
+ if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER)
+ {
+ if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN))
+ {
+ LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
+ _joinTime = System.currentTimeMillis();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ reopenDatabases();
+ }
+ }
+
+ StateChangeListener listener = _stateChangeListener.get();
+ if (listener != null && (_state.get() == State.OPEN || _state.get() == State.RESTARTING))
+ {
+ listener.stateChange(stateChangeEvent);
+ }
+
+ if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN)
+ {
+ tryToRestartEnvironment(null);
+ }
+ }
+ _lastKnownEnvironmentState = state;
+ }
+
+ private void reopenDatabases()
+ {
+ if (_state.get() == State.OPEN)
+ {
+ DatabaseConfig pingDbConfig = new DatabaseConfig();
+ pingDbConfig.setTransactional(true);
+ pingDbConfig.setAllowCreate(true);
+
+ _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig));
+
+ for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+ {
+ openDatabaseInternally(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ public String getGroupName()
+ {
+ return (String)_configuration.getGroupName();
+ }
+
+ public String getNodeName()
+ {
+ return _configuration.getName();
+ }
+
+ public String getHostPort()
+ {
+ return (String)_configuration.getHostPort();
+ }
+
+ public String getHelperHostPort()
+ {
+ return (String)_configuration.getHelperHostPort();
+ }
+
+ public String getDurability()
+ {
+ return _durability.toString();
+ }
+
+ public boolean isCoalescingSync()
+ {
+ return _coalescingSync;
+ }
+
+ public String getNodeState()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ return ReplicatedEnvironment.State.UNKNOWN.name();
+ }
+ ReplicatedEnvironment.State state = _environment.getState();
+ return state.toString();
+ }
+
+ public boolean isDesignatedPrimary()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not opened");
+ }
+ return _environment.getRepMutableConfig().getDesignatedPrimary();
+ }
+
+ public Future<Void> setDesignatedPrimary(final boolean isPrimary)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Submitting a job to set designated primary on " + _prettyGroupNodeName + " to " + isPrimary);
+ }
+
+ return _environmentJobExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ setDesignatedPrimaryInternal(isPrimary);
+ return null;
+ }
+ });
+ }
+
+ void setDesignatedPrimaryInternal(final boolean isPrimary)
+ {
+ try
+ {
+ final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
+ _environment.setRepMutableConfig(newConfig);
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Node " + _prettyGroupNodeName + " successfully set designated primary : " + isPrimary);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e);
+ }
+ }
+
+ int getPriority()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not opened");
+ }
+ ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
+ return repConfig.getNodePriority();
+ }
+
+ public Future<Void> setPriority(final int priority)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Submitting a job to set priority on " + _prettyGroupNodeName + " to " + priority);
+ }
+
+ return _environmentJobExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ setPriorityInternal(priority);
+ return null;
+ }
+ });
+ }
+
+ void setPriorityInternal(int priority)
+ {
+ try
+ {
+ final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setNodePriority(priority);
+ _environment.setRepMutableConfig(newConfig);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e);
+ }
+ }
+
+ int getElectableGroupSizeOverride()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not opened");
+ }
+ ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
+ return repConfig.getElectableGroupSizeOverride();
+ }
+
+ public Future<Void> setElectableGroupSizeOverride(final int electableGroupOverride)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Submitting a job to set electable group override on " + _prettyGroupNodeName + " to " + electableGroupOverride);
+ }
+
+ return _environmentJobExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ setElectableGroupSizeOverrideInternal(electableGroupOverride);
+ return null;
+ }
+ });
+ }
+
+ void setElectableGroupSizeOverrideInternal(int electableGroupOverride)
+ {
+ try
+ {
+ final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setElectableGroupSizeOverride(electableGroupOverride);
+ _environment.setRepMutableConfig(newConfig);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e);
+ }
+ }
+
+
+ public long getJoinTime()
+ {
+ return _joinTime ;
+ }
+
+ public long getLastKnownReplicationTransactionId()
+ {
+ if (_state.get() == State.OPEN)
+ {
+ VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange();
+ VLSN lastTxnEnd = range.getLastTxnEnd();
+ return lastTxnEnd.getSequence();
+ }
+ else
+ {
+ return -1L;
+ }
+ }
+
+ public List<Map<String, String>> getGroupMembers()
+ {
+ List<Map<String, String>> members = new ArrayList<Map<String, String>>();
+
+ for (ReplicationNode node : _environment.getGroup().getNodes())
+ {
+ Map<String, String> nodeMap = new HashMap<String, String>();
+ nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, node.getName());
+ nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort());
+ members.add(nodeMap);
+ }
+
+ return members;
+ }
+
+ public void removeNodeFromGroup(final String nodeName)
+ {
+ createReplicationGroupAdmin().removeMember(nodeName);
+ }
+
+ public void updateAddress(final String nodeName, final String newHostName, final int newPort)
+ {
+ createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
+ }
+
+ private ReplicationGroupAdmin createReplicationGroupAdmin()
+ {
+ final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+ helpers.addAll(_environment.getRepConfig().getHelperSockets());
+
+ final ReplicationConfig repConfig = _environment.getRepConfig();
+ helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
+
+ return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers);
+ }
+
+
+ public ReplicatedEnvironment getEnvironment()
+ {
+ return _environment;
+ }
+
+ public State getFacadeState()
+ {
+ return _state.get();
+ }
+
+ public void setStateChangeListener(StateChangeListener stateChangeListener)
+ {
+ if (_stateChangeListener.compareAndSet(null, stateChangeListener))
+ {
+ _environment.setStateChangeListener(this);
+ }
+ else
+ {
+ throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName);
+ }
+ }
+
+ private void closeEnvironment()
+ {
+ // Clean the log before closing. This makes sure it doesn't contain
+ // redundant data. Closing without doing this means the cleaner may not
+ // get a chance to finish.
+ try
+ {
+ if (_environment.isValid())
+ {
+ _environment.cleanLog();
+ }
+ }
+ finally
+ {
+ _environment.close();
+ _environment = null;
+ }
+ }
+
+ private void restartEnvironment()
+ {
+ LOGGER.info("Restarting environment");
+
+ closeEnvironmentOnRestart();
+
+ _environment = createEnvironment(false);
+
+ if (_stateChangeListener.get() != null)
+ {
+ _environment.setStateChangeListener(this);
+ }
+
+ LOGGER.info("Environment is restarted");
+ }
+
+ private void closeEnvironmentOnRestart()
+ {
+ Environment environment = _environment;
+ if (environment != null)
+ {
+ try
+ {
+ if (environment.isValid())
+ {
+ try
+ {
+ closeDatabases();
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Ignoring an exception whilst closing databases", e);
+ }
+ }
+ environment.close();
+ }
+ catch (EnvironmentFailureException efe)
+ {
+ LOGGER.warn("Ignoring an exception whilst closing environment", efe);
+ }
+ }
+ }
+
+ private void closeDatabases()
+ {
+ RuntimeException firstThrownException = null;
+ for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+ {
+ DatabaseHolder databaseHolder = entry.getValue();
+ Database database = databaseHolder.getDatabase();
+ if (database != null)
+ {
+ try
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName);
+ }
+
+ database.close();
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e);
+ if (firstThrownException == null)
+ {
+ firstThrownException = e;
+ }
+ }
+ finally
+ {
+ databaseHolder.setDatabase(null);
+ }
+ }
+ }
+ if (firstThrownException != null)
+ {
+ throw firstThrownException;
+ }
+ }
+
+ private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread)
+ {
+ String groupName = _configuration.getGroupName();
+ String helperHostPort = _configuration.getHelperHostPort();
+ String hostPort = _configuration.getHostPort();
+ Map<String, String> environmentParameters = _configuration.getParameters();
+ Map<String, String> replicationEnvironmentParameters = _configuration.getReplicationParameters();
+ boolean designatedPrimary = _configuration.isDesignatedPrimary();
+ int priority = _configuration.getPriority();
+ int quorumOverride = _configuration.getQuorumOverride();
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Creating environment");
+ LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath());
+ LOGGER.info("Group name " + groupName);
+ LOGGER.info("Node name " + _configuration.getName());
+ LOGGER.info("Node host port " + hostPort);
+ LOGGER.info("Helper host port " + helperHostPort);
+ LOGGER.info("Durability " + _durability);
+ LOGGER.info("Coalescing sync " + _coalescingSync);
+ LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
+ LOGGER.info("Node priority " + priority);
+ LOGGER.info("Quorum override " + quorumOverride);
+ }
+
+ Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS);
+ if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty())
+ {
+ replicationEnvironmentSettings.putAll(replicationEnvironmentParameters);
+ }
+ Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS);
+ if (environmentParameters != null && !environmentParameters.isEmpty())
+ {
+ environmentSettings.putAll(environmentParameters);
+ }
+
+ ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort);
+ replicationConfig.setHelperHosts(helperHostPort);
+ replicationConfig.setDesignatedPrimary(designatedPrimary);
+ replicationConfig.setNodePriority(priority);
+ replicationConfig.setElectableGroupSizeOverride(quorumOverride);
+
+ for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet())
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ }
+ replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+ envConfig.setDurability(_durability);
+
+ for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ }
+ envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+
+ if (createEnvironmentInSeparateThread)
+ {
+ return createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig);
+ }
+ else
+ {
+ return createEnvironment(_environmentDirectory, envConfig, replicationConfig);
+ }
+ }
+
+ private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig,
+ final ReplicationConfig replicationConfig)
+ {
+ Future<ReplicatedEnvironment> environmentFuture = _environmentJobExecutor.submit(new Callable<ReplicatedEnvironment>(){
+ @Override
+ public ReplicatedEnvironment call() throws Exception
+ {
+ String originalThreadName = Thread.currentThread().getName();
+ try
+ {
+ return createEnvironment(environmentPathFile, envConfig, replicationConfig);
+ }
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }});
+
+ long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+ try
+ {
+ return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Environment creation was interrupted", e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException("Unexpected exception on environment creation", e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ throw new RuntimeException("JE environment has not been created in due time");
+ }
+ }
+
+ private ReplicatedEnvironment createEnvironment(File environmentPathFile, EnvironmentConfig envConfig,
+ final ReplicationConfig replicationConfig)
+ {
+ ReplicatedEnvironment environment = null;
+ try
+ {
+ environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+ }
+ catch (final InsufficientLogException ile)
+ {
+ LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+ NetworkRestore restore = new NetworkRestore();
+ NetworkRestoreConfig config = new NetworkRestoreConfig();
+ config.setRetainLogFiles(false);
+ restore.execute(ile, config);
+ environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+ }
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Environment is created for node " + _prettyGroupNodeName);
+ }
+ return environment;
+ }
+
+ @Override
+ public Committer createCommitter(String name)
+ {
+ if (_coalescingSync)
+ {
+ return new CoalescingCommiter(name, this);
+ }
+ else
+ {
+ return Committer.IMMEDIATE_FUTURE_COMMITTER;
+ }
+ }
+
+ public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+ {
+ if (repNode == null)
+ {
+ throw new IllegalArgumentException("Node cannot be null");
+ }
+ return new DbPing(repNode, (String)_configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState();
+ }
+
+ // For testing only
+ int getNumberOfElectableGroupMembers()
+ {
+ if (_state.get() != State.OPEN)
+ {
+ throw new IllegalStateException("Environment facade is not opened");
+ }
+ return _environment.getGroup().getElectableNodes().size();
+ }
+
+ private class RemoteNodeStateLearner implements Callable<Void>
+ {
+ private Map<String, ReplicatedEnvironment.State> _previousGroupState = Collections.emptyMap();
+ @Override
+ public Void call()
+ {
+ final Map<String, ReplicatedEnvironment.State> currentGroupState = new HashMap<String, ReplicatedEnvironment.State>();
+ try
+ {
+ Set<Future<Void>> futures = new HashSet<Future<Void>>();
+
+ for (final ReplicationNode node : _environment.getGroup().getElectableNodes())
+ {
+ Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ DbPing ping = new DbPing(node, _configuration.getGroupName(), REMOTE_NODE_MONITOR_INTERVAL);
+ ReplicatedEnvironment.State nodeState;
+ try
+ {
+ nodeState = ping.getNodeState().getNodeState();
+ }
+ catch (IOException e)
+ {
+ nodeState = ReplicatedEnvironment.State.UNKNOWN;
+ }
+ catch (ServiceConnectFailedException e)
+ {
+ nodeState = ReplicatedEnvironment.State.UNKNOWN;
+ }
+
+ currentGroupState.put(node.getName(), nodeState);
+ return null;
+ }
+ });
+ futures.add(future);
+ }
+
+ for (Future<Void> future : futures)
+ {
+ try
+ {
+ future.get(REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName());
+ future.cancel(true);
+ }
+ }
+
+ if (ReplicatedEnvironment.State.MASTER == _environment.getState())
+ {
+ boolean stateChanged = !_previousGroupState.equals(currentGroupState);
+ _previousGroupState = currentGroupState;
+ if (stateChanged && State.OPEN == _state.get())
+ {
+ new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
+ }
+ }
+ }
+ finally
+ {
+ _groupChangeExecutor.schedule(this, REMOTE_NODE_MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ return null;
+ }
+ }
+ public static enum State
+ {
+ OPENING,
+ OPEN,
+ RESTARTING,
+ CLOSING,
+ CLOSED
+ }
+
+ private static class DatabaseHolder
+ {
+ private final DatabaseConfig _config;
+ private Database _database;
+
+ public DatabaseHolder(DatabaseConfig config)
+ {
+ _config = config;
+ }
+
+ public Database getDatabase()
+ {
+ return _database;
+ }
+
+ public void setDatabase(Database database)
+ {
+ _database = database;
+ }
+
+ public DatabaseConfig getConfig()
+ {
+ return _config;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]";
+ }
+
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
new file mode 100644
index 0000000000..cd53afe891
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
+
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+
+public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
+{
+
+ private static final int DEFAULT_NODE_PRIORITY = 1;
+ private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
+ ReplicaAckPolicy.SIMPLE_MAJORITY);
+ private static final boolean DEFAULT_COALESCING_SYNC = true;
+
+
+
+ @Override
+ public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore)
+ {
+ ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
+ {
+ @Override
+ public boolean isDesignatedPrimary()
+ {
+ return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false);
+ }
+
+ @Override
+ public boolean isCoalescingSync()
+ {
+ return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC);
+ }
+
+ @Override
+ public String getStorePath()
+ {
+ return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ }
+
+ @Override
+ public Map<String, String> getParameters()
+ {
+ return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig");
+ }
+
+ @Override
+ public Map<String, String> getReplicationParameters()
+ {
+ return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig");
+ }
+
+ @Override
+ public int getQuorumOverride()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getPriority()
+ {
+ return DEFAULT_NODE_PRIORITY;
+ }
+
+
+
+ @Override
+ public String getName()
+ {
+ return (String)virtualHost.getAttribute("haNodeName");
+ }
+
+ @Override
+ public String getHostPort()
+ {
+ return (String)virtualHost.getAttribute("haNodeAddress");
+ }
+
+ @Override
+ public String getHelperHostPort()
+ {
+ return (String)virtualHost.getAttribute("haHelperAddress");
+ }
+
+ @Override
+ public String getGroupName()
+ {
+ return (String)virtualHost.getAttribute("haGroupName");
+ }
+
+ @Override
+ public String getDurability()
+ {
+ return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability");
+ }
+ };
+ return new ReplicatedEnvironmentFacade(configuration);
+
+ }
+
+ @Override
+ public String getType()
+ {
+ return ReplicatedEnvironmentFacade.TYPE;
+ }
+
+ private boolean convertBoolean(final Object value, boolean defaultValue)
+ {
+ if(value instanceof Boolean)
+ {
+ return (Boolean) value;
+ }
+ else if(value instanceof String)
+ {
+ return Boolean.valueOf((String) value);
+ }
+ else if(value == null)
+ {
+ return defaultValue;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Boolean");
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index e960518719..0c77bb565c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -21,12 +21,14 @@
package org.apache.qpid.server.store.berkeleydb.upgrade;
import com.sleepycat.je.Cursor;
+
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import org.apache.log4j.Logger;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
@@ -39,6 +41,8 @@ import com.sleepycat.je.OperationStatus;
public class Upgrader
{
+ private static final Logger LOGGER = Logger.getLogger(Upgrader.class);
+
static final String VERSION_DB_NAME = "DB_VERSION";
private Environment _environment;
@@ -64,7 +68,8 @@ public class Upgrader
if(versionDb.count() == 0L)
{
- int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion();
+
+ int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
DatabaseEntry key = new DatabaseEntry();
IntegerBinding.intToEntry(sourceVersion, key);
DatabaseEntry value = new DatabaseEntry();
@@ -74,11 +79,17 @@ public class Upgrader
}
int version = getSourceVersion(versionDb);
- if(version > AbstractBDBMessageStore.VERSION)
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Source message store version is " + version);
+ }
+
+ if(version > BDBMessageStore.VERSION)
{
throw new StoreException("Database version " + version
+ " is higher than the most recent known version: "
- + AbstractBDBMessageStore.VERSION);
+ + BDBMessageStore.VERSION);
}
performUpgradeFromVersion(version, versionDb);
}
@@ -125,8 +136,9 @@ public class Upgrader
}
void performUpgradeFromVersion(int sourceVersion, Database versionDb)
+ throws StoreException
{
- while(sourceVersion != AbstractBDBMessageStore.VERSION)
+ while(sourceVersion != BDBMessageStore.VERSION)
{
upgrade(sourceVersion, ++sourceVersion);
DatabaseEntry key = new DatabaseEntry();
@@ -137,7 +149,7 @@ public class Upgrader
}
}
- void upgrade(final int fromVersion, final int toVersion)
+ void upgrade(final int fromVersion, final int toVersion) throws StoreException
{
try
{
@@ -178,7 +190,7 @@ public class Upgrader
private int identifyOldStoreVersion() throws DatabaseException
{
- int version = 0;
+ int version = BDBMessageStore.VERSION;
for (String databaseName : _environment.getDatabaseNames())
{
if (databaseName.contains("_v"))
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
deleted file mode 100644
index c2b3aeab3e..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-import java.net.InetAddress;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.FileUtils;
-
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class BDBHAMessageStoreTest extends QpidTestCase
-{
- private static final String TEST_LOG_FILE_MAX = "1000000";
- private static final String TEST_ELECTION_RETRIES = "1000";
- private static final String TEST_NUMBER_OF_THREADS = "10";
- private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999";
- private String _groupName;
- private String _workDir;
- private int _masterPort;
- private String _host;
- private XMLConfiguration _configXml;
- private VirtualHost _virtualHost;
- private org.apache.qpid.server.model.VirtualHost _modelVhost;
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- _workDir = TMP_FOLDER + File.separator + getName();
- _host = InetAddress.getByName("localhost").getHostAddress();
- _groupName = "group" + getName();
- _masterPort = -1;
-
- FileUtils.delete(new File(_workDir), true);
- _configXml = new XMLConfiguration();
- _modelVhost = mock(org.apache.qpid.server.model.VirtualHost.class);
-
-
- BrokerTestHelper.setUp();
- }
-
- public void tearDown() throws Exception
- {
- try
- {
- if (_virtualHost != null)
- {
- _virtualHost.close();
- }
- FileUtils.delete(new File(_workDir), true);
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- public void testSetSystemConfiguration() throws Exception
- {
- // create virtual host configuration, registry and host instance
- addVirtualHostConfiguration();
- String vhostName = "test" + _masterPort;
- VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock());
-
- _virtualHost = BrokerTestHelper.createVirtualHost(configuration,new VirtualHostRegistry(new EventLogger()),_modelVhost);
- BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore();
-
- // test whether JVM system settings were applied
- Environment env = store.getEnvironment();
- assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
- assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
-
- ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
- assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
- assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
- repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
- }
-
- private void addVirtualHostConfiguration() throws Exception
- {
- int port = findFreePort();
- if (_masterPort == -1)
- {
- _masterPort = port;
- }
- String nodeName = getNodeNameForNodeAt(port);
-
- String vhostName = "test" + port;
- String vhostPrefix = "virtualhosts.virtualhost." + vhostName;
-
- _configXml.addProperty("virtualhosts.virtualhost.name", vhostName);
- _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE);
-
- when(_modelVhost.getAttribute(eq(_modelVhost.STORE_PATH))).thenReturn(_workDir + File.separator
- + port);
- when(_modelVhost.getAttribute(eq("haGroupName"))).thenReturn(_groupName);
- when(_modelVhost.getAttribute(eq("haNodeName"))).thenReturn(nodeName);
- when(_modelVhost.getAttribute(eq("haNodeAddress"))).thenReturn(getNodeHostPortForNodeAt(port));
- when(_modelVhost.getAttribute(eq("haHelperAddress"))).thenReturn(getHelperHostPort());
-
- Map<String,String> bdbEnvConfig = new HashMap<String,String>();
- bdbEnvConfig.put(EnvironmentConfig.CLEANER_THREADS, TEST_NUMBER_OF_THREADS);
- bdbEnvConfig.put(EnvironmentConfig.LOG_FILE_MAX, TEST_LOG_FILE_MAX);
-
- when(_modelVhost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(bdbEnvConfig);
-
- Map<String,String> repConfig = new HashMap<String,String>();
- repConfig.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, TEST_ELECTION_RETRIES);
- repConfig.put(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, TEST_ENV_CONSISTENCY_TIMEOUT);
- when(_modelVhost.getAttribute(eq("haReplicationConfig"))).thenReturn(repConfig);
-
- }
-
- private String getNodeNameForNodeAt(final int bdbPort)
- {
- return "node" + getName() + bdbPort;
- }
-
- private String getNodeHostPortForNodeAt(final int bdbPort)
- {
- return _host + ":" + bdbPort;
- }
-
- private String getHelperHostPort()
- {
- if (_masterPort == -1)
- {
- throw new IllegalStateException("Helper port not yet assigned.");
- }
- return _host + ":" + _masterPort;
- }
-}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
index 730001d849..385681446a 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java
@@ -22,20 +22,14 @@ package org.apache.qpid.server.store.berkeleydb;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
public class MessageStoreCreatorTest extends QpidTestCase
{
- private static final String[] STORE_TYPES = {BDBMessageStore.TYPE};
-
public void testMessageStoreCreator()
{
MessageStoreCreator messageStoreCreator = new MessageStoreCreator();
- for (String type : STORE_TYPES)
- {
- MessageStore store = messageStoreCreator.createMessageStore(type);
- assertNotNull("Store of type " + type + " is not created", store);
- }
- }
-}
+ String type = new BDBMessageStoreFactory().getType();
+ MessageStore store = messageStoreCreator.createMessageStore(type);
+ assertNotNull("Store of type " + type + " is not created", store);
+ }}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
new file mode 100644
index 0000000000..b19e18b204
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.Collections;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Environment;
+
+public class StandardEnvironmentFacadeTest extends QpidTestCase
+{
+ protected File _storePath;
+ protected EnvironmentFacade _environmentFacade;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _storePath = new File(TMP_FOLDER + File.separator + "bdb" + File.separator + getTestName());
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ if (_environmentFacade != null)
+ {
+ _environmentFacade.close();
+ }
+ }
+ finally
+ {
+ if (_storePath != null)
+ {
+ FileUtils.delete(_storePath, true);
+ }
+ }
+ }
+
+ public void testEnvironmentFacade() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ assertNotNull("Environment should not be null", ef);
+ Environment e = ef.getEnvironment();
+ assertTrue("Environment is not valid", e.isValid());
+ }
+
+ public void testClose() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ ef.close();
+ Environment e = ef.getEnvironment();
+
+ assertNull("Environment should be null after facade close", e);
+ }
+
+ public void testOpenDatabases() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1", "test2");
+ Database test1 = ef.getOpenDatabase("test1");
+ Database test2 = ef.getOpenDatabase("test2");
+
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+ }
+
+ public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
+ {
+ EnvironmentFacade ef = getEnvironmentFacade();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1");
+ Database test1 = ef.getOpenDatabase("test1");
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ try
+ {
+ ef.getOpenDatabase("test2");
+ fail("An exception should be thrown for the non existing database");
+ }
+ catch(IllegalArgumentException e)
+ {
+ assertEquals("Unexpected exception message", "Database with name 'test2' has not been opened", e.getMessage());
+ }
+ }
+
+ EnvironmentFacade getEnvironmentFacade() throws Exception
+ {
+ if (_environmentFacade == null)
+ {
+ _environmentFacade = createEnvironmentFacade();
+ }
+ return _environmentFacade;
+ }
+
+ EnvironmentFacade createEnvironmentFacade()
+ {
+ return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
new file mode 100644
index 0000000000..a05a30b459
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.configuration.RecovererProvider;
+import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class VirtualHostTest extends QpidTestCase
+{
+
+ private Broker _broker;
+ private StatisticsGatherer _statisticsGatherer;
+ private RecovererProvider _recovererProvider;
+ private File _configFile;
+ private File _bdbStorePath;
+ private VirtualHost _host;
+ private ConfigurationEntryStore _store;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _store = mock(ConfigurationEntryStore.class);
+ _broker = BrokerTestHelper.createBrokerMock();
+ TaskExecutor taslExecutor = mock(TaskExecutor.class);
+ when(taslExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_broker.getTaskExecutor()).thenReturn(taslExecutor);
+
+
+ _statisticsGatherer = mock(StatisticsGatherer.class);
+
+ _bdbStorePath = new File(TMP_FOLDER, getTestName() + "." + System.currentTimeMillis());
+ _bdbStorePath.deleteOnExit();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if (_host != null)
+ {
+ _host.setDesiredState(_host.getState(), State.STOPPED);
+ }
+ }
+ finally
+ {
+ if (_configFile != null)
+ {
+ _configFile.delete();
+ }
+ if (_bdbStorePath != null)
+ {
+ FileUtils.delete(_bdbStorePath, true);
+ }
+ super.tearDown();
+ }
+ }
+
+
+ public void testCreateBdbVirtualHostFromConfigurationFile()
+ {
+ String hostName = getName();
+ long logFileMax = 2000000;
+ _host = createHostFromConfiguration(hostName, logFileMax);
+ _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected host type", StandardVirtualHostFactory.TYPE, _host.getType());
+ assertEquals("Unexpected store type", new BDBMessageStoreFactory().getType(), _host.getAttribute(VirtualHost.STORE_TYPE));
+ assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+ EnvironmentConfig envConfig = messageStore.getEnvironmentFacade().getEnvironment().getConfig();
+ assertEquals("Unexpected JE log file max", String.valueOf(logFileMax), envConfig.getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+ }
+
+ public void testCreateBdbHaVirtualHostFromConfigurationFile()
+ {
+ String hostName = getName();
+
+ String repStreamTimeout = "2 h";
+ String nodeName = "node";
+ String groupName = "group";
+ String nodeHostPort = "localhost:" + findFreePort();
+ String helperHostPort = nodeHostPort;
+ String durability = "NO_SYNC,SYNC,NONE";
+ _host = createHaHostFromConfiguration(hostName, groupName, nodeName, nodeHostPort, helperHostPort, durability, repStreamTimeout);
+ _host.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType());
+ assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
+ assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
+ ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment();
+ ReplicationConfig repConfig = environment.getRepConfig();
+ assertEquals("Unexpected JE replication groupName", groupName, repConfig.getConfigParam(ReplicationConfig.GROUP_NAME));
+ assertEquals("Unexpected JE replication nodeName", nodeName, repConfig.getConfigParam(ReplicationConfig.NODE_NAME));
+ assertEquals("Unexpected JE replication nodeHostPort", nodeHostPort, repConfig.getConfigParam(ReplicationConfig.NODE_HOST_PORT));
+ assertEquals("Unexpected JE replication nodeHostPort", helperHostPort, repConfig.getConfigParam(ReplicationConfig.HELPER_HOSTS));
+ assertEquals("Unexpected JE replication nodeHostPort", "false", repConfig.getConfigParam(ReplicationConfig.DESIGNATED_PRIMARY));
+ assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, repConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
+ }
+
+ private VirtualHost createHost(Map<String, Object> attributes, Set<UUID> children)
+ {
+ ConfigurationEntry entry = new ConfigurationEntry(UUID.randomUUID(), VirtualHost.class.getSimpleName(), attributes,
+ children, _store);
+
+ return new VirtualHostRecoverer(_statisticsGatherer).create(_recovererProvider, entry, _broker);
+ }
+
+ private VirtualHost createHost(Map<String, Object> attributes)
+ {
+ return createHost(attributes, Collections.<UUID> emptySet());
+ }
+
+ private VirtualHost createHostFromConfiguration(String hostName, long logFileMax)
+ {
+ String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+ + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+ + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+ + "<envConfig><name>" + EnvironmentConfig.LOG_FILE_MAX + "</name><value>" + logFileMax + "</value></envConfig>"
+ + "</store>"
+ + "</" + hostName + "></virtualhost></virtualhosts>";
+ Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+ return createHost(attributes);
+ }
+
+
+ private VirtualHost createHaHostFromConfiguration(String hostName, String groupName, String nodeName, String nodeHostPort, String helperHostPort, String durability, String repStreamTimeout)
+ {
+ String content = "<virtualhosts><virtualhost><name>" + hostName + "</name><" + hostName + ">"
+ + "<type>" + BDBHAVirtualHostFactory.TYPE + "</type>"
+ + "<store><class>" + BDBMessageStore.class.getName() + "</class>"
+ + "<environment-path>" + _bdbStorePath.getAbsolutePath() + "</environment-path>"
+ + "<highAvailability>"
+ + "<groupName>" + groupName + "</groupName>"
+ + "<nodeName>" + nodeName + "</nodeName>"
+ + "<nodeHostPort>" + nodeHostPort + "</nodeHostPort>"
+ + "<helperHostPort>" + helperHostPort + "</helperHostPort>"
+ + "<durability>" + durability.replaceAll(",", "\\\\,") + "</durability>"
+ + "</highAvailability>"
+ + "<repConfig><name>" + ReplicationConfig.REP_STREAM_TIMEOUT + "</name><value>" + repStreamTimeout + "</value></repConfig>"
+ + "</store>"
+ + "</" + hostName + "></virtualhost></virtualhosts>";
+ Map<String, Object> attributes = writeConfigAndGenerateAttributes(content);
+ return createHost(attributes);
+ }
+
+ private Map<String, Object> writeConfigAndGenerateAttributes(String content)
+ {
+ _configFile = TestFileUtils.createTempFile(this, ".virtualhost.xml", content);
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(VirtualHost.NAME, getName());
+ attributes.put(VirtualHost.CONFIG_PATH, _configFile.getAbsolutePath());
+ return attributes;
+ }
+}
+
+ \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
new file mode 100644
index 0000000000..cd7dd69c46
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -0,0 +1,336 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
+public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
+{
+
+ private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
+ private static final int LISTENER_TIMEOUT = 5;
+ private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
+ private static final String TEST_GROUP_NAME = "testGroupName";
+ private static final String TEST_NODE_NAME = "testNodeName";
+ private static final String TEST_NODE_HOST_PORT = "localhost:" + TEST_NODE_PORT;
+ private static final String TEST_NODE_HELPER_HOST_PORT = TEST_NODE_HOST_PORT;
+ private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
+ private static final boolean TEST_DESIGNATED_PRIMARY = false;
+ private static final boolean TEST_COALESCING_SYNC = true;
+ private static final int TEST_PRIORITY = 1;
+ private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
+
+ private File _storePath;
+ private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
+ private VirtualHost _virtualHost = mock(VirtualHost.class);
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ TaskExecutor taskExecutor = mock(TaskExecutor.class);
+ when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor);
+
+ _storePath = TestFileUtils.createTestDirectory("bdb", true);
+
+ setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ for (EnvironmentFacade ef : _nodes.values())
+ {
+ ef.close();
+ }
+ }
+ finally
+ {
+ try
+ {
+ if (_storePath != null)
+ {
+ FileUtils.delete(_storePath, true);
+ }
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+ }
+ public void testEnvironmentFacade() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ assertNotNull("Environment should not be null", ef);
+ Environment e = ef.getEnvironment();
+ assertTrue("Environment is not valid", e.isValid());
+ }
+
+ public void testClose() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ ef.close();
+ Environment e = ef.getEnvironment();
+
+ assertNull("Environment should be null after facade close", e);
+ }
+
+ public void testOpenDatabases() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1", "test2");
+ Database test1 = ef.getOpenDatabase("test1");
+ Database test2 = ef.getOpenDatabase("test2");
+
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ assertEquals("Unexpected name for open database test2", "test2" , test2.getDatabaseName());
+ }
+
+ public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
+ {
+ EnvironmentFacade ef = createMaster();
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ ef.openDatabases(dbConfig, "test1");
+ Database test1 = ef.getOpenDatabase("test1");
+ assertEquals("Unexpected name for open database test1", "test1" , test1.getDatabaseName());
+ try
+ {
+ ef.getOpenDatabase("test2");
+ fail("An exception should be thrown for the non existing database");
+ }
+ catch(IllegalArgumentException e)
+ {
+ assertEquals("Unexpected exception message", "Database with name 'test2' has never been requested to be opened", e.getMessage());
+ }
+ }
+
+ public void testGetGroupName() throws Exception
+ {
+ assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName());
+ }
+
+ public void testGetNodeName() throws Exception
+ {
+ assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName());
+ }
+
+ public void testLastKnownReplicationTransactionId() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId();
+ assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0);
+ }
+
+ public void testGetNodeHostPort() throws Exception
+ {
+ assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort());
+ }
+
+ public void testGetHelperHostPort() throws Exception
+ {
+ assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort());
+ }
+
+ public void testGetDurability() throws Exception
+ {
+ assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability());
+ }
+
+ public void testIsCoalescingSync() throws Exception
+ {
+ assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync());
+ }
+
+ public void testGetNodeState() throws Exception
+ {
+ assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
+ }
+
+
+ public void testPriority() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority());
+ Future<Void> future = facade.setPriority(TEST_PRIORITY + 1);
+ future.get(5, TimeUnit.SECONDS);
+ assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority());
+ }
+
+ public void testDesignatedPrimary() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
+ future.get(5, TimeUnit.SECONDS);
+ assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ }
+
+ public void testElectableGroupSizeOverride() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride());
+ Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1);
+ future.get(5, TimeUnit.SECONDS);
+ assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
+ }
+
+ public void testEnvironmentAutomaticallyRestartsAndBecomesUnknownOnInsufficientReplicas() throws Exception
+ {
+ final CountDownLatch masterLatch = new CountDownLatch(1);
+ final AtomicInteger masterStateChangeCount = new AtomicInteger();
+ final CountDownLatch unknownLatch = new CountDownLatch(1);
+ final AtomicInteger unknownStateChangeCount = new AtomicInteger();
+ StateChangeListener stateChangeListener = new StateChangeListener()
+ {
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ if (stateChangeEvent.getState() == State.MASTER)
+ {
+ masterStateChangeCount.incrementAndGet();
+ masterLatch.countDown();
+ }
+ else if (stateChangeEvent.getState() == State.UNKNOWN)
+ {
+ unknownStateChangeCount.incrementAndGet();
+ unknownLatch.countDown();
+ }
+ }
+ };
+
+ addNode(State.MASTER, stateChangeListener);
+ assertTrue("Master was not started", masterLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String node2NodeHostPort = "localhost:" + replica2Port;
+
+ ReplicatedEnvironmentFacade replica1 = createReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ ReplicatedEnvironmentFacade replica2 = createReplica(TEST_NODE_NAME + "_2", node2NodeHostPort);
+
+ // close replicas
+ replica1.close();
+ replica2.close();
+
+ assertTrue("Environment should be recreated and go into unknown state",
+ unknownLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals("Node made master an unexpected number of times", 1, masterStateChangeCount.get());
+ assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
+ }
+
+ public void testCloseStateTransitions() throws Exception
+ {
+ ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
+
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.OPEN, replicatedEnvironmentFacade.getFacadeState());
+ replicatedEnvironmentFacade.close();
+ assertEquals("Unexpected state " + replicatedEnvironmentFacade.getFacadeState(), ReplicatedEnvironmentFacade.State.CLOSED, replicatedEnvironmentFacade.getFacadeState());
+ }
+
+ private ReplicatedEnvironmentFacade createMaster() throws Exception
+ {
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ ReplicatedEnvironmentFacade env = addNode(State.MASTER, stateChangeListener);
+ assertTrue("Environment was not created", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+ return env;
+ }
+
+ private ReplicatedEnvironmentFacade createReplica(String nodeName, String nodeHostPort) throws Exception
+ {
+ TestStateChangeListener testStateChangeListener = new TestStateChangeListener(State.REPLICA);
+ ReplicatedEnvironmentFacade replicaEnvironmentFacade = addNode(nodeName, nodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener);
+ boolean awaitForStateChange = testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS);
+ assertTrue("Replica " + nodeName + " did not go into desired state; current actual state is " + testStateChangeListener.getCurrentActualState(), awaitForStateChange);
+ return replicaEnvironmentFacade;
+ }
+
+ private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
+ State desiredState, StateChangeListener stateChangeListener)
+ {
+ ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
+ ref.setStateChangeListener(stateChangeListener);
+ _nodes.put(nodeName, ref);
+ return ref;
+ }
+
+ private ReplicatedEnvironmentFacade addNode(State desiredState, StateChangeListener stateChangeListener)
+ {
+ return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener);
+ }
+
+ private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
+ {
+ ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
+ when(node.getName()).thenReturn(nodeName);
+ when(node.getHostPort()).thenReturn(nodeHostPort);
+ when(node.isDesignatedPrimary()).thenReturn(designatedPrimary);
+ when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
+ when(node.getPriority()).thenReturn(TEST_PRIORITY);
+ when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
+ when(node.getDurability()).thenReturn(TEST_DURABILITY);
+ when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC);
+
+ Map<String, String> repConfig = new HashMap<String, String>();
+ repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
+ repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+ when(node.getReplicationParameters()).thenReturn(repConfig);
+ when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
+ return node;
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
new file mode 100644
index 0000000000..0870191b35
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TestStateChangeListener.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
+class TestStateChangeListener implements StateChangeListener
+{
+ private final Set<State> _expectedStates;
+ private final CountDownLatch _latch;
+ private final AtomicReference<State> _currentActualState = new AtomicReference<State>();
+
+ public TestStateChangeListener(State expectedState)
+ {
+ this(Collections.singleton(expectedState));
+ }
+
+ public TestStateChangeListener(Set<State> expectedStates)
+ {
+ _expectedStates = new HashSet<State>(expectedStates);
+ _latch = new CountDownLatch(1);
+ }
+
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ _currentActualState.set(stateChangeEvent.getState());
+ if (_expectedStates.contains(stateChangeEvent.getState()))
+ {
+ _latch.countDown();
+ }
+ }
+
+ public boolean awaitForStateChange(long timeout, TimeUnit timeUnit) throws InterruptedException
+ {
+ return _latch.await(timeout, timeUnit);
+ }
+
+ public State getCurrentActualState()
+ {
+ return _currentActualState.get();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
index c0de884a28..c407be50c6 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
@@ -26,7 +26,7 @@ import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.OperationStatus;
-import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
@@ -94,7 +94,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
catch(ServerScopedRuntimeException ex)
{
assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: "
- + AbstractBDBMessageStore.VERSION, ex.getMessage());
+ + BDBMessageStore.VERSION, ex.getMessage());
}
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 29e9ae19d7..f9627dc6c4 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -20,13 +20,16 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
-import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -42,6 +45,14 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.MessageStoreTest;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
@@ -51,8 +62,6 @@ import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
-import static org.mockito.Mockito.mock;
-
/**
* Subclass of MessageStoreTest which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
@@ -70,7 +79,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
{
MessageStore store = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = assertBDBStore(store);
// Create content ByteBuffers.
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -123,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
/*
* reload the store only (read-only)
*/
- AbstractBDBMessageStore readOnlyStore = reloadStore(bdbStore);
+ BDBMessageStore readOnlyStore = reloadStore(bdbStore);
/*
* Read back and validate the 0-8 message metadata and content
@@ -222,15 +231,16 @@ public class BDBMessageStoreTest extends MessageStoreTest
* Use this method instead of reloading the virtual host like other tests in order
* to avoid the recovery handler deleting the message for not being on a queue.
*/
- private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception
+ private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
{
messageStore.close();
- AbstractBDBMessageStore newStore = new BDBMessageStore();
- newStore.setVirtualHost(getVirtualHostModel());
- newStore.configure(true);
+ BDBMessageStore newStore = new BDBMessageStore();
+ MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
+ when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
+ newStore.configureMessageStore(getVirtualHostModel(), recoveryHandler, null);
- newStore.startWithNoRecover();
+ newStore.activate();
return newStore;
}
@@ -285,7 +295,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
public void testGetContentWithOffset() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -345,7 +355,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
public void testMessageCreationAndRemoval() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(store);
+ BDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -370,12 +380,12 @@ public class BDBMessageStoreTest extends MessageStoreTest
assertEquals("Retrieved content when none was expected",
0, bdbStore.getContent(messageid_0_8, 0, dst));
}
- private AbstractBDBMessageStore assertBDBStore(MessageStore store)
+ private BDBMessageStore assertBDBStore(MessageStore store)
{
assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
- return (AbstractBDBMessageStore) store;
+ return (BDBMessageStore) store;
}
private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
@@ -408,7 +418,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
{
MessageStore log = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+ BDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
@@ -458,7 +468,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
{
MessageStore log = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+ BDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
@@ -504,7 +514,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
public void testOnDelete() throws Exception
{
MessageStore log = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+ BDBMessageStore bdbStore = assertBDBStore(log);
String storeLocation = bdbStore.getStoreLocation();
File location = new File(storeLocation);
@@ -527,7 +537,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
{
MessageStore log = getVirtualHost().getMessageStore();
- AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+ BDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index 4b50121a7a..e8d18971ad 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -38,6 +38,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -143,7 +144,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
CompositeData row = groupMembers.get(new Object[] {nodeName});
assertNotNull("Table does not contain row for node name " + nodeName, row);
- assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ assertEquals(nodeHostPort, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
}
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
index 95626f7fa5..c3679f3d4a 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
@@ -185,6 +185,12 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase
assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary());
storeBean.setDesignatedPrimary(true);
+
+ long limit = System.currentTimeMillis() + 5000;
+ while( !storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit)
+ {
+ Thread.sleep(100);
+ }
assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary());
final Connection connection = getConnection(_brokerFailoverUrl);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java
index e4efc26477..63612da455 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java
@@ -24,8 +24,6 @@ package org.apache.qpid.server.store;
import java.util.EnumMap;
import java.util.Map;
-import org.apache.qpid.server.store.StateManager.Transition;
-
public class StateManager
{
private State _state = State.INITIAL;
@@ -78,7 +76,8 @@ public class StateManager
public static final Transition ACTIVATE = new Transition(State.INITIALISED, State.ACTIVATING, Event.BEFORE_ACTIVATE);
public static final Transition ACTIVATE_COMPLETE = new Transition(State.ACTIVATING, State.ACTIVE, Event.AFTER_ACTIVATE);
- public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);;
+ public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);
+ public static final Transition CLOSE_ACTIVATING = new Transition(State.ACTIVATING, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java
new file mode 100644
index 0000000000..4f1f830fd0
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/DaemonThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.ThreadFactory;
+
+public final class DaemonThreadFactory implements ThreadFactory
+{
+ private String _threadName;
+ public DaemonThreadFactory(String threadName)
+ {
+ _threadName = threadName;
+ }
+
+ @Override
+ public Thread newThread(Runnable r)
+ {
+ Thread thread = new Thread(r, _threadName);
+ thread.setDaemon(true);
+ return thread;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 7a4f92f0ca..7b29a48d60 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -71,7 +72,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
_store = createStore();
((DurableConfigurationStore)_store).configureConfigStore(vhost, null);
- _store.configureMessageStore(vhost, mock(MessageStoreRecoveryHandler.class), null);
+ MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
+ when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
+ _store.configureMessageStore(vhost, recoveryHandler, null);
+ _store.activate();
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
index 3ee98f9a21..16d18de713 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
@@ -141,7 +141,7 @@ public class StateManagerTest extends TestCase implements EventListener
performInvalidTransitions(StateManager.INITIALISE, State.INITIALISED);
performInvalidTransitions(StateManager.INITIALISE_COMPLETE, State.ACTIVATING, State.CLOSING);
- performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE);
+ performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE, State.CLOSING);
performInvalidTransitions(StateManager.ACTIVATE_COMPLETE, State.QUIESCING, State.CLOSING, State.INITIALISED);
performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED);
performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.ACTIVATING, State.CLOSING);