diff options
Diffstat (limited to 'cpp/lib/common')
109 files changed, 0 insertions, 8612 deletions
diff --git a/cpp/lib/common/Exception.cpp b/cpp/lib/common/Exception.cpp deleted file mode 100644 index f7d91498e0..0000000000 --- a/cpp/lib/common/Exception.cpp +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <Exception.h> - -namespace qpid { - -Exception::Exception() throw() {} - -Exception::Exception(const std::string& str) throw() : whatStr(str) {} - -Exception::Exception(const char* str) throw() : whatStr(str) {} - -Exception::~Exception() throw() {} - -const char* Exception::what() const throw() { return whatStr.c_str(); } - -std::string Exception::toString() const throw() { return whatStr; } - -Exception* Exception::clone() const throw() { return new Exception(*this); } - -void Exception::throwSelf() const { throw *this; } - -ShutdownException::ShutdownException() : Exception("Shut down.") {} - -EmptyException::EmptyException() : Exception("Empty.") {} - -} // namespace qpid diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h deleted file mode 100644 index b8cd68cf8c..0000000000 --- a/cpp/lib/common/Exception.h +++ /dev/null @@ -1,97 +0,0 @@ -#ifndef _Exception_ -#define _Exception_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <exception> -#include <string> -#include <memory> -#include <boost/shared_ptr.hpp> -#include <boost/lexical_cast.hpp> - -#include "amqp_types.h" - -namespace qpid -{ -/** - * Exception base class for all Qpid exceptions. - */ -class Exception : public std::exception -{ - protected: - std::string whatStr; - - public: - Exception() throw(); - Exception(const std::string& str) throw(); - Exception(const char* str) throw(); - Exception(const std::exception&) throw(); - - /** Allow any type that has ostream operator<< to act as message */ - template <class T> - Exception(const T& message) - : whatStr(boost::lexical_cast<std::string>(message)) {} - - virtual ~Exception() throw(); - - virtual const char* what() const throw(); - virtual std::string toString() const throw(); - - virtual Exception* clone() const throw(); - virtual void throwSelf() const; - - typedef boost::shared_ptr<Exception> shared_ptr; -}; - -struct ChannelException : public Exception { - framing::ReplyCode code; - template <class T> - ChannelException(framing::ReplyCode code_, const T& message) - : Exception(message), code(code_) {} - void throwSelf() const { throw *this; } -}; - -struct ConnectionException : public Exception { - framing::ReplyCode code; - template <class T> - ConnectionException(framing::ReplyCode code_, const T& message) - : Exception(message), code(code_) {} - void throwSelf() const { throw *this; } -}; - -/** - * Exception used to indicate that a thread should shut down. - * Does not indicate an error that should be signalled to the user. - */ -struct ShutdownException : public Exception { - ShutdownException(); - void throwSelf() const { throw *this; } -}; - -/** Exception to indicate empty queue or other empty state */ -struct EmptyException : public Exception { - EmptyException(); - void throwSelf() const { throw *this; } -}; - -} - -#endif /*!_Exception_*/ diff --git a/cpp/lib/common/ExceptionHolder.cpp b/cpp/lib/common/ExceptionHolder.cpp deleted file mode 100644 index de8d7b2487..0000000000 --- a/cpp/lib/common/ExceptionHolder.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ExceptionHolder.h" - -namespace qpid { - -ExceptionHolder::ExceptionHolder(const std::exception& e) { - const Exception* ex = dynamic_cast<const Exception*>(&e); - if (ex) { - reset(ex->clone()); - } else { - reset(new Exception(e.what())); - } -} - -} diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h deleted file mode 100644 index 2769455aba..0000000000 --- a/cpp/lib/common/ExceptionHolder.h +++ /dev/null @@ -1,67 +0,0 @@ -#ifndef _qpid_ExceptionHolder_h -#define _qpid_ExceptionHolder_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <assert.h> -#include <Exception.h> -#include <boost/shared_ptr.hpp> - -namespace qpid { - -// FIXME aconway 2007-02-20: Not necessary, a simple -// Exception::shared_ptr will do the job. Remove -// -/** - * Holder for a heap-allocated exc eption that can be stack allocated - * and thrown safely. - * - * Basically this is a shared_ptr with the Exception functions added - * so the catcher need not be aware that it is a pointer rather than a - * reference. - * - * shared_ptr is chosen over auto_ptr because it has normal - * copy semantics. - */ -class ExceptionHolder : public Exception, public boost::shared_ptr<Exception> -{ - public: - typedef boost::shared_ptr<Exception> shared_ptr; - - ExceptionHolder() throw() {} - ExceptionHolder(Exception* p) throw() : shared_ptr(p) {} - ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {} - - ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {} - ExceptionHolder(const std::exception& e); - - ~ExceptionHolder() throw() {} - - const char* what() const throw() { return get()->what(); } - std::string toString() const throw() { return get()->toString(); } - Exception* clone() const throw() { return get()->clone(); } - void throwIf() const { if (get()) get()->throwSelf(); } - void throwSelf() const { assert(get()); get()->throwSelf(); } -}; - -} // namespace qpid - - - -#endif /*!_qpid_ExceptionHolder_h*/ diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am deleted file mode 100644 index 355f764b6e..0000000000 --- a/cpp/lib/common/Makefile.am +++ /dev/null @@ -1,143 +0,0 @@ -AM_CXXFLAGS = $(WARNING_CFLAGS) - -INCLUDES = \ - -I$(top_srcdir)/gen \ - -I$(top_srcdir)/lib/common/sys \ - -I$(top_srcdir)/lib/common/framing \ - $(APR_CXXFLAGS) - -apr = sys/apr -apr_src = \ - $(apr)/APRAcceptor.cpp \ - $(apr)/APRBase.cpp \ - $(apr)/APRPool.cpp \ - $(apr)/APRSocket.cpp \ - $(apr)/LFProcessor.cpp \ - $(apr)/LFSessionContext.cpp \ - $(apr)/Socket.cpp \ - $(apr)/Thread.cpp -apr_hdr = \ - $(apr)/APRBase.h \ - $(apr)/APRPool.h \ - $(apr)/APRSocket.h \ - $(apr)/LFProcessor.h \ - $(apr)/LFSessionContext.h - -posix = sys/posix -posix_src = \ - $(posix)/PosixAcceptor.cpp \ - $(posix)/Socket.cpp \ - $(posix)/Thread.cpp \ - $(posix)/check.cpp \ - $(posix)/EventChannel.cpp \ - $(posix)/EventChannelThreads.cpp -posix_hdr = \ - $(posix)/check.h \ - $(posix)/EventChannel.h \ - $(posix)/EventChannelThreads.h - -EXTRA_DIST=$(posix_src) $(posix_hdr) -platform_src = $(apr_src) -platform_hdr = $(apr_hdr) - -framing = framing -gen = $(srcdir)/../../gen - -lib_LTLIBRARIES = libqpidcommon.la -libqpidcommon_la_LIBADD = \ - $(APR_LIBS) \ - $(LIB_DLOPEN) \ - $(LIB_CLOCK_GETTIME) - -libqpidcommon_la_LDFLAGS = \ - -version-info \ - $(LIBTOOL_VERSION_INFO_ARG) - -libqpidcommon_la_SOURCES = \ - $(platform_src) \ - $(framing)/AMQBody.cpp \ - $(framing)/AMQRequestBody.cpp \ - $(framing)/AMQResponseBody.cpp \ - $(framing)/AMQContentBody.cpp \ - $(framing)/AMQFrame.cpp \ - $(framing)/AMQHeaderBody.cpp \ - $(framing)/AMQHeartbeatBody.cpp \ - $(framing)/AMQMethodBody.cpp \ - $(framing)/MethodContext.cpp \ - $(framing)/BasicHeaderProperties.cpp \ - $(framing)/BodyHandler.cpp \ - $(framing)/ChannelAdapter.cpp \ - $(framing)/Buffer.cpp \ - $(framing)/FieldTable.cpp \ - $(framing)/FramingContent.cpp \ - $(framing)/InitiationHandler.cpp \ - $(framing)/ProtocolInitiation.cpp \ - $(framing)/ProtocolVersion.cpp \ - $(framing)/ProtocolVersionException.cpp \ - $(framing)/Requester.cpp \ - $(framing)/Responder.cpp \ - $(framing)/Value.cpp \ - $(framing)/Proxy.cpp \ - $(gen)/AMQP_ClientProxy.cpp \ - $(gen)/AMQP_HighestVersion.h \ - $(gen)/AMQP_MethodVersionMap.cpp \ - $(gen)/AMQP_ServerProxy.cpp \ - Exception.cpp \ - ExceptionHolder.cpp \ - QpidError.cpp \ - sys/Runnable.cpp \ - sys/Time.cpp \ - sys/ProducerConsumer.cpp - -nobase_pkginclude_HEADERS = \ - $(gen)/AMQP_HighestVersion.h \ - $(platform_hdr) \ - $(framing)/AMQBody.h \ - $(framing)/AMQContentBody.h \ - $(framing)/AMQDataBlock.h \ - $(framing)/AMQFrame.h \ - $(framing)/AMQHeaderBody.h \ - $(framing)/AMQHeartbeatBody.h \ - $(framing)/AMQMethodBody.h \ - $(framing)/MethodContext.h \ - $(framing)/BasicHeaderProperties.h \ - $(framing)/BodyHandler.h \ - $(framing)/ChannelAdapter.h \ - $(framing)/Buffer.h \ - $(framing)/FieldTable.h \ - $(framing)/FramingContent.h \ - $(framing)/HeaderProperties.h \ - $(framing)/InitiationHandler.h \ - $(framing)/InputHandler.h \ - $(framing)/OutputHandler.h \ - $(framing)/ProtocolInitiation.h \ - $(framing)/ProtocolVersion.h \ - $(framing)/ProtocolVersionException.h \ - $(framing)/Value.h \ - $(framing)/amqp_framing.h \ - $(framing)/amqp_types.h \ - $(framing)/Proxy.h \ - shared_ptr.h \ - Exception.h \ - ExceptionHolder.h \ - QpidError.h \ - SharedObject.h \ - sys/Acceptor.h \ - sys/AtomicCount.h \ - sys/Module.h \ - sys/Monitor.h \ - sys/Mutex.h \ - sys/Runnable.h \ - sys/ConnectionOutputHandler.h \ - sys/ConnectionInputHandler.h \ - sys/ConnectionInputHandlerFactory.h \ - sys/ShutdownHandler.h \ - sys/Socket.h \ - sys/Thread.h \ - sys/Time.h \ - sys/TimeoutHandler.h \ - sys/ProducerConsumer.h - - -# Force build during dist phase so help2man will work. -dist-hook: $(lib_LTLIBRARIES) diff --git a/cpp/lib/common/QpidError.cpp b/cpp/lib/common/QpidError.cpp deleted file mode 100644 index 9cd3051d1e..0000000000 --- a/cpp/lib/common/QpidError.cpp +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/format.hpp> - -#include <QpidError.h> -#include <sstream> - -using namespace qpid; - -QpidError::QpidError() : code(0) {} - -QpidError::~QpidError() throw() {} - -Exception* QpidError::clone() const throw() { return new QpidError(*this); } - -void QpidError::throwSelf() const { throw *this; } - -void QpidError::init() { - whatStr = boost::str(boost::format("Error [%d] %s (%s:%d)") - % code % msg % loc.file % loc.line); -} - - diff --git a/cpp/lib/common/QpidError.h b/cpp/lib/common/QpidError.h deleted file mode 100644 index 2a0395ab79..0000000000 --- a/cpp/lib/common/QpidError.h +++ /dev/null @@ -1,78 +0,0 @@ -#ifndef __QpidError__ -#define __QpidError__ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <memory> -#include <ostream> - -#include <Exception.h> - -namespace qpid { - -struct SrcLine { - public: - SrcLine(const std::string& file_="", int line_=0) : - file(file_), line(line_) {} - - std::string file; - int line; -}; - -class QpidError : public Exception -{ - public: - const int code; - SrcLine loc; - std::string msg; - - QpidError(); - - template <class T> - QpidError(int code_, const T& msg_, const SrcLine& loc_) throw() - : code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) - { init(); } - - ~QpidError() throw(); - Exception* clone() const throw(); - void throwSelf() const; - - private: - - void init(); -}; - - -} // namespace qpid - -#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__) - -#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE) - -#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE) - -const int PROTOCOL_ERROR = 10000; -const int APR_ERROR = 20000; -const int FRAMING_ERROR = 30000; -const int CLIENT_ERROR = 40000; -const int INTERNAL_ERROR = 50000; - -#endif diff --git a/cpp/lib/common/SharedObject.h b/cpp/lib/common/SharedObject.h deleted file mode 100644 index 852a036ab9..0000000000 --- a/cpp/lib/common/SharedObject.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef _SharedObject_ -#define _SharedObject_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/shared_ptr.hpp> -#include <boost/noncopyable.hpp> - -namespace qpid { - /** - * Template to enforce shared object conventions. - * Shared object classes should inherit : public qpid::SharedObject - * That ensures Foo: - * - has typedef boost::shared_ptr<T> shared_ptr - * - has virtual destructor - * - is boost::noncopyable (no default copy or assign) - * - has a protected default constructor. - * - * Shared objects should not have public constructors. - * Make constructors protected and provide public statc create() - * functions that return a shared_ptr. - */ - template <class T> - class SharedObject : private boost::noncopyable - { - public: - typedef boost::shared_ptr<T> shared_ptr; - - virtual ~SharedObject() {}; - - protected: - SharedObject() {} - }; -} - -#endif /*!_SharedObject_*/ diff --git a/cpp/lib/common/doxygen_mainpage.h b/cpp/lib/common/doxygen_mainpage.h deleted file mode 100644 index b354238cd0..0000000000 --- a/cpp/lib/common/doxygen_mainpage.h +++ /dev/null @@ -1,45 +0,0 @@ -// This header file is just for doxygen documentation purposes. - -/*!\mainpage Qpid C++ Developer Kit. - * - *\section intro_sec Introduction - * - * The <a href=http://incubator.apache.org/qpid/index.html>Qpid project</a> provides implementations of the <a href="http://amqp.org/">AMQP messaging specification</a> in several programming language. - * - * Qpidc provides APIs and libraries to implement AMQP - * clients in C++. Qpidc clients can interact with any compliant AMQP - * message broker. The Qpid project also provides an AMQP broker - * daemon called qpidd that you can use with your qpidc clients. - * - *\section install_sec Installation - * - * If you are installing from the source distribution - <pre> - > ./configure && make - > make install </pre> - * This will build and install the client development kit and the broker - * in standard places. Use - * <code>./configure --help</code> for more options. - * - * You can also install from RPMs with the <code>rpm -i</code> command. - * You will need - * - <code>qpidc</code> for core libraries. - * - <code>qpidc-devel</code> for header files and developer documentation. - * - <code>qpidd</code> for the broker daemon. - * - *\section getstart_sec Getting Started - * - * If you have installed in the standard places you should use - * these compile flags: - * - *<code> -I/usr/include/qpidc -I/usr/include/qpidc/framing -I/usr/include/qpidc/sys</code> - * - * and these link flags: - * - *<code> -lqpidcommon -lqpidclient</code> - * - * If you have installed somewhere else you should modify the flags - * appropriately. - * - * See the \ref clientapi "client API module" for more on the client API. - */ diff --git a/cpp/lib/common/framing/AMQBody.cpp b/cpp/lib/common/framing/AMQBody.cpp deleted file mode 100644 index c7c253beda..0000000000 --- a/cpp/lib/common/framing/AMQBody.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <AMQBody.h> -#include <iostream> - -std::ostream& qpid::framing::operator<<(std::ostream& out, const qpid::framing::AMQBody& body) -{ - body.print(out); - return out; -} - -qpid::framing::AMQBody::~AMQBody() {} - - diff --git a/cpp/lib/common/framing/AMQBody.h b/cpp/lib/common/framing/AMQBody.h deleted file mode 100644 index 26076956ca..0000000000 --- a/cpp/lib/common/framing/AMQBody.h +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/shared_ptr.hpp> -#include <amqp_types.h> -#include <Buffer.h> - -#ifndef _AMQBody_ -#define _AMQBody_ - -namespace qpid { - namespace framing { - - class AMQBody - { - public: - typedef boost::shared_ptr<AMQBody> shared_ptr; - - virtual ~AMQBody(); - virtual uint32_t size() const = 0; - virtual uint8_t type() const = 0; - virtual void encode(Buffer& buffer) const = 0; - virtual void decode(Buffer& buffer, uint32_t size) = 0; - - virtual void print(std::ostream& out) const = 0; - }; - - std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; - - enum BodyTypes { - METHOD_BODY = 1, - HEADER_BODY = 2, - CONTENT_BODY = 3, - HEARTBEAT_BODY = 8, - REQUEST_BODY = 9, - RESPONSE_BODY = 10 - }; - } -} - - -#endif diff --git a/cpp/lib/common/framing/AMQContentBody.cpp b/cpp/lib/common/framing/AMQContentBody.cpp deleted file mode 100644 index 573c17dade..0000000000 --- a/cpp/lib/common/framing/AMQContentBody.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <AMQContentBody.h> -#include <iostream> - -qpid::framing::AMQContentBody::AMQContentBody(){ -} - -qpid::framing::AMQContentBody::AMQContentBody(const string& _data) : data(_data){ -} - -uint32_t qpid::framing::AMQContentBody::size() const{ - return data.size(); -} -void qpid::framing::AMQContentBody::encode(Buffer& buffer) const{ - buffer.putRawData(data); -} -void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){ - buffer.getRawData(data, _size); -} - -void qpid::framing::AMQContentBody::print(std::ostream& out) const -{ - out << "content (" << size() << " bytes)"; -} diff --git a/cpp/lib/common/framing/AMQContentBody.h b/cpp/lib/common/framing/AMQContentBody.h deleted file mode 100644 index c9fa7cde5c..0000000000 --- a/cpp/lib/common/framing/AMQContentBody.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <amqp_types.h> -#include <AMQBody.h> -#include <Buffer.h> - -#ifndef _AMQContentBody_ -#define _AMQContentBody_ - -namespace qpid { -namespace framing { - -class AMQContentBody : public AMQBody -{ - string data; - -public: - typedef boost::shared_ptr<AMQContentBody> shared_ptr; - - AMQContentBody(); - AMQContentBody(const string& data); - inline virtual ~AMQContentBody(){} - inline uint8_t type() const { return CONTENT_BODY; }; - inline string& getData(){ return data; } - uint32_t size() const; - void encode(Buffer& buffer) const; - void decode(Buffer& buffer, uint32_t size); - void print(std::ostream& out) const; -}; - -} -} - - -#endif diff --git a/cpp/lib/common/framing/AMQDataBlock.h b/cpp/lib/common/framing/AMQDataBlock.h deleted file mode 100644 index 2a6f843f1e..0000000000 --- a/cpp/lib/common/framing/AMQDataBlock.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <Buffer.h> - -#ifndef _AMQDataBlock_ -#define _AMQDataBlock_ - -namespace qpid { -namespace framing { - -class AMQDataBlock -{ -public: - virtual ~AMQDataBlock() {} - virtual void encode(Buffer& buffer) = 0; - virtual bool decode(Buffer& buffer) = 0; - virtual uint32_t size() const = 0; -}; - -} -} - - -#endif diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp deleted file mode 100644 index bc9061b169..0000000000 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ /dev/null @@ -1,139 +0,0 @@ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/format.hpp> - -#include <AMQFrame.h> -#include <QpidError.h> -#include "AMQRequestBody.h" -#include "AMQResponseBody.h" - - -namespace qpid { -namespace framing { - - -AMQP_MethodVersionMap AMQFrame::versionMap; - -AMQFrame::AMQFrame(ProtocolVersion _version): -version(_version) - { - assert(version != ProtocolVersion(0,0)); - } - -AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : -version(_version), channel(_channel), body(_body) -{} - -AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) : -version(_version), channel(_channel), body(_body) -{} - -AMQFrame::~AMQFrame() {} - -uint16_t AMQFrame::getChannel(){ - return channel; -} - -AMQBody::shared_ptr AMQFrame::getBody(){ - return body; -} - -void AMQFrame::encode(Buffer& buffer) -{ - buffer.putOctet(body->type()); - buffer.putShort(channel); - buffer.putLong(body->size()); - body->encode(buffer); - buffer.putOctet(0xCE); -} - -uint32_t AMQFrame::size() const{ - assert(body.get()); - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() - + 1/*0xCE*/; -} - -bool AMQFrame::decode(Buffer& buffer) -{ - if(buffer.available() < 7) - return false; - buffer.record(); - uint32_t frameSize = decodeHead(buffer); - if(buffer.available() < frameSize + 1){ - buffer.restore(); - return false; - } - decodeBody(buffer, frameSize); - uint8_t end = buffer.getOctet(); - if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); - return true; -} - -uint32_t AMQFrame::decodeHead(Buffer& buffer){ - type = buffer.getOctet(); - channel = buffer.getShort(); - return buffer.getLong(); -} - -void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) -{ - switch(type) - { - case METHOD_BODY: - body = AMQMethodBody::create(versionMap, version, buffer); - break; - case REQUEST_BODY: - body = AMQRequestBody::create(versionMap, version, buffer); - break; - case RESPONSE_BODY: - body = AMQResponseBody::create(versionMap, version, buffer); - break; - case HEADER_BODY: - body = AMQBody::shared_ptr(new AMQHeaderBody()); - break; - case CONTENT_BODY: - body = AMQBody::shared_ptr(new AMQContentBody()); - break; - case HEARTBEAT_BODY: - body = AMQBody::shared_ptr(new AMQHeartbeatBody()); - break; - default: - THROW_QPID_ERROR( - FRAMING_ERROR, - boost::format("Unknown frame type %d") % type); - } - body->decode(buffer, size); -} - -std::ostream& operator<<(std::ostream& out, const AMQFrame& t) -{ - out << "Frame[channel=" << t.channel << "; "; - if (t.body.get() == 0) - out << "empty"; - else - out << *t.body; - out << "]"; - return out; -} - - -}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h deleted file mode 100644 index 0c18e0c2a5..0000000000 --- a/cpp/lib/common/framing/AMQFrame.h +++ /dev/null @@ -1,78 +0,0 @@ -#ifndef _AMQFrame_ -#define _AMQFrame_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/cast.hpp> - -#include <amqp_types.h> -#include <AMQBody.h> -#include <AMQDataBlock.h> -#include <AMQMethodBody.h> -#include <AMQHeaderBody.h> -#include <AMQContentBody.h> -#include <AMQHeartbeatBody.h> -#include <AMQP_MethodVersionMap.h> -#include <AMQP_HighestVersion.h> -#include <Buffer.h> - -namespace qpid { -namespace framing { - - -class AMQFrame : public AMQDataBlock -{ - public: - AMQFrame(ProtocolVersion _version = highestProtocolVersion); - AMQFrame(ProtocolVersion _version, uint16_t channel, AMQBody* body); - AMQFrame(ProtocolVersion _version, uint16_t channel, const AMQBody::shared_ptr& body); - virtual ~AMQFrame(); - virtual void encode(Buffer& buffer); - virtual bool decode(Buffer& buffer); - virtual uint32_t size() const; - uint16_t getChannel(); - AMQBody::shared_ptr getBody(); - - /** Convenience template to cast the body to an expected type */ - template <class T> boost::shared_ptr<T> castBody() { - assert(dynamic_cast<T*>(getBody().get())); - boost::static_pointer_cast<T>(getBody()); - } - - uint32_t decodeHead(Buffer& buffer); - void decodeBody(Buffer& buffer, uint32_t size); - - private: - static AMQP_MethodVersionMap versionMap; - ProtocolVersion version; - - uint16_t channel; - uint8_t type; - AMQBody::shared_ptr body; - - - friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body); -}; - -}} // namespace qpid::framing - - -#endif diff --git a/cpp/lib/common/framing/AMQHeaderBody.cpp b/cpp/lib/common/framing/AMQHeaderBody.cpp deleted file mode 100644 index 3ddae4eebf..0000000000 --- a/cpp/lib/common/framing/AMQHeaderBody.cpp +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <AMQHeaderBody.h> -#include <QpidError.h> -#include <BasicHeaderProperties.h> - -qpid::framing::AMQHeaderBody::AMQHeaderBody(int classId) : weight(0), contentSize(0){ - createProperties(classId); -} - -qpid::framing::AMQHeaderBody::AMQHeaderBody() : properties(0), weight(0), contentSize(0){ -} - -qpid::framing::AMQHeaderBody::~AMQHeaderBody(){ - delete properties; -} - -uint32_t qpid::framing::AMQHeaderBody::size() const{ - return 12 + properties->size(); -} - -void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const { - buffer.putShort(properties->classId()); - buffer.putShort(weight); - buffer.putLongLong(contentSize); - properties->encode(buffer); -} - -void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){ - uint16_t classId = buffer.getShort(); - weight = buffer.getShort(); - contentSize = buffer.getLongLong(); - createProperties(classId); - properties->decode(buffer, bufSize - 12); -} - -void qpid::framing::AMQHeaderBody::createProperties(int classId){ - switch(classId){ - case BASIC: - properties = new qpid::framing::BasicHeaderProperties(); - break; - default: - THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class"); - } -} - -void qpid::framing::AMQHeaderBody::print(std::ostream& out) const -{ - out << "header (" << size() << " bytes)" << " content_size=" << getContentSize(); - const BasicHeaderProperties* props = - dynamic_cast<const BasicHeaderProperties*>(getProperties()); - if (props) { - out << ", message_id=" << props->getMessageId(); - out << ", delivery_mode=" << (int) props->getDeliveryMode(); - out << ", headers=" << const_cast<BasicHeaderProperties*>(props)->getHeaders(); - } -} diff --git a/cpp/lib/common/framing/AMQHeaderBody.h b/cpp/lib/common/framing/AMQHeaderBody.h deleted file mode 100644 index d57f93aacd..0000000000 --- a/cpp/lib/common/framing/AMQHeaderBody.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <amqp_types.h> -#include <AMQBody.h> -#include <Buffer.h> -#include <HeaderProperties.h> - -#ifndef _AMQHeaderBody_ -#define _AMQHeaderBody_ - -namespace qpid { -namespace framing { - -class AMQHeaderBody : public AMQBody -{ - HeaderProperties* properties; - uint16_t weight; - uint64_t contentSize; - - void createProperties(int classId); -public: - typedef boost::shared_ptr<AMQHeaderBody> shared_ptr; - - AMQHeaderBody(int classId); - AMQHeaderBody(); - inline uint8_t type() const { return HEADER_BODY; } - HeaderProperties* getProperties(){ return properties; } - const HeaderProperties* getProperties() const { return properties; } - inline uint64_t getContentSize() const { return contentSize; } - inline void setContentSize(uint64_t _size) { contentSize = _size; } - virtual ~AMQHeaderBody(); - virtual uint32_t size() const; - virtual void encode(Buffer& buffer) const; - virtual void decode(Buffer& buffer, uint32_t size); - virtual void print(std::ostream& out) const; -}; - -} -} - - -#endif diff --git a/cpp/lib/common/framing/AMQHeartbeatBody.cpp b/cpp/lib/common/framing/AMQHeartbeatBody.cpp deleted file mode 100644 index 63f83a3d29..0000000000 --- a/cpp/lib/common/framing/AMQHeartbeatBody.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <AMQHeartbeatBody.h> -#include <iostream> - -qpid::framing::AMQHeartbeatBody::~AMQHeartbeatBody() {} - -void qpid::framing::AMQHeartbeatBody::print(std::ostream& out) const { - out << "heartbeat"; -} diff --git a/cpp/lib/common/framing/AMQHeartbeatBody.h b/cpp/lib/common/framing/AMQHeartbeatBody.h deleted file mode 100644 index a3e9d823f1..0000000000 --- a/cpp/lib/common/framing/AMQHeartbeatBody.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <amqp_types.h> -#include <AMQBody.h> -#include <Buffer.h> - -#ifndef _AMQHeartbeatBody_ -#define _AMQHeartbeatBody_ - -namespace qpid { -namespace framing { - -class AMQHeartbeatBody : public AMQBody -{ -public: - typedef boost::shared_ptr<AMQHeartbeatBody> shared_ptr; - - virtual ~AMQHeartbeatBody(); - inline uint32_t size() const { return 0; } - inline uint8_t type() const { return HEARTBEAT_BODY; } - inline void encode(Buffer& ) const {} - inline void decode(Buffer& , uint32_t /*size*/) {} - virtual void print(std::ostream& out) const; -}; - -} -} - -#endif diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp deleted file mode 100644 index 23502068f5..0000000000 --- a/cpp/lib/common/framing/AMQMethodBody.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <AMQFrame.h> -#include <AMQMethodBody.h> -#include <QpidError.h> -#include "AMQP_MethodVersionMap.h" - -namespace qpid { -namespace framing { - -void AMQMethodBody::encodeId(Buffer& buffer) const{ - buffer.putShort(amqpClassId()); - buffer.putShort(amqpMethodId()); -} - -void AMQMethodBody::invoke(AMQP_ServerOperations&, const MethodContext&){ - assert(0); - THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); -} - -AMQMethodBody::shared_ptr AMQMethodBody::create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer) -{ - ClassMethodId id; - id.decode(buffer); - return AMQMethodBody::shared_ptr( - versionMap.createMethodBody( - id.classId, id.methodId, version.getMajor(), version.getMinor())); -} - -void AMQMethodBody::ClassMethodId::decode(Buffer& buffer) { - classId = buffer.getShort(); - methodId = buffer.getShort(); -} - -void AMQMethodBody::decode(Buffer& buffer, uint32_t /*size*/) { - decodeContent(buffer); -} - -}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h deleted file mode 100644 index c2b00c2169..0000000000 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ /dev/null @@ -1,84 +0,0 @@ -#ifndef _AMQMethodBody_ -#define _AMQMethodBody_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <amqp_types.h> -#include <AMQBody.h> -#include <Buffer.h> -#include <AMQP_ServerOperations.h> -#include <MethodContext.h> - -namespace qpid { -namespace framing { - -class AMQP_MethodVersionMap; - -class AMQMethodBody : public AMQBody -{ - public: - typedef boost::shared_ptr<AMQMethodBody> shared_ptr; - - static shared_ptr create( - AMQP_MethodVersionMap& map, ProtocolVersion version, Buffer& buf); - - ProtocolVersion version; - uint8_t type() const { return METHOD_BODY; } - AMQMethodBody(uint8_t major, uint8_t minor) : version(major, minor) {} - AMQMethodBody(ProtocolVersion ver) : version(ver) {} - virtual ~AMQMethodBody() {} - void decode(Buffer&, uint32_t); - - virtual MethodId amqpMethodId() const = 0; - virtual ClassId amqpClassId() const = 0; - - virtual void invoke(AMQP_ServerOperations&, const MethodContext&); - - template <class T> bool isA() { - return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID; - } - - /** Return request ID or response correlationID */ - virtual RequestId getRequestId() const { return 0; } - - virtual bool isRequest() const { return false; } - virtual bool isResponse() const { return false; } - - protected: - static uint32_t baseSize() { return 4; } - - struct ClassMethodId { - uint16_t classId; - uint16_t methodId; - void decode(Buffer& b); - }; - - void encodeId(Buffer& buffer) const; - virtual void encodeContent(Buffer& buffer) const = 0; - virtual void decodeContent(Buffer& buffer) = 0; -}; - - -}} // namespace qpid::framing - - -#endif diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp deleted file mode 100644 index 54e1c11863..0000000000 --- a/cpp/lib/common/framing/AMQRequestBody.cpp +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "AMQRequestBody.h" -#include "AMQP_MethodVersionMap.h" - -namespace qpid { -namespace framing { - -void AMQRequestBody::Data::encode(Buffer& buffer) const { - buffer.putLongLong(requestId); - buffer.putLongLong(responseMark); - buffer.putLong(0); // Reserved long in spec. -} - -void AMQRequestBody::Data::decode(Buffer& buffer) { - requestId = buffer.getLongLong(); - responseMark = buffer.getLongLong(); - buffer.getLong(); // Ignore reserved long. -} - -void AMQRequestBody::encode(Buffer& buffer) const { - data.encode(buffer); - encodeId(buffer); - encodeContent(buffer); -} - -AMQRequestBody::shared_ptr -AMQRequestBody::create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer) -{ - ClassMethodId id; - Data data; - data.decode(buffer); - id.decode(buffer); - AMQRequestBody* body = dynamic_cast<AMQRequestBody*>( - versionMap.createMethodBody( - id.classId, id.methodId, version.getMajor(), version.getMinor())); - assert(body); - body->data = data; - return AMQRequestBody::shared_ptr(body); -} - -void AMQRequestBody::printPrefix(std::ostream& out) const { - out << "request(id=" << data.requestId << ",mark=" - << data.responseMark << "): "; -} - -}} // namespace qpid::framing - diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h deleted file mode 100644 index f21659a57a..0000000000 --- a/cpp/lib/common/framing/AMQRequestBody.h +++ /dev/null @@ -1,78 +0,0 @@ -#ifndef _framing_AMQRequestBody_h -#define _framing_AMQRequestBody_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "AMQMethodBody.h" - -namespace qpid { -namespace framing { - -/** - * Body of a request method frame. - */ -class AMQRequestBody : public AMQMethodBody -{ - public: - typedef boost::shared_ptr<AMQRequestBody> shared_ptr; - - struct Data { - Data(RequestId id=0, ResponseId mark=0) - : requestId(id), responseMark(mark) {} - void encode(Buffer&) const; - void decode(Buffer&); - - RequestId requestId; - ResponseId responseMark; - }; - - static Data& getData(const AMQBody::shared_ptr& body) { - return boost::dynamic_pointer_cast<AMQRequestBody>(body)->getData(); - } - - static shared_ptr create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer); - - AMQRequestBody(ProtocolVersion v, RequestId id=0, ResponseId mark=0) - : AMQMethodBody(v), data(id, mark) {} - - uint8_t type() const { return REQUEST_BODY; } - void encode(Buffer& buffer) const; - - Data& getData() { return data; } - RequestId getRequestId() const { return data.requestId; } - ResponseId getResponseMark() const { return data.responseMark; } - void setRequestId(RequestId id) { data.requestId=id; } - void setResponseMark(ResponseId mark) { data.responseMark=mark; } - - bool isRequest()const { return true; } - static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; } - protected: - void printPrefix(std::ostream& out) const; - - private: - Data data; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_AMQRequestBody_h*/ diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp deleted file mode 100644 index 7da71a5d25..0000000000 --- a/cpp/lib/common/framing/AMQResponseBody.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "AMQFrame.h" -#include "AMQResponseBody.h" -#include "AMQP_MethodVersionMap.h" - -namespace qpid { -namespace framing { - -void AMQResponseBody::Data::encode(Buffer& buffer) const { - buffer.putLongLong(responseId); - buffer.putLongLong(requestId); - buffer.putLong(batchOffset); -} - -void AMQResponseBody::Data::decode(Buffer& buffer) { - responseId = buffer.getLongLong(); - requestId = buffer.getLongLong(); - batchOffset = buffer.getLong(); -} - -void AMQResponseBody::encode(Buffer& buffer) const { - data.encode(buffer); - encodeId(buffer); - encodeContent(buffer); -} - -AMQResponseBody::shared_ptr AMQResponseBody::create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer) -{ - ClassMethodId id; - Data data; - data.decode(buffer); - id.decode(buffer); - AMQResponseBody* body = dynamic_cast<AMQResponseBody*>( - versionMap.createMethodBody( - id.classId, id.methodId, version.getMajor(), version.getMinor())); - assert(body); - body->data = data; - return AMQResponseBody::shared_ptr(body); -} - -void AMQResponseBody::printPrefix(std::ostream& out) const { - out << "response(id=" << data.responseId << ",request=" << data.requestId - << ",batch=" << data.batchOffset << "): "; -} - -}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h deleted file mode 100644 index fa381baddd..0000000000 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ /dev/null @@ -1,85 +0,0 @@ -#ifndef _framing_AMQResponseBody_h -#define _framing_AMQResponseBody_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "AMQMethodBody.h" - -namespace qpid { -namespace framing { - -class AMQP_MethodVersionMap; - -/** - * Body of a response method frame. - */ -class AMQResponseBody : public AMQMethodBody -{ - - public: - typedef boost::shared_ptr<AMQResponseBody> shared_ptr; - - struct Data { - Data(ResponseId id=0, RequestId req=0, BatchOffset off=0) - : responseId(id), requestId(req), batchOffset(off) {} - void encode(Buffer&) const; - void decode(Buffer&); - - uint64_t responseId; - uint64_t requestId; - uint32_t batchOffset; - }; - - static Data& getData(const AMQBody::shared_ptr& body) { - return boost::dynamic_pointer_cast<AMQResponseBody>(body)->getData(); - } - - static shared_ptr create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer); - - AMQResponseBody( - ProtocolVersion v, ResponseId id=0, RequestId req=0, BatchOffset off=0) - : AMQMethodBody(v), data(id, req, off) {} - - uint8_t type() const { return RESPONSE_BODY; } - void encode(Buffer& buffer) const; - - Data& getData() { return data; } - ResponseId getResponseId() const { return data.responseId; } - RequestId getRequestId() const { return data.requestId; } - BatchOffset getBatchOffset() const { return data.batchOffset; } - void setResponseId(ResponseId id) { data.responseId = id; } - void setRequestId(RequestId id) { data.requestId = id; } - void setBatchOffset(BatchOffset id) { data.batchOffset = id; } - - bool isResponse() const { return true; } - protected: - static const uint32_t baseSize() { return AMQMethodBody::baseSize()+20; } - void printPrefix(std::ostream& out) const; - - private: - Data data; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_AMQResponseBody_h*/ diff --git a/cpp/lib/common/framing/BasicHeaderProperties.cpp b/cpp/lib/common/framing/BasicHeaderProperties.cpp deleted file mode 100644 index d815d1e62f..0000000000 --- a/cpp/lib/common/framing/BasicHeaderProperties.cpp +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <BasicHeaderProperties.h> - -//TODO: This could be easily generated from the spec - -qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){} -qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){} - -uint32_t qpid::framing::BasicHeaderProperties::size() const{ - uint32_t bytes = 2;//flags - if(contentType.length() > 0) bytes += contentType.length() + 1; - if(contentEncoding.length() > 0) bytes += contentEncoding.length() + 1; - if(headers.count() > 0) bytes += headers.size(); - if(deliveryMode != 0) bytes += 1; - if(priority != 0) bytes += 1; - if(correlationId.length() > 0) bytes += correlationId.length() + 1; - if(replyTo.length() > 0) bytes += replyTo.length() + 1; - if(expiration.length() > 0) bytes += expiration.length() + 1; - if(messageId.length() > 0) bytes += messageId.length() + 1; - if(timestamp != 0) bytes += 8; - if(type.length() > 0) bytes += type.length() + 1; - if(userId.length() > 0) bytes += userId.length() + 1; - if(appId.length() > 0) bytes += appId.length() + 1; - if(clusterId.length() > 0) bytes += clusterId.length() + 1; - - return bytes; -} - -void qpid::framing::BasicHeaderProperties::encode(qpid::framing::Buffer& buffer) const{ - uint16_t flags = getFlags(); - buffer.putShort(flags); - - if(contentType.length() > 0) buffer.putShortString(contentType); - if(contentEncoding.length() > 0) buffer.putShortString(contentEncoding); - if(headers.count() > 0) buffer.putFieldTable(headers); - if(deliveryMode != 0) buffer.putOctet(deliveryMode); - if(priority != 0) buffer.putOctet(priority); - if(correlationId.length() > 0) buffer.putShortString(correlationId); - if(replyTo.length() > 0) buffer.putShortString(replyTo); - if(expiration.length() > 0) buffer.putShortString(expiration); - if(messageId.length() > 0) buffer.putShortString(messageId); - if(timestamp != 0) buffer.putLongLong(timestamp);; - if(type.length() > 0) buffer.putShortString(type); - if(userId.length() > 0) buffer.putShortString(userId); - if(appId.length() > 0) buffer.putShortString(appId); - if(clusterId.length() > 0) buffer.putShortString(clusterId); -} - -void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, uint32_t /*size*/){ - uint16_t flags = buffer.getShort(); - if(flags & (1 << 15)) buffer.getShortString(contentType); - if(flags & (1 << 14)) buffer.getShortString(contentEncoding); - if(flags & (1 << 13)) buffer.getFieldTable(headers); - if(flags & (1 << 12)) deliveryMode = DeliveryMode(buffer.getOctet()); - if(flags & (1 << 11)) priority = buffer.getOctet(); - if(flags & (1 << 10)) buffer.getShortString(correlationId); - if(flags & (1 << 9)) buffer.getShortString(replyTo); - if(flags & (1 << 8)) buffer.getShortString(expiration); - if(flags & (1 << 7)) buffer.getShortString(messageId); - if(flags & (1 << 6)) timestamp = buffer.getLongLong(); - if(flags & (1 << 5)) buffer.getShortString(type); - if(flags & (1 << 4)) buffer.getShortString(userId); - if(flags & (1 << 3)) buffer.getShortString(appId); - if(flags & (1 << 2)) buffer.getShortString(clusterId); -} - -uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{ - uint16_t flags(0); - if(contentType.length() > 0) flags |= (1 << 15); - if(contentEncoding.length() > 0) flags |= (1 << 14); - if(headers.count() > 0) flags |= (1 << 13); - if(deliveryMode != 0) flags |= (1 << 12); - if(priority != 0) flags |= (1 << 11); - if(correlationId.length() > 0) flags |= (1 << 10); - if(replyTo.length() > 0) flags |= (1 << 9); - if(expiration.length() > 0) flags |= (1 << 8); - if(messageId.length() > 0) flags |= (1 << 7); - if(timestamp != 0) flags |= (1 << 6); - if(type.length() > 0) flags |= (1 << 5); - if(userId.length() > 0) flags |= (1 << 4); - if(appId.length() > 0) flags |= (1 << 3); - if(clusterId.length() > 0) flags |= (1 << 2); - return flags; -} diff --git a/cpp/lib/common/framing/BasicHeaderProperties.h b/cpp/lib/common/framing/BasicHeaderProperties.h deleted file mode 100644 index 248014aefb..0000000000 --- a/cpp/lib/common/framing/BasicHeaderProperties.h +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <amqp_types.h> -#include <Buffer.h> -#include <FieldTable.h> -#include <HeaderProperties.h> - -#ifndef _BasicHeaderProperties_ -#define _BasicHeaderProperties_ - -namespace qpid { -namespace framing { - -enum DeliveryMode { TRANSIENT = 1, PERSISTENT = 2}; - -class BasicHeaderProperties : public HeaderProperties -{ - string contentType; - string contentEncoding; - FieldTable headers; - DeliveryMode deliveryMode; - uint8_t priority; - string correlationId; - string replyTo; - string expiration; - string messageId; - uint64_t timestamp; - string type; - string userId; - string appId; - string clusterId; - - uint16_t getFlags() const; - - public: - BasicHeaderProperties(); - virtual ~BasicHeaderProperties(); - virtual uint32_t size() const; - virtual void encode(Buffer& buffer) const; - virtual void decode(Buffer& buffer, uint32_t size); - - virtual uint8_t classId() { return BASIC; } - - string getContentType() const { return contentType; } - string getContentEncoding() const { return contentEncoding; } - FieldTable& getHeaders() { return headers; } - const FieldTable& getHeaders() const { return headers; } - DeliveryMode getDeliveryMode() const { return deliveryMode; } - uint8_t getPriority() const { return priority; } - string getCorrelationId() const {return correlationId; } - string getReplyTo() const { return replyTo; } - string getExpiration() const { return expiration; } - string getMessageId() const {return messageId; } - uint64_t getTimestamp() const { return timestamp; } - string getType() const { return type; } - string getUserId() const { return userId; } - string getAppId() const { return appId; } - string getClusterId() const { return clusterId; } - - void setContentType(const string& _type){ contentType = _type; } - void setContentEncoding(const string& encoding){ contentEncoding = encoding; } - void setHeaders(const FieldTable& _headers){ headers = _headers; } - void setDeliveryMode(DeliveryMode mode){ deliveryMode = mode; } - void setPriority(uint8_t _priority){ priority = _priority; } - void setCorrelationId(const string& _correlationId){ correlationId = _correlationId; } - void setReplyTo(const string& _replyTo){ replyTo = _replyTo;} - void setExpiration(const string& _expiration){ expiration = _expiration; } - void setMessageId(const string& _messageId){ messageId = _messageId; } - void setTimestamp(uint64_t _timestamp){ timestamp = _timestamp; } - void setType(const string& _type){ type = _type; } - void setUserId(const string& _userId){ userId = _userId; } - void setAppId(const string& _appId){appId = _appId; } - void setClusterId(const string& _clusterId){ clusterId = _clusterId; } - - /** \internal - * Template to copy between types like BasicHeaderProperties. - */ - template <class T, class U> - static void copy(T& to, const U& from) { - to.setContentType(from.getContentType()); - to.setContentEncoding(from.getContentEncoding()); - to.setHeaders(from.getHeaders()); - to.setDeliveryMode(from.getDeliveryMode()); - to.setPriority(from.getPriority()); - to.setCorrelationId(from.getCorrelationId()); - to.setReplyTo(from.getReplyTo()); - to.setExpiration(from.getExpiration()); - to.setMessageId(from.getMessageId()); - to.setTimestamp(from.getTimestamp()); - to.setType(from.getType()); - to.setUserId(from.getUserId()); - to.setAppId(from.getAppId()); - to.setClusterId(from.getClusterId()); - } -}; -}} -#endif diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp deleted file mode 100644 index 5dd0c0c23d..0000000000 --- a/cpp/lib/common/framing/BodyHandler.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "QpidError.h" -#include "BodyHandler.h" -#include <AMQRequestBody.h> -#include <AMQResponseBody.h> -#include <AMQMethodBody.h> -#include <AMQHeaderBody.h> -#include <AMQContentBody.h> -#include <AMQHeartbeatBody.h> - -using namespace qpid::framing; -using namespace boost; - -BodyHandler::~BodyHandler() {} - -void BodyHandler::handleBody(shared_ptr<AMQBody> body) { - switch(body->type()) - { - case REQUEST_BODY: - handleRequest(shared_polymorphic_cast<AMQRequestBody>(body)); - break; - case RESPONSE_BODY: - handleResponse(shared_polymorphic_cast<AMQResponseBody>(body)); - break; - case METHOD_BODY: - handleMethod(shared_polymorphic_cast<AMQMethodBody>(body)); - break; - case HEADER_BODY: - handleHeader(shared_polymorphic_cast<AMQHeaderBody>(body)); - break; - case CONTENT_BODY: - handleContent(shared_polymorphic_cast<AMQContentBody>(body)); - break; - case HEARTBEAT_BODY: - handleHeartbeat(shared_polymorphic_cast<AMQHeartbeatBody>(body)); - break; - default: - QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type()); - } -} - diff --git a/cpp/lib/common/framing/BodyHandler.h b/cpp/lib/common/framing/BodyHandler.h deleted file mode 100644 index cb3f0997b0..0000000000 --- a/cpp/lib/common/framing/BodyHandler.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef _BodyHandler_ -#define _BodyHandler_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/shared_ptr.hpp> - -#include "Requester.h" -#include "Responder.h" - -namespace qpid { -namespace framing { - -class AMQRequestBody; -class AMQResponseBody; -class AMQMethodBody; -class AMQHeaderBody; -class AMQContentBody; -class AMQHeartbeatBody; - -/** - * Interface to handle incoming frame bodies. - * Derived classes provide logic for each frame type. - */ -class BodyHandler { - public: - virtual ~BodyHandler(); - virtual void handleBody(boost::shared_ptr<AMQBody> body); - - protected: - virtual void handleRequest(boost::shared_ptr<AMQRequestBody>) = 0; - virtual void handleResponse(boost::shared_ptr<AMQResponseBody>) = 0; - virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0; - virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0; - virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0; - virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0; -}; - -}} - - -#endif diff --git a/cpp/lib/common/framing/Buffer.cpp b/cpp/lib/common/framing/Buffer.cpp deleted file mode 100644 index 52c9a42d55..0000000000 --- a/cpp/lib/common/framing/Buffer.cpp +++ /dev/null @@ -1,183 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <Buffer.h> -#include <FramingContent.h> -#include <FieldTable.h> - -qpid::framing::Buffer::Buffer(uint32_t _size) : size(_size), owner(true), position(0), limit(_size){ - data = new char[size]; -} - -qpid::framing::Buffer::Buffer(char* _data, uint32_t _size) : size(_size), owner(false), data(_data), position(0), limit(_size){ -} - -qpid::framing::Buffer::~Buffer(){ - if(owner) delete[] data; -} - -void qpid::framing::Buffer::flip(){ - limit = position; - position = 0; -} - -void qpid::framing::Buffer::clear(){ - limit = size; - position = 0; -} - -void qpid::framing::Buffer::compact(){ - uint32_t p = limit - position; - //copy p chars from position to 0 - memmove(data, data + position, p); - limit = size; - position = p; -} - -void qpid::framing::Buffer::record(){ - r_position = position; - r_limit = limit; -} - -void qpid::framing::Buffer::restore(){ - position = r_position; - limit = r_limit; -} - -uint32_t qpid::framing::Buffer::available(){ - return limit - position; -} - -char* qpid::framing::Buffer::start(){ - return data + position; -} - -void qpid::framing::Buffer::move(uint32_t bytes){ - position += bytes; -} - -void qpid::framing::Buffer::putOctet(uint8_t i){ - data[position++] = i; -} - -void qpid::framing::Buffer::putShort(uint16_t i){ - uint16_t b = i; - data[position++] = (uint8_t) (0xFF & (b >> 8)); - data[position++] = (uint8_t) (0xFF & b); -} - -void qpid::framing::Buffer::putLong(uint32_t i){ - uint32_t b = i; - data[position++] = (uint8_t) (0xFF & (b >> 24)); - data[position++] = (uint8_t) (0xFF & (b >> 16)); - data[position++] = (uint8_t) (0xFF & (b >> 8)); - data[position++] = (uint8_t) (0xFF & b); -} - -void qpid::framing::Buffer::putLongLong(uint64_t i){ - uint32_t hi = i >> 32; - uint32_t lo = i; - putLong(hi); - putLong(lo); -} - -uint8_t qpid::framing::Buffer::getOctet(){ - return (uint8_t) data[position++]; -} - -uint16_t qpid::framing::Buffer::getShort(){ - uint16_t hi = (unsigned char) data[position++]; - hi = hi << 8; - hi |= (unsigned char) data[position++]; - return hi; -} - -uint32_t qpid::framing::Buffer::getLong(){ - uint32_t a = (unsigned char) data[position++]; - uint32_t b = (unsigned char) data[position++]; - uint32_t c = (unsigned char) data[position++]; - uint32_t d = (unsigned char) data[position++]; - a = a << 24; - a |= b << 16; - a |= c << 8; - a |= d; - return a; -} - -uint64_t qpid::framing::Buffer::getLongLong(){ - uint64_t hi = getLong(); - uint64_t lo = getLong(); - hi = hi << 32; - return hi | lo; -} - - -void qpid::framing::Buffer::putShortString(const string& s){ - uint8_t len = s.length(); - putOctet(len); - s.copy(data + position, len); - position += len; -} - -void qpid::framing::Buffer::putLongString(const string& s){ - uint32_t len = s.length(); - putLong(len); - s.copy(data + position, len); - position += len; -} - -void qpid::framing::Buffer::getShortString(string& s){ - uint8_t len = getOctet(); - s.assign(data + position, len); - position += len; -} - -void qpid::framing::Buffer::getLongString(string& s){ - uint32_t len = getLong(); - s.assign(data + position, len); - position += len; -} - -void qpid::framing::Buffer::putFieldTable(const FieldTable& t){ - t.encode(*this); -} - -void qpid::framing::Buffer::getFieldTable(FieldTable& t){ - t.decode(*this); -} - -void qpid::framing::Buffer::putContent(const Content& c){ - c.encode(*this); -} - -void qpid::framing::Buffer::getContent(Content& c){ - c.decode(*this); -} - -void qpid::framing::Buffer::putRawData(const string& s){ - uint32_t len = s.length(); - s.copy(data + position, len); - position += len; -} - -void qpid::framing::Buffer::getRawData(string& s, uint32_t len){ - s.assign(data + position, len); - position += len; -} diff --git a/cpp/lib/common/framing/Buffer.h b/cpp/lib/common/framing/Buffer.h deleted file mode 100644 index 63a15c7c3d..0000000000 --- a/cpp/lib/common/framing/Buffer.h +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <amqp_types.h> - -#ifndef _Buffer_ -#define _Buffer_ - -namespace qpid { -namespace framing { - -class Content; -class FieldTable; - -class Buffer -{ - const uint32_t size; - const bool owner;//indicates whether the data is owned by this instance - char* data; - uint32_t position; - uint32_t limit; - uint32_t r_position; - uint32_t r_limit; - -public: - - Buffer(uint32_t size); - Buffer(char* data, uint32_t size); - ~Buffer(); - - void flip(); - void clear(); - void compact(); - void record(); - void restore(); - uint32_t available(); - char* start(); - void move(uint32_t bytes); - - void putOctet(uint8_t i); - void putShort(uint16_t i); - void putLong(uint32_t i); - void putLongLong(uint64_t i); - - uint8_t getOctet(); - uint16_t getShort(); - uint32_t getLong(); - uint64_t getLongLong(); - - void putShortString(const string& s); - void putLongString(const string& s); - void getShortString(string& s); - void getLongString(string& s); - - void putFieldTable(const FieldTable& t); - void getFieldTable(FieldTable& t); - - void putContent(const Content& c); - void getContent(Content& c); - - void putRawData(const string& s); - void getRawData(string& s, uint32_t size); - -}; - -}} // namespace qpid::framing - - -#endif diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp deleted file mode 100644 index 8a1ff39ee5..0000000000 --- a/cpp/lib/common/framing/ChannelAdapter.cpp +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <boost/format.hpp> - -#include "ChannelAdapter.h" -#include "AMQFrame.h" -#include "Exception.h" - -using boost::format; - -namespace qpid { -namespace framing { - -void ChannelAdapter::init( - ChannelId i, OutputHandler& o, ProtocolVersion v) -{ - assertChannelNotOpen(); - id = i; - out = &o; - version = v; -} - -RequestId ChannelAdapter::send(AMQBody::shared_ptr body) { - RequestId result = 0; - assertChannelOpen(); - switch (body->type()) { - case REQUEST_BODY: { - AMQRequestBody::shared_ptr request = - boost::shared_polymorphic_downcast<AMQRequestBody>(body); - requester.sending(request->getData()); - result = request->getData().requestId; - break; - } - case RESPONSE_BODY: { - AMQResponseBody::shared_ptr response = - boost::shared_polymorphic_downcast<AMQResponseBody>(body); - responder.sending(response->getData()); - break; - } - } - out->send(new AMQFrame(getVersion(), getId(), body)); - return result; -} - -void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { - assertMethodOk(*request); - AMQRequestBody::Data& requestData = request->getData(); - responder.received(requestData); - handleMethodInContext(request, MethodContext(this, request)); -} - -void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { - assertMethodOk(*response); - // TODO aconway 2007-01-30: Consider a response handled on receipt. - // Review - any cases where this is not the case? - AMQResponseBody::Data& responseData = response->getData(); - requester.processed(responseData); - handleMethod(response); -} - -void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) { - assertMethodOk(*method); - handleMethodInContext(method, MethodContext(this, method)); -} - -void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const { - if (getId() != 0 && method.amqpClassId() == ConnectionOpenBody::CLASS_ID) - throw ConnectionException( - 504, format("Connection method on non-0 channel %d.")%getId()); -} - -void ChannelAdapter::assertChannelOpen() const { - if (getId() != 0 && !isOpen()) - throw ConnectionException( - 504, format("Channel %d is not open.")%getId()); -} - -void ChannelAdapter::assertChannelNotOpen() const { - if (getId() != 0 && isOpen()) - throw ConnectionException( - 504, format("Channel %d is already open.") % getId()); -} - -}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h deleted file mode 100644 index f6e3986eed..0000000000 --- a/cpp/lib/common/framing/ChannelAdapter.h +++ /dev/null @@ -1,105 +0,0 @@ -#ifndef _ChannelAdapter_ -#define _ChannelAdapter_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/shared_ptr.hpp> - -#include "BodyHandler.h" -#include "Requester.h" -#include "Responder.h" -#include "framing/amqp_types.h" - -namespace qpid { -namespace framing { - -class MethodContext; - -// FIXME aconway 2007-02-20: Rename as ChannelBase or just Channel. - -/** - * Base class for client and broker channels. - * - * - receives frame bodies from the network. - * - Updates request/response data. - * - Dispatches requests with a MethodContext for responses. - * - * send() - * - Updates request/resposne ID data. - * - Forwards frame to the peer. - * - * Thread safety: OBJECT UNSAFE. Instances must not be called - * concurrently. AMQP defines channels to be serialized. - */ -class ChannelAdapter : public BodyHandler { - public: - /** - *@param output Processed frames are forwarded to this handler. - */ - ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0, - ProtocolVersion ver=ProtocolVersion()) - : id(id_), out(out_), version(ver) {} - - /** Initialize the channel adapter. */ - void init(ChannelId, OutputHandler&, ProtocolVersion); - - ChannelId getId() const { return id; } - ProtocolVersion getVersion() const { return version; } - - /** - * Wrap body in a frame and send the frame. - * Takes ownership of body. - */ - RequestId send(AMQBody::shared_ptr body); - RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); } - - void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); - void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>); - void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>); - - virtual bool isOpen() const = 0; - - protected: - void assertMethodOk(AMQMethodBody& method) const; - void assertChannelOpen() const; - void assertChannelNotOpen() const; - - virtual void handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context) = 0; - - RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } - RequestId getLastAckRequest() { return requester.getLastAckRequest(); } - RequestId getNextSendRequestId() { return requester.getNextId(); } - - private: - ChannelId id; - OutputHandler* out; - ProtocolVersion version; - Requester requester; - Responder responder; -}; - -}} - - -#endif diff --git a/cpp/lib/common/framing/FieldTable.cpp b/cpp/lib/common/framing/FieldTable.cpp deleted file mode 100644 index 5bbc4651d3..0000000000 --- a/cpp/lib/common/framing/FieldTable.cpp +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <FieldTable.h> -#include <QpidError.h> -#include <Buffer.h> -#include <Value.h> -#include <assert.h> - -namespace qpid { -namespace framing { - -FieldTable::~FieldTable() {} - -uint32_t FieldTable::size() const { - uint32_t len(4); - for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { - // 2 = shortstr_len_byyte + type_char_byte - len += 2 + (i->first).size() + (i->second)->size(); - } - return len; -} - -int FieldTable::count() const { - return values.size(); -} - -namespace -{ -std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_type& i) { - return out << i.first << ":" << *i.second; -} -} - -std::ostream& operator<<(std::ostream& out, const FieldTable& t) { - out << "{"; - FieldTable::ValueMap::const_iterator i = t.getMap().begin(); - if (i != t.getMap().end()) out << *i++; - while (i != t.getMap().end()) - { - out << "," << *i++; - } - return out << "}"; -} - -void FieldTable::setString(const std::string& name, const std::string& value){ - values[name] = ValuePtr(new StringValue(value)); -} - -void FieldTable::setInt(const std::string& name, int value){ - values[name] = ValuePtr(new IntegerValue(value)); -} - -void FieldTable::setTimestamp(const std::string& name, uint64_t value){ - values[name] = ValuePtr(new TimeValue(value)); -} - -void FieldTable::setTable(const std::string& name, const FieldTable& value){ - values[name] = ValuePtr(new FieldTableValue(value)); -} - -namespace { -template <class T> T default_value() { return T(); } -template <> int default_value<int>() { return 0; } -template <> uint64_t default_value<uint64_t>() { return 0; } -} - -template <class T> -T FieldTable::getValue(const std::string& name) const -{ - ValueMap::const_iterator i = values.find(name); - if (i == values.end()) return default_value<T>(); - const ValueOps<T> *vt = dynamic_cast<const ValueOps<T>*>(i->second.get()); - return vt->getValue(); -} - -std::string FieldTable::getString(const std::string& name) const { - return getValue<std::string>(name); -} - -int FieldTable::getInt(const std::string& name) const { - return getValue<int>(name); -} - -uint64_t FieldTable::getTimestamp(const std::string& name) const { - return getValue<uint64_t>(name); -} - -void FieldTable::getTable(const std::string& name, FieldTable& value) const { - value = getValue<FieldTable>(name); -} - -void FieldTable::encode(Buffer& buffer) const{ - buffer.putLong(size() - 4); - for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) { - buffer.putShortString(i->first); - buffer.putOctet(i->second->getType()); - i->second->encode(buffer); - } -} - -void FieldTable::decode(Buffer& buffer){ - uint32_t len = buffer.getLong(); - uint32_t available = buffer.available(); - if (available < len) - THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for field table."); - uint32_t leftover = available - len; - while(buffer.available() > leftover){ - std::string name; - buffer.getShortString(name); - std::auto_ptr<Value> value(Value::decode_value(buffer)); - values[name] = ValuePtr(value.release()); - } -} - - -bool FieldTable::operator==(const FieldTable& x) const { - if (values.size() != x.values.size()) return false; - for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { - ValueMap::const_iterator j = x.values.find(i->first); - if (j == x.values.end()) return false; - if (*(i->second) != *(j->second)) return false; - } - return true; -} - -void FieldTable::erase(const std::string& name) -{ - values.erase(values.find(name)); -} - -} -} diff --git a/cpp/lib/common/framing/FieldTable.h b/cpp/lib/common/framing/FieldTable.h deleted file mode 100644 index e25a7d3f8c..0000000000 --- a/cpp/lib/common/framing/FieldTable.h +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <vector> -#include <boost/shared_ptr.hpp> -#include <map> -#include <amqp_types.h> - -#ifndef _FieldTable_ -#define _FieldTable_ - -namespace qpid { - /** - * The framing namespace contains classes that are used to create, - * send and receive the basic packets from which AMQP is built. - */ -namespace framing { - -class Value; -class Buffer; - -/** - * A set of name-value pairs. (See the AMQP spec for more details on - * AMQP field tables). - * - * \ingroup clientapi - */ -class FieldTable -{ - public: - typedef boost::shared_ptr<Value> ValuePtr; - typedef std::map<std::string, ValuePtr> ValueMap; - - ~FieldTable(); - uint32_t size() const; - int count() const; - void setString(const std::string& name, const std::string& value); - void setInt(const std::string& name, int value); - void setTimestamp(const std::string& name, uint64_t value); - void setTable(const std::string& name, const FieldTable& value); - //void setDecimal(string& name, xxx& value); - std::string getString(const std::string& name) const; - int getInt(const std::string& name) const; - uint64_t getTimestamp(const std::string& name) const; - void getTable(const std::string& name, FieldTable& value) const; - //void getDecimal(string& name, xxx& value); - void erase(const std::string& name); - - void encode(Buffer& buffer) const; - void decode(Buffer& buffer); - - bool operator==(const FieldTable& other) const; - - // TODO aconway 2006-09-26: Yeuch! Rework FieldTable to have - // a map-like interface. - const ValueMap& getMap() const { return values; } - ValueMap& getMap() { return values; } - - private: - friend std::ostream& operator<<(std::ostream& out, const FieldTable& body); - ValueMap values; - template<class T> T getValue(const std::string& name) const; -}; - -class FieldNotFoundException{}; -class UnknownFieldName : public FieldNotFoundException{}; -class IncorrectFieldType : public FieldNotFoundException{}; -} -} - - -#endif diff --git a/cpp/lib/common/framing/FramingContent.cpp b/cpp/lib/common/framing/FramingContent.cpp deleted file mode 100644 index 24efa38dcb..0000000000 --- a/cpp/lib/common/framing/FramingContent.cpp +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <assert.h> - -#include "Buffer.h" -#include "FramingContent.h" -#include <QpidError.h> -#include <sstream> - -namespace qpid { -namespace framing { - -Content::Content() : discriminator(0) {} - -Content::Content(uint8_t _discriminator, const string& _value): discriminator(_discriminator), value(_value) { - validate(); -} - -void Content::validate() { - if (discriminator == REFERENCE) { - if(value.empty()) { - THROW_QPID_ERROR(FRAMING_ERROR, "Reference cannot be empty"); - } - }else if (discriminator != INLINE) { - std::stringstream out; - out << "Invalid discriminator: " << (int) discriminator; - THROW_QPID_ERROR(FRAMING_ERROR, out.str()); - } -} - -Content::~Content() {} - -void Content::encode(Buffer& buffer) const { - buffer.putOctet(discriminator); - buffer.putLongString(value); -} - -void Content::decode(Buffer& buffer) { - discriminator = buffer.getOctet(); - buffer.getLongString(value); - validate(); -} - -size_t Content::size() const { - return 1/*discriminator*/ + 4/*for recording size of long string*/ + value.size(); -} - -std::ostream& operator<<(std::ostream& out, const Content& content) { - if (content.discriminator == REFERENCE) { - out << "{REF:" << content.value << "}"; - } else if (content.discriminator == INLINE) { - out << "{INLINE:" << content.value.size() << " bytes}"; - } - return out; -} - -}} // namespace framing::qpid diff --git a/cpp/lib/common/framing/FramingContent.h b/cpp/lib/common/framing/FramingContent.h deleted file mode 100644 index 876e90c905..0000000000 --- a/cpp/lib/common/framing/FramingContent.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef _framing_FramingContent_h -#define _framing_FramingContent_h - -#include <ostream> - -namespace qpid { -namespace framing { - -enum discriminator_types { INLINE = 0, REFERENCE = 1 }; - -/** - * A representation of the AMQP 'content' data type (used for message - * bodies) which can hold inline data or a reference. - */ -class Content -{ - uint8_t discriminator; - string value; - - void validate(); - - public: - Content(); - Content(uint8_t _discriminator, const string& _value); - ~Content(); - - void encode(Buffer& buffer) const; - void decode(Buffer& buffer); - size_t size() const; - bool isInline() const { return discriminator == INLINE; } - bool isReference() const { return discriminator == REFERENCE; } - const string& getValue() const { return value; } - void setValue(const string& newValue) { value = newValue; } - - friend std::ostream& operator<<(std::ostream&, const Content&); -}; - -}} // namespace qpid::framing - - -#endif /*!_framing_FramingContent_h*/ diff --git a/cpp/lib/common/framing/HeaderProperties.h b/cpp/lib/common/framing/HeaderProperties.h deleted file mode 100644 index 1ec4840309..0000000000 --- a/cpp/lib/common/framing/HeaderProperties.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <amqp_types.h> -#include <Buffer.h> - -#ifndef _HeaderProperties_ -#define _HeaderProperties_ - -namespace qpid { -namespace framing { - - enum header_classes{BASIC = 60}; - - class HeaderProperties - { - - public: - inline virtual ~HeaderProperties(){} - virtual uint8_t classId() = 0; - virtual uint32_t size() const = 0; - virtual void encode(Buffer& buffer) const = 0; - virtual void decode(Buffer& buffer, uint32_t size) = 0; - }; -} -} - - -#endif diff --git a/cpp/lib/common/framing/InitiationHandler.cpp b/cpp/lib/common/framing/InitiationHandler.cpp deleted file mode 100644 index dd92c9859b..0000000000 --- a/cpp/lib/common/framing/InitiationHandler.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <InitiationHandler.h> - -qpid::framing::InitiationHandler::~InitiationHandler() {} diff --git a/cpp/lib/common/framing/InitiationHandler.h b/cpp/lib/common/framing/InitiationHandler.h deleted file mode 100644 index 7f44323f09..0000000000 --- a/cpp/lib/common/framing/InitiationHandler.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _InitiationHandler_ -#define _InitiationHandler_ - -#include <ProtocolInitiation.h> - -namespace qpid { -namespace framing { - - class InitiationHandler{ - public: - virtual ~InitiationHandler(); - virtual void initiated(const ProtocolInitiation&) = 0; - }; - -} -} - - -#endif diff --git a/cpp/lib/common/framing/InputHandler.h b/cpp/lib/common/framing/InputHandler.h deleted file mode 100644 index 4e2d4bcc9b..0000000000 --- a/cpp/lib/common/framing/InputHandler.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef _InputHandler_ -#define _InputHandler_ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <AMQFrame.h> -#include <boost/noncopyable.hpp> - -namespace qpid { -namespace framing { - -class InputHandler : private boost::noncopyable { - public: - virtual ~InputHandler() {} - virtual void received(AMQFrame* frame) = 0; -}; - -}} - - -#endif diff --git a/cpp/lib/common/framing/MethodContext.cpp b/cpp/lib/common/framing/MethodContext.cpp deleted file mode 100644 index 73af73f8e5..0000000000 --- a/cpp/lib/common/framing/MethodContext.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "MethodContext.h" -#include "amqp_types.h" -#include "AMQRequestBody.h" - -namespace qpid { -namespace framing { - -RequestId MethodContext::getRequestId() const { - return boost::shared_polymorphic_downcast<AMQRequestBody>(methodBody) - ->getRequestId(); -} - -}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h deleted file mode 100644 index 80e4c55d7e..0000000000 --- a/cpp/lib/common/framing/MethodContext.h +++ /dev/null @@ -1,75 +0,0 @@ -#ifndef _framing_MethodContext_h -#define _framing_MethodContext_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/shared_ptr.hpp> - -#include "OutputHandler.h" -#include "ProtocolVersion.h" - -namespace qpid { -namespace framing { - -class BodyHandler; -class AMQMethodBody; -class ChannelAdapter; - -/** - * Invocation context for an AMQP method. - * - * It provides the method being processed and the channel on which - * it arrived. - * - * All Handler functions take a MethodContext as the last parameter. - */ -struct MethodContext -{ - typedef boost::shared_ptr<AMQMethodBody> BodyPtr; - - MethodContext(ChannelAdapter* ch=0, BodyPtr method=BodyPtr()) - : channel(ch), methodBody(method) {} - - /** - * Channel on which the method being processed arrived. - * 0 if the method was constructed by the caller - * rather than received from a channel. - */ - ChannelAdapter* channel; - - /** - * Body of the method being processed. - * It's useful for passing around instead of unpacking all its parameters. - * It's also provides the request ID when constructing a response. - */ - BodyPtr methodBody; - - /** - * Return methodBody's request ID. - * It is an error to call this if methodBody is not a request. - */ - RequestId getRequestId() const; -}; - - -}} // namespace qpid::framing - - - -#endif /*!_framing_MethodContext_h*/ diff --git a/cpp/lib/common/framing/OutputHandler.h b/cpp/lib/common/framing/OutputHandler.h deleted file mode 100644 index 9ffd4227d8..0000000000 --- a/cpp/lib/common/framing/OutputHandler.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef _OutputHandler_ -#define _OutputHandler_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/noncopyable.hpp> - -namespace qpid { -namespace framing { -class AMQFrame; - -class OutputHandler : private boost::noncopyable { - public: - virtual ~OutputHandler() {} - virtual void send(AMQFrame* frame) = 0; -}; - -}} - - -#endif diff --git a/cpp/lib/common/framing/ProtocolInitiation.cpp b/cpp/lib/common/framing/ProtocolInitiation.cpp deleted file mode 100644 index de53488f7b..0000000000 --- a/cpp/lib/common/framing/ProtocolInitiation.cpp +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ProtocolInitiation.h> - -namespace qpid { -namespace framing { - -ProtocolInitiation::ProtocolInitiation(){} - -ProtocolInitiation::ProtocolInitiation(uint8_t _major, uint8_t _minor) : version(_major, _minor) {} - -ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {} - -ProtocolInitiation::~ProtocolInitiation(){} - -void ProtocolInitiation::encode(Buffer& buffer){ - buffer.putOctet('A'); - buffer.putOctet('M'); - buffer.putOctet('Q'); - buffer.putOctet('P'); - buffer.putOctet(1);//class - buffer.putOctet(1);//instance - buffer.putOctet(version.getMajor()); - buffer.putOctet(version.getMinor()); -} - -bool ProtocolInitiation::decode(Buffer& buffer){ - if(buffer.available() >= 8){ - buffer.getOctet();//A - buffer.getOctet();//M - buffer.getOctet();//Q - buffer.getOctet();//P - buffer.getOctet();//class - buffer.getOctet();//instance - version.setMajor(buffer.getOctet()); - version.setMinor(buffer.getOctet()); - return true; - }else{ - return false; - } -} - -//TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date - -}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h deleted file mode 100644 index ed7b59e94e..0000000000 --- a/cpp/lib/common/framing/ProtocolInitiation.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <amqp_types.h> -#include <Buffer.h> -#include <AMQDataBlock.h> -#include <ProtocolVersion.h> - -#ifndef _ProtocolInitiation_ -#define _ProtocolInitiation_ - -namespace qpid { -namespace framing { - -class ProtocolInitiation : public AMQDataBlock -{ -private: - ProtocolVersion version; - -public: - ProtocolInitiation(); - ProtocolInitiation(uint8_t major, uint8_t minor); - ProtocolInitiation(ProtocolVersion p); - virtual ~ProtocolInitiation(); - virtual void encode(Buffer& buffer); - virtual bool decode(Buffer& buffer); - inline virtual uint32_t size() const { return 8; } - inline uint8_t getMajor() const { return version.getMajor(); } - inline uint8_t getMinor() const { return version.getMinor(); } - inline ProtocolVersion getVersion() const { return version; } -}; - -} -} - - -#endif diff --git a/cpp/lib/common/framing/ProtocolVersion.cpp b/cpp/lib/common/framing/ProtocolVersion.cpp deleted file mode 100644 index fd4b1a645f..0000000000 --- a/cpp/lib/common/framing/ProtocolVersion.cpp +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ProtocolVersion.h> -#include <sstream> - -using namespace qpid::framing; - -const std::string ProtocolVersion::toString() const -{ - std::stringstream ss; - ss << major_ << "-" << minor_; - return ss.str(); -} - -ProtocolVersion& ProtocolVersion::operator=(ProtocolVersion p) -{ - major_ = p.major_; - minor_ = p.minor_; - return *this; -} - -bool ProtocolVersion::operator==(ProtocolVersion p) const -{ - return major_ == p.major_ && minor_ == p.minor_; -} - diff --git a/cpp/lib/common/framing/ProtocolVersion.h b/cpp/lib/common/framing/ProtocolVersion.h deleted file mode 100644 index 5e1429c1ea..0000000000 --- a/cpp/lib/common/framing/ProtocolVersion.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ProtocolVersion_ -#define _ProtocolVersion_ - -#include <amqp_types.h> - -namespace qpid -{ -namespace framing -{ - -class ProtocolVersion -{ -private: - uint8_t major_; - uint8_t minor_; - -public: - ProtocolVersion(uint8_t _major=0, uint8_t _minor=0) - : major_(_major), minor_(_minor) {} - - uint8_t getMajor() const { return major_; } - void setMajor(uint8_t major) { major_ = major; } - uint8_t getMinor() const { return minor_; } - void setMinor(uint8_t minor) { minor_ = minor; } - const std::string toString() const; - - ProtocolVersion& operator=(ProtocolVersion p); - - bool operator==(ProtocolVersion p) const; - bool operator!=(ProtocolVersion p) const { return ! (*this == p); } -}; - -} // namespace framing -} // namespace qpid - - -#endif // ifndef _ProtocolVersion_ diff --git a/cpp/lib/common/framing/ProtocolVersionException.cpp b/cpp/lib/common/framing/ProtocolVersionException.cpp deleted file mode 100644 index 9088422f6f..0000000000 --- a/cpp/lib/common/framing/ProtocolVersionException.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/format.hpp> -#include <ProtocolVersionException.h> - - -using namespace qpid::framing; - -void ProtocolVersionException::init(const std::string& msg) -{ - whatStr = boost::str( - boost::format("ProtocolVersionException: %s found: %s") - % versionFound.toString() % msg); -} - diff --git a/cpp/lib/common/framing/ProtocolVersionException.h b/cpp/lib/common/framing/ProtocolVersionException.h deleted file mode 100644 index 8e2de8b843..0000000000 --- a/cpp/lib/common/framing/ProtocolVersionException.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef _ProtocolVersionException_ -#define _ProtocolVersionException_ - -#include <Exception.h> -#include <ProtocolVersion.h> -#include <string> -#include <vector> - -namespace qpid { -namespace framing { - -class ProtocolVersionException : public qpid::Exception -{ -protected: - ProtocolVersion versionFound; - -public: - ~ProtocolVersionException() throw() {} - - template <class T> - ProtocolVersionException( - ProtocolVersion ver, const T& msg) throw () : versionFound(ver) - { init(boost::lexical_cast<std::string>(msg)); } - - template <class T> - ProtocolVersionException(const T& msg) throw () - { init(boost::lexical_cast<std::string>(msg)); } - - private: - void init(const std::string& msg); -}; - -}} // namespace qpid::framing - -#endif //ifndef _ProtocolVersionException_ diff --git a/cpp/lib/common/framing/Proxy.cpp b/cpp/lib/common/framing/Proxy.cpp deleted file mode 100644 index 0b2a882a49..0000000000 --- a/cpp/lib/common/framing/Proxy.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "Proxy.h" -#include "ChannelAdapter.h" -#include "ProtocolVersion.h" - -namespace qpid { -namespace framing { - -Proxy::~Proxy() {} - -ProtocolVersion Proxy::getProtocolVersion() const { - return channel.getVersion(); -} - -}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/Proxy.h b/cpp/lib/common/framing/Proxy.h deleted file mode 100644 index 8ed46ed748..0000000000 --- a/cpp/lib/common/framing/Proxy.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef _framing_Proxy_h -#define _framing_Proxy_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ProtocolVersion.h" - -namespace qpid { -namespace framing { - -class ChannelAdapter; -class FieldTable; -class Content; - -/** - * Base class for proxies. - */ -class Proxy -{ - - public: - Proxy(ChannelAdapter& ch) : channel(ch) {} - virtual ~Proxy(); - - ProtocolVersion getProtocolVersion() const; - - protected: - ChannelAdapter& channel; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_Proxy_h*/ diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp deleted file mode 100644 index 9ee809e2ee..0000000000 --- a/cpp/lib/common/framing/Requester.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/format.hpp> - -#include "Requester.h" -#include "QpidError.h" - -namespace qpid { -namespace framing { - -Requester::Requester() : lastId(0), responseMark(0) {} - -void Requester::sending(AMQRequestBody::Data& request) { - request.requestId = ++lastId; - request.responseMark = responseMark; -} - -void Requester::processed(const AMQResponseBody::Data& response) { - responseMark = response.responseId; - firstAckRequest = response.requestId; - lastAckRequest = firstAckRequest + response.batchOffset; -} - -}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h deleted file mode 100644 index dcc4460041..0000000000 --- a/cpp/lib/common/framing/Requester.h +++ /dev/null @@ -1,67 +0,0 @@ -#ifndef _framing_Requester_h -#define _framing_Requester_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <set> -#include "AMQRequestBody.h" -#include "AMQResponseBody.h" - -namespace qpid { -namespace framing { - -class AMQRequestBody; -class AMQResponseBody; - -/** - * Manage request IDs and the response mark for locally initiated requests. - * - * THREAD UNSAFE: This class is called as frames are sent or received - * sequentially on a connection, so it does not need to be thread safe. - */ -class Requester -{ - public: - Requester(); - - /** Called before sending a request to set request data. */ - void sending(AMQRequestBody::Data&); - - /** Called after processing a response. */ - void processed(const AMQResponseBody::Data&); - - /** Get the next request id to be used. */ - RequestId getNextId() { return lastId + 1; } - /** Get the first request acked by this response */ - RequestId getFirstAckRequest() { return firstAckRequest; } - /** Get the last request acked by this response */ - RequestId getLastAckRequest() { return lastAckRequest; } - - private: - RequestId lastId; - ResponseId responseMark; - ResponseId firstAckRequest; - ResponseId lastAckRequest; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_Requester_h*/ diff --git a/cpp/lib/common/framing/Responder.cpp b/cpp/lib/common/framing/Responder.cpp deleted file mode 100644 index c8c5ce8dcc..0000000000 --- a/cpp/lib/common/framing/Responder.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <boost/format.hpp> - -#include "Responder.h" -#include "QpidError.h" - -namespace qpid { -namespace framing { - -Responder::Responder() : lastId(0), responseMark(0) {} - -void Responder::received(const AMQRequestBody::Data& request) { - if (request.responseMark < responseMark || request.responseMark > lastId) - THROW_QPID_ERROR( - PROTOCOL_ERROR, boost::format("Invalid response mark %d.") - %request.responseMark); - responseMark = request.responseMark; -} - -void Responder::sending(AMQResponseBody::Data& response) { - response.responseId = ++lastId; - assert(response.requestId); // Should be already set. - response.batchOffset = 0; -} - -}} // namespace qpid::framing - diff --git a/cpp/lib/common/framing/Responder.h b/cpp/lib/common/framing/Responder.h deleted file mode 100644 index 0e1785256b..0000000000 --- a/cpp/lib/common/framing/Responder.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef _framing_Responder_h -#define _framing_Responder_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "AMQRequestBody.h" -#include "AMQResponseBody.h" - -namespace qpid { -namespace framing { - -/** - * Manage response ids and response mark remotely initianted requests. - * - * THREAD UNSAFE: This class is called as frames are sent or received - * sequentially on a connection, so it does not need to be thread safe. - */ -class Responder -{ - public: - Responder(); - - /** Called after receiving a request. */ - void received(const AMQRequestBody::Data& request); - - /** Called before sending a response to set respose data. */ - void sending(AMQResponseBody::Data& response); - - /** Get the ID of the highest response acknowledged by the peer. */ - ResponseId getResponseMark() { return responseMark; } - - // TODO aconway 2007-01-14: Batching support - store unsent - // Response for equality comparison with subsequent responses. - // - - private: - ResponseId lastId; - ResponseId responseMark; -}; - -}} // namespace qpid::framing - - - -#endif /*!_framing_Responder_h*/ diff --git a/cpp/lib/common/framing/Value.cpp b/cpp/lib/common/framing/Value.cpp deleted file mode 100644 index 03e005e384..0000000000 --- a/cpp/lib/common/framing/Value.cpp +++ /dev/null @@ -1,122 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <Value.h> -#include <Buffer.h> -#include <FieldTable.h> -#include <QpidError.h> -#include <sstream> - -namespace qpid { -namespace framing { - -Value::~Value() {} - -void StringValue::encode(Buffer& buffer){ - buffer.putLongString(value); -} -void StringValue::decode(Buffer& buffer){ - buffer.getLongString(value); -} - -void IntegerValue::encode(Buffer& buffer){ - buffer.putLong((uint32_t) value); -} -void IntegerValue::decode(Buffer& buffer){ - value = buffer.getLong(); -} - -void TimeValue::encode(Buffer& buffer){ - buffer.putLongLong(value); -} -void TimeValue::decode(Buffer& buffer){ - value = buffer.getLongLong(); -} - -void DecimalValue::encode(Buffer& buffer){ - buffer.putOctet(value.decimals); - buffer.putLong(value.value); -} -void DecimalValue::decode(Buffer& buffer){ - value = Decimal(buffer.getLong(), buffer.getOctet()); -} - -void FieldTableValue::encode(Buffer& buffer){ - buffer.putFieldTable(value); -} -void FieldTableValue::decode(Buffer& buffer){ - buffer.getFieldTable(value); -} - -std::auto_ptr<Value> Value::decode_value(Buffer& buffer) -{ - std::auto_ptr<Value> value; - uint8_t type = buffer.getOctet(); - switch(type){ - case 'S': - value.reset(new StringValue()); - break; - case 'I': - value.reset(new IntegerValue()); - break; - case 'D': - value.reset(new DecimalValue()); - break; - case 'T': - value.reset(new TimeValue()); - break; - case 'F': - value.reset(new FieldTableValue()); - break; - - //non-standard types, introduced in java client for JMS compliance - case 'x': - value.reset(new BinaryValue()); - break; - default: - std::stringstream out; - out << "Unknown field table value type: " << type; - THROW_QPID_ERROR(FRAMING_ERROR, out.str()); - } - value->decode(buffer); - return value; -} - -EmptyValue::~EmptyValue() {} - -void EmptyValue::print(std::ostream& out) const -{ - out << "<empty field value>"; -} - -std::ostream& operator<<(std::ostream& out, const Value& v) { - v.print(out); - return out; -} - -std::ostream& operator<<(std::ostream& out, const Decimal& d) -{ - return out << "Decimal(" << d.value << "," << d.decimals << ")"; -} - -}} - - - diff --git a/cpp/lib/common/framing/Value.h b/cpp/lib/common/framing/Value.h deleted file mode 100644 index 8752b02f40..0000000000 --- a/cpp/lib/common/framing/Value.h +++ /dev/null @@ -1,171 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <vector> -#include <amqp_types.h> -#include <FieldTable.h> - -#ifndef _Value_ -#define _Value_ - -namespace qpid { -namespace framing { - -class Buffer; - -/** - * Represents a decimal value. - * No arithmetic functionality for now, we only care about encoding/decoding. - */ -struct Decimal { - uint32_t value; - uint8_t decimals; - - Decimal(uint32_t value_=0, uint8_t decimals_=0) : value(value_), decimals(decimals_) {} - bool operator==(const Decimal& d) const { - return decimals == d.decimals && value == d.value; - } - bool operator!=(const Decimal& d) const { return !(*this == d); } -}; - -std::ostream& operator<<(std::ostream& out, const Decimal& d); - -/** - * Polymorpic base class for values. - */ -class Value { - public: - virtual ~Value(); - virtual uint32_t size() const = 0; - virtual char getType() const = 0; - virtual void encode(Buffer& buffer) = 0; - virtual void decode(Buffer& buffer) = 0; - virtual bool operator==(const Value&) const = 0; - bool operator!=(const Value& v) const { return !(*this == v); } - virtual void print(std::ostream& out) const = 0; - - /** Create a new value by decoding from the buffer */ - static std::auto_ptr<Value> decode_value(Buffer& buffer); -}; - -std::ostream& operator<<(std::ostream& out, const Value& d); - - -/** - * Template for common operations on Value sub-classes. - */ -template <class T> -class ValueOps : public Value -{ - protected: - T value; - public: - ValueOps() {} - ValueOps(const T& v) : value(v) {} - const T& getValue() const { return value; } - T& getValue() { return value; } - - virtual bool operator==(const Value& v) const { - const ValueOps<T>* vo = dynamic_cast<const ValueOps<T>*>(&v); - if (vo == 0) return false; - else return value == vo->value; - } - - void print(std::ostream& out) const { out << value; } -}; - - -class StringValue : public ValueOps<std::string> { - public: - StringValue(const std::string& v) : ValueOps<std::string>(v) {} - StringValue() {} - virtual uint32_t size() const { return 4 + value.length(); } - virtual char getType() const { return 'S'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); -}; - -class IntegerValue : public ValueOps<int> { - public: - IntegerValue(int v) : ValueOps<int>(v) {} - IntegerValue(){} - virtual uint32_t size() const { return 4; } - virtual char getType() const { return 'I'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); -}; - -class TimeValue : public ValueOps<uint64_t> { - public: - TimeValue(uint64_t v) : ValueOps<uint64_t>(v){} - TimeValue(){} - virtual uint32_t size() const { return 8; } - virtual char getType() const { return 'T'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); -}; - -class DecimalValue : public ValueOps<Decimal> { - public: - DecimalValue(const Decimal& d) : ValueOps<Decimal>(d) {} - DecimalValue(uint32_t value_=0, uint8_t decimals_=0) : - ValueOps<Decimal>(Decimal(value_, decimals_)){} - virtual uint32_t size() const { return 5; } - virtual char getType() const { return 'D'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); -}; - - -class FieldTableValue : public ValueOps<FieldTable> { - public: - FieldTableValue(const FieldTable& v) : ValueOps<FieldTable>(v){} - FieldTableValue(){} - virtual uint32_t size() const { return 4 + value.size(); } - virtual char getType() const { return 'F'; } - virtual void encode(Buffer& buffer); - virtual void decode(Buffer& buffer); -}; - -class EmptyValue : public Value { - public: - ~EmptyValue(); - virtual uint32_t size() const { return 0; } - virtual char getType() const { return 0; } - virtual void encode(Buffer& ) {} - virtual void decode(Buffer& ) {} - virtual bool operator==(const Value& v) const { - return dynamic_cast<const EmptyValue*>(&v); - } - virtual void print(std::ostream& out) const; -}; - -//non-standard types, introduced in java client for JMS compliance -class BinaryValue : public StringValue { - public: - BinaryValue(const std::string& v) : StringValue(v) {} - BinaryValue() {} - virtual char getType() const { return 'x'; } -}; - -}} // qpid::framing - -#endif diff --git a/cpp/lib/common/framing/amqp_framing.h b/cpp/lib/common/framing/amqp_framing.h deleted file mode 100644 index 62f87352f8..0000000000 --- a/cpp/lib/common/framing/amqp_framing.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <amqp_types.h> -#include <AMQFrame.h> -#include <AMQBody.h> -#include <BodyHandler.h> -#include <AMQMethodBody.h> -#include <AMQHeaderBody.h> -#include <AMQContentBody.h> -#include <AMQHeartbeatBody.h> -#include <AMQP_MethodVersionMap.h> -#include <InputHandler.h> -#include <OutputHandler.h> -#include <InitiationHandler.h> -#include <ProtocolInitiation.h> -#include <BasicHeaderProperties.h> -#include <ProtocolVersion.h> -#include <ProtocolVersionException.h> diff --git a/cpp/lib/common/framing/amqp_types.h b/cpp/lib/common/framing/amqp_types.h deleted file mode 100644 index 49963bd570..0000000000 --- a/cpp/lib/common/framing/amqp_types.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef AMQP_TYPES_H -#define AMQP_TYPES_H -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** \file - * Type definitions and forward declarations of all types used to - * in AMQP messages. - */ - -#include <string> -#ifdef _WINDOWS -#include "windows.h" -typedef unsigned char uint8_t; -typedef unsigned short uint16_t; -typedef unsigned int uint32_t; -typedef unsigned __int64 uint64_t; -#endif -#ifndef _WINDOWS -#include "stdint.h" -#endif - -namespace qpid { -namespace framing { - -using std::string; -typedef uint16_t ChannelId; -typedef uint64_t RequestId; -typedef uint64_t ResponseId; -typedef uint32_t BatchOffset; -typedef uint16_t ClassId; -typedef uint16_t MethodId; -typedef uint16_t ReplyCode; - -// Types represented by classes. -class Content; -class FieldTable; -}} // namespace qpid::framing -#endif diff --git a/cpp/lib/common/framing/amqp_types_full.h b/cpp/lib/common/framing/amqp_types_full.h deleted file mode 100644 index 6a24a99d38..0000000000 --- a/cpp/lib/common/framing/amqp_types_full.h +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef _framing_amqp_types_decl_h -#define _framing_amqp_types_decl_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** \file - * Type definitions and full declarations of all types used to - * in AMQP messages. - * - * Its better to include amqp_types.h in another header instead of this file - * unless the header actually needs the full declarations. Including - * full declarations when forward declarations would do increases compile - * times. - */ - -#include "amqp_types.h" -#include "FramingContent.h" -#include "FieldTable.h" - -#endif /*!_framing_amqp_types_decl_h*/ diff --git a/cpp/lib/common/shared_ptr.h b/cpp/lib/common/shared_ptr.h deleted file mode 100644 index c4d547e5bb..0000000000 --- a/cpp/lib/common/shared_ptr.h +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef _common_shared_ptr_h -#define _common_shared_ptr_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/shared_ptr.hpp> -#include <boost/cast.hpp> - -namespace qpid { -/// Import shared_ptr definitions into qpid namespace. -using boost::shared_ptr; -using boost::dynamic_pointer_cast; -using boost::static_pointer_cast; -using boost::const_pointer_cast; -using boost::shared_polymorphic_downcast; -} // namespace qpid - - - -#endif /*!_common_shared_ptr_h*/ diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h deleted file mode 100644 index cd4932d92e..0000000000 --- a/cpp/lib/common/sys/Acceptor.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef _sys_Acceptor_h -#define _sys_Acceptor_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <stdint.h> -#include <SharedObject.h> - -namespace qpid { -namespace sys { - -class ConnectionInputHandlerFactory; - -class Acceptor : public qpid::SharedObject<Acceptor> -{ - public: - static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); - virtual ~Acceptor() = 0; - virtual uint16_t getPort() const = 0; - virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0; - virtual void shutdown() = 0; -}; - -}} - - - -#endif /*!_sys_Acceptor_h*/ diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h deleted file mode 100644 index 63670cbf00..0000000000 --- a/cpp/lib/common/sys/AtomicCount.h +++ /dev/null @@ -1,53 +0,0 @@ -#ifndef _posix_AtomicCount_h -#define _posix_AtomicCount_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/detail/atomic_count.hpp> -#include "ScopedIncrement.h" - -namespace qpid { -namespace sys { - -/** - * Atomic counter. - */ -class AtomicCount : boost::noncopyable { - public: - typedef ScopedDecrement<AtomicCount> ScopedDecrement; - typedef ScopedIncrement<AtomicCount> ScopedIncrement; - - AtomicCount(long value = 0) : count(value) {} - - void operator++() { ++count ; } - - long operator--() { return --count; } - - operator long() const { return count; } - - - private: - boost::detail::atomic_count count; -}; - - -}} - - -#endif // _posix_AtomicCount_h diff --git a/cpp/lib/common/sys/Condition.h b/cpp/lib/common/sys/Condition.h deleted file mode 100644 index 9d70af5b84..0000000000 --- a/cpp/lib/common/sys/Condition.h +++ /dev/null @@ -1,128 +0,0 @@ -#ifndef _sys_Condition_h -#define _sys_Condition_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <sys/errno.h> -#include <boost/noncopyable.hpp> -#include <sys/Mutex.h> -#include <sys/Time.h> - -#ifdef USE_APR -# include <apr_thread_cond.h> -#endif - -namespace qpid { -namespace sys { - -/** - * A condition variable for thread synchronization. - */ -class Condition -{ - public: - inline Condition(); - inline ~Condition(); - inline void wait(Mutex&); - inline bool wait(Mutex&, const Time& absoluteTime); - inline void notify(); - inline void notifyAll(); - - private: -#ifdef USE_APR - apr_thread_cond_t* condition; -#else - pthread_cond_t condition; -#endif -}; - - -// APR ================================================================ -#ifdef USE_APR - -Condition::Condition() { - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); -} - -Condition::~Condition() { - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); -} - -void Condition::wait(Mutex& mutex) { - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex.mutex)); -} - -bool Condition::wait(Mutex& mutex, const Time& absoluteTime){ - // APR uses microseconds. - apr_status_t status = - apr_thread_cond_timedwait( - condition, mutex.mutex, absoluteTime/TIME_USEC); - if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); - return status == 0; -} - -void Condition::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void Condition::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - -#else -// POSIX ================================================================ - -Condition::Condition() { - QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); -} - -Condition::~Condition() { - QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); -} - -void Condition::wait(Mutex& mutex) { - QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); -} - -bool Condition::wait(Mutex& mutex, const Time& absoluteTime){ - struct timespec ts; - toTimespec(ts, absoluteTime); - int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts); - if (status != 0) { - if (status == ETIMEDOUT) return false; - throw QPID_POSIX_ERROR(status); - } - return true; -} - -void Condition::notify(){ - QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); -} - -void Condition::notifyAll(){ - QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); -} -#endif /*USE_APR*/ - - -}} -#endif /*!_sys_Condition_h*/ diff --git a/cpp/lib/common/sys/ConnectionInputHandler.h b/cpp/lib/common/sys/ConnectionInputHandler.h deleted file mode 100644 index fa70dfaf48..0000000000 --- a/cpp/lib/common/sys/ConnectionInputHandler.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ConnectionInputHandler_ -#define _ConnectionInputHandler_ - -#include <InputHandler.h> -#include <InitiationHandler.h> -#include <ProtocolInitiation.h> -#include <sys/TimeoutHandler.h> - -namespace qpid { -namespace sys { - - class ConnectionInputHandler : - public qpid::framing::InitiationHandler, - public qpid::framing::InputHandler, - public TimeoutHandler - { - public: - virtual void closed() = 0; - }; - -} -} - - -#endif diff --git a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h deleted file mode 100644 index af7d411928..0000000000 --- a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ConnectionInputHandlerFactory_ -#define _ConnectionInputHandlerFactory_ - -#include <boost/noncopyable.hpp> - -namespace qpid { -namespace sys { - -class ConnectionOutputHandler; -class ConnectionInputHandler; - -/** - * Callback interface used by the Acceptor to - * create a ConnectionInputHandler for each new connection. - */ -class ConnectionInputHandlerFactory : private boost::noncopyable -{ - public: - virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0; - virtual ~ConnectionInputHandlerFactory(){} -}; - -}} - - -#endif diff --git a/cpp/lib/common/sys/ConnectionOutputHandler.h b/cpp/lib/common/sys/ConnectionOutputHandler.h deleted file mode 100644 index 91849e1dfb..0000000000 --- a/cpp/lib/common/sys/ConnectionOutputHandler.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ConnectionOutputHandler_ -#define _ConnectionOutputHandler_ - -#include <OutputHandler.h> - -namespace qpid { -namespace sys { - -/** - * Provides the output handler associated with a connection. - */ -class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler -{ - public: - virtual void close() = 0; -}; - -}} - - -#endif diff --git a/cpp/lib/common/sys/Module.h b/cpp/lib/common/sys/Module.h deleted file mode 100644 index 9bf5d6e1fc..0000000000 --- a/cpp/lib/common/sys/Module.h +++ /dev/null @@ -1,161 +0,0 @@ -#ifndef _sys_Module_h -#define _sys_Module_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/noncopyable.hpp> -#include <iostream> -#include <QpidError.h> - -namespace qpid { -namespace sys { -#if USE_APR -#include <apr_dso.h> - typedef apr_dso_handle_t* dso_handle_t; -#else - typedef void* dso_handle_t; -#endif - - template <class T> class Module : private boost::noncopyable - { - typedef T* create_t(); - typedef void destroy_t(T*); - - dso_handle_t handle; - destroy_t* destroy; - T* ptr; - - void load(const std::string& name); - void unload(); - void* getSymbol(const std::string& name); - - public: - Module(const std::string& name); - T* operator->(); - T* get(); - ~Module() throw(); - }; - -} -} - -using namespace qpid::sys; - -template <class T> Module<T>::Module(const std::string& module) : destroy(0), ptr(0) -{ - load(module); - //TODO: need a better strategy for symbol names to allow multiple - //modules to be loaded without clashes... - - //Note: need the double cast to avoid errors in casting from void* to function pointer with -pedantic - create_t* create = reinterpret_cast<create_t*>(reinterpret_cast<intptr_t>(getSymbol("create"))); - destroy = reinterpret_cast<destroy_t*>(reinterpret_cast<intptr_t>(getSymbol("destroy"))); - ptr = create(); -} - -template <class T> T* Module<T>::operator->() -{ - return ptr; -} - -template <class T> T* Module<T>::get() -{ - return ptr; -} - -template <class T> Module<T>::~Module() throw() -{ - try { - if (handle && ptr) { - destroy(ptr); - } - if (handle) unload(); - } catch (std::exception& e) { - std::cout << "Error while destroying module: " << e.what() << std::endl; - } - destroy = 0; - handle = 0; - ptr = 0; -} - -// APR ================================================================ -#if USE_APR - -#include <apr/APRBase.h> -#include <apr/APRPool.h> - -template <class T> void Module<T>::load(const std::string& name) -{ - CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get())); -} - -template <class T> void Module<T>::unload() -{ - CHECK_APR_SUCCESS(apr_dso_unload(handle)); -} - -template <class T> void* Module<T>::getSymbol(const std::string& name) -{ - apr_dso_handle_sym_t symbol; - CHECK_APR_SUCCESS(apr_dso_sym(&symbol, handle, name.c_str())); - return (void*) symbol; -} - -// POSIX================================================================ -#else - -#include <dlfcn.h> - -template <class T> void Module<T>::load(const std::string& name) -{ - dlerror(); - handle = dlopen(name.c_str(), RTLD_NOW); - const char* error = dlerror(); - if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); - } -} - -template <class T> void Module<T>::unload() -{ - dlerror(); - dlclose(handle); - const char* error = dlerror(); - if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); - } -} - -template <class T> void* Module<T>::getSymbol(const std::string& name) -{ - dlerror(); - void* sym = dlsym(handle, name.c_str()); - const char* error = dlerror(); - if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); - } - return sym; -} - -#endif //if USE_APR - -#endif //ifndef _sys_Module_h - diff --git a/cpp/lib/common/sys/Monitor.h b/cpp/lib/common/sys/Monitor.h deleted file mode 100644 index a3bbd3c5aa..0000000000 --- a/cpp/lib/common/sys/Monitor.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef _sys_Monitor_h -#define _sys_Monitor_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <sys/errno.h> -#include <sys/Condition.h> - -#ifdef USE_APR -# include <apr_thread_cond.h> -#endif - -namespace qpid { -namespace sys { - -/** - * A monitor is a condition variable and a mutex - */ -class Monitor : public Mutex, public Condition { - public: - using Condition::wait; - inline void wait(); - inline bool wait(const Time& absoluteTime); -}; - - -void Monitor::wait() { - Condition::wait(*this); -} - -bool Monitor::wait(const Time& absoluteTime) { - return Condition::wait(*this, absoluteTime); -} - -}} -#endif /*!_sys_Monitor_h*/ diff --git a/cpp/lib/common/sys/Mutex.h b/cpp/lib/common/sys/Mutex.h deleted file mode 100644 index 9db9be0981..0000000000 --- a/cpp/lib/common/sys/Mutex.h +++ /dev/null @@ -1,165 +0,0 @@ -#ifndef _sys_Mutex_h -#define _sys_Mutex_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifdef USE_APR -# include <apr_thread_mutex.h> -# include <apr/APRBase.h> -# include <apr/APRPool.h> -#else -# include <pthread.h> -# include <posix/check.h> -#endif -#include <boost/noncopyable.hpp> - -namespace qpid { -namespace sys { - -class Condition; - -/** - * Scoped lock template: calls lock() in ctor, unlock() in dtor. - * L can be any class with lock() and unlock() functions. - */ -template <class L> -class ScopedLock -{ - public: - ScopedLock(L& l) : mutex(l) { l.lock(); } - ~ScopedLock() { mutex.unlock(); } - private: - L& mutex; -}; - -template <class L> -class ScopedUnlock -{ - public: - ScopedUnlock(L& l) : mutex(l) { l.unlock(); } - ~ScopedUnlock() { mutex.lock(); } - private: - L& mutex; -}; - -/** - * Mutex lock. - */ -class Mutex : private boost::noncopyable { - public: - typedef ScopedLock<Mutex> ScopedLock; - typedef ScopedUnlock<Mutex> ScopedUnlock; - - inline Mutex(); - inline ~Mutex(); - inline void lock(); - inline void unlock(); - inline void trylock(); - - protected: -#ifdef USE_APR - apr_thread_mutex_t* mutex; -#else - pthread_mutex_t mutex; -#endif - friend class Condition; -}; - -#ifdef USE_APR -// APR ================================================================ - -Mutex::Mutex() { - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); -} - -Mutex::~Mutex(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); -} - -void Mutex::lock() { - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} -void Mutex::unlock() { - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} - -void Mutex::trylock() { - CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); -} - -#else -// POSIX ================================================================ - -/** - * PODMutex is a POD, can be static-initialized with - * PODMutex m = QPID_PODMUTEX_INITIALIZER - */ -struct PODMutex -{ - typedef ScopedLock<PODMutex> ScopedLock; - - inline void lock(); - inline void unlock(); - inline void trylock(); - - // Must be public to be a POD: - pthread_mutex_t mutex; -}; - -#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } - - -void PODMutex::lock() { - QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); -} -void PODMutex::unlock() { - QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); -} - -void PODMutex::trylock() { - QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); -} - - -Mutex::Mutex() { - QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0)); -} - -Mutex::~Mutex(){ - QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); -} - -void Mutex::lock() { - QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); -} -void Mutex::unlock() { - QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); -} - -void Mutex::trylock() { - QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); -} - -#endif // USE_APR - -}} - - - -#endif /*!_sys_Mutex_h*/ diff --git a/cpp/lib/common/sys/ProducerConsumer.cpp b/cpp/lib/common/sys/ProducerConsumer.cpp deleted file mode 100644 index 7a0249f666..0000000000 --- a/cpp/lib/common/sys/ProducerConsumer.cpp +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - - -#include "QpidError.h" -#include "ScopedIncrement.h" -#include "ProducerConsumer.h" - -namespace qpid { -namespace sys { - -// // ================ ProducerConsumer - -ProducerConsumer::ProducerConsumer(size_t init_items) - : items(init_items), waiters(0), shutdownFlag(false) -{} - -void ProducerConsumer::shutdown() { - Mutex::ScopedLock l(monitor); - shutdownFlag = true; - monitor.notifyAll(); - // Wait for waiting consumers to wake up. - while (waiters > 0) - monitor.wait(); -} - -size_t ProducerConsumer::available() const { - Mutex::ScopedLock l(monitor); - return items; -} - -size_t ProducerConsumer::consumers() const { - Mutex::ScopedLock l(monitor); - return waiters; -} - -// ================ Lock - -ProducerConsumer::Lock::Lock(ProducerConsumer& p) - : pc(p), lock(p.monitor), status(INCOMPLETE) {} - -bool ProducerConsumer::Lock::isOk() const { - return !pc.isShutdown() && status==INCOMPLETE; -} - -void ProducerConsumer::Lock::checkOk() const { - assert(!pc.isShutdown()); - assert(status == INCOMPLETE); -} - -ProducerConsumer::Lock::~Lock() { - assert(status != INCOMPLETE || pc.isShutdown()); -} - -void ProducerConsumer::Lock::confirm() { - checkOk(); - status = CONFIRMED; -} - -void ProducerConsumer::Lock::cancel() { - checkOk(); - status = CANCELLED; -} - -// ================ ProducerLock - -ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p) -{} - - -ProducerConsumer::ProducerLock::~ProducerLock() { - if (status == CONFIRMED) { - pc.items++; - pc.monitor.notify(); // Notify a consumer. - } -} - -// ================ ConsumerLock - -ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p) -{ - if (isOk()) { - ScopedIncrement<size_t> inc(pc.waiters); - while (pc.items == 0 && !pc.shutdownFlag) { - pc.monitor.wait(); - } - } -} - -ProducerConsumer::ConsumerLock::ConsumerLock( - ProducerConsumer& p, const Time& timeout) : Lock(p) -{ - if (isOk()) { - // Don't wait if timeout==0 - if (timeout == 0) { - if (pc.items == 0) - status = TIMEOUT; - return; - } - else { - Time deadline = now() + timeout; - ScopedIncrement<size_t> inc(pc.waiters); - while (pc.items == 0 && !pc.shutdownFlag) { - if (!pc.monitor.wait(deadline)) { - status = TIMEOUT; - return; - } - } - } - } -} - -ProducerConsumer::ConsumerLock::~ConsumerLock() { - if (pc.isShutdown()) { - if (pc.waiters == 0) - pc.monitor.notifyAll(); // Notify shutdown thread(s) - } - else if (status==CONFIRMED) { - pc.items--; - if (pc.items > 0) - pc.monitor.notify(); // Notify another consumer. - } -} - - -}} // namespace qpid::sys diff --git a/cpp/lib/common/sys/ProducerConsumer.h b/cpp/lib/common/sys/ProducerConsumer.h deleted file mode 100644 index c7f42f266d..0000000000 --- a/cpp/lib/common/sys/ProducerConsumer.h +++ /dev/null @@ -1,165 +0,0 @@ -#ifndef _sys_ProducerConsumer_h -#define _sys_ProducerConsumer_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <boost/noncopyable.hpp> -#include "Exception.h" -#include "sys/Monitor.h" - -namespace qpid { -namespace sys { - -/** - * Producer-consumer synchronisation. - * - * Producers increase the number of available items, consumers reduce it. - * Consumers wait till an item is available. Waiting threads can be - * woken for shutdown using shutdown(). - * - * Note: Currently implements unbounded producer-consumer, i.e. no limit - * to available items, producers never block. Can be extended to support - * bounded PC if required. - * - // TODO aconway 2007-02-13: example, from tests. -*/ -class ProducerConsumer -{ - public: - ProducerConsumer(size_t init_items=0); - - ~ProducerConsumer() { shutdown(); } - - /** - * Wake any threads waiting for ProducerLock or ConsumerLock. - *@post No threads are waiting in Producer or Consumer locks. - */ - void shutdown(); - - /** True if queue is shutdown */ - bool isShutdown() { return shutdownFlag; } - - /** Number of items available for consumers */ - size_t available() const; - - /** Number of consumers waiting for items */ - size_t consumers() const; - - /** True if available == 0 */ - bool empty() const { return available() == 0; } - - /** - * Base class for producer and consumer locks. - */ - class Lock : private boost::noncopyable { - public: - - /** - * You must call isOk() after creating a lock to verify its state. - * - *@return true means the lock succeeded. You MUST call either - *confirm() or cancel() before the lock goes out of scope. - * - * false means the lock failed - timed out or the - * ProducerConsumer is shutdown. You should not do anything in - * the scope of the lock. - */ - bool isOk() const; - - /** - * Confirm that an item was produced/consumed. - *@pre isOk() - */ - void confirm(); - - /** - * Cancel the lock to indicate nothing was produced/consumed. - * Note that locks are not actually released until destroyed. - * - *@pre isOk() - */ - void cancel(); - - /** True if this lock experienced a timeout */ - bool isTimedOut() const { return status == TIMEOUT; } - - /** True if we have been shutdown */ - bool isShutdown() const { return pc.isShutdown(); } - - ProducerConsumer& pc; - - protected: - /** Lock status */ - enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT }; - - Lock(ProducerConsumer& p); - ~Lock(); - void checkOk() const; - Mutex::ScopedLock lock; - Status status; - }; - - /** Lock for code that produces items. */ - struct ProducerLock : public Lock { - /** - * Acquire locks to produce an item. - *@post If isOk() the calling thread has exclusive access - * to produce an item. - */ - ProducerLock(ProducerConsumer& p); - - /** Release locks, signal waiting consumers if confirm() was called. */ - ~ProducerLock(); - }; - - /** Lock for code that consumes items */ - struct ConsumerLock : public Lock { - /** - * Wait for an item to consume and acquire locks. - * - *@post If isOk() there is at least one item available and the - *calling thread has exclusive access to consume it. - */ - ConsumerLock(ProducerConsumer& p); - - /** - * Wait up to timeout to acquire lock. - *@post If isOk() caller has a producer lock. - * If isTimedOut() there was a timeout. - * If neither then we were shutdown. - */ - ConsumerLock(ProducerConsumer& p, const Time& timeout); - - /** Release locks */ - ~ConsumerLock(); - }; - - private: - mutable Monitor monitor; - size_t items; - size_t waiters; - bool shutdownFlag; - - friend class Lock; - friend class ProducerLock; - friend class ConsumerLock; -}; - -}} // namespace qpid::sys - -#endif /*!_sys_ProducerConsumer_h*/ diff --git a/cpp/lib/common/sys/Runnable.cpp b/cpp/lib/common/sys/Runnable.cpp deleted file mode 100644 index 30122c682f..0000000000 --- a/cpp/lib/common/sys/Runnable.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "Runnable.h" -#include <boost/bind.hpp> - -namespace qpid { -namespace sys { - -Runnable::~Runnable() {} - -Runnable::Functor Runnable::functor() -{ - return boost::bind(&Runnable::run, this); -} - -}} diff --git a/cpp/lib/common/sys/Runnable.h b/cpp/lib/common/sys/Runnable.h deleted file mode 100644 index fb3927c612..0000000000 --- a/cpp/lib/common/sys/Runnable.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef _Runnable_ -#define _Runnable_ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/function.hpp> - -namespace qpid { -namespace sys { - -/** - * Interface for objects that can be run, e.g. in a thread. - */ -class Runnable -{ - public: - /** Type to represent a runnable as a Functor */ - typedef boost::function0<void> Functor; - - virtual ~Runnable(); - - /** Derived classes override run(). */ - virtual void run() = 0; - - /** Create a functor object that will call this->run(). */ - Functor functor(); -}; - -}} - - -#endif diff --git a/cpp/lib/common/sys/ScopedIncrement.h b/cpp/lib/common/sys/ScopedIncrement.h deleted file mode 100644 index f14461ddaf..0000000000 --- a/cpp/lib/common/sys/ScopedIncrement.h +++ /dev/null @@ -1,59 +0,0 @@ -#ifndef _posix_ScopedIncrement_h -#define _posix_ScopedIncrement_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/noncopyable.hpp> - -namespace qpid { -namespace sys { - -/** Increment counter in constructor and decrement in destructor. */ -template <class T> -class ScopedIncrement : boost::noncopyable -{ - public: - ScopedIncrement(T& c) : count(c) { ++count; } - ~ScopedIncrement() { --count; } - private: - T& count; -}; - - -/** Decrement counter in constructor and increment in destructor. */ -template <class T> -class ScopedDecrement : boost::noncopyable -{ - public: - ScopedDecrement(T& c) : count(c) { value = --count; } - ~ScopedDecrement() { ++count; } - - /** Return the value after the decrement. */ - operator long() { return value; } - - private: - T& count; - long value; -}; - - -}} - - -#endif // _posix_ScopedIncrement_h diff --git a/cpp/lib/common/sys/ShutdownHandler.h b/cpp/lib/common/sys/ShutdownHandler.h deleted file mode 100644 index 88baecb5b6..0000000000 --- a/cpp/lib/common/sys/ShutdownHandler.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ShutdownHandler_ -#define _ShutdownHandler_ - -namespace qpid { -namespace sys { - - class ShutdownHandler - { - public: - virtual void shutdown() = 0; - virtual ~ShutdownHandler(){} - }; - -} -} - -#endif diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h deleted file mode 100644 index d793a240c6..0000000000 --- a/cpp/lib/common/sys/Socket.h +++ /dev/null @@ -1,88 +0,0 @@ -#ifndef _sys_Socket_h -#define _sys_Socket_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <string> -#include <sys/Time.h> - -#ifdef USE_APR -# include <apr_network_io.h> -#endif - -namespace qpid { -namespace sys { - -class Socket -{ - public: - /** Create an initialized TCP socket */ - static Socket createTcp(); - - /** Create a socket wrapper for descriptor. */ -#ifdef USE_APR - Socket(apr_socket_t* descriptor = 0); -#else - Socket(int descriptor = 0); -#endif - - /** Set timeout for read and write */ - void setTimeout(Time interval); - - void connect(const std::string& host, int port); - - void close(); - - enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; - - /** Returns bytes sent or an ErrorCode value < 0. */ - ssize_t send(const void* data, size_t size); - - /** - * Returns bytes received, an ErrorCode value < 0 or 0 - * if the connection closed in an orderly manner. - */ - ssize_t recv(void* data, size_t size); - - /** Bind to a port and start listening. - *@param port 0 means choose an available port. - *@param backlog maximum number of pending connections. - *@return The bound port. - */ - int listen(int port = 0, int backlog = 10); - - /** Get file descriptor */ - int fd(); - - private: -#ifdef USE_APR - apr_socket_t* socket; -#else - void init() const; - mutable int socket; // Initialized on demand. -#endif -}; - -}} - - -#endif /*!_sys_Socket_h*/ diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h deleted file mode 100644 index 47b95b6234..0000000000 --- a/cpp/lib/common/sys/Thread.h +++ /dev/null @@ -1,142 +0,0 @@ -#ifndef _sys_Thread_h -#define _sys_Thread_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <sys/Runnable.h> - -#ifdef USE_APR -# include <apr_thread_proc.h> -# include <apr_portable.h> -# include <apr/APRPool.h> -# include <apr/APRBase.h> -#else -# include <posix/check.h> -# include <pthread.h> -#endif - -namespace qpid { -namespace sys { - -class Thread -{ - public: - inline static Thread current(); - inline static void yield(); - - inline Thread(); - inline explicit Thread(qpid::sys::Runnable*); - inline explicit Thread(qpid::sys::Runnable&); - - inline void join(); - - inline long id(); - - private: -#ifdef USE_APR - static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data); - inline Thread(apr_thread_t* t); - apr_thread_t* thread; -#else - static void* runRunnable(void* runnable); - inline Thread(pthread_t); - pthread_t thread; -#endif -}; - - -Thread::Thread() : thread(0) {} - -// APR ================================================================ -#ifdef USE_APR - -Thread::Thread(Runnable* runnable) { - CHECK_APR_SUCCESS( - apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get())); -} - -Thread::Thread(Runnable& runnable) { - CHECK_APR_SUCCESS( - apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get())); -} - -void Thread::join(){ - apr_status_t status; - if (thread != 0) - CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); -} - -long Thread::id() { - return long(thread); -} - -Thread::Thread(apr_thread_t* t) : thread(t) {} - -Thread Thread::current(){ - apr_thread_t* thr; - apr_os_thread_t osthr = apr_os_thread_current(); - CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get())); - return Thread(thr); -} - -void Thread::yield() -{ - apr_thread_yield(); -} - - -// POSIX ================================================================ -#else - -Thread::Thread(Runnable* runnable) { - QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); -} - -Thread::Thread(Runnable& runnable) { - QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); -} - -void Thread::join(){ - QPID_POSIX_THROW_IF(pthread_join(thread, 0)); -} - -long Thread::id() { - return long(thread); -} - -Thread::Thread(pthread_t thr) : thread(thr) {} - -Thread Thread::current() { - return Thread(pthread_self()); -} - -void Thread::yield() -{ - QPID_POSIX_THROW_IF(pthread_yield()); -} - - -#endif - -}} - -#endif /*!_sys_Thread_h*/ diff --git a/cpp/lib/common/sys/ThreadSafeQueue.h b/cpp/lib/common/sys/ThreadSafeQueue.h deleted file mode 100644 index 80ea92da0e..0000000000 --- a/cpp/lib/common/sys/ThreadSafeQueue.h +++ /dev/null @@ -1,98 +0,0 @@ -#ifndef _sys_ThreadSafeQueue_h -#define _sys_ThreadSafeQueue_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <deque> -#include "ProducerConsumer.h" -#include "Exception.h" - -namespace qpid { -namespace sys { - -/** - * A thread safe queue template. - */ -template <class T, class ContainerType=std::deque<T> > -class ThreadSafeQueue -{ - public: - - ThreadSafeQueue() {} - - /** Push a value onto the back of the queue */ - void push(const T& value) { - ProducerConsumer::ProducerLock producer(pc); - if (producer.isOk()) { - producer.confirm(); - container.push_back(value); - } - } - - /** Pop a value from the front of the queue. Waits till value is available. - *@throw ShutdownException if queue is shutdown while waiting. - */ - T pop() { - ProducerConsumer::ConsumerLock consumer(pc); - if (consumer.isOk()) { - consumer.confirm(); - T value(container.front()); - container.pop_front(); - return value; - } - throw ShutdownException(); - } - - /** - * If a value becomes available within the timeout, set outValue - * and return true. Otherwise return false; - */ - bool pop(T& outValue, const Time& timeout) { - ProducerConsumer::ConsumerLock consumer(pc, timeout); - if (consumer.isOk()) { - consumer.confirm(); - outValue = container.front(); - container.pop_front(); - return true; - } - return false; - } - - /** Interrupt threads waiting in pop() */ - void shutdown() { pc.shutdown(); } - - /** True if queue is shutdown */ - bool isShutdown() { return pc.isShutdown(); } - - /** Size of the queue */ - size_t size() { ProducerConsumer::Lock l(pc); return container.size(); } - - /** True if queue is empty */ - bool empty() { ProducerConsumer::Lock l(pc); return container.empty(); } - - private: - ProducerConsumer pc; - ContainerType container; -}; - -}} // namespace qpid::sys - - - -#endif /*!_sys_ThreadSafeQueue_h*/ diff --git a/cpp/lib/common/sys/Time.cpp b/cpp/lib/common/sys/Time.cpp deleted file mode 100644 index ad6185b966..0000000000 --- a/cpp/lib/common/sys/Time.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Time.h" - -namespace qpid { -namespace sys { - -// APR ================================================================ -#if USE_APR - -Time now() { return apr_time_now() * TIME_USEC; } - -// POSIX================================================================ -#else - -Time now() { - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - return toTime(ts); -} - -struct timespec toTimespec(const Time& t) { - struct timespec ts; - toTimespec(ts, t); - return ts; -} - -struct timespec& toTimespec(struct timespec& ts, const Time& t) { - ts.tv_sec = t / TIME_SEC; - ts.tv_nsec = t % TIME_SEC; - return ts; -} - -Time toTime(const struct timespec& ts) { - return ts.tv_sec*TIME_SEC + ts.tv_nsec; -} - - -#endif -}} - diff --git a/cpp/lib/common/sys/Time.h b/cpp/lib/common/sys/Time.h deleted file mode 100644 index 3dd46741d8..0000000000 --- a/cpp/lib/common/sys/Time.h +++ /dev/null @@ -1,58 +0,0 @@ -#ifndef _sys_Time_h -#define _sys_Time_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <stdint.h> - -#ifdef USE_APR -# include <apr_time.h> -#else -# include <time.h> -#endif - -namespace qpid { -namespace sys { - -/** Time in nanoseconds */ -typedef int64_t Time; - -Time now(); - -/** Nanoseconds per second. */ -const Time TIME_SEC = 1000*1000*1000; -/** Nanoseconds per millisecond */ -const Time TIME_MSEC = 1000*1000; -/** Nanoseconds per microseconds. */ -const Time TIME_USEC = 1000; -/** Nanoseconds per nanosecond. */ -const Time TIME_NSEC = 1; - -#ifndef USE_APR -struct timespec toTimespec(const Time& t); -struct timespec& toTimespec(struct timespec& ts, const Time& t); -Time toTime(const struct timespec& ts); -#endif - -}} - -#endif /*!_sys_Time_h*/ diff --git a/cpp/lib/common/sys/TimeoutHandler.h b/cpp/lib/common/sys/TimeoutHandler.h deleted file mode 100644 index 0c10709bbf..0000000000 --- a/cpp/lib/common/sys/TimeoutHandler.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _TimeoutHandler_ -#define _TimeoutHandler_ - -namespace qpid { -namespace sys { - - class TimeoutHandler - { - public: - virtual void idleOut() = 0; - virtual void idleIn() = 0; - virtual ~TimeoutHandler(){} - }; - -} -} - - -#endif diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp deleted file mode 100644 index 604fb9f5ca..0000000000 --- a/cpp/lib/common/sys/apr/APRAcceptor.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <sys/Acceptor.h> -#include <sys/ConnectionInputHandlerFactory.h> -#include "LFProcessor.h" -#include "LFSessionContext.h" -#include "APRBase.h" -#include "APRPool.h" - -namespace qpid { -namespace sys { - -class APRAcceptor : public Acceptor -{ - public: - APRAcceptor(int16_t port, int backlog, int threads, bool trace); - virtual uint16_t getPort() const; - virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory); - virtual void shutdown(); - - private: - void shutdownImpl(); - - private: - int16_t port; - bool trace; - LFProcessor processor; - apr_socket_t* socket; - volatile bool running; - Mutex shutdownLock; -}; - -// Define generic Acceptor::create() to return APRAcceptor. -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace) -{ - return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace)); -} -// Must define Acceptor virtual dtor. -Acceptor::~Acceptor() {} - -APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : - port(port_), - trace(trace_), - processor(APRPool::get(), threads, 1000, 5000000), - running(false) -{ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); -} - -uint16_t APRAcceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void APRAcceptor::run(ConnectionInputHandlerFactory* factory) { - running = true; - processor.start(); - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running) { - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); - if(status == APR_SUCCESS){ - //make this socket non-blocking: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace); - session->init(factory->create(session)); - }else{ - Mutex::ScopedLock locker(shutdownLock); - if(running) { - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - shutdownImpl(); - } - } - } -} - -void APRAcceptor::shutdown() { - Mutex::ScopedLock locker(shutdownLock); - if (running) - shutdownImpl(); -} - -void APRAcceptor::shutdownImpl() { - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); -} - - -}} diff --git a/cpp/lib/common/sys/apr/APRBase.cpp b/cpp/lib/common/sys/apr/APRBase.cpp deleted file mode 100644 index 861071499f..0000000000 --- a/cpp/lib/common/sys/apr/APRBase.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <QpidError.h> -#include "APRBase.h" - -using namespace qpid::sys; - -APRBase* APRBase::instance = 0; - -APRBase* APRBase::getInstance(){ - if(instance == 0){ - instance = new APRBase(); - } - return instance; -} - - -APRBase::APRBase() : count(0){ - apr_initialize(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); -} - -APRBase::~APRBase(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - apr_terminate(); -} - -bool APRBase::_increment(){ - bool deleted(false); - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(this == instance){ - count++; - }else{ - deleted = true; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - return !deleted; -} - -void APRBase::_decrement(){ - APRBase* copy = 0; - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(--count == 0){ - copy = instance; - instance = 0; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - if(copy != 0){ - delete copy; - } -} - -void APRBase::increment(){ - int count = 0; - while(count++ < 2 && !getInstance()->_increment()){ - std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; - } -} - -void APRBase::decrement(){ - getInstance()->_decrement(); -} - -std::string qpid::sys::get_desc(apr_status_t status){ - const int size = 50; - char tmp[size]; - return std::string(apr_strerror(status, tmp, size)); -} - diff --git a/cpp/lib/common/sys/apr/APRBase.h b/cpp/lib/common/sys/apr/APRBase.h deleted file mode 100644 index 6a866a554a..0000000000 --- a/cpp/lib/common/sys/apr/APRBase.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _APRBase_ -#define _APRBase_ - -#include <string> -#include <apr_thread_mutex.h> -#include <apr_errno.h> -#include <QpidError.h> - -namespace qpid { -namespace sys { - - /** - * Use of APR libraries necessitates explicit init and terminate - * calls. Any class using APR libs should obtain the reference to - * this singleton and increment on construction, decrement on - * destruction. This class can then correctly initialise apr - * before the first use and terminate after the last use. - */ - class APRBase{ - static APRBase* instance; - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - int count; - - APRBase(); - ~APRBase(); - static APRBase* getInstance(); - bool _increment(); - void _decrement(); - public: - static void increment(); - static void decrement(); - }; - - //this is also a convenient place for a helper function for error checking: - void check(apr_status_t status, const char* file, const int line); - std::string get_desc(apr_status_t status); - -#define CHECK_APR_SUCCESS(A) qpid::sys::check(A, __FILE__, __LINE__); - -} -} - -// Inlined as it is called *a lot* -void inline qpid::sys::check(apr_status_t status, const char* file, const int line){ - if (status != APR_SUCCESS){ - const int size = 50; - char tmp[size]; - std::string msg(apr_strerror(status, tmp, size)); - throw qpid::QpidError(APR_ERROR + ((int) status), msg, - qpid::SrcLine(file, line)); - } -} - - - - -#endif diff --git a/cpp/lib/common/sys/apr/APRPool.cpp b/cpp/lib/common/sys/apr/APRPool.cpp deleted file mode 100644 index e8b71f6e8a..0000000000 --- a/cpp/lib/common/sys/apr/APRPool.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "APRPool.h" -#include "APRBase.h" -#include <boost/pool/detail/singleton.hpp> - -using namespace qpid::sys; - -APRPool::APRPool(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -APRPool::~APRPool(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -apr_pool_t* APRPool::get() { - return boost::details::pool::singleton_default<APRPool>::instance().pool; -} - diff --git a/cpp/lib/common/sys/apr/APRPool.h b/cpp/lib/common/sys/apr/APRPool.h deleted file mode 100644 index da7661fcfa..0000000000 --- a/cpp/lib/common/sys/apr/APRPool.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef _APRPool_ -#define _APRPool_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/noncopyable.hpp> -#include <apr_pools.h> - -namespace qpid { -namespace sys { -/** - * Singleton APR memory pool. - */ -class APRPool : private boost::noncopyable { - public: - APRPool(); - ~APRPool(); - - /** Get singleton instance */ - static apr_pool_t* get(); - - private: - apr_pool_t* pool; -}; - -}} - - - - - -#endif /*!_APRPool_*/ diff --git a/cpp/lib/common/sys/apr/APRSocket.cpp b/cpp/lib/common/sys/apr/APRSocket.cpp deleted file mode 100644 index 96dbd132a1..0000000000 --- a/cpp/lib/common/sys/apr/APRSocket.cpp +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "APRBase.h" -#include "APRSocket.h" -#include <assert.h> -#include <iostream> - -using namespace qpid::sys; -using namespace qpid::framing; - -APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ - -} - -void APRSocket::read(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - bytes = buffer.available(); - apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); - buffer.move(bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out - }else if(APR_STATUS_IS_EOF(s)){ - close(); - } -} - -void APRSocket::write(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - do{ - bytes = buffer.available(); - apr_socket_send(socket, buffer.start(), &bytes); - buffer.move(bytes); - }while(bytes > 0); -} - -void APRSocket::close(){ - if(!closed){ - std::cout << "Closing socket " << socket << "@" << this << std::endl; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - closed = true; - } -} - -bool APRSocket::isOpen() const { - return !closed; -} - -uint8_t APRSocket::read(){ - char data[1]; - apr_size_t bytes = 1; - apr_status_t s = apr_socket_recv(socket, data, &bytes); - if(APR_STATUS_IS_EOF(s) || bytes == 0){ - return 0; - }else{ - return *data; - } -} - -APRSocket::~APRSocket(){ -} diff --git a/cpp/lib/common/sys/apr/APRSocket.h b/cpp/lib/common/sys/apr/APRSocket.h deleted file mode 100644 index a55dfc06b0..0000000000 --- a/cpp/lib/common/sys/apr/APRSocket.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _APRSocket_ -#define _APRSocket_ - -#include <apr_network_io.h> -#include <Buffer.h> - -namespace qpid { -namespace sys { - - class APRSocket - { - apr_socket_t* const socket; - volatile bool closed; - public: - APRSocket(apr_socket_t* socket); - void read(qpid::framing::Buffer& b); - void write(qpid::framing::Buffer& b); - void close(); - bool isOpen() const; - uint8_t read(); - ~APRSocket(); - }; - -} -} - - -#endif diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp deleted file mode 100644 index 2b6fc92623..0000000000 --- a/cpp/lib/common/sys/apr/LFProcessor.cpp +++ /dev/null @@ -1,179 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <sstream> -#include <QpidError.h> -#include "LFProcessor.h" -#include "APRBase.h" -#include "LFSessionContext.h" - -using namespace qpid::sys; -using qpid::QpidError; - -// TODO aconway 2006-10-12: stopped is read outside locks. -// - -LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : - size(_size), - timeout(_timeout), - signalledCount(0), - current(0), - count(0), - workerCount(_workers), - hasLeader(false), - workers(new Thread[_workers]), - stopped(false) -{ - - CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); -} - - -LFProcessor::~LFProcessor(){ - if (!stopped) stop(); - delete[] workers; - CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); -} - -void LFProcessor::start(){ - for(int i = 0; i < workerCount; i++){ - workers[i] = Thread(this); - } -} - -void LFProcessor::add(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); - Monitor::ScopedLock l(countLock); - sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); - count++; -} - -void LFProcessor::remove(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - Monitor::ScopedLock l(countLock); - sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); - count--; -} - -void LFProcessor::reactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -void LFProcessor::deactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); -} - -void LFProcessor::update(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -bool LFProcessor::full(){ - Mutex::ScopedLock locker(countLock); - return count == size; -} - -bool LFProcessor::empty(){ - Mutex::ScopedLock locker(countLock); - return count == 0; -} - -void LFProcessor::poll() { - apr_status_t status = APR_EGENERAL; - do{ - current = 0; - if(!stopped){ - status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); - } - }while(status != APR_SUCCESS && !stopped); -} - -void LFProcessor::run(){ - try{ - while(!stopped){ - const apr_pollfd_t* event = 0; - LFSessionContext* session = 0; - { - Monitor::ScopedLock l(leadLock); - waitToLead(); - event = getNextEvent(); - if(!event) return; - session = reinterpret_cast<LFSessionContext*>( - event->client_data); - session->startProcessing(); - relinquishLead(); - } - - //process event: - if(event->rtnevents & APR_POLLIN) session->read(); - if(event->rtnevents & APR_POLLOUT) session->write(); - - if(session->isClosed()){ - session->handleClose(); - Monitor::ScopedLock l(countLock); - sessions.erase(find(sessions.begin(),sessions.end(), session)); - count--; - }else{ - session->stopProcessing(); - } - } - }catch(std::exception e){ - std::cout << e.what() << std::endl; - } -} - -void LFProcessor::waitToLead(){ - while(hasLeader && !stopped) leadLock.wait(); - hasLeader = !stopped; -} - -void LFProcessor::relinquishLead(){ - hasLeader = false; - leadLock.notify(); -} - -const apr_pollfd_t* LFProcessor::getNextEvent(){ - while(true){ - if(stopped){ - return 0; - }else if(current < signalledCount){ - //use result of previous poll if one is available - return signalledFDs + (current++); - }else{ - //else poll to get new events - poll(); - } - } -} - -void LFProcessor::stop(){ - stopped = true; - { - Monitor::ScopedLock l(leadLock); - leadLock.notifyAll(); - } - for(int i = 0; i < workerCount; i++){ - workers[i].join(); - } - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } -} - diff --git a/cpp/lib/common/sys/apr/LFProcessor.h b/cpp/lib/common/sys/apr/LFProcessor.h deleted file mode 100644 index de90199472..0000000000 --- a/cpp/lib/common/sys/apr/LFProcessor.h +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _LFProcessor_ -#define _LFProcessor_ - -#include <apr_poll.h> -#include <iostream> -#include <vector> -#include <sys/Monitor.h> -#include <sys/Runnable.h> -#include <sys/Thread.h> - -namespace qpid { -namespace sys { - - class LFSessionContext; - - /** - * This class processes a poll set using the leaders-followers - * pattern for thread synchronization: the leader will poll and on - * the poll returning, it will remove a session, promote a - * follower to leadership, then process the session. - */ - class LFProcessor : private virtual qpid::sys::Runnable - { - typedef std::vector<LFSessionContext*>::iterator iterator; - - const int size; - const apr_interval_time_t timeout; - apr_pollset_t* pollset; - int signalledCount; - int current; - const apr_pollfd_t* signalledFDs; - int count; - const int workerCount; - bool hasLeader; - qpid::sys::Thread* workers; - qpid::sys::Monitor leadLock; - qpid::sys::Mutex countLock; - std::vector<LFSessionContext*> sessions; - volatile bool stopped; - - const apr_pollfd_t* getNextEvent(); - void waitToLead(); - void relinquishLead(); - void poll(); - virtual void run(); - - public: - LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); - /** - * Add the fd to the poll set. Relies on the client_data being - * an instance of LFSessionContext. - */ - void add(const apr_pollfd_t* const fd); - /** - * Remove the fd from the poll set. - */ - void remove(const apr_pollfd_t* const fd); - /** - * Signal that the fd passed in, already part of the pollset, - * has had its flags altered. - */ - void update(const apr_pollfd_t* const fd); - /** - * Add an fd back to the poll set after deactivation. - */ - void reactivate(const apr_pollfd_t* const fd); - /** - * Temporarily remove the fd from the poll set. Called when processing - * is about to begin. - */ - void deactivate(const apr_pollfd_t* const fd); - /** - * Indicates whether the capacity of this processor has been - * reached (or whether it can still handle further fd's). - */ - bool full(); - /** - * Indicates whether there are any fd's registered. - */ - bool empty(); - /** - * Stop processing. - */ - void stop(); - /** - * Start processing. - */ - void start(); - /** - * Is processing stopped? - */ - bool isStopped(); - - ~LFProcessor(); - }; - -} -} - - -#endif diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp deleted file mode 100644 index 5edb72baee..0000000000 --- a/cpp/lib/common/sys/apr/LFSessionContext.cpp +++ /dev/null @@ -1,179 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "LFSessionContext.h" -#include "APRBase.h" -#include <QpidError.h> -#include <assert.h> - -using namespace qpid::sys; -using namespace qpid::sys; -using namespace qpid::framing; - -LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, - LFProcessor* const _processor, - bool _debug) : - debug(_debug), - socket(_socket), - initiated(false), - in(65536), - out(65536), - processor(_processor), - processing(false), - closing(false) -{ - - fd.p = _pool; - fd.desc_type = APR_POLL_SOCKET; - fd.reqevents = APR_POLLIN; - fd.client_data = this; - fd.desc.s = _socket; - - out.flip(); -} - -LFSessionContext::~LFSessionContext(){ - -} - -void LFSessionContext::read(){ - socket.read(in); - in.flip(); - if(initiated){ - AMQFrame frame; - try{ - while(frame.decode(in)){ - if(debug) log("RECV", &frame); - handler->received(&frame); - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg - << " (" << error.loc.file << ":" << error.loc.line - << ")" << std::endl; - } - }else{ - ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - handler->initiated(protocolInit); - initiated = true; - if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; - } - } - in.compact(); -} - -void LFSessionContext::write(){ - bool done = isClosed(); - while(!done){ - if(out.available() > 0){ - socket.write(out); - if(out.available() > 0){ - - //incomplete write, leave flags to receive notification of readiness to write - done = true;//finished processing for now, but write is still in progress - } - }else{ - //do we have any frames to write? - Mutex::ScopedLock l(writeLock); - if(!framesToWrite.empty()){ - out.clear(); - bool encoded(false); - AMQFrame* frame = framesToWrite.front(); - while(frame && out.available() >= frame->size()){ - encoded = true; - frame->encode(out); - if(debug) log("SENT", frame); - delete frame; - framesToWrite.pop(); - frame = framesToWrite.empty() ? 0 : framesToWrite.front(); - } - if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); - out.flip(); - }else{ - //reset flags, don't care about writability anymore - fd.reqevents = APR_POLLIN; - done = true; - - if(closing){ - socket.close(); - } - } - } - } -} - -void LFSessionContext::send(AMQFrame* frame){ - Mutex::ScopedLock l(writeLock); - if(!closing){ - framesToWrite.push(frame); - if(!(fd.reqevents & APR_POLLOUT)){ - fd.reqevents |= APR_POLLOUT; - if(!processing){ - processor->update(&fd); - } - } - } -} - -void LFSessionContext::startProcessing(){ - Mutex::ScopedLock l(writeLock); - processing = true; - processor->deactivate(&fd); -} - -void LFSessionContext::stopProcessing(){ - Mutex::ScopedLock l(writeLock); - processor->reactivate(&fd); - processing = false; -} - -void LFSessionContext::close(){ - Mutex::ScopedLock l(writeLock); - closing = true; - if(!processing){ - //allow pending frames to be written to socket - fd.reqevents = APR_POLLOUT; - processor->update(&fd); - } -} - -void LFSessionContext::handleClose(){ - handler->closed(); - std::cout << "Session closed [" << &socket << "]" << std::endl; - delete handler; - delete this; -} - -void LFSessionContext::shutdown(){ - socket.close(); - handleClose(); -} - -void LFSessionContext::init(ConnectionInputHandler* _handler){ - handler = _handler; - processor->add(&fd); -} - -void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ - Mutex::ScopedLock l(logLock); - std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; -} - -Mutex LFSessionContext::logLock; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h deleted file mode 100644 index 81cfc0efda..0000000000 --- a/cpp/lib/common/sys/apr/LFSessionContext.h +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _LFSessionContext_ -#define _LFSessionContext_ - -#include <queue> - -#include <apr_network_io.h> -#include <apr_poll.h> -#include <apr_time.h> - -#include <AMQFrame.h> -#include <Buffer.h> -#include <sys/Monitor.h> -#include <sys/ConnectionOutputHandler.h> -#include <sys/ConnectionInputHandler.h> - -#include "APRSocket.h" -#include "LFProcessor.h" - -namespace qpid { -namespace sys { - - -class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler -{ - const bool debug; - APRSocket socket; - bool initiated; - - qpid::framing::Buffer in; - qpid::framing::Buffer out; - - qpid::sys::ConnectionInputHandler* handler; - LFProcessor* const processor; - - apr_pollfd_t fd; - - std::queue<qpid::framing::AMQFrame*> framesToWrite; - qpid::sys::Mutex writeLock; - - bool processing; - bool closing; - - static qpid::sys::Mutex logLock; - void log(const std::string& desc, - qpid::framing::AMQFrame* const frame); - - - public: - LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, - LFProcessor* const processor, - bool debug = false); - virtual ~LFSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void close(); - void read(); - void write(); - void init(qpid::sys::ConnectionInputHandler* handler); - void startProcessing(); - void stopProcessing(); - void handleClose(); - void shutdown(); - inline apr_pollfd_t* const getFd(){ return &fd; } - inline bool isClosed(){ return !socket.isOpen(); } -}; - -} -} - - -#endif diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp deleted file mode 100644 index bca4da6c96..0000000000 --- a/cpp/lib/common/sys/apr/Socket.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -#include <sys/Socket.h> -#include <apr/APRBase.h> -#include <apr/APRPool.h> - - -using namespace qpid::sys; - -Socket Socket::createTcp() { - Socket s; - CHECK_APR_SUCCESS( - apr_socket_create( - &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, - APRPool::get())); - return s; -} - -Socket::Socket(apr_socket_t* s) { - socket = s; -} - -void Socket::setTimeout(Time interval) { - apr_socket_timeout_set(socket, interval/TIME_USEC); -} - -void Socket::connect(const std::string& host, int port) { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS( - apr_sockaddr_info_get( - &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, - APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); -} - -void Socket::close() { - if (socket == 0) return; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - socket = 0; -} - -ssize_t Socket::send(const void* data, size_t size) -{ - apr_size_t sent = size; - apr_status_t status = - apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent); - if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; - if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF; - CHECK_APR_SUCCESS(status); - return sent; -} - -ssize_t Socket::recv(void* data, size_t size) -{ - apr_size_t received = size; - apr_status_t status = - apr_socket_recv(socket, reinterpret_cast<char*>(data), &received); - if (APR_STATUS_IS_TIMEUP(status)) - return SOCKET_TIMEOUT; - if (APR_STATUS_IS_EOF(status)) - return SOCKET_EOF; - CHECK_APR_SUCCESS(status); - return received; -} - - diff --git a/cpp/lib/common/sys/apr/Thread.cpp b/cpp/lib/common/sys/apr/Thread.cpp deleted file mode 100644 index 5c4799aa96..0000000000 --- a/cpp/lib/common/sys/apr/Thread.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <sys/Thread.h> - -using namespace qpid::sys; -using qpid::sys::Runnable; - -void* APR_THREAD_FUNC Thread::runRunnable(apr_thread_t* thread, void *data) { - reinterpret_cast<Runnable*>(data)->run(); - CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); - return NULL; -} - - diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp deleted file mode 100644 index 16c7ec9c3f..0000000000 --- a/cpp/lib/common/sys/posix/EventChannel.cpp +++ /dev/null @@ -1,325 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <mqueue.h> -#include <string.h> -#include <iostream> - -#include <sys/errno.h> -#include <sys/socket.h> -#include <sys/epoll.h> - -#include <typeinfo> -#include <iostream> -#include <queue> - -#include <boost/ptr_container/ptr_map.hpp> -#include <boost/current_function.hpp> - -#include <QpidError.h> -#include <sys/Monitor.h> - -#include "check.h" -#include "EventChannel.h" - -using namespace std; - - -// Convenience template to zero out a struct. -template <class S> struct ZeroStruct : public S { - ZeroStruct() { memset(this, 0, sizeof(*this)); } -}; - -namespace qpid { -namespace sys { - - -/** - * EventHandler wraps an epoll file descriptor. Acts as private - * interface between EventChannel and subclasses. - * - * Also implements Event interface for events that are not associated - * with a file descriptor and are passed via the message queue. - */ -class EventHandler : public Event, private Monitor -{ - public: - EventHandler(int epollSize = 256); - ~EventHandler(); - - int getEpollFd() { return epollFd; } - void epollAdd(int fd, uint32_t epollEvents, Event* event); - void epollMod(int fd, uint32_t epollEvents, Event* event); - void epollDel(int fd); - - void mqPut(Event* event); - Event* mqGet(); - - protected: - // Should never be called, only complete. - void prepare(EventHandler&) { assert(0); } - Event* complete(EventHandler& eh); - - private: - int epollFd; - std::string mqName; - int mqFd; - std::queue<Event*> mqEvents; -}; - -EventHandler::EventHandler(int epollSize) -{ - epollFd = epoll_create(epollSize); - if (epollFd < 0) throw QPID_POSIX_ERROR(errno); - - // Create a POSIX message queue for non-fd events. - // We write one byte and never read it is always ready for read - // when we add it to epoll. - // - ZeroStruct<struct mq_attr> attr; - attr.mq_maxmsg = 1; - attr.mq_msgsize = 1; - do { - char tmpnam[L_tmpnam]; - tmpnam_r(tmpnam); - mqName = tmpnam + 4; // Skip "tmp/" - mqFd = mq_open( - mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); - if (mqFd < 0) throw QPID_POSIX_ERROR(errno); - } while (mqFd == EEXIST); // Name already taken, try again. - - static char zero = '\0'; - mq_send(mqFd, &zero, 1, 0); - epollAdd(mqFd, 0, this); -} - -EventHandler::~EventHandler() { - mq_close(mqFd); - mq_unlink(mqName.c_str()); -} - -void EventHandler::mqPut(Event* event) { - ScopedLock l(*this); - assert(event != 0); - mqEvents.push(event); - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); -} - -Event* EventHandler::mqGet() { - ScopedLock l(*this); - if (mqEvents.empty()) - return 0; - Event* event = mqEvents.front(); - mqEvents.pop(); - if(!mqEvents.empty()) - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); - return event; -} - -void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) -{ - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); -} - -void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) -{ - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); -} - -void EventHandler::epollDel(int fd) { - if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) - throw QPID_POSIX_ERROR(errno); -} - -Event* EventHandler::complete(EventHandler& eh) -{ - assert(&eh == this); - Event* event = mqGet(); - return event==0 ? 0 : event->complete(eh); -} - -// ================================================================ -// EventChannel - -EventChannel::shared_ptr EventChannel::create() { - return shared_ptr(new EventChannel()); -} - -EventChannel::EventChannel() : handler(new EventHandler()) {} - -EventChannel::~EventChannel() {} - -void EventChannel::postEvent(Event& e) -{ - e.prepare(*handler); -} - -Event* EventChannel::getEvent() -{ - static const int infiniteTimeout = -1; - ZeroStruct<struct epoll_event> epollEvent; - - // Loop until we can complete the event. Some events may re-post - // themselves and return 0 from complete, e.g. partial reads. // - Event* event = 0; - while (event == 0) { - int eventCount = epoll_wait(handler->getEpollFd(), - &epollEvent, 1, infiniteTimeout); - if (eventCount < 0) { - if (errno != EINTR) { - // TODO aconway 2006-11-28: Proper handling/logging of errors. - cerr << BOOST_CURRENT_FUNCTION << " ignoring error " - << PosixError::getMessage(errno) << endl; - assert(0); - } - } - else if (eventCount == 1) { - event = reinterpret_cast<Event*>(epollEvent.data.ptr); - assert(event != 0); - try { - event = event->complete(*handler); - } - catch (const Exception& e) { - if (event) - event->setError(e); - } - catch (const std::exception& e) { - if (event) - event->setError(e); - } - } - } - return event; -} - -Event::~Event() {} - -void Event::prepare(EventHandler& handler) -{ - handler.mqPut(this); -} - -bool Event::hasError() const { - return error; -} - -void Event::throwIfError() throw (Exception) { - if (hasError()) - error.throwSelf(); -} - -Event* Event::complete(EventHandler&) -{ - return this; -} - -void Event::dispatch() -{ - try { - if (!callback.empty()) - callback(); - } catch (const std::exception&) { - throw; - } catch (...) { - throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); - } -} - -void Event::setError(const ExceptionHolder& e) { - error = e; -} - -void ReadEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); -} - -ssize_t ReadEvent::doRead() { - ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, - size - received); - if (n > 0) received += n; - return n; -} - -Event* ReadEvent::complete(EventHandler& handler) -{ - // Read as much as possible without blocking. - ssize_t n = doRead(); - while (n > 0 && received < size) doRead(); - - if (received == size) { - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - return this; - } - else if (n <0 && (errno == EAGAIN)) { - // Keep polling for more. - handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); - return 0; - } - else { - // Unexpected EOF or error. Throw ENODATA for EOF. - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); - } -} - -void WriteEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); -} - -Event* WriteEvent::complete(EventHandler& handler) -{ - ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, - size - written); - if (n < 0) throw QPID_POSIX_ERROR(errno); - written += n; - if(written < size) { - // Keep polling. - handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); - return 0; - } - written = 0; // Reset for re-use. - handler.epollDel(descriptor); - return this; -} - -void AcceptEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); -} - -Event* AcceptEvent::complete(EventHandler& handler) -{ - handler.epollDel(descriptor); - accepted = ::accept(descriptor, 0, 0); - if (accepted < 0) throw QPID_POSIX_ERROR(errno); - return this; -} - -}} diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h deleted file mode 100644 index 49c7fce740..0000000000 --- a/cpp/lib/common/sys/posix/EventChannel.h +++ /dev/null @@ -1,176 +0,0 @@ -#ifndef _sys_EventChannel_h -#define _sys_EventChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <SharedObject.h> -#include <ExceptionHolder.h> -#include <boost/function.hpp> -#include <memory> - -namespace qpid { -namespace sys { - -class Event; -class EventHandler; -class EventChannel; - -/** - * Base class for all Events. - */ -class Event -{ - public: - /** Type for callback when event is dispatched */ - typedef boost::function0<void> Callback; - - /** - * Create an event with optional callback. - * Instances of Event are sent directly through the channel. - * Derived classes define additional waiting behaviour. - *@param cb A callback functor that is invoked when dispatch() is called. - */ - Event(Callback cb = 0) : callback(cb) {} - - virtual ~Event(); - - /** Call the callback provided to the constructor, if any. */ - void dispatch(); - - /** True if there was an error processing this event */ - bool hasError() const; - - /** If hasError() throw the corresponding exception. */ - void throwIfError() throw(Exception); - - protected: - virtual void prepare(EventHandler&); - virtual Event* complete(EventHandler&); - void setError(const ExceptionHolder& e); - - Callback callback; - ExceptionHolder error; - - friend class EventChannel; - friend class EventHandler; -}; - -template <class BufT> -class IOEvent : public Event { - public: - void getDescriptor() const { return descriptor; } - size_t getSize() const { return size; } - BufT getBuffer() const { return buffer; } - - protected: - IOEvent(int fd, Callback cb, size_t sz, BufT buf) : - Event(cb), descriptor(fd), buffer(buf), size(sz) {} - - int descriptor; - BufT buffer; - size_t size; -}; - -/** Asynchronous read event */ -class ReadEvent : public IOEvent<void*> -{ - public: - explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : - IOEvent<void*>(fd, cb, sz, buf), received(0) {} - - private: - void prepare(EventHandler&); - Event* complete(EventHandler&); - ssize_t doRead(); - - size_t received; -}; - -/** Asynchronous write event */ -class WriteEvent : public IOEvent<const void*> -{ - public: - explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, - Callback cb=0) : - IOEvent<const void*>(fd, cb, sz, buf), written(0) {} - - protected: - void prepare(EventHandler&); - Event* complete(EventHandler&); - - private: - ssize_t doWrite(); - size_t written; -}; - -/** Asynchronous socket accept event */ -class AcceptEvent : public Event -{ - public: - /** Accept a connection on fd. */ - explicit AcceptEvent(int fd=-1, Callback cb=0) : - Event(cb), descriptor(fd), accepted(0) {} - - /** Get descriptor for server socket */ - int getAcceptedDesscriptor() const { return accepted; } - - private: - void prepare(EventHandler&); - Event* complete(EventHandler&); - - int descriptor; - int accepted; -}; - - -class QueueSet; - -/** - * Channel to post and wait for events. - */ -class EventChannel : public qpid::SharedObject<EventChannel> -{ - public: - static shared_ptr create(); - - ~EventChannel(); - - /** Post an event to the channel. */ - void postEvent(Event& event); - - /** Post an event to the channel. Must not be 0. */ - void postEvent(Event* event) { postEvent(*event); } - - /** - * Wait for the next complete event. - *@return Pointer to event. Will never return 0. - */ - Event* getEvent(); - - private: - EventChannel(); - boost::shared_ptr<EventHandler> handler; -}; - - -}} - - - -#endif /*!_sys_EventChannel_h*/ diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp deleted file mode 100644 index 548fbd1881..0000000000 --- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> - -#include <boost/assert.hpp> -#include <boost/ptr_container/ptr_vector.hpp> -#include <boost/ptr_container/ptr_deque.hpp> -#include <boost/bind.hpp> -#include <boost/scoped_ptr.hpp> - -#include <sys/ConnectionOutputHandler.h> -#include <sys/ConnectionInputHandler.h> -#include <sys/ConnectionInputHandlerFactory.h> -#include <sys/Acceptor.h> -#include <sys/Socket.h> -#include <framing/Buffer.h> -#include <framing/AMQFrame.h> -#include <Exception.h> - -#include "EventChannelConnection.h" - -namespace qpid { -namespace sys { - -using namespace qpid::framing; -using namespace std; - -class EventChannelAcceptor : public Acceptor { - public: - - - EventChannelAcceptor( - int16_t port_, int backlog, int nThreads, bool trace_ - ); - - int getPort() const; - - void run(ConnectionInputHandlerFactory& factory); - - void shutdown(); - - private: - - void accept(); - - Mutex lock; - Socket listener; - const int port; - const bool isTrace; - bool isRunning; - boost::ptr_vector<EventChannelConnection> connections; - AcceptEvent acceptEvent; - ConnectionInputHandlerFactory* factory; - bool isShutdown; - EventChannelThreads::shared_ptr threads; -}; - -Acceptor::shared_ptr Acceptor::create( - int16_t port, int backlog, int threads, bool trace) -{ - return Acceptor::shared_ptr( - new EventChannelAcceptor(port, backlog, threads, trace)); -} - -// Must define Acceptor virtual dtor. -Acceptor::~Acceptor() {} - -EventChannelAcceptor::EventChannelAcceptor( - int16_t port_, int backlog, int nThreads, bool trace_ -) : listener(Socket::createTcp()), - port(listener.listen(int(port_), backlog)), - isTrace(trace_), - isRunning(false), - acceptEvent(listener.fd(), - boost::bind(&EventChannelAcceptor::accept, this)), - factory(0), - isShutdown(false), - threads(EventChannelThreads::create(EventChannel::create(), nThreads)) -{ } - -int EventChannelAcceptor::getPort() const { - return port; // Immutable no need for lock. -} - -void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) { - { - Mutex::ScopedLock l(lock); - if (!isRunning && !isShutdown) { - isRunning = true; - factory = &f; - threads->post(acceptEvent); - } - } - threads->join(); // Wait for shutdown. -} - -void EventChannelAcceptor::shutdown() { - bool doShutdown = false; - { - Mutex::ScopedLock l(lock); - doShutdown = !isShutdown; // I'm the shutdown thread. - isShutdown = true; - } - if (doShutdown) { - ::close(acceptEvent.getDescriptor()); - threads->shutdown(); - for_each(connections.begin(), connections.end(), - boost::bind(&EventChannelConnection::close, _1)); - } - threads->join(); -} - -void EventChannelAcceptor::accept() -{ - // No lock, we only post one accept event at a time. - if (isShutdown) - return; - if (acceptEvent.getException()) { - Exception::log(*acceptEvent.getException(), - "EventChannelAcceptor::accept"); - shutdown(); - return; - } - // TODO aconway 2006-11-29: Need to reap closed connections also. - int fd = acceptEvent.getAcceptedDesscriptor(); - connections.push_back( - new EventChannelConnection(threads, *factory, fd, fd, isTrace)); - threads->post(acceptEvent); // Keep accepting. -} - -}} // namespace qpid::sys diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.cpp b/cpp/lib/common/sys/posix/EventChannelConnection.cpp deleted file mode 100644 index 4449dc3035..0000000000 --- a/cpp/lib/common/sys/posix/EventChannelConnection.cpp +++ /dev/null @@ -1,229 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <iostream> - -#include <boost/bind.hpp> -#include <boost/assert.hpp> - -#include "EventChannelConnection.h" -#include "sys/ConnectionInputHandlerFactory.h" -#include "QpidError.h" - -using namespace std; -using namespace qpid; -using namespace qpid::framing; - -namespace qpid { -namespace sys { - -const size_t EventChannelConnection::bufferSize = 65536; - -EventChannelConnection::EventChannelConnection( - EventChannelThreads::shared_ptr threads_, - ConnectionInputHandlerFactory& factory_, - int rfd, - int wfd, - bool isTrace_ -) : - readFd(rfd), - writeFd(wfd ? wfd : rfd), - readCallback(boost::bind(&EventChannelConnection::closeOnException, - this, &EventChannelConnection::endInitRead)), - - isWriting(false), - isClosed(false), - threads(threads_), - handler(factory_.create(this)), - in(bufferSize), - out(bufferSize), - isTrace(isTrace_) -{ - BOOST_ASSERT(readFd > 0); - BOOST_ASSERT(writeFd > 0); - closeOnException(&EventChannelConnection::startRead); -} - - -void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) { - { - Monitor::ScopedLock lock(monitor); - assert(frame.get()); - writeFrames.push_back(frame.release()); - } - closeOnException(&EventChannelConnection::startWrite); -} - -void EventChannelConnection::close() { - { - Monitor::ScopedLock lock(monitor); - if (isClosed) - return; - isClosed = true; - } - ::close(readFd); - ::close(writeFd); - { - Monitor::ScopedLock lock(monitor); - while (busyThreads > 0) - monitor.wait(); - } - handler->closed(); -} - -void EventChannelConnection::closeNoThrow() { - Exception::tryCatchLog<void>( - boost::bind(&EventChannelConnection::close, this), - false, - "Exception closing channel" - ); -} - -/** - * Call f in a try/catch block and close the connection if - * an exception is thrown. - */ -void EventChannelConnection::closeOnException(MemberFnPtr f) -{ - try { - Exception::tryCatchLog<void>( - boost::bind(f, this), - "Closing connection due to exception" - ); - return; - } catch (...) { - // Exception was already logged by tryCatchLog - closeNoThrow(); - } -} - -// Post the write event. -// Always called inside closeOnException. -// Called by endWrite and send, but only one thread writes at a time. -// -void EventChannelConnection::startWrite() { - FrameQueue::auto_type frame; - { - Monitor::ScopedLock lock(monitor); - // Stop if closed or a write event is already in progress. - if (isClosed || isWriting) - return; - if (writeFrames.empty()) { - isWriting = false; - return; - } - isWriting = true; - frame = writeFrames.pop_front(); - } - // No need to lock here - only one thread can be writing at a time. - out.clear(); - if (isTrace) - cout << "Send on socket " << writeFd << ": " << *frame << endl; - frame->encode(out); - out.flip(); - writeEvent = WriteEvent( - writeFd, out.start(), out.available(), - boost::bind(&EventChannelConnection::closeOnException, - this, &EventChannelConnection::endWrite)); - threads->post(writeEvent); -} - -// ScopedBusy ctor increments busyThreads. -// dtor decrements and calls monitor.notifyAll if it reaches 0. -// -struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement -{ - ScopedBusy(EventChannelConnection& ecc) - : AtomicCount::ScopedIncrement( - ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor)) - {} -}; - -// Write event completed. -// Always called by a channel thread inside closeOnException. -// -void EventChannelConnection::endWrite() { - ScopedBusy(*this); - { - Monitor::ScopedLock lock(monitor); - isWriting = false; - if (isClosed) - return; - writeEvent.throwIfException(); - } - // Check if there's more in to write in the write queue. - startWrite(); -} - - -// Post the read event. -// Always called inside closeOnException. -// Called from ctor and end[Init]Read, so only one call at a time -// is possible since we only post one read event at a time. -// -void EventChannelConnection::startRead() { - // Non blocking read, as much as we can swallow. - readEvent = ReadEvent( - readFd, in.start(), in.available(), readCallback,true); - threads->post(readEvent); -} - -// Completion of initial read, expect protocolInit. -// Always called inside closeOnException in channel thread. -// Only called by one thread at a time. -void EventChannelConnection::endInitRead() { - ScopedBusy(*this); - if (!isClosed) { - readEvent.throwIfException(); - in.move(readEvent.getBytesRead()); - in.flip(); - ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); - readCallback = boost::bind( - &EventChannelConnection::closeOnException, - this, &EventChannelConnection::endRead); - } - in.compact(); - // Continue reading. - startRead(); - } -} - -// Normal reads, expect a frame. -// Always called inside closeOnException in channel thread. -void EventChannelConnection::endRead() { - ScopedBusy(*this); - if (!isClosed) { - readEvent.throwIfException(); - in.move(readEvent.getBytesRead()); - in.flip(); - AMQFrame frame; - while (frame.decode(in)) { - // TODO aconway 2006-11-30: received should take Frame& - if (isTrace) - cout << "Received on socket " << readFd - << ": " << frame << endl; - handler->received(&frame); - } - in.compact(); - startRead(); - } -} - -}} // namespace qpid::sys diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h deleted file mode 100644 index da7b6dca27..0000000000 --- a/cpp/lib/common/sys/posix/EventChannelConnection.h +++ /dev/null @@ -1,102 +0,0 @@ -#ifndef _posix_EventChannelConnection_h -#define _posix_EventChannelConnection_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/ptr_container/ptr_deque.hpp> - -#include "EventChannelThreads.h" -#include "sys/Monitor.h" -#include "sys/ConnectionOutputHandler.h" -#include "sys/ConnectionInputHandler.h" -#include "sys/AtomicCount.h" -#include "framing/AMQFrame.h" - -namespace qpid { -namespace sys { - -class ConnectionInputHandlerFactory; - -/** - * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler - * for a connection via the EventChannel. - *@param readDescriptor file descriptor for reading. - *@param writeDescriptor file descriptor for writing, - * by default same as readDescriptor - */ -class EventChannelConnection : public ConnectionOutputHandler { - public: - EventChannelConnection( - EventChannelThreads::shared_ptr threads, - ConnectionInputHandlerFactory& factory, - int readDescriptor, - int writeDescriptor = 0, - bool isTrace = false - ); - - // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr - virtual void send(qpid::framing::AMQFrame* frame) { - send(std::auto_ptr<qpid::framing::AMQFrame>(frame)); - } - - virtual void send(std::auto_ptr<qpid::framing::AMQFrame> frame); - - virtual void close(); - - private: - typedef boost::ptr_deque<qpid::framing::AMQFrame> FrameQueue; - typedef void (EventChannelConnection::*MemberFnPtr)(); - struct ScopedBusy; - - void startWrite(); - void endWrite(); - void startRead(); - void endInitRead(); - void endRead(); - void closeNoThrow(); - void closeOnException(MemberFnPtr); - bool shouldContinue(bool& flag); - - static const size_t bufferSize; - - Monitor monitor; - - int readFd, writeFd; - ReadEvent readEvent; - WriteEvent writeEvent; - Event::Callback readCallback; - bool isWriting; - bool isClosed; - AtomicCount busyThreads; - - EventChannelThreads::shared_ptr threads; - std::auto_ptr<ConnectionInputHandler> handler; - qpid::framing::Buffer in, out; - FrameQueue writeFrames; - bool isTrace; - - friend struct ScopedBusy; -}; - - -}} // namespace qpid::sys - - - -#endif /*!_posix_EventChannelConnection_h*/ diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp deleted file mode 100644 index 95e699e0b0..0000000000 --- a/cpp/lib/common/sys/posix/EventChannelThreads.cpp +++ /dev/null @@ -1,119 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "EventChannelThreads.h" -#include <sys/Runnable.h> -#include <iostream> -using namespace std; -#include <boost/bind.hpp> - -namespace qpid { -namespace sys { - -EventChannelThreads::shared_ptr EventChannelThreads::create( - EventChannel::shared_ptr ec) -{ - return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); -} - -EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : - channel(ec), nWaiting(0), state(RUNNING) -{ - // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. - addThread(); -} - -EventChannelThreads::~EventChannelThreads() { - shutdown(); - join(); -} - -void EventChannelThreads::shutdown() -{ - ScopedLock lock(*this); - if (state != RUNNING) // Already shutting down. - return; - for (size_t i = 0; i < workers.size(); ++i) { - channel->postEvent(terminate); - } - state = TERMINATE_SENT; - notify(); // Wake up one join() thread. -} - -void EventChannelThreads::join() -{ - { - ScopedLock lock(*this); - while (state == RUNNING) // Wait for shutdown to start. - wait(); - if (state == SHUTDOWN) // Shutdown is complete - return; - if (state == JOINING) { - // Someone else is doing the join. - while (state != SHUTDOWN) - wait(); - return; - } - // I'm the joining thread - assert(state == TERMINATE_SENT); - state = JOINING; - } // Drop the lock. - - for (size_t i = 0; i < workers.size(); ++i) { - assert(state == JOINING); // Only this thread can change JOINING. - workers[i].join(); - } - state = SHUTDOWN; - notifyAll(); // Notify other join() threaeds. -} - -void EventChannelThreads::addThread() { - ScopedLock l(*this); - workers.push_back(Thread(*this)); -} - -void EventChannelThreads::run() -{ - // Start life waiting. Decrement on exit. - AtomicCount::ScopedIncrement inc(nWaiting); - try { - while (true) { - Event* e = channel->getEvent(); - assert(e != 0); - if (e == &terminate) { - return; - } - AtomicCount::ScopedDecrement dec(nWaiting); - // I'm no longer waiting, make sure someone is. - if (dec == 0) - addThread(); - e->dispatch(); - } - } - catch (const std::exception& e) { - // TODO aconway 2006-11-15: need better logging across the board. - std::cerr << "EventChannelThreads::run() caught: " << e.what() - << std::endl; - } - catch (...) { - std::cerr << "EventChannelThreads::run() caught unknown exception." - << std::endl; - } -} - -}} diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h deleted file mode 100644 index 98403c0869..0000000000 --- a/cpp/lib/common/sys/posix/EventChannelThreads.h +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef _posix_EventChannelThreads_h -#define _sys_EventChannelThreads_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <vector> - -#include <Exception.h> -#include <sys/Time.h> -#include <sys/Monitor.h> -#include <sys/Thread.h> -#include <sys/AtomicCount.h> -#include "EventChannel.h" - -namespace qpid { -namespace sys { - -/** - Dynamic thread pool serving an EventChannel. - - Threads run a loop { e = getEvent(); e->dispatch(); } - The size of the thread pool is automatically adjusted to optimal size. -*/ -class EventChannelThreads : - public qpid::SharedObject<EventChannelThreads>, - public sys::Monitor, private sys::Runnable -{ - public: - /** Create the thread pool and start initial threads. */ - static EventChannelThreads::shared_ptr create( - EventChannel::shared_ptr channel - ); - - ~EventChannelThreads(); - - /** Post event to the underlying channel */ - void postEvent(Event& event) { channel->postEvent(event); } - - /** Post event to the underlying channel Must not be 0. */ - void postEvent(Event* event) { channel->postEvent(event); } - - /** - * Terminate all threads. - * - * Returns immediately, use join() to wait till all threads are - * shut down. - */ - void shutdown(); - - /** Wait for all threads to terminate. */ - void join(); - - private: - typedef std::vector<sys::Thread> Threads; - typedef enum { - RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN - } State; - - EventChannelThreads(EventChannel::shared_ptr underlyingChannel); - void addThread(); - - void run(); - bool keepRunning(); - void adjustThreads(); - - EventChannel::shared_ptr channel; - Threads workers; - sys::AtomicCount nWaiting; - State state; - Event terminate; -}; - - -}} - - -#endif /*!_sys_EventChannelThreads_h*/ diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp deleted file mode 100644 index a80a6c61f7..0000000000 --- a/cpp/lib/common/sys/posix/PosixAcceptor.cpp +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <sys/Acceptor.h> -#include <Exception.h> - -namespace qpid { -namespace sys { - -namespace { -void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } -} - -class PosixAcceptor : public Acceptor { - public: - virtual int16_t getPort() const { fail(); return 0; } - virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); } - virtual void shutdown() { fail(); } -}; - -// Define generic Acceptor::create() to return APRAcceptor. - Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool) -{ - return Acceptor::shared_ptr(new PosixAcceptor()); -} - -// Must define Acceptor virtual dtor. -Acceptor::~Acceptor() {} - -}} diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp deleted file mode 100644 index 5bd13742f6..0000000000 --- a/cpp/lib/common/sys/posix/Socket.cpp +++ /dev/null @@ -1,118 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <sys/socket.h> -#include <sys/errno.h> -#include <netinet/in.h> -#include <netdb.h> - -#include <boost/format.hpp> - -#include <QpidError.h> -#include <posix/check.h> -#include <sys/Socket.h> - -using namespace qpid::sys; - -Socket Socket::createTcp() -{ - int s = ::socket (PF_INET, SOCK_STREAM, 0); - if (s < 0) throw QPID_POSIX_ERROR(errno); - return s; -} - -Socket::Socket(int descriptor) : socket(descriptor) {} - -void Socket::setTimeout(Time interval) -{ - struct timeval tv; - tv.tv_sec = interval/TIME_SEC; - tv.tv_usec = (interval%TIME_SEC)/TIME_USEC; - setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); - setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); -} - -void Socket::connect(const std::string& host, int port) -{ - struct sockaddr_in name; - name.sin_family = AF_INET; - name.sin_port = htons(port); - struct hostent* hp = gethostbyname ( host.c_str() ); - if (hp == 0) throw QPID_POSIX_ERROR(errno); - memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length); - if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0) - throw QPID_POSIX_ERROR(errno); -} - -void -Socket::close() -{ - if (socket == 0) return; - if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); - socket = 0; -} - -ssize_t -Socket::send(const void* data, size_t size) -{ - ssize_t sent = ::send(socket, data, size, 0); - if (sent < 0) { - if (errno == ECONNRESET) return SOCKET_EOF; - if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; - throw QPID_POSIX_ERROR(errno); - } - return sent; -} - -ssize_t -Socket::recv(void* data, size_t size) -{ - ssize_t received = ::recv(socket, data, size, 0); - if (received < 0) { - if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; - throw QPID_POSIX_ERROR(errno); - } - return received; -} - -int Socket::listen(int port, int backlog) -{ - struct sockaddr_in name; - name.sin_family = AF_INET; - name.sin_port = htons(port); - name.sin_addr.s_addr = 0; - if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0) - throw QPID_POSIX_ERROR(errno); - if (::listen(socket, backlog) < 0) - throw QPID_POSIX_ERROR(errno); - - socklen_t namelen = sizeof(name); - if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) - throw QPID_POSIX_ERROR(errno); - - return ntohs(name.sin_port); -} - - -int Socket::fd() -{ - return socket; -} diff --git a/cpp/lib/common/sys/posix/Thread.cpp b/cpp/lib/common/sys/posix/Thread.cpp deleted file mode 100644 index f524799556..0000000000 --- a/cpp/lib/common/sys/posix/Thread.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <sys/Thread.h> - -void* qpid::sys::Thread::runRunnable(void* p) -{ - static_cast<Runnable*>(p)->run(); - return 0; -} diff --git a/cpp/lib/common/sys/posix/check.cpp b/cpp/lib/common/sys/posix/check.cpp deleted file mode 100644 index 408679caa8..0000000000 --- a/cpp/lib/common/sys/posix/check.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <cerrno> -#include "check.h" - -namespace qpid { -namespace sys { - -std::string -PosixError::getMessage(int errNo) -{ - char buf[512]; - return std::string(strerror_r(errNo, buf, sizeof(buf))); -} - -PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw() - : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc) -{ } - -}} diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h deleted file mode 100644 index 57b5a5757c..0000000000 --- a/cpp/lib/common/sys/posix/check.h +++ /dev/null @@ -1,62 +0,0 @@ -#ifndef _posix_check_h -#define _posix_check_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <cerrno> -#include <string> -#include <QpidError.h> - -namespace qpid { -namespace sys { - -/** - * Exception with message from errno. - */ -class PosixError : public qpid::QpidError -{ - public: - static std::string getMessage(int errNo); - - PosixError(int errNo, const qpid::SrcLine& location) throw(); - - ~PosixError() throw() {} - - int getErrNo() { return errNo; } - - Exception* clone() const throw() { return new PosixError(*this); } - - void throwSelf() const { throw *this; } - - private: - int errNo; -}; - -}} - -/** Create a PosixError for the current file/line and errno. */ -#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) - -/** Throw a posix error if errNo is non-zero */ -#define QPID_POSIX_THROW_IF(ERRNO) \ - if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) -#endif /*!_posix_check_h*/ |