diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-07 22:47:17 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-07 22:47:17 +0000 |
commit | 0129e12deaabcf3cf3be23913967397be6a12e3a (patch) | |
tree | 1ff521e7be49675201bf66f96e4956dc20bac0a8 /java/bdbstore/src | |
parent | ad776f381e2690c58c37c33d23b2389da1b2028e (diff) | |
download | qpid-python-0129e12deaabcf3cf3be23913967397be6a12e3a.tar.gz |
QPID-946 , QPID-2379 : QMF and Federation fixes (now works again with qpid-config, qpid-route, qpid-tool) and store (durable) routes in the DB
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1228748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/bdbstore/src')
3 files changed, 295 insertions, 2 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 1d8187401d..9efa2937aa 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -27,13 +27,16 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.bind.tuple.StringBinding; import com.sleepycat.je.*; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; @@ -41,6 +44,8 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; @@ -100,12 +105,17 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private String DELIVERYDB_NAME = "deliveryDb"; private String EXCHANGEDB_NAME = "exchangeDb"; private String QUEUEDB_NAME = "queueDb"; + private String BRIDGEDB_NAME = "bridges"; + private String LINKDB_NAME = "links"; + private Database _messageMetaDataDb; private Database _messageContentDb; private Database _queueBindingsDb; private Database _deliveryDb; private Database _exchangeDb; private Database _queueDb; + private Database _bridgeDb; + private Database _linkDb; /* ======= * Schema: @@ -190,6 +200,10 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore EXCHANGEDB_NAME += "_v" + version; QUEUEBINDINGSDB_NAME += "_v" + version; + + LINKDB_NAME += "_v" + version; + + BRIDGEDB_NAME += "_v" + version; } } @@ -461,6 +475,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig); _messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig); _deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig); + _linkDb = _environment.openDatabase(null, LINKDB_NAME, dbConfig); + _bridgeDb = _environment.openDatabase(null, BRIDGEDB_NAME, dbConfig); + } @@ -517,6 +534,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _deliveryDb.close(); } + if (_bridgeDb != null) + { + _log.info("Close bridge database"); + _bridgeDb.close(); + } + + if (_linkDb != null) + { + _log.info("Close link database"); + _linkDb.close(); + } + closeEnvironment(); _state = State.CLOSED; @@ -556,8 +585,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore BindingRecoveryHandler brh = erh.completeExchangeRecovery(); recoverBindings(brh); - - brh.completeBindingRecovery(); + + ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); + recoverBrokerLinks(lrh); } catch (DatabaseException e) { @@ -674,6 +704,74 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } + + private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) + { + Cursor cursor = null; + + try + { + cursor = _linkDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + long createTime = LongBinding.entryToLong(value); + Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); + + ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments); + + recoverBridges(brh, id); + } + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + } + + private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId) + { + Cursor cursor = null; + + try + { + cursor = _bridgeDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + + UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); + if(parentId.equals(linkId)) + { + + long createTime = LongBinding.entryToLong(value); + Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); + brh.bridge(id,createTime,arguments); + } + } + brh.completeBridgeRecoveryForLink(); + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + } + + private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException { StoredMessageRecoveryHandler mrh = msrh.begin(); @@ -1163,6 +1261,90 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + public void createBrokerLink(final BrokerLink link) throws AMQStoreException + { + if (_state != State.RECOVERING) + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key); + + DatabaseEntry value = new DatabaseEntry(); + LongBinding.longToEntry(link.getCreateTime(),value); + StringMapBinding.getInstance().objectToEntry(link.getArguments(), value); + + try + { + _linkDb.put(null, key, value); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing Link " + link + + " to database: " + e.getMessage(), e); + } + } + } + + public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key); + try + { + OperationStatus status = _linkDb.delete(null, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Link " + link + " not found"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e); + } + } + + public void createBridge(final Bridge bridge) throws AMQStoreException + { + if (_state != State.RECOVERING) + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key); + + DatabaseEntry value = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getId(),value); + LongBinding.longToEntry(bridge.getCreateTime(),value); + StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value); + + try + { + _bridgeDb.put(null, key, value); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing Bridge " + bridge + + " to database: " + e.getMessage(), e); + } + + } + } + + public void deleteBridge(final Bridge bridge) throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key); + try + { + OperationStatus status = _bridgeDb.delete(null, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Bridge " + bridge + " not found"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e); + } + } + /** * Places a message onto a specified queue, in a given transaction. * diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java new file mode 100644 index 0000000000..f8fd39e127 --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +import java.util.HashMap; +import java.util.Map; + +public class StringMapBinding extends TupleBinding<Map<String,String>> +{ + + private static final StringMapBinding INSTANCE = new StringMapBinding(); + + public Map<String, String> entryToObject(final TupleInput tupleInput) + { + int entries = tupleInput.readInt(); + Map<String,String> map = new HashMap<String,String>(entries); + for(int i = 0; i < entries; i++) + { + map.put(tupleInput.readString(), tupleInput.readString()); + } + return map; + } + + + public void objectToEntry(final Map<String, String> stringStringMap, final TupleOutput tupleOutput) + { + tupleOutput.writeInt(stringStringMap.size()); + for(Map.Entry<String,String> entry : stringStringMap.entrySet()) + { + tupleOutput.writeString(entry.getKey()); + tupleOutput.writeString(entry.getValue()); + } + } + + public static StringMapBinding getInstance() + { + return INSTANCE; + } +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java new file mode 100644 index 0000000000..c1a5d473f0 --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +import java.util.UUID; + +public class UUIDTupleBinding extends TupleBinding<UUID> +{ + private static final UUIDTupleBinding INSTANCE = new UUIDTupleBinding(); + + public UUID entryToObject(final TupleInput tupleInput) + { + return new UUID(tupleInput.readLong(), tupleInput.readLong()); + } + + public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput) + { + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); + } + + public static UUIDTupleBinding getInstance() + { + return INSTANCE; + } + + +} |