summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:05:24 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:05:24 +0000
commitf2493aca9f7e250895b72918c7aef4c915636621 (patch)
tree5f1afb045fa663b1b509928e40149eff6bb39c8d
parent9c47ca9c1525c9add266eb733257c8539ad96c7e (diff)
downloadqpid-python-f2493aca9f7e250895b72918c7aef4c915636621.tar.gz
QPID-3603: Set bridge sync parameter to 1.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233658 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py12
4 files changed, 18 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 4fe76dabd8..df13c87f01 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -116,7 +116,7 @@ bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired) {
queue->dequeue(ctxt, msg);
} else if (isDelayedCompletion) {
- //TODO: this is a nasty way to do this; change it
+ // FIXME aconway 2011-12-05: This should be done in HA code.
msg.payload->getIngressCompletion().finishCompleter();
QPID_LOG(debug, "Completed " << msg.payload.get());
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index d3eefe7369..28f7911614 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -36,6 +36,7 @@
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
const std::string TYPE_NAME("qpid.queue-replicator");
+const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
}
namespace qpid {
@@ -50,6 +51,7 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L
{
// FIXME aconway 2011-11-24: consistent logging.
QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
+ // Declare the replicator bridge.
queue->getBroker()->getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -77,11 +79,11 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
framing::FieldTable settings;
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
+ settings.setInt(QPID_SYNC_FREQUENCY, 1);
qpid::framing::SequenceNumber oldest;
if (queue->getOldest(oldest))
settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
-
- peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings);
+ peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 620d27f8ae..f0aa96db9b 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -169,12 +169,12 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& m)
// Called with lock held.
void ReplicatingSubscription::generateDequeueEvent()
{
+ QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << range);
string buf(range.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
range.encode(buffer);
range.clear();
buffer.reset();
-
//generate event message
boost::intrusive_ptr<Message> event = new Message();
AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 51ef786c44..646ddefe54 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -106,6 +106,18 @@ class ShortTests(BrokerTest):
verify(b, "1", p)
verify(b, "2", p)
+ # Test a series of messages, enqueue and dequeue.
+ s = p.sender(queue("foo","all"))
+ msgs = [str(i) for i in range(10)]
+ for m in msgs: s.send(Message(m))
+ self.assert_browse_retry(b, "foo", msgs)
+ self.assert_browse_retry(p, "foo", msgs)
+ r = p.receiver("foo")
+ for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
+ p.acknowledge()
+ self.assert_browse_retry(p, "foo", [])
+ self.assert_browse_retry(b, "foo", [])
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])