diff options
Diffstat (limited to 'cpp/lib/common')
-rw-r--r-- | cpp/lib/common/Exception.h | 11 | ||||
-rw-r--r-- | cpp/lib/common/Makefile.am | 4 | ||||
-rw-r--r-- | cpp/lib/common/framing/Requester.cpp | 12 | ||||
-rw-r--r-- | cpp/lib/common/sys/Acceptor.h | 4 | ||||
-rw-r--r-- | cpp/lib/common/sys/ConnectionInputHandler.h (renamed from cpp/lib/common/sys/SessionHandler.h) | 6 | ||||
-rw-r--r-- | cpp/lib/common/sys/ConnectionInputHandlerFactory.h (renamed from cpp/lib/common/sys/SessionHandlerFactory.h) | 14 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/APRAcceptor.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/LFSessionContext.cpp | 2 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/LFSessionContext.h | 6 | ||||
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannelAcceptor.cpp | 10 | ||||
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannelConnection.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannelConnection.h | 10 | ||||
-rw-r--r-- | cpp/lib/common/sys/posix/PosixAcceptor.cpp | 2 |
13 files changed, 52 insertions, 39 deletions
diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h index f35d427bb0..61bbc0ab5f 100644 --- a/cpp/lib/common/Exception.h +++ b/cpp/lib/common/Exception.h @@ -54,6 +54,17 @@ class Exception : public std::exception typedef boost::shared_ptr<Exception> shared_ptr; }; +struct ChannelException : public qpid::Exception { + u_int16_t code; + ChannelException(u_int16_t _code, std::string _text) + : Exception(_text), code(_code) {} +}; + +struct ConnectionException : public qpid::Exception { + u_int16_t code; + ConnectionException(u_int16_t _code, std::string _text) + : Exception(_text), code(_code) {} +}; } diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index 541145ac97..813c49135e 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -120,8 +120,8 @@ nobase_pkginclude_HEADERS = \ sys/Mutex.h \ sys/Runnable.h \ sys/SessionContext.h \ - sys/SessionHandler.h \ - sys/SessionHandlerFactory.h \ + sys/ConnectionInputHandler.h \ + sys/ConnectionInputHandlerFactory.h \ sys/ShutdownHandler.h \ sys/Socket.h \ sys/Thread.h \ diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp index 1dd3cd4ce9..7e1da505c6 100644 --- a/cpp/lib/common/framing/Requester.cpp +++ b/cpp/lib/common/framing/Requester.cpp @@ -33,13 +33,15 @@ void Requester::sending(AMQRequestBody::Data& request) { void Requester::processed(const AMQResponseBody::Data& response) { responseMark = response.responseId; RequestId id = response.requestId; - RequestId end = id + response.batchOffset; + RequestId end = id + response.batchOffset + 1; for ( ; id < end; ++id) { std::set<RequestId>::iterator i = requests.find(id); - if (i == requests.end()) - // TODO aconway 2007-01-12: Verify this is the right exception. - THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response."); - requests.erase(i); + if (i != requests.end()) + requests.erase(i); + else { + // FIXME aconway 2007-01-16: Uncomment exception when ids are propagating. +// THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response."); + } } } diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h index e6bc27a593..f571dcbddd 100644 --- a/cpp/lib/common/sys/Acceptor.h +++ b/cpp/lib/common/sys/Acceptor.h @@ -28,7 +28,7 @@ namespace qpid { namespace sys { -class SessionHandlerFactory; +class ConnectionInputHandlerFactory; class Acceptor : public qpid::SharedObject<Acceptor> { @@ -36,7 +36,7 @@ class Acceptor : public qpid::SharedObject<Acceptor> static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); virtual ~Acceptor() = 0; virtual int16_t getPort() const = 0; - virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0; + virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0; virtual void shutdown() = 0; }; diff --git a/cpp/lib/common/sys/SessionHandler.h b/cpp/lib/common/sys/ConnectionInputHandler.h index 76f79d421d..fa70dfaf48 100644 --- a/cpp/lib/common/sys/SessionHandler.h +++ b/cpp/lib/common/sys/ConnectionInputHandler.h @@ -18,8 +18,8 @@ * under the License. * */ -#ifndef _SessionHandler_ -#define _SessionHandler_ +#ifndef _ConnectionInputHandler_ +#define _ConnectionInputHandler_ #include <InputHandler.h> #include <InitiationHandler.h> @@ -29,7 +29,7 @@ namespace qpid { namespace sys { - class SessionHandler : + class ConnectionInputHandler : public qpid::framing::InitiationHandler, public qpid::framing::InputHandler, public TimeoutHandler diff --git a/cpp/lib/common/sys/SessionHandlerFactory.h b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h index 2a01aebcb0..5bb5e17704 100644 --- a/cpp/lib/common/sys/SessionHandlerFactory.h +++ b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h @@ -18,8 +18,8 @@ * under the License. * */ -#ifndef _SessionHandlerFactory_ -#define _SessionHandlerFactory_ +#ifndef _ConnectionInputHandlerFactory_ +#define _ConnectionInputHandlerFactory_ #include <boost/noncopyable.hpp> @@ -27,17 +27,17 @@ namespace qpid { namespace sys { class SessionContext; -class SessionHandler; +class ConnectionInputHandler; /** * Callback interface used by the Acceptor to - * create a SessionHandler for each new connection. + * create a ConnectionInputHandler for each new connection. */ -class SessionHandlerFactory : private boost::noncopyable +class ConnectionInputHandlerFactory : private boost::noncopyable { public: - virtual SessionHandler* create(SessionContext* ctxt) = 0; - virtual ~SessionHandlerFactory(){} + virtual ConnectionInputHandler* create(SessionContext* ctxt) = 0; + virtual ~ConnectionInputHandlerFactory(){} }; }} diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp index 6853833797..10f787f4fe 100644 --- a/cpp/lib/common/sys/apr/APRAcceptor.cpp +++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp @@ -19,7 +19,7 @@ * */ #include <sys/Acceptor.h> -#include <sys/SessionHandlerFactory.h> +#include <sys/ConnectionInputHandlerFactory.h> #include "LFProcessor.h" #include "LFSessionContext.h" #include "APRBase.h" @@ -33,7 +33,7 @@ class APRAcceptor : public Acceptor public: APRAcceptor(int16_t port, int backlog, int threads, bool trace); virtual int16_t getPort() const; - virtual void run(qpid::sys::SessionHandlerFactory* factory); + virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory); virtual void shutdown(); private: @@ -75,7 +75,7 @@ int16_t APRAcceptor::getPort() const { return address->port; } -void APRAcceptor::run(SessionHandlerFactory* factory) { +void APRAcceptor::run(ConnectionInputHandlerFactory* factory) { running = true; processor.start(); std::cout << "Listening on port " << getPort() << "..." << std::endl; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp index 7fb8d5a91b..43fc3de3dd 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.cpp +++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp @@ -160,7 +160,7 @@ void LFSessionContext::shutdown(){ handleClose(); } -void LFSessionContext::init(SessionHandler* _handler){ +void LFSessionContext::init(ConnectionInputHandler* _handler){ handler = _handler; processor->add(&fd); } diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h index 9483cbe590..8cf50b87ba 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.h +++ b/cpp/lib/common/sys/apr/LFSessionContext.h @@ -31,7 +31,7 @@ #include <Buffer.h> #include <sys/Monitor.h> #include <sys/SessionContext.h> -#include <sys/SessionHandler.h> +#include <sys/ConnectionInputHandler.h> #include "APRSocket.h" #include "LFProcessor.h" @@ -49,7 +49,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext qpid::framing::Buffer in; qpid::framing::Buffer out; - qpid::sys::SessionHandler* handler; + qpid::sys::ConnectionInputHandler* handler; LFProcessor* const processor; apr_pollfd_t fd; @@ -74,7 +74,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext virtual void close(); void read(); void write(); - void init(qpid::sys::SessionHandler* handler); + void init(qpid::sys::ConnectionInputHandler* handler); void startProcessing(); void stopProcessing(); void handleClose(); diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp index 7cd6f60902..787d12d6d1 100644 --- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp @@ -27,8 +27,8 @@ #include <boost/scoped_ptr.hpp> #include <sys/SessionContext.h> -#include <sys/SessionHandler.h> -#include <sys/SessionHandlerFactory.h> +#include <sys/ConnectionInputHandler.h> +#include <sys/ConnectionInputHandlerFactory.h> #include <sys/Acceptor.h> #include <sys/Socket.h> #include <framing/Buffer.h> @@ -53,7 +53,7 @@ class EventChannelAcceptor : public Acceptor { int getPort() const; - void run(SessionHandlerFactory& factory); + void run(ConnectionInputHandlerFactory& factory); void shutdown(); @@ -68,7 +68,7 @@ class EventChannelAcceptor : public Acceptor { bool isRunning; boost::ptr_vector<EventChannelConnection> connections; AcceptEvent acceptEvent; - SessionHandlerFactory* factory; + ConnectionInputHandlerFactory* factory; bool isShutdown; EventChannelThreads::shared_ptr threads; }; @@ -100,7 +100,7 @@ int EventChannelAcceptor::getPort() const { return port; // Immutable no need for lock. } -void EventChannelAcceptor::run(SessionHandlerFactory& f) { +void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) { { Mutex::ScopedLock l(lock); if (!isRunning && !isShutdown) { diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/cpp/lib/common/sys/posix/EventChannelConnection.cpp index 196dde5af8..4449dc3035 100644 --- a/cpp/lib/common/sys/posix/EventChannelConnection.cpp +++ b/cpp/lib/common/sys/posix/EventChannelConnection.cpp @@ -22,7 +22,7 @@ #include <boost/assert.hpp> #include "EventChannelConnection.h" -#include "sys/SessionHandlerFactory.h" +#include "sys/ConnectionInputHandlerFactory.h" #include "QpidError.h" using namespace std; @@ -36,7 +36,7 @@ const size_t EventChannelConnection::bufferSize = 65536; EventChannelConnection::EventChannelConnection( EventChannelThreads::shared_ptr threads_, - SessionHandlerFactory& factory_, + ConnectionInputHandlerFactory& factory_, int rfd, int wfd, bool isTrace_ diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h index bace045993..1504e92651 100644 --- a/cpp/lib/common/sys/posix/EventChannelConnection.h +++ b/cpp/lib/common/sys/posix/EventChannelConnection.h @@ -24,17 +24,17 @@ #include "EventChannelThreads.h" #include "sys/Monitor.h" #include "sys/SessionContext.h" -#include "sys/SessionHandler.h" +#include "sys/ConnectionInputHandler.h" #include "sys/AtomicCount.h" #include "framing/AMQFrame.h" namespace qpid { namespace sys { -class SessionHandlerFactory; +class ConnectionInputHandlerFactory; /** - * Implements SessionContext and delegates to a SessionHandler + * Implements SessionContext and delegates to a ConnectionInputHandler * for a connection via the EventChannel. *@param readDescriptor file descriptor for reading. *@param writeDescriptor file descriptor for writing, @@ -44,7 +44,7 @@ class EventChannelConnection : public SessionContext { public: EventChannelConnection( EventChannelThreads::shared_ptr threads, - SessionHandlerFactory& factory, + ConnectionInputHandlerFactory& factory, int readDescriptor, int writeDescriptor = 0, bool isTrace = false @@ -86,7 +86,7 @@ class EventChannelConnection : public SessionContext { AtomicCount busyThreads; EventChannelThreads::shared_ptr threads; - std::auto_ptr<SessionHandler> handler; + std::auto_ptr<ConnectionInputHandler> handler; qpid::framing::Buffer in, out; FrameQueue writeFrames; bool isTrace; diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp index 842aa76f36..a80a6c61f7 100644 --- a/cpp/lib/common/sys/posix/PosixAcceptor.cpp +++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp @@ -32,7 +32,7 @@ void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } class PosixAcceptor : public Acceptor { public: virtual int16_t getPort() const { fail(); return 0; } - virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); } + virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); } virtual void shutdown() { fail(); } }; |