summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-04-25 18:12:59 +0300
committerGitHub <noreply@github.com>2019-04-25 18:12:59 +0300
commit84cf22a44bee8c6e27b6216374c2d52ff20e87f8 (patch)
tree63774c5532ed56197823c7846ece21b9c25c4f1f
parent0d67fc1007f38a49b8e561bbd4db4ffba1666bd1 (diff)
parenta359b43be83666346c1f878dc1421c3fc275022e (diff)
downloadrabbitmq-server-git-84cf22a44bee8c6e27b6216374c2d52ff20e87f8.tar.gz
Merge pull request #1988 from rabbitmq/rabbitmq-server-1904
Track messages that were not routed anywhere and also not published as mandatory
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_core_metrics_gc.erl2
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl2
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl6
4 files changed, 16 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ff6186ad84..69add85057 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2111,8 +2111,9 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
confirm = false,
mandatory = false},
- []}, State) -> %% optimisation
+ _RoutedToQs = []}, State) -> %% optimisation
?INCR_STATS(exchange_stats, XName, 1, publish, State),
+ ?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
@@ -2165,9 +2166,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
end,
State2#ch{queue_states = QueueStates}.
-process_routing_mandatory(true, [], Msg, State) ->
+process_routing_mandatory(_Mandatory = true,
+ _RoutedToQs = [],
+ Msg, State) ->
ok = basic_return(Msg, State, no_route),
ok;
+process_routing_mandatory(_Mandatory = false,
+ _RoutedToQs = [],
+ #basic_message{exchange_name = ExchangeName}, State) ->
+ ?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
+ ok;
process_routing_mandatory(_, _, _, _) ->
ok.
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index f4980aec7d..99ad8eef34 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -157,7 +157,7 @@ gc_process_and_entity(Table, GbSet) ->
ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _, _, _}, none)
when Table == channel_queue_metrics ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
- ({{Pid, Id} = Key, _, _, _, _}, none)
+ ({{Pid, Id} = Key, _, _, _, _, _}, none)
when Table == channel_exchange_metrics ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{Id, Pid, _} = Key, _, _, _, _, _, _}, none)
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index ea6e973ca2..c44b799caa 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -177,6 +177,8 @@ channel_metrics(Config) ->
amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}),
amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"queue_metrics">>},
#amqp_msg{payload = <<"hello">>}),
+ amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"won't route $ยข% anywhere">>},
+ #amqp_msg{payload = <<"hello">>}),
{#'basic.get_ok'{}, _} = amqp_channel:call(Ch, #'basic.get'{queue = <<"queue_metrics">>,
no_ack=true}),
timer:sleep(150),
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index 56b645c98e..ed64fcf1c5 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -510,7 +510,7 @@ channel_statistics1(_Config) ->
[{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 0}] = ets:lookup(
channel_queue_metrics,
{Ch, QRes}),
- [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
channel_exchange_metrics,
{Ch, X}),
[{{Ch, {QRes, X}}, 1, 0}] = ets:lookup(
@@ -525,7 +525,7 @@ channel_statistics1(_Config) ->
[{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 1}] = ets:lookup(
channel_queue_metrics,
{Ch, QRes}),
- [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
channel_exchange_metrics,
{Ch, X}),
[{{Ch, {QRes, X}}, 1, 1}] = ets:lookup(
@@ -538,7 +538,7 @@ channel_statistics1(_Config) ->
force_metric_gc(),
Check4 = fun() ->
[] = ets:lookup(channel_queue_metrics, {Ch, QRes}),
- [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
channel_exchange_metrics,
{Ch, X}),
[] = ets:lookup(channel_queue_exchange_metrics,