diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-04 20:37:37 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-04 20:37:37 +0000 |
commit | d64f32854c7782c4b4a8a36386bba038842cf95e (patch) | |
tree | 02da01c80465f37e650f2466e33336e0d54b667f | |
parent | 78b753aaf006c397c113b810ef8195a6fc1f927b (diff) | |
parent | c8044c53b6a8eed5b685ff263b4ffbcba37a98c7 (diff) | |
download | rabbitmq-server-d64f32854c7782c4b4a8a36386bba038842cf95e.tar.gz |
merge default into bug23914
-rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 14 | ||||
-rw-r--r-- | src/rabbit_control.erl | 46 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 22 | ||||
-rw-r--r-- | src/rabbit_msg_file.erl | 52 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 54 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 18 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 19 | ||||
-rw-r--r-- | src/rabbit_router.erl | 6 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 33 |
10 files changed, 132 insertions, 138 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8b53d948..3aa20821 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -214,8 +214,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [] -> ok = store_queue(Q), B = add_default_binding(Q), fun (Tx) -> B(Tx), Q end; - [_] -> %% Q exists on stopped node - rabbit_misc:const(not_found) + %% Q exists on stopped node + [_] -> rabbit_misc:const(not_found) end; [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of @@ -288,7 +288,7 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, - RequiredArgs) -> + RequiredArgs) -> rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, [<<"x-expires">>]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5fccb542..526fb428 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -301,8 +301,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1), erase_queue_stats(QPid), State3 = (case Reason of - normal -> fun record_confirms/2; - _ -> fun send_nacks/2 + normal -> fun record_confirms/2; + _ -> fun send_nacks/2 end)(MXs, State2), noreply(queue_blocked(QPid, State3)). @@ -715,9 +715,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end) of ok -> {noreply, State#ch{consumer_mapping = - dict:store(ActualConsumerTag, - QueueName, - ConsumerMapping)}}; + dict:store(ActualConsumerTag, + QueueName, + ConsumerMapping)}}; {error, exclusive_consume_unavailable} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -739,8 +739,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, return_ok(State, NoWait, OkMsg); {ok, QueueName} -> NewState = State#ch{consumer_mapping = - dict:erase(ConsumerTag, - ConsumerMapping)}, + dict:erase(ConsumerTag, + ConsumerMapping)}, case rabbit_amqqueue:with( QueueName, fun (Q) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 746bb66e..8364ecd8 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -103,24 +103,22 @@ print_badrpc_diagnostics(Node) -> diagnostics(Node) -> {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - [ - {"diagnostics:", []}, - case net_adm:names(NodeHost) of - {error, EpmdReason} -> - {"- unable to connect to epmd on ~s: ~w", - [NodeHost, EpmdReason]}; - {ok, NamePorts} -> - {"- nodes and their ports on ~s: ~p", - [NodeHost, [{list_to_atom(Name), Port} || - {Name, Port} <- NamePorts]]} - end, - {"- current node: ~w", [node()]}, - case init:get_argument(home) of - {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]}; - Other -> {"- no current node home dir: ~p", [Other]} - end, - {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]} - ]. + [{"diagnostics:", []}, + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + {"- unable to connect to epmd on ~s: ~w", + [NodeHost, EpmdReason]}; + {ok, NamePorts} -> + {"- nodes and their ports on ~s: ~p", + [NodeHost, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]} + end, + {"- current node: ~w", [node()]}, + case init:get_argument(home) of + {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]}; + Other -> {"- no current node home dir: ~p", [Other]} + end, + {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}]. stop() -> ok. @@ -152,13 +150,13 @@ action(force_reset, Node, [], _Opts, Inform) -> action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), Inform("Clustering node ~p with ~p", - [Node, ClusterNodes]), + [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", - [Node, ClusterNodes]), + [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); action(status, Node, [], _Opts, Inform) -> @@ -320,10 +318,8 @@ wait_for_application0(Node, Attempts) -> wait_for_application(Node, Attempts). default_if_empty(List, Default) when is_list(List) -> - if List == [] -> - Default; - true -> - [list_to_atom(X) || X <- List] + if List == [] -> Default; + true -> [list_to_atom(X) || X <- List] end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> @@ -414,7 +410,7 @@ prettify_typed_amqp_value(Type, Value) -> _ -> Value end. -% the slower shutdown on windows required to flush stdout +%% the slower shutdown on windows required to flush stdout quit(Status) -> case os:type() of {unix, _} -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5579dbab..e79a58a1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -469,11 +469,11 @@ map_in_order(F, L) -> table_fold(F, Acc0, TableName) -> lists:foldl( fun (E, Acc) -> execute_mnesia_transaction( - fun () -> case mnesia:match_object(TableName, E, read) of - [] -> Acc; - _ -> F(E, Acc) - end - end) + fun () -> case mnesia:match_object(TableName, E, read) of + [] -> Acc; + _ -> F(E, Acc) + end + end) end, Acc0, dirty_read_all(TableName)). dirty_read_all(TableName) -> @@ -755,12 +755,12 @@ unlink_and_capture_exit(Pid) -> after 0 -> ok end. -% Separate flags and options from arguments. -% get_options([{flag, "-q"}, {option, "-p", "/"}], -% ["set_permissions","-p","/","guest", -% "-q",".*",".*",".*"]) -% == {["set_permissions","guest",".*",".*",".*"], -% [{"-q",true},{"-p","/"}]} +%% Separate flags and options from arguments. +%% get_options([{flag, "-q"}, {option, "-p", "/"}], +%% ["set_permissions","-p","/","guest", +%% "-q",".*",".*",".*"]) +%% == {["set_permissions","guest",".*",".*",".*"], +%% [{"-q",true},{"-p","/"}]} get_options(Defs, As) -> lists:foldl(fun(Def, {AsIn, RsIn}) -> {AsOut, Value} = case Def of diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 22ad3d05..b7de27d4 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -62,9 +62,9 @@ append(FileHdl, MsgId, MsgBody) Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES, case file_handle_cache:append(FileHdl, <<Size:?INTEGER_SIZE_BITS, - MsgId:?MSG_ID_SIZE_BYTES/binary, - MsgBodyBin:MsgBodyBinSize/binary, - ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of + MsgId:?MSG_ID_SIZE_BYTES/binary, + MsgBodyBin:MsgBodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; KO -> KO end. @@ -74,9 +74,9 @@ read(FileHdl, TotalSize) -> BodyBinSize = Size - ?MSG_ID_SIZE_BYTES, case file_handle_cache:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, - MsgId:?MSG_ID_SIZE_BYTES/binary, - MsgBodyBin:BodyBinSize/binary, - ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> + MsgId:?MSG_ID_SIZE_BYTES/binary, + MsgBodyBin:BodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> {ok, {MsgId, binary_to_term(MsgBodyBin)}}; KO -> KO end. @@ -99,27 +99,27 @@ scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) -> end. scanner(<<>>, Offset, _Fun, Acc) -> - {<<>>, Acc, Offset}; + {<<>>, Acc, Offset}; scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) -> - {<<>>, Acc, Offset}; %% Nothing to do other than stop. + {<<>>, Acc, Offset}; %% Nothing to do other than stop. scanner(<<Size:?INTEGER_SIZE_BITS, MsgIdAndMsg:Size/binary, WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) -> - TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, - case WriteMarker of - ?WRITE_OK_MARKER -> - %% Here we take option 5 from - %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in - %% which we read the MsgId as a number, and then convert it - %% back to a binary in order to work around bugs in - %% Erlang's GC. - <<MsgIdNum:?MSG_ID_SIZE_BITS, Msg/binary>> = - <<MsgIdAndMsg:Size/binary>>, - <<MsgId:?MSG_ID_SIZE_BYTES/binary>> = - <<MsgIdNum:?MSG_ID_SIZE_BITS>>, - scanner(Rest, Offset + TotalSize, Fun, - Fun({MsgId, TotalSize, Offset, Msg}, Acc)); - _ -> - scanner(Rest, Offset + TotalSize, Fun, Acc) - end; + TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, + case WriteMarker of + ?WRITE_OK_MARKER -> + %% Here we take option 5 from + %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in + %% which we read the MsgId as a number, and then convert it + %% back to a binary in order to work around bugs in + %% Erlang's GC. + <<MsgIdNum:?MSG_ID_SIZE_BITS, Msg/binary>> = + <<MsgIdAndMsg:Size/binary>>, + <<MsgId:?MSG_ID_SIZE_BYTES/binary>> = + <<MsgIdNum:?MSG_ID_SIZE_BITS>>, + scanner(Rest, Offset + TotalSize, Fun, + Fun({MsgId, TotalSize, Offset, Msg}, Acc)); + _ -> + scanner(Rest, Offset + TotalSize, Fun, Acc) + end; scanner(Data, Offset, _Fun, Acc) -> - {Data, Acc, Offset}. + {Data, Acc, Offset}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 1b5d51a6..48fce9ed 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -549,7 +549,7 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, %% GC ends, we +1 readers, msg_store ets:deletes (and %% unlocks the dest) try Release(), - Defer() + Defer() catch error:badarg -> read(MsgId, CState) end; [#file_summary { locked = false }] -> @@ -667,7 +667,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, cref_to_msg_ids = dict:new() - }, + }, %% If we didn't recover the msg location index then we need to %% rebuild it now. @@ -1259,7 +1259,7 @@ safe_file_delete(File, Dir, FileHandlesEts) -> close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts, client_ref = Ref } = - CState) -> + CState) -> Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}), {ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) -> true = ets:delete(FileHandlesEts, Key), @@ -1468,7 +1468,7 @@ recover_file_summary(true, Dir) -> Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME), case ets:file2tab(Path) of {ok, Tid} -> file:delete(Path), - {true, Tid}; + {true, Tid}; {error, _Error} -> recover_file_summary(false, Dir) end. @@ -1533,7 +1533,7 @@ scan_file_for_valid_messages(Dir, FileName) -> {ok, Hdl} -> Valid = rabbit_msg_file:scan( Hdl, filelib:file_size( form_filename(Dir, FileName)), - fun scan_fun/2, []), + fun scan_fun/2, []), %% if something really bad has happened, %% the close could fail, but ignore file_handle_cache:close(Hdl), @@ -1696,8 +1696,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, pending_gc_completion = Pending, file_summary_ets = FileSummaryEts, file_size_limit = FileSizeLimit }) - when (SumFileSize > 2 * FileSizeLimit andalso - (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) -> + when SumFileSize > 2 * FileSizeLimit andalso + (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> %% TODO: the algorithm here is sub-optimal - it may result in a %% complete traversal of FileSummaryEts. case ets:first(FileSummaryEts) of @@ -1760,10 +1760,10 @@ delete_file_if_empty(File, State = #msstate { locked = false }] = ets:lookup(FileSummaryEts, File), case ValidData of - 0 -> %% don't delete the file_summary_ets entry for File here - %% because we could have readers which need to be able to - %% decrement the readers count. - true = ets:update_element(FileSummaryEts, File, + %% don't delete the file_summary_ets entry for File here + %% because we could have readers which need to be able to + %% decrement the readers count. + 0 -> true = ets:update_element(FileSummaryEts, File, {#file_summary.locked, true}), ok = rabbit_msg_store_gc:delete(GCPid, File), Pending1 = orddict_store(File, [], Pending), @@ -1816,17 +1816,17 @@ combine_files(Source, Destination, dir = Dir, msg_store = Server }) -> [#file_summary { - readers = 0, - left = Destination, - valid_total_size = SourceValid, - file_size = SourceFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Source), + readers = 0, + left = Destination, + valid_total_size = SourceValid, + file_size = SourceFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Source), [#file_summary { - readers = 0, - right = Source, - valid_total_size = DestinationValid, - file_size = DestinationFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Destination), + readers = 0, + right = Source, + valid_total_size = DestinationValid, + file_size = DestinationFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Destination), SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), @@ -2004,12 +2004,12 @@ transform_msg_file(FileOld, FileNew, TransformFun) -> ?HANDLE_CACHE_BUFFER_SIZE}]), {ok, _Acc, _IgnoreSize} = rabbit_msg_file:scan( - RefOld, filelib:file_size(FileOld), - fun({MsgId, _Size, _Offset, BinMsg}, ok) -> - {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)), - {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew), - ok - end, ok), + RefOld, filelib:file_size(FileOld), + fun({MsgId, _Size, _Offset, BinMsg}, ok) -> + {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)), + {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew), + ok + end, ok), file_handle_cache:close(RefOld), file_handle_cache:close(RefNew), ok. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bb63f0f1..59d87654 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -271,7 +271,7 @@ publish(MsgId, SeqId, MsgProps, IsPersistent, false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]), + create_pub_record_body(MsgId, MsgProps)]), maybe_flush_journal( add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)). @@ -666,8 +666,8 @@ recover_journal(State) -> journal_minus_segment(JEntries, SegEntries), Segment #segment { journal_entries = JEntries1, unacked = (UnackedCountInJournal + - UnackedCountInSeg - - UnackedCountDuplicates) } + UnackedCountInSeg - + UnackedCountDuplicates) } end, Segments), State1 #qistate { segments = Segments1 }. @@ -799,16 +799,16 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> {MsgId, MsgProps, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]) + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS>>, + create_pub_record_body(MsgId, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> ok; _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, + RelSeq:?REL_SEQ_BITS>>, file_handle_cache:append( Hdl, case {Del, Ack} of {del, ack} -> [Binary, Binary]; @@ -853,14 +853,14 @@ load_segment(KeepAcked, #segment { path = Path }) -> load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> + IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> {MsgId, MsgProps} = read_pub_record_body(Hdl), Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> + RelSeq:?REL_SEQ_BITS>>} -> {UnackedCountDelta, SegEntries1} = case array:get(RelSeq, SegEntries) of {Pub, no_del, no_ack} -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f9a3d9c7..710e6878 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -592,14 +592,14 @@ handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> HandleException = fun(R) -> - case ?IS_RUNNING(State) of - true -> send_exception(State, 0, R); - %% We don't trust the client at this point - force - %% them to wait for a bit so they can't DOS us with - %% repeated failed logins etc. - false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, R}) - end + case ?IS_RUNNING(State) of + true -> send_exception(State, 0, R); + %% We don't trust the client at this point - force + %% them to wait for a bit so they can't DOS us with + %% repeated failed logins etc. + false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({channel0_error, State#v1.connection_state, R}) + end end, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), @@ -734,8 +734,7 @@ auth_mechanisms(Sock) -> auth_mechanisms_binary(Sock) -> list_to_binary( - string:join( - [atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")). + string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")). auth_phase(Response, State = #v1{auth_mechanism = AuthMechanism, diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 53e707f4..f6a1c92f 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -59,7 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false, {routed, QPids}; deliver(QNames, Delivery = #delivery{mandatory = Mandatory, - immediate = Immediate}) -> + immediate = Immediate}) -> QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke(QPids, @@ -67,7 +67,7 @@ deliver(QNames, Delivery = #delivery{mandatory = Mandatory, rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), + lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Mandatory, Immediate, {Routed, Handled}). @@ -91,7 +91,7 @@ match_routing_key(SrcName, [RoutingKey]) -> mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]); match_routing_key(SrcName, [_|_] = RoutingKeys) -> Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} || - RKey <- RoutingKeys]]), + RKey <- RoutingKeys]]), MatchHead = #route{binding = #binding{source = SrcName, destination = '$1', key = '$2', diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c7eccd15..67cba052 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -510,8 +510,7 @@ publish(Msg, MsgProps, State) -> a(reduce_memory_use(State1)). publish_delivered(false, #basic_message { id = MsgId }, - #message_properties { - needs_confirming = NeedsConfirming }, + #message_properties { needs_confirming = NeedsConfirming }, State = #vqstate { len = 0 }) -> case NeedsConfirming of true -> blind_confirm(self(), gb_sets:singleton(MsgId)); @@ -632,12 +631,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { %% 3. If an ack is required, add something sensible to PA {AckTag, State1} = case AckRequired of - true -> StateN = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - {SeqId, StateN}; - false -> {undefined, State} - end, + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {undefined, State} + end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, @@ -778,8 +777,8 @@ ram_duration(State = #vqstate { RamAckCount = gb_trees:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso - AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of + case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso + AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of true -> infinity; false -> (RamMsgCountPrev + RamMsgCount + RamAckCount + RamAckCountPrev) / @@ -1394,7 +1393,7 @@ accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, - {PersistentSeqIdsAcc, MsgIdsByStore}) -> + {PersistentSeqIdsAcc, MsgIdsByStore}) -> {PersistentSeqIdsAcc, MsgIdsByStore}; accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps}, {PersistentSeqIdsAcc, MsgIdsByStore}) -> @@ -1817,12 +1816,12 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> multiple_routing_keys() -> transform_storage( - fun ({basic_message, ExchangeName, Routing_Key, Content, - MsgId, Persistent}) -> - {ok, {basic_message, ExchangeName, [Routing_Key], Content, - MsgId, Persistent}}; - (_) -> {error, corrupt_message} - end), + fun ({basic_message, ExchangeName, Routing_Key, Content, + MsgId, Persistent}) -> + {ok, {basic_message, ExchangeName, [Routing_Key], Content, + MsgId, Persistent}}; + (_) -> {error, corrupt_message} + end), ok. |