diff options
Diffstat (limited to 'java/bdbstore/src/main/java/org/apache/qpid/server')
6 files changed, 127 insertions, 163 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 9323111fdd..6e64ea5597 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -54,13 +54,10 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.*; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; @@ -73,7 +70,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; @@ -423,8 +419,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore BindingRecoveryHandler brh = qrh.completeQueueRecovery(); _configuredObjectHelper.recoverBindings(brh, configuredObjects); - BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); - recoverBrokerLinks(lrh); + brh.completeBindingRecovery(); } catch (DatabaseException e) { @@ -466,66 +461,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) - { - Cursor cursor = null; - - try - { - cursor = _linkDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - long createTime = LongBinding.entryToLong(value); - Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); - - ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments); - - recoverBridges(brh, id); - } - } - finally - { - closeCursorSafely(cursor); - } - - } - - private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId) - { - Cursor cursor = null; - - try - { - cursor = _bridgeDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - - UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); - if(parentId.equals(linkId)) - { - - long createTime = LongBinding.entryToLong(value); - Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); - brh.bridge(id,createTime,arguments); - } - } - brh.completeBridgeRecoveryForLink(); - } - finally - { - closeCursorSafely(cursor); - } - - } - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException { @@ -940,89 +875,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - public void createBrokerLink(final BrokerLink link) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key); - - DatabaseEntry value = new DatabaseEntry(); - LongBinding.longToEntry(link.getCreateTime(), value); - StringMapBinding.getInstance().objectToEntry(link.getArguments(), value); - - try - { - _linkDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing Link " + link - + " to database: " + e.getMessage(), e); - } - } - } - - public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key); - try - { - OperationStatus status = _linkDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Link " + link + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e); - } - } - - public void createBridge(final Bridge bridge) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key); - - DatabaseEntry value = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getQMFId(),value); - LongBinding.longToEntry(bridge.getCreateTime(),value); - StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value); - - try - { - _bridgeDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing Bridge " + bridge - + " to database: " + e.getMessage(), e); - } - - } - } - - public void deleteBridge(final Bridge bridge) throws AMQStoreException - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key); - try - { - OperationStatus status = _bridgeDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Bridge " + bridge + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e); - } - } /** * Places a message onto a specified queue, in a given transaction. @@ -1050,7 +902,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { LOGGER.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " [Transaction" + tx + "]"); + + " in transaction " + tx); } _deliveryDb.put(tx, key, value); } @@ -1204,7 +1056,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (LOGGER.isDebugEnabled()) { - LOGGER.debug("commitTranImpl completed for [Transaction:" + tx + "]"); + String transactionType = syncCommit ? "synchronous" : "asynchronous"; + LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); } } catch (DatabaseException e) @@ -1226,7 +1079,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("abortTran called for [Transaction:" + tx + "]"); + LOGGER.debug("abortTran called for transaction " + tx); } try @@ -1338,7 +1191,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Storing content for message " + messageId + "[Transaction" + tx + "]"); + LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); } } @@ -1363,8 +1216,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = " - + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called"); + LOGGER.debug("storeMetaData called for transaction " + tx + + ", messageId " + messageId + + ", messageMetaData " + messageMetaData); } DatabaseEntry key = new DatabaseEntry(); @@ -1378,7 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messageMetaDataDb.put(tx, key, value); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]"); + LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); } } catch (DatabaseException e) @@ -1680,7 +1534,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore else { ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); + int length = getContent(offsetInMessage, buf); + buf.limit(length); buf.position(0); return buf; } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index c40f24dbc3..ba111e8091 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -105,7 +105,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); }}); - public static final String BDB_HA_STORE_TYPE = "BDB-HA"; + public static final String TYPE = "BDB-HA"; private String _groupName; private String _nodeName; @@ -602,6 +602,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess @Override public String getStoreType() { - return BDB_HA_STORE_TYPE; + return TYPE; } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java new file mode 100644 index 0000000000..20dce2628d --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreFactory; + +public class BDBHAMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return BDBHAMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new BDBHAMessageStore(); + } + +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 82bc3d8564..4028de4b80 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -42,7 +42,7 @@ import com.sleepycat.je.EnvironmentConfig; public class BDBMessageStore extends AbstractBDBMessageStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); - private static final String BDB_STORE_TYPE = "BDB"; + public static final String TYPE = "BDB"; private CommitThreadWrapper _commitThreadWrapper; @Override @@ -108,7 +108,7 @@ public class BDBMessageStore extends AbstractBDBMessageStore @Override public String getStoreType() { - return BDB_STORE_TYPE; + return TYPE; } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java new file mode 100644 index 0000000000..126bf1928a --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreFactory; + +public class BDBMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return BDBMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new BDBMessageStore(); + } + +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java index fe1556b5a6..598d20146c 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -80,7 +80,7 @@ public class CommitThreadWrapper { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); + LOGGER.debug("complete() called for transaction " + _tx); } _complete = true; @@ -101,7 +101,10 @@ public class CommitThreadWrapper if(!_syncCommit) { - LOGGER.debug("CommitAsync was requested, returning immediately."); + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("CommitAsync was requested, returning immediately."); + } return; } @@ -121,6 +124,12 @@ public class CommitThreadWrapper public synchronized void waitForCompletion() { + long startTime = 0; + if(LOGGER.isDebugEnabled()) + { + startTime = System.currentTimeMillis(); + } + while (!isComplete()) { _commitThread.explicitNotify(); @@ -133,6 +142,12 @@ public class CommitThreadWrapper throw new RuntimeException(e); } } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } } } @@ -198,8 +213,20 @@ public class CommitThreadWrapper try { + long startTime = 0; + if(LOGGER.isDebugEnabled()) + { + startTime = System.currentTimeMillis(); + } + _environment.flushLog(true); + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("flushLog completed in " + duration + " ms"); + } + for(int i = 0; i < size; i++) { BDBCommitFuture commit = _jobQueue.poll(); |