summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-04 20:37:37 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-04 20:37:37 +0000
commitd64f32854c7782c4b4a8a36386bba038842cf95e (patch)
tree02da01c80465f37e650f2466e33336e0d54b667f
parent78b753aaf006c397c113b810ef8195a6fc1f927b (diff)
parentc8044c53b6a8eed5b685ff263b4ffbcba37a98c7 (diff)
downloadrabbitmq-server-d64f32854c7782c4b4a8a36386bba038842cf95e.tar.gz
merge default into bug23914
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_channel.erl14
-rw-r--r--src/rabbit_control.erl46
-rw-r--r--src/rabbit_misc.erl22
-rw-r--r--src/rabbit_msg_file.erl52
-rw-r--r--src/rabbit_msg_store.erl54
-rw-r--r--src/rabbit_queue_index.erl18
-rw-r--r--src/rabbit_reader.erl19
-rw-r--r--src/rabbit_router.erl6
-rw-r--r--src/rabbit_variable_queue.erl33
10 files changed, 132 insertions, 138 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8b53d948..3aa20821 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -214,8 +214,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[] -> ok = store_queue(Q),
B = add_default_binding(Q),
fun (Tx) -> B(Tx), Q end;
- [_] -> %% Q exists on stopped node
- rabbit_misc:const(not_found)
+ %% Q exists on stopped node
+ [_] -> rabbit_misc:const(not_found)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
@@ -288,7 +288,7 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
- RequiredArgs) ->
+ RequiredArgs) ->
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
[<<"x-expires">>]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5fccb542..526fb428 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -301,8 +301,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason},
{MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
erase_queue_stats(QPid),
State3 = (case Reason of
- normal -> fun record_confirms/2;
- _ -> fun send_nacks/2
+ normal -> fun record_confirms/2;
+ _ -> fun send_nacks/2
end)(MXs, State2),
noreply(queue_blocked(QPid, State3)).
@@ -715,9 +715,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
ok ->
{noreply, State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- QueueName,
- ConsumerMapping)}};
+ dict:store(ActualConsumerTag,
+ QueueName,
+ ConsumerMapping)}};
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -739,8 +739,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
return_ok(State, NoWait, OkMsg);
{ok, QueueName} ->
NewState = State#ch{consumer_mapping =
- dict:erase(ConsumerTag,
- ConsumerMapping)},
+ dict:erase(ConsumerTag,
+ ConsumerMapping)},
case rabbit_amqqueue:with(
QueueName,
fun (Q) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 746bb66e..8364ecd8 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -103,24 +103,22 @@ print_badrpc_diagnostics(Node) ->
diagnostics(Node) ->
{_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- [
- {"diagnostics:", []},
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]};
- {ok, NamePorts} ->
- {"- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]]}
- end,
- {"- current node: ~w", [node()]},
- case init:get_argument(home) of
- {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
- Other -> {"- no current node home dir: ~p", [Other]}
- end,
- {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}
- ].
+ [{"diagnostics:", []},
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end,
+ {"- current node: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
+ Other -> {"- no current node home dir: ~p", [Other]}
+ end,
+ {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}].
stop() ->
ok.
@@ -152,13 +150,13 @@ action(force_reset, Node, [], _Opts, Inform) ->
action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Clustering node ~p with ~p",
- [Node, ClusterNodes]),
+ [Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
- [Node, ClusterNodes]),
+ [Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
action(status, Node, [], _Opts, Inform) ->
@@ -320,10 +318,8 @@ wait_for_application0(Node, Attempts) ->
wait_for_application(Node, Attempts).
default_if_empty(List, Default) when is_list(List) ->
- if List == [] ->
- Default;
- true ->
- [list_to_atom(X) || X <- List]
+ if List == [] -> Default;
+ true -> [list_to_atom(X) || X <- List]
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
@@ -414,7 +410,7 @@ prettify_typed_amqp_value(Type, Value) ->
_ -> Value
end.
-% the slower shutdown on windows required to flush stdout
+%% the slower shutdown on windows required to flush stdout
quit(Status) ->
case os:type() of
{unix, _} ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 5579dbab..e79a58a1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -469,11 +469,11 @@ map_in_order(F, L) ->
table_fold(F, Acc0, TableName) ->
lists:foldl(
fun (E, Acc) -> execute_mnesia_transaction(
- fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> Acc;
- _ -> F(E, Acc)
- end
- end)
+ fun () -> case mnesia:match_object(TableName, E, read) of
+ [] -> Acc;
+ _ -> F(E, Acc)
+ end
+ end)
end, Acc0, dirty_read_all(TableName)).
dirty_read_all(TableName) ->
@@ -755,12 +755,12 @@ unlink_and_capture_exit(Pid) ->
after 0 -> ok
end.
-% Separate flags and options from arguments.
-% get_options([{flag, "-q"}, {option, "-p", "/"}],
-% ["set_permissions","-p","/","guest",
-% "-q",".*",".*",".*"])
-% == {["set_permissions","guest",".*",".*",".*"],
-% [{"-q",true},{"-p","/"}]}
+%% Separate flags and options from arguments.
+%% get_options([{flag, "-q"}, {option, "-p", "/"}],
+%% ["set_permissions","-p","/","guest",
+%% "-q",".*",".*",".*"])
+%% == {["set_permissions","guest",".*",".*",".*"],
+%% [{"-q",true},{"-p","/"}]}
get_options(Defs, As) ->
lists:foldl(fun(Def, {AsIn, RsIn}) ->
{AsOut, Value} = case Def of
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 22ad3d05..b7de27d4 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -62,9 +62,9 @@ append(FileHdl, MsgId, MsgBody)
Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
case file_handle_cache:append(FileHdl,
<<Size:?INTEGER_SIZE_BITS,
- MsgId:?MSG_ID_SIZE_BYTES/binary,
- MsgBodyBin:MsgBodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
+ MsgId:?MSG_ID_SIZE_BYTES/binary,
+ MsgBodyBin:MsgBodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
KO -> KO
end.
@@ -74,9 +74,9 @@ read(FileHdl, TotalSize) ->
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
case file_handle_cache:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
- MsgId:?MSG_ID_SIZE_BYTES/binary,
- MsgBodyBin:BodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
+ MsgId:?MSG_ID_SIZE_BYTES/binary,
+ MsgBodyBin:BodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
{ok, {MsgId, binary_to_term(MsgBodyBin)}};
KO -> KO
end.
@@ -99,27 +99,27 @@ scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) ->
end.
scanner(<<>>, Offset, _Fun, Acc) ->
- {<<>>, Acc, Offset};
+ {<<>>, Acc, Offset};
scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) ->
- {<<>>, Acc, Offset}; %% Nothing to do other than stop.
+ {<<>>, Acc, Offset}; %% Nothing to do other than stop.
scanner(<<Size:?INTEGER_SIZE_BITS, MsgIdAndMsg:Size/binary,
WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) ->
- TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
- case WriteMarker of
- ?WRITE_OK_MARKER ->
- %% Here we take option 5 from
- %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
- %% which we read the MsgId as a number, and then convert it
- %% back to a binary in order to work around bugs in
- %% Erlang's GC.
- <<MsgIdNum:?MSG_ID_SIZE_BITS, Msg/binary>> =
- <<MsgIdAndMsg:Size/binary>>,
- <<MsgId:?MSG_ID_SIZE_BYTES/binary>> =
- <<MsgIdNum:?MSG_ID_SIZE_BITS>>,
- scanner(Rest, Offset + TotalSize, Fun,
- Fun({MsgId, TotalSize, Offset, Msg}, Acc));
- _ ->
- scanner(Rest, Offset + TotalSize, Fun, Acc)
- end;
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ case WriteMarker of
+ ?WRITE_OK_MARKER ->
+ %% Here we take option 5 from
+ %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
+ %% which we read the MsgId as a number, and then convert it
+ %% back to a binary in order to work around bugs in
+ %% Erlang's GC.
+ <<MsgIdNum:?MSG_ID_SIZE_BITS, Msg/binary>> =
+ <<MsgIdAndMsg:Size/binary>>,
+ <<MsgId:?MSG_ID_SIZE_BYTES/binary>> =
+ <<MsgIdNum:?MSG_ID_SIZE_BITS>>,
+ scanner(Rest, Offset + TotalSize, Fun,
+ Fun({MsgId, TotalSize, Offset, Msg}, Acc));
+ _ ->
+ scanner(Rest, Offset + TotalSize, Fun, Acc)
+ end;
scanner(Data, Offset, _Fun, Acc) ->
- {Data, Acc, Offset}.
+ {Data, Acc, Offset}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 1b5d51a6..48fce9ed 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -549,7 +549,7 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
%% GC ends, we +1 readers, msg_store ets:deletes (and
%% unlocks the dest)
try Release(),
- Defer()
+ Defer()
catch error:badarg -> read(MsgId, CState)
end;
[#file_summary { locked = false }] ->
@@ -667,7 +667,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
cref_to_msg_ids = dict:new()
- },
+ },
%% If we didn't recover the msg location index then we need to
%% rebuild it now.
@@ -1259,7 +1259,7 @@ safe_file_delete(File, Dir, FileHandlesEts) ->
close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
client_ref = Ref } =
- CState) ->
+ CState) ->
Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
{ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) ->
true = ets:delete(FileHandlesEts, Key),
@@ -1468,7 +1468,7 @@ recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
{ok, Tid} -> file:delete(Path),
- {true, Tid};
+ {true, Tid};
{error, _Error} -> recover_file_summary(false, Dir)
end.
@@ -1533,7 +1533,7 @@ scan_file_for_valid_messages(Dir, FileName) ->
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
form_filename(Dir, FileName)),
- fun scan_fun/2, []),
+ fun scan_fun/2, []),
%% if something really bad has happened,
%% the close could fail, but ignore
file_handle_cache:close(Hdl),
@@ -1696,8 +1696,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
pending_gc_completion = Pending,
file_summary_ets = FileSummaryEts,
file_size_limit = FileSizeLimit })
- when (SumFileSize > 2 * FileSizeLimit andalso
- (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
+ when SumFileSize > 2 * FileSizeLimit andalso
+ (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
%% TODO: the algorithm here is sub-optimal - it may result in a
%% complete traversal of FileSummaryEts.
case ets:first(FileSummaryEts) of
@@ -1760,10 +1760,10 @@ delete_file_if_empty(File, State = #msstate {
locked = false }] =
ets:lookup(FileSummaryEts, File),
case ValidData of
- 0 -> %% don't delete the file_summary_ets entry for File here
- %% because we could have readers which need to be able to
- %% decrement the readers count.
- true = ets:update_element(FileSummaryEts, File,
+ %% don't delete the file_summary_ets entry for File here
+ %% because we could have readers which need to be able to
+ %% decrement the readers count.
+ 0 -> true = ets:update_element(FileSummaryEts, File,
{#file_summary.locked, true}),
ok = rabbit_msg_store_gc:delete(GCPid, File),
Pending1 = orddict_store(File, [], Pending),
@@ -1816,17 +1816,17 @@ combine_files(Source, Destination,
dir = Dir,
msg_store = Server }) ->
[#file_summary {
- readers = 0,
- left = Destination,
- valid_total_size = SourceValid,
- file_size = SourceFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Source),
+ readers = 0,
+ left = Destination,
+ valid_total_size = SourceValid,
+ file_size = SourceFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Source),
[#file_summary {
- readers = 0,
- right = Source,
- valid_total_size = DestinationValid,
- file_size = DestinationFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Destination),
+ readers = 0,
+ right = Source,
+ valid_total_size = DestinationValid,
+ file_size = DestinationFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Destination),
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -2004,12 +2004,12 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
?HANDLE_CACHE_BUFFER_SIZE}]),
{ok, _Acc, _IgnoreSize} =
rabbit_msg_file:scan(
- RefOld, filelib:file_size(FileOld),
- fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
- {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
- {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
- ok
- end, ok),
+ RefOld, filelib:file_size(FileOld),
+ fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
+ {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
+ {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
+ ok
+ end, ok),
file_handle_cache:close(RefOld),
file_handle_cache:close(RefNew),
ok.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bb63f0f1..59d87654 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -271,7 +271,7 @@ publish(MsgId, SeqId, MsgProps, IsPersistent,
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)]),
+ create_pub_record_body(MsgId, MsgProps)]),
maybe_flush_journal(
add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
@@ -666,8 +666,8 @@ recover_journal(State) ->
journal_minus_segment(JEntries, SegEntries),
Segment #segment { journal_entries = JEntries1,
unacked = (UnackedCountInJournal +
- UnackedCountInSeg -
- UnackedCountDuplicates) }
+ UnackedCountInSeg -
+ UnackedCountDuplicates) }
end, Segments),
State1 #qistate { segments = Segments1 }.
@@ -799,16 +799,16 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
{MsgId, MsgProps, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)])
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS>>,
+ create_pub_record_body(MsgId, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
ok;
_ ->
Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
+ RelSeq:?REL_SEQ_BITS>>,
file_handle_cache:append(
Hdl, case {Del, Ack} of
{del, ack} -> [Binary, Binary];
@@ -853,14 +853,14 @@ load_segment(KeepAcked, #segment { path = Path }) ->
load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
{MsgId, MsgProps} = read_pub_record_body(Hdl),
Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
+ RelSeq:?REL_SEQ_BITS>>} ->
{UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f9a3d9c7..710e6878 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -592,14 +592,14 @@ handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
HandleException =
fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, R);
+ %% We don't trust the client at this point - force
+ %% them to wait for a bit so they can't DOS us with
+ %% repeated failed logins etc.
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state, R})
+ end
end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
@@ -734,8 +734,7 @@ auth_mechanisms(Sock) ->
auth_mechanisms_binary(Sock) ->
list_to_binary(
- string:join(
- [atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
+ string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
State = #v1{auth_mechanism = AuthMechanism,
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 53e707f4..f6a1c92f 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -59,7 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
{routed, QPids};
deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
+ immediate = Immediate}) ->
QPids = lookup_qpids(QNames),
{Success, _} =
delegate:invoke(QPids,
@@ -67,7 +67,7 @@ deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
+ lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Mandatory, Immediate, {Routed, Handled}).
@@ -91,7 +91,7 @@ match_routing_key(SrcName, [RoutingKey]) ->
mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]);
match_routing_key(SrcName, [_|_] = RoutingKeys) ->
Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} ||
- RKey <- RoutingKeys]]),
+ RKey <- RoutingKeys]]),
MatchHead = #route{binding = #binding{source = SrcName,
destination = '$1',
key = '$2',
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c7eccd15..67cba052 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -510,8 +510,7 @@ publish(Msg, MsgProps, State) ->
a(reduce_memory_use(State1)).
publish_delivered(false, #basic_message { id = MsgId },
- #message_properties {
- needs_confirming = NeedsConfirming },
+ #message_properties { needs_confirming = NeedsConfirming },
State = #vqstate { len = 0 }) ->
case NeedsConfirming of
true -> blind_confirm(self(), gb_sets:singleton(MsgId));
@@ -632,12 +631,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
- true -> StateN = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
- {SeqId, StateN};
- false -> {undefined, State}
- end,
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {undefined, State}
+ end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
@@ -778,8 +777,8 @@ ram_duration(State = #vqstate {
RamAckCount = gb_trees:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
- AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
+ case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+ AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of
true -> infinity;
false -> (RamMsgCountPrev + RamMsgCount +
RamAckCount + RamAckCountPrev) /
@@ -1394,7 +1393,7 @@ accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false },
- {PersistentSeqIdsAcc, MsgIdsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore}) ->
{PersistentSeqIdsAcc, MsgIdsByStore};
accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
{PersistentSeqIdsAcc, MsgIdsByStore}) ->
@@ -1817,12 +1816,12 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
multiple_routing_keys() ->
transform_storage(
- fun ({basic_message, ExchangeName, Routing_Key, Content,
- MsgId, Persistent}) ->
- {ok, {basic_message, ExchangeName, [Routing_Key], Content,
- MsgId, Persistent}};
- (_) -> {error, corrupt_message}
- end),
+ fun ({basic_message, ExchangeName, Routing_Key, Content,
+ MsgId, Persistent}) ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ MsgId, Persistent}};
+ (_) -> {error, corrupt_message}
+ end),
ok.