summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-06-16 09:16:02 +0100
committerMatthias Radestock <matthias@lshift.net>2009-06-16 09:16:02 +0100
commitd5c5eb0a8368c10e31f86a5158eb072ed5b1068c (patch)
tree10bd827be3b74e53c45b2b986adfe43306fa6417
parent32f0e93b5579b4e13b6532e387c7a69a70a29e4d (diff)
parent45ebf0c2d2a03ef6939ae412196ea0cb000ff251 (diff)
downloadrabbitmq-server-d5c5eb0a8368c10e31f86a5158eb072ed5b1068c.tar.gz
emergency merge of bug20958 into default
further qa is still required
-rw-r--r--Makefile2
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--packaging/RPMS/Fedora/Makefile6
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec2
-rw-r--r--packaging/debs/Debian/debian/control4
-rw-r--r--src/rabbit_amqqueue_process.erl268
6 files changed, 142 insertions, 142 deletions
diff --git a/Makefile b/Makefile
index 4ff8573a..367f153a 100644
--- a/Makefile
+++ b/Makefile
@@ -69,7 +69,7 @@ clean: cleandb
rm -f docs/*.[0-9].gz
cleandb: stop-node
- erl -mnesia dir '"$(RABBITMQ_MNESIA_DIR)"' -noshell -eval 'lists:foreach(fun file:delete/1, filelib:wildcard(mnesia:system_info(directory) ++ "/*")), halt().'
+ rm -rf $(RABBITMQ_MNESIA_DIR)/*
############ various tasks to interact with RabbitMQ ###################
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 5be07492..8e1c890e 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -11,6 +11,8 @@
rabbit_sup,
rabbit_tcp_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
+%% we also depend on ssl but it shouldn't be in here as we don't
+%% actually want to start it
{mod, {rabbit, []}},
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
{extra_startup_steps, []},
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 9fe91b98..c74d4533 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -13,12 +13,10 @@ endif
ifeq "x$(RPM_OS)" "xsuse"
REQUIRES=/sbin/chkconfig /sbin/service
-OS_DEFINES=--define '_initrddir /etc/init.d'
-RELEASE_OS=.suse
+OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse'
else
REQUIRES=chkconfig initscripts
OS_DEFINES=--define '_initrddir /etc/rc.d/init.d'
-RELEASE_OS=
endif
rpms: clean server
@@ -27,7 +25,7 @@ prepare:
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
cp $(TOP_DIR)/$(TARBALL) SOURCES
cp rabbitmq-server.spec SPECS
- sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \
+ sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \
SPECS/rabbitmq-server.spec
cp init.d SOURCES/rabbitmq-server.init
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 3c3be609..875381e8 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -2,7 +2,7 @@
Name: rabbitmq-server
Version: %%VERSION%%
-Release: 1%%RELEASE_OS%%
+Release: 1%{?dist}
License: MPLv1.1
Group: Development/Libraries
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 21636072..d4e2cd17 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,12 +2,12 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
-Build-Depends: cdbs, debhelper (>= 5), erlang-nox, erlang-dev, python-simplejson
+Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson
Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-Depends: erlang-nox, erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
+Depends: erlang-base | erlang-base-hipe, erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7ffb1c8f..cf0ef44f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,14 +53,15 @@
has_had_consumers,
next_msg_id,
message_buffer,
- round_robin}).
+ active_consumers,
+ blocked_consumers}).
-record(consumer, {tag, ack_required}).
-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
%% These are held in our process dictionary
--record(cr, {consumers,
+-record(cr, {consumer_count,
ch_pid,
limiter_pid,
monitor_ref,
@@ -99,7 +100,8 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new()}, ?HIBERNATE_AFTER}.
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -129,7 +131,7 @@ ch_record(ChPid) ->
case get(Key) of
undefined ->
MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumers = [],
+ C = #cr{consumer_count = 0,
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
@@ -148,7 +150,7 @@ all_ch_record() ->
[C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+ Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
BlockedOld = is_ch_blocked(OldCR),
@@ -165,13 +167,14 @@ record_current_channel_tx(ChPid, Txn) ->
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
- round_robin = RoundRobin,
+ active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
- case queue:out(RoundRobin) of
+ case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
- RoundRobinTail} ->
+ ActiveConsumersTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
@@ -187,18 +190,32 @@ deliver_immediately(Message, Delivered,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
- NewConsumers =
+ {NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block -> block_consumers(ChPid, RoundRobinTail)
+ ok -> {queue:in(QEntry, ActiveConsumersTail),
+ BlockedConsumers};
+ block ->
+ {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId + 1}};
+ {offered, AckRequired,
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers,
+ next_msg_id = NextId + 1}};
false ->
store_ch_record(C#cr{is_limit_active = true}),
- NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_immediately(Message, Delivered,
- State#q{round_robin = NewConsumers})
+ {NewActiveConsumers, NewBlockedConsumers} =
+ move_consumers(ChPid,
+ ActiveConsumers,
+ BlockedConsumers),
+ deliver_immediately(
+ Message, Delivered,
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers})
end;
{empty, _} ->
{not_offered, State}
@@ -234,22 +251,24 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
State).
-block_consumers(ChPid, RoundRobin) ->
- %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]),
- queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(RoundRobin))).
-
-unblock_consumers(ChPid, Consumers, RoundRobin) ->
- %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]),
- queue:join(RoundRobin,
- queue:from_list([{ChPid, Con} || Con <- Consumers])).
+add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
-block_consumer(ChPid, ConsumerTag, RoundRobin) ->
- %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]),
+remove_consumer(ChPid, ConsumerTag, Queue) ->
+ %% TODO: replace this with queue:filter/2 once we move to R12
queue:from_list(lists:filter(
fun ({CP, #consumer{tag = CT}}) ->
(CP /= ChPid) or (CT /= ConsumerTag)
- end, queue:to_list(RoundRobin))).
+ end, queue:to_list(Queue))).
+
+remove_consumers(ChPid, Queue) ->
+ %% TODO: replace this with queue:filter/2 once we move to R12
+ queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
+ queue:to_list(Queue))).
+
+move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
+ queue:to_list(From)),
+ {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -260,50 +279,25 @@ possibly_unblock(State, ChPid, Update) ->
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
- unblock -> NewRR = unblock_consumers(ChPid,
- NewC#cr.consumers,
- State#q.round_robin),
- run_poke_burst(State#q{round_robin = NewRR})
+ unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
+ move_consumers(ChPid,
+ State#q.blocked_consumers,
+ State#q.active_consumers),
+ run_poke_burst(
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedeConsumers})
end
end.
-check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
- {continue, State};
-check_auto_delete(State = #q{has_had_consumers = false}) ->
- {continue, State};
-check_auto_delete(State = #q{round_robin = RoundRobin}) ->
- % The clauses above rule out cases where no-one has consumed from
- % this queue yet, and cases where we are not an auto_delete queue
- % in any case. Thus it remains to check whether we have any active
- % listeners at this point.
- case queue:is_empty(RoundRobin) of
- true ->
- % There are no waiting listeners. It's possible that we're
- % completely unused. Check.
- case is_unused() of
- true ->
- % There are no active consumers at this
- % point. This is the signal to autodelete.
- {stop, State};
- false ->
- % There is at least one active consumer, so we
- % shouldn't delete ourselves.
- {continue, State}
- end;
- false ->
- % There are some waiting listeners, thus we are not
- % unused, so can continue life as normal without needing
- % to check the process dictionary.
- {continue, State}
- end.
+should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
+should_auto_delete(#q{has_had_consumers = false}) -> false;
+should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
- round_robin = ActiveConsumers}) ->
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found -> noreply(State);
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
unacked_messages = UAM} ->
- NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
case Txn of
@@ -311,20 +305,22 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
_ -> ok = rollback_work(Txn, qname(State)),
erase_tx(Txn)
end,
- case check_auto_delete(
- deliver_or_enqueue_n(
- [{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)],
- State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- round_robin = NewActive})) of
- {continue, NewState} ->
- noreply(NewState);
- {stop, NewState} ->
- {stop, normal, NewState}
+ NewState =
+ deliver_or_enqueue_n(
+ [{Message, true} ||
+ {_Messsage_id, Message} <- dict:to_list(UAM)],
+ State#q{
+ exclusive_consumer = case Holder of
+ {ChPid, _} -> none;
+ Other -> Other
+ end,
+ active_consumers = remove_consumers(
+ ChPid, State#q.active_consumers),
+ blocked_consumers = remove_consumers(
+ ChPid, State#q.blocked_consumers)}),
+ case should_auto_delete(NewState) of
+ false -> noreply(NewState);
+ true -> {stop, normal, NewState}
end
end.
@@ -337,12 +333,12 @@ check_queue_owner(none, _) -> ok;
check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
check_queue_owner({_, _}, _) -> mismatch.
-check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) ->
+check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
-check_exclusive_access(none, false) ->
+check_exclusive_access(none, false, _State) ->
ok;
-check_exclusive_access(none, true) ->
- case is_unused() of
+check_exclusive_access(none, true, State) ->
+ case is_unused(State) of
true -> ok;
false -> in_use
end.
@@ -367,16 +363,8 @@ run_poke_burst(MessageBuffer, State) ->
State#q{message_buffer = MessageBuffer}
end.
-is_unused() ->
- is_unused1(get()).
-
-is_unused1([]) ->
- true;
-is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest])
- when Consumers /= [] ->
- false;
-is_unused1([_ | Rest]) ->
- is_unused1(Rest).
+is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
+ queue:is_empty(State#q.blocked_consumers).
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -535,9 +523,8 @@ i(messages, State) ->
i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
-i(consumers, _) ->
- lists:sum([length(Consumers) ||
- #cr{consumers = Consumers} <- all_ch_record()]);
+i(consumers, State) ->
+ queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
i(transactions, _) ->
length(all_tx_record());
i(memory, _) ->
@@ -619,78 +606,91 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder,
- round_robin = RoundRobin}) ->
+ exclusive_consumer = ExistingHolder}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
reply({error, queue_owned_by_another_connection}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumers = Consumers} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not(NoAck)},
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
- if Consumers == [] ->
+ if ConsumerCount == 0 ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok
end,
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
State1 = State#q{has_had_consumers = true,
- exclusive_consumer =
- if
- ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
- round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
+ exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- reply(ok, run_poke_burst(State1))
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_poke_burst(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{exclusive_consumer = Holder,
- round_robin = RoundRobin}) ->
+ State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
- NewConsumers = lists:filter
- (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
- Consumers),
- store_ch_record(C#cr{consumers = NewConsumers}),
- if NewConsumers == [] ->
+ C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
+ store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
+ if ConsumerCount == 1 ->
ok = rabbit_limiter:unregister(LimiterPid, self());
true ->
ok
end,
ok = maybe_send_reply(ChPid, OkMsg),
- case check_auto_delete(
- State#q{exclusive_consumer = cancel_holder(ChPid,
- ConsumerTag,
- Holder),
- round_robin = block_consumer(ChPid,
- ConsumerTag,
- RoundRobin)}) of
- {continue, State1} ->
- reply(ok, State1);
- {stop, State1} ->
- {stop, normal, ok, State1}
+ NewState =
+ State#q{exclusive_consumer = cancel_holder(ChPid,
+ ConsumerTag,
+ Holder),
+ active_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.active_consumers),
+ blocked_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.blocked_consumers)},
+ case should_auto_delete(NewState) of
+ false -> reply(ok, NewState);
+ true -> {stop, normal, ok, NewState}
end
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
- round_robin = RoundRobin}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
+ active_consumers = ActiveConsumers}) ->
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
+ State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
IsEmpty = queue:is_empty(MessageBuffer),
- IsUnused = is_unused(),
+ IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) ->
reply({error, not_empty}, State);
@@ -709,7 +709,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
case Owner of
none ->
- case check_exclusive_access(Holder, true) of
+ case check_exclusive_access(Holder, true, State) of
in_use ->
%% FIXME: Is this really the right answer? What if
%% an active consumer's reader is actually the
@@ -785,10 +785,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
possibly_unblock(
State, ChPid,
- fun (C = #cr{consumers = Consumers,
+ fun (C = #cr{consumer_count = ConsumerCount,
limiter_pid = OldLimiterPid,
is_limit_active = Limited}) ->
- if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ if ConsumerCount =/= 0 andalso OldLimiterPid == undefined ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok