1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_MDS_H
#define CEPH_MDS_H
#include "mdstypes.h"
#include "msg/Dispatcher.h"
#include "include/CompatSet.h"
#include "include/types.h"
#include "include/Context.h"
#include "common/DecayCounter.h"
#include "common/perf_counters.h"
#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/Timer.h"
#include "common/LogClient.h"
#include "MDSMap.h"
#include "SessionMap.h"
/* MDS protocol version for cluster-internal messaging (see trailing comment).
 * NOTE(review): presumably advertised to peers and bumped on incompatible
 * wire-format changes -- confirm at its use sites before changing it. */
#define CEPH_MDS_PROTOCOL 19 /* cluster internal */
/*
 * Perf-counter indices for the main MDS counter set.
 *
 * Enumerators take consecutive values starting at l_mds_first = 2000, so
 * l_mds_last == 2035. l_mds_first and l_mds_last are range sentinels, not
 * counters. Order is significant: inserting or reordering an entry renumbers
 * every later counter.
 * NOTE(review): these are presumably registered on `logger` in
 * MDS::create_logger(); the meaning of each abbreviated name lives with that
 * registration code, not here -- confirm in MDS.cc.
 */
enum {
l_mds_first = 2000,
l_mds_req,
l_mds_reply,
l_mds_replyl,
l_mds_fw,
l_mds_dir_f,
l_mds_dir_c,
l_mds_dir_sp,
l_mds_dir_ffc,
l_mds_imax,
l_mds_i,
l_mds_itop,
l_mds_ibot,
l_mds_iptail,
l_mds_ipin,
l_mds_iex,
l_mds_icap,
l_mds_cap,
l_mds_dis,
l_mds_t,
l_mds_thit,
l_mds_tfw,
l_mds_tdis,
l_mds_tdirf,
l_mds_trino,
l_mds_tlock,
l_mds_l,
l_mds_q,
l_mds_popanyd,
l_mds_popnest,
l_mds_sm,
l_mds_ex,
l_mds_iexp,
l_mds_im,
l_mds_iim,
l_mds_last,
};
/*
 * Perf-counter index range with prefix l_mdc_, starting at 3000.
 * It currently declares no counters between its sentinels (l_mdc_last == 3001).
 * NOTE(review): presumably reserved for MDCache counters -- the owning logger
 * is not visible in this header; confirm before adding entries.
 */
enum {
l_mdc_first = 3000,
l_mdc_last,
};
/*
 * Perf-counter indices with prefix l_mdm_. Values are 2500..2517, sentinels
 * included. This range sits numerically between the l_mds_ range (2000..2035)
 * and the l_mdc_ range (3000..3001) and overlaps neither.
 * NOTE(review): presumably registered on `mlogger` as memory statistics
 * (names such as rss/heap/malloc suggest so) -- confirm in MDS.cc.
 * Order is significant: reordering renumbers the counters.
 */
enum {
l_mdm_first = 2500,
l_mdm_ino,
l_mdm_inoa,
l_mdm_inos,
l_mdm_dir,
l_mdm_dira,
l_mdm_dirs,
l_mdm_dn,
l_mdm_dna,
l_mdm_dns,
l_mdm_cap,
l_mdm_capa,
l_mdm_caps,
l_mdm_rss,
l_mdm_heap,
l_mdm_malloc,
l_mdm_buf,
l_mdm_last,
};
// Forward declarations, so the declarations below can name these types
// without pulling in their headers.

// Cluster-facing clients and maps.
class Filer;
class Messenger;
class MonClient;
class Objecter;
class OSDMap;

// Subsystems the MDS holds pointers to.
class Locker;
class MDBalancer;
class MDCache;
class MDLog;
class Server;

// Metadata tables and their client/server interfaces.
class AnchorClient;
class AnchorServer;
class InoTable;
class MDSTableClient;
class MDSTableServer;
class SnapClient;
class SnapServer;

// Cache objects and paths.
class CDentry;
class CDir;
class CInode;
class filepath;

// Messages.
class MClientReply;
class MClientRequest;
class Message;
class MMDSBeacon;

// Authentication.
class AuthAuthorizeHandlerRegistry;
/**
 * MDS: one metadata-server daemon.
 *
 * Holds:
 *  - the daemon's identity (name/whoami/incarnation) and standby settings;
 *  - its cluster handles (messenger, monc, mdsmap, osdmap, objecter, filer);
 *  - pointers to its subsystems (server, mdcache, locker, mdlog, balancer and
 *    the ino/anchor/snap tables);
 *  - its position in the MDS state machine (state / want_state);
 *  - lists of Context waiters parked until the daemon reaches a given state.
 *
 * It is a Dispatcher, so incoming messages arrive through ms_dispatch().
 *
 * NOTE(review): mds_lock is presumably the lock guarding the state below.
 * Nothing in this header enforces that; confirm in MDS.cc.
 *
 * Do not reorder data members. C++ constructs members in declaration order,
 * and the constructor (MDS.cc, not visible here) may rely on it; for example,
 * `timer` is presumably built from `mds_lock`.
 *
 * Inline accessors are const so they can be called through a const MDS.
 */
