diff options
author | Tim Watson <tim@rabbitmq.com> | 2013-01-23 11:05:43 +0000 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2013-01-23 11:05:43 +0000 |
commit | 37d3df1a559b8fc852abb3aa5c12df089060e2fc (patch) | |
tree | b72e66276039aa440a82e8560f39bf6681d034d2 | |
parent | 19e9691700293806f0255efe0f2fda93ced1d312 (diff) | |
parent | d6ba4aa042bcef61e3dd3b15322ec00b5bc328c4 (diff) | |
download | rabbitmq-server-37d3df1a559b8fc852abb3aa5c12df089060e2fc.tar.gz |
merge bug24980 into default
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 3 | ||||
-rw-r--r-- | src/rabbit.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 12 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
-rw-r--r-- | src/rabbit_connection_sup.erl | 7 | ||||
-rw-r--r-- | src/rabbit_exchange_type_invalid.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 25 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 57 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 20 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 80 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 55 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 154 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 24 |
17 files changed, 309 insertions, 179 deletions
@@ -162,7 +162,7 @@ $(BASIC_PLT): $(BEAM_TARGETS) else \ dialyzer --output_plt $@ --build_plt \ --apps erts kernel stdlib compiler sasl os_mon mnesia tools \ - public_key crypto ssl; \ + public_key crypto ssl xmerl; \ fi clean: diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index c7069aed..bbd2fe5b 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -465,8 +465,7 @@ synchronise itself. The queue will block while synchronisation takes place (all publishers to and consumers from the queue will block). The queue must be - mirrored, and must not have any pending unacknowledged - messages for this command to succeed. + mirrored for this command to succeed. </para> <para> Note that unsynchronised queues from which messages are diff --git a/src/rabbit.erl b/src/rabbit.erl index 641f81c0..0e6c970f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -540,6 +540,9 @@ sort_boot_steps(UnsortedSteps) -> end]) end. +-ifdef(use_specs). +-spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()). +-endif. boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> AllNodes = rabbit_mnesia:cluster_nodes(all), {Err, Nodes} = @@ -559,13 +562,15 @@ boot_error(Reason, Stacktrace) -> Args = [Reason, log_location(kernel), log_location(sasl)], boot_error(Reason, Fmt, Args, Stacktrace). +-ifdef(use_specs). +-spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()]) + -> no_return()). +-endif. +boot_error(Reason, Fmt, Args, not_available) -> + basic_boot_error(Reason, Fmt, Args); boot_error(Reason, Fmt, Args, Stacktrace) -> - case Stacktrace of - not_available -> basic_boot_error(Reason, Fmt, Args); - _ -> basic_boot_error(Reason, Fmt ++ - "Stack trace:~n ~p~n~n", - Args ++ [Stacktrace]) - end. + basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n", + Args ++ [Stacktrace]). basic_boot_error(Reason, Format, Args) -> io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2477b891..21b6bb92 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -174,8 +174,7 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). --spec(sync_mirrors/1 :: (pid()) -> - 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')). -spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}). -endif. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0a07a005..2795e317 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1163,7 +1163,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue(AckTags, ChPid, State)); handle_call(sync_mirrors, _From, - State = #q{backing_queue = rabbit_mirror_queue_master = BQ, + State = #q{backing_queue = rabbit_mirror_queue_master, backing_queue_state = BQS}) -> S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, HandleInfo = fun (Status) -> @@ -1179,13 +1179,9 @@ handle_call(sync_mirrors, _From, State, #q.stats_timer, fun() -> emit_stats(State#q{status = Status}) end) end, - case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> case rabbit_mirror_queue_master:sync_mirrors( - HandleInfo, EmitStats, BQS) of - {ok, BQS1} -> reply(ok, S(BQS1)); - {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} - end; - _ -> reply({error, pending_acks}, State) + case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of + {ok, BQS1} -> reply(ok, S(BQS1)); + {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} end; handle_call(sync_mirrors, _From, State) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 99b5946e..4245f7e2 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -71,10 +71,14 @@ %% content. -callback delete_and_terminate(any(), state()) -> state(). -%% Remove all messages in the queue, but not messages which have been -%% fetched and are pending acks. +%% Remove all 'fetchable' messages from the queue, i.e. all messages +%% except those that have been fetched already and are pending acks. -callback purge(state()) -> {purged_msg_count(), state()}. +%% Remove all messages in the queue which have been fetched and are +%% pending acks. +-callback purge_acks(state()) -> state(). + %% Publish a message. -callback publish(rabbit_types:basic_message(), rabbit_types:message_properties(), boolean(), pid(), @@ -164,7 +168,7 @@ %% results, leaving the queue undisturbed. -callback fold(fun((rabbit_types:basic_message(), rabbit_types:message_properties(), - A) -> {('stop' | 'cont'), A}), + boolean(), A) -> {('stop' | 'cont'), A}), A, state()) -> {A, state()}. %% How long is my queue? @@ -226,7 +230,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, + {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 88e3dfc5..614e307c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -284,7 +284,8 @@ handle_cast(ready_for_close, State = #ch{state = closing, ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), {stop, normal, State}; -handle_cast(terminate, State) -> +handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:flush(WriterPid), {stop, normal, State}; handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, @@ -412,8 +413,14 @@ handle_exception(Reason, State = #ch{protocol = Protocol, {stop, normal, State1} end. +-ifdef(use_specs). +-spec(precondition_failed/1 :: (string()) -> no_return()). +-endif. precondition_failed(Format) -> precondition_failed(Format, []). +-ifdef(use_specs). +-spec(precondition_failed/2 :: (string(), [any()]) -> no_return()). +-endif. precondition_failed(Format, Params) -> rabbit_misc:protocol_error(precondition_failed, Format, Params). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 12a532b6..d9a4735c 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -42,16 +42,11 @@ start_link() -> SupPid, {collector, {rabbit_queue_collector, start_link, []}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), - {ok, ChannelSupSupPid} = - supervisor2:start_child( - SupPid, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSupSupPid, Collector, + [SupPid, Collector, rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 101fe434..c5d781c2 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -31,6 +31,10 @@ description() -> serialise_events() -> false. +-ifdef(use_specs). +-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> no_return()). +-endif. route(#exchange{name = Name, type = Type}, _) -> rabbit_misc:protocol_error( precondition_failed, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b5f72cad..c704804e 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/5, publish_delivered/4, + purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, @@ -198,6 +198,8 @@ purge(State = #state { gm = GM, {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1 }}. +purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}). + publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, State = #state { gm = GM, seen_status = SS, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9f12b34e..27b0326d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -37,18 +37,10 @@ -include("rabbit.hrl"). -%%---------------------------------------------------------------------------- - -include("gm_specs.hrl"). --ifdef(use_specs). -%% Shut dialyzer up --spec(promote_me/2 :: (_, _) -> no_return()). --endif. - %%---------------------------------------------------------------------------- - -define(CREATION_EVENT_KEYS, [pid, name, @@ -79,6 +71,8 @@ depth_delta }). +%%---------------------------------------------------------------------------- + start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> @@ -227,10 +221,12 @@ handle_cast({sync_start, Ref, Syncer}, backing_queue = BQ, backing_queue_state = BQS }) -> State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), - S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined, - rate_timer_ref = TRefN, - backing_queue_state = BQSN} end, - %% [0] We can only sync when there are no pending acks + S = fun({MA, TRefN, BQSN}) -> + State1#state{depth_delta = undefined, + msg_id_ack = dict:from_list(MA), + rate_timer_ref = TRefN, + backing_queue_state = BQSN} + end, case rabbit_mirror_queue_sync:slave( DD, Ref, TRef, Syncer, BQ, BQS, fun (BQN, BQSN) -> @@ -240,7 +236,7 @@ handle_cast({sync_start, Ref, Syncer}, {TRefN, BQSN1} end) of denied -> noreply(State1); - {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0] + {ok, Res} -> noreply(set_delta(0, S(Res))); {failed, Res} -> noreply(S(Res)); {stop, Reason, Res} -> {stop, Reason, S(Res)} end; @@ -469,6 +465,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. +-ifdef(use_specs). +-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()). +-endif. promote_me(From, #state { q = Q = #amqqueue { name = QName }, gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index f2ab67cd..b8cfe4a9 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -57,6 +57,9 @@ -type(log_fun() :: fun ((string(), [any()]) -> 'ok')). -type(bq() :: atom()). -type(bqs() :: any()). +-type(ack() :: any()). +-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(), + bqs()}). -spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). -spec(master_go/7 :: (pid(), reference(), log_fun(), @@ -69,8 +72,8 @@ -spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(), bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) -> 'denied' | - {'ok' | 'failed', {timer:tref(), bqs()}} | - {'stop', any(), {timer:tref(), bqs()}}). + {'ok' | 'failed', slave_sync_state()} | + {'stop', any(), slave_sync_state()}). -endif. @@ -91,16 +94,16 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> end. master_go0(Args, BQ, BQS) -> - case BQ:fold(fun (Msg, MsgProps, Acc) -> - master_send(Msg, MsgProps, Args, Acc) + case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) -> + master_send(Msg, MsgProps, Unacked, Args, Acc) end, {0, erlang:now()}, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; {_, BQS1} -> master_done(Args, BQS1) end. -master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, - {I, Last}) -> +master_send(Msg, MsgProps, Unacked, + {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) -> T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of true -> EmitStats({syncing, I}), Log("~p messages", [I]), @@ -119,7 +122,7 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), gen_server2:reply(From, ok), {stop, cancelled}; - {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, + {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked}, {cont, {I + 1, T}}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} @@ -164,11 +167,11 @@ syncer(Ref, Log, MPid, SPids) -> syncer_loop(Ref, MPid, SPids) -> MPid ! {next, Ref}, receive - {msg, Ref, Msg, MsgProps} -> + {msg, Ref, Msg, MsgProps, Unacked} -> SPids1 = wait_for_credit(SPids), [begin credit_flow:send(SPid), - SPid ! {sync_msg, Ref, Msg, MsgProps} + SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked} end || SPid <- SPids1], syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> @@ -204,12 +207,12 @@ slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) -> slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> MRef = erlang:monitor(process, Syncer), Syncer ! {sync_ready, Ref, self()}, - {_MsgCount, BQS1} = BQ:purge(BQS), + {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)), slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration, - rabbit_misc:get_parent()}, TRef, BQS1). + rabbit_misc:get_parent()}, {[], TRef, BQS1}). slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, - TRef, BQS) -> + State = {MA, TRef, BQS}) -> receive {'DOWN', MRef, process, Syncer, _Reason} -> %% If the master dies half way we are not in the usual @@ -218,34 +221,40 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, %% sync with a newly promoted master, or even just receive %% messages from it, we have a hole in the middle. So the %% only thing to do here is purge. - {_MsgCount, BQS1} = BQ:purge(BQS), + {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)), credit_flow:peer_down(Syncer), - {failed, {TRef, BQS1}}; + {failed, {[], TRef, BQS1}}; {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), - slave_sync_loop(Args, TRef, BQS); + slave_sync_loop(Args, State); {sync_complete, Ref} -> erlang:demonitor(MRef, [flush]), credit_flow:peer_down(Syncer), - {ok, {TRef, BQS}}; + {ok, State}; {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age), - slave_sync_loop(Args, TRef, BQS); + slave_sync_loop(Args, State); {'$gen_cast', {set_ram_duration_target, Duration}} -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - slave_sync_loop(Args, TRef, BQS1); + slave_sync_loop(Args, {MA, TRef, BQS1}); update_ram_duration -> {TRef1, BQS1} = UpdateRamDuration(BQ, BQS), - slave_sync_loop(Args, TRef1, BQS1); - {sync_msg, Ref, Msg, Props} -> + slave_sync_loop(Args, {MA, TRef1, BQS1}); + {sync_msg, Ref, Msg, Props, Unacked} -> credit_flow:ack(Syncer), Props1 = Props#message_properties{needs_confirming = false}, - BQS1 = BQ:publish(Msg, Props1, true, none, BQS), - slave_sync_loop(Args, TRef, BQS1); + {MA1, BQS1} = + case Unacked of + false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)}; + true -> {AckTag, BQS2} = BQ:publish_delivered( + Msg, Props1, none, BQS), + {[{Msg#basic_message.id, AckTag} | MA], BQS2} + end, + slave_sync_loop(Args, {MA1, TRef, BQS1}); {'EXIT', Parent, Reason} -> - {stop, Reason, {TRef, BQS}}; + {stop, Reason, State}; %% If the master throws an exception {'$gen_cast', {gm, {delete_and_terminate, Reason}}} -> BQ:delete_and_terminate(Reason, BQS), - {stop, Reason, {TRef, undefined}} + {stop, Reason, {[], TRef, undefined}} end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 31eeef73..080e0987 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -18,7 +18,8 @@ -export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2, stop_tcp_listener/1, on_node_down/1, active_listeners/0, - node_listeners/1, connections/0, connection_info_keys/0, + node_listeners/1, register_connection/1, unregister_connection/1, + connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, close_connection/2, force_connection_event_refresh/0, tcp_host/1]). @@ -65,6 +66,8 @@ -spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). +-spec(register_connection/1 :: (pid()) -> ok). +-spec(unregister_connection/1 :: (pid()) -> ok). -spec(connections/0 :: () -> [rabbit_types:connection()]). -spec(connections_local/0 :: () -> [rabbit_types:connection()]). -spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()). @@ -294,20 +297,15 @@ start_client(Sock) -> start_ssl_client(SslOpts, Sock) -> start_client(Sock, ssl_transform_fun(SslOpts)). +register_connection(Pid) -> pg_local:join(rabbit_connections, Pid). + +unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). + connections() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_networking, connections_local, []). -connections_local() -> - [Reader || - {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup), - Reader <- [try - rabbit_connection_sup:reader(ConnSup) - catch exit:{noproc, _} -> - noproc - end], - Reader =/= noproc]. +connections_local() -> pg_local:get_members(rabbit_connections). connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7a28c8a3..ae832749 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -23,7 +23,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/4, mainloop/2]). +-export([init/4, mainloop/2, recvloop/2]). -export([conserve_resources/3, server_properties/1]). @@ -37,7 +37,8 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}). + conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun, + buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, vhost, @@ -109,12 +110,12 @@ start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) -> +init(Parent, ConnSupPid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, + Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) end. @@ -203,7 +204,7 @@ name(Sock) -> socket_ends(Sock) -> socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end). -start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, +start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = name(Sock), @@ -234,7 +235,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, connection_state = pre_init, queue_collector = Collector, heartbeater = none, - channel_sup_sup_pid = ChannelSupSupPid, + conn_sup_pid = ConnSupPid, + channel_sup_sup_pid = none, start_heartbeat_fun = StartHeartbeatFun, buf = [], buf_len = 0, @@ -244,9 +246,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, last_blocked_at = never}}, try ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end), - recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( - State, #v1.stats_timer), - handshake, 8)), + run({?MODULE, recvloop, + [Deb, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)]}), log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of @@ -263,10 +266,16 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, %% accounting as accurate as possible we ought to close the %% socket w/o delay before termination. rabbit_net:fast_close(ClientSock), + rabbit_networking:unregister_connection(self()), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. +run({M, F, A}) -> + try apply(M, F, A) + catch {become, MFA} -> run(MFA) + end. + recvloop(Deb, State = #v1{pending_recv = true}) -> mainloop(Deb, State); recvloop(Deb, State = #v1{connection_state = blocked}) -> @@ -689,8 +698,17 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); +%% ... and finally, the 1.0 spec is crystal clear! Note that the +%% TLS uses a different protocol number, and would go here. +handle_input(handshake, <<"AMQP", 0, 1, 0, 0>>, State) -> + become_1_0(amqp, {0, 1, 0, 0}, State); + +%% 3 stands for "SASL" +handle_input(handshake, <<"AMQP", 3, 1, 0, 0>>, State) -> + become_1_0(sasl, {3, 1, 0, 0}, State); + handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_version, A, B, C, D}); + refuse_connection(Sock, {bad_version, {A, B, C, D}}); handle_input(handshake, Other, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_header, Other}); @@ -704,6 +722,7 @@ handle_input(Callback, Data, _State) -> start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Protocol, State = #v1{sock = Sock, connection = Connection}) -> + rabbit_networking:register_connection(self()), Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, @@ -799,17 +818,24 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, + conn_sup_pid = ConnSupPid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + Throttle1 = Throttle#throttle{conserve_resources = Conserve}, + {ok, ChannelSupSupPid} = + supervisor2:start_child( + ConnSupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), State1 = control_throttle( - State#v1{connection_state = running, - connection = NewConnection, - throttle = Throttle#throttle{ - conserve_resources = Conserve}}), + State#v1{connection_state = running, + connection = NewConnection, + channel_sup_sup_pid = ChannelSupSupPid, + throttle = Throttle1}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), @@ -979,3 +1005,29 @@ cert_info(F, #v1{sock = Sock}) -> emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). + +%% 1.0 stub +-ifdef(use_specs). +-spec(become_1_0/3 :: ('amqp' | 'sasl', + {non_neg_integer(), non_neg_integer(), + non_neg_integer(), non_neg_integer()}, + #v1{}) -> no_return()). +-endif. +become_1_0(Mode, Version, State = #v1{sock = Sock}) -> + case code:is_loaded(rabbit_amqp1_0_reader) of + false -> refuse_connection(Sock, {bad_version, Version}); + _ -> throw({become, {rabbit_amqp1_0_reader, become, + [Mode, pack_for_1_0(State)]}}) + end. + +pack_for_1_0(#v1{parent = Parent, + sock = Sock, + recv_len = RecvLen, + pending_recv = PendingRecv, + queue_collector = QueueCollector, + conn_sup_pid = ConnSupPid, + start_heartbeat_fun = SHF, + buf = Buf, + buf_len = BufLen}) -> + {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF, + Buf, BufLen}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7257827a..b0ff5af9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1086,7 +1086,7 @@ test_policy_validation() -> test_server_status() -> %% create a few things so there is some useful information to list - Writer = spawn(fun () -> receive shutdown -> ok end end), + Writer = spawn(fun test_writer/0), {ok, Ch} = rabbit_channel:start_link( 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1, user(<<"user">>), <<"/">>, [], self(), @@ -1123,7 +1123,8 @@ test_server_status() -> [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), N =:= node()], - {ok, _C} = gen_tcp:connect(H, P, []), + {ok, C} = gen_tcp:connect(H, P, []), + gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>), timer:sleep(100), ok = info_action(list_connections, rabbit_networking:connection_info_keys(), false), @@ -1160,10 +1161,15 @@ test_server_status() -> passed. +test_writer() -> test_writer(none). + test_writer(Pid) -> receive - shutdown -> ok; - {send_command, Method} -> Pid ! Method, test_writer(Pid) + {'$gen_call', From, flush} -> gen_server:reply(From, ok), + test_writer(Pid); + {send_command, Method} -> Pid ! Method, + test_writer(Pid); + shutdown -> ok end. test_spawn() -> @@ -2322,21 +2328,26 @@ test_variable_queue() -> fun test_dropwhile_varying_ram_duration/1, fun test_fetchwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, + fun test_variable_queue_purge/1, fun test_variable_queue_requeue/1, fun test_variable_queue_fold/1]], passed. test_variable_queue_fold(VQ0) -> - {Count, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), - Msgs = RequeuedMsgs ++ FreshMsgs, - lists:foldl( - fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end, - VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). - -test_variable_queue_fold(Cut, Msgs, VQ0) -> + {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), + Count = rabbit_variable_queue:depth(VQ1), + Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs), + lists:foldl(fun (Cut, VQ2) -> + test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2) + end, VQ1, [0, 1, 2, Count div 2, + Count - 1, Count, Count + 1, Count * 2]). + +test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) -> {Acc, VQ1} = rabbit_variable_queue:fold( - fun (M, _, A) -> + fun (M, _, Pending, A) -> MInt = msg2int(M), + Pending = lists:member(MInt, PendingMsgs), %% assert case MInt =< Cut of true -> {cont, [MInt | A]}; false -> {stop, A} @@ -2397,10 +2408,11 @@ variable_queue_with_holes(VQ0) -> Depth = rabbit_variable_queue:depth(VQ8), Len = Depth - length(Subset3), Len = rabbit_variable_queue:len(VQ8), - {Len, (Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}. + {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}. test_variable_queue_requeue(VQ0) -> - {_, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), + {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), Msgs = lists:zip(RequeuedMsgs, lists:duplicate(length(RequeuedMsgs), true)) ++ @@ -2416,6 +2428,21 @@ test_variable_queue_requeue(VQ0) -> {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), VQ3. +test_variable_queue_purge(VQ0) -> + LenDepth = fun (VQ) -> + {rabbit_variable_queue:len(VQ), + rabbit_variable_queue:depth(VQ)} + end, + VQ1 = variable_queue_publish(false, 10, VQ0), + {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1), + {4, VQ3} = rabbit_variable_queue:purge(VQ2), + {0, 6} = LenDepth(VQ3), + {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3), + {2, 6} = LenDepth(VQ4), + VQ5 = rabbit_variable_queue:purge_acks(VQ4), + {2, 2} = LenDepth(VQ5), + VQ5. + test_variable_queue_ack_limiting(VQ0) -> %% start by sending in a bunch of messages Len = 1024, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8a7045ea..34a4b52f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,7 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, purge/1, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, @@ -519,6 +519,8 @@ purge(State = #vqstate { q4 = Q4, ram_msg_count = 0, persistent_count = PCount1 })}. +purge_acks(State) -> a(purge_pending_ack(false, State)). + publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, @@ -676,25 +678,12 @@ ackfold(MsgFun, Acc, State, AckTags) -> end, {Acc, State}, AckTags), {AccN, a(StateN)}. -fold(Fun, Acc, #vqstate { q1 = Q1, - q2 = Q2, - delta = #delta { start_seq_id = DeltaSeqId, - end_seq_id = DeltaSeqIdEnd }, - q3 = Q3, - q4 = Q4 } = State) -> - QFun = fun(MsgStatus, {Acc0, State0}) -> - {Msg, State1} = read_msg(MsgStatus, State0), - {StopGo, AccNext} = - Fun(Msg, MsgStatus#msg_status.msg_props, Acc0), - {StopGo, {AccNext, State1}} - end, - {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4), - {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3), - {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2}, - DeltaSeqId, DeltaSeqIdEnd, State2), - {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2), - {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1), - {Acc5, State5}. +fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> + {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, + [msg_iterator(State), + disk_ack_iterator(State), + ram_ack_iterator(State)]), + ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. @@ -1101,14 +1090,16 @@ queue_out(State = #vqstate { q4 = Q4 }) -> read_msg(#msg_status{msg = undefined, msg_id = MsgId, - is_persistent = IsPersistent}, - State = #vqstate{msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate {msg_store_clients = MSCState1}}; + is_persistent = IsPersistent}, State) -> + read_msg(MsgId, IsPersistent, State); read_msg(#msg_status{msg = Msg}, State) -> {Msg, State}. +read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, State #vqstate {msg_store_clients = MSCState1}}. + inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> State#vqstate{ram_msg_count = RamMsgCount + 1}. @@ -1389,7 +1380,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- -%% Internal plumbing for requeue and fold +%% Internal plumbing for requeue %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> @@ -1459,48 +1450,81 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. -qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A; -qfoldl( Fun, {cont, Acc} = A, Q) -> +%%---------------------------------------------------------------------------- +%% Iterator +%%---------------------------------------------------------------------------- + +ram_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}. + +disk_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. + +msg_iterator(State) -> istate(start, State). + +istate(start, State) -> {q4, State#vqstate.q4, State}; +istate(q4, State) -> {q3, State#vqstate.q3, State}; +istate(q3, State) -> {delta, State#vqstate.delta, State}; +istate(delta, State) -> {q2, State#vqstate.q2, State}; +istate(q2, State) -> {q1, State#vqstate.q1, State}; +istate(q1, _State) -> done. + +next({ack, It}, IndexState) -> + case gb_trees:next(It) of + none -> {empty, IndexState}; + {_SeqId, MsgStatus, It1} -> Next = {ack, It1}, + {value, MsgStatus, true, Next, IndexState} + end; +next(done, IndexState) -> {empty, IndexState}; +next({delta, #delta{start_seq_id = SeqId, + end_seq_id = SeqId}, State}, IndexState) -> + next(istate(delta, State), IndexState); +next({delta, #delta{start_seq_id = SeqId, + end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> + SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), + SeqId1 = lists:min([SeqIdB, SeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), + next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); +next({delta, Delta, [], State}, IndexState) -> + next({delta, Delta, State}, IndexState); +next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> + case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse + gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of + false -> Next = {delta, Delta, Rest, State}, + {value, beta_msg_status(M), false, Next, IndexState}; + true -> next({delta, Delta, Rest, State}, IndexState) + end; +next({Key, Q, State}, IndexState) -> case ?QUEUE:out(Q) of - {empty, _Q} -> A; - {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1) + {empty, _Q} -> next(istate(Key, State), IndexState); + {{value, MsgStatus}, QN} -> Next = {Key, QN, State}, + {value, MsgStatus, false, Next, IndexState} end. -lfoldl(_Fun, {stop, _Acc} = A, _L) -> A; -lfoldl(_Fun, {cont, _Acc} = A, []) -> A; -lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T). - -delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> - {stop, {Acc, State}}; -delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> - {cont, {Acc, State}}; -delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, - #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - index_state = IndexState, - msg_store_clients = MSCState } = State) -> - DeltaSeqId1 = lists:min( - [rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), - {StopCont, {Acc1, MSCState1}} = - lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered}, - {Acc0, MSCState0}) -> - case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA)) of - false -> {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, - MsgId), - {StopCont, AccNext} = - Fun(Msg, MsgProps, Acc0), - {StopCont, {AccNext, MSCState1}}; - true -> {cont, {Acc0, MSCState0}} - end - end, {cont, {Acc, MSCState}}, List), - delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, - State #vqstate { index_state = IndexState1, - msg_store_clients = MSCState1 }). +inext(It, {Its, IndexState}) -> + case next(It, IndexState) of + {empty, IndexState1} -> + {Its, IndexState1}; + {value, MsgStatus1, Unacked, It1, IndexState1} -> + {[{MsgStatus1, Unacked, It1} | Its], IndexState1} + end. + +ifold(_Fun, Acc, [], State) -> + {Acc, State}; +ifold(Fun, Acc, Its, State) -> + [{MsgStatus, Unacked, It} | Rest] = + lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _}, + {#msg_status{seq_id = SeqId2}, _, _}) -> + SeqId1 =< SeqId2 + end, Its), + {Msg, State1} = read_msg(MsgStatus, State), + case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of + {stop, Acc1} -> + {Acc1, State}; + {cont, Acc1} -> + {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}), + ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1}) + end. %%---------------------------------------------------------------------------- %% Phase changes diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index a7ea3d99..059d3839 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -21,7 +21,8 @@ -export([start/5, start_link/5, start/6, start_link/6]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, - send_command_and_notify/4, send_command_and_notify/5]). + send_command_and_notify/4, send_command_and_notify/5, + flush/1]). -export([internal_send_command/4, internal_send_command/6]). %% internal @@ -69,6 +70,7 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:protocol()) @@ -130,7 +132,7 @@ mainloop1(State) -> receive Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(flush(State)) + ?MODULE:mainloop1(internal_flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) -> handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, Content, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, Content, State)), + gen_server:reply(From, ok), + State1; +handle_message({'$gen_call', From, flush}, State) -> + State1 = internal_flush(State), gen_server:reply(From, ok), State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> @@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. +flush(W) -> call(W, flush). + %%--------------------------------------------------------------------------- call(Pid, Msg) -> @@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content, maybe_flush(State = #wstate{pending = Pending}) -> case iolist_size(Pending) >= ?FLUSH_THRESHOLD of - true -> flush(State); + true -> internal_flush(State); false -> State end. -flush(State = #wstate{pending = []}) -> +internal_flush(State = #wstate{pending = []}) -> State; -flush(State = #wstate{sock = Sock, pending = Pending}) -> +internal_flush(State = #wstate{sock = Sock, pending = Pending}) -> ok = port_cmd(Sock, lists:reverse(Pending)), State#wstate{pending = []}. |