/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "unit_test.h" #include "test_tools.h" #include "MessagingFixture.h" #include "qpid/broker/BrokerObserver.h" #include "qpid/broker/TransactionObserver.h" #include "qpid/broker/TxBuffer.h" #include "qpid/broker/Queue.h" #include "qpid/ha/types.h" #include #include #include #include #include namespace qpid { namespace tests { using framing::SequenceSet; using messaging::Message; using boost::shared_ptr; using namespace boost::assign; using namespace boost; using namespace broker; using namespace std; using namespace messaging; using namespace types; QPID_AUTO_TEST_SUITE(TransactionalObserverTest) Message msg(string content) { return Message(content); } struct MockTransactionObserver : public TransactionObserver { bool prep; vector events; MockTransactionObserver(bool prep_=true) : prep(prep_) {} void record(const string& e) { events.push_back(e); } void enqueue(const shared_ptr& q, const broker::Message& m) { record("enqueue "+q->getName()+" "+m.getContent()); } void dequeue(const Queue::shared_ptr& q, SequenceNumber p, SequenceNumber r) { record("dequeue "+q->getName()+" "+ lexical_cast(p)+" "+lexical_cast(r)); } bool prepare() { record("prepare"); return prep; } void commit() { record("commit"); } void rollback() {record("rollback"); } }; struct MockBrokerObserver : public BrokerObserver { bool prep; shared_ptr tx; MockBrokerObserver(bool prep_=true) : prep(prep_) {} void startTx(const shared_ptr& buffer) { tx.reset(new MockTransactionObserver(prep)); buffer->setObserver(tx); } }; Session simpleTxTransaction(MessagingFixture& fix) { fix.session.createSender("q1;{create:always}").send(msg("foo")); // Not in TX // Transaction with 1 enqueue and 1 dequeue. Session txSession = fix.connection.createTransactionalSession(); BOOST_CHECK_EQUAL("foo", txSession.createReceiver("q1").fetch().getContent()); txSession.acknowledge(); txSession.createSender("q2;{create:always}").send(msg("bar")); return txSession; } QPID_AUTO_TEST_CASE(tesTxtCommit) { MessagingFixture fix; shared_ptr brokerObserver(new MockBrokerObserver); fix.broker->getBrokerObservers().add(brokerObserver); Session txSession = simpleTxTransaction(fix); txSession.commit(); // Note on ordering: observers see enqueues as they happen, but dequeues just // before prepare. BOOST_CHECK_EQUAL( list_of("enqueue q2 bar")("dequeue q1 1 0")("prepare")("commit"), brokerObserver->tx->events ); } QPID_AUTO_TEST_CASE(testTxFail) { MessagingFixture fix; shared_ptr brokerObserver(new MockBrokerObserver(false)); fix.broker->getBrokerObservers().add(brokerObserver); Session txSession = simpleTxTransaction(fix); try { txSession.commit(); BOOST_FAIL("Expected exception"); } catch(...) {} BOOST_CHECK_EQUAL( list_of("enqueue q2 bar")("dequeue q1 1 0")("prepare")("rollback"), brokerObserver->tx->events ); } QPID_AUTO_TEST_CASE(testTxRollback) { MessagingFixture fix; shared_ptr brokerObserver(new MockBrokerObserver(false)); fix.broker->getBrokerObservers().add(brokerObserver); Session txSession = simpleTxTransaction(fix); txSession.rollback(); // Note: The dequeue does not appear here. This is because TxAccepts // (i.e. dequeues) are not enlisted until SemanticState::commit and are // never enlisted if the transaction is rolled back. BOOST_CHECK_EQUAL( list_of("enqueue q2 bar")("rollback"), brokerObserver->tx->events ); } QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests