summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2015-02-27 16:37:06 +0000
committerAlan Conway <aconway@apache.org>2015-02-27 16:37:06 +0000
commit3aaa53e9103b6019c9e31d15186b12a95a1993be (patch)
treef5950c063ff08f574c808023ece7745739ca7027
parent9c9f0e2c935d11c0f8d1ebddf1bbb78c3c22c606 (diff)
downloadqpid-python-3aaa53e9103b6019c9e31d15186b12a95a1993be.tar.gz
QPID-4710: [AMQP 1.0] Support for transactions in qpid::messaging C++ client.
Implements the "transactional retire and settle immediately" option for transactions as specified in AMQP 1.0 in the qpid::messaging C++ client. NOTE: Transactions over AMQP 1.0 require proton 0.9 or greater. With older versions, attempting a transactions over AMQP 1.0 will raise a link-detached exception "Node not found: tx-transaction" 1. Added descriptor list to Variant with support in Encoder and PnData. Required to support transactions, need to be able to create described lists. Variant changes are source and binary compatible. A Variant now has a Variant::List of descripors which can be numeric or string. Nested descriptors are implemented by putting multiple descriptors in the list. Other minor changes: - Variant refactor: don't delete impl on every assignment. - Add Variant constructors that take a string encoding. (new constructors, not defaulted arguments, so the change is binary and source compatible.) - Growable buffer support for Encoder. - Printing described Variant prints descriptors in form @descriptor value 2. Added transaction support to AMQP 1.0 client code Added messaging/amqp/Transaction.h,cpp: transaction logic - communicate with coordinator, send declare/dischange messages. - add tx state info to transfers and acknowledgements. - Sync session after discharge. - A transactional session automatically acks any message retrieved by fetch/get to bring them into the transaction. This is consistent the 0-10 client. Minor fixes to existing client code: - Fix use of pn_drain API in C++ client to work with C++ and Java brokers. - Make amqp::Exception derive from qpid::Exception 3. Fixes to existing broker code: - Incoming.cpp fix: start async completion before processing message. - Delay accept of dischage message till commit is complete. - newSession - handle failover during session creation. 4. Added tests interop_tests.py: transaction tests that can run against an external broker, see comments. ha_tests.py: Enable transaction tests over AMQP 1.0. Minor test fixes: - brokertest.py don't set default logging if QPID_LOG env vars set. - brokertest.py Pass kwargs to broker() create function. - qpid-receive: capacity should never be larger than message count. - Accept user:pass as well as user/pass in Url. - brokertest.py: Always do a ready() check on all brokers. If proton < 0.9 is used, transaction tests will be skipped or will downgrade to the amqp0-10 protocol with a printed warning. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1662743 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/types/Variant.h45
-rw-r--r--qpid/cpp/src/amqp.cmake2
-rw-r--r--qpid/cpp/src/qpid/Url.cpp6
-rw-r--r--qpid/cpp/src/qpid/amqp/CharSequence.cpp2
-rw-r--r--qpid/cpp/src/qpid/amqp/Descriptor.cpp97
-rw-r--r--qpid/cpp/src/qpid/amqp/Descriptor.h2
-rw-r--r--qpid/cpp/src/qpid/amqp/Encoder.cpp37
-rw-r--r--qpid/cpp/src/qpid/amqp/Encoder.h25
-rw-r--r--qpid/cpp/src/qpid/amqp/descriptors.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Exception.h3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp120
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h17
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp8
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp240
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h33
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/PnData.cpp84
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/PnData.h29
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp14
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp61
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h54
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp69
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h11
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp155
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/Transaction.h95
-rw-r--r--qpid/cpp/src/qpid/types/Variant.cpp266
-rw-r--r--qpid/cpp/src/qpid/types/encodings.h2
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt5
-rw-r--r--qpid/cpp/src/tests/Variant.cpp24
-rw-r--r--qpid/cpp/src/tests/brokertest.py45
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py21
-rwxr-xr-xqpid/cpp/src/tests/interop_tests.py220
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp9
-rw-r--r--qpid/cpp/src/tests/qpid-send.cpp21
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp5
-rwxr-xr-xqpid/cpp/src/tests/swig_python_tests7
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in16
-rw-r--r--qpid/cpp/src/tests/test_store.cpp19
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py1
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_1_0/tx.py264
46 files changed, 1712 insertions, 448 deletions
diff --git a/qpid/cpp/include/qpid/types/Variant.h b/qpid/cpp/include/qpid/types/Variant.h
index faba5fe9a4..843870e438 100644
--- a/qpid/cpp/include/qpid/types/Variant.h
+++ b/qpid/cpp/include/qpid/types/Variant.h
@@ -89,7 +89,9 @@ class QPID_TYPES_CLASS_EXTERN Variant
QPID_TYPES_EXTERN Variant(float);
QPID_TYPES_EXTERN Variant(double);
QPID_TYPES_EXTERN Variant(const std::string&);
+ QPID_TYPES_EXTERN Variant(const std::string& value, const std::string& encoding);
QPID_TYPES_EXTERN Variant(const char*);
+ QPID_TYPES_EXTERN Variant(const char* value, const char* encoding);
QPID_TYPES_EXTERN Variant(const Map&);
QPID_TYPES_EXTERN Variant(const List&);
QPID_TYPES_EXTERN Variant(const Variant&);
@@ -156,9 +158,10 @@ class QPID_TYPES_CLASS_EXTERN Variant
QPID_TYPES_EXTERN Map& asMap();
QPID_TYPES_EXTERN const List& asList() const;
QPID_TYPES_EXTERN List& asList();
+
/**
- * Unlike asString(), getString() will not do any conversions and
- * will throw InvalidConversion if the type is not STRING.
+ * Unlike asString(), getString() will not do any conversions.
+ * @exception InvalidConversion if the type is not STRING.
*/
QPID_TYPES_EXTERN const std::string& getString() const;
QPID_TYPES_EXTERN std::string& getString();
@@ -168,9 +171,45 @@ class QPID_TYPES_CLASS_EXTERN Variant
QPID_TYPES_EXTERN bool isEqualTo(const Variant& a) const;
+ /** Reset value to VOID, does not reset the descriptors. */
QPID_TYPES_EXTERN void reset();
+
+ /** True if there is at least one descriptor associated with this variant. */
+ QPID_TYPES_EXTERN bool isDescribed() const;
+
+ /** Get the first descriptor associated with this variant.
+ *
+ * Normally there is at most one descriptor, when there are multiple
+ * descriptors use getDescriptors()
+ *
+ *@return The first descriptor or VOID if there is no descriptor.
+ *@see isDescribed, getDescriptors
+ */
+ QPID_TYPES_EXTERN Variant getDescriptor() const;
+
+ /** Set a single descriptor for this Variant. The descriptor must be a string or integer. */
+ QPID_TYPES_EXTERN void setDescriptor(const Variant& descriptor);
+
+ /** Return a modifiable list of descriptors for this Variant.
+ * Used in case where there are multiple descriptors, for a single descriptor use
+ * getDescriptor and setDescriptor.
+ */
+ QPID_TYPES_EXTERN List& getDescriptors();
+
+ /** Return the list of descriptors for this Variant.
+ * Used in case where there are multiple descriptors, for a single descriptor use
+ * getDescriptor and setDescriptor.
+ */
+ QPID_TYPES_EXTERN const List& getDescriptors() const;
+
+ /** Create a described value */
+ QPID_TYPES_EXTERN static Variant described(const Variant& descriptor, const Variant& value);
+
+ /** Create a described list, a common special case */
+ QPID_TYPES_EXTERN static Variant described(const Variant& descriptor, const List& value);
+
private:
- VariantImpl* impl;
+ mutable VariantImpl* impl;
};
#ifndef SWIG
diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake
index b2ff10bd68..044267afbc 100644
--- a/qpid/cpp/src/amqp.cmake
+++ b/qpid/cpp/src/amqp.cmake
@@ -144,6 +144,8 @@ if (BUILD_AMQP)
qpid/messaging/amqp/SessionHandle.cpp
qpid/messaging/amqp/TcpTransport.h
qpid/messaging/amqp/TcpTransport.cpp
+ qpid/messaging/amqp/Transaction.h
+ qpid/messaging/amqp/Transaction.cpp
)
if (WIN32)
diff --git a/qpid/cpp/src/qpid/Url.cpp b/qpid/cpp/src/qpid/Url.cpp
index 21de32aaa3..1780a07f92 100644
--- a/qpid/cpp/src/qpid/Url.cpp
+++ b/qpid/cpp/src/qpid/Url.cpp
@@ -113,8 +113,10 @@ class UrlParser {
const char* at = std::find(i, end, '@');
if (at == end) return false;
const char* slash = std::find(i, at, '/');
- url.setUser(string(i, slash));
- const char* pass = (slash == at) ? slash : slash+1;
+ const char* colon = std::find(i, at, ':');
+ const char* sep = std::min(slash, colon);
+ url.setUser(string(i, sep));
+ const char* pass = (sep == at) ? sep : sep+1;
url.setPass(string(pass, at));
i = at+1;
return true;
diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.cpp b/qpid/cpp/src/qpid/amqp/CharSequence.cpp
index 7e433bd26e..ad5b0ec84c 100644
--- a/qpid/cpp/src/qpid/amqp/CharSequence.cpp
+++ b/qpid/cpp/src/qpid/amqp/CharSequence.cpp
@@ -35,7 +35,7 @@ CharSequence::operator bool() const
}
std::string CharSequence::str() const
{
- return std::string(data, size);
+ return (data && size) ? std::string(data, size) : std::string();
}
CharSequence CharSequence::create()
diff --git a/qpid/cpp/src/qpid/amqp/Descriptor.cpp b/qpid/cpp/src/qpid/amqp/Descriptor.cpp
index 9e33294edd..43d388ee76 100644
--- a/qpid/cpp/src/qpid/amqp/Descriptor.cpp
+++ b/qpid/cpp/src/qpid/amqp/Descriptor.cpp
@@ -19,11 +19,17 @@
*
*/
#include "Descriptor.h"
+#include "descriptors.h"
+#include <qpid/framing/reply_exceptions.h>
+#include <map>
namespace qpid {
namespace amqp {
+
Descriptor::Descriptor(uint64_t code) : type(NUMERIC) { value.code = code; }
+
Descriptor::Descriptor(const CharSequence& symbol) : type(SYMBOLIC) { value.symbol = symbol; }
+
bool Descriptor::match(const std::string& symbol, uint64_t code) const
{
switch (type) {
@@ -58,20 +64,85 @@ Descriptor* Descriptor::nest(const Descriptor& d)
return nested.get();
}
-std::ostream& operator<<(std::ostream& os, const Descriptor& d)
-{
- switch (d.type) {
- case Descriptor::SYMBOLIC:
- if (d.value.symbol.data && d.value.symbol.size) os << std::string(d.value.symbol.data, d.value.symbol.size);
- else os << "null";
- break;
- case Descriptor::NUMERIC:
- os << "0x" << std::hex << d.value.code;
- break;
+namespace {
+
+class DescriptorMap {
+ typedef std::map<uint64_t, std::string> SymbolMap;
+ typedef std::map<std::string, uint64_t> CodeMap;
+
+ SymbolMap symbols;
+ CodeMap codes;
+
+ public:
+ DescriptorMap() {
+ symbols[message::HEADER_CODE] = message::HEADER_SYMBOL;
+ symbols[message::DELIVERY_ANNOTATIONS_CODE] = message::DELIVERY_ANNOTATIONS_SYMBOL;
+ symbols[message::MESSAGE_ANNOTATIONS_CODE] = message::MESSAGE_ANNOTATIONS_SYMBOL;
+ symbols[message::PROPERTIES_CODE] = message::PROPERTIES_SYMBOL;
+ symbols[message::APPLICATION_PROPERTIES_CODE] = message::APPLICATION_PROPERTIES_SYMBOL;
+ symbols[message::DATA_CODE] = message::DATA_SYMBOL;
+ symbols[message::AMQP_SEQUENCE_CODE] = message::AMQP_SEQUENCE_SYMBOL;
+ symbols[message::AMQP_VALUE_CODE] = message::AMQP_VALUE_SYMBOL;
+ symbols[message::FOOTER_CODE] = message::FOOTER_SYMBOL;
+ symbols[message::ACCEPTED_CODE] = message::ACCEPTED_SYMBOL;
+ symbols[sasl::SASL_MECHANISMS_CODE] = sasl::SASL_MECHANISMS_SYMBOL;
+ symbols[sasl::SASL_INIT_CODE] = sasl::SASL_INIT_SYMBOL;
+ symbols[sasl::SASL_CHALLENGE_CODE] = sasl::SASL_CHALLENGE_SYMBOL;
+ symbols[sasl::SASL_RESPONSE_CODE] = sasl::SASL_RESPONSE_SYMBOL;
+ symbols[sasl::SASL_OUTCOME_CODE] = sasl::SASL_OUTCOME_SYMBOL;
+ symbols[filters::LEGACY_DIRECT_FILTER_CODE] = filters::LEGACY_DIRECT_FILTER_SYMBOL;
+ symbols[filters::LEGACY_TOPIC_FILTER_CODE] = filters::LEGACY_TOPIC_FILTER_SYMBOL;
+ symbols[filters::LEGACY_HEADERS_FILTER_CODE] = filters::LEGACY_HEADERS_FILTER_SYMBOL;
+ symbols[filters::SELECTOR_FILTER_CODE] = filters::SELECTOR_FILTER_SYMBOL;
+ symbols[filters::XQUERY_FILTER_CODE] = filters::XQUERY_FILTER_SYMBOL;
+ symbols[lifetime_policy::DELETE_ON_CLOSE_CODE] = lifetime_policy::DELETE_ON_CLOSE_SYMBOL;
+ symbols[lifetime_policy::DELETE_ON_NO_LINKS_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL;
+ symbols[lifetime_policy::DELETE_ON_NO_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL;
+ symbols[lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL;
+ symbols[transaction::DECLARE_CODE] = transaction::DECLARE_SYMBOL;
+ symbols[transaction::DISCHARGE_CODE] = transaction::DISCHARGE_SYMBOL;
+ symbols[transaction::DECLARED_CODE] = transaction::DECLARED_SYMBOL;
+ symbols[transaction::TRANSACTIONAL_STATE_CODE] = transaction::TRANSACTIONAL_STATE_SYMBOL;
+ symbols[0] = "unknown-descriptor";
+
+ for (SymbolMap::const_iterator i = symbols.begin(); i != symbols.end(); ++i)
+ codes[i->second] = i->first;
+ }
+
+ std::string operator[](uint64_t code) const {
+ SymbolMap::const_iterator i = symbols.find(code);
+ return (i == symbols.end()) ? "unknown-descriptor" : i->second;
}
- if (d.nested.get()) {
- os << " ->(" << *d.nested << ")";
+
+ uint64_t operator[](const std::string& symbol) const {
+ CodeMap::const_iterator i = codes.find(symbol);
+ return (i == codes.end()) ? 0 : i->second;
+ }
+};
+
+DescriptorMap DESCRIPTOR_MAP;
+}
+
+std::string Descriptor::symbol() const {
+ switch (type) {
+ case Descriptor::NUMERIC: return DESCRIPTOR_MAP[value.code];
+ case Descriptor::SYMBOLIC: return value.symbol.str();
+ }
+ assert(0);
+ return std::string();
+}
+
+uint64_t Descriptor::code() const {
+ switch (type) {
+ case Descriptor::NUMERIC: return value.code;
+ case Descriptor::SYMBOLIC: return DESCRIPTOR_MAP[value.symbol.str()];
}
- return os;
+ assert(0);
+ return 0;
}
+
+std::ostream& operator<<(std::ostream& os, const Descriptor& d) {
+ return os << d.symbol() << "(" << "0x" << std::hex << d.code() << ")";
+}
+
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/Descriptor.h b/qpid/cpp/src/qpid/amqp/Descriptor.h
index 6b0cb80e87..3726114769 100644
--- a/qpid/cpp/src/qpid/amqp/Descriptor.h
+++ b/qpid/cpp/src/qpid/amqp/Descriptor.h
@@ -49,6 +49,8 @@ struct Descriptor
QPID_COMMON_EXTERN bool match(const std::string&, uint64_t) const;
QPID_COMMON_EXTERN size_t getSize() const;
QPID_COMMON_EXTERN Descriptor* nest(const Descriptor& d);
+ QPID_COMMON_EXTERN std::string symbol() const;
+ QPID_COMMON_EXTERN uint64_t code() const;
};
QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& os, const Descriptor& d);
diff --git a/qpid/cpp/src/qpid/amqp/Encoder.cpp b/qpid/cpp/src/qpid/amqp/Encoder.cpp
index 0760fc166d..86b59fb1a2 100644
--- a/qpid/cpp/src/qpid/amqp/Encoder.cpp
+++ b/qpid/cpp/src/qpid/amqp/Encoder.cpp
@@ -31,10 +31,17 @@
#include <string.h>
using namespace qpid::types::encodings;
+using qpid::types::Variant;
namespace qpid {
namespace amqp {
+Encoder::Overflow::Overflow() : Exception("Buffer overflow in encoder!") {}
+
+Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0), grow(false) {}
+
+Encoder::Encoder() : data(0), size(0), position(0), grow(true) {}
+
namespace {
template <typename T> size_t encode(char* data, T i);
template <> size_t encode<uint8_t>(char* data, uint8_t i)
@@ -406,6 +413,18 @@ void Encoder::writeList(const std::list<qpid::types::Variant>& value, const Desc
void Encoder::writeValue(const qpid::types::Variant& value, const Descriptor* d)
{
+ if (d) {
+ writeDescriptor(*d); // Write this descriptor before any in the value.
+ d = 0;
+ }
+ // Write any descriptors attached to the value.
+ const Variant::List& descriptors = value.getDescriptors();
+ for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) {
+ if (i->getType() == types::VAR_STRING)
+ writeDescriptor(Descriptor(CharSequence::create(i->asString())));
+ else
+ writeDescriptor(Descriptor(i->asUint64()));
+ }
switch (value.getType()) {
case qpid::types::VAR_VOID:
writeNull(d);
@@ -477,18 +496,28 @@ void Encoder::writeDescriptor(const Descriptor& d)
break;
}
}
+
void Encoder::check(size_t s)
{
if (position + s > size) {
- QPID_LOG(notice, "Buffer overflow for write of size " << s << " to buffer of size " << size << " at position " << position);
- assert(false);
- throw qpid::Exception("Buffer overflow in encoder!");
+ if (grow) {
+ buffer.resize(buffer.size() + s);
+ data = const_cast<char*>(buffer.data());
+ size = buffer.size();
+ }
+ else {
+ QPID_LOG(notice, "Buffer overflow for write of size " << s
+ << " to buffer of size " << size << " at position " << position);
+ assert(false);
+ throw Overflow();
+ }
}
}
-Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0) {}
+
size_t Encoder::getPosition() { return position; }
size_t Encoder::getSize() const { return size; }
char* Encoder::getData() { return data + position; }
+std::string Encoder::getBuffer() { return buffer; }
void Encoder::resetPosition(size_t p) { assert(p <= size); position = p; }
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/Encoder.h b/qpid/cpp/src/qpid/amqp/Encoder.h
index 4f7c1d1489..8729f29b94 100644
--- a/qpid/cpp/src/qpid/amqp/Encoder.h
+++ b/qpid/cpp/src/qpid/amqp/Encoder.h
@@ -23,6 +23,7 @@
*/
#include "qpid/sys/IntegerTypes.h"
#include "qpid/amqp/Constructor.h"
+#include "qpid/Exception.h"
#include <list>
#include <map>
#include <stddef.h>
@@ -43,6 +44,18 @@ struct Descriptor;
class Encoder
{
public:
+ struct Overflow : public Exception { Overflow(); };
+
+ /** Create an encoder that writes into the buffer at data up to size bytes.
+ * Write operations throw Overflow if encoding exceeds size bytes.
+ */
+ QPID_COMMON_EXTERN Encoder(char* data, size_t size);
+
+ /** Create an encoder that manages its own buffer. Buffer grows to accomodate
+ * all encoded data. Call getBuffer() to get the buffer.
+ */
+ QPID_COMMON_EXTERN Encoder();
+
void writeCode(uint8_t);
void write(bool);
@@ -100,19 +113,27 @@ class Encoder
QPID_COMMON_EXTERN void writeList(const std::list<qpid::types::Variant>& value, const Descriptor* d=0, bool large=true);
void writeDescriptor(const Descriptor&);
- QPID_COMMON_EXTERN Encoder(char* data, size_t size);
QPID_COMMON_EXTERN size_t getPosition();
void resetPosition(size_t p);
char* skip(size_t);
void writeBytes(const char* bytes, size_t count);
virtual ~Encoder() {}
+
+ /** Return the total size of the buffer. */
size_t getSize() const;
- protected:
+
+ /** Return the growable buffer. */
+ std::string getBuffer();
+
+ /** Return the unused portion of the buffer. */
char* getData();
+
private:
char* data;
size_t size;
size_t position;
+ bool grow;
+ std::string buffer;
void write(const CharSequence& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d);
void write(const std::string& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d);
diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h
index a9ee12644a..29c626edc2 100644
--- a/qpid/cpp/src/qpid/amqp/descriptors.h
+++ b/qpid/cpp/src/qpid/amqp/descriptors.h
@@ -26,6 +26,9 @@
namespace qpid {
namespace amqp {
+// NOTE: If you add descriptor symbols and codes here, you must also update the DescriptorMap
+// constructor in Descriptor.cpp.
+
namespace message {
const std::string HEADER_SYMBOL("amqp:header:list");
const std::string PROPERTIES_SYMBOL("amqp:properties:list");
@@ -36,6 +39,7 @@ const std::string AMQP_SEQUENCE_SYMBOL("amqp:amqp-sequence:list");
const std::string AMQP_VALUE_SYMBOL("amqp:amqp-value:*");
const std::string DATA_SYMBOL("amqp:data:binary");
const std::string FOOTER_SYMBOL("amqp:footer:map");
+const std::string ACCEPTED_SYMBOL("amqp:accepted:list");
const uint64_t HEADER_CODE(0x70);
const uint64_t DELIVERY_ANNOTATIONS_CODE(0x71);
@@ -46,6 +50,7 @@ const uint64_t DATA_CODE(0x75);
const uint64_t AMQP_SEQUENCE_CODE(0x76);
const uint64_t AMQP_VALUE_CODE(0x77);
const uint64_t FOOTER_CODE(0x78);
+const uint64_t ACCEPTED_CODE(0x24);
const Descriptor HEADER(HEADER_CODE);
const Descriptor DELIVERY_ANNOTATIONS(DELIVERY_ANNOTATIONS_CODE);
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index b1f7d0524b..4dd6455104 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -297,6 +297,8 @@ void Queue::deliverTo(Message msg, TxBuffer* txn)
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
txn->enlist(op);
+ QPID_LOG(debug, "Message " << msg.getSequence() << " enqueue on " << name
+ << " enlisted in " << txn);
} else {
if (enqueue(0, msg)) {
push(msg);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Exception.h b/qpid/cpp/src/qpid/broker/amqp/Exception.h
index c2fe470e55..a129dffe1f 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Exception.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Exception.h
@@ -22,6 +22,7 @@
*
*/
#include <string>
+#include <qpid/Exception.h>
namespace qpid {
namespace broker {
@@ -29,7 +30,7 @@ namespace amqp {
/**
* Exception to signal various AMQP 1.0 defined conditions
*/
-class Exception : public std::exception
+class Exception : public qpid::Exception
{
public:
Exception(const std::string& name, const std::string& description);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
index d4f73fc511..3986818846 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -100,6 +100,7 @@ namespace {
boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session));
return copy;
}
+
private:
pn_delivery_t* delivery;
boost::shared_ptr<Session> session;
@@ -146,8 +147,8 @@ void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>
{
qpid::broker::Message message(received, received);
userid.verify(message.getUserId());
- handle(message, session.getTransaction(delivery));
received->begin();
+ handle(message, session.getTransaction(delivery));
Transfer t(delivery, sessionPtr);
received->end(t);
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 0136d5a0ed..f2949c5879 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -28,6 +28,7 @@
#include "qpid/broker/TopicKeyNode.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/Descriptor.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
@@ -90,13 +91,13 @@ bool OutgoingFromQueue::doWork()
return true;
} else {
pn_link_drained(link);
- QPID_LOG(debug, "No message available on " << queue->getName());
+ QPID_LOG(trace, "No message available on " << queue->getName());
}
} catch (const qpid::framing::ResourceDeletedException& e) {
throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, e.what());
}
} else {
- QPID_LOG(debug, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link));
+ QPID_LOG(trace, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link));
}
return false;
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 0e44374d19..3b65e6a64d 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -61,6 +61,8 @@ namespace qpid {
namespace broker {
namespace amqp {
+using namespace qpid::amqp::transaction;
+
namespace {
pn_bytes_t convert(const std::string& s)
{
@@ -209,6 +211,7 @@ class IncomingToCoordinator : public DecodingIncoming
public:
IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent)
: DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", pn_link_name(link)) {}
+
~IncomingToCoordinator() { session.abort(); }
void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, pn_delivery_t*);
void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {}
@@ -218,7 +221,9 @@ class IncomingToCoordinator : public DecodingIncoming
Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
: ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
authorise(connection.getUserId(), connection.getBroker().getAcl()),
- detachRequested(), txnId((boost::format("%1%") % s).str()) {}
+ detachRequested(),
+ tx(*this)
+{}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -636,11 +641,12 @@ void Session::readable(pn_link_t* link, pn_delivery_t* delivery)
if (target->second->haveWork()) out.activateOutput();
}
}
+
void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
{
OutgoingLinks::iterator sender = outgoing.find(link);
if (sender == outgoing.end()) {
- QPID_LOG(error, "Delivery returned for unknown link");
+ QPID_LOG(error, "Delivery returned for unknown link " << pn_link_name(link));
} else {
sender->second->handle(delivery);
}
@@ -649,7 +655,7 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
bool Session::dispatch()
{
bool output(false);
- if (commitPending.boolCompareAndSwap(true, false)) {
+ if (tx.commitPending.boolCompareAndSwap(true, false)) {
committed(true);
}
for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) {
@@ -737,7 +743,7 @@ void Session::detachedByManagement()
TxBuffer* Session::getTransaction(const std::string& id)
{
- return (txn.get() && id == txnId) ? txn.get() : 0;
+ return (tx.buffer.get() && id == tx.id) ? tx.buffer.get() : 0;
}
TxBuffer* Session::getTransaction(pn_delivery_t* delivery)
@@ -748,42 +754,41 @@ TxBuffer* Session::getTransaction(pn_delivery_t* delivery)
std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* delivery)
{
std::pair<TxBuffer*,uint64_t> result((TxBuffer*)0, 0);
- if (pn_delivery_remote_state(delivery) == qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) {
+ if (pn_delivery_remote_state(delivery) == TRANSACTIONAL_STATE_CODE) {
pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery));
- if (data && pn_data_next(data)) {
- size_t count = pn_data_get_list(data);
- if (count > 0) {
+ pn_data_rewind(data);
+ size_t count = 0;
+ if (data && pn_data_next(data) && (count = pn_data_get_list(data)) > 0) {
+ pn_data_enter(data);
+ pn_data_next(data);
+ std::string id = convert(pn_data_get_binary(data));
+ result.first = getTransaction(id);
+ if (!result.first) {
+ QPID_LOG(error, "Transaction not found for id: " << id);
+ }
+ if (count > 1 && pn_data_next(data)) {
pn_data_enter(data);
pn_data_next(data);
- std::string id = convert(pn_data_get_binary(data));
- result.first = getTransaction(id);
- if (!result.first) {
- QPID_LOG(error, "Transaction not found for id: " << id);
- }
- if (count > 1 && pn_data_next(data) && pn_data_is_described(data)) {
- pn_data_enter(data);
- pn_data_next(data);
- result.second = pn_data_get_ulong(data);
- }
- pn_data_exit(data);
+ result.second = pn_data_get_ulong(data);
}
- } else {
- QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data");
}
+ else
+ QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data");
}
return result;
}
std::string Session::declare()
{
- if (txn.get()) {
+ if (tx.buffer.get()) {
//not sure what the error code should be; none in spec really fit well.
- throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "Session only supports one transaction active at a time");
+ throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK,
+ "Session only supports one transaction active at a time");
}
- txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
- connection.getBroker().getBrokerObservers().startTx(txn);
+ tx.buffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
+ connection.getBroker().getBrokerObservers().startTx(tx.buffer);
txStarted();
- return txnId;
+ return tx.id;
}
namespace {
@@ -797,32 +802,41 @@ namespace {
boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new AsyncCommit(session));
return copy;
}
+
private:
boost::shared_ptr<Session> session;
};
}
-void Session::discharge(const std::string& id, bool failed)
+void Session::discharge(const std::string& id, bool failed, pn_delivery_t* delivery)
{
- if (!txn.get() || id != txnId) {
- throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, "No transaction declared with that id");
+ QPID_LOG(debug, "Coordinator " << (failed ? " rollback" : " commit")
+ << " transaction " << id);
+ if (!tx.buffer.get() || id != tx.id) {
+ throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID,
+ Msg() << "Cannot discharge transaction " << id
+ << (tx.buffer.get() ? Msg() << ", current transaction is " << tx.id :
+ Msg() << ", no current transaction"));
}
+ tx.discharge = delivery;
if (failed) {
abort();
} else {
- txn->begin();
- txn->startCommit(&connection.getBroker().getStore());
+ tx.buffer->begin();
+ tx.buffer->startCommit(&connection.getBroker().getStore());
AsyncCommit callback(shared_from_this());
- txn->end(callback);
+ tx.buffer->end(callback);
}
}
void Session::abort()
{
- if (txn) {
- txn->rollback();
+ if (tx.buffer) {
+ tx.dischargeComplete();
+ tx.buffer->rollback();
txAborted();
- txn = boost::intrusive_ptr<TxBuffer>();
+ tx.buffer.reset();
+ QPID_LOG(debug, "Transaction " << tx.id << " rolled back");
}
}
@@ -830,16 +844,18 @@ void Session::committed(bool sync)
{
if (sync) {
//this is on IO thread
- if (txn.get()) {
- txn->endCommit(&connection.getBroker().getStore());
+ tx.dischargeComplete();
+ if (tx.buffer.get()) {
+ tx.buffer->endCommit(&connection.getBroker().getStore());
txCommitted();
- txn = boost::intrusive_ptr<TxBuffer>();
+ tx.buffer.reset();
+ QPID_LOG(debug, "Transaction " << tx.id << " comitted");
} else {
throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction vanished during async commit");
}
} else {
//this is not on IO thread, need to delay processing until on IO thread
- if (commitPending.boolCompareAndSwap(false, true)) {
+ if (tx.commitPending.boolCompareAndSwap(false, true)) {
qpid::sys::Mutex::ScopedLock l(lock);
if (!deleted) {
out.activateOutput();
@@ -880,7 +896,7 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes
{
if (message && message->isTypedBody()) {
QPID_LOG(debug, "Coordinator got message: @" << message->getBodyDescriptor() << " " << message->getTypedBody());
- if (message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, qpid::amqp::transaction::DECLARE_CODE)) {
+ if (message->getBodyDescriptor().match(DECLARE_SYMBOL, DECLARE_CODE)) {
std::string id = session.declare();
//encode the txn id in a 'declared' list on the disposition
pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery));
@@ -889,22 +905,38 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes
pn_data_put_binary(data, convert(id));
pn_data_exit(data);
pn_data_exit(data);
- pn_delivery_update(delivery, qpid::amqp::transaction::DECLARED_CODE);
+ pn_delivery_update(delivery, DECLARED_CODE);
pn_delivery_settle(delivery);
session.incomingMessageAccepted();
- } else if (message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, qpid::amqp::transaction::DISCHARGE_CODE)) {
+ QPID_LOG(debug, "Coordinator declared transaction " << id);
+ } else if (message->getBodyDescriptor().match(DISCHARGE_SYMBOL, DISCHARGE_CODE)) {
if (message->getTypedBody().getType() == qpid::types::VAR_LIST) {
qpid::types::Variant::List args = message->getTypedBody().asList();
qpid::types::Variant::List::const_iterator i = args.begin();
if (i != args.end()) {
std::string id = *i;
bool failed = ++i != args.end() ? i->asBool() : false;
- session.discharge(id, failed);
- DecodingIncoming::deliver(message, delivery);//ensures async completion of commit is taken care of
+ session.discharge(id, failed, delivery);
}
+
+ } else {
+ throw framing::IllegalArgumentException(
+ Msg() << "Coordinator unknown message: @" <<
+ message->getBodyDescriptor() << " " << message->getTypedBody());
}
}
}
}
+Session::Transaction::Transaction(Session& s) :
+ session(s), id((boost::format("%1%") % &s).str()), discharge(0) {}
+
+// Called in IO thread to signal completion of dischage by settling discharge message.
+void Session::Transaction::dischargeComplete() {
+ if (buffer.get() && discharge) {
+ session.accepted(discharge, false); // Queue up accept and activate output.
+ discharge = 0;
+ }
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index 591af1175f..ea3fb82beb 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -91,7 +91,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
std::pair<TxBuffer*,uint64_t> getTransactionalState(pn_delivery_t*);
//transaction coordination:
std::string declare();
- void discharge(const std::string& id, bool failed);
+ void discharge(const std::string& id, bool failed, pn_delivery_t*);
void abort();
protected:
void detachedByManagement();
@@ -109,9 +109,18 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
std::set< boost::shared_ptr<Queue> > exclusiveQueues;
Authorise authorise;
bool detachRequested;
- boost::intrusive_ptr<TxBuffer> txn;
- std::string txnId;
- qpid::sys::AtomicValue<bool> commitPending;
+
+ struct Transaction {
+ Transaction(Session&);
+ void dischargeComplete();
+
+ Session& session;
+ boost::intrusive_ptr<TxBuffer> buffer;
+ std::string id;
+ qpid::sys::AtomicValue<bool> commitPending;
+ pn_delivery_t* discharge;
+ };
+ Transaction tx;
struct ResolvedNode
{
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index 3ee3f1cd40..77d43f191d 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -205,7 +205,7 @@ void ConnectionHandler::fail(const std::string& message)
{
errorCode = CLOSE_CODE_FRAMING_ERROR;
errorText = message;
- QPID_LOG(warning, message);
+ QPID_LOG(debug, message);
setState(FAILED);
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 7f19ca7ec0..2106e21686 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -510,9 +510,9 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
requested.erase(j->first);
}
} else if (key == AUTO_DELETE) {
- PnData(data).read(v);
+ PnData(data).get(v);
isAutoDeleted = v.asBool();
- } else if (j != requested.end() && (PnData(data).read(v) && v.asString() == j->second.asString())) {
+ } else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) {
requested.erase(j->first);
}
}
@@ -646,7 +646,7 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod
} else {
pn_data_put_ulong(filter, i->descriptorCode);
}
- PnData(filter).write(i->value);
+ PnData(filter).put(i->value);
pn_data_exit(filter);
}
pn_data_exit(filter);
@@ -733,7 +733,7 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
putLifetimePolicy(data, toLifetimePolicy(i->second.asString()));
} else {
pn_data_put_symbol(data, convert(i->first));
- PnData(data).write(i->second);
+ PnData(data).put(i->second);
}
}
pn_data_exit(data);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index c974f6c5c1..d4a7b60e3c 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -25,8 +25,11 @@
#include "Sasl.h"
#include "SenderContext.h"
#include "SessionContext.h"
+#include "Transaction.h"
#include "Transport.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/Encoder.h"
+#include "qpid/amqp/Descriptor.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/Duration.h"
@@ -43,6 +46,7 @@
#include "qpid/sys/urlAdd.h"
#include "config.h"
#include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
#include <vector>
extern "C" {
#include <proton/engine.h>
@@ -157,14 +161,17 @@ ConnectionContext::~ConnectionContext()
bool ConnectionContext::isOpen() const
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- //wait for outstanding sends to settle
+ sys::Monitor::ScopedLock l(lock);
+ syncLH(ssn, l);
+}
+
+void ConnectionContext::syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&) {
while (!ssn->settled()) {
QPID_LOG(debug, "Waiting for sends to settle on sync()");
wait(ssn);//wait until message has been confirmed
@@ -175,18 +182,13 @@ void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn)
void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
//explicitly release messages that have yet to be fetched
for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) {
drain_and_release_messages(ssn, i->second);
}
- //wait for outstanding sends to settle
- while (!ssn->settled()) {
- QPID_LOG(debug, "Waiting for sends to settle before closing");
- wait(ssn);//wait until message has been confirmed
- wakeupDriver();
- }
+ syncLH(ssn, l);
}
if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
@@ -199,17 +201,11 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
void ConnectionContext::close()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (state != CONNECTED) return;
if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- //wait for outstanding sends to settle
- while (!i->second->settled()) {
- QPID_LOG(debug, "Waiting for sends to settle before closing");
- wait(i->second);//wait until message has been confirmed
- }
-
-
+ syncLH(i->second, l);
if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
pn_session_close(i->second->session);
}
@@ -246,7 +242,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
*/
qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching);
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
checkClosed(ssn, lnk);
if (!lnk->capacity) {
pn_link_flow(lnk->receiver, 1);
@@ -257,10 +253,10 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
return true;
} else {
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
pn_link_drain(lnk->receiver, 0);
wakeupDriver();
- while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
+ while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
wait(ssn, lnk);
}
@@ -269,7 +265,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
}
}
if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (lnk->capacity) {
pn_link_flow(lnk->receiver, 1);
wakeupDriver();
@@ -296,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
{
qpid::sys::AbsTime until(convert(timeout));
while (true) {
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
checkClosed(ssn, lnk);
pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver);
QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
@@ -320,6 +316,9 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
haveOutput = true;
}
}
+ // Automatically ack messages if we are in a transaction.
+ if (ssn->transaction)
+ acknowledgeLH(ssn, &message, false, l);
return true;
} else if (until > qpid::sys::now()) {
waitUntil(ssn, lnk, until);
@@ -334,7 +333,7 @@ boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared
{
qpid::sys::AbsTime until(convert(timeout));
while (true) {
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
checkClosed(ssn);
boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver();
if (r) {
@@ -347,9 +346,13 @@ boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared
}
}
-void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
+void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) {
+ sys::Monitor::ScopedLock l(lock);
+ acknowledgeLH(ssn, message, cumulative, l);
+}
+
+void ConnectionContext::acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
if (message) {
ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
@@ -361,7 +364,7 @@ void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid:
void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
checkClosed(ssn);
ssn->nack(MessageImplAccess::get(message).getInternalId(), reject);
wakeupDriver();
@@ -369,7 +372,7 @@ void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messag
void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) {
lnk->close();
}
@@ -401,7 +404,7 @@ void ConnectionContext::drain_and_release_messages(boost::shared_ptr<SessionCont
void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
drain_and_release_messages(ssn, lnk);
if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) {
lnk->close();
@@ -415,7 +418,7 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
lnk->configure();
attach(ssn, lnk->sender);
checkClosed(ssn, lnk);
@@ -425,7 +428,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
lnk->configure();
attach(ssn, lnk->receiver, lnk->capacity);
checkClosed(ssn, lnk);
@@ -445,11 +448,26 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t*
}
}
-void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
+void ConnectionContext::send(
+ boost::shared_ptr<SessionContext> ssn,
+ boost::shared_ptr<SenderContext> snd,
+ const qpid::messaging::Message& message,
+ bool sync,
+ SenderContext::Delivery** delivery)
+{
+ sys::Monitor::ScopedLock l(lock);
+ sendLH(ssn, snd, message, sync, delivery, l);
+}
+
+void ConnectionContext::sendLH(
+ boost::shared_ptr<SessionContext> ssn,
+ boost::shared_ptr<SenderContext> snd,
+ const qpid::messaging::Message& message,
+ bool sync,
+ SenderContext::Delivery** delivery,
+ sys::Monitor::ScopedLock&)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
- SenderContext::Delivery* delivery(0);
while (pn_transport_pending(engine) > 65536) {
QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written...");
notifyOnWrite = true;
@@ -457,17 +475,17 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share
wait(ssn, snd);
notifyOnWrite = false;
}
- while (!snd->send(message, &delivery)) {
+ while (!snd->send(message, delivery)) {
QPID_LOG(debug, "Waiting for capacity...");
wait(ssn, snd);//wait for capacity
}
wakeupDriver();
- if (sync && delivery) {
- while (!delivery->delivered()) {
+ if (sync && *delivery) {
+ while (!(*delivery)->delivered()) {
QPID_LOG(debug, "Waiting for confirmation...");
wait(ssn, snd);//wait until message has been confirmed
}
- if (delivery->rejected()) {
+ if ((*delivery)->rejected()) {
throw MessageRejected("Message was rejected by peer");
}
@@ -476,46 +494,46 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share
void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
sender->setCapacity(capacity);
}
uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return sender->getCapacity();
}
uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return sender->getUnsettled();
}
void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
receiver->setCapacity(capacity);
pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity());
wakeupDriver();
}
uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return receiver->getCapacity();
}
uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return receiver->getAvailable();
}
uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return receiver->getUnsettled();
}
void ConnectionContext::activateOutput()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (state == CONNECTED) wakeupDriver();
}
/**
@@ -555,7 +573,7 @@ void ConnectionContext::reset()
}
}
-void ConnectionContext::check() {
+bool ConnectionContext::check() {
if (checkDisconnected()) {
if (ConnectionOptions::reconnect) {
QPID_LOG(notice, "Auto-reconnecting to " << fullUrl);
@@ -564,7 +582,9 @@ void ConnectionContext::check() {
} else {
throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
}
+ return true;
}
+ return false;
}
bool ConnectionContext::checkDisconnected() {
@@ -588,7 +608,7 @@ bool ConnectionContext::checkDisconnected() {
void ConnectionContext::wait()
{
- check();
+ if (check()) return; // Reconnected, may need to re-test condition.
lock.wait();
check();
}
@@ -630,6 +650,7 @@ void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost::
void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
{
check();
+ ssn->error.raise();
if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
pn_condition_t* error = pn_session_remote_condition(ssn->session);
std::stringstream text;
@@ -690,6 +711,7 @@ void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_li
void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s)
{
+ if (s->error) return;
pn_session_open(s->session);
wakeupDriver();
while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
@@ -718,26 +740,31 @@ void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s)
boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported");
+ boost::shared_ptr<SessionContext> session;
std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n;
- SessionMap::const_iterator i = sessions.find(name);
- if (i == sessions.end()) {
- boost::shared_ptr<SessionContext> s(new SessionContext(connection));
- s->setName(name);
- s->session = pn_session(connection);
- pn_session_open(s->session);
- wakeupDriver();
- while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
- wait();
+ {
+ sys::Monitor::ScopedLock l(lock);
+ SessionMap::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ session = boost::shared_ptr<SessionContext>(new SessionContext(connection));
+ session->setName(name);
+ pn_session_open(session->session);
+ wakeupDriver();
+ sessions[name] = session; // Add it now so it will be restarted if we reconnect in wait()
+ while (pn_session_state(session->session) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+ } else {
+ throw qpid::messaging::KeyError(std::string("Session already exists: ") + name);
}
- sessions[name] = s;
- return s;
- } else {
- throw qpid::messaging::KeyError(std::string("Session already exists: ") + name);
- }
+ }
+ if (transactional) { // Outside of lock
+ startTxSession(session);
+ }
+ return session;
}
+
boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const
{
SessionMap::const_iterator i = sessions.find(name);
@@ -760,7 +787,7 @@ std::string ConnectionContext::getAuthenticatedUsername()
std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
QPID_LOG(trace, id << " decode(" << size << ")");
if (readHeader) {
size_t decoded = readProtocolHeader(buffer, size);
@@ -805,7 +832,7 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
}
std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
QPID_LOG(trace, id << " encode(" << size << ")");
if (writeHeader) {
size_t encoded = writeProtocolHeader(buffer, size);
@@ -843,19 +870,19 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
}
bool ConnectionContext::canEncodePlain()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
return haveOutput && state == CONNECTED;
}
void ConnectionContext::closed()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
state = DISCONNECTED;
lock.notifyAll();
}
void ConnectionContext::opened()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
state = CONNECTED;
lock.notifyAll();
}
@@ -921,7 +948,7 @@ const qpid::messaging::ConnectionOptions* ConnectionContext::getOptions()
std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
size_t decoded = 0;
try {
if (sasl.get() && !sasl->authenticated()) {
@@ -939,7 +966,7 @@ std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
}
std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
size_t encoded = 0;
try {
if (sasl.get() && sasl->canEncode()) {
@@ -957,7 +984,7 @@ std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
}
bool ConnectionContext::canEncode()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (sasl.get()) {
try {
if (sasl->canEncode()) return true;
@@ -978,26 +1005,21 @@ const std::string CLIENT_PPID("qpid.client_ppid");
}
void ConnectionContext::setProperties()
{
- pn_data_t* data = pn_connection_properties(connection);
- pn_data_put_map(data);
- pn_data_enter(data);
-
- pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME));
- std::string processName = sys::SystemInfo::getProcessName();
- pn_data_put_string(data, PnData::str(processName));
-
- pn_data_put_symbol(data, PnData::str(CLIENT_PID));
- pn_data_put_int(data, sys::SystemInfo::getProcessId());
-
- pn_data_put_symbol(data, PnData::str(CLIENT_PPID));
- pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
-
+ PnData data(pn_connection_properties(connection));
+ pn_data_put_map(data.data);
+ pn_data_enter(data.data);
+ data.putSymbol(CLIENT_PROCESS_NAME);
+ data.putSymbol(sys::SystemInfo::getProcessName());
+ data.putSymbol(CLIENT_PID);
+ data.put(int32_t(sys::SystemInfo::getProcessId()));
+ data.putSymbol(CLIENT_PPID);
+ data.put(int32_t(sys::SystemInfo::getParentProcessId()));
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i)
{
- pn_data_put_symbol(data, PnData::str(i->first));
- PnData(data).write(i->second);
+ data.putSymbol(i->first);
+ data.put(i->second);
}
- pn_data_exit(data);
+ pn_data_exit(data.data);
}
const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings()
@@ -1007,7 +1029,7 @@ const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettin
void ConnectionContext::open()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
if (!driver) driver = DriverImpl::getDefault();
QPID_LOG(info, "Starting connection to " << fullUrl);
@@ -1049,7 +1071,7 @@ void ConnectionContext::autoconnect()
void ConnectionContext::reconnect(const Url& url) {
QPID_LOG(notice, "Reconnecting to " << url);
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
if (!driver) driver = DriverImpl::getDefault();
reset();
@@ -1137,7 +1159,7 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) {
std::string ConnectionContext::getUrl() const
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ sys::Monitor::ScopedLock l(lock);
return (state == CONNECTED) ? currentUrl.str() : std::string();
}
@@ -1209,6 +1231,40 @@ bool ConnectionContext::CodecAdapter::canEncode()
return context.canEncodePlain();
}
+void ConnectionContext::startTxSession(boost::shared_ptr<SessionContext> session) {
+ try {
+ QPID_LOG(debug, id << " attaching transaction for " << session->getName());
+ boost::shared_ptr<Transaction> tx(new Transaction(session->session));
+ session->transaction = tx;
+ attach(session, tx);
+ tx->declare(boost::bind(&ConnectionContext::send, this, _1, _2, _3, _4, _5), session);
+ } catch (const Exception& e) {
+ throw TransactionError(Msg() << "Cannot start transaction: " << e.what());
+ }
+}
+
+void ConnectionContext::discharge(boost::shared_ptr<SessionContext> session, bool fail) {
+ {
+ sys::Monitor::ScopedLock l(lock);
+ checkClosed(session);
+ if (!session->transaction)
+ throw TransactionError("No Transaction");
+ Transaction::SendFunction sendFn = boost::bind(
+ &ConnectionContext::sendLH, this, _1, _2, _3, _4, _5, boost::ref(l));
+ syncLH(session, boost::ref(l)); // Sync to make sure all tx transfers have been received.
+ session->transaction->discharge(sendFn, session, fail);
+ session->transaction->declare(sendFn, session);
+ }
+}
+
+void ConnectionContext::commit(boost::shared_ptr<SessionContext> session) {
+ discharge(session, false);
+}
+
+void ConnectionContext::rollback(boost::shared_ptr<SessionContext> session) {
+ discharge(session, true);
+}
+
// setup the transport and connection objects:
void ConnectionContext::configureConnection()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 80da9dff10..b687219624 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -34,6 +34,7 @@
#include "qpid/sys/Monitor.h"
#include "qpid/types/Variant.h"
#include "qpid/messaging/amqp/TransportContext.h"
+#include "SenderContext.h"
struct pn_connection_t;
struct pn_link_t;
@@ -59,7 +60,6 @@ class DriverImpl;
class ReceiverContext;
class Sasl;
class SessionContext;
-class SenderContext;
class Transport;
/**
@@ -82,10 +82,20 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
- void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
+
+ // Link operations
+ void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt,
+ const qpid::messaging::Message& message, bool sync,
+ SenderContext::Delivery** delivery);
+
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+
+ // Session operations
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
+ void commit(boost::shared_ptr<SessionContext> ssn);
+ void rollback(boost::shared_ptr<SessionContext> ssn);
+
void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
void sync(boost::shared_ptr<SessionContext> ssn);
boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout);
@@ -93,10 +103,10 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
+ // Link operations
void setCapacity(boost::shared_ptr<SenderContext>, uint32_t);
uint32_t getCapacity(boost::shared_ptr<SenderContext>);
uint32_t getUnsettled(boost::shared_ptr<SenderContext>);
-
void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t);
uint32_t getCapacity(boost::shared_ptr<ReceiverContext>);
uint32_t getAvailable(boost::shared_ptr<ReceiverContext>);
@@ -159,9 +169,12 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
bool notifyOnWrite;
boost::intrusive_ptr<qpid::sys::TimerTask> ticker;
- void check();
+ bool check();
bool checkDisconnected();
void waitNoReconnect();
+
+ // NOTE: All wait*() functions must be called in a loop that checks for the
+ // waited condition with the lock held.
void wait();
void waitUntil(qpid::sys::AbsTime until);
void wait(boost::shared_ptr<SessionContext>);
@@ -170,10 +183,12 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>, qpid::sys::AbsTime until);
void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>, qpid::sys::AbsTime until);
+
void checkClosed(boost::shared_ptr<SessionContext>);
void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
+
void wakeupDriver();
void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
void autoconnect();
@@ -194,8 +209,18 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
std::string getError();
bool useSasl();
void setProperties();
+
void configureConnection();
bool checkTransportError(std::string&);
+
+ void discharge(boost::shared_ptr<SessionContext>, bool fail);
+ void startTxSession(boost::shared_ptr<SessionContext>);
+
+ void syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&);
+ void sendLH(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt,
+ const qpid::messaging::Message& message, bool sync,
+ SenderContext::Delivery** delivery, sys::Monitor::ScopedLock&);
+ void acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&);
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
index 5c57c5b0a3..3309d1a683 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
@@ -20,34 +20,53 @@
*/
#include "PnData.h"
#include "qpid/types/encodings.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace messaging {
namespace amqp {
using types::Variant;
+using namespace types::encodings;
-void PnData::write(const Variant::Map& map)
+// TODO aconway 2014-11-20: PnData duplicates functionality of qpid::amqp::Encoder,Decoder.
+// Collapse them all into a single proton-based codec.
+
+void PnData::put(const Variant::Map& map)
{
pn_data_put_map(data);
pn_data_enter(data);
for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
- pn_data_put_string(data, str(i->first));
- write(i->second);
+ pn_data_put_string(data, bytes(i->first));
+ put(i->second);
}
pn_data_exit(data);
}
-void PnData::write(const Variant::List& list)
+
+void PnData::put(const Variant::List& list)
{
pn_data_put_list(data);
pn_data_enter(data);
for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
- write(*i);
+ put(*i);
}
pn_data_exit(data);
}
-void PnData::write(const Variant& value)
+
+void PnData::put(const Variant& value)
{
+ // Open data descriptors associated with the value.
+ const Variant::List& descriptors = value.getDescriptors();
+ for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) {
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ if (i->getType() == types::VAR_STRING)
+ pn_data_put_symbol(data, bytes(i->asString()));
+ else
+ pn_data_put_ulong(data, i->asUint64());
+ }
+
+ // Put the variant value
switch (value.getType()) {
case qpid::types::VAR_VOID:
pn_data_put_null(data);
@@ -65,61 +84,70 @@ void PnData::write(const Variant& value)
pn_data_put_double(data, value.asDouble());
break;
case qpid::types::VAR_STRING:
- pn_data_put_string(data, str(value.asString()));
+ if (value.getEncoding() == ASCII)
+ pn_data_put_symbol(data, bytes(value.asString()));
+ else if (value.getEncoding() == BINARY)
+ pn_data_put_binary(data, bytes(value.asString()));
+ else
+ pn_data_put_string(data, bytes(value.asString()));
break;
case qpid::types::VAR_MAP:
- write(value.asMap());
+ put(value.asMap());
break;
case qpid::types::VAR_LIST:
- write(value.asList());
+ put(value.asList());
break;
default:
break;
}
+
+ // Close any descriptors.
+ for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i)
+ pn_data_exit(data);
}
-bool PnData::read(qpid::types::Variant& value)
+bool PnData::get(qpid::types::Variant& value)
{
- return read(pn_data_type(data), value);
+ return get(pn_data_type(data), value);
}
-void PnData::readList(qpid::types::Variant::List& value)
+void PnData::getList(qpid::types::Variant::List& value)
{
size_t count = pn_data_get_list(data);
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
qpid::types::Variant e;
- if (read(e)) value.push_back(e);
+ if (get(e)) value.push_back(e);
}
pn_data_exit(data);
}
-void PnData::readMap(qpid::types::Variant::Map& value)
+void PnData::getMap(qpid::types::Variant::Map& value)
{
size_t count = pn_data_get_list(data);
pn_data_enter(data);
for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
- std::string key = str(pn_data_get_symbol(data));
+ std::string key = string(pn_data_get_symbol(data));
pn_data_next(data);
qpid::types::Variant e;
- if (read(e)) value[key]= e;
+ if (get(e)) value[key]= e;
}
pn_data_exit(data);
}
-void PnData::readArray(qpid::types::Variant::List& value)
+void PnData::getArray(qpid::types::Variant::List& value)
{
size_t count = pn_data_get_array(data);
pn_type_t type = pn_data_get_array_type(data);
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
qpid::types::Variant e;
- if (read(type, e)) value.push_back(e);
+ if (get(type, e)) value.push_back(e);
}
pn_data_exit(data);
}
-bool PnData::read(pn_type_t type, qpid::types::Variant& value)
+bool PnData::get(pn_type_t type, qpid::types::Variant& value)
{
switch (type) {
case PN_NULL:
@@ -168,41 +196,41 @@ bool PnData::read(pn_type_t type, qpid::types::Variant& value)
value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
return true;
case PN_BINARY:
- value = str(pn_data_get_binary(data));
+ value = string(pn_data_get_binary(data));
value.setEncoding(qpid::types::encodings::BINARY);
return true;
case PN_STRING:
- value = str(pn_data_get_string(data));
+ value = string(pn_data_get_string(data));
value.setEncoding(qpid::types::encodings::UTF8);
return true;
case PN_SYMBOL:
- value = str(pn_data_get_string(data));
+ value = string(pn_data_get_string(data));
value.setEncoding(qpid::types::encodings::ASCII);
return true;
case PN_LIST:
value = qpid::types::Variant::List();
- readList(value.asList());
+ getList(value.asList());
return true;
break;
case PN_MAP:
value = qpid::types::Variant::Map();
- readMap(value.asMap());
+ getMap(value.asMap());
return true;
case PN_ARRAY:
value = qpid::types::Variant::List();
- readArray(value.asList());
+ getArray(value.asList());
return true;
case PN_DESCRIBED:
+ // TODO aconway 2014-11-20: get described values.
case PN_DECIMAL32:
case PN_DECIMAL64:
case PN_DECIMAL128:
default:
return false;
}
-
}
-pn_bytes_t PnData::str(const std::string& s)
+pn_bytes_t PnData::bytes(const std::string& s)
{
pn_bytes_t result;
result.start = const_cast<char*>(s.data());
@@ -210,7 +238,7 @@ pn_bytes_t PnData::str(const std::string& s)
return result;
}
-std::string PnData::str(const pn_bytes_t& in)
+std::string PnData::string(const pn_bytes_t& in)
{
return std::string(in.start, in.size);
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.h b/qpid/cpp/src/qpid/messaging/amqp/PnData.h
index 6d03235432..b0119f88fd 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/PnData.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.h
@@ -32,28 +32,29 @@ namespace messaging {
namespace amqp {
/**
- * Helper class to read/write messaging types to/from pn_data_t.
+ * Helper class to put/get messaging types to/from pn_data_t.
*/
class PnData
{
public:
- PnData(pn_data_t* d) : data(d) {}
+ pn_data_t* data;
- void write(const types::Variant& value);
- void write(const types::Variant::Map& map);
- void write(const types::Variant::List& list);
+ PnData(pn_data_t* d) : data(d) {}
- bool read(pn_type_t type, types::Variant& value);
- bool read(types::Variant& value);
- void readList(types::Variant::List& value);
- void readMap(types::Variant::Map& value);
- void readArray(types::Variant::List& value);
+ void put(const types::Variant& value);
+ void put(const types::Variant::Map& map);
+ void put(const types::Variant::List& list);
+ void put(int32_t n) { pn_data_put_int(data, n); }
+ void putSymbol(const std::string& symbol) { pn_data_put_symbol(data, bytes(symbol)); }
- static pn_bytes_t str(const std::string&);
- static std::string str(const pn_bytes_t&);
+ bool get(pn_type_t type, types::Variant& value);
+ bool get(types::Variant& value);
+ void getList(types::Variant::List& value);
+ void getMap(types::Variant::Map& value);
+ void getArray(types::Variant::List& value);
- private:
- pn_data_t* data;
+ static pn_bytes_t bytes(const std::string&);
+ static std::string string(const pn_bytes_t&);
};
}}} // namespace messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 5e0707056f..a28509b0b1 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -37,9 +37,10 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co
helper(address),
receiver(pn_receiver(session, name.c_str())),
capacity(0), used(0) {}
+
ReceiverContext::~ReceiverContext()
{
- pn_link_free(receiver);
+ if (receiver) pn_link_free(receiver);
}
void ReceiverContext::setCapacity(uint32_t c)
@@ -63,12 +64,13 @@ uint32_t ReceiverContext::getAvailable()
uint32_t ReceiverContext::getUnsettled()
{
+ assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver));
return pn_link_unsettled(receiver) - pn_link_queued(receiver);
}
void ReceiverContext::close()
{
- pn_link_close(receiver);
+ if (receiver) pn_link_close(receiver);
}
const std::string& ReceiverContext::getName() const
@@ -96,7 +98,7 @@ void ReceiverContext::verify()
}
void ReceiverContext::configure()
{
- configure(pn_link_source(receiver));
+ if (receiver) configure(pn_link_source(receiver));
}
void ReceiverContext::configure(pn_terminus_t* source)
{
@@ -116,13 +118,13 @@ Address ReceiverContext::getAddress() const
void ReceiverContext::reset(pn_session_t* session)
{
- receiver = pn_receiver(session, name.c_str());
- configure();
+ receiver = session ? pn_receiver(session, name.c_str()) : 0;
+ if (receiver) configure();
}
bool ReceiverContext::hasCurrent()
{
- return pn_link_current(receiver);
+ return receiver && pn_link_current(receiver);
}
bool ReceiverContext::wakeupToIssueCredit()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 2a48b2241a..b12af5eb25 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -18,8 +18,10 @@
* under the License.
*
*/
-#include "qpid/messaging/amqp/SenderContext.h"
-#include "qpid/messaging/amqp/EncodedMessage.h"
+#include "SenderContext.h"
+#include "Transaction.h"
+#include "EncodedMessage.h"
+#include "PnData.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/Exception.h"
@@ -40,22 +42,29 @@ extern "C" {
namespace qpid {
namespace messaging {
namespace amqp {
+
//TODO: proper conversion to wide string for address
-SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_)
- : name(n),
+SenderContext::SenderContext(pn_session_t* session, const std::string& n,
+ const qpid::messaging::Address& a,
+ bool setToOnSend_,
+ const CoordinatorPtr& coord)
+ : sender(pn_sender(session, n.c_str())),
+ name(n),
address(a),
helper(address),
- sender(pn_sender(session, n.c_str())), nextId(0), capacity(50), unreliable(helper.isUnreliable()),
- setToOnSend(setToOnSend_) {}
+ nextId(0), capacity(50), unreliable(helper.isUnreliable()),
+ setToOnSend(setToOnSend_),
+ transaction(coord)
+{}
SenderContext::~SenderContext()
{
- pn_link_free(sender);
+ if (sender) pn_link_free(sender);
}
void SenderContext::close()
{
- pn_link_close(sender);
+ if (sender) pn_link_close(sender);
}
void SenderContext::setCapacity(uint32_t c)
@@ -88,10 +97,13 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
{
resend();//if there are any messages needing to be resent at the front of the queue, send them first
if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
+ types::Variant state;
+ if (transaction)
+ state = transaction->getSendState();
if (unreliable) {
Delivery delivery(nextId++);
delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
- delivery.send(sender, unreliable);
+ delivery.send(sender, unreliable, state);
*out = 0;
return true;
} else {
@@ -99,7 +111,7 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
try {
Delivery& delivery = deliveries.back();
delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
- delivery.send(sender, unreliable);
+ delivery.send(sender, unreliable, state);
*out = &delivery;
return true;
} catch (const std::exception& e) {
@@ -507,7 +519,8 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
throw SendError(e.what());
}
}
-void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
+
+void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state)
{
pn_delivery_tag_t tag;
tag.size = sizeof(id);
@@ -517,6 +530,11 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
tag.bytes = reinterpret_cast<const char*>(&id);
#endif
token = pn_delivery(sender, tag);
+ if (!state.isVoid()) { // Add transaction state
+ PnData data(pn_disposition_data(pn_delivery_local(token)));
+ data.put(state);
+ pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE);
+ }
pn_link_send(sender, encoded.getData(), encoded.getSize());
if (unreliable) {
pn_delivery_settle(token);
@@ -551,6 +569,15 @@ bool SenderContext::Delivery::rejected()
{
return pn_delivery_remote_state(token) == PN_REJECTED;
}
+
+std::string SenderContext::Delivery::error()
+{
+ pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token));
+ return (condition && pn_condition_is_set(condition)) ?
+ Msg() << pn_condition_get_name(condition) << ": " << pn_condition_get_description(condition) :
+ std::string();
+}
+
void SenderContext::Delivery::settle()
{
pn_delivery_settle(token);
@@ -570,10 +597,12 @@ void SenderContext::verify()
helper.checkAssertion(target, AddressHelper::FOR_SENDER);
}
+
void SenderContext::configure()
{
- configure(pn_link_target(sender));
+ if (sender) configure(pn_link_target(sender));
}
+
void SenderContext::configure(pn_terminus_t* target)
{
helper.configure(sender, target, AddressHelper::FOR_SENDER);
@@ -603,12 +632,10 @@ Address SenderContext::getAddress() const
void SenderContext::reset(pn_session_t* session)
{
- sender = pn_sender(session, name.c_str());
- configure();
-
- for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) {
+ sender = session ? pn_sender(session, name.c_str()) : 0;
+ if (sender) configure();
+ for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i)
i->reset();
- }
}
void SenderContext::resend()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index 66e45a85a6..4d3c4bee79 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -24,6 +24,7 @@
#include <deque>
#include <string>
#include <vector>
+#include <boost/shared_ptr.hpp>
#include "qpid/sys/IntegerTypes.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/amqp/AddressHelper.h"
@@ -41,9 +42,10 @@ class Message;
class MessageImpl;
namespace amqp {
-/**
- *
- */
+
+class Transaction;
+
+
class SenderContext
{
public:
@@ -52,13 +54,15 @@ class SenderContext
public:
Delivery(int32_t id);
void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField);
- void send(pn_link_t*, bool unreliable);
+ void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant());
bool delivered();
bool accepted();
bool rejected();
void settle();
void reset();
bool sent() const;
+ pn_delivery_t* getToken() const { return token; }
+ std::string error();
private:
int32_t id;
pn_delivery_t* token;
@@ -66,22 +70,32 @@ class SenderContext
bool presettled;
};
- SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target, bool setToOnSend);
+ typedef boost::shared_ptr<Transaction> CoordinatorPtr;
+
+ SenderContext(pn_session_t* session, const std::string& name,
+ const qpid::messaging::Address& target,
+ bool setToOnSend,
+ const CoordinatorPtr& transaction = CoordinatorPtr());
~SenderContext();
- void reset(pn_session_t* session);
- void close();
- void setCapacity(uint32_t);
- uint32_t getCapacity();
- uint32_t getUnsettled();
- const std::string& getName() const;
- const std::string& getTarget() const;
- bool send(const qpid::messaging::Message& message, Delivery**);
- void configure();
- void verify();
- void check();
- bool settled();
- bool closed();
- Address getAddress() const;
+
+ virtual void reset(pn_session_t* session);
+ virtual void close();
+ virtual void setCapacity(uint32_t);
+ virtual uint32_t getCapacity();
+ virtual uint32_t getUnsettled();
+ virtual const std::string& getName() const;
+ virtual const std::string& getTarget() const;
+ virtual bool send(const qpid::messaging::Message& message, Delivery**);
+ virtual void configure();
+ virtual void verify();
+ virtual void check();
+ virtual bool settled();
+ virtual bool closed();
+ virtual Address getAddress() const;
+
+ protected:
+ pn_link_t* sender;
+
private:
friend class ConnectionContext;
typedef std::deque<Delivery> Deliveries;
@@ -89,12 +103,12 @@ class SenderContext
const std::string name;
qpid::messaging::Address address;
AddressHelper helper;
- pn_link_t* sender;
int32_t nextId;
Deliveries deliveries;
uint32_t capacity;
bool unreliable;
bool setToOnSend;
+ boost::shared_ptr<Transaction> transaction;
uint32_t processUnsettled(bool silent);
void configure(pn_terminus_t*);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
index 367db701cb..98f2d34e7d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
@@ -39,7 +39,8 @@ SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c,
void SenderHandle::send(const Message& message, bool sync)
{
- connection->send(session, sender, message, sync);
+ SenderContext::Delivery* d = 0;
+ connection->send(session, sender, message, sync, &d);
}
void SenderHandle::close()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 824b958af3..2b82ffc377 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -21,11 +21,15 @@
#include "SessionContext.h"
#include "SenderContext.h"
#include "ReceiverContext.h"
+#include "Transaction.h"
+#include "PnData.h"
#include <boost/format.hpp>
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/log/Statement.h"
+#include "qpid/amqp/descriptors.h"
+
extern "C" {
#include <proton/engine.h>
}
@@ -35,23 +39,32 @@ namespace messaging {
namespace amqp {
SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {}
+
SessionContext::~SessionContext()
{
- senders.clear(); receivers.clear();
- pn_session_free(session);
+ // Clear all pointers to senders and receivers before we free the session.
+ senders.clear();
+ receivers.clear();
+ transaction.reset(); // Transaction is a sender.
+ if (!error && session)
+ pn_session_free(session);
}
boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend)
{
+ error.raise();
std::string name = AddressHelper::getLinkName(address);
- if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection");
- boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address, setToOnSend));
+ if (senders.find(name) != senders.end())
+ throw LinkError("Link name must be unique within the scope of the connection");
+ boost::shared_ptr<SenderContext> s(
+ new SenderContext(session, name, address, setToOnSend, transaction));
senders[name] = s;
return s;
}
boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address)
{
+ error.raise();
std::string name = AddressHelper::getLinkName(address);
if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection");
boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address));
@@ -61,6 +74,7 @@ boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::me
boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const
{
+ error.raise();
SenderMap::const_iterator i = senders.find(name);
if (i == senders.end()) {
throw qpid::messaging::KeyError(std::string("No such sender") + name);
@@ -71,6 +85,7 @@ boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& na
boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const
{
+ error.raise();
ReceiverMap::const_iterator i = receivers.find(name);
if (i == receivers.end()) {
throw qpid::messaging::KeyError(std::string("No such receiver") + name);
@@ -81,16 +96,19 @@ boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string
void SessionContext::removeReceiver(const std::string& n)
{
+ error.raise();
receivers.erase(n);
}
void SessionContext::removeSender(const std::string& n)
{
+ error.raise();
senders.erase(n);
}
boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
{
+ error.raise();
for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
if (i->second->hasCurrent()) {
return i->second;
@@ -102,16 +120,19 @@ boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
uint32_t SessionContext::getReceivable()
{
+ error.raise();
return 0;//TODO
}
uint32_t SessionContext::getUnsettledAcks()
{
+ error.raise();
return 0;//TODO
}
qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
{
+ error.raise();
qpid::framing::SequenceNumber id = next++;
if (!pn_delivery_settled(delivery))
unacked[id] = delivery;
@@ -121,22 +142,32 @@ qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end)
{
+ error.raise();
for (DeliveryMap::iterator i = begin; i != end; ++i) {
- QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second);
- pn_delivery_update(i->second, PN_ACCEPTED);
- pn_delivery_settle(i->second);//TODO: different settlement modes?
+ types::Variant txState;
+ if (transaction) {
+ QPID_LOG(trace, "Setting disposition for transactional delivery "
+ << i->first << " -> " << i->second);
+ transaction->acknowledge(i->second);
+ } else {
+ QPID_LOG(trace, "Setting disposition for delivery " << i->first << " -> " << i->second);
+ pn_delivery_update(i->second, PN_ACCEPTED);
+ pn_delivery_settle(i->second); //TODO: different settlement modes?
+ }
}
unacked.erase(begin, end);
}
void SessionContext::acknowledge()
{
+ error.raise();
QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages");
acknowledge(unacked.begin(), unacked.end());
}
void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative)
{
+ error.raise();
QPID_LOG(debug, "acknowledging selected messages, id=" << id << ", cumulative=" << cumulative);
DeliveryMap::iterator i = unacked.find(id);
if (i != unacked.end()) {
@@ -149,6 +180,7 @@ void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool c
void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
{
+ error.raise();
DeliveryMap::iterator i = unacked.find(id);
if (i != unacked.end()) {
if (reject) {
@@ -166,7 +198,9 @@ void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
bool SessionContext::settled()
{
+ error.raise();
bool result = true;
+
for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
try {
if (!i->second->closed() && !i->second->settled()) result = false;
@@ -189,8 +223,25 @@ std::string SessionContext::getName() const
void SessionContext::reset(pn_connection_t* connection)
{
- session = pn_session(connection);
unacked.clear();
+ if (transaction) {
+ if (transaction->isCommitting())
+ error = new TransactionUnknown("Transaction outcome unknown: transport failure");
+ else
+ error = new TransactionAborted("Transaction aborted: transport failure");
+ resetSession(0);
+ senders.clear();
+ receivers.clear();
+ transaction.reset();
+ return;
+ }
+ resetSession(pn_session(connection));
+
+}
+
+void SessionContext::resetSession(pn_session_t* session_) {
+ session = session_;
+ if (transaction) transaction->reset(session);
for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
i->second->reset(session);
}
@@ -198,4 +249,6 @@ void SessionContext::reset(pn_connection_t* connection)
i->second->reset(session);
}
}
+
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index 8c2bb040a6..67b3c1e401 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -26,6 +26,7 @@
#include <boost/shared_ptr.hpp>
#include "qpid/sys/IntegerTypes.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/ExceptionHolder.h"
struct pn_connection_t;
struct pn_session_t;
@@ -42,6 +43,8 @@ namespace amqp {
class ConnectionContext;
class SenderContext;
class ReceiverContext;
+class Transaction;
+
/**
*
*/
@@ -63,23 +66,29 @@ class SessionContext
bool settled();
void setName(const std::string&);
std::string getName() const;
+
+ void nack(const qpid::framing::SequenceNumber& id, bool reject);
+
private:
friend class ConnectionContext;
typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;
typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap;
typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap;
+
pn_session_t* session;
SenderMap senders;
+ boost::shared_ptr<Transaction> transaction;
ReceiverMap receivers;
DeliveryMap unacked;
qpid::framing::SequenceNumber next;
std::string name;
+ sys::ExceptionHolder error;
qpid::framing::SequenceNumber record(pn_delivery_t*);
void acknowledge();
void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
- void nack(const qpid::framing::SequenceNumber& id, bool reject);
+ void resetSession(pn_session_t*);
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
index 4d427639d3..44294e5f04 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
@@ -42,12 +42,12 @@ SessionHandle::SessionHandle(boost::shared_ptr<ConnectionContext> c, boost::shar
void SessionHandle::commit()
{
-
+ connection->commit(session);
}
void SessionHandle::rollback()
{
-
+ connection->rollback(session);
}
void SessionHandle::acknowledge(bool /*sync*/)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp
new file mode 100644
index 0000000000..754b00d802
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Transaction.h"
+#include "SessionContext.h"
+#include "ConnectionContext.h"
+#include "PnData.h"
+#include <proton/engine.h>
+#include <qpid/Exception.h>
+#include <qpid/amqp/descriptors.h>
+#include <qpid/messaging/exceptions.h>
+#include <qpid/log/Statement.h>
+#include "qpid/messaging/Message.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using namespace types;
+using types::Exception;
+
+namespace {
+const std::string LOCAL_TRANSACTIONS("amqp:local-transactions");
+const std::string TX_COORDINATOR("tx-transaction");
+const std::string ADDRESS("tx-transaction;{link:{reliability:at-least-once}}");
+}
+
+Transaction::Transaction(pn_session_t* session) :
+ SenderContext(session, TX_COORDINATOR, Address(ADDRESS), false), committing(false)
+{}
+
+void Transaction::clear() {
+ id.clear();
+ sendState.reset();
+ acceptState.reset();
+}
+
+void Transaction::configure() {
+ SenderContext::configure();
+ pn_terminus_t* target = pn_link_target(sender);
+ pn_terminus_set_type(target, PN_COORDINATOR);
+ PnData(pn_terminus_capabilities(target)).putSymbol(LOCAL_TRANSACTIONS);
+}
+
+void Transaction::verify() {}
+
+const std::string& Transaction::getTarget() const { return getName(); }
+
+void Transaction::declare(SendFunction send, const SessionPtr& session) {
+ committing = false;
+ error.raise();
+ clear();
+ Variant declare = Variant::described(qpid::amqp::transaction::DECLARE_CODE, Variant::List());
+ SenderContext::Delivery* delivery = 0;
+ send(session, shared_from_this(), Message(declare), true, &delivery);
+ setId(*delivery);
+}
+
+void Transaction::discharge(SendFunction send, const SessionPtr& session, bool fail) {
+ error.raise();
+ committing = !fail;
+ try {
+ // Send a discharge message to the remote coordinator.
+ Variant::List dischargeList;
+ dischargeList.push_back(Variant(id));
+ dischargeList.push_back(Variant(fail));
+ Variant discharge(dischargeList);
+ discharge.setDescriptor(qpid::amqp::transaction::DISCHARGE_CODE);
+ SenderContext::Delivery* delivery = 0;
+ send(session, shared_from_this(), Message(discharge), true, &delivery);
+ if (!delivery->accepted())
+ throw TransactionAborted(delivery->error());
+ committing = false;
+ }
+ catch(const TransactionError&) {
+ throw;
+ }
+ catch(const Exception& e) {
+ committing = false;
+ throw TransactionAborted(e.what());
+ }
+}
+
+// Set the transaction ID from the delivery returned by the remote coordinator.
+void Transaction::setId(const SenderContext::Delivery& delivery)
+{
+ if (delivery.getToken() &&
+ pn_delivery_remote_state(delivery.getToken()) == qpid::amqp::transaction::DECLARED_CODE)
+ {
+ pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery.getToken()));
+ if (data && pn_data_next(data)) {
+ size_t count = pn_data_get_list(data);
+ if (count > 0) {
+ pn_data_enter(data);
+ pn_data_next(data);
+ setId(PnData::string(pn_data_get_binary(data)));
+ pn_data_exit(data);
+ return;
+ }
+ }
+ }
+ throw TransactionError("No transaction ID returned by remote coordinator.");
+}
+
+void Transaction::setId(const std::string& id_) {
+ id = id_;
+ if (id.empty()) {
+ clear();
+ }
+ else {
+ // NOTE: The send and accept states are NOT described, the descriptor
+ // is added in pn_delivery_update.
+ Variant::List list;
+ list.push_back(Variant(id, "binary"));
+ sendState = Variant(list);
+
+ Variant accepted = Variant::described(qpid::amqp::message::ACCEPTED_CODE, Variant::List());
+ list.push_back(accepted);
+ acceptState = Variant(list);
+ }
+}
+
+types::Variant Transaction::getSendState() const {
+ error.raise();
+ return sendState;
+}
+
+void Transaction::acknowledge(pn_delivery_t* delivery)
+{
+ error.raise();
+ PnData data(pn_disposition_data(pn_delivery_local(delivery)));
+ data.put(acceptState);
+ pn_delivery_update(delivery, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE);
+ pn_delivery_settle(delivery);
+}
+
+
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transaction.h b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h
new file mode 100644
index 0000000000..35492c9bb3
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/Transaction.h
@@ -0,0 +1,95 @@
+#ifndef COORDINATORCONTEXT_H
+#define COORDINATORCONTEXT_H
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#include "SenderContext.h"
+#include <qpid/types/Variant.h>
+#include "qpid/sys/ExceptionHolder.h"
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/function.hpp>
+
+struct pn_session_t;
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+class SessionContext;
+class ConnectionContext;
+
+/**
+ * Track the current transaction for a session.
+ *
+ * Implements SenderContext, to send transaction command messages to remote coordinator.
+ */
+class Transaction : public SenderContext, public boost::enable_shared_from_this<Transaction> {
+ public:
+ typedef boost::shared_ptr<SessionContext> SessionPtr;
+
+ typedef boost::function<void (boost::shared_ptr<SessionContext> ssn,
+ boost::shared_ptr<SenderContext> snd,
+ const qpid::messaging::Message& message,
+ bool sync,
+ SenderContext::Delivery** delivery)> SendFunction;
+
+ Transaction(pn_session_t*);
+
+ sys::ExceptionHolder error;
+
+ /** Declare a transaction using connection and session to send to remote co-ordinator. */
+ void declare(SendFunction, const SessionPtr& session);
+
+ /** Discharge a transaction using connection and session to send to remote co-ordinator.
+ *@param fail: true means rollback, false means commit.
+ */
+ void discharge(SendFunction, const SessionPtr& session, bool fail);
+
+ /** Update a delivery with a transactional accept state. */
+ void acknowledge(pn_delivery_t* delivery);
+
+ /** Get delivery state to attach to transfers sent in a transaction. */
+ types::Variant getSendState() const;
+
+ /** Override SenderContext::getTarget with a more readable value */
+ const std::string& getTarget() const;
+
+ bool isCommitting() const { return committing; }
+
+ protected:
+ // SenderContext overrides
+ void configure();
+ void verify();
+
+ private:
+ std::string id;
+ types::Variant sendState;
+ types::Variant acceptState;
+ bool committing;
+
+
+ void clear();
+ void setId(const SenderContext::Delivery& delivery);
+ void setId(const std::string& id);
+};
+
+}}}
+
+#endif
diff --git a/qpid/cpp/src/qpid/types/Variant.cpp b/qpid/cpp/src/qpid/types/Variant.cpp
index 6b979a016b..26dbe0c91e 100644
--- a/qpid/cpp/src/qpid/types/Variant.cpp
+++ b/qpid/cpp/src/qpid/types/Variant.cpp
@@ -43,21 +43,23 @@ class VariantImpl
{
public:
VariantImpl();
- VariantImpl(bool);
- VariantImpl(uint8_t);
- VariantImpl(uint16_t);
- VariantImpl(uint32_t);
- VariantImpl(uint64_t);
- VariantImpl(int8_t);
- VariantImpl(int16_t);
- VariantImpl(int32_t);
- VariantImpl(int64_t);
- VariantImpl(float);
- VariantImpl(double);
- VariantImpl(const std::string&, const std::string& encoding=std::string());
- VariantImpl(const Variant::Map&);
- VariantImpl(const Variant::List&);
- VariantImpl(const Uuid&);
+ void reset();
+ void set(bool);
+ void set(uint8_t);
+ void set(uint16_t);
+ void set(uint32_t);
+ void set(uint64_t);
+ void set(int8_t);
+ void set(int16_t);
+ void set(int32_t);
+ void set(int64_t);
+ void set(float);
+ void set(double);
+ void set(const std::string&, const std::string& encoding=std::string());
+ void set(const Variant::Map&);
+ void set(const Variant::List&);
+ void set(const Uuid&);
+ void set(const Variant&);
~VariantImpl();
VariantType getType() const;
@@ -90,9 +92,10 @@ class VariantImpl
bool isEqualTo(VariantImpl&) const;
bool isEquivalentTo(VariantImpl&) const;
- static VariantImpl* create(const Variant&);
+ Variant::List descriptors; // Optional descriptors for described value.
+
private:
- const VariantType type;
+ VariantType type;
union {
bool b;
uint8_t ui8;
@@ -110,7 +113,7 @@ class VariantImpl
Variant::List* list;
std::string* string;
} value;
- std::string encoding;//optional encoding for variable length data
+ std::string encoding; // Optional encoding for variable length data.
template<class T> T convertFromString() const
{
@@ -136,26 +139,34 @@ class VariantImpl
};
+VariantImpl::VariantImpl() : type(VAR_VOID) {}
+
+void VariantImpl::set(bool b) { reset(); type = VAR_BOOL; value.b = b; }
+void VariantImpl::set(uint8_t i) { reset(); type = VAR_UINT8; value.ui8 = i; }
+void VariantImpl::set(uint16_t i) { reset(); type = VAR_UINT16; value.ui16 = i; }
+void VariantImpl::set(uint32_t i) { reset(); type = VAR_UINT32; value.ui32 = i; }
+void VariantImpl::set(uint64_t i) { reset(); type = VAR_UINT64; value.ui64 = i; }
+void VariantImpl::set(int8_t i) { reset(); type = VAR_INT8; value.i8 = i; }
+void VariantImpl::set(int16_t i) { reset(); type = VAR_INT16; value.i16 = i; }
+void VariantImpl::set(int32_t i) { reset(); type = VAR_INT32; value.i32 = i; }
+void VariantImpl::set(int64_t i) { reset(); type = VAR_INT64; value.i64 = i; }
+void VariantImpl::set(float f) { reset(); type = VAR_FLOAT; value.f = f; }
+void VariantImpl::set(double d) { reset(); type = VAR_DOUBLE; value.d = d; }
+void VariantImpl::set(const std::string& s, const std::string& e) { reset(); type = VAR_STRING; encoding = e; value.string = new std::string(s); }
+
+void VariantImpl::set(const Variant::Map& m) {
+ reset();
+ type = VAR_MAP;
+ value.map = new Variant::Map(m);
+}
+
+void VariantImpl::set(const Variant::List& l) { reset(); type = VAR_LIST; value.list = new Variant::List(l); }
+
+void VariantImpl::set(const Uuid& u) { reset(); type = VAR_UUID; value.uuid = new Uuid(u); }
-VariantImpl::VariantImpl() : type(VAR_VOID) { value.i64 = 0; }
-VariantImpl::VariantImpl(bool b) : type(VAR_BOOL) { value.b = b; }
-VariantImpl::VariantImpl(uint8_t i) : type(VAR_UINT8) { value.ui8 = i; }
-VariantImpl::VariantImpl(uint16_t i) : type(VAR_UINT16) { value.ui16 = i; }
-VariantImpl::VariantImpl(uint32_t i) : type(VAR_UINT32) { value.ui32 = i; }
-VariantImpl::VariantImpl(uint64_t i) : type(VAR_UINT64) { value.ui64 = i; }
-VariantImpl::VariantImpl(int8_t i) : type(VAR_INT8) { value.i8 = i; }
-VariantImpl::VariantImpl(int16_t i) : type(VAR_INT16) { value.i16 = i; }
-VariantImpl::VariantImpl(int32_t i) : type(VAR_INT32) { value.i32 = i; }
-VariantImpl::VariantImpl(int64_t i) : type(VAR_INT64) { value.i64 = i; }
-VariantImpl::VariantImpl(float f) : type(VAR_FLOAT) { value.f = f; }
-VariantImpl::VariantImpl(double d) : type(VAR_DOUBLE) { value.d = d; }
-VariantImpl::VariantImpl(const std::string& s, const std::string& e)
- : type(VAR_STRING), encoding(e) { value.string = new std::string(s); }
-VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.map = new Variant::Map(m); }
-VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.list = new Variant::List(l); }
-VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.uuid = new Uuid(u); }
-
-VariantImpl::~VariantImpl() {
+VariantImpl::~VariantImpl() { reset(); }
+
+void VariantImpl::reset() {
switch (type) {
case VAR_STRING:
delete value.string;
@@ -172,6 +183,7 @@ VariantImpl::~VariantImpl() {
default:
break;
}
+ type = VAR_VOID;
}
VariantType VariantImpl::getType() const { return type; }
@@ -637,46 +649,50 @@ bool isIntegerType(VariantType type)
}
}
-VariantImpl* VariantImpl::create(const Variant& v)
+void VariantImpl::set(const Variant& v)
{
switch (v.getType()) {
- case VAR_BOOL: return new VariantImpl(v.asBool());
- case VAR_UINT8: return new VariantImpl(v.asUint8());
- case VAR_UINT16: return new VariantImpl(v.asUint16());
- case VAR_UINT32: return new VariantImpl(v.asUint32());
- case VAR_UINT64: return new VariantImpl(v.asUint64());
- case VAR_INT8: return new VariantImpl(v.asInt8());
- case VAR_INT16: return new VariantImpl(v.asInt16());
- case VAR_INT32: return new VariantImpl(v.asInt32());
- case VAR_INT64: return new VariantImpl(v.asInt64());
- case VAR_FLOAT: return new VariantImpl(v.asFloat());
- case VAR_DOUBLE: return new VariantImpl(v.asDouble());
- case VAR_STRING: return new VariantImpl(v.asString(), v.getEncoding());
- case VAR_MAP: return new VariantImpl(v.asMap());
- case VAR_LIST: return new VariantImpl(v.asList());
- case VAR_UUID: return new VariantImpl(v.asUuid());
- default: return new VariantImpl();
+ case VAR_BOOL: set(v.asBool()); break;
+ case VAR_UINT8: set(v.asUint8()); break;
+ case VAR_UINT16: set(v.asUint16()); break;
+ case VAR_UINT32: set(v.asUint32()); break;
+ case VAR_UINT64: set(v.asUint64()); break;
+ case VAR_INT8: set(v.asInt8()); break;
+ case VAR_INT16: set(v.asInt16()); break;
+ case VAR_INT32: set(v.asInt32()); break;
+ case VAR_INT64: set(v.asInt64()); break;
+ case VAR_FLOAT: set(v.asFloat()); break;
+ case VAR_DOUBLE: set(v.asDouble()); break;
+ case VAR_STRING: set(v.asString(), v.getEncoding()); break;
+ case VAR_MAP: set(v.asMap()); break;
+ case VAR_LIST: set(v.asList()); break;
+ case VAR_UUID: set(v.asUuid()); break;
+ default: reset();
}
-}
-
-Variant::Variant() : impl(new VariantImpl()) {}
-Variant::Variant(bool b) : impl(new VariantImpl(b)) {}
-Variant::Variant(uint8_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(uint16_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(uint32_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(uint64_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(int8_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(int16_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(int32_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(int64_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(float f) : impl(new VariantImpl(f)) {}
-Variant::Variant(double d) : impl(new VariantImpl(d)) {}
-Variant::Variant(const std::string& s) : impl(new VariantImpl(s)) {}
-Variant::Variant(const char* s) : impl(new VariantImpl(std::string(s))) {}
-Variant::Variant(const Map& m) : impl(new VariantImpl(m)) {}
-Variant::Variant(const List& l) : impl(new VariantImpl(l)) {}
-Variant::Variant(const Variant& v) : impl(VariantImpl::create(v)) {}
-Variant::Variant(const Uuid& u) : impl(new VariantImpl(u)) {}
+ encoding = v.getEncoding();
+ descriptors = v.getDescriptors();
+}
+
+Variant::Variant() : impl(0) {}
+Variant::Variant(bool b) : impl(new VariantImpl()) { impl->set(b); }
+Variant::Variant(uint8_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(uint16_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(uint32_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(uint64_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(int8_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(int16_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(int32_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(int64_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(float f) : impl(new VariantImpl()) { impl->set(f); }
+Variant::Variant(double d) : impl(new VariantImpl()) { impl->set(d); }
+Variant::Variant(const std::string& s) : impl(new VariantImpl()) { impl->set(s); }
+Variant::Variant(const std::string& s, const std::string& encoding) : impl(new VariantImpl()) { impl->set(s, encoding); }
+Variant::Variant(const char* s) : impl(new VariantImpl()) { impl->set(std::string(s)); }
+Variant::Variant(const char* s, const char* encoding) : impl(new VariantImpl()) { impl->set(std::string(s), std::string(encoding)); }
+Variant::Variant(const Map& m) : impl(new VariantImpl()) { impl->set(m); }
+Variant::Variant(const List& l) : impl(new VariantImpl()) { impl->set(l); }
+Variant::Variant(const Variant& v) : impl(new VariantImpl()) { impl->set(v); }
+Variant::Variant(const Uuid& u) : impl(new VariantImpl()) { impl->set(u); }
Variant::~Variant() { if (impl) delete impl; }
@@ -686,116 +702,105 @@ void Variant::reset()
impl = 0;
}
+namespace {
+VariantImpl* assure(VariantImpl*& ptr) {
+ if (!ptr) ptr = new VariantImpl();
+ return ptr;
+}
+}
Variant& Variant::operator=(bool b)
{
- if (impl) delete impl;
- impl = new VariantImpl(b);
+ assure(impl)->set(b);
return *this;
}
Variant& Variant::operator=(uint8_t i)
{
- if (impl) delete impl;
- impl = new VariantImpl(i);
+ assure(impl)->set(i);
return *this;
}
Variant& Variant::operator=(uint16_t i)
{
- if (impl) delete impl;
- impl = new VariantImpl(i);
+ assure(impl)->set(i);
return *this;
}
Variant& Variant::operator=(uint32_t i)
{
- if (impl) delete impl;
- impl = new VariantImpl(i);
+ assure(impl)->set(i);
return *this;
}
Variant& Variant::operator=(uint64_t i)
{
- if (impl) delete impl;
- impl = new VariantImpl(i);
+ assure(impl)->set(i);
return *this;
}
Variant& Variant::operator=(int8_t i)
{
- if (impl) delete impl;
- impl = new VariantImpl(i);
+ assure(impl)->set(i);
return *this;
}
Variant& Variant::operator=(int16_t i)
{
- if (impl) delete impl;
- impl = new VariantImpl(i);
+ assure(impl)->set(i);
return *this;
}
Variant& Variant::operator=(int32_t i)
{
- if (impl) delete impl;
- impl = new VariantImpl(i);
+ assure(impl)->set(i);
return *this;
}
Variant& Variant::operator=(int64_t i)
{
- if (impl) delete impl;
- impl = new VariantImpl(i);
+ assure(impl)->set(i);
return *this;
}
Variant& Variant::operator=(float f)
{
- if (impl) delete impl;
- impl = new VariantImpl(f);
+ assure(impl)->set(f);
return *this;
}
Variant& Variant::operator=(double d)
{
- if (impl) delete impl;
- impl = new VariantImpl(d);
+ assure(impl)->set(d);
return *this;
}
Variant& Variant::operator=(const std::string& s)
{
- if (impl) delete impl;
- impl = new VariantImpl(s);
+ assure(impl)->set(s);
return *this;
}
Variant& Variant::operator=(const char* s)
{
- if (impl) delete impl;
- impl = new VariantImpl(std::string(s));
+ assure(impl)->set(std::string(s));
return *this;
}
Variant& Variant::operator=(const Uuid& u)
{
- if (impl) delete impl;
- impl = new VariantImpl(u);
+ assure(impl)->set(u);
return *this;
}
Variant& Variant::operator=(const Map& m)
{
- if (impl) delete impl;
- impl = new VariantImpl(m);
+ assure(impl)->set(m);
return *this;
}
Variant& Variant::operator=(const List& l)
{
- if (impl) delete impl;
- impl = new VariantImpl(l);
+ assure(impl)->set(l);
return *this;
}
Variant& Variant::operator=(const Variant& v)
{
- if (impl) delete impl;
- impl = VariantImpl::create(v);
+ assure(impl)->set(v);
return *this;
}
@@ -841,8 +846,7 @@ Variant::List& Variant::asList() { if (!impl) throw InvalidConversion("Can't con
const std::string& Variant::getString() const { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
std::string& Variant::getString() { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
void Variant::setEncoding(const std::string& s) {
- if (!impl) impl = new VariantImpl();
- impl->setEncoding(s);
+ assure(impl)->setEncoding(s);
}
const std::string& Variant::getEncoding() const { return impl ? impl->getEncoding() : EMPTY; }
@@ -884,6 +888,12 @@ std::ostream& operator<<(std::ostream& out, const Variant::List& list)
std::ostream& operator<<(std::ostream& out, const Variant& value)
{
+ // Print the descriptors
+ const Variant::List& descriptors = value.getDescriptors();
+ for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i)
+ out << "@" << *i << " ";
+
+ // Print the value
switch (value.getType()) {
case VAR_MAP:
out << value.asMap();
@@ -910,7 +920,43 @@ bool operator!=(const Variant& a, const Variant& b) { return !(a == b); }
bool Variant::isEqualTo(const Variant& other) const
{
+ if (isVoid() && other.isVoid()) return true;
+ if (isVoid() || other.isVoid()) return false;
return impl && impl->isEqualTo(*other.impl);
}
+bool Variant::isDescribed() const {
+ return impl && !impl->descriptors.empty();
+}
+
+Variant::List& Variant::getDescriptors() {
+ return assure(impl)->descriptors;
+}
+
+const Variant::List& Variant::getDescriptors() const {
+ return assure(impl)->descriptors;
+}
+
+Variant Variant::getDescriptor() const {
+ if (getDescriptors().size() > 0) return getDescriptors().front();
+ else return Variant();
+}
+
+void Variant::setDescriptor(const Variant& descriptor) {
+ getDescriptors().clear();
+ getDescriptors().push_back(descriptor);
+}
+
+Variant Variant::described(const Variant& descriptor, const Variant& value) {
+ Variant described(value);
+ described.setDescriptor(descriptor);
+ return described;
+}
+
+Variant Variant::described(const Variant& descriptor, const List& value) {
+ Variant described(value);
+ described.setDescriptor(descriptor);
+ return described;
+}
+
}} // namespace qpid::types
diff --git a/qpid/cpp/src/qpid/types/encodings.h b/qpid/cpp/src/qpid/types/encodings.h
index 827b6964b9..571e8607aa 100644
--- a/qpid/cpp/src/qpid/types/encodings.h
+++ b/qpid/cpp/src/qpid/types/encodings.h
@@ -23,11 +23,13 @@
*/
namespace qpid {
namespace types {
+
namespace encodings {
const std::string BINARY("binary");
const std::string UTF8("utf8");
const std::string ASCII("ascii");
}
+
}} // namespace qpid::types
#endif /*!QPID_TYPES_ENCODINGS_H*/
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index c914c50e33..f3443aa57e 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -360,6 +360,11 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
# paged queue not yet implemented for windows
add_test (paged_queue_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_paged_queue_tests${test_script_suffix})
endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+
+if (BUILD_AMQP)
+ add_test (interop_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/interop_tests.py)
+endif (BUILD_AMQP)
+
add_test (ha_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py)
add_test (qpidd_qmfv2_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py)
if (BUILD_AMQP)
diff --git a/qpid/cpp/src/tests/Variant.cpp b/qpid/cpp/src/tests/Variant.cpp
index d2394bfbad..d6605f9fe5 100644
--- a/qpid/cpp/src/tests/Variant.cpp
+++ b/qpid/cpp/src/tests/Variant.cpp
@@ -18,14 +18,16 @@
* under the License.
*
*/
-#include <iostream>
-#include "qpid/types/Variant.h"
-#include "qpid/amqp_0_10/Codecs.h"
#include "unit_test.h"
+#include "qpid/types/Variant.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include <boost/assign.hpp>
+#include <iostream>
using namespace qpid::types;
using namespace qpid::amqp_0_10;
+using boost::assign::list_of;
namespace qpid {
namespace tests {
@@ -807,6 +809,22 @@ QPID_AUTO_TEST_CASE(parse)
BOOST_CHECK(a.getType()==types::VAR_DOUBLE);
}
+QPID_AUTO_TEST_CASE(described)
+{
+ Variant a;
+ BOOST_CHECK(!a.isDescribed());
+ a.getDescriptors().push_back("foo");
+ BOOST_CHECK(a.isDescribed());
+ BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo"));
+ a = 42;
+ BOOST_CHECK(a.isDescribed());
+ BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo"));
+ a.getDescriptors().push_back(33);
+ BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo")(33));
+ a.getDescriptors().clear();
+ BOOST_CHECK(!a.isDescribed());
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index ba65936df7..2566bc527d 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -21,6 +21,7 @@
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
+import proton
from qpid import connection, util
from qpid.compat import format_exc
from unittest import TestCase
@@ -48,13 +49,18 @@ from qpidtoollibs import BrokerAgent
import qpid.messaging
qm = qpid.messaging
qpid_messaging = None
+
+def env_has_log_config():
+ """True if there are qpid log configuratoin settings in the environment."""
+ return "QPID_LOG_ENABLE" in os.environ or "QPID_TRACE" in os.environ
+
if not os.environ.get("QPID_PY_NO_SWIG"):
try:
import qpid_messaging
from qpid.datatypes import uuid4
qm = qpid_messaging
# Silence warnings from swigged messaging library unless enabled in environment.
- if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ:
+ if not env_has_log_config():
qm.Logger.configure(["--log-enable=error"])
except ImportError:
print "Cannot load python SWIG bindings, falling back to native qpid.messaging."
@@ -135,7 +141,7 @@ _popen_id = AtomicCounter() # Popen identifier for use in output file names.
# Constants for file descriptor arguments to Popen
FILE = "FILE" # Write to file named after process
-PIPE = subprocess.PIPE
+from subprocess import PIPE, STDOUT
class Popen(subprocess.Popen):
"""
@@ -201,7 +207,7 @@ class Popen(subprocess.Popen):
def communicate(self, input=None):
ret = subprocess.Popen.communicate(self, input)
- self.cleanup()
+ self._cleanup()
return ret
def is_running(self): return self.poll() is None
@@ -253,6 +259,7 @@ class Popen(subprocess.Popen):
def cmd_str(self): return " ".join([str(s) for s in self.cmd])
+
def checkenv(name):
value = os.getenv(name)
if not value: raise Exception("Environment variable %s is not set" % name)
@@ -307,7 +314,7 @@ class Broker(Popen):
cmd += ["--log-to-stderr=no"]
# Add default --log-enable arguments unless args already has --log arguments.
- if not [l for l in args if l.startswith("--log")]:
+ if not env_has_log_config() and not [l for l in args if l.startswith("--log")]:
args += ["--log-enable=info+"]
if test_store: cmd += ["--load-module", BrokerTest.test_store_lib,
@@ -443,10 +450,11 @@ def browse(session, queue, timeout=0, transform=lambda m: m.content):
finally:
r.close()
-def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
+def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
+ if msg is None: msg = "browse '%s' failed" % queue
actual_contents = browse(session, queue, timeout, transform=transform)
if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
assert expect_contents == actual_contents, msg
@@ -485,6 +493,18 @@ class BrokerTest(TestCase):
test_store_lib = os.getenv("TEST_STORE_LIB")
rootdir = os.getcwd()
+ PN_VERSION = (proton.VERSION_MAJOR, proton.VERSION_MINOR)
+ PN_TX_VERSION = (0, 9)
+
+ amqp_tx_supported = PN_VERSION >= PN_TX_VERSION
+
+ @classmethod
+ def amqp_tx_warning(cls):
+ if not cls.amqp_tx_supported:
+ print "WARNING: Cannot test transactions over AMQP 1.0, proton version %s.%s < %s.%s" % (cls.PN_VERSION + cls.PN_TX_VERSION)
+ return False
+ return True
+
def configure(self, config): self.config=config
def setUp(self):
@@ -497,8 +517,8 @@ class BrokerTest(TestCase):
if qpid_messaging and self.amqp_lib: default_protocol="amqp1.0"
else: default_protocol="amqp0-10"
self.protocol = defs.get("PROTOCOL") or default_protocol
- self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0
-
+ self.tx_protocol = self.protocol
+ if not self.amqp_tx_supported: self.tx_protocol = "amqp0-10"
def tearDown(self):
err = []
@@ -529,15 +549,22 @@ class BrokerTest(TestCase):
self.teardown_add(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False, **kw):
"""Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd)
+ b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd, **kw)
if (wait):
try: b.ready()
except Exception, e:
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
+ def check_output(self, args, stdin=None):
+ p = self.popen(args, stdout=PIPE, stderr=STDOUT)
+ out = p.communicate(stdin)
+ if p.returncode != 0:
+ raise Exception("%s exit code %s, output:\n%s" % (args, p.returncode, out[0]))
+ return out[0]
+
def browse(self, *args, **kwargs): browse(*args, **kwargs)
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index e262faea40..82ca808cb1 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -132,7 +132,7 @@ class HaBroker(Broker):
"--link-maintenance-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
# Add default --log-enable arguments unless args already has --log arguments.
- if not [l for l in args if l.startswith("--log")]:
+ if not env_has_log_config() and not [l for l in args if l.startswith("--log")]:
args += ["--log-enable=info+", "--log-enable=debug+:ha::"]
if not [h for h in args if h.startswith("--link-heartbeat-interval")]:
args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)]
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 1d475ebfe7..180831569f 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1025,8 +1025,8 @@ class LongTests(HaBrokerTest):
"--broker", brokers[0].host_port(),
"--address", "q;{create:always}",
"--messages=1000",
- "--tx=10"
- # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet
+ "--tx=10",
+ "--connection-options={protocol:%s}" % self.tx_protocol
])
receiver = self.popen(
["qpid-receive",
@@ -1034,8 +1034,8 @@ class LongTests(HaBrokerTest):
"--address", "q;{create:always}",
"--messages=990",
"--timeout=10",
- "--tx=10"
- # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet
+ "--tx=10",
+ "--connection-options={protocol:%s}" % self.tx_protocol
])
self.assertEqual(sender.wait(), 0)
self.assertEqual(receiver.wait(), 0)
@@ -1268,7 +1268,7 @@ class StoreTests(HaBrokerTest):
"""Verify that a backup erases queue data from store recovery before
doing catch-up from the primary."""
if self.check_skip(): return
- cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store'])
+ cluster = HaCluster(self, 2)
sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True))
@@ -1532,7 +1532,7 @@ class TransactionTests(HaBrokerTest):
except qm.TransactionUnknown: pass
for b in cluster: self.assert_tx_clean(b)
try: tx.connection.close()
- except TransactionUnknown: pass # Occasionally get exception on close.
+ except qm.TransactionUnknown: pass # Occasionally get exception on close.
finally: l.restore()
def test_tx_no_backups(self):
@@ -1622,17 +1622,20 @@ class TransactionTests(HaBrokerTest):
import qpid_tests.broker_0_10
except ImportError:
raise Skipped("Tests not found")
-
cluster = HaCluster(self, 3)
- self.popen(["qpid-txtest", "-p%s"%cluster[0].port()]).assert_exit_ok()
+ if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"]
+ self.popen(["qpid-txtest2", "--broker", cluster[0].host_port()]).assert_exit_ok()
+ print
self.popen(["qpid-python-test",
"-m", "qpid_tests.broker_0_10",
+ "-m", "qpid_tests.broker_1_0",
"-b", "localhost:%s"%(cluster[0].port()),
- "*.tx.*"]).assert_exit_ok()
+ "*.tx.*"], stdout=None, stderr=None).assert_exit_ok()
if __name__ == "__main__":
qpid_ha_exec = os.getenv("QPID_HA_EXEC")
if qpid_ha_exec and os.path.isfile(qpid_ha_exec):
+ BrokerTest.amqp_tx_warning()
outdir = "ha_tests.tmp"
shutil.rmtree(outdir, True)
os.execvp("qpid-python-test",
diff --git a/qpid/cpp/src/tests/interop_tests.py b/qpid/cpp/src/tests/interop_tests.py
new file mode 100755
index 0000000000..d5533ead21
--- /dev/null
+++ b/qpid/cpp/src/tests/interop_tests.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A set of tests that can be run against a foreign AMQP 1.0 broker.
+
+RUNNING WITH A FOREIGN BROKER:
+
+1. Start the broker
+2. Create persistent queues named: interop-a interop-b interop-q tx-1 tx-2
+3. Export the environment variable QPID_INTEROP_URL with the URL to connect to your broker
+ in the form [user[:password]@]host[:port]
+4. From the build directory run this test:
+ ctest -VV -R interop_tests
+
+If QPID_INTEROP_URL is not set, a qpidd broker will be started for the test.
+"""
+
+import os, sys, shutil, subprocess
+import qpid_messaging as qm
+from brokertest import *
+
+URL='QPID_INTEROP_URL'
+
+class InteropTest(BrokerTest):
+
+ def setUp(self):
+ super(InteropTest, self).setUp()
+ self.url = os.environ[URL]
+ self.connect_opts = ['--broker', self.url, '--connection-options', '{protocol:amqp1.0}']
+
+ def connect(self, **kwargs):
+ """Python connection to interop URL"""
+ c = qm.Connection.establish(self.url, protocol='amqp1.0', **kwargs)
+ self.teardown_add(c)
+ return c
+
+ def drain(self, queue, connection=None):
+ """
+ Drain a queue to make sure it is empty. Throw away the messages.
+ """
+ c = connection or self.connect()
+ r = c.session().receiver(queue)
+ try:
+ while True:
+ r.fetch(timeout=0)
+ r.session.acknowledge()
+ except qm.Empty:
+ pass
+ r.close()
+
+ def clear_queue(self, queue, connection=None, properties=None, durable=False):
+ """
+ Make empty queue, prefix with self.id(). Create if needed, drain if needed
+ @return queue name.
+ """
+ queue = "interop-%s" % queue
+ c = connection or self.connect()
+ props = {'create':'always'}
+ if durable: props['node'] = {'durable':True}
+ if properties: props.update(properties)
+ self.drain("%s;%s" % (queue, props), c)
+ return queue
+
+
+class SimpleTest(InteropTest):
+ """Simple test to check the broker is responding."""
+
+ def test_send_receive_python(self):
+ c = self.connect()
+ q = self.clear_queue('q', c)
+ s = c.session()
+ s.sender(q).send('foo')
+ self.assertEqual('foo', s.receiver(q).fetch().content)
+
+ def test_send_receive_cpp(self):
+ q = self.clear_queue('q')
+ args = ['-b', self.url, '-a', q]
+ self.check_output(['qpid-send', '--content-string=cpp_foo'] + args)
+ self.assertEqual('cpp_foo', self.check_output(['qpid-receive'] + args).strip())
+
+
+class PythonTxTest(InteropTest):
+
+ def tx_simple_setup(self):
+ """Start a transaction, remove messages from queue a, add messages to queue b"""
+ c = self.connect()
+ qa, qb = self.clear_queue('a', c, durable=True), self.clear_queue('b', c, durable=True)
+
+ # Send messages to a, no transaction.
+ sa = c.session().sender(qa+";{create:always,node:{durable:true}}")
+ tx_msgs = ['x', 'y', 'z']
+ for m in tx_msgs: sa.send(qm.Message(content=m, durable=True))
+
+ # Receive messages from a, in transaction.
+ tx = c.session(transactional=True)
+ txr = tx.receiver(qa)
+ self.assertEqual(tx_msgs, [txr.fetch(1).content for i in xrange(3)])
+ tx.acknowledge()
+
+ # Send messages to b, transactional, mixed with non-transactional.
+ sb = c.session().sender(qb+";{create:always,node:{durable:true}}")
+ txs = tx.sender(qb)
+ msgs = [str(i) for i in xrange(3)]
+ for tx_m, m in zip(tx_msgs, msgs):
+ txs.send(tx_m);
+ sb.send(m)
+ tx.sync()
+ return tx, qa, qb
+
+ def test_tx_simple_commit(self):
+ tx, qa, qb = self.tx_simple_setup()
+ s = self.connect().session()
+ assert_browse(s, qa, [])
+ assert_browse(s, qb, ['0', '1', '2'])
+ tx.commit()
+ assert_browse(s, qa, [])
+ assert_browse(s, qb, ['0', '1', '2', 'x', 'y', 'z'])
+
+ def test_tx_simple_rollback(self):
+ tx, qa, qb = self.tx_simple_setup()
+ s = self.connect().session()
+ assert_browse(s, qa, [])
+ assert_browse(s, qb, ['0', '1', '2'])
+ tx.rollback()
+ assert_browse(s, qa, ['x', 'y', 'z'])
+ assert_browse(s, qb, ['0', '1', '2'])
+
+ def test_tx_sequence(self):
+ tx = self.connect().session(transactional=True)
+ notx = self.connect().session()
+ q = self.clear_queue('q', tx.connection, durable=True)
+ s = tx.sender(q)
+ r = tx.receiver(q)
+ s.send('a')
+ tx.commit()
+ assert_browse(notx, q, ['a'])
+ s.send('b')
+ tx.commit()
+ assert_browse(notx, q, ['a', 'b'])
+ self.assertEqual('a', r.fetch().content)
+ tx.acknowledge();
+ tx.commit()
+ assert_browse(notx, q, ['b'])
+ s.send('z')
+ tx.rollback()
+ assert_browse(notx, q, ['b'])
+ self.assertEqual('b', r.fetch().content)
+ tx.acknowledge();
+ tx.rollback()
+ assert_browse(notx, q, ['b'])
+
+
+class CppTxTest(InteropTest):
+
+ def test_txtest2(self):
+ self.popen(["qpid-txtest2"] + self.connect_opts).assert_exit_ok()
+
+ def test_send_receive(self):
+ q = self.clear_queue('q', durable=True)
+ sender = self.popen(["qpid-send",
+ "--address", q,
+ "--messages=100",
+ "--tx=10",
+ "--durable=yes"] + self.connect_opts)
+ receiver = self.popen(["qpid-receive",
+ "--address", q,
+ "--messages=90",
+ "--timeout=10",
+ "--tx=10"] + self.connect_opts)
+ sender.assert_exit_ok()
+ receiver.assert_exit_ok()
+ expect = [long(i) for i in range(91, 101)]
+ sn = lambda m: m.properties["sn"]
+ assert_browse(self.connect().session(), q, expect, transform=sn)
+
+
+if __name__ == "__main__":
+ if not BrokerTest.amqp_tx_supported:
+ BrokerTest.amqp_tx_warning()
+ print "Skipping interop_tests"
+ exit(0)
+ outdir = "interop_tests.tmp"
+ shutil.rmtree(outdir, True)
+ cmd = ["qpid-python-test", "-m", "interop_tests", "-DOUTDIR=%s"%outdir] + sys.argv[1:]
+ if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"]
+ if os.environ.get(URL):
+ os.execvp(cmd[0], cmd)
+ else:
+ dir = os.getcwd()
+ class StartBroker(BrokerTest):
+ def start_qpidd(self): pass
+ test = StartBroker('start_qpidd')
+ class Config:
+ def __init__(self):
+ self.defines = { 'OUTDIR': outdir }
+ test.configure(Config())
+ test.setUp()
+ os.environ[URL] = test.broker().host_port()
+ os.chdir(dir)
+ p = subprocess.Popen(cmd)
+ status = p.wait()
+ test.tearDown()
+ sys.exit(status)
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 05a1a6df10..a71fd11fa7 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -197,7 +197,7 @@ int main(int argc, char ** argv)
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession();
Receiver receiver = session.createReceiver(opts.address);
- receiver.setCapacity(opts.capacity);
+ receiver.setCapacity(std::min(opts.capacity, opts.messages));
Message msg;
uint count = 0;
uint txCount = 0;
@@ -207,9 +207,9 @@ int main(int argc, char ** argv)
Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader);
if (!opts.readyAddress.empty()) {
session.createSender(opts.readyAddress).send(msg);
- if (opts.tx)
- session.commit();
- }
+ if (opts.tx)
+ session.commit();
+ }
// For receive rate calculation
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
@@ -290,6 +290,7 @@ int main(int argc, char ** argv)
connection.close();
return 0;
}
+ return 1;
} catch(const std::exception& error) {
std::cerr << "qpid-receive: " << error.what() << std::endl;
connection.close();
diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp
index 498dc96ce9..970944f8d0 100644
--- a/qpid/cpp/src/tests/qpid-send.cpp
+++ b/qpid/cpp/src/tests/qpid-send.cpp
@@ -112,14 +112,14 @@ struct Options : public qpid::Options
log(argv0),
reportTotal(false),
reportEvery(0),
- reportHeader(true),
- sendRate(0),
- sequence(true),
- timestamp(true),
- groupPrefix("GROUP-"),
- groupSize(10),
- groupRandSize(false),
- groupInterleave(1)
+ reportHeader(true),
+ sendRate(0),
+ sequence(true),
+ timestamp(true),
+ groupPrefix("GROUP-"),
+ groupSize(10),
+ groupRandSize(false),
+ groupInterleave(1)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -272,7 +272,7 @@ class MapContentGenerator : public ContentGenerator {
// tag each generated message with a group identifer
//
class GroupGenerator {
-public:
+ public:
GroupGenerator(const std::string& key,
const std::string& prefix,
const uint size,
@@ -351,7 +351,7 @@ int main(int argc, char ** argv)
try {
Options opts;
if (opts.parse(argc, argv)) {
- connection = Connection(opts.url, opts.connectionOptions);
+ connection = Connection(opts.url, opts.connectionOptions);
connection.open();
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession();
@@ -447,6 +447,7 @@ int main(int argc, char ** argv)
connection.close();
return 0;
}
+ return 1;
} catch(const std::exception& error) {
std::cerr << "qpid-send: " << error.what() << std::endl;
connection.close();
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index 2393ec2396..58c48f9a8d 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -353,10 +353,11 @@ int main(int argc, char** argv)
if (opts.init) controller.init();
if (opts.transfer) controller.transfer();
if (opts.check) return controller.check();
+ return 0;
}
- return 0;
+ return 1;
} catch(const std::exception& e) {
- std::cout << argv[0] << ": " << e.what() << std::endl;
+ std::cerr << argv[0] << ": " << e.what() << std::endl;
}
return 2;
}
diff --git a/qpid/cpp/src/tests/swig_python_tests b/qpid/cpp/src/tests/swig_python_tests
index 4d9e5e35d4..40c35ac0fa 100755
--- a/qpid/cpp/src/tests/swig_python_tests
+++ b/qpid/cpp/src/tests/swig_python_tests
@@ -39,7 +39,8 @@ skip() {
}
start_broker() {
- QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker"
+ rm -f swig_python_tests.log
+ QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no --log-to-file swig_python_tests.log) || fail "Could not start broker"
}
stop_broker() {
@@ -54,9 +55,9 @@ echo "Running swigged python tests using broker on port $QPID_PORT"
export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG
export QPID_USE_SWIG_CLIENT=1
-$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1
+$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests $* || FAILED=1
if [[ -a $AMQP_LIB ]] ; then
- $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1
+ $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests $* || FAILED=1
fi
stop_broker
if [[ $FAILED -eq 1 ]]; then
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 96f1596890..1c4c117e4b 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -20,14 +20,14 @@
absdir() { echo `cd $1 && pwd`; }
# Environment variables substituted by cmake.
-srcdir=`absdir @abs_srcdir@`
-builddir=`absdir @abs_builddir@`
-top_srcdir=`absdir @abs_top_srcdir@`
-top_builddir=`absdir @abs_top_builddir@`
-moduledir=$top_builddir/src@builddir_lib_suffix@
-pythonswigdir=$top_builddir/bindings/qpid/python/
-pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@
-testmoduledir=$builddir@builddir_lib_suffix@
+export srcdir=`absdir @abs_srcdir@`
+export builddir=`absdir @abs_builddir@`
+export top_srcdir=`absdir @abs_top_srcdir@`
+export top_builddir=`absdir @abs_top_builddir@`
+export moduledir=$top_builddir/src@builddir_lib_suffix@
+export pythonswigdir=$top_builddir/bindings/qpid/python/
+export pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@
+export testmoduledir=$builddir@builddir_lib_suffix@
export QPID_INSTALL_PREFIX=@prefix@
# Tools substituted by cmake
diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp
index ee04dddd6a..14aee7b648 100644
--- a/qpid/cpp/src/tests/test_store.cpp
+++ b/qpid/cpp/src/tests/test_store.cpp
@@ -223,27 +223,18 @@ class TestStore : public NullMessageStore {
const boost::intrusive_ptr<PersistableMessage>& pmsg,
const PersistableQueue& queue)
{
- qpid::broker::amqp_0_10::MessageTransfer* msg =
- dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
- assert(msg);
-
ostringstream o;
- o << "<enqueue " << queue.getName() << " " << getContent(msg);
+ string data = getContent(pmsg);
+ o << "<enqueue " << queue.getName() << " " << data;
if (tx) o << " tx=" << getId(*tx);
o << ">";
log(o.str());
// Dump the message if there is a dump file.
if (dump.get()) {
- msg->getFrames().getMethod()->print(*dump);
- *dump << endl << " ";
- msg->getFrames().getHeaders()->print(*dump);
- *dump << endl << " ";
- *dump << msg->getFrames().getContentSize() << endl;
+ *dump << "Message(" << data.size() << "): " << data << endl;
}
string logPrefix = "TestStore "+name+": ";
- // Check the message for special instructions for this store.
- string data = msg->getFrames().getContent();
Action action(data);
bool doComplete = true;
if (action.index && action.executeIn(name)) {
@@ -258,7 +249,7 @@ class TestStore : public NullMessageStore {
QPID_LOG(error, logPrefix << "async-id needs argument: " << data);
break;
}
- asyncIds[action.args[0]] = msg;
+ asyncIds[action.args[0]] = pmsg;
QPID_LOG(debug, logPrefix << "delayed completion " << action.args[0]);
doComplete = false;
break;
@@ -284,7 +275,7 @@ class TestStore : public NullMessageStore {
QPID_LOG(error, logPrefix << "unknown action: " << data);
}
}
- if (doComplete) msg->enqueueComplete();
+ if (doComplete) pmsg->enqueueComplete();
}
void dequeue(TransactionContext* tx,
diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py b/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py
index 5ebbb4c651..b14bb96dc8 100644
--- a/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py
+++ b/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py
@@ -23,3 +23,4 @@ from general import *
from legacy_exchanges import *
from selector import *
from translation import *
+from tx import *
diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py b/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py
new file mode 100644
index 0000000000..45817fc64f
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py
@@ -0,0 +1,264 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+
+class TxTests(TestBase010):
+ """
+ Tests for 'methods' on the amqp tx 'class'
+ """
+
+ def test_commit(self):
+ """
+ Test that commited publishes are delivered and commited acks are not re-delivered
+ """
+ session = self.session
+
+ #declare queues and create subscribers in the checking session
+ #to ensure that the queues are not auto-deleted too early:
+ self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"])
+ session.message_subscribe(queue="tx-commit-a", destination="qa")
+ session.message_subscribe(queue="tx-commit-b", destination="qb")
+ session.message_subscribe(queue="tx-commit-c", destination="qc")
+
+ #use a separate session for actual work
+ session2 = self.conn.session("worker", 2)
+ self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ session2.tx_commit()
+ session2.close()
+
+ session.tx_select()
+
+ self.enable_flow("qa")
+ queue_a = session.incoming("qa")
+
+ self.enable_flow("qb")
+ queue_b = session.incoming("qb")
+
+ self.enable_flow("qc")
+ queue_c = session.incoming("qc")
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("TxMessage %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("TxMessage 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("TxMessage 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ #cleanup
+ session.tx_commit()
+
+ def test_auto_rollback(self):
+ """
+ Test that a session closed with an open transaction is effectively rolled back
+ """
+ session = self.session
+ self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"])
+ session.message_subscribe(queue="tx-autorollback-a", destination="qa")
+ session.message_subscribe(queue="tx-autorollback-b", destination="qb")
+ session.message_subscribe(queue="tx-autorollback-c", destination="qc")
+
+ session2 = self.conn.session("worker", 2)
+ queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ session2.close()
+
+ session.tx_select()
+
+ self.enable_flow("qa")
+ queue_a = session.incoming("qa")
+
+ self.enable_flow("qb")
+ queue_b = session.incoming("qb")
+
+ self.enable_flow("qc")
+ queue_c = session.incoming("qc")
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ #cleanup
+ session.tx_commit()
+
+ def test_rollback(self):
+ """
+ Test that rolled back publishes are not delivered and rolled back acks are re-delivered
+ """
+ session = self.session
+ queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ session.tx_rollback()
+
+ #need to release messages to get them redelivered now:
+ session.message_release(consumed)
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ #cleanup
+ session.tx_commit()
+
+ def perform_txn_work(self, session, name_a, name_b, name_c):
+ """
+ Utility method that does some setup and some work under a transaction. Used for testing both
+ commit and rollback
+ """
+ #setup:
+ self.declare_queues([name_a, name_b, name_c])
+
+ key = "my_key_" + name_b
+ topic = "my_topic_" + name_c
+
+ session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
+ session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)
+
+ dp = session.delivery_properties(routing_key=name_a)
+ for i in range(1, 5):
+ mp = session.message_properties(message_id="msg%d" % i)
+ session.message_transfer(message=Message(dp, mp, "Message %d" % i))
+
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))
+
+ dp = session.delivery_properties(routing_key=topic)
+ mp = session.message_properties(message_id="msg7")
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))
+
+ session.tx_select()
+
+ #consume and ack messages
+ acked = RangedSet()
+ self.subscribe(session, queue=name_a, destination="sub_a")
+ queue_a = session.incoming("sub_a")
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ acked.add(msg.id)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ self.subscribe(session, queue=name_b, destination="sub_b")
+ queue_b = session.incoming("sub_b")
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.body)
+ acked.add(msg.id)
+
+ sub_c = self.subscribe(session, queue=name_c, destination="sub_c")
+ queue_c = session.incoming("sub_c")
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.body)
+ acked.add(msg.id)
+
+ session.message_accept(acked)
+
+ dp = session.delivery_properties(routing_key=topic)
+ #publish messages
+ for i in range(1, 5):
+ mp = session.message_properties(message_id="tx-msg%d" % i)
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))
+
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="tx-msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))
+
+ dp = session.delivery_properties(routing_key=name_a)
+ mp = session.message_properties(message_id="tx-msg7")
+ session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
+ return queue_a, queue_b, queue_c, acked
+
+ def declare_queues(self, names, session=None):
+ session = session or self.session
+ for n in names:
+ session.queue_declare(queue=n, auto_delete=True)
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination=consumer_tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ def enable_flow(self, tag, session=None):
+ session = session or self.session
+ session.message_flow(destination=tag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination=tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ def complete(self, session, msg):
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+ session.channel.session_completed(session.receiver._completed)