summaryrefslogtreecommitdiff
path: root/src/test/bench/dumb_backend.h
blob: bbf8650add9ec9187683dd1df57ab51e3941d205 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-

#ifndef DUMBBACKEND
#define DUMBBACKEND

#include "backend.h"
#include "include/Context.h"
#include "os/ObjectStore.h"
#include "common/WorkQueue.h"
#include "common/Semaphore.h"

#include <deque>

class DumbBackend : public Backend {
	const string path;

  struct write_item {
    const string oid;
    bufferlist bl;
    uint64_t offset;
    Context *on_applied;
    Context *on_commit;
    write_item(
      const string &oid,
      const bufferlist &bl,
      uint64_t offset,
      Context *on_applied,
      Context *on_commit) :
      oid(oid), bl(bl), offset(offset), on_applied(on_applied),
      on_commit(on_commit) {}
  };

  Semaphore sem;

  bool do_fsync;
  bool do_sync_file_range;
  bool do_fadvise;
  unsigned sync_interval;
  int sync_fd;
  ThreadPool tp;

  class SyncThread : public Thread {
    DumbBackend *backend;
  public:
    SyncThread(DumbBackend *backend) : backend(backend) {}
    void *entry() {
      backend->sync_loop();
      return 0;
    }
  } thread;
  friend class SyncThread;

  Mutex sync_loop_mutex;
  Cond sync_loop_cond;
  int sync_loop_stop; // 0 for running, 1 for stopping, 2 for stopped
  void sync_loop();

  Mutex pending_commit_mutex;
  set<Context*> pending_commits;

  class WriteQueue : public ThreadPool::WorkQueue<write_item> {
    deque<write_item*> item_queue;
    DumbBackend *backend;

  public:
    WriteQueue(
      DumbBackend *backend,
      time_t ti,
      ThreadPool *tp) :
      ThreadPool::WorkQueue<write_item>("DumbBackend::queue", ti, ti*10, tp),
      backend(backend) {}
    bool _enqueue(write_item *item) {
      item_queue.push_back(item);
      return true;
    }
    void _dequeue(write_item*) { assert(0); }
    write_item *_dequeue() {
      if (item_queue.empty())
	return 0;
      write_item *retval = item_queue.front();
      item_queue.pop_front();
      return retval;
    }
    bool _empty() {
      return item_queue.empty();
    }
    void _process(write_item *item) {
      return backend->_write(
	item->oid,
	item->offset,
	item->bl,
	item->on_applied,
	item->on_commit);
      delete item;
    }
    void _clear() {
      return item_queue.clear();
    }
  } queue;
  friend class WriteQueue;

  string get_full_path(const string &oid);

  void _write(
    const string &oid,
    uint64_t offset,
    const bufferlist &bl,
    Context *on_applied,
    Context *on_commit);

public:
  DumbBackend(
    const string &path,
    bool do_fsync,
    bool do_sync_file_range,
    bool do_fadvise,
    unsigned sync_interval,
    int sync_fd,
    unsigned worker_threads,
    CephContext *cct)
    : path(path), do_fsync(do_fsync),
      do_sync_file_range(do_sync_file_range),
      do_fadvise(do_fadvise),
      sync_interval(sync_interval),
      sync_fd(sync_fd),
      tp(cct, "DumbBackend::tp", worker_threads),
      thread(this),
      sync_loop_mutex("DumbBackend::sync_loop_mutex"),
      sync_loop_stop(0),
      pending_commit_mutex("DumbBackend::pending_commit_mutex"),
      queue(this, 20, &tp) {
    thread.create();
    tp.start();
    for (unsigned i = 0; i < 10*worker_threads; ++i) {
      sem.Put();
    }
  }
  ~DumbBackend() {
    {
      Mutex::Locker l(sync_loop_mutex);
      if (sync_loop_stop == 0)
	sync_loop_stop = 1;
      while (sync_loop_stop < 2)
	sync_loop_cond.Wait(sync_loop_mutex);
    }
    tp.stop();
    thread.join();
  }
  void write(
    const string &oid,
    uint64_t offset,
    const bufferlist &bl,
    Context *on_applied,
    Context *on_commit) {
    sem.Get();
    queue.queue(
      new write_item(
	oid, bl, offset, on_applied, on_commit));
  }

  void read(
    const string &oid,
    uint64_t offset,
    uint64_t length,
    bufferlist *bl,
    Context *on_complete);
};

#endif