diff options
author | Alan Conway <aconway@apache.org> | 2008-11-04 19:52:49 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-04 19:52:49 +0000 |
commit | eda249ff22edb3726243da81ff48c82e4d88e872 (patch) | |
tree | 0939d790e6a1b0d86993c9c3804c1adaa369aeb8 | |
parent | 5d2471636928eff8b8031237c54348db0d5c388d (diff) | |
download | qpid-python-eda249ff22edb3726243da81ff48c82e4d88e872.tar.gz |
constants.rb: generate type code constants for AMQP types. Useful with Array.
framing/Array:
- added some std:::vector like functions & typedefs.
- use TypeCode enums, human readable ostream << operator.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711365 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | cpp/rubygen/amqpgen.rb | 25 | ||||
-rwxr-xr-x | cpp/rubygen/cppgen.rb | 2 | ||||
-rwxr-xr-x | cpp/rubygen/framing.0-10/constants.rb | 57 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 52 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 35 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Array.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Array.h | 36 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FieldValue.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FieldValue.h | 10 | ||||
-rw-r--r-- | cpp/src/tests/FieldValue.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 45 |
14 files changed, 267 insertions, 87 deletions
diff --git a/cpp/rubygen/amqpgen.rb b/cpp/rubygen/amqpgen.rb index 80542ebc77..b77ab09ac0 100755 --- a/cpp/rubygen/amqpgen.rb +++ b/cpp/rubygen/amqpgen.rb @@ -165,7 +165,7 @@ class AmqpElement # The root <amqp> element. def root() @root ||=parent ? parent.root : self; end - def to_s() "#<#{self.class}(#{fqname})>"; end + def to_s() "#<#{self.class}(#{fqname})>"; end def inspect() to_s; end # Text of doc child if there is one. @@ -181,6 +181,21 @@ class AmqpElement return self if is_a? AmqpClass return parent && parent.containing_class end + + # 0-10 array domains are missing element type information, add it here. + ArrayTypes={ + "str16-array" => "str-16", + "amqp-host-array" => "connection.amqp-host-url", + "command-fragments" => "session.command-fragment", + "in-doubt" => "dtx.xid", + "tx-publish" => "str-8", + "queues" => "str-8" + } + + def array_type(name) + return ArrayTypes[name] if ArrayTypes[name] + raise "Missing ArrayType entry for " + name + end end @@ -204,14 +219,6 @@ class AmqpEnum < AmqpElement amqp_child_reader :choice end -# 0-10 array domains are missing element type information, add it here. -ArrayTypes={ - "str16-array" => "str-16", - "amqp-host-array" => "connection.amqp-host-url", - "command-fragments" => "session.command-fragment", - "in-doubt" => "dtx.xid" -} - class AmqpDomain < AmqpElement def initialize(xml, parent) super diff --git a/cpp/rubygen/cppgen.rb b/cpp/rubygen/cppgen.rb index 13f6f3744d..7818e1c4b0 100755 --- a/cpp/rubygen/cppgen.rb +++ b/cpp/rubygen/cppgen.rb @@ -147,7 +147,7 @@ end class AmqpElement # convert my amqp type_ attribute to a C++ type. def amqp2cpp() - return "ArrayDomain<#{ArrayTypes[name].amqp2cpp}> " if type_=="array" + return "ArrayDomain<#{array_type(name).amqp2cpp}> " if type_=="array" return type_.amqp2cpp end diff --git a/cpp/rubygen/framing.0-10/constants.rb b/cpp/rubygen/framing.0-10/constants.rb index 0560cef887..206aabadf9 100755 --- a/cpp/rubygen/framing.0-10/constants.rb +++ b/cpp/rubygen/framing.0-10/constants.rb @@ -46,6 +46,62 @@ class ConstantsGen < CppGen } end + def typecode_enum(t) "TYPE_CODE_#{t.name.shout}" end + + def typecode_h_cpp + path="#{@dir}/TypeCode" + h_file(path) { + include("<iosfwd>") + namespace(@namespace) { + scope("enum TypeCode {", "};") { + genl @amqp.types.map { |t| "#{typecode_enum t} = #{t.code}" if t.code }.compact.join(",\n") + } + genl <<EOS + +/** True if t is a valid TypeCode value */ +bool isTypeCode(uint8_t t); + +/** Throw exception if not a valid TypeCode */ +TypeCode typeCode(uint8_t); + +/**@return 0 if t is not a valid enum TypeCode value. */ +const char* typeName(TypeCode t); + +std::ostream& operator<<(std::ostream&, TypeCode); +EOS + } + } + + cpp_file(path) { + include(path); + include("qpid/Exception.h") + include("<ostream>") + namespace(@namespace) { + scope("const char* typeName(TypeCode t) {") { + scope("switch (t) {") { + @amqp.types.each { |t| genl "case #{typecode_enum t}: return \"#{t.name}\";" if t.code } + genl "default: break;" + } + genl "return 0;"; + } + genl <<EOS + +bool isTypeCode(uint8_t t) { return typeName(TypeCode(t)); } + +TypeCode typeCode(uint8_t t) { + if (!isTypeCode(t)) throw Exception(QPID_MSG("Invalid TypeCode " << t)); + return TypeCode(t); +} + +std::ostream& operator<<(std::ostream& o, TypeCode t) { + if (isTypeCode(t)) return o << typeName(t); + else return o << "Invalid TypeCode " << t; +} +EOS + } + } + end + def enum_h() h_file("#{@dir}/enum") { # Constants for enum domains. @@ -134,6 +190,7 @@ class ConstantsGen < CppGen enum_h reply_exceptions_h reply_exceptions_cpp + typecode_h_cpp end end diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 250d520145..97807425dd 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -101,11 +101,8 @@ class SessionState : public qpid::SessionState, void readyToSend(); // Used by cluster to create replica sessions. - template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); } - template <class F> void eachUnacked(F f) { semanticState.eachUnacked(f); } - SemanticState::ConsumerImpl& getConsumer(const string& dest) { return semanticState.find(dest); } + SemanticState& getSemanticState() { return semanticState; } boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); } - void record(const DeliveryRecord& delivery) { semanticState.record(delivery); } private: diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 604df9dde6..ada26ab2fb 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" +#include "qpid/broker/TxPublish.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AllInvoker.h" @@ -35,6 +36,14 @@ #include <boost/current_function.hpp> +// FIXME aconway 2008-11-03: +// +// Disproportionate amount of code here is dedicated to receiving a +// brain-dump when joining a cluster and building initial +// state. Should be separated out into its own classes. +// + + namespace qpid { namespace cluster { @@ -180,10 +189,16 @@ void Connection::deliverBuffer(Buffer& buf) { delivered(mcastDecoder.frame); } +broker::SessionState& Connection::sessionState() { + return *connection.getChannel(currentChannel).getSession(); +} + +broker::SemanticState& Connection::semanticState() { + return sessionState().getSemanticState(); +} + void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) { - broker::SessionHandler& h = connection.getChannel(currentChannel); - broker::SessionState* s = h.getSession(); - broker::SemanticState::ConsumerImpl& c = s->getConsumer(name); + broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); } @@ -197,9 +212,7 @@ void Connection::sessionState( const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete) { - broker::SessionHandler& h = connection.getChannel(currentChannel); - broker::SessionState* s = h.getSession(); - s->setState( + sessionState().setState( replayStart, sendCommandPoint, sentIncomplete, @@ -207,7 +220,7 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state dump for " << s->getId()); + QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { @@ -234,6 +247,15 @@ bool Connection::isDumped() const { return self.first == cluster.getId() && self.second == 0; } +broker::QueuedMessage Connection::getDumpMessage() { + // Get a message from the DUMP queue. + broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); + if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue")); + broker::QueuedMessage m = dumpQueue->get(); + if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue")); + return m; +} + void Connection::deliveryRecord(const string& qname, const SequenceNumber& position, const string& tag, @@ -245,15 +267,14 @@ void Connection::deliveryRecord(const string& qname, bool ended, bool windowing) { - broker::QueuedMessage m; broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname); if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname)); - broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); - if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue")); - + broker::QueuedMessage m; if (!ended) { // Has a message - if (acquired) // Message at front of dump queue + if (acquired) { // Message at front of dump queue + broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); m = dumpQueue->get(); + } else // Message at original position in original queue m = queue->find(position); if (!m.payload) @@ -266,10 +287,7 @@ void Connection::deliveryRecord(const string& qname, if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - broker::SessionHandler& h = connection.getChannel(currentChannel); - broker::SessionState* s = h.getSession(); - assert(s); - s->record(dr); + semanticState().record(dr); } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -286,7 +304,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } - + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 9f75d3dae3..331ac33ab0 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -40,6 +40,13 @@ namespace qpid { namespace framing { class AMQFrame; } +namespace broker { +class SemanticState; +class QueuedMessage; +class TxBuffer; +class TxAccept; +} + namespace cluster { class Cluster; @@ -117,15 +124,17 @@ class Connection : bool windowing); void queuePosition(const std::string&, const framing::SequenceNumber&); - - private: - bool catcUp; + private: bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); void deliverDoOutput(uint32_t requested); void sendDoOutput(); + broker::SessionState& sessionState(); + broker::SemanticState& semanticState(); + broker::QueuedMessage getDumpMessage(); + static NoOpConnectionOutputHandler discardHandler; Cluster& cluster; diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 40852a0411..a2860f6f32 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -39,10 +39,12 @@ #include "qpid/framing/ClusterConnectionConsumerStateBody.h" #include "qpid/framing/enum.h" #include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/TypeCode.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> + namespace qpid { namespace cluster { @@ -103,7 +105,7 @@ void DumpClient::dump() { // Dump exchange is used to route messages to the proper queue without modifying routing key. session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); -// Dump queue is used to transfer acquired messages that are no longer on their original queue. + // Dump queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true); session.sync(); session.close(); @@ -154,7 +156,7 @@ class MessageDumper { session.exchangeUnbind(queue, DumpClient::DUMP); } - void dump(const broker::QueuedMessage& message) { + void dumpQueuedMessage(const broker::QueuedMessage& message) { if (!haveLastPos || message.position - lastPos != 1) { ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); haveLastPos = true; @@ -165,6 +167,10 @@ class MessageDumper { framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); sb.get()->send(transfer, message.payload->getFrames()); } + + void dumpMessage(const boost::intrusive_ptr<broker::Message>& message) { + dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); + } }; @@ -178,7 +184,7 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { arg::autoDelete=q->isAutoDelete(), arg::arguments=q->getSettings()); MessageDumper dumper(q->getName(), session); - q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1)); + q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1)); q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1)); } @@ -217,11 +223,14 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33. - ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); - ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1)); + QPID_LOG(debug, dumperId << " dumping consumers."); + ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); + + QPID_LOG(debug, dumperId << " dumping unacknowledged messages."); + ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1)); + // Adjust for command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); - // Adjust for message in progress, will be sent after state update. SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) --received; @@ -274,14 +283,22 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { } void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { - assert(dr.isEnded() || dr.getMessage().payload); + dumpDeliveryRecordMessage(dr); + dumpDeliveryRecord(dr); +} - if (!dr.isEnded() && dr.isAcquired()) { +void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) { + // Dump the message associated with a dr if need be. + if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { // If the message is acquired then it is no longer on the // dumpees queue, put it on the dump queue for dumpee to pick up. // - MessageDumper(DUMP, shadowSession).dump(dr.getMessage()); + MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage()); } +} + +void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) { + // Assumes the associated message has already been dumped (if needed) ClusterConnectionProxy(shadowSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h index bb349a39ee..716e7dcc3a 100644 --- a/cpp/src/qpid/cluster/DumpClient.h +++ b/cpp/src/qpid/cluster/DumpClient.h @@ -44,6 +44,8 @@ class QueueBinding; class QueuedMessage; class SessionHandler; class DeliveryRecord; +class SessionState; +class SemanticState; } // namespace broker @@ -79,8 +81,9 @@ class DumpClient : public sys::Runnable { void dumpSession(broker::SessionHandler& s); void dumpConsumer(const broker::SemanticState::ConsumerImpl*); void dumpUnacked(const broker::DeliveryRecord&); - - private: + void dumpDeliveryRecord(const broker::DeliveryRecord&); + void dumpDeliveryRecordMessage(const broker::DeliveryRecord&); + MemberId dumperId; MemberId dumpeeId; Url dumpeeUrl; diff --git a/cpp/src/qpid/framing/Array.cpp b/cpp/src/qpid/framing/Array.cpp index 42d05f71c9..9f072f7b05 100644 --- a/cpp/src/qpid/framing/Array.cpp +++ b/cpp/src/qpid/framing/Array.cpp @@ -28,20 +28,21 @@ namespace qpid { namespace framing { -Array::Array() : typeOctet(0xF0/*void*/) {} +Array::Array() : type(TYPE_CODE_VOID) {} -Array::Array(uint8_t type) : typeOctet(type) {} +Array::Array(TypeCode t) : type(t) {} + +Array::Array(uint8_t t) : type(typeCode(t)) {} Array::Array(const std::vector<std::string>& in) { - typeOctet = 0xA4; + type = TYPE_CODE_STR16; for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) { - ValuePtr value(new StringValue(*i)); + ValuePtr value(new Str16Value(*i)); values.push_back(value); } } - uint32_t Array::encodedSize() const { //note: size is only included when used as a 'top level' type uint32_t len(4/*size*/ + 1/*type*/ + 4/*count*/); @@ -55,18 +56,18 @@ int Array::count() const { return values.size(); } -std::ostream& operator<<(std::ostream& out, const Array& t) { - out << "{"; - for(Array::ValueVector::const_iterator i = t.values.begin(); i != t.values.end(); ++i) { - if (i != t.values.begin()) out << ", "; - out << *(i->get()); +std::ostream& operator<<(std::ostream& out, const Array& a) { + out << typeName(a.getType()) << "{"; + for(Array::ValueVector::const_iterator i = a.values.begin(); i != a.values.end(); ++i) { + if (i != a.values.begin()) out << ", "; + (*i)->print(out); } return out << "}"; } void Array::encode(Buffer& buffer) const{ buffer.putLong(encodedSize() - 4);//size added only when array is a top-level type - buffer.putOctet(typeOctet); + buffer.putOctet(type); buffer.putLong(count()); for (ValueVector::const_iterator i = values.begin(); i!=values.end(); ++i) { (*i)->getData().encode(buffer); @@ -81,11 +82,11 @@ void Array::decode(Buffer& buffer){ << size << " bytes but only " << available << " available")); } if (size) { - typeOctet = buffer.getOctet(); + type = TypeCode(buffer.getOctet()); uint32_t count = buffer.getLong(); FieldValue dummy; - dummy.setType(typeOctet); + dummy.setType(type); available = buffer.available(); if (available < count * dummy.getData().encodedSize()) { throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected " @@ -95,7 +96,7 @@ void Array::decode(Buffer& buffer){ for (uint32_t i = 0; i < count; i++) { ValuePtr value(new FieldValue); - value->setType(typeOctet); + value->setType(type); value->getData().decode(buffer); values.push_back(ValuePtr(value)); } @@ -104,7 +105,7 @@ void Array::decode(Buffer& buffer){ bool Array::operator==(const Array& x) const { - if (typeOctet != x.typeOctet) return false; + if (type != x.type) return false; if (values.size() != x.values.size()) return false; for (ValueVector::const_iterator i = values.begin(), j = x.values.begin(); i != values.end(); ++i, ++j) { @@ -114,12 +115,13 @@ bool Array::operator==(const Array& x) const { return true; } -void Array::add(ValuePtr value) -{ - if (typeOctet != value->getType()) { - throw IllegalArgumentException(QPID_MSG("Wrong type of value in Array, expected " << typeOctet << " but found " << value->getType())); +void Array::insert(iterator i, ValuePtr value) { + if (type != value->getType()) { + // FIXME aconway 2008-10-31: put meaningful strings in this message. + throw Exception(QPID_MSG("Wrong type of value in Array, expected " << type + << " but found " << TypeCode(value->getType()))); } - values.push_back(value); + values.insert(i, value); } diff --git a/cpp/src/qpid/framing/Array.h b/cpp/src/qpid/framing/Array.h index d3ca04dd1d..183fcb6d5c 100644 --- a/cpp/src/qpid/framing/Array.h +++ b/cpp/src/qpid/framing/Array.h @@ -18,12 +18,12 @@ * under the License. * */ -#include <iostream> -#include <vector> -#include <boost/shared_ptr.hpp> -#include <map> #include "amqp_types.h" #include "FieldValue.h" +#include "qpid/framing/TypeCode.h" +#include <boost/shared_ptr.hpp> +#include <iostream> +#include <vector> #ifndef _Array_ #define _Array_ @@ -38,6 +38,8 @@ class Array public: typedef boost::shared_ptr<FieldValue> ValuePtr; typedef std::vector<ValuePtr> ValueVector; + typedef ValueVector::const_iterator const_iterator; + typedef ValueVector::iterator iterator; uint32_t encodedSize() const; void encode(Buffer& buffer) const; @@ -47,11 +49,30 @@ class Array bool operator==(const Array& other) const; Array(); + Array(TypeCode type); Array(uint8_t type); //creates a longstr array Array(const std::vector<std::string>& in); - void add(ValuePtr value); + TypeCode getType() const { return type; } + + // std collection interface. + const_iterator begin() const { return values.begin(); } + const_iterator end() const { return values.end(); } + iterator begin() { return values.begin(); } + iterator end(){ return values.end(); } + + ValuePtr front() const { return values.front(); } + ValuePtr back() const { return values.back(); } + size_t size() const { return values.size(); } + + void insert(iterator i, ValuePtr value); + void erase(iterator i) { values.erase(i); } + void push_back(ValuePtr value) { values.insert(end(), value); } + void pop_back() { values.pop_back(); } + + // Non-std interface + void add(ValuePtr value) { push_back(value); } template <class T> void collect(std::vector<T>& out) const @@ -60,12 +81,9 @@ class Array out.push_back((*i)->get<T>()); } } - - ValueVector::const_iterator begin() const { return values.begin(); } - ValueVector::const_iterator end() const { return values.end(); } private: - uint8_t typeOctet; + TypeCode type; ValueVector values; friend std::ostream& operator<<(std::ostream& out, const Array& body); diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp index 9107ceeeea..ecf469236d 100644 --- a/cpp/src/qpid/framing/FieldValue.cpp +++ b/cpp/src/qpid/framing/FieldValue.cpp @@ -109,10 +109,10 @@ bool FieldValue::operator==(const FieldValue& v) const *data == *v.data; } -StringValue::StringValue(const std::string& v) : +Str8Value::Str8Value(const std::string& v) : FieldValue( - 0xA4, - new VariableWidthValue<4>( + TYPE_CODE_STR8, + new VariableWidthValue<1>( reinterpret_cast<const uint8_t*>(v.data()), reinterpret_cast<const uint8_t*>(v.data()+v.size()))) { @@ -168,4 +168,13 @@ ArrayValue::ArrayValue(const Array& a) : FieldValue(0xaa, new EncodedValue<Array { } +void FieldValue::print(std::ostream& out) const { + data->print(out); + out << TypeCode(typeOctet) << '('; + if (data->convertsToString()) out << data->getString(); + else if (data->convertsToInt()) out << data->getInt(); + else data->print(out); + out << ')'; +} + }} diff --git a/cpp/src/qpid/framing/FieldValue.h b/cpp/src/qpid/framing/FieldValue.h index 68081c5674..4f78d7f0f2 100644 --- a/cpp/src/qpid/framing/FieldValue.h +++ b/cpp/src/qpid/framing/FieldValue.h @@ -89,7 +89,8 @@ class FieldValue { void decode(Buffer& buffer); bool operator==(const FieldValue&) const; bool operator!=(const FieldValue& v) const { return !(*this == v); } - void print(std::ostream& out) const { out << "(0x" << std::hex << int(typeOctet) << ")"; data->print(out); } + + void print(std::ostream& out) const; template <typename T> bool convertsTo() const { return false; } template <typename T> T get() const { throw InvalidConversionException(); } @@ -239,12 +240,9 @@ class EncodedValue : public FieldValue::Data { void print(std::ostream& o) const { o << "[" << value << "]"; }; }; -/* - * Basic string value encodes as iso-8859-15 with 32 bit length - */ -class StringValue : public FieldValue { +class Str8Value : public FieldValue { public: - StringValue(const std::string& v); + Str8Value(const std::string& v); }; class Str16Value : public FieldValue { diff --git a/cpp/src/tests/FieldValue.cpp b/cpp/src/tests/FieldValue.cpp index eacf098034..448f068107 100644 --- a/cpp/src/tests/FieldValue.cpp +++ b/cpp/src/tests/FieldValue.cpp @@ -24,17 +24,17 @@ QPID_AUTO_TEST_SUITE(FieldValueTestSuite) using namespace qpid::framing; -StringValue s("abc"); +Str16Value s("abc"); IntegerValue i(42); //DecimalValue d(1234,2); //FieldTableValue ft; //EmptyValue e; -QPID_AUTO_TEST_CASE(testStringValueEquals) +QPID_AUTO_TEST_CASE(testStr16ValueEquals) { - BOOST_CHECK(StringValue("abc") == s); - BOOST_CHECK(StringValue("foo") != s); + BOOST_CHECK(Str16Value("abc") == s); + BOOST_CHECK(Str16Value("foo") != s); BOOST_CHECK(s != i); BOOST_CHECK(s.convertsTo<std::string>() == true); BOOST_CHECK(s.convertsTo<int>() == false); diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index eeedbf5ec5..d8f366009d 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -216,6 +216,51 @@ class Sender { uint16_t channel; }; +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) { + ClusterFixture cluster(1, 1); // FIXME aconway 2008-11-04: local broker at index 1 + Client c0(cluster[0], "c0"); + c0.session.queueDeclare(arg::queue="q"); + c0.session.messageTransfer(arg::content=Message("A", "q")); + c0.session.messageTransfer(arg::content=Message("B", "q")); + + // Start a transaction that will commit. + Session commitSession = c0.connection.newSession("commit"); + SubscriptionManager commitSubs(commitSession); + commitSession.txSelect(); + commitSession.messageTransfer(arg::content=Message("a", "q")); + commitSession.messageTransfer(arg::content=Message("b", "q")); + BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A"); + + // Start a transaction that will roll back. + Session rollbackSession = c0.connection.newSession("rollback"); + SubscriptionManager rollbackSubs(rollbackSession); + rollbackSession.txSelect(); + rollbackSession.messageTransfer(arg::content=Message("1", "q")); + BOOST_CHECK_EQUAL(rollbackSubs.get("q", TIME_SEC).getData(), "B"); + + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); + // Add new member mid transaction. + cluster.add(); + Client c1(cluster[1], "c1"); + + // More transactional work + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + rollbackSession.messageTransfer(arg::content=Message("2", "q")); + commitSession.messageTransfer(arg::content=Message("c", "q")); + rollbackSession.messageTransfer(arg::content=Message("3", "q")); + + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + // Commit/roll back. + commitSession.txCommit(); + rollbackSession.txRollback(); + // Verify queue status: just the comitted messages + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c"); +} + QPID_AUTO_TEST_CASE(testUnsupported) { ScopedSuppressLogging sl; ClusterFixture cluster(1); |