summaryrefslogtreecommitdiff
path: root/cpp/lib/common
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-12-01 05:11:45 +0000
committerAlan Conway <aconway@apache.org>2006-12-01 05:11:45 +0000
commitfb9ad93a3d422c1e83c998f44c4782f7bf1d1a66 (patch)
treea2ebf932750bf13bf3db271f92df390335b0e844 /cpp/lib/common
parent33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (diff)
downloadqpid-python-fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66.tar.gz
2006-12-01 Jim Meyering <meyering@redhat.com>
This delta imposes two major changes on the C++ hierarchy: - adds autoconf, automake, libtool support - makes the hierarchy flatter and renames a few files (e.g., Queue.h, Queue.cpp) that appeared twice, once under client/ and again under broker/. In the process, I've changed many #include directives, mostly to remove a qpid/ or qpid/framing/ prefix from the file name argument. Although most changes were to .cpp and .h files under qpid/cpp/, there were also several to template files under qpid/gentools, and even one to CppGenerator.java. Nearly all files are moved to a new position in the hierarchy. The new hierarchy looks like this: src # this is the new home of qpidd.cpp tests # all tests are here. See Makefile.am. gen # As before, all generated files go here. lib # This is just a container for the 3 lib dirs: lib/client lib/broker lib/common lib/common/framing lib/common/sys lib/common/sys/posix lib/common/sys/apr build-aux m4 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/common')
-rw-r--r--cpp/lib/common/Exception.cpp42
-rw-r--r--cpp/lib/common/Exception.h61
-rw-r--r--cpp/lib/common/ExceptionHolder.cpp32
-rw-r--r--cpp/lib/common/ExceptionHolder.h62
-rw-r--r--cpp/lib/common/Makefile.am109
-rw-r--r--cpp/lib/common/QpidError.cpp44
-rw-r--r--cpp/lib/common/QpidError.h67
-rw-r--r--cpp/lib/common/SharedObject.h55
-rw-r--r--cpp/lib/common/framing/AMQBody.cpp36
-rw-r--r--cpp/lib/common/framing/AMQBody.h51
-rw-r--r--cpp/lib/common/framing/AMQContentBody.cpp43
-rw-r--r--cpp/lib/common/framing/AMQContentBody.h53
-rw-r--r--cpp/lib/common/framing/AMQDataBlock.h42
-rw-r--r--cpp/lib/common/framing/AMQFrame.cpp138
-rw-r--r--cpp/lib/common/framing/AMQFrame.h68
-rw-r--r--cpp/lib/common/framing/AMQHeaderBody.cpp75
-rw-r--r--cpp/lib/common/framing/AMQHeaderBody.h60
-rw-r--r--cpp/lib/common/framing/AMQHeartbeatBody.cpp29
-rw-r--r--cpp/lib/common/framing/AMQHeartbeatBody.h47
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.cpp46
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.h62
-rw-r--r--cpp/lib/common/framing/BasicHeaderProperties.cpp103
-rw-r--r--cpp/lib/common/framing/BasicHeaderProperties.h97
-rw-r--r--cpp/lib/common/framing/BodyHandler.cpp54
-rw-r--r--cpp/lib/common/framing/BodyHandler.h54
-rw-r--r--cpp/lib/common/framing/Buffer.cpp174
-rw-r--r--cpp/lib/common/framing/Buffer.h83
-rw-r--r--cpp/lib/common/framing/FieldTable.cpp150
-rw-r--r--cpp/lib/common/framing/FieldTable.h80
-rw-r--r--cpp/lib/common/framing/HeaderProperties.h46
-rw-r--r--cpp/lib/common/framing/InitiationHandler.cpp24
-rw-r--r--cpp/lib/common/framing/InitiationHandler.h41
-rw-r--r--cpp/lib/common/framing/InputHandler.h39
-rw-r--r--cpp/lib/common/framing/OutputHandler.h39
-rw-r--r--cpp/lib/common/framing/ProtocolInitiation.cpp58
-rw-r--r--cpp/lib/common/framing/ProtocolInitiation.h54
-rw-r--r--cpp/lib/common/framing/ProtocolVersion.cpp64
-rw-r--r--cpp/lib/common/framing/ProtocolVersion.h57
-rw-r--r--cpp/lib/common/framing/ProtocolVersionException.cpp66
-rw-r--r--cpp/lib/common/framing/ProtocolVersionException.h55
-rw-r--r--cpp/lib/common/framing/Value.cpp114
-rw-r--r--cpp/lib/common/framing/Value.h163
-rw-r--r--cpp/lib/common/framing/amqp_framing.h36
-rw-r--r--cpp/lib/common/framing/amqp_types.h45
-rw-r--r--cpp/lib/common/sys/Acceptor.h47
-rw-r--r--cpp/lib/common/sys/AtomicCount.h71
-rw-r--r--cpp/lib/common/sys/Module.h161
-rw-r--r--cpp/lib/common/sys/Monitor.h127
-rw-r--r--cpp/lib/common/sys/Mutex.h151
-rw-r--r--cpp/lib/common/sys/Runnable.cpp32
-rw-r--r--cpp/lib/common/sys/Runnable.h50
-rw-r--r--cpp/lib/common/sys/SessionContext.h41
-rw-r--r--cpp/lib/common/sys/SessionHandler.h45
-rw-r--r--cpp/lib/common/sys/SessionHandlerFactory.h46
-rw-r--r--cpp/lib/common/sys/ShutdownHandler.h37
-rw-r--r--cpp/lib/common/sys/Socket.h88
-rw-r--r--cpp/lib/common/sys/Thread.h142
-rw-r--r--cpp/lib/common/sys/Time.cpp60
-rw-r--r--cpp/lib/common/sys/Time.h58
-rw-r--r--cpp/lib/common/sys/TimeoutHandler.h39
-rw-r--r--cpp/lib/common/sys/apr/APRAcceptor.cpp110
-rw-r--r--cpp/lib/common/sys/apr/APRBase.cpp100
-rw-r--r--cpp/lib/common/sys/apr/APRBase.h66
-rw-r--r--cpp/lib/common/sys/apr/APRPool.cpp41
-rw-r--r--cpp/lib/common/sys/apr/APRPool.h50
-rw-r--r--cpp/lib/common/sys/apr/APRSocket.cpp78
-rw-r--r--cpp/lib/common/sys/apr/APRSocket.h48
-rw-r--r--cpp/lib/common/sys/apr/LFProcessor.cpp179
-rw-r--r--cpp/lib/common/sys/apr/LFProcessor.h121
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.cpp173
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.h90
-rw-r--r--cpp/lib/common/sys/apr/Socket.cpp83
-rw-r--r--cpp/lib/common/sys/apr/Thread.cpp33
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.cpp325
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.h176
-rw-r--r--cpp/lib/common/sys/posix/EventChannelThreads.cpp119
-rw-r--r--cpp/lib/common/sys/posix/EventChannelThreads.h92
-rw-r--r--cpp/lib/common/sys/posix/PosixAcceptor.cpp48
-rw-r--r--cpp/lib/common/sys/posix/Socket.cpp118
-rw-r--r--cpp/lib/common/sys/posix/Thread.cpp28
-rw-r--r--cpp/lib/common/sys/posix/check.cpp39
-rw-r--r--cpp/lib/common/sys/posix/check.h62
82 files changed, 6292 insertions, 0 deletions
diff --git a/cpp/lib/common/Exception.cpp b/cpp/lib/common/Exception.cpp
new file mode 100644
index 0000000000..0161518011
--- /dev/null
+++ b/cpp/lib/common/Exception.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <Exception.h>
+
+namespace qpid {
+
+Exception::Exception() throw() {}
+
+Exception::Exception(const std::string& str) throw() : whatStr(str) {}
+
+Exception::Exception(const char* str) throw() : whatStr(str) {}
+
+Exception::~Exception() throw() {}
+
+const char* Exception::what() const throw() { return whatStr.c_str(); }
+
+std::string Exception::toString() const throw() { return whatStr; }
+
+Exception* Exception::clone() const throw() { return new Exception(*this); }
+
+void Exception::throwSelf() const { throw *this; }
+
+} // namespace qpid
diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h
new file mode 100644
index 0000000000..f35d427bb0
--- /dev/null
+++ b/cpp/lib/common/Exception.h
@@ -0,0 +1,61 @@
+#ifndef _Exception_
+#define _Exception_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <exception>
+#include <string>
+#include <memory>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid
+{
+/**
+ * Exception base class for all Qpid exceptions.
+ */
+class Exception : public std::exception
+{
+ protected:
+ std::string whatStr;
+
+ public:
+ Exception() throw();
+ Exception(const std::string& str) throw();
+ Exception(const char* str) throw();
+ Exception(const std::exception&) throw();
+
+ virtual ~Exception() throw();
+
+ virtual const char* what() const throw();
+ virtual std::string toString() const throw();
+
+ virtual Exception* clone() const throw();
+ virtual void throwSelf() const;
+
+ typedef boost::shared_ptr<Exception> shared_ptr;
+};
+
+
+
+}
+
+#endif /*!_Exception_*/
diff --git a/cpp/lib/common/ExceptionHolder.cpp b/cpp/lib/common/ExceptionHolder.cpp
new file mode 100644
index 0000000000..de8d7b2487
--- /dev/null
+++ b/cpp/lib/common/ExceptionHolder.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ExceptionHolder.h"
+
+namespace qpid {
+
+ExceptionHolder::ExceptionHolder(const std::exception& e) {
+ const Exception* ex = dynamic_cast<const Exception*>(&e);
+ if (ex) {
+ reset(ex->clone());
+ } else {
+ reset(new Exception(e.what()));
+ }
+}
+
+}
diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h
new file mode 100644
index 0000000000..83d0884be9
--- /dev/null
+++ b/cpp/lib/common/ExceptionHolder.h
@@ -0,0 +1,62 @@
+#ifndef _qpid_ExceptionHolder_h
+#define _qpid_ExceptionHolder_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <Exception.h>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+/**
+ * Holder for a heap-allocated exc eption that can be stack allocated
+ * and thrown safely.
+ *
+ * Basically this is a shared_ptr with the Exception functions added
+ * so the catcher need not be aware that it is a pointer rather than a
+ * reference.
+ *
+ * shared_ptr is chosen over auto_ptr because it has normal
+ * copy semantics.
+ */
+class ExceptionHolder : public Exception, public boost::shared_ptr<Exception>
+{
+ public:
+ typedef boost::shared_ptr<Exception> shared_ptr;
+
+ ExceptionHolder() throw() {}
+ ExceptionHolder(Exception* p) throw() : shared_ptr(p) {}
+ ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {}
+
+ ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {}
+ ExceptionHolder(const std::exception& e);
+
+ ~ExceptionHolder() throw() {}
+
+ const char* what() const throw() { return (*this)->what(); }
+ std::string toString() const throw() { return (*this)->toString(); }
+ virtual Exception* clone() const throw() { return (*this)->clone(); }
+ virtual void throwSelf() const { (*this)->throwSelf(); }
+};
+
+} // namespace qpid
+
+
+
+#endif /*!_qpid_ExceptionHolder_h*/
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
new file mode 100644
index 0000000000..ed02cea6a7
--- /dev/null
+++ b/cpp/lib/common/Makefile.am
@@ -0,0 +1,109 @@
+AM_CXXFLAGS = $(WARNING_CFLAGS)
+INCLUDES = \
+ -I$(top_srcdir)/gen \
+ -I$(top_srcdir)/lib/common/sys \
+ -I$(top_srcdir)/lib/common/framing
+
+# This is a kludge to include the contents of the apr/ directory.
+# For now, we don't use it.
+EXTRA_DIST = \
+ sys/apr/APRAcceptor.cpp \
+ sys/apr/APRBase.cpp \
+ sys/apr/APRBase.h \
+ sys/apr/APRPool.cpp \
+ sys/apr/APRPool.h \
+ sys/apr/APRSocket.cpp \
+ sys/apr/APRSocket.h \
+ sys/apr/LFProcessor.cpp \
+ sys/apr/LFProcessor.h \
+ sys/apr/LFSessionContext.cpp \
+ sys/apr/LFSessionContext.h \
+ sys/apr/Socket.cpp \
+ sys/apr/Thread.cpp
+
+
+framing = framing
+posix = sys/posix
+gen = $(srcdir)/../../gen
+
+lib_LTLIBRARIES = libcommon.la
+libcommon_la_LIBADD = \
+ $(LIB_DLOPEN) \
+ $(LIB_CLOCK_GETTIME)
+
+libcommon_la_LDFLAGS = \
+ -version-info \
+ $(LIBTOOL_VERSION_INFO_ARG)
+
+libcommon_la_SOURCES = \
+ $(framing)/AMQBody.cpp \
+ $(framing)/AMQBody.h \
+ $(framing)/AMQContentBody.cpp \
+ $(framing)/AMQContentBody.h \
+ $(framing)/AMQDataBlock.h \
+ $(framing)/AMQFrame.cpp \
+ $(framing)/AMQFrame.h \
+ $(framing)/AMQHeaderBody.cpp \
+ $(framing)/AMQHeaderBody.h \
+ $(framing)/AMQHeartbeatBody.cpp \
+ $(framing)/AMQHeartbeatBody.h \
+ $(framing)/AMQMethodBody.cpp \
+ $(framing)/AMQMethodBody.h \
+ $(framing)/BasicHeaderProperties.cpp \
+ $(framing)/BasicHeaderProperties.h \
+ $(framing)/BodyHandler.cpp \
+ $(framing)/BodyHandler.h \
+ $(framing)/Buffer.cpp \
+ $(framing)/Buffer.h \
+ $(framing)/FieldTable.cpp \
+ $(framing)/FieldTable.h \
+ $(framing)/HeaderProperties.h \
+ $(framing)/InitiationHandler.cpp \
+ $(framing)/InitiationHandler.h \
+ $(framing)/InputHandler.h \
+ $(framing)/OutputHandler.h \
+ $(framing)/ProtocolInitiation.cpp \
+ $(framing)/ProtocolInitiation.h \
+ $(framing)/ProtocolVersion.cpp \
+ $(framing)/ProtocolVersion.h \
+ $(framing)/ProtocolVersionException.cpp \
+ $(framing)/ProtocolVersionException.h \
+ $(framing)/Value.cpp \
+ $(framing)/Value.h \
+ $(framing)/amqp_framing.h \
+ $(framing)/amqp_types.h \
+ $(gen)/AMQP_ClientProxy.cpp \
+ $(gen)/AMQP_MethodVersionMap.cpp \
+ $(gen)/AMQP_ServerProxy.cpp \
+ $(posix)/PosixAcceptor.cpp \
+ $(posix)/Socket.cpp \
+ $(posix)/Thread.cpp \
+ $(posix)/check.cpp \
+ $(posix)/check.h \
+ $(posix)/EventChannel.cpp \
+ $(posix)/EventChannel.h \
+ $(posix)/EventChannelThreads.cpp \
+ $(posix)/EventChannelThreads.h \
+ Exception.cpp \
+ Exception.h \
+ ExceptionHolder.cpp \
+ ExceptionHolder.h \
+ QpidError.cpp \
+ QpidError.h \
+ SharedObject.h \
+ sys/Acceptor.h \
+ sys/AtomicCount.h \
+ sys/Module.h \
+ sys/Monitor.h \
+ sys/Mutex.h \
+ sys/Runnable.cpp \
+ sys/Runnable.h \
+ sys/SessionContext.h \
+ sys/SessionHandler.h \
+ sys/SessionHandlerFactory.h \
+ sys/ShutdownHandler.h \
+ sys/Socket.h \
+ sys/Thread.h \
+ sys/Time.cpp \
+ sys/Time.h \
+ sys/TimeoutHandler.h
diff --git a/cpp/lib/common/QpidError.cpp b/cpp/lib/common/QpidError.cpp
new file mode 100644
index 0000000000..7f4f9e2f34
--- /dev/null
+++ b/cpp/lib/common/QpidError.cpp
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <QpidError.h>
+#include <sstream>
+
+using namespace qpid;
+
+QpidError::QpidError() : code(0) {}
+
+QpidError::QpidError(int _code, const std::string& _msg,
+ const SrcLine& _loc) throw()
+ : code(_code), msg(_msg), location(_loc)
+{
+ std::ostringstream os;
+ os << "Error [" << code << "] " << msg << " ("
+ << location.file << ":" << location.line << ")";
+ whatStr = os.str();
+}
+
+QpidError::~QpidError() throw() {}
+
+Exception* QpidError::clone() const throw() { return new QpidError(*this); }
+
+void QpidError::throwSelf() const { throw *this; }
+
diff --git a/cpp/lib/common/QpidError.h b/cpp/lib/common/QpidError.h
new file mode 100644
index 0000000000..30d9d27076
--- /dev/null
+++ b/cpp/lib/common/QpidError.h
@@ -0,0 +1,67 @@
+#ifndef __QpidError__
+#define __QpidError__
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <memory>
+#include <ostream>
+#include <Exception.h>
+
+namespace qpid {
+
+struct SrcLine {
+ public:
+ SrcLine(const std::string& file_="", int line_=0) :
+ file(file_), line(line_) {}
+
+ std::string file;
+ int line;
+};
+
+class QpidError : public Exception {
+ public:
+ const int code;
+ const std::string msg;
+ const SrcLine location;
+
+ QpidError();
+ QpidError(int _code, const std::string& _msg, const SrcLine& _loc) throw();
+ ~QpidError() throw();
+ Exception* clone() const throw();
+ void throwSelf() const;
+};
+
+
+} // namespace qpid
+
+#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__)
+
+#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE)
+
+#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE)
+
+const int PROTOCOL_ERROR = 10000;
+const int APR_ERROR = 20000;
+const int FRAMING_ERROR = 30000;
+const int CLIENT_ERROR = 40000;
+const int INTERNAL_ERROR = 50000;
+
+#endif
diff --git a/cpp/lib/common/SharedObject.h b/cpp/lib/common/SharedObject.h
new file mode 100644
index 0000000000..852a036ab9
--- /dev/null
+++ b/cpp/lib/common/SharedObject.h
@@ -0,0 +1,55 @@
+#ifndef _SharedObject_
+#define _SharedObject_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+ /**
+ * Template to enforce shared object conventions.
+ * Shared object classes should inherit : public qpid::SharedObject
+ * That ensures Foo:
+ * - has typedef boost::shared_ptr<T> shared_ptr
+ * - has virtual destructor
+ * - is boost::noncopyable (no default copy or assign)
+ * - has a protected default constructor.
+ *
+ * Shared objects should not have public constructors.
+ * Make constructors protected and provide public statc create()
+ * functions that return a shared_ptr.
+ */
+ template <class T>
+ class SharedObject : private boost::noncopyable
+ {
+ public:
+ typedef boost::shared_ptr<T> shared_ptr;
+
+ virtual ~SharedObject() {};
+
+ protected:
+ SharedObject() {}
+ };
+}
+
+#endif /*!_SharedObject_*/
diff --git a/cpp/lib/common/framing/AMQBody.cpp b/cpp/lib/common/framing/AMQBody.cpp
new file mode 100644
index 0000000000..b095312a16
--- /dev/null
+++ b/cpp/lib/common/framing/AMQBody.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <AMQBody.h>
+#include <iostream>
+
+std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::AMQBody& body)
+{
+ body.print(out);
+ return out;
+}
+
+
+qpid::framing::AMQBody::~AMQBody() {}
+
+void qpid::framing::AMQBody::print(std::ostream& out) const {
+ out << "unknown body";
+}
diff --git a/cpp/lib/common/framing/AMQBody.h b/cpp/lib/common/framing/AMQBody.h
new file mode 100644
index 0000000000..5547d3c506
--- /dev/null
+++ b/cpp/lib/common/framing/AMQBody.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include <amqp_types.h>
+#include <Buffer.h>
+
+#ifndef _AMQBody_
+#define _AMQBody_
+
+namespace qpid {
+ namespace framing {
+
+ class AMQBody
+ {
+ public:
+ typedef boost::shared_ptr<AMQBody> shared_ptr;
+
+ virtual ~AMQBody();
+ virtual u_int32_t size() const = 0;
+ virtual u_int8_t type() const = 0;
+ virtual void encode(Buffer& buffer) const = 0;
+ virtual void decode(Buffer& buffer, u_int32_t size) = 0;
+ virtual void print(std::ostream& out) const;
+ };
+
+ std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
+
+ enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8};
+ }
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQContentBody.cpp b/cpp/lib/common/framing/AMQContentBody.cpp
new file mode 100644
index 0000000000..4f51dca243
--- /dev/null
+++ b/cpp/lib/common/framing/AMQContentBody.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AMQContentBody.h>
+#include <iostream>
+
+qpid::framing::AMQContentBody::AMQContentBody(){
+}
+
+qpid::framing::AMQContentBody::AMQContentBody(const string& _data) : data(_data){
+}
+
+u_int32_t qpid::framing::AMQContentBody::size() const{
+ return data.size();
+}
+void qpid::framing::AMQContentBody::encode(Buffer& buffer) const{
+ buffer.putRawData(data);
+}
+void qpid::framing::AMQContentBody::decode(Buffer& buffer, u_int32_t _size){
+ buffer.getRawData(data, _size);
+}
+
+void qpid::framing::AMQContentBody::print(std::ostream& out) const
+{
+ out << "content (" << size() << " bytes)";
+}
diff --git a/cpp/lib/common/framing/AMQContentBody.h b/cpp/lib/common/framing/AMQContentBody.h
new file mode 100644
index 0000000000..172228671a
--- /dev/null
+++ b/cpp/lib/common/framing/AMQContentBody.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <Buffer.h>
+
+#ifndef _AMQContentBody_
+#define _AMQContentBody_
+
+namespace qpid {
+namespace framing {
+
+class AMQContentBody : virtual public AMQBody
+{
+ string data;
+
+public:
+ typedef boost::shared_ptr<AMQContentBody> shared_ptr;
+
+ AMQContentBody();
+ AMQContentBody(const string& data);
+ inline virtual ~AMQContentBody(){}
+ inline u_int8_t type() const { return CONTENT_BODY; };
+ inline string& getData(){ return data; }
+ u_int32_t size() const;
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer, u_int32_t size);
+ void print(std::ostream& out) const;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQDataBlock.h b/cpp/lib/common/framing/AMQDataBlock.h
new file mode 100644
index 0000000000..ac91c52164
--- /dev/null
+++ b/cpp/lib/common/framing/AMQDataBlock.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Buffer.h>
+
+#ifndef _AMQDataBlock_
+#define _AMQDataBlock_
+
+namespace qpid {
+namespace framing {
+
+class AMQDataBlock
+{
+public:
+ virtual ~AMQDataBlock() {}
+ virtual void encode(Buffer& buffer) = 0;
+ virtual bool decode(Buffer& buffer) = 0;
+ virtual u_int32_t size() const = 0;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp
new file mode 100644
index 0000000000..ca8bf41758
--- /dev/null
+++ b/cpp/lib/common/framing/AMQFrame.cpp
@@ -0,0 +1,138 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AMQFrame.h>
+#include <QpidError.h>
+
+using namespace qpid::framing;
+
+// This only works as a static as the version is currently fixed to 8.0
+// TODO: When the class is version-aware this will need to change
+AMQP_MethodVersionMap AMQFrame::versionMap(8,0);
+
+// AMQP version management change - kpvdr 2-11-17
+// TODO: Make this class version-aware
+AMQFrame::AMQFrame() {}
+
+// AMQP version management change - kpvdr 2006-11-17
+// TODO: Make this class version-aware
+AMQFrame::AMQFrame(u_int16_t _channel, AMQBody* _body) :
+channel(_channel), body(_body)
+{}
+
+// AMQP version management change - kpvdr 2006-11-17
+// TODO: Make this class version-aware
+AMQFrame::AMQFrame(u_int16_t _channel, AMQBody::shared_ptr& _body) :
+channel(_channel), body(_body)
+{}
+
+AMQFrame::~AMQFrame() {}
+
+u_int16_t AMQFrame::getChannel(){
+ return channel;
+}
+
+AMQBody::shared_ptr& AMQFrame::getBody(){
+ return body;
+}
+
+void AMQFrame::encode(Buffer& buffer)
+{
+ buffer.putOctet(body->type());
+ buffer.putShort(channel);
+ buffer.putLong(body->size());
+ body->encode(buffer);
+ buffer.putOctet(0xCE);
+}
+
+AMQBody::shared_ptr AMQFrame::createMethodBody(Buffer& buffer){
+ u_int16_t classId = buffer.getShort();
+ u_int16_t methodId = buffer.getShort();
+ // AMQP version management change - kpvdr 2006-11-16
+ // TODO: Make this class version-aware and link these hard-wired numbers to that version
+ AMQBody::shared_ptr body(versionMap.createMethodBody(classId, methodId, 8, 0));
+ // Origianl stmt:
+ // AMQBody::shared_ptr body(createAMQMethodBody(classId, methodId));
+ return body;
+}
+
+u_int32_t AMQFrame::size() const{
+ if(!body.get()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to get size of frame with no body set!");
+ return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() + 1/*0xCE*/;
+}
+
+bool AMQFrame::decode(Buffer& buffer)
+{
+ if(buffer.available() < 7) return false;
+ buffer.record();
+ u_int32_t bufSize = decodeHead(buffer);
+
+ if(buffer.available() < bufSize + 1){
+ buffer.restore();
+ return false;
+ }
+ decodeBody(buffer, bufSize);
+ u_int8_t end = buffer.getOctet();
+ if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
+ return true;
+}
+
+u_int32_t AMQFrame::decodeHead(Buffer& buffer){
+ type = buffer.getOctet();
+ channel = buffer.getShort();
+ return buffer.getLong();
+}
+
+void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize)
+{
+ switch(type)
+ {
+ case METHOD_BODY:
+ body = createMethodBody(buffer);
+ break;
+ case HEADER_BODY:
+ body = AMQBody::shared_ptr(new AMQHeaderBody());
+ break;
+ case CONTENT_BODY:
+ body = AMQBody::shared_ptr(new AMQContentBody());
+ break;
+ case HEARTBEAT_BODY:
+ body = AMQBody::shared_ptr(new AMQHeartbeatBody());
+ break;
+ default:
+ string msg("Unknown body type: ");
+ msg += type;
+ THROW_QPID_ERROR(FRAMING_ERROR, msg);
+ }
+ body->decode(buffer, bufSize);
+}
+
+std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t)
+{
+ out << "Frame[channel=" << t.channel << "; ";
+ if (t.body.get() == 0)
+ out << "empty";
+ else
+ out << *t.body;
+ out << "]";
+ return out;
+}
+
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
new file mode 100644
index 0000000000..bec1946fb7
--- /dev/null
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*#include <qpid/framing/amqp_methods.h>*/
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <AMQDataBlock.h>
+#include <AMQMethodBody.h>
+#include <AMQHeaderBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeartbeatBody.h>
+#include <AMQP_MethodVersionMap.h>
+#include <Buffer.h>
+
+#ifndef _AMQFrame_
+#define _AMQFrame_
+
+namespace qpid {
+ namespace framing {
+
+ class AMQFrame : virtual public AMQDataBlock
+ {
+ static AMQP_MethodVersionMap versionMap;
+
+ u_int16_t channel;
+ u_int8_t type;//used if the body is decoded separately from the 'head'
+ AMQBody::shared_ptr body;
+ AMQBody::shared_ptr createMethodBody(Buffer& buffer);
+
+ public:
+ AMQFrame();
+ AMQFrame(u_int16_t channel, AMQBody* body);
+ AMQFrame(u_int16_t channel, AMQBody::shared_ptr& body);
+ virtual ~AMQFrame();
+ virtual void encode(Buffer& buffer);
+ virtual bool decode(Buffer& buffer);
+ virtual u_int32_t size() const;
+ u_int16_t getChannel();
+ AMQBody::shared_ptr& getBody();
+
+ u_int32_t decodeHead(Buffer& buffer);
+ void decodeBody(Buffer& buffer, uint32_t size);
+
+ friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body);
+ };
+
+ }
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQHeaderBody.cpp b/cpp/lib/common/framing/AMQHeaderBody.cpp
new file mode 100644
index 0000000000..3653073f29
--- /dev/null
+++ b/cpp/lib/common/framing/AMQHeaderBody.cpp
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AMQHeaderBody.h>
+#include <QpidError.h>
+#include <BasicHeaderProperties.h>
+
+qpid::framing::AMQHeaderBody::AMQHeaderBody(int classId) : weight(0), contentSize(0){
+ createProperties(classId);
+}
+
+qpid::framing::AMQHeaderBody::AMQHeaderBody() : properties(0), weight(0), contentSize(0){
+}
+
+qpid::framing::AMQHeaderBody::~AMQHeaderBody(){
+ delete properties;
+}
+
+u_int32_t qpid::framing::AMQHeaderBody::size() const{
+ return 12 + properties->size();
+}
+
+void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const {
+ buffer.putShort(properties->classId());
+ buffer.putShort(weight);
+ buffer.putLongLong(contentSize);
+ properties->encode(buffer);
+}
+
+void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, u_int32_t bufSize){
+ u_int16_t classId = buffer.getShort();
+ weight = buffer.getShort();
+ contentSize = buffer.getLongLong();
+ createProperties(classId);
+ properties->decode(buffer, bufSize - 12);
+}
+
+void qpid::framing::AMQHeaderBody::createProperties(int classId){
+ switch(classId){
+ case BASIC:
+ properties = new qpid::framing::BasicHeaderProperties();
+ break;
+ default:
+ THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class");
+ }
+}
+
+void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
+{
+ out << "header (" << size() << " bytes)" << " content_size=" << getContentSize();
+ const BasicHeaderProperties* props =
+ dynamic_cast<const BasicHeaderProperties*>(getProperties());
+ if (props) {
+ out << ", message_id=" << props->getMessageId();
+ out << ", delivery_mode=" << (int) props->getDeliveryMode();
+ out << ", headers=" << const_cast<BasicHeaderProperties*>(props)->getHeaders();
+ }
+}
diff --git a/cpp/lib/common/framing/AMQHeaderBody.h b/cpp/lib/common/framing/AMQHeaderBody.h
new file mode 100644
index 0000000000..31cf7d575e
--- /dev/null
+++ b/cpp/lib/common/framing/AMQHeaderBody.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <Buffer.h>
+#include <HeaderProperties.h>
+
+#ifndef _AMQHeaderBody_
+#define _AMQHeaderBody_
+
+namespace qpid {
+namespace framing {
+
+class AMQHeaderBody : virtual public AMQBody
+{
+ HeaderProperties* properties;
+ u_int16_t weight;
+ u_int64_t contentSize;
+
+ void createProperties(int classId);
+public:
+ typedef boost::shared_ptr<AMQHeaderBody> shared_ptr;
+
+ AMQHeaderBody(int classId);
+ AMQHeaderBody();
+ inline u_int8_t type() const { return HEADER_BODY; }
+ HeaderProperties* getProperties(){ return properties; }
+ const HeaderProperties* getProperties() const { return properties; }
+ inline u_int64_t getContentSize() const { return contentSize; }
+ inline void setContentSize(u_int64_t _size) { contentSize = _size; }
+ virtual ~AMQHeaderBody();
+ virtual u_int32_t size() const;
+ virtual void encode(Buffer& buffer) const;
+ virtual void decode(Buffer& buffer, u_int32_t size);
+ virtual void print(std::ostream& out) const;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/AMQHeartbeatBody.cpp b/cpp/lib/common/framing/AMQHeartbeatBody.cpp
new file mode 100644
index 0000000000..63f83a3d29
--- /dev/null
+++ b/cpp/lib/common/framing/AMQHeartbeatBody.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <AMQHeartbeatBody.h>
+#include <iostream>
+
+qpid::framing::AMQHeartbeatBody::~AMQHeartbeatBody() {}
+
+void qpid::framing::AMQHeartbeatBody::print(std::ostream& out) const {
+ out << "heartbeat";
+}
diff --git a/cpp/lib/common/framing/AMQHeartbeatBody.h b/cpp/lib/common/framing/AMQHeartbeatBody.h
new file mode 100644
index 0000000000..a2315119e4
--- /dev/null
+++ b/cpp/lib/common/framing/AMQHeartbeatBody.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <Buffer.h>
+
+#ifndef _AMQHeartbeatBody_
+#define _AMQHeartbeatBody_
+
+namespace qpid {
+namespace framing {
+
+class AMQHeartbeatBody : virtual public AMQBody
+{
+public:
+ typedef boost::shared_ptr<AMQHeartbeatBody> shared_ptr;
+
+ virtual ~AMQHeartbeatBody();
+ inline u_int32_t size() const { return 0; }
+ inline u_int8_t type() const { return HEARTBEAT_BODY; }
+ inline void encode(Buffer& ) const {}
+ inline void decode(Buffer& , u_int32_t /*size*/) {}
+ virtual void print(std::ostream& out) const;
+};
+
+}
+}
+
+#endif
diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp
new file mode 100644
index 0000000000..525310f3d4
--- /dev/null
+++ b/cpp/lib/common/framing/AMQMethodBody.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AMQMethodBody.h>
+#include <QpidError.h>
+
+void qpid::framing::AMQMethodBody::encode(Buffer& buffer) const{
+ buffer.putShort(amqpClassId());
+ buffer.putShort(amqpMethodId());
+ encodeContent(buffer);
+}
+
+void qpid::framing::AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/){
+ decodeContent(buffer);
+}
+
+bool qpid::framing::AMQMethodBody::match(AMQMethodBody* other) const{
+ return other != 0 && other->amqpClassId() == amqpClassId() && other->amqpMethodId() == amqpMethodId();
+}
+
+void qpid::framing::AMQMethodBody::invoke(AMQP_ServerOperations& /*target*/, u_int16_t /*channel*/){
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
+}
+
+
+std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQMethodBody& m){
+ m.print(out);
+ return out;
+}
diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h
new file mode 100644
index 0000000000..da25c7c545
--- /dev/null
+++ b/cpp/lib/common/framing/AMQMethodBody.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <amqp_types.h>
+#include <AMQBody.h>
+#include <Buffer.h>
+#include <AMQP_ServerOperations.h>
+
+#ifndef _AMQMethodBody_
+#define _AMQMethodBody_
+
+namespace qpid {
+namespace framing {
+
+class AMQMethodBody : virtual public AMQBody
+{
+public:
+ typedef boost::shared_ptr<AMQMethodBody> shared_ptr;
+
+ ProtocolVersion version;
+ inline u_int8_t type() const { return METHOD_BODY; }
+ inline u_int32_t size() const { return 4 + bodySize(); }
+ inline AMQMethodBody(u_int8_t major, u_int8_t minor) : version(major, minor) {}
+ inline AMQMethodBody(ProtocolVersion version) : version(version) {}
+ inline virtual ~AMQMethodBody() {}
+ virtual void print(std::ostream& out) const = 0;
+ virtual u_int16_t amqpMethodId() const = 0;
+ virtual u_int16_t amqpClassId() const = 0;
+ virtual void invoke(AMQP_ServerOperations& target, u_int16_t channel);
+ virtual void encodeContent(Buffer& buffer) const = 0;
+ virtual void decodeContent(Buffer& buffer) = 0;
+ virtual u_int32_t bodySize() const = 0;
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer, u_int32_t size);
+ bool match(AMQMethodBody* other) const;
+};
+
+std::ostream& operator<<(std::ostream& out, const AMQMethodBody& body);
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/BasicHeaderProperties.cpp b/cpp/lib/common/framing/BasicHeaderProperties.cpp
new file mode 100644
index 0000000000..f673215536
--- /dev/null
+++ b/cpp/lib/common/framing/BasicHeaderProperties.cpp
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BasicHeaderProperties.h>
+
+//TODO: This could be easily generated from the spec
+
+qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(0), priority(0), timestamp(0){}
+qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){}
+
+u_int32_t qpid::framing::BasicHeaderProperties::size() const{
+ u_int32_t bytes = 2;//flags
+ if(contentType.length() > 0) bytes += contentType.length() + 1;
+ if(contentEncoding.length() > 0) bytes += contentEncoding.length() + 1;
+ if(headers.count() > 0) bytes += headers.size();
+ if(deliveryMode != 0) bytes += 1;
+ if(priority != 0) bytes += 1;
+ if(correlationId.length() > 0) bytes += correlationId.length() + 1;
+ if(replyTo.length() > 0) bytes += replyTo.length() + 1;
+ if(expiration.length() > 0) bytes += expiration.length() + 1;
+ if(messageId.length() > 0) bytes += messageId.length() + 1;
+ if(timestamp != 0) bytes += 8;
+ if(type.length() > 0) bytes += type.length() + 1;
+ if(userId.length() > 0) bytes += userId.length() + 1;
+ if(appId.length() > 0) bytes += appId.length() + 1;
+ if(clusterId.length() > 0) bytes += clusterId.length() + 1;
+
+ return bytes;
+}
+
+void qpid::framing::BasicHeaderProperties::encode(qpid::framing::Buffer& buffer) const{
+ u_int16_t flags = getFlags();
+ buffer.putShort(flags);
+
+ if(contentType.length() > 0) buffer.putShortString(contentType);
+ if(contentEncoding.length() > 0) buffer.putShortString(contentEncoding);
+ if(headers.count() > 0) buffer.putFieldTable(headers);
+ if(deliveryMode != 0) buffer.putOctet(deliveryMode);
+ if(priority != 0) buffer.putOctet(priority);
+ if(correlationId.length() > 0) buffer.putShortString(correlationId);
+ if(replyTo.length() > 0) buffer.putShortString(replyTo);
+ if(expiration.length() > 0) buffer.putShortString(expiration);
+ if(messageId.length() > 0) buffer.putShortString(messageId);
+ if(timestamp != 0) buffer.putLongLong(timestamp);;
+ if(type.length() > 0) buffer.putShortString(type);
+ if(userId.length() > 0) buffer.putShortString(userId);
+ if(appId.length() > 0) buffer.putShortString(appId);
+ if(clusterId.length() > 0) buffer.putShortString(clusterId);
+}
+
+void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, u_int32_t /*size*/){
+ u_int16_t flags = buffer.getShort();
+ if(flags & (1 << 15)) buffer.getShortString(contentType);
+ if(flags & (1 << 14)) buffer.getShortString(contentEncoding);
+ if(flags & (1 << 13)) buffer.getFieldTable(headers);
+ if(flags & (1 << 12)) deliveryMode = buffer.getOctet();
+ if(flags & (1 << 11)) priority = buffer.getOctet();
+ if(flags & (1 << 10)) buffer.getShortString(correlationId);
+ if(flags & (1 << 9)) buffer.getShortString(replyTo);
+ if(flags & (1 << 8)) buffer.getShortString(expiration);
+ if(flags & (1 << 7)) buffer.getShortString(messageId);
+ if(flags & (1 << 6)) timestamp = buffer.getLongLong();
+ if(flags & (1 << 5)) buffer.getShortString(type);
+ if(flags & (1 << 4)) buffer.getShortString(userId);
+ if(flags & (1 << 3)) buffer.getShortString(appId);
+ if(flags & (1 << 2)) buffer.getShortString(clusterId);
+}
+
+u_int16_t qpid::framing::BasicHeaderProperties::getFlags() const{
+ u_int16_t flags(0);
+ if(contentType.length() > 0) flags |= (1 << 15);
+ if(contentEncoding.length() > 0) flags |= (1 << 14);
+ if(headers.count() > 0) flags |= (1 << 13);
+ if(deliveryMode != 0) flags |= (1 << 12);
+ if(priority != 0) flags |= (1 << 11);
+ if(correlationId.length() > 0) flags |= (1 << 10);
+ if(replyTo.length() > 0) flags |= (1 << 9);
+ if(expiration.length() > 0) flags |= (1 << 8);
+ if(messageId.length() > 0) flags |= (1 << 7);
+ if(timestamp != 0) flags |= (1 << 6);
+ if(type.length() > 0) flags |= (1 << 5);
+ if(userId.length() > 0) flags |= (1 << 4);
+ if(appId.length() > 0) flags |= (1 << 3);
+ if(clusterId.length() > 0) flags |= (1 << 2);
+ return flags;
+}
diff --git a/cpp/lib/common/framing/BasicHeaderProperties.h b/cpp/lib/common/framing/BasicHeaderProperties.h
new file mode 100644
index 0000000000..bcd81b4776
--- /dev/null
+++ b/cpp/lib/common/framing/BasicHeaderProperties.h
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <Buffer.h>
+#include <FieldTable.h>
+#include <HeaderProperties.h>
+
+#ifndef _BasicHeaderProperties_
+#define _BasicHeaderProperties_
+
+namespace qpid {
+namespace framing {
+ enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2};
+
+ //TODO: This could be easily generated from the spec
+ class BasicHeaderProperties : public HeaderProperties
+ {
+ string contentType;
+ string contentEncoding;
+ FieldTable headers;
+ u_int8_t deliveryMode;
+ u_int8_t priority;
+ string correlationId;
+ string replyTo;
+ string expiration;
+ string messageId;
+ u_int64_t timestamp;
+ string type;
+ string userId;
+ string appId;
+ string clusterId;
+
+ u_int16_t getFlags() const;
+
+ public:
+ BasicHeaderProperties();
+ virtual ~BasicHeaderProperties();
+ virtual u_int32_t size() const;
+ virtual void encode(Buffer& buffer) const;
+ virtual void decode(Buffer& buffer, u_int32_t size);
+
+ inline virtual u_int8_t classId() { return BASIC; }
+
+ inline const string& getContentType() const { return contentType; }
+ inline const string& getContentEncoding() const { return contentEncoding; }
+ inline FieldTable& getHeaders() { return headers; }
+ inline u_int8_t getDeliveryMode() const { return deliveryMode; }
+ inline u_int8_t getPriority() const { return priority; }
+ inline const string& getCorrelationId() const {return correlationId; }
+ inline const string& getReplyTo() const { return replyTo; }
+ inline const string& getExpiration() const { return expiration; }
+ inline const string& getMessageId() const {return messageId; }
+ inline u_int64_t getTimestamp() const { return timestamp; }
+ inline const string& getType() const { return type; }
+ inline const string& getUserId() const { return userId; }
+ inline const string& getAppId() const { return appId; }
+ inline const string& getClusterId() const { return clusterId; }
+
+ void inline setContentType(const string& _type){ contentType = _type; }
+ void inline setContentEncoding(const string& encoding){ contentEncoding = encoding; }
+ void inline setHeaders(const FieldTable& _headers){ headers = _headers; }
+ void inline setDeliveryMode(u_int8_t mode){ deliveryMode = mode; }
+ void inline setPriority(u_int8_t _priority){ priority = _priority; }
+ void inline setCorrelationId(const string& _correlationId){ correlationId = _correlationId; }
+ void inline setReplyTo(const string& _replyTo){ replyTo = _replyTo;}
+ void inline setExpiration(const string& _expiration){ expiration = _expiration; }
+ void inline setMessageId(const string& _messageId){ messageId = _messageId; }
+ void inline setTimestamp(u_int64_t _timestamp){ timestamp = _timestamp; }
+ void inline setType(const string& _type){ type = _type; }
+ void inline setUserId(const string& _userId){ userId = _userId; }
+ void inline setAppId(const string& _appId){appId = _appId; }
+ void inline setClusterId(const string& _clusterId){ clusterId = _clusterId; }
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp
new file mode 100644
index 0000000000..8ccfb222df
--- /dev/null
+++ b/cpp/lib/common/framing/BodyHandler.cpp
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+#include <BodyHandler.h>
+
+using namespace qpid::framing;
+using namespace boost;
+
+BodyHandler::~BodyHandler() {}
+
+void BodyHandler::handleBody(AMQBody::shared_ptr& body){
+
+ switch(body->type())
+ {
+
+ case METHOD_BODY:
+ handleMethod(dynamic_pointer_cast<AMQMethodBody, AMQBody>(body));
+ break;
+
+ case HEADER_BODY:
+ handleHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+ break;
+
+ case CONTENT_BODY:
+ handleContent(dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+ break;
+
+ case HEARTBEAT_BODY:
+ handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+ break;
+
+ default:
+ throw UnknownBodyType(body->type());
+ }
+
+}
diff --git a/cpp/lib/common/framing/BodyHandler.h b/cpp/lib/common/framing/BodyHandler.h
new file mode 100644
index 0000000000..3923258d1c
--- /dev/null
+++ b/cpp/lib/common/framing/BodyHandler.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _BodyHandler_
+#define _BodyHandler_
+
+#include <AMQMethodBody.h>
+#include <AMQHeaderBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeartbeatBody.h>
+
+namespace qpid {
+namespace framing {
+
+ class BodyHandler{
+ public:
+ virtual ~BodyHandler();
+ virtual void handleMethod(AMQMethodBody::shared_ptr body) = 0;
+ virtual void handleHeader(AMQHeaderBody::shared_ptr body) = 0;
+ virtual void handleContent(AMQContentBody::shared_ptr body) = 0;
+ virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body) = 0;
+
+ void handleBody(AMQBody::shared_ptr& body);
+ };
+
+ class UnknownBodyType{
+ public:
+ const u_int16_t type;
+ inline UnknownBodyType(u_int16_t _type) : type(_type){}
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/Buffer.cpp b/cpp/lib/common/framing/Buffer.cpp
new file mode 100644
index 0000000000..f25b2a47c0
--- /dev/null
+++ b/cpp/lib/common/framing/Buffer.cpp
@@ -0,0 +1,174 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Buffer.h>
+#include <FieldTable.h>
+
+qpid::framing::Buffer::Buffer(u_int32_t _size) : size(_size), owner(true), position(0), limit(_size){
+ data = new char[size];
+}
+
+qpid::framing::Buffer::Buffer(char* _data, u_int32_t _size) : size(_size), owner(false), data(_data), position(0), limit(_size){
+}
+
+qpid::framing::Buffer::~Buffer(){
+ if(owner) delete[] data;
+}
+
+void qpid::framing::Buffer::flip(){
+ limit = position;
+ position = 0;
+}
+
+void qpid::framing::Buffer::clear(){
+ limit = size;
+ position = 0;
+}
+
+void qpid::framing::Buffer::compact(){
+ u_int32_t p = limit - position;
+ //copy p chars from position to 0
+ memmove(data, data + position, p);
+ limit = size;
+ position = p;
+}
+
+void qpid::framing::Buffer::record(){
+ r_position = position;
+ r_limit = limit;
+}
+
+void qpid::framing::Buffer::restore(){
+ position = r_position;
+ limit = r_limit;
+}
+
+u_int32_t qpid::framing::Buffer::available(){
+ return limit - position;
+}
+
+char* qpid::framing::Buffer::start(){
+ return data + position;
+}
+
+void qpid::framing::Buffer::move(u_int32_t bytes){
+ position += bytes;
+}
+
+void qpid::framing::Buffer::putOctet(u_int8_t i){
+ data[position++] = i;
+}
+
+void qpid::framing::Buffer::putShort(u_int16_t i){
+ u_int16_t b = i;
+ data[position++] = (u_int8_t) (0xFF & (b >> 8));
+ data[position++] = (u_int8_t) (0xFF & b);
+}
+
+void qpid::framing::Buffer::putLong(u_int32_t i){
+ u_int32_t b = i;
+ data[position++] = (u_int8_t) (0xFF & (b >> 24));
+ data[position++] = (u_int8_t) (0xFF & (b >> 16));
+ data[position++] = (u_int8_t) (0xFF & (b >> 8));
+ data[position++] = (u_int8_t) (0xFF & b);
+}
+
+void qpid::framing::Buffer::putLongLong(u_int64_t i){
+ u_int32_t hi = i >> 32;
+ u_int32_t lo = i;
+ putLong(hi);
+ putLong(lo);
+}
+
+u_int8_t qpid::framing::Buffer::getOctet(){
+ return (u_int8_t) data[position++];
+}
+
+u_int16_t qpid::framing::Buffer::getShort(){
+ u_int16_t hi = (unsigned char) data[position++];
+ hi = hi << 8;
+ hi |= (unsigned char) data[position++];
+ return hi;
+}
+
+u_int32_t qpid::framing::Buffer::getLong(){
+ u_int32_t a = (unsigned char) data[position++];
+ u_int32_t b = (unsigned char) data[position++];
+ u_int32_t c = (unsigned char) data[position++];
+ u_int32_t d = (unsigned char) data[position++];
+ a = a << 24;
+ a |= b << 16;
+ a |= c << 8;
+ a |= d;
+ return a;
+}
+
+u_int64_t qpid::framing::Buffer::getLongLong(){
+ u_int64_t hi = getLong();
+ u_int64_t lo = getLong();
+ hi = hi << 32;
+ return hi | lo;
+}
+
+
+void qpid::framing::Buffer::putShortString(const string& s){
+ u_int8_t len = s.length();
+ putOctet(len);
+ s.copy(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::putLongString(const string& s){
+ u_int32_t len = s.length();
+ putLong(len);
+ s.copy(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::getShortString(string& s){
+ u_int8_t len = getOctet();
+ s.assign(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::getLongString(string& s){
+ u_int32_t len = getLong();
+ s.assign(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::putFieldTable(const FieldTable& t){
+ t.encode(*this);
+}
+
+void qpid::framing::Buffer::getFieldTable(FieldTable& t){
+ t.decode(*this);
+}
+
+void qpid::framing::Buffer::putRawData(const string& s){
+ u_int32_t len = s.length();
+ s.copy(data + position, len);
+ position += len;
+}
+
+void qpid::framing::Buffer::getRawData(string& s, u_int32_t len){
+ s.assign(data + position, len);
+ position += len;
+}
diff --git a/cpp/lib/common/framing/Buffer.h b/cpp/lib/common/framing/Buffer.h
new file mode 100644
index 0000000000..92a25ee4c8
--- /dev/null
+++ b/cpp/lib/common/framing/Buffer.h
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+
+#ifndef _Buffer_
+#define _Buffer_
+
+namespace qpid {
+namespace framing {
+
+class FieldTable;
+
+class Buffer
+{
+ const u_int32_t size;
+ const bool owner;//indicates whether the data is owned by this instance
+ char* data;
+ u_int32_t position;
+ u_int32_t limit;
+ u_int32_t r_position;
+ u_int32_t r_limit;
+
+public:
+
+ Buffer(u_int32_t size);
+ Buffer(char* data, u_int32_t size);
+ ~Buffer();
+
+ void flip();
+ void clear();
+ void compact();
+ void record();
+ void restore();
+ u_int32_t available();
+ char* start();
+ void move(u_int32_t bytes);
+
+ void putOctet(u_int8_t i);
+ void putShort(u_int16_t i);
+ void putLong(u_int32_t i);
+ void putLongLong(u_int64_t i);
+
+ u_int8_t getOctet();
+ u_int16_t getShort();
+ u_int32_t getLong();
+ u_int64_t getLongLong();
+
+ void putShortString(const string& s);
+ void putLongString(const string& s);
+ void getShortString(string& s);
+ void getLongString(string& s);
+
+ void putFieldTable(const FieldTable& t);
+ void getFieldTable(FieldTable& t);
+
+ void putRawData(const string& s);
+ void getRawData(string& s, u_int32_t size);
+
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/FieldTable.cpp b/cpp/lib/common/framing/FieldTable.cpp
new file mode 100644
index 0000000000..cf16e87272
--- /dev/null
+++ b/cpp/lib/common/framing/FieldTable.cpp
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <FieldTable.h>
+#include <QpidError.h>
+#include <Buffer.h>
+#include <Value.h>
+#include <assert.h>
+
+namespace qpid {
+namespace framing {
+
+FieldTable::~FieldTable() {}
+
+u_int32_t FieldTable::size() const {
+ u_int32_t len(4);
+ for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
+ // 2 = shortstr_len_byyte + type_char_byte
+ len += 2 + (i->first).size() + (i->second)->size();
+ }
+ return len;
+}
+
+int FieldTable::count() const {
+ return values.size();
+}
+
+namespace
+{
+std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_type& i) {
+ return out << i.first << ":" << *i.second;
+}
+}
+
+std::ostream& operator<<(std::ostream& out, const FieldTable& t) {
+ out << "{";
+ FieldTable::ValueMap::const_iterator i = t.getMap().begin();
+ if (i != t.getMap().end()) out << *i++;
+ while (i != t.getMap().end())
+ {
+ out << "," << *i++;
+ }
+ return out << "}";
+}
+
+void FieldTable::setString(const std::string& name, const std::string& value){
+ values[name] = ValuePtr(new StringValue(value));
+}
+
+void FieldTable::setInt(const std::string& name, int value){
+ values[name] = ValuePtr(new IntegerValue(value));
+}
+
+void FieldTable::setTimestamp(const std::string& name, u_int64_t value){
+ values[name] = ValuePtr(new TimeValue(value));
+}
+
+void FieldTable::setTable(const std::string& name, const FieldTable& value){
+ values[name] = ValuePtr(new FieldTableValue(value));
+}
+
+namespace {
+template <class T> T default_value() { return T(); }
+template <> int default_value<int>() { return 0; }
+template <> u_int64_t default_value<u_int64_t>() { return 0; }
+}
+
+template <class T>
+T FieldTable::getValue(const std::string& name) const
+{
+ ValueMap::const_iterator i = values.find(name);
+ if (i == values.end()) return default_value<T>();
+ const ValueOps<T> *vt = dynamic_cast<const ValueOps<T>*>(i->second.get());
+ return vt->getValue();
+}
+
+std::string FieldTable::getString(const std::string& name) const {
+ return getValue<std::string>(name);
+}
+
+int FieldTable::getInt(const std::string& name) const {
+ return getValue<int>(name);
+}
+
+u_int64_t FieldTable::getTimestamp(const std::string& name) const {
+ return getValue<u_int64_t>(name);
+}
+
+void FieldTable::getTable(const std::string& name, FieldTable& value) const {
+ value = getValue<FieldTable>(name);
+}
+
+void FieldTable::encode(Buffer& buffer) const{
+ buffer.putLong(size() - 4);
+ for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) {
+ buffer.putShortString(i->first);
+ buffer.putOctet(i->second->getType());
+ i->second->encode(buffer);
+ }
+}
+
+void FieldTable::decode(Buffer& buffer){
+ u_int32_t len = buffer.getLong();
+ u_int32_t available = buffer.available();
+ if (available < len)
+ THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for field table.");
+ u_int32_t leftover = available - len;
+ while(buffer.available() > leftover){
+ std::string name;
+ buffer.getShortString(name);
+ std::auto_ptr<Value> value(Value::decode_value(buffer));
+ values[name] = ValuePtr(value.release());
+ }
+}
+
+
+bool FieldTable::operator==(const FieldTable& x) const {
+ if (values.size() != x.values.size()) return false;
+ for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
+ ValueMap::const_iterator j = x.values.find(i->first);
+ if (j == x.values.end()) return false;
+ if (*(i->second) != *(j->second)) return false;
+ }
+ return true;
+}
+
+void FieldTable::erase(const std::string& name)
+{
+ values.erase(values.find(name));
+}
+
+}
+}
diff --git a/cpp/lib/common/framing/FieldTable.h b/cpp/lib/common/framing/FieldTable.h
new file mode 100644
index 0000000000..3d09cdec64
--- /dev/null
+++ b/cpp/lib/common/framing/FieldTable.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <amqp_types.h>
+
+#ifndef _FieldTable_
+#define _FieldTable_
+
+namespace qpid {
+namespace framing {
+
+class Value;
+class Buffer;
+
+class FieldTable
+{
+ public:
+ typedef boost::shared_ptr<Value> ValuePtr;
+ typedef std::map<std::string, ValuePtr> ValueMap;
+
+ ~FieldTable();
+ u_int32_t size() const;
+ int count() const;
+ void setString(const std::string& name, const std::string& value);
+ void setInt(const std::string& name, int value);
+ void setTimestamp(const std::string& name, u_int64_t value);
+ void setTable(const std::string& name, const FieldTable& value);
+ //void setDecimal(string& name, xxx& value);
+ std::string getString(const std::string& name) const;
+ int getInt(const std::string& name) const;
+ u_int64_t getTimestamp(const std::string& name) const;
+ void getTable(const std::string& name, FieldTable& value) const;
+ //void getDecimal(string& name, xxx& value);
+ void erase(const std::string& name);
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+
+ bool operator==(const FieldTable& other) const;
+
+ // TODO aconway 2006-09-26: Yeuch! Rework FieldTable to have
+ // a map-like interface.
+ const ValueMap& getMap() const { return values; }
+ ValueMap& getMap() { return values; }
+
+ private:
+ friend std::ostream& operator<<(std::ostream& out, const FieldTable& body);
+ ValueMap values;
+ template<class T> T getValue(const std::string& name) const;
+};
+
+class FieldNotFoundException{};
+class UnknownFieldName : public FieldNotFoundException{};
+class IncorrectFieldType : public FieldNotFoundException{};
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/HeaderProperties.h b/cpp/lib/common/framing/HeaderProperties.h
new file mode 100644
index 0000000000..7a8c65549d
--- /dev/null
+++ b/cpp/lib/common/framing/HeaderProperties.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <Buffer.h>
+
+#ifndef _HeaderProperties_
+#define _HeaderProperties_
+
+namespace qpid {
+namespace framing {
+
+ enum header_classes{BASIC = 60};
+
+ class HeaderProperties
+ {
+
+ public:
+ inline virtual ~HeaderProperties(){}
+ virtual u_int8_t classId() = 0;
+ virtual u_int32_t size() const = 0;
+ virtual void encode(Buffer& buffer) const = 0;
+ virtual void decode(Buffer& buffer, u_int32_t size) = 0;
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/InitiationHandler.cpp b/cpp/lib/common/framing/InitiationHandler.cpp
new file mode 100644
index 0000000000..dd92c9859b
--- /dev/null
+++ b/cpp/lib/common/framing/InitiationHandler.cpp
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <InitiationHandler.h>
+
+qpid::framing::InitiationHandler::~InitiationHandler() {}
diff --git a/cpp/lib/common/framing/InitiationHandler.h b/cpp/lib/common/framing/InitiationHandler.h
new file mode 100644
index 0000000000..d94fc58d2c
--- /dev/null
+++ b/cpp/lib/common/framing/InitiationHandler.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+#ifndef _InitiationHandler_
+#define _InitiationHandler_
+
+#include <ProtocolInitiation.h>
+
+namespace qpid {
+namespace framing {
+
+ class InitiationHandler{
+ public:
+ virtual ~InitiationHandler();
+ virtual void initiated(ProtocolInitiation* header) = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/InputHandler.h b/cpp/lib/common/framing/InputHandler.h
new file mode 100644
index 0000000000..4e2d4bcc9b
--- /dev/null
+++ b/cpp/lib/common/framing/InputHandler.h
@@ -0,0 +1,39 @@
+#ifndef _InputHandler_
+#define _InputHandler_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <AMQFrame.h>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace framing {
+
+class InputHandler : private boost::noncopyable {
+ public:
+ virtual ~InputHandler() {}
+ virtual void received(AMQFrame* frame) = 0;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/framing/OutputHandler.h b/cpp/lib/common/framing/OutputHandler.h
new file mode 100644
index 0000000000..2e01e34df2
--- /dev/null
+++ b/cpp/lib/common/framing/OutputHandler.h
@@ -0,0 +1,39 @@
+#ifndef _OutputHandler_
+#define _OutputHandler_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <AMQFrame.h>
+
+namespace qpid {
+namespace framing {
+
+class OutputHandler : private boost::noncopyable {
+ public:
+ virtual ~OutputHandler() {}
+ virtual void send(AMQFrame* frame) = 0;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/framing/ProtocolInitiation.cpp b/cpp/lib/common/framing/ProtocolInitiation.cpp
new file mode 100644
index 0000000000..471f736a7d
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolInitiation.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ProtocolInitiation.h>
+
+qpid::framing::ProtocolInitiation::ProtocolInitiation(){}
+
+qpid::framing::ProtocolInitiation::ProtocolInitiation(u_int8_t _major, u_int8_t _minor) : version(_major, _minor) {}
+
+qpid::framing::ProtocolInitiation::ProtocolInitiation(const qpid::framing::ProtocolVersion& p) : version(p) {}
+
+qpid::framing::ProtocolInitiation::~ProtocolInitiation(){}
+
+void qpid::framing::ProtocolInitiation::encode(Buffer& buffer){
+ buffer.putOctet('A');
+ buffer.putOctet('M');
+ buffer.putOctet('Q');
+ buffer.putOctet('P');
+ buffer.putOctet(1);//class
+ buffer.putOctet(1);//instance
+ buffer.putOctet(version.getMajor());
+ buffer.putOctet(version.getMinor());
+}
+
+bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){
+ if(buffer.available() >= 8){
+ buffer.getOctet();//A
+ buffer.getOctet();//M
+ buffer.getOctet();//Q
+ buffer.getOctet();//P
+ buffer.getOctet();//class
+ buffer.getOctet();//instance
+ version.setMajor(buffer.getOctet());
+ version.setMinor(buffer.getOctet());
+ return true;
+ }else{
+ return false;
+ }
+}
+
+//TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date
diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h
new file mode 100644
index 0000000000..003c3bba81
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolInitiation.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <Buffer.h>
+#include <AMQDataBlock.h>
+#include <ProtocolVersion.h>
+
+#ifndef _ProtocolInitiation_
+#define _ProtocolInitiation_
+
+namespace qpid {
+namespace framing {
+
+class ProtocolInitiation : virtual public AMQDataBlock
+{
+private:
+ ProtocolVersion version;
+
+public:
+ ProtocolInitiation();
+ ProtocolInitiation(u_int8_t major, u_int8_t minor);
+ ProtocolInitiation(const ProtocolVersion& p);
+ virtual ~ProtocolInitiation();
+ virtual void encode(Buffer& buffer);
+ virtual bool decode(Buffer& buffer);
+ inline virtual u_int32_t size() const { return 8; }
+ inline u_int8_t getMajor() const { return version.getMajor(); }
+ inline u_int8_t getMinor() const { return version.getMinor(); }
+ inline const ProtocolVersion& getVersion() const { return version; }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/framing/ProtocolVersion.cpp b/cpp/lib/common/framing/ProtocolVersion.cpp
new file mode 100644
index 0000000000..69ff89ec32
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolVersion.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ProtocolVersion.h>
+#include <sstream>
+
+using namespace qpid::framing;
+
+ProtocolVersion::ProtocolVersion() {}
+
+ProtocolVersion::ProtocolVersion(u_int8_t _major, u_int8_t _minor) :
+ major_(_major),
+ minor_(_minor)
+{}
+
+ProtocolVersion::ProtocolVersion(const ProtocolVersion::ProtocolVersion& p):
+ major_(p.major_),
+ minor_(p.minor_)
+{}
+
+ProtocolVersion::~ProtocolVersion()
+{}
+
+bool ProtocolVersion::equals(u_int8_t _major, u_int8_t _minor) const
+{
+ return major_ == _major && minor_ == _minor;
+}
+
+bool ProtocolVersion::equals(const ProtocolVersion::ProtocolVersion& p) const
+{
+ return major_ == p.major_ && minor_ == p.minor_;
+}
+
+const std::string ProtocolVersion::toString() const
+{
+ std::stringstream ss;
+ ss << major_ << "-" << minor_;
+ return ss.str();
+}
+
+ProtocolVersion::ProtocolVersion ProtocolVersion::operator=(const ProtocolVersion& p)
+{
+ major_ = p.major_;
+ minor_ = p.minor_;
+ return *this;
+}
+
diff --git a/cpp/lib/common/framing/ProtocolVersion.h b/cpp/lib/common/framing/ProtocolVersion.h
new file mode 100644
index 0000000000..331cf53555
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolVersion.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ProtocolVersion_
+#define _ProtocolVersion_
+
+#include <amqp_types.h>
+
+namespace qpid
+{
+namespace framing
+{
+
+class ProtocolVersion
+{
+private:
+ u_int8_t major_;
+ u_int8_t minor_;
+
+public:
+ ProtocolVersion();
+ ProtocolVersion(u_int8_t _major, u_int8_t _minor);
+ ProtocolVersion(const ProtocolVersion& p);
+ virtual ~ProtocolVersion();
+
+ inline u_int8_t getMajor() const { return major_; }
+ inline void setMajor(u_int8_t major) { major_ = major; }
+ inline u_int8_t getMinor() const { return minor_; }
+ inline void setMinor(u_int8_t minor) { minor_ = minor; }
+ virtual bool equals(u_int8_t _major, u_int8_t _minor) const;
+ virtual bool equals(const ProtocolVersion& p) const;
+ virtual const std::string toString() const;
+ ProtocolVersion operator=(const ProtocolVersion& p);
+};
+
+} // namespace framing
+} // namespace qpid
+
+
+#endif // ifndef _ProtocolVersion_
diff --git a/cpp/lib/common/framing/ProtocolVersionException.cpp b/cpp/lib/common/framing/ProtocolVersionException.cpp
new file mode 100644
index 0000000000..8249a88f4b
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolVersionException.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ProtocolVersionException.h>
+#include <sstream>
+
+using namespace qpid::framing;
+
+ProtocolVersionException::ProtocolVersionException() throw ()
+{
+}
+
+ProtocolVersionException::ProtocolVersionException(const std::string& str) throw () : Exception(str)
+{
+}
+
+ProtocolVersionException::ProtocolVersionException(const char* str) throw () : Exception(str)
+{
+}
+
+ProtocolVersionException::ProtocolVersionException(const ProtocolVersion& versionFound_, const std::string& str) throw () : Exception(str)
+
+{
+ versionFound = versionFound_;
+}
+
+ProtocolVersionException::ProtocolVersionException(const ProtocolVersion& versionFound_, const char* str) throw () : Exception(str)
+
+{
+ versionFound = versionFound_;
+}
+
+ProtocolVersionException::~ProtocolVersionException() throw ()
+{
+}
+
+const char* ProtocolVersionException::what() const throw()
+{
+ std::stringstream ss;
+ ss << "ProtocolVersionException: AMQP Version " << versionFound.toString() << " found: " << whatStr;
+ return ss.str().c_str();
+}
+
+std::string ProtocolVersionException::toString() const throw()
+{
+ std::stringstream ss;
+ ss << "ProtocolVersionException: AMQP Version " << versionFound.toString() << " found: " << whatStr;
+ return ss.str();
+}
diff --git a/cpp/lib/common/framing/ProtocolVersionException.h b/cpp/lib/common/framing/ProtocolVersionException.h
new file mode 100644
index 0000000000..4494d87064
--- /dev/null
+++ b/cpp/lib/common/framing/ProtocolVersionException.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _ProtocolVersionException_
+#define _ProtocolVersionException_
+
+#include <Exception.h>
+#include <ProtocolVersion.h>
+#include <string>
+#include <vector>
+
+namespace qpid
+{
+namespace framing
+{
+
+class ProtocolVersionException : virtual public qpid::Exception
+{
+protected:
+ ProtocolVersion versionFound;
+
+public:
+ ProtocolVersionException() throw ();
+ ProtocolVersionException(const std::string& str) throw ();
+ ProtocolVersionException(const char* str) throw ();
+ ProtocolVersionException(const ProtocolVersion& versionFound_, const std::string& str) throw ();
+ ProtocolVersionException(const ProtocolVersion& versionFound_, const char* str) throw ();
+ virtual ~ProtocolVersionException() throw ();
+
+ virtual const char* what() const throw();
+ virtual std::string toString() const throw();
+}; // class ProtocolVersionException
+
+} // namespace framing
+} // namespace qpid
+
+#endif //ifndef _ProtocolVersionException_
diff --git a/cpp/lib/common/framing/Value.cpp b/cpp/lib/common/framing/Value.cpp
new file mode 100644
index 0000000000..fc087043e5
--- /dev/null
+++ b/cpp/lib/common/framing/Value.cpp
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Value.h>
+#include <Buffer.h>
+#include <FieldTable.h>
+#include <QpidError.h>
+
+namespace qpid {
+namespace framing {
+
+Value::~Value() {}
+
+void StringValue::encode(Buffer& buffer){
+ buffer.putLongString(value);
+}
+void StringValue::decode(Buffer& buffer){
+ buffer.getLongString(value);
+}
+
+void IntegerValue::encode(Buffer& buffer){
+ buffer.putLong((u_int32_t) value);
+}
+void IntegerValue::decode(Buffer& buffer){
+ value = buffer.getLong();
+}
+
+void TimeValue::encode(Buffer& buffer){
+ buffer.putLongLong(value);
+}
+void TimeValue::decode(Buffer& buffer){
+ value = buffer.getLongLong();
+}
+
+void DecimalValue::encode(Buffer& buffer){
+ buffer.putOctet(value.decimals);
+ buffer.putLong(value.value);
+}
+void DecimalValue::decode(Buffer& buffer){
+ value = Decimal(buffer.getLong(), buffer.getOctet());
+}
+
+void FieldTableValue::encode(Buffer& buffer){
+ buffer.putFieldTable(value);
+}
+void FieldTableValue::decode(Buffer& buffer){
+ buffer.getFieldTable(value);
+}
+
+std::auto_ptr<Value> Value::decode_value(Buffer& buffer)
+{
+ std::auto_ptr<Value> value;
+ u_int8_t type = buffer.getOctet();
+ switch(type){
+ case 'S':
+ value.reset(new StringValue());
+ break;
+ case 'I':
+ value.reset(new IntegerValue());
+ break;
+ case 'D':
+ value.reset(new DecimalValue());
+ break;
+ case 'T':
+ value.reset(new TimeValue());
+ break;
+ case 'F':
+ value.reset(new FieldTableValue());
+ break;
+ default:
+ THROW_QPID_ERROR(FRAMING_ERROR, "Unknown field table value type");
+ }
+ value->decode(buffer);
+ return value;
+}
+
+EmptyValue::~EmptyValue() {}
+
+void EmptyValue::print(std::ostream& out) const
+{
+ out << "<empty field value>";
+}
+
+std::ostream& operator<<(std::ostream& out, const Value& v) {
+ v.print(out);
+ return out;
+}
+
+std::ostream& operator<<(std::ostream& out, const Decimal& d)
+{
+ return out << "Decimal(" << d.value << "," << d.decimals << ")";
+}
+
+}}
+
+
+
diff --git a/cpp/lib/common/framing/Value.h b/cpp/lib/common/framing/Value.h
new file mode 100644
index 0000000000..6d240c2eb9
--- /dev/null
+++ b/cpp/lib/common/framing/Value.h
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <vector>
+#include <amqp_types.h>
+#include <FieldTable.h>
+
+#ifndef _Value_
+#define _Value_
+
+namespace qpid {
+namespace framing {
+
+class Buffer;
+
+/**
+ * Represents a decimal value.
+ * No arithmetic functionality for now, we only care about encoding/decoding.
+ */
+struct Decimal {
+ u_int32_t value;
+ u_int8_t decimals;
+
+ Decimal(u_int32_t value_=0, u_int8_t decimals_=0) : value(value_), decimals(decimals_) {}
+ bool operator==(const Decimal& d) const {
+ return decimals == d.decimals && value == d.value;
+ }
+ bool operator!=(const Decimal& d) const { return !(*this == d); }
+};
+
+std::ostream& operator<<(std::ostream& out, const Decimal& d);
+
+/**
+ * Polymorpic base class for values.
+ */
+class Value {
+ public:
+ virtual ~Value();
+ virtual u_int32_t size() const = 0;
+ virtual char getType() const = 0;
+ virtual void encode(Buffer& buffer) = 0;
+ virtual void decode(Buffer& buffer) = 0;
+ virtual bool operator==(const Value&) const = 0;
+ bool operator!=(const Value& v) const { return !(*this == v); }
+ virtual void print(std::ostream& out) const = 0;
+
+ /** Create a new value by decoding from the buffer */
+ static std::auto_ptr<Value> decode_value(Buffer& buffer);
+};
+
+std::ostream& operator<<(std::ostream& out, const Value& d);
+
+
+/**
+ * Template for common operations on Value sub-classes.
+ */
+template <class T>
+class ValueOps : public Value
+{
+ protected:
+ T value;
+ public:
+ ValueOps() {}
+ ValueOps(const T& v) : value(v) {}
+ const T& getValue() const { return value; }
+ T& getValue() { return value; }
+
+ virtual bool operator==(const Value& v) const {
+ const ValueOps<T>* vo = dynamic_cast<const ValueOps<T>*>(&v);
+ if (vo == 0) return false;
+ else return value == vo->value;
+ }
+
+ void print(std::ostream& out) const { out << value; }
+};
+
+
+class StringValue : public ValueOps<std::string> {
+ public:
+ StringValue(const std::string& v) : ValueOps<std::string>(v) {}
+ StringValue() {}
+ virtual u_int32_t size() const { return 4 + value.length(); }
+ virtual char getType() const { return 'S'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class IntegerValue : public ValueOps<int> {
+ public:
+ IntegerValue(int v) : ValueOps<int>(v) {}
+ IntegerValue(){}
+ virtual u_int32_t size() const { return 4; }
+ virtual char getType() const { return 'I'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class TimeValue : public ValueOps<u_int64_t> {
+ public:
+ TimeValue(u_int64_t v) : ValueOps<u_int64_t>(v){}
+ TimeValue(){}
+ virtual u_int32_t size() const { return 8; }
+ virtual char getType() const { return 'T'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class DecimalValue : public ValueOps<Decimal> {
+ public:
+ DecimalValue(const Decimal& d) : ValueOps<Decimal>(d) {}
+ DecimalValue(u_int32_t value_=0, u_int8_t decimals_=0) :
+ ValueOps<Decimal>(Decimal(value_, decimals_)){}
+ virtual u_int32_t size() const { return 5; }
+ virtual char getType() const { return 'D'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+
+class FieldTableValue : public ValueOps<FieldTable> {
+ public:
+ FieldTableValue(const FieldTable& v) : ValueOps<FieldTable>(v){}
+ FieldTableValue(){}
+ virtual u_int32_t size() const { return 4 + value.size(); }
+ virtual char getType() const { return 'F'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class EmptyValue : public Value {
+ public:
+ ~EmptyValue();
+ virtual u_int32_t size() const { return 0; }
+ virtual char getType() const { return 0; }
+ virtual void encode(Buffer& ) {}
+ virtual void decode(Buffer& ) {}
+ virtual bool operator==(const Value& v) const {
+ return dynamic_cast<const EmptyValue*>(&v);
+ }
+ virtual void print(std::ostream& out) const;
+};
+
+}} // qpid::framing
+
+#endif
diff --git a/cpp/lib/common/framing/amqp_framing.h b/cpp/lib/common/framing/amqp_framing.h
new file mode 100644
index 0000000000..62f87352f8
--- /dev/null
+++ b/cpp/lib/common/framing/amqp_framing.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <amqp_types.h>
+#include <AMQFrame.h>
+#include <AMQBody.h>
+#include <BodyHandler.h>
+#include <AMQMethodBody.h>
+#include <AMQHeaderBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeartbeatBody.h>
+#include <AMQP_MethodVersionMap.h>
+#include <InputHandler.h>
+#include <OutputHandler.h>
+#include <InitiationHandler.h>
+#include <ProtocolInitiation.h>
+#include <BasicHeaderProperties.h>
+#include <ProtocolVersion.h>
+#include <ProtocolVersionException.h>
diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h
new file mode 100644
index 0000000000..3d8e9632c0
--- /dev/null
+++ b/cpp/lib/common/framing/amqp_types.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#ifdef _WINDOWS
+#include "windows.h"
+typedef unsigned char u_int8_t;
+typedef unsigned short u_int16_t;
+typedef unsigned int u_int32_t;
+typedef unsigned __int64 u_int64_t;
+#endif
+#ifndef _WINDOWS
+#include "stdint.h"
+#endif
+
+#ifndef AMQP_TYPES_H
+#define AMQP_TYPES_H
+
+namespace qpid
+{
+namespace framing
+{
+
+using std::string;
+
+}
+}
+#endif
diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h
new file mode 100644
index 0000000000..e6bc27a593
--- /dev/null
+++ b/cpp/lib/common/sys/Acceptor.h
@@ -0,0 +1,47 @@
+#ifndef _sys_Acceptor_h
+#define _sys_Acceptor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdint.h>
+#include <SharedObject.h>
+
+namespace qpid {
+namespace sys {
+
+class SessionHandlerFactory;
+
+class Acceptor : public qpid::SharedObject<Acceptor>
+{
+ public:
+ static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false);
+ virtual ~Acceptor() = 0;
+ virtual int16_t getPort() const = 0;
+ virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0;
+ virtual void shutdown() = 0;
+};
+
+}}
+
+
+
+#endif /*!_sys_Acceptor_h*/
diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h
new file mode 100644
index 0000000000..b625b2c9b0
--- /dev/null
+++ b/cpp/lib/common/sys/AtomicCount.h
@@ -0,0 +1,71 @@
+#ifndef _posix_AtomicCount_h
+#define _posix_AtomicCount_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/detail/atomic_count.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Atomic counter.
+ */
+class AtomicCount : boost::noncopyable {
+ public:
+ class ScopedDecrement : boost::noncopyable {
+ public:
+ /** Decrement counter in constructor and increment in destructor. */
+ ScopedDecrement(AtomicCount& c) : count(c) { value = --count; }
+ ~ScopedDecrement() { ++count; }
+ /** Return the value returned by the decrement. */
+ operator long() { return value; }
+ private:
+ AtomicCount& count;
+ long value;
+ };
+
+ class ScopedIncrement : boost::noncopyable {
+ public:
+ /** Increment counter in constructor and increment in destructor. */
+ ScopedIncrement(AtomicCount& c) : count(c) { ++count; }
+ ~ScopedIncrement() { --count; }
+ private:
+ AtomicCount& count;
+ };
+
+ AtomicCount(long value = 0) : count(value) {}
+
+ void operator++() { ++count ; }
+
+ long operator--() { return --count; }
+
+ operator long() const { return count; }
+
+
+ private:
+ boost::detail::atomic_count count;
+};
+
+
+}}
+
+
+#endif // _posix_AtomicCount_h
diff --git a/cpp/lib/common/sys/Module.h b/cpp/lib/common/sys/Module.h
new file mode 100644
index 0000000000..9bf5d6e1fc
--- /dev/null
+++ b/cpp/lib/common/sys/Module.h
@@ -0,0 +1,161 @@
+#ifndef _sys_Module_h
+#define _sys_Module_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <iostream>
+#include <QpidError.h>
+
+namespace qpid {
+namespace sys {
+#if USE_APR
+#include <apr_dso.h>
+ typedef apr_dso_handle_t* dso_handle_t;
+#else
+ typedef void* dso_handle_t;
+#endif
+
+ template <class T> class Module : private boost::noncopyable
+ {
+ typedef T* create_t();
+ typedef void destroy_t(T*);
+
+ dso_handle_t handle;
+ destroy_t* destroy;
+ T* ptr;
+
+ void load(const std::string& name);
+ void unload();
+ void* getSymbol(const std::string& name);
+
+ public:
+ Module(const std::string& name);
+ T* operator->();
+ T* get();
+ ~Module() throw();
+ };
+
+}
+}
+
+using namespace qpid::sys;
+
+template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0)
+{
+ load(module);
+ //TODO: need a better strategy for symbol names to allow multiple
+ //modules to be loaded without clashes...
+
+ //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic
+ create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create")));
+ destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy")));
+ ptr = create();
+}
+
+template <class T> T* Module<T>::operator->()
+{
+ return ptr;
+}
+
+template <class T> T* Module<T>::get()
+{
+ return ptr;
+}
+
+template <class T> Module<T>::~Module() throw()
+{
+ try {
+ if (handle && ptr) {
+ destroy(ptr);
+ }
+ if (handle) unload();
+ } catch (std::exception& e) {
+ std::cout << "Error while destroying module: " << e.what() << std::endl;
+ }
+ destroy = 0;
+ handle = 0;
+ ptr = 0;
+}
+
+// APR ================================================================
+#if USE_APR
+
+#include <apr/APRBase.h>
+#include <apr/APRPool.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+ CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get()));
+}
+
+template <class T> void Module<T>::unload()
+{
+ CHECK_APR_SUCCESS(apr_dso_unload(handle));
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+ apr_dso_handle_sym_t symbol;
+ CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str()));
+ return (void*) symbol;
+}
+
+// POSIX================================================================
+#else
+
+#include <dlfcn.h>
+
+template <class T> void Module<T>::load(const std::string& name)
+{
+ dlerror();
+ handle = dlopen(name.c_str(), RTLD_NOW);
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+}
+
+template <class T> void Module<T>::unload()
+{
+ dlerror();
+ dlclose(handle);
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+}
+
+template <class T> void* Module<T>::getSymbol(const std::string& name)
+{
+ dlerror();
+ void* sym = dlsym(handle, name.c_str());
+ const char* error = dlerror();
+ if (error) {
+ THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ }
+ return sym;
+}
+
+#endif //if USE_APR
+
+#endif //ifndef _sys_Module_h
+
diff --git a/cpp/lib/common/sys/Monitor.h b/cpp/lib/common/sys/Monitor.h
new file mode 100644
index 0000000000..e58931e699
--- /dev/null
+++ b/cpp/lib/common/sys/Monitor.h
@@ -0,0 +1,127 @@
+#ifndef _sys_Monitor_h
+#define _sys_Monitor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/errno.h>
+#include <boost/noncopyable.hpp>
+#include <sys/Mutex.h>
+#include <sys/Time.h>
+
+#ifdef USE_APR
+# include <apr_thread_cond.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A monitor is a condition variable and a mutex
+ */
+class Monitor : public Mutex
+{
+ public:
+ inline Monitor();
+ inline ~Monitor();
+ inline void wait();
+ inline bool wait(const Time& absoluteTime);
+ inline void notify();
+ inline void notifyAll();
+
+ private:
+#ifdef USE_APR
+ apr_thread_cond_t* condition;
+#else
+ pthread_cond_t condition;
+#endif
+};
+
+
+// APR ================================================================
+#ifdef USE_APR
+
+Monitor::Monitor() {
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+}
+
+Monitor::~Monitor() {
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+}
+
+void Monitor::wait() {
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+bool Monitor::wait(const Time& absoluteTime){
+ // APR uses microseconds.
+ apr_status_t status =
+ apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC);
+ if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status);
+ return status == 0;
+}
+
+void Monitor::notify(){
+ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void Monitor::notifyAll(){
+ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+#else
+// POSIX ================================================================
+
+Monitor::Monitor() {
+ QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
+}
+
+Monitor::~Monitor() {
+ QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
+}
+
+void Monitor::wait() {
+ QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex));
+}
+
+bool Monitor::wait(const Time& absoluteTime){
+ struct timespec ts;
+ toTimespec(ts, absoluteTime);
+ int status = pthread_cond_timedwait(&condition, &mutex, &ts);
+ if (status != 0) {
+ if (status == ETIMEDOUT) return false;
+ throw QPID_POSIX_ERROR(status);
+ }
+ return true;
+}
+
+void Monitor::notify(){
+ QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
+}
+
+void Monitor::notifyAll(){
+ QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
+}
+#endif /*USE_APR*/
+
+
+}}
+#endif /*!_sys_Monitor_h*/
diff --git a/cpp/lib/common/sys/Mutex.h b/cpp/lib/common/sys/Mutex.h
new file mode 100644
index 0000000000..4022da2f6e
--- /dev/null
+++ b/cpp/lib/common/sys/Mutex.h
@@ -0,0 +1,151 @@
+#ifndef _sys_Mutex_h
+#define _sys_Mutex_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifdef USE_APR
+# include <apr_thread_mutex.h>
+# include <apr/APRBase.h>
+# include <apr/APRPool.h>
+#else
+# include <pthread.h>
+# include <posix/check.h>
+#endif
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Scoped lock template: calls lock() in ctor, unlock() in dtor.
+ * L can be any class with lock() and unlock() functions.
+ */
+template <class L>
+class ScopedLock
+{
+ public:
+ ScopedLock(L& l) : mutex(l) { l.lock(); }
+ ~ScopedLock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
+/**
+ * Mutex lock.
+ */
+class Mutex : private boost::noncopyable {
+ public:
+ typedef ScopedLock<Mutex> ScopedLock;
+
+ inline Mutex();
+ inline ~Mutex();
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ protected:
+#ifdef USE_APR
+ apr_thread_mutex_t* mutex;
+#else
+ pthread_mutex_t mutex;
+#endif
+};
+
+#ifdef USE_APR
+// APR ================================================================
+
+Mutex::Mutex() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
+}
+
+Mutex::~Mutex(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+}
+
+void Mutex::lock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+void Mutex::unlock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
+
+void Mutex::trylock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
+}
+
+#else
+// POSIX ================================================================
+
+/**
+ * PODMutex is a POD, can be static-initialized with
+ * PODMutex m = QPID_PODMUTEX_INITIALIZER
+ */
+struct PODMutex
+{
+ typedef ScopedLock<PODMutex> ScopedLock;
+
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ // Must be public to be a POD:
+ pthread_mutex_t mutex;
+};
+
+#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
+
+
+void PODMutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void PODMutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void PODMutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+
+Mutex::Mutex() {
+ QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0));
+}
+
+Mutex::~Mutex(){
+ QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex));
+}
+
+void Mutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void Mutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void Mutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+#endif // USE_APR
+
+}}
+
+
+
+#endif /*!_sys_Mutex_h*/
diff --git a/cpp/lib/common/sys/Runnable.cpp b/cpp/lib/common/sys/Runnable.cpp
new file mode 100644
index 0000000000..30122c682f
--- /dev/null
+++ b/cpp/lib/common/sys/Runnable.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Runnable.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+Runnable::~Runnable() {}
+
+Runnable::Functor Runnable::functor()
+{
+ return boost::bind(&Runnable::run, this);
+}
+
+}}
diff --git a/cpp/lib/common/sys/Runnable.h b/cpp/lib/common/sys/Runnable.h
new file mode 100644
index 0000000000..fb3927c612
--- /dev/null
+++ b/cpp/lib/common/sys/Runnable.h
@@ -0,0 +1,50 @@
+#ifndef _Runnable_
+#define _Runnable_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Interface for objects that can be run, e.g. in a thread.
+ */
+class Runnable
+{
+ public:
+ /** Type to represent a runnable as a Functor */
+ typedef boost::function0<void> Functor;
+
+ virtual ~Runnable();
+
+ /** Derived classes override run(). */
+ virtual void run() = 0;
+
+ /** Create a functor object that will call this->run(). */
+ Functor functor();
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/sys/SessionContext.h b/cpp/lib/common/sys/SessionContext.h
new file mode 100644
index 0000000000..671e00774f
--- /dev/null
+++ b/cpp/lib/common/sys/SessionContext.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionContext_
+#define _SessionContext_
+
+#include <OutputHandler.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Provides the output handler associated with a connection.
+ */
+class SessionContext : public virtual qpid::framing::OutputHandler
+{
+ public:
+ virtual void close() = 0;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/sys/SessionHandler.h b/cpp/lib/common/sys/SessionHandler.h
new file mode 100644
index 0000000000..76f79d421d
--- /dev/null
+++ b/cpp/lib/common/sys/SessionHandler.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandler_
+#define _SessionHandler_
+
+#include <InputHandler.h>
+#include <InitiationHandler.h>
+#include <ProtocolInitiation.h>
+#include <sys/TimeoutHandler.h>
+
+namespace qpid {
+namespace sys {
+
+ class SessionHandler :
+ public qpid::framing::InitiationHandler,
+ public qpid::framing::InputHandler,
+ public TimeoutHandler
+ {
+ public:
+ virtual void closed() = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/SessionHandlerFactory.h b/cpp/lib/common/sys/SessionHandlerFactory.h
new file mode 100644
index 0000000000..2a01aebcb0
--- /dev/null
+++ b/cpp/lib/common/sys/SessionHandlerFactory.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandlerFactory_
+#define _SessionHandlerFactory_
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+class SessionContext;
+class SessionHandler;
+
+/**
+ * Callback interface used by the Acceptor to
+ * create a SessionHandler for each new connection.
+ */
+class SessionHandlerFactory : private boost::noncopyable
+{
+ public:
+ virtual SessionHandler* create(SessionContext* ctxt) = 0;
+ virtual ~SessionHandlerFactory(){}
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/sys/ShutdownHandler.h b/cpp/lib/common/sys/ShutdownHandler.h
new file mode 100644
index 0000000000..88baecb5b6
--- /dev/null
+++ b/cpp/lib/common/sys/ShutdownHandler.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ShutdownHandler_
+#define _ShutdownHandler_
+
+namespace qpid {
+namespace sys {
+
+ class ShutdownHandler
+ {
+ public:
+ virtual void shutdown() = 0;
+ virtual ~ShutdownHandler(){}
+ };
+
+}
+}
+
+#endif
diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h
new file mode 100644
index 0000000000..d793a240c6
--- /dev/null
+++ b/cpp/lib/common/sys/Socket.h
@@ -0,0 +1,88 @@
+#ifndef _sys_Socket_h
+#define _sys_Socket_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <sys/Time.h>
+
+#ifdef USE_APR
+# include <apr_network_io.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+class Socket
+{
+ public:
+ /** Create an initialized TCP socket */
+ static Socket createTcp();
+
+ /** Create a socket wrapper for descriptor. */
+#ifdef USE_APR
+ Socket(apr_socket_t* descriptor = 0);
+#else
+ Socket(int descriptor = 0);
+#endif
+
+ /** Set timeout for read and write */
+ void setTimeout(Time interval);
+
+ void connect(const std::string& host, int port);
+
+ void close();
+
+ enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
+
+ /** Returns bytes sent or an ErrorCode value < 0. */
+ ssize_t send(const void* data, size_t size);
+
+ /**
+ * Returns bytes received, an ErrorCode value < 0 or 0
+ * if the connection closed in an orderly manner.
+ */
+ ssize_t recv(void* data, size_t size);
+
+ /** Bind to a port and start listening.
+ *@param port 0 means choose an available port.
+ *@param backlog maximum number of pending connections.
+ *@return The bound port.
+ */
+ int listen(int port = 0, int backlog = 10);
+
+ /** Get file descriptor */
+ int fd();
+
+ private:
+#ifdef USE_APR
+ apr_socket_t* socket;
+#else
+ void init() const;
+ mutable int socket; // Initialized on demand.
+#endif
+};
+
+}}
+
+
+#endif /*!_sys_Socket_h*/
diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h
new file mode 100644
index 0000000000..47b95b6234
--- /dev/null
+++ b/cpp/lib/common/sys/Thread.h
@@ -0,0 +1,142 @@
+#ifndef _sys_Thread_h
+#define _sys_Thread_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Runnable.h>
+
+#ifdef USE_APR
+# include <apr_thread_proc.h>
+# include <apr_portable.h>
+# include <apr/APRPool.h>
+# include <apr/APRBase.h>
+#else
+# include <posix/check.h>
+# include <pthread.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+class Thread
+{
+ public:
+ inline static Thread current();
+ inline static void yield();
+
+ inline Thread();
+ inline explicit Thread(qpid::sys::Runnable*);
+ inline explicit Thread(qpid::sys::Runnable&);
+
+ inline void join();
+
+ inline long id();
+
+ private:
+#ifdef USE_APR
+ static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data);
+ inline Thread(apr_thread_t* t);
+ apr_thread_t* thread;
+#else
+ static void* runRunnable(void* runnable);
+ inline Thread(pthread_t);
+ pthread_t thread;
+#endif
+};
+
+
+Thread::Thread() : thread(0) {}
+
+// APR ================================================================
+#ifdef USE_APR
+
+Thread::Thread(Runnable* runnable) {
+ CHECK_APR_SUCCESS(
+ apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get()));
+}
+
+Thread::Thread(Runnable& runnable) {
+ CHECK_APR_SUCCESS(
+ apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get()));
+}
+
+void Thread::join(){
+ apr_status_t status;
+ if (thread != 0)
+ CHECK_APR_SUCCESS(apr_thread_join(&status, thread));
+}
+
+long Thread::id() {
+ return long(thread);
+}
+
+Thread::Thread(apr_thread_t* t) : thread(t) {}
+
+Thread Thread::current(){
+ apr_thread_t* thr;
+ apr_os_thread_t osthr = apr_os_thread_current();
+ CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get()));
+ return Thread(thr);
+}
+
+void Thread::yield()
+{
+ apr_thread_yield();
+}
+
+
+// POSIX ================================================================
+#else
+
+Thread::Thread(Runnable* runnable) {
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable));
+}
+
+Thread::Thread(Runnable& runnable) {
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable));
+}
+
+void Thread::join(){
+ QPID_POSIX_THROW_IF(pthread_join(thread, 0));
+}
+
+long Thread::id() {
+ return long(thread);
+}
+
+Thread::Thread(pthread_t thr) : thread(thr) {}
+
+Thread Thread::current() {
+ return Thread(pthread_self());
+}
+
+void Thread::yield()
+{
+ QPID_POSIX_THROW_IF(pthread_yield());
+}
+
+
+#endif
+
+}}
+
+#endif /*!_sys_Thread_h*/
diff --git a/cpp/lib/common/sys/Time.cpp b/cpp/lib/common/sys/Time.cpp
new file mode 100644
index 0000000000..ad6185b966
--- /dev/null
+++ b/cpp/lib/common/sys/Time.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Time.h"
+
+namespace qpid {
+namespace sys {
+
+// APR ================================================================
+#if USE_APR
+
+Time now() { return apr_time_now() * TIME_USEC; }
+
+// POSIX================================================================
+#else
+
+Time now() {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ return toTime(ts);
+}
+
+struct timespec toTimespec(const Time& t) {
+ struct timespec ts;
+ toTimespec(ts, t);
+ return ts;
+}
+
+struct timespec& toTimespec(struct timespec& ts, const Time& t) {
+ ts.tv_sec = t / TIME_SEC;
+ ts.tv_nsec = t % TIME_SEC;
+ return ts;
+}
+
+Time toTime(const struct timespec& ts) {
+ return ts.tv_sec*TIME_SEC + ts.tv_nsec;
+}
+
+
+#endif
+}}
+
diff --git a/cpp/lib/common/sys/Time.h b/cpp/lib/common/sys/Time.h
new file mode 100644
index 0000000000..3dd46741d8
--- /dev/null
+++ b/cpp/lib/common/sys/Time.h
@@ -0,0 +1,58 @@
+#ifndef _sys_Time_h
+#define _sys_Time_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdint.h>
+
+#ifdef USE_APR
+# include <apr_time.h>
+#else
+# include <time.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+/** Time in nanoseconds */
+typedef int64_t Time;
+
+Time now();
+
+/** Nanoseconds per second. */
+const Time TIME_SEC = 1000*1000*1000;
+/** Nanoseconds per millisecond */
+const Time TIME_MSEC = 1000*1000;
+/** Nanoseconds per microseconds. */
+const Time TIME_USEC = 1000;
+/** Nanoseconds per nanosecond. */
+const Time TIME_NSEC = 1;
+
+#ifndef USE_APR
+struct timespec toTimespec(const Time& t);
+struct timespec& toTimespec(struct timespec& ts, const Time& t);
+Time toTime(const struct timespec& ts);
+#endif
+
+}}
+
+#endif /*!_sys_Time_h*/
diff --git a/cpp/lib/common/sys/TimeoutHandler.h b/cpp/lib/common/sys/TimeoutHandler.h
new file mode 100644
index 0000000000..0c10709bbf
--- /dev/null
+++ b/cpp/lib/common/sys/TimeoutHandler.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TimeoutHandler_
+#define _TimeoutHandler_
+
+namespace qpid {
+namespace sys {
+
+ class TimeoutHandler
+ {
+ public:
+ virtual void idleOut() = 0;
+ virtual void idleIn() = 0;
+ virtual ~TimeoutHandler(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp
new file mode 100644
index 0000000000..c998b33625
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <sys/Acceptor.h>
+#include <sys/SessionHandlerFactory.h>
+#include "LFProcessor.h"
+#include "LFSessionContext.h"
+#include "APRBase.h"
+#include "APRPool.h"
+
+namespace qpid {
+namespace sys {
+
+class APRAcceptor : public Acceptor
+{
+ public:
+ APRAcceptor(int16_t port, int backlog, int threads, bool trace);
+ virtual int16_t getPort() const;
+ virtual void run(qpid::sys::SessionHandlerFactory* factory);
+ virtual void shutdown();
+
+ private:
+ int16_t port;
+ bool trace;
+ LFProcessor processor;
+ apr_socket_t* socket;
+ volatile bool running;
+};
+
+// Define generic Acceptor::create() to return APRAcceptor.
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+{
+ return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
+}
+// Must define Acceptor virtual dtor.
+Acceptor::~Acceptor() {}
+
+ APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
+ port(port_),
+ trace(trace_),
+ processor(APRPool::get(), threads, 1000, 5000000)
+{
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
+}
+
+int16_t APRAcceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void APRAcceptor::run(SessionHandlerFactory* factory) {
+ running = true;
+ processor.start();
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ if(status == APR_SUCCESS){
+ //make this socket non-blocking:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+ LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace);
+ session->init(factory->create(session));
+ }else{
+ running = false;
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ }
+ }
+ shutdown();
+}
+
+void APRAcceptor::shutdown() {
+ // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
+ if (running) {
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ }
+}
+
+
+}}
diff --git a/cpp/lib/common/sys/apr/APRBase.cpp b/cpp/lib/common/sys/apr/APRBase.cpp
new file mode 100644
index 0000000000..19a1b93103
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRBase.cpp
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <QpidError.h>
+#include "APRBase.h"
+
+using namespace qpid::sys;
+
+APRBase* APRBase::instance = 0;
+
+APRBase* APRBase::getInstance(){
+ if(instance == 0){
+ instance = new APRBase();
+ }
+ return instance;
+}
+
+
+APRBase::APRBase() : count(0){
+ apr_initialize();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, 0));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+}
+
+APRBase::~APRBase(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ apr_pool_destroy(pool);
+ apr_terminate();
+}
+
+bool APRBase::_increment(){
+ bool deleted(false);
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(this == instance){
+ count++;
+ }else{
+ deleted = true;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ return !deleted;
+}
+
+void APRBase::_decrement(){
+ APRBase* copy = 0;
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(--count == 0){
+ copy = instance;
+ instance = 0;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ if(copy != 0){
+ delete copy;
+ }
+}
+
+void APRBase::increment(){
+ int count = 0;
+ while(count++ < 2 && !getInstance()->_increment()){
+ std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl;
+ }
+}
+
+void APRBase::decrement(){
+ getInstance()->_decrement();
+}
+
+void qpid::sys::check(apr_status_t status, const std::string& file, const int line){
+ if (status != APR_SUCCESS){
+ const int size = 50;
+ char tmp[size];
+ std::string msg(apr_strerror(status, tmp, size));
+ throw QpidError(APR_ERROR + ((int) status), msg,
+ qpid::SrcLine(file, line));
+ }
+}
+
+std::string qpid::sys::get_desc(apr_status_t status){
+ const int size = 50;
+ char tmp[size];
+ return std::string(apr_strerror(status, tmp, size));
+}
+
diff --git a/cpp/lib/common/sys/apr/APRBase.h b/cpp/lib/common/sys/apr/APRBase.h
new file mode 100644
index 0000000000..d1b3e21b91
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRBase.h
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _APRBase_
+#define _APRBase_
+
+#include <string>
+#include <apr_thread_mutex.h>
+#include <apr_errno.h>
+
+namespace qpid {
+namespace sys {
+
+ /**
+ * Use of APR libraries necessitates explicit init and terminate
+ * calls. Any class using APR libs should obtain the reference to
+ * this singleton and increment on construction, decrement on
+ * destruction. This class can then correctly initialise apr
+ * before the first use and terminate after the last use.
+ */
+ class APRBase{
+ static APRBase* instance;
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ int count;
+
+ APRBase();
+ ~APRBase();
+ static APRBase* getInstance();
+ bool _increment();
+ void _decrement();
+ public:
+ static void increment();
+ static void decrement();
+ };
+
+ //this is also a convenient place for a helper function for error checking:
+ void check(apr_status_t status, const std::string& file, const int line);
+ std::string get_desc(apr_status_t status);
+
+#define CHECK_APR_SUCCESS(A) qpid::sys::check(A, __FILE__, __LINE__);
+
+}
+}
+
+
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/APRPool.cpp b/cpp/lib/common/sys/apr/APRPool.cpp
new file mode 100644
index 0000000000..e8b71f6e8a
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRPool.cpp
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "APRPool.h"
+#include "APRBase.h"
+#include <boost/pool/detail/singleton.hpp>
+
+using namespace qpid::sys;
+
+APRPool::APRPool(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+APRPool::~APRPool(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+apr_pool_t* APRPool::get() {
+ return boost::details::pool::singleton_default<APRPool>::instance().pool;
+}
+
diff --git a/cpp/lib/common/sys/apr/APRPool.h b/cpp/lib/common/sys/apr/APRPool.h
new file mode 100644
index 0000000000..da7661fcfa
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRPool.h
@@ -0,0 +1,50 @@
+#ifndef _APRPool_
+#define _APRPool_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <apr_pools.h>
+
+namespace qpid {
+namespace sys {
+/**
+ * Singleton APR memory pool.
+ */
+class APRPool : private boost::noncopyable {
+ public:
+ APRPool();
+ ~APRPool();
+
+ /** Get singleton instance */
+ static apr_pool_t* get();
+
+ private:
+ apr_pool_t* pool;
+};
+
+}}
+
+
+
+
+
+#endif /*!_APRPool_*/
diff --git a/cpp/lib/common/sys/apr/APRSocket.cpp b/cpp/lib/common/sys/apr/APRSocket.cpp
new file mode 100644
index 0000000000..4917803370
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRSocket.cpp
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "APRBase.h"
+#include "APRSocket.h"
+#include <assert.h>
+#include <iostream>
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){
+
+}
+
+void APRSocket::read(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ bytes = buffer.available();
+ apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes);
+ buffer.move(bytes);
+ if(APR_STATUS_IS_TIMEUP(s)){
+ //timed out
+ }else if(APR_STATUS_IS_EOF(s)){
+ close();
+ }
+}
+
+void APRSocket::write(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ do{
+ bytes = buffer.available();
+ apr_socket_send(socket, buffer.start(), &bytes);
+ buffer.move(bytes);
+ }while(bytes > 0);
+}
+
+void APRSocket::close(){
+ if(!closed){
+ std::cout << "Closing socket " << socket << "@" << this << std::endl;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ closed = true;
+ }
+}
+
+bool APRSocket::isOpen(){
+ return !closed;
+}
+
+u_int8_t APRSocket::read(){
+ char data[1];
+ apr_size_t bytes = 1;
+ apr_status_t s = apr_socket_recv(socket, data, &bytes);
+ if(APR_STATUS_IS_EOF(s) || bytes == 0){
+ return 0;
+ }else{
+ return *data;
+ }
+}
+
+APRSocket::~APRSocket(){
+}
diff --git a/cpp/lib/common/sys/apr/APRSocket.h b/cpp/lib/common/sys/apr/APRSocket.h
new file mode 100644
index 0000000000..53f1055c6a
--- /dev/null
+++ b/cpp/lib/common/sys/apr/APRSocket.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _APRSocket_
+#define _APRSocket_
+
+#include <apr_network_io.h>
+#include <Buffer.h>
+
+namespace qpid {
+namespace sys {
+
+ class APRSocket
+ {
+ apr_socket_t* const socket;
+ volatile bool closed;
+ public:
+ APRSocket(apr_socket_t* socket);
+ void read(qpid::framing::Buffer& b);
+ void write(qpid::framing::Buffer& b);
+ void close();
+ bool isOpen();
+ u_int8_t read();
+ ~APRSocket();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp
new file mode 100644
index 0000000000..2b6fc92623
--- /dev/null
+++ b/cpp/lib/common/sys/apr/LFProcessor.cpp
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <sstream>
+#include <QpidError.h>
+#include "LFProcessor.h"
+#include "APRBase.h"
+#include "LFSessionContext.h"
+
+using namespace qpid::sys;
+using qpid::QpidError;
+
+// TODO aconway 2006-10-12: stopped is read outside locks.
+//
+
+LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
+ size(_size),
+ timeout(_timeout),
+ signalledCount(0),
+ current(0),
+ count(0),
+ workerCount(_workers),
+ hasLeader(false),
+ workers(new Thread[_workers]),
+ stopped(false)
+{
+
+ CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
+}
+
+
+LFProcessor::~LFProcessor(){
+ if (!stopped) stop();
+ delete[] workers;
+ CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+}
+
+void LFProcessor::start(){
+ for(int i = 0; i < workerCount; i++){
+ workers[i] = Thread(this);
+ }
+}
+
+void LFProcessor::add(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+ Monitor::ScopedLock l(countLock);
+ sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
+ count++;
+}
+
+void LFProcessor::remove(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ Monitor::ScopedLock l(countLock);
+ sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
+ count--;
+}
+
+void LFProcessor::reactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+void LFProcessor::deactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+}
+
+void LFProcessor::update(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+bool LFProcessor::full(){
+ Mutex::ScopedLock locker(countLock);
+ return count == size;
+}
+
+bool LFProcessor::empty(){
+ Mutex::ScopedLock locker(countLock);
+ return count == 0;
+}
+
+void LFProcessor::poll() {
+ apr_status_t status = APR_EGENERAL;
+ do{
+ current = 0;
+ if(!stopped){
+ status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
+ }
+ }while(status != APR_SUCCESS && !stopped);
+}
+
+void LFProcessor::run(){
+ try{
+ while(!stopped){
+ const apr_pollfd_t* event = 0;
+ LFSessionContext* session = 0;
+ {
+ Monitor::ScopedLock l(leadLock);
+ waitToLead();
+ event = getNextEvent();
+ if(!event) return;
+ session = reinterpret_cast<LFSessionContext*>(
+ event->client_data);
+ session->startProcessing();
+ relinquishLead();
+ }
+
+ //process event:
+ if(event->rtnevents & APR_POLLIN) session->read();
+ if(event->rtnevents & APR_POLLOUT) session->write();
+
+ if(session->isClosed()){
+ session->handleClose();
+ Monitor::ScopedLock l(countLock);
+ sessions.erase(find(sessions.begin(),sessions.end(), session));
+ count--;
+ }else{
+ session->stopProcessing();
+ }
+ }
+ }catch(std::exception e){
+ std::cout << e.what() << std::endl;
+ }
+}
+
+void LFProcessor::waitToLead(){
+ while(hasLeader && !stopped) leadLock.wait();
+ hasLeader = !stopped;
+}
+
+void LFProcessor::relinquishLead(){
+ hasLeader = false;
+ leadLock.notify();
+}
+
+const apr_pollfd_t* LFProcessor::getNextEvent(){
+ while(true){
+ if(stopped){
+ return 0;
+ }else if(current < signalledCount){
+ //use result of previous poll if one is available
+ return signalledFDs + (current++);
+ }else{
+ //else poll to get new events
+ poll();
+ }
+ }
+}
+
+void LFProcessor::stop(){
+ stopped = true;
+ {
+ Monitor::ScopedLock l(leadLock);
+ leadLock.notifyAll();
+ }
+ for(int i = 0; i < workerCount; i++){
+ workers[i].join();
+ }
+ for(iterator i = sessions.begin(); i < sessions.end(); i++){
+ (*i)->shutdown();
+ }
+}
+
diff --git a/cpp/lib/common/sys/apr/LFProcessor.h b/cpp/lib/common/sys/apr/LFProcessor.h
new file mode 100644
index 0000000000..de90199472
--- /dev/null
+++ b/cpp/lib/common/sys/apr/LFProcessor.h
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LFProcessor_
+#define _LFProcessor_
+
+#include <apr_poll.h>
+#include <iostream>
+#include <vector>
+#include <sys/Monitor.h>
+#include <sys/Runnable.h>
+#include <sys/Thread.h>
+
+namespace qpid {
+namespace sys {
+
+ class LFSessionContext;
+
+ /**
+ * This class processes a poll set using the leaders-followers
+ * pattern for thread synchronization: the leader will poll and on
+ * the poll returning, it will remove a session, promote a
+ * follower to leadership, then process the session.
+ */
+ class LFProcessor : private virtual qpid::sys::Runnable
+ {
+ typedef std::vector<LFSessionContext*>::iterator iterator;
+
+ const int size;
+ const apr_interval_time_t timeout;
+ apr_pollset_t* pollset;
+ int signalledCount;
+ int current;
+ const apr_pollfd_t* signalledFDs;
+ int count;
+ const int workerCount;
+ bool hasLeader;
+ qpid::sys::Thread* workers;
+ qpid::sys::Monitor leadLock;
+ qpid::sys::Mutex countLock;
+ std::vector<LFSessionContext*> sessions;
+ volatile bool stopped;
+
+ const apr_pollfd_t* getNextEvent();
+ void waitToLead();
+ void relinquishLead();
+ void poll();
+ virtual void run();
+
+ public:
+ LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+ /**
+ * Add the fd to the poll set. Relies on the client_data being
+ * an instance of LFSessionContext.
+ */
+ void add(const apr_pollfd_t* const fd);
+ /**
+ * Remove the fd from the poll set.
+ */
+ void remove(const apr_pollfd_t* const fd);
+ /**
+ * Signal that the fd passed in, already part of the pollset,
+ * has had its flags altered.
+ */
+ void update(const apr_pollfd_t* const fd);
+ /**
+ * Add an fd back to the poll set after deactivation.
+ */
+ void reactivate(const apr_pollfd_t* const fd);
+ /**
+ * Temporarily remove the fd from the poll set. Called when processing
+ * is about to begin.
+ */
+ void deactivate(const apr_pollfd_t* const fd);
+ /**
+ * Indicates whether the capacity of this processor has been
+ * reached (or whether it can still handle further fd's).
+ */
+ bool full();
+ /**
+ * Indicates whether there are any fd's registered.
+ */
+ bool empty();
+ /**
+ * Stop processing.
+ */
+ void stop();
+ /**
+ * Start processing.
+ */
+ void start();
+ /**
+ * Is processing stopped?
+ */
+ bool isStopped();
+
+ ~LFProcessor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp
new file mode 100644
index 0000000000..7fb8d5a91b
--- /dev/null
+++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LFSessionContext.h"
+#include "APRBase.h"
+#include <QpidError.h>
+#include <assert.h>
+
+using namespace qpid::sys;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
+ LFProcessor* const _processor,
+ bool _debug) :
+ debug(_debug),
+ socket(_socket),
+ initiated(false),
+ in(65536),
+ out(65536),
+ processor(_processor),
+ processing(false),
+ closing(false)
+{
+
+ fd.p = _pool;
+ fd.desc_type = APR_POLL_SOCKET;
+ fd.reqevents = APR_POLLIN;
+ fd.client_data = this;
+ fd.desc.s = _socket;
+
+ out.flip();
+}
+
+LFSessionContext::~LFSessionContext(){
+
+}
+
+void LFSessionContext::read(){
+ socket.read(in);
+ in.flip();
+ if(initiated){
+ AMQFrame frame;
+ while(frame.decode(in)){
+ if(debug) log("RECV", &frame);
+ handler->received(&frame);
+ }
+ }else{
+ ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ handler->initiated(&protocolInit);
+ initiated = true;
+ if(debug) std::cout << "INIT [" << &socket << "]" << std::endl;
+ }
+ }
+ in.compact();
+}
+
+void LFSessionContext::write(){
+ bool done = isClosed();
+ while(!done){
+ if(out.available() > 0){
+ socket.write(out);
+ if(out.available() > 0){
+
+ //incomplete write, leave flags to receive notification of readiness to write
+ done = true;//finished processing for now, but write is still in progress
+ }
+ }else{
+ //do we have any frames to write?
+ Mutex::ScopedLock l(writeLock);
+ if(!framesToWrite.empty()){
+ out.clear();
+ bool encoded(false);
+ AMQFrame* frame = framesToWrite.front();
+ while(frame && out.available() >= frame->size()){
+ encoded = true;
+ frame->encode(out);
+ if(debug) log("SENT", frame);
+ delete frame;
+ framesToWrite.pop();
+ frame = framesToWrite.empty() ? 0 : framesToWrite.front();
+ }
+ if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+ out.flip();
+ }else{
+ //reset flags, don't care about writability anymore
+ fd.reqevents = APR_POLLIN;
+ done = true;
+
+ if(closing){
+ socket.close();
+ }
+ }
+ }
+ }
+}
+
+void LFSessionContext::send(AMQFrame* frame){
+ Mutex::ScopedLock l(writeLock);
+ if(!closing){
+ framesToWrite.push(frame);
+ if(!(fd.reqevents & APR_POLLOUT)){
+ fd.reqevents |= APR_POLLOUT;
+ if(!processing){
+ processor->update(&fd);
+ }
+ }
+ }
+}
+
+void LFSessionContext::startProcessing(){
+ Mutex::ScopedLock l(writeLock);
+ processing = true;
+ processor->deactivate(&fd);
+}
+
+void LFSessionContext::stopProcessing(){
+ Mutex::ScopedLock l(writeLock);
+ processor->reactivate(&fd);
+ processing = false;
+}
+
+void LFSessionContext::close(){
+ closing = true;
+ Mutex::ScopedLock l(writeLock);
+ if(!processing){
+ //allow pending frames to be written to socket
+ fd.reqevents = APR_POLLOUT;
+ processor->update(&fd);
+ }
+}
+
+void LFSessionContext::handleClose(){
+ handler->closed();
+ std::cout << "Session closed [" << &socket << "]" << std::endl;
+ delete handler;
+ delete this;
+}
+
+void LFSessionContext::shutdown(){
+ socket.close();
+ handleClose();
+}
+
+void LFSessionContext::init(SessionHandler* _handler){
+ handler = _handler;
+ processor->add(&fd);
+}
+
+void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
+ Mutex::ScopedLock l(logLock);
+ std::cout << desc << " [" << &socket << "]: " << *frame << std::endl;
+}
+
+Mutex LFSessionContext::logLock;
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h
new file mode 100644
index 0000000000..9483cbe590
--- /dev/null
+++ b/cpp/lib/common/sys/apr/LFSessionContext.h
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LFSessionContext_
+#define _LFSessionContext_
+
+#include <queue>
+
+#include <apr_network_io.h>
+#include <apr_poll.h>
+#include <apr_time.h>
+
+#include <AMQFrame.h>
+#include <Buffer.h>
+#include <sys/Monitor.h>
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+
+#include "APRSocket.h"
+#include "LFProcessor.h"
+
+namespace qpid {
+namespace sys {
+
+
+class LFSessionContext : public virtual qpid::sys::SessionContext
+{
+ const bool debug;
+ APRSocket socket;
+ bool initiated;
+
+ qpid::framing::Buffer in;
+ qpid::framing::Buffer out;
+
+ qpid::sys::SessionHandler* handler;
+ LFProcessor* const processor;
+
+ apr_pollfd_t fd;
+
+ std::queue<qpid::framing::AMQFrame*> framesToWrite;
+ qpid::sys::Mutex writeLock;
+
+ bool processing;
+ bool closing;
+
+ static qpid::sys::Mutex logLock;
+ void log(const std::string& desc,
+ qpid::framing::AMQFrame* const frame);
+
+
+ public:
+ LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
+ LFProcessor* const processor,
+ bool debug = false);
+ virtual ~LFSessionContext();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void close();
+ void read();
+ void write();
+ void init(qpid::sys::SessionHandler* handler);
+ void startProcessing();
+ void stopProcessing();
+ void handleClose();
+ void shutdown();
+ inline apr_pollfd_t* const getFd(){ return &fd; }
+ inline bool isClosed(){ return !socket.isOpen(); }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp
new file mode 100644
index 0000000000..336eb4996a
--- /dev/null
+++ b/cpp/lib/common/sys/apr/Socket.cpp
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <sys/Socket.h>
+#include <apr/APRBase.h>
+#include <apr/APRPool.h>
+
+
+using namespace qpid::sys;
+
+Socket Socket::createTcp() {
+ Socket s;
+ CHECK_APR_SUCCESS(
+ apr_socket_create(
+ &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+ APRPool::get()));
+ return s;
+}
+
+Socket::Socket(apr_socket_t* s) {
+ socket = s;
+}
+
+void Socket::setTimeout(Time interval) {
+ apr_socket_timeout_set(socket, interval/TIME_USEC);
+}
+
+void Socket::connect(const std::string& host, int port) {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(
+ apr_sockaddr_info_get(
+ &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
+ APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+}
+
+void Socket::close() {
+ if (socket == 0) return;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ socket = 0;
+}
+
+ssize_t Socket::send(const void* data, size_t size)
+{
+ apr_size_t sent = size;
+ apr_status_t status =
+ apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent);
+ if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
+ CHECK_APR_SUCCESS(status);
+ return sent;
+}
+
+ssize_t Socket::recv(void* data, size_t size)
+{
+ apr_size_t received = size;
+ apr_status_t status =
+ apr_socket_recv(socket, reinterpret_cast<char*>(data), &received);
+ if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ CHECK_APR_SUCCESS(status);
+ return received;
+}
+
+
diff --git a/cpp/lib/common/sys/apr/Thread.cpp b/cpp/lib/common/sys/apr/Thread.cpp
new file mode 100644
index 0000000000..5c4799aa96
--- /dev/null
+++ b/cpp/lib/common/sys/apr/Thread.cpp
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Thread.h>
+
+using namespace qpid::sys;
+using qpid::sys::Runnable;
+
+void* APR_THREAD_FUNC Thread::runRunnable(apr_thread_t* thread, void *data) {
+ reinterpret_cast<Runnable*>(data)->run();
+ CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS));
+ return NULL;
+}
+
+
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp
new file mode 100644
index 0000000000..16c7ec9c3f
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -0,0 +1,325 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <mqueue.h>
+#include <string.h>
+#include <iostream>
+
+#include <sys/errno.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+
+#include <typeinfo>
+#include <iostream>
+#include <queue>
+
+#include <boost/ptr_container/ptr_map.hpp>
+#include <boost/current_function.hpp>
+
+#include <QpidError.h>
+#include <sys/Monitor.h>
+
+#include "check.h"
+#include "EventChannel.h"
+
+using namespace std;
+
+
+// Convenience template to zero out a struct.
+template <class S> struct ZeroStruct : public S {
+ ZeroStruct() { memset(this, 0, sizeof(*this)); }
+};
+
+namespace qpid {
+namespace sys {
+
+
+/**
+ * EventHandler wraps an epoll file descriptor. Acts as private
+ * interface between EventChannel and subclasses.
+ *
+ * Also implements Event interface for events that are not associated
+ * with a file descriptor and are passed via the message queue.
+ */
+class EventHandler : public Event, private Monitor
+{
+ public:
+ EventHandler(int epollSize = 256);
+ ~EventHandler();
+
+ int getEpollFd() { return epollFd; }
+ void epollAdd(int fd, uint32_t epollEvents, Event* event);
+ void epollMod(int fd, uint32_t epollEvents, Event* event);
+ void epollDel(int fd);
+
+ void mqPut(Event* event);
+ Event* mqGet();
+
+ protected:
+ // Should never be called, only complete.
+ void prepare(EventHandler&) { assert(0); }
+ Event* complete(EventHandler& eh);
+
+ private:
+ int epollFd;
+ std::string mqName;
+ int mqFd;
+ std::queue<Event*> mqEvents;
+};
+
+EventHandler::EventHandler(int epollSize)
+{
+ epollFd = epoll_create(epollSize);
+ if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+
+ // Create a POSIX message queue for non-fd events.
+ // We write one byte and never read it is always ready for read
+ // when we add it to epoll.
+ //
+ ZeroStruct<struct mq_attr> attr;
+ attr.mq_maxmsg = 1;
+ attr.mq_msgsize = 1;
+ do {
+ char tmpnam[L_tmpnam];
+ tmpnam_r(tmpnam);
+ mqName = tmpnam + 4; // Skip "tmp/"
+ mqFd = mq_open(
+ mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
+ if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
+ } while (mqFd == EEXIST); // Name already taken, try again.
+
+ static char zero = '\0';
+ mq_send(mqFd, &zero, 1, 0);
+ epollAdd(mqFd, 0, this);
+}
+
+EventHandler::~EventHandler() {
+ mq_close(mqFd);
+ mq_unlink(mqName.c_str());
+}
+
+void EventHandler::mqPut(Event* event) {
+ ScopedLock l(*this);
+ assert(event != 0);
+ mqEvents.push(event);
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+}
+
+Event* EventHandler::mqGet() {
+ ScopedLock l(*this);
+ if (mqEvents.empty())
+ return 0;
+ Event* event = mqEvents.front();
+ mqEvents.pop();
+ if(!mqEvents.empty())
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+ return event;
+}
+
+void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollDel(int fd) {
+ if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+Event* EventHandler::complete(EventHandler& eh)
+{
+ assert(&eh == this);
+ Event* event = mqGet();
+ return event==0 ? 0 : event->complete(eh);
+}
+
+// ================================================================
+// EventChannel
+
+EventChannel::shared_ptr EventChannel::create() {
+ return shared_ptr(new EventChannel());
+}
+
+EventChannel::EventChannel() : handler(new EventHandler()) {}
+
+EventChannel::~EventChannel() {}
+
+void EventChannel::postEvent(Event& e)
+{
+ e.prepare(*handler);
+}
+
+Event* EventChannel::getEvent()
+{
+ static const int infiniteTimeout = -1;
+ ZeroStruct<struct epoll_event> epollEvent;
+
+ // Loop until we can complete the event. Some events may re-post
+ // themselves and return 0 from complete, e.g. partial reads. //
+ Event* event = 0;
+ while (event == 0) {
+ int eventCount = epoll_wait(handler->getEpollFd(),
+ &epollEvent, 1, infiniteTimeout);
+ if (eventCount < 0) {
+ if (errno != EINTR) {
+ // TODO aconway 2006-11-28: Proper handling/logging of errors.
+ cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
+ << PosixError::getMessage(errno) << endl;
+ assert(0);
+ }
+ }
+ else if (eventCount == 1) {
+ event = reinterpret_cast<Event*>(epollEvent.data.ptr);
+ assert(event != 0);
+ try {
+ event = event->complete(*handler);
+ }
+ catch (const Exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ catch (const std::exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ }
+ }
+ return event;
+}
+
+Event::~Event() {}
+
+void Event::prepare(EventHandler& handler)
+{
+ handler.mqPut(this);
+}
+
+bool Event::hasError() const {
+ return error;
+}
+
+void Event::throwIfError() throw (Exception) {
+ if (hasError())
+ error.throwSelf();
+}
+
+Event* Event::complete(EventHandler&)
+{
+ return this;
+}
+
+void Event::dispatch()
+{
+ try {
+ if (!callback.empty())
+ callback();
+ } catch (const std::exception&) {
+ throw;
+ } catch (...) {
+ throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+ }
+}
+
+void Event::setError(const ExceptionHolder& e) {
+ error = e;
+}
+
+void ReadEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+ssize_t ReadEvent::doRead() {
+ ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
+ size - received);
+ if (n > 0) received += n;
+ return n;
+}
+
+Event* ReadEvent::complete(EventHandler& handler)
+{
+ // Read as much as possible without blocking.
+ ssize_t n = doRead();
+ while (n > 0 && received < size) doRead();
+
+ if (received == size) {
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ return this;
+ }
+ else if (n <0 && (errno == EAGAIN)) {
+ // Keep polling for more.
+ handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
+ return 0;
+ }
+ else {
+ // Unexpected EOF or error. Throw ENODATA for EOF.
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+ }
+}
+
+void WriteEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+}
+
+Event* WriteEvent::complete(EventHandler& handler)
+{
+ ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
+ size - written);
+ if (n < 0) throw QPID_POSIX_ERROR(errno);
+ written += n;
+ if(written < size) {
+ // Keep polling.
+ handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+ return 0;
+ }
+ written = 0; // Reset for re-use.
+ handler.epollDel(descriptor);
+ return this;
+}
+
+void AcceptEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+Event* AcceptEvent::complete(EventHandler& handler)
+{
+ handler.epollDel(descriptor);
+ accepted = ::accept(descriptor, 0, 0);
+ if (accepted < 0) throw QPID_POSIX_ERROR(errno);
+ return this;
+}
+
+}}
diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h
new file mode 100644
index 0000000000..49c7fce740
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannel.h
@@ -0,0 +1,176 @@
+#ifndef _sys_EventChannel_h
+#define _sys_EventChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <SharedObject.h>
+#include <ExceptionHolder.h>
+#include <boost/function.hpp>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class Event;
+class EventHandler;
+class EventChannel;
+
+/**
+ * Base class for all Events.
+ */
+class Event
+{
+ public:
+ /** Type for callback when event is dispatched */
+ typedef boost::function0<void> Callback;
+
+ /**
+ * Create an event with optional callback.
+ * Instances of Event are sent directly through the channel.
+ * Derived classes define additional waiting behaviour.
+ *@param cb A callback functor that is invoked when dispatch() is called.
+ */
+ Event(Callback cb = 0) : callback(cb) {}
+
+ virtual ~Event();
+
+ /** Call the callback provided to the constructor, if any. */
+ void dispatch();
+
+ /** True if there was an error processing this event */
+ bool hasError() const;
+
+ /** If hasError() throw the corresponding exception. */
+ void throwIfError() throw(Exception);
+
+ protected:
+ virtual void prepare(EventHandler&);
+ virtual Event* complete(EventHandler&);
+ void setError(const ExceptionHolder& e);
+
+ Callback callback;
+ ExceptionHolder error;
+
+ friend class EventChannel;
+ friend class EventHandler;
+};
+
+template <class BufT>
+class IOEvent : public Event {
+ public:
+ void getDescriptor() const { return descriptor; }
+ size_t getSize() const { return size; }
+ BufT getBuffer() const { return buffer; }
+
+ protected:
+ IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
+ Event(cb), descriptor(fd), buffer(buf), size(sz) {}
+
+ int descriptor;
+ BufT buffer;
+ size_t size;
+};
+
+/** Asynchronous read event */
+class ReadEvent : public IOEvent<void*>
+{
+ public:
+ explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
+ IOEvent<void*>(fd, cb, sz, buf), received(0) {}
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+ ssize_t doRead();
+
+ size_t received;
+};
+
+/** Asynchronous write event */
+class WriteEvent : public IOEvent<const void*>
+{
+ public:
+ explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
+ Callback cb=0) :
+ IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
+
+ protected:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ private:
+ ssize_t doWrite();
+ size_t written;
+};
+
+/** Asynchronous socket accept event */
+class AcceptEvent : public Event
+{
+ public:
+ /** Accept a connection on fd. */
+ explicit AcceptEvent(int fd=-1, Callback cb=0) :
+ Event(cb), descriptor(fd), accepted(0) {}
+
+ /** Get descriptor for server socket */
+ int getAcceptedDesscriptor() const { return accepted; }
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ int descriptor;
+ int accepted;
+};
+
+
+class QueueSet;
+
+/**
+ * Channel to post and wait for events.
+ */
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+ public:
+ static shared_ptr create();
+
+ ~EventChannel();
+
+ /** Post an event to the channel. */
+ void postEvent(Event& event);
+
+ /** Post an event to the channel. Must not be 0. */
+ void postEvent(Event* event) { postEvent(*event); }
+
+ /**
+ * Wait for the next complete event.
+ *@return Pointer to event. Will never return 0.
+ */
+ Event* getEvent();
+
+ private:
+ EventChannel();
+ boost::shared_ptr<EventHandler> handler;
+};
+
+
+}}
+
+
+
+#endif /*!_sys_EventChannel_h*/
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
new file mode 100644
index 0000000000..95e699e0b0
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "EventChannelThreads.h"
+#include <sys/Runnable.h>
+#include <iostream>
+using namespace std;
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+EventChannelThreads::shared_ptr EventChannelThreads::create(
+ EventChannel::shared_ptr ec)
+{
+ return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+}
+
+EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
+ channel(ec), nWaiting(0), state(RUNNING)
+{
+ // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
+ addThread();
+}
+
+EventChannelThreads::~EventChannelThreads() {
+ shutdown();
+ join();
+}
+
+void EventChannelThreads::shutdown()
+{
+ ScopedLock lock(*this);
+ if (state != RUNNING) // Already shutting down.
+ return;
+ for (size_t i = 0; i < workers.size(); ++i) {
+ channel->postEvent(terminate);
+ }
+ state = TERMINATE_SENT;
+ notify(); // Wake up one join() thread.
+}
+
+void EventChannelThreads::join()
+{
+ {
+ ScopedLock lock(*this);
+ while (state == RUNNING) // Wait for shutdown to start.
+ wait();
+ if (state == SHUTDOWN) // Shutdown is complete
+ return;
+ if (state == JOINING) {
+ // Someone else is doing the join.
+ while (state != SHUTDOWN)
+ wait();
+ return;
+ }
+ // I'm the joining thread
+ assert(state == TERMINATE_SENT);
+ state = JOINING;
+ } // Drop the lock.
+
+ for (size_t i = 0; i < workers.size(); ++i) {
+ assert(state == JOINING); // Only this thread can change JOINING.
+ workers[i].join();
+ }
+ state = SHUTDOWN;
+ notifyAll(); // Notify other join() threaeds.
+}
+
+void EventChannelThreads::addThread() {
+ ScopedLock l(*this);
+ workers.push_back(Thread(*this));
+}
+
+void EventChannelThreads::run()
+{
+ // Start life waiting. Decrement on exit.
+ AtomicCount::ScopedIncrement inc(nWaiting);
+ try {
+ while (true) {
+ Event* e = channel->getEvent();
+ assert(e != 0);
+ if (e == &terminate) {
+ return;
+ }
+ AtomicCount::ScopedDecrement dec(nWaiting);
+ // I'm no longer waiting, make sure someone is.
+ if (dec == 0)
+ addThread();
+ e->dispatch();
+ }
+ }
+ catch (const std::exception& e) {
+ // TODO aconway 2006-11-15: need better logging across the board.
+ std::cerr << "EventChannelThreads::run() caught: " << e.what()
+ << std::endl;
+ }
+ catch (...) {
+ std::cerr << "EventChannelThreads::run() caught unknown exception."
+ << std::endl;
+ }
+}
+
+}}
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h
new file mode 100644
index 0000000000..98403c0869
--- /dev/null
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.h
@@ -0,0 +1,92 @@
+#ifndef _posix_EventChannelThreads_h
+#define _sys_EventChannelThreads_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <vector>
+
+#include <Exception.h>
+#include <sys/Time.h>
+#include <sys/Monitor.h>
+#include <sys/Thread.h>
+#include <sys/AtomicCount.h>
+#include "EventChannel.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ Dynamic thread pool serving an EventChannel.
+
+ Threads run a loop { e = getEvent(); e->dispatch(); }
+ The size of the thread pool is automatically adjusted to optimal size.
+*/
+class EventChannelThreads :
+ public qpid::SharedObject<EventChannelThreads>,
+ public sys::Monitor, private sys::Runnable
+{
+ public:
+ /** Create the thread pool and start initial threads. */
+ static EventChannelThreads::shared_ptr create(
+ EventChannel::shared_ptr channel
+ );
+
+ ~EventChannelThreads();
+
+ /** Post event to the underlying channel */
+ void postEvent(Event& event) { channel->postEvent(event); }
+
+ /** Post event to the underlying channel Must not be 0. */
+ void postEvent(Event* event) { channel->postEvent(event); }
+
+ /**
+ * Terminate all threads.
+ *
+ * Returns immediately, use join() to wait till all threads are
+ * shut down.
+ */
+ void shutdown();
+
+ /** Wait for all threads to terminate. */
+ void join();
+
+ private:
+ typedef std::vector<sys::Thread> Threads;
+ typedef enum {
+ RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+ } State;
+
+ EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+ void addThread();
+
+ void run();
+ bool keepRunning();
+ void adjustThreads();
+
+ EventChannel::shared_ptr channel;
+ Threads workers;
+ sys::AtomicCount nWaiting;
+ State state;
+ Event terminate;
+};
+
+
+}}
+
+
+#endif /*!_sys_EventChannelThreads_h*/
diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
new file mode 100644
index 0000000000..842aa76f36
--- /dev/null
+++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Acceptor.h>
+#include <Exception.h>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
+}
+
+class PosixAcceptor : public Acceptor {
+ public:
+ virtual int16_t getPort() const { fail(); return 0; }
+ virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); }
+ virtual void shutdown() { fail(); }
+};
+
+// Define generic Acceptor::create() to return APRAcceptor.
+ Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool)
+{
+ return Acceptor::shared_ptr(new PosixAcceptor());
+}
+
+// Must define Acceptor virtual dtor.
+Acceptor::~Acceptor() {}
+
+}}
diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp
new file mode 100644
index 0000000000..5bd13742f6
--- /dev/null
+++ b/cpp/lib/common/sys/posix/Socket.cpp
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/socket.h>
+#include <sys/errno.h>
+#include <netinet/in.h>
+#include <netdb.h>
+
+#include <boost/format.hpp>
+
+#include <QpidError.h>
+#include <posix/check.h>
+#include <sys/Socket.h>
+
+using namespace qpid::sys;
+
+Socket Socket::createTcp()
+{
+ int s = ::socket (PF_INET, SOCK_STREAM, 0);
+ if (s < 0) throw QPID_POSIX_ERROR(errno);
+ return s;
+}
+
+Socket::Socket(int descriptor) : socket(descriptor) {}
+
+void Socket::setTimeout(Time interval)
+{
+ struct timeval tv;
+ tv.tv_sec = interval/TIME_SEC;
+ tv.tv_usec = (interval%TIME_SEC)/TIME_USEC;
+ setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
+ setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+}
+
+void Socket::connect(const std::string& host, int port)
+{
+ struct sockaddr_in name;
+ name.sin_family = AF_INET;
+ name.sin_port = htons(port);
+ struct hostent* hp = gethostbyname ( host.c_str() );
+ if (hp == 0) throw QPID_POSIX_ERROR(errno);
+ memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
+ if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void
+Socket::close()
+{
+ if (socket == 0) return;
+ if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
+ socket = 0;
+}
+
+ssize_t
+Socket::send(const void* data, size_t size)
+{
+ ssize_t sent = ::send(socket, data, size, 0);
+ if (sent < 0) {
+ if (errno == ECONNRESET) return SOCKET_EOF;
+ if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
+ throw QPID_POSIX_ERROR(errno);
+ }
+ return sent;
+}
+
+ssize_t
+Socket::recv(void* data, size_t size)
+{
+ ssize_t received = ::recv(socket, data, size, 0);
+ if (received < 0) {
+ if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
+ throw QPID_POSIX_ERROR(errno);
+ }
+ return received;
+}
+
+int Socket::listen(int port, int backlog)
+{
+ struct sockaddr_in name;
+ name.sin_family = AF_INET;
+ name.sin_port = htons(port);
+ name.sin_addr.s_addr = 0;
+ if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
+ if (::listen(socket, backlog) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ socklen_t namelen = sizeof(name);
+ if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ return ntohs(name.sin_port);
+}
+
+
+int Socket::fd()
+{
+ return socket;
+}
diff --git a/cpp/lib/common/sys/posix/Thread.cpp b/cpp/lib/common/sys/posix/Thread.cpp
new file mode 100644
index 0000000000..f524799556
--- /dev/null
+++ b/cpp/lib/common/sys/posix/Thread.cpp
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/Thread.h>
+
+void* qpid::sys::Thread::runRunnable(void* p)
+{
+ static_cast<Runnable*>(p)->run();
+ return 0;
+}
diff --git a/cpp/lib/common/sys/posix/check.cpp b/cpp/lib/common/sys/posix/check.cpp
new file mode 100644
index 0000000000..408679caa8
--- /dev/null
+++ b/cpp/lib/common/sys/posix/check.cpp
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <cerrno>
+#include "check.h"
+
+namespace qpid {
+namespace sys {
+
+std::string
+PosixError::getMessage(int errNo)
+{
+ char buf[512];
+ return std::string(strerror_r(errNo, buf, sizeof(buf)));
+}
+
+PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
+ : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
+{ }
+
+}}
diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h
new file mode 100644
index 0000000000..5afbe8f5a8
--- /dev/null
+++ b/cpp/lib/common/sys/posix/check.h
@@ -0,0 +1,62 @@
+#ifndef _posix_check_h
+#define _posix_check_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <cerrno>
+#include <string>
+#include <QpidError.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Exception with message from errno.
+ */
+class PosixError : public qpid::QpidError
+{
+ public:
+ static std::string getMessage(int errNo);
+
+ PosixError(int errNo, const qpid::SrcLine& location) throw();
+
+ ~PosixError() throw() {}
+
+ int getErrNo() { return errNo; }
+
+ Exception* clone() const throw() { return new PosixError(*this); }
+
+ void throwSelf() { throw *this; }
+
+ private:
+ int errNo;
+};
+
+}}
+
+/** Create a PosixError for the current file/line and errno. */
+#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+
+/** Throw a posix error if errNo is non-zero */
+#define QPID_POSIX_THROW_IF(ERRNO) \
+ if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
+#endif /*!_posix_check_h*/