summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:05:44 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:05:44 +0000
commit6e55ec37dfab608e3df77dcbc19df746035d2183 (patch)
tree05541556ad3101807262101f9cb640203aa3a87c
parent6009984585d817d2d16d2cc5d9cb2d1053293b9d (diff)
downloadqpid-python-6e55ec37dfab608e3df77dcbc19df746035d2183.tar.gz
QPID-3603: Fix QueueReplicator subscription parameters.
- Queue::destroyed cleans up observers. - Clean up log messages, comments, some variable names. - Improvements to brokertest.py git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233660 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp65
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h2
-rw-r--r--qpid/cpp/src/tests/brokertest.py14
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py48
7 files changed, 96 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index df13c87f01..f17795743b 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -118,7 +118,8 @@ bool DeliveryRecord::accept(TransactionContext* ctxt) {
} else if (isDelayedCompletion) {
// FIXME aconway 2011-12-05: This should be done in HA code.
msg.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(debug, "Completed " << msg.payload.get());
+ QPID_LOG(debug, "Completed " << msg.queue->getName()
+ << "[" << msg.position << "]");
}
setEnded();
QPID_LOG(debug, "Accepted " << id);
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index eb6ab8ba15..12886eff0b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1070,6 +1070,10 @@ void Queue::destroyed()
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
+ {
+ Mutex::ScopedLock locker(messageLock);
+ observers.clear();
+ }
}
void Queue::notifyDeleted()
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 1d14b23ee1..ccdc4dd0b1 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -83,7 +83,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
qpid::framing::SequenceNumber oldest;
if (queue->getOldest(oldest))
settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
- peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings);
+ peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest);
@@ -98,6 +98,7 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid
qpid::framing::SequenceSet latest;
latest.decode(buffer);
+ QPID_LOG(trace, "HA: Backup received dequeues: " << latest);
//TODO: should be able to optimise the following
for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) {
if (current < *i) {
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 4284107b8a..5f7fe611cf 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -48,23 +48,26 @@ class ReplicationStateInitialiser
ReplicationStateInitialiser(
qpid::framing::SequenceSet& r,
const qpid::framing::SequenceNumber& s,
- const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e)
+ const qpid::framing::SequenceNumber& e) : dequeues(r), start(s), end(e)
{
- results.add(start, end);
+ dequeues.add(start, end);
}
void operator()(const QueuedMessage& message) {
if (message.position < start) {
//replica does not have a message that should still be on the queue
QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message));
+ // FIXME aconway 2011-12-09: we want the replica to dump
+ // its messages and start from scratch in this case.
} else if (message.position >= start && message.position <= end) {
- //i.e. message is within the intial range and has not been dequeued, so remove it from the results
- results.remove(message.position);
+ //i.e. message is within the intial range and has not been dequeued,
+ //so remove it from the dequeues
+ dequeues.remove(message.position);
} //else message has not been seen by replica yet so can be ignored here
}
private:
- qpid::framing::SequenceSet& results;
+ qpid::framing::SequenceSet& dequeues;
const qpid::framing::SequenceNumber start;
const qpid::framing::SequenceNumber end;
};
@@ -94,6 +97,7 @@ ReplicatingSubscription::Factory::create(
rs.reset(new ReplicatingSubscription(
parent, name, queue, ack, false, exclusive, tag,
resumeId, resumeTtl, arguments));
+ // FIXME aconway 2011-12-08: need to removeObserver also.
queue->addObserver(rs);
}
return rs;
@@ -115,6 +119,12 @@ ReplicatingSubscription::ReplicatingSubscription(
events(new Queue(mask(name))),
consumer(new DelegatingConsumer(*this))
{
+ // FIXME aconway 2011-12-09: Here we take advantage of existing
+ // messages on the backup queue to reduce replication
+ // effort. However if the backup queue is inconsistent with being
+ // a backup of the primary queue, then we want to issue a warning
+ // and tell the backup to dump its messages and start replicating
+ // from scratch.
QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) {
qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER);
@@ -127,19 +137,22 @@ ReplicatingSubscription::ReplicatingSubscription(
qpid::framing::SequenceNumber oldest;
if (queue->getOldest(oldest)) {
if (oldest >= hwm) {
- range.add(lwm, --oldest);
+ dequeues.add(lwm, --oldest);
} else if (oldest >= lwm) {
- ReplicationStateInitialiser initialiser(range, lwm, hwm);
+ ReplicationStateInitialiser initialiser(dequeues, lwm, hwm);
queue->eachMessage(initialiser);
- } else { //i.e. have older message on master than is reported to exist on replica
- QPID_LOG(warning, "HA: Replica missing message on master");
+ } else { //i.e. older message on master than is reported to exist on replica
+ // FIXME aconway 2011-12-09: dump and start from scratch?
+ QPID_LOG(warning, "HA: Replica missing message on primary");
}
} else {
//local queue (i.e. master) is empty
- range.add(lwm, queue->getPosition());
+ dequeues.add(lwm, queue->getPosition());
+ // FIXME aconway 2011-12-09: if hwm >
+ // queue->getPosition(), dump and start from scratch?
}
QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": "
- << range << " (lwm=" << lwm << ", hwm=" << hwm
+ << dequeues << " (lwm=" << lwm << ", hwm=" << hwm
<< ", current=" << queue->getPosition() << ")");
//set position of 'cursor'
position = hwm;
@@ -162,7 +175,7 @@ ReplicatingSubscription::~ReplicatingSubscription() {}
//under the message lock in the queue
void ReplicatingSubscription::enqueued(const QueuedMessage& m)
{
- QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m));
+ QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m) << " on " << getName());
//delay completion
m.payload->getIngressCompletion().startCompleter();
}
@@ -170,11 +183,11 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& m)
// Called with lock held.
void ReplicatingSubscription::generateDequeueEvent()
{
- QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << range);
- string buf(range.encodedSize(),'\0');
+ QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << dequeues << " on " << getName());
+ string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
- range.encode(buffer);
- range.clear();
+ dequeues.encode(buffer);
+ dequeues.clear();
buffer.reset();
//generate event message
boost::intrusive_ptr<Message> event = new Message();
@@ -199,24 +212,20 @@ void ReplicatingSubscription::generateDequeueEvent()
events->deliver(event);
}
-// FIXME aconway 2011-12-02: is it safe to defer dequues to doDispatch() like this?
-// If a queue is drained with no new messages coming on
-// will the messages be dequeued on the backup?
-
-//called after the message has been removed from the deque and under
-//the message lock in the queue
+// Called after the message has been removed from the deque and under
+// the message lock in the queue.
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
{
sys::Mutex::ScopedLock l(lock);
- range.add(m.position);
- // FIXME aconway 2011-11-29: q[pos] logging
- QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position);
+ dequeues.add(m.position);
+ QPID_LOG(trace, "HA: Added " << QueuePos(m)
+ << " to dequeue event; subscription at " << position);
}
- notify();
+ notify(); // Ensure a call to doDispatch
if (m.position > position) {
m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue");
+ QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early, dequeued.");
}
}
@@ -224,7 +233,7 @@ bool ReplicatingSubscription::doDispatch()
{
{
sys::Mutex::ScopedLock l(lock);
- if (!range.empty()) {
+ if (!dequeues.empty()) {
generateDequeueEvent();
}
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 2236aeffcd..c946b7b993 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -86,7 +86,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
private:
boost::shared_ptr<broker::Queue> events;
boost::shared_ptr<broker::Consumer> consumer;
- qpid::framing::SequenceSet range;
+ qpid::framing::SequenceSet dequeues;
void generateDequeueEvent();
class DelegatingConsumer : public Consumer
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 8dcde5a863..f4a20eda5a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -500,30 +500,30 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
return cluster
- def browse(self, session, queue, timeout=0):
+ def browse(self, session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
r.capacity = 100
try:
contents = []
try:
- while True: contents.append(r.fetch(timeout=timeout).content)
+ while True: contents.append(transform(r.fetch(timeout=timeout)))
except messaging.Empty: pass
finally: r.close()
return contents
- def assert_browse(self, session, queue, expect_contents, timeout=0):
+ def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda d:m.content):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
- actual_contents = self.browse(session, queue, timeout)
+ actual_contents = self.browse(session, queue, timeout, transform=transform)
self.assertEqual(expect_contents, actual_contents)
- def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01):
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
"""Wait up to timeout for contents of queue to match expect_contents"""
- def test(): return self.browse(session, queue, 0) == expect_contents
+ test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
retry(test, timeout, delay)
- self.assertEqual(expect_contents, self.browse(session, queue, 0))
+ self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
def join(thread, timeout=10):
thread.join(timeout)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 43e17c8ebb..876853cb4c 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -118,9 +118,12 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(p, "foo", [])
self.assert_browse_retry(b, "foo", [])
+ def qpid_replicate(self, value="all"):
+ return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
+
def test_sync(self):
def queue(name, replicate):
- return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+ return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
p = primary.connect().session()
s = p.sender(queue("q","all"))
@@ -134,12 +137,47 @@ class ShortTests(BrokerTest):
s.sync()
msgs = [str(i) for i in range(30)]
- b = backup1.connect().session()
- self.assert_browse_retry(b, "q", msgs)
-
- b = backup2.connect().session()
+ self.assert_browse_retry(backup1.connect().session(), "q", msgs)
self.assert_browse_retry(backup2.connect().session(), "q", msgs)
+ def test_send_receive(self):
+ # FIXME aconway 2011-12-09: test with concurrent senders/receivers.
+ debug = ["-t"] # FIXME aconway 2011-12-08:
+ primary = self.ha_broker(name="primary", broker_url="primary", args=debug)
+ backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port(), args=debug)
+ backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port(), args=debug)
+ sender = self.popen(
+ ["qpid-send",
+ "--broker", primary.host_port(),
+ "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+ "--messages=1000", # FIXME aconway 2011-12-09:
+ "--content-string=x"
+ ])
+ receiver = self.popen(
+ ["qpid-receive",
+ "--broker", primary.host_port(),
+ "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")),
+ "--messages=990", # FIXME aconway 2011-12-09:
+ "--timeout=10"
+ ])
+ try:
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ self.assert_browse_retry(backup1.connect().session(), "q", expect, transform=sn)
+ self.assert_browse_retry(backup2.connect().session(), "q", expect, transform=sn)
+ except:
+ # FIXME aconway 2011-12-09:
+ print self.browse(primary.connect().session(), "q", transform=sn)
+ print self.browse(backup1.connect().session(), "q", transform=sn)
+ print self.browse(backup2.connect().session(), "q", transform=sn)
+# os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(primary.host_port()))
+# print "---- backup1"
+# os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(backup1.host_port()))
+# print "---- backup2"
+# os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(backup2.host_port()))
+ raise
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)