summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/amqp/Descriptor.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2015-02-27 16:37:06 +0000
committerAlan Conway <aconway@apache.org>2015-02-27 16:37:06 +0000
commit3aaa53e9103b6019c9e31d15186b12a95a1993be (patch)
treef5950c063ff08f574c808023ece7745739ca7027 /qpid/cpp/src/qpid/amqp/Descriptor.cpp
parent9c9f0e2c935d11c0f8d1ebddf1bbb78c3c22c606 (diff)
downloadqpid-python-3aaa53e9103b6019c9e31d15186b12a95a1993be.tar.gz
QPID-4710: [AMQP 1.0] Support for transactions in qpid::messaging C++ client.
Implements the "transactional retire and settle immediately" option for transactions as specified in AMQP 1.0 in the qpid::messaging C++ client. NOTE: Transactions over AMQP 1.0 require proton 0.9 or greater. With older versions, attempting a transactions over AMQP 1.0 will raise a link-detached exception "Node not found: tx-transaction" 1. Added descriptor list to Variant with support in Encoder and PnData. Required to support transactions, need to be able to create described lists. Variant changes are source and binary compatible. A Variant now has a Variant::List of descripors which can be numeric or string. Nested descriptors are implemented by putting multiple descriptors in the list. Other minor changes: - Variant refactor: don't delete impl on every assignment. - Add Variant constructors that take a string encoding. (new constructors, not defaulted arguments, so the change is binary and source compatible.) - Growable buffer support for Encoder. - Printing described Variant prints descriptors in form @descriptor value 2. Added transaction support to AMQP 1.0 client code Added messaging/amqp/Transaction.h,cpp: transaction logic - communicate with coordinator, send declare/dischange messages. - add tx state info to transfers and acknowledgements. - Sync session after discharge. - A transactional session automatically acks any message retrieved by fetch/get to bring them into the transaction. This is consistent the 0-10 client. Minor fixes to existing client code: - Fix use of pn_drain API in C++ client to work with C++ and Java brokers. - Make amqp::Exception derive from qpid::Exception 3. Fixes to existing broker code: - Incoming.cpp fix: start async completion before processing message. - Delay accept of dischage message till commit is complete. - newSession - handle failover during session creation. 4. Added tests interop_tests.py: transaction tests that can run against an external broker, see comments. ha_tests.py: Enable transaction tests over AMQP 1.0. Minor test fixes: - brokertest.py don't set default logging if QPID_LOG env vars set. - brokertest.py Pass kwargs to broker() create function. - qpid-receive: capacity should never be larger than message count. - Accept user:pass as well as user/pass in Url. - brokertest.py: Always do a ready() check on all brokers. If proton < 0.9 is used, transaction tests will be skipped or will downgrade to the amqp0-10 protocol with a printed warning. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1662743 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/amqp/Descriptor.cpp')
-rw-r--r--qpid/cpp/src/qpid/amqp/Descriptor.cpp97
1 files changed, 84 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/amqp/Descriptor.cpp b/qpid/cpp/src/qpid/amqp/Descriptor.cpp
index 9e33294edd..43d388ee76 100644
--- a/qpid/cpp/src/qpid/amqp/Descriptor.cpp
+++ b/qpid/cpp/src/qpid/amqp/Descriptor.cpp
@@ -19,11 +19,17 @@
*
*/
#include "Descriptor.h"
+#include "descriptors.h"
+#include <qpid/framing/reply_exceptions.h>
+#include <map>
namespace qpid {
namespace amqp {
+
Descriptor::Descriptor(uint64_t code) : type(NUMERIC) { value.code = code; }
+
Descriptor::Descriptor(const CharSequence& symbol) : type(SYMBOLIC) { value.symbol = symbol; }
+
bool Descriptor::match(const std::string& symbol, uint64_t code) const
{
switch (type) {
@@ -58,20 +64,85 @@ Descriptor* Descriptor::nest(const Descriptor& d)
return nested.get();
}
-std::ostream& operator<<(std::ostream& os, const Descriptor& d)
-{
- switch (d.type) {
- case Descriptor::SYMBOLIC:
- if (d.value.symbol.data && d.value.symbol.size) os << std::string(d.value.symbol.data, d.value.symbol.size);
- else os << "null";
- break;
- case Descriptor::NUMERIC:
- os << "0x" << std::hex << d.value.code;
- break;
+namespace {
+
+class DescriptorMap {
+ typedef std::map<uint64_t, std::string> SymbolMap;
+ typedef std::map<std::string, uint64_t> CodeMap;
+
+ SymbolMap symbols;
+ CodeMap codes;
+
+ public:
+ DescriptorMap() {
+ symbols[message::HEADER_CODE] = message::HEADER_SYMBOL;
+ symbols[message::DELIVERY_ANNOTATIONS_CODE] = message::DELIVERY_ANNOTATIONS_SYMBOL;
+ symbols[message::MESSAGE_ANNOTATIONS_CODE] = message::MESSAGE_ANNOTATIONS_SYMBOL;
+ symbols[message::PROPERTIES_CODE] = message::PROPERTIES_SYMBOL;
+ symbols[message::APPLICATION_PROPERTIES_CODE] = message::APPLICATION_PROPERTIES_SYMBOL;
+ symbols[message::DATA_CODE] = message::DATA_SYMBOL;
+ symbols[message::AMQP_SEQUENCE_CODE] = message::AMQP_SEQUENCE_SYMBOL;
+ symbols[message::AMQP_VALUE_CODE] = message::AMQP_VALUE_SYMBOL;
+ symbols[message::FOOTER_CODE] = message::FOOTER_SYMBOL;
+ symbols[message::ACCEPTED_CODE] = message::ACCEPTED_SYMBOL;
+ symbols[sasl::SASL_MECHANISMS_CODE] = sasl::SASL_MECHANISMS_SYMBOL;
+ symbols[sasl::SASL_INIT_CODE] = sasl::SASL_INIT_SYMBOL;
+ symbols[sasl::SASL_CHALLENGE_CODE] = sasl::SASL_CHALLENGE_SYMBOL;
+ symbols[sasl::SASL_RESPONSE_CODE] = sasl::SASL_RESPONSE_SYMBOL;
+ symbols[sasl::SASL_OUTCOME_CODE] = sasl::SASL_OUTCOME_SYMBOL;
+ symbols[filters::LEGACY_DIRECT_FILTER_CODE] = filters::LEGACY_DIRECT_FILTER_SYMBOL;
+ symbols[filters::LEGACY_TOPIC_FILTER_CODE] = filters::LEGACY_TOPIC_FILTER_SYMBOL;
+ symbols[filters::LEGACY_HEADERS_FILTER_CODE] = filters::LEGACY_HEADERS_FILTER_SYMBOL;
+ symbols[filters::SELECTOR_FILTER_CODE] = filters::SELECTOR_FILTER_SYMBOL;
+ symbols[filters::XQUERY_FILTER_CODE] = filters::XQUERY_FILTER_SYMBOL;
+ symbols[lifetime_policy::DELETE_ON_CLOSE_CODE] = lifetime_policy::DELETE_ON_CLOSE_SYMBOL;
+ symbols[lifetime_policy::DELETE_ON_NO_LINKS_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL;
+ symbols[lifetime_policy::DELETE_ON_NO_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL;
+ symbols[lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL;
+ symbols[transaction::DECLARE_CODE] = transaction::DECLARE_SYMBOL;
+ symbols[transaction::DISCHARGE_CODE] = transaction::DISCHARGE_SYMBOL;
+ symbols[transaction::DECLARED_CODE] = transaction::DECLARED_SYMBOL;
+ symbols[transaction::TRANSACTIONAL_STATE_CODE] = transaction::TRANSACTIONAL_STATE_SYMBOL;
+ symbols[0] = "unknown-descriptor";
+
+ for (SymbolMap::const_iterator i = symbols.begin(); i != symbols.end(); ++i)
+ codes[i->second] = i->first;
+ }
+
+ std::string operator[](uint64_t code) const {
+ SymbolMap::const_iterator i = symbols.find(code);
+ return (i == symbols.end()) ? "unknown-descriptor" : i->second;
}
- if (d.nested.get()) {
- os << " ->(" << *d.nested << ")";
+
+ uint64_t operator[](const std::string& symbol) const {
+ CodeMap::const_iterator i = codes.find(symbol);
+ return (i == codes.end()) ? 0 : i->second;
+ }
+};
+
+DescriptorMap DESCRIPTOR_MAP;
+}
+
+std::string Descriptor::symbol() const {
+ switch (type) {
+ case Descriptor::NUMERIC: return DESCRIPTOR_MAP[value.code];
+ case Descriptor::SYMBOLIC: return value.symbol.str();
+ }
+ assert(0);
+ return std::string();
+}
+
+uint64_t Descriptor::code() const {
+ switch (type) {
+ case Descriptor::NUMERIC: return value.code;
+ case Descriptor::SYMBOLIC: return DESCRIPTOR_MAP[value.symbol.str()];
}
- return os;
+ assert(0);
+ return 0;
}
+
+std::ostream& operator<<(std::ostream& os, const Descriptor& d) {
+ return os << d.symbol() << "(" << "0x" << std::hex << d.code() << ")";
+}
+
}} // namespace qpid::amqp