diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/gm.erl | 134 | ||||
-rw-r--r-- | src/gm_speed_test.erl | 82 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 6 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 1 |
4 files changed, 179 insertions, 44 deletions
@@ -376,15 +376,16 @@ confirmed_broadcast/2, group_members/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_info/2]). + code_change/3, prioritise_cast/2, prioritise_info/2]). -export([behaviour_info/1]). --export([table_definitions/0]). +-export([table_definitions/0, flush/1]). -define(GROUP_TABLE, gm_group). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(BROADCAST_TIMER, 25). -define(SETS, ordsets). -define(DICT, orddict). @@ -398,7 +399,9 @@ pub_count, members_state, callback_args, - confirms + confirms, + broadcast_buffer, + broadcast_timer }). -record(gm_group, { name, version, members }). @@ -508,21 +511,26 @@ confirmed_broadcast(Server, Msg) -> group_members(Server) -> gen_server2:call(Server, group_members, infinity). +flush(Server) -> + gen_server2:cast(Server, flush). + init([GroupName, Module, Args]) -> random:seed(now()), gen_server2:cast(self(), join), Self = self(), - {ok, #state { self = Self, - left = {Self, undefined}, - right = {Self, undefined}, - group_name = GroupName, - module = Module, - view = undefined, - pub_count = 0, - members_state = undefined, - callback_args = Args, - confirms = queue:new() }, hibernate, + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = 0, + members_state = undefined, + callback_args = Args, + confirms = queue:new(), + broadcast_buffer = [], + broadcast_timer = undefined }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -620,7 +628,11 @@ handle_cast(join, State = #state { self = Self, {Module:joined(Args, all_known_members(View)), State1}); handle_cast(leave, State) -> - {stop, normal, State}. + {stop, normal, State}; + +handle_cast(flush, State) -> + noreply( + flush_broadcast_buffer(State #state { broadcast_timer = undefined })). handle_info({'DOWN', MRef, process, _Pid, _Reason}, @@ -662,14 +674,17 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, end. -terminate(Reason, #state { module = Module, - callback_args = Args }) -> +terminate(Reason, State = #state { module = Module, + callback_args = Args }) -> + flush_broadcast_buffer(State), Module:terminate(Args, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. +prioritise_cast(flush, _State) -> 1; +prioritise_cast(_ , _State) -> 0. prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; prioritise_info(_ , _State) -> 0. @@ -782,33 +797,62 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, State, hibernate}. + {noreply, ensure_broadcast_timer(State), hibernate}. reply(Reply, State) -> - {reply, Reply, State, hibernate}. - -internal_broadcast(Msg, From, State = #state { self = Self, - pub_count = PubCount, - members_state = MembersState, - module = Module, - confirms = Confirms, - callback_args = Args }) -> - PubMsg = {PubCount, Msg}, - Activity = activity_cons(Self, [PubMsg], [], activity_nil()), - ok = maybe_send_activity(activity_finalise(Activity), State), - MembersState1 = - with_member( - fun (Member = #member { pending_ack = PA }) -> - Member #member { pending_ack = queue:in(PubMsg, PA) } - end, Self, MembersState), + {reply, Reply, ensure_broadcast_timer(State), hibernate}. + +ensure_broadcast_timer(State = #state { broadcast_buffer = [], + broadcast_timer = undefined }) -> + State; +ensure_broadcast_timer(State = #state { broadcast_buffer = [], + broadcast_timer = TRef }) -> + timer:cancel(TRef), + State #state { broadcast_timer = undefined }; +ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> + {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]), + State #state { broadcast_timer = TRef }; +ensure_broadcast_timer(State) -> + State. + +internal_broadcast(Msg, From, State = #state { self = Self, + pub_count = PubCount, + module = Module, + confirms = Confirms, + callback_args = Args, + broadcast_buffer = Buffer }) -> + Result = Module:handle_msg(Args, Self, Msg), + Buffer1 = [{PubCount, Msg} | Buffer], Confirms1 = case From of none -> Confirms; _ -> queue:in({PubCount, From}, Confirms) end, - handle_callback_result({Module:handle_msg(Args, Self, Msg), - State #state { pub_count = PubCount + 1, - members_state = MembersState1, - confirms = Confirms1 }}). + State1 = State #state { pub_count = PubCount + 1, + confirms = Confirms1, + broadcast_buffer = Buffer1 }, + case From =/= none of + true -> + handle_callback_result({Result, flush_broadcast_buffer(State1)}); + false -> + handle_callback_result( + {Result, State1 #state { broadcast_buffer = Buffer1 }}) + end. + +flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> + State; +flush_broadcast_buffer(State = #state { self = Self, + members_state = MembersState, + broadcast_buffer = Buffer }) -> + Pubs = lists:reverse(Buffer), + Activity = activity_cons(Self, Pubs, [], activity_nil()), + ok = maybe_send_activity(activity_finalise(Activity), State), + MembersState1 = with_member( + fun (Member = #member { pending_ack = PA }) -> + PA1 = queue:join(PA, queue:from_list(Pubs)), + Member #member { pending_ack = PA1 } + end, Self, MembersState), + State #state { members_state = MembersState1, + broadcast_buffer = [] }. %% --------------------------------------------------------------------------- @@ -1093,16 +1137,22 @@ maybe_monitor(Self, Self) -> maybe_monitor(Other, _Self) -> erlang:monitor(process, Other). -check_neighbours(State = #state { self = Self, - left = Left, - right = Right, - view = View }) -> +check_neighbours(State = #state { self = Self, + left = Left, + right = Right, + view = View, + broadcast_buffer = Buffer }) -> #view_member { left = VLeft, right = VRight } = fetch_view_member(Self, View), Ver = view_version(View), Left1 = ensure_neighbour(Ver, Self, Left, VLeft), Right1 = ensure_neighbour(Ver, Self, Right, VRight), - State1 = State #state { left = Left1, right = Right1 }, + Buffer1 = case Right1 of + {Self, undefined} -> []; + _ -> Buffer + end, + State1 = State #state { left = Left1, right = Right1, + broadcast_buffer = Buffer1 }, ok = maybe_send_catchup(Right, State1), State1. diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl new file mode 100644 index 00000000..defb0f29 --- /dev/null +++ b/src/gm_speed_test.erl @@ -0,0 +1,82 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(gm_speed_test). + +-export([test/3]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). +-export([wile_e_coyote/2]). + +-behaviour(gm). + +-include("gm_specs.hrl"). + +%% callbacks + +joined(Owner, _Members) -> + Owner ! joined, + ok. + +members_changed(_Owner, _Births, _Deaths) -> + ok. + +handle_msg(Owner, _From, ping) -> + Owner ! ping, + ok. + +terminate(Owner, _Reason) -> + Owner ! terminated, + ok. + +%% other + +wile_e_coyote(Time, WriteUnit) -> + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()), + receive joined -> ok end, + timer:sleep(1000), %% wait for all to join + timer:send_after(Time, stop), + Start = now(), + {Sent, Received} = loop(Pid, WriteUnit, 0, 0), + End = now(), + ok = gm:leave(Pid), + receive terminated -> ok end, + Elapsed = timer:now_diff(End, Start) / 1000000, + io:format("Sending rate: ~p msgs/sec~nReceiving rate: ~p msgs/sec~n~n", + [Sent/Elapsed, Received/Elapsed]), + ok. + +loop(Pid, WriteUnit, Sent, Received) -> + case read(Received) of + {stop, Received1} -> {Sent, Received1}; + {ok, Received1} -> ok = write(Pid, WriteUnit), + loop(Pid, WriteUnit, Sent + WriteUnit, Received1) + end. + +read(Count) -> + receive + ping -> read(Count + 1); + stop -> {stop, Count} + after 5 -> + {ok, Count} + end. + +write(_Pid, 0) -> ok; +write(Pid, N) -> ok = gm:broadcast(Pid, ping), + write(Pid, N - 1). + +test(Time, WriteUnit, Nodes) -> + ok = gm:create_tables(), + [spawn(Node, ?MODULE, wile_e_coyote, [Time, WriteUnit]) || Node <- Nodes]. diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 0120f0d6..3fb0817a 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -67,8 +67,12 @@ publish(_Other, _Format, _Data, _State) -> ok. publish1(RoutingKey, Format, Data, LogExch) -> + %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's + %% second resolution, not millisecond. + Timestamp = rabbit_misc:now_ms() div 1000, {ok, _RoutingRes, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, false, false, none, - #'P_basic'{content_type = <<"text/plain">>}, + #'P_basic'{content_type = <<"text/plain">>, + timestamp = Timestamp}, list_to_binary(io_lib:format(Format, Data))), ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e79a58a1..2e9563cf 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -871,4 +871,3 @@ is_process_alive(Pid) -> true -> true; _ -> false end. - |