1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% Test harness module: implements the gm behaviour and drives randomised
%% broadcast traffic between spawned members.
-module(gm_test).
%% Entry point that starts the test.
-export([test/0]).
%% Callback functions exported for the gm behaviour declared below.
-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
-behaviour(gm).
-include("gm_specs.hrl").
%% Returns the term stored under the `state' key of the calling process's
%% dictionary, or `undefined' when nothing has been stored yet.
get_state() ->
    erlang:get(state).
%% Transforms the per-process state: reads the current value, applies Fun
%% to it and stores Fun's result back under the `state' key.
%% Returns whatever put/2 returns, i.e. the value that was stored before.
with_state(Fun) ->
    Current = get_state(),
    Updated = Fun(Current),
    put(state, Updated).
%% Bumps the per-process message counter kept under the `count' key.
%% When the counter reaches 100000 the throughput since the timestamp
%% stored under `ts' is printed, `ts' is refreshed and the counter is
%% reset to 0. Returns the previous value of `count' (the result of put/2).
inc() ->
    Next = get(count) + 1,
    if
        Next =:= 100000 ->
            Finished = os:timestamp(),
            %% put/2 hands back the old timestamp while installing the new one
            Began = put(ts, Finished),
            ElapsedMicros = timer:now_diff(Finished, Began),
            ElapsedSecs = ElapsedMicros / 1000000,
            Rate = 100000 / ElapsedSecs,
            io:format("~p seeing ~p msgs/sec~n", [self(), Rate]),
            put(count, 0);
        true ->
            put(count, Next)
    end.
%% gm callback run with the member list at join time.
%% Logs the join, then initialises the process dictionary: `state' becomes
%% a dict mapping every member to `empty', `count' starts at 0 and `ts'
%% records the current os timestamp. Always returns ok.
joined([], Members) ->
    MemberCount = length(Members),
    io:format("Joined ~p (~p members)~n", [self(), MemberCount]),
    Entries = lists:map(fun (Member) -> {Member, empty} end, Members),
    put(state, dict:from_list(Entries)),
    put(count, 0),
    put(ts, os:timestamp()),
    ok.
%% gm callback run with the lists of newly born and newly dead members.
%% Every birth must be unknown so far and is recorded as `empty'; every
%% death must already be known and is recorded as `died'. A violated
%% expectation crashes the process with badmatch. Always returns ok.
members_changed([], Births, Deaths) ->
    RecordBirth =
        fun (Born, Acc) ->
                false = dict:is_key(Born, Acc),
                dict:store(Born, empty, Acc)
        end,
    RecordDeath =
        fun (Died, Acc) ->
                true = dict:is_key(Died, Acc),
                dict:store(Died, died, Acc)
        end,
    with_state(
      fun (State) ->
              %% births are applied before deaths
              WithBirths = lists:foldl(RecordBirth, State, Births),
              lists:foldl(RecordDeath, WithBirths, Deaths)
      end),
    ok.
%% gm callback run for each {test_msg, Num} received from member From.
%%
%% Counts the message via inc/0, then checks it against the per-sender
%% entry in the state dict, which holds one of:
%%   empty   - nothing received from From yet, any Num is accepted;
%%   died    - From was reported dead, so any delivery is an error;
%%   integer - the number expected next (last accepted Num + 1).
%% On success the entry is advanced to Num + 1 and ok is returned.
%% Any ordering violation terminates the process via exit/1 with a
%% reason naming the sender and the offending numbers.
handle_msg([], From, {test_msg, Num}) ->
    inc(),
    with_state(
      fun (State) ->
              ok = case dict:find(From, State) of
                       {ok, died} ->
                           exit({{from, From},
                                 {received_posthumous_delivery, Num}});
                       {ok, empty} -> ok;
                       {ok, Num}   -> ok;
                       {ok, Expected} when Num < Expected ->
                           %% Num was already seen: it is the duplicate,
                           %% Expected is what should have arrived.
                           exit({{from, From},
                                 {duplicate_delivery_of, Num},
                                 {expecting, Expected}});
                       {ok, Expected} ->
                           %% Num is ahead of the sequence: Expected is
                           %% the message that went missing.
                           exit({{from, From},
                                 {missing_delivery_of, Expected},
                                 {received_early, Num}});
                       error ->
                           %% sender is not a known member at all
                           exit({{from, From},
                                 {received_premature_delivery, Num}})
                   end,
              dict:store(From, Num + 1, State)
      end),
    ok.
%% gm callback: logs the calling process's pid together with Reason.
%% Always returns ok.
terminate([], Reason) ->
    Args = [self(), Reason],
    io:format("Left ~p (~p)~n", Args),
    ok.
%% Spawns a linked process that lives through one member lifecycle:
%% seed the RNG, wait a random time, start a gm process with this module
%% as callback, broadcast a random-length run of numbered messages, leave,
%% and finally spawn between zero and three replacement members.
%% Returns the pid of the spawned process.
spawn_member() ->
    Lifecycle =
        fun () ->
                random:seed(now()),
                %% start up delay of no more than 10 seconds
                StartupDelay = random:uniform(10000),
                timer:sleep(StartupDelay),
                {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
                First = random:uniform(10000),
                Last = First + random:uniform(10000),
                send_loop(Pid, First, Last),
                gm:leave(Pid),
                spawn_more()
        end,
    spawn_link(Lifecycle).
%% Spawns a random number of new members: random:uniform(4) yields 1..4,
%% so between 0 and 3 members are started. Returns the list of their pids.
spawn_more() ->
    HowMany = 4 - random:uniform(4),
    lists:map(fun (_) -> spawn_member() end, lists:seq(1, HowMany)).
%% Broadcasts {test_msg, Count} through Pid for every Count from the
%% starting value up to, but not including, Target, then returns ok.
%% Roughly one message in three goes out as a confirmed broadcast; the
%% rest use a plain broadcast. A short random pause follows each send.
send_loop(_Pid, Target, Target) ->
    ok;
send_loop(Pid, Count, Target) when Count < Target ->
    Msg = {test_msg, Count},
    Roll = random:uniform(3),
    if
        Roll =:= 3 -> gm:confirmed_broadcast(Pid, Msg);
        true       -> gm:broadcast(Pid, Msg)
    end,
    Pause = random:uniform(5) - 1, %% sleep up to 4 ms
    timer:sleep(Pause),
    send_loop(Pid, Count + 1, Target).
%% Entry point: creates the tables gm needs (asserting success) and starts
%% two initial members. Returns the pid of the second member.
test() ->
    ok = gm:create_tables(),
    _FirstMember = spawn_member(),
    spawn_member().
|