summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2021-04-29 17:03:53 +0200
committerKarl Nilsson <kjnilsson@gmail.com>2021-05-12 17:12:09 +0100
commit75b11bea5b4c284696b4fe41796e3838579f4112 (patch)
treec7d4c728d2ed30e007452937964c76aef460f9eb
parentacddc0ea9d4595cede41f2835b1ac0a41980a6e6 (diff)
downloadrabbitmq-server-git-75b11bea5b4c284696b4fe41796e3838579f4112.tar.gz
Make init callbacks of queue types return {ok, State}report-coordinator-unavailable-as-amqp-error
-rw-r--r--deps/rabbit/src/rabbit_classic_queue.erl6
-rw-r--r--deps/rabbit/src/rabbit_queue_type.erl4
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl8
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl10
4 files changed, 14 insertions, 14 deletions
diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl
index b9e0c0a1c6..ecada2c788 100644
--- a/deps/rabbit/src/rabbit_classic_queue.erl
+++ b/deps/rabbit/src/rabbit_classic_queue.erl
@@ -144,11 +144,11 @@ stat(Q) ->
delegate:invoke(amqqueue:get_pid(Q),
{gen_server2, call, [stat, infinity]}).
--spec init(amqqueue:amqqueue()) -> state().
+-spec init(amqqueue:amqqueue()) -> {ok, state()}.
init(Q) when ?amqqueue_is_classic(Q) ->
QName = amqqueue:get_name(Q),
- #?STATE{pid = amqqueue:get_pid(Q),
- qref = QName}.
+ {ok, #?STATE{pid = amqqueue:get_pid(Q),
+ qref = QName}}.
-spec close(state()) -> ok.
close(_State) ->
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index a532893be2..5de1831c08 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -140,7 +140,7 @@
%% stateful
%% intitialise and return a queue type specific session context
--callback init(amqqueue:amqqueue()) -> queue_state().
+-callback init(amqqueue:amqqueue()) -> {ok, queue_state()} | {error, Reason :: term()}.
-callback close(queue_state()) -> ok.
%% update the queue type state from amqqrecord
@@ -544,7 +544,7 @@ get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState)
case Mod:init(Q) of
{error, Reason} ->
exit({Reason, Ref});
- QState ->
+ {ok, QState} ->
#ctx{module = Mod,
name = Name,
state = QState}
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 9075ec1df1..14e79694ea 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -113,7 +113,7 @@ is_enabled() ->
%%----------------------------------------------------------------------------
--spec init(amqqueue:amqqueue()) -> rabbit_fifo_client:state().
+-spec init(amqqueue:amqqueue()) -> {ok, rabbit_fifo_client:state()}.
init(Q) when ?is_amqqueue(Q) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
%% This lookup could potentially return an {error, not_found}, but we do not
@@ -124,9 +124,9 @@ init(Q) when ?is_amqqueue(Q) ->
%% Ensure the leader is listed first
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
- rabbit_fifo_client:init(QName, Servers, SoftLimit,
- fun() -> credit_flow:block(Name) end,
- fun() -> credit_flow:unblock(Name), ok end).
+ {ok, rabbit_fifo_client:init(QName, Servers, SoftLimit,
+ fun() -> credit_flow:block(Name) end,
+ fun() -> credit_flow:unblock(Name), ok end)}.
-spec close(rabbit_fifo_client:state()) -> ok.
close(_State) ->
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index b9f0bbb328..1f44dcd6c0 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -636,11 +636,11 @@ init(Q) when ?is_amqqueue(Q) ->
Prefix = erlang:pid_to_list(self()) ++ "_",
WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix),
{ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit),
- #stream_client{stream_id = StreamId,
- name = amqqueue:get_name(Q),
- leader = Leader,
- writer_id = WriterId,
- soft_limit = SoftLimit};
+ {ok, #stream_client{stream_id = StreamId,
+ name = amqqueue:get_name(Q),
+ leader = Leader,
+ writer_id = WriterId,
+ soft_limit = SoftLimit}};
{error, coordinator_unavailable} = E ->
rabbit_log:warning("Failed to start stream queue ~p: coordinator unavailable",
[rabbit_misc:rs(QName)]),