summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-01-23 11:05:43 +0000
committerTim Watson <tim@rabbitmq.com>2013-01-23 11:05:43 +0000
commit37d3df1a559b8fc852abb3aa5c12df089060e2fc (patch)
treeb72e66276039aa440a82e8560f39bf6681d034d2
parent19e9691700293806f0255efe0f2fda93ced1d312 (diff)
parentd6ba4aa042bcef61e3dd3b15322ec00b5bc328c4 (diff)
downloadrabbitmq-server-37d3df1a559b8fc852abb3aa5c12df089060e2fc.tar.gz
merge bug24980 into default
-rw-r--r--Makefile2
-rw-r--r--docs/rabbitmqctl.1.xml3
-rw-r--r--src/rabbit.erl17
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_backing_queue.erl12
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_connection_sup.erl7
-rw-r--r--src/rabbit_exchange_type_invalid.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl25
-rw-r--r--src/rabbit_mirror_queue_sync.erl57
-rw-r--r--src/rabbit_networking.erl20
-rw-r--r--src/rabbit_reader.erl80
-rw-r--r--src/rabbit_tests.erl55
-rw-r--r--src/rabbit_variable_queue.erl154
-rw-r--r--src/rabbit_writer.erl24
17 files changed, 309 insertions, 179 deletions
diff --git a/Makefile b/Makefile
index c63e3dfd..bf33b931 100644
--- a/Makefile
+++ b/Makefile
@@ -162,7 +162,7 @@ $(BASIC_PLT): $(BEAM_TARGETS)
else \
dialyzer --output_plt $@ --build_plt \
--apps erts kernel stdlib compiler sasl os_mon mnesia tools \
- public_key crypto ssl; \
+ public_key crypto ssl xmerl; \
fi
clean:
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index c7069aed..bbd2fe5b 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -465,8 +465,7 @@
synchronise itself. The queue will block while
synchronisation takes place (all publishers to and
consumers from the queue will block). The queue must be
- mirrored, and must not have any pending unacknowledged
- messages for this command to succeed.
+ mirrored for this command to succeed.
</para>
<para>
Note that unsynchronised queues from which messages are
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 641f81c0..0e6c970f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -540,6 +540,9 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+-ifdef(use_specs).
+-spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()).
+-endif.
boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
@@ -559,13 +562,15 @@ boot_error(Reason, Stacktrace) ->
Args = [Reason, log_location(kernel), log_location(sasl)],
boot_error(Reason, Fmt, Args, Stacktrace).
+-ifdef(use_specs).
+-spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()])
+ -> no_return()).
+-endif.
+boot_error(Reason, Fmt, Args, not_available) ->
+ basic_boot_error(Reason, Fmt, Args);
boot_error(Reason, Fmt, Args, Stacktrace) ->
- case Stacktrace of
- not_available -> basic_boot_error(Reason, Fmt, Args);
- _ -> basic_boot_error(Reason, Fmt ++
- "Stack trace:~n ~p~n~n",
- Args ++ [Stacktrace])
- end.
+ basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n",
+ Args ++ [Stacktrace]).
basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2477b891..21b6bb92 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -174,8 +174,7 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
--spec(sync_mirrors/1 :: (pid()) ->
- 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
+-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')).
-spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}).
-endif.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0a07a005..2795e317 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1163,7 +1163,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue(AckTags, ChPid, State));
handle_call(sync_mirrors, _From,
- State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
+ State = #q{backing_queue = rabbit_mirror_queue_master,
backing_queue_state = BQS}) ->
S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
HandleInfo = fun (Status) ->
@@ -1179,13 +1179,9 @@ handle_call(sync_mirrors, _From,
State, #q.stats_timer,
fun() -> emit_stats(State#q{status = Status}) end)
end,
- case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> case rabbit_mirror_queue_master:sync_mirrors(
- HandleInfo, EmitStats, BQS) of
- {ok, BQS1} -> reply(ok, S(BQS1));
- {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
- end;
- _ -> reply({error, pending_acks}, State)
+ case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of
+ {ok, BQS1} -> reply(ok, S(BQS1));
+ {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
end;
handle_call(sync_mirrors, _From, State) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 99b5946e..4245f7e2 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -71,10 +71,14 @@
%% content.
-callback delete_and_terminate(any(), state()) -> state().
-%% Remove all messages in the queue, but not messages which have been
-%% fetched and are pending acks.
+%% Remove all 'fetchable' messages from the queue, i.e. all messages
+%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
+%% Remove all messages in the queue which have been fetched and are
+%% pending acks.
+-callback purge_acks(state()) -> state().
+
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
rabbit_types:message_properties(), boolean(), pid(),
@@ -164,7 +168,7 @@
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
rabbit_types:message_properties(),
- A) -> {('stop' | 'cont'), A}),
+ boolean(), A) -> {('stop' | 'cont'), A}),
A, state()) -> {A, state()}.
%% How long is my queue?
@@ -226,7 +230,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 88e3dfc5..614e307c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -284,7 +284,8 @@ handle_cast(ready_for_close, State = #ch{state = closing,
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
{stop, normal, State};
-handle_cast(terminate, State) ->
+handle_cast(terminate, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:flush(WriterPid),
{stop, normal, State};
handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
@@ -412,8 +413,14 @@ handle_exception(Reason, State = #ch{protocol = Protocol,
{stop, normal, State1}
end.
+-ifdef(use_specs).
+-spec(precondition_failed/1 :: (string()) -> no_return()).
+-endif.
precondition_failed(Format) -> precondition_failed(Format, []).
+-ifdef(use_specs).
+-spec(precondition_failed/2 :: (string(), [any()]) -> no_return()).
+-endif.
precondition_failed(Format, Params) ->
rabbit_misc:protocol_error(precondition_failed, Format, Params).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 12a532b6..d9a4735c 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,16 +42,11 @@ start_link() ->
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
- {ok, ChannelSupSupPid} =
- supervisor2:start_child(
- SupPid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSupSupPid, Collector,
+ [SupPid, Collector,
rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 101fe434..c5d781c2 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -31,6 +31,10 @@ description() ->
serialise_events() -> false.
+-ifdef(use_specs).
+-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> no_return()).
+-endif.
route(#exchange{name = Name, type = Type}, _) ->
rabbit_misc:protocol_error(
precondition_failed,
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b5f72cad..c704804e 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/5, publish_delivered/4,
+ purge/1, purge_acks/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
@@ -198,6 +198,8 @@ purge(State = #state { gm = GM,
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1 }}.
+purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
+
publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9f12b34e..27b0326d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -37,18 +37,10 @@
-include("rabbit.hrl").
-%%----------------------------------------------------------------------------
-
-include("gm_specs.hrl").
--ifdef(use_specs).
-%% Shut dialyzer up
--spec(promote_me/2 :: (_, _) -> no_return()).
--endif.
-
%%----------------------------------------------------------------------------
-
-define(CREATION_EVENT_KEYS,
[pid,
name,
@@ -79,6 +71,8 @@
depth_delta
}).
+%%----------------------------------------------------------------------------
+
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
@@ -227,10 +221,12 @@ handle_cast({sync_start, Ref, Syncer},
backing_queue = BQ,
backing_queue_state = BQS }) ->
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
- S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined,
- rate_timer_ref = TRefN,
- backing_queue_state = BQSN} end,
- %% [0] We can only sync when there are no pending acks
+ S = fun({MA, TRefN, BQSN}) ->
+ State1#state{depth_delta = undefined,
+ msg_id_ack = dict:from_list(MA),
+ rate_timer_ref = TRefN,
+ backing_queue_state = BQSN}
+ end,
case rabbit_mirror_queue_sync:slave(
DD, Ref, TRef, Syncer, BQ, BQS,
fun (BQN, BQSN) ->
@@ -240,7 +236,7 @@ handle_cast({sync_start, Ref, Syncer},
{TRefN, BQSN1}
end) of
denied -> noreply(State1);
- {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0]
+ {ok, Res} -> noreply(set_delta(0, S(Res)));
{failed, Res} -> noreply(S(Res));
{stop, Reason, Res} -> {stop, Reason, S(Res)}
end;
@@ -469,6 +465,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
+-ifdef(use_specs).
+-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()).
+-endif.
promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index f2ab67cd..b8cfe4a9 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -57,6 +57,9 @@
-type(log_fun() :: fun ((string(), [any()]) -> 'ok')).
-type(bq() :: atom()).
-type(bqs() :: any()).
+-type(ack() :: any()).
+-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
+ bqs()}).
-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
-spec(master_go/7 :: (pid(), reference(), log_fun(),
@@ -69,8 +72,8 @@
-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(),
bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
'denied' |
- {'ok' | 'failed', {timer:tref(), bqs()}} |
- {'stop', any(), {timer:tref(), bqs()}}).
+ {'ok' | 'failed', slave_sync_state()} |
+ {'stop', any(), slave_sync_state()}).
-endif.
@@ -91,16 +94,16 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
end.
master_go0(Args, BQ, BQS) ->
- case BQ:fold(fun (Msg, MsgProps, Acc) ->
- master_send(Msg, MsgProps, Args, Acc)
+ case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
+ master_send(Msg, MsgProps, Unacked, Args, Acc)
end, {0, erlang:now()}, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
{{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
- {I, Last}) ->
+master_send(Msg, MsgProps, Unacked,
+ {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
true -> EmitStats({syncing, I}),
Log("~p messages", [I]),
@@ -119,7 +122,7 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
gen_server2:reply(From, ok),
{stop, cancelled};
- {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked},
{cont, {I + 1, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
@@ -164,11 +167,11 @@ syncer(Ref, Log, MPid, SPids) ->
syncer_loop(Ref, MPid, SPids) ->
MPid ! {next, Ref},
receive
- {msg, Ref, Msg, MsgProps} ->
+ {msg, Ref, Msg, MsgProps, Unacked} ->
SPids1 = wait_for_credit(SPids),
[begin
credit_flow:send(SPid),
- SPid ! {sync_msg, Ref, Msg, MsgProps}
+ SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked}
end || SPid <- SPids1],
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
@@ -204,12 +207,12 @@ slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) ->
slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
MRef = erlang:monitor(process, Syncer),
Syncer ! {sync_ready, Ref, self()},
- {_MsgCount, BQS1} = BQ:purge(BQS),
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
- rabbit_misc:get_parent()}, TRef, BQS1).
+ rabbit_misc:get_parent()}, {[], TRef, BQS1}).
slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
- TRef, BQS) ->
+ State = {MA, TRef, BQS}) ->
receive
{'DOWN', MRef, process, Syncer, _Reason} ->
%% If the master dies half way we are not in the usual
@@ -218,34 +221,40 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
%% sync with a newly promoted master, or even just receive
%% messages from it, we have a hole in the middle. So the
%% only thing to do here is purge.
- {_MsgCount, BQS1} = BQ:purge(BQS),
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
credit_flow:peer_down(Syncer),
- {failed, {TRef, BQS1}};
+ {failed, {[], TRef, BQS1}};
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{sync_complete, Ref} ->
erlang:demonitor(MRef, [flush]),
credit_flow:peer_down(Syncer),
- {ok, {TRef, BQS}};
+ {ok, State};
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{'$gen_cast', {set_ram_duration_target, Duration}} ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- slave_sync_loop(Args, TRef, BQS1);
+ slave_sync_loop(Args, {MA, TRef, BQS1});
update_ram_duration ->
{TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
- slave_sync_loop(Args, TRef1, BQS1);
- {sync_msg, Ref, Msg, Props} ->
+ slave_sync_loop(Args, {MA, TRef1, BQS1});
+ {sync_msg, Ref, Msg, Props, Unacked} ->
credit_flow:ack(Syncer),
Props1 = Props#message_properties{needs_confirming = false},
- BQS1 = BQ:publish(Msg, Props1, true, none, BQS),
- slave_sync_loop(Args, TRef, BQS1);
+ {MA1, BQS1} =
+ case Unacked of
+ false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)};
+ true -> {AckTag, BQS2} = BQ:publish_delivered(
+ Msg, Props1, none, BQS),
+ {[{Msg#basic_message.id, AckTag} | MA], BQS2}
+ end,
+ slave_sync_loop(Args, {MA1, TRef, BQS1});
{'EXIT', Parent, Reason} ->
- {stop, Reason, {TRef, BQS}};
+ {stop, Reason, State};
%% If the master throws an exception
{'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
BQ:delete_and_terminate(Reason, BQS),
- {stop, Reason, {TRef, undefined}}
+ {stop, Reason, {[], TRef, undefined}}
end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 31eeef73..080e0987 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -18,7 +18,8 @@
-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
stop_tcp_listener/1, on_node_down/1, active_listeners/0,
- node_listeners/1, connections/0, connection_info_keys/0,
+ node_listeners/1, register_connection/1, unregister_connection/1,
+ connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
close_connection/2, force_connection_event_refresh/0, tcp_host/1]).
@@ -65,6 +66,8 @@
-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
+-spec(register_connection/1 :: (pid()) -> ok).
+-spec(unregister_connection/1 :: (pid()) -> ok).
-spec(connections/0 :: () -> [rabbit_types:connection()]).
-spec(connections_local/0 :: () -> [rabbit_types:connection()]).
-spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -294,20 +297,15 @@ start_client(Sock) ->
start_ssl_client(SslOpts, Sock) ->
start_client(Sock, ssl_transform_fun(SslOpts)).
+register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
+
+unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
+
connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
-connections_local() ->
- [Reader ||
- {_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup),
- Reader <- [try
- rabbit_connection_sup:reader(ConnSup)
- catch exit:{noproc, _} ->
- noproc
- end],
- Reader =/= noproc].
+connections_local() -> pg_local:get_members(rabbit_connections).
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7a28c8a3..ae832749 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2]).
+-export([init/4, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -37,7 +37,8 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}).
+ conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun,
+ buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, vhost,
@@ -109,12 +110,12 @@ start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+init(Parent, ConnSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(
- Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock,
SockTransform)
end.
@@ -203,7 +204,7 @@ name(Sock) ->
socket_ends(Sock) ->
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
-start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = name(Sock),
@@ -234,7 +235,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
- channel_sup_sup_pid = ChannelSupSupPid,
+ conn_sup_pid = ConnSupPid,
+ channel_sup_sup_pid = none,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
@@ -244,9 +246,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
- recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)),
+ run({?MODULE, recvloop,
+ [Deb, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
@@ -263,10 +266,16 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
%% accounting as accurate as possible we ought to close the
%% socket w/o delay before termination.
rabbit_net:fast_close(ClientSock),
+ rabbit_networking:unregister_connection(self()),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
+run({M, F, A}) ->
+ try apply(M, F, A)
+ catch {become, MFA} -> run(MFA)
+ end.
+
recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{connection_state = blocked}) ->
@@ -689,8 +698,17 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+%% ... and finally, the 1.0 spec is crystal clear! Note that the
+%% TLS uses a different protocol number, and would go here.
+handle_input(handshake, <<"AMQP", 0, 1, 0, 0>>, State) ->
+ become_1_0(amqp, {0, 1, 0, 0}, State);
+
+%% 3 stands for "SASL"
+handle_input(handshake, <<"AMQP", 3, 1, 0, 0>>, State) ->
+ become_1_0(sasl, {3, 1, 0, 0}, State);
+
handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, A, B, C, D});
+ refuse_connection(Sock, {bad_version, {A, B, C, D}});
handle_input(handshake, Other, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
@@ -704,6 +722,7 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
+ rabbit_networking:register_connection(self()),
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
@@ -799,17 +818,24 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
+ conn_sup_pid = ConnSupPid,
sock = Sock,
throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ ConnSupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- throttle = Throttle#throttle{
- conserve_resources = Conserve}}),
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ throttle = Throttle1}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -979,3 +1005,29 @@ cert_info(F, #v1{sock = Sock}) ->
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
+
+%% 1.0 stub
+-ifdef(use_specs).
+-spec(become_1_0/3 :: ('amqp' | 'sasl',
+ {non_neg_integer(), non_neg_integer(),
+ non_neg_integer(), non_neg_integer()},
+ #v1{}) -> no_return()).
+-endif.
+become_1_0(Mode, Version, State = #v1{sock = Sock}) ->
+ case code:is_loaded(rabbit_amqp1_0_reader) of
+ false -> refuse_connection(Sock, {bad_version, Version});
+ _ -> throw({become, {rabbit_amqp1_0_reader, become,
+ [Mode, pack_for_1_0(State)]}})
+ end.
+
+pack_for_1_0(#v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ queue_collector = QueueCollector,
+ conn_sup_pid = ConnSupPid,
+ start_heartbeat_fun = SHF,
+ buf = Buf,
+ buf_len = BufLen}) ->
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF,
+ Buf, BufLen}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7257827a..b0ff5af9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1086,7 +1086,7 @@ test_policy_validation() ->
test_server_status() ->
%% create a few things so there is some useful information to list
- Writer = spawn(fun () -> receive shutdown -> ok end end),
+ Writer = spawn(fun test_writer/0),
{ok, Ch} = rabbit_channel:start_link(
1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1,
user(<<"user">>), <<"/">>, [], self(),
@@ -1123,7 +1123,8 @@ test_server_status() ->
[L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
N =:= node()],
- {ok, _C} = gen_tcp:connect(H, P, []),
+ {ok, C} = gen_tcp:connect(H, P, []),
+ gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>),
timer:sleep(100),
ok = info_action(list_connections,
rabbit_networking:connection_info_keys(), false),
@@ -1160,10 +1161,15 @@ test_server_status() ->
passed.
+test_writer() -> test_writer(none).
+
test_writer(Pid) ->
receive
- shutdown -> ok;
- {send_command, Method} -> Pid ! Method, test_writer(Pid)
+ {'$gen_call', From, flush} -> gen_server:reply(From, ok),
+ test_writer(Pid);
+ {send_command, Method} -> Pid ! Method,
+ test_writer(Pid);
+ shutdown -> ok
end.
test_spawn() ->
@@ -2322,21 +2328,26 @@ test_variable_queue() ->
fun test_dropwhile_varying_ram_duration/1,
fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
+ fun test_variable_queue_purge/1,
fun test_variable_queue_requeue/1,
fun test_variable_queue_fold/1]],
passed.
test_variable_queue_fold(VQ0) ->
- {Count, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
- Msgs = RequeuedMsgs ++ FreshMsgs,
- lists:foldl(
- fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end,
- VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
-
-test_variable_queue_fold(Cut, Msgs, VQ0) ->
+ {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
+ Count = rabbit_variable_queue:depth(VQ1),
+ Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
+ lists:foldl(fun (Cut, VQ2) ->
+ test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2)
+ end, VQ1, [0, 1, 2, Count div 2,
+ Count - 1, Count, Count + 1, Count * 2]).
+
+test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) ->
{Acc, VQ1} = rabbit_variable_queue:fold(
- fun (M, _, A) ->
+ fun (M, _, Pending, A) ->
MInt = msg2int(M),
+ Pending = lists:member(MInt, PendingMsgs), %% assert
case MInt =< Cut of
true -> {cont, [MInt | A]};
false -> {stop, A}
@@ -2397,10 +2408,11 @@ variable_queue_with_holes(VQ0) ->
Depth = rabbit_variable_queue:depth(VQ8),
Len = Depth - length(Subset3),
Len = rabbit_variable_queue:len(VQ8),
- {Len, (Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}.
+ {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}.
test_variable_queue_requeue(VQ0) ->
- {_, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
+ {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
Msgs =
lists:zip(RequeuedMsgs,
lists:duplicate(length(RequeuedMsgs), true)) ++
@@ -2416,6 +2428,21 @@ test_variable_queue_requeue(VQ0) ->
{empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
VQ3.
+test_variable_queue_purge(VQ0) ->
+ LenDepth = fun (VQ) ->
+ {rabbit_variable_queue:len(VQ),
+ rabbit_variable_queue:depth(VQ)}
+ end,
+ VQ1 = variable_queue_publish(false, 10, VQ0),
+ {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1),
+ {4, VQ3} = rabbit_variable_queue:purge(VQ2),
+ {0, 6} = LenDepth(VQ3),
+ {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3),
+ {2, 6} = LenDepth(VQ4),
+ VQ5 = rabbit_variable_queue:purge_acks(VQ4),
+ {2, 2} = LenDepth(VQ5),
+ VQ5.
+
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
Len = 1024,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8a7045ea..34a4b52f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
@@ -519,6 +519,8 @@ purge(State = #vqstate { q4 = Q4,
ram_msg_count = 0,
persistent_count = PCount1 })}.
+purge_acks(State) -> a(purge_pending_ack(false, State)).
+
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
@@ -676,25 +678,12 @@ ackfold(MsgFun, Acc, State, AckTags) ->
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
-fold(Fun, Acc, #vqstate { q1 = Q1,
- q2 = Q2,
- delta = #delta { start_seq_id = DeltaSeqId,
- end_seq_id = DeltaSeqIdEnd },
- q3 = Q3,
- q4 = Q4 } = State) ->
- QFun = fun(MsgStatus, {Acc0, State0}) ->
- {Msg, State1} = read_msg(MsgStatus, State0),
- {StopGo, AccNext} =
- Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
- {StopGo, {AccNext, State1}}
- end,
- {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
- {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3),
- {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2},
- DeltaSeqId, DeltaSeqIdEnd, State2),
- {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2),
- {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1),
- {Acc5, State5}.
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
+ [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
@@ -1101,14 +1090,16 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
read_msg(#msg_status{msg = undefined,
msg_id = MsgId,
- is_persistent = IsPersistent},
- State = #vqstate{msg_store_clients = MSCState}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- {Msg, State #vqstate {msg_store_clients = MSCState1}};
+ is_persistent = IsPersistent}, State) ->
+ read_msg(MsgId, IsPersistent, State);
read_msg(#msg_status{msg = Msg}, State) ->
{Msg, State}.
+read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {Msg, State #vqstate {msg_store_clients = MSCState1}}.
+
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
State#vqstate{ram_msg_count = RamMsgCount + 1}.
@@ -1389,7 +1380,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue and fold
+%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
@@ -1459,48 +1450,81 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
-qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A;
-qfoldl( Fun, {cont, Acc} = A, Q) ->
+%%----------------------------------------------------------------------------
+%% Iterator
+%%----------------------------------------------------------------------------
+
+ram_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+
+disk_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+
+msg_iterator(State) -> istate(start, State).
+
+istate(start, State) -> {q4, State#vqstate.q4, State};
+istate(q4, State) -> {q3, State#vqstate.q3, State};
+istate(q3, State) -> {delta, State#vqstate.delta, State};
+istate(delta, State) -> {q2, State#vqstate.q2, State};
+istate(q2, State) -> {q1, State#vqstate.q1, State};
+istate(q1, _State) -> done.
+
+next({ack, It}, IndexState) ->
+ case gb_trees:next(It) of
+ none -> {empty, IndexState};
+ {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+ {value, MsgStatus, true, Next, IndexState}
+ end;
+next(done, IndexState) -> {empty, IndexState};
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqId}, State}, IndexState) ->
+ next(istate(delta, State), IndexState);
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
+ SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
+ SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
+next({delta, Delta, [], State}, IndexState) ->
+ next({delta, Delta, State}, IndexState);
+next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
+ case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ false -> Next = {delta, Delta, Rest, State},
+ {value, beta_msg_status(M), false, Next, IndexState};
+ true -> next({delta, Delta, Rest, State}, IndexState)
+ end;
+next({Key, Q, State}, IndexState) ->
case ?QUEUE:out(Q) of
- {empty, _Q} -> A;
- {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1)
+ {empty, _Q} -> next(istate(Key, State), IndexState);
+ {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
+ {value, MsgStatus, false, Next, IndexState}
end.
-lfoldl(_Fun, {stop, _Acc} = A, _L) -> A;
-lfoldl(_Fun, {cont, _Acc} = A, []) -> A;
-lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T).
-
-delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
- {stop, {Acc, State}};
-delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
- {cont, {Acc, State}};
-delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
- #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- index_state = IndexState,
- msg_store_clients = MSCState } = State) ->
- DeltaSeqId1 = lists:min(
- [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
- {StopCont, {Acc1, MSCState1}} =
- lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered},
- {Acc0, MSCState0}) ->
- case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA)) of
- false -> {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState0, IsPersistent,
- MsgId),
- {StopCont, AccNext} =
- Fun(Msg, MsgProps, Acc0),
- {StopCont, {AccNext, MSCState1}};
- true -> {cont, {Acc0, MSCState0}}
- end
- end, {cont, {Acc, MSCState}}, List),
- delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
- State #vqstate { index_state = IndexState1,
- msg_store_clients = MSCState1 }).
+inext(It, {Its, IndexState}) ->
+ case next(It, IndexState) of
+ {empty, IndexState1} ->
+ {Its, IndexState1};
+ {value, MsgStatus1, Unacked, It1, IndexState1} ->
+ {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
+ end.
+
+ifold(_Fun, Acc, [], State) ->
+ {Acc, State};
+ifold(Fun, Acc, Its, State) ->
+ [{MsgStatus, Unacked, It} | Rest] =
+ lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+ {#msg_status{seq_id = SeqId2}, _, _}) ->
+ SeqId1 =< SeqId2
+ end, Its),
+ {Msg, State1} = read_msg(MsgStatus, State),
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
+ {stop, Acc1} ->
+ {Acc1, State};
+ {cont, Acc1} ->
+ {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}),
+ ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1})
+ end.
%%----------------------------------------------------------------------------
%% Phase changes
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index a7ea3d99..059d3839 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -21,7 +21,8 @@
-export([start/5, start_link/5, start/6, start_link/6]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
- send_command_and_notify/4, send_command_and_notify/5]).
+ send_command_and_notify/4, send_command_and_notify/5,
+ flush/1]).
-export([internal_send_command/4, internal_send_command/6]).
%% internal
@@ -69,6 +70,7 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record(), rabbit_types:protocol())
@@ -130,7 +132,7 @@ mainloop1(State) ->
receive
Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
- ?MODULE:mainloop1(flush(State))
+ ?MODULE:mainloop1(internal_flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
@@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) ->
handle_message({send_command, MethodRecord, Content}, State) ->
internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
- State1 = flush(internal_send_command_async(MethodRecord, State)),
+ State1 = internal_flush(
+ internal_send_command_async(MethodRecord, State)),
gen_server:reply(From, ok),
State1;
handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
State) ->
- State1 = flush(internal_send_command_async(MethodRecord, Content, State)),
+ State1 = internal_flush(
+ internal_send_command_async(MethodRecord, Content, State)),
+ gen_server:reply(From, ok),
+ State1;
+handle_message({'$gen_call', From, flush}, State) ->
+ State1 = internal_flush(State),
gen_server:reply(From, ok),
State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
@@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
+flush(W) -> call(W, flush).
+
%%---------------------------------------------------------------------------
call(Pid, Msg) ->
@@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content,
maybe_flush(State = #wstate{pending = Pending}) ->
case iolist_size(Pending) >= ?FLUSH_THRESHOLD of
- true -> flush(State);
+ true -> internal_flush(State);
false -> State
end.
-flush(State = #wstate{pending = []}) ->
+internal_flush(State = #wstate{pending = []}) ->
State;
-flush(State = #wstate{sock = Sock, pending = Pending}) ->
+internal_flush(State = #wstate{sock = Sock, pending = Pending}) ->
ok = port_cmd(Sock, lists:reverse(Pending)),
State#wstate{pending = []}.