diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2021-04-29 17:03:53 +0200 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-05-12 17:12:09 +0100 |
commit | 75b11bea5b4c284696b4fe41796e3838579f4112 (patch) | |
tree | c7d4c728d2ed30e007452937964c76aef460f9eb | |
parent | acddc0ea9d4595cede41f2835b1ac0a41980a6e6 (diff) | |
download | rabbitmq-server-git-75b11bea5b4c284696b4fe41796e3838579f4112.tar.gz |
Make init callbacks of queue types return {ok, State}report-coordinator-unavailable-as-amqp-error
-rw-r--r-- | deps/rabbit/src/rabbit_classic_queue.erl | 6 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type.erl | 4 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 8 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 10 |
4 files changed, 14 insertions, 14 deletions
diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index b9e0c0a1c6..ecada2c788 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -144,11 +144,11 @@ stat(Q) -> delegate:invoke(amqqueue:get_pid(Q), {gen_server2, call, [stat, infinity]}). --spec init(amqqueue:amqqueue()) -> state(). +-spec init(amqqueue:amqqueue()) -> {ok, state()}. init(Q) when ?amqqueue_is_classic(Q) -> QName = amqqueue:get_name(Q), - #?STATE{pid = amqqueue:get_pid(Q), - qref = QName}. + {ok, #?STATE{pid = amqqueue:get_pid(Q), + qref = QName}}. -spec close(state()) -> ok. close(_State) -> diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index a532893be2..5de1831c08 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -140,7 +140,7 @@ %% stateful %% intitialise and return a queue type specific session context --callback init(amqqueue:amqqueue()) -> queue_state(). +-callback init(amqqueue:amqqueue()) -> {ok, queue_state()} | {error, Reason :: term()}. -callback close(queue_state()) -> ok. %% update the queue type state from amqqrecord @@ -544,7 +544,7 @@ get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) case Mod:init(Q) of {error, Reason} -> exit({Reason, Ref}); - QState -> + {ok, QState} -> #ctx{module = Mod, name = Name, state = QState} diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 9075ec1df1..14e79694ea 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -113,7 +113,7 @@ is_enabled() -> %%---------------------------------------------------------------------------- --spec init(amqqueue:amqqueue()) -> rabbit_fifo_client:state(). +-spec init(amqqueue:amqqueue()) -> {ok, rabbit_fifo_client:state()}. init(Q) when ?is_amqqueue(Q) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), %% This lookup could potentially return an {error, not_found}, but we do not @@ -124,9 +124,9 @@ init(Q) when ?is_amqqueue(Q) -> %% Ensure the leader is listed first Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], - rabbit_fifo_client:init(QName, Servers, SoftLimit, - fun() -> credit_flow:block(Name) end, - fun() -> credit_flow:unblock(Name), ok end). + {ok, rabbit_fifo_client:init(QName, Servers, SoftLimit, + fun() -> credit_flow:block(Name) end, + fun() -> credit_flow:unblock(Name), ok end)}. -spec close(rabbit_fifo_client:state()) -> ok. close(_State) -> diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index b9f0bbb328..1f44dcd6c0 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -636,11 +636,11 @@ init(Q) when ?is_amqqueue(Q) -> Prefix = erlang:pid_to_list(self()) ++ "_", WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix), {ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit), - #stream_client{stream_id = StreamId, - name = amqqueue:get_name(Q), - leader = Leader, - writer_id = WriterId, - soft_limit = SoftLimit}; + {ok, #stream_client{stream_id = StreamId, + name = amqqueue:get_name(Q), + leader = Leader, + writer_id = WriterId, + soft_limit = SoftLimit}}; {error, coordinator_unavailable} = E -> rabbit_log:warning("Failed to start stream queue ~p: coordinator unavailable", [rabbit_misc:rs(QName)]), |