diff options
author | Alan Conway <aconway@apache.org> | 2012-01-19 23:05:24 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-19 23:05:24 +0000 |
commit | f2493aca9f7e250895b72918c7aef4c915636621 (patch) | |
tree | 5f1afb045fa663b1b509928e40149eff6bb39c8d | |
parent | 9c47ca9c1525c9add266eb733257c8539ad96c7e (diff) | |
download | qpid-python-f2493aca9f7e250895b72918c7aef4c915636621.tar.gz |
QPID-3603: Set bridge sync parameter to 1.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233658 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 12 |
4 files changed, 18 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 4fe76dabd8..df13c87f01 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -116,7 +116,7 @@ bool DeliveryRecord::accept(TransactionContext* ctxt) { if (acquired) { queue->dequeue(ctxt, msg); } else if (isDelayedCompletion) { - //TODO: this is a nasty way to do this; change it + // FIXME aconway 2011-12-05: This should be done in HA code. msg.payload->getIngressCompletion().finishCompleter(); QPID_LOG(debug, "Completed " << msg.payload.get()); } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index d3eefe7369..28f7911614 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -36,6 +36,7 @@ namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); const std::string TYPE_NAME("qpid.queue-replicator"); +const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); } namespace qpid { @@ -50,6 +51,7 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L { // FIXME aconway 2011-11-24: consistent logging. QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings()); + // Declare the replicator bridge. queue->getBroker()->getLinks().declare( link->getHost(), link->getPort(), false, // durable @@ -77,11 +79,11 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa framing::FieldTable settings; settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); + settings.setInt(QPID_SYNC_FREQUENCY, 1); qpid::framing::SequenceNumber oldest; if (queue->getOldest(oldest)) settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest); - - peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings); + peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 620d27f8ae..f0aa96db9b 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -169,12 +169,12 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& m) // Called with lock held. void ReplicatingSubscription::generateDequeueEvent() { + QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << range); string buf(range.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); range.encode(buffer); range.clear(); buffer.reset(); - //generate event message boost::intrusive_ptr<Message> event = new Message(); AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 51ef786c44..646ddefe54 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -106,6 +106,18 @@ class ShortTests(BrokerTest): verify(b, "1", p) verify(b, "2", p) + # Test a series of messages, enqueue and dequeue. + s = p.sender(queue("foo","all")) + msgs = [str(i) for i in range(10)] + for m in msgs: s.send(Message(m)) + self.assert_browse_retry(b, "foo", msgs) + self.assert_browse_retry(p, "foo", msgs) + r = p.receiver("foo") + for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content) + p.acknowledge() + self.assert_browse_retry(p, "foo", []) + self.assert_browse_retry(b, "foo", []) + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) |