summaryrefslogtreecommitdiff
path: root/include/distributable_counter.h
blob: eb309f6819cce7a9115fb4548d417a51facdffc6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
/*****************************************************************************

Copyright (c) 2021 MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

#ifndef DISTRIBUTABLE_COUNTER_H
#define DISTRIBUTABLE_COUNTER_H

#include <cassert>
#include <cstddef>

#include <array>
#include <atomic>
#include <mutex>

#include "ilist.h"
#include "my_attribute.h"

namespace detail
{

// Write-only handle for the strong memory model: every change is one atomic
// read-modify-write (fetch_add/fetch_sub) with relaxed ordering, so updates
// made concurrently by several threads are never lost.
// The handle only keeps a reference; the referenced atomic must outlive it.
template <typename Integral> class strong_bumper
{
  std::atomic<Integral> &value_;

public:
  strong_bumper(std::atomic<Integral> &value) noexcept : value_(value) {}

  // Atomically adds amount to the counter.
  void operator+=(Integral amount) noexcept
  {
    value_.fetch_add(amount, std::memory_order_relaxed);
  }
  // Atomically subtracts amount from the counter.
  void operator-=(Integral amount) noexcept
  {
    value_.fetch_sub(amount, std::memory_order_relaxed);
  }
  // Prefix and postfix forms are the same change by one. They return void on
  // purpose: the handle offers no way to read the counter.
  void operator++() noexcept { operator+=(1); }
  void operator++(int) noexcept { operator+=(1); }
  void operator--() noexcept { operator-=(1); }
  void operator--(int) noexcept { operator-=(1); }
};

// Write-only handle for the weak memory model: a change is a relaxed load
// followed by a relaxed store. Each access is atomic, so there is no data
// race, but the pair is not a single atomic step: a write made by another
// thread between the load and the store is overwritten.
// The handle only keeps a reference; the referenced atomic must outlive it.
template <typename Integral> class weak_bumper
{
  std::atomic<Integral> &value_;

public:
  weak_bumper(std::atomic<Integral> &value) noexcept : value_(value) {}

  // Adds amount to the counter with a separate load and store.
  void operator+=(Integral amount) noexcept
  {
    const Integral current= value_.load(std::memory_order_relaxed);
    value_.store(current + amount, std::memory_order_relaxed);
  }
  // Subtracts amount from the counter with a separate load and store.
  void operator-=(Integral amount) noexcept
  {
    const Integral current= value_.load(std::memory_order_relaxed);
    value_.store(current - amount, std::memory_order_relaxed);
  }
  // Prefix and postfix forms are the same change by one and return void.
  void operator++() noexcept { operator+=(1); }
  void operator++(int) noexcept { operator+=(1); }
  void operator--() noexcept { operator-=(1); }
  void operator--(int) noexcept { operator-=(1); }
};

} // namespace detail

template <typename Integral, size_t Size> class counter_broker_array;

// The main (possibly global) thread-safe collection of Size counters; in a
// nutshell, an array of std::atomic values. Counters can be bumped directly
// through operator[]. When contention on them becomes a problem, writers can
// use a counter_broker_array bound to this object as a private shard instead.
template <typename Integral, size_t Size> class distributable_counter_array
{
public:
  using size_type= std::size_t;

  // Starts every counter at zero with no brokers registered.
  distributable_counter_array() noexcept
  {
    for (size_type i= 0; i < Size; i++)
      counters_[i].store(0, std::memory_order_relaxed);
  }

  // Neither copyable nor movable: registered brokers keep a reference to this
  // object.
  distributable_counter_array(const distributable_counter_array &)= delete;
  distributable_counter_array(distributable_counter_array &&)= delete;
  distributable_counter_array &
  operator=(const distributable_counter_array &)= delete;
  distributable_counter_array &
  operator=(distributable_counter_array &&)= delete;

  // Returns a write-only handle that updates counter idx of this array with
  // atomic read-modify-write operations. idx must be less than size().
  __attribute__((warn_unused_result)) detail::strong_bumper<Integral>
  operator[](size_type idx) noexcept
  {
    assert(idx < size());
    detail::strong_bumper<Integral> bumper(counters_[idx]);
    return bumper;
  }

  // Number of counters in the array.
  __attribute__((warn_unused_result)) size_type size() const noexcept
  {
    return Size;
  }

  // Returns counter idx summed over this array and all registered brokers.
  __attribute__((warn_unused_result)) Integral load(size_type idx);

  // Sets counter idx of this array to `to`, zeroes it in every registered
  // broker and returns the total that was held before.
  Integral exchange(size_type idx, Integral to);

private:
  std::array<std::atomic<Integral>, Size> counters_;
  ilist<counter_broker_array<Integral, Size>> brokers_; // guarded by mutex_
  std::mutex mutex_;

  // Brokers add themselves to brokers_ and remove themselves from it.
  friend counter_broker_array<Integral, Size>;
};

// A shard of a distributable_counter_array. Every broker is bound to exactly
// one array for its whole lifetime and only offers write operations; values
// are read through the array, which sums itself and all registered brokers.
// Increments use detail::weak_bumper (a load followed by a store), so a broker
// is meant to be written by one thread only. Typically it is a 'thread_local'
// or even a local variable. The array must outlive all brokers bound to it.
template <typename Integral, size_t Size>
class counter_broker_array : public ilist_node<>
{
  counter_broker_array(const counter_broker_array &)= delete;
  counter_broker_array &operator=(const counter_broker_array &)= delete;
  counter_broker_array(counter_broker_array &&)= delete;
  counter_broker_array &operator=(counter_broker_array &&)= delete;

public:
  using size_type= std::size_t;

  // Zeroes the shard and registers it with `array`. Registration happens
  // under the array's mutex because readers walk the broker list.
  counter_broker_array(distributable_counter_array<Integral, Size> &array)
      : base_(array)
  {
    for (auto &counter : counters_)
      counter.store(0, std::memory_order_relaxed);

    std::lock_guard<std::mutex> _(array.mutex_);
    array.brokers_.push_back(*this);
  }

  // Moves the accumulated values into the bound array and unregisters.
  ~counter_broker_array() noexcept
  {
    // Flushing and unlinking form one step under the mutex. load() and
    // exchange() of the array hold the same mutex while they read both the
    // brokers and the base counter, so a reader finds the values of this
    // shard either here or in the base array: never in both (double sum)
    // and never in neither (lost sum).
    std::lock_guard<std::mutex> _(base_.mutex_);
    for (size_type i= 0; i < size(); i++)
      base_[i]+= counters_[i].exchange(0, std::memory_order_relaxed);
    base_.brokers_.remove(*this);
  }

  // Returns a write-only handle that updates counter idx of this shard with
  // a non-atomic load/store pair. idx must be less than size().
  __attribute__((warn_unused_result)) detail::weak_bumper<Integral>
  operator[](size_type idx) noexcept
  {
    assert(idx < size());
    return detail::weak_bumper<Integral>(counters_[idx]);
  }

  // Number of counters in the shard; equal to Size.
  __attribute__((warn_unused_result)) size_type size() const noexcept
  {
    return counters_.size();
  }

private:
  std::array<std::atomic<Integral>, Size> counters_;
  distributable_counter_array<Integral, Size> &base_;

  // The array reads and resets counters_ directly in load() and exchange().
  friend class distributable_counter_array<Integral, Size>;
};

// Returns the current total of counter idx: the base counter plus the shard
// of every registered broker. idx must be less than size().
// Everything is read under mutex_ so that a broker being destroyed is seen
// either as a shard or as part of the base counter, but not both.
// Increments running concurrently may or may not be included in the result.
template <typename Integral, size_t Size>
Integral distributable_counter_array<Integral, Size>::load(size_type idx)
{
  assert(idx < size());

  std::lock_guard<std::mutex> _(mutex_);
  Integral accumulator= counters_[idx].load(std::memory_order_relaxed);
  for (const auto &broker : brokers_)
    accumulator+= broker.counters_[idx].load(std::memory_order_relaxed);
  return accumulator;
}

// Replaces counter idx of the base array with `to`, zeroes counter idx of
// every registered broker and returns the total held before the call.
// idx must be less than size().
// NOTE: brokers are written with detail::weak_bumper, i.e. a separate load
// and store. If a broker's owner is in the middle of such an increment, its
// store can overwrite the zero written here, and the amount already returned
// by this call is then reported again by a later load() or exchange().
template <typename Integral, size_t Size>
Integral distributable_counter_array<Integral, Size>::exchange(size_type idx,
                                                               Integral to)
{
  assert(idx < size());

  std::lock_guard<std::mutex> _(mutex_);
  Integral accumulator=
      counters_[idx].exchange(to, std::memory_order_relaxed);
  // The brokers are modified here, so they are iterated by non-const
  // reference: std::atomic::exchange() cannot be called on a const atomic.
  for (auto &broker : brokers_)
    accumulator+=
        broker.counters_[idx].exchange(0, std::memory_order_relaxed);
  return accumulator;
}

// Uses TLS to distribute one counter array over any number of threads
// automatically: each thread writes to its own broker bound to global_.
// Writing is a weak (load + store) increment of the calling thread's broker.
// Reading walks the brokers of all threads that have written so far, i.e. it
// is O(N) in the number of such threads. Thus, the counter is optimized for
// writing and pessimized for reading.
//
// NOTE: the per-thread broker is a function-local thread_local, so there is
// one broker per thread for each <Integral, Size> specialization, not one per
// object. It is bound to the object through which the thread first wrote.
// Use a single instance per specialization, as the class name suggests.
// NOTE: the broker is destroyed when its thread exits, and its destructor
// accesses the array it is bound to, so this object must outlive every
// thread that has written through it.
template <typename Integral, size_t Size> class singleton_counter_array
{
  distributable_counter_array<Integral, Size> global_;

  // Returns the calling thread's broker, creating and registering it on the
  // first call made by that thread.
  counter_broker_array<Integral, Size> &local()
  {
    // Meyers' singleton: the broker is initialized on the first access and
    // thus does not slow down thread creation.
    thread_local counter_broker_array<Integral, Size> shard(global_);
    return shard;
  }

public:
  // Returns a write-only handle to counter idx of the calling thread's
  // broker. idx must be less than Size.
  __attribute__((warn_unused_result)) detail::weak_bumper<Integral>
  operator[](size_t idx)
  {
    assert(idx < Size);
    counter_broker_array<Integral, Size> &shard= local();
    return shard[idx];
  }

  // Returns the total of counter idx over all threads.
  __attribute__((warn_unused_result)) Integral load(size_t idx)
  {
    const Integral total= global_.load(idx);
    return total;
  }

  // Resets counter idx to `to` and returns the total held before.
  Integral exchange(size_t idx, Integral to)
  {
    const Integral previous= global_.exchange(idx, to);
    return previous;
  }
};

#endif