summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@rabbitmq.com>2010-09-03 16:55:32 +0100
committerMichael Bridgen <mikeb@rabbitmq.com>2010-09-03 16:55:32 +0100
commit646dc17d80d4b093056431e98f32c37b7e557eeb (patch)
treea44020696815ea98dc3ac0676b9b5c73764a1770
parent908711536659deb01260d81d241bda8a8fa352be (diff)
parent9a484e99114d27eba19685ccc48213bc3a47dbea (diff)
downloadrabbitmq-server-bug23116.tar.gz
Merge default into bug23116bug23116
-rw-r--r--src/file_handle_cache.erl22
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_access_control.erl25
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_misc.erl20
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_msg_store.erl19
-rw-r--r--src/rabbit_networking.erl10
-rw-r--r--src/rabbit_plugin_activator.erl69
-rw-r--r--src/rabbit_reader.erl30
-rw-r--r--src/rabbit_router.erl12
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_variable_queue.erl8
-rw-r--r--src/rabbit_writer.erl59
15 files changed, 135 insertions, 172 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f83fa0bc..aecfb096 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -131,6 +131,7 @@
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
-export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
+-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -242,6 +243,7 @@
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
+-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
-endif.
@@ -781,7 +783,11 @@ init([]) ->
Watermark > 0) ->
Watermark;
_ ->
- ulimit()
+ case ulimit() of
+ infinity -> infinity;
+ unknown -> ?FILE_HANDLES_LIMIT_OTHER;
+ Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS])
+ end
end,
ObtainLimit = obtain_limit(Limit),
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
@@ -1131,7 +1137,7 @@ track_client(Pid, Clients) ->
ulimit() ->
case os:type() of
{win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS;
+ ?FILE_HANDLES_LIMIT_WINDOWS;
{unix, _OsName} ->
%% Under Linux, Solaris and FreeBSD, ulimit is a shell
%% builtin, not a command. In OS X, it's a command.
@@ -1141,16 +1147,14 @@ ulimit() ->
"unlimited" ->
infinity;
String = [C|_] when $0 =< C andalso C =< $9 ->
- Num = list_to_integer(
- lists:takewhile(
- fun (D) -> $0 =< D andalso D =< $9 end, String)) -
- ?RESERVED_FOR_OTHERS,
- lists:max([1, Num]);
+ list_to_integer(
+ lists:takewhile(
+ fun (D) -> $0 =< D andalso D =< $9 end, String));
_ ->
%% probably a variant of
%% "/bin/sh: line 1: ulimit: command not found\n"
- ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ unknown
end;
_ ->
- ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ unknown
end.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 303d1f3a..c2574970 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -205,8 +205,7 @@
%%----------------------------------------------------------------------------
prepare() ->
- ok = ensure_working_log_handlers(),
- ok = rabbit_mnesia:ensure_mnesia_dir().
+ ok = ensure_working_log_handlers().
start() ->
try
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 8d00f591..9cfe1ca8 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -171,10 +171,6 @@ check_resource_access(Username,
check_resource_access(Username,
R#resource{name = <<"amq.default">>},
Permission);
-check_resource_access(_Username,
- #resource{name = <<"amq.gen",_/binary>>},
- #permission{scope = client}) ->
- ok;
check_resource_access(Username,
R = #resource{virtual_host = VHostPath, name = Name},
Permission) ->
@@ -184,14 +180,19 @@ check_resource_access(Username,
[] ->
false;
[#user_permission{permission = P}] ->
- PermRegexp = case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
- case re:run(Name, PermRegexp, [{capture, none}]) of
- match -> true;
- nomatch -> false
+ case {Name, P} of
+ {<<"amq.gen",_/binary>>, #permission{scope = client}} ->
+ true;
+ _ ->
+ PermRegexp = case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
+ end
end
end,
if Res -> ok;
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0cdb4fff..6b9ac560 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -56,7 +56,7 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
--define(EXPIRES_TYPE, long).
+-define(EXPIRES_TYPES, [byte, short, signedint, long]).
%%----------------------------------------------------------------------------
@@ -313,13 +313,13 @@ check_declare_arguments(QueueName, Args) ->
check_expires_argument(undefined) ->
ok;
-check_expires_argument({?EXPIRES_TYPE, Expires})
- when is_integer(Expires) andalso Expires > 0 ->
- ok;
-check_expires_argument({?EXPIRES_TYPE, _Expires}) ->
- {error, expires_zero_or_less};
-check_expires_argument(_) ->
- {error, expires_not_of_type_long}.
+check_expires_argument({Type, Expires}) when Expires > 0 ->
+ case lists:member(Type, ?EXPIRES_TYPES) of
+ true -> ok;
+ false -> {error, {expires_not_of_acceptable_type, Type, Expires}}
+ end;
+check_expires_argument({_Type, _Expires}) ->
+ {error, expires_zero_or_less}.
list(VHostPath) ->
mnesia:dirty_match_object(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2cab7136..90a0503b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -146,8 +146,8 @@ code_change(_OldVsn, State, _Extra) ->
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
- {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
- undefined -> State
+ {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
+ undefined -> State
end.
declare(Recover, From,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 5fa3f8ed..086d260e 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -39,7 +39,6 @@
-export([die/1, frame_error/2, amqp_error/4,
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
--export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
-export([table_lookup/2]).
-export([r/3, r/2, r_arg/4, rs/1]).
@@ -108,10 +107,6 @@
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
'ok' | rabbit_types:connection_exit()).
--spec(get_config/1 ::
- (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
--spec(get_config/2 :: (atom(), A) -> A).
--spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 ::
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(table_lookup/2 ::
@@ -240,21 +235,6 @@ assert_args_equivalence1(Orig, New, Name, Key) ->
[Key, rabbit_misc:rs(Name), New1, Orig1])
end.
-get_config(Key) ->
- case dirty_read({rabbit_config, Key}) of
- {ok, {rabbit_config, Key, V}} -> {ok, V};
- Other -> Other
- end.
-
-get_config(Key, DefaultValue) ->
- case get_config(Key) of
- {ok, V} -> V;
- {error, not_found} -> DefaultValue
- end.
-
-set_config(Key, Value) ->
- ok = mnesia:dirty_write({rabbit_config, Key, Value}).
-
dirty_read(ReadSpec) ->
case mnesia:dirty_read(ReadSpec) of
[Result] -> {ok, Result};
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 4a5adfae..a3214888 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -169,10 +169,6 @@ table_definitions() ->
{attributes, record_info(fields, vhost)},
{disc_copies, [node()]},
{match, #vhost{_='_'}}]},
- {rabbit_config,
- [{attributes, [key, val]}, % same mnesia's default
- {disc_copies, [node()]},
- {match, {rabbit_config, '_', '_'}}]},
{rabbit_listener,
[{record_name, listener},
{attributes, record_info(fields, listener)},
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index ff248c23..a9c7db76 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/1,
+ sync/3, client_init/2, client_terminate/2,
client_delete_and_terminate/3, successfully_recovered_state/1]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -136,7 +136,7 @@
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
-spec(client_delete_and_terminate/3 ::
(client_msstate(), server(), binary()) -> 'ok').
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
@@ -373,13 +373,13 @@ client_init(Server, Ref) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
-client_terminate(CState) ->
+client_terminate(CState, Server) ->
close_all_handles(CState),
- ok.
+ ok = gen_server2:call(Server, client_terminate, infinity).
client_delete_and_terminate(CState, Server, Ref) ->
- ok = client_terminate(CState),
- ok = gen_server2:cast(Server, {delete_client, Ref}).
+ close_all_handles(CState),
+ ok = gen_server2:cast(Server, {client_delete, Ref}).
successfully_recovered_state(Server) ->
gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
@@ -604,7 +604,10 @@ handle_call({new_client_state, CRef}, _From,
State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
handle_call(successfully_recovered_state, _From, State) ->
- reply(State #msstate.successfully_recovered, State).
+ reply(State #msstate.successfully_recovered, State);
+
+handle_call(client_terminate, _From, State) ->
+ reply(ok, State).
handle_cast({write, Guid},
State = #msstate { current_file_handle = CurHdl,
@@ -721,7 +724,7 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({delete_client, CRef},
+handle_cast({client_delete, CRef},
State = #msstate { client_refs = ClientRefs }) ->
noreply(
State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index f656e04c..08272afe 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -107,7 +107,15 @@ boot_ssl() ->
ok;
{ok, SslListeners} ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
- {ok, SslOpts} = application:get_env(ssl_options),
+ {ok, SslOptsConfig} = application:get_env(ssl_options),
+ SslOpts =
+ case proplists:get_value(verify, SslOptsConfig, verify_none) of
+ verify_none -> SslOptsConfig;
+ verify_peer -> [{verify_fun, fun([]) -> true;
+ ([_|_]) -> false
+ end}
+ | SslOptsConfig]
+ end,
[start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
end.
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 26274a36..b23776cd 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -51,7 +51,7 @@
%%----------------------------------------------------------------------------
start() ->
- io:format("Activating RabbitMQ plugins ..."),
+ io:format("Activating RabbitMQ plugins ...~n"),
%% Ensure Rabbit is loaded so we can access it's environment
application:load(rabbit),
@@ -130,7 +130,7 @@ start() ->
ok -> ok;
error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
- io:format("~n~w plugins activated:~n", [length(PluginApps)]),
+ io:format("~w plugins activated:~n", [length(PluginApps)]),
[io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
|| App <- PluginApps],
io:nl(),
@@ -151,29 +151,33 @@ determine_version(App) ->
{ok, Vsn} = application:get_key(App, vsn),
{App, Vsn}.
-assert_dir(Dir) ->
- case filelib:is_dir(Dir) of
- true -> ok;
- false -> ok = filelib:ensure_dir(Dir),
- ok = file:make_dir(Dir)
- end.
-
-delete_dir(Dir) ->
- case filelib:is_dir(Dir) of
+delete_recursively(Fn) ->
+ case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
true ->
- case file:list_dir(Dir) of
+ case file:list_dir(Fn) of
{ok, Files} ->
- [case Dir ++ "/" ++ F of
- Fn ->
- case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
- true -> delete_dir(Fn);
- false -> file:delete(Fn)
- end
- end || F <- Files]
- end,
- ok = file:del_dir(Dir);
+ case lists:foldl(fun ( Fn1, ok) -> delete_recursively(
+ Fn ++ "/" ++ Fn1);
+ (_Fn1, Err) -> Err
+ end, ok, Files) of
+ ok -> case file:del_dir(Fn) of
+ ok -> ok;
+ {error, E} -> {error,
+ {cannot_delete, Fn, E}}
+ end;
+ Err -> Err
+ end;
+ {error, E} ->
+ {error, {cannot_list_files, Fn, E}}
+ end;
false ->
- ok
+ case filelib:is_file(Fn) of
+ true -> case file:delete(Fn) of
+ ok -> ok;
+ {error, E} -> {error, {cannot_delete, Fn, E}}
+ end;
+ false -> ok
+ end
end.
is_symlink(Name) ->
@@ -182,13 +186,18 @@ is_symlink(Name) ->
_ -> false
end.
-unpack_ez_plugins(PluginSrcDir, PluginDestDir) ->
+unpack_ez_plugins(SrcDir, DestDir) ->
%% Eliminate the contents of the destination directory
- delete_dir(PluginDestDir),
-
- assert_dir(PluginDestDir),
- [unpack_ez_plugin(PluginName, PluginDestDir) ||
- PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")].
+ case delete_recursively(DestDir) of
+ ok -> ok;
+ {error, E} -> error("Could not delete dir ~s (~p)", [DestDir, E])
+ end,
+ case filelib:ensure_dir(DestDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> error("Could not create dir ~s (~p)", [DestDir, E2])
+ end,
+ [unpack_ez_plugin(PluginName, DestDir) ||
+ PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")].
unpack_ez_plugin(PluginFn, PluginDestDir) ->
zip:unzip(PluginFn, [{cwd, PluginDestDir}]),
@@ -247,8 +256,8 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
- [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
+ [{apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry) ->
[Entry].
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3c32fc9e..09ada1c0 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -46,7 +46,6 @@
-export([emit_stats/1]).
-import(gen_tcp).
--import(fprof).
-import(inet).
-import(prim_inet).
@@ -227,33 +226,6 @@ info(Pid, Items) ->
emit_stats(Pid) ->
gen_server:cast(Pid, emit_stats).
-setup_profiling() ->
- Value = rabbit_misc:get_config(profiling_enabled, false),
- case Value of
- once ->
- rabbit_log:info("Enabling profiling for this connection, "
- "and disabling for subsequent.~n"),
- rabbit_misc:set_config(profiling_enabled, false),
- fprof:trace(start);
- true ->
- rabbit_log:info("Enabling profiling for this connection.~n"),
- fprof:trace(start);
- false ->
- ok
- end,
- Value.
-
-teardown_profiling(Value) ->
- case Value of
- false ->
- ok;
- _ ->
- rabbit_log:info("Completing profiling for this connection.~n"),
- fprof:trace(stop),
- fprof:profile(),
- fprof:analyse([{dest, []}, {cols, 100}])
- end.
-
conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
@@ -290,7 +262,6 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
- ProfilingValue = setup_profiling(),
try
mainloop(Deb, switch_callback(
#v1{parent = Parent,
@@ -330,7 +301,6 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ec049a1a..bfccb0da 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -33,9 +33,7 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([deliver/2,
- match_bindings/2,
- match_routing_key/2]).
+-export([deliver/2, match_bindings/2, match_routing_key/2]).
%%----------------------------------------------------------------------------
@@ -45,9 +43,15 @@
-type(routing_key() :: binary()).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+-type(qpids() :: [pid()]).
-spec(deliver/2 ::
- ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
+ (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}).
+-spec(match_bindings/2 :: (rabbit_exchange:name(),
+ fun ((rabbit_types:binding()) -> boolean())) ->
+ qpids()).
+-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') ->
+ qpids()).
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a6980b94..bdd3cdcd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1460,7 +1460,7 @@ msg_store_remove(Guids) ->
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L)).
+ rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1523,7 +1523,7 @@ test_msg_store() ->
ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf),
%% read the second half again, just for fun (aka code coverage)
MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
- ok = rabbit_msg_store:client_terminate(MSCState7),
+ ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE),
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
@@ -1548,7 +1548,7 @@ test_msg_store() ->
{ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(Guids1stHalf, MSCState9)),
+ msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE),
ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
%% restart empty
restart_msg_store_empty(), %% now safe to reuse guids
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0f52eee8..30d3a8ae 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -439,9 +439,10 @@ terminate(State) ->
remove_pending_ack(true, tx_commit_index(State)),
case MSCStateP of
undefined -> ok;
- _ -> rabbit_msg_store:client_terminate(MSCStateP)
+ _ -> rabbit_msg_store:client_terminate(
+ MSCStateP, ?PERSISTENT_MSG_STORE)
end,
- rabbit_msg_store:client_terminate(MSCStateT),
+ rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE),
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
{persistent_count, PCount}],
@@ -464,8 +465,7 @@ delete_and_terminate(State) ->
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(
- MSCStateP, ?PERSISTENT_MSG_STORE, PRef),
- rabbit_msg_store:client_terminate(MSCStateP)
+ MSCStateP, ?PERSISTENT_MSG_STORE, PRef)
end,
rabbit_msg_store:client_delete_and_terminate(
MSCStateT, ?TRANSIENT_MSG_STORE, TRef),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index fd5b5ba5..feb214c2 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -114,41 +114,24 @@ mainloop1(ReaderPid, State) ->
erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
end.
-handle_message({send_command, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
+handle_message({send_command, MethodRecord}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
State;
-handle_message({send_command, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+handle_message({send_command, MethodRecord, Content}, State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
State;
-handle_message({send_command_sync, From, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
gen_server:reply(From, ok),
State;
-handle_message({send_command_sync, From, {MethodRecord, Content}},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
gen_server:reply(From, ok),
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
handle_message({inet_reply, _, ok}, State) ->
@@ -169,10 +152,10 @@ send_command(W, MethodRecord, Content) ->
ok.
send_command_sync(W, MethodRecord) ->
- call(W, send_command_sync, MethodRecord).
+ call(W, {send_command_sync, MethodRecord}).
send_command_sync(W, MethodRecord, Content) ->
- call(W, send_command_sync, {MethodRecord, Content}).
+ call(W, {send_command_sync, MethodRecord, Content}).
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
@@ -180,8 +163,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
%---------------------------------------------------------------------------
-call(Pid, Label, Msg) ->
- {ok, Res} = gen:call(Pid, Label, Msg, infinity),
+call(Pid, Msg) ->
+ {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
Res.
%---------------------------------------------------------------------------
@@ -231,12 +214,18 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(Sock, Channel, MethodRecord, Protocol) ->
+internal_send_command_async(MethodRecord,
+ #wstate{sock = Sock,
+ channel = Channel,
+ protocol = Protocol}) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
ok.
-internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax,
- Protocol) ->
+internal_send_command_async(MethodRecord, Content,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
Content, FrameMax, Protocol)),
ok.