summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/TxReplicator.h
blob: 9d80ecb8d37049d60ab8c25aea176c25d42cca80 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#ifndef QPID_HA_TRANSACTIONREPLICATOR_H
#define QPID_HA_TRANSACTIONREPLICATOR_H

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "QueueReplicator.h"
#include "Event.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/sys/Mutex.h"

namespace qpid {

namespace broker {
class TxBuffer;
class TxAccept;
class DtxBuffer;
class Broker;
class MessageStore;
class Deliverable;
}

namespace ha {
class BrokerReplicator;

/**
 * Exchange created on a backup broker to replicate a transaction on the primary.
 *
 * Subscribes to a tx-queue like a normal queue but puts replicated messages and
 * transaction events into a local TxBuffer.
 *
 * THREAD SAFE: Called in different connection threads.
 */
class TxReplicator : public QueueReplicator {
  public:
    typedef boost::shared_ptr<broker::Queue> QueuePtr;
    typedef boost::shared_ptr<broker::Link> LinkPtr;

    static bool isTxQueue(const std::string& queue);
    static std::string getTxId(const std::string& queue);

    TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
    ~TxReplicator();

    std::string getType() const;

    // QueueReplicator overrides
    void route(broker::Deliverable& deliverable);
    void destroy();

  protected:

    void deliver(const broker::Message&);

  private:

    typedef void (TxReplicator::*DispatchFunction)(
        const std::string&, sys::Mutex::ScopedLock&);
    typedef qpid::sys::unordered_map<std::string, DispatchFunction> DispatchMap;
    typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> DequeueMap;

    void sendMessage(const broker::Message&, sys::Mutex::ScopedLock&);
    void enqueue(const std::string& data, sys::Mutex::ScopedLock&);
    void dequeue(const std::string& data, sys::Mutex::ScopedLock&);
    void prepare(const std::string& data, sys::Mutex::ScopedLock&);
    void commit(const std::string& data, sys::Mutex::ScopedLock&);
    void rollback(const std::string& data, sys::Mutex::ScopedLock&);
    void backups(const std::string& data, sys::Mutex::ScopedLock&);
    void end(sys::Mutex::ScopedLock&);

    std::string logPrefix;
    TxEnqueueEvent enq;         // Enqueue data for next deliver.
    boost::intrusive_ptr<broker::TxBuffer> txBuffer;
    broker::MessageStore* store;
    std::auto_ptr<broker::TransactionContext> context;
    framing::ChannelId channel; // Channel to send prepare-complete.
    bool empty, ended;

    // Class to process dequeues and create DeliveryRecords to populate a
    // TxAccept.
    class DequeueState {
      public:
        DequeueState(broker::QueueRegistry& qr) : queues(qr) {}
        void add(const TxDequeueEvent&);
        boost::shared_ptr<broker::TxAccept> makeAccept();

      private:
        // Delivery record IDs are command IDs from the session.
        // On a backup we will just fake these Ids.
        typedef framing::SequenceNumber Id;
        typedef framing::SequenceSet IdSet;
        typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> EventMap;

        bool addRecord(const broker::Message& m,
                       const boost::shared_ptr<broker::Queue>&,
                       const ReplicationIdSet& );
        void addRecords(const DequeueMap::value_type& entry);

        broker::QueueRegistry& queues;
        EventMap events;
        broker::DeliveryRecords records;
        broker::QueueCursor cursor;
        framing::SequenceNumber nextId;
        IdSet recordIds;
    };
    DequeueState dequeueState;
};


}} // namespace qpid::ha

#endif  /*!QPID_HA_TRANSACTIONREPLICATOR_H*/