diff options
author | Gordon Sim <gsim@apache.org> | 2007-11-06 17:27:27 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-11-06 17:27:27 +0000 |
commit | 6ee11959bff03826083c85943f14e6b4ceeaacff (patch) | |
tree | 217ad7ff3106b030505d7a70007c27aaf0eb2eff | |
parent | 14efd3ce4c7f12a2da13164b5c8acc562fd30832 (diff) | |
download | qpid-python-6ee11959bff03826083c85943f14e6b4ceeaacff.tar.gz |
Add support for array type to c++ (and python, decode only for now)
Change the type of the in-doubt field in dtx-coordination.recover to an array (to bring in line with amqp spec)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@592494 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/rubygen/cppgen.rb | 1 | ||||
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/Array.cpp | 114 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/Array.h | 75 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/FieldValue.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/FieldValue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/amqp_types_full.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Array.cpp | 78 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java | 4 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java | 4 | ||||
-rw-r--r-- | qpid/python/qpid/codec.py | 17 | ||||
-rw-r--r-- | qpid/python/qpid/spec.py | 1 | ||||
-rw-r--r-- | qpid/python/tests_0-10/dtx.py | 55 | ||||
-rw-r--r-- | qpid/specs/amqp.0-10-preview.xml | 14 |
16 files changed, 355 insertions, 46 deletions
diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb index 5db5793649..60a653e18d 100755 --- a/qpid/cpp/rubygen/cppgen.rb +++ b/qpid/cpp/rubygen/cppgen.rb @@ -122,6 +122,7 @@ class AmqpDomain "longstr"=>CppType.new("string").passcref.retcref.code("LongString"), "shortstr"=>CppType.new("string").passcref.retcref.code("ShortString"), "table"=>CppType.new("FieldTable").passcref.retcref, + "array"=>CppType.new("Array").passcref.retcref, "content"=>CppType.new("Content").passcref.retcref, "rfc1982-long-set"=>CppType.new("SequenceNumberSet").passcref.retcref, "long-struct"=>CppType.new("string").passcref.retcref.code("LongString"), diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 09bdb351b1..681b8ed8ed 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -98,6 +98,7 @@ libqpidcommon_la_SOURCES = \ qpid/framing/AMQFrame.cpp \ qpid/framing/AMQHeaderBody.cpp \ qpid/framing/AMQHeartbeatBody.cpp \ + qpid/framing/Array.cpp \ qpid/framing/BasicHeaderProperties.cpp \ qpid/framing/BodyHandler.cpp \ qpid/framing/ChannelAdapter.cpp \ @@ -337,6 +338,7 @@ nobase_include_HEADERS = \ qpid/framing/AMQMethodBody.h \ qpid/framing/AMQP_HighestVersion.h \ qpid/framing/AccumulatedAck.h \ + qpid/framing/Array.h \ qpid/framing/BasicHeaderProperties.h \ qpid/framing/Blob.h \ qpid/framing/BodyHandler.h \ diff --git a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp index ec042ff56a..533872e849 100644 --- a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -20,6 +20,7 @@ #include <boost/format.hpp> #include "Broker.h" #include "qpid/framing/constants.h" +#include "qpid/framing/Array.h" using namespace qpid::broker; using namespace qpid::framing; @@ -136,25 +137,14 @@ DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, // strictly 'legal', but that is ok for testing std::set<std::string> xids; getBroker().getStore().collectPreparedXids(xids); - uint size(0); - for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { - size += i->size() + 1/*shortstr size*/; - } - char* bytes = static_cast<char*>(::alloca(size + 4/*longstr size*/)); - Buffer wbuffer(bytes, size + 4/*longstr size*/); - wbuffer.putLong(size); + //TODO: remove the need to copy from one container type to another + std::vector<std::string> data; for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { - wbuffer.putShortString(*i); + data.push_back(*i); } - - Buffer rbuffer(bytes, size + 4/*longstr size*/); - string data; - rbuffer.getLongString(data); - - FieldTable response; - response.setString("xids", data); - return DtxCoordinationRecoverResult(response); + Array indoubt(data); + return DtxCoordinationRecoverResult(indoubt); } void DtxHandlerImpl::forget(u_int16_t /*ticket*/, diff --git a/qpid/cpp/src/qpid/framing/Array.cpp b/qpid/cpp/src/qpid/framing/Array.cpp new file mode 100644 index 0000000000..1215c8a28b --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Array.cpp @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Array.h" +#include "Buffer.h" +#include "FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" +#include <assert.h> + +namespace qpid { +namespace framing { + +Array::Array() : typeOctet(0xF0/*void*/) {} + +Array::Array(const std::vector<std::string>& in) +{ + typeOctet = 0xA4; + for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) { + ValuePtr value(new StringValue(*i)); + values.push_back(value); + } +} + +uint32_t Array::size() const { + //note: size is only included when used as a 'top level' type + uint32_t len(4/*size*/ + 1/*type*/ + 4/*count*/); + for(ValueVector::const_iterator i = values.begin(); i != values.end(); ++i) { + len += (*i)->getData().size(); + } + return len; +} + +int Array::count() const { + return values.size(); +} + +std::ostream& operator<<(std::ostream& out, const Array& t) { + out << "{"; + for(Array::ValueVector::const_iterator i = t.values.begin(); i != t.values.end(); ++i) { + if (i != t.values.begin()) out << ", "; + out << *(i->get()); + } + return out << "}"; +} + +void Array::encode(Buffer& buffer) const{ + buffer.putLong(size() - 4);//size added only when array is a top-level type + buffer.putOctet(typeOctet); + buffer.putLong(count()); + for (ValueVector::const_iterator i = values.begin(); i!=values.end(); ++i) { + (*i)->getData().encode(buffer); + } +} + +void Array::decode(Buffer& buffer){ + uint32_t size = buffer.getLong();//size added only when array is a top-level type + uint32_t available = buffer.available(); + if (available < size) { + throw SyntaxErrorException(QPID_MSG("Not enough data for array, expected " + << size << " bytes but only " << available << " available")); + } + typeOctet = buffer.getOctet(); + uint32_t count = buffer.getLong(); + + FieldValue dummy; + dummy.setType(typeOctet); + available = buffer.available(); + if (available < count * dummy.getData().size()) { + throw SyntaxErrorException(QPID_MSG("Not enough data for array, expected " + << count << " items of " << dummy.getData().size() + << " bytes each but only " << available << " bytes available")); + } + + for (uint32_t i = 0; i < count; i++) { + ValuePtr value(new FieldValue); + value->setType(typeOctet); + value->getData().decode(buffer); + values.push_back(ValuePtr(value)); + } +} + + +bool Array::operator==(const Array& x) const { + if (typeOctet != x.typeOctet) return false; + if (values.size() != x.values.size()) return false; + + for (ValueVector::const_iterator i = values.begin(), j = x.values.begin(); i != values.end(); ++i, ++j) { + if (*(i->get()) != *(j->get())) return false; + } + + return true; +} + + +} +} diff --git a/qpid/cpp/src/qpid/framing/Array.h b/qpid/cpp/src/qpid/framing/Array.h new file mode 100644 index 0000000000..6a13c63672 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/Array.h @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <vector> +#include <boost/shared_ptr.hpp> +#include <map> +#include "amqp_types.h" +#include "FieldValue.h" + +#ifndef _Array_ +#define _Array_ + +namespace qpid { +namespace framing { + +class Buffer; + +class Array +{ + public: + typedef boost::shared_ptr<FieldValue> ValuePtr; + typedef std::vector<ValuePtr> ValueVector; + + uint32_t size() const; + void encode(Buffer& buffer) const; + void decode(Buffer& buffer); + + int count() const; + bool operator==(const Array& other) const; + + Array(); + //only long string arrays can currently be created (any type can be decoded) + Array(const std::vector<std::string>& in); + + template <class T> + void collect(std::vector<T>& out) + { + for (ValueVector::const_iterator i = values.begin(); i != values.end(); ++i) { + out.push_back((*i)->get<T>()); + } + } + + private: + uint8_t typeOctet; + ValueVector values; + + ValueVector::const_iterator begin() const { return values.begin(); } + ValueVector::const_iterator end() const { return values.end(); } + + friend std::ostream& operator<<(std::ostream& out, const Array& body); +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp index 5526c9cb72..961b6017cd 100644 --- a/qpid/cpp/src/qpid/framing/FieldValue.cpp +++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp @@ -25,9 +25,9 @@ namespace qpid { namespace framing { -void FieldValue::decode(Buffer& buffer) +void FieldValue::setType(uint8_t type) { - typeOctet = buffer.getOctet(); + typeOctet = type; uint8_t lenType = typeOctet >> 4; switch(lenType){ @@ -76,6 +76,11 @@ void FieldValue::decode(Buffer& buffer) default: throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet)); } +} + +void FieldValue::decode(Buffer& buffer) +{ + setType(buffer.getOctet()); data->decode(buffer); } diff --git a/qpid/cpp/src/qpid/framing/FieldValue.h b/qpid/cpp/src/qpid/framing/FieldValue.h index 3ea367c481..3ec95a99e1 100644 --- a/qpid/cpp/src/qpid/framing/FieldValue.h +++ b/qpid/cpp/src/qpid/framing/FieldValue.h @@ -78,6 +78,8 @@ class FieldValue { FieldValue(): data(0) {}; // Default assignment operator is fine + void setType(uint8_t type); + Data& getData() { return *data; } uint32_t size() const { return 1 + data->size(); }; bool empty() const { return data.get() == 0; } void encode(Buffer& buffer); diff --git a/qpid/cpp/src/qpid/framing/amqp_types_full.h b/qpid/cpp/src/qpid/framing/amqp_types_full.h index bf89d59980..f1ed44ec05 100644 --- a/qpid/cpp/src/qpid/framing/amqp_types_full.h +++ b/qpid/cpp/src/qpid/framing/amqp_types_full.h @@ -30,6 +30,7 @@ */ #include "amqp_types.h" +#include "Array.h" #include "FramingContent.h" #include "FieldTable.h" #include "SequenceNumberSet.h" diff --git a/qpid/cpp/src/tests/Array.cpp b/qpid/cpp/src/tests/Array.cpp new file mode 100644 index 0000000000..5bf7fadce0 --- /dev/null +++ b/qpid/cpp/src/tests/Array.cpp @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <sstream> +#include "qpid/framing/Array.h" +#include "qpid/framing/FieldValue.h" + +#include <boost/test/auto_unit_test.hpp> +BOOST_AUTO_TEST_SUITE(Array); + +using namespace qpid::framing; + +void populate(std::vector<std::string>& data, int count = 10) +{ + for (int i = 0; i < count; i++) { + std::stringstream out; + out << "item-" << i; + data.push_back(out.str()); + } +} + +BOOST_AUTO_TEST_CASE(testEncodeDecode) +{ + std::vector<std::string> data; + populate(data); + + Array a(data); + + char buff[200]; + Buffer wbuffer(buff, 200); + a.encode(wbuffer); + + Array b; + Buffer rbuffer(buff, 200); + b.decode(rbuffer); + BOOST_CHECK_EQUAL(a, b); + + std::vector<std::string> data2; + b.collect(data2); + //BOOST_CHECK_EQUAL(data, data2); + BOOST_CHECK(data == data2); +} + +BOOST_AUTO_TEST_CASE(testAssignment) +{ + std::vector<std::string> data; + populate(data); + Array b; + { + Array a(data); + b = a; + BOOST_CHECK_EQUAL(a, b); + } + std::vector<std::string> data2; + b.collect(data2); + //BOOST_CHECK_EQUAL(data, data2); + BOOST_CHECK(data == data2); +} + +BOOST_AUTO_TEST_SUITE_END(); diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 37d59aa9e2..b4944c1294 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -31,7 +31,7 @@ unit_test_SOURCES= unit_test.cpp \ RefCounted.cpp RefCountedMap.cpp \ SessionState.cpp Blob.cpp logging.cpp \ Url.cpp Uuid.cpp \ - Shlib.cpp FieldValue.cpp FieldTable.cpp + Shlib.cpp FieldValue.cpp FieldTable.cpp Array.cpp check_LTLIBRARIES += libshlibtest.la libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 89f460056c..75218fc32e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -335,9 +335,9 @@ public class XAResourceImpl implements XAResource int i = 0; try { - for (String xid : res.getInDoubt().keySet()) + for (Object xid : res.getInDoubt()) { - result[i] = new XidImpl(xid); + result[i] = new XidImpl((String) xid); i++; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java index 37921ece4d..6e722776e8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java @@ -335,9 +335,9 @@ public class XAResourceImpl implements XAResource int i = 0; try { - for (String xid : res.getInDoubt().keySet()) + for (Object xid : res.getInDoubt()) { - result[i] = new XidImpl(xid); + result[i] = new XidImpl((String) xid); i++; } } diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py index 18928698a5..6399ad2a5d 100644 --- a/qpid/python/qpid/codec.py +++ b/qpid/python/qpid/codec.py @@ -510,6 +510,23 @@ class Codec: type = self.spec.structs[codec.decode_short()] return codec.decode_struct_body(type) + def decode_array(self): + size = self.decode_long() + code = self.decode_octet() + count = self.decode_long() + result = [] + for i in range(0, count): + if self.types.has_key(code): + value = self.decode(self.types[code]) + else: + w = width(code) + if fixed(code): + value = self.read(w) + else: + value = self.read(self.dec_num(w)) + result.append(value) + return result + def fixed(code): return (code >> 6) != 2 diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py index 31c79276aa..4d0497bced 100644 --- a/qpid/python/qpid/spec.py +++ b/qpid/python/qpid/spec.py @@ -255,6 +255,7 @@ class Method(Metadata): "shortstr": "", "longstr": "", "table": {}, + "array": [], "octet": 0, "short": 0, "long": 0, diff --git a/qpid/python/tests_0-10/dtx.py b/qpid/python/tests_0-10/dtx.py index 8fdd32c2f5..5ee4dd4c16 100644 --- a/qpid/python/tests_0-10/dtx.py +++ b/qpid/python/tests_0-10/dtx.py @@ -304,6 +304,38 @@ class DtxTests(TestBase): self.assertMessageId("a", "two") self.assertMessageId("b", "one") + def test_suspend_start_end_resume(self): + """ + Test suspension and resumption of an association with work + done on another transaction when the first transaction is + suspended + """ + channel = self.channel + channel.dtx_demarcation_select() + + #setup + channel.queue_declare(queue="one", exclusive=True, auto_delete=True) + channel.queue_declare(queue="two", exclusive=True, auto_delete=True) + channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) + channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + + tx = self.xid("dummy") + + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' + channel.dtx_demarcation_end(xid=tx, suspend=True) + + channel.dtx_demarcation_start(xid=tx, resume=True) + self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' + channel.dtx_demarcation_end(xid=tx) + + #commit and check + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + def test_end_suspend_and_fail(self): """ Verify that the correct error is signalled if the suspend and @@ -538,18 +570,7 @@ class DtxTests(TestBase): else: channel.dtx_coordination_rollback(xid=tx) - indoubt = channel.dtx_coordination_recover().in_doubt - #convert indoubt table to a list of xids (note: this will change for 0-10) - data = indoubt["xids"] - xids = [] - pos = 0 - while pos < len(data): - size = unpack("!B", data[pos])[0] - start = pos + 1 - end = start + size - xid = data[start:end] - xids.append(xid) - pos = end + xids = channel.dtx_coordination_recover().in_doubt #rollback the prepared transactions returned by recover for x in xids: @@ -567,6 +588,16 @@ class DtxTests(TestBase): channel.dtx_coordination_rollback(xid=x) self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) + def test_bad_resume(self): + """ + Test that a resume on a session not selected for use with dtx fails + """ + channel = self.channel + try: + channel.dtx_demarcation_start(resume=True) + except Closed, e: + self.assertConnectionException(503, e.args[0]) + def xid(self, txid): DtxTests.tx_counter += 1 branchqual = "v%s" % DtxTests.tx_counter diff --git a/qpid/specs/amqp.0-10-preview.xml b/qpid/specs/amqp.0-10-preview.xml index be15561ed2..6ba6bfc5ed 100644 --- a/qpid/specs/amqp.0-10-preview.xml +++ b/qpid/specs/amqp.0-10-preview.xml @@ -1310,6 +1310,7 @@ <domain name="timestamp" type="timestamp" label="64-bit POSIX timestamp" /> <domain name="table" type="table" label="field table" /> <domain name="uuid" type="uuid" label="UUID (RFC4122 section 4.1.2) - 16 octets" /> + <domain name="array" type="array" label="array"/> <domain name="content" type="content" label="message content"> <doc> @@ -5912,19 +5913,10 @@ are in a prepared or heuristically completed state. </doc> - <field name="in-doubt" domain="table" label="Table of xids to be recovered"> + <field name="in-doubt" domain="array" label="array of xids to be recovered"> <doc> - Table containing the sequence of xids to be recovered (xids that are in a prepared or - heuristically completed state). + xids to be recovered (xids that are in a prepared or heuristically completed state). </doc> - - <rule name="xid-sequence"> - <doc> - The field table must contain a field called 'xids' of type sequence of longstrs - representing the xids that are in a prepared or heuristically completed state. - </doc> - </rule> - <assert check="notnull" /> </field> </struct> |