summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
committerAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
commitf61e1ef7589da893b9b54448224dc0961515eb40 (patch)
tree258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid/sys
parentc5294d471ade7a18c52ca7d4028a494011c82293 (diff)
downloadqpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network failure are automatically re-transmitted for transparent re-connection. client::Session improvements: - Locking to avoid races between network & user threads. - Replaced client::StateManager with sys::StateMonitor - avoid heap allocation. qpid::Exception clean up: - use QPID_MSG consistently to format exception messages. - throw typed exceptions (in reply_exceptions.h) for AMQP exceptions. - re-throw correct typed exception on client for exceptions from broker. - Removed QpidError.h rubygen/templates/constants.rb: - constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants. - reply_constants.h: Added throwReplyException(code, text) log::Logger: - Fixed shutdown race in Statement::~Initializer() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp3
-rw-r--r--cpp/src/qpid/sys/ConcurrentQueue.h90
-rw-r--r--cpp/src/qpid/sys/ScopedIncrement.h6
-rw-r--r--cpp/src/qpid/sys/Serializer.h4
-rw-r--r--cpp/src/qpid/sys/StateMonitor.h78
-rw-r--r--cpp/src/qpid/sys/Waitable.h73
-rw-r--r--cpp/src/qpid/sys/apr/APRBase.cpp1
-rw-r--r--cpp/src/qpid/sys/apr/APRBase.h8
-rw-r--r--cpp/src/qpid/sys/posix/Shlib.cpp9
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp4
-rw-r--r--cpp/src/qpid/sys/posix/check.cpp39
-rw-r--r--cpp/src/qpid/sys/posix/check.h36
12 files changed, 215 insertions, 136 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 733a892cff..eccfb1465e 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -29,6 +29,7 @@
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
@@ -301,7 +302,7 @@ void AsynchIOHandler::idle(AsynchIO&){
}
// If frame was egregiously large complain
if (frameSize > buff->byteCount)
- THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+ throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
buff->dataCount = buffUsed;
aio->queueWrite(buff);
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h
index 917afc5704..cf8199954e 100644
--- a/cpp/src/qpid/sys/ConcurrentQueue.h
+++ b/cpp/src/qpid/sys/ConcurrentQueue.h
@@ -22,7 +22,7 @@
*
*/
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Waitable.h"
#include "qpid/sys/ScopedIncrement.h"
#include <boost/bind.hpp>
@@ -39,73 +39,73 @@ namespace sys {
*
* Also allows consuming threads to wait until an item is available.
*/
-template <class T> class ConcurrentQueue {
+template <class T> class ConcurrentQueue : public Waitable {
public:
- ConcurrentQueue() : waiters(0), shutdown(false) {}
+ struct ShutdownException {};
+
+ ConcurrentQueue() : shutdownFlag(false) {}
- /** Threads in wait() are woken with ShutdownException before
- * destroying the queue.
- */
- ~ConcurrentQueue() {
- Mutex::ScopedLock l(lock);
- shutdown = true;
- lock.notifyAll();
- while (waiters > 0)
- lock.wait();
+ /** Waiting threads are notified by ~Waitable */
+ ~ConcurrentQueue() { shutdown(); }
+
+ bool shutdown(bool wait=true) {
+ ScopedLock l(lock);
+ if (!shutdownFlag) {
+ shutdownFlag=true;
+ lock.notifyAll();
+ if (wait) lock.waitAll();
+ shutdownFlag=true;
+ return true;
+ }
+ return false;
}
-
+
/** Push a data item onto the back of the queue */
void push(const T& data) {
Mutex::ScopedLock l(lock);
queue.push_back(data);
+ lock.notify();
}
/** If the queue is non-empty, pop the front item into data and
* return true. If the queue is empty, return false
*/
- bool pop(T& data) {
+ bool tryPop(T& data) {
Mutex::ScopedLock l(lock);
- return popInternal(data);
+ if (shutdownFlag || queue.empty())
+ return false;
+ data = queue.front();
+ queue.pop_front();
+ return true;
}
- /** Wait up to deadline for a data item to be available.
- *@return true if data was available, false if timed out.
+ /** Wait up to a timeout for a data item to be available.
+ *@return true if data was available, false if timed out or shut down.
*@throws ShutdownException if the queue is destroyed.
*/
- bool waitPop(T& data, Duration timeout) {
- Mutex::ScopedLock l(lock);
- ScopedIncrement<size_t> w(
- waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+ bool waitPop(T& data, Duration timeout=TIME_INFINITE) {
+ ScopedLock l(lock);
AbsTime deadline(now(), timeout);
- while (queue.empty() && lock.wait(deadline))
- ;
- return popInternal(data);
- }
-
- private:
-
- bool popInternal(T& data) {
- if (shutdown)
- throw ShutdownException();
+ {
+ ScopedWait(*this);
+ while (!shutdownFlag && queue.empty())
+ if (!lock.wait(deadline))
+ return false;
+ }
if (queue.empty())
return false;
- else {
- data = queue.front();
- queue.pop_front();
- return true;
- }
+ data = queue.front();
+ queue.pop_front();
+ return true;
}
+
+ bool isShutdown() { ScopedLock l(lock); return shutdownFlag; }
- void noWaiters() {
- assert(waiters == 0);
- if (shutdown)
- lock.notify(); // Notify dtor thread.
- }
-
- Monitor lock;
+ protected:
+ Waitable lock;
+ private:
std::deque<T> queue;
- size_t waiters;
- bool shutdown;
+ bool shutdownFlag;
};
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ScopedIncrement.h b/cpp/src/qpid/sys/ScopedIncrement.h
index ba9e89ba5f..8645ab2484 100644
--- a/cpp/src/qpid/sys/ScopedIncrement.h
+++ b/cpp/src/qpid/sys/ScopedIncrement.h
@@ -30,17 +30,17 @@ namespace sys {
* Optionally call a function if the decremented counter value is 0.
* Note the function must not throw, it is called in the destructor.
*/
-template <class T>
+template <class T, class F=boost::function<void()> >
class ScopedIncrement : boost::noncopyable
{
public:
- ScopedIncrement(T& c, boost::function0<void> f=0)
+ ScopedIncrement(T& c, F f=0)
: count(c), callback(f) { ++count; }
~ScopedIncrement() { if (--count == 0 && callback) callback(); }
private:
T& count;
- boost::function0<void> callback;
+ F callback;
};
diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h
index 085d51d7e2..7bb3b07ae0 100644
--- a/cpp/src/qpid/sys/Serializer.h
+++ b/cpp/src/qpid/sys/Serializer.h
@@ -23,7 +23,7 @@
*
*/
-
+#include "qpid/Exception.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
@@ -41,6 +41,8 @@ class SerializerBase : private boost::noncopyable, private Runnable
{
public:
typedef boost::function<void()> VoidFn0;
+ struct ShutdownException : public Exception {};
+
/** @see Serializer::Serializer */
SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0());
diff --git a/cpp/src/qpid/sys/StateMonitor.h b/cpp/src/qpid/sys/StateMonitor.h
new file mode 100644
index 0000000000..5a92756f3a
--- /dev/null
+++ b/cpp/src/qpid/sys/StateMonitor.h
@@ -0,0 +1,78 @@
+#ifndef QPID_SYS_STATEMONITOR_H
+#define QPID_SYS_STATEMONITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Waitable.h"
+
+#include <bitset>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A monitor with an enum state value.
+ *
+ *@param Enum: enum type to use for states.
+ *@param EnumMax: Highest enum value.
+ */
+template <class Enum, size_t MaxEnum>
+class StateMonitor : public Waitable
+{
+ public:
+ struct Set : public std::bitset<MaxEnum + 1> {
+ Set() {}
+ Set(Enum s) { set(s); }
+ Set(Enum s, Enum t) { set(s).set(t); }
+ Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); }
+ Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); }
+ };
+
+
+ StateMonitor(Enum initial) { state=initial; }
+
+ /** @pre Caller holds a ScopedLock. */
+ void set(Enum s) { state=s; notifyAll(); }
+ /** @pre Caller holds a ScopedLock. */
+ StateMonitor& operator=(Enum s) { set(s); return *this; }
+
+ /** @pre Caller holds a ScopedLock. */
+ Enum get() const { return state; }
+ /** @pre Caller holds a ScopedLock. */
+ operator Enum() const { return state; }
+
+ /** @pre Caller holds a ScopedLock */
+ void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); }
+ /** @pre Caller holds a ScopedLock */
+ void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); }
+ /** @pre Caller holds a ScopedLock */
+ void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); }
+ /** @pre Caller holds a ScopedLock */
+ void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); }
+
+ private:
+ Enum state;
+};
+
+}}
+
+
+#endif /*!QPID_SYS_STATEMONITOR_H*/
diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h
new file mode 100644
index 0000000000..eb71a1d742
--- /dev/null
+++ b/cpp/src/qpid/sys/Waitable.h
@@ -0,0 +1,73 @@
+#ifndef QPID_SYS_WAITABLE_H
+#define QPID_SYS_WAITABLE_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Monitor.h"
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A monitor that keeps track of waiting threads.
+ * Threads that use a WaitLock are counted as waiters, threads that
+ * use a normal ScopedLock are not considered waiters.
+ */
+class Waitable : public Monitor {
+ public:
+ Waitable() : waiters(0) {}
+
+ /** Use this inside a scoped lock around the
+ * call to Monitor::wait to be counted as a waiter
+ */
+ struct ScopedWait {
+ Waitable& w;
+ ScopedWait(Waitable& w_) : w(w_) { ++w.waiters; }
+ ~ScopedWait() { --w.waiters; w.notifyAll(); }
+ };
+
+ /** Block till all waiters have finished waiting.
+ * The calling thread does not count as a waiter.
+ *@pre Must be called inside a ScopedLock but NOT a ScopedWait.
+ */
+ bool waitAll(Duration timeout=TIME_INFINITE) {
+ AbsTime deadline(now(), timeout);
+ while (waiters > 0) {
+ if (!wait(deadline)) {
+ assert(timeout != TIME_INFINITE);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private:
+ friend struct ScopedWait;
+ size_t waiters;
+};
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!QPID_SYS_WAITABLE_H*/
diff --git a/cpp/src/qpid/sys/apr/APRBase.cpp b/cpp/src/qpid/sys/apr/APRBase.cpp
index f527e0d0b2..724c489303 100644
--- a/cpp/src/qpid/sys/apr/APRBase.cpp
+++ b/cpp/src/qpid/sys/apr/APRBase.cpp
@@ -20,7 +20,6 @@
*/
#include <iostream>
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
#include "APRBase.h"
using namespace qpid::sys;
diff --git a/cpp/src/qpid/sys/apr/APRBase.h b/cpp/src/qpid/sys/apr/APRBase.h
index c6b1854fb1..7b5644a129 100644
--- a/cpp/src/qpid/sys/apr/APRBase.h
+++ b/cpp/src/qpid/sys/apr/APRBase.h
@@ -24,7 +24,6 @@
#include <string>
#include <apr_thread_mutex.h>
#include <apr_errno.h>
-#include "qpid/QpidError.h"
namespace qpid {
namespace sys {
@@ -64,11 +63,8 @@ namespace sys {
// Inlined as it is called *a lot*
void inline qpid::sys::check(apr_status_t status, const char* file, const int line){
if (status != APR_SUCCESS){
- const int size = 50;
- char tmp[size];
- std::string msg(apr_strerror(status, tmp, size));
- throw qpid::QpidError(APR_ERROR + ((int) status), msg,
- qpid::SrcLine(file, line));
+ char tmp[256];
+ throw Exception(QPID_MSG(apr_strerror(status, tmp, size)))
}
}
diff --git a/cpp/src/qpid/sys/posix/Shlib.cpp b/cpp/src/qpid/sys/posix/Shlib.cpp
index 2630337408..1552aa06b5 100644
--- a/cpp/src/qpid/sys/posix/Shlib.cpp
+++ b/cpp/src/qpid/sys/posix/Shlib.cpp
@@ -19,8 +19,7 @@
*/
#include "qpid/sys/Shlib.h"
-
-#include <qpid/QpidError.h>
+#include "qpid/Exception.h"
#include <dlfcn.h>
@@ -32,7 +31,7 @@ void Shlib::load(const char* name) {
handle = ::dlopen(name, RTLD_NOW);
const char* error = ::dlerror();
if (error) {
- THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ throw Exception(QPID_MSG(error));
}
}
@@ -42,7 +41,7 @@ void Shlib::unload() {
::dlclose(handle);
const char* error = ::dlerror();
if (error) {
- THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ throw Exception(QPID_MSG(error));
}
handle = 0;
}
@@ -53,7 +52,7 @@ void* Shlib::getSymbol(const char* name) {
void* sym = ::dlsym(handle, name);
const char* error = ::dlerror();
if (error)
- THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ throw Exception(QPID_MSG(error));
return sym;
}
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index c7a83df581..f0cc8cd5a5 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -21,7 +21,6 @@
#include "qpid/sys/Socket.h"
-#include "qpid/QpidError.h"
#include "check.h"
#include "PrivatePosix.h"
@@ -60,8 +59,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const
result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
}
- if (result < 0)
- throw QPID_POSIX_ERROR(errno);
+ QPID_POSIX_CHECK(result);
char servName[NI_MAXSERV];
char dispName[NI_MAXHOST];
diff --git a/cpp/src/qpid/sys/posix/check.cpp b/cpp/src/qpid/sys/posix/check.cpp
deleted file mode 100644
index 408679caa8..0000000000
--- a/cpp/src/qpid/sys/posix/check.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <cerrno>
-#include "check.h"
-
-namespace qpid {
-namespace sys {
-
-std::string
-PosixError::getMessage(int errNo)
-{
- char buf[512];
- return std::string(strerror_r(errNo, buf, sizeof(buf)));
-}
-
-PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
- : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
-{ }
-
-}}
diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h
index 7fa7b69d3b..f864bf8762 100644
--- a/cpp/src/qpid/sys/posix/check.h
+++ b/cpp/src/qpid/sys/posix/check.h
@@ -22,41 +22,13 @@
*
*/
-#include <cerrno>
-#include <string>
-#include "qpid/QpidError.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * Exception with message from errno.
- */
-class PosixError : public qpid::QpidError
-{
- public:
- static std::string getMessage(int errNo);
-
- PosixError(int errNo, const qpid::SrcLine& location) throw();
-
- ~PosixError() throw() {}
-
- int getErrNo() { return errNo; }
+#include "qpid/Exception.h"
- Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); }
-
- void throwSelf() const { throw *this; }
-
- private:
- int errNo;
-};
-
-}}
+#include <cerrno>
-/** Create a PosixError for the current file/line and errno. */
-#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO)) << " " << ERRNO)
-/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */
+/** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */
#define QPID_POSIX_CHECK(RESULT) \
if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))