diff options
author | Robert Gemmell <robbie@apache.org> | 2012-02-13 17:26:37 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-02-13 17:26:37 +0000 |
commit | 76cab2fbbc9c47d254414d588b5bb98552f5ab8d (patch) | |
tree | 15a4afb0facee780af2661a78e1ca58fa573309e | |
parent | 1171769f32e7b7b55ec7ee1b30d10c5da3a20242 (diff) | |
download | qpid-python-76cab2fbbc9c47d254414d588b5bb98552f5ab8d.tar.gz |
QPID-3835: add the empty 'no selector' argument to bindings for DurableSubscription queues that dont yet have the selector argument at all
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1243616 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java | 21 | ||||
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java | 70 | ||||
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java) | 6 | ||||
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java | 6 | ||||
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java | 10 |
5 files changed, 84 insertions, 29 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 92dd592143..29f2a2f2fb 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -68,6 +68,7 @@ import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; +import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory; @@ -691,13 +692,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore cursor = _queueBindingsDb.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); - TupleBinding binding = _bindingTupleBindingFactory.getInstance(); + TupleBinding<BindingRecord> binding = _bindingTupleBindingFactory.getInstance(); while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { //yes, this is retrieving all the useful information from the key only. //For table compatibility it shall currently be left as is - BindingKey bindingRecord = (BindingKey) binding.entryToObject(key); + BindingRecord bindingRecord = binding.entryToObject(key); String exchangeName = bindingRecord.getExchangeName() == null ? null : bindingRecord.getExchangeName().asString(); @@ -1105,18 +1106,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } - - /** * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) */ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { + bindQueue(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args)); + } + + protected void bindQueue(final BindingRecord bindingRecord) throws AMQStoreException + { if (_state != State.RECOVERING) { - BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(), - queue.getNameShortString(), routingKey, args); - DatabaseEntry key = new DatabaseEntry(); EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance(); @@ -1134,8 +1135,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } catch (DatabaseException e) { - throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange " - + exchange.getName() + " to database: " + e.getMessage(), e); + throw new AMQStoreException("Error writing binding for AMQQueue with name " + bindingRecord.getQueueName() + " to exchange " + + bindingRecord.getExchangeName() + " to database: " + e.getMessage(), e); } } } @@ -1148,7 +1149,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { DatabaseEntry key = new DatabaseEntry(); EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance(); - keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key); + keyBinding.objectToEntry(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key); try { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java index f064079606..817ba2a5f5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java @@ -37,14 +37,17 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.logging.NullRootMessageLogger; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; +import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4; @@ -417,11 +420,6 @@ public class BDBStoreUpgrade TopicExchangeDiscoverer exchangeListVisitor = new TopicExchangeDiscoverer(); _oldMessageStore.visitExchanges(exchangeListVisitor); - - //Migrate _queueBindingsDb - _logger.info("Queue Bindings"); - moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding"); - //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those //which have a colon in their name and are bound to the Topic exchanges above DurableSubDiscoverer durSubQueueListVisitor = @@ -431,6 +429,14 @@ public class BDBStoreUpgrade final List<AMQShortString> durableSubQueues = durSubQueueListVisitor.getDurableSubQueues(); + + //Migrate _queueBindingsDb + _logger.info("Queue Bindings"); + BindingsVisitor bindingsVisitor = new BindingsVisitor(durableSubQueues, + _oldMessageStore.getBindingTupleBindingFactory().getInstance(), _newMessageStore); + _oldMessageStore.visitBindings(bindingsVisitor); + logCount(bindingsVisitor.getVisitedCount(), "Queue Binding"); + //Migrate _queueDb _logger.info("Queues"); @@ -1133,11 +1139,11 @@ public class BDBStoreUpgrade private class DurableSubDiscoverer extends DatabaseVisitor { private final List<AMQShortString> _durableSubQueues; - private final TupleBinding<BindingKey> _bindingTB; + private final TupleBinding<BindingRecord> _bindingTB; private final List<AMQShortString> _topicExchanges; - public DurableSubDiscoverer(List<AMQShortString> topicExchanges, TupleBinding<BindingKey> bindingTB) + public DurableSubDiscoverer(List<AMQShortString> topicExchanges, TupleBinding<BindingRecord> bindingTB) { _durableSubQueues = new ArrayList<AMQShortString>(); _bindingTB = bindingTB; @@ -1146,7 +1152,7 @@ public class BDBStoreUpgrade public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException { - BindingKey bindingRec = _bindingTB.entryToObject(key); + BindingRecord bindingRec = _bindingTB.entryToObject(key); AMQShortString queueName = bindingRec.getQueueName(); AMQShortString exchangeName = bindingRec.getExchangeName(); @@ -1204,6 +1210,54 @@ public class BDBStoreUpgrade } } + private static class BindingsVisitor extends DatabaseVisitor + { + private final List<AMQShortString> _durableSubQueues; + private final BDBMessageStore _newMessageStore; + private final TupleBinding<BindingRecord> _oldBindingTB; + private AMQShortString _selectorFilterKey; + + public BindingsVisitor(List<AMQShortString> durableSubQueues, + TupleBinding<BindingRecord> oldBindingTB, + BDBMessageStore newMessageStore) + { + _oldBindingTB = oldBindingTB; + _durableSubQueues = durableSubQueues; + _newMessageStore = newMessageStore; + _selectorFilterKey = AMQPFilterTypes.JMS_SELECTOR.getValue(); + } + + public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException + { + //All the information required in binding entries is actually in the *key* not value. + BindingRecord oldBindingRec = _oldBindingTB.entryToObject(key); + + AMQShortString queueName = oldBindingRec.getQueueName(); + AMQShortString exchangeName = oldBindingRec.getExchangeName(); + AMQShortString routingKey = oldBindingRec.getRoutingKey(); + FieldTable arguments = oldBindingRec.getArguments(); + + //if the queue name is in the gathered list then inspect its binding arguments + if (_durableSubQueues.contains(queueName)) + { + if(arguments == null) + { + arguments = new FieldTable(); + } + + if(!arguments.containsKey(_selectorFilterKey)) + { + //add the empty string (i.e. 'no selector') value for the selector argument + arguments.put(_selectorFilterKey, ""); + } + } + + //create the binding in the new store + _newMessageStore.bindQueue( + new BindingRecord(exchangeName, queueName, routingKey, arguments)); + } + } + private static class MetaDataVisitor extends DatabaseVisitor { private final TupleBinding<Object> _oldMetaDataTupleBinding; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java index 396f0ed817..394a6ea85c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java @@ -18,19 +18,19 @@ * under the License. * */ -package org.apache.qpid.server.store.berkeleydb; +package org.apache.qpid.server.store.berkeleydb.records; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -public class BindingKey extends Object +public class BindingRecord extends Object { private final AMQShortString _exchangeName; private final AMQShortString _queueName; private final AMQShortString _routingKey; private final FieldTable _arguments; - public BindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments) + public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments) { _exchangeName = exchangeName; _queueName = queueName; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java index 09d43e6a08..468096ccc5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java @@ -22,16 +22,16 @@ package org.apache.qpid.server.store.berkeleydb.tuples; import com.sleepycat.bind.tuple.TupleBinding; -import org.apache.qpid.server.store.berkeleydb.BindingKey; +import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; -public class BindingTupleBindingFactory extends TupleBindingFactory<BindingKey> +public class BindingTupleBindingFactory extends TupleBindingFactory<BindingRecord> { public BindingTupleBindingFactory(int version) { super(version); } - public TupleBinding<BindingKey> getInstance() + public TupleBinding<BindingRecord> getInstance() { switch (getVersion()) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java index e00ea5aac2..c6a5e63bc8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java @@ -29,10 +29,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.BindingKey; import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; +import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; -public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingTuple +public class BindingTuple_4 extends TupleBinding<BindingRecord> implements BindingTuple { protected static final Logger _log = Logger.getLogger(BindingTuple.class); @@ -41,7 +41,7 @@ public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingT super(); } - public BindingKey entryToObject(TupleInput tupleInput) + public BindingRecord entryToObject(TupleInput tupleInput) { AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput); AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); @@ -60,10 +60,10 @@ public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingT return null; } - return new BindingKey(exchangeName, queueName, routingKey, arguments); + return new BindingRecord(exchangeName, queueName, routingKey, arguments); } - public void objectToEntry(BindingKey binding, TupleOutput tupleOutput) + public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput) { AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput); AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput); |