summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-21 19:15:44 +0000
committerAlan Conway <aconway@apache.org>2008-05-21 19:15:44 +0000
commit872d5283acf5839755e6c3194ceca5174aa198df (patch)
treea628df948b7c06ff17bcc83ad74487a76766b5cf
parent1ca56a8cc418df64061653777962b61852dff63f (diff)
downloadqpid-python-872d5283acf5839755e6c3194ceca5174aa198df.tar.gz
Replaced AtomicCount with AtomicValue template.
Uses gcc atomics for gcc on i686/x86_64, falls back to mutex otherwise. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658816 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am4
-rw-r--r--cpp/src/qpid/sys/AtomicCount.h53
-rw-r--r--cpp/src/qpid/sys/AtomicValue.h34
-rw-r--r--cpp/src/qpid/sys/AtomicValue_gcc.h68
-rw-r--r--cpp/src/qpid/sys/AtomicValue_mutex.h81
-rw-r--r--cpp/src/tests/AtomicValue.cpp49
-rw-r--r--cpp/src/tests/EventChannelTest.cpp187
-rw-r--r--cpp/src/tests/EventChannelThreadsTest.cpp247
-rw-r--r--cpp/src/tests/Makefile.am7
9 files changed, 237 insertions, 493 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index a946f17b56..f17d322dab 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -518,7 +518,9 @@ nobase_include_HEADERS = \
qpid/sys/AggregateOutput.h \
qpid/sys/AsynchIO.h \
qpid/sys/AsynchIOHandler.h \
- qpid/sys/AtomicCount.h \
+ qpid/sys/AtomicValue.h \
+ qpid/sys/AtomicValue_gcc.h \
+ qpid/sys/AtomicValue_mutex.h \
qpid/sys/BlockingQueue.h \
qpid/sys/Condition.h \
qpid/sys/ConnectionCodec.h \
diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h
deleted file mode 100644
index 54081092c8..0000000000
--- a/cpp/src/qpid/sys/AtomicCount.h
+++ /dev/null
@@ -1,53 +0,0 @@
-#ifndef _posix_AtomicCount_h
-#define _posix_AtomicCount_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <boost/detail/atomic_count.hpp>
-#include "ScopedIncrement.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * Atomic counter.
- */
-class AtomicCount {
- public:
- typedef ::qpid::sys::ScopedDecrement<AtomicCount> ScopedDecrement;
- typedef ::qpid::sys::ScopedIncrement<AtomicCount> ScopedIncrement;
-
- AtomicCount(long value = 0) : count(value) {}
-
- void operator++() { ++count ; }
-
- long operator--() { return --count; }
-
- operator long() const { return count; }
-
-
- private:
- boost::detail::atomic_count count;
-};
-
-
-}}
-
-
-#endif // _posix_AtomicCount_h
diff --git a/cpp/src/qpid/sys/AtomicValue.h b/cpp/src/qpid/sys/AtomicValue.h
new file mode 100644
index 0000000000..426e6a8f49
--- /dev/null
+++ b/cpp/src/qpid/sys/AtomicValue.h
@@ -0,0 +1,34 @@
+#ifndef QPID_SYS_ATOMICVALUE_H
+#define QPID_SYS_ATOMICVALUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#if defined( __GNUC__ ) && ( defined( __i686__ ) || defined( __x86_64__ ) )
+// Use the Gnu C built-in atomic operations if compiling with gcc on a suitable platform.
+#include "qpid/sys/AtomicValue_gcc.h"
+
+#else
+// Fall-back to mutex locked operations if we don't have atomic ops.
+#include "qpid/sys/AtomicValue_mutex.h"
+#endif
+
+#endif /*!QPID_SYS_ATOMICVALUE_GCC_H*/
diff --git a/cpp/src/qpid/sys/AtomicValue_gcc.h b/cpp/src/qpid/sys/AtomicValue_gcc.h
new file mode 100644
index 0000000000..164c19d88e
--- /dev/null
+++ b/cpp/src/qpid/sys/AtomicValue_gcc.h
@@ -0,0 +1,68 @@
+#ifndef QPID_SYS_ATOMICVALUE_GCC_H
+#define QPID_SYS_ATOMICVALUE_GCC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#if !defined(QPID_SYS_ATOMICVALUE_H) || !defined(__GNUC__)
+#error "This file should only be included via AtomicValue.h."
+#endif
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Atomic value of type T. T must be an integral type of size 1,2,4 or 8 bytes.
+ * All operations are atomic and preform a full memory barrier unless otherwise noted.
+ */
+template <class T>
+class AtomicValue
+{
+ public:
+ AtomicValue(T init=0) : value(init) {}
+
+ // Update and return new value.
+ inline T operator+=(T n) { return __sync_add_and_fetch(&value, n); }
+ inline T operator-=(T n) { return __sync_sub_and_fetch(&value, n); }
+ inline T operator++() { return *this += 1; }
+ inline T operator--() { return *this -= 1; }
+
+ // Update and return old value.
+ inline T fetchAndAdd(T n) { return __sync_fetch_and_add(&value, n); }
+ inline T fetchAndSub(T n) { return __sync_fetch_and_sub(&value, n); }
+ inline T operator++(int) { return fetchAndAdd(1); }
+ inline T operator--(int) { return fetchAndSub(1); }
+
+ /** If current value == testval then set to newval. Returns the old value. */
+ T valueCompareAndSwap(T testval, T newval) { return __sync_val_compare_and_swap(&value, testval, newval); }
+
+ /** If current value == testval then set to newval. Returns true if the swap was performed. */
+ bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); }
+
+ T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(0); }
+
+ private:
+ T value;
+};
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_ATOMICVALUE_GCC_H*/
diff --git a/cpp/src/qpid/sys/AtomicValue_mutex.h b/cpp/src/qpid/sys/AtomicValue_mutex.h
new file mode 100644
index 0000000000..8871addbda
--- /dev/null
+++ b/cpp/src/qpid/sys/AtomicValue_mutex.h
@@ -0,0 +1,81 @@
+#ifndef QPID_SYS_ATOMICVALUE_MUTEX_H
+#define QPID_SYS_ATOMICVALUE_MUTEX_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#if !defined(QPID_SYS_ATOMICVALUE_H)
+#error "This file should only be included via AtomicValue.h."
+#endif
+
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Atomic value of type T. T must be an integral type of size 1,2,4 or 8 bytes.
+ * All operations are atomic and preform a full memory barrier unless otherwise noted.
+ */
+template <class T>
+class AtomicValue
+{
+ public:
+ AtomicValue(T init=0) : value(init) {}
+
+ // Update and return new value.
+ inline T operator+=(T n) { Lock l(lock); return value += n; }
+ inline T operator-=(T n) { Lock l(lock); return value -= n; }
+ inline T operator++() { return *this += 1; }
+ inline T operator--() { return *this -= 1; }
+
+ // Update and return old value.
+ inline T fetchAndAdd(T n) { Lock l(lock); T old=value; value += n; return old; }
+ inline T fetchAndSub(T n) { Lock l(lock); T old=value; value -= n; return old; }
+ inline T operator++(int) { return fetchAndAdd(1); }
+ inline T operator--(int) { return fetchAndSub(1); }
+
+ /** If current value == testval then set to newval. Returns the old value. */
+ T valueCompareAndSwap(T testval, T newval) {
+ Lock l(lock);
+ T old=value;
+ if (value == testval) value = newval;
+ return old;
+ }
+
+ /** If current value == testval then set to newval. Returns true if the swap was performed. */
+ bool boolCompareAndSwap(T testval, T newval) {
+ Lock l(lock);
+ if (value == testval) { value = newval; return true; }
+ return false;
+ }
+
+ T get() const { Lock l(lock); return value; }
+
+ private:
+ typedef Mutex::ScopedLock Lock;
+ T value;
+ mutable Mutex lock;
+};
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_ATOMICVALUE_MUTEX_H*/
diff --git a/cpp/src/tests/AtomicValue.cpp b/cpp/src/tests/AtomicValue.cpp
new file mode 100644
index 0000000000..05083ad177
--- /dev/null
+++ b/cpp/src/tests/AtomicValue.cpp
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/sys/AtomicValue.h"
+
+QPID_AUTO_TEST_SUITE(AtomicValueTestSuite)
+
+QPID_AUTO_TEST_CASE(test) {
+ qpid::sys::AtomicValue<int> x(0);
+ BOOST_CHECK_EQUAL(++x, 1);
+ BOOST_CHECK_EQUAL(--x,0);
+ BOOST_CHECK_EQUAL(x+=5,5);
+ BOOST_CHECK_EQUAL(x-=10,-5);
+ BOOST_CHECK_EQUAL(x.fetchAndAdd(7), -5);
+ BOOST_CHECK_EQUAL(x.get(),2);
+ BOOST_CHECK_EQUAL(x.fetchAndSub(3), 2);
+ BOOST_CHECK_EQUAL(x.get(),-1);
+
+ BOOST_CHECK_EQUAL(x.valueCompareAndSwap(-1,10), -1);
+ BOOST_CHECK_EQUAL(x.get(), 10);
+ BOOST_CHECK_EQUAL(x.valueCompareAndSwap(5, 6), 10);
+ BOOST_CHECK_EQUAL(x.get(), 10);
+
+ BOOST_CHECK(!x.boolCompareAndSwap(5, 6));
+ BOOST_CHECK_EQUAL(x.get(), 10);
+ BOOST_CHECK(x.boolCompareAndSwap(10, 6));
+ BOOST_CHECK_EQUAL(x.get(), 6);
+}
+
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/EventChannelTest.cpp b/cpp/src/tests/EventChannelTest.cpp
deleted file mode 100644
index 6d8d64e165..0000000000
--- a/cpp/src/tests/EventChannelTest.cpp
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/sys/posix/EventChannel.h"
-#include "qpid/sys/posix/check.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/Thread.h"
-#include "qpid_test_plugin.h"
-
-#include <sys/socket.h>
-#include <signal.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <iostream>
-
-using namespace qpid::sys;
-
-
-const char hello[] = "hello";
-const size_t size = sizeof(hello);
-
-struct RunMe : public Runnable
-{
- bool ran;
- RunMe() : ran(false) {}
- void run() { ran = true; }
-};
-
-class EventChannelTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(EventChannelTest);
- CPPUNIT_TEST(testEvent);
- CPPUNIT_TEST(testRead);
- CPPUNIT_TEST(testFailedRead);
- CPPUNIT_TEST(testWrite);
- CPPUNIT_TEST(testFailedWrite);
- CPPUNIT_TEST(testReadWrite);
- CPPUNIT_TEST(testAccept);
- CPPUNIT_TEST_SUITE_END();
-
- private:
- EventChannel::shared_ptr ec;
- int pipe[2];
- char readBuf[size];
-
- public:
-
- void setUp()
- {
- memset(readBuf, size, 0);
- ec = EventChannel::create();
- if (::pipe(pipe) != 0) throw QPID_POSIX_ERROR(errno);
- // Ignore SIGPIPE, otherwise we will crash writing to broken pipe.
- signal(SIGPIPE, SIG_IGN);
- }
-
- // Verify that calling getEvent returns event.
- template <class T> bool isNextEvent(T& event)
- {
- return &event == dynamic_cast<T*>(ec->getEvent());
- }
-
- template <class T> bool isNextEventOk(T& event)
- {
- Event* next = ec->getEvent();
- if (next) next->throwIfError();
- return &event == next;
- }
-
- void testEvent()
- {
- RunMe runMe;
- CPPUNIT_ASSERT(!runMe.ran);
- // Instances of Event just pass thru the channel immediately.
- Event e(runMe.functor());
- ec->postEvent(e);
- CPPUNIT_ASSERT(isNextEventOk(e));
- e.dispatch();
- CPPUNIT_ASSERT(runMe.ran);
- }
-
- void testRead() {
- ReadEvent re(pipe[0], readBuf, size);
- ec->postEvent(re);
- CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size));
- CPPUNIT_ASSERT(isNextEventOk(re));
- CPPUNIT_ASSERT_EQUAL(size, re.getSize());
- CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
- }
-
- void testFailedRead()
- {
- ReadEvent re(pipe[0], readBuf, size);
- ec->postEvent(re);
-
- // EOF before all data read.
- ::close(pipe[1]);
- CPPUNIT_ASSERT(isNextEvent(re));
- CPPUNIT_ASSERT(re.hasError());
- try {
- re.throwIfError();
- CPPUNIT_FAIL("Expected Exception.");
- }
- catch (const qpid::Exception&) { }
-
- // Bad file descriptor. Note in this case we fail
- // in postEvent and throw immediately.
- try {
- ReadEvent bad;
- ec->postEvent(bad);
- CPPUNIT_FAIL("Expected Exception.");
- }
- catch (const qpid::Exception&) { }
- }
-
- void testWrite() {
- WriteEvent wr(pipe[1], hello, size);
- ec->postEvent(wr);
- CPPUNIT_ASSERT(isNextEventOk(wr));
- CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));;
- CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
- }
-
- void testFailedWrite() {
- WriteEvent wr(pipe[1], hello, size);
- ::close(pipe[0]);
- ec->postEvent(wr);
- CPPUNIT_ASSERT(isNextEvent(wr));
- CPPUNIT_ASSERT(wr.hasError());
- }
-
- void testReadWrite()
- {
- ReadEvent re(pipe[0], readBuf, size);
- WriteEvent wr(pipe[1], hello, size);
- ec->postEvent(re);
- ec->postEvent(wr);
- ec->getEvent();
- ec->getEvent();
- CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
- }
-
- void testAccept() {
- Socket s = Socket::createTcp();
- int port = s.listen(0, 10);
- CPPUNIT_ASSERT(port != 0);
-
- AcceptEvent ae(s.fd());
- ec->postEvent(ae);
- Socket client = Socket::createTcp();
- client.connect("localhost", port);
- CPPUNIT_ASSERT(isNextEvent(ae));
- ae.dispatch();
-
- // Verify client writes are read by the accepted descriptor.
- char readBuf[size];
- ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size);
- ec->postEvent(re);
- CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello)));
- CPPUNIT_ASSERT(isNextEvent(re));
- re.dispatch();
- CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelTest);
-
diff --git a/cpp/src/tests/EventChannelThreadsTest.cpp b/cpp/src/tests/EventChannelThreadsTest.cpp
deleted file mode 100644
index 22ea57d675..0000000000
--- a/cpp/src/tests/EventChannelThreadsTest.cpp
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include <boost/bind.hpp>
-
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/posix/EventChannelThreads.h"
-#include "qpid_test_plugin.h"
-
-
-using namespace std;
-
-using namespace qpid::sys;
-
-const int nConnections = 5;
-const int nMessages = 10; // Messages read/written per connection.
-
-
-// Accepts + reads + writes.
-const int totalEvents = nConnections+2*nConnections*nMessages;
-
-/**
- * Messages are numbered 0..nMessages.
- * We count the total number of events, and the
- * number of reads and writes for each message number.
- */
-class TestResults : public Monitor {
- public:
- TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {}
-
- void countEvent() {
- if (--nEventsRemaining == 0)
- shutdown();
- }
-
- void countRead(int messageNo) {
- ++reads[messageNo];
- countEvent();
- }
-
- void countWrite(int messageNo) {
- ++writes[messageNo];
- countEvent();
- }
-
- void shutdown(const std::string& exceptionMsg = std::string()) {
- ScopedLock lock(*this);
- exception = exceptionMsg;
- isShutdown = true;
- notifyAll();
- }
-
- void wait() {
- ScopedLock lock(*this);
- Time deadline = now() + 10*TIME_SEC;
- while (!isShutdown) {
- CPPUNIT_ASSERT(Monitor::wait(deadline));
- }
- }
-
- bool isShutdown;
- std::string exception;
- AtomicCount reads[nMessages];
- AtomicCount writes[nMessages];
- AtomicCount nEventsRemaining;
-};
-
-TestResults results;
-
-EventChannelThreads::shared_ptr threads;
-
-// Functor to wrap callbacks in try/catch.
-class SafeCallback {
- public:
- SafeCallback(Runnable& r) : callback(r.functor()) {}
- SafeCallback(Event::Callback cb) : callback(cb) {}
-
- void operator()() {
- std::string exception;
- try {
- callback();
- return;
- }
- catch (const std::exception& e) {
- exception = e.what();
- }
- catch (...) {
- exception = "Unknown exception.";
- }
- results.shutdown(exception);
- }
-
- private:
- Event::Callback callback;
-};
-
-/** Repost an event N times. */
-class Repost {
- public:
- Repost(int n) : count (n) {}
- virtual ~Repost() {}
-
- void repost(Event* event) {
- if (--count==0) {
- delete event;
- } else {
- threads->postEvent(event);
- }
- }
- private:
- int count;
-};
-
-
-
-/** Repeating read event. */
-class TestReadEvent : public ReadEvent, public Runnable, private Repost {
- public:
- explicit TestReadEvent(int fd=-1) :
- ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)),
- Repost(nMessages)
- {}
-
- void run() {
- CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize());
- CPPUNIT_ASSERT(0 <= value);
- CPPUNIT_ASSERT(value < nMessages);
- results.countRead(value);
- repost(this);
- }
-
- private:
- int value;
- ReadEvent original;
-};
-
-
-/** Fire and forget write event */
-class TestWriteEvent : public WriteEvent, public Runnable, private Repost {
- public:
- TestWriteEvent(int fd=-1) :
- WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)),
- Repost(nMessages),
- value(0)
- {}
-
- void run() {
- CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize());
- results.countWrite(value++);
- repost(this);
- }
-
- private:
- int value;
-};
-
-/** Fire-and-forget Accept event, posts reads on the accepted connection. */
-class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost {
- public:
- TestAcceptEvent(int fd=-1) :
- AcceptEvent(fd, SafeCallback(*this)),
- Repost(nConnections)
- {}
-
- void run() {
- threads->postEvent(new TestReadEvent(getAcceptedDesscriptor()));
- results.countEvent();
- repost(this);
- }
-};
-
-class EventChannelThreadsTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(EventChannelThreadsTest);
- CPPUNIT_TEST(testThreads);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- void setUp() {
- threads = EventChannelThreads::create(EventChannel::create());
- }
-
- void tearDown() {
- threads.reset();
- }
-
- void testThreads()
- {
- Socket listener = Socket::createTcp();
- int port = listener.listen();
-
- // Post looping accept events, will repost nConnections times.
- // The accept event will automatically post read events.
- threads->postEvent(new TestAcceptEvent(listener.fd()));
-
- // Make connections.
- Socket connections[nConnections];
- for (int i = 0; i < nConnections; ++i) {
- connections[i] = Socket::createTcp();
- connections[i].connect("localhost", port);
- }
-
- // Post looping write events.
- for (int i = 0; i < nConnections; ++i) {
- threads->postEvent(new TestWriteEvent(connections[i].fd()));
- }
-
- // Wait for all events to be dispatched.
- results.wait();
-
- if (!results.exception.empty()) CPPUNIT_FAIL(results.exception);
- CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining));
-
- // Expect a read and write for each messageNo from each connection.
- for (int messageNo = 0; messageNo < nMessages; ++messageNo) {
- CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo]));
- CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo]));
- }
-
- threads->shutdown();
- threads->join();
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest);
-
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index cb060cb2b4..4a1a8d9a66 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -44,7 +44,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
SequenceSet.cpp \
StringUtils.cpp \
IncompleteMessageList.cpp \
- RangeSet.cpp
+ RangeSet.cpp \
+ AtomicValue.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -109,10 +110,6 @@ framing_unit_tests = \
HeaderTest \
SequenceNumberTest
-posix_unit_tests = \
- EventChannelTest \
- EventChannelThreadsTest
-
unit_tests = \
$(broker_unit_tests) \
$(client_unit_tests) \