diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/ExchangeTest.cpp | 91 |
3 files changed, 61 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index fe388c2fe9..243d089ccb 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -152,7 +152,9 @@ Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffe buffer.getShortString(type); buffer.get(args); - return exchanges.declare(name, type, durable, args).first; + Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first; + exch->sequenceNo = buffer.getInt64(); + return exch; } void Exchange::encode(Buffer& buffer) const @@ -161,6 +163,7 @@ void Exchange::encode(Buffer& buffer) const buffer.putOctet(durable); buffer.putShortString(getType()); buffer.put(args); + buffer.putInt64(sequenceNo); } uint32_t Exchange::encodedSize() const @@ -168,7 +171,8 @@ uint32_t Exchange::encodedSize() const return name.size() + 1/*short string size*/ + 1 /*durable*/ + getType().size() + 1/*short string size*/ - + args.encodedSize(); + + args.encodedSize() + + 8; /*int64 */ } ManagementObject* Exchange::GetManagementObject (void) const diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 50410b6e06..05b465d9df 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -50,7 +50,7 @@ private: protected: bool sequence; mutable qpid::sys::Mutex sequenceLock; - uint64_t sequenceNo; + int64_t sequenceNo; bool ive; boost::intrusive_ptr<Message> lastMsg; diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index c2f6078b33..0946d3115d 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -179,48 +179,65 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) { FieldTable args; args.setInt("qpid.msg_sequence",1); - - DirectExchange direct("direct1", false, args); - - intrusive_ptr<Message> msg1 = cmessage("e", "A"); - intrusive_ptr<Message> msg2 = cmessage("e", "B"); - intrusive_ptr<Message> msg3 = cmessage("e", "C"); + char* buff = new char[10000]; + framing::Buffer buffer(buff,10000); + { + DirectExchange direct("direct1", false, args); - DeliverableMessage dmsg1(msg1); - DeliverableMessage dmsg2(msg2); - DeliverableMessage dmsg3(msg3); + intrusive_ptr<Message> msg1 = cmessage("e", "A"); + intrusive_ptr<Message> msg2 = cmessage("e", "B"); + intrusive_ptr<Message> msg3 = cmessage("e", "C"); - direct.route(dmsg1, "abc", 0); - direct.route(dmsg2, "abc", 0); - direct.route(dmsg3, "abc", 0); + DeliverableMessage dmsg1(msg1); + DeliverableMessage dmsg2(msg2); + DeliverableMessage dmsg3(msg3); - BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - - FanOutExchange fanout("fanout1", false, args); - HeadersExchange header("headers1", false, args); - TopicExchange topic ("topic1", false, args); - - // check other exchanges, that they preroute - intrusive_ptr<Message> msg4 = cmessage("e", "A"); - intrusive_ptr<Message> msg5 = cmessage("e", "B"); - intrusive_ptr<Message> msg6 = cmessage("e", "C"); - - DeliverableMessage dmsg4(msg4); - DeliverableMessage dmsg5(msg5); - DeliverableMessage dmsg6(msg6); - - fanout.route(dmsg4, "abc", 0); - BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - - FieldTable headers; - header.route(dmsg5, "abc", &headers); - BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + direct.route(dmsg1, "abc", 0); + direct.route(dmsg2, "abc", 0); + direct.route(dmsg3, "abc", 0); + + BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + + FanOutExchange fanout("fanout1", false, args); + HeadersExchange header("headers1", false, args); + TopicExchange topic ("topic1", false, args); + + // check other exchanges, that they preroute + intrusive_ptr<Message> msg4 = cmessage("e", "A"); + intrusive_ptr<Message> msg5 = cmessage("e", "B"); + intrusive_ptr<Message> msg6 = cmessage("e", "C"); + + DeliverableMessage dmsg4(msg4); + DeliverableMessage dmsg5(msg5); + DeliverableMessage dmsg6(msg6); - topic.route(dmsg6, "abc", 0); - BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + fanout.route(dmsg4, "abc", 0); + BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + FieldTable headers; + header.route(dmsg5, "abc", &headers); + BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + + topic.route(dmsg6, "abc", 0); + BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + direct.encode(buffer); + } + { + + ExchangeRegistry exchanges; + buffer.reset(); + DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer); + + intrusive_ptr<Message> msg1 = cmessage("e", "A"); + DeliverableMessage dmsg1(msg1); + exch_dec->route(dmsg1, "abc", 0); + + BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + + } + delete [] buff; } QPID_AUTO_TEST_CASE(testIVEOption) |