diff options
-rw-r--r-- | Makefile | 10 | ||||
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | packaging/macports/Makefile | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 26 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 119 | ||||
-rw-r--r-- | src/rabbit_invariable_queue.erl | 82 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 12 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 4 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 7 |
10 files changed, 147 insertions, 129 deletions
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json +AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -81,11 +81,11 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ +$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ -$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ +$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ diff --git a/include/rabbit.hrl b/include/rabbit.hrl index cf33b6fd..0d75310b 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -51,7 +51,8 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). --record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). +-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, + arguments, pid}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile index 0ef7dd5e..4ad4c30b 100644 --- a/packaging/macports/Makefile +++ b/packaging/macports/Makefile @@ -39,7 +39,7 @@ macports: dirs $(DEST)/Portfile $(DEST)/files/rabbitmq-script-wrapper cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files if [ -n "$(MACPORTS_USERHOST)" ] ; then \ - tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \ + tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) $(MACPORTS_USERHOST) ' \ d="/tmp/mkportindex.$$$$" ; \ mkdir $$d \ && cd $$d \ diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7c2d5581..483b5a93 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -65,8 +65,8 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) -> - amqqueue()). +-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), + maybe(pid())) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). @@ -101,8 +101,7 @@ -spec(basic_consume/7 :: (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> - 'ok' | {'error', 'queue_owned_by_another_connection' | - 'exclusive_consume_unavailable'}). + 'ok' | {'error', 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). @@ -320,7 +319,7 @@ flush_all(QPids, ChPid) -> delegate:invoke_no_result( QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). -internal_delete2(QueueName) -> +internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), ok = mnesia:delete({rabbit_durable_queue, QueueName}), %% we want to execute some things, as @@ -334,7 +333,7 @@ internal_delete(QueueName) -> fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [_] -> internal_delete2(QueueName) + [_] -> internal_delete1(QueueName) end end) of Err = {error, _} -> Err; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 92e2e1c0..27e69498 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -484,8 +484,8 @@ i(pid, _) -> self(); i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; -i(owner_pid, #q{q = #amqqueue{exclusive_owner = ReaderPid}}) -> - ReaderPid; +i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> + ExclusiveOwner; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> @@ -533,7 +533,7 @@ handle_call({init, Recover}, From, noreply( State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)}); - Q1 -> + Q1 -> {stop, normal, Q1, State} end end, @@ -642,18 +642,16 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ok -> C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, - ack_required = not(NoAck)}, + ack_required = not NoAck}, store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), - if ConsumerCount == 0 -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> - ok - end, - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, + ok = case ConsumerCount of + 0 -> rabbit_limiter:register(LimiterPid, self()); + _ -> ok + end, + ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), @@ -817,7 +815,7 @@ handle_cast({set_maximum_since_use, Age}, State) -> noreply(State). handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, - State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) -> + State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> %% Exclusively owned queues must disappear with their owner. %% In the case of clean shutdown we delete the queue synchronously in the %% reader - although not required by the spec this seems to match what diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index eeab1fb4..66326396 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -299,13 +299,24 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). -exclusive_access_or_locked(ReaderPid, Q) -> - case Q of - #amqqueue{ exclusive_owner = none} -> Q; - #amqqueue{ exclusive_owner = ReaderPid } -> Q; - _ -> rabbit_misc:protocol_error(resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]) +with_exclusive_access_or_die(QName, ReaderPid, F) -> + case rabbit_amqqueue:with_or_die( + QName, fun(Q) -> case Q of + #amqqueue{exclusive_owner = none} -> + F(Q); + #amqqueue{exclusive_owner = ReaderPid} -> + F(Q); + _ -> + {error, wrong_exclusive_owner} + end + end) of + {error, wrong_exclusive_owner} -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QName)]); + Else -> + Else end. expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> @@ -493,12 +504,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% In order to ensure that the consume_ok gets sent before %% any messages are sent to the consumer, we get the queue %% process to send the consume_ok on our behalf. - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( - exclusive_access_or_locked(ReaderPid, Q), - NoAck, self(), LimiterPid, + Q, NoAck, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -677,16 +687,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = Durable, - exclusive = ExclusiveDeclare, +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, arguments = Args}, - _, State = #ch { virtual_host = VHostPath, - reader_pid = ReaderPid, - queue_collector_pid = CollectorPid }) -> + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid, + queue_collector_pid = CollectorPid}) -> Owner = case ExclusiveDeclare of true -> ReaderPid; false -> none @@ -694,17 +704,13 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% We use this in both branches, because queue_declare may yet return an %% existing queue. Finish = - fun(Q) -> + fun(Q = #amqqueue{name = QueueName}) -> case Q of %% "equivalent" rule. NB: we don't pay attention to %% anything in the arguments table, so for the sake of the %% "equivalent" rule, all tables of arguments are %% semantically equivalant. - Matched = #amqqueue{name = QueueName, - durable = Durable, %% i.e., as supplied - exclusive_owner = Owner, - auto_delete = AutoDelete %% i.e,. as supplied - } -> + #amqqueue{exclusive_owner = Owner} -> check_configure_permitted(QueueName, State), %% We need to notify the reader within the channel %% process so that we can be sure there are no @@ -712,21 +718,17 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% connection shuts down. case Owner of none -> ok; - _ -> rabbit_reader_queue_collector:register_exclusive_queue( - CollectorPid, Matched) + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue( + CollectorPid, Q) end, - Matched; + Q; %% exclusivity trumps non-equivalence arbitrarily - #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner} - when ExclusiveOwner =/= Owner -> - rabbit_misc:protocol_error(resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(QueueName)]); - #amqqueue{name = QueueName} -> - rabbit_misc:protocol_error(channel_error, - "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end + #amqqueue{} -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]) + end end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), @@ -738,37 +740,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, State), - Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner)); - Found -> Found + Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner)); + #amqqueue{} = Other -> + Other end, return_queue_declare_ok(State, NoWait, Q); -handle_method(#'queue.declare'{queue = QueueNameBin, +handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, - nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath, - reader_pid = ReaderPid }) -> + nowait = NoWait}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - CheckExclusive = fun(Q) -> exclusive_access_or_locked(ReaderPid, Q) end, - Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive), + Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun(Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, - nowait = NoWait - }, - _, State = #ch{ reader_pid = ReaderPid }) -> + nowait = NoWait}, + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, - fun (Q) -> - rabbit_amqqueue:delete(exclusive_access_or_locked(ReaderPid, Q), - IfUnused, IfEmpty) - end) of + case with_exclusive_access_or_die( + QueueName, ReaderPid, + fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); @@ -800,15 +798,12 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State = #ch{ reader_pid = ReaderPid }) -> + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( - QueueName, - fun (Q) -> - exclusive_access_or_locked(ReaderPid, Q), - rabbit_amqqueue:purge(Q) - end), + {ok, PurgedMessageCount} = with_exclusive_access_or_die( + QueueName, ReaderPid, + fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index b4fd9156..a7ca20c8 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -43,7 +43,7 @@ -include("rabbit.hrl"). --record(iv_state, { queue, qname, len, pending_ack }). +-record(iv_state, { queue, qname, durable, len, pending_ack }). -record(tx, { pending_messages, pending_acks, is_persistent }). -ifdef(use_specs). @@ -66,18 +66,23 @@ init(QName, IsDurable, Recover) -> true -> rabbit_persister:queue_content(QName); false -> [] end), - #iv_state { queue = Q, qname = QName, len = queue:len(Q), + #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = queue:len(Q), pending_ack = dict:new() }. terminate(State) -> State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. -delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, dict:fetch_keys(PA), PA), +delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA), {_PLen, State1} = purge(State), terminate(State1). -purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> +purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( @@ -85,57 +90,63 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> Acc; ({Msg = #basic_message { guid = Guid }, IsDelivered}, {AckTagsN, PAN}) -> - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} end, {[], dict:new()}, Q), - ok = persist_acks(none, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) -> - ok = persist_message(none, QName, Msg), +publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> + ok = persist_message(QName, IsDurable, none, Msg), State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. publish_delivered(false, _Msg, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, - State = #iv_state { qname = QName, len = 0, - pending_ack = PA }) -> - ok = persist_message(none, QName, Msg), - ok = persist_delivery(QName, Msg, false), + State = #iv_state { qname = QName, durable = IsDurable, + len = 0, pending_ack = PA }) -> + ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len, +fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, + durable = IsDurable, pending_ack = PA }) -> {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = queue:out(Q), Len1 = Len - 1, - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = dict:store(Guid, Msg, PA), {AckTag, PA2} = case AckRequired of true -> {Guid, PA1}; - false -> ok = persist_acks(none, QName, [Guid], PA1), + false -> ok = persist_acks(QName, IsDurable, none, + [Guid], PA1), {blank_ack, PA} end, {{Msg, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. -ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, AckTags, PA), +ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, AckTags, PA), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, State = #iv_state { qname = QName }) -> +tx_publish(Txn, Msg, State = #iv_state { qname = QName, + durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - ok = persist_message(Txn, QName, Msg), + ok = persist_message(QName, IsDurable, Txn, Msg), State. -tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> +tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - ok = persist_acks(Txn, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, Txn, AckTags, PA), State. tx_rollback(Txn, State = #iv_state { qname = QName }) -> @@ -228,32 +239,33 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- -persist_message(_Txn, _QName, #basic_message { is_persistent = false }) -> - ok; -persist_message(Txn, QName, Msg) -> +persist_message(QName, true, Txn, Msg = #basic_message { + is_persistent = true }) -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties, %% rebuild from properties_bin on restore content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]). + [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg) -> + ok. -persist_delivery(_QName, #basic_message { is_persistent = false }, - _IsDelivered) -> - ok; -persist_delivery(_QName, _Message, true) -> - ok; -persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]). +persist_delivery(QName, true, false, #basic_message { is_persistent = true, + guid = Guid }) -> + persist_work(none, QName, [{deliver, {QName, Guid}}]); +persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) -> + ok. -persist_acks(Txn, QName, AckTags, PA) -> +persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin {ok, Msg} = dict:find(Guid, PA), Msg #basic_message.is_persistent - end]). + end]); +persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> + ok. persist_work(_Txn,_QName, []) -> ok; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 723b818b..9a911ab1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -537,18 +537,24 @@ pid_to_string(Pid) when is_pid(Pid) -> %% inverse of above string_to_pid(Str) -> + Err = {error, {invalid_pid_syntax, Str}}, %% The \ before the trailing $ is only there to keep emacs %% font-lock from getting confused. case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$", [{capture,all_but_first,list}]) of {match, [NodeStr, IdStr, SerStr]} -> - %% turn the triple into a pid - see pid_to_string - <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)), + %% the NodeStr atom might be quoted, so we have to parse + %% it rather than doing a simple list_to_atom + NodeAtom = case erl_scan:string(NodeStr) of + {ok, [{atom, _, X}], _} -> X; + {error, _, _} -> throw(Err) + end, + <<131,NodeEnc/binary>> = term_to_binary(NodeAtom), Id = list_to_integer(IdStr), Ser = list_to_integer(SerStr), binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); nomatch -> - throw({error, {invalid_pid_syntax, Str}}) + throw(Err) end. version_compare(A, B, lte) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f573ca6f..73a58f13 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -52,7 +52,7 @@ -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). --define(SLEEP_BEFORE_SILENT_CLOSE, 3000). +-define(SILENT_CLOSE_DELAY, 3). %--------------------------------------------------------------------------- @@ -590,7 +590,7 @@ handle_method0(MethodName, FieldsBin, State) -> %% We don't trust the client at this point - force %% them to wait for a bit so they can't DOS us with %% repeated failed logins etc. - Other -> timer:sleep(?SLEEP_BEFORE_SILENT_CLOSE), + Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), throw({channel0_error, Other, CompleteReason}) end end. diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 3b23daa5..cc4982c9 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -75,6 +75,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", [inet_parse:ntoa(Address), Port, inet_parse:ntoa(PeerAddress), PeerPort]), + %% In the event that somebody floods us with connections we can spew + %% the above message at error_logger faster than it can keep up. + %% So error_logger's mailbox grows unbounded until we eat all the + %% memory available and crash. So here's a meaningless synchronous call + %% to the underlying gen_event mechanism - when it returns the mailbox + %% is drained. + gen_event:which_handlers(error_logger), %% handle file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> |