summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl134
-rw-r--r--src/gm_speed_test.erl82
-rw-r--r--src/rabbit_error_logger.erl6
-rw-r--r--src/rabbit_misc.erl1
4 files changed, 179 insertions, 44 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 8cf22581..5b3623cf 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -376,15 +376,16 @@
confirmed_broadcast/2, group_members/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_info/2]).
+ code_change/3, prioritise_cast/2, prioritise_info/2]).
-export([behaviour_info/1]).
--export([table_definitions/0]).
+-export([table_definitions/0, flush/1]).
-define(GROUP_TABLE, gm_group).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(BROADCAST_TIMER, 25).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -398,7 +399,9 @@
pub_count,
members_state,
callback_args,
- confirms
+ confirms,
+ broadcast_buffer,
+ broadcast_timer
}).
-record(gm_group, { name, version, members }).
@@ -508,21 +511,26 @@ confirmed_broadcast(Server, Msg) ->
group_members(Server) ->
gen_server2:call(Server, group_members, infinity).
+flush(Server) ->
+ gen_server2:cast(Server, flush).
+
init([GroupName, Module, Args]) ->
random:seed(now()),
gen_server2:cast(self(), join),
Self = self(),
- {ok, #state { self = Self,
- left = {Self, undefined},
- right = {Self, undefined},
- group_name = GroupName,
- module = Module,
- view = undefined,
- pub_count = 0,
- members_state = undefined,
- callback_args = Args,
- confirms = queue:new() }, hibernate,
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = 0,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new(),
+ broadcast_buffer = [],
+ broadcast_timer = undefined }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -620,7 +628,11 @@ handle_cast(join, State = #state { self = Self,
{Module:joined(Args, all_known_members(View)), State1});
handle_cast(leave, State) ->
- {stop, normal, State}.
+ {stop, normal, State};
+
+handle_cast(flush, State) ->
+ noreply(
+ flush_broadcast_buffer(State #state { broadcast_timer = undefined })).
handle_info({'DOWN', MRef, process, _Pid, _Reason},
@@ -662,14 +674,17 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
end.
-terminate(Reason, #state { module = Module,
- callback_args = Args }) ->
+terminate(Reason, State = #state { module = Module,
+ callback_args = Args }) ->
+ flush_broadcast_buffer(State),
Module:terminate(Args, Reason).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+prioritise_cast(flush, _State) -> 1;
+prioritise_cast(_ , _State) -> 0.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
prioritise_info(_ , _State) -> 0.
@@ -782,33 +797,62 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, State, hibernate}.
+ {noreply, ensure_broadcast_timer(State), hibernate}.
reply(Reply, State) ->
- {reply, Reply, State, hibernate}.
-
-internal_broadcast(Msg, From, State = #state { self = Self,
- pub_count = PubCount,
- members_state = MembersState,
- module = Module,
- confirms = Confirms,
- callback_args = Args }) ->
- PubMsg = {PubCount, Msg},
- Activity = activity_cons(Self, [PubMsg], [], activity_nil()),
- ok = maybe_send_activity(activity_finalise(Activity), State),
- MembersState1 =
- with_member(
- fun (Member = #member { pending_ack = PA }) ->
- Member #member { pending_ack = queue:in(PubMsg, PA) }
- end, Self, MembersState),
+ {reply, Reply, ensure_broadcast_timer(State), hibernate}.
+
+ensure_broadcast_timer(State = #state { broadcast_buffer = [],
+ broadcast_timer = undefined }) ->
+ State;
+ensure_broadcast_timer(State = #state { broadcast_buffer = [],
+ broadcast_timer = TRef }) ->
+ timer:cancel(TRef),
+ State #state { broadcast_timer = undefined };
+ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
+ {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]),
+ State #state { broadcast_timer = TRef };
+ensure_broadcast_timer(State) ->
+ State.
+
+internal_broadcast(Msg, From, State = #state { self = Self,
+ pub_count = PubCount,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args,
+ broadcast_buffer = Buffer }) ->
+ Result = Module:handle_msg(Args, Self, Msg),
+ Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
_ -> queue:in({PubCount, From}, Confirms)
end,
- handle_callback_result({Module:handle_msg(Args, Self, Msg),
- State #state { pub_count = PubCount + 1,
- members_state = MembersState1,
- confirms = Confirms1 }}).
+ State1 = State #state { pub_count = PubCount + 1,
+ confirms = Confirms1,
+ broadcast_buffer = Buffer1 },
+ case From =/= none of
+ true ->
+ handle_callback_result({Result, flush_broadcast_buffer(State1)});
+ false ->
+ handle_callback_result(
+ {Result, State1 #state { broadcast_buffer = Buffer1 }})
+ end.
+
+flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
+ State;
+flush_broadcast_buffer(State = #state { self = Self,
+ members_state = MembersState,
+ broadcast_buffer = Buffer }) ->
+ Pubs = lists:reverse(Buffer),
+ Activity = activity_cons(Self, Pubs, [], activity_nil()),
+ ok = maybe_send_activity(activity_finalise(Activity), State),
+ MembersState1 = with_member(
+ fun (Member = #member { pending_ack = PA }) ->
+ PA1 = queue:join(PA, queue:from_list(Pubs)),
+ Member #member { pending_ack = PA1 }
+ end, Self, MembersState),
+ State #state { members_state = MembersState1,
+ broadcast_buffer = [] }.
%% ---------------------------------------------------------------------------
@@ -1093,16 +1137,22 @@ maybe_monitor(Self, Self) ->
maybe_monitor(Other, _Self) ->
erlang:monitor(process, Other).
-check_neighbours(State = #state { self = Self,
- left = Left,
- right = Right,
- view = View }) ->
+check_neighbours(State = #state { self = Self,
+ left = Left,
+ right = Right,
+ view = View,
+ broadcast_buffer = Buffer }) ->
#view_member { left = VLeft, right = VRight }
= fetch_view_member(Self, View),
Ver = view_version(View),
Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
Right1 = ensure_neighbour(Ver, Self, Right, VRight),
- State1 = State #state { left = Left1, right = Right1 },
+ Buffer1 = case Right1 of
+ {Self, undefined} -> [];
+ _ -> Buffer
+ end,
+ State1 = State #state { left = Left1, right = Right1,
+ broadcast_buffer = Buffer1 },
ok = maybe_send_catchup(Right, State1),
State1.
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
new file mode 100644
index 00000000..defb0f29
--- /dev/null
+++ b/src/gm_speed_test.erl
@@ -0,0 +1,82 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(gm_speed_test).
+
+-export([test/3]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+-export([wile_e_coyote/2]).
+
+-behaviour(gm).
+
+-include("gm_specs.hrl").
+
+%% callbacks
+
+joined(Owner, _Members) ->
+ Owner ! joined,
+ ok.
+
+members_changed(_Owner, _Births, _Deaths) ->
+ ok.
+
+handle_msg(Owner, _From, ping) ->
+ Owner ! ping,
+ ok.
+
+terminate(Owner, _Reason) ->
+ Owner ! terminated,
+ ok.
+
+%% other
+
+wile_e_coyote(Time, WriteUnit) ->
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ receive joined -> ok end,
+ timer:sleep(1000), %% wait for all to join
+ timer:send_after(Time, stop),
+ Start = now(),
+ {Sent, Received} = loop(Pid, WriteUnit, 0, 0),
+ End = now(),
+ ok = gm:leave(Pid),
+ receive terminated -> ok end,
+ Elapsed = timer:now_diff(End, Start) / 1000000,
+ io:format("Sending rate: ~p msgs/sec~nReceiving rate: ~p msgs/sec~n~n",
+ [Sent/Elapsed, Received/Elapsed]),
+ ok.
+
+loop(Pid, WriteUnit, Sent, Received) ->
+ case read(Received) of
+ {stop, Received1} -> {Sent, Received1};
+ {ok, Received1} -> ok = write(Pid, WriteUnit),
+ loop(Pid, WriteUnit, Sent + WriteUnit, Received1)
+ end.
+
+read(Count) ->
+ receive
+ ping -> read(Count + 1);
+ stop -> {stop, Count}
+ after 5 ->
+ {ok, Count}
+ end.
+
+write(_Pid, 0) -> ok;
+write(Pid, N) -> ok = gm:broadcast(Pid, ping),
+ write(Pid, N - 1).
+
+test(Time, WriteUnit, Nodes) ->
+ ok = gm:create_tables(),
+ [spawn(Node, ?MODULE, wile_e_coyote, [Time, WriteUnit]) || Node <- Nodes].
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 0120f0d6..3fb0817a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -67,8 +67,12 @@ publish(_Other, _Format, _Data, _State) ->
ok.
publish1(RoutingKey, Format, Data, LogExch) ->
+ %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
+ %% second resolution, not millisecond.
+ Timestamp = rabbit_misc:now_ms() div 1000,
{ok, _RoutingRes, _DeliveredQPids} =
rabbit_basic:publish(LogExch, RoutingKey, false, false, none,
- #'P_basic'{content_type = <<"text/plain">>},
+ #'P_basic'{content_type = <<"text/plain">>,
+ timestamp = Timestamp},
list_to_binary(io_lib:format(Format, Data))),
ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index e79a58a1..2e9563cf 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -871,4 +871,3 @@ is_process_alive(Pid) ->
true -> true;
_ -> false
end.
-