diff options
author | Michael Bridgen <mikeb@rabbitmq.com> | 2010-09-03 16:55:32 +0100 |
---|---|---|
committer | Michael Bridgen <mikeb@rabbitmq.com> | 2010-09-03 16:55:32 +0100 |
commit | 646dc17d80d4b093056431e98f32c37b7e557eeb (patch) | |
tree | a44020696815ea98dc3ac0676b9b5c73764a1770 | |
parent | 908711536659deb01260d81d241bda8a8fa352be (diff) | |
parent | 9a484e99114d27eba19685ccc48213bc3a47dbea (diff) | |
download | rabbitmq-server-646dc17d80d4b093056431e98f32c37b7e557eeb.tar.gz |
Merge default into bug23116bug23116
-rw-r--r-- | src/file_handle_cache.erl | 22 | ||||
-rw-r--r-- | src/rabbit.erl | 3 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 25 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 20 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 19 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 10 | ||||
-rw-r--r-- | src/rabbit_plugin_activator.erl | 69 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 30 | ||||
-rw-r--r-- | src/rabbit_router.erl | 12 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 59 |
15 files changed, 135 insertions, 172 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index f83fa0bc..aecfb096 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -131,6 +131,7 @@ last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, transfer/1, set_limit/1, get_limit/0]). +-export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -242,6 +243,7 @@ -spec(transfer/1 :: (pid()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). +-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()). -endif. @@ -781,7 +783,11 @@ init([]) -> Watermark > 0) -> Watermark; _ -> - ulimit() + case ulimit() of + infinity -> infinity; + unknown -> ?FILE_HANDLES_LIMIT_OTHER; + Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS]) + end end, ObtainLimit = obtain_limit(Limit), error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", @@ -1131,7 +1137,7 @@ track_client(Pid, Clients) -> ulimit() -> case os:type() of {win32, _OsName} -> - ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS; + ?FILE_HANDLES_LIMIT_WINDOWS; {unix, _OsName} -> %% Under Linux, Solaris and FreeBSD, ulimit is a shell %% builtin, not a command. In OS X, it's a command. @@ -1141,16 +1147,14 @@ ulimit() -> "unlimited" -> infinity; String = [C|_] when $0 =< C andalso C =< $9 -> - Num = list_to_integer( - lists:takewhile( - fun (D) -> $0 =< D andalso D =< $9 end, String)) - - ?RESERVED_FOR_OTHERS, - lists:max([1, Num]); + list_to_integer( + lists:takewhile( + fun (D) -> $0 =< D andalso D =< $9 end, String)); _ -> %% probably a variant of %% "/bin/sh: line 1: ulimit: command not found\n" - ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS + unknown end; _ -> - ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS + unknown end. diff --git a/src/rabbit.erl b/src/rabbit.erl index 303d1f3a..c2574970 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -205,8 +205,7 @@ %%---------------------------------------------------------------------------- prepare() -> - ok = ensure_working_log_handlers(), - ok = rabbit_mnesia:ensure_mnesia_dir(). + ok = ensure_working_log_handlers(). start() -> try diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 8d00f591..9cfe1ca8 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -171,10 +171,6 @@ check_resource_access(Username, check_resource_access(Username, R#resource{name = <<"amq.default">>}, Permission); -check_resource_access(_Username, - #resource{name = <<"amq.gen",_/binary>>}, - #permission{scope = client}) -> - ok; check_resource_access(Username, R = #resource{virtual_host = VHostPath, name = Name}, Permission) -> @@ -184,14 +180,19 @@ check_resource_access(Username, [] -> false; [#user_permission{permission = P}] -> - PermRegexp = case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; - RE -> RE - end, - case re:run(Name, PermRegexp, [{capture, none}]) of - match -> true; - nomatch -> false + case {Name, P} of + {<<"amq.gen",_/binary>>, #permission{scope = client}} -> + true; + _ -> + PermRegexp = case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false + end end end, if Res -> ok; diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0cdb4fff..6b9ac560 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -56,7 +56,7 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(EXPIRES_TYPE, long). +-define(EXPIRES_TYPES, [byte, short, signedint, long]). %%---------------------------------------------------------------------------- @@ -313,13 +313,13 @@ check_declare_arguments(QueueName, Args) -> check_expires_argument(undefined) -> ok; -check_expires_argument({?EXPIRES_TYPE, Expires}) - when is_integer(Expires) andalso Expires > 0 -> - ok; -check_expires_argument({?EXPIRES_TYPE, _Expires}) -> - {error, expires_zero_or_less}; -check_expires_argument(_) -> - {error, expires_not_of_type_long}. +check_expires_argument({Type, Expires}) when Expires > 0 -> + case lists:member(Type, ?EXPIRES_TYPES) of + true -> ok; + false -> {error, {expires_not_of_acceptable_type, Type, Expires}} + end; +check_expires_argument({_Type, _Expires}) -> + {error, expires_zero_or_less}. list(VHostPath) -> mnesia:dirty_match_object( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2cab7136..90a0503b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -146,8 +146,8 @@ code_change(_OldVsn, State, _Extra) -> init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of - {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); - undefined -> State + {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); + undefined -> State end. declare(Recover, From, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5fa3f8ed..086d260e 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -39,7 +39,6 @@ -export([die/1, frame_error/2, amqp_error/4, protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). --export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). -export([table_lookup/2]). -export([r/3, r/2, r_arg/4, rs/1]). @@ -108,10 +107,6 @@ rabbit_framing:amqp_table(), rabbit_types:r(any()), [binary()]) -> 'ok' | rabbit_types:connection_exit()). --spec(get_config/1 :: - (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')). --spec(get_config/2 :: (atom(), A) -> A). --spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). -spec(table_lookup/2 :: @@ -240,21 +235,6 @@ assert_args_equivalence1(Orig, New, Name, Key) -> [Key, rabbit_misc:rs(Name), New1, Orig1]) end. -get_config(Key) -> - case dirty_read({rabbit_config, Key}) of - {ok, {rabbit_config, Key, V}} -> {ok, V}; - Other -> Other - end. - -get_config(Key, DefaultValue) -> - case get_config(Key) of - {ok, V} -> V; - {error, not_found} -> DefaultValue - end. - -set_config(Key, Value) -> - ok = mnesia:dirty_write({rabbit_config, Key, Value}). - dirty_read(ReadSpec) -> case mnesia:dirty_read(ReadSpec) of [Result] -> {ok, Result}; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4a5adfae..a3214888 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -169,10 +169,6 @@ table_definitions() -> {attributes, record_info(fields, vhost)}, {disc_copies, [node()]}, {match, #vhost{_='_'}}]}, - {rabbit_config, - [{attributes, [key, val]}, % same mnesia's default - {disc_copies, [node()]}, - {match, {rabbit_config, '_', '_'}}]}, {rabbit_listener, [{record_name, listener}, {attributes, record_info(fields, listener)}, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index ff248c23..a9c7db76 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/2, client_terminate/1, + sync/3, client_init/2, client_terminate/2, client_delete_and_terminate/3, successfully_recovered_state/1]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -136,7 +136,7 @@ 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). -spec(client_delete_and_terminate/3 :: (client_msstate(), server(), binary()) -> 'ok'). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). @@ -373,13 +373,13 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState) -> +client_terminate(CState, Server) -> close_all_handles(CState), - ok. + ok = gen_server2:call(Server, client_terminate, infinity). client_delete_and_terminate(CState, Server, Ref) -> - ok = client_terminate(CState), - ok = gen_server2:cast(Server, {delete_client, Ref}). + close_all_handles(CState), + ok = gen_server2:cast(Server, {client_delete, Ref}). successfully_recovered_state(Server) -> gen_server2:pcall(Server, 7, successfully_recovered_state, infinity). @@ -604,7 +604,10 @@ handle_call({new_client_state, CRef}, _From, State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); handle_call(successfully_recovered_state, _From, State) -> - reply(State #msstate.successfully_recovered, State). + reply(State #msstate.successfully_recovered, State); + +handle_call(client_terminate, _From, State) -> + reply(ok, State). handle_cast({write, Guid}, State = #msstate { current_file_handle = CurHdl, @@ -721,7 +724,7 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast({delete_client, CRef}, +handle_cast({client_delete, CRef}, State = #msstate { client_refs = ClientRefs }) -> noreply( State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index f656e04c..08272afe 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -107,7 +107,15 @@ boot_ssl() -> ok; {ok, SslListeners} -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), - {ok, SslOpts} = application:get_env(ssl_options), + {ok, SslOptsConfig} = application:get_env(ssl_options), + SslOpts = + case proplists:get_value(verify, SslOptsConfig, verify_none) of + verify_none -> SslOptsConfig; + verify_peer -> [{verify_fun, fun([]) -> true; + ([_|_]) -> false + end} + | SslOptsConfig] + end, [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], ok end. diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 26274a36..b23776cd 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -51,7 +51,7 @@ %%---------------------------------------------------------------------------- start() -> - io:format("Activating RabbitMQ plugins ..."), + io:format("Activating RabbitMQ plugins ...~n"), %% Ensure Rabbit is loaded so we can access it's environment application:load(rabbit), @@ -130,7 +130,7 @@ start() -> ok -> ok; error -> error("failed to compile boot script file ~s", [ScriptFile]) end, - io:format("~n~w plugins activated:~n", [length(PluginApps)]), + io:format("~w plugins activated:~n", [length(PluginApps)]), [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)]) || App <- PluginApps], io:nl(), @@ -151,29 +151,33 @@ determine_version(App) -> {ok, Vsn} = application:get_key(App, vsn), {App, Vsn}. -assert_dir(Dir) -> - case filelib:is_dir(Dir) of - true -> ok; - false -> ok = filelib:ensure_dir(Dir), - ok = file:make_dir(Dir) - end. - -delete_dir(Dir) -> - case filelib:is_dir(Dir) of +delete_recursively(Fn) -> + case filelib:is_dir(Fn) and not(is_symlink(Fn)) of true -> - case file:list_dir(Dir) of + case file:list_dir(Fn) of {ok, Files} -> - [case Dir ++ "/" ++ F of - Fn -> - case filelib:is_dir(Fn) and not(is_symlink(Fn)) of - true -> delete_dir(Fn); - false -> file:delete(Fn) - end - end || F <- Files] - end, - ok = file:del_dir(Dir); + case lists:foldl(fun ( Fn1, ok) -> delete_recursively( + Fn ++ "/" ++ Fn1); + (_Fn1, Err) -> Err + end, ok, Files) of + ok -> case file:del_dir(Fn) of + ok -> ok; + {error, E} -> {error, + {cannot_delete, Fn, E}} + end; + Err -> Err + end; + {error, E} -> + {error, {cannot_list_files, Fn, E}} + end; false -> - ok + case filelib:is_file(Fn) of + true -> case file:delete(Fn) of + ok -> ok; + {error, E} -> {error, {cannot_delete, Fn, E}} + end; + false -> ok + end end. is_symlink(Name) -> @@ -182,13 +186,18 @@ is_symlink(Name) -> _ -> false end. -unpack_ez_plugins(PluginSrcDir, PluginDestDir) -> +unpack_ez_plugins(SrcDir, DestDir) -> %% Eliminate the contents of the destination directory - delete_dir(PluginDestDir), - - assert_dir(PluginDestDir), - [unpack_ez_plugin(PluginName, PluginDestDir) || - PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")]. + case delete_recursively(DestDir) of + ok -> ok; + {error, E} -> error("Could not delete dir ~s (~p)", [DestDir, E]) + end, + case filelib:ensure_dir(DestDir ++ "/") of + ok -> ok; + {error, E2} -> error("Could not create dir ~s (~p)", [DestDir, E2]) + end, + [unpack_ez_plugin(PluginName, DestDir) || + PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")]. unpack_ez_plugin(PluginFn, PluginDestDir) -> zip:unzip(PluginFn, [{cwd, PluginDestDir}]), @@ -247,8 +256,8 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) -> - [Entry, {apply,{rabbit,prepare,[]}}]; +process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> + [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3c32fc9e..09ada1c0 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -46,7 +46,6 @@ -export([emit_stats/1]). -import(gen_tcp). --import(fprof). -import(inet). -import(prim_inet). @@ -227,33 +226,6 @@ info(Pid, Items) -> emit_stats(Pid) -> gen_server:cast(Pid, emit_stats). -setup_profiling() -> - Value = rabbit_misc:get_config(profiling_enabled, false), - case Value of - once -> - rabbit_log:info("Enabling profiling for this connection, " - "and disabling for subsequent.~n"), - rabbit_misc:set_config(profiling_enabled, false), - fprof:trace(start); - true -> - rabbit_log:info("Enabling profiling for this connection.~n"), - fprof:trace(start); - false -> - ok - end, - Value. - -teardown_profiling(Value) -> - case Value of - false -> - ok; - _ -> - rabbit_log:info("Completing profiling for this connection.~n"), - fprof:trace(stop), - fprof:profile(), - fprof:analyse([{dest, []}, {cols, 100}]) - end. - conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -290,7 +262,6 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), - ProfilingValue = setup_profiling(), try mainloop(Deb, switch_callback( #v1{parent = Parent, @@ -330,7 +301,6 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, %% output to be sent, which results in unnecessary delays. %% %% gen_tcp:close(ClientSock), - teardown_profiling(ProfilingValue), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ec049a1a..bfccb0da 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -33,9 +33,7 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([deliver/2, - match_bindings/2, - match_routing_key/2]). +-export([deliver/2, match_bindings/2, match_routing_key/2]). %%---------------------------------------------------------------------------- @@ -45,9 +43,15 @@ -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). +-type(qpids() :: [pid()]). -spec(deliver/2 :: - ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). + (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(match_bindings/2 :: (rabbit_exchange:name(), + fun ((rabbit_types:binding()) -> boolean())) -> + qpids()). +-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> + qpids()). -endif. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a6980b94..bdd3cdcd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1460,7 +1460,7 @@ msg_store_remove(Guids) -> foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L)). + rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). test_msg_store() -> restart_msg_store_empty(), @@ -1523,7 +1523,7 @@ test_msg_store() -> ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), %% read the second half again, just for fun (aka code coverage) MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), - ok = rabbit_msg_store:client_terminate(MSCState7), + ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE), %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( @@ -1548,7 +1548,7 @@ test_msg_store() -> {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(Guids1stHalf, MSCState9)), + msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE), ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), %% restart empty restart_msg_store_empty(), %% now safe to reuse guids diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0f52eee8..30d3a8ae 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -439,9 +439,10 @@ terminate(State) -> remove_pending_ack(true, tx_commit_index(State)), case MSCStateP of undefined -> ok; - _ -> rabbit_msg_store:client_terminate(MSCStateP) + _ -> rabbit_msg_store:client_terminate( + MSCStateP, ?PERSISTENT_MSG_STORE) end, - rabbit_msg_store:client_terminate(MSCStateT), + rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], @@ -464,8 +465,7 @@ delete_and_terminate(State) -> case MSCStateP of undefined -> ok; _ -> rabbit_msg_store:client_delete_and_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE, PRef), - rabbit_msg_store:client_terminate(MSCStateP) + MSCStateP, ?PERSISTENT_MSG_STORE, PRef) end, rabbit_msg_store:client_delete_and_terminate( MSCStateT, ?TRANSIENT_MSG_STORE, TRef), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index fd5b5ba5..feb214c2 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -114,41 +114,24 @@ mainloop1(ReaderPid, State) -> erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) end. -handle_message({send_command, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), +handle_message({send_command, MethodRecord}, State) -> + ok = internal_send_command_async(MethodRecord, State), State; -handle_message({send_command, MethodRecord, Content}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax, Protocol), +handle_message({send_command, MethodRecord, Content}, State) -> + ok = internal_send_command_async(MethodRecord, Content, State), State; -handle_message({send_command_sync, From, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), +handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> + ok = internal_send_command_async(MethodRecord, State), gen_server:reply(From, ok), State; -handle_message({send_command_sync, From, {MethodRecord, Content}}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax, Protocol), +handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, + State) -> + ok = internal_send_command_async(MethodRecord, Content, State), gen_server:reply(From, ok), State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax, Protocol), + State) -> + ok = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), State; handle_message({inet_reply, _, ok}, State) -> @@ -169,10 +152,10 @@ send_command(W, MethodRecord, Content) -> ok. send_command_sync(W, MethodRecord) -> - call(W, send_command_sync, MethodRecord). + call(W, {send_command_sync, MethodRecord}). send_command_sync(W, MethodRecord, Content) -> - call(W, send_command_sync, {MethodRecord, Content}). + call(W, {send_command_sync, MethodRecord, Content}). send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, @@ -180,8 +163,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> %--------------------------------------------------------------------------- -call(Pid, Label, Msg) -> - {ok, Res} = gen:call(Pid, Label, Msg, infinity), +call(Pid, Msg) -> + {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity), Res. %--------------------------------------------------------------------------- @@ -231,12 +214,18 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(Sock, Channel, MethodRecord, Protocol) -> +internal_send_command_async(MethodRecord, + #wstate{sock = Sock, + channel = Channel, + protocol = Protocol}) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), ok. -internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, - Protocol) -> +internal_send_command_async(MethodRecord, Content, + #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol)), ok. |