diff options
author | Keith Wall <kwall@apache.org> | 2015-03-03 14:58:01 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-03-03 14:58:01 +0000 |
commit | 11a201863b9c989151cf117450785504a61df5ce (patch) | |
tree | ba96c870aa9ed34edcac0bd07fc0e0138f715bbd | |
parent | 9dc57fe738f366d875c2319dafdfa2c50ce2f20b (diff) | |
parent | 83120216de949c1cae3004c74475cc6c54cd61f1 (diff) | |
download | qpid-python-11a201863b9c989151cf117450785504a61df5ce.tar.gz |
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663719 13f79535-47bb-0310-9956-ffa450edef68
62 files changed, 2059 insertions, 565 deletions
diff --git a/qpid/cpp/CMakeLists.txt b/qpid/cpp/CMakeLists.txt index aaa4203c1e..3b1890f976 100644 --- a/qpid/cpp/CMakeLists.txt +++ b/qpid/cpp/CMakeLists.txt @@ -23,7 +23,6 @@ set (CMAKE_BUILD_TYPE RelWithDebInfo CACHE string if (CMAKE_BUILD_TYPE MATCHES "Deb") set (has_debug_symbols " (has debug symbols)") endif (CMAKE_BUILD_TYPE MATCHES "Deb") -message("Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}") project(qpid-cpp) @@ -242,5 +241,5 @@ add_subdirectory(examples) include (CPack) # Build type message again, last so it is visible at end of output. -message("Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}") +message(STATUS "Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}") diff --git a/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake b/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake deleted file mode 100644 index fb515cd149..0000000000 --- a/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake +++ /dev/null @@ -1,59 +0,0 @@ -# -# $Id $ -# -# Author(s): Anton Deguet -# Created on: 2011 -# -# (C) Copyright 2011 Johns Hopkins University (JHU), All Rights -# Reserved. -# -# --- begin cisst license - do not edit --- -# -# This software is provided "as is" under an open source license, with -# no warranty. The complete license can be found in license.txt and -# http://www.cisst.org/cisst/license.txt. -# -# --- end cisst license --- - -function (check_size_t_native_type VARIABLE) - # make sure we don't test over and over - if ("${VARIABLE}" MATCHES "^${VARIABLE}$") - message (STATUS "Checking to see if size_t is a native type") - set (SOURCE - "#include <vector> - char method(unsigned int p) { - return 'u'; - } - char method(unsigned long long int p) { - return 'l'; - } - char method(size_t p) { - return 's'; - } - int main(void) {}") - - file (WRITE - "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test_size_t.cpp" - "${SOURCE}\n") - - try_compile (${VARIABLE} - ${CMAKE_BINARY_DIR} - "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test_size_t.cpp" - OUTPUT_VARIABLE OUTPUT) - - # report using message and log files - if (${VARIABLE}) - message (STATUS "Checking to see if size_t is a native type - yes") - file (APPEND ${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeOutput.log - "Determining if size_t is a native type passed with " - "the following output:\n${OUTPUT}\n\n") - else (${VARIABLE}) - message (STATUS "Checking to see if size_t is a native type - no") - file (APPEND ${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeError.log - "Determining if size_t is a native type passed with " - "the following output:\n${OUTPUT}\n\n") - endif (${VARIABLE}) - - endif ("${VARIABLE}" MATCHES "^${VARIABLE}$") - -endfunction (check_size_t_native_type VARIABLE) diff --git a/qpid/cpp/CMakeModules/CheckSizetDistinct.cmake b/qpid/cpp/CMakeModules/CheckSizetDistinct.cmake new file mode 100755 index 0000000000..2ae4a89de9 --- /dev/null +++ b/qpid/cpp/CMakeModules/CheckSizetDistinct.cmake @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This module checks to see if size_t is a distinct type from the other +# integer types already set up in IntegerTypes.h. + +INCLUDE (CheckCXXSourceCompiles) + +FUNCTION (check_size_t_distinct VARIABLE) + # No need to check if we already did. If you want to re-run, clear it + # from the cache. + if ("${VARIABLE}" MATCHES "^${VARIABLE}$") + message (STATUS "Checking to see if size_t is a distinct type") + set (CMAKE_REQUIRED_QUIET ON) + set (CMAKE_REQUIRED_INCLUDES "${CMAKE_SOURCE_DIR}/include") + CHECK_CXX_SOURCE_COMPILES ( +" +#include <iostream> +#include \"qpid/sys/IntegerTypes.h\" +// Define functions that will fail to compile if size_t is the same as +// one of the int types defined in IntegerTypes.h +int foo(int16_t) { return 1; } +int foo(int32_t) { return 2; } +int foo(int64_t) { return 3; } +int foo(uint16_t) { return 4; } +int foo(uint32_t) { return 5; } +int foo(uint64_t) { return 6; } +int foo(size_t) { return 7; } +int main (int, char *[]) { + return 0; +} +" + ${VARIABLE}) + if (${VARIABLE}) + message (STATUS "Checking to see if size_t is a distinct type - yes") + else (${VARIABLE}) + message (STATUS "Checking to see if size_t is a distinct type - no") + endif (${VARIABLE}) + endif ("${VARIABLE}" MATCHES "^${VARIABLE}$") +ENDFUNCTION (check_size_t_distinct VARIABLE) diff --git a/qpid/cpp/include/qpid/types/Variant.h b/qpid/cpp/include/qpid/types/Variant.h index faba5fe9a4..843870e438 100644 --- a/qpid/cpp/include/qpid/types/Variant.h +++ b/qpid/cpp/include/qpid/types/Variant.h @@ -89,7 +89,9 @@ class QPID_TYPES_CLASS_EXTERN Variant QPID_TYPES_EXTERN Variant(float); QPID_TYPES_EXTERN Variant(double); QPID_TYPES_EXTERN Variant(const std::string&); + QPID_TYPES_EXTERN Variant(const std::string& value, const std::string& encoding); QPID_TYPES_EXTERN Variant(const char*); + QPID_TYPES_EXTERN Variant(const char* value, const char* encoding); QPID_TYPES_EXTERN Variant(const Map&); QPID_TYPES_EXTERN Variant(const List&); QPID_TYPES_EXTERN Variant(const Variant&); @@ -156,9 +158,10 @@ class QPID_TYPES_CLASS_EXTERN Variant QPID_TYPES_EXTERN Map& asMap(); QPID_TYPES_EXTERN const List& asList() const; QPID_TYPES_EXTERN List& asList(); + /** - * Unlike asString(), getString() will not do any conversions and - * will throw InvalidConversion if the type is not STRING. + * Unlike asString(), getString() will not do any conversions. + * @exception InvalidConversion if the type is not STRING. */ QPID_TYPES_EXTERN const std::string& getString() const; QPID_TYPES_EXTERN std::string& getString(); @@ -168,9 +171,45 @@ class QPID_TYPES_CLASS_EXTERN Variant QPID_TYPES_EXTERN bool isEqualTo(const Variant& a) const; + /** Reset value to VOID, does not reset the descriptors. */ QPID_TYPES_EXTERN void reset(); + + /** True if there is at least one descriptor associated with this variant. */ + QPID_TYPES_EXTERN bool isDescribed() const; + + /** Get the first descriptor associated with this variant. + * + * Normally there is at most one descriptor, when there are multiple + * descriptors use getDescriptors() + * + *@return The first descriptor or VOID if there is no descriptor. + *@see isDescribed, getDescriptors + */ + QPID_TYPES_EXTERN Variant getDescriptor() const; + + /** Set a single descriptor for this Variant. The descriptor must be a string or integer. */ + QPID_TYPES_EXTERN void setDescriptor(const Variant& descriptor); + + /** Return a modifiable list of descriptors for this Variant. + * Used in case where there are multiple descriptors, for a single descriptor use + * getDescriptor and setDescriptor. + */ + QPID_TYPES_EXTERN List& getDescriptors(); + + /** Return the list of descriptors for this Variant. + * Used in case where there are multiple descriptors, for a single descriptor use + * getDescriptor and setDescriptor. + */ + QPID_TYPES_EXTERN const List& getDescriptors() const; + + /** Create a described value */ + QPID_TYPES_EXTERN static Variant described(const Variant& descriptor, const Variant& value); + + /** Create a described list, a common special case */ + QPID_TYPES_EXTERN static Variant described(const Variant& descriptor, const List& value); + private: - VariantImpl* impl; + mutable VariantImpl* impl; }; #ifndef SWIG diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 8263534614..3de2cc9b06 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -42,7 +42,7 @@ include(CheckIncludeFiles) include(CheckIncludeFileCXX) include(CheckLibraryExists) include(CheckSymbolExists) -include(CheckSizeTNativeType) +include(CheckSizetDistinct) find_package(PkgConfig) find_package(Ruby) @@ -351,7 +351,7 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) mark_as_advanced(QPID_POLLER) endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) -check_size_t_native_type (QPID_SIZE_T_NATIVE) +check_size_t_distinct (QPID_SIZE_T_DISTINCT) option(BUILD_SASL "Build with Cyrus SASL support" ${SASL_FOUND}) if (BUILD_SASL) diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index b2ff10bd68..044267afbc 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -144,6 +144,8 @@ if (BUILD_AMQP) qpid/messaging/amqp/SessionHandle.cpp qpid/messaging/amqp/TcpTransport.h qpid/messaging/amqp/TcpTransport.cpp + qpid/messaging/amqp/Transaction.h + qpid/messaging/amqp/Transaction.cpp ) if (WIN32) diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake index 478b369eb6..ffde86ffa5 100644 --- a/qpid/cpp/src/config.h.cmake +++ b/qpid/cpp/src/config.h.cmake @@ -56,7 +56,7 @@ #cmakedefine HAVE_SYS_SDT_H ${HAVE_SYS_SDT_H} #cmakedefine HAVE_LOG_AUTHPRIV #cmakedefine HAVE_LOG_FTP -#cmakedefine QPID_SIZE_T_NATIVE +#cmakedefine QPID_SIZE_T_DISTINCT #cmakedefine HAVE_PROTON_TRACER #cmakedefine USE_PROTON_TRANSPORT_CONDITION #cmakedefine HAVE_PROTON_EVENTS diff --git a/qpid/cpp/src/legacystore.cmake b/qpid/cpp/src/legacystore.cmake index e8deee9538..3cb1171b00 100644 --- a/qpid/cpp/src/legacystore.cmake +++ b/qpid/cpp/src/legacystore.cmake @@ -39,8 +39,8 @@ else (DEFINED legacystore_force) # # allow legacystore to be built # - message(STATUS "BerkeleyDB for C++ and libaio found, Legacystore support enabled") - set (legacystore_default ON) + message(STATUS "BerkeleyDB for C++ and libaio found, Legacystore support disabled by default (deprecated, use linearstore instead).") + set (legacystore_default OFF) # Disabled, deprecated. Use linearstore instead. else (HAVE_AIO AND HAVE_AIO_H) if (NOT HAVE_AIO) message(STATUS "Legacystore requires libaio which is absent.") diff --git a/qpid/cpp/src/linearstore.cmake b/qpid/cpp/src/linearstore.cmake index d462450f72..e876ca712a 100644 --- a/qpid/cpp/src/linearstore.cmake +++ b/qpid/cpp/src/linearstore.cmake @@ -39,8 +39,8 @@ else (DEFINED linearstore_force) # # allow linearstore to be built # - message(STATUS "BerkeleyDB for C++ and libaio found, Linearstore support may be enabled (currently experimental and disabled by default)") - set (linearstore_default OFF) # Temporarily disabled + message(STATUS "BerkeleyDB for C++ and libaio found, Linearstore support enabled.") + set (linearstore_default ON) else (HAVE_AIO AND HAVE_AIO_H) if (NOT HAVE_AIO) message(STATUS "Linearstore requires libaio which is absent.") diff --git a/qpid/cpp/src/qpid/Options.cpp b/qpid/cpp/src/qpid/Options.cpp index 5ca91e6bd4..0021afc574 100644 --- a/qpid/cpp/src/qpid/Options.cpp +++ b/qpid/cpp/src/qpid/Options.cpp @@ -146,7 +146,7 @@ template QPID_COMMON_EXTERN po::value_semantic* create_value(int64_t& val, const template QPID_COMMON_EXTERN po::value_semantic* create_value(uint16_t& val, const std::string& arg); template QPID_COMMON_EXTERN po::value_semantic* create_value(uint32_t& val, const std::string& arg); template QPID_COMMON_EXTERN po::value_semantic* create_value(uint64_t& val, const std::string& arg); -#ifdef QPID_SIZE_T_NATIVE +#ifdef QPID_SIZE_T_DISTINCT template QPID_COMMON_EXTERN po::value_semantic* create_value(size_t& val, const std::string& arg); #endif template QPID_COMMON_EXTERN po::value_semantic* create_value(double& val, const std::string& arg); diff --git a/qpid/cpp/src/qpid/Url.cpp b/qpid/cpp/src/qpid/Url.cpp index 21de32aaa3..1780a07f92 100644 --- a/qpid/cpp/src/qpid/Url.cpp +++ b/qpid/cpp/src/qpid/Url.cpp @@ -113,8 +113,10 @@ class UrlParser { const char* at = std::find(i, end, '@'); if (at == end) return false; const char* slash = std::find(i, at, '/'); - url.setUser(string(i, slash)); - const char* pass = (slash == at) ? slash : slash+1; + const char* colon = std::find(i, at, ':'); + const char* sep = std::min(slash, colon); + url.setUser(string(i, sep)); + const char* pass = (sep == at) ? sep : sep+1; url.setPass(string(pass, at)); i = at+1; return true; diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.cpp b/qpid/cpp/src/qpid/amqp/CharSequence.cpp index 7e433bd26e..ad5b0ec84c 100644 --- a/qpid/cpp/src/qpid/amqp/CharSequence.cpp +++ b/qpid/cpp/src/qpid/amqp/CharSequence.cpp @@ -35,7 +35,7 @@ CharSequence::operator bool() const } std::string CharSequence::str() const { - return std::string(data, size); + return (data && size) ? std::string(data, size) : std::string(); } CharSequence CharSequence::create() diff --git a/qpid/cpp/src/qpid/amqp/Descriptor.cpp b/qpid/cpp/src/qpid/amqp/Descriptor.cpp index 9e33294edd..43d388ee76 100644 --- a/qpid/cpp/src/qpid/amqp/Descriptor.cpp +++ b/qpid/cpp/src/qpid/amqp/Descriptor.cpp @@ -19,11 +19,17 @@ * */ #include "Descriptor.h" +#include "descriptors.h" +#include <qpid/framing/reply_exceptions.h> +#include <map> namespace qpid { namespace amqp { + Descriptor::Descriptor(uint64_t code) : type(NUMERIC) { value.code = code; } + Descriptor::Descriptor(const CharSequence& symbol) : type(SYMBOLIC) { value.symbol = symbol; } + bool Descriptor::match(const std::string& symbol, uint64_t code) const { switch (type) { @@ -58,20 +64,85 @@ Descriptor* Descriptor::nest(const Descriptor& d) return nested.get(); } -std::ostream& operator<<(std::ostream& os, const Descriptor& d) -{ - switch (d.type) { - case Descriptor::SYMBOLIC: - if (d.value.symbol.data && d.value.symbol.size) os << std::string(d.value.symbol.data, d.value.symbol.size); - else os << "null"; - break; - case Descriptor::NUMERIC: - os << "0x" << std::hex << d.value.code; - break; +namespace { + +class DescriptorMap { + typedef std::map<uint64_t, std::string> SymbolMap; + typedef std::map<std::string, uint64_t> CodeMap; + + SymbolMap symbols; + CodeMap codes; + + public: + DescriptorMap() { + symbols[message::HEADER_CODE] = message::HEADER_SYMBOL; + symbols[message::DELIVERY_ANNOTATIONS_CODE] = message::DELIVERY_ANNOTATIONS_SYMBOL; + symbols[message::MESSAGE_ANNOTATIONS_CODE] = message::MESSAGE_ANNOTATIONS_SYMBOL; + symbols[message::PROPERTIES_CODE] = message::PROPERTIES_SYMBOL; + symbols[message::APPLICATION_PROPERTIES_CODE] = message::APPLICATION_PROPERTIES_SYMBOL; + symbols[message::DATA_CODE] = message::DATA_SYMBOL; + symbols[message::AMQP_SEQUENCE_CODE] = message::AMQP_SEQUENCE_SYMBOL; + symbols[message::AMQP_VALUE_CODE] = message::AMQP_VALUE_SYMBOL; + symbols[message::FOOTER_CODE] = message::FOOTER_SYMBOL; + symbols[message::ACCEPTED_CODE] = message::ACCEPTED_SYMBOL; + symbols[sasl::SASL_MECHANISMS_CODE] = sasl::SASL_MECHANISMS_SYMBOL; + symbols[sasl::SASL_INIT_CODE] = sasl::SASL_INIT_SYMBOL; + symbols[sasl::SASL_CHALLENGE_CODE] = sasl::SASL_CHALLENGE_SYMBOL; + symbols[sasl::SASL_RESPONSE_CODE] = sasl::SASL_RESPONSE_SYMBOL; + symbols[sasl::SASL_OUTCOME_CODE] = sasl::SASL_OUTCOME_SYMBOL; + symbols[filters::LEGACY_DIRECT_FILTER_CODE] = filters::LEGACY_DIRECT_FILTER_SYMBOL; + symbols[filters::LEGACY_TOPIC_FILTER_CODE] = filters::LEGACY_TOPIC_FILTER_SYMBOL; + symbols[filters::LEGACY_HEADERS_FILTER_CODE] = filters::LEGACY_HEADERS_FILTER_SYMBOL; + symbols[filters::SELECTOR_FILTER_CODE] = filters::SELECTOR_FILTER_SYMBOL; + symbols[filters::XQUERY_FILTER_CODE] = filters::XQUERY_FILTER_SYMBOL; + symbols[lifetime_policy::DELETE_ON_CLOSE_CODE] = lifetime_policy::DELETE_ON_CLOSE_SYMBOL; + symbols[lifetime_policy::DELETE_ON_NO_LINKS_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL; + symbols[lifetime_policy::DELETE_ON_NO_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL; + symbols[lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL; + symbols[transaction::DECLARE_CODE] = transaction::DECLARE_SYMBOL; + symbols[transaction::DISCHARGE_CODE] = transaction::DISCHARGE_SYMBOL; + symbols[transaction::DECLARED_CODE] = transaction::DECLARED_SYMBOL; + symbols[transaction::TRANSACTIONAL_STATE_CODE] = transaction::TRANSACTIONAL_STATE_SYMBOL; + symbols[0] = "unknown-descriptor"; + + for (SymbolMap::const_iterator i = symbols.begin(); i != symbols.end(); ++i) + codes[i->second] = i->first; + } + + std::string operator[](uint64_t code) const { + SymbolMap::const_iterator i = symbols.find(code); + return (i == symbols.end()) ? "unknown-descriptor" : i->second; } - if (d.nested.get()) { - os << " ->(" << *d.nested << ")"; + + uint64_t operator[](const std::string& symbol) const { + CodeMap::const_iterator i = codes.find(symbol); + return (i == codes.end()) ? 0 : i->second; + } +}; + +DescriptorMap DESCRIPTOR_MAP; +} + +std::string Descriptor::symbol() const { + switch (type) { + case Descriptor::NUMERIC: return DESCRIPTOR_MAP[value.code]; + case Descriptor::SYMBOLIC: return value.symbol.str(); + } + assert(0); + return std::string(); +} + +uint64_t Descriptor::code() const { + switch (type) { + case Descriptor::NUMERIC: return value.code; + case Descriptor::SYMBOLIC: return DESCRIPTOR_MAP[value.symbol.str()]; } - return os; + assert(0); + return 0; } + +std::ostream& operator<<(std::ostream& os, const Descriptor& d) { + return os << d.symbol() << "(" << "0x" << std::hex << d.code() << ")"; +} + }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Descriptor.h b/qpid/cpp/src/qpid/amqp/Descriptor.h index 6b0cb80e87..3726114769 100644 --- a/qpid/cpp/src/qpid/amqp/Descriptor.h +++ b/qpid/cpp/src/qpid/amqp/Descriptor.h @@ -49,6 +49,8 @@ struct Descriptor QPID_COMMON_EXTERN bool match(const std::string&, uint64_t) const; QPID_COMMON_EXTERN size_t getSize() const; QPID_COMMON_EXTERN Descriptor* nest(const Descriptor& d); + QPID_COMMON_EXTERN std::string symbol() const; + QPID_COMMON_EXTERN uint64_t code() const; }; QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& os, const Descriptor& d); diff --git a/qpid/cpp/src/qpid/amqp/Encoder.cpp b/qpid/cpp/src/qpid/amqp/Encoder.cpp index 0760fc166d..86b59fb1a2 100644 --- a/qpid/cpp/src/qpid/amqp/Encoder.cpp +++ b/qpid/cpp/src/qpid/amqp/Encoder.cpp @@ -31,10 +31,17 @@ #include <string.h> using namespace qpid::types::encodings; +using qpid::types::Variant; namespace qpid { namespace amqp { +Encoder::Overflow::Overflow() : Exception("Buffer overflow in encoder!") {} + +Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0), grow(false) {} + +Encoder::Encoder() : data(0), size(0), position(0), grow(true) {} + namespace { template <typename T> size_t encode(char* data, T i); template <> size_t encode<uint8_t>(char* data, uint8_t i) @@ -406,6 +413,18 @@ void Encoder::writeList(const std::list<qpid::types::Variant>& value, const Desc void Encoder::writeValue(const qpid::types::Variant& value, const Descriptor* d) { + if (d) { + writeDescriptor(*d); // Write this descriptor before any in the value. + d = 0; + } + // Write any descriptors attached to the value. + const Variant::List& descriptors = value.getDescriptors(); + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) { + if (i->getType() == types::VAR_STRING) + writeDescriptor(Descriptor(CharSequence::create(i->asString()))); + else + writeDescriptor(Descriptor(i->asUint64())); + } switch (value.getType()) { case qpid::types::VAR_VOID: writeNull(d); @@ -477,18 +496,28 @@ void Encoder::writeDescriptor(const Descriptor& d) break; } } + void Encoder::check(size_t s) { if (position + s > size) { - QPID_LOG(notice, "Buffer overflow for write of size " << s << " to buffer of size " << size << " at position " << position); - assert(false); - throw qpid::Exception("Buffer overflow in encoder!"); + if (grow) { + buffer.resize(buffer.size() + s); + data = const_cast<char*>(buffer.data()); + size = buffer.size(); + } + else { + QPID_LOG(notice, "Buffer overflow for write of size " << s + << " to buffer of size " << size << " at position " << position); + assert(false); + throw Overflow(); + } } } -Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0) {} + size_t Encoder::getPosition() { return position; } size_t Encoder::getSize() const { return size; } char* Encoder::getData() { return data + position; } +std::string Encoder::getBuffer() { return buffer; } void Encoder::resetPosition(size_t p) { assert(p <= size); position = p; } }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Encoder.h b/qpid/cpp/src/qpid/amqp/Encoder.h index 4f7c1d1489..8729f29b94 100644 --- a/qpid/cpp/src/qpid/amqp/Encoder.h +++ b/qpid/cpp/src/qpid/amqp/Encoder.h @@ -23,6 +23,7 @@ */ #include "qpid/sys/IntegerTypes.h" #include "qpid/amqp/Constructor.h" +#include "qpid/Exception.h" #include <list> #include <map> #include <stddef.h> @@ -43,6 +44,18 @@ struct Descriptor; class Encoder { public: + struct Overflow : public Exception { Overflow(); }; + + /** Create an encoder that writes into the buffer at data up to size bytes. + * Write operations throw Overflow if encoding exceeds size bytes. + */ + QPID_COMMON_EXTERN Encoder(char* data, size_t size); + + /** Create an encoder that manages its own buffer. Buffer grows to accomodate + * all encoded data. Call getBuffer() to get the buffer. + */ + QPID_COMMON_EXTERN Encoder(); + void writeCode(uint8_t); void write(bool); @@ -100,19 +113,27 @@ class Encoder QPID_COMMON_EXTERN void writeList(const std::list<qpid::types::Variant>& value, const Descriptor* d=0, bool large=true); void writeDescriptor(const Descriptor&); - QPID_COMMON_EXTERN Encoder(char* data, size_t size); QPID_COMMON_EXTERN size_t getPosition(); void resetPosition(size_t p); char* skip(size_t); void writeBytes(const char* bytes, size_t count); virtual ~Encoder() {} + + /** Return the total size of the buffer. */ size_t getSize() const; - protected: + + /** Return the growable buffer. */ + std::string getBuffer(); + + /** Return the unused portion of the buffer. */ char* getData(); + private: char* data; size_t size; size_t position; + bool grow; + std::string buffer; void write(const CharSequence& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d); void write(const std::string& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d); diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h index a9ee12644a..29c626edc2 100644 --- a/qpid/cpp/src/qpid/amqp/descriptors.h +++ b/qpid/cpp/src/qpid/amqp/descriptors.h @@ -26,6 +26,9 @@ namespace qpid { namespace amqp { +// NOTE: If you add descriptor symbols and codes here, you must also update the DescriptorMap +// constructor in Descriptor.cpp. + namespace message { const std::string HEADER_SYMBOL("amqp:header:list"); const std::string PROPERTIES_SYMBOL("amqp:properties:list"); @@ -36,6 +39,7 @@ const std::string AMQP_SEQUENCE_SYMBOL("amqp:amqp-sequence:list"); const std::string AMQP_VALUE_SYMBOL("amqp:amqp-value:*"); const std::string DATA_SYMBOL("amqp:data:binary"); const std::string FOOTER_SYMBOL("amqp:footer:map"); +const std::string ACCEPTED_SYMBOL("amqp:accepted:list"); const uint64_t HEADER_CODE(0x70); const uint64_t DELIVERY_ANNOTATIONS_CODE(0x71); @@ -46,6 +50,7 @@ const uint64_t DATA_CODE(0x75); const uint64_t AMQP_SEQUENCE_CODE(0x76); const uint64_t AMQP_VALUE_CODE(0x77); const uint64_t FOOTER_CODE(0x78); +const uint64_t ACCEPTED_CODE(0x24); const Descriptor HEADER(HEADER_CODE); const Descriptor DELIVERY_ANNOTATIONS(DELIVERY_ANNOTATIONS_CODE); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index b1f7d0524b..4dd6455104 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -297,6 +297,8 @@ void Queue::deliverTo(Message msg, TxBuffer* txn) if (txn) { TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); txn->enlist(op); + QPID_LOG(debug, "Message " << msg.getSequence() << " enqueue on " << name + << " enlisted in " << txn); } else { if (enqueue(0, msg)) { push(msg); diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 8d6516edee..e315e55843 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -165,8 +165,8 @@ Connection::~Connection() { if (ticker) ticker->cancel(); getBroker().getConnectionObservers().closed(*this); - pn_transport_free(transport); pn_connection_free(connection); + pn_transport_free(transport); #ifdef HAVE_PROTON_EVENTS pn_collector_free(collector); #endif diff --git a/qpid/cpp/src/qpid/broker/amqp/Exception.h b/qpid/cpp/src/qpid/broker/amqp/Exception.h index c2fe470e55..a129dffe1f 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Exception.h +++ b/qpid/cpp/src/qpid/broker/amqp/Exception.h @@ -22,6 +22,7 @@ * */ #include <string> +#include <qpid/Exception.h> namespace qpid { namespace broker { @@ -29,7 +30,7 @@ namespace amqp { /** * Exception to signal various AMQP 1.0 defined conditions */ -class Exception : public std::exception +class Exception : public qpid::Exception { public: Exception(const std::string& name, const std::string& description); diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index d4f73fc511..3986818846 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -100,6 +100,7 @@ namespace { boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session)); return copy; } + private: pn_delivery_t* delivery; boost::shared_ptr<Session> session; @@ -146,8 +147,8 @@ void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> { qpid::broker::Message message(received, received); userid.verify(message.getUserId()); - handle(message, session.getTransaction(delivery)); received->begin(); + handle(message, session.getTransaction(delivery)); Transfer t(delivery, sessionPtr); received->end(t); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 0136d5a0ed..f2949c5879 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -28,6 +28,7 @@ #include "qpid/broker/TopicKeyNode.h" #include "qpid/sys/OutputControl.h" #include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/amqp/MessageEncoder.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" @@ -90,13 +91,13 @@ bool OutgoingFromQueue::doWork() return true; } else { pn_link_drained(link); - QPID_LOG(debug, "No message available on " << queue->getName()); + QPID_LOG(trace, "No message available on " << queue->getName()); } } catch (const qpid::framing::ResourceDeletedException& e) { throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, e.what()); } } else { - QPID_LOG(debug, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link)); + QPID_LOG(trace, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link)); } return false; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 538883f29a..3b65e6a64d 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -61,6 +61,8 @@ namespace qpid { namespace broker { namespace amqp { +using namespace qpid::amqp::transaction; + namespace { pn_bytes_t convert(const std::string& s) { @@ -209,6 +211,7 @@ class IncomingToCoordinator : public DecodingIncoming public: IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent) : DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", pn_link_name(link)) {} + ~IncomingToCoordinator() { session.abort(); } void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, pn_delivery_t*); void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {} @@ -218,7 +221,9 @@ class IncomingToCoordinator : public DecodingIncoming Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false), authorise(connection.getUserId(), connection.getBroker().getAcl()), - detachRequested(), txnId((boost::format("%1%") % s).str()) {} + detachRequested(), + tx(*this) +{} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) @@ -383,6 +388,7 @@ void Session::attach(pn_link_t* link) //i.e a subscription std::string name; if (pn_terminus_get_type(source) == PN_UNSPECIFIED) { + pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No source specified!"); } else if (pn_terminus_is_dynamic(source)) { name = generateName(link); @@ -399,6 +405,7 @@ void Session::attach(pn_link_t* link) pn_terminus_t* target = pn_link_remote_target(link); std::string name; if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { + pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!"); } else if (pn_terminus_get_type(target) == PN_COORDINATOR) { QPID_LOG(debug, "Received attach request for incoming link to transaction coordinator on " << this); @@ -634,11 +641,12 @@ void Session::readable(pn_link_t* link, pn_delivery_t* delivery) if (target->second->haveWork()) out.activateOutput(); } } + void Session::writable(pn_link_t* link, pn_delivery_t* delivery) { OutgoingLinks::iterator sender = outgoing.find(link); if (sender == outgoing.end()) { - QPID_LOG(error, "Delivery returned for unknown link"); + QPID_LOG(error, "Delivery returned for unknown link " << pn_link_name(link)); } else { sender->second->handle(delivery); } @@ -647,7 +655,7 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery) bool Session::dispatch() { bool output(false); - if (commitPending.boolCompareAndSwap(true, false)) { + if (tx.commitPending.boolCompareAndSwap(true, false)) { committed(true); } for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) { @@ -735,7 +743,7 @@ void Session::detachedByManagement() TxBuffer* Session::getTransaction(const std::string& id) { - return (txn.get() && id == txnId) ? txn.get() : 0; + return (tx.buffer.get() && id == tx.id) ? tx.buffer.get() : 0; } TxBuffer* Session::getTransaction(pn_delivery_t* delivery) @@ -746,42 +754,41 @@ TxBuffer* Session::getTransaction(pn_delivery_t* delivery) std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* delivery) { std::pair<TxBuffer*,uint64_t> result((TxBuffer*)0, 0); - if (pn_delivery_remote_state(delivery) == qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) { + if (pn_delivery_remote_state(delivery) == TRANSACTIONAL_STATE_CODE) { pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery)); - if (data && pn_data_next(data)) { - size_t count = pn_data_get_list(data); - if (count > 0) { + pn_data_rewind(data); + size_t count = 0; + if (data && pn_data_next(data) && (count = pn_data_get_list(data)) > 0) { + pn_data_enter(data); + pn_data_next(data); + std::string id = convert(pn_data_get_binary(data)); + result.first = getTransaction(id); + if (!result.first) { + QPID_LOG(error, "Transaction not found for id: " << id); + } + if (count > 1 && pn_data_next(data)) { pn_data_enter(data); pn_data_next(data); - std::string id = convert(pn_data_get_binary(data)); - result.first = getTransaction(id); - if (!result.first) { - QPID_LOG(error, "Transaction not found for id: " << id); - } - if (count > 1 && pn_data_next(data) && pn_data_is_described(data)) { - pn_data_enter(data); - pn_data_next(data); - result.second = pn_data_get_ulong(data); - } - pn_data_exit(data); + result.second = pn_data_get_ulong(data); } - } else { - QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data"); } + else + QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data"); } return result; } std::string Session::declare() { - if (txn.get()) { + if (tx.buffer.get()) { //not sure what the error code should be; none in spec really fit well. - throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "Session only supports one transaction active at a time"); + throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, + "Session only supports one transaction active at a time"); } - txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); - connection.getBroker().getBrokerObservers().startTx(txn); + tx.buffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); + connection.getBroker().getBrokerObservers().startTx(tx.buffer); txStarted(); - return txnId; + return tx.id; } namespace { @@ -795,32 +802,41 @@ namespace { boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new AsyncCommit(session)); return copy; } + private: boost::shared_ptr<Session> session; }; } -void Session::discharge(const std::string& id, bool failed) +void Session::discharge(const std::string& id, bool failed, pn_delivery_t* delivery) { - if (!txn.get() || id != txnId) { - throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, "No transaction declared with that id"); + QPID_LOG(debug, "Coordinator " << (failed ? " rollback" : " commit") + << " transaction " << id); + if (!tx.buffer.get() || id != tx.id) { + throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, + Msg() << "Cannot discharge transaction " << id + << (tx.buffer.get() ? Msg() << ", current transaction is " << tx.id : + Msg() << ", no current transaction")); } + tx.discharge = delivery; if (failed) { abort(); } else { - txn->begin(); - txn->startCommit(&connection.getBroker().getStore()); + tx.buffer->begin(); + tx.buffer->startCommit(&connection.getBroker().getStore()); AsyncCommit callback(shared_from_this()); - txn->end(callback); + tx.buffer->end(callback); } } void Session::abort() { - if (txn) { - txn->rollback(); + if (tx.buffer) { + tx.dischargeComplete(); + tx.buffer->rollback(); txAborted(); - txn = boost::intrusive_ptr<TxBuffer>(); + tx.buffer.reset(); + QPID_LOG(debug, "Transaction " << tx.id << " rolled back"); } } @@ -828,16 +844,18 @@ void Session::committed(bool sync) { if (sync) { //this is on IO thread - if (txn.get()) { - txn->endCommit(&connection.getBroker().getStore()); + tx.dischargeComplete(); + if (tx.buffer.get()) { + tx.buffer->endCommit(&connection.getBroker().getStore()); txCommitted(); - txn = boost::intrusive_ptr<TxBuffer>(); + tx.buffer.reset(); + QPID_LOG(debug, "Transaction " << tx.id << " comitted"); } else { throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction vanished during async commit"); } } else { //this is not on IO thread, need to delay processing until on IO thread - if (commitPending.boolCompareAndSwap(false, true)) { + if (tx.commitPending.boolCompareAndSwap(false, true)) { qpid::sys::Mutex::ScopedLock l(lock); if (!deleted) { out.activateOutput(); @@ -878,7 +896,7 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes { if (message && message->isTypedBody()) { QPID_LOG(debug, "Coordinator got message: @" << message->getBodyDescriptor() << " " << message->getTypedBody()); - if (message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, qpid::amqp::transaction::DECLARE_CODE)) { + if (message->getBodyDescriptor().match(DECLARE_SYMBOL, DECLARE_CODE)) { std::string id = session.declare(); //encode the txn id in a 'declared' list on the disposition pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery)); @@ -887,22 +905,38 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes pn_data_put_binary(data, convert(id)); pn_data_exit(data); pn_data_exit(data); - pn_delivery_update(delivery, qpid::amqp::transaction::DECLARED_CODE); + pn_delivery_update(delivery, DECLARED_CODE); pn_delivery_settle(delivery); session.incomingMessageAccepted(); - } else if (message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, qpid::amqp::transaction::DISCHARGE_CODE)) { + QPID_LOG(debug, "Coordinator declared transaction " << id); + } else if (message->getBodyDescriptor().match(DISCHARGE_SYMBOL, DISCHARGE_CODE)) { if (message->getTypedBody().getType() == qpid::types::VAR_LIST) { qpid::types::Variant::List args = message->getTypedBody().asList(); qpid::types::Variant::List::const_iterator i = args.begin(); if (i != args.end()) { std::string id = *i; bool failed = ++i != args.end() ? i->asBool() : false; - session.discharge(id, failed); - DecodingIncoming::deliver(message, delivery);//ensures async completion of commit is taken care of + session.discharge(id, failed, delivery); } + + } else { + throw framing::IllegalArgumentException( + Msg() << "Coordinator unknown message: @" << + message->getBodyDescriptor() << " " << message->getTypedBody()); } } } } +Session::Transaction::Transaction(Session& s) : + session(s), id((boost::format("%1%") % &s).str()), discharge(0) {} + +// Called in IO thread to signal completion of dischage by settling discharge message. +void Session::Transaction::dischargeComplete() { + if (buffer.get() && discharge) { + session.accepted(discharge, false); // Queue up accept and activate output. + discharge = 0; + } +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 591af1175f..ea3fb82beb 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -91,7 +91,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses std::pair<TxBuffer*,uint64_t> getTransactionalState(pn_delivery_t*); //transaction coordination: std::string declare(); - void discharge(const std::string& id, bool failed); + void discharge(const std::string& id, bool failed, pn_delivery_t*); void abort(); protected: void detachedByManagement(); @@ -109,9 +109,18 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses std::set< boost::shared_ptr<Queue> > exclusiveQueues; Authorise authorise; bool detachRequested; - boost::intrusive_ptr<TxBuffer> txn; - std::string txnId; - qpid::sys::AtomicValue<bool> commitPending; + + struct Transaction { + Transaction(Session&); + void dischargeComplete(); + + Session& session; + boost::intrusive_ptr<TxBuffer> buffer; + std::string id; + qpid::sys::AtomicValue<bool> commitPending; + pn_delivery_t* discharge; + }; + Transaction tx; struct ResolvedNode { diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index 3ee3f1cd40..77d43f191d 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -205,7 +205,7 @@ void ConnectionHandler::fail(const std::string& message) { errorCode = CLOSE_CODE_FRAMING_ERROR; errorText = message; - QPID_LOG(warning, message); + QPID_LOG(debug, message); setState(FAILED); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 7f19ca7ec0..2106e21686 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -510,9 +510,9 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) requested.erase(j->first); } } else if (key == AUTO_DELETE) { - PnData(data).read(v); + PnData(data).get(v); isAutoDeleted = v.asBool(); - } else if (j != requested.end() && (PnData(data).read(v) && v.asString() == j->second.asString())) { + } else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) { requested.erase(j->first); } } @@ -646,7 +646,7 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod } else { pn_data_put_ulong(filter, i->descriptorCode); } - PnData(filter).write(i->value); + PnData(filter).put(i->value); pn_data_exit(filter); } pn_data_exit(filter); @@ -733,7 +733,7 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus) putLifetimePolicy(data, toLifetimePolicy(i->second.asString())); } else { pn_data_put_symbol(data, convert(i->first)); - PnData(data).write(i->second); + PnData(data).put(i->second); } } pn_data_exit(data); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index a0b16c2b4c..d4a7b60e3c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -25,8 +25,11 @@ #include "Sasl.h" #include "SenderContext.h" #include "SessionContext.h" +#include "Transaction.h" #include "Transport.h" #include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/messaging/exceptions.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/Duration.h" @@ -43,6 +46,7 @@ #include "qpid/sys/urlAdd.h" #include "config.h" #include <boost/lexical_cast.hpp> +#include <boost/bind.hpp> #include <vector> extern "C" { #include <proton/engine.h> @@ -151,20 +155,23 @@ ConnectionContext::~ConnectionContext() if (ticker) ticker->cancel(); close(); sessions.clear(); - pn_transport_free(engine); pn_connection_free(connection); + pn_transport_free(engine); } bool ConnectionContext::isOpen() const { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); } void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - //wait for outstanding sends to settle + sys::Monitor::ScopedLock l(lock); + syncLH(ssn, l); +} + +void ConnectionContext::syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&) { while (!ssn->settled()) { QPID_LOG(debug, "Waiting for sends to settle on sync()"); wait(ssn);//wait until message has been confirmed @@ -175,18 +182,13 @@ void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn) void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { //explicitly release messages that have yet to be fetched for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) { drain_and_release_messages(ssn, i->second); } - //wait for outstanding sends to settle - while (!ssn->settled()) { - QPID_LOG(debug, "Waiting for sends to settle before closing"); - wait(ssn);//wait until message has been confirmed - wakeupDriver(); - } + syncLH(ssn, l); } if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { @@ -199,17 +201,11 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) void ConnectionContext::close() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state != CONNECTED) return; if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { - //wait for outstanding sends to settle - while (!i->second->settled()) { - QPID_LOG(debug, "Waiting for sends to settle before closing"); - wait(i->second);//wait until message has been confirmed - } - - + syncLH(i->second, l); if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { pn_session_close(i->second->session); } @@ -246,7 +242,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar */ qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching); { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn, lnk); if (!lnk->capacity) { pn_link_flow(lnk->receiver, 1); @@ -257,10 +253,10 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar return true; } else { { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); pn_link_drain(lnk->receiver, 0); wakeupDriver(); - while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) { + while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) { QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); wait(ssn, lnk); } @@ -269,7 +265,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar } } if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1); wakeupDriver(); @@ -296,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared { qpid::sys::AbsTime until(convert(timeout)); while (true) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn, lnk); pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); @@ -320,6 +316,9 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared haveOutput = true; } } + // Automatically ack messages if we are in a transaction. + if (ssn->transaction) + acknowledgeLH(ssn, &message, false, l); return true; } else if (until > qpid::sys::now()) { waitUntil(ssn, lnk, until); @@ -334,7 +333,7 @@ boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared { qpid::sys::AbsTime until(convert(timeout)); while (true) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn); boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver(); if (r) { @@ -347,9 +346,13 @@ boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared } } -void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) +void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) { + sys::Monitor::ScopedLock l(lock); + acknowledgeLH(ssn, message, cumulative, l); +} + +void ConnectionContext::acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); checkClosed(ssn); if (message) { ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); @@ -361,7 +364,7 @@ void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid: void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn); ssn->nack(MessageImplAccess::get(message).getInternalId(), reject); wakeupDriver(); @@ -369,7 +372,7 @@ void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messag void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) { lnk->close(); } @@ -401,7 +404,7 @@ void ConnectionContext::drain_and_release_messages(boost::shared_ptr<SessionCont void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); drain_and_release_messages(ssn, lnk); if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { lnk->close(); @@ -415,7 +418,7 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); lnk->configure(); attach(ssn, lnk->sender); checkClosed(ssn, lnk); @@ -425,7 +428,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); lnk->configure(); attach(ssn, lnk->receiver, lnk->capacity); checkClosed(ssn, lnk); @@ -445,11 +448,26 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t* } } -void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) +void ConnectionContext::send( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery) +{ + sys::Monitor::ScopedLock l(lock); + sendLH(ssn, snd, message, sync, delivery, l); +} + +void ConnectionContext::sendLH( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery, + sys::Monitor::ScopedLock&) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); checkClosed(ssn); - SenderContext::Delivery* delivery(0); while (pn_transport_pending(engine) > 65536) { QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written..."); notifyOnWrite = true; @@ -457,17 +475,17 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share wait(ssn, snd); notifyOnWrite = false; } - while (!snd->send(message, &delivery)) { + while (!snd->send(message, delivery)) { QPID_LOG(debug, "Waiting for capacity..."); wait(ssn, snd);//wait for capacity } wakeupDriver(); - if (sync && delivery) { - while (!delivery->delivered()) { + if (sync && *delivery) { + while (!(*delivery)->delivered()) { QPID_LOG(debug, "Waiting for confirmation..."); wait(ssn, snd);//wait until message has been confirmed } - if (delivery->rejected()) { + if ((*delivery)->rejected()) { throw MessageRejected("Message was rejected by peer"); } @@ -476,46 +494,46 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); sender->setCapacity(capacity); } uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return sender->getCapacity(); } uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return sender->getUnsettled(); } void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); receiver->setCapacity(capacity); pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity()); wakeupDriver(); } uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return receiver->getCapacity(); } uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return receiver->getAvailable(); } uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return receiver->getUnsettled(); } void ConnectionContext::activateOutput() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state == CONNECTED) wakeupDriver(); } /** @@ -543,8 +561,8 @@ pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED; void ConnectionContext::reset() { - pn_transport_free(engine); pn_connection_free(connection); + pn_transport_free(engine); engine = pn_transport(); connection = pn_connection(); @@ -555,7 +573,7 @@ void ConnectionContext::reset() } } -void ConnectionContext::check() { +bool ConnectionContext::check() { if (checkDisconnected()) { if (ConnectionOptions::reconnect) { QPID_LOG(notice, "Auto-reconnecting to " << fullUrl); @@ -564,7 +582,9 @@ void ConnectionContext::check() { } else { throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); } + return true; } + return false; } bool ConnectionContext::checkDisconnected() { @@ -588,7 +608,7 @@ bool ConnectionContext::checkDisconnected() { void ConnectionContext::wait() { - check(); + if (check()) return; // Reconnected, may need to re-test condition. lock.wait(); check(); } @@ -630,6 +650,7 @@ void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost:: void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn) { check(); + ssn->error.raise(); if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { pn_condition_t* error = pn_session_remote_condition(ssn->session); std::stringstream text; @@ -690,6 +711,7 @@ void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_li void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s) { + if (s->error) return; pn_session_open(s->session); wakeupDriver(); while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { @@ -718,26 +740,31 @@ void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s) boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported"); + boost::shared_ptr<SessionContext> session; std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n; - SessionMap::const_iterator i = sessions.find(name); - if (i == sessions.end()) { - boost::shared_ptr<SessionContext> s(new SessionContext(connection)); - s->setName(name); - s->session = pn_session(connection); - pn_session_open(s->session); - wakeupDriver(); - while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { - wait(); + { + sys::Monitor::ScopedLock l(lock); + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + session = boost::shared_ptr<SessionContext>(new SessionContext(connection)); + session->setName(name); + pn_session_open(session->session); + wakeupDriver(); + sessions[name] = session; // Add it now so it will be restarted if we reconnect in wait() + while (pn_session_state(session->session) & PN_REMOTE_UNINIT) { + wait(); + } + } else { + throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); } - sessions[name] = s; - return s; - } else { - throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); - } + } + if (transactional) { // Outside of lock + startTxSession(session); + } + return session; } + boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const { SessionMap::const_iterator i = sessions.find(name); @@ -760,7 +787,7 @@ std::string ConnectionContext::getAuthenticatedUsername() std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); QPID_LOG(trace, id << " decode(" << size << ")"); if (readHeader) { size_t decoded = readProtocolHeader(buffer, size); @@ -805,7 +832,7 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) } std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); QPID_LOG(trace, id << " encode(" << size << ")"); if (writeHeader) { size_t encoded = writeProtocolHeader(buffer, size); @@ -843,19 +870,19 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) } bool ConnectionContext::canEncodePlain() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); return haveOutput && state == CONNECTED; } void ConnectionContext::closed() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); state = DISCONNECTED; lock.notifyAll(); } void ConnectionContext::opened() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); state = CONNECTED; lock.notifyAll(); } @@ -921,7 +948,7 @@ const qpid::messaging::ConnectionOptions* ConnectionContext::getOptions() std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); size_t decoded = 0; try { if (sasl.get() && !sasl->authenticated()) { @@ -939,7 +966,7 @@ std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) } std::size_t ConnectionContext::encode(char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); size_t encoded = 0; try { if (sasl.get() && sasl->canEncode()) { @@ -957,7 +984,7 @@ std::size_t ConnectionContext::encode(char* buffer, std::size_t size) } bool ConnectionContext::canEncode() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (sasl.get()) { try { if (sasl->canEncode()) return true; @@ -978,26 +1005,21 @@ const std::string CLIENT_PPID("qpid.client_ppid"); } void ConnectionContext::setProperties() { - pn_data_t* data = pn_connection_properties(connection); - pn_data_put_map(data); - pn_data_enter(data); - - pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME)); - std::string processName = sys::SystemInfo::getProcessName(); - pn_data_put_string(data, PnData::str(processName)); - - pn_data_put_symbol(data, PnData::str(CLIENT_PID)); - pn_data_put_int(data, sys::SystemInfo::getProcessId()); - - pn_data_put_symbol(data, PnData::str(CLIENT_PPID)); - pn_data_put_int(data, sys::SystemInfo::getParentProcessId()); - + PnData data(pn_connection_properties(connection)); + pn_data_put_map(data.data); + pn_data_enter(data.data); + data.putSymbol(CLIENT_PROCESS_NAME); + data.putSymbol(sys::SystemInfo::getProcessName()); + data.putSymbol(CLIENT_PID); + data.put(int32_t(sys::SystemInfo::getProcessId())); + data.putSymbol(CLIENT_PPID); + data.put(int32_t(sys::SystemInfo::getParentProcessId())); for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { - pn_data_put_symbol(data, PnData::str(i->first)); - PnData(data).write(i->second); + data.putSymbol(i->first); + data.put(i->second); } - pn_data_exit(data); + pn_data_exit(data.data); } const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings() @@ -1007,7 +1029,7 @@ const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettin void ConnectionContext::open() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); if (!driver) driver = DriverImpl::getDefault(); QPID_LOG(info, "Starting connection to " << fullUrl); @@ -1049,7 +1071,7 @@ void ConnectionContext::autoconnect() void ConnectionContext::reconnect(const Url& url) { QPID_LOG(notice, "Reconnecting to " << url); - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); if (!driver) driver = DriverImpl::getDefault(); reset(); @@ -1137,7 +1159,7 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { std::string ConnectionContext::getUrl() const { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return (state == CONNECTED) ? currentUrl.str() : std::string(); } @@ -1209,6 +1231,40 @@ bool ConnectionContext::CodecAdapter::canEncode() return context.canEncodePlain(); } +void ConnectionContext::startTxSession(boost::shared_ptr<SessionContext> session) { + try { + QPID_LOG(debug, id << " attaching transaction for " << session->getName()); + boost::shared_ptr<Transaction> tx(new Transaction(session->session)); + session->transaction = tx; + attach(session, tx); + tx->declare(boost::bind(&ConnectionContext::send, this, _1, _2, _3, _4, _5), session); + } catch (const Exception& e) { + throw TransactionError(Msg() << "Cannot start transaction: " << e.what()); + } +} + +void ConnectionContext::discharge(boost::shared_ptr<SessionContext> session, bool fail) { + { + sys::Monitor::ScopedLock l(lock); + checkClosed(session); + if (!session->transaction) + throw TransactionError("No Transaction"); + Transaction::SendFunction sendFn = boost::bind( + &ConnectionContext::sendLH, this, _1, _2, _3, _4, _5, boost::ref(l)); + syncLH(session, boost::ref(l)); // Sync to make sure all tx transfers have been received. + session->transaction->discharge(sendFn, session, fail); + session->transaction->declare(sendFn, session); + } +} + +void ConnectionContext::commit(boost::shared_ptr<SessionContext> session) { + discharge(session, false); +} + +void ConnectionContext::rollback(boost::shared_ptr<SessionContext> session) { + discharge(session, true); +} + // setup the transport and connection objects: void ConnectionContext::configureConnection() diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 80da9dff10..b687219624 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -34,6 +34,7 @@ #include "qpid/sys/Monitor.h" #include "qpid/types/Variant.h" #include "qpid/messaging/amqp/TransportContext.h" +#include "SenderContext.h" struct pn_connection_t; struct pn_link_t; @@ -59,7 +60,6 @@ class DriverImpl; class ReceiverContext; class Sasl; class SessionContext; -class SenderContext; class Transport; /** @@ -82,10 +82,20 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); - void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync); + + // Link operations + void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, + const qpid::messaging::Message& message, bool sync, + SenderContext::Delivery** delivery); + bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + + // Session operations void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative); + void commit(boost::shared_ptr<SessionContext> ssn); + void rollback(boost::shared_ptr<SessionContext> ssn); + void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject); void sync(boost::shared_ptr<SessionContext> ssn); boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout); @@ -93,10 +103,10 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void setOption(const std::string& name, const qpid::types::Variant& value); std::string getAuthenticatedUsername(); + // Link operations void setCapacity(boost::shared_ptr<SenderContext>, uint32_t); uint32_t getCapacity(boost::shared_ptr<SenderContext>); uint32_t getUnsettled(boost::shared_ptr<SenderContext>); - void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t); uint32_t getCapacity(boost::shared_ptr<ReceiverContext>); uint32_t getAvailable(boost::shared_ptr<ReceiverContext>); @@ -159,9 +169,12 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag bool notifyOnWrite; boost::intrusive_ptr<qpid::sys::TimerTask> ticker; - void check(); + bool check(); bool checkDisconnected(); void waitNoReconnect(); + + // NOTE: All wait*() functions must be called in a loop that checks for the + // waited condition with the lock held. void wait(); void waitUntil(qpid::sys::AbsTime until); void wait(boost::shared_ptr<SessionContext>); @@ -170,10 +183,12 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>, qpid::sys::AbsTime until); void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>, qpid::sys::AbsTime until); + void checkClosed(boost::shared_ptr<SessionContext>); void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*); + void wakeupDriver(); void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0); void autoconnect(); @@ -194,8 +209,18 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag std::string getError(); bool useSasl(); void setProperties(); + void configureConnection(); bool checkTransportError(std::string&); + + void discharge(boost::shared_ptr<SessionContext>, bool fail); + void startTxSession(boost::shared_ptr<SessionContext>); + + void syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&); + void sendLH(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, + const qpid::messaging::Message& message, bool sync, + SenderContext::Delivery** delivery, sys::Monitor::ScopedLock&); + void acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp index 5c57c5b0a3..3309d1a683 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp @@ -20,34 +20,53 @@ */ #include "PnData.h" #include "qpid/types/encodings.h" +#include "qpid/log/Statement.h" namespace qpid { namespace messaging { namespace amqp { using types::Variant; +using namespace types::encodings; -void PnData::write(const Variant::Map& map) +// TODO aconway 2014-11-20: PnData duplicates functionality of qpid::amqp::Encoder,Decoder. +// Collapse them all into a single proton-based codec. + +void PnData::put(const Variant::Map& map) { pn_data_put_map(data); pn_data_enter(data); for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { - pn_data_put_string(data, str(i->first)); - write(i->second); + pn_data_put_string(data, bytes(i->first)); + put(i->second); } pn_data_exit(data); } -void PnData::write(const Variant::List& list) + +void PnData::put(const Variant::List& list) { pn_data_put_list(data); pn_data_enter(data); for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { - write(*i); + put(*i); } pn_data_exit(data); } -void PnData::write(const Variant& value) + +void PnData::put(const Variant& value) { + // Open data descriptors associated with the value. + const Variant::List& descriptors = value.getDescriptors(); + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) { + pn_data_put_described(data); + pn_data_enter(data); + if (i->getType() == types::VAR_STRING) + pn_data_put_symbol(data, bytes(i->asString())); + else + pn_data_put_ulong(data, i->asUint64()); + } + + // Put the variant value switch (value.getType()) { case qpid::types::VAR_VOID: pn_data_put_null(data); @@ -65,61 +84,70 @@ void PnData::write(const Variant& value) pn_data_put_double(data, value.asDouble()); break; case qpid::types::VAR_STRING: - pn_data_put_string(data, str(value.asString())); + if (value.getEncoding() == ASCII) + pn_data_put_symbol(data, bytes(value.asString())); + else if (value.getEncoding() == BINARY) + pn_data_put_binary(data, bytes(value.asString())); + else + pn_data_put_string(data, bytes(value.asString())); break; case qpid::types::VAR_MAP: - write(value.asMap()); + put(value.asMap()); break; case qpid::types::VAR_LIST: - write(value.asList()); + put(value.asList()); break; default: break; } + + // Close any descriptors. + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) + pn_data_exit(data); } -bool PnData::read(qpid::types::Variant& value) +bool PnData::get(qpid::types::Variant& value) { - return read(pn_data_type(data), value); + return get(pn_data_type(data), value); } -void PnData::readList(qpid::types::Variant::List& value) +void PnData::getList(qpid::types::Variant::List& value) { size_t count = pn_data_get_list(data); pn_data_enter(data); for (size_t i = 0; i < count && pn_data_next(data); ++i) { qpid::types::Variant e; - if (read(e)) value.push_back(e); + if (get(e)) value.push_back(e); } pn_data_exit(data); } -void PnData::readMap(qpid::types::Variant::Map& value) +void PnData::getMap(qpid::types::Variant::Map& value) { size_t count = pn_data_get_list(data); pn_data_enter(data); for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) { - std::string key = str(pn_data_get_symbol(data)); + std::string key = string(pn_data_get_symbol(data)); pn_data_next(data); qpid::types::Variant e; - if (read(e)) value[key]= e; + if (get(e)) value[key]= e; } pn_data_exit(data); } -void PnData::readArray(qpid::types::Variant::List& value) +void PnData::getArray(qpid::types::Variant::List& value) { size_t count = pn_data_get_array(data); pn_type_t type = pn_data_get_array_type(data); pn_data_enter(data); for (size_t i = 0; i < count && pn_data_next(data); ++i) { qpid::types::Variant e; - if (read(type, e)) value.push_back(e); + if (get(type, e)) value.push_back(e); } pn_data_exit(data); } -bool PnData::read(pn_type_t type, qpid::types::Variant& value) +bool PnData::get(pn_type_t type, qpid::types::Variant& value) { switch (type) { case PN_NULL: @@ -168,41 +196,41 @@ bool PnData::read(pn_type_t type, qpid::types::Variant& value) value = qpid::types::Uuid(pn_data_get_uuid(data).bytes); return true; case PN_BINARY: - value = str(pn_data_get_binary(data)); + value = string(pn_data_get_binary(data)); value.setEncoding(qpid::types::encodings::BINARY); return true; case PN_STRING: - value = str(pn_data_get_string(data)); + value = string(pn_data_get_string(data)); value.setEncoding(qpid::types::encodings::UTF8); return true; case PN_SYMBOL: - value = str(pn_data_get_string(data)); + value = string(pn_data_get_string(data)); value.setEncoding(qpid::types::encodings::ASCII); return true; case PN_LIST: value = qpid::types::Variant::List(); - readList(value.asList()); + getList(value.asList()); return true; break; case PN_MAP: value = qpid::types::Variant::Map(); - readMap(value.asMap()); + getMap(value.asMap()); return true; case PN_ARRAY: value = qpid::types::Variant::List(); - readArray(value.asList()); + getArray(value.asList()); return true; case PN_DESCRIBED: + // TODO aconway 2014-11-20: get described values. case PN_DECIMAL32: case PN_DECIMAL64: case PN_DECIMAL128: default: return false; } - } -pn_bytes_t PnData::str(const std::string& s) +pn_bytes_t PnData::bytes(const std::string& s) { pn_bytes_t result; result.start = const_cast<char*>(s.data()); @@ -210,7 +238,7 @@ pn_bytes_t PnData::str(const std::string& s) return result; } -std::string PnData::str(const pn_bytes_t& in) +std::string PnData::string(const pn_bytes_t& in) { return std::string(in.start, in.size); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.h b/qpid/cpp/src/qpid/messaging/amqp/PnData.h index 6d03235432..b0119f88fd 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/PnData.h +++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.h @@ -32,28 +32,29 @@ namespace messaging { namespace amqp { /** - * Helper class to read/write messaging types to/from pn_data_t. + * Helper class to put/get messaging types to/from pn_data_t. */ class PnData { public: - PnData(pn_data_t* d) : data(d) {} + pn_data_t* data; - void write(const types::Variant& value); - void write(const types::Variant::Map& map); - void write(const types::Variant::List& list); + PnData(pn_data_t* d) : data(d) {} - bool read(pn_type_t type, types::Variant& value); - bool read(types::Variant& value); - void readList(types::Variant::List& value); - void readMap(types::Variant::Map& value); - void readArray(types::Variant::List& value); + void put(const types::Variant& value); + void put(const types::Variant::Map& map); + void put(const types::Variant::List& list); + void put(int32_t n) { pn_data_put_int(data, n); } + void putSymbol(const std::string& symbol) { pn_data_put_symbol(data, bytes(symbol)); } - static pn_bytes_t str(const std::string&); - static std::string str(const pn_bytes_t&); + bool get(pn_type_t type, types::Variant& value); + bool get(types::Variant& value); + void getList(types::Variant::List& value); + void getMap(types::Variant::Map& value); + void getArray(types::Variant::List& value); - private: - pn_data_t* data; + static pn_bytes_t bytes(const std::string&); + static std::string string(const pn_bytes_t&); }; }}} // namespace messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 5e0707056f..a28509b0b1 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -37,9 +37,10 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co helper(address), receiver(pn_receiver(session, name.c_str())), capacity(0), used(0) {} + ReceiverContext::~ReceiverContext() { - pn_link_free(receiver); + if (receiver) pn_link_free(receiver); } void ReceiverContext::setCapacity(uint32_t c) @@ -63,12 +64,13 @@ uint32_t ReceiverContext::getAvailable() uint32_t ReceiverContext::getUnsettled() { + assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver)); return pn_link_unsettled(receiver) - pn_link_queued(receiver); } void ReceiverContext::close() { - pn_link_close(receiver); + if (receiver) pn_link_close(receiver); } const std::string& ReceiverContext::getName() const @@ -96,7 +98,7 @@ void ReceiverContext::verify() } void ReceiverContext::configure() { - configure(pn_link_source(receiver)); + if (receiver) configure(pn_link_source(receiver)); } void ReceiverContext::configure(pn_terminus_t* source) { @@ -116,13 +118,13 @@ Address ReceiverContext::getAddress() const void ReceiverContext::reset(pn_session_t* session) { - receiver = pn_receiver(session, name.c_str()); - configure(); + receiver = session ? pn_receiver(session, name.c_str()) : 0; + if (receiver) configure(); } bool ReceiverContext::hasCurrent() { - return pn_link_current(receiver); + return receiver && pn_link_current(receiver); } bool ReceiverContext::wakeupToIssueCredit() diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 2a48b2241a..b12af5eb25 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -18,8 +18,10 @@ * under the License. * */ -#include "qpid/messaging/amqp/SenderContext.h" -#include "qpid/messaging/amqp/EncodedMessage.h" +#include "SenderContext.h" +#include "Transaction.h" +#include "EncodedMessage.h" +#include "PnData.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/exceptions.h" #include "qpid/Exception.h" @@ -40,22 +42,29 @@ extern "C" { namespace qpid { namespace messaging { namespace amqp { + //TODO: proper conversion to wide string for address -SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_) - : name(n), +SenderContext::SenderContext(pn_session_t* session, const std::string& n, + const qpid::messaging::Address& a, + bool setToOnSend_, + const CoordinatorPtr& coord) + : sender(pn_sender(session, n.c_str())), + name(n), address(a), helper(address), - sender(pn_sender(session, n.c_str())), nextId(0), capacity(50), unreliable(helper.isUnreliable()), - setToOnSend(setToOnSend_) {} + nextId(0), capacity(50), unreliable(helper.isUnreliable()), + setToOnSend(setToOnSend_), + transaction(coord) +{} SenderContext::~SenderContext() { - pn_link_free(sender); + if (sender) pn_link_free(sender); } void SenderContext::close() { - pn_link_close(sender); + if (sender) pn_link_close(sender); } void SenderContext::setCapacity(uint32_t c) @@ -88,10 +97,13 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: { resend();//if there are any messages needing to be resent at the front of the queue, send them first if (processUnsettled(false) < capacity && pn_link_credit(sender)) { + types::Variant state; + if (transaction) + state = transaction->getSendState(); if (unreliable) { Delivery delivery(nextId++); delivery.encode(MessageImplAccess::get(message), address, setToOnSend); - delivery.send(sender, unreliable); + delivery.send(sender, unreliable, state); *out = 0; return true; } else { @@ -99,7 +111,7 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: try { Delivery& delivery = deliveries.back(); delivery.encode(MessageImplAccess::get(message), address, setToOnSend); - delivery.send(sender, unreliable); + delivery.send(sender, unreliable, state); *out = &delivery; return true; } catch (const std::exception& e) { @@ -507,7 +519,8 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co throw SendError(e.what()); } } -void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) + +void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state) { pn_delivery_tag_t tag; tag.size = sizeof(id); @@ -517,6 +530,11 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) tag.bytes = reinterpret_cast<const char*>(&id); #endif token = pn_delivery(sender, tag); + if (!state.isVoid()) { // Add transaction state + PnData data(pn_disposition_data(pn_delivery_local(token))); + data.put(state); + pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE); + } pn_link_send(sender, encoded.getData(), encoded.getSize()); if (unreliable) { pn_delivery_settle(token); @@ -551,6 +569,15 @@ bool SenderContext::Delivery::rejected() { return pn_delivery_remote_state(token) == PN_REJECTED; } + +std::string SenderContext::Delivery::error() +{ + pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token)); + return (condition && pn_condition_is_set(condition)) ? + Msg() << pn_condition_get_name(condition) << ": " << pn_condition_get_description(condition) : + std::string(); +} + void SenderContext::Delivery::settle() { pn_delivery_settle(token); @@ -570,10 +597,12 @@ void SenderContext::verify() helper.checkAssertion(target, AddressHelper::FOR_SENDER); } + void SenderContext::configure() { - configure(pn_link_target(sender)); + if (sender) configure(pn_link_target(sender)); } + void SenderContext::configure(pn_terminus_t* target) { helper.configure(sender, target, AddressHelper::FOR_SENDER); @@ -603,12 +632,10 @@ Address SenderContext::getAddress() const void SenderContext::reset(pn_session_t* session) { - sender = pn_sender(session, name.c_str()); - configure(); - - for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) { + sender = session ? pn_sender(session, name.c_str()) : 0; + if (sender) configure(); + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) i->reset(); - } } void SenderContext::resend() diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index 66e45a85a6..4d3c4bee79 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -24,6 +24,7 @@ #include <deque> #include <string> #include <vector> +#include <boost/shared_ptr.hpp> #include "qpid/sys/IntegerTypes.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/amqp/AddressHelper.h" @@ -41,9 +42,10 @@ class Message; class MessageImpl; namespace amqp { -/** - * - */ + +class Transaction; + + class SenderContext { public: @@ -52,13 +54,15 @@ class SenderContext public: Delivery(int32_t id); void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField); - void send(pn_link_t*, bool unreliable); + void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant()); bool delivered(); bool accepted(); bool rejected(); void settle(); void reset(); bool sent() const; + pn_delivery_t* getToken() const { return token; } + std::string error(); private: int32_t id; pn_delivery_t* token; @@ -66,22 +70,32 @@ class SenderContext bool presettled; }; - SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target, bool setToOnSend); + typedef boost::shared_ptr<Transaction> CoordinatorPtr; + + SenderContext(pn_session_t* session, const std::string& name, + const qpid::messaging::Address& target, + bool setToOnSend, + const CoordinatorPtr& transaction = CoordinatorPtr()); ~SenderContext(); - void reset(pn_session_t* session); - void close(); - void setCapacity(uint32_t); - uint32_t getCapacity(); - uint32_t getUnsettled(); - const std::string& getName() const; - const std::string& getTarget() const; - bool send(const qpid::messaging::Message& message, Delivery**); - void configure(); - void verify(); - void check(); - bool settled(); - bool closed(); - Address getAddress() const; + + virtual void reset(pn_session_t* session); + virtual void close(); + virtual void setCapacity(uint32_t); + virtual uint32_t getCapacity(); + virtual uint32_t getUnsettled(); + virtual const std::string& getName() const; + virtual const std::string& getTarget() const; + virtual bool send(const qpid::messaging::Message& message, Delivery**); + virtual void configure(); + virtual void verify(); + virtual void check(); + virtual bool settled(); + virtual bool closed(); + virtual Address getAddress() const; + + protected: + pn_link_t* sender; + private: friend class ConnectionContext; typedef std::deque<Delivery> Deliveries; @@ -89,12 +103,12 @@ class SenderContext const std::string name; qpid::messaging::Address address; AddressHelper helper; - pn_link_t* sender; int32_t nextId; Deliveries deliveries; uint32_t capacity; bool unreliable; bool setToOnSend; + boost::shared_ptr<Transaction> transaction; uint32_t processUnsettled(bool silent); void configure(pn_terminus_t*); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp index 367db701cb..98f2d34e7d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp @@ -39,7 +39,8 @@ SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c, void SenderHandle::send(const Message& message, bool sync) { - connection->send(session, sender, message, sync); + SenderContext::Delivery* d = 0; + connection->send(session, sender, message, sync, &d); } void SenderHandle::close() diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 824b958af3..2b82ffc377 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -21,11 +21,15 @@ #include "SessionContext.h" #include "SenderContext.h" #include "ReceiverContext.h" +#include "Transaction.h" +#include "PnData.h" #include <boost/format.hpp> #include "qpid/messaging/Address.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/exceptions.h" #include "qpid/log/Statement.h" +#include "qpid/amqp/descriptors.h" + extern "C" { #include <proton/engine.h> } @@ -35,23 +39,32 @@ namespace messaging { namespace amqp { SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {} + SessionContext::~SessionContext() { - senders.clear(); receivers.clear(); - pn_session_free(session); + // Clear all pointers to senders and receivers before we free the session. + senders.clear(); + receivers.clear(); + transaction.reset(); // Transaction is a sender. + if (!error && session) + pn_session_free(session); } boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend) { + error.raise(); std::string name = AddressHelper::getLinkName(address); - if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection"); - boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address, setToOnSend)); + if (senders.find(name) != senders.end()) + throw LinkError("Link name must be unique within the scope of the connection"); + boost::shared_ptr<SenderContext> s( + new SenderContext(session, name, address, setToOnSend, transaction)); senders[name] = s; return s; } boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address) { + error.raise(); std::string name = AddressHelper::getLinkName(address); if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection"); boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address)); @@ -61,6 +74,7 @@ boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::me boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const { + error.raise(); SenderMap::const_iterator i = senders.find(name); if (i == senders.end()) { throw qpid::messaging::KeyError(std::string("No such sender") + name); @@ -71,6 +85,7 @@ boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& na boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const { + error.raise(); ReceiverMap::const_iterator i = receivers.find(name); if (i == receivers.end()) { throw qpid::messaging::KeyError(std::string("No such receiver") + name); @@ -81,16 +96,19 @@ boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string void SessionContext::removeReceiver(const std::string& n) { + error.raise(); receivers.erase(n); } void SessionContext::removeSender(const std::string& n) { + error.raise(); senders.erase(n); } boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() { + error.raise(); for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { if (i->second->hasCurrent()) { return i->second; @@ -102,16 +120,19 @@ boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() uint32_t SessionContext::getReceivable() { + error.raise(); return 0;//TODO } uint32_t SessionContext::getUnsettledAcks() { + error.raise(); return 0;//TODO } qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) { + error.raise(); qpid::framing::SequenceNumber id = next++; if (!pn_delivery_settled(delivery)) unacked[id] = delivery; @@ -121,22 +142,32 @@ qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end) { + error.raise(); for (DeliveryMap::iterator i = begin; i != end; ++i) { - QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second); - pn_delivery_update(i->second, PN_ACCEPTED); - pn_delivery_settle(i->second);//TODO: different settlement modes? + types::Variant txState; + if (transaction) { + QPID_LOG(trace, "Setting disposition for transactional delivery " + << i->first << " -> " << i->second); + transaction->acknowledge(i->second); + } else { + QPID_LOG(trace, "Setting disposition for delivery " << i->first << " -> " << i->second); + pn_delivery_update(i->second, PN_ACCEPTED); + pn_delivery_settle(i->second); //TODO: different settlement modes? + } } unacked.erase(begin, end); } void SessionContext::acknowledge() { + error.raise(); QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages"); acknowledge(unacked.begin(), unacked.end()); } void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative) { + error.raise(); QPID_LOG(debug, "acknowledging selected messages, id=" << id << ", cumulative=" << cumulative); DeliveryMap::iterator i = unacked.find(id); if (i != unacked.end()) { @@ -149,6 +180,7 @@ void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool c void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject) { + error.raise(); DeliveryMap::iterator i = unacked.find(id); if (i != unacked.end()) { if (reject) { @@ -166,7 +198,9 @@ void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject) bool SessionContext::settled() { + error.raise(); bool result = true; + for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { try { if (!i->second->closed() && !i->second->settled()) result = false; @@ -189,8 +223,25 @@ std::string SessionContext::getName() const void SessionContext::reset(pn_connection_t* connection) { - session = pn_session(connection); unacked.clear(); + if (transaction) { + if (transaction->isCommitting()) + error = new TransactionUnknown("Transaction outcome unknown: transport failure"); + else + error = new TransactionAborted("Transaction aborted: transport failure"); + resetSession(0); + senders.clear(); + receivers.clear(); + transaction.reset(); + return; + } + resetSession(pn_session(connection)); + +} + +void SessionContext::resetSession(pn_session_t* session_) { + session = session_; + if (transaction) transaction->reset(session); for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { i->second->reset(session); } @@ -198,4 +249,6 @@ void SessionContext::reset(pn_connection_t* connection) i->second->reset(session); } } + + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index 8c2bb040a6..67b3c1e401 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -26,6 +26,7 @@ #include <boost/shared_ptr.hpp> #include "qpid/sys/IntegerTypes.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/ExceptionHolder.h" struct pn_connection_t; struct pn_session_t; @@ -42,6 +43,8 @@ namespace amqp { class ConnectionContext; class SenderContext; class ReceiverContext; +class Transaction; + /** * */ @@ -63,23 +66,29 @@ class SessionContext bool settled(); void setName(const std::string&); std::string getName() const; + + void nack(const qpid::framing::SequenceNumber& id, bool reject); + private: friend class ConnectionContext; typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap; typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap; typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap; + pn_session_t* session; SenderMap senders; + boost::shared_ptr<Transaction> transaction; ReceiverMap receivers; DeliveryMap unacked; qpid::framing::SequenceNumber next; std::string name; + sys::ExceptionHolder error; qpid::framing::SequenceNumber record(pn_delivery_t*); void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); - void nack(const qpid::framing::SequenceNumber& id, bool reject); + void resetSession(pn_session_t*); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp index 4d427639d3..44294e5f04 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -42,12 +42,12 @@ SessionHandle::SessionHandle(boost::shared_ptr<ConnectionContext> c, boost::shar void SessionHandle::commit() { - + connection->commit(session); } void SessionHandle::rollback() { - + connection->rollback(session); } void SessionHandle::acknowledge(bool /*sync*/) diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp new file mode 100644 index 0000000000..754b00d802 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "Transaction.h" +#include "SessionContext.h" +#include "ConnectionContext.h" +#include "PnData.h" +#include <proton/engine.h> +#include <qpid/Exception.h> +#include <qpid/amqp/descriptors.h> +#include <qpid/messaging/exceptions.h> +#include <qpid/log/Statement.h> +#include "qpid/messaging/Message.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +using namespace types; +using types::Exception; + +namespace { +const std::string LOCAL_TRANSACTIONS("amqp:local-transactions"); +const std::string TX_COORDINATOR("tx-transaction"); +const std::string ADDRESS("tx-transaction;{link:{reliability:at-least-once}}"); +} + +Transaction::Transaction(pn_session_t* session) : + SenderContext(session, TX_COORDINATOR, Address(ADDRESS), false), committing(false) +{} + +void Transaction::clear() { + id.clear(); + sendState.reset(); + acceptState.reset(); +} + +void Transaction::configure() { + SenderContext::configure(); + pn_terminus_t* target = pn_link_target(sender); + pn_terminus_set_type(target, PN_COORDINATOR); + PnData(pn_terminus_capabilities(target)).putSymbol(LOCAL_TRANSACTIONS); +} + +void Transaction::verify() {} + +const std::string& Transaction::getTarget() const { return getName(); } + +void Transaction::declare(SendFunction send, const SessionPtr& session) { + committing = false; + error.raise(); + clear(); + Variant declare = Variant::described(qpid::amqp::transaction::DECLARE_CODE, Variant::List()); + SenderContext::Delivery* delivery = 0; + send(session, shared_from_this(), Message(declare), true, &delivery); + setId(*delivery); +} + +void Transaction::discharge(SendFunction send, const SessionPtr& session, bool fail) { + error.raise(); + committing = !fail; + try { + // Send a discharge message to the remote coordinator. + Variant::List dischargeList; + dischargeList.push_back(Variant(id)); + dischargeList.push_back(Variant(fail)); + Variant discharge(dischargeList); + discharge.setDescriptor(qpid::amqp::transaction::DISCHARGE_CODE); + SenderContext::Delivery* delivery = 0; + send(session, shared_from_this(), Message(discharge), true, &delivery); + if (!delivery->accepted()) + throw TransactionAborted(delivery->error()); + committing = false; + } + catch(const TransactionError&) { + throw; + } + catch(const Exception& e) { + committing = false; + throw TransactionAborted(e.what()); + } +} + +// Set the transaction ID from the delivery returned by the remote coordinator. +void Transaction::setId(const SenderContext::Delivery& delivery) +{ + if (delivery.getToken() && + pn_delivery_remote_state(delivery.getToken()) == qpid::amqp::transaction::DECLARED_CODE) + { + pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery.getToken())); + if (data && pn_data_next(data)) { + size_t count = pn_data_get_list(data); + if (count > 0) { + pn_data_enter(data); + pn_data_next(data); + setId(PnData::string(pn_data_get_binary(data))); + pn_data_exit(data); + return; + } + } + } + throw TransactionError("No transaction ID returned by remote coordinator."); +} + +void Transaction::setId(const std::string& id_) { + id = id_; + if (id.empty()) { + clear(); + } + else { + // NOTE: The send and accept states are NOT described, the descriptor + // is added in pn_delivery_update. + Variant::List list; + list.push_back(Variant(id, "binary")); + sendState = Variant(list); + + Variant accepted = Variant::described(qpid::amqp::message::ACCEPTED_CODE, Variant::List()); + list.push_back(accepted); + acceptState = Variant(list); + } +} + +types::Variant Transaction::getSendState() const { + error.raise(); + return sendState; +} + +void Transaction::acknowledge(pn_delivery_t* delivery) +{ + error.raise(); + PnData data(pn_disposition_data(pn_delivery_local(delivery))); + data.put(acceptState); + pn_delivery_update(delivery, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE); + pn_delivery_settle(delivery); +} + + + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.h b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h new file mode 100644 index 0000000000..35492c9bb3 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h @@ -0,0 +1,95 @@ +#ifndef COORDINATORCONTEXT_H +#define COORDINATORCONTEXT_H +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +#include "SenderContext.h" +#include <qpid/types/Variant.h> +#include "qpid/sys/ExceptionHolder.h" +#include <boost/enable_shared_from_this.hpp> +#include <boost/function.hpp> + +struct pn_session_t; + +namespace qpid { +namespace messaging { +namespace amqp { + +class SessionContext; +class ConnectionContext; + +/** + * Track the current transaction for a session. + * + * Implements SenderContext, to send transaction command messages to remote coordinator. + */ +class Transaction : public SenderContext, public boost::enable_shared_from_this<Transaction> { + public: + typedef boost::shared_ptr<SessionContext> SessionPtr; + + typedef boost::function<void (boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery)> SendFunction; + + Transaction(pn_session_t*); + + sys::ExceptionHolder error; + + /** Declare a transaction using connection and session to send to remote co-ordinator. */ + void declare(SendFunction, const SessionPtr& session); + + /** Discharge a transaction using connection and session to send to remote co-ordinator. + *@param fail: true means rollback, false means commit. + */ + void discharge(SendFunction, const SessionPtr& session, bool fail); + + /** Update a delivery with a transactional accept state. */ + void acknowledge(pn_delivery_t* delivery); + + /** Get delivery state to attach to transfers sent in a transaction. */ + types::Variant getSendState() const; + + /** Override SenderContext::getTarget with a more readable value */ + const std::string& getTarget() const; + + bool isCommitting() const { return committing; } + + protected: + // SenderContext overrides + void configure(); + void verify(); + + private: + std::string id; + types::Variant sendState; + types::Variant acceptState; + bool committing; + + + void clear(); + void setId(const SenderContext::Delivery& delivery); + void setId(const std::string& id); +}; + +}}} + +#endif diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index 7be625a1a3..2037ba38ab 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -36,17 +36,20 @@ namespace sys { struct ProtocolTimeoutTask : public sys::TimerTask { AsynchIOHandler& handler; std::string id; + Duration timeout; - ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) : - TimerTask(timeout, "ProtocolTimeout"), + ProtocolTimeoutTask(const std::string& i, const Duration& timeout_, AsynchIOHandler& h) : + TimerTask(timeout_, "ProtocolTimeout"), handler(h), - id(i) + id(i), + timeout(timeout_) {} void fire() { // If this fires it means that we didn't negotiate the connection in the timeout period // Schedule closing the connection for the io thread - QPID_LOG(error, "Connection " << id << " No protocol received closing"); + QPID_LOG(error, "Connection " << id << " No protocol received after " << timeout + << ", closing"); handler.abort(); } }; diff --git a/qpid/cpp/src/qpid/sys/aix/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/aix/SystemInfo.cpp new file mode 100644 index 0000000000..d28d9b7b37 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/aix/SystemInfo.cpp @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" +#include <procinfo.h> +#include <arpa/inet.h> +#include <net/if.h> +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/utsname.h> +#include <unistd.h> +#include <map> +#include <netdb.h> +#include <string.h> + +#ifndef HOST_NAME_MAX +# define HOST_NAME_MAX 256 +#endif + +using namespace std; + +namespace qpid { +namespace sys { + +long SystemInfo::concurrency() { + return sysconf(_SC_NPROCESSORS_ONLN); +} + +bool SystemInfo::getLocalHostname (Address &address) { + char name[HOST_NAME_MAX]; + if (::gethostname(name, sizeof(name)) != 0) + return false; + address.host = name; + return true; +} + +static const string LOOPBACK("127.0.0.1"); +static const string TCP("tcp"); + +// Test IPv4 address for loopback +inline bool IN_IS_ADDR_LOOPBACK(const ::in_addr* a) { + return ((ntohl(a->s_addr) & 0xff000000) == 0x7f000000); +} + +inline bool isLoopback(const ::sockaddr* addr) { + switch (addr->sa_family) { + case AF_INET: return IN_IS_ADDR_LOOPBACK(&((const ::sockaddr_in*)(const void*)addr)->sin_addr); + case AF_INET6: return IN6_IS_ADDR_LOOPBACK(&((const ::sockaddr_in6*)(const void*)addr)->sin6_addr); + default: return false; + } +} + +namespace { + class HandleCloser : public IOHandle { + public: + HandleCloser(int fd) : IOHandle(fd) {} + ~HandleCloser() { ::close(fd); fd = -1; } + }; + + inline bool isInetOrInet6(::sockaddr* sa) { + switch (sa->sa_family) { + case AF_INET: + case AF_INET6: + return true; + default: + return false; + } + } + + inline void *InetAddr(::sockaddr* sa) { + switch (sa->sa_family) { + case AF_INET: + return &(reinterpret_cast<struct sockaddr_in *>(sa)->sin_addr); + case AF_INET6: + return &(reinterpret_cast<struct sockaddr_in6 *>(sa)->sin6_addr); + default: + return 0; + } + } + + typedef std::map<std::string, std::vector<std::string> > InterfaceInfo; + std::map<std::string, std::vector<std::string> > cachedInterfaces; + + void cacheInterfaceInfo() { + int status = 0; + int handle = ::socket (PF_INET, SOCK_DGRAM, 0); + QPID_POSIX_CHECK(handle); + HandleCloser h(handle); + + size_t num_ifs = 0; + struct ifconf ifc; + status = ::ioctl(handle, SIOCGSIZIFCONF, (caddr_t)&ifc.ifc_len); + QPID_POSIX_CHECK(status); + + std::auto_ptr<char> auto_ifc_buf(new char[ifc.ifc_len]); + memset (auto_ifc_buf.get(), 0, ifc.ifc_len); + + status = ::ioctl(handle, SIOCGIFCONF, (caddr_t)auto_ifc_buf.get()); + QPID_POSIX_CHECK(status); + + char *buf_start = auto_ifc_buf.get(); + char *buf_end = buf_start + ifc.ifc_len; + + for (char *ptr = buf_start; ptr < buf_end; ) { + struct ifreq *req = reinterpret_cast<struct ifreq *>(ptr); + ptr += IFNAMSIZ; + ptr += req->ifr_addr.sa_len; + if (!strcmp("lo0", req->ifr_name) || !isInetOrInet6(&req->ifr_addr)) + continue; + char dots[INET6_ADDRSTRLEN]; + if (! ::inet_ntop(req->ifr_addr.sa_family, InetAddr(&req->ifr_addr), dots, sizeof(dots))) + throw QPID_POSIX_ERROR(errno); + std::string address(dots); + cachedInterfaces[req->ifr_name].push_back(address); + } + } +} + +bool SystemInfo::getInterfaceAddresses(const std::string& interface, std::vector<std::string>& addresses) { + if ( cachedInterfaces.empty() ) cacheInterfaceInfo(); + InterfaceInfo::iterator i = cachedInterfaces.find(interface); + if ( i==cachedInterfaces.end() ) return false; + std::copy(i->second.begin(), i->second.end(), std::back_inserter(addresses)); + return true; +} + +void SystemInfo::getInterfaceNames(std::vector<std::string>& names ) { + if ( cachedInterfaces.empty() ) cacheInterfaceInfo(); + + for (InterfaceInfo::const_iterator i = cachedInterfaces.begin(); i!=cachedInterfaces.end(); ++i) { + names.push_back(i->first); + } +} + +void SystemInfo::getSystemId (std::string &osName, + std::string &nodeName, + std::string &release, + std::string &version, + std::string &machine) +{ + struct utsname _uname; + if (uname (&_uname) == 0) + { + osName = _uname.sysname; + nodeName = _uname.nodename; + release = _uname.release; + version = _uname.version; + machine = _uname.machine; + } +} + +uint32_t SystemInfo::getProcessId() +{ + return (uint32_t) ::getpid(); +} + +uint32_t SystemInfo::getParentProcessId() +{ + return (uint32_t) ::getppid(); +} + +// AIX specific +string SystemInfo::getProcessName() +{ + struct procsinfo my_info; + pid_t my_pid = getpid(); + int status = getprocs(&my_info, sizeof(my_info), 0, 0, &my_pid, 1); + QPID_POSIX_CHECK(status); + std::string my_name(my_info.pi_comm); + return my_name; +} + +// Always true. Only Windows has exception cases. +bool SystemInfo::threadSafeShutdown() +{ + return true; +} + + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/types/Variant.cpp b/qpid/cpp/src/qpid/types/Variant.cpp index 6b979a016b..26dbe0c91e 100644 --- a/qpid/cpp/src/qpid/types/Variant.cpp +++ b/qpid/cpp/src/qpid/types/Variant.cpp @@ -43,21 +43,23 @@ class VariantImpl { public: VariantImpl(); - VariantImpl(bool); - VariantImpl(uint8_t); - VariantImpl(uint16_t); - VariantImpl(uint32_t); - VariantImpl(uint64_t); - VariantImpl(int8_t); - VariantImpl(int16_t); - VariantImpl(int32_t); - VariantImpl(int64_t); - VariantImpl(float); - VariantImpl(double); - VariantImpl(const std::string&, const std::string& encoding=std::string()); - VariantImpl(const Variant::Map&); - VariantImpl(const Variant::List&); - VariantImpl(const Uuid&); + void reset(); + void set(bool); + void set(uint8_t); + void set(uint16_t); + void set(uint32_t); + void set(uint64_t); + void set(int8_t); + void set(int16_t); + void set(int32_t); + void set(int64_t); + void set(float); + void set(double); + void set(const std::string&, const std::string& encoding=std::string()); + void set(const Variant::Map&); + void set(const Variant::List&); + void set(const Uuid&); + void set(const Variant&); ~VariantImpl(); VariantType getType() const; @@ -90,9 +92,10 @@ class VariantImpl bool isEqualTo(VariantImpl&) const; bool isEquivalentTo(VariantImpl&) const; - static VariantImpl* create(const Variant&); + Variant::List descriptors; // Optional descriptors for described value. + private: - const VariantType type; + VariantType type; union { bool b; uint8_t ui8; @@ -110,7 +113,7 @@ class VariantImpl Variant::List* list; std::string* string; } value; - std::string encoding;//optional encoding for variable length data + std::string encoding; // Optional encoding for variable length data. template<class T> T convertFromString() const { @@ -136,26 +139,34 @@ class VariantImpl }; +VariantImpl::VariantImpl() : type(VAR_VOID) {} + +void VariantImpl::set(bool b) { reset(); type = VAR_BOOL; value.b = b; } +void VariantImpl::set(uint8_t i) { reset(); type = VAR_UINT8; value.ui8 = i; } +void VariantImpl::set(uint16_t i) { reset(); type = VAR_UINT16; value.ui16 = i; } +void VariantImpl::set(uint32_t i) { reset(); type = VAR_UINT32; value.ui32 = i; } +void VariantImpl::set(uint64_t i) { reset(); type = VAR_UINT64; value.ui64 = i; } +void VariantImpl::set(int8_t i) { reset(); type = VAR_INT8; value.i8 = i; } +void VariantImpl::set(int16_t i) { reset(); type = VAR_INT16; value.i16 = i; } +void VariantImpl::set(int32_t i) { reset(); type = VAR_INT32; value.i32 = i; } +void VariantImpl::set(int64_t i) { reset(); type = VAR_INT64; value.i64 = i; } +void VariantImpl::set(float f) { reset(); type = VAR_FLOAT; value.f = f; } +void VariantImpl::set(double d) { reset(); type = VAR_DOUBLE; value.d = d; } +void VariantImpl::set(const std::string& s, const std::string& e) { reset(); type = VAR_STRING; encoding = e; value.string = new std::string(s); } + +void VariantImpl::set(const Variant::Map& m) { + reset(); + type = VAR_MAP; + value.map = new Variant::Map(m); +} + +void VariantImpl::set(const Variant::List& l) { reset(); type = VAR_LIST; value.list = new Variant::List(l); } + +void VariantImpl::set(const Uuid& u) { reset(); type = VAR_UUID; value.uuid = new Uuid(u); } -VariantImpl::VariantImpl() : type(VAR_VOID) { value.i64 = 0; } -VariantImpl::VariantImpl(bool b) : type(VAR_BOOL) { value.b = b; } -VariantImpl::VariantImpl(uint8_t i) : type(VAR_UINT8) { value.ui8 = i; } -VariantImpl::VariantImpl(uint16_t i) : type(VAR_UINT16) { value.ui16 = i; } -VariantImpl::VariantImpl(uint32_t i) : type(VAR_UINT32) { value.ui32 = i; } -VariantImpl::VariantImpl(uint64_t i) : type(VAR_UINT64) { value.ui64 = i; } -VariantImpl::VariantImpl(int8_t i) : type(VAR_INT8) { value.i8 = i; } -VariantImpl::VariantImpl(int16_t i) : type(VAR_INT16) { value.i16 = i; } -VariantImpl::VariantImpl(int32_t i) : type(VAR_INT32) { value.i32 = i; } -VariantImpl::VariantImpl(int64_t i) : type(VAR_INT64) { value.i64 = i; } -VariantImpl::VariantImpl(float f) : type(VAR_FLOAT) { value.f = f; } -VariantImpl::VariantImpl(double d) : type(VAR_DOUBLE) { value.d = d; } -VariantImpl::VariantImpl(const std::string& s, const std::string& e) - : type(VAR_STRING), encoding(e) { value.string = new std::string(s); } -VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.map = new Variant::Map(m); } -VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.list = new Variant::List(l); } -VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.uuid = new Uuid(u); } - -VariantImpl::~VariantImpl() { +VariantImpl::~VariantImpl() { reset(); } + +void VariantImpl::reset() { switch (type) { case VAR_STRING: delete value.string; @@ -172,6 +183,7 @@ VariantImpl::~VariantImpl() { default: break; } + type = VAR_VOID; } VariantType VariantImpl::getType() const { return type; } @@ -637,46 +649,50 @@ bool isIntegerType(VariantType type) } } -VariantImpl* VariantImpl::create(const Variant& v) +void VariantImpl::set(const Variant& v) { switch (v.getType()) { - case VAR_BOOL: return new VariantImpl(v.asBool()); - case VAR_UINT8: return new VariantImpl(v.asUint8()); - case VAR_UINT16: return new VariantImpl(v.asUint16()); - case VAR_UINT32: return new VariantImpl(v.asUint32()); - case VAR_UINT64: return new VariantImpl(v.asUint64()); - case VAR_INT8: return new VariantImpl(v.asInt8()); - case VAR_INT16: return new VariantImpl(v.asInt16()); - case VAR_INT32: return new VariantImpl(v.asInt32()); - case VAR_INT64: return new VariantImpl(v.asInt64()); - case VAR_FLOAT: return new VariantImpl(v.asFloat()); - case VAR_DOUBLE: return new VariantImpl(v.asDouble()); - case VAR_STRING: return new VariantImpl(v.asString(), v.getEncoding()); - case VAR_MAP: return new VariantImpl(v.asMap()); - case VAR_LIST: return new VariantImpl(v.asList()); - case VAR_UUID: return new VariantImpl(v.asUuid()); - default: return new VariantImpl(); + case VAR_BOOL: set(v.asBool()); break; + case VAR_UINT8: set(v.asUint8()); break; + case VAR_UINT16: set(v.asUint16()); break; + case VAR_UINT32: set(v.asUint32()); break; + case VAR_UINT64: set(v.asUint64()); break; + case VAR_INT8: set(v.asInt8()); break; + case VAR_INT16: set(v.asInt16()); break; + case VAR_INT32: set(v.asInt32()); break; + case VAR_INT64: set(v.asInt64()); break; + case VAR_FLOAT: set(v.asFloat()); break; + case VAR_DOUBLE: set(v.asDouble()); break; + case VAR_STRING: set(v.asString(), v.getEncoding()); break; + case VAR_MAP: set(v.asMap()); break; + case VAR_LIST: set(v.asList()); break; + case VAR_UUID: set(v.asUuid()); break; + default: reset(); } -} - -Variant::Variant() : impl(new VariantImpl()) {} -Variant::Variant(bool b) : impl(new VariantImpl(b)) {} -Variant::Variant(uint8_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(uint16_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(uint32_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(uint64_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(int8_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(int16_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(int32_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(int64_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(float f) : impl(new VariantImpl(f)) {} -Variant::Variant(double d) : impl(new VariantImpl(d)) {} -Variant::Variant(const std::string& s) : impl(new VariantImpl(s)) {} -Variant::Variant(const char* s) : impl(new VariantImpl(std::string(s))) {} -Variant::Variant(const Map& m) : impl(new VariantImpl(m)) {} -Variant::Variant(const List& l) : impl(new VariantImpl(l)) {} -Variant::Variant(const Variant& v) : impl(VariantImpl::create(v)) {} -Variant::Variant(const Uuid& u) : impl(new VariantImpl(u)) {} + encoding = v.getEncoding(); + descriptors = v.getDescriptors(); +} + +Variant::Variant() : impl(0) {} +Variant::Variant(bool b) : impl(new VariantImpl()) { impl->set(b); } +Variant::Variant(uint8_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(uint16_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(uint32_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(uint64_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(int8_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(int16_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(int32_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(int64_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(float f) : impl(new VariantImpl()) { impl->set(f); } +Variant::Variant(double d) : impl(new VariantImpl()) { impl->set(d); } +Variant::Variant(const std::string& s) : impl(new VariantImpl()) { impl->set(s); } +Variant::Variant(const std::string& s, const std::string& encoding) : impl(new VariantImpl()) { impl->set(s, encoding); } +Variant::Variant(const char* s) : impl(new VariantImpl()) { impl->set(std::string(s)); } +Variant::Variant(const char* s, const char* encoding) : impl(new VariantImpl()) { impl->set(std::string(s), std::string(encoding)); } +Variant::Variant(const Map& m) : impl(new VariantImpl()) { impl->set(m); } +Variant::Variant(const List& l) : impl(new VariantImpl()) { impl->set(l); } +Variant::Variant(const Variant& v) : impl(new VariantImpl()) { impl->set(v); } +Variant::Variant(const Uuid& u) : impl(new VariantImpl()) { impl->set(u); } Variant::~Variant() { if (impl) delete impl; } @@ -686,116 +702,105 @@ void Variant::reset() impl = 0; } +namespace { +VariantImpl* assure(VariantImpl*& ptr) { + if (!ptr) ptr = new VariantImpl(); + return ptr; +} +} Variant& Variant::operator=(bool b) { - if (impl) delete impl; - impl = new VariantImpl(b); + assure(impl)->set(b); return *this; } Variant& Variant::operator=(uint8_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(uint16_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(uint32_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(uint64_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(int8_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(int16_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(int32_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(int64_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(float f) { - if (impl) delete impl; - impl = new VariantImpl(f); + assure(impl)->set(f); return *this; } Variant& Variant::operator=(double d) { - if (impl) delete impl; - impl = new VariantImpl(d); + assure(impl)->set(d); return *this; } Variant& Variant::operator=(const std::string& s) { - if (impl) delete impl; - impl = new VariantImpl(s); + assure(impl)->set(s); return *this; } Variant& Variant::operator=(const char* s) { - if (impl) delete impl; - impl = new VariantImpl(std::string(s)); + assure(impl)->set(std::string(s)); return *this; } Variant& Variant::operator=(const Uuid& u) { - if (impl) delete impl; - impl = new VariantImpl(u); + assure(impl)->set(u); return *this; } Variant& Variant::operator=(const Map& m) { - if (impl) delete impl; - impl = new VariantImpl(m); + assure(impl)->set(m); return *this; } Variant& Variant::operator=(const List& l) { - if (impl) delete impl; - impl = new VariantImpl(l); + assure(impl)->set(l); return *this; } Variant& Variant::operator=(const Variant& v) { - if (impl) delete impl; - impl = VariantImpl::create(v); + assure(impl)->set(v); return *this; } @@ -841,8 +846,7 @@ Variant::List& Variant::asList() { if (!impl) throw InvalidConversion("Can't con const std::string& Variant::getString() const { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); } std::string& Variant::getString() { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); } void Variant::setEncoding(const std::string& s) { - if (!impl) impl = new VariantImpl(); - impl->setEncoding(s); + assure(impl)->setEncoding(s); } const std::string& Variant::getEncoding() const { return impl ? impl->getEncoding() : EMPTY; } @@ -884,6 +888,12 @@ std::ostream& operator<<(std::ostream& out, const Variant::List& list) std::ostream& operator<<(std::ostream& out, const Variant& value) { + // Print the descriptors + const Variant::List& descriptors = value.getDescriptors(); + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) + out << "@" << *i << " "; + + // Print the value switch (value.getType()) { case VAR_MAP: out << value.asMap(); @@ -910,7 +920,43 @@ bool operator!=(const Variant& a, const Variant& b) { return !(a == b); } bool Variant::isEqualTo(const Variant& other) const { + if (isVoid() && other.isVoid()) return true; + if (isVoid() || other.isVoid()) return false; return impl && impl->isEqualTo(*other.impl); } +bool Variant::isDescribed() const { + return impl && !impl->descriptors.empty(); +} + +Variant::List& Variant::getDescriptors() { + return assure(impl)->descriptors; +} + +const Variant::List& Variant::getDescriptors() const { + return assure(impl)->descriptors; +} + +Variant Variant::getDescriptor() const { + if (getDescriptors().size() > 0) return getDescriptors().front(); + else return Variant(); +} + +void Variant::setDescriptor(const Variant& descriptor) { + getDescriptors().clear(); + getDescriptors().push_back(descriptor); +} + +Variant Variant::described(const Variant& descriptor, const Variant& value) { + Variant described(value); + described.setDescriptor(descriptor); + return described; +} + +Variant Variant::described(const Variant& descriptor, const List& value) { + Variant described(value); + described.setDescriptor(descriptor); + return described; +} + }} // namespace qpid::types diff --git a/qpid/cpp/src/qpid/types/encodings.h b/qpid/cpp/src/qpid/types/encodings.h index 827b6964b9..571e8607aa 100644 --- a/qpid/cpp/src/qpid/types/encodings.h +++ b/qpid/cpp/src/qpid/types/encodings.h @@ -23,11 +23,13 @@ */ namespace qpid { namespace types { + namespace encodings { const std::string BINARY("binary"); const std::string UTF8("utf8"); const std::string ASCII("ascii"); } + }} // namespace qpid::types #endif /*!QPID_TYPES_ENCODINGS_H*/ diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h index c455dd10fc..474b9d747f 100644 --- a/qpid/cpp/src/tests/BrokerFixture.h +++ b/qpid/cpp/src/tests/BrokerFixture.h @@ -101,11 +101,13 @@ struct BrokerFixture : private boost::noncopyable { opts.auth=false; // Argument parsing - std::vector<const char*> argv(args.size()); - for (size_t i = 0; i<args.size(); ++i) - argv[i] = args[i].c_str(); - Plugin::addOptions(opts); - opts.parse(argv.size(), &argv[0]); + if (args.size() > 0) { + std::vector<const char*> argv(args.size()); + for (size_t i = 0; i<args.size(); ++i) + argv[i] = args[i].c_str(); + Plugin::addOptions(opts); + opts.parse(argv.size(), &argv[0]); + } broker = Broker::create(opts); // TODO aconway 2007-12-05: At one point BrokerFixture // tests could hang in Connection ctor if the following diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index c914c50e33..f3443aa57e 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -360,6 +360,11 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) # paged queue not yet implemented for windows add_test (paged_queue_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_paged_queue_tests${test_script_suffix}) endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) + +if (BUILD_AMQP) + add_test (interop_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/interop_tests.py) +endif (BUILD_AMQP) + add_test (ha_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py) add_test (qpidd_qmfv2_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py) if (BUILD_AMQP) diff --git a/qpid/cpp/src/tests/Variant.cpp b/qpid/cpp/src/tests/Variant.cpp index d2394bfbad..d6605f9fe5 100644 --- a/qpid/cpp/src/tests/Variant.cpp +++ b/qpid/cpp/src/tests/Variant.cpp @@ -18,14 +18,16 @@ * under the License. * */ -#include <iostream> -#include "qpid/types/Variant.h" -#include "qpid/amqp_0_10/Codecs.h" #include "unit_test.h" +#include "qpid/types/Variant.h" +#include "qpid/amqp_0_10/Codecs.h" +#include <boost/assign.hpp> +#include <iostream> using namespace qpid::types; using namespace qpid::amqp_0_10; +using boost::assign::list_of; namespace qpid { namespace tests { @@ -807,6 +809,22 @@ QPID_AUTO_TEST_CASE(parse) BOOST_CHECK(a.getType()==types::VAR_DOUBLE); } +QPID_AUTO_TEST_CASE(described) +{ + Variant a; + BOOST_CHECK(!a.isDescribed()); + a.getDescriptors().push_back("foo"); + BOOST_CHECK(a.isDescribed()); + BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo")); + a = 42; + BOOST_CHECK(a.isDescribed()); + BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo")); + a.getDescriptors().push_back(33); + BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo")(33)); + a.getDescriptors().clear(); + BOOST_CHECK(!a.isDescribed()); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 598879d4ad..2566bc527d 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -21,9 +21,9 @@ import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re import qpid, traceback, signal +import proton from qpid import connection, util from qpid.compat import format_exc -from qpid.harness import Skipped from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition @@ -49,13 +49,18 @@ from qpidtoollibs import BrokerAgent import qpid.messaging qm = qpid.messaging qpid_messaging = None + +def env_has_log_config(): + """True if there are qpid log configuratoin settings in the environment.""" + return "QPID_LOG_ENABLE" in os.environ or "QPID_TRACE" in os.environ + if not os.environ.get("QPID_PY_NO_SWIG"): try: import qpid_messaging from qpid.datatypes import uuid4 qm = qpid_messaging # Silence warnings from swigged messaging library unless enabled in environment. - if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ: + if not env_has_log_config(): qm.Logger.configure(["--log-enable=error"]) except ImportError: print "Cannot load python SWIG bindings, falling back to native qpid.messaging." @@ -136,7 +141,7 @@ _popen_id = AtomicCounter() # Popen identifier for use in output file names. # Constants for file descriptor arguments to Popen FILE = "FILE" # Write to file named after process -PIPE = subprocess.PIPE +from subprocess import PIPE, STDOUT class Popen(subprocess.Popen): """ @@ -202,7 +207,7 @@ class Popen(subprocess.Popen): def communicate(self, input=None): ret = subprocess.Popen.communicate(self, input) - self.cleanup() + self._cleanup() return ret def is_running(self): return self.poll() is None @@ -254,6 +259,7 @@ class Popen(subprocess.Popen): def cmd_str(self): return " ".join([str(s) for s in self.cmd]) + def checkenv(name): value = os.getenv(name) if not value: raise Exception("Environment variable %s is not set" % name) @@ -308,7 +314,7 @@ class Broker(Popen): cmd += ["--log-to-stderr=no"] # Add default --log-enable arguments unless args already has --log arguments. - if not [l for l in args if l.startswith("--log")]: + if not env_has_log_config() and not [l for l in args if l.startswith("--log")]: args += ["--log-enable=info+"] if test_store: cmd += ["--load-module", BrokerTest.test_store_lib, @@ -444,10 +450,11 @@ def browse(session, queue, timeout=0, transform=lambda m: m.content): finally: r.close() -def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"): +def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" + if msg is None: msg = "browse '%s' failed" % queue actual_contents = browse(session, queue, timeout, transform=transform) if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) assert expect_contents == actual_contents, msg @@ -486,6 +493,18 @@ class BrokerTest(TestCase): test_store_lib = os.getenv("TEST_STORE_LIB") rootdir = os.getcwd() + PN_VERSION = (proton.VERSION_MAJOR, proton.VERSION_MINOR) + PN_TX_VERSION = (0, 9) + + amqp_tx_supported = PN_VERSION >= PN_TX_VERSION + + @classmethod + def amqp_tx_warning(cls): + if not cls.amqp_tx_supported: + print "WARNING: Cannot test transactions over AMQP 1.0, proton version %s.%s < %s.%s" % (cls.PN_VERSION + cls.PN_TX_VERSION) + return False + return True + def configure(self, config): self.config=config def setUp(self): @@ -498,8 +517,8 @@ class BrokerTest(TestCase): if qpid_messaging and self.amqp_lib: default_protocol="amqp1.0" else: default_protocol="amqp0-10" self.protocol = defs.get("PROTOCOL") or default_protocol - self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0 - + self.tx_protocol = self.protocol + if not self.amqp_tx_supported: self.tx_protocol = "amqp0-10" def tearDown(self): err = [] @@ -530,15 +549,22 @@ class BrokerTest(TestCase): self.teardown_add(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False, **kw): """Create and return a broker ready for use""" - b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd) + b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd, **kw) if (wait): try: b.ready() except Exception, e: raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b + def check_output(self, args, stdin=None): + p = self.popen(args, stdout=PIPE, stderr=STDOUT) + out = p.communicate(stdin) + if p.returncode != 0: + raise Exception("%s exit code %s, output:\n%s" % (args, p.returncode, out[0])) + return out[0] + def browse(self, *args, **kwargs): browse(*args, **kwargs) def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs) def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs) diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 40ea3854c9..82ca808cb1 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -24,6 +24,7 @@ from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent +from qpid.harness import Skipped log = getLogger(__name__) @@ -129,12 +130,9 @@ class HaBroker(Broker): args += ["--load-module", BrokerTest.ha_lib, # Non-standard settings for faster tests. "--link-maintenance-interval=0.1", - # Heartbeat and negotiate time are needed so that a broker wont - # stall on an address that doesn't currently have a broker running. - "--max-negotiate-time=1000", "--ha-cluster=%s"%ha_cluster] # Add default --log-enable arguments unless args already has --log arguments. - if not [l for l in args if l.startswith("--log")]: + if not env_has_log_config() and not [l for l in args if l.startswith("--log")]: args += ["--log-enable=info+", "--log-enable=debug+:ha::"] if not [h for h in args if h.startswith("--link-heartbeat-interval")]: args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)] @@ -159,13 +157,20 @@ acl allow all all Broker.__init__(self, test, args, port=ha_port.port, **kwargs) # Do some static setup to locate the qpid-config and qpid-ha tools. - qpid_ha_script=import_script(os.path.join(os.getenv("PYTHON_COMMANDS"),"qpid-ha")) - qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") - assert os.path.isfile(qpid_config_path) + @property + def qpid_ha_script(self): + if not hasattr(self, "_qpid_ha_script"): + qpid_ha_exec = os.getenv("QPID_HA_EXEC") + if not qpid_ha_exec or not os.path.isfile(qpid_ha_exec): + raise Skipped("qpid-ha not available") + self._qpid_ha_script = import_script(qpid_ha_exec) + return self._qpid_ha_script def __repr__(self): return "<HaBroker:%s:%d>"%(self.log, self.port()) def qpid_ha(self, args): + if not self.qpid_ha_script: + raise Skipped("qpid-ha not available") try: cred = self.client_credentials url = self.host_port() @@ -195,33 +200,37 @@ acl allow all all def ha_status(self): return self.qmf().status - def wait_status(self, status, timeout=5): + def wait_status(self, status, timeout=10): + def try_get_status(): self._status = "<unknown>" - # Ignore ConnectionError, the broker may not be up yet. try: self._status = self.ha_status() - return self._status == status; - except qm.ConnectionError: return False + except qm.ConnectionError, e: + # Record the error but don't raise, the broker may not be up yet. + self._status = "%s: %s" % (type(e).__name__, e) + return self._status == status; assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%( self, status, self._status) - def wait_queue(self, queue, timeout=1, msg="wait_queue"): + def wait_queue(self, queue, timeout=10, msg="wait_queue"): """ Wait for queue to be visible via QMF""" agent = self.agent - assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue + assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), \ + "%s queue %s not present" % (msg, queue) - def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"): + def wait_no_queue(self, queue, timeout=10, msg="wait_no_queue"): """ Wait for queue to be invisible via QMF""" agent = self.agent assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue) - # TODO aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): + qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") + if not qpid_config_exec or not os.path.isfile(qpid_config_exec): + raise Skipped("qpid-config not available") assert subprocess.call( - [self.qpid_config_path, "--broker", self.host_port()]+args, - stdout=1, stderr=subprocess.STDOUT - ) == 0 + [qpid_config_exec, "--broker", self.host_port()]+args, stdout=1, stderr=subprocess.STDOUT + ) == 0, "qpid-config failed" def config_replicate(self, from_broker, queue): self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) @@ -325,7 +334,7 @@ class HaCluster(object): ha_port = self._ports[i] b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name, args=args, **self.kwargs) - b.ready(timeout=5) + b.ready(timeout=10) return b def start(self): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index a43b939ee3..180831569f 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1025,8 +1025,8 @@ class LongTests(HaBrokerTest): "--broker", brokers[0].host_port(), "--address", "q;{create:always}", "--messages=1000", - "--tx=10" - # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet + "--tx=10", + "--connection-options={protocol:%s}" % self.tx_protocol ]) receiver = self.popen( ["qpid-receive", @@ -1034,8 +1034,8 @@ class LongTests(HaBrokerTest): "--address", "q;{create:always}", "--messages=990", "--timeout=10", - "--tx=10" - # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet + "--tx=10", + "--connection-options={protocol:%s}" % self.tx_protocol ]) self.assertEqual(sender.wait(), 0) self.assertEqual(receiver.wait(), 0) @@ -1268,7 +1268,7 @@ class StoreTests(HaBrokerTest): """Verify that a backup erases queue data from store recovery before doing catch-up from the primary.""" if self.check_skip(): return - cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store']) + cluster = HaCluster(self, 2) sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() s1 = sn.sender("q1;{create:always,node:{durable:true}}") for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True)) @@ -1532,7 +1532,7 @@ class TransactionTests(HaBrokerTest): except qm.TransactionUnknown: pass for b in cluster: self.assert_tx_clean(b) try: tx.connection.close() - except TransactionUnknown: pass # Occasionally get exception on close. + except qm.TransactionUnknown: pass # Occasionally get exception on close. finally: l.restore() def test_tx_no_backups(self): @@ -1622,21 +1622,26 @@ class TransactionTests(HaBrokerTest): import qpid_tests.broker_0_10 except ImportError: raise Skipped("Tests not found") - cluster = HaCluster(self, 3) - self.popen(["qpid-txtest", "-p%s"%cluster[0].port()]).assert_exit_ok() + if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"] + self.popen(["qpid-txtest2", "--broker", cluster[0].host_port()]).assert_exit_ok() + print self.popen(["qpid-python-test", "-m", "qpid_tests.broker_0_10", + "-m", "qpid_tests.broker_1_0", "-b", "localhost:%s"%(cluster[0].port()), - "*.tx.*"]).assert_exit_ok() + "*.tx.*"], stdout=None, stderr=None).assert_exit_ok() if __name__ == "__main__": - outdir = "ha_tests.tmp" - shutil.rmtree(outdir, True) - qpid_ha = os.getenv("QPID_HA_EXEC") - if qpid_ha and os.path.exists(qpid_ha): + qpid_ha_exec = os.getenv("QPID_HA_EXEC") + if qpid_ha_exec and os.path.isfile(qpid_ha_exec): + BrokerTest.amqp_tx_warning() + outdir = "ha_tests.tmp" + shutil.rmtree(outdir, True) os.execvp("qpid-python-test", - ["qpid-python-test", "-m", "ha_tests", "-DOUTDIR=%s"%outdir] + ["qpid-python-test", "-m", "ha_tests", "-DOUTDIR=%s"%outdir] + sys.argv[1:]) else: - print "Skipping ha_tests, %s not available"%(qpid_ha) + print "Skipping ha_tests, qpid-ha not available" + + diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py index 20ce6167a8..3eec2422f1 100755 --- a/qpid/cpp/src/tests/interlink_tests.py +++ b/qpid/cpp/src/tests/interlink_tests.py @@ -88,6 +88,7 @@ class AmqpBrokerTest(BrokerTest): result = self.popen(cmd, stdout=PIPE) r.fetch(timeout=1) # wait until receiver is actually ready s.acknowledge() + r.close() s.close() return result diff --git a/qpid/cpp/src/tests/interop_tests.py b/qpid/cpp/src/tests/interop_tests.py new file mode 100755 index 0000000000..d5533ead21 --- /dev/null +++ b/qpid/cpp/src/tests/interop_tests.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A set of tests that can be run against a foreign AMQP 1.0 broker. + +RUNNING WITH A FOREIGN BROKER: + +1. Start the broker +2. Create persistent queues named: interop-a interop-b interop-q tx-1 tx-2 +3. Export the environment variable QPID_INTEROP_URL with the URL to connect to your broker + in the form [user[:password]@]host[:port] +4. From the build directory run this test: + ctest -VV -R interop_tests + +If QPID_INTEROP_URL is not set, a qpidd broker will be started for the test. +""" + +import os, sys, shutil, subprocess +import qpid_messaging as qm +from brokertest import * + +URL='QPID_INTEROP_URL' + +class InteropTest(BrokerTest): + + def setUp(self): + super(InteropTest, self).setUp() + self.url = os.environ[URL] + self.connect_opts = ['--broker', self.url, '--connection-options', '{protocol:amqp1.0}'] + + def connect(self, **kwargs): + """Python connection to interop URL""" + c = qm.Connection.establish(self.url, protocol='amqp1.0', **kwargs) + self.teardown_add(c) + return c + + def drain(self, queue, connection=None): + """ + Drain a queue to make sure it is empty. Throw away the messages. + """ + c = connection or self.connect() + r = c.session().receiver(queue) + try: + while True: + r.fetch(timeout=0) + r.session.acknowledge() + except qm.Empty: + pass + r.close() + + def clear_queue(self, queue, connection=None, properties=None, durable=False): + """ + Make empty queue, prefix with self.id(). Create if needed, drain if needed + @return queue name. + """ + queue = "interop-%s" % queue + c = connection or self.connect() + props = {'create':'always'} + if durable: props['node'] = {'durable':True} + if properties: props.update(properties) + self.drain("%s;%s" % (queue, props), c) + return queue + + +class SimpleTest(InteropTest): + """Simple test to check the broker is responding.""" + + def test_send_receive_python(self): + c = self.connect() + q = self.clear_queue('q', c) + s = c.session() + s.sender(q).send('foo') + self.assertEqual('foo', s.receiver(q).fetch().content) + + def test_send_receive_cpp(self): + q = self.clear_queue('q') + args = ['-b', self.url, '-a', q] + self.check_output(['qpid-send', '--content-string=cpp_foo'] + args) + self.assertEqual('cpp_foo', self.check_output(['qpid-receive'] + args).strip()) + + +class PythonTxTest(InteropTest): + + def tx_simple_setup(self): + """Start a transaction, remove messages from queue a, add messages to queue b""" + c = self.connect() + qa, qb = self.clear_queue('a', c, durable=True), self.clear_queue('b', c, durable=True) + + # Send messages to a, no transaction. + sa = c.session().sender(qa+";{create:always,node:{durable:true}}") + tx_msgs = ['x', 'y', 'z'] + for m in tx_msgs: sa.send(qm.Message(content=m, durable=True)) + + # Receive messages from a, in transaction. + tx = c.session(transactional=True) + txr = tx.receiver(qa) + self.assertEqual(tx_msgs, [txr.fetch(1).content for i in xrange(3)]) + tx.acknowledge() + + # Send messages to b, transactional, mixed with non-transactional. + sb = c.session().sender(qb+";{create:always,node:{durable:true}}") + txs = tx.sender(qb) + msgs = [str(i) for i in xrange(3)] + for tx_m, m in zip(tx_msgs, msgs): + txs.send(tx_m); + sb.send(m) + tx.sync() + return tx, qa, qb + + def test_tx_simple_commit(self): + tx, qa, qb = self.tx_simple_setup() + s = self.connect().session() + assert_browse(s, qa, []) + assert_browse(s, qb, ['0', '1', '2']) + tx.commit() + assert_browse(s, qa, []) + assert_browse(s, qb, ['0', '1', '2', 'x', 'y', 'z']) + + def test_tx_simple_rollback(self): + tx, qa, qb = self.tx_simple_setup() + s = self.connect().session() + assert_browse(s, qa, []) + assert_browse(s, qb, ['0', '1', '2']) + tx.rollback() + assert_browse(s, qa, ['x', 'y', 'z']) + assert_browse(s, qb, ['0', '1', '2']) + + def test_tx_sequence(self): + tx = self.connect().session(transactional=True) + notx = self.connect().session() + q = self.clear_queue('q', tx.connection, durable=True) + s = tx.sender(q) + r = tx.receiver(q) + s.send('a') + tx.commit() + assert_browse(notx, q, ['a']) + s.send('b') + tx.commit() + assert_browse(notx, q, ['a', 'b']) + self.assertEqual('a', r.fetch().content) + tx.acknowledge(); + tx.commit() + assert_browse(notx, q, ['b']) + s.send('z') + tx.rollback() + assert_browse(notx, q, ['b']) + self.assertEqual('b', r.fetch().content) + tx.acknowledge(); + tx.rollback() + assert_browse(notx, q, ['b']) + + +class CppTxTest(InteropTest): + + def test_txtest2(self): + self.popen(["qpid-txtest2"] + self.connect_opts).assert_exit_ok() + + def test_send_receive(self): + q = self.clear_queue('q', durable=True) + sender = self.popen(["qpid-send", + "--address", q, + "--messages=100", + "--tx=10", + "--durable=yes"] + self.connect_opts) + receiver = self.popen(["qpid-receive", + "--address", q, + "--messages=90", + "--timeout=10", + "--tx=10"] + self.connect_opts) + sender.assert_exit_ok() + receiver.assert_exit_ok() + expect = [long(i) for i in range(91, 101)] + sn = lambda m: m.properties["sn"] + assert_browse(self.connect().session(), q, expect, transform=sn) + + +if __name__ == "__main__": + if not BrokerTest.amqp_tx_supported: + BrokerTest.amqp_tx_warning() + print "Skipping interop_tests" + exit(0) + outdir = "interop_tests.tmp" + shutil.rmtree(outdir, True) + cmd = ["qpid-python-test", "-m", "interop_tests", "-DOUTDIR=%s"%outdir] + sys.argv[1:] + if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"] + if os.environ.get(URL): + os.execvp(cmd[0], cmd) + else: + dir = os.getcwd() + class StartBroker(BrokerTest): + def start_qpidd(self): pass + test = StartBroker('start_qpidd') + class Config: + def __init__(self): + self.defines = { 'OUTDIR': outdir } + test.configure(Config()) + test.setUp() + os.environ[URL] = test.broker().host_port() + os.chdir(dir) + p = subprocess.Popen(cmd) + status = p.wait() + test.tearDown() + sys.exit(status) diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 05a1a6df10..a71fd11fa7 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -197,7 +197,7 @@ int main(int argc, char ** argv) std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); Receiver receiver = session.createReceiver(opts.address); - receiver.setCapacity(opts.capacity); + receiver.setCapacity(std::min(opts.capacity, opts.messages)); Message msg; uint count = 0; uint txCount = 0; @@ -207,9 +207,9 @@ int main(int argc, char ** argv) Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader); if (!opts.readyAddress.empty()) { session.createSender(opts.readyAddress).send(msg); - if (opts.tx) - session.commit(); - } + if (opts.tx) + session.commit(); + } // For receive rate calculation qpid::sys::AbsTime start = qpid::sys::now(); int64_t interval = 0; @@ -290,6 +290,7 @@ int main(int argc, char ** argv) connection.close(); return 0; } + return 1; } catch(const std::exception& error) { std::cerr << "qpid-receive: " << error.what() << std::endl; connection.close(); diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp index 498dc96ce9..970944f8d0 100644 --- a/qpid/cpp/src/tests/qpid-send.cpp +++ b/qpid/cpp/src/tests/qpid-send.cpp @@ -112,14 +112,14 @@ struct Options : public qpid::Options log(argv0), reportTotal(false), reportEvery(0), - reportHeader(true), - sendRate(0), - sequence(true), - timestamp(true), - groupPrefix("GROUP-"), - groupSize(10), - groupRandSize(false), - groupInterleave(1) + reportHeader(true), + sendRate(0), + sequence(true), + timestamp(true), + groupPrefix("GROUP-"), + groupSize(10), + groupRandSize(false), + groupInterleave(1) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -272,7 +272,7 @@ class MapContentGenerator : public ContentGenerator { // tag each generated message with a group identifer // class GroupGenerator { -public: + public: GroupGenerator(const std::string& key, const std::string& prefix, const uint size, @@ -351,7 +351,7 @@ int main(int argc, char ** argv) try { Options opts; if (opts.parse(argc, argv)) { - connection = Connection(opts.url, opts.connectionOptions); + connection = Connection(opts.url, opts.connectionOptions); connection.open(); std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); @@ -447,6 +447,7 @@ int main(int argc, char ** argv) connection.close(); return 0; } + return 1; } catch(const std::exception& error) { std::cerr << "qpid-send: " << error.what() << std::endl; connection.close(); diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index 2393ec2396..58c48f9a8d 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -353,10 +353,11 @@ int main(int argc, char** argv) if (opts.init) controller.init(); if (opts.transfer) controller.transfer(); if (opts.check) return controller.check(); + return 0; } - return 0; + return 1; } catch(const std::exception& e) { - std::cout << argv[0] << ": " << e.what() << std::endl; + std::cerr << argv[0] << ": " << e.what() << std::endl; } return 2; } diff --git a/qpid/cpp/src/tests/swig_python_tests b/qpid/cpp/src/tests/swig_python_tests index 4d9e5e35d4..40c35ac0fa 100755 --- a/qpid/cpp/src/tests/swig_python_tests +++ b/qpid/cpp/src/tests/swig_python_tests @@ -39,7 +39,8 @@ skip() { } start_broker() { - QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker" + rm -f swig_python_tests.log + QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no --log-to-file swig_python_tests.log) || fail "Could not start broker" } stop_broker() { @@ -54,9 +55,9 @@ echo "Running swigged python tests using broker on port $QPID_PORT" export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG export QPID_USE_SWIG_CLIENT=1 -$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1 +$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests $* || FAILED=1 if [[ -a $AMQP_LIB ]] ; then - $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1 + $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests $* || FAILED=1 fi stop_broker if [[ $FAILED -eq 1 ]]; then diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index 96f1596890..1c4c117e4b 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -20,14 +20,14 @@ absdir() { echo `cd $1 && pwd`; } # Environment variables substituted by cmake. -srcdir=`absdir @abs_srcdir@` -builddir=`absdir @abs_builddir@` -top_srcdir=`absdir @abs_top_srcdir@` -top_builddir=`absdir @abs_top_builddir@` -moduledir=$top_builddir/src@builddir_lib_suffix@ -pythonswigdir=$top_builddir/bindings/qpid/python/ -pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@ -testmoduledir=$builddir@builddir_lib_suffix@ +export srcdir=`absdir @abs_srcdir@` +export builddir=`absdir @abs_builddir@` +export top_srcdir=`absdir @abs_top_srcdir@` +export top_builddir=`absdir @abs_top_builddir@` +export moduledir=$top_builddir/src@builddir_lib_suffix@ +export pythonswigdir=$top_builddir/bindings/qpid/python/ +export pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@ +export testmoduledir=$builddir@builddir_lib_suffix@ export QPID_INSTALL_PREFIX=@prefix@ # Tools substituted by cmake diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp index ee04dddd6a..14aee7b648 100644 --- a/qpid/cpp/src/tests/test_store.cpp +++ b/qpid/cpp/src/tests/test_store.cpp @@ -223,27 +223,18 @@ class TestStore : public NullMessageStore { const boost::intrusive_ptr<PersistableMessage>& pmsg, const PersistableQueue& queue) { - qpid::broker::amqp_0_10::MessageTransfer* msg = - dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); - assert(msg); - ostringstream o; - o << "<enqueue " << queue.getName() << " " << getContent(msg); + string data = getContent(pmsg); + o << "<enqueue " << queue.getName() << " " << data; if (tx) o << " tx=" << getId(*tx); o << ">"; log(o.str()); // Dump the message if there is a dump file. if (dump.get()) { - msg->getFrames().getMethod()->print(*dump); - *dump << endl << " "; - msg->getFrames().getHeaders()->print(*dump); - *dump << endl << " "; - *dump << msg->getFrames().getContentSize() << endl; + *dump << "Message(" << data.size() << "): " << data << endl; } string logPrefix = "TestStore "+name+": "; - // Check the message for special instructions for this store. - string data = msg->getFrames().getContent(); Action action(data); bool doComplete = true; if (action.index && action.executeIn(name)) { @@ -258,7 +249,7 @@ class TestStore : public NullMessageStore { QPID_LOG(error, logPrefix << "async-id needs argument: " << data); break; } - asyncIds[action.args[0]] = msg; + asyncIds[action.args[0]] = pmsg; QPID_LOG(debug, logPrefix << "delayed completion " << action.args[0]); doComplete = false; break; @@ -284,7 +275,7 @@ class TestStore : public NullMessageStore { QPID_LOG(error, logPrefix << "unknown action: " << data); } } - if (doComplete) msg->enqueueComplete(); + if (doComplete) pmsg->enqueueComplete(); } void dequeue(TransactionContext* tx, diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py index b0ce5d9009..fbec7ccc7d 100644 --- a/qpid/python/qpid/client.py +++ b/qpid/python/qpid/client.py @@ -89,7 +89,7 @@ class Client: self.password = password self.locale = locale self.tune_params = tune_params - self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties) + self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties, version_property_key="version") self.sasl_options = sasl_options self.socket = connect(self.host, self.port, connection_options) self.conn = Connection(self.socket, self.spec) diff --git a/qpid/python/qpid/tests/util.py b/qpid/python/qpid/tests/util.py index 9777443720..4e901218c2 100644 --- a/qpid/python/qpid/tests/util.py +++ b/qpid/python/qpid/tests/util.py @@ -21,26 +21,32 @@ from qpid.util import get_client_properties_with_defaults class UtilTest (TestCase): - def test_get_spec_recommended_client_properties(self): - client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"}) + def test_default_client_properties_08091(self): + client_properties = get_client_properties_with_defaults(version_property_key="version") self.assertTrue("product" in client_properties) self.assertTrue("version" in client_properties) self.assertTrue("platform" in client_properties) - def test_get_client_properties_with_provided_value(self): + def test_default_client_properties_010(self): + client_properties = get_client_properties_with_defaults(version_property_key="qpid.client_version") + self.assertTrue("product" in client_properties) + self.assertTrue("qpid.client_version" in client_properties) + self.assertTrue("platform" in client_properties) + + def test_client_properties_with_provided_value(self): client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"}) self.assertTrue("product" in client_properties) self.assertTrue("mykey" in client_properties) self.assertEqual("myvalue", client_properties["mykey"]) - def test_get_client_properties_with_no_provided_values(self): + def test_client_properties_with_provided_value_that_overrides_default(self): + client_properties = get_client_properties_with_defaults(provided_client_properties={"product":"myproduct"}) + self.assertEqual("myproduct", client_properties["product"]) + + def test_client_properties_with_no_provided_values(self): client_properties = get_client_properties_with_defaults(provided_client_properties=None) self.assertTrue("product" in client_properties) client_properties = get_client_properties_with_defaults() self.assertTrue("product" in client_properties) - def test_get_client_properties_with_provided_value_that_overrides_default(self): - client_properties = get_client_properties_with_defaults(provided_client_properties={"version":"myversion"}) - self.assertEqual("myversion", client_properties["version"]) - diff --git a/qpid/python/qpid/util.py b/qpid/python/qpid/util.py index 37d999b771..b17f13e6e6 100644 --- a/qpid/python/qpid/util.py +++ b/qpid/python/qpid/util.py @@ -42,15 +42,24 @@ except ImportError: def close(self): self.sock.close() -def get_client_properties_with_defaults(provided_client_properties={}): +def get_client_properties_with_defaults(provided_client_properties={}, version_property_key="qpid.client_version"): ppid = 0 + version = "unidentified" try: ppid = os.getppid() except: pass + try: + import pkg_resources + pkg = pkg_resources.require("qpid-python") + if pkg and pkg[0] and pkg[0].version: + version = pkg[0].version + except: + pass + client_properties = {"product": "qpid python client", - "version": "development", + version_property_key : version, "platform": os.name, "qpid.client_process": os.path.basename(sys.argv and sys.argv[0] or ''), "qpid.client_pid": os.getpid(), diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py b/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py index 5ebbb4c651..b14bb96dc8 100644 --- a/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py +++ b/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py @@ -23,3 +23,4 @@ from general import * from legacy_exchanges import * from selector import * from translation import * +from tx import * diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py b/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py new file mode 100644 index 0000000000..45817fc64f --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py @@ -0,0 +1,264 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.datatypes import Message, RangedSet +from qpid.testlib import TestBase010 + +class TxTests(TestBase010): + """ + Tests for 'methods' on the amqp tx 'class' + """ + + def test_commit(self): + """ + Test that commited publishes are delivered and commited acks are not re-delivered + """ + session = self.session + + #declare queues and create subscribers in the checking session + #to ensure that the queues are not auto-deleted too early: + self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"]) + session.message_subscribe(queue="tx-commit-a", destination="qa") + session.message_subscribe(queue="tx-commit-b", destination="qb") + session.message_subscribe(queue="tx-commit-c", destination="qc") + + #use a separate session for actual work + session2 = self.conn.session("worker", 2) + self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c") + session2.tx_commit() + session2.close() + + session.tx_select() + + self.enable_flow("qa") + queue_a = session.incoming("qa") + + self.enable_flow("qb") + queue_b = session.incoming("qb") + + self.enable_flow("qc") + queue_c = session.incoming("qc") + + #check results + for i in range(1, 5): + msg = queue_c.get(timeout=1) + self.assertEqual("TxMessage %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_b.get(timeout=1) + self.assertEqual("TxMessage 6", msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_a.get(timeout=1) + self.assertEqual("TxMessage 7", msg.body) + session.message_accept(RangedSet(msg.id)) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + #cleanup + session.tx_commit() + + def test_auto_rollback(self): + """ + Test that a session closed with an open transaction is effectively rolled back + """ + session = self.session + self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"]) + session.message_subscribe(queue="tx-autorollback-a", destination="qa") + session.message_subscribe(queue="tx-autorollback-b", destination="qb") + session.message_subscribe(queue="tx-autorollback-c", destination="qc") + + session2 = self.conn.session("worker", 2) + queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + session2.close() + + session.tx_select() + + self.enable_flow("qa") + queue_a = session.incoming("qa") + + self.enable_flow("qb") + queue_b = session.incoming("qb") + + self.enable_flow("qc") + queue_c = session.incoming("qc") + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.body) + session.message_accept(RangedSet(msg.id)) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + #cleanup + session.tx_commit() + + def test_rollback(self): + """ + Test that rolled back publishes are not delivered and rolled back acks are re-delivered + """ + session = self.session + queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + session.tx_rollback() + + #need to release messages to get them redelivered now: + session.message_release(consumed) + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.body) + session.message_accept(RangedSet(msg.id)) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + #cleanup + session.tx_commit() + + def perform_txn_work(self, session, name_a, name_b, name_c): + """ + Utility method that does some setup and some work under a transaction. Used for testing both + commit and rollback + """ + #setup: + self.declare_queues([name_a, name_b, name_c]) + + key = "my_key_" + name_b + topic = "my_topic_" + name_c + + session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key) + session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic) + + dp = session.delivery_properties(routing_key=name_a) + for i in range(1, 5): + mp = session.message_properties(message_id="msg%d" % i) + session.message_transfer(message=Message(dp, mp, "Message %d" % i)) + + dp = session.delivery_properties(routing_key=key) + mp = session.message_properties(message_id="msg6") + session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6")) + + dp = session.delivery_properties(routing_key=topic) + mp = session.message_properties(message_id="msg7") + session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7")) + + session.tx_select() + + #consume and ack messages + acked = RangedSet() + self.subscribe(session, queue=name_a, destination="sub_a") + queue_a = session.incoming("sub_a") + for i in range(1, 5): + msg = queue_a.get(timeout=1) + acked.add(msg.id) + self.assertEqual("Message %d" % i, msg.body) + + self.subscribe(session, queue=name_b, destination="sub_b") + queue_b = session.incoming("sub_b") + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.body) + acked.add(msg.id) + + sub_c = self.subscribe(session, queue=name_c, destination="sub_c") + queue_c = session.incoming("sub_c") + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.body) + acked.add(msg.id) + + session.message_accept(acked) + + dp = session.delivery_properties(routing_key=topic) + #publish messages + for i in range(1, 5): + mp = session.message_properties(message_id="tx-msg%d" % i) + session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i)) + + dp = session.delivery_properties(routing_key=key) + mp = session.message_properties(message_id="tx-msg6") + session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6")) + + dp = session.delivery_properties(routing_key=name_a) + mp = session.message_properties(message_id="tx-msg7") + session.message_transfer(message=Message(dp, mp, "TxMessage 7")) + return queue_a, queue_b, queue_c, acked + + def declare_queues(self, names, session=None): + session = session or self.session + for n in names: + session.queue_declare(queue=n, auto_delete=True) + + def subscribe(self, session=None, **keys): + session = session or self.session + consumer_tag = keys["destination"] + session.message_subscribe(**keys) + session.message_flow(destination=consumer_tag, unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination=consumer_tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) + + def enable_flow(self, tag, session=None): + session = session or self.session + session.message_flow(destination=tag, unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination=tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) + + def complete(self, session, msg): + session.receiver._completed.add(msg.id)#TODO: this may be done automatically + session.channel.session_completed(session.receiver._completed) |