diff options
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Stoppable.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 30 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster.mk | 6 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 15 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/tests/testagent.mk | 2 |
12 files changed, 51 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 8f807cd0fe..32b037bb21 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1293,14 +1293,13 @@ void Queue::UsageBarrier::destroy() // FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()? void Queue::stop() { - QPID_LOG(critical, "FIXME Queue stopped " << getName()); + QPID_LOG(trace, "Queue stopped: " << getName()); // FIXME aconway 2011-05-25: rename dispatching - acquiring? dispatching.stop(); } void Queue::start() { - QPID_LOG(critical, "FIXME Queue started " << getName()); - assert(clusterContext); // FIXME aconway 2011-06-08: XXX + QPID_LOG(trace, "Queue started: " << getName()); dispatching.start(); notifyListener(); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 31d03e95b0..9435750b4e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -402,16 +402,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, void start(); /** Context information used in a cluster. */ - boost::intrusive_ptr<RefCounted> getClusterContext() { - // FIXME aconway 2011-06-08: XXX - QPID_LOG(critical, "FIXME q get context " << name << clusterContext); - return clusterContext; - } - void setClusterContext(boost::intrusive_ptr<RefCounted> context) { - // FIXME aconway 2011-06-08: XXX - clusterContext = context; - QPID_LOG(critical, "FIXME q set context " << name << clusterContext); - } + boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; } + void setClusterContext(boost::intrusive_ptr<RefCounted> context) { clusterContext = context; } + }; }} // qpid::broker diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index fa247ae8f5..f30a790547 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -136,7 +136,7 @@ void BrokerContext::create(broker::Queue& q) { framing::Buffer buf(&data[0], data.size()); q.encode(buf); core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data)); - QPID_LOG(critical, "FIXME BrokerContext create " << q.getName() << q.getClusterContext().get()); + // FIXME aconway 2011-07-29: Need asynchronous completion. } void BrokerContext::destroy(broker::Queue& q) { diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index 86894b9dd9..211b7052e5 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -95,7 +95,7 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) { BrokerContext::ScopedSuppressReplication ssr; bool ok = queue->acquireMessageAt(position, qm); (void)ok; // Avoid unused variable warnings. - assert(ok); + assert(ok); // FIXME aconway 2011-08-04: failing this assertion. assert(qm.position.getValue() == position); assert(qm.payload); } diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index 122163ee7e..60b218da14 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -56,7 +56,7 @@ QueueContext::~QueueContext() { // FIXME aconway 2011-07-27: revisit shutdown logic. // timeout() could be called concurrently with destructor. sys::Mutex::ScopedLock l(lock); - timerTask->cancel(); + if (timerTask) timerTask->cancel(); } void QueueContext::replicaState(QueueOwnership state) { @@ -84,6 +84,7 @@ void QueueContext::replicaState(QueueOwnership state) { // FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer. +// Called in connection threads when a consumer is added void QueueContext::consume(size_t n) { sys::Mutex::ScopedLock l(lock); consumers = n; @@ -91,6 +92,7 @@ void QueueContext::consume(size_t n) { framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName())); } +// Called in connection threads when a consumer is cancelled void QueueContext::cancel(size_t n) { sys::Mutex::ScopedLock l(lock); consumers = n; @@ -100,6 +102,7 @@ void QueueContext::cancel(size_t n) { void QueueContext::timeout() { QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName()); queue.stop(); + // When all threads have stopped, queue will call stopped() } @@ -109,7 +112,7 @@ void QueueContext::stopped() { sys::Mutex::ScopedLock l(lock); // FIXME aconway 2011-07-28: review thread safety of state. // Deffered call to stopped doesn't sit well. - // queueActive is invaled while stop is in progress? + // queueActive is invalid while stop is in progress? if (consumers == 0) mcast.mcast(framing::ClusterQueueUnsubscribeBody( framing::ProtocolVersion(), queue.getName())); diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h index 6bb02bc6af..ac0f03d3a1 100644 --- a/qpid/cpp/src/qpid/sys/Stoppable.h +++ b/qpid/cpp/src/qpid/sys/Stoppable.h @@ -64,10 +64,9 @@ class Stoppable { /** * Set state to "stopped", so no new threads can enter. - * Call notify function when all busy threads have left. + * Notify function will be called when all busy threads have left. + * No-op if already stopping. */ - // FIXME aconway 2011-06-27: not guaranteed that stopped will be called, - // deadlock? void stop() { sys::Monitor::ScopedLock l(lock); stopped = true; @@ -75,6 +74,7 @@ class Stoppable { } /** Set the state to "started", allow threads to enter. + * If already stopping this will prevent notify function from being called. */ void start() { sys::Monitor::ScopedLock l(lock); diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index bd75432d57..8c8c6c6b15 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -51,9 +51,9 @@ CLEAN_LOCAL= qpidexecdir = $(libexecdir)/qpid qpidexec_PROGRAMS = qpidexec_SCRIPTS = -qpidtestdir = $(qpidexecdir)/tests -qpidtest_PROGRAMS = -qpidtest_SCRIPTS = +qpidexectestdir = $(qpidexecdir)/tests +qpidexectest_PROGRAMS = +qpidexectest_SCRIPTS = tmoduledir = $(libdir)/qpid/tests tmodule_LTLIBRARIES= @@ -150,17 +150,17 @@ endif # Test programs that are installed and therefore built as part of make, not make check -qpidtest_SCRIPTS += qpid-cpp-benchmark install_env.sh +qpidexectest_SCRIPTS += qpid-cpp-benchmark install_env.sh EXTRA_DIST += qpid-cpp-benchmark install_env.sh -qpidtest_PROGRAMS += receiver +qpidexectest_PROGRAMS += receiver receiver_SOURCES = \ receiver.cpp \ TestOptions.h \ ConnectionOptions.h receiver_LDADD = $(lib_client) -qpidtest_PROGRAMS += sender +qpidexectest_PROGRAMS += sender sender_SOURCES = \ sender.cpp \ TestOptions.h \ @@ -168,7 +168,7 @@ sender_SOURCES = \ Statistics.cpp sender_LDADD = $(lib_messaging) -qpidtest_PROGRAMS += qpid-receive +qpidexectest_PROGRAMS += qpid-receive qpid_receive_SOURCES = \ qpid-receive.cpp \ TestOptions.h \ @@ -177,7 +177,7 @@ qpid_receive_SOURCES = \ Statistics.cpp qpid_receive_LDADD = $(lib_messaging) -qpidtest_PROGRAMS += qpid-send +qpidexectest_PROGRAMS += qpid-send qpid_send_SOURCES = \ qpid-send.cpp \ TestOptions.h \ @@ -186,37 +186,37 @@ qpid_send_SOURCES = \ Statistics.cpp qpid_send_LDADD = $(lib_messaging) -qpidtest_PROGRAMS+=qpid-perftest +qpidexectest_PROGRAMS+=qpid-perftest qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES) qpid_perftest_LDADD=$(lib_client) -qpidtest_PROGRAMS+=qpid-txtest +qpidexectest_PROGRAMS+=qpid-txtest qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES) qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h qpid_txtest_LDADD=$(lib_client) -qpidtest_PROGRAMS+=qpid-latency-test +qpidexectest_PROGRAMS+=qpid-latency-test qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h qpid_latency_test_LDADD=$(lib_client) -qpidtest_PROGRAMS+=qpid-client-test +qpidexectest_PROGRAMS+=qpid-client-test qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h qpid_client_test_LDADD=$(lib_client) -qpidtest_PROGRAMS+=qpid-topic-listener +qpidexectest_PROGRAMS+=qpid-topic-listener qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h qpid_topic_listener_LDADD=$(lib_client) -qpidtest_PROGRAMS+=qpid-topic-publisher +qpidexectest_PROGRAMS+=qpid-topic-publisher qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h qpid_topic_publisher_LDADD=$(lib_client) -qpidtest_PROGRAMS+=qpid-ping +qpidexectest_PROGRAMS+=qpid-ping qpid_ping_INCLUDES=$(PUBLIC_INCLUDES) qpid_ping_SOURCES=qpid-ping.cpp test_tools.h TestOptions.h ConnectionOptions.h qpid_ping_LDADD=$(lib_client) diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 7b231259d5..2a21b26a2b 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -418,7 +418,6 @@ class Cluster: self.args += [ cluster_name, "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] - self.args += [ "--log-enable=info+", "--log-enable=trace+:cluster"] assert cluster_lib, "Cannot locate cluster plug-in" self.args += [ "--load-module", cluster_lib ] diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index bf5064e74c..bbcc46a120 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -80,7 +80,7 @@ LONG_TESTS += \ cluster_python_tests \ stop_cluster -qpidtest_PROGRAMS += cluster_test +qpidexectest_PROGRAMS += cluster_test cluster_test_SOURCES = \ cluster_test.cpp \ @@ -94,7 +94,7 @@ cluster_test_SOURCES = \ cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework -qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail -qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST) +qpidexectest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail +qpidexectest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST) endif diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 74bf2df22d..fcc76f6cf3 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -74,7 +74,7 @@ def posix_quote(string): return "'" + single_quote_re.sub("\\'", string) + "'"; def ssh_command(host, command): - """Convert command into an ssh command on host with quoting""" + """ Convert command into an ssh command on host with quoting""" return ["ssh", host] + [posix_quote(arg) for arg in command] class Clients: @@ -150,8 +150,8 @@ def first_line(p): def queue_exists(queue,broker): c = qpid.messaging.Connection(broker) c.open() - s = c.session() try: + s = c.session() try: s.sender(queue) return True @@ -168,12 +168,12 @@ def recreate_queues(queues, brokers): except qpid.messaging.exceptions.NotFound: pass # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate. for b in brokers: - while queue_exists(q,b): time.sleep(0.001); + while queue_exists(q,b): time.sleep(0.1); for q in queues: s.sender("%s;{create:always}"%q) # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate. for b in brokers: - while not queue_exists(q,b): time.sleep(0.001); + while not queue_exists(q,b): time.sleep(0.1); c.close() def print_header(timestamp): @@ -251,9 +251,12 @@ class RoundRobin: def main(): opts, args = op.parse_args() - if not opts.broker: opts.broker = ["127.0.0.1"] # Deafult to local broker - opts.broker = flatten(opts.broker) opts.client_host = flatten(opts.client_host) + if not opts.broker: + if opts.client_host: + raise Exception("--broker must be specified if --client_host is.") + opts.broker = ["127.0.0.1"] # Deafult to local broker + opts.broker = flatten(opts.broker) brokers = RoundRobin(opts.broker) client_hosts = RoundRobin(opts.client_host) send_out = "" diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 9c713e872a..fc33685407 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -190,13 +190,20 @@ int main(int argc, char ** argv) session.createSender(opts.readyAddress).send(msg); // For receive rate calculation - qpid::sys::AbsTime start = qpid::sys::now(); + qpid::sys::AbsTime start; // Will be set on first itertion. + bool started=false; int64_t interval = 0; if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; std::map<std::string,Sender> replyTo; while (!done && receiver.fetch(msg, timeout)) { + if (!started) { + // Start the time on receipt of the first message to avoid counting + // idle time at process startup. + start = qpid::sys::AbsTime::now(); + started = true; + } reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { diff --git a/qpid/cpp/src/tests/testagent.mk b/qpid/cpp/src/tests/testagent.mk index 19d91ccab9..25cf43d71e 100644 --- a/qpid/cpp/src/tests/testagent.mk +++ b/qpid/cpp/src/tests/testagent.mk @@ -43,7 +43,7 @@ testagent_gen.timestamp: testagent.xml ${TESTAGENT_DEPS} CLEANFILES+=$(TESTAGENT_GEN_SRC) testagent_gen.timestamp testagent-testagent.$(OBJEXT): $(TESTAGENT_GEN_SRC) -qpidtest_PROGRAMS+=testagent +qpidexectest_PROGRAMS+=testagent testagent_CXXFLAGS=$(CXXFLAGS) -Itestagent_gen testagent_SOURCES=testagent.cpp $(TESTAGENT_GEN_SRC) testagent_LDADD=$(top_builddir)/src/libqmf.la |