class MDS : public Dispatcher {
 public:
  Mutex mds_lock;
  SafeTimer timer;

  AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
  AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;

  string name;
  int whoami;
  int incarnation;

  int standby_for_rank;
  int standby_type;
  string standby_for_name;
  bool standby_replaying;  // true if current replay pass is in standby-replay mode

  Messenger *messenger;
  MonClient *monc;
  MDSMap *mdsmap;
  OSDMap *osdmap;
  Objecter *objecter;
  Filer *filer;            // for reading/writing to/from osds
  LogClient clog;

  // -- sub systems --
  Server *server;
  MDCache *mdcache;
  Locker *locker;
  MDLog *mdlog;
  MDBalancer *balancer;

  InoTable *inotable;

  AnchorServer *anchorserver;
  AnchorClient *anchorclient;

  SnapServer *snapserver;
  SnapClient *snapclient;

  // Map a table id to its client/server object (defined in MDS.cc).
  MDSTableClient *get_table_client(int t);
  MDSTableServer *get_table_server(int t);

  PerfCounters *logger, *mlogger;

  // Original command line, kept as passed in (NOTE(review): presumably for respawn()).
  int orig_argc;
  const char **orig_argv;

 protected:
  // -- MDS state --
  int last_state;
  int state;         // my confirmed state
  int want_state;    // the state i want

  // Waiters parked until the daemon reaches the named state.
  list<Context*> waiting_for_active, waiting_for_replay, waiting_for_reconnect, waiting_for_resolve;
  list<Context*> replay_queue;
  map<int, list<Context*> > waiting_for_active_peer;  // keyed by peer mds rank
  list<Message*> waiting_for_nolaggy;
  map<epoch_t, list<Context*> > waiting_for_mdsmap;   // keyed by mdsmap epoch

  map<int,version_t> peer_mdsmap_epoch;

  tid_t last_tid;    // for mds-initiated requests (e.g. stray rename)

 public:
  // -- waiter registration: each call appends c to the matching list --
  void wait_for_active(Context *c) {
    waiting_for_active.push_back(c);
  }
  void wait_for_active_peer(int who, Context *c) {
    waiting_for_active_peer[who].push_back(c);
  }
  void wait_for_replay(Context *c) {
    waiting_for_replay.push_back(c);
  }
  void wait_for_reconnect(Context *c) {
    waiting_for_reconnect.push_back(c);
  }
  void wait_for_resolve(Context *c) {
    waiting_for_resolve.push_back(c);
  }
  void wait_for_mdsmap(epoch_t e, Context *c) {
    waiting_for_mdsmap[e].push_back(c);
  }
  void enqueue_replay(Context *c) {
    replay_queue.push_back(c);
  }

  // -- state queries: compare the confirmed `state` against MDSMap states --
  int get_state() const { return state; }
  int get_want_state() const { return want_state; }
  bool is_creating() const { return state == MDSMap::STATE_CREATING; }
  bool is_starting() const { return state == MDSMap::STATE_STARTING; }
  bool is_standby() const { return state == MDSMap::STATE_STANDBY; }
  bool is_replay() const { return state == MDSMap::STATE_REPLAY; }
  bool is_standby_replay() const { return state == MDSMap::STATE_STANDBY_REPLAY; }
  bool is_resolve() const { return state == MDSMap::STATE_RESOLVE; }
  bool is_reconnect() const { return state == MDSMap::STATE_RECONNECT; }
  bool is_rejoin() const { return state == MDSMap::STATE_REJOIN; }
  bool is_clientreplay() const { return state == MDSMap::STATE_CLIENTREPLAY; }
  bool is_active() const { return state == MDSMap::STATE_ACTIVE; }
  bool is_stopping() const { return state == MDSMap::STATE_STOPPING; }
  bool is_oneshot_replay() const { return state == MDSMap::STATE_ONESHOT_REPLAY; }
  // True in any of the three replay flavours.
  bool is_any_replay() const { return (is_replay() || is_standby_replay() ||
                                       is_oneshot_replay()); }

  // Asks the current MDSMap, not the local `state`.
  bool is_stopped() const { return mdsmap->is_stopped(whoami); }

  // Defined in MDS.cc. NOTE(review): presumably records s as want_state.
  void request_state(int s);

  // Returns a fresh tid (pre-increment: first call yields last_tid + 1).
  tid_t issue_tid() { return ++last_tid; }

  // -- waiters --
  list<Context*> finished_queue;

  // Append a single Context to finished_queue.
  void queue_waiter(Context *c) {
    finished_queue.push_back(c);
  }
  // Move every Context from ls to the end of finished_queue; ls is left empty.
  void queue_waiters(list<Context*>& ls) {
    finished_queue.splice( finished_queue.end(), ls );
  }
  // Move the oldest replay_queue entry to finished_queue; false if none queued.
  bool queue_one_replay() {
    if (replay_queue.empty())
      return false;
    queue_waiter(replay_queue.front());
    replay_queue.pop_front();
    return true;
  }

