diff options
author | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
commit | f61e1ef7589da893b9b54448224dc0961515eb40 (patch) | |
tree | 258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid/sys | |
parent | c5294d471ade7a18c52ca7d4028a494011c82293 (diff) | |
download | qpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz |
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network
failure are automatically re-transmitted for transparent re-connection.
client::Session improvements:
- Locking to avoid races between network & user threads.
- Replaced client::StateManager with sys::StateMonitor - avoid heap allocation.
qpid::Exception clean up:
- use QPID_MSG consistently to format exception messages.
- throw typed exceptions (in reply_exceptions.h) for AMQP exceptions.
- re-throw correct typed exception on client for exceptions from broker.
- Removed QpidError.h
rubygen/templates/constants.rb:
- constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants.
- reply_constants.h: Added throwReplyException(code, text)
log::Logger:
- Fixed shutdown race in Statement::~Initializer()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ConcurrentQueue.h | 90 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ScopedIncrement.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Serializer.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/StateMonitor.h | 78 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Waitable.h | 73 | ||||
-rw-r--r-- | cpp/src/qpid/sys/apr/APRBase.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/apr/APRBase.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Shlib.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/check.cpp | 39 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/check.h | 36 |
12 files changed, 215 insertions, 136 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 733a892cff..eccfb1465e 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -29,6 +29,7 @@ #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" @@ -301,7 +302,7 @@ void AsynchIOHandler::idle(AsynchIO&){ } // If frame was egregiously large complain if (frameSize > buff->byteCount) - THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); buff->dataCount = buffUsed; aio->queueWrite(buff); diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h index 917afc5704..cf8199954e 100644 --- a/cpp/src/qpid/sys/ConcurrentQueue.h +++ b/cpp/src/qpid/sys/ConcurrentQueue.h @@ -22,7 +22,7 @@ * */ -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Waitable.h" #include "qpid/sys/ScopedIncrement.h" #include <boost/bind.hpp> @@ -39,73 +39,73 @@ namespace sys { * * Also allows consuming threads to wait until an item is available. */ -template <class T> class ConcurrentQueue { +template <class T> class ConcurrentQueue : public Waitable { public: - ConcurrentQueue() : waiters(0), shutdown(false) {} + struct ShutdownException {}; + + ConcurrentQueue() : shutdownFlag(false) {} - /** Threads in wait() are woken with ShutdownException before - * destroying the queue. - */ - ~ConcurrentQueue() { - Mutex::ScopedLock l(lock); - shutdown = true; - lock.notifyAll(); - while (waiters > 0) - lock.wait(); + /** Waiting threads are notified by ~Waitable */ + ~ConcurrentQueue() { shutdown(); } + + bool shutdown(bool wait=true) { + ScopedLock l(lock); + if (!shutdownFlag) { + shutdownFlag=true; + lock.notifyAll(); + if (wait) lock.waitAll(); + shutdownFlag=true; + return true; + } + return false; } - + /** Push a data item onto the back of the queue */ void push(const T& data) { Mutex::ScopedLock l(lock); queue.push_back(data); + lock.notify(); } /** If the queue is non-empty, pop the front item into data and * return true. If the queue is empty, return false */ - bool pop(T& data) { + bool tryPop(T& data) { Mutex::ScopedLock l(lock); - return popInternal(data); + if (shutdownFlag || queue.empty()) + return false; + data = queue.front(); + queue.pop_front(); + return true; } - /** Wait up to deadline for a data item to be available. - *@return true if data was available, false if timed out. + /** Wait up to a timeout for a data item to be available. + *@return true if data was available, false if timed out or shut down. *@throws ShutdownException if the queue is destroyed. */ - bool waitPop(T& data, Duration timeout) { - Mutex::ScopedLock l(lock); - ScopedIncrement<size_t> w( - waiters, boost::bind(&ConcurrentQueue::noWaiters, this)); + bool waitPop(T& data, Duration timeout=TIME_INFINITE) { + ScopedLock l(lock); AbsTime deadline(now(), timeout); - while (queue.empty() && lock.wait(deadline)) - ; - return popInternal(data); - } - - private: - - bool popInternal(T& data) { - if (shutdown) - throw ShutdownException(); + { + ScopedWait(*this); + while (!shutdownFlag && queue.empty()) + if (!lock.wait(deadline)) + return false; + } if (queue.empty()) return false; - else { - data = queue.front(); - queue.pop_front(); - return true; - } + data = queue.front(); + queue.pop_front(); + return true; } + + bool isShutdown() { ScopedLock l(lock); return shutdownFlag; } - void noWaiters() { - assert(waiters == 0); - if (shutdown) - lock.notify(); // Notify dtor thread. - } - - Monitor lock; + protected: + Waitable lock; + private: std::deque<T> queue; - size_t waiters; - bool shutdown; + bool shutdownFlag; }; }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/ScopedIncrement.h b/cpp/src/qpid/sys/ScopedIncrement.h index ba9e89ba5f..8645ab2484 100644 --- a/cpp/src/qpid/sys/ScopedIncrement.h +++ b/cpp/src/qpid/sys/ScopedIncrement.h @@ -30,17 +30,17 @@ namespace sys { * Optionally call a function if the decremented counter value is 0. * Note the function must not throw, it is called in the destructor. */ -template <class T> +template <class T, class F=boost::function<void()> > class ScopedIncrement : boost::noncopyable { public: - ScopedIncrement(T& c, boost::function0<void> f=0) + ScopedIncrement(T& c, F f=0) : count(c), callback(f) { ++count; } ~ScopedIncrement() { if (--count == 0 && callback) callback(); } private: T& count; - boost::function0<void> callback; + F callback; }; diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h index 085d51d7e2..7bb3b07ae0 100644 --- a/cpp/src/qpid/sys/Serializer.h +++ b/cpp/src/qpid/sys/Serializer.h @@ -23,7 +23,7 @@ * */ - +#include "qpid/Exception.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" @@ -41,6 +41,8 @@ class SerializerBase : private boost::noncopyable, private Runnable { public: typedef boost::function<void()> VoidFn0; + struct ShutdownException : public Exception {}; + /** @see Serializer::Serializer */ SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0()); diff --git a/cpp/src/qpid/sys/StateMonitor.h b/cpp/src/qpid/sys/StateMonitor.h new file mode 100644 index 0000000000..5a92756f3a --- /dev/null +++ b/cpp/src/qpid/sys/StateMonitor.h @@ -0,0 +1,78 @@ +#ifndef QPID_SYS_STATEMONITOR_H +#define QPID_SYS_STATEMONITOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Waitable.h" + +#include <bitset> + +namespace qpid { +namespace sys { + +/** + * A monitor with an enum state value. + * + *@param Enum: enum type to use for states. + *@param EnumMax: Highest enum value. + */ +template <class Enum, size_t MaxEnum> +class StateMonitor : public Waitable +{ + public: + struct Set : public std::bitset<MaxEnum + 1> { + Set() {} + Set(Enum s) { set(s); } + Set(Enum s, Enum t) { set(s).set(t); } + Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); } + Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); } + }; + + + StateMonitor(Enum initial) { state=initial; } + + /** @pre Caller holds a ScopedLock. */ + void set(Enum s) { state=s; notifyAll(); } + /** @pre Caller holds a ScopedLock. */ + StateMonitor& operator=(Enum s) { set(s); return *this; } + + /** @pre Caller holds a ScopedLock. */ + Enum get() const { return state; } + /** @pre Caller holds a ScopedLock. */ + operator Enum() const { return state; } + + /** @pre Caller holds a ScopedLock */ + void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); } + /** @pre Caller holds a ScopedLock */ + void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); } + /** @pre Caller holds a ScopedLock */ + void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); } + /** @pre Caller holds a ScopedLock */ + void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); } + + private: + Enum state; +}; + +}} + + +#endif /*!QPID_SYS_STATEMONITOR_H*/ diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h new file mode 100644 index 0000000000..eb71a1d742 --- /dev/null +++ b/cpp/src/qpid/sys/Waitable.h @@ -0,0 +1,73 @@ +#ifndef QPID_SYS_WAITABLE_H +#define QPID_SYS_WAITABLE_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Monitor.h" + +#include <assert.h> + +namespace qpid { +namespace sys { + +/** + * A monitor that keeps track of waiting threads. + * Threads that use a WaitLock are counted as waiters, threads that + * use a normal ScopedLock are not considered waiters. + */ +class Waitable : public Monitor { + public: + Waitable() : waiters(0) {} + + /** Use this inside a scoped lock around the + * call to Monitor::wait to be counted as a waiter + */ + struct ScopedWait { + Waitable& w; + ScopedWait(Waitable& w_) : w(w_) { ++w.waiters; } + ~ScopedWait() { --w.waiters; w.notifyAll(); } + }; + + /** Block till all waiters have finished waiting. + * The calling thread does not count as a waiter. + *@pre Must be called inside a ScopedLock but NOT a ScopedWait. + */ + bool waitAll(Duration timeout=TIME_INFINITE) { + AbsTime deadline(now(), timeout); + while (waiters > 0) { + if (!wait(deadline)) { + assert(timeout != TIME_INFINITE); + return false; + } + } + return true; + } + + private: + friend struct ScopedWait; + size_t waiters; +}; + +}} // namespace qpid::sys + + + +#endif /*!QPID_SYS_WAITABLE_H*/ diff --git a/cpp/src/qpid/sys/apr/APRBase.cpp b/cpp/src/qpid/sys/apr/APRBase.cpp index f527e0d0b2..724c489303 100644 --- a/cpp/src/qpid/sys/apr/APRBase.cpp +++ b/cpp/src/qpid/sys/apr/APRBase.cpp @@ -20,7 +20,6 @@ */ #include <iostream> #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" #include "APRBase.h" using namespace qpid::sys; diff --git a/cpp/src/qpid/sys/apr/APRBase.h b/cpp/src/qpid/sys/apr/APRBase.h index c6b1854fb1..7b5644a129 100644 --- a/cpp/src/qpid/sys/apr/APRBase.h +++ b/cpp/src/qpid/sys/apr/APRBase.h @@ -24,7 +24,6 @@ #include <string> #include <apr_thread_mutex.h> #include <apr_errno.h> -#include "qpid/QpidError.h" namespace qpid { namespace sys { @@ -64,11 +63,8 @@ namespace sys { // Inlined as it is called *a lot* void inline qpid::sys::check(apr_status_t status, const char* file, const int line){ if (status != APR_SUCCESS){ - const int size = 50; - char tmp[size]; - std::string msg(apr_strerror(status, tmp, size)); - throw qpid::QpidError(APR_ERROR + ((int) status), msg, - qpid::SrcLine(file, line)); + char tmp[256]; + throw Exception(QPID_MSG(apr_strerror(status, tmp, size))) } } diff --git a/cpp/src/qpid/sys/posix/Shlib.cpp b/cpp/src/qpid/sys/posix/Shlib.cpp index 2630337408..1552aa06b5 100644 --- a/cpp/src/qpid/sys/posix/Shlib.cpp +++ b/cpp/src/qpid/sys/posix/Shlib.cpp @@ -19,8 +19,7 @@ */ #include "qpid/sys/Shlib.h" - -#include <qpid/QpidError.h> +#include "qpid/Exception.h" #include <dlfcn.h> @@ -32,7 +31,7 @@ void Shlib::load(const char* name) { handle = ::dlopen(name, RTLD_NOW); const char* error = ::dlerror(); if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); + throw Exception(QPID_MSG(error)); } } @@ -42,7 +41,7 @@ void Shlib::unload() { ::dlclose(handle); const char* error = ::dlerror(); if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); + throw Exception(QPID_MSG(error)); } handle = 0; } @@ -53,7 +52,7 @@ void* Shlib::getSymbol(const char* name) { void* sym = ::dlsym(handle, name); const char* error = ::dlerror(); if (error) - THROW_QPID_ERROR(INTERNAL_ERROR, error); + throw Exception(QPID_MSG(error)); return sym; } diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index c7a83df581..f0cc8cd5a5 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -21,7 +21,6 @@ #include "qpid/sys/Socket.h" -#include "qpid/QpidError.h" #include "check.h" #include "PrivatePosix.h" @@ -60,8 +59,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const result = ::getpeername(fd, (::sockaddr*)&name, &namelen); } - if (result < 0) - throw QPID_POSIX_ERROR(errno); + QPID_POSIX_CHECK(result); char servName[NI_MAXSERV]; char dispName[NI_MAXHOST]; diff --git a/cpp/src/qpid/sys/posix/check.cpp b/cpp/src/qpid/sys/posix/check.cpp deleted file mode 100644 index 408679caa8..0000000000 --- a/cpp/src/qpid/sys/posix/check.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <cerrno> -#include "check.h" - -namespace qpid { -namespace sys { - -std::string -PosixError::getMessage(int errNo) -{ - char buf[512]; - return std::string(strerror_r(errNo, buf, sizeof(buf))); -} - -PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw() - : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc) -{ } - -}} diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h index 7fa7b69d3b..f864bf8762 100644 --- a/cpp/src/qpid/sys/posix/check.h +++ b/cpp/src/qpid/sys/posix/check.h @@ -22,41 +22,13 @@ * */ -#include <cerrno> -#include <string> -#include "qpid/QpidError.h" - -namespace qpid { -namespace sys { - -/** - * Exception with message from errno. - */ -class PosixError : public qpid::QpidError -{ - public: - static std::string getMessage(int errNo); - - PosixError(int errNo, const qpid::SrcLine& location) throw(); - - ~PosixError() throw() {} - - int getErrNo() { return errNo; } +#include "qpid/Exception.h" - Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); } - - void throwSelf() const { throw *this; } - - private: - int errNo; -}; - -}} +#include <cerrno> -/** Create a PosixError for the current file/line and errno. */ -#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) +#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO)) << " " << ERRNO) -/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */ +/** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */ #define QPID_POSIX_CHECK(RESULT) \ if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) |