summaryrefslogtreecommitdiff
path: root/cpp/lib/common
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/common')
-rw-r--r--cpp/lib/common/Exception.cpp46
-rw-r--r--cpp/lib/common/Exception.h97
-rw-r--r--cpp/lib/common/ExceptionHolder.cpp32
-rw-r--r--cpp/lib/common/ExceptionHolder.h67
-rw-r--r--cpp/lib/common/Makefile.am142
-rw-r--r--cpp/lib/common/QpidError.cpp42
-rw-r--r--cpp/lib/common/QpidError.h78
-rw-r--r--cpp/lib/common/SharedObject.h55
-rw-r--r--cpp/lib/common/doxygen_mainpage.h45
-rw-r--r--cpp/lib/common/framing/AMQBody.cpp33
-rw-r--r--cpp/lib/common/framing/AMQBody.h59
-rw-r--r--cpp/lib/common/framing/AMQContentBody.cpp43
-rw-r--r--cpp/lib/common/framing/AMQContentBody.h53
-rw-r--r--cpp/lib/common/framing/AMQDataBlock.h42
-rw-r--r--cpp/lib/common/framing/AMQFrame.cpp139
-rw-r--r--cpp/lib/common/framing/AMQFrame.h78
-rw-r--r--cpp/lib/common/framing/AMQHeaderBody.cpp75
-rw-r--r--cpp/lib/common/framing/AMQHeaderBody.h60
-rw-r--r--cpp/lib/common/framing/AMQHeartbeatBody.cpp29
-rw-r--r--cpp/lib/common/framing/AMQHeartbeatBody.h47
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.cpp59
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.h84
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.cpp66
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.h78
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.cpp65
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.h85
-rw-r--r--cpp/lib/common/framing/BasicHeaderProperties.cpp103
-rw-r--r--cpp/lib/common/framing/BasicHeaderProperties.h97
-rw-r--r--cpp/lib/common/framing/BodyHandler.cpp60
-rw-r--r--cpp/lib/common/framing/BodyHandler.h61
-rw-r--r--cpp/lib/common/framing/Buffer.cpp183
-rw-r--r--cpp/lib/common/framing/Buffer.h86
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.cpp99
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.h105
-rw-r--r--cpp/lib/common/framing/FieldTable.cpp150
-rw-r--r--cpp/lib/common/framing/FieldTable.h90
-rw-r--r--cpp/lib/common/framing/FramingContent.cpp75
-rw-r--r--cpp/lib/common/framing/FramingContent.h40
-rw-r--r--cpp/lib/common/framing/HeaderProperties.h46
-rw-r--r--cpp/lib/common/framing/InitiationHandler.cpp24
-rw-r--r--cpp/lib/common/framing/InitiationHandler.h41
-rw-r--r--cpp/lib/common/framing/InputHandler.h39
-rw-r--r--cpp/lib/common/framing/MethodContext.cpp31
-rw-r--r--cpp/lib/common/framing/MethodContext.h80
-rw-r--r--cpp/lib/common/framing/OutputHandler.h39
-rw-r--r--cpp/lib/common/framing/ProtocolInitiation.cpp63
-rw-r--r--cpp/lib/common/framing/ProtocolInitiation.h54
-rw-r--r--cpp/lib/common/framing/ProtocolVersion.cpp44
-rw-r--r--cpp/lib/common/framing/ProtocolVersion.h57
-rw-r--r--cpp/lib/common/framing/ProtocolVersionException.cpp33
-rw-r--r--cpp/lib/common/framing/ProtocolVersionException.h56
-rw-r--r--cpp/lib/common/framing/Proxy.cpp32
-rw-r--r--cpp/lib/common/framing/Proxy.h51
-rw-r--r--cpp/lib/common/framing/Requester.cpp40
-rw-r--r--cpp/lib/common/framing/Requester.h67
-rw-r--r--cpp/lib/common/framing/Responder.cpp43
-rw-r--r--cpp/lib/common/framing/Responder.h61
-rw-r--r--cpp/lib/common/framing/Value.cpp122
-rw-r--r--cpp/lib/common/framing/Value.h171
-rw-r--r--cpp/lib/common/framing/amqp_framing.h36
-rw-r--r--cpp/lib/common/framing/amqp_types.h57
-rw-r--r--cpp/lib/common/framing/amqp_types_full.h36
-rw-r--r--cpp/lib/common/sys/Acceptor.h47
-rw-r--r--cpp/lib/common/sys/AtomicCount.h53
-rw-r--r--cpp/lib/common/sys/Condition.h128
-rw-r--r--cpp/lib/common/sys/ConnectionInputHandler.h45
-rw-r--r--cpp/lib/common/sys/ConnectionInputHandlerFactory.h46
-rw-r--r--cpp/lib/common/sys/ConnectionOutputHandler.h41
-rw-r--r--cpp/lib/common/sys/Module.h161
-rw-r--r--cpp/lib/common/sys/Monitor.h55
-rw-r--r--cpp/lib/common/sys/Mutex.h165
-rw-r--r--cpp/lib/common/sys/ProducerConsumer.cpp141
-rw-r--r--cpp/lib/common/sys/ProducerConsumer.h165
-rw-r--r--cpp/lib/common/sys/Runnable.cpp32
-rw-r--r--cpp/lib/common/sys/Runnable.h50
-rw-r--r--cpp/lib/common/sys/ScopedIncrement.h59
-rw-r--r--cpp/lib/common/sys/ShutdownHandler.h37
-rw-r--r--cpp/lib/common/sys/Socket.h88
-rw-r--r--cpp/lib/common/sys/Thread.h142
-rw-r--r--cpp/lib/common/sys/ThreadSafeQueue.h98
-rw-r--r--cpp/lib/common/sys/Time.cpp60
-rw-r--r--cpp/lib/common/sys/Time.h58
-rw-r--r--cpp/lib/common/sys/TimeoutHandler.h39
-rw-r--r--cpp/lib/common/sys/apr/APRAcceptor.cpp122
-rw-r--r--cpp/lib/common/sys/apr/APRBase.cpp90
-rw-r--r--cpp/lib/common/sys/apr/APRBase.h78
-rw-r--r--cpp/lib/common/sys/apr/APRPool.cpp41
-rw-r--r--cpp/lib/common/sys/apr/APRPool.h50
-rw-r--r--cpp/lib/common/sys/apr/APRSocket.cpp78
-rw-r--r--cpp/lib/common/sys/apr/APRSocket.h48
-rw-r--r--cpp/lib/common/sys/apr/LFProcessor.cpp179
-rw-r--r--cpp/lib/common/sys/apr/LFProcessor.h121
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.cpp179
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.h90
-rw-r--r--cpp/lib/common/sys/apr/Socket.cpp86
-rw-r--r--cpp/lib/common/sys/apr/Thread.cpp33
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.cpp325
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.h176
-rw-r--r--cpp/lib/common/sys/posix/EventChannelAcceptor.cpp149
-rw-r--r--cpp/lib/common/sys/posix/EventChannelConnection.cpp229
-rw-r--r--cpp/lib/common/sys/posix/EventChannelConnection.h102
-rw-r--r--cpp/lib/common/sys/posix/EventChannelThreads.cpp119
-rw-r--r--cpp/lib/common/sys/posix/EventChannelThreads.h92
-rw-r--r--cpp/lib/common/sys/posix/PosixAcceptor.cpp48
-rw-r--r--cpp/lib/common/sys/posix/Socket.cpp118
-rw-r--r--cpp/lib/common/sys/posix/Thread.cpp28
-rw-r--r--cpp/lib/common/sys/posix/check.cpp39
-rw-r--r--cpp/lib/common/sys/posix/check.h62
108 files changed, 8563 insertions, 0 deletions
diff --git a/cpp/lib/common/Exception.cpp b/cpp/lib/common/Exception.cpp
new file mode 100644
index 0000000000..f7d91498e0
--- /dev/null
+++ b/cpp/lib/common/Exception.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <Exception.h>
+
+namespace qpid {
+
+Exception::Exception() throw() {}
+
+Exception::Exception(const std::string& str) throw() : whatStr(str) {}
+
+Exception::Exception(const char* str) throw() : whatStr(str) {}
+
+Exception::~Exception() throw() {}
+
+const char* Exception::what() const throw() { return whatStr.c_str(); }
+
+std::string Exception::toString() const throw() { return whatStr; }
+
+Exception* Exception::clone() const throw() { return new Exception(*this); }
+
+void Exception::throwSelf() const { throw *this; }
+
+ShutdownException::ShutdownException() : Exception("Shut down.") {}
+
+EmptyException::EmptyException() : Exception("Empty.") {}
+
+} // namespace qpid
diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h
new file mode 100644
index 0000000000..b8cd68cf8c
--- /dev/null
+++ b/cpp/lib/common/Exception.h
@@ -0,0 +1,97 @@
+#ifndef _Exception_
+#define _Exception_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <exception>
+#include <string>
+#include <memory>
+#include <boost/shared_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+
+#include "amqp_types.h"
+
+namespace qpid
+{
+/**
+ * Exception base class for all Qpid exceptions.
+ */
+class Exception : public std::exception
+{
+ protected:
+ std::string whatStr;
+
+ public:
+ Exception() throw();
+ Exception(const std::string& str) throw();
+ Exception(const char* str) throw();
+ Exception(const std::exception&) throw();
+
+ /** Allow any type that has ostream operator<< to act as message */
+ template <class T>
+ Exception(const T& message)
+ : whatStr(boost::lexical_cast<std::string>(message)) {}
+
+ virtual ~Exception() throw();
+
+ virtual const char* what() const throw();
+ virtual std::string toString() const throw();
+
+ virtual Exception* clone() const throw();
+ virtual void throwSelf() const;
+
+ typedef boost::shared_ptr<Exception> shared_ptr;
+};
+
+struct ChannelException : public Exception {
+ framing::ReplyCode code;
+ template <class T>
+ ChannelException(framing::ReplyCode code_, const T& message)
+ : Exception(message), code(code_) {}
+ void throwSelf() const { throw *this; }
+};
+
+struct ConnectionException : public Exception {
+ framing::ReplyCode code;
+ template <class T>
+ ConnectionException(framing::ReplyCode code_, const T& message)
+ : Exception(message), code(code_) {}
+ void throwSelf() const { throw *this; }
+};
+
+/**
+ * Exception used to indicate that a thread should shut down.
+ * Does not indicate an error that should be signalled to the user.
+ */
+struct ShutdownException : public Exception {
+ ShutdownException();
+ void throwSelf() const { throw *this; }
+};
+
+/** Exception to indicate empty queue or other empty state */
+struct EmptyException : public Exception {
+ EmptyException();
+ void throwSelf() const { throw *this; }
+};
+
+}
+
+#endif /*!_Exception_*/
diff --git a/cpp/lib/common/ExceptionHolder.cpp b/cpp/lib/common/ExceptionHolder.cpp
new file mode 100644
index 0000000000..de8d7b2487
--- /dev/null
+++ b/cpp/lib/common/ExceptionHolder.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ExceptionHolder.h"
+
+namespace qpid {
+
+ExceptionHolder::ExceptionHolder(const std::exception& e) {
+ const Exception* ex = dynamic_cast<const Exception*>(&e);
+ if (ex) {
+ reset(ex->clone());
+ } else {
+ reset(new Exception(e.what()));
+ }
+}
+
+}
diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h
new file mode 100644
index 0000000000..2769455aba
--- /dev/null
+++ b/cpp/lib/common/ExceptionHolder.h
@@ -0,0 +1,67 @@
+#ifndef _qpid_ExceptionHolder_h
+#define _qpid_ExceptionHolder_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <assert.h>
+#include <Exception.h>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+// FIXME aconway 2007-02-20: Not necessary, a simple
+// Exception::shared_ptr will do the job. Remove
+//
+/**
+ * Holder for a heap-allocated exc eption that can be stack allocated
+ * and thrown safely.
+ *
+ * Basically this is a shared_ptr with the Exception functions added
+ * so the catcher need not be aware that it is a pointer rather than a
+ * reference.
+ *
+ * shared_ptr is chosen over auto_ptr because it has normal
+ * copy semantics.
+ */
+class ExceptionHolder : public Exception, public boost::shared_ptr<Exception>
+{
+ public:
+ typedef boost::shared_ptr<Exception> shared_ptr;
+
+ ExceptionHolder() throw() {}
+ ExceptionHolder(Exception* p) throw() : shared_ptr(p) {}
+ ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {}
+
+ ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {}
+ ExceptionHolder(const std::exception& e);
+
+ ~ExceptionHolder() throw() {}
+
+ const char* what() const throw() { return get()->what(); }
+ std::string toString() const throw() { return get()->toString(); }
+ Exception* clone() const throw() { return get()->clone(); }
+ void throwIf() const { if (get()) get()->throwSelf(); }
+ void throwSelf() const { assert(get()); get()->throwSelf(); }
+};
+
+} // namespace qpid
+
+
+
+#endif /*!_qpid_ExceptionHolder_h*/
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
new file mode 100644
index 0000000000..d70adf1186
--- /dev/null
+++ b/cpp/lib/common/Makefile.am
@@ -0,0 +1,142 @@
+AM_CXXFLAGS = $(WARNING_CFLAGS)
+
+INCLUDES = \
+ -I$(top_srcdir)/gen \
+ -I$(top_srcdir)/lib/common/sys \
+ -I$(top_srcdir)/lib/common/framing \
+ $(APR_CXXFLAGS)
+
+apr = sys/apr
+apr_src = \
+ $(apr)/APRAcceptor.cpp \
+ $(apr)/APRBase.cpp \
+ $(apr)/APRPool.cpp \
+ $(apr)/APRSocket.cpp \
+ $(apr)/LFProcessor.cpp \
+ $(apr)/LFSessionContext.cpp \
+ $(apr)/Socket.cpp \
+ $(apr)/Thread.cpp
+apr_hdr = \
+ $(apr)/APRBase.h \
+ $(apr)/APRPool.h \
+ $(apr)/APRSocket.h \
+ $(apr)/LFProcessor.h \
+ $(apr)/LFSessionContext.h
+
+posix = sys/posix
+posix_src = \
+ $(posix)/PosixAcceptor.cpp \
+ $(posix)/Socket.cpp \
+ $(posix)/Thread.cpp \
+ $(posix)/check.cpp \
+ $(posix)/EventChannel.cpp \
+ $(posix)/EventChannelThreads.cpp
+posix_hdr = \
+ $(posix)/check.h \
+ $(posix)/EventChannel.h \
+ $(posix)/EventChannelThreads.h
+
+EXTRA_DIST=$(posix_src) $(posix_hdr)
+platform_src = $(apr_src)
+platform_hdr = $(apr_hdr)
+
+framing = framing
+gen = $(srcdir)/../../gen
+
+lib_LTLIBRARIES = libqpidcommon.la
+libqpidcommon_la_LIBADD = \
+ $(APR_LIBS) \
+ $(LIB_DLOPEN) \
+ $(LIB_CLOCK_GETTIME)
+
+libqpidcommon_la_LDFLAGS = \
+ -version-info \
+ $(LIBTOOL_VERSION_INFO_ARG)
+
+libqpidcommon_la_SOURCES = \
+ $(platform_src) \
+ $(framing)/AMQBody.cpp \
+ $(framing)/AMQRequestBody.cpp \
+ $(framing)/AMQResponseBody.cpp \
+ $(framing)/AMQContentBody.cpp \
+ $(framing)/AMQFrame.cpp \
+ $(framing)/AMQHeaderBody.cpp \
+ $(framing)/AMQHeartbeatBody.cpp \
+ $(framing)/AMQMethodBody.cpp \
+ $(framing)/MethodContext.cpp \
+ $(framing)/BasicHeaderProperties.cpp \
+ $(framing)/BodyHandler.cpp \
+ $(framing)/ChannelAdapter.cpp \
+ $(framing)/Buffer.cpp \
+ $(framing)/FieldTable.cpp \
+ $(framing)/FramingContent.cpp \
+ $(framing)/InitiationHandler.cpp \
+ $(framing)/ProtocolInitiation.cpp \
+ $(framing)/ProtocolVersion.cpp \
+ $(framing)/ProtocolVersionException.cpp \
+ $(framing)/Requester.cpp \
+ $(framing)/Responder.cpp \
+ $(framing)/Value.cpp \
+ $(framing)/Proxy.cpp \
+ $(gen)/AMQP_ClientProxy.cpp \
+ $(gen)/AMQP_HighestVersion.h \
+ $(gen)/AMQP_MethodVersionMap.cpp \
+ $(gen)/AMQP_ServerProxy.cpp \
+ Exception.cpp \
+ ExceptionHolder.cpp \
+ QpidError.cpp \
+ sys/Runnable.cpp \
+ sys/Time.cpp \
+ sys/ProducerConsumer.cpp
+
+nobase_pkginclude_HEADERS = \
+ $(gen)/AMQP_HighestVersion.h \
+ $(platform_hdr) \
+ $(framing)/AMQBody.h \
+ $(framing)/AMQContentBody.h \
+ $(framing)/AMQDataBlock.h \
+ $(framing)/AMQFrame.h \
+ $(framing)/AMQHeaderBody.h \
+ $(framing)/AMQHeartbeatBody.h \
+ $(framing)/AMQMethodBody.h \
+ $(framing)/MethodContext.h \
+ $(framing)/BasicHeaderProperties.h \
+ $(framing)/BodyHandler.h \
+ $(framing)/ChannelAdapter.h \
+ $(framing)/Buffer.h \
+ $(framing)/FieldTable.h \
+ $(framing)/FramingContent.h \
+ $(framing)/HeaderProperties.h \
+ $(framing)/InitiationHandler.h \
+ $(framing)/InputHandler.h \
+ $(framing)/OutputHandler.h \
+ $(framing)/ProtocolInitiation.h \
+ $(framing)/ProtocolVersion.h \
+ $(framing)/ProtocolVersionException.h \
+ $(framing)/Value.h \
+ $(framing)/amqp_framing.h \
+ $(framing)/amqp_types.h \
+ $(framing)/Proxy.h \
+ Exception.h \
+ ExceptionHolder.h \
+ QpidError.h \
+ SharedObject.h \
+ sys/Acceptor.h \
+ sys/AtomicCount.h \
+ sys/Module.h \
+ sys/Monitor.h \
+ sys/Mutex.h \
+ sys/Runnable.h \
+ sys/ConnectionOutputHandler.h \
+ sys/ConnectionInputHandler.h \
+ sys/ConnectionInputHandlerFactory.h \
+ sys/ShutdownHandler.h \
+ sys/Socket.h \
+ sys/Thread.h \
+ sys/Time.h \
+ sys/TimeoutHandler.h \
+ sys/ProducerConsumer.h
+
+
+# Force build during dist phase so help2man will work.
+dist-hook: $(lib_LTLIBRARIES)
diff --git a/cpp/lib/common/QpidError.cpp b/cpp/lib/common/QpidError.cpp
new file mode 100644
index 0000000000..9cd3051d1e
--- /dev/null
+++ b/cpp/lib/common/QpidError.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/format.hpp>
+
+#include <QpidError.h>
+#include <sstream>
+
+using namespace qpid;
+
+QpidError::QpidError() : code(0) {}
+
+QpidError::~QpidError() throw() {}
+
+Exception* QpidError::clone() const throw() { return new QpidError(*this); }
+
+void QpidError::throwSelf() const { throw *this; }
+
+void QpidError::init() {
+ whatStr = boost::str(boost::format("Error [%d] %s (%s:%d)")
+ % code % msg % loc.file % loc.line);
+}
+
+
diff --git a/cpp/lib/common/QpidError.h b/cpp/lib/common/QpidError.h
new file mode 100644
index 0000000000..2a0395ab79
--- /dev/null
+++ b/cpp/lib/common/QpidError.h
@@ -0,0 +1,78 @@
+#ifndef __QpidError__
+#define __QpidError__
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <memory>
+#include <ostream>
+
+#include <Exception.h>
+
+namespace qpid {
+
+struct SrcLine {
+ public:
+ SrcLine(const std::string& file_="", int line_=0) :
+ file(file_), line(line_) {}
+
+ std::string file;
+ int line;
+};
+
+class QpidError : public Exception
+{
+ public:
+ const int code;
+ SrcLine loc;
+ std::string msg;
+
+ QpidError();
+
+ template <class T>
+ QpidError(int code_, const T& msg_, const SrcLine& loc_) throw()
+ : code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_))
+ { init(); }
+
+ ~QpidError() throw();
+ Exception* clone() const throw();
+ void throwSelf() const;
+
+ private:
+
+ void init();
+};
+
+
+} // namespace qpid
+
+#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__)
+
+#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE)
+
+#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE)
+
+const int PROTOCOL_ERROR = 10000;
+const int APR_ERROR = 20000;
+const int FRAMING_ERROR = 30000;
+const int CLIENT_ERROR = 40000;
+const int INTERNAL_ERROR = 50000;
+
+#endif
diff --git a/cpp/lib/common/SharedObject.h b/cpp/lib/common/SharedObject.h
new file mode 100644
index 0000000000..852a036ab9
--- /dev/null
+++ b/cpp/lib/common/SharedObject.h
@@ -0,0 +1,55 @@
+#ifndef _SharedObject_
+#define _SharedObject_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+ /**
+ * Template to enforce shared object conventions.
+ * Shared object classes should inherit : public qpid::SharedObject
+ * That ensures Foo:
+ * - has typedef boost::shared_ptr<T> shared_ptr
+ * - has virtual destructor
+ * - is boost::noncopyable (no default copy or assign)
+ * - has a protected default constructor.
+ *
+ * Shared objects should not have public constructors.
+ * Make constructors protected and provide public statc create()
+ * functions that return a shared_ptr.
+ */
+ template <class T>
+ class SharedObject : private boost::noncopyable
+ {
+ public:
+ typedef boost::shared_ptr<T> shared_ptr;
+
+ virtual ~SharedObject() {};
+
+ protected:
+ SharedObject() {}
+ };
+}
+
+#endif /*!_SharedObject_*/
diff --git a/cpp/lib/common/doxygen_mainpage.h b/cpp/lib/common/doxygen_mainpage.h
new file mode 100644
index 0000000000..b354238cd0
--- /dev/null
+++ b/cpp/lib/common/doxygen_mainpage.h
@@ -0,0 +1,45 @@
+// This header file is just for doxygen documentation purposes.
+
+/*!\mainpage Qpid C++ Developer Kit.
+ *
+ *\section intro_sec Introduction
+ *
+ * The <a href=http://incubator.apache.org/qpid/index.html>Qpid project</a> provides implementations of the <a href="http://amqp.org/">AMQP messaging specification</a> in several programming language.
+ *
+ * Qpidc provides APIs and libraries to implement AMQP
+ * clients in C++. Qpidc clients can interact with any compliant AMQP
+ * message broker. The Qpid project also provides an AMQP broker
+ * daemon called qpidd that you can use with your qpidc clients.
+ *
+ *\section install_sec Installation
+ *
+ * If you are installing from the source distribution
+ <pre>
+ > ./configure && make
+ > make install </pre>
+ * This will build and install the client development kit and the broker
+ * in standard places. Use
+ * <code>./configure --help</code> for more options.
+ *
+ * You can also install from RPMs with the <code>rpm -i</code> command.
+ * You will need
+ * - <code>qpidc</code> for core libraries.
+ * - <code>qpidc-devel</code> for header files and developer documentation.
+ * - <code>qpidd</code> for the broker daemon.
+ *
+ *\section getstart_sec Getting Started
+ *
+ * If you have installed in the standard places you should use
+ * these compile flags:
+ *
+ *<code> -I/usr/include/qpidc -I/usr/include/qpidc/framing -I/usr/include/qpidc/sys</code>
+ *
+ * and these link flags:
+ *
+ *<code> -lqpidcommon -lqpidclient</code>
+ *
+ * If you have installed somewhere else you should modify the flags
+ * appropriately.
+ *
+ * See the \ref clientapi "client API module" for more on the client API.
+ */
diff --git a/cpp/lib/common/framing/AMQBody.cpp b/cpp/lib/common/framing/AMQBody.cpp
new file mode 100644
index 0000000000..c7c253beda
--- /dev/null
+++ b/cpp/lib/common/framing/AMQBody.cpp
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <AMQBody.h>
+#include <iostream>
+
+std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::AMQBody& body)
+{
+ body.print(out);
+ return out;
+}
+
+qpid::framing::AMQBody::~AMQBody() {}
+
+
diff --git a/cpp/lib/common/framing/AMQBody.h b/cpp/lib/common/framing/AMQBody.h
new file mode 100644
index 0000000000..26076956ca
--- /dev/null
+++ b/cpp/lib/common/framing/AMQBody.h
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include <amqp_types.h>
+#include <Buffer.h>
+
+#ifndef _AMQBody_
+#define _AMQBody_
+
+namespace qpid {
+ namespace framing {
+
+ class AMQBody
+ {
+ public:
+ typedef boost::shared_ptr<AMQBody> shared_ptr;
+
+ virtual ~AMQBody();
+ virtual uint32_t size() const = 0;
+ virtual uint8_t type() const = 0;
+ virtual void encode(Buffer& buffer) const = 0;
+ virtual void decode(Buffer& buffer, uint32_t size) = 0;
+
+ virtual void print(std::ostream& out) const = 0;
+ };
+
+ std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
+
+ enum BodyTypes {
+ METHOD_BODY = 1,
+ HEADER_BODY = 2,
+ CONTENT_BODY = 3,
+ HEARTBEAT_BODY = 8,
+ REQUEST_BODY = 9,
+ RESPONSE_BODY = 10
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQContentBody.cpp b/cpp/lib/common/framing/AMQContentBody.cpp
new file mode 100644
index 0000000000..573c17dade
--- /dev/null
+++ b/cpp/lib/common/framing/AMQContentBody.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AMQContentBody.h>
+#include <iostream>
+
+qpid::framing::AMQContentBody::AMQContentBody(){
+}
+
+qpid::framing::AMQContentBody::AMQContentBody(const string& _data) : data(_data){
+}
+
+uint32_t qpid::framing::AMQContentBody::size() const{
+ return data.size();
+}
+void qpid::framing::AMQContentBody::encode(Buffer& buffer) const{
+ buffer.putRawData(data);
+}
+void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){
+ buffer.getRawData(data, _size);
+}
+
+void qpid::framing::AMQContentBody::print(std::ostream& out) const
+{
+ out << "content (" << size() << " bytes)";
+}
diff --git a/cpp/lib/common/framing/AMQContentBody.h b/cpp/lib/common/framing/AMQContentBody.h
new file mode 100644
index 0000000000..c9fa7cde5c
--- /dev/null
+++ b/cpp/lib/common/framing/AMQContentBody.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <Buffer.h>
+
+#ifndef _AMQContentBody_
+#define _AMQContentBody_
+
+namespace qpid {
+namespace framing {
+
+class AMQContentBody : public AMQBody
+{
+ string data;
+
+public:
+ typedef boost::shared_ptr<AMQContentBody> shared_ptr;
+
+ AMQContentBody();
+ AMQContentBody(const string& data);
+ inline virtual ~AMQContentBody(){}
+ inline uint8_t type() const { return CONTENT_BODY; };
+ inline string& getData(){ return data; }
+ uint32_t size() const;
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer, uint32_t size);
+ void print(std::ostream& out) const;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQDataBlock.h b/cpp/lib/common/framing/AMQDataBlock.h
new file mode 100644
index 0000000000..2a6f843f1e
--- /dev/null
+++ b/cpp/lib/common/framing/AMQDataBlock.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Buffer.h>
+
+#ifndef _AMQDataBlock_
+#define _AMQDataBlock_
+
+namespace qpid {
+namespace framing {
+
+class AMQDataBlock
+{
+public:
+ virtual ~AMQDataBlock() {}
+ virtual void encode(Buffer& buffer) = 0;
+ virtual bool decode(Buffer& buffer) = 0;
+ virtual uint32_t size() const = 0;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp
new file mode 100644
index 0000000000..bc9061b169
--- /dev/null
+++ b/cpp/lib/common/framing/AMQFrame.cpp
@@ -0,0 +1,139 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/format.hpp>
+
+#include <AMQFrame.h>
+#include <QpidError.h>
+#include "AMQRequestBody.h"
+#include "AMQResponseBody.h"
+
+
+namespace qpid {
+namespace framing {
+
+
+AMQP_MethodVersionMap AMQFrame::versionMap;
+
+AMQFrame::AMQFrame(ProtocolVersion _version):
+version(_version)
+ {
+ assert(version != ProtocolVersion(0,0));
+ }
+
+AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) :
+version(_version), channel(_channel), body(_body)
+{}
+
+AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) :
+version(_version), channel(_channel), body(_body)
+{}
+
+AMQFrame::~AMQFrame() {}
+
+uint16_t AMQFrame::getChannel(){
+ return channel;
+}
+
+AMQBody::shared_ptr AMQFrame::getBody(){
+ return body;
+}
+
+void AMQFrame::encode(Buffer& buffer)
+{
+ buffer.putOctet(body->type());
+ buffer.putShort(channel);
+ buffer.putLong(body->size());
+ body->encode(buffer);
+ buffer.putOctet(0xCE);
+}
+
+uint32_t AMQFrame::size() const{
+ assert(body.get());
+ return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size()
+ + 1/*0xCE*/;
+}
+
+bool AMQFrame::decode(Buffer& buffer)
+{
+ if(buffer.available() < 7)
+ return false;
+ buffer.record();
+ uint32_t frameSize = decodeHead(buffer);
+ if(buffer.available() < frameSize + 1){
+ buffer.restore();
+ return false;
+ }
+ decodeBody(buffer, frameSize);
+ uint8_t end = buffer.getOctet();
+ if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
+ return true;
+}
+
+uint32_t AMQFrame::decodeHead(Buffer& buffer){
+ type = buffer.getOctet();
+ channel = buffer.getShort();
+ return buffer.getLong();
+}
+
+void AMQFrame::decodeBody(Buffer& buffer, uint32_t size)
+{
+ switch(type)
+ {
+ case METHOD_BODY:
+ body = AMQMethodBody::create(versionMap, version, buffer);
+ break;
+ case REQUEST_BODY:
+ body = AMQRequestBody::create(versionMap, version, buffer);
+ break;
+ case RESPONSE_BODY:
+ body = AMQResponseBody::create(versionMap, version, buffer);
+ break;
+ case HEADER_BODY:
+ body = AMQBody::shared_ptr(new AMQHeaderBody());
+ break;
+ case CONTENT_BODY:
+ body = AMQBody::shared_ptr(new AMQContentBody());
+ break;
+ case HEARTBEAT_BODY:
+ body = AMQBody::shared_ptr(new AMQHeartbeatBody());
+ break;
+ default:
+ THROW_QPID_ERROR(
+ FRAMING_ERROR,
+ boost::format("Unknown frame type %d") % type);
+ }
+ body->decode(buffer, size);
+}
+
+std::ostream& operator<<(std::ostream& out, const AMQFrame& t)
+{
+ out << "Frame[channel=" << t.channel << "; ";
+ if (t.body.get() == 0)
+ out << "empty";
+ else
+ out << *t.body;
+ out << "]";
+ return out;
+}
+
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
new file mode 100644
index 0000000000..0c18e0c2a5
--- /dev/null
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -0,0 +1,78 @@
+#ifndef _AMQFrame_
+#define _AMQFrame_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/cast.hpp>
+
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <AMQDataBlock.h>
+#include <AMQMethodBody.h>
+#include <AMQHeaderBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeartbeatBody.h>
+#include <AMQP_MethodVersionMap.h>
+#include <AMQP_HighestVersion.h>
+#include <Buffer.h>
+
+namespace qpid {
+namespace framing {
+
+
+class AMQFrame : public AMQDataBlock
+{
+ public:
+ AMQFrame(ProtocolVersion _version = highestProtocolVersion);
+ AMQFrame(ProtocolVersion _version, uint16_t channel, AMQBody* body);
+ AMQFrame(ProtocolVersion _version, uint16_t channel, const AMQBody::shared_ptr& body);
+ virtual ~AMQFrame();
+ virtual void encode(Buffer& buffer);
+ virtual bool decode(Buffer& buffer);
+ virtual uint32_t size() const;
+ uint16_t getChannel();
+ AMQBody::shared_ptr getBody();
+
+ /** Convenience template to cast the body to an expected type */
+ template <class T> boost::shared_ptr<T> castBody() {
+ assert(dynamic_cast<T*>(getBody().get()));
+ boost::static_pointer_cast<T>(getBody());
+ }
+
+ uint32_t decodeHead(Buffer& buffer);
+ void decodeBody(Buffer& buffer, uint32_t size);
+
+ private:
+ static AMQP_MethodVersionMap versionMap;
+ ProtocolVersion version;
+
+ uint16_t channel;
+ uint8_t type;
+ AMQBody::shared_ptr body;
+
+
+ friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body);
+};
+
+}} // namespace qpid::framing
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQHeaderBody.cpp b/cpp/lib/common/framing/AMQHeaderBody.cpp
new file mode 100644
index 0000000000..3ddae4eebf
--- /dev/null
+++ b/cpp/lib/common/framing/AMQHeaderBody.cpp
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AMQHeaderBody.h>
+#include <QpidError.h>
+#include <BasicHeaderProperties.h>
+
+qpid::framing::AMQHeaderBody::AMQHeaderBody(int classId) : weight(0), contentSize(0){
+ createProperties(classId);
+}
+
+qpid::framing::AMQHeaderBody::AMQHeaderBody() : properties(0), weight(0), contentSize(0){
+}
+
+qpid::framing::AMQHeaderBody::~AMQHeaderBody(){
+ delete properties;
+}
+
+uint32_t qpid::framing::AMQHeaderBody::size() const{
+ return 12 + properties->size();
+}
+
+void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const {
+ buffer.putShort(properties->classId());
+ buffer.putShort(weight);
+ buffer.putLongLong(contentSize);
+ properties->encode(buffer);
+}
+
+void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){
+ uint16_t classId = buffer.getShort();
+ weight = buffer.getShort();
+ contentSize = buffer.getLongLong();
+ createProperties(classId);
+ properties->decode(buffer, bufSize - 12);
+}
+
+void qpid::framing::AMQHeaderBody::createProperties(int classId){
+ switch(classId){
+ case BASIC:
+ properties = new qpid::framing::BasicHeaderProperties();
+ break;
+ default:
+ THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class");
+ }
+}
+
+void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
+{
+ out << "header (" << size() << " bytes)" << " content_size=" << getContentSize();
+ const BasicHeaderProperties* props =
+ dynamic_cast<const BasicHeaderProperties*>(getProperties());
+ if (props) {
+ out << ", message_id=" << props->getMessageId();
+ out << ", delivery_mode=" << (int) props->getDeliveryMode();
+ out << ", headers=" << const_cast<BasicHeaderProperties*>(props)->getHeaders();
+ }
+}
diff --git a/cpp/lib/common/framing/AMQHeaderBody.h b/cpp/lib/common/framing/AMQHeaderBody.h
new file mode 100644
index 0000000000..d57f93aacd
--- /dev/null
+++ b/cpp/lib/common/framing/AMQHeaderBody.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <Buffer.h>
+#include <HeaderProperties.h>
+
+#ifndef _AMQHeaderBody_
+#define _AMQHeaderBody_
+
+namespace qpid {
+namespace framing {
+
+class AMQHeaderBody : public AMQBody
+{
+ HeaderProperties* properties;
+ uint16_t weight;
+ uint64_t contentSize;
+
+ void createProperties(int classId);
+public:
+ typedef boost::shared_ptr<AMQHeaderBody> shared_ptr;
+
+ AMQHeaderBody(int classId);
+ AMQHeaderBody();
+ inline uint8_t type() const { return HEADER_BODY; }
+ HeaderProperties* getProperties(){ return properties; }
+ const HeaderProperties* getProperties() const { return properties; }
+ inline uint64_t getContentSize() const { return contentSize; }
+ inline void setContentSize(uint64_t _size) { contentSize = _size; }
+ virtual ~AMQHeaderBody();
+ virtual uint32_t size() const;
+ virtual void encode(Buffer& buffer) const;
+ virtual void decode(Buffer& buffer, uint32_t size);
+ virtual void print(std::ostream& out) const;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQHeartbeatBody.cpp b/cpp/lib/common/framing/AMQHeartbeatBody.cpp
new file mode 100644
index 0000000000..63f83a3d29
--- /dev/null
+++ b/cpp/lib/common/framing/AMQHeartbeatBody.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <AMQHeartbeatBody.h>
+#include <iostream>
+
+qpid::framing::AMQHeartbeatBody::~AMQHeartbeatBody() {}
+
+void qpid::framing::AMQHeartbeatBody::print(std::ostream& out) const {
+ out << "heartbeat";
+}
diff --git a/cpp/lib/common/framing/AMQHeartbeatBody.h b/cpp/lib/common/framing/AMQHeartbeatBody.h
new file mode 100644
index 0000000000..a3e9d823f1
--- /dev/null
+++ b/cpp/lib/common/framing/AMQHeartbeatBody.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <Buffer.h>
+
+#ifndef _AMQHeartbeatBody_
+#define _AMQHeartbeatBody_
+
+namespace qpid {
+namespace framing {
+
+class AMQHeartbeatBody : public AMQBody
+{
+public:
+ typedef boost::shared_ptr<AMQHeartbeatBody> shared_ptr;
+
+ virtual ~AMQHeartbeatBody();
+ inline uint32_t size() const { return 0; }
+ inline uint8_t type() const { return HEARTBEAT_BODY; }
+ inline void encode(Buffer& ) const {}
+ inline void decode(Buffer& , uint32_t /*size*/) {}
+ virtual void print(std::ostream& out) const;
+};
+
+}
+}
+
+#endif
diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp
new file mode 100644
index 0000000000..23502068f5
--- /dev/null
+++ b/cpp/lib/common/framing/AMQMethodBody.cpp
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AMQFrame.h>
+#include <AMQMethodBody.h>
+#include <QpidError.h>
+#include "AMQP_MethodVersionMap.h"
+
+namespace qpid {
+namespace framing {
+
+void AMQMethodBody::encodeId(Buffer& buffer) const{
+ buffer.putShort(amqpClassId());
+ buffer.putShort(amqpMethodId());
+}
+
+void AMQMethodBody::invoke(AMQP_ServerOperations&, const MethodContext&){
+ assert(0);
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
+}
+
+AMQMethodBody::shared_ptr AMQMethodBody::create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer)
+{
+ ClassMethodId id;
+ id.decode(buffer);
+ return AMQMethodBody::shared_ptr(
+ versionMap.createMethodBody(
+ id.classId, id.methodId, version.getMajor(), version.getMinor()));
+}
+
+void AMQMethodBody::ClassMethodId::decode(Buffer& buffer) {
+ classId = buffer.getShort();
+ methodId = buffer.getShort();
+}
+
+void AMQMethodBody::decode(Buffer& buffer, uint32_t /*size*/) {
+ decodeContent(buffer);
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h
new file mode 100644
index 0000000000..c2b00c2169
--- /dev/null
+++ b/cpp/lib/common/framing/AMQMethodBody.h
@@ -0,0 +1,84 @@
+#ifndef _AMQMethodBody_
+#define _AMQMethodBody_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <Buffer.h>
+#include <AMQP_ServerOperations.h>
+#include <MethodContext.h>
+
+namespace qpid {
+namespace framing {
+
+class AMQP_MethodVersionMap;
+
+class AMQMethodBody : public AMQBody
+{
+ public:
+ typedef boost::shared_ptr<AMQMethodBody> shared_ptr;
+
+ static shared_ptr create(
+ AMQP_MethodVersionMap& map, ProtocolVersion version, Buffer& buf);
+
+ ProtocolVersion version;
+ uint8_t type() const { return METHOD_BODY; }
+ AMQMethodBody(uint8_t major, uint8_t minor) : version(major, minor) {}
+ AMQMethodBody(ProtocolVersion ver) : version(ver) {}
+ virtual ~AMQMethodBody() {}
+ void decode(Buffer&, uint32_t);
+
+ virtual MethodId amqpMethodId() const = 0;
+ virtual ClassId amqpClassId() const = 0;
+
+ virtual void invoke(AMQP_ServerOperations&, const MethodContext&);
+
+ template <class T> bool isA() {
+ return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID;
+ }
+
+ /** Return request ID or response correlationID */
+ virtual RequestId getRequestId() const { return 0; }
+
+ virtual bool isRequest() const { return false; }
+ virtual bool isResponse() const { return false; }
+
+ protected:
+ static uint32_t baseSize() { return 4; }
+
+ struct ClassMethodId {
+ uint16_t classId;
+ uint16_t methodId;
+ void decode(Buffer& b);
+ };
+
+ void encodeId(Buffer& buffer) const;
+ virtual void encodeContent(Buffer& buffer) const = 0;
+ virtual void decodeContent(Buffer& buffer) = 0;
+};
+
+
+}} // namespace qpid::framing
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp
new file mode 100644
index 0000000000..54e1c11863
--- /dev/null
+++ b/cpp/lib/common/framing/AMQRequestBody.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQRequestBody.h"
+#include "AMQP_MethodVersionMap.h"
+
+namespace qpid {
+namespace framing {
+
+void AMQRequestBody::Data::encode(Buffer& buffer) const {
+ buffer.putLongLong(requestId);
+ buffer.putLongLong(responseMark);
+ buffer.putLong(0); // Reserved long in spec.
+}
+
+void AMQRequestBody::Data::decode(Buffer& buffer) {
+ requestId = buffer.getLongLong();
+ responseMark = buffer.getLongLong();
+ buffer.getLong(); // Ignore reserved long.
+}
+
+void AMQRequestBody::encode(Buffer& buffer) const {
+ data.encode(buffer);
+ encodeId(buffer);
+ encodeContent(buffer);
+}
+
+AMQRequestBody::shared_ptr
+AMQRequestBody::create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer)
+{
+ ClassMethodId id;
+ Data data;
+ data.decode(buffer);
+ id.decode(buffer);
+ AMQRequestBody* body = dynamic_cast<AMQRequestBody*>(
+ versionMap.createMethodBody(
+ id.classId, id.methodId, version.getMajor(), version.getMinor()));
+ assert(body);
+ body->data = data;
+ return AMQRequestBody::shared_ptr(body);
+}
+
+void AMQRequestBody::printPrefix(std::ostream& out) const {
+ out << "request(id=" << data.requestId << ",mark="
+ << data.responseMark << "): ";
+}
+
+}} // namespace qpid::framing
+
diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h
new file mode 100644
index 0000000000..e184fff1d6
--- /dev/null
+++ b/cpp/lib/common/framing/AMQRequestBody.h
@@ -0,0 +1,78 @@
+#ifndef _framing_AMQRequestBody_h
+#define _framing_AMQRequestBody_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQMethodBody.h"
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Body of a request method frame.
+ */
+class AMQRequestBody : public AMQMethodBody
+{
+ public:
+ typedef boost::shared_ptr<AMQRequestBody> shared_ptr;
+
+ struct Data {
+ Data(RequestId id=0, ResponseId mark=0)
+ : requestId(id), responseMark(mark) {}
+ void encode(Buffer&) const;
+ void decode(Buffer&);
+
+ RequestId requestId;
+ ResponseId responseMark;
+ };
+
+ static Data& getData(const AMQBody::shared_ptr& body) {
+ return boost::dynamic_pointer_cast<AMQRequestBody>(body)->getData();
+ }
+
+ static shared_ptr create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer);
+
+ AMQRequestBody(ProtocolVersion v, RequestId id=0, ResponseId mark=0)
+ : AMQMethodBody(v), data(id, mark) {}
+
+ uint8_t type() const { return REQUEST_BODY; }
+ void encode(Buffer& buffer) const;
+
+ Data& getData() { return data; }
+ RequestId getRequestId() const { return data.requestId; }
+ ResponseId getResponseMark() const { return data.responseMark; }
+ void setRequestId(RequestId id) { data.requestId=id; }
+ void setResponseMark(ResponseId mark) { data.responseMark=mark; }
+
+ bool isRequest()const { return true; }
+ protected:
+ static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; }
+ void printPrefix(std::ostream& out) const;
+
+ private:
+ Data data;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_AMQRequestBody_h*/
diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp
new file mode 100644
index 0000000000..7da71a5d25
--- /dev/null
+++ b/cpp/lib/common/framing/AMQResponseBody.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQFrame.h"
+#include "AMQResponseBody.h"
+#include "AMQP_MethodVersionMap.h"
+
+namespace qpid {
+namespace framing {
+
+void AMQResponseBody::Data::encode(Buffer& buffer) const {
+ buffer.putLongLong(responseId);
+ buffer.putLongLong(requestId);
+ buffer.putLong(batchOffset);
+}
+
+void AMQResponseBody::Data::decode(Buffer& buffer) {
+ responseId = buffer.getLongLong();
+ requestId = buffer.getLongLong();
+ batchOffset = buffer.getLong();
+}
+
+void AMQResponseBody::encode(Buffer& buffer) const {
+ data.encode(buffer);
+ encodeId(buffer);
+ encodeContent(buffer);
+}
+
+AMQResponseBody::shared_ptr AMQResponseBody::create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer)
+{
+ ClassMethodId id;
+ Data data;
+ data.decode(buffer);
+ id.decode(buffer);
+ AMQResponseBody* body = dynamic_cast<AMQResponseBody*>(
+ versionMap.createMethodBody(
+ id.classId, id.methodId, version.getMajor(), version.getMinor()));
+ assert(body);
+ body->data = data;
+ return AMQResponseBody::shared_ptr(body);
+}
+
+void AMQResponseBody::printPrefix(std::ostream& out) const {
+ out << "response(id=" << data.responseId << ",request=" << data.requestId
+ << ",batch=" << data.batchOffset << "): ";
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h
new file mode 100644
index 0000000000..fa381baddd
--- /dev/null
+++ b/cpp/lib/common/framing/AMQResponseBody.h
@@ -0,0 +1,85 @@
+#ifndef _framing_AMQResponseBody_h
+#define _framing_AMQResponseBody_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQMethodBody.h"
+
+namespace qpid {
+namespace framing {
+
+class AMQP_MethodVersionMap;
+
+/**
+ * Body of a response method frame.
+ */
+class AMQResponseBody : public AMQMethodBody
+{
+
+ public:
+ typedef boost::shared_ptr<AMQResponseBody> shared_ptr;
+
+ struct Data {
+ Data(ResponseId id=0, RequestId req=0, BatchOffset off=0)
+ : responseId(id), requestId(req), batchOffset(off) {}
+ void encode(Buffer&) const;
+ void decode(Buffer&);
+
+ uint64_t responseId;
+ uint64_t requestId;
+ uint32_t batchOffset;
+ };
+
+ static Data& getData(const AMQBody::shared_ptr& body) {
+ return boost::dynamic_pointer_cast<AMQResponseBody>(body)->getData();
+ }
+
+ static shared_ptr create(
+ AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
+ Buffer& buffer);
+
+ AMQResponseBody(
+ ProtocolVersion v, ResponseId id=0, RequestId req=0, BatchOffset off=0)
+ : AMQMethodBody(v), data(id, req, off) {}
+
+ uint8_t type() const { return RESPONSE_BODY; }
+ void encode(Buffer& buffer) const;
+
+ Data& getData() { return data; }
+ ResponseId getResponseId() const { return data.responseId; }
+ RequestId getRequestId() const { return data.requestId; }
+ BatchOffset getBatchOffset() const { return data.batchOffset; }
+ void setResponseId(ResponseId id) { data.responseId = id; }
+ void setRequestId(RequestId id) { data.requestId = id; }
+ void setBatchOffset(BatchOffset id) { data.batchOffset = id; }
+
+ bool isResponse() const { return true; }
+ protected:
+ static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; }
+ void printPrefix(std::ostream& out) const;
+
+ private:
+ Data data;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_AMQResponseBody_h*/
diff --git a/cpp/lib/common/framing/BasicHeaderProperties.cpp b/cpp/lib/common/framing/BasicHeaderProperties.cpp
new file mode 100644
index 0000000000..930ec9f4dd
--- /dev/null
+++ b/cpp/lib/common/framing/BasicHeaderProperties.cpp
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BasicHeaderProperties.h>
+
+//TODO: This could be easily generated from the spec
+
+qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(0), priority(0), timestamp(0){}
+qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){}
+
+uint32_t qpid::framing::BasicHeaderProperties::size() const{
+ uint32_t bytes = 2;//flags
+ if(contentType.length() > 0) bytes += contentType.length() + 1;
+ if(contentEncoding.length() > 0) bytes += contentEncoding.length() + 1;
+ if(headers.count() > 0) bytes += headers.size();
+ if(deliveryMode != 0) bytes += 1;
+ if(priority != 0) bytes += 1;
+ if(correlationId.length() > 0) bytes += correlationId.length() + 1;
+ if(replyTo.length() > 0) bytes += replyTo.length() + 1;
+ if(expiration.length() > 0) bytes += expiration.length() + 1;
+ if(messageId.length() > 0) bytes += messageId.length() + 1;
+ if(timestamp != 0) bytes += 8;
+ if(type.length() > 0) bytes += type.length() + 1;
+ if(userId.length() > 0) bytes += userId.length() + 1;
+ if(appId.length() > 0) bytes += appId.length() + 1;
+ if(clusterId.length() > 0) bytes += clusterId.length() + 1;
+
+ return bytes;
+}
+
+void qpid::framing::BasicHeaderProperties::encode(qpid::framing::Buffer& buffer) const{
+ uint16_t flags = getFlags();
+ buffer.putShort(flags);
+
+ if(contentType.length() > 0) buffer.putShortString(contentType);
+ if(contentEncoding.length() > 0) buffer.putShortString(contentEncoding);
+ if(headers.count() > 0) buffer.putFieldTable(headers);
+ if(deliveryMode != 0) buffer.putOctet(deliveryMode);
+ if(priority != 0) buffer.putOctet(priority);
+ if(correlationId.length() > 0) buffer.putShortString(correlationId);
+ if(replyTo.length() > 0) buffer.putShortString(replyTo);
+ if(expiration.length() > 0) buffer.putShortString(expiration);
+ if(messageId.length() > 0) buffer.putShortString(messageId);
+ if(timestamp != 0) buffer.putLongLong(timestamp);;
+ if(type.length() > 0) buffer.putShortString(type);
+ if(userId.length() > 0) buffer.putShortString(userId);
+ if(appId.length() > 0) buffer.putShortString(appId);
+ if(clusterId.length() > 0) buffer.putShortString(clusterId);
+}
+
+void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, uint32_t /*size*/){
+ uint16_t flags = buffer.getShort();
+ if(flags & (1 << 15)) buffer.getShortString(contentType);
+ if(flags & (1 << 14)) buffer.getShortString(contentEncoding);
+ if(flags & (1 << 13)) buffer.getFieldTable(headers);
+ if(flags & (1 << 12)) deliveryMode = buffer.getOctet();
+ if(flags & (1 << 11)) priority = buffer.getOctet();
+ if(flags & (1 << 10)) buffer.getShortString(correlationId);
+ if(flags & (1 << 9)) buffer.getShortString(replyTo);
+ if(flags & (1 << 8)) buffer.getShortString(expiration);
+ if(flags & (1 << 7)) buffer.getShortString(messageId);
+ if(flags & (1 << 6)) timestamp = buffer.getLongLong();
+ if(flags & (1 << 5)) buffer.getShortString(type);
+ if(flags & (1 << 4)) buffer.getShortString(userId);
+ if(flags & (1 << 3)) buffer.getShortString(appId);
+ if(flags & (1 << 2)) buffer.getShortString(clusterId);
+}
+
+uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{
+ uint16_t flags(0);
+ if(contentType.length() > 0) flags |= (1 << 15);
+ if(contentEncoding.length() > 0) flags |= (1 << 14);
+ if(headers.count() > 0) flags |= (1 << 13);
+ if(deliveryMode != 0) flags |= (1 << 12);
+ if(priority != 0) flags |= (1 << 11);
+ if(correlationId.length() > 0) flags |= (1 << 10);
+ if(replyTo.length() > 0) flags |= (1 << 9);
+ if(expiration.length() > 0) flags |= (1 << 8);
+ if(messageId.length() > 0) flags |= (1 << 7);
+ if(timestamp != 0) flags |= (1 << 6);
+ if(type.length() > 0) flags |= (1 << 5);
+ if(userId.length() > 0) flags |= (1 << 4);
+ if(appId.length() > 0) flags |= (1 << 3);
+ if(clusterId.length() > 0) flags |= (1 << 2);
+ return flags;
+}
diff --git a/cpp/lib/common/framing/BasicHeaderProperties.h b/cpp/lib/common/framing/BasicHeaderProperties.h
new file mode 100644
index 0000000000..1f3fd31250
--- /dev/null
+++ b/cpp/lib/common/framing/BasicHeaderProperties.h
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <Buffer.h>
+#include <FieldTable.h>
+#include <HeaderProperties.h>
+
+#ifndef _BasicHeaderProperties_
+#define _BasicHeaderProperties_
+
+namespace qpid {
+namespace framing {
+ enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2};
+
+ //TODO: This could be easily generated from the spec
+ class BasicHeaderProperties : public HeaderProperties
+ {
+ string contentType;
+ string contentEncoding;
+ FieldTable headers;
+ uint8_t deliveryMode;
+ uint8_t priority;
+ string correlationId;
+ string replyTo;
+ string expiration;
+ string messageId;
+ uint64_t timestamp;
+ string type;
+ string userId;
+ string appId;
+ string clusterId;
+
+ uint16_t getFlags() const;
+
+ public:
+ BasicHeaderProperties();
+ virtual ~BasicHeaderProperties();
+ virtual uint32_t size() const;
+ virtual void encode(Buffer& buffer) const;
+ virtual void decode(Buffer& buffer, uint32_t size);
+
+ inline virtual uint8_t classId() { return BASIC; }
+
+ inline const string& getContentType() const { return contentType; }
+ inline const string& getContentEncoding() const { return contentEncoding; }
+ inline FieldTable& getHeaders() { return headers; }
+ inline uint8_t getDeliveryMode() const { return deliveryMode; }
+ inline uint8_t getPriority() const { return priority; }
+ inline const string& getCorrelationId() const {return correlationId; }
+ inline const string& getReplyTo() const { return replyTo; }
+ inline const string& getExpiration() const { return expiration; }
+ inline const string& getMessageId() const {return messageId; }
+ inline uint64_t getTimestamp() const { return timestamp; }
+ inline const string& getType() const { return type; }
+ inline const string& getUserId() const { return userId; }
+ inline const string& getAppId() const { return appId; }
+ inline const string& getClusterId() const { return clusterId; }
+
+ void inline setContentType(const string& _type){ contentType = _type; }
+ void inline setContentEncoding(const string& encoding){ contentEncoding = encoding; }
+ void inline setHeaders(const FieldTable& _headers){ headers = _headers; }
+ void inline setDeliveryMode(uint8_t mode){ deliveryMode = mode; }
+ void inline setPriority(uint8_t _priority){ priority = _priority; }
+ void inline setCorrelationId(const string& _correlationId){ correlationId = _correlationId; }
+ void inline setReplyTo(const string& _replyTo){ replyTo = _replyTo;}
+ void inline setExpiration(const string& _expiration){ expiration = _expiration; }
+ void inline setMessageId(const string& _messageId){ messageId = _messageId; }
+ void inline setTimestamp(uint64_t _timestamp){ timestamp = _timestamp; }
+ void inline setType(const string& _type){ type = _type; }
+ void inline setUserId(const string& _userId){ userId = _userId; }
+ void inline setAppId(const string& _appId){appId = _appId; }
+ void inline setClusterId(const string& _clusterId){ clusterId = _clusterId; }
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp
new file mode 100644
index 0000000000..5dd0c0c23d
--- /dev/null
+++ b/cpp/lib/common/framing/BodyHandler.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QpidError.h"
+#include "BodyHandler.h"
+#include <AMQRequestBody.h>
+#include <AMQResponseBody.h>
+#include <AMQMethodBody.h>
+#include <AMQHeaderBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeartbeatBody.h>
+
+using namespace qpid::framing;
+using namespace boost;
+
+BodyHandler::~BodyHandler() {}
+
+void BodyHandler::handleBody(shared_ptr<AMQBody> body) {
+ switch(body->type())
+ {
+ case REQUEST_BODY:
+ handleRequest(shared_polymorphic_cast<AMQRequestBody>(body));
+ break;
+ case RESPONSE_BODY:
+ handleResponse(shared_polymorphic_cast<AMQResponseBody>(body));
+ break;
+ case METHOD_BODY:
+ handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+ break;
+ case HEADER_BODY:
+ handleHeader(shared_polymorphic_cast<AMQHeaderBody>(body));
+ break;
+ case CONTENT_BODY:
+ handleContent(shared_polymorphic_cast<AMQContentBody>(body));
+ break;
+ case HEARTBEAT_BODY:
+ handleHeartbeat(shared_polymorphic_cast<AMQHeartbeatBody>(body));
+ break;
+ default:
+ QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type());
+ }
+}
+
diff --git a/cpp/lib/common/framing/BodyHandler.h b/cpp/lib/common/framing/BodyHandler.h
new file mode 100644
index 0000000000..cb3f0997b0
--- /dev/null
+++ b/cpp/lib/common/framing/BodyHandler.h
@@ -0,0 +1,61 @@
+#ifndef _BodyHandler_
+#define _BodyHandler_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+#include "Requester.h"
+#include "Responder.h"
+
+namespace qpid {
+namespace framing {
+
+class AMQRequestBody;
+class AMQResponseBody;
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class AMQHeartbeatBody;
+
+/**
+ * Interface to handle incoming frame bodies.
+ * Derived classes provide logic for each frame type.
+ */
+class BodyHandler {
+ public:
+ virtual ~BodyHandler();
+ virtual void handleBody(boost::shared_ptr<AMQBody> body);
+
+ protected:
+ virtual void handleRequest(boost::shared_ptr<AMQRequestBody>) = 0;
+ virtual void handleResponse(boost::shared_ptr<AMQResponseBody>) = 0;
+ virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0;
+ virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0;
+ virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0;
+ virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/framing/Buffer.cpp b/cpp/lib/common/framing/Buffer.cpp
new file mode 100644
index 0000000000..52c9a42d55
--- /dev/null
+++ b/cpp/lib/common/framing/Buffer.cpp
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Buffer.h>
+#include <FramingContent.h>
+#include <FieldTable.h>
+
+qpid::framing::Buffer::Buffer(uint32_t _size) : size(_size), owner(true), position(0), limit(_size){
+ data = new char[size];
+}
+
+qpid::framing::Buffer::Buffer(char* _data, uint32_t _size) : size(_size), owner(false), data(_data), position(0), limit(_size){
+}
+
+qpid::framing::Buffer::~Buffer(){
+ if(owner) delete[] data;
+}
+
+void qpid::framing::Buffer::flip(){
+ limit = position;
+ position = 0;
+}
+
+void qpid::framing::Buffer::clear(){
+ limit = size;
+ position = 0;
+}
+
+void qpid::framing::Buffer::compact(){
+ uint32_t p = limit - position;
+ //copy p chars from position to 0
+ memmove(data, data + position, p);
+ limit = size;
+ position = p;
+}
+
+void qpid::framing::Buffer::record(){
+ r_position = position;
+ r_limit = limit;
+}
+
+void qpid::framing::Buffer::restore(){
+ position = r_position;
+ limit = r_limit;
+}
+
+uint32_t qpid::framing::Buffer::available(){
+ return limit - position;
+}
+
+char* qpid::framing::Buffer::start(){
+ return data + position;
+}
+
+void qpid::framing::Buffer::move(uint32_t bytes){
+ position += bytes;
+}
+
+void qpid::framing::Buffer::putOctet(uint8_t i){
+ data[position++] = i;
+}
+
+void qpid::framing::Buffer::putShort(uint16_t i){
+ uint16_t b = i;
+ data[position++] = (uint8_t) (0xFF & (b >> 8));
+ data[position++] = (uint8_t) (0xFF & b);
+}
+
+void qpid::framing::Buffer::putLong(uint32_t i){
+ uint32_t b = i;
+ data[position++] = (uint8_t) (0xFF & (b >> 24));
+ data[position++] = (uint8_t) (0xFF & (b >> 16));
+ data[position++] = (uint8_t) (0xFF & (b >> 8));
+ data[position++] = (uint8_t) (0xFF & b);
+}
+
+void qpid::framing::Buffer::putLongLong(uint64_t i){
+ uint32_t hi = i >> 32;
+ uint32_t lo = i;
+ putLong(hi);
+ putLong(lo);
+}
+
+uint8_t qpid::framing::Buffer::getOctet(){
+ return (uint8_t) data[position++];
+}
+
+uint16_t qpid::framing::Buffer::getShort(){
+ uint16_t hi = (unsigned char) data[position++];
+ hi = hi << 8;
+ hi |= (unsigned char) data[position++];
+ return hi;
+}
+
+uint32_t qpid::framing::Buffer::getLong(){
+ uint32_t a = (unsigned char) data[position++];
+ uint32_t b = (unsigned char) data[position++];
+ uint32_t c = (unsigned char) data[position++];
+ uint32_t d = (unsigned char) data[position++];
+ a = a << 24;
+ a |= b << 16;
+ a |= c << 8;
+ a |= d;
+ return a;
+}
+
+uint64_t qpid::framing::Buffer::getLongLong(){
+ uint64_t hi = getLong();
+ uint64_t lo = getLong();
+ hi = hi << 32;
+ return hi | lo;
+}
+
+
+void qpid::framing::Buffer::putShortString(const string& s){
+ uint8_t len = s.length();
+ putOctet(len);
+ s.copy(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::putLongString(const string& s){
+ uint32_t len = s.length();
+ putLong(len);
+ s.copy(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::getShortString(string& s){
+ uint8_t len = getOctet();
+ s.assign(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::getLongString(string& s){
+ uint32_t len = getLong();
+ s.assign(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::putFieldTable(const FieldTable& t){
+ t.encode(*this);
+}
+
+void qpid::framing::Buffer::getFieldTable(FieldTable& t){
+ t.decode(*this);
+}
+
+void qpid::framing::Buffer::putContent(const Content& c){
+ c.encode(*this);
+}
+
+void qpid::framing::Buffer::getContent(Content& c){
+ c.decode(*this);
+}
+
+void qpid::framing::Buffer::putRawData(const string& s){
+ uint32_t len = s.length();
+ s.copy(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::getRawData(string& s, uint32_t len){
+ s.assign(data + position, len);
+ position += len;
+}
diff --git a/cpp/lib/common/framing/Buffer.h b/cpp/lib/common/framing/Buffer.h
new file mode 100644
index 0000000000..63a15c7c3d
--- /dev/null
+++ b/cpp/lib/common/framing/Buffer.h
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+
+#ifndef _Buffer_
+#define _Buffer_
+
+namespace qpid {
+namespace framing {
+
+class Content;
+class FieldTable;
+
+class Buffer
+{
+ const uint32_t size;
+ const bool owner;//indicates whether the data is owned by this instance
+ char* data;
+ uint32_t position;
+ uint32_t limit;
+ uint32_t r_position;
+ uint32_t r_limit;
+
+public:
+
+ Buffer(uint32_t size);
+ Buffer(char* data, uint32_t size);
+ ~Buffer();
+
+ void flip();
+ void clear();
+ void compact();
+ void record();
+ void restore();
+ uint32_t available();
+ char* start();
+ void move(uint32_t bytes);
+
+ void putOctet(uint8_t i);
+ void putShort(uint16_t i);
+ void putLong(uint32_t i);
+ void putLongLong(uint64_t i);
+
+ uint8_t getOctet();
+ uint16_t getShort();
+ uint32_t getLong();
+ uint64_t getLongLong();
+
+ void putShortString(const string& s);
+ void putLongString(const string& s);
+ void getShortString(string& s);
+ void getLongString(string& s);
+
+ void putFieldTable(const FieldTable& t);
+ void getFieldTable(FieldTable& t);
+
+ void putContent(const Content& c);
+ void getContent(Content& c);
+
+ void putRawData(const string& s);
+ void getRawData(string& s, uint32_t size);
+
+};
+
+}} // namespace qpid::framing
+
+
+#endif
diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp
new file mode 100644
index 0000000000..8a1ff39ee5
--- /dev/null
+++ b/cpp/lib/common/framing/ChannelAdapter.cpp
@@ -0,0 +1,99 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/format.hpp>
+
+#include "ChannelAdapter.h"
+#include "AMQFrame.h"
+#include "Exception.h"
+
+using boost::format;
+
+namespace qpid {
+namespace framing {
+
+void ChannelAdapter::init(
+ ChannelId i, OutputHandler& o, ProtocolVersion v)
+{
+ assertChannelNotOpen();
+ id = i;
+ out = &o;
+ version = v;
+}
+
+RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
+ RequestId result = 0;
+ assertChannelOpen();
+ switch (body->type()) {
+ case REQUEST_BODY: {
+ AMQRequestBody::shared_ptr request =
+ boost::shared_polymorphic_downcast<AMQRequestBody>(body);
+ requester.sending(request->getData());
+ result = request->getData().requestId;
+ break;
+ }
+ case RESPONSE_BODY: {
+ AMQResponseBody::shared_ptr response =
+ boost::shared_polymorphic_downcast<AMQResponseBody>(body);
+ responder.sending(response->getData());
+ break;
+ }
+ }
+ out->send(new AMQFrame(getVersion(), getId(), body));
+ return result;
+}
+
+void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
+ assertMethodOk(*request);
+ AMQRequestBody::Data& requestData = request->getData();
+ responder.received(requestData);
+ handleMethodInContext(request, MethodContext(this, request));
+}
+
+void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
+ assertMethodOk(*response);
+ // TODO aconway 2007-01-30: Consider a response handled on receipt.
+ // Review - any cases where this is not the case?
+ AMQResponseBody::Data& responseData = response->getData();
+ requester.processed(responseData);
+ handleMethod(response);
+}
+
+void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) {
+ assertMethodOk(*method);
+ handleMethodInContext(method, MethodContext(this, method));
+}
+
+void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const {
+ if (getId() != 0 && method.amqpClassId() == ConnectionOpenBody::CLASS_ID)
+ throw ConnectionException(
+ 504, format("Connection method on non-0 channel %d.")%getId());
+}
+
+void ChannelAdapter::assertChannelOpen() const {
+ if (getId() != 0 && !isOpen())
+ throw ConnectionException(
+ 504, format("Channel %d is not open.")%getId());
+}
+
+void ChannelAdapter::assertChannelNotOpen() const {
+ if (getId() != 0 && isOpen())
+ throw ConnectionException(
+ 504, format("Channel %d is already open.") % getId());
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h
new file mode 100644
index 0000000000..f6e3986eed
--- /dev/null
+++ b/cpp/lib/common/framing/ChannelAdapter.h
@@ -0,0 +1,105 @@
+#ifndef _ChannelAdapter_
+#define _ChannelAdapter_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+#include "BodyHandler.h"
+#include "Requester.h"
+#include "Responder.h"
+#include "framing/amqp_types.h"
+
+namespace qpid {
+namespace framing {
+
+class MethodContext;
+
+// FIXME aconway 2007-02-20: Rename as ChannelBase or just Channel.
+
+/**
+ * Base class for client and broker channels.
+ *
+ * - receives frame bodies from the network.
+ * - Updates request/response data.
+ * - Dispatches requests with a MethodContext for responses.
+ *
+ * send()
+ * - Updates request/resposne ID data.
+ * - Forwards frame to the peer.
+ *
+ * Thread safety: OBJECT UNSAFE. Instances must not be called
+ * concurrently. AMQP defines channels to be serialized.
+ */
+class ChannelAdapter : public BodyHandler {
+ public:
+ /**
+ *@param output Processed frames are forwarded to this handler.
+ */
+ ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0,
+ ProtocolVersion ver=ProtocolVersion())
+ : id(id_), out(out_), version(ver) {}
+
+ /** Initialize the channel adapter. */
+ void init(ChannelId, OutputHandler&, ProtocolVersion);
+
+ ChannelId getId() const { return id; }
+ ProtocolVersion getVersion() const { return version; }
+
+ /**
+ * Wrap body in a frame and send the frame.
+ * Takes ownership of body.
+ */
+ RequestId send(AMQBody::shared_ptr body);
+ RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
+
+ void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
+ void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>);
+ void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>);
+
+ virtual bool isOpen() const = 0;
+
+ protected:
+ void assertMethodOk(AMQMethodBody& method) const;
+ void assertChannelOpen() const;
+ void assertChannelNotOpen() const;
+
+ virtual void handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context) = 0;
+
+ RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
+ RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
+ RequestId getNextSendRequestId() { return requester.getNextId(); }
+
+ private:
+ ChannelId id;
+ OutputHandler* out;
+ ProtocolVersion version;
+ Requester requester;
+ Responder responder;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/framing/FieldTable.cpp b/cpp/lib/common/framing/FieldTable.cpp
new file mode 100644
index 0000000000..5bbc4651d3
--- /dev/null
+++ b/cpp/lib/common/framing/FieldTable.cpp
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <FieldTable.h>
+#include <QpidError.h>
+#include <Buffer.h>
+#include <Value.h>
+#include <assert.h>
+
+namespace qpid {
+namespace framing {
+
+FieldTable::~FieldTable() {}
+
+uint32_t FieldTable::size() const {
+ uint32_t len(4);
+ for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
+ // 2 = shortstr_len_byyte + type_char_byte
+ len += 2 + (i->first).size() + (i->second)->size();
+ }
+ return len;
+}
+
+int FieldTable::count() const {
+ return values.size();
+}
+
+namespace
+{
+std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_type& i) {
+ return out << i.first << ":" << *i.second;
+}
+}
+
+std::ostream& operator<<(std::ostream& out, const FieldTable& t) {
+ out << "{";
+ FieldTable::ValueMap::const_iterator i = t.getMap().begin();
+ if (i != t.getMap().end()) out << *i++;
+ while (i != t.getMap().end())
+ {
+ out << "," << *i++;
+ }
+ return out << "}";
+}
+
+void FieldTable::setString(const std::string& name, const std::string& value){
+ values[name] = ValuePtr(new StringValue(value));
+}
+
+void FieldTable::setInt(const std::string& name, int value){
+ values[name] = ValuePtr(new IntegerValue(value));
+}
+
+void FieldTable::setTimestamp(const std::string& name, uint64_t value){
+ values[name] = ValuePtr(new TimeValue(value));
+}
+
+void FieldTable::setTable(const std::string& name, const FieldTable& value){
+ values[name] = ValuePtr(new FieldTableValue(value));
+}
+
+namespace {
+template <class T> T default_value() { return T(); }
+template <> int default_value<int>() { return 0; }
+template <> uint64_t default_value<uint64_t>() { return 0; }
+}
+
+template <class T>
+T FieldTable::getValue(const std::string& name) const
+{
+ ValueMap::const_iterator i = values.find(name);
+ if (i == values.end()) return default_value<T>();
+ const ValueOps<T> *vt = dynamic_cast<const ValueOps<T>*>(i->second.get());
+ return vt->getValue();
+}
+
+std::string FieldTable::getString(const std::string& name) const {
+ return getValue<std::string>(name);
+}
+
+int FieldTable::getInt(const std::string& name) const {
+ return getValue<int>(name);
+}
+
+uint64_t FieldTable::getTimestamp(const std::string& name) const {
+ return getValue<uint64_t>(name);
+}
+
+void FieldTable::getTable(const std::string& name, FieldTable& value) const {
+ value = getValue<FieldTable>(name);
+}
+
+void FieldTable::encode(Buffer& buffer) const{
+ buffer.putLong(size() - 4);
+ for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) {
+ buffer.putShortString(i->first);
+ buffer.putOctet(i->second->getType());
+ i->second->encode(buffer);
+ }
+}
+
+void FieldTable::decode(Buffer& buffer){
+ uint32_t len = buffer.getLong();
+ uint32_t available = buffer.available();
+ if (available < len)
+ THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for field table.");
+ uint32_t leftover = available - len;
+ while(buffer.available() > leftover){
+ std::string name;
+ buffer.getShortString(name);
+ std::auto_ptr<Value> value(Value::decode_value(buffer));
+ values[name] = ValuePtr(value.release());
+ }
+}
+
+
+bool FieldTable::operator==(const FieldTable& x) const {
+ if (values.size() != x.values.size()) return false;
+ for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
+ ValueMap::const_iterator j = x.values.find(i->first);
+ if (j == x.values.end()) return false;
+ if (*(i->second) != *(j->second)) return false;
+ }
+ return true;
+}
+
+void FieldTable::erase(const std::string& name)
+{
+ values.erase(values.find(name));
+}
+
+}
+}
diff --git a/cpp/lib/common/framing/FieldTable.h b/cpp/lib/common/framing/FieldTable.h
new file mode 100644
index 0000000000..e25a7d3f8c
--- /dev/null
+++ b/cpp/lib/common/framing/FieldTable.h
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <amqp_types.h>
+
+#ifndef _FieldTable_
+#define _FieldTable_
+
+namespace qpid {
+ /**
+ * The framing namespace contains classes that are used to create,
+ * send and receive the basic packets from which AMQP is built.
+ */
+namespace framing {
+
+class Value;
+class Buffer;
+
+/**
+ * A set of name-value pairs. (See the AMQP spec for more details on
+ * AMQP field tables).
+ *
+ * \ingroup clientapi
+ */
+class FieldTable
+{
+ public:
+ typedef boost::shared_ptr<Value> ValuePtr;
+ typedef std::map<std::string, ValuePtr> ValueMap;
+
+ ~FieldTable();
+ uint32_t size() const;
+ int count() const;
+ void setString(const std::string& name, const std::string& value);
+ void setInt(const std::string& name, int value);
+ void setTimestamp(const std::string& name, uint64_t value);
+ void setTable(const std::string& name, const FieldTable& value);
+ //void setDecimal(string& name, xxx& value);
+ std::string getString(const std::string& name) const;
+ int getInt(const std::string& name) const;
+ uint64_t getTimestamp(const std::string& name) const;
+ void getTable(const std::string& name, FieldTable& value) const;
+ //void getDecimal(string& name, xxx& value);
+ void erase(const std::string& name);
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+
+ bool operator==(const FieldTable& other) const;
+
+ // TODO aconway 2006-09-26: Yeuch! Rework FieldTable to have
+ // a map-like interface.
+ const ValueMap& getMap() const { return values; }
+ ValueMap& getMap() { return values; }
+
+ private:
+ friend std::ostream& operator<<(std::ostream& out, const FieldTable& body);
+ ValueMap values;
+ template<class T> T getValue(const std::string& name) const;
+};
+
+class FieldNotFoundException{};
+class UnknownFieldName : public FieldNotFoundException{};
+class IncorrectFieldType : public FieldNotFoundException{};
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/FramingContent.cpp b/cpp/lib/common/framing/FramingContent.cpp
new file mode 100644
index 0000000000..24efa38dcb
--- /dev/null
+++ b/cpp/lib/common/framing/FramingContent.cpp
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <assert.h>
+
+#include "Buffer.h"
+#include "FramingContent.h"
+#include <QpidError.h>
+#include <sstream>
+
+namespace qpid {
+namespace framing {
+
+Content::Content() : discriminator(0) {}
+
+Content::Content(uint8_t _discriminator, const string& _value): discriminator(_discriminator), value(_value) {
+ validate();
+}
+
+void Content::validate() {
+ if (discriminator == REFERENCE) {
+ if(value.empty()) {
+ THROW_QPID_ERROR(FRAMING_ERROR, "Reference cannot be empty");
+ }
+ }else if (discriminator != INLINE) {
+ std::stringstream out;
+ out << "Invalid discriminator: " << (int) discriminator;
+ THROW_QPID_ERROR(FRAMING_ERROR, out.str());
+ }
+}
+
+Content::~Content() {}
+
+void Content::encode(Buffer& buffer) const {
+ buffer.putOctet(discriminator);
+ buffer.putLongString(value);
+}
+
+void Content::decode(Buffer& buffer) {
+ discriminator = buffer.getOctet();
+ buffer.getLongString(value);
+ validate();
+}
+
+size_t Content::size() const {
+ return 1/*discriminator*/ + 4/*for recording size of long string*/ + value.size();
+}
+
+std::ostream& operator<<(std::ostream& out, const Content& content) {
+ if (content.discriminator == REFERENCE) {
+ out << "{REF:" << content.value << "}";
+ } else if (content.discriminator == INLINE) {
+ out << "{INLINE:" << content.value.size() << " bytes}";
+ }
+ return out;
+}
+
+}} // namespace framing::qpid
diff --git a/cpp/lib/common/framing/FramingContent.h b/cpp/lib/common/framing/FramingContent.h
new file mode 100644
index 0000000000..696bcc7c1a
--- /dev/null
+++ b/cpp/lib/common/framing/FramingContent.h
@@ -0,0 +1,40 @@
+#ifndef _framing_FramingContent_h
+#define _framing_FramingContent_h
+
+#include <ostream>
+
+namespace qpid {
+namespace framing {
+
+enum discriminator_types { INLINE = 0, REFERENCE = 1 };
+
+/**
+ * A representation of the AMQP 'content' data type (used for message
+ * bodies) which can hold inline data or a reference.
+ */
+class Content
+{
+ uint8_t discriminator;
+ string value;
+
+ void validate();
+
+ public:
+ Content();
+ Content(uint8_t _discriminator, const string& _value);
+ ~Content();
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+ size_t size() const;
+ bool isInline() const { return discriminator == INLINE; }
+ bool isReference() const { return discriminator == REFERENCE; }
+ const string& getValue() const { return value; }
+
+ friend std::ostream& operator<<(std::ostream&, const Content&);
+};
+
+}} // namespace qpid::framing
+
+
+#endif /*!_framing_FramingContent_h*/
diff --git a/cpp/lib/common/framing/HeaderProperties.h b/cpp/lib/common/framing/HeaderProperties.h
new file mode 100644
index 0000000000..1ec4840309
--- /dev/null
+++ b/cpp/lib/common/framing/HeaderProperties.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <Buffer.h>
+
+#ifndef _HeaderProperties_
+#define _HeaderProperties_
+
+namespace qpid {
+namespace framing {
+
+ enum header_classes{BASIC = 60};
+
+ class HeaderProperties
+ {
+
+ public:
+ inline virtual ~HeaderProperties(){}
+ virtual uint8_t classId() = 0;
+ virtual uint32_t size() const = 0;
+ virtual void encode(Buffer& buffer) const = 0;
+ virtual void decode(Buffer& buffer, uint32_t size) = 0;
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/InitiationHandler.cpp b/cpp/lib/common/framing/InitiationHandler.cpp
new file mode 100644
index 0000000000..dd92c9859b
--- /dev/null
+++ b/cpp/lib/common/framing/InitiationHandler.cpp
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <InitiationHandler.h>
+
+qpid::framing::InitiationHandler::~InitiationHandler() {}
diff --git a/cpp/lib/common/framing/InitiationHandler.h b/cpp/lib/common/framing/InitiationHandler.h
new file mode 100644
index 0000000000..d94fc58d2c
--- /dev/null
+++ b/cpp/lib/common/framing/InitiationHandler.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _InitiationHandler_
+#define _InitiationHandler_
+
+#include <ProtocolInitiation.h>
+
+namespace qpid {
+namespace framing {
+
+ class InitiationHandler{
+ public:
+ virtual ~InitiationHandler();
+ virtual void initiated(ProtocolInitiation* header) = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/InputHandler.h b/cpp/lib/common/framing/InputHandler.h
new file mode 100644
index 0000000000..4e2d4bcc9b
--- /dev/null
+++ b/cpp/lib/common/framing/InputHandler.h
@@ -0,0 +1,39 @@
+#ifndef _InputHandler_
+#define _InputHandler_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <AMQFrame.h>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace framing {
+
+class InputHandler : private boost::noncopyable {
+ public:
+ virtual ~InputHandler() {}
+ virtual void received(AMQFrame* frame) = 0;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/framing/MethodContext.cpp b/cpp/lib/common/framing/MethodContext.cpp
new file mode 100644
index 0000000000..73af73f8e5
--- /dev/null
+++ b/cpp/lib/common/framing/MethodContext.cpp
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "MethodContext.h"
+#include "amqp_types.h"
+#include "AMQRequestBody.h"
+
+namespace qpid {
+namespace framing {
+
+RequestId MethodContext::getRequestId() const {
+ return boost::shared_polymorphic_downcast<AMQRequestBody>(methodBody)
+ ->getRequestId();
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h
new file mode 100644
index 0000000000..3493924bf6
--- /dev/null
+++ b/cpp/lib/common/framing/MethodContext.h
@@ -0,0 +1,80 @@
+#ifndef _framing_MethodContext_h
+#define _framing_MethodContext_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+#include "OutputHandler.h"
+#include "ProtocolVersion.h"
+
+namespace qpid {
+namespace framing {
+
+class BodyHandler;
+class AMQMethodBody;
+class ChannelAdapter;
+
+/**
+ * Invocation context for an AMQP method.
+ *
+ * It provides the method being processed and the channel on which
+ * it arrived.
+ *
+ * All Handler functions take a MethodContext as the last parameter.
+ */
+struct MethodContext
+{
+ typedef boost::shared_ptr<AMQMethodBody> BodyPtr;
+
+ MethodContext(ChannelAdapter* ch=0, BodyPtr method=BodyPtr())
+ : channel(ch), methodBody(method) {}
+
+ /**
+ * Channel on which the method being processed arrived.
+ * 0 if the method was constructed by the caller
+ * rather than received from a channel.
+ */
+ ChannelAdapter* channel;
+
+ /**
+ * Body of the method being processed.
+ * It's useful for passing around instead of unpacking all its parameters.
+ * It's also provides the request ID when constructing a response.
+ */
+ BodyPtr methodBody;
+
+ /**
+ * Return methodBody's request ID.
+ * It is an error to call this if methodBody is not a request.
+ */
+ RequestId getRequestId() const;
+};
+
+// FIXME aconway 2007-02-01: Method context only required on Handler
+// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*)
+// on AMQBody and set it during decodeing then we could get rid of the context.
+
+
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_MethodContext_h*/
diff --git a/cpp/lib/common/framing/OutputHandler.h b/cpp/lib/common/framing/OutputHandler.h
new file mode 100644
index 0000000000..9ffd4227d8
--- /dev/null
+++ b/cpp/lib/common/framing/OutputHandler.h
@@ -0,0 +1,39 @@
+#ifndef _OutputHandler_
+#define _OutputHandler_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace framing {
+class AMQFrame;
+
+class OutputHandler : private boost::noncopyable {
+ public:
+ virtual ~OutputHandler() {}
+ virtual void send(AMQFrame* frame) = 0;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/framing/ProtocolInitiation.cpp b/cpp/lib/common/framing/ProtocolInitiation.cpp
new file mode 100644
index 0000000000..de53488f7b
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolInitiation.cpp
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ProtocolInitiation.h>
+
+namespace qpid {
+namespace framing {
+
+ProtocolInitiation::ProtocolInitiation(){}
+
+ProtocolInitiation::ProtocolInitiation(uint8_t _major, uint8_t _minor) : version(_major, _minor) {}
+
+ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {}
+
+ProtocolInitiation::~ProtocolInitiation(){}
+
+void ProtocolInitiation::encode(Buffer& buffer){
+ buffer.putOctet('A');
+ buffer.putOctet('M');
+ buffer.putOctet('Q');
+ buffer.putOctet('P');
+ buffer.putOctet(1);//class
+ buffer.putOctet(1);//instance
+ buffer.putOctet(version.getMajor());
+ buffer.putOctet(version.getMinor());
+}
+
+bool ProtocolInitiation::decode(Buffer& buffer){
+ if(buffer.available() >= 8){
+ buffer.getOctet();//A
+ buffer.getOctet();//M
+ buffer.getOctet();//Q
+ buffer.getOctet();//P
+ buffer.getOctet();//class
+ buffer.getOctet();//instance
+ version.setMajor(buffer.getOctet());
+ version.setMinor(buffer.getOctet());
+ return true;
+ }else{
+ return false;
+ }
+}
+
+//TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h
new file mode 100644
index 0000000000..ed7b59e94e
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolInitiation.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <Buffer.h>
+#include <AMQDataBlock.h>
+#include <ProtocolVersion.h>
+
+#ifndef _ProtocolInitiation_
+#define _ProtocolInitiation_
+
+namespace qpid {
+namespace framing {
+
+class ProtocolInitiation : public AMQDataBlock
+{
+private:
+ ProtocolVersion version;
+
+public:
+ ProtocolInitiation();
+ ProtocolInitiation(uint8_t major, uint8_t minor);
+ ProtocolInitiation(ProtocolVersion p);
+ virtual ~ProtocolInitiation();
+ virtual void encode(Buffer& buffer);
+ virtual bool decode(Buffer& buffer);
+ inline virtual uint32_t size() const { return 8; }
+ inline uint8_t getMajor() const { return version.getMajor(); }
+ inline uint8_t getMinor() const { return version.getMinor(); }
+ inline ProtocolVersion getVersion() const { return version; }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/ProtocolVersion.cpp b/cpp/lib/common/framing/ProtocolVersion.cpp
new file mode 100644
index 0000000000..fd4b1a645f
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolVersion.cpp
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ProtocolVersion.h>
+#include <sstream>
+
+using namespace qpid::framing;
+
+const std::string ProtocolVersion::toString() const
+{
+ std::stringstream ss;
+ ss << major_ << "-" << minor_;
+ return ss.str();
+}
+
+ProtocolVersion& ProtocolVersion::operator=(ProtocolVersion p)
+{
+ major_ = p.major_;
+ minor_ = p.minor_;
+ return *this;
+}
+
+bool ProtocolVersion::operator==(ProtocolVersion p) const
+{
+ return major_ == p.major_ && minor_ == p.minor_;
+}
+
diff --git a/cpp/lib/common/framing/ProtocolVersion.h b/cpp/lib/common/framing/ProtocolVersion.h
new file mode 100644
index 0000000000..5e1429c1ea
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolVersion.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ProtocolVersion_
+#define _ProtocolVersion_
+
+#include <amqp_types.h>
+
+namespace qpid
+{
+namespace framing
+{
+
+class ProtocolVersion
+{
+private:
+ uint8_t major_;
+ uint8_t minor_;
+
+public:
+ ProtocolVersion(uint8_t _major=0, uint8_t _minor=0)
+ : major_(_major), minor_(_minor) {}
+
+ uint8_t getMajor() const { return major_; }
+ void setMajor(uint8_t major) { major_ = major; }
+ uint8_t getMinor() const { return minor_; }
+ void setMinor(uint8_t minor) { minor_ = minor; }
+ const std::string toString() const;
+
+ ProtocolVersion& operator=(ProtocolVersion p);
+
+ bool operator==(ProtocolVersion p) const;
+ bool operator!=(ProtocolVersion p) const { return ! (*this == p); }
+};
+
+} // namespace framing
+} // namespace qpid
+
+
+#endif // ifndef _ProtocolVersion_
diff --git a/cpp/lib/common/framing/ProtocolVersionException.cpp b/cpp/lib/common/framing/ProtocolVersionException.cpp
new file mode 100644
index 0000000000..9088422f6f
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolVersionException.cpp
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/format.hpp>
+#include <ProtocolVersionException.h>
+
+
+using namespace qpid::framing;
+
+void ProtocolVersionException::init(const std::string& msg)
+{
+ whatStr = boost::str(
+ boost::format("ProtocolVersionException: %s found: %s")
+ % versionFound.toString() % msg);
+}
+
diff --git a/cpp/lib/common/framing/ProtocolVersionException.h b/cpp/lib/common/framing/ProtocolVersionException.h
new file mode 100644
index 0000000000..8e2de8b843
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolVersionException.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _ProtocolVersionException_
+#define _ProtocolVersionException_
+
+#include <Exception.h>
+#include <ProtocolVersion.h>
+#include <string>
+#include <vector>
+
+namespace qpid {
+namespace framing {
+
+class ProtocolVersionException : public qpid::Exception
+{
+protected:
+ ProtocolVersion versionFound;
+
+public:
+ ~ProtocolVersionException() throw() {}
+
+ template <class T>
+ ProtocolVersionException(
+ ProtocolVersion ver, const T& msg) throw () : versionFound(ver)
+ { init(boost::lexical_cast<std::string>(msg)); }
+
+ template <class T>
+ ProtocolVersionException(const T& msg) throw ()
+ { init(boost::lexical_cast<std::string>(msg)); }
+
+ private:
+ void init(const std::string& msg);
+};
+
+}} // namespace qpid::framing
+
+#endif //ifndef _ProtocolVersionException_
diff --git a/cpp/lib/common/framing/Proxy.cpp b/cpp/lib/common/framing/Proxy.cpp
new file mode 100644
index 0000000000..0b2a882a49
--- /dev/null
+++ b/cpp/lib/common/framing/Proxy.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Proxy.h"
+#include "ChannelAdapter.h"
+#include "ProtocolVersion.h"
+
+namespace qpid {
+namespace framing {
+
+Proxy::~Proxy() {}
+
+ProtocolVersion Proxy::getProtocolVersion() const {
+ return channel.getVersion();
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/Proxy.h b/cpp/lib/common/framing/Proxy.h
new file mode 100644
index 0000000000..8ed46ed748
--- /dev/null
+++ b/cpp/lib/common/framing/Proxy.h
@@ -0,0 +1,51 @@
+#ifndef _framing_Proxy_h
+#define _framing_Proxy_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ProtocolVersion.h"
+
+namespace qpid {
+namespace framing {
+
+class ChannelAdapter;
+class FieldTable;
+class Content;
+
+/**
+ * Base class for proxies.
+ */
+class Proxy
+{
+
+ public:
+ Proxy(ChannelAdapter& ch) : channel(ch) {}
+ virtual ~Proxy();
+
+ ProtocolVersion getProtocolVersion() const;
+
+ protected:
+ ChannelAdapter& channel;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_Proxy_h*/
diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp
new file mode 100644
index 0000000000..9ee809e2ee
--- /dev/null
+++ b/cpp/lib/common/framing/Requester.cpp
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/format.hpp>
+
+#include "Requester.h"
+#include "QpidError.h"
+
+namespace qpid {
+namespace framing {
+
+Requester::Requester() : lastId(0), responseMark(0) {}
+
+void Requester::sending(AMQRequestBody::Data& request) {
+ request.requestId = ++lastId;
+ request.responseMark = responseMark;
+}
+
+void Requester::processed(const AMQResponseBody::Data& response) {
+ responseMark = response.responseId;
+ firstAckRequest = response.requestId;
+ lastAckRequest = firstAckRequest + response.batchOffset;
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h
new file mode 100644
index 0000000000..dcc4460041
--- /dev/null
+++ b/cpp/lib/common/framing/Requester.h
@@ -0,0 +1,67 @@
+#ifndef _framing_Requester_h
+#define _framing_Requester_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <set>
+#include "AMQRequestBody.h"
+#include "AMQResponseBody.h"
+
+namespace qpid {
+namespace framing {
+
+class AMQRequestBody;
+class AMQResponseBody;
+
+/**
+ * Manage request IDs and the response mark for locally initiated requests.
+ *
+ * THREAD UNSAFE: This class is called as frames are sent or received
+ * sequentially on a connection, so it does not need to be thread safe.
+ */
+class Requester
+{
+ public:
+ Requester();
+
+ /** Called before sending a request to set request data. */
+ void sending(AMQRequestBody::Data&);
+
+ /** Called after processing a response. */
+ void processed(const AMQResponseBody::Data&);
+
+ /** Get the next request id to be used. */
+ RequestId getNextId() { return lastId + 1; }
+ /** Get the first request acked by this response */
+ RequestId getFirstAckRequest() { return firstAckRequest; }
+ /** Get the last request acked by this response */
+ RequestId getLastAckRequest() { return lastAckRequest; }
+
+ private:
+ RequestId lastId;
+ ResponseId responseMark;
+ ResponseId firstAckRequest;
+ ResponseId lastAckRequest;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_Requester_h*/
diff --git a/cpp/lib/common/framing/Responder.cpp b/cpp/lib/common/framing/Responder.cpp
new file mode 100644
index 0000000000..c8c5ce8dcc
--- /dev/null
+++ b/cpp/lib/common/framing/Responder.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/format.hpp>
+
+#include "Responder.h"
+#include "QpidError.h"
+
+namespace qpid {
+namespace framing {
+
+Responder::Responder() : lastId(0), responseMark(0) {}
+
+void Responder::received(const AMQRequestBody::Data& request) {
+ if (request.responseMark < responseMark || request.responseMark > lastId)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR, boost::format("Invalid response mark %d.")
+ %request.responseMark);
+ responseMark = request.responseMark;
+}
+
+void Responder::sending(AMQResponseBody::Data& response) {
+ response.responseId = ++lastId;
+ assert(response.requestId); // Should be already set.
+ response.batchOffset = 0;
+}
+
+}} // namespace qpid::framing
+
diff --git a/cpp/lib/common/framing/Responder.h b/cpp/lib/common/framing/Responder.h
new file mode 100644
index 0000000000..0e1785256b
--- /dev/null
+++ b/cpp/lib/common/framing/Responder.h
@@ -0,0 +1,61 @@
+#ifndef _framing_Responder_h
+#define _framing_Responder_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQRequestBody.h"
+#include "AMQResponseBody.h"
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Manage response ids and response mark remotely initianted requests.
+ *
+ * THREAD UNSAFE: This class is called as frames are sent or received
+ * sequentially on a connection, so it does not need to be thread safe.
+ */
+class Responder
+{
+ public:
+ Responder();
+
+ /** Called after receiving a request. */
+ void received(const AMQRequestBody::Data& request);
+
+ /** Called before sending a response to set respose data. */
+ void sending(AMQResponseBody::Data& response);
+
+ /** Get the ID of the highest response acknowledged by the peer. */
+ ResponseId getResponseMark() { return responseMark; }
+
+ // TODO aconway 2007-01-14: Batching support - store unsent
+ // Response for equality comparison with subsequent responses.
+ //
+
+ private:
+ ResponseId lastId;
+ ResponseId responseMark;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_Responder_h*/
diff --git a/cpp/lib/common/framing/Value.cpp b/cpp/lib/common/framing/Value.cpp
new file mode 100644
index 0000000000..03e005e384
--- /dev/null
+++ b/cpp/lib/common/framing/Value.cpp
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Value.h>
+#include <Buffer.h>
+#include <FieldTable.h>
+#include <QpidError.h>
+#include <sstream>
+
+namespace qpid {
+namespace framing {
+
+Value::~Value() {}
+
+void StringValue::encode(Buffer& buffer){
+ buffer.putLongString(value);
+}
+void StringValue::decode(Buffer& buffer){
+ buffer.getLongString(value);
+}
+
+void IntegerValue::encode(Buffer& buffer){
+ buffer.putLong((uint32_t) value);
+}
+void IntegerValue::decode(Buffer& buffer){
+ value = buffer.getLong();
+}
+
+void TimeValue::encode(Buffer& buffer){
+ buffer.putLongLong(value);
+}
+void TimeValue::decode(Buffer& buffer){
+ value = buffer.getLongLong();
+}
+
+void DecimalValue::encode(Buffer& buffer){
+ buffer.putOctet(value.decimals);
+ buffer.putLong(value.value);
+}
+void DecimalValue::decode(Buffer& buffer){
+ value = Decimal(buffer.getLong(), buffer.getOctet());
+}
+
+void FieldTableValue::encode(Buffer& buffer){
+ buffer.putFieldTable(value);
+}
+void FieldTableValue::decode(Buffer& buffer){
+ buffer.getFieldTable(value);
+}
+
+std::auto_ptr<Value> Value::decode_value(Buffer& buffer)
+{
+ std::auto_ptr<Value> value;
+ uint8_t type = buffer.getOctet();
+ switch(type){
+ case 'S':
+ value.reset(new StringValue());
+ break;
+ case 'I':
+ value.reset(new IntegerValue());
+ break;
+ case 'D':
+ value.reset(new DecimalValue());
+ break;
+ case 'T':
+ value.reset(new TimeValue());
+ break;
+ case 'F':
+ value.reset(new FieldTableValue());
+ break;
+
+ //non-standard types, introduced in java client for JMS compliance
+ case 'x':
+ value.reset(new BinaryValue());
+ break;
+ default:
+ std::stringstream out;
+ out << "Unknown field table value type: " << type;
+ THROW_QPID_ERROR(FRAMING_ERROR, out.str());
+ }
+ value->decode(buffer);
+ return value;
+}
+
+EmptyValue::~EmptyValue() {}
+
+void EmptyValue::print(std::ostream& out) const
+{
+ out << "<empty field value>";
+}
+
+std::ostream& operator<<(std::ostream& out, const Value& v) {
+ v.print(out);
+ return out;
+}
+
+std::ostream& operator<<(std::ostream& out, const Decimal& d)
+{
+ return out << "Decimal(" << d.value << "," << d.decimals << ")";
+}
+
+}}
+
+
+
diff --git a/cpp/lib/common/framing/Value.h b/cpp/lib/common/framing/Value.h
new file mode 100644
index 0000000000..8752b02f40
--- /dev/null
+++ b/cpp/lib/common/framing/Value.h
@@ -0,0 +1,171 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <vector>
+#include <amqp_types.h>
+#include <FieldTable.h>
+
+#ifndef _Value_
+#define _Value_
+
+namespace qpid {
+namespace framing {
+
+class Buffer;
+
+/**
+ * Represents a decimal value.
+ * No arithmetic functionality for now, we only care about encoding/decoding.
+ */
+struct Decimal {
+ uint32_t value;
+ uint8_t decimals;
+
+ Decimal(uint32_t value_=0, uint8_t decimals_=0) : value(value_), decimals(decimals_) {}
+ bool operator==(const Decimal& d) const {
+ return decimals == d.decimals && value == d.value;
+ }
+ bool operator!=(const Decimal& d) const { return !(*this == d); }
+};
+
+std::ostream& operator<<(std::ostream& out, const Decimal& d);
+
+/**
+ * Polymorpic base class for values.
+ */
+class Value {
+ public:
+ virtual ~Value();
+ virtual uint32_t size() const = 0;
+ virtual char getType() const = 0;
+ virtual void encode(Buffer& buffer) = 0;
+ virtual void decode(Buffer& buffer) = 0;
+ virtual bool operator==(const Value&) const = 0;
+ bool operator!=(const Value& v) const { return !(*this == v); }
+ virtual void print(std::ostream& out) const = 0;
+
+ /** Create a new value by decoding from the buffer */
+ static std::auto_ptr<Value> decode_value(Buffer& buffer);
+};
+
+std::ostream& operator<<(std::ostream& out, const Value& d);
+
+
+/**
+ * Template for common operations on Value sub-classes.
+ */
+template <class T>
+class ValueOps : public Value
+{
+ protected:
+ T value;
+ public:
+ ValueOps() {}
+ ValueOps(const T& v) : value(v) {}
+ const T& getValue() const { return value; }
+ T& getValue() { return value; }
+
+ virtual bool operator==(const Value& v) const {
+ const ValueOps<T>* vo = dynamic_cast<const ValueOps<T>*>(&v);
+ if (vo == 0) return false;
+ else return value == vo->value;
+ }
+
+ void print(std::ostream& out) const { out << value; }
+};
+
+
+class StringValue : public ValueOps<std::string> {
+ public:
+ StringValue(const std::string& v) : ValueOps<std::string>(v) {}
+ StringValue() {}
+ virtual uint32_t size() const { return 4 + value.length(); }
+ virtual char getType() const { return 'S'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class IntegerValue : public ValueOps<int> {
+ public:
+ IntegerValue(int v) : ValueOps<int>(v) {}
+ IntegerValue(){}
+ virtual uint32_t size() const { return 4; }
+ virtual char getType() const { return 'I'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class TimeValue : public ValueOps<uint64_t> {
+ public:
+ TimeValue(uint64_t v) : ValueOps<uint64_t>(v){}
+ TimeValue(){}
+ virtual uint32_t size() const { return 8; }
+ virtual char getType() const { return 'T'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class DecimalValue : public ValueOps<Decimal> {
+ public:
+ DecimalValue(const Decimal& d) : ValueOps<Decimal>(d) {}
+ DecimalValue(uint32_t value_=0, uint8_t decimals_=0) :
+ ValueOps<Decimal>(Decimal(value_, decimals_)){}
+ virtual uint32_t size() const { return 5; }
+ virtual char getType() const { return 'D'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+
+class FieldTableValue : public ValueOps<FieldTable> {
+ public:
+ FieldTableValue(const FieldTable& v) : ValueOps<FieldTable>(v){}
+ FieldTableValue(){}
+ virtual uint32_t size() const { return 4 + value.size(); }
+ virtual char getType() const { return 'F'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class EmptyValue : public Value {
+ public:
+ ~EmptyValue();
+ virtual uint32_t size() const { return 0; }
+ virtual char getType() const { return 0; }
+ virtual void encode(Buffer& ) {}
+ virtual void decode(Buffer& ) {}
+ virtual bool operator==(const Value& v) const {
+ return dynamic_cast<const EmptyValue*>(&v);
+ }
+ virtual void print(std::ostream& out) const;
+};
+
+//non-standard types, introduced in java client for JMS compliance
+class BinaryValue : public StringValue {
+ public:
+ BinaryValue(const std::string& v) : StringValue(v) {}
+ BinaryValue() {}
+ virtual char getType() const { return 'x'; }
+};
+
+}} // qpid::framing
+
+#endif
diff --git a/cpp/lib/common/framing/amqp_framing.h b/cpp/lib/common/framing/amqp_framing.h
new file mode 100644
index 0000000000..62f87352f8
--- /dev/null
+++ b/cpp/lib/common/framing/amqp_framing.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <AMQFrame.h>
+#include <AMQBody.h>
+#include <BodyHandler.h>
+#include <AMQMethodBody.h>
+#include <AMQHeaderBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeartbeatBody.h>
+#include <AMQP_MethodVersionMap.h>
+#include <InputHandler.h>
+#include <OutputHandler.h>
+#include <InitiationHandler.h>
+#include <ProtocolInitiation.h>
+#include <BasicHeaderProperties.h>
+#include <ProtocolVersion.h>
+#include <ProtocolVersionException.h>
diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h
new file mode 100644
index 0000000000..49963bd570
--- /dev/null
+++ b/cpp/lib/common/framing/amqp_types.h
@@ -0,0 +1,57 @@
+#ifndef AMQP_TYPES_H
+#define AMQP_TYPES_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/** \file
+ * Type definitions and forward declarations of all types used to
+ * in AMQP messages.
+ */
+
+#include <string>
+#ifdef _WINDOWS
+#include "windows.h"
+typedef unsigned char uint8_t;
+typedef unsigned short uint16_t;
+typedef unsigned int uint32_t;
+typedef unsigned __int64 uint64_t;
+#endif
+#ifndef _WINDOWS
+#include "stdint.h"
+#endif
+
+namespace qpid {
+namespace framing {
+
+using std::string;
+typedef uint16_t ChannelId;
+typedef uint64_t RequestId;
+typedef uint64_t ResponseId;
+typedef uint32_t BatchOffset;
+typedef uint16_t ClassId;
+typedef uint16_t MethodId;
+typedef uint16_t ReplyCode;
+
+// Types represented by classes.
+class Content;
+class FieldTable;
+}} // namespace qpid::framing
+#endif
diff --git a/cpp/lib/common/framing/amqp_types_full.h b/cpp/lib/common/framing/amqp_types_full.h
new file mode 100644
index 0000000000..6a24a99d38
--- /dev/null
+++ b/cpp/lib/common/framing/amqp_types_full.h
@@ -0,0 +1,36 @@
+#ifndef _framing_amqp_types_decl_h
+#define _framing_amqp_types_decl_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** \file
+ * Type definitions and full declarations of all types used to
+ * in AMQP messages.
+ *
+ * Its better to include amqp_types.h in another header instead of this file
+ * unless the header actually needs the full declarations. Including
+ * full declarations when forward declarations would do increases compile
+ * times.
+ */
+
+#include "amqp_types.h"
+#include "FramingContent.h"
+#include "FieldTable.h"
+
+#endif /*!_framing_amqp_types_decl_h*/
diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h
new file mode 100644
index 0000000000..f571dcbddd
--- /dev/null
+++ b/cpp/lib/common/sys/Acceptor.h
@@ -0,0 +1,47 @@
+#ifndef _sys_Acceptor_h
+#define _sys_Acceptor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdint.h>
+#include <SharedObject.h>
+
+namespace qpid {
+namespace sys {
+
+class ConnectionInputHandlerFactory;
+
+class Acceptor : public qpid::SharedObject<Acceptor>
+{
+ public:
+ static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false);
+ virtual ~Acceptor() = 0;
+ virtual int16_t getPort() const = 0;
+ virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0;
+ virtual void shutdown() = 0;
+};
+
+}}
+
+
+
+#endif /*!_sys_Acceptor_h*/
diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h
new file mode 100644
index 0000000000..63670cbf00
--- /dev/null
+++ b/cpp/lib/common/sys/AtomicCount.h
@@ -0,0 +1,53 @@
+#ifndef _posix_AtomicCount_h
+#define _posix_AtomicCount_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/detail/atomic_count.hpp>
+#include "ScopedIncrement.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Atomic counter.
+ */
+class AtomicCount : boost::noncopyable {
+ public:
+ typedef ScopedDecrement<AtomicCount> ScopedDecrement;
+ typedef ScopedIncrement<AtomicCount> ScopedIncrement;
+
+ AtomicCount(long value = 0) : count(value) {}
+
+ void operator++() { ++count ; }
+
+ long operator--() { return --count; }
+
+ operator long() const { return count; }
+
+
+ private:
+ boost::detail::atomic_count count;
+};
+
+
+}}
+
+
+#endif // _posix_AtomicCount_h
diff --git a/cpp/lib/common/sys/Condition.h b/cpp/lib/common/sys/Condition.h
new file mode 100644
index 0000000000..9d70af5b84
--- /dev/null
+++ b/cpp/lib/common/sys/Condition.h
@@ -0,0 +1,128 @@
+#ifndef _sys_Condition_h
+#define _sys_Condition_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/errno.h>
+#include <boost/noncopyable.hpp>
+#include <sys/Mutex.h>
+#include <sys/Time.h>
+
+#ifdef USE_APR
+# include <apr_thread_cond.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A condition variable for thread synchronization.
+ */
+class Condition
+{
+ public:
+ inline Condition();
+ inline ~Condition();
+ inline void wait(Mutex&);
+ inline bool wait(Mutex&, const Time& absoluteTime);
+ inline void notify();
+ inline void notifyAll();
+
+ private:
+#ifdef USE_APR
+ apr_thread_cond_t* condition;
+#else
+ pthread_cond_t condition;
+#endif
+};
+
+
+// APR ================================================================
+#ifdef USE_APR
+
+Condition::Condition() {
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+}
+
+Condition::~Condition() {
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+}
+
+void Condition::wait(Mutex& mutex) {
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex.mutex));
+}
+
+bool Condition::wait(Mutex& mutex, const Time& absoluteTime){
+ // APR uses microseconds.
+ apr_status_t status =
+ apr_thread_cond_timedwait(
+ condition, mutex.mutex, absoluteTime/TIME_USEC);
+ if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status);
+ return status == 0;
+}
+
+void Condition::notify(){
+ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void Condition::notifyAll(){
+ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+#else
+// POSIX ================================================================
+
+Condition::Condition() {
+ QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
+}
+
+Condition::~Condition() {
+ QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
+}
+
+void Condition::wait(Mutex& mutex) {
+ QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex));
+}
+
+bool Condition::wait(Mutex& mutex, const Time& absoluteTime){
+ struct timespec ts;
+ toTimespec(ts, absoluteTime);
+ int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts);
+ if (status != 0) {
+ if (status == ETIMEDOUT) return false;
+ throw QPID_POSIX_ERROR(status);
+ }
+ return true;
+}
+
+void Condition::notify(){
+ QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
+}
+
+void Condition::notifyAll(){
+ QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
+}
+#endif /*USE_APR*/
+
+
+}}
+#endif /*!_sys_Condition_h*/
diff --git a/cpp/lib/common/sys/ConnectionInputHandler.h b/cpp/lib/common/sys/ConnectionInputHandler.h
new file mode 100644
index 0000000000..fa70dfaf48
--- /dev/null
+++ b/cpp/lib/common/sys/ConnectionInputHandler.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionInputHandler_
+#define _ConnectionInputHandler_
+
+#include <InputHandler.h>
+#include <InitiationHandler.h>
+#include <ProtocolInitiation.h>
+#include <sys/TimeoutHandler.h>
+
+namespace qpid {
+namespace sys {
+
+ class ConnectionInputHandler :
+ public qpid::framing::InitiationHandler,
+ public qpid::framing::InputHandler,
+ public TimeoutHandler
+ {
+ public:
+ virtual void closed() = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h
new file mode 100644
index 0000000000..af7d411928
--- /dev/null
+++ b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionInputHandlerFactory_
+#define _ConnectionInputHandlerFactory_
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+class ConnectionOutputHandler;
+class ConnectionInputHandler;
+
+/**
+ * Callback interface used by the Acceptor to
+ * create a ConnectionInputHandler for each new connection.
+ */
+class ConnectionInputHandlerFactory : private boost::noncopyable
+{
+ public:
+ virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0;
+ virtual ~ConnectionInputHandlerFactory(){}
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/sys/ConnectionOutputHandler.h b/cpp/lib/common/sys/ConnectionOutputHandler.h
new file mode 100644
index 0000000000..91849e1dfb
--- /dev/null
+++ b/cpp/lib/common/sys/ConnectionOutputHandler.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionOutputHandler_
+#define _ConnectionOutputHandler_
+
+#include <OutputHandler.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Provides the output handler associated with a connection.
+ */
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler
+{
+ public:
+ virtual void close() = 0;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/sys/Module.h b/cpp/lib/common/sys/Module.h
new file mode 100644
index 0000000000..9bf5d6e1fc
--- /dev/null
+++ b/cpp/lib/common/sys/Module.h
@@ -0,0 +1,161 @@
+#ifndef _sys_Module_h
+#define _sys_Module_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <iostream>
+#include <QpidError.h>
+
+namespace qpid {
+namespace sys {
+#if USE_APR
+#include <apr_dso.h>
+ typedef apr_dso_handle_t* dso_handle_t;
+#else
+ typedef void* dso_handle_t;
+#endif
+
+ template <class T> class Module : private boost::noncopyable
+ {
+ typedef T* create_t();
+ typedef void destroy_t(T*);
+
+ dso_handle_t handle;
+ destroy_t* destroy;
+ T* ptr;
+
+ void load(const std::string& name);
+ void unload();
+ void* getSymbol(const std::string& name);
+
+ public:
+ Module(const std::string& name);
+ T* operator->();
+ T* get();
+ ~Module() throw();
+ };
+
+}
+}
+
+using namespace qpid::sys;
+
+template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0)
+{
+ load(module);
+ //TODO: need a better strategy for symbol names to allow multiple
+ //modules to be loaded without clashes...
+
+ //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic
+ create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create")));
+ destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy")));
+ ptr = create();
+}
+
+template <class T> T* Module<T>::operator->()
+{
+ return ptr;
+}
+
+template <class T> T* Module<T>::get()
+{
+ return ptr;
+}
+
+template <class T> Module<T>::~Module() throw()
+{
+ try {
+ if (handle && ptr) {
+ destroy(ptr);
+ }
+ if (handle) unload();
+ } catch (std::exception& e) {
+ std::cout << "Error while destroying module: " << e.what() << std::endl;
+ }
+ destroy = 0;
+ handle = 0;
+ ptr = 0;
+}
+
+// APR ================================================================
+#if USE_APR
+
+#include <apr/APRBase.h>
+#include <apr/APRPool.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+ CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get()));
+}
+
+template <class T> void Module<T>::unload()
+{
+ CHECK_APR_SUCCESS(apr_dso_unload(handle));
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+ apr_dso_handle_sym_t symbol;
+ CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str()));
+ return (void*) symbol;
+}
+
+// POSIX================================================================
+#else
+
+#include <dlfcn.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+ dlerror();
+ handle = dlopen(name.c_str(), RTLD_NOW);
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+}
+
+template <class T> void Module<T>::unload()
+{
+ dlerror();
+ dlclose(handle);
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+ dlerror();
+ void* sym = dlsym(handle, name.c_str());
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+ return sym;
+}
+
+#endif //if USE_APR
+
+#endif //ifndef _sys_Module_h
+
diff --git a/cpp/lib/common/sys/Monitor.h b/cpp/lib/common/sys/Monitor.h
new file mode 100644
index 0000000000..a3bbd3c5aa
--- /dev/null
+++ b/cpp/lib/common/sys/Monitor.h
@@ -0,0 +1,55 @@
+#ifndef _sys_Monitor_h
+#define _sys_Monitor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/errno.h>
+#include <sys/Condition.h>
+
+#ifdef USE_APR
+# include <apr_thread_cond.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A monitor is a condition variable and a mutex
+ */
+class Monitor : public Mutex, public Condition {
+ public:
+ using Condition::wait;
+ inline void wait();
+ inline bool wait(const Time& absoluteTime);
+};
+
+
+void Monitor::wait() {
+ Condition::wait(*this);
+}
+
+bool Monitor::wait(const Time& absoluteTime) {
+ return Condition::wait(*this, absoluteTime);
+}
+
+}}
+#endif /*!_sys_Monitor_h*/
diff --git a/cpp/lib/common/sys/Mutex.h b/cpp/lib/common/sys/Mutex.h
new file mode 100644
index 0000000000..9db9be0981
--- /dev/null
+++ b/cpp/lib/common/sys/Mutex.h
@@ -0,0 +1,165 @@
+#ifndef _sys_Mutex_h
+#define _sys_Mutex_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifdef USE_APR
+# include <apr_thread_mutex.h>
+# include <apr/APRBase.h>
+# include <apr/APRPool.h>
+#else
+# include <pthread.h>
+# include <posix/check.h>
+#endif
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+class Condition;
+
+/**
+ * Scoped lock template: calls lock() in ctor, unlock() in dtor.
+ * L can be any class with lock() and unlock() functions.
+ */
+template <class L>
+class ScopedLock
+{
+ public:
+ ScopedLock(L& l) : mutex(l) { l.lock(); }
+ ~ScopedLock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
+template <class L>
+class ScopedUnlock
+{
+ public:
+ ScopedUnlock(L& l) : mutex(l) { l.unlock(); }
+ ~ScopedUnlock() { mutex.lock(); }
+ private:
+ L& mutex;
+};
+
+/**
+ * Mutex lock.
+ */
+class Mutex : private boost::noncopyable {
+ public:
+ typedef ScopedLock<Mutex> ScopedLock;
+ typedef ScopedUnlock<Mutex> ScopedUnlock;
+
+ inline Mutex();
+ inline ~Mutex();
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ protected:
+#ifdef USE_APR
+ apr_thread_mutex_t* mutex;
+#else
+ pthread_mutex_t mutex;
+#endif
+ friend class Condition;
+};
+
+#ifdef USE_APR
+// APR ================================================================
+
+Mutex::Mutex() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
+}
+
+Mutex::~Mutex(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+}
+
+void Mutex::lock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+void Mutex::unlock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
+
+void Mutex::trylock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
+}
+
+#else
+// POSIX ================================================================
+
+/**
+ * PODMutex is a POD, can be static-initialized with
+ * PODMutex m = QPID_PODMUTEX_INITIALIZER
+ */
+struct PODMutex
+{
+ typedef ScopedLock<PODMutex> ScopedLock;
+
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ // Must be public to be a POD:
+ pthread_mutex_t mutex;
+};
+
+#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
+
+
+void PODMutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void PODMutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void PODMutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+
+Mutex::Mutex() {
+ QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0));
+}
+
+Mutex::~Mutex(){
+ QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex));
+}
+
+void Mutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void Mutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void Mutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+#endif // USE_APR
+
+}}
+
+
+
+#endif /*!_sys_Mutex_h*/
diff --git a/cpp/lib/common/sys/ProducerConsumer.cpp b/cpp/lib/common/sys/ProducerConsumer.cpp
new file mode 100644
index 0000000000..3f6156f230
--- /dev/null
+++ b/cpp/lib/common/sys/ProducerConsumer.cpp
@@ -0,0 +1,141 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "QpidError.h"
+#include "ScopedIncrement.h"
+#include "ProducerConsumer.h"
+
+namespace qpid {
+namespace sys {
+
+// // ================ ProducerConsumer
+
+ProducerConsumer::ProducerConsumer(size_t init_items)
+ : items(init_items), waiters(0), stopped(false)
+{}
+
+void ProducerConsumer::stop() {
+ Mutex::ScopedLock l(monitor);
+ stopped = true;
+ monitor.notifyAll();
+ // Wait for waiting consumers to wake up.
+ while (waiters > 0)
+ monitor.wait();
+}
+
+size_t ProducerConsumer::available() const {
+ Mutex::ScopedLock l(monitor);
+ return items;
+}
+
+size_t ProducerConsumer::consumers() const {
+ Mutex::ScopedLock l(monitor);
+ return waiters;
+}
+
+// ================ Lock
+
+ProducerConsumer::Lock::Lock(ProducerConsumer& p)
+ : pc(p), lock(p.monitor), status(INCOMPLETE) {}
+
+bool ProducerConsumer::Lock::isOk() const {
+ return !pc.isStopped() && status==INCOMPLETE;
+}
+
+void ProducerConsumer::Lock::checkOk() const {
+ assert(!pc.isStopped());
+ assert(status == INCOMPLETE);
+}
+
+ProducerConsumer::Lock::~Lock() {
+ assert(status != INCOMPLETE || pc.isStopped());
+}
+
+void ProducerConsumer::Lock::confirm() {
+ checkOk();
+ status = CONFIRMED;
+}
+
+void ProducerConsumer::Lock::cancel() {
+ checkOk();
+ status = CANCELLED;
+}
+
+// ================ ProducerLock
+
+ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p)
+{}
+
+
+ProducerConsumer::ProducerLock::~ProducerLock() {
+ if (status == CONFIRMED) {
+ pc.items++;
+ pc.monitor.notify(); // Notify a consumer.
+ }
+}
+
+// ================ ConsumerLock
+
+ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p)
+{
+ if (isOk()) {
+ ScopedIncrement<size_t> inc(pc.waiters);
+ while (pc.items == 0 && !pc.stopped) {
+ pc.monitor.wait();
+ }
+ }
+}
+
+ProducerConsumer::ConsumerLock::ConsumerLock(
+ ProducerConsumer& p, const Time& timeout) : Lock(p)
+{
+ if (isOk()) {
+ // Don't wait if timeout==0
+ if (timeout == 0) {
+ if (pc.items == 0)
+ status = TIMEOUT;
+ return;
+ }
+ else {
+ Time deadline = now() + timeout;
+ ScopedIncrement<size_t> inc(pc.waiters);
+ while (pc.items == 0 && !pc.stopped) {
+ if (!pc.monitor.wait(deadline)) {
+ status = TIMEOUT;
+ return;
+ }
+ }
+ }
+ }
+}
+
+ProducerConsumer::ConsumerLock::~ConsumerLock() {
+ if (pc.isStopped()) {
+ if (pc.waiters == 0)
+ pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s)
+ }
+ else if (status==CONFIRMED) {
+ pc.items--;
+ if (pc.items > 0)
+ pc.monitor.notify(); // Notify another consumer.
+ }
+}
+
+
+}} // namespace qpid::sys
diff --git a/cpp/lib/common/sys/ProducerConsumer.h b/cpp/lib/common/sys/ProducerConsumer.h
new file mode 100644
index 0000000000..742639323b
--- /dev/null
+++ b/cpp/lib/common/sys/ProducerConsumer.h
@@ -0,0 +1,165 @@
+#ifndef _sys_ProducerConsumer_h
+#define _sys_ProducerConsumer_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include "Exception.h"
+#include "sys/Monitor.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Producer-consumer synchronisation.
+ *
+ * Producers increase the number of available items, consumers reduce it.
+ * Consumers wait till an item is available. Waiting threads can be
+ * woken for shutdown using stop().
+ *
+ * Note: Currently implements unbounded producer-consumer, i.e. no limit
+ * to available items, producers never block. Can be extended to support
+ * bounded PC if required.
+ *
+ // TODO aconway 2007-02-13: example, from tests.
+*/
+class ProducerConsumer
+{
+ public:
+ ProducerConsumer(size_t init_items=0);
+
+ ~ProducerConsumer() { stop(); }
+
+ /**
+ * Wake any threads waiting for ProducerLock or ConsumerLock.
+ *@post No threads are waiting in Producer or Consumer locks.
+ */
+ void stop();
+
+ /** True if queue is stopped */
+ bool isStopped() { return stopped; }
+
+ /** Number of items available for consumers */
+ size_t available() const;
+
+ /** Number of consumers waiting for items */
+ size_t consumers() const;
+
+ /** True if available == 0 */
+ bool empty() const { return available() == 0; }
+
+ /**
+ * Base class for producer and consumer locks.
+ */
+ class Lock : private boost::noncopyable {
+ public:
+
+ /**
+ * You must call isOk() after creating a lock to verify its state.
+ *
+ *@return true means the lock succeeded. You MUST call either
+ *confirm() or cancel() before the lock goes out of scope.
+ *
+ * false means the lock failed - timed out or the
+ * ProducerConsumer is stopped. You should not do anything in
+ * the scope of the lock.
+ */
+ bool isOk() const;
+
+ /**
+ * Confirm that an item was produced/consumed.
+ *@pre isOk()
+ */
+ void confirm();
+
+ /**
+ * Cancel the lock to indicate nothing was produced/consumed.
+ * Note that locks are not actually released until destroyed.
+ *
+ *@pre isOk()
+ */
+ void cancel();
+
+ /** True if this lock experienced a timeout */
+ bool isTimedOut() const { return status == TIMEOUT; }
+
+ /** True if we have been stopped */
+ bool isStopped() const { return pc.isStopped(); }
+
+ ProducerConsumer& pc;
+
+ protected:
+ /** Lock status */
+ enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT };
+
+ Lock(ProducerConsumer& p);
+ ~Lock();
+ void checkOk() const;
+ Mutex::ScopedLock lock;
+ Status status;
+ };
+
+ /** Lock for code that produces items. */
+ struct ProducerLock : public Lock {
+ /**
+ * Acquire locks to produce an item.
+ *@post If isOk() the calling thread has exclusive access
+ * to produce an item.
+ */
+ ProducerLock(ProducerConsumer& p);
+
+ /** Release locks, signal waiting consumers if confirm() was called. */
+ ~ProducerLock();
+ };
+
+ /** Lock for code that consumes items */
+ struct ConsumerLock : public Lock {
+ /**
+ * Wait for an item to consume and acquire locks.
+ *
+ *@post If isOk() there is at least one item available and the
+ *calling thread has exclusive access to consume it.
+ */
+ ConsumerLock(ProducerConsumer& p);
+
+ /**
+ * Wait up to timeout to acquire lock.
+ *@post If isOk() caller has a producer lock.
+ * If isTimedOut() there was a timeout.
+ * If neither then we were stopped.
+ */
+ ConsumerLock(ProducerConsumer& p, const Time& timeout);
+
+ /** Release locks */
+ ~ConsumerLock();
+ };
+
+ private:
+ mutable Monitor monitor;
+ size_t items;
+ size_t waiters;
+ bool stopped;
+
+ friend class Lock;
+ friend class ProducerLock;
+ friend class ConsumerLock;
+};
+
+}} // namespace qpid::sys
+
+#endif /*!_sys_ProducerConsumer_h*/
diff --git a/cpp/lib/common/sys/Runnable.cpp b/cpp/lib/common/sys/Runnable.cpp
new file mode 100644
index 0000000000..30122c682f
--- /dev/null
+++ b/cpp/lib/common/sys/Runnable.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Runnable.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+Runnable::~Runnable() {}
+
+Runnable::Functor Runnable::functor()
+{
+ return boost::bind(&Runnable::run, this);
+}
+
+}}
diff --git a/cpp/lib/common/sys/Runnable.h b/cpp/lib/common/sys/Runnable.h
new file mode 100644
index 0000000000..fb3927c612
--- /dev/null
+++ b/cpp/lib/common/sys/Runnable.h
@@ -0,0 +1,50 @@
+#ifndef _Runnable_
+#define _Runnable_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Interface for objects that can be run, e.g. in a thread.
+ */
+class Runnable
+{
+ public:
+ /** Type to represent a runnable as a Functor */
+ typedef boost::function0<void> Functor;
+
+ virtual ~Runnable();
+
+ /** Derived classes override run(). */
+ virtual void run() = 0;
+
+ /** Create a functor object that will call this->run(). */
+ Functor functor();
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/sys/ScopedIncrement.h b/cpp/lib/common/sys/ScopedIncrement.h
new file mode 100644
index 0000000000..f14461ddaf
--- /dev/null
+++ b/cpp/lib/common/sys/ScopedIncrement.h
@@ -0,0 +1,59 @@
+#ifndef _posix_ScopedIncrement_h
+#define _posix_ScopedIncrement_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/** Increment counter in constructor and decrement in destructor. */
+template <class T>
+class ScopedIncrement : boost::noncopyable
+{
+ public:
+ ScopedIncrement(T& c) : count(c) { ++count; }
+ ~ScopedIncrement() { --count; }
+ private:
+ T& count;
+};
+
+
+/** Decrement counter in constructor and increment in destructor. */
+template <class T>
+class ScopedDecrement : boost::noncopyable
+{
+ public:
+ ScopedDecrement(T& c) : count(c) { value = --count; }
+ ~ScopedDecrement() { ++count; }
+
+ /** Return the value after the decrement. */
+ operator long() { return value; }
+
+ private:
+ T& count;
+ long value;
+};
+
+
+}}
+
+
+#endif // _posix_ScopedIncrement_h
diff --git a/cpp/lib/common/sys/ShutdownHandler.h b/cpp/lib/common/sys/ShutdownHandler.h
new file mode 100644
index 0000000000..88baecb5b6
--- /dev/null
+++ b/cpp/lib/common/sys/ShutdownHandler.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ShutdownHandler_
+#define _ShutdownHandler_
+
+namespace qpid {
+namespace sys {
+
+ class ShutdownHandler
+ {
+ public:
+ virtual void shutdown() = 0;
+ virtual ~ShutdownHandler(){}
+ };
+
+}
+}
+
+#endif
diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h
new file mode 100644
index 0000000000..d793a240c6
--- /dev/null
+++ b/cpp/lib/common/sys/Socket.h
@@ -0,0 +1,88 @@
+#ifndef _sys_Socket_h
+#define _sys_Socket_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <sys/Time.h>
+
+#ifdef USE_APR
+# include <apr_network_io.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+class Socket
+{
+ public:
+ /** Create an initialized TCP socket */
+ static Socket createTcp();
+
+ /** Create a socket wrapper for descriptor. */
+#ifdef USE_APR
+ Socket(apr_socket_t* descriptor = 0);
+#else
+ Socket(int descriptor = 0);
+#endif
+
+ /** Set timeout for read and write */
+ void setTimeout(Time interval);
+
+ void connect(const std::string& host, int port);
+
+ void close();
+
+ enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
+
+ /** Returns bytes sent or an ErrorCode value < 0. */
+ ssize_t send(const void* data, size_t size);
+
+ /**
+ * Returns bytes received, an ErrorCode value < 0 or 0
+ * if the connection closed in an orderly manner.
+ */
+ ssize_t recv(void* data, size_t size);
+
+ /** Bind to a port and start listening.
+ *@param port 0 means choose an available port.
+ *@param backlog maximum number of pending connections.
+ *@return The bound port.
+ */
+ int listen(int port = 0, int backlog = 10);
+
+ /** Get file descriptor */
+ int fd();
+
+ private:
+#ifdef USE_APR
+ apr_socket_t* socket;
+#else
+ void init() const;
+ mutable int socket; // Initialized on demand.
+#endif
+};
+
+}}
+
+
+#endif /*!_sys_Socket_h*/
diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h
new file mode 100644
index 0000000000..47b95b6234
--- /dev/null
+++ b/cpp/lib/common/sys/Thread.h
@@ -0,0 +1,142 @@
+#ifndef _sys_Thread_h
+#define _sys_Thread_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Runnable.h>
+
+#ifdef USE_APR
+# include <apr_thread_proc.h>
+# include <apr_portable.h>
+# include <apr/APRPool.h>
+# include <apr/APRBase.h>
+#else
+# include <posix/check.h>
+# include <pthread.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+class Thread
+{
+ public:
+ inline static Thread current();
+ inline static void yield();
+
+ inline Thread();
+ inline explicit Thread(qpid::sys::Runnable*);
+ inline explicit Thread(qpid::sys::Runnable&);
+
+ inline void join();
+
+ inline long id();
+
+ private:
+#ifdef USE_APR
+ static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data);
+ inline Thread(apr_thread_t* t);
+ apr_thread_t* thread;
+#else
+ static void* runRunnable(void* runnable);
+ inline Thread(pthread_t);
+ pthread_t thread;
+#endif
+};
+
+
+Thread::Thread() : thread(0) {}
+
+// APR ================================================================
+#ifdef USE_APR
+
+Thread::Thread(Runnable* runnable) {
+ CHECK_APR_SUCCESS(
+ apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get()));
+}
+
+Thread::Thread(Runnable& runnable) {
+ CHECK_APR_SUCCESS(
+ apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get()));
+}
+
+void Thread::join(){
+ apr_status_t status;
+ if (thread != 0)
+ CHECK_APR_SUCCESS(apr_thread_join(&status, thread));
+}
+
+long Thread::id() {
+ return long(thread);
+}
+
+Thread::Thread(apr_thread_t* t) : thread(t) {}
+
+Thread Thread::current(){
+ apr_thread_t* thr;
+ apr_os_thread_t osthr = apr_os_thread_current();
+ CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get()));
+ return Thread(thr);
+}
+
+void Thread::yield()
+{
+ apr_thread_yield();
+}
+
+
+// POSIX ================================================================
+#else
+
+Thread::Thread(Runnable* runnable) {
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable));
+}
+
+Thread::Thread(Runnable& runnable) {
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable));
+}
+
+void Thread::join(){
+ QPID_POSIX_THROW_IF(pthread_join(thread, 0));
+}
+
+long Thread::id() {
+ return long(thread);
+}
+
+Thread::Thread(pthread_t thr) : thread(thr) {}
+
+Thread Thread::current() {
+ return Thread(pthread_self());
+}
+
+void Thread::yield()
+{
+ QPID_POSIX_THROW_IF(pthread_yield());
+}
+
+
+#endif
+
+}}
+
+#endif /*!_sys_Thread_h*/
diff --git a/cpp/lib/common/sys/ThreadSafeQueue.h b/cpp/lib/common/sys/ThreadSafeQueue.h
new file mode 100644
index 0000000000..ff949a3e16
--- /dev/null
+++ b/cpp/lib/common/sys/ThreadSafeQueue.h
@@ -0,0 +1,98 @@
+#ifndef _sys_ThreadSafeQueue_h
+#define _sys_ThreadSafeQueue_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <deque>
+#include "ProducerConsumer.h"
+#include "Exception.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A thread safe queue template.
+ */
+template <class T, class ContainerType=std::deque<T> >
+class ThreadSafeQueue
+{
+ public:
+
+ ThreadSafeQueue() {}
+
+ /** Push a value onto the back of the queue */
+ void push(const T& value) {
+ ProducerConsumer::ProducerLock producer(pc);
+ if (producer.isOk()) {
+ producer.confirm();
+ container.push_back(value);
+ }
+ }
+
+ /** Pop a value from the front of the queue. Waits till value is available.
+ *@throw ShutdownException if queue is stopped while waiting.
+ */
+ T pop() {
+ ProducerConsumer::ConsumerLock consumer(pc);
+ if (consumer.isOk()) {
+ consumer.confirm();
+ T value(container.front());
+ container.pop_front();
+ return value;
+ }
+ throw ShutdownException();
+ }
+
+ /**
+ * If a value becomes available within the timeout, set outValue
+ * and return true. Otherwise return false;
+ */
+ bool pop(T& outValue, const Time& timeout) {
+ ProducerConsumer::ConsumerLock consumer(pc, timeout);
+ if (consumer.isOk()) {
+ consumer.confirm();
+ outValue = container.front();
+ container.pop_front();
+ return true;
+ }
+ return false;
+ }
+
+ /** Interrupt threads waiting in pop() */
+ void stop() { pc.stop(); }
+
+ /** True if queue is stopped */
+ bool isStopped() { return pc.isStopped(); }
+
+ /** Size of the queue */
+ size_t size() { ProducerConsumer::Lock l(pc); return container.size(); }
+
+ /** True if queue is empty */
+ bool empty() { ProducerConsumer::Lock l(pc); return container.empty(); }
+
+ private:
+ ProducerConsumer pc;
+ ContainerType container;
+};
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!_sys_ThreadSafeQueue_h*/
diff --git a/cpp/lib/common/sys/Time.cpp b/cpp/lib/common/sys/Time.cpp
new file mode 100644
index 0000000000..ad6185b966
--- /dev/null
+++ b/cpp/lib/common/sys/Time.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Time.h"
+
+namespace qpid {
+namespace sys {
+
+// APR ================================================================
+#if USE_APR
+
+Time now() { return apr_time_now() * TIME_USEC; }
+
+// POSIX================================================================
+#else
+
+Time now() {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ return toTime(ts);
+}
+
+struct timespec toTimespec(const Time& t) {
+ struct timespec ts;
+ toTimespec(ts, t);
+ return ts;
+}
+
+struct timespec& toTimespec(struct timespec& ts, const Time& t) {
+ ts.tv_sec = t / TIME_SEC;
+ ts.tv_nsec = t % TIME_SEC;
+ return ts;
+}
+
+Time toTime(const struct timespec& ts) {
+ return ts.tv_sec*TIME_SEC + ts.tv_nsec;
+}
+
+
+#endif
+}}
+
diff --git a/cpp/lib/common/sys/Time.h b/cpp/lib/common/sys/Time.h
new file mode 100644
index 0000000000..3dd46741d8
--- /dev/null
+++ b/cpp/lib/common/sys/Time.h
@@ -0,0 +1,58 @@
+#ifndef _sys_Time_h
+#define _sys_Time_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdint.h>
+
+#ifdef USE_APR
+# include <apr_time.h>
+#else
+# include <time.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+/** Time in nanoseconds */
+typedef int64_t Time;
+
+Time now();
+
+/** Nanoseconds per second. */
+const Time TIME_SEC = 1000*1000*1000;
+/** Nanoseconds per millisecond */
+const Time TIME_MSEC = 1000*1000;
+/** Nanoseconds per microseconds. */
+const Time TIME_USEC = 1000;
+/** Nanoseconds per nanosecond. */
+const Time TIME_NSEC = 1;
+
+#ifndef USE_APR
+struct timespec toTimespec(const Time& t);
+struct timespec& toTimespec(struct timespec& ts, const Time& t);
+Time toTime(const struct timespec& ts);
+#endif
+
+}}
+
+#endif /*!_sys_Time_h*/
diff --git a/cpp/lib/common/sys/TimeoutHandler.h b/cpp/lib/common/sys/TimeoutHandler.h
new file mode 100644
index 0000000000..0c10709bbf
--- /dev/null
+++ b/cpp/lib/common/sys/TimeoutHandler.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TimeoutHandler_
+#define _TimeoutHandler_
+
+namespace qpid {
+namespace sys {
+
+ class TimeoutHandler
+ {
+ public:
+ virtual void idleOut() = 0;
+ virtual void idleIn() = 0;
+ virtual ~TimeoutHandler(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp
new file mode 100644
index 0000000000..52384857ed
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <sys/Acceptor.h>
+#include <sys/ConnectionInputHandlerFactory.h>
+#include "LFProcessor.h"
+#include "LFSessionContext.h"
+#include "APRBase.h"
+#include "APRPool.h"
+
+namespace qpid {
+namespace sys {
+
+class APRAcceptor : public Acceptor
+{
+ public:
+ APRAcceptor(int16_t port, int backlog, int threads, bool trace);
+ virtual int16_t getPort() const;
+ virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory);
+ virtual void shutdown();
+
+ private:
+ void shutdownImpl();
+
+ private:
+ int16_t port;
+ bool trace;
+ LFProcessor processor;
+ apr_socket_t* socket;
+ volatile bool running;
+ Mutex shutdownLock;
+};
+
+// Define generic Acceptor::create() to return APRAcceptor.
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+{
+ return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
+}
+// Must define Acceptor virtual dtor.
+Acceptor::~Acceptor() {}
+
+APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
+ port(port_),
+ trace(trace_),
+ processor(APRPool::get(), threads, 1000, 5000000),
+ running(false)
+{
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
+}
+
+int16_t APRAcceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void APRAcceptor::run(ConnectionInputHandlerFactory* factory) {
+ running = true;
+ processor.start();
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ if(status == APR_SUCCESS){
+ //make this socket non-blocking:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+ LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace);
+ session->init(factory->create(session));
+ }else{
+ Mutex::ScopedLock locker(shutdownLock);
+ if(running) {
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ shutdownImpl();
+ }
+ }
+ }
+}
+
+void APRAcceptor::shutdown() {
+ Mutex::ScopedLock locker(shutdownLock);
+ if (running) {
+ shutdownImpl();
+ }
+}
+
+void APRAcceptor::shutdownImpl() {
+ Mutex::ScopedLock locker(shutdownLock);
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+}
+
+
+}}
diff --git a/cpp/lib/common/sys/apr/APRBase.cpp b/cpp/lib/common/sys/apr/APRBase.cpp
new file mode 100644
index 0000000000..861071499f
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRBase.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <QpidError.h>
+#include "APRBase.h"
+
+using namespace qpid::sys;
+
+APRBase* APRBase::instance = 0;
+
+APRBase* APRBase::getInstance(){
+ if(instance == 0){
+ instance = new APRBase();
+ }
+ return instance;
+}
+
+
+APRBase::APRBase() : count(0){
+ apr_initialize();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, 0));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+}
+
+APRBase::~APRBase(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ apr_pool_destroy(pool);
+ apr_terminate();
+}
+
+bool APRBase::_increment(){
+ bool deleted(false);
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(this == instance){
+ count++;
+ }else{
+ deleted = true;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ return !deleted;
+}
+
+void APRBase::_decrement(){
+ APRBase* copy = 0;
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(--count == 0){
+ copy = instance;
+ instance = 0;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ if(copy != 0){
+ delete copy;
+ }
+}
+
+void APRBase::increment(){
+ int count = 0;
+ while(count++ < 2 && !getInstance()->_increment()){
+ std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl;
+ }
+}
+
+void APRBase::decrement(){
+ getInstance()->_decrement();
+}
+
+std::string qpid::sys::get_desc(apr_status_t status){
+ const int size = 50;
+ char tmp[size];
+ return std::string(apr_strerror(status, tmp, size));
+}
+
diff --git a/cpp/lib/common/sys/apr/APRBase.h b/cpp/lib/common/sys/apr/APRBase.h
new file mode 100644
index 0000000000..6a866a554a
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRBase.h
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _APRBase_
+#define _APRBase_
+
+#include <string>
+#include <apr_thread_mutex.h>
+#include <apr_errno.h>
+#include <QpidError.h>
+
+namespace qpid {
+namespace sys {
+
+ /**
+ * Use of APR libraries necessitates explicit init and terminate
+ * calls. Any class using APR libs should obtain the reference to
+ * this singleton and increment on construction, decrement on
+ * destruction. This class can then correctly initialise apr
+ * before the first use and terminate after the last use.
+ */
+ class APRBase{
+ static APRBase* instance;
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ int count;
+
+ APRBase();
+ ~APRBase();
+ static APRBase* getInstance();
+ bool _increment();
+ void _decrement();
+ public:
+ static void increment();
+ static void decrement();
+ };
+
+ //this is also a convenient place for a helper function for error checking:
+ void check(apr_status_t status, const char* file, const int line);
+ std::string get_desc(apr_status_t status);
+
+#define CHECK_APR_SUCCESS(A) qpid::sys::check(A, __FILE__, __LINE__);
+
+}
+}
+
+// Inlined as it is called *a lot*
+void inline qpid::sys::check(apr_status_t status, const char* file, const int line){
+ if (status != APR_SUCCESS){
+ const int size = 50;
+ char tmp[size];
+ std::string msg(apr_strerror(status, tmp, size));
+ throw qpid::QpidError(APR_ERROR + ((int) status), msg,
+ qpid::SrcLine(file, line));
+ }
+}
+
+
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/APRPool.cpp b/cpp/lib/common/sys/apr/APRPool.cpp
new file mode 100644
index 0000000000..e8b71f6e8a
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRPool.cpp
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "APRPool.h"
+#include "APRBase.h"
+#include <boost/pool/detail/singleton.hpp>
+
+using namespace qpid::sys;
+
+APRPool::APRPool(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+APRPool::~APRPool(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+apr_pool_t* APRPool::get() {
+ return boost::details::pool::singleton_default<APRPool>::instance().pool;
+}
+
diff --git a/cpp/lib/common/sys/apr/APRPool.h b/cpp/lib/common/sys/apr/APRPool.h
new file mode 100644
index 0000000000..da7661fcfa
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRPool.h
@@ -0,0 +1,50 @@
+#ifndef _APRPool_
+#define _APRPool_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <apr_pools.h>
+
+namespace qpid {
+namespace sys {
+/**
+ * Singleton APR memory pool.
+ */
+class APRPool : private boost::noncopyable {
+ public:
+ APRPool();
+ ~APRPool();
+
+ /** Get singleton instance */
+ static apr_pool_t* get();
+
+ private:
+ apr_pool_t* pool;
+};
+
+}}
+
+
+
+
+
+#endif /*!_APRPool_*/
diff --git a/cpp/lib/common/sys/apr/APRSocket.cpp b/cpp/lib/common/sys/apr/APRSocket.cpp
new file mode 100644
index 0000000000..96dbd132a1
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRSocket.cpp
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "APRBase.h"
+#include "APRSocket.h"
+#include <assert.h>
+#include <iostream>
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){
+
+}
+
+void APRSocket::read(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ bytes = buffer.available();
+ apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes);
+ buffer.move(bytes);
+ if(APR_STATUS_IS_TIMEUP(s)){
+ //timed out
+ }else if(APR_STATUS_IS_EOF(s)){
+ close();
+ }
+}
+
+void APRSocket::write(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ do{
+ bytes = buffer.available();
+ apr_socket_send(socket, buffer.start(), &bytes);
+ buffer.move(bytes);
+ }while(bytes > 0);
+}
+
+void APRSocket::close(){
+ if(!closed){
+ std::cout << "Closing socket " << socket << "@" << this << std::endl;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ closed = true;
+ }
+}
+
+bool APRSocket::isOpen() const {
+ return !closed;
+}
+
+uint8_t APRSocket::read(){
+ char data[1];
+ apr_size_t bytes = 1;
+ apr_status_t s = apr_socket_recv(socket, data, &bytes);
+ if(APR_STATUS_IS_EOF(s) || bytes == 0){
+ return 0;
+ }else{
+ return *data;
+ }
+}
+
+APRSocket::~APRSocket(){
+}
diff --git a/cpp/lib/common/sys/apr/APRSocket.h b/cpp/lib/common/sys/apr/APRSocket.h
new file mode 100644
index 0000000000..a55dfc06b0
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRSocket.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _APRSocket_
+#define _APRSocket_
+
+#include <apr_network_io.h>
+#include <Buffer.h>
+
+namespace qpid {
+namespace sys {
+
+ class APRSocket
+ {
+ apr_socket_t* const socket;
+ volatile bool closed;
+ public:
+ APRSocket(apr_socket_t* socket);
+ void read(qpid::framing::Buffer& b);
+ void write(qpid::framing::Buffer& b);
+ void close();
+ bool isOpen() const;
+ uint8_t read();
+ ~APRSocket();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp
new file mode 100644
index 0000000000..2b6fc92623
--- /dev/null
+++ b/cpp/lib/common/sys/apr/LFProcessor.cpp
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <sstream>
+#include <QpidError.h>
+#include "LFProcessor.h"
+#include "APRBase.h"
+#include "LFSessionContext.h"
+
+using namespace qpid::sys;
+using qpid::QpidError;
+
+// TODO aconway 2006-10-12: stopped is read outside locks.
+//
+
+LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
+ size(_size),
+ timeout(_timeout),
+ signalledCount(0),
+ current(0),
+ count(0),
+ workerCount(_workers),
+ hasLeader(false),
+ workers(new Thread[_workers]),
+ stopped(false)
+{
+
+ CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
+}
+
+
+LFProcessor::~LFProcessor(){
+ if (!stopped) stop();
+ delete[] workers;
+ CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+}
+
+void LFProcessor::start(){
+ for(int i = 0; i < workerCount; i++){
+ workers[i] = Thread(this);
+ }
+}
+
+void LFProcessor::add(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+ Monitor::ScopedLock l(countLock);
+ sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
+ count++;
+}
+
+void LFProcessor::remove(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ Monitor::ScopedLock l(countLock);
+ sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
+ count--;
+}
+
+void LFProcessor::reactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+void LFProcessor::deactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+}
+
+void LFProcessor::update(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+bool LFProcessor::full(){
+ Mutex::ScopedLock locker(countLock);
+ return count == size;
+}
+
+bool LFProcessor::empty(){
+ Mutex::ScopedLock locker(countLock);
+ return count == 0;
+}
+
+void LFProcessor::poll() {
+ apr_status_t status = APR_EGENERAL;
+ do{
+ current = 0;
+ if(!stopped){
+ status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
+ }
+ }while(status != APR_SUCCESS && !stopped);
+}
+
+void LFProcessor::run(){
+ try{
+ while(!stopped){
+ const apr_pollfd_t* event = 0;
+ LFSessionContext* session = 0;
+ {
+ Monitor::ScopedLock l(leadLock);
+ waitToLead();
+ event = getNextEvent();
+ if(!event) return;
+ session = reinterpret_cast<LFSessionContext*>(
+ event->client_data);
+ session->startProcessing();
+ relinquishLead();
+ }
+
+ //process event:
+ if(event->rtnevents & APR_POLLIN) session->read();
+ if(event->rtnevents & APR_POLLOUT) session->write();
+
+ if(session->isClosed()){
+ session->handleClose();
+ Monitor::ScopedLock l(countLock);
+ sessions.erase(find(sessions.begin(),sessions.end(), session));
+ count--;
+ }else{
+ session->stopProcessing();
+ }
+ }
+ }catch(std::exception e){
+ std::cout << e.what() << std::endl;
+ }
+}
+
+void LFProcessor::waitToLead(){
+ while(hasLeader && !stopped) leadLock.wait();
+ hasLeader = !stopped;
+}
+
+void LFProcessor::relinquishLead(){
+ hasLeader = false;
+ leadLock.notify();
+}
+
+const apr_pollfd_t* LFProcessor::getNextEvent(){
+ while(true){
+ if(stopped){
+ return 0;
+ }else if(current < signalledCount){
+ //use result of previous poll if one is available
+ return signalledFDs + (current++);
+ }else{
+ //else poll to get new events
+ poll();
+ }
+ }
+}
+
+void LFProcessor::stop(){
+ stopped = true;
+ {
+ Monitor::ScopedLock l(leadLock);
+ leadLock.notifyAll();
+ }
+ for(int i = 0; i < workerCount; i++){
+ workers[i].join();
+ }
+ for(iterator i = sessions.begin(); i < sessions.end(); i++){
+ (*i)->shutdown();
+ }
+}
+
diff --git a/cpp/lib/common/sys/apr/LFProcessor.h b/cpp/lib/common/sys/apr/LFProcessor.h
new file mode 100644
index 0000000000..de90199472
--- /dev/null
+++ b/cpp/lib/common/sys/apr/LFProcessor.h
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LFProcessor_
+#define _LFProcessor_
+
+#include <apr_poll.h>
+#include <iostream>
+#include <vector>
+#include <sys/Monitor.h>
+#include <sys/Runnable.h>
+#include <sys/Thread.h>
+
+namespace qpid {
+namespace sys {
+
+ class LFSessionContext;
+
+ /**
+ * This class processes a poll set using the leaders-followers
+ * pattern for thread synchronization: the leader will poll and on
+ * the poll returning, it will remove a session, promote a
+ * follower to leadership, then process the session.
+ */
+ class LFProcessor : private virtual qpid::sys::Runnable
+ {
+ typedef std::vector<LFSessionContext*>::iterator iterator;
+
+ const int size;
+ const apr_interval_time_t timeout;
+ apr_pollset_t* pollset;
+ int signalledCount;
+ int current;
+ const apr_pollfd_t* signalledFDs;
+ int count;
+ const int workerCount;
+ bool hasLeader;
+ qpid::sys::Thread* workers;
+ qpid::sys::Monitor leadLock;
+ qpid::sys::Mutex countLock;
+ std::vector<LFSessionContext*> sessions;
+ volatile bool stopped;
+
+ const apr_pollfd_t* getNextEvent();
+ void waitToLead();
+ void relinquishLead();
+ void poll();
+ virtual void run();
+
+ public:
+ LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+ /**
+ * Add the fd to the poll set. Relies on the client_data being
+ * an instance of LFSessionContext.
+ */
+ void add(const apr_pollfd_t* const fd);
+ /**
+ * Remove the fd from the poll set.
+ */
+ void remove(const apr_pollfd_t* const fd);
+ /**
+ * Signal that the fd passed in, already part of the pollset,
+ * has had its flags altered.
+ */
+ void update(const apr_pollfd_t* const fd);
+ /**
+ * Add an fd back to the poll set after deactivation.
+ */
+ void reactivate(const apr_pollfd_t* const fd);
+ /**
+ * Temporarily remove the fd from the poll set. Called when processing
+ * is about to begin.
+ */
+ void deactivate(const apr_pollfd_t* const fd);
+ /**
+ * Indicates whether the capacity of this processor has been
+ * reached (or whether it can still handle further fd's).
+ */
+ bool full();
+ /**
+ * Indicates whether there are any fd's registered.
+ */
+ bool empty();
+ /**
+ * Stop processing.
+ */
+ void stop();
+ /**
+ * Start processing.
+ */
+ void start();
+ /**
+ * Is processing stopped?
+ */
+ bool isStopped();
+
+ ~LFProcessor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp
new file mode 100644
index 0000000000..503dfddbb7
--- /dev/null
+++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LFSessionContext.h"
+#include "APRBase.h"
+#include <QpidError.h>
+#include <assert.h>
+
+using namespace qpid::sys;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
+ LFProcessor* const _processor,
+ bool _debug) :
+ debug(_debug),
+ socket(_socket),
+ initiated(false),
+ in(65536),
+ out(65536),
+ processor(_processor),
+ processing(false),
+ closing(false)
+{
+
+ fd.p = _pool;
+ fd.desc_type = APR_POLL_SOCKET;
+ fd.reqevents = APR_POLLIN;
+ fd.client_data = this;
+ fd.desc.s = _socket;
+
+ out.flip();
+}
+
+LFSessionContext::~LFSessionContext(){
+
+}
+
+void LFSessionContext::read(){
+ socket.read(in);
+ in.flip();
+ if(initiated){
+ AMQFrame frame;
+ try{
+ while(frame.decode(in)){
+ if(debug) log("RECV", &frame);
+ handler->received(&frame);
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg
+ << " (" << error.loc.file << ":" << error.loc.line
+ << ")" << std::endl;
+ }
+ }else{
+ ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ handler->initiated(&protocolInit);
+ initiated = true;
+ if(debug) std::cout << "INIT [" << &socket << "]" << std::endl;
+ }
+ }
+ in.compact();
+}
+
+void LFSessionContext::write(){
+ bool done = isClosed();
+ while(!done){
+ if(out.available() > 0){
+ socket.write(out);
+ if(out.available() > 0){
+
+ //incomplete write, leave flags to receive notification of readiness to write
+ done = true;//finished processing for now, but write is still in progress
+ }
+ }else{
+ //do we have any frames to write?
+ Mutex::ScopedLock l(writeLock);
+ if(!framesToWrite.empty()){
+ out.clear();
+ bool encoded(false);
+ AMQFrame* frame = framesToWrite.front();
+ while(frame && out.available() >= frame->size()){
+ encoded = true;
+ frame->encode(out);
+ if(debug) log("SENT", frame);
+ delete frame;
+ framesToWrite.pop();
+ frame = framesToWrite.empty() ? 0 : framesToWrite.front();
+ }
+ if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+ out.flip();
+ }else{
+ //reset flags, don't care about writability anymore
+ fd.reqevents = APR_POLLIN;
+ done = true;
+
+ if(closing){
+ socket.close();
+ }
+ }
+ }
+ }
+}
+
+void LFSessionContext::send(AMQFrame* frame){
+ Mutex::ScopedLock l(writeLock);
+ if(!closing){
+ framesToWrite.push(frame);
+ if(!(fd.reqevents & APR_POLLOUT)){
+ fd.reqevents |= APR_POLLOUT;
+ if(!processing){
+ processor->update(&fd);
+ }
+ }
+ }
+}
+
+void LFSessionContext::startProcessing(){
+ Mutex::ScopedLock l(writeLock);
+ processing = true;
+ processor->deactivate(&fd);
+}
+
+void LFSessionContext::stopProcessing(){
+ Mutex::ScopedLock l(writeLock);
+ processor->reactivate(&fd);
+ processing = false;
+}
+
+void LFSessionContext::close(){
+ Mutex::ScopedLock l(writeLock);
+ closing = true;
+ if(!processing){
+ //allow pending frames to be written to socket
+ fd.reqevents = APR_POLLOUT;
+ processor->update(&fd);
+ }
+}
+
+void LFSessionContext::handleClose(){
+ handler->closed();
+ std::cout << "Session closed [" << &socket << "]" << std::endl;
+ delete handler;
+ delete this;
+}
+
+void LFSessionContext::shutdown(){
+ socket.close();
+ handleClose();
+}
+
+void LFSessionContext::init(ConnectionInputHandler* _handler){
+ handler = _handler;
+ processor->add(&fd);
+}
+
+void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
+ Mutex::ScopedLock l(logLock);
+ std::cout << desc << " [" << &socket << "]: " << *frame << std::endl;
+}
+
+Mutex LFSessionContext::logLock;
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h
new file mode 100644
index 0000000000..81cfc0efda
--- /dev/null
+++ b/cpp/lib/common/sys/apr/LFSessionContext.h
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LFSessionContext_
+#define _LFSessionContext_
+
+#include <queue>
+
+#include <apr_network_io.h>
+#include <apr_poll.h>
+#include <apr_time.h>
+
+#include <AMQFrame.h>
+#include <Buffer.h>
+#include <sys/Monitor.h>
+#include <sys/ConnectionOutputHandler.h>
+#include <sys/ConnectionInputHandler.h>
+
+#include "APRSocket.h"
+#include "LFProcessor.h"
+
+namespace qpid {
+namespace sys {
+
+
+class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler
+{
+ const bool debug;
+ APRSocket socket;
+ bool initiated;
+
+ qpid::framing::Buffer in;
+ qpid::framing::Buffer out;
+
+ qpid::sys::ConnectionInputHandler* handler;
+ LFProcessor* const processor;
+
+ apr_pollfd_t fd;
+
+ std::queue<qpid::framing::AMQFrame*> framesToWrite;
+ qpid::sys::Mutex writeLock;
+
+ bool processing;
+ bool closing;
+
+ static qpid::sys::Mutex logLock;
+ void log(const std::string& desc,
+ qpid::framing::AMQFrame* const frame);
+
+
+ public:
+ LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
+ LFProcessor* const processor,
+ bool debug = false);
+ virtual ~LFSessionContext();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void close();
+ void read();
+ void write();
+ void init(qpid::sys::ConnectionInputHandler* handler);
+ void startProcessing();
+ void stopProcessing();
+ void handleClose();
+ void shutdown();
+ inline apr_pollfd_t* const getFd(){ return &fd; }
+ inline bool isClosed(){ return !socket.isOpen(); }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp
new file mode 100644
index 0000000000..bca4da6c96
--- /dev/null
+++ b/cpp/lib/common/sys/apr/Socket.cpp
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <sys/Socket.h>
+#include <apr/APRBase.h>
+#include <apr/APRPool.h>
+
+
+using namespace qpid::sys;
+
+Socket Socket::createTcp() {
+ Socket s;
+ CHECK_APR_SUCCESS(
+ apr_socket_create(
+ &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+ APRPool::get()));
+ return s;
+}
+
+Socket::Socket(apr_socket_t* s) {
+ socket = s;
+}
+
+void Socket::setTimeout(Time interval) {
+ apr_socket_timeout_set(socket, interval/TIME_USEC);
+}
+
+void Socket::connect(const std::string& host, int port) {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(
+ apr_sockaddr_info_get(
+ &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
+ APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+}
+
+void Socket::close() {
+ if (socket == 0) return;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ socket = 0;
+}
+
+ssize_t Socket::send(const void* data, size_t size)
+{
+ apr_size_t sent = size;
+ apr_status_t status =
+ apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent);
+ if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
+ CHECK_APR_SUCCESS(status);
+ return sent;
+}
+
+ssize_t Socket::recv(void* data, size_t size)
+{
+ apr_size_t received = size;
+ apr_status_t status =
+ apr_socket_recv(socket, reinterpret_cast<char*>(data), &received);
+ if (APR_STATUS_IS_TIMEUP(status))
+ return SOCKET_TIMEOUT;
+ if (APR_STATUS_IS_EOF(status))
+ return SOCKET_EOF;
+ CHECK_APR_SUCCESS(status);
+ return received;
+}
+
+
diff --git a/cpp/lib/common/sys/apr/Thread.cpp b/cpp/lib/common/sys/apr/Thread.cpp
new file mode 100644
index 0000000000..5c4799aa96
--- /dev/null
+++ b/cpp/lib/common/sys/apr/Thread.cpp
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Thread.h>
+
+using namespace qpid::sys;
+using qpid::sys::Runnable;
+
+void* APR_THREAD_FUNC Thread::runRunnable(apr_thread_t* thread, void *data) {
+ reinterpret_cast<Runnable*>(data)->run();
+ CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS));
+ return NULL;
+}
+
+
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp
new file mode 100644
index 0000000000..16c7ec9c3f
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -0,0 +1,325 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <mqueue.h>
+#include <string.h>
+#include <iostream>
+
+#include <sys/errno.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+
+#include <typeinfo>
+#include <iostream>
+#include <queue>
+
+#include <boost/ptr_container/ptr_map.hpp>
+#include <boost/current_function.hpp>
+
+#include <QpidError.h>
+#include <sys/Monitor.h>
+
+#include "check.h"
+#include "EventChannel.h"
+
+using namespace std;
+
+
+// Convenience template to zero out a struct.
+template <class S> struct ZeroStruct : public S {
+ ZeroStruct() { memset(this, 0, sizeof(*this)); }
+};
+
+namespace qpid {
+namespace sys {
+
+
+/**
+ * EventHandler wraps an epoll file descriptor. Acts as private
+ * interface between EventChannel and subclasses.
+ *
+ * Also implements Event interface for events that are not associated
+ * with a file descriptor and are passed via the message queue.
+ */
+class EventHandler : public Event, private Monitor
+{
+ public:
+ EventHandler(int epollSize = 256);
+ ~EventHandler();
+
+ int getEpollFd() { return epollFd; }
+ void epollAdd(int fd, uint32_t epollEvents, Event* event);
+ void epollMod(int fd, uint32_t epollEvents, Event* event);
+ void epollDel(int fd);
+
+ void mqPut(Event* event);
+ Event* mqGet();
+
+ protected:
+ // Should never be called, only complete.
+ void prepare(EventHandler&) { assert(0); }
+ Event* complete(EventHandler& eh);
+
+ private:
+ int epollFd;
+ std::string mqName;
+ int mqFd;
+ std::queue<Event*> mqEvents;
+};
+
+EventHandler::EventHandler(int epollSize)
+{
+ epollFd = epoll_create(epollSize);
+ if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+
+ // Create a POSIX message queue for non-fd events.
+ // We write one byte and never read it is always ready for read
+ // when we add it to epoll.
+ //
+ ZeroStruct<struct mq_attr> attr;
+ attr.mq_maxmsg = 1;
+ attr.mq_msgsize = 1;
+ do {
+ char tmpnam[L_tmpnam];
+ tmpnam_r(tmpnam);
+ mqName = tmpnam + 4; // Skip "tmp/"
+ mqFd = mq_open(
+ mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
+ if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
+ } while (mqFd == EEXIST); // Name already taken, try again.
+
+ static char zero = '\0';
+ mq_send(mqFd, &zero, 1, 0);
+ epollAdd(mqFd, 0, this);
+}
+
+EventHandler::~EventHandler() {
+ mq_close(mqFd);
+ mq_unlink(mqName.c_str());
+}
+
+void EventHandler::mqPut(Event* event) {
+ ScopedLock l(*this);
+ assert(event != 0);
+ mqEvents.push(event);
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+}
+
+Event* EventHandler::mqGet() {
+ ScopedLock l(*this);
+ if (mqEvents.empty())
+ return 0;
+ Event* event = mqEvents.front();
+ mqEvents.pop();
+ if(!mqEvents.empty())
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+ return event;
+}
+
+void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollDel(int fd) {
+ if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+Event* EventHandler::complete(EventHandler& eh)
+{
+ assert(&eh == this);
+ Event* event = mqGet();
+ return event==0 ? 0 : event->complete(eh);
+}
+
+// ================================================================
+// EventChannel
+
+EventChannel::shared_ptr EventChannel::create() {
+ return shared_ptr(new EventChannel());
+}
+
+EventChannel::EventChannel() : handler(new EventHandler()) {}
+
+EventChannel::~EventChannel() {}
+
+void EventChannel::postEvent(Event& e)
+{
+ e.prepare(*handler);
+}
+
+Event* EventChannel::getEvent()
+{
+ static const int infiniteTimeout = -1;
+ ZeroStruct<struct epoll_event> epollEvent;
+
+ // Loop until we can complete the event. Some events may re-post
+ // themselves and return 0 from complete, e.g. partial reads. //
+ Event* event = 0;
+ while (event == 0) {
+ int eventCount = epoll_wait(handler->getEpollFd(),
+ &epollEvent, 1, infiniteTimeout);
+ if (eventCount < 0) {
+ if (errno != EINTR) {
+ // TODO aconway 2006-11-28: Proper handling/logging of errors.
+ cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
+ << PosixError::getMessage(errno) << endl;
+ assert(0);
+ }
+ }
+ else if (eventCount == 1) {
+ event = reinterpret_cast<Event*>(epollEvent.data.ptr);
+ assert(event != 0);
+ try {
+ event = event->complete(*handler);
+ }
+ catch (const Exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ catch (const std::exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ }
+ }
+ return event;
+}
+
+Event::~Event() {}
+
+void Event::prepare(EventHandler& handler)
+{
+ handler.mqPut(this);
+}
+
+bool Event::hasError() const {
+ return error;
+}
+
+void Event::throwIfError() throw (Exception) {
+ if (hasError())
+ error.throwSelf();
+}
+
+Event* Event::complete(EventHandler&)
+{
+ return this;
+}
+
+void Event::dispatch()
+{
+ try {
+ if (!callback.empty())
+ callback();
+ } catch (const std::exception&) {
+ throw;
+ } catch (...) {
+ throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+ }
+}
+
+void Event::setError(const ExceptionHolder& e) {
+ error = e;
+}
+
+void ReadEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+ssize_t ReadEvent::doRead() {
+ ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
+ size - received);
+ if (n > 0) received += n;
+ return n;
+}
+
+Event* ReadEvent::complete(EventHandler& handler)
+{
+ // Read as much as possible without blocking.
+ ssize_t n = doRead();
+ while (n > 0 && received < size) doRead();
+
+ if (received == size) {
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ return this;
+ }
+ else if (n <0 && (errno == EAGAIN)) {
+ // Keep polling for more.
+ handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
+ return 0;
+ }
+ else {
+ // Unexpected EOF or error. Throw ENODATA for EOF.
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+ }
+}
+
+void WriteEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+}
+
+Event* WriteEvent::complete(EventHandler& handler)
+{
+ ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
+ size - written);
+ if (n < 0) throw QPID_POSIX_ERROR(errno);
+ written += n;
+ if(written < size) {
+ // Keep polling.
+ handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+ return 0;
+ }
+ written = 0; // Reset for re-use.
+ handler.epollDel(descriptor);
+ return this;
+}
+
+void AcceptEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+Event* AcceptEvent::complete(EventHandler& handler)
+{
+ handler.epollDel(descriptor);
+ accepted = ::accept(descriptor, 0, 0);
+ if (accepted < 0) throw QPID_POSIX_ERROR(errno);
+ return this;
+}
+
+}}
diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h
new file mode 100644
index 0000000000..49c7fce740
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannel.h
@@ -0,0 +1,176 @@
+#ifndef _sys_EventChannel_h
+#define _sys_EventChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <SharedObject.h>
+#include <ExceptionHolder.h>
+#include <boost/function.hpp>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class Event;
+class EventHandler;
+class EventChannel;
+
+/**
+ * Base class for all Events.
+ */
+class Event
+{
+ public:
+ /** Type for callback when event is dispatched */
+ typedef boost::function0<void> Callback;
+
+ /**
+ * Create an event with optional callback.
+ * Instances of Event are sent directly through the channel.
+ * Derived classes define additional waiting behaviour.
+ *@param cb A callback functor that is invoked when dispatch() is called.
+ */
+ Event(Callback cb = 0) : callback(cb) {}
+
+ virtual ~Event();
+
+ /** Call the callback provided to the constructor, if any. */
+ void dispatch();
+
+ /** True if there was an error processing this event */
+ bool hasError() const;
+
+ /** If hasError() throw the corresponding exception. */
+ void throwIfError() throw(Exception);
+
+ protected:
+ virtual void prepare(EventHandler&);
+ virtual Event* complete(EventHandler&);
+ void setError(const ExceptionHolder& e);
+
+ Callback callback;
+ ExceptionHolder error;
+
+ friend class EventChannel;
+ friend class EventHandler;
+};
+
+template <class BufT>
+class IOEvent : public Event {
+ public:
+ void getDescriptor() const { return descriptor; }
+ size_t getSize() const { return size; }
+ BufT getBuffer() const { return buffer; }
+
+ protected:
+ IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
+ Event(cb), descriptor(fd), buffer(buf), size(sz) {}
+
+ int descriptor;
+ BufT buffer;
+ size_t size;
+};
+
+/** Asynchronous read event */
+class ReadEvent : public IOEvent<void*>
+{
+ public:
+ explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
+ IOEvent<void*>(fd, cb, sz, buf), received(0) {}
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+ ssize_t doRead();
+
+ size_t received;
+};
+
+/** Asynchronous write event */
+class WriteEvent : public IOEvent<const void*>
+{
+ public:
+ explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
+ Callback cb=0) :
+ IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
+
+ protected:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ private:
+ ssize_t doWrite();
+ size_t written;
+};
+
+/** Asynchronous socket accept event */
+class AcceptEvent : public Event
+{
+ public:
+ /** Accept a connection on fd. */
+ explicit AcceptEvent(int fd=-1, Callback cb=0) :
+ Event(cb), descriptor(fd), accepted(0) {}
+
+ /** Get descriptor for server socket */
+ int getAcceptedDesscriptor() const { return accepted; }
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ int descriptor;
+ int accepted;
+};
+
+
+class QueueSet;
+
+/**
+ * Channel to post and wait for events.
+ */
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+ public:
+ static shared_ptr create();
+
+ ~EventChannel();
+
+ /** Post an event to the channel. */
+ void postEvent(Event& event);
+
+ /** Post an event to the channel. Must not be 0. */
+ void postEvent(Event* event) { postEvent(*event); }
+
+ /**
+ * Wait for the next complete event.
+ *@return Pointer to event. Will never return 0.
+ */
+ Event* getEvent();
+
+ private:
+ EventChannel();
+ boost::shared_ptr<EventHandler> handler;
+};
+
+
+}}
+
+
+
+#endif /*!_sys_EventChannel_h*/
diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
new file mode 100644
index 0000000000..548fbd1881
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+
+#include <boost/assert.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/ptr_container/ptr_deque.hpp>
+#include <boost/bind.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <sys/ConnectionOutputHandler.h>
+#include <sys/ConnectionInputHandler.h>
+#include <sys/ConnectionInputHandlerFactory.h>
+#include <sys/Acceptor.h>
+#include <sys/Socket.h>
+#include <framing/Buffer.h>
+#include <framing/AMQFrame.h>
+#include <Exception.h>
+
+#include "EventChannelConnection.h"
+
+namespace qpid {
+namespace sys {
+
+using namespace qpid::framing;
+using namespace std;
+
+class EventChannelAcceptor : public Acceptor {
+ public:
+
+
+ EventChannelAcceptor(
+ int16_t port_, int backlog, int nThreads, bool trace_
+ );
+
+ int getPort() const;
+
+ void run(ConnectionInputHandlerFactory& factory);
+
+ void shutdown();
+
+ private:
+
+ void accept();
+
+ Mutex lock;
+ Socket listener;
+ const int port;
+ const bool isTrace;
+ bool isRunning;
+ boost::ptr_vector<EventChannelConnection> connections;
+ AcceptEvent acceptEvent;
+ ConnectionInputHandlerFactory* factory;
+ bool isShutdown;
+ EventChannelThreads::shared_ptr threads;
+};
+
+Acceptor::shared_ptr Acceptor::create(
+ int16_t port, int backlog, int threads, bool trace)
+{
+ return Acceptor::shared_ptr(
+ new EventChannelAcceptor(port, backlog, threads, trace));
+}
+
+// Must define Acceptor virtual dtor.
+Acceptor::~Acceptor() {}
+
+EventChannelAcceptor::EventChannelAcceptor(
+ int16_t port_, int backlog, int nThreads, bool trace_
+) : listener(Socket::createTcp()),
+ port(listener.listen(int(port_), backlog)),
+ isTrace(trace_),
+ isRunning(false),
+ acceptEvent(listener.fd(),
+ boost::bind(&EventChannelAcceptor::accept, this)),
+ factory(0),
+ isShutdown(false),
+ threads(EventChannelThreads::create(EventChannel::create(), nThreads))
+{ }
+
+int EventChannelAcceptor::getPort() const {
+ return port; // Immutable no need for lock.
+}
+
+void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) {
+ {
+ Mutex::ScopedLock l(lock);
+ if (!isRunning && !isShutdown) {
+ isRunning = true;
+ factory = &f;
+ threads->post(acceptEvent);
+ }
+ }
+ threads->join(); // Wait for shutdown.
+}
+
+void EventChannelAcceptor::shutdown() {
+ bool doShutdown = false;
+ {
+ Mutex::ScopedLock l(lock);
+ doShutdown = !isShutdown; // I'm the shutdown thread.
+ isShutdown = true;
+ }
+ if (doShutdown) {
+ ::close(acceptEvent.getDescriptor());
+ threads->shutdown();
+ for_each(connections.begin(), connections.end(),
+ boost::bind(&EventChannelConnection::close, _1));
+ }
+ threads->join();
+}
+
+void EventChannelAcceptor::accept()
+{
+ // No lock, we only post one accept event at a time.
+ if (isShutdown)
+ return;
+ if (acceptEvent.getException()) {
+ Exception::log(*acceptEvent.getException(),
+ "EventChannelAcceptor::accept");
+ shutdown();
+ return;
+ }
+ // TODO aconway 2006-11-29: Need to reap closed connections also.
+ int fd = acceptEvent.getAcceptedDesscriptor();
+ connections.push_back(
+ new EventChannelConnection(threads, *factory, fd, fd, isTrace));
+ threads->post(acceptEvent); // Keep accepting.
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/cpp/lib/common/sys/posix/EventChannelConnection.cpp
new file mode 100644
index 0000000000..4449dc3035
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelConnection.cpp
@@ -0,0 +1,229 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <iostream>
+
+#include <boost/bind.hpp>
+#include <boost/assert.hpp>
+
+#include "EventChannelConnection.h"
+#include "sys/ConnectionInputHandlerFactory.h"
+#include "QpidError.h"
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace sys {
+
+const size_t EventChannelConnection::bufferSize = 65536;
+
+EventChannelConnection::EventChannelConnection(
+ EventChannelThreads::shared_ptr threads_,
+ ConnectionInputHandlerFactory& factory_,
+ int rfd,
+ int wfd,
+ bool isTrace_
+) :
+ readFd(rfd),
+ writeFd(wfd ? wfd : rfd),
+ readCallback(boost::bind(&EventChannelConnection::closeOnException,
+ this, &EventChannelConnection::endInitRead)),
+
+ isWriting(false),
+ isClosed(false),
+ threads(threads_),
+ handler(factory_.create(this)),
+ in(bufferSize),
+ out(bufferSize),
+ isTrace(isTrace_)
+{
+ BOOST_ASSERT(readFd > 0);
+ BOOST_ASSERT(writeFd > 0);
+ closeOnException(&EventChannelConnection::startRead);
+}
+
+
+void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) {
+ {
+ Monitor::ScopedLock lock(monitor);
+ assert(frame.get());
+ writeFrames.push_back(frame.release());
+ }
+ closeOnException(&EventChannelConnection::startWrite);
+}
+
+void EventChannelConnection::close() {
+ {
+ Monitor::ScopedLock lock(monitor);
+ if (isClosed)
+ return;
+ isClosed = true;
+ }
+ ::close(readFd);
+ ::close(writeFd);
+ {
+ Monitor::ScopedLock lock(monitor);
+ while (busyThreads > 0)
+ monitor.wait();
+ }
+ handler->closed();
+}
+
+void EventChannelConnection::closeNoThrow() {
+ Exception::tryCatchLog<void>(
+ boost::bind(&EventChannelConnection::close, this),
+ false,
+ "Exception closing channel"
+ );
+}
+
+/**
+ * Call f in a try/catch block and close the connection if
+ * an exception is thrown.
+ */
+void EventChannelConnection::closeOnException(MemberFnPtr f)
+{
+ try {
+ Exception::tryCatchLog<void>(
+ boost::bind(f, this),
+ "Closing connection due to exception"
+ );
+ return;
+ } catch (...) {
+ // Exception was already logged by tryCatchLog
+ closeNoThrow();
+ }
+}
+
+// Post the write event.
+// Always called inside closeOnException.
+// Called by endWrite and send, but only one thread writes at a time.
+//
+void EventChannelConnection::startWrite() {
+ FrameQueue::auto_type frame;
+ {
+ Monitor::ScopedLock lock(monitor);
+ // Stop if closed or a write event is already in progress.
+ if (isClosed || isWriting)
+ return;
+ if (writeFrames.empty()) {
+ isWriting = false;
+ return;
+ }
+ isWriting = true;
+ frame = writeFrames.pop_front();
+ }
+ // No need to lock here - only one thread can be writing at a time.
+ out.clear();
+ if (isTrace)
+ cout << "Send on socket " << writeFd << ": " << *frame << endl;
+ frame->encode(out);
+ out.flip();
+ writeEvent = WriteEvent(
+ writeFd, out.start(), out.available(),
+ boost::bind(&EventChannelConnection::closeOnException,
+ this, &EventChannelConnection::endWrite));
+ threads->post(writeEvent);
+}
+
+// ScopedBusy ctor increments busyThreads.
+// dtor decrements and calls monitor.notifyAll if it reaches 0.
+//
+struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement
+{
+ ScopedBusy(EventChannelConnection& ecc)
+ : AtomicCount::ScopedIncrement(
+ ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor))
+ {}
+};
+
+// Write event completed.
+// Always called by a channel thread inside closeOnException.
+//
+void EventChannelConnection::endWrite() {
+ ScopedBusy(*this);
+ {
+ Monitor::ScopedLock lock(monitor);
+ isWriting = false;
+ if (isClosed)
+ return;
+ writeEvent.throwIfException();
+ }
+ // Check if there's more in to write in the write queue.
+ startWrite();
+}
+
+
+// Post the read event.
+// Always called inside closeOnException.
+// Called from ctor and end[Init]Read, so only one call at a time
+// is possible since we only post one read event at a time.
+//
+void EventChannelConnection::startRead() {
+ // Non blocking read, as much as we can swallow.
+ readEvent = ReadEvent(
+ readFd, in.start(), in.available(), readCallback,true);
+ threads->post(readEvent);
+}
+
+// Completion of initial read, expect protocolInit.
+// Always called inside closeOnException in channel thread.
+// Only called by one thread at a time.
+void EventChannelConnection::endInitRead() {
+ ScopedBusy(*this);
+ if (!isClosed) {
+ readEvent.throwIfException();
+ in.move(readEvent.getBytesRead());
+ in.flip();
+ ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ handler->initiated(&protocolInit);
+ readCallback = boost::bind(
+ &EventChannelConnection::closeOnException,
+ this, &EventChannelConnection::endRead);
+ }
+ in.compact();
+ // Continue reading.
+ startRead();
+ }
+}
+
+// Normal reads, expect a frame.
+// Always called inside closeOnException in channel thread.
+void EventChannelConnection::endRead() {
+ ScopedBusy(*this);
+ if (!isClosed) {
+ readEvent.throwIfException();
+ in.move(readEvent.getBytesRead());
+ in.flip();
+ AMQFrame frame;
+ while (frame.decode(in)) {
+ // TODO aconway 2006-11-30: received should take Frame&
+ if (isTrace)
+ cout << "Received on socket " << readFd
+ << ": " << frame << endl;
+ handler->received(&frame);
+ }
+ in.compact();
+ startRead();
+ }
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h
new file mode 100644
index 0000000000..da7b6dca27
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelConnection.h
@@ -0,0 +1,102 @@
+#ifndef _posix_EventChannelConnection_h
+#define _posix_EventChannelConnection_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/ptr_container/ptr_deque.hpp>
+
+#include "EventChannelThreads.h"
+#include "sys/Monitor.h"
+#include "sys/ConnectionOutputHandler.h"
+#include "sys/ConnectionInputHandler.h"
+#include "sys/AtomicCount.h"
+#include "framing/AMQFrame.h"
+
+namespace qpid {
+namespace sys {
+
+class ConnectionInputHandlerFactory;
+
+/**
+ * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler
+ * for a connection via the EventChannel.
+ *@param readDescriptor file descriptor for reading.
+ *@param writeDescriptor file descriptor for writing,
+ * by default same as readDescriptor
+ */
+class EventChannelConnection : public ConnectionOutputHandler {
+ public:
+ EventChannelConnection(
+ EventChannelThreads::shared_ptr threads,
+ ConnectionInputHandlerFactory& factory,
+ int readDescriptor,
+ int writeDescriptor = 0,
+ bool isTrace = false
+ );
+
+ // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr
+ virtual void send(qpid::framing::AMQFrame* frame) {
+ send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
+ }
+
+ virtual void send(std::auto_ptr<qpid::framing::AMQFrame> frame);
+
+ virtual void close();
+
+ private:
+ typedef boost::ptr_deque<qpid::framing::AMQFrame> FrameQueue;
+ typedef void (EventChannelConnection::*MemberFnPtr)();
+ struct ScopedBusy;
+
+ void startWrite();
+ void endWrite();
+ void startRead();
+ void endInitRead();
+ void endRead();
+ void closeNoThrow();
+ void closeOnException(MemberFnPtr);
+ bool shouldContinue(bool& flag);
+
+ static const size_t bufferSize;
+
+ Monitor monitor;
+
+ int readFd, writeFd;
+ ReadEvent readEvent;
+ WriteEvent writeEvent;
+ Event::Callback readCallback;
+ bool isWriting;
+ bool isClosed;
+ AtomicCount busyThreads;
+
+ EventChannelThreads::shared_ptr threads;
+ std::auto_ptr<ConnectionInputHandler> handler;
+ qpid::framing::Buffer in, out;
+ FrameQueue writeFrames;
+ bool isTrace;
+
+ friend struct ScopedBusy;
+};
+
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!_posix_EventChannelConnection_h*/
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
new file mode 100644
index 0000000000..95e699e0b0
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "EventChannelThreads.h"
+#include <sys/Runnable.h>
+#include <iostream>
+using namespace std;
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+EventChannelThreads::shared_ptr EventChannelThreads::create(
+ EventChannel::shared_ptr ec)
+{
+ return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+}
+
+EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
+ channel(ec), nWaiting(0), state(RUNNING)
+{
+ // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
+ addThread();
+}
+
+EventChannelThreads::~EventChannelThreads() {
+ shutdown();
+ join();
+}
+
+void EventChannelThreads::shutdown()
+{
+ ScopedLock lock(*this);
+ if (state != RUNNING) // Already shutting down.
+ return;
+ for (size_t i = 0; i < workers.size(); ++i) {
+ channel->postEvent(terminate);
+ }
+ state = TERMINATE_SENT;
+ notify(); // Wake up one join() thread.
+}
+
+void EventChannelThreads::join()
+{
+ {
+ ScopedLock lock(*this);
+ while (state == RUNNING) // Wait for shutdown to start.
+ wait();
+ if (state == SHUTDOWN) // Shutdown is complete
+ return;
+ if (state == JOINING) {
+ // Someone else is doing the join.
+ while (state != SHUTDOWN)
+ wait();
+ return;
+ }
+ // I'm the joining thread
+ assert(state == TERMINATE_SENT);
+ state = JOINING;
+ } // Drop the lock.
+
+ for (size_t i = 0; i < workers.size(); ++i) {
+ assert(state == JOINING); // Only this thread can change JOINING.
+ workers[i].join();
+ }
+ state = SHUTDOWN;
+ notifyAll(); // Notify other join() threaeds.
+}
+
+void EventChannelThreads::addThread() {
+ ScopedLock l(*this);
+ workers.push_back(Thread(*this));
+}
+
+void EventChannelThreads::run()
+{
+ // Start life waiting. Decrement on exit.
+ AtomicCount::ScopedIncrement inc(nWaiting);
+ try {
+ while (true) {
+ Event* e = channel->getEvent();
+ assert(e != 0);
+ if (e == &terminate) {
+ return;
+ }
+ AtomicCount::ScopedDecrement dec(nWaiting);
+ // I'm no longer waiting, make sure someone is.
+ if (dec == 0)
+ addThread();
+ e->dispatch();
+ }
+ }
+ catch (const std::exception& e) {
+ // TODO aconway 2006-11-15: need better logging across the board.
+ std::cerr << "EventChannelThreads::run() caught: " << e.what()
+ << std::endl;
+ }
+ catch (...) {
+ std::cerr << "EventChannelThreads::run() caught unknown exception."
+ << std::endl;
+ }
+}
+
+}}
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h
new file mode 100644
index 0000000000..98403c0869
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.h
@@ -0,0 +1,92 @@
+#ifndef _posix_EventChannelThreads_h
+#define _sys_EventChannelThreads_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <vector>
+
+#include <Exception.h>
+#include <sys/Time.h>
+#include <sys/Monitor.h>
+#include <sys/Thread.h>
+#include <sys/AtomicCount.h>
+#include "EventChannel.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ Dynamic thread pool serving an EventChannel.
+
+ Threads run a loop { e = getEvent(); e->dispatch(); }
+ The size of the thread pool is automatically adjusted to optimal size.
+*/
+class EventChannelThreads :
+ public qpid::SharedObject<EventChannelThreads>,
+ public sys::Monitor, private sys::Runnable
+{
+ public:
+ /** Create the thread pool and start initial threads. */
+ static EventChannelThreads::shared_ptr create(
+ EventChannel::shared_ptr channel
+ );
+
+ ~EventChannelThreads();
+
+ /** Post event to the underlying channel */
+ void postEvent(Event& event) { channel->postEvent(event); }
+
+ /** Post event to the underlying channel Must not be 0. */
+ void postEvent(Event* event) { channel->postEvent(event); }
+
+ /**
+ * Terminate all threads.
+ *
+ * Returns immediately, use join() to wait till all threads are
+ * shut down.
+ */
+ void shutdown();
+
+ /** Wait for all threads to terminate. */
+ void join();
+
+ private:
+ typedef std::vector<sys::Thread> Threads;
+ typedef enum {
+ RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+ } State;
+
+ EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+ void addThread();
+
+ void run();
+ bool keepRunning();
+ void adjustThreads();
+
+ EventChannel::shared_ptr channel;
+ Threads workers;
+ sys::AtomicCount nWaiting;
+ State state;
+ Event terminate;
+};
+
+
+}}
+
+
+#endif /*!_sys_EventChannelThreads_h*/
diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
new file mode 100644
index 0000000000..a80a6c61f7
--- /dev/null
+++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Acceptor.h>
+#include <Exception.h>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
+}
+
+class PosixAcceptor : public Acceptor {
+ public:
+ virtual int16_t getPort() const { fail(); return 0; }
+ virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); }
+ virtual void shutdown() { fail(); }
+};
+
+// Define generic Acceptor::create() to return APRAcceptor.
+ Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool)
+{
+ return Acceptor::shared_ptr(new PosixAcceptor());
+}
+
+// Must define Acceptor virtual dtor.
+Acceptor::~Acceptor() {}
+
+}}
diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp
new file mode 100644
index 0000000000..5bd13742f6
--- /dev/null
+++ b/cpp/lib/common/sys/posix/Socket.cpp
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/socket.h>
+#include <sys/errno.h>
+#include <netinet/in.h>
+#include <netdb.h>
+
+#include <boost/format.hpp>
+
+#include <QpidError.h>
+#include <posix/check.h>
+#include <sys/Socket.h>
+
+using namespace qpid::sys;
+
+Socket Socket::createTcp()
+{
+ int s = ::socket (PF_INET, SOCK_STREAM, 0);
+ if (s < 0) throw QPID_POSIX_ERROR(errno);
+ return s;
+}
+
+Socket::Socket(int descriptor) : socket(descriptor) {}
+
+void Socket::setTimeout(Time interval)
+{
+ struct timeval tv;
+ tv.tv_sec = interval/TIME_SEC;
+ tv.tv_usec = (interval%TIME_SEC)/TIME_USEC;
+ setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
+ setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+}
+
+void Socket::connect(const std::string& host, int port)
+{
+ struct sockaddr_in name;
+ name.sin_family = AF_INET;
+ name.sin_port = htons(port);
+ struct hostent* hp = gethostbyname ( host.c_str() );
+ if (hp == 0) throw QPID_POSIX_ERROR(errno);
+ memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
+ if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void
+Socket::close()
+{
+ if (socket == 0) return;
+ if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
+ socket = 0;
+}
+
+ssize_t
+Socket::send(const void* data, size_t size)
+{
+ ssize_t sent = ::send(socket, data, size, 0);
+ if (sent < 0) {
+ if (errno == ECONNRESET) return SOCKET_EOF;
+ if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
+ throw QPID_POSIX_ERROR(errno);
+ }
+ return sent;
+}
+
+ssize_t
+Socket::recv(void* data, size_t size)
+{
+ ssize_t received = ::recv(socket, data, size, 0);
+ if (received < 0) {
+ if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
+ throw QPID_POSIX_ERROR(errno);
+ }
+ return received;
+}
+
+int Socket::listen(int port, int backlog)
+{
+ struct sockaddr_in name;
+ name.sin_family = AF_INET;
+ name.sin_port = htons(port);
+ name.sin_addr.s_addr = 0;
+ if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
+ if (::listen(socket, backlog) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ socklen_t namelen = sizeof(name);
+ if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ return ntohs(name.sin_port);
+}
+
+
+int Socket::fd()
+{
+ return socket;
+}
diff --git a/cpp/lib/common/sys/posix/Thread.cpp b/cpp/lib/common/sys/posix/Thread.cpp
new file mode 100644
index 0000000000..f524799556
--- /dev/null
+++ b/cpp/lib/common/sys/posix/Thread.cpp
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Thread.h>
+
+void* qpid::sys::Thread::runRunnable(void* p)
+{
+ static_cast<Runnable*>(p)->run();
+ return 0;
+}
diff --git a/cpp/lib/common/sys/posix/check.cpp b/cpp/lib/common/sys/posix/check.cpp
new file mode 100644
index 0000000000..408679caa8
--- /dev/null
+++ b/cpp/lib/common/sys/posix/check.cpp
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <cerrno>
+#include "check.h"
+
+namespace qpid {
+namespace sys {
+
+std::string
+PosixError::getMessage(int errNo)
+{
+ char buf[512];
+ return std::string(strerror_r(errNo, buf, sizeof(buf)));
+}
+
+PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
+ : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
+{ }
+
+}}
diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h
new file mode 100644
index 0000000000..57b5a5757c
--- /dev/null
+++ b/cpp/lib/common/sys/posix/check.h
@@ -0,0 +1,62 @@
+#ifndef _posix_check_h
+#define _posix_check_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <cerrno>
+#include <string>
+#include <QpidError.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Exception with message from errno.
+ */
+class PosixError : public qpid::QpidError
+{
+ public:
+ static std::string getMessage(int errNo);
+
+ PosixError(int errNo, const qpid::SrcLine& location) throw();
+
+ ~PosixError() throw() {}
+
+ int getErrNo() { return errNo; }
+
+ Exception* clone() const throw() { return new PosixError(*this); }
+
+ void throwSelf() const { throw *this; }
+
+ private:
+ int errNo;
+};
+
+}}
+
+/** Create a PosixError for the current file/line and errno. */
+#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+
+/** Throw a posix error if errNo is non-zero */
+#define QPID_POSIX_THROW_IF(ERRNO) \
+ if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
+#endif /*!_posix_check_h*/