  // -- keepalive beacon --
  version_t beacon_last_seq;                 // last seq sent to monitor
  map<version_t,utime_t> beacon_seq_stamp;   // seq # -> time sent
  utime_t beacon_last_acked_stamp;           // last time we sent a beacon that got acked
  bool was_laggy;
  utime_t laggy_until;

  bool is_laggy();
  utime_t get_laggy_until() const { return laggy_until; }

  // Timer callback: clears the pending-event pointer, then sends a beacon.
  class C_MDS_BeaconSender : public Context {
    MDS *mds;
  public:
    C_MDS_BeaconSender(MDS *m) : mds(m) {}
    void finish(int r) {
      mds->beacon_sender = 0;
      mds->beacon_send();
    }
  } *beacon_sender;

  // -- tick and other timer fun --
  // Timer callback: clears the pending-event pointer, then runs tick().
  class C_MDS_Tick : public Context {
    MDS *mds;
  public:
    C_MDS_Tick(MDS *m) : mds(m) {}
    void finish(int r) {
      mds->tick_event = 0;
      mds->tick();
    }
  } *tick_event;
  void reset_tick();

  // -- client map --
  SessionMap sessionmap;
  epoch_t last_client_mdsmap_bcast;

  // Request rate; exposed via get_req_rate(). NOTE(review): updated in MDS.cc.
  int req_rate;

 public:
  int get_req_rate() const { return req_rate; }

 private:
  // -- Dispatcher interface --
  bool ms_dispatch(Message *m);
  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
  bool ms_verify_authorizer(Connection *con, int peer_type,
                            int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
                            bool& isvalid, CryptoKey& session_key);
  void ms_handle_accept(Connection *con);
  void ms_handle_connect(Connection *con);
  bool ms_handle_reset(Connection *con);
  void ms_handle_remote_reset(Connection *con);

 public:
  MDS(const std::string &n, Messenger *m, MonClient *mc);
  ~MDS();

  // handle a signal (e.g., SIGTERM)
  void handle_signal(int signum);

  // -- who am i etc --
  int get_nodeid() const { return whoami; }
  MDSMap *get_mds_map() const { return mdsmap; }
  OSDMap *get_osd_map() const { return osdmap; }

  // -- message sending --
  void send_message_mds(Message *m, int mds);
  void forward_message_mds(Message *req, int mds);

  void send_message_client_counted(Message *m, client_t client);
  void send_message_client_counted(Message *m, Session *session);
  void send_message_client_counted(Message *m, Connection *connection);
  // Convenience overload: unwraps the ref and forwards to the Connection* form.
  void send_message_client_counted(Message *m, const ConnectionRef& con) {
    send_message_client_counted(m, con.get());
  }
  void send_message_client(Message *m, Session *session);
  void send_message(Message *m, Connection *c);
  // Convenience overload: unwraps the ref and forwards to the Connection* form.
  void send_message(Message *m, const ConnectionRef& c) {
    send_message(m, c.get());
  }

  // -- start up, shutdown --
  int init(int wanted_state=MDSMap::STATE_BOOT);

  void create_logger();

  void bcast_mds_map();  // to mounted clients

  void boot_create();             // i am new mds.
  void boot_start(int step=0, int r=0);    // starting|replay

  void calc_recovery_set();

  void replay_start();
  void creating_done();
  void starting_done();
  void replay_done();
  void standby_replay_restart();
  void _standby_replay_restart_finish(int r, uint64_t old_read_pos);
  class C_MDS_StandbyReplayRestart;
  class C_MDS_StandbyReplayRestartFinish;

  void reopen_log();

  void resolve_start();
  void resolve_done();
  void reconnect_start();
  void reconnect_done();
  void rejoin_joint_start();
  void rejoin_start();
  void rejoin_done();
  void recovery_done();
  void clientreplay_start();
  void clientreplay_done();
  void active_start();
  void stopping_start();
  void stopping_done();

  void handle_mds_recovery(int who);
  void handle_mds_failure(int who);

  void suicide();
  void respawn();

  void tick();

  void beacon_start();
  void beacon_send();
  void handle_mds_beacon(MMDSBeacon *m);

  void request_osdmap(Context *c);

  // -- messages --
  bool _dispatch(Message *m);

  bool is_stale_message(Message *m);

  bool handle_core_message(Message *m);
  bool handle_deferrable_message(Message *m);

  // -- special message types --
  void handle_command(class MMonCommand *m);
  void handle_mds_map(class MMDSMap *m);
};
/**
 * C_MDS_RetryMessage: a Context that feeds a deferred Message back into
 * MDS::_dispatch() when it completes.
 *
 * Ownership: the constructor takes over one message reference from the
 * caller, and msg must be non-null. finish() hands that reference to the
 * dispatch path, whose callees put the Message exactly once. The completion
 * code and _dispatch()'s return value are not used.
 */
class C_MDS_RetryMessage : public Context {
  Message *m;
  MDS *mds;
public:
  C_MDS_RetryMessage(MDS *owner, Message *msg)
    : m(msg), mds(owner)
  {
    assert(m);  // retrying a null message is a caller bug
  }
  virtual void finish(int /*r*/) {
    (void)mds->_dispatch(m);
  }
};
#endif
|