diff options
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 2 | ||||
-rw-r--r-- | cpp/lib/broker/DeliveryRecord.cpp | 8 | ||||
-rw-r--r-- | cpp/lib/broker/DeliveryRecord.h | 3 | ||||
-rw-r--r-- | cpp/lib/broker/TxAck.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/broker/TxAck.h | 4 | ||||
-rw-r--r-- | cpp/lib/broker/TxPublish.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/broker/TxPublish.h | 3 | ||||
-rw-r--r-- | cpp/tests/TxAckTest.cpp | 31 | ||||
-rw-r--r-- | cpp/tests/TxPublishTest.cpp | 27 |
9 files changed, 61 insertions, 27 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index e5b8336b25..8805b72774 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -204,7 +204,7 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ throw InvalidAckException(); }else if(multiple){ ack_iterator end = ++i; - for_each(unacked.begin(), end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), static_cast<qpid::broker::TransactionContext*>(0))); + for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); unacked.erase(unacked.begin(), end); //recalculate the prefetch: diff --git a/cpp/lib/broker/DeliveryRecord.cpp b/cpp/lib/broker/DeliveryRecord.cpp index 9d02cb615e..19b01cc312 100644 --- a/cpp/lib/broker/DeliveryRecord.cpp +++ b/cpp/lib/broker/DeliveryRecord.cpp @@ -42,8 +42,12 @@ DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, pull(true){} -void DeliveryRecord::discard(TransactionContext* ctxt) const{ - queue->dequeue(ctxt, msg, 0); +void DeliveryRecord::discard(TransactionContext* ctxt, const std::string* const xid) const{ + queue->dequeue(ctxt, msg, xid); +} + +void DeliveryRecord::discard() const{ + discard(0, 0); } bool DeliveryRecord::matches(u_int64_t tag) const{ diff --git a/cpp/lib/broker/DeliveryRecord.h b/cpp/lib/broker/DeliveryRecord.h index c1c8d6d13c..01a4024b28 100644 --- a/cpp/lib/broker/DeliveryRecord.h +++ b/cpp/lib/broker/DeliveryRecord.h @@ -46,7 +46,8 @@ namespace qpid { DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const u_int64_t deliveryTag); DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag); - void discard(TransactionContext* ctxt = 0) const; + void discard() const; + void discard(TransactionContext* ctxt, const std::string* const xid) const; bool matches(u_int64_t tag) const; bool coveredBy(const AccumulatedAck* const range) const; void requeue() const; diff --git a/cpp/lib/broker/TxAck.cpp b/cpp/lib/broker/TxAck.cpp index 2b55b81c58..b5211158f3 100644 --- a/cpp/lib/broker/TxAck.cpp +++ b/cpp/lib/broker/TxAck.cpp @@ -25,7 +25,8 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked){ +TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked, const std::string* const _xid) : + acked(_acked), unacked(_unacked), xid(_xid){ } @@ -34,10 +35,9 @@ bool TxAck::prepare(TransactionContext* ctxt) throw(){ //dequeue all acked messages from their queues for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { if (i->coveredBy(&acked)) { - i->discard(ctxt); + i->discard(ctxt, xid); } } - //for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked)); return true; }catch(...){ std::cout << "TxAck::prepare() - Failed to prepare" << std::endl; diff --git a/cpp/lib/broker/TxAck.h b/cpp/lib/broker/TxAck.h index d6ff8fea9c..88c321c445 100644 --- a/cpp/lib/broker/TxAck.h +++ b/cpp/lib/broker/TxAck.h @@ -37,13 +37,15 @@ namespace qpid { class TxAck : public TxOp{ AccumulatedAck& acked; std::list<DeliveryRecord>& unacked; + const std::string* const xid; + public: /** * @param acked a representation of the accumulation of * acks received * @param unacked the record of delivered messages */ - TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked, const std::string* const xid = 0); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/lib/broker/TxPublish.cpp b/cpp/lib/broker/TxPublish.cpp index 0de5fbb200..49dd8abd89 100644 --- a/cpp/lib/broker/TxPublish.cpp +++ b/cpp/lib/broker/TxPublish.cpp @@ -22,11 +22,11 @@ using namespace qpid::broker; -TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {} +TxPublish::TxPublish(Message::shared_ptr _msg, const std::string* const _xid) : msg(_msg), xid(_xid) {} bool TxPublish::prepare(TransactionContext* ctxt) throw(){ try{ - for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, 0)); + for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, xid)); return true; }catch(...){ std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl; diff --git a/cpp/lib/broker/TxPublish.h b/cpp/lib/broker/TxPublish.h index 2756addab7..75f201257e 100644 --- a/cpp/lib/broker/TxPublish.h +++ b/cpp/lib/broker/TxPublish.h @@ -60,10 +60,11 @@ namespace qpid { }; Message::shared_ptr msg; + const std::string* const xid; std::list<Queue::shared_ptr> queues; public: - TxPublish(Message::shared_ptr msg); + TxPublish(Message::shared_ptr msg, const std::string* const xid = 0); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp index 6832c5995b..0ffe984ded 100644 --- a/cpp/tests/TxAckTest.cpp +++ b/cpp/tests/TxAckTest.cpp @@ -37,11 +37,11 @@ class TxAckTest : public CppUnit::TestCase class TestMessageStore : public NullMessageStore { public: - vector<Message*> dequeued; + vector< std::pair<Message*, const string*> > dequeued; - void dequeue(TransactionContext*, Message* const msg, const Queue& /*queue*/, const string * const /*xid*/) + void dequeue(TransactionContext*, Message* const msg, const Queue& /*queue*/, const string * const xid) { - dequeued.push_back(msg); + dequeued.push_back(std::pair<Message*, const string*>(msg, xid)); } TestMessageStore() : NullMessageStore(false) {} @@ -49,6 +49,7 @@ class TxAckTest : public CppUnit::TestCase }; CPPUNIT_TEST_SUITE(TxAckTest); + CPPUNIT_TEST(testPrepare2pc); CPPUNIT_TEST(testPrepare); CPPUNIT_TEST(testCommit); CPPUNIT_TEST_SUITE_END(); @@ -60,11 +61,12 @@ class TxAckTest : public CppUnit::TestCase vector<Message::shared_ptr> messages; list<DeliveryRecord> deliveries; TxAck op; + std::string xid; public: - TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) + TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid) { for(int i = 0; i < 10; i++){ Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false)); @@ -86,13 +88,20 @@ public: op.prepare(0); CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size()); CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); - CPPUNIT_ASSERT_EQUAL(messages[0].get(), store.dequeued[0]);//msg 1 - CPPUNIT_ASSERT_EQUAL(messages[1].get(), store.dequeued[1]);//msg 2 - CPPUNIT_ASSERT_EQUAL(messages[2].get(), store.dequeued[2]);//msg 3 - CPPUNIT_ASSERT_EQUAL(messages[3].get(), store.dequeued[3]);//msg 4 - CPPUNIT_ASSERT_EQUAL(messages[4].get(), store.dequeued[4]);//msg 5 - CPPUNIT_ASSERT_EQUAL(messages[6].get(), store.dequeued[5]);//msg 7 - CPPUNIT_ASSERT_EQUAL(messages[8].get(), store.dequeued[6]);//msg 9 + int dequeued[] = {0, 1, 2, 3, 4, 6, 8}; + for (int i = 0; i < 7; i++) { + CPPUNIT_ASSERT_EQUAL(messages[dequeued[i]].get(), store.dequeued[i].first); + } + } + + void testPrepare2pc() + { + xid = "abcdefg"; + testPrepare(); + const string expected(xid); + for (int i = 0; i < 7; i++) { + CPPUNIT_ASSERT_EQUAL(expected, *store.dequeued[i].second); + } } void testCommit() diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp index d33d84ec6e..3542e08f45 100644 --- a/cpp/tests/TxPublishTest.cpp +++ b/cpp/tests/TxPublishTest.cpp @@ -34,15 +34,22 @@ using namespace qpid::framing; class TxPublishTest : public CppUnit::TestCase { + struct Triple + { + string first; + Message* second; + const string* third; + }; class TestMessageStore : public NullMessageStore { public: - vector< pair<string, Message*> > enqueued; + vector<Triple> enqueued; - void enqueue(TransactionContext*, Message* const msg, const Queue& queue, const string * const /*xid*/) + void enqueue(TransactionContext*, Message* const msg, const Queue& queue, const string * const xid) { - enqueued.push_back(pair<string, Message*>(queue.getName(),msg)); + Triple args = {queue.getName(), msg, xid}; + enqueued.push_back(args); } //dont care about any of the other methods: @@ -52,6 +59,7 @@ class TxPublishTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE(TxPublishTest); CPPUNIT_TEST(testPrepare); + CPPUNIT_TEST(testPrepare2pc); CPPUNIT_TEST(testCommit); CPPUNIT_TEST_SUITE_END(); @@ -61,14 +69,14 @@ class TxPublishTest : public CppUnit::TestCase Queue::shared_ptr queue2; Message::shared_ptr const msg; TxPublish op; - + string xid; public: TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), msg(new Message(0, "exchange", "routing_key", false, false)), - op(msg) + op(msg, &xid) { msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); @@ -87,6 +95,15 @@ public: CPPUNIT_ASSERT_EQUAL(msg.get(), store.enqueued[1].second); } + void testPrepare2pc() + { + xid = "abcde"; + const string expected(xid); + testPrepare(); + CPPUNIT_ASSERT_EQUAL(expected, *store.enqueued[0].third); + CPPUNIT_ASSERT_EQUAL(expected, *store.enqueued[1].third); + } + void testCommit() { //ensure messages are delivered to queue |