summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-02-13 17:26:37 +0000
committerRobert Gemmell <robbie@apache.org>2012-02-13 17:26:37 +0000
commit76cab2fbbc9c47d254414d588b5bb98552f5ab8d (patch)
tree15a4afb0facee780af2661a78e1ca58fa573309e
parent1171769f32e7b7b55ec7ee1b30d10c5da3a20242 (diff)
downloadqpid-python-76cab2fbbc9c47d254414d588b5bb98552f5ab8d.tar.gz
QPID-3835: add the empty 'no selector' argument to bindings for DurableSubscription queues that dont yet have the selector argument at all
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1243616 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java21
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java70
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java)6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java10
5 files changed, 84 insertions, 29 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 92dd592143..29f2a2f2fb 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -68,6 +68,7 @@ import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
+import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
@@ -691,13 +692,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
cursor = _queueBindingsDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = _bindingTupleBindingFactory.getInstance();
+ TupleBinding<BindingRecord> binding = _bindingTupleBindingFactory.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
//yes, this is retrieving all the useful information from the key only.
//For table compatibility it shall currently be left as is
- BindingKey bindingRecord = (BindingKey) binding.entryToObject(key);
+ BindingRecord bindingRecord = binding.entryToObject(key);
String exchangeName = bindingRecord.getExchangeName() == null ? null :
bindingRecord.getExchangeName().asString();
@@ -1105,18 +1106,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
-
-
/**
* @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
*/
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
+ bindQueue(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args));
+ }
+
+ protected void bindQueue(final BindingRecord bindingRecord) throws AMQStoreException
+ {
if (_state != State.RECOVERING)
{
- BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(),
- queue.getNameShortString(), routingKey, args);
-
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
@@ -1134,8 +1135,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
catch (DatabaseException e)
{
- throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " to database: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing binding for AMQQueue with name " + bindingRecord.getQueueName() + " to exchange "
+ + bindingRecord.getExchangeName() + " to database: " + e.getMessage(), e);
}
}
}
@@ -1148,7 +1149,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
- keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
+ keyBinding.objectToEntry(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
try
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
index f064079606..817ba2a5f5 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
@@ -37,14 +37,17 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.logging.NullRootMessageLogger;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
+import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4;
@@ -417,11 +420,6 @@ public class BDBStoreUpgrade
TopicExchangeDiscoverer exchangeListVisitor = new TopicExchangeDiscoverer();
_oldMessageStore.visitExchanges(exchangeListVisitor);
-
- //Migrate _queueBindingsDb
- _logger.info("Queue Bindings");
- moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding");
-
//Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those
//which have a colon in their name and are bound to the Topic exchanges above
DurableSubDiscoverer durSubQueueListVisitor =
@@ -431,6 +429,14 @@ public class BDBStoreUpgrade
final List<AMQShortString> durableSubQueues = durSubQueueListVisitor.getDurableSubQueues();
+
+ //Migrate _queueBindingsDb
+ _logger.info("Queue Bindings");
+ BindingsVisitor bindingsVisitor = new BindingsVisitor(durableSubQueues,
+ _oldMessageStore.getBindingTupleBindingFactory().getInstance(), _newMessageStore);
+ _oldMessageStore.visitBindings(bindingsVisitor);
+ logCount(bindingsVisitor.getVisitedCount(), "Queue Binding");
+
//Migrate _queueDb
_logger.info("Queues");
@@ -1133,11 +1139,11 @@ public class BDBStoreUpgrade
private class DurableSubDiscoverer extends DatabaseVisitor
{
private final List<AMQShortString> _durableSubQueues;
- private final TupleBinding<BindingKey> _bindingTB;
+ private final TupleBinding<BindingRecord> _bindingTB;
private final List<AMQShortString> _topicExchanges;
- public DurableSubDiscoverer(List<AMQShortString> topicExchanges, TupleBinding<BindingKey> bindingTB)
+ public DurableSubDiscoverer(List<AMQShortString> topicExchanges, TupleBinding<BindingRecord> bindingTB)
{
_durableSubQueues = new ArrayList<AMQShortString>();
_bindingTB = bindingTB;
@@ -1146,7 +1152,7 @@ public class BDBStoreUpgrade
public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
{
- BindingKey bindingRec = _bindingTB.entryToObject(key);
+ BindingRecord bindingRec = _bindingTB.entryToObject(key);
AMQShortString queueName = bindingRec.getQueueName();
AMQShortString exchangeName = bindingRec.getExchangeName();
@@ -1204,6 +1210,54 @@ public class BDBStoreUpgrade
}
}
+ private static class BindingsVisitor extends DatabaseVisitor
+ {
+ private final List<AMQShortString> _durableSubQueues;
+ private final BDBMessageStore _newMessageStore;
+ private final TupleBinding<BindingRecord> _oldBindingTB;
+ private AMQShortString _selectorFilterKey;
+
+ public BindingsVisitor(List<AMQShortString> durableSubQueues,
+ TupleBinding<BindingRecord> oldBindingTB,
+ BDBMessageStore newMessageStore)
+ {
+ _oldBindingTB = oldBindingTB;
+ _durableSubQueues = durableSubQueues;
+ _newMessageStore = newMessageStore;
+ _selectorFilterKey = AMQPFilterTypes.JMS_SELECTOR.getValue();
+ }
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
+ {
+ //All the information required in binding entries is actually in the *key* not value.
+ BindingRecord oldBindingRec = _oldBindingTB.entryToObject(key);
+
+ AMQShortString queueName = oldBindingRec.getQueueName();
+ AMQShortString exchangeName = oldBindingRec.getExchangeName();
+ AMQShortString routingKey = oldBindingRec.getRoutingKey();
+ FieldTable arguments = oldBindingRec.getArguments();
+
+ //if the queue name is in the gathered list then inspect its binding arguments
+ if (_durableSubQueues.contains(queueName))
+ {
+ if(arguments == null)
+ {
+ arguments = new FieldTable();
+ }
+
+ if(!arguments.containsKey(_selectorFilterKey))
+ {
+ //add the empty string (i.e. 'no selector') value for the selector argument
+ arguments.put(_selectorFilterKey, "");
+ }
+ }
+
+ //create the binding in the new store
+ _newMessageStore.bindQueue(
+ new BindingRecord(exchangeName, queueName, routingKey, arguments));
+ }
+ }
+
private static class MetaDataVisitor extends DatabaseVisitor
{
private final TupleBinding<Object> _oldMetaDataTupleBinding;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java
index 396f0ed817..394a6ea85c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java
@@ -18,19 +18,19 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.records;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-public class BindingKey extends Object
+public class BindingRecord extends Object
{
private final AMQShortString _exchangeName;
private final AMQShortString _queueName;
private final AMQShortString _routingKey;
private final FieldTable _arguments;
- public BindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments)
+ public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments)
{
_exchangeName = exchangeName;
_queueName = queueName;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
index 09d43e6a08..468096ccc5 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
@@ -22,16 +22,16 @@ package org.apache.qpid.server.store.berkeleydb.tuples;
import com.sleepycat.bind.tuple.TupleBinding;
-import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
-public class BindingTupleBindingFactory extends TupleBindingFactory<BindingKey>
+public class BindingTupleBindingFactory extends TupleBindingFactory<BindingRecord>
{
public BindingTupleBindingFactory(int version)
{
super(version);
}
- public TupleBinding<BindingKey> getInstance()
+ public TupleBinding<BindingRecord> getInstance()
{
switch (getVersion())
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
index e00ea5aac2..c6a5e63bc8 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
@@ -29,10 +29,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
-public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingTuple
+public class BindingTuple_4 extends TupleBinding<BindingRecord> implements BindingTuple
{
protected static final Logger _log = Logger.getLogger(BindingTuple.class);
@@ -41,7 +41,7 @@ public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingT
super();
}
- public BindingKey entryToObject(TupleInput tupleInput)
+ public BindingRecord entryToObject(TupleInput tupleInput)
{
AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
@@ -60,10 +60,10 @@ public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingT
return null;
}
- return new BindingKey(exchangeName, queueName, routingKey, arguments);
+ return new BindingRecord(exchangeName, queueName, routingKey, arguments);
}
- public void objectToEntry(BindingKey binding, TupleOutput tupleOutput)
+ public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput)
{
AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);