summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h6
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp11
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.cpp6
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.h1
-rw-r--r--qpid/cpp/src/qpid/sys/OutputTask.h10
-rw-r--r--qpid/cpp/src/tests/Makefile.am3
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp5
-rwxr-xr-xqpid/cpp/src/tests/restart_cluster18
13 files changed, 66 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 2525fed864..ab18d1f035 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -193,6 +193,8 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions.
}
}
+bool Connection::hasOutput() { return outputTasks.hasOutput(); }
+
bool Connection::doOutput() { return doOutputFn(); }
bool Connection::doOutputImpl() {
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index ae8708861a..1367f3b9ca 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -76,6 +76,7 @@ class Connection : public sys::ConnectionInputHandler,
void received(framing::AMQFrame& frame);
void idleOut();
void idleIn();
+ bool hasOutput();
bool doOutput();
void closed();
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index ebb143a472..3b447e97f2 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -212,6 +212,11 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
}
}
+bool Queue::empty() const {
+ Mutex::ScopedLock locker(messageLock);
+ return messages.empty();
+}
+
bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
{
while (true) {
@@ -348,7 +353,6 @@ void Queue::consume(Consumer& c, bool requestExclusive){
}
}
consumerCount++;
-
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 2d238ff57d..e35b3ef7ee 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -107,7 +107,6 @@ namespace qpid {
void notify();
void removeListener(Consumer&);
- void addListener(Consumer&);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
@@ -115,6 +114,9 @@ namespace qpid {
void popAndDequeue();
public:
+ // FIXME aconway 2008-08-06: was private, verify if needed public.
+ void addListener(Consumer&);
+
virtual void notifyDurableIOComplete();
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -126,6 +128,8 @@ namespace qpid {
management::Manageable* parent = 0);
~Queue();
+ bool empty() const;
+
bool dispatch(Consumer&);
void create(const qpid::framing::FieldTable& settings);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 484a406c3b..bf034a0559 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -590,6 +590,11 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
unacked.erase(range.start, range.end);
}
+bool SemanticState::ConsumerImpl::hasOutput() {
+ queue->addListener(*this);
+ return !queue->empty();
+}
+
bool SemanticState::ConsumerImpl::doOutput()
{
//TODO: think through properly
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index a0424bf747..e03d5ec89b 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -96,6 +96,7 @@ class SemanticState : public sys::OutputTask,
Queue::shared_ptr getQueue() { return queue; }
bool isBlocked() const { return blocked; }
+ bool hasOutput();
bool doOutput();
};
@@ -180,6 +181,7 @@ class SemanticState : public sys::OutputTask,
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
+ bool hasOutput() { return outputTasks.hasOutput(); }
bool doOutput() { return outputTasks.doOutput(); }
//final 0-10 spec (completed and accepted are distinct):
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
index 32c2054631..656f05e685 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
@@ -82,15 +82,16 @@ void ConnectionInterceptor::deliverClosed() {
}
bool ConnectionInterceptor::doOutput() {
- cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
+ if (connection->hasOutput()) {
+ printf("doOutput send %p\n", (void*)this);
+ cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
+ }
+
return false;
}
void ConnectionInterceptor::deliverDoOutput() {
- // FIXME aconway 2008-07-16: review thread safety.
- // All connection processing happens in cluster queue, only read & write
- // (from mutex-locked frameQueue) happens in reader/writer threads.
- //
+ printf("doOutput deliver %p\n", (void*)this);
doOutputNext();
}
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
index 57cc0c5a33..2fad28c381 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -31,6 +31,12 @@ void AggregateOutput::activateOutput()
control.activateOutput();
}
+bool AggregateOutput::hasOutput() {
+ for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i)
+ if ((*i)->hasOutput()) return true;
+ return false;
+}
+
bool AggregateOutput::doOutput()
{
bool result = false;
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h
index a870fcb95a..02a53ed50b 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.h
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h
@@ -43,6 +43,7 @@ namespace sys {
void activateOutput();
//all the following will be called on the same thread
bool doOutput();
+ bool hasOutput();
void addOutputTask(OutputTask* t);
void removeOutputTask(OutputTask* t);
};
diff --git a/qpid/cpp/src/qpid/sys/OutputTask.h b/qpid/cpp/src/qpid/sys/OutputTask.h
index 109765b8c3..005ae7dbc4 100644
--- a/qpid/cpp/src/qpid/sys/OutputTask.h
+++ b/qpid/cpp/src/qpid/sys/OutputTask.h
@@ -28,7 +28,17 @@ namespace sys {
{
public:
virtual ~OutputTask() {}
+ /** Generate some output.
+ *@return true if output was generated, false if there is no work to do.
+ */
virtual bool doOutput() = 0;
+
+ /** Check if there may be work to do, but don't do it.
+ * @return True if there may be work to do, false if there is none.
+ * Can to return a false positive, to allow implementations to do a
+ * faster check than doOutput(). Must never return a false negative.
+ */
+ virtual bool hasOutput() = 0;
};
}
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index d68b88b7bf..31328ef59a 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -135,7 +135,8 @@ EXTRA_DIST += \
MessageUtils.h \
TestMessageStore.h \
MockConnectionInputHandler.h \
- TxMocks.h
+ TxMocks.h \
+ start_cluster stop_cluster restart_cluster
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 9a907fc476..da542352d9 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -162,6 +162,8 @@ struct Callback : public Cpg::Handler {
}
};
+#if 0 // FIXME aconway 2008-08-06:
+
QPID_AUTO_TEST_CASE(CpgBasic) {
// Verify basic functionality of cpg. This will catch any
// openais configuration or permission errors.
@@ -182,7 +184,6 @@ QPID_AUTO_TEST_CASE(CpgBasic) {
BOOST_CHECK_EQUAL(0, cb.configChanges[1]);
}
-
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
@@ -249,7 +250,7 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) {
BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
-
+#endif
QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
ClusterFixture cluster(3);
// First start a subscription.
diff --git a/qpid/cpp/src/tests/restart_cluster b/qpid/cpp/src/tests/restart_cluster
new file mode 100755
index 0000000000..e288805674
--- /dev/null
+++ b/qpid/cpp/src/tests/restart_cluster
@@ -0,0 +1,18 @@
+#!/bin/sh
+# Re-start a cluster on the local host.
+
+srcdir=`dirname $0`
+$srcdir/stop_cluster
+exec $srcdir/start_cluster "$@"
+#!/bin/sh
+# Re-start a cluster on the local host.
+
+srcdir=`dirname $0`
+$srcdir/stop_cluster
+exec $srcdir/start_cluster "$@"
+#!/bin/sh
+# Re-start a cluster on the local host.
+
+srcdir=`dirname $0`
+$srcdir/stop_cluster
+exec $srcdir/start_cluster "$@"