summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-05 17:57:27 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-05 17:57:27 +0000
commitd09904c0493b77aac1d0b90164f90ac59d25ccc9 (patch)
treec945d411b3a11a03f90b05d91599dfd1f3239803
parent976787bbbaf1ebbae5e7c620f8b8ae40f55afd71 (diff)
parentb0e1d30b61c493e1a108842076376cbfea72040b (diff)
downloadrabbitmq-server-d09904c0493b77aac1d0b90164f90ac59d25ccc9.tar.gz
merge bug23875 into default
-rw-r--r--packaging/RPMS/Fedora/Makefile9
-rw-r--r--packaging/common/rabbitmq-server.init4
-rw-r--r--packaging/debs/Debian/Makefile4
-rw-r--r--src/file_handle_cache.erl6
-rw-r--r--src/gen_server2.erl4
-rw-r--r--src/gm.erl24
-rw-r--r--src/gm_tests.erl14
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_amqqueue_process.erl113
-rw-r--r--src/rabbit_auth_backend_internal.erl4
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl2
-rw-r--r--src/rabbit_basic.erl36
-rw-r--r--src/rabbit_binary_generator.erl13
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl20
-rw-r--r--src/rabbit_channel_sup.erl12
-rw-r--r--src/rabbit_client_sup.erl4
-rw-r--r--src/rabbit_control.erl46
-rw-r--r--src/rabbit_direct.erl22
-rw-r--r--src/rabbit_event.erl2
-rw-r--r--src/rabbit_exchange.erl8
-rw-r--r--src/rabbit_exchange_type_topic.erl34
-rw-r--r--src/rabbit_memory_monitor.erl10
-rw-r--r--src/rabbit_misc.erl24
-rw-r--r--src/rabbit_mnesia.erl22
-rw-r--r--src/rabbit_msg_file.erl54
-rw-r--r--src/rabbit_msg_store.erl58
-rw-r--r--src/rabbit_networking.erl8
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_prelaunch.erl14
-rw-r--r--src/rabbit_queue_index.erl28
-rw-r--r--src/rabbit_reader.erl23
-rw-r--r--src/rabbit_router.erl6
-rw-r--r--src/rabbit_ssl.erl6
-rw-r--r--src/rabbit_tests.erl290
-rw-r--r--src/rabbit_types.erl96
-rw-r--r--src/rabbit_upgrade.erl6
-rw-r--r--src/rabbit_variable_queue.erl44
-rw-r--r--src/rabbit_vhost.erl18
-rw-r--r--src/rabbit_writer.erl10
41 files changed, 561 insertions, 565 deletions
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 287945fe..c67d8fd6 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -12,7 +12,7 @@ ifndef RPM_OS
RPM_OS=fedora
endif
-ifeq "x$(RPM_OS)" "xsuse"
+ifeq "$(RPM_OS)" "suse"
REQUIRES=/sbin/chkconfig /sbin/service
OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse'
else
@@ -33,6 +33,11 @@ prepare:
sed -i \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
SOURCES/rabbitmq-server.init
+ifeq "$(RPM_OS)" "fedora"
+# Fedora says that only vital services should have Default-Start
+ sed -i -e '/^# Default-Start:/d;/^# Default-Stop:/d' \
+ SOURCES/rabbitmq-server.init
+endif
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
SOURCES/rabbitmq-script-wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
@@ -40,5 +45,5 @@ prepare:
server: prepare
rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES)
-clean:
+clean:
rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index 916dee6f..f3bdc3d2 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -10,8 +10,8 @@
# Provides: rabbitmq-server
# Required-Start: $remote_fs $network
# Required-Stop: $remote_fs $network
-# Default-Start:
-# Default-Stop:
+# Default-Start: 3 4 5
+# Default-Stop: 0 1 2 6
# Description: RabbitMQ broker
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index d937fbb2..31979a8e 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -22,8 +22,12 @@ package: clean
tar -zxvf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
+# Debian and descendants differ from most other distros in that
+# runlevel 2 should start network services.
sed -i \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
+ -e 's|^\(# Default-Start:\).*$$|\1 2 3 4 5|' \
+ -e 's|^\(# Default-Stop:\).*$$|\1 0 1 6|' \
$(UNPACKED_DIR)/debian/rabbitmq-server.init
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
$(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f41815d0..855427dd 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -242,7 +242,7 @@
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
- val_or_error([char()] | binary()) | 'eof').
+ val_or_error([char()] | binary()) | 'eof').
-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
-spec(sync/1 :: (ref()) -> ok_or_error()).
-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
@@ -252,7 +252,7 @@
-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(flush/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
- val_or_error(non_neg_integer())).
+ val_or_error(non_neg_integer())).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
@@ -1117,7 +1117,7 @@ reduce(State = #fhc_state { open_pending = OpenPending,
case CStates of
[] -> ok;
_ -> case (Sum / ClientCount) -
- (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
AverageAge when AverageAge > 0 ->
notify_age(CStates, AverageAge);
_ ->
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 94296f97..43e0a8f5 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -453,8 +453,8 @@ unregister_name({global,Name}) ->
_ = global:unregister_name(Name);
unregister_name(Pid) when is_pid(Pid) ->
Pid;
-% Under R12 let's just ignore it, as we have a single term as Name.
-% On R13 it will never get here, as we get tuple with 'local/global' atom.
+%% Under R12 let's just ignore it, as we have a single term as Name.
+%% On R13 it will never get here, as we get tuple with 'local/global' atom.
unregister_name(_Name) -> ok.
extend_backoff(undefined) ->
diff --git a/src/gm.erl b/src/gm.erl
index 70633a08..fd8d9b77 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -852,9 +852,9 @@ alive_view_members({_Ver, View}) ->
all_known_members({_Ver, View}) ->
?DICT:fold(
- fun (Member, #view_member { aliases = Aliases }, Acc) ->
- ?SETS:to_list(Aliases) ++ [Member | Acc]
- end, [], View).
+ fun (Member, #view_member { aliases = Aliases }, Acc) ->
+ ?SETS:to_list(Aliases) ++ [Member | Acc]
+ end, [], View).
group_to_view(#gm_group { members = Members, version = Ver }) ->
Alive = lists:filter(fun is_member_alive/1, Members),
@@ -1037,15 +1037,15 @@ maybe_erase_aliases(State = #state { self = Self,
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
- fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
- #member { last_pub = LP, last_ack = LA } =
- find_member_or_blank(Id, MembersState),
- case can_erase_view_member(Self, Id, LA, LP) of
- true -> {[Id | ErasableAcc],
- erase_member(Id, MembersStateAcc)};
- false -> Acc
- end
- end, {[], MembersState}, Aliases),
+ fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
+ #member { last_pub = LP, last_ack = LA } =
+ find_member_or_blank(Id, MembersState),
+ case can_erase_view_member(Self, Id, LA, LP) of
+ true -> {[Id | ErasableAcc],
+ erase_member(Id, MembersStateAcc)};
+ false -> Acc
+ end
+ end, {[], MembersState}, Aliases),
State1 = State #state { members_state = MembersState1 },
case Erasable of
[] -> {ok, State1};
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index 65e9cff0..ca0ffd64 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -117,13 +117,13 @@ test_broadcast(Fun) ->
with_two_members(test_broadcast_fun(Fun)).
test_broadcast_fun(Fun) ->
- fun (Pid, Pid2) ->
- ok = Fun(Pid, magic_message),
- passed = receive_or_throw({msg, Pid, Pid, magic_message},
- timeout_waiting_for_msg),
- passed = receive_or_throw({msg, Pid2, Pid, magic_message},
- timeout_waiting_for_msg)
- end.
+ fun (Pid, Pid2) ->
+ ok = Fun(Pid, magic_message),
+ passed = receive_or_throw({msg, Pid, Pid, magic_message},
+ timeout_waiting_for_msg),
+ passed = receive_or_throw({msg, Pid2, Pid, magic_message},
+ timeout_waiting_for_msg)
+ end.
with_two_members(Fun) ->
ok = gm:create_tables(),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 6eb59c3e..c9a929ae 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -375,7 +375,7 @@ config_files() ->
error -> []
end.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
print_banner() ->
{ok, Product} = application:get_key(id),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 46b78c39..8e4ca8e3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -52,7 +52,7 @@
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
- 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
@@ -100,13 +100,13 @@
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
- (rabbit_types:amqqueue(), 'false', 'false')
+ (rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
- (rabbit_types:amqqueue(), 'true' , 'false')
+ (rabbit_types:amqqueue(), 'true' , 'false')
-> qlen() | rabbit_types:error('in_use');
- (rabbit_types:amqqueue(), 'false', 'true' )
+ (rabbit_types:amqqueue(), 'false', 'true' )
-> qlen() | rabbit_types:error('not_empty');
- (rabbit_types:amqqueue(), 'true' , 'true' )
+ (rabbit_types:amqqueue(), 'true' , 'true' )
-> qlen() |
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
@@ -122,10 +122,10 @@
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
- {'ok', non_neg_integer(), qmsg()} | 'empty').
+ {'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/7 ::
- (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
- rabbit_types:ctag(), boolean(), any())
+ (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
+ rabbit_types:ctag(), boolean(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -214,8 +214,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[] -> ok = store_queue(Q),
B = add_default_binding(Q),
fun (Tx) -> B(Tx), Q end;
- [_] -> %% Q exists on stopped node
- rabbit_misc:const(not_found)
+ %% Q exists on stopped node
+ [_] -> rabbit_misc:const(not_found)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
@@ -288,7 +288,7 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
- RequiredArgs) ->
+ RequiredArgs) ->
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
[<<"x-expires">>]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 44053593..24de9415 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
-% Queue's state
+%% Queue's state
-record(q, {q,
exclusive_consumer,
has_had_consumers,
@@ -283,17 +283,16 @@ lookup_ch(ChPid) ->
ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
- undefined ->
- MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumer_count = 0,
- ch_pid = ChPid,
- monitor_ref = MonitorRef,
- acktags = sets:new(),
- is_limit_active = false,
- txn = none,
- unsent_message_count = 0},
- put(Key, C),
- C;
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ C = #cr{consumer_count = 0,
+ ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = sets:new(),
+ is_limit_active = false,
+ txn = none,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
C = #cr{} -> C
end.
@@ -319,18 +318,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-all_ch_record() ->
- [C || {{ch, _}, C} <- get()].
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
- BlockedOld = is_ch_blocked(OldCR),
- BlockedNew = is_ch_blocked(NewCR),
- if BlockedOld andalso not(BlockedNew) -> unblock;
- BlockedNew andalso not(BlockedOld) -> block;
- true -> ok
+ case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
+ {true, false} -> unblock;
+ {false, true} -> block;
+ {_, _} -> ok
end.
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
@@ -365,13 +362,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
BlockedConsumers};
- block ->
- {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
+ block -> {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
State2 = State1#q{
active_consumers = NewActiveConsumers,
@@ -396,8 +392,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{FunAcc, State}
end.
-deliver_from_queue_pred(IsEmpty, _State) ->
- not IsEmpty.
+deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
@@ -405,17 +400,16 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
- {CMs, GTC1} =
- lists:foldl(
- fun(Guid, {CMs, GTC0}) ->
- case dict:find(Guid, GTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
- {gb_trees_cons(ChPid, MsgSeqNo, CMs),
- dict:erase(Guid, GTC0)};
- _ ->
- {CMs, GTC0}
- end
- end, {gb_trees:empty(), GTC}, Guids),
+ {CMs, GTC1} = lists:foldl(
+ fun(Guid, {CMs, GTC0}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(Guid, GTC0)};
+ _ ->
+ {CMs, GTC0}
+ end
+ end, {gb_trees:empty(), GTC}, Guids),
gb_trees:map(fun(ChPid, MsgSeqNos) ->
rabbit_channel:confirm(ChPid, MsgSeqNos)
end, CMs),
@@ -480,17 +474,14 @@ attempt_delivery(#delivery{txn = none,
{Delivered, State1} =
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
{Delivered, NeedsConfirming, State1};
-attempt_delivery(#delivery{txn = Txn,
+attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- {NeedsConfirming,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}}) ->
+ {NeedsConfirming, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- {true,
- NeedsConfirming,
- State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
+ BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
+ {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
@@ -661,9 +652,8 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
Now = now_micros(),
BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) ->
- Now > Expiry
- end, BQS),
+ fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -722,10 +712,10 @@ i(Item, _) ->
consumers(#q{active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers}) ->
rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
emit_stats(State) ->
emit_stats(State, []).
@@ -747,7 +737,7 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
{channel, ChPid},
{queue, self()}]).
-%---------------------------------------------------------------------------
+%%----------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
case Msg of
@@ -814,8 +804,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery},
- _From, State) ->
+handle_call({deliver_immediately, Delivery}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -906,15 +895,13 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
case is_ch_blocked(C) of
true -> State1#q{
blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
+ add_consumer(ChPid, Consumer,
+ State1#q.blocked_consumers)};
false -> run_message_queue(
State1#q{
active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
+ add_consumer(ChPid, Consumer,
+ State1#q.active_consumers)})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck),
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index a564480b..3d005845 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -52,8 +52,8 @@
-spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok').
-spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]).
-spec(lookup_user/1 :: (rabbit_types:username())
- -> rabbit_types:ok(rabbit_types:internal_user())
- | rabbit_types:error('not_found')).
+ -> rabbit_types:ok(rabbit_types:internal_user())
+ | rabbit_types:error('not_found')).
-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(),
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index 2168495d..b8682a46 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -54,5 +54,5 @@ handle_response(Response, _State) ->
_ ->
{protocol_error,
"AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field",
- [LoginTable]}
+ [LoginTable]}
end.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 57aad808..f9a8ee1d 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -44,7 +44,7 @@
-spec(message/3 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
rabbit_types:decoded_content()) ->
- rabbit_types:ok_or_error2(rabbit_types:message(), any())).
+ rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
@@ -107,21 +107,21 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
false -> DecodedContent;
{value, Found} -> Headers0 = lists:delete(Found, Headers),
rabbit_binary_generator:clear_encoded_content(
- DecodedContent#content{
- properties = Props#'P_basic'{
- headers = Headers0}})
+ DecodedContent#content{
+ properties = Props#'P_basic'{
+ headers = Headers0}})
end.
message(ExchangeName, RoutingKey,
#content{properties = Props} = DecodedContent) ->
try
{ok, #basic_message{
- exchange_name = ExchangeName,
- content = strip_header(DecodedContent, ?DELETED_HEADER),
- guid = rabbit_guid:guid(),
- is_persistent = is_message_persistent(DecodedContent),
- routing_keys = [RoutingKey |
- header_routes(Props#'P_basic'.headers)]}}
+ exchange_name = ExchangeName,
+ content = strip_header(DecodedContent, ?DELETED_HEADER),
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent),
+ routing_keys = [RoutingKey |
+ header_routes(Props#'P_basic'.headers)]}}
catch
{error, _Reason} = Error -> Error
end.
@@ -175,15 +175,15 @@ is_message_persistent(#content{properties = #'P_basic'{
Other -> throw({error, {delivery_mode_unknown, Other}})
end.
-% Extract CC routes from headers
+%% Extract CC routes from headers
header_routes(undefined) ->
[];
header_routes(HeadersTable) ->
lists:append(
- [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
- {array, Routes} -> [Route || {longstr, Route} <- Routes];
- undefined -> [];
- {Type, _Val} -> throw({error, {unacceptable_type_in_header,
- Type,
- binary_to_list(HeaderKey)}})
- end || HeaderKey <- ?ROUTING_HEADERS]).
+ [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {array, Routes} -> [Route || {longstr, Route} <- Routes];
+ undefined -> [];
+ {Type, _Val} -> throw({error, {unacceptable_type_in_header,
+ Type,
+ binary_to_list(HeaderKey)}})
+ end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index dc81ace6..68511a32 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -18,12 +18,13 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
-% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
-% - 1 byte of frame type
-% - 2 bytes of channel number
-% - 4 bytes of frame payload length
-% - 1 byte of payload trailer FRAME_END byte
-% See definition of check_empty_content_body_frame_size/0, an assertion called at startup.
+%% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
+%% - 1 byte of frame type
+%% - 2 bytes of channel number
+%% - 4 bytes of frame payload length
+%% - 1 byte of payload trailer FRAME_END byte
+%% See definition of check_empty_content_body_frame_size/0,
+%% an assertion called at startup.
-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
-export([build_simple_method_frame/3,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 96a22dca..7ddb7814 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -70,7 +70,7 @@
rabbit_types:infos()).
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
- -> [rabbit_types:infos()]).
+ -> [rabbit_types:infos()]).
-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
-spec(remove_for_destination/1 ::
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e92421fc..526fb428 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -68,9 +68,9 @@
-type(channel_number() :: non_neg_integer()).
-spec(start_link/9 ::
- (channel_number(), pid(), pid(), rabbit_types:protocol(),
- rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
- pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
+ (channel_number(), pid(), pid(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
@@ -301,8 +301,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason},
{MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
erase_queue_stats(QPid),
State3 = (case Reason of
- normal -> fun record_confirms/2;
- _ -> fun send_nacks/2
+ normal -> fun record_confirms/2;
+ _ -> fun send_nacks/2
end)(MXs, State2),
noreply(queue_blocked(QPid, State3)).
@@ -715,9 +715,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
ok ->
{noreply, State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- QueueName,
- ConsumerMapping)}};
+ dict:store(ActualConsumerTag,
+ QueueName,
+ ConsumerMapping)}};
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -739,8 +739,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
return_ok(State, NoWait, OkMsg);
{ok, QueueName} ->
NewState = State#ch{consumer_mapping =
- dict:erase(ConsumerTag,
- ConsumerMapping)},
+ dict:erase(ConsumerTag,
+ ConsumerMapping)},
case rabbit_amqqueue:with(
QueueName,
fun (Q) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 9cc407bc..8175ad80 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -68,12 +68,12 @@ start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost,
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
- SupPid,
- {channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid, Protocol,
- User, VHost, Capabilities, Collector,
- start_limiter_fun(SupPid)]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ClientChannelPid, ClientChannelPid, Protocol,
+ User, VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index dbdc6cd4..15e92542 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -29,9 +29,9 @@
-ifdef(use_specs).
-spec(start_link/1 :: (mfa()) ->
- rabbit_types:ok_pid_or_error()).
+ rabbit_types:ok_pid_or_error()).
-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
- rabbit_types:ok_pid_or_error()).
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 746bb66e..8364ecd8 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -103,24 +103,22 @@ print_badrpc_diagnostics(Node) ->
diagnostics(Node) ->
{_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- [
- {"diagnostics:", []},
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]};
- {ok, NamePorts} ->
- {"- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]]}
- end,
- {"- current node: ~w", [node()]},
- case init:get_argument(home) of
- {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
- Other -> {"- no current node home dir: ~p", [Other]}
- end,
- {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}
- ].
+ [{"diagnostics:", []},
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end,
+ {"- current node: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
+ Other -> {"- no current node home dir: ~p", [Other]}
+ end,
+ {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}].
stop() ->
ok.
@@ -152,13 +150,13 @@ action(force_reset, Node, [], _Opts, Inform) ->
action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Clustering node ~p with ~p",
- [Node, ClusterNodes]),
+ [Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
- [Node, ClusterNodes]),
+ [Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
action(status, Node, [], _Opts, Inform) ->
@@ -320,10 +318,8 @@ wait_for_application0(Node, Attempts) ->
wait_for_application(Node, Attempts).
default_if_empty(List, Default) when is_list(List) ->
- if List == [] ->
- Default;
- true ->
- [list_to_atom(X) || X <- List]
+ if List == [] -> Default;
+ true -> [list_to_atom(X) || X <- List]
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
@@ -414,7 +410,7 @@ prettify_typed_amqp_value(Type, Value) ->
_ -> Value
end.
-% the slower shutdown on windows required to flush stdout
+%% the slower shutdown on windows required to flush stdout
quit(Status) ->
case os:type() of
{unix, _} ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 586563f6..a2693c69 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -26,8 +26,8 @@
-spec(boot/0 :: () -> 'ok').
-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
- {'ok', {rabbit_types:user(),
- rabbit_framing:amqp_table()}}).
+ {'ok', {rabbit_types:user(),
+ rabbit_framing:amqp_table()}}).
-spec(start_channel/7 ::
(rabbit_channel:channel_number(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
@@ -40,12 +40,12 @@
boot() ->
{ok, _} =
supervisor2:start_child(
- rabbit_sup,
- {rabbit_direct_client_sup,
- {rabbit_client_sup, start_link,
- [{local, rabbit_direct_client_sup},
- {rabbit_channel_sup, start_link, []}]},
- transient, infinity, supervisor, [rabbit_client_sup]}),
+ rabbit_sup,
+ {rabbit_direct_client_sup,
+ {rabbit_client_sup, start_link,
+ [{local, rabbit_direct_client_sup},
+ {rabbit_channel_sup, start_link, []}]},
+ transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
%%----------------------------------------------------------------------------
@@ -73,7 +73,7 @@ start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities,
Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
- rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, Protocol, User, VHost,
- Capabilities, Collector}]),
+ rabbit_direct_client_sup,
+ [{direct, Number, ClientChannelPid, Protocol, User, VHost,
+ Capabilities, Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 40651d36..9ed532db 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -101,7 +101,7 @@ ensure_stats_timer(State = #state{level = none}, _Fun) ->
State;
ensure_stats_timer(State = #state{timer = undefined}, Fun) ->
{ok, TRef} = timer:apply_after(?STATS_INTERVAL,
- erlang, apply, [Fun, []]),
+ erlang, apply, [Fun, []]),
State#state{timer = TRef};
ensure_stats_timer(State, _Fun) ->
State.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 92259195..a463e570 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -62,7 +62,7 @@
-> rabbit_types:infos()).
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
- -> [rabbit_types:infos()]).
+ -> [rabbit_types:infos()]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> {rabbit_router:routing_result(), [pid()]}).
-spec(delete/2 ::
@@ -266,9 +266,9 @@ process_route(#resource{kind = queue} = QName,
call_with_exchange(XName, Fun, PrePostCommitFun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
- end
+ [] -> {error, not_found};
+ [X] -> Fun(X)
+ end
end, PrePostCommitFun).
delete(XName, IfUnused) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 2363d05e..f12661d4 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -42,8 +42,8 @@ description() ->
route(#exchange{name = X},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
lists:append([begin
- Words = split_topic_key(RKey),
- mnesia:async_dirty(fun trie_match/2, [X, Words])
+ Words = split_topic_key(RKey),
+ mnesia:async_dirty(fun trie_match/2, [X, Words])
end || RKey <- Routes]).
validate(_X) -> ok.
@@ -51,9 +51,9 @@ create(_Tx, _X) -> ok.
recover(_Exchange, Bs) ->
rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
- end).
+ fun () ->
+ lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
+ end).
delete(true, #exchange{name = X}, _Bs) ->
trie_remove_all_edges(X),
@@ -166,9 +166,9 @@ trie_child(X, Node, Word) ->
trie_bindings(X, Node) ->
MatchHead = #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- destination = '$1'}},
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = '$1'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
trie_add_edge(X, FromNode, ToNode, W) ->
@@ -194,9 +194,9 @@ trie_remove_binding(X, Node, D) ->
trie_binding_op(X, Node, D, Op) ->
ok = Op(rabbit_topic_trie_binding,
#topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- destination = D}},
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = D}},
write).
trie_has_any_children(X, Node) ->
@@ -209,10 +209,10 @@ trie_has_any_children(X, Node) ->
trie_has_any_bindings(X, Node) ->
has_any(rabbit_topic_trie_binding,
#topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
trie_remove_all_edges(X) ->
remove_all(rabbit_topic_trie_edge,
@@ -223,8 +223,8 @@ trie_remove_all_edges(X) ->
trie_remove_all_bindings(X) ->
remove_all(rabbit_topic_trie_binding,
#topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X, _ = '_'},
- _ = '_'}).
+ trie_binding = #trie_binding{exchange_name = X, _ = '_'},
+ _ = '_'}).
has_any(Table, MatchHead) ->
Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read),
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 2f8c940b..996b0a98 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -111,11 +111,11 @@ stop() ->
init([]) ->
MemoryLimit = trunc(?MEMORY_LIMIT_SCALING *
- (try
- vm_memory_monitor:get_memory_limit()
- catch
- exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
- end)),
+ (try
+ vm_memory_monitor:get_memory_limit()
+ catch
+ exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
+ end)),
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index abc27c5f..e79a58a1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -105,7 +105,7 @@
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(table_lookup/2 ::
(rabbit_framing:amqp_table(), binary())
- -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
+ -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
when is_subtype(K, atom())).
@@ -469,11 +469,11 @@ map_in_order(F, L) ->
table_fold(F, Acc0, TableName) ->
lists:foldl(
fun (E, Acc) -> execute_mnesia_transaction(
- fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> Acc;
- _ -> F(E, Acc)
- end
- end)
+ fun () -> case mnesia:match_object(TableName, E, read) of
+ [] -> Acc;
+ _ -> F(E, Acc)
+ end
+ end)
end, Acc0, dirty_read_all(TableName)).
dirty_read_all(TableName) ->
@@ -755,12 +755,12 @@ unlink_and_capture_exit(Pid) ->
after 0 -> ok
end.
-% Separate flags and options from arguments.
-% get_options([{flag, "-q"}, {option, "-p", "/"}],
-% ["set_permissions","-p","/","guest",
-% "-q",".*",".*",".*"])
-% == {["set_permissions","guest",".*",".*",".*"],
-% [{"-q",true},{"-p","/"}]}
+%% Separate flags and options from arguments.
+%% get_options([{flag, "-q"}, {option, "-p", "/"}],
+%% ["set_permissions","-p","/","guest",
+%% "-q",".*",".*",".*"])
+%% == {["set_permissions","guest",".*",".*",".*"],
+%% [{"-q",true},{"-p","/"}]}
get_options(Defs, As) ->
lists:foldl(fun(Def, {AsIn, RsIn}) ->
{AsOut, Value} = case Def of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index fc95b77b..66436920 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -129,10 +129,10 @@ empty_ram_only_tables() ->
Node = node(),
lists:foreach(
fun (TabName) ->
- case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
- true -> {atomic, ok} = mnesia:clear_table(TabName);
- false -> ok
- end
+ case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
+ true -> {atomic, ok} = mnesia:clear_table(TabName);
+ false -> ok
+ end
end, table_names()),
ok.
@@ -519,13 +519,13 @@ create_local_table_copies(Type) ->
HasDiscOnlyCopies -> disc_only_copies;
true -> ram_copies
end;
-%% unused code - commented out to keep dialyzer happy
-%% Type =:= disc_only ->
-%% if
-%% HasDiscCopies or HasDiscOnlyCopies ->
-%% disc_only_copies;
-%% true -> ram_copies
-%% end;
+%%% unused code - commented out to keep dialyzer happy
+%%% Type =:= disc_only ->
+%%% if
+%%% HasDiscCopies or HasDiscOnlyCopies ->
+%%% disc_only_copies;
+%%% true -> ram_copies
+%%% end;
Type =:= ram ->
ram_copies
end,
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 55e6ac47..ea7cf80c 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -46,8 +46,8 @@
rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()},
any())).
-spec(scan/4 :: (io_device(), file_size(),
- fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A),
- A) -> {'ok', A, position()}).
+ fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A),
+ A) -> {'ok', A, position()}).
-endif.
@@ -60,9 +60,9 @@ append(FileHdl, Guid, MsgBody)
Size = MsgBodyBinSize + ?GUID_SIZE_BYTES,
case file_handle_cache:append(FileHdl,
<<Size:?INTEGER_SIZE_BITS,
- Guid:?GUID_SIZE_BYTES/binary,
- MsgBodyBin:MsgBodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
+ Guid:?GUID_SIZE_BYTES/binary,
+ MsgBodyBin:MsgBodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
KO -> KO
end.
@@ -72,9 +72,9 @@ read(FileHdl, TotalSize) ->
BodyBinSize = Size - ?GUID_SIZE_BYTES,
case file_handle_cache:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
- Guid:?GUID_SIZE_BYTES/binary,
- MsgBodyBin:BodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
+ Guid:?GUID_SIZE_BYTES/binary,
+ MsgBodyBin:BodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
{ok, {Guid, binary_to_term(MsgBodyBin)}};
KO -> KO
end.
@@ -97,26 +97,26 @@ scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) ->
end.
scanner(<<>>, Offset, _Fun, Acc) ->
- {<<>>, Acc, Offset};
+ {<<>>, Acc, Offset};
scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) ->
- {<<>>, Acc, Offset}; %% Nothing to do other than stop.
+ {<<>>, Acc, Offset}; %% Nothing to do other than stop.
scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) ->
- TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
- case WriteMarker of
- ?WRITE_OK_MARKER ->
- %% Here we take option 5 from
- %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
- %% which we read the Guid as a number, and then convert it
- %% back to a binary in order to work around bugs in
- %% Erlang's GC.
- <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> =
- <<GuidAndMsg:Size/binary>>,
- <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scanner(Rest, Offset + TotalSize, Fun,
- Fun({Guid, TotalSize, Offset, Msg}, Acc));
- _ ->
- scanner(Rest, Offset + TotalSize, Fun, Acc)
- end;
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ case WriteMarker of
+ ?WRITE_OK_MARKER ->
+ %% Here we take option 5 from
+ %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
+ %% which we read the Guid as a number, and then convert it
+ %% back to a binary in order to work around bugs in
+ %% Erlang's GC.
+ <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> =
+ <<GuidAndMsg:Size/binary>>,
+ <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
+ scanner(Rest, Offset + TotalSize, Fun,
+ Fun({Guid, TotalSize, Offset, Msg}, Acc));
+ _ ->
+ scanner(Rest, Offset + TotalSize, Fun, Acc)
+ end;
scanner(Data, Offset, _Fun, Acc) ->
- {Data, Acc, Offset}.
+ {Data, Acc, Offset}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9e65e442..8e1b2ac4 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -75,7 +75,7 @@
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
cref_to_guids %% client ref to synced messages mapping
- }).
+ }).
-record(client_msstate,
{ server,
@@ -89,7 +89,7 @@
file_summary_ets,
dedup_cache_ets,
cur_file_cache_ets
- }).
+ }).
-record(file_summary,
{file, valid_total_size, left, right, file_size, locked, readers}).
@@ -549,7 +549,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% GC ends, we +1 readers, msg_store ets:deletes (and
%% unlocks the dest)
try Release(),
- Defer()
+ Defer()
catch error:badarg -> read(Guid, CState)
end;
[#file_summary { locked = false }] ->
@@ -667,7 +667,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
cref_to_guids = dict:new()
- },
+ },
%% If we didn't recover the msg location index then we need to
%% rebuild it now.
@@ -1256,7 +1256,7 @@ safe_file_delete(File, Dir, FileHandlesEts) ->
close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
client_ref = Ref } =
- CState) ->
+ CState) ->
Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
{ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) ->
true = ets:delete(FileHandlesEts, Key),
@@ -1465,7 +1465,7 @@ recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
{ok, Tid} -> file:delete(Path),
- {true, Tid};
+ {true, Tid};
{error, _Error} -> recover_file_summary(false, Dir)
end.
@@ -1530,7 +1530,7 @@ scan_file_for_valid_messages(Dir, FileName) ->
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
form_filename(Dir, FileName)),
- fun scan_fun/2, []),
+ fun scan_fun/2, []),
%% if something really bad has happened,
%% the close could fail, but ignore
file_handle_cache:close(Hdl),
@@ -1693,8 +1693,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
pending_gc_completion = Pending,
file_summary_ets = FileSummaryEts,
file_size_limit = FileSizeLimit })
- when (SumFileSize > 2 * FileSizeLimit andalso
- (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
+ when SumFileSize > 2 * FileSizeLimit andalso
+ (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
%% TODO: the algorithm here is sub-optimal - it may result in a
%% complete traversal of FileSummaryEts.
case ets:first(FileSummaryEts) of
@@ -1757,10 +1757,10 @@ delete_file_if_empty(File, State = #msstate {
locked = false }] =
ets:lookup(FileSummaryEts, File),
case ValidData of
- 0 -> %% don't delete the file_summary_ets entry for File here
- %% because we could have readers which need to be able to
- %% decrement the readers count.
- true = ets:update_element(FileSummaryEts, File,
+ %% don't delete the file_summary_ets entry for File here
+ %% because we could have readers which need to be able to
+ %% decrement the readers count.
+ 0 -> true = ets:update_element(FileSummaryEts, File,
{#file_summary.locked, true}),
ok = rabbit_msg_store_gc:delete(GCPid, File),
Pending1 = orddict_store(File, [], Pending),
@@ -1813,17 +1813,17 @@ combine_files(Source, Destination,
dir = Dir,
msg_store = Server }) ->
[#file_summary {
- readers = 0,
- left = Destination,
- valid_total_size = SourceValid,
- file_size = SourceFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Source),
+ readers = 0,
+ left = Destination,
+ valid_total_size = SourceValid,
+ file_size = SourceFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Source),
[#file_summary {
- readers = 0,
- right = Source,
- valid_total_size = DestinationValid,
- file_size = DestinationFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Destination),
+ readers = 0,
+ right = Source,
+ valid_total_size = DestinationValid,
+ file_size = DestinationFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Destination),
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -2001,12 +2001,12 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
?HANDLE_CACHE_BUFFER_SIZE}]),
{ok, _Acc, _IgnoreSize} =
rabbit_msg_file:scan(
- RefOld, filelib:file_size(FileOld),
- fun({Guid, _Size, _Offset, BinMsg}, ok) ->
- {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
- {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew),
- ok
- end, ok),
+ RefOld, filelib:file_size(FileOld),
+ fun({Guid, _Size, _Offset, BinMsg}, ok) ->
+ {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
+ {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew),
+ ok
+ end, ok),
file_handle_cache:close(RefOld),
file_handle_cache:close(RefNew),
ok.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 36f61628..877d2cf7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -67,7 +67,7 @@
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
- -> [{inet:ip_address(), ip_port(), family(), atom()}]).
+ -> [{inet:ip_address(), ip_port(), family(), atom()}]).
-endif.
@@ -90,15 +90,15 @@ boot_ssl() ->
{ok, SslListeners} ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(ssl_options),
- % unknown_ca errors are silently ignored prior to R14B unless we
- % supply this verify_fun - remove when at least R14B is required
+ %% unknown_ca errors are silently ignored prior to R14B unless we
+ %% supply this verify_fun - remove when at least R14B is required
SslOpts =
case proplists:get_value(verify, SslOptsConfig, verify_none) of
verify_none -> SslOptsConfig;
verify_peer -> [{verify_fun, fun([]) -> true;
([_|_]) -> false
end}
- | SslOptsConfig]
+ | SslOptsConfig]
end,
[start_ssl_listener(Listener, SslOpts) || Listener <- SslListeners],
ok
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 1917c12c..1f30a2fc 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -77,7 +77,7 @@ handle_cast(_Msg, State) ->
handle_info({nodedown, Node}, State) ->
rabbit_log:info("node ~p down~n", [Node]),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, State};
handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
ok = handle_dead_rabbit(Node),
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d9d92788..7bb8c0ea 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -250,13 +250,13 @@ duplicate_node_check(NodeStr) ->
case net_adm:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
- true -> io:format("node with name ~p "
- "already running on ~p~n",
- [NodeName, NodeHost]),
- [io:format(Fmt ++ "~n", Args) ||
- {Fmt, Args} <- rabbit_control:diagnostics(Node)],
- terminate(?ERROR_CODE);
- false -> ok
+ true -> io:format("node with name ~p "
+ "already running on ~p~n",
+ [NodeName, NodeHost]),
+ [io:format(Fmt ++ "~n", Args) ||
+ {Fmt, Args} <- rabbit_control:diagnostics(Node)],
+ terminate(?ERROR_CODE);
+ false -> ok
end;
{error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
[EpmdReason])
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76b1136f..00f5a752 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -145,8 +145,8 @@
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUBLISH_RECORD_LENGTH_BYTES +
- (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))).
+ (?PUBLISH_RECORD_LENGTH_BYTES +
+ (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))).
%% ---- misc ----
@@ -177,7 +177,7 @@
path :: file:filename(),
journal_entries :: array(),
unacked :: non_neg_integer()
- })).
+ })).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
-type(on_sync_fun() :: fun ((gb_set()) -> ok)).
@@ -188,10 +188,10 @@
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
unsynced_guids :: [rabbit_guid:guid()]
- }).
+ }).
-type(startup_fun_state() ::
{fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
- A}).
+ A}).
-type(shutdown_terms() :: [any()]).
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
@@ -272,7 +272,7 @@ publish(Guid, SeqId, MsgProps, IsPersistent,
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)]),
+ create_pub_record_body(Guid, MsgProps)]),
maybe_flush_journal(
add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)).
@@ -666,8 +666,8 @@ recover_journal(State) ->
journal_minus_segment(JEntries, SegEntries),
Segment #segment { journal_entries = JEntries1,
unacked = (UnackedCountInJournal +
- UnackedCountInSeg -
- UnackedCountDuplicates) }
+ UnackedCountInSeg -
+ UnackedCountDuplicates) }
end, Segments),
State1 #qistate { segments = Segments1 }.
@@ -799,16 +799,16 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
{Guid, MsgProps, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)])
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS>>,
+ create_pub_record_body(Guid, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
ok;
_ ->
Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
+ RelSeq:?REL_SEQ_BITS>>,
file_handle_cache:append(
Hdl, case {Del, Ack} of
{del, ack} -> [Binary, Binary];
@@ -853,14 +853,14 @@ load_segment(KeepAcked, #segment { path = Path }) ->
load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
{Guid, MsgProps} = read_pub_record_body(Hdl),
Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
+ RelSeq:?REL_SEQ_BITS>>} ->
{UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b172db56..710e6878 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -37,7 +37,7 @@
-define(SILENT_CLOSE_DELAY, 3).
-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
-%---------------------------------------------------------------------------
+%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
@@ -62,7 +62,7 @@
State#v1.connection_state =:= blocking orelse
State#v1.connection_state =:= blocked)).
-%%----------------------------------------------------------------------------
+%%--------------------------------------------------------------------------
-ifdef(use_specs).
@@ -592,14 +592,14 @@ handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
HandleException =
fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, R);
+ %% We don't trust the client at this point - force
+ %% them to wait for a bit so they can't DOS us with
+ %% repeated failed logins etc.
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state, R})
+ end
end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
@@ -734,8 +734,7 @@ auth_mechanisms(Sock) ->
auth_mechanisms_binary(Sock) ->
list_to_binary(
- string:join(
- [atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
+ string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
State = #v1{auth_mechanism = AuthMechanism,
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 53e707f4..f6a1c92f 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -59,7 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
{routed, QPids};
deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
+ immediate = Immediate}) ->
QPids = lookup_qpids(QNames),
{Success, _} =
delegate:invoke(QPids,
@@ -67,7 +67,7 @@ deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
+ lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Mandatory, Immediate, {Routed, Handled}).
@@ -91,7 +91,7 @@ match_routing_key(SrcName, [RoutingKey]) ->
mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]);
match_routing_key(SrcName, [_|_] = RoutingKeys) ->
Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} ||
- RKey <- RoutingKeys]]),
+ RKey <- RoutingKeys]]),
MatchHead = #route{binding = #binding{source = SrcName,
destination = '$1',
key = '$2',
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e831ee51..1953b6b8 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -87,8 +87,8 @@ cert_info(F, Cert) ->
find_by_type(Type, {rdnSequence, RDNs}) ->
case [V || #'AttributeTypeAndValue'{type = T, value = V}
- <- lists:flatten(RDNs),
- T == Type] of
+ <- lists:flatten(RDNs),
+ T == Type] of
[{printableString, S}] -> S;
[] -> not_found
end.
@@ -166,7 +166,7 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
true -> S
end;
format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
- Min1, Min2, S1, S2, $Z]}) ->
+ Min1, Min2, S1, S2, $Z]}) ->
io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
format_asn1_value(V) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 0c6250df..88b58166 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -425,35 +425,35 @@ test_content_properties() ->
[{<<"one">>, signedint, 1},
{<<"two">>, signedint, 2}]}]}],
<<
- % property-flags
- 16#8000:16,
+ %% property-flags
+ 16#8000:16,
- % property-list:
+ %% property-list:
- % table
- 117:32, % table length in bytes
+ %% table
+ 117:32, % table length in bytes
- 11,"a signedint", % name
- "I",12345678:32, % type and value
+ 11,"a signedint", % name
+ "I",12345678:32, % type and value
- 9,"a longstr",
- "S",10:32,"yes please",
+ 9,"a longstr",
+ "S",10:32,"yes please",
- 9,"a decimal",
- "D",123,12345678:32,
+ 9,"a decimal",
+ "D",123,12345678:32,
- 11,"a timestamp",
- "T", 123456789012345:64,
+ 11,"a timestamp",
+ "T", 123456789012345:64,
- 14,"a nested table",
- "F",
- 18:32,
+ 14,"a nested table",
+ "F",
+ 18:32,
- 3,"one",
- "I",1:32,
+ 3,"one",
+ "I",1:32,
- 3,"two",
- "I",2:32 >>),
+ 3,"two",
+ "I",2:32 >>),
case catch rabbit_binary_parser:parse_properties([bit, bit, bit, bit], <<16#A0,0,1>>) of
{'EXIT', content_properties_binary_overflow} -> passed;
V -> exit({got_success_but_expected_failure, V})
@@ -480,28 +480,28 @@ test_field_values() ->
]}],
<<
- % property-flags
- 16#8000:16,
- % table length in bytes
- 228:32,
-
- 7,"longstr", "S", 21:32, "Here is a long string", % = 34
- 9,"signedint", "I", 12345:32/signed, % + 15 = 49
- 7,"decimal", "D", 3, 123456:32, % + 14 = 63
- 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
- 5,"table", "F", 31:32, % length of table % + 11 = 93
- 3,"one", "I", 54321:32, % + 9 = 102
- 3,"two", "S", 13:32, "A long string",% + 22 = 124
- 4,"byte", "b", 255:8, % + 7 = 131
- 4,"long", "l", 1234567890:64, % + 14 = 145
- 5,"short", "s", 655:16, % + 9 = 154
- 4,"bool", "t", 1, % + 7 = 161
- 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
- 4,"void", "V", % + 6 = 194
- 5,"array", "A", 23:32, % + 11 = 205
- "I", 54321:32, % + 5 = 210
- "S", 13:32, "A long string" % + 18 = 228
- >>),
+ %% property-flags
+ 16#8000:16,
+ %% table length in bytes
+ 228:32,
+
+ 7,"longstr", "S", 21:32, "Here is a long string", % = 34
+ 9,"signedint", "I", 12345:32/signed, % + 15 = 49
+ 7,"decimal", "D", 3, 123456:32, % + 14 = 63
+ 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
+ 5,"table", "F", 31:32, % length of table % + 11 = 93
+ 3,"one", "I", 54321:32, % + 9 = 102
+ 3,"two", "S", 13:32, "A long string", % + 22 = 124
+ 4,"byte", "b", 255:8, % + 7 = 131
+ 4,"long", "l", 1234567890:64, % + 14 = 145
+ 5,"short", "s", 655:16, % + 9 = 154
+ 4,"bool", "t", 1, % + 7 = 161
+ 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
+ 4,"void", "V", % + 6 = 194
+ 5,"array", "A", 23:32, % + 11 = 205
+ "I", 54321:32, % + 5 = 210
+ "S", 13:32, "A long string" % + 18 = 228
+ >>),
passed.
%% Test that content frames don't exceed frame-max
@@ -598,65 +598,65 @@ test_topic_matching() ->
%% add some bindings
Bindings = lists:map(
- fun ({Key, Q}) ->
- #binding{source = XName,
- key = list_to_binary(Key),
- destination = #resource{virtual_host = <<"/">>,
- kind = queue,
- name = list_to_binary(Q)}}
- end, [{"a.b.c", "t1"},
- {"a.*.c", "t2"},
- {"a.#.b", "t3"},
- {"a.b.b.c", "t4"},
- {"#", "t5"},
- {"#.#", "t6"},
- {"#.b", "t7"},
- {"*.*", "t8"},
- {"a.*", "t9"},
- {"*.b.c", "t10"},
- {"a.#", "t11"},
- {"a.#.#", "t12"},
- {"b.b.c", "t13"},
- {"a.b.b", "t14"},
- {"a.b", "t15"},
- {"b.c", "t16"},
- {"", "t17"},
- {"*.*.*", "t18"},
- {"vodka.martini", "t19"},
- {"a.b.c", "t20"},
- {"*.#", "t21"},
- {"#.*.#", "t22"},
- {"*.#.#", "t23"},
- {"#.#.#", "t24"},
- {"*", "t25"},
- {"#.b.#", "t26"}]),
+ fun ({Key, Q}) ->
+ #binding{source = XName,
+ key = list_to_binary(Key),
+ destination = #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}}
+ end, [{"a.b.c", "t1"},
+ {"a.*.c", "t2"},
+ {"a.#.b", "t3"},
+ {"a.b.b.c", "t4"},
+ {"#", "t5"},
+ {"#.#", "t6"},
+ {"#.b", "t7"},
+ {"*.*", "t8"},
+ {"a.*", "t9"},
+ {"*.b.c", "t10"},
+ {"a.#", "t11"},
+ {"a.#.#", "t12"},
+ {"b.b.c", "t13"},
+ {"a.b.b", "t14"},
+ {"a.b", "t15"},
+ {"b.c", "t16"},
+ {"", "t17"},
+ {"*.*.*", "t18"},
+ {"vodka.martini", "t19"},
+ {"a.b.c", "t20"},
+ {"*.#", "t21"},
+ {"#.*.#", "t22"},
+ {"*.#.#", "t23"},
+ {"#.#.#", "t24"},
+ {"*", "t25"},
+ {"#.b.#", "t26"}]),
lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
Bindings),
%% test some matches
- test_topic_expect_match(X,
- [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12",
- "t18", "t20", "t21", "t22", "t23", "t24",
- "t26"]},
- {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11",
- "t12", "t15", "t21", "t22", "t23", "t24",
- "t26"]},
- {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14",
- "t18", "t21", "t22", "t23", "t24", "t26"]},
- {"", ["t5", "t6", "t17", "t24"]},
- {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24",
- "t26"]},
- {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23",
- "t24"]},
- {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23",
- "t24"]},
- {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
- "t24"]},
- {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22",
- "t23", "t24", "t26"]},
- {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
- {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
- "t25"]}]),
+ test_topic_expect_match(
+ X, [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12",
+ "t18", "t20", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11",
+ "t12", "t15", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14",
+ "t18", "t21", "t22", "t23", "t24", "t26"]},
+ {"", ["t5", "t6", "t17", "t24"]},
+ {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23",
+ "t24", "t26"]},
+ {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22",
+ "t23", "t24"]},
+ {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23",
+ "t24"]},
+ {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
+ "t24"]},
+ {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21",
+ "t22", "t23", "t24", "t26"]},
+ {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
+ {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
+ "t25"]}]),
%% remove some bindings
RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
@@ -669,21 +669,21 @@ test_topic_matching() ->
%% test some matches
test_topic_expect_match(X,
- [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
- "t23", "t24", "t26"]},
- {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
- "t22", "t23", "t24", "t26"]},
- {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
- "t23", "t24", "t26"]},
- {"", ["t6", "t17", "t24"]},
- {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
- {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
- {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
- {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
- {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
- "t24", "t26"]},
- {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
- {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+ [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
+ "t23", "t24", "t26"]},
+ {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
+ "t22", "t23", "t24", "t26"]},
+ {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
+ "t23", "t24", "t26"]},
+ {"", ["t6", "t17", "t24"]},
+ {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
+ {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
+ {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
+ {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
+ {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
+ "t24", "t26"]},
+ {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
%% remove the entire exchange
exchange_op_callback(X, delete, [RemainingBindings]),
@@ -693,23 +693,23 @@ test_topic_matching() ->
exchange_op_callback(X, Fun, ExtraArgs) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
+ fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs).
test_topic_expect_match(X, List) ->
lists:foreach(
- fun ({Key, Expected}) ->
- BinKey = list_to_binary(Key),
- Res = rabbit_exchange_type_topic:route(
- X, #delivery{message = #basic_message{routing_keys =
+ fun ({Key, Expected}) ->
+ BinKey = list_to_binary(Key),
+ Res = rabbit_exchange_type_topic:route(
+ X, #delivery{message = #basic_message{routing_keys =
[BinKey]}}),
- ExpectedRes = lists:map(
- fun (Q) -> #resource{virtual_host = <<"/">>,
- kind = queue,
- name = list_to_binary(Q)}
- end, Expected),
- true = (lists:usort(ExpectedRes) =:= lists:usort(Res))
- end, List).
+ ExpectedRes = lists:map(
+ fun (Q) -> #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}
+ end, Expected),
+ true = (lists:usort(ExpectedRes) =:= lists:usort(Res))
+ end, List).
test_app_management() ->
%% starting, stopping, status
@@ -818,7 +818,7 @@ test_log_management_during_startup() ->
ok = delete_log_handlers([sasl_report_tty_h]),
ok = case catch control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotation_tty_no_handlers_test});
+ log_rotation_tty_no_handlers_test});
{error, {cannot_log_to_tty, _, _}} -> ok
end,
@@ -843,8 +843,8 @@ test_log_management_during_startup() ->
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotation_no_write_permission_dir_test});
- {error, {cannot_log_to_file, _, _}} -> ok
+ log_rotation_no_write_permission_dir_test});
+ {error, {cannot_log_to_file, _, _}} -> ok
end,
%% start application with logging to a subdirectory which
@@ -854,9 +854,9 @@ test_log_management_during_startup() ->
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotatation_parent_dirs_test});
+ log_rotatation_parent_dirs_test});
{error, {cannot_log_to_file, _,
- {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok
+ {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok
end,
ok = set_permissions(TmpDir, 8#00700),
ok = set_permissions(TmpLog, 8#00600),
@@ -876,22 +876,22 @@ test_log_management_during_startup() ->
passed.
test_option_parser() ->
- % command and arguments should just pass through
+ %% command and arguments should just pass through
ok = check_get_options({["mock_command", "arg1", "arg2"], []},
[], ["mock_command", "arg1", "arg2"]),
- % get flags
+ %% get flags
ok = check_get_options(
{["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]},
[{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]),
- % get options
+ %% get options
ok = check_get_options(
{["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]},
[{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}],
["mock_command", "-foo", "bar"]),
- % shuffled and interleaved arguments and options
+ %% shuffled and interleaved arguments and options
ok = check_get_options(
{["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]},
[{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}],
@@ -1143,7 +1143,7 @@ test_server_status() ->
[_|_] = rabbit_binding:list_for_source(
rabbit_misc:r(<<"/">>, exchange, <<"">>)),
[_] = rabbit_binding:list_for_destination(
- rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
[_] = rabbit_binding:list_for_source_and_destination(
rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
@@ -1305,9 +1305,9 @@ test_delegates_async(SecondaryNode) ->
make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
fun () ->
- receive Msg -> FMsg(Msg)
- after 1000 -> throw(Throw)
- end
+ receive Msg -> FMsg(Msg)
+ after 1000 -> throw(Throw)
+ end
end.
spawn_responders(Node, Responder, Count) ->
@@ -1318,10 +1318,10 @@ await_response(0) ->
await_response(Count) ->
receive
response -> ok,
- await_response(Count - 1)
+ await_response(Count - 1)
after 1000 ->
- io:format("Async reply not received~n"),
- throw(timeout)
+ io:format("Async reply not received~n"),
+ throw(timeout)
end.
must_exit(Fun) ->
@@ -1337,7 +1337,7 @@ test_delegates_sync(SecondaryNode) ->
BadSender = fun (_Pid) -> exit(exception) end,
Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
- gen_server:reply(From, response)
+ gen_server:reply(From, response)
end),
BadResponder = make_responder(fun ({'$gen_call', From, invoked}) ->
@@ -1349,7 +1349,7 @@ test_delegates_sync(SecondaryNode) ->
must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end),
must_exit(fun () ->
- delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
+ delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
@@ -1438,7 +1438,7 @@ test_declare_on_dead_queue(SecondaryNode) ->
throw(failed_to_create_and_kill_queue)
end.
-%---------------------------------------------------------------------
+%%---------------------------------------------------------------------
control_action(Command, Args) ->
control_action(Command, node(), Args, default_options()).
@@ -1953,7 +1953,7 @@ test_queue_index() ->
with_empty_test_queue(
fun (Qi0) ->
{Qi1, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD,
- false, Qi0),
+ false, Qi0),
Qi2 = rabbit_queue_index:deliver(SeqIdsD, Qi1),
Qi3 = rabbit_queue_index:ack(SeqIdsD, Qi2),
rabbit_queue_index:flush(Qi3)
@@ -2195,7 +2195,7 @@ check_variable_queue_status(VQ0, Props) ->
variable_queue_wait_for_shuffling_end(VQ) ->
case rabbit_variable_queue:needs_idle_timeout(VQ) of
true -> variable_queue_wait_for_shuffling_end(
- rabbit_variable_queue:idle_timeout(VQ));
+ rabbit_variable_queue:idle_timeout(VQ));
false -> VQ
end.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index ab2300c0..a11595e5 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -42,39 +42,39 @@
%% TODO: make this more precise by tying specific class_ids to
%% specific properties
-type(undecoded_content() ::
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: 'none',
- properties_bin :: binary(),
- payload_fragments_rev :: [binary()]} |
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: rabbit_framing:amqp_property_record(),
- properties_bin :: 'none',
- payload_fragments_rev :: [binary()]}).
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: 'none',
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]} |
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: 'none',
+ payload_fragments_rev :: [binary()]}).
-type(unencoded_content() :: undecoded_content()).
-type(decoded_content() ::
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: rabbit_framing:amqp_property_record(),
- properties_bin :: maybe(binary()),
- payload_fragments_rev :: [binary()]}).
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: maybe(binary()),
+ payload_fragments_rev :: [binary()]}).
-type(encoded_content() ::
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: maybe(rabbit_framing:amqp_property_record()),
- properties_bin :: binary(),
- payload_fragments_rev :: [binary()]}).
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: maybe(rabbit_framing:amqp_property_record()),
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]}).
-type(content() :: undecoded_content() | decoded_content()).
-type(basic_message() ::
- #basic_message{exchange_name :: rabbit_exchange:name(),
- routing_keys :: [rabbit_router:routing_key()],
- content :: content(),
- guid :: rabbit_guid:guid(),
- is_persistent :: boolean()}).
+ #basic_message{exchange_name :: rabbit_exchange:name(),
+ routing_keys :: [rabbit_router:routing_key()],
+ content :: content(),
+ guid :: rabbit_guid:guid(),
+ is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
- #delivery{mandatory :: boolean(),
- immediate :: boolean(),
- txn :: maybe(txn()),
- sender :: pid(),
- message :: message()}).
+ #delivery{mandatory :: boolean(),
+ immediate :: boolean(),
+ txn :: maybe(txn()),
+ sender :: pid(),
+ message :: message()}).
-type(message_properties() ::
#message_properties{expiry :: pos_integer() | 'undefined',
needs_confirming :: boolean()}).
@@ -89,9 +89,9 @@
-type(infos() :: [info()]).
-type(amqp_error() ::
- #amqp_error{name :: rabbit_framing:amqp_exception(),
- explanation :: string(),
- method :: rabbit_framing:amqp_method_name()}).
+ #amqp_error{name :: rabbit_framing:amqp_exception(),
+ explanation :: string(),
+ method :: rabbit_framing:amqp_method_name()}).
-type(r(Kind) ::
r2(vhost(), Kind)).
@@ -103,34 +103,34 @@
name :: Name}).
-type(listener() ::
- #listener{node :: node(),
- protocol :: atom(),
- host :: rabbit_networking:hostname(),
- port :: rabbit_networking:ip_port()}).
+ #listener{node :: node(),
+ protocol :: atom(),
+ host :: rabbit_networking:hostname(),
+ port :: rabbit_networking:ip_port()}).
-type(binding_source() :: rabbit_exchange:name()).
-type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()).
-type(binding() ::
- #binding{source :: rabbit_exchange:name(),
- destination :: binding_destination(),
- key :: rabbit_binding:key(),
- args :: rabbit_framing:amqp_table()}).
+ #binding{source :: rabbit_exchange:name(),
+ destination :: binding_destination(),
+ key :: rabbit_binding:key(),
+ args :: rabbit_framing:amqp_table()}).
-type(amqqueue() ::
- #amqqueue{name :: rabbit_amqqueue:name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- exclusive_owner :: rabbit_types:maybe(pid()),
- arguments :: rabbit_framing:amqp_table(),
- pid :: rabbit_types:maybe(pid())}).
+ #amqqueue{name :: rabbit_amqqueue:name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: rabbit_types:maybe(pid()),
+ arguments :: rabbit_framing:amqp_table(),
+ pid :: rabbit_types:maybe(pid())}).
-type(exchange() ::
- #exchange{name :: rabbit_exchange:name(),
- type :: rabbit_exchange:type(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: rabbit_framing:amqp_table()}).
+ #exchange{name :: rabbit_exchange:name(),
+ type :: rabbit_exchange:type(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ arguments :: rabbit_framing:amqp_table()}).
-type(connection() :: pid()).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 89acc10c..ebda5d03 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -106,9 +106,9 @@ upgrades_to_apply(Heads, G) ->
%% everything we've already applied. Subtract that from all
%% vertices: that's what we have to apply.
Unsorted = sets:to_list(
- sets:subtract(
- sets:from_list(digraph:vertices(G)),
- sets:from_list(digraph_utils:reaching(Heads, G)))),
+ sets:subtract(
+ sets:from_list(digraph:vertices(G)),
+ sets:from_list(digraph_utils:reaching(Heads, G)))),
%% Form a subgraph from that list and find a topological ordering
%% so we can invoke them in order.
[element(2, digraph:vertex(G, StepName)) ||
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d1307b85..591e5a66 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -268,13 +268,13 @@
msg_on_disk,
index_on_disk,
msg_props
- }).
+ }).
-record(delta,
{ start_seq_id, %% start_seq_id is inclusive
count,
end_seq_id %% end_seq_id is exclusive
- }).
+ }).
-record(tx, { pending_messages, pending_acks }).
@@ -510,8 +510,12 @@ publish(Msg, MsgProps, State) ->
a(reduce_memory_use(State1)).
publish_delivered(false, #basic_message { guid = Guid },
- _MsgProps, State = #vqstate { len = 0 }) ->
- blind_confirm(self(), gb_sets:singleton(Guid)),
+ #message_properties { needs_confirming = NeedsConfirming },
+ State = #vqstate { len = 0 }) ->
+ case NeedsConfirming of
+ true -> blind_confirm(self(), gb_sets:singleton(Guid));
+ false -> ok
+ end,
{undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
@@ -540,7 +544,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
- State1.
+ a(State1).
dropwhile1(Pred, State) ->
internal_queue_out(
@@ -627,12 +631,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
- true -> StateN = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
- {SeqId, StateN};
- false -> {undefined, State}
- end,
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {undefined, State}
+ end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
@@ -772,8 +776,8 @@ ram_duration(State = #vqstate {
RamAckCount = gb_trees:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
- AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
+ case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+ AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of
true -> infinity;
false -> (RamMsgCountPrev + RamMsgCount +
RamAckCount + RamAckCountPrev) /
@@ -1388,7 +1392,7 @@ accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false },
- {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
{PersistentSeqIdsAcc, GuidsByStore};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
{PersistentSeqIdsAcc, GuidsByStore}) ->
@@ -1812,12 +1816,12 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
multiple_routing_keys() ->
transform_storage(
- fun ({basic_message, ExchangeName, Routing_Key, Content,
- Guid, Persistent}) ->
- {ok, {basic_message, ExchangeName, [Routing_Key], Content,
- Guid, Persistent}};
- (_) -> {error, corrupt_message}
- end),
+ fun ({basic_message, ExchangeName, Routing_Key, Content,
+ Guid, Persistent}) ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ Guid, Persistent}};
+ (_) -> {error, corrupt_message}
+ end),
ok.
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index efebef06..24c130ed 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -48,15 +48,15 @@ add(VHostPath) ->
ok;
(ok, false) ->
[rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
ok
end),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index eba86a55..ac3434d2 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -28,7 +28,7 @@
-define(HIBERNATE_AFTER, 5000).
-%%----------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
-ifdef(use_specs).
@@ -69,7 +69,7 @@
-endif.
-%%----------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
{ok,
@@ -133,7 +133,7 @@ handle_message({inet_reply, _, Status}, _State) ->
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
send_command(W, MethodRecord) ->
W ! {send_command, MethodRecord},
@@ -157,13 +157,13 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
call(Pid, Msg) ->
{ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
Res.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
assemble_frame(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),