diff options
author | Simon MacMullen <simon@lshift.net> | 2010-05-26 12:47:51 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-05-26 12:47:51 +0100 |
commit | 47fc35f38d9835542a7365fba7aff1947f787a29 (patch) | |
tree | 1ed23c1a9f2419a3c1752a11ec919c03b8e0e160 | |
parent | 56c8b07c2296ccc67688711f81355486e3adb305 (diff) | |
parent | d2f1abadb3c701e9824c196c88277b59974faefb (diff) | |
download | rabbitmq-server-47fc35f38d9835542a7365fba7aff1947f787a29.tar.gz |
Merge default into 21915.
-rw-r--r-- | Makefile | 10 | ||||
-rw-r--r-- | packaging/macports/Makefile | 2 | ||||
-rw-r--r-- | src/rabbit_invariable_queue.erl | 82 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 12 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 7 |
5 files changed, 69 insertions, 44 deletions
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json +AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -81,11 +81,11 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ +$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ -$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ +$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile index 0ef7dd5e..4ad4c30b 100644 --- a/packaging/macports/Makefile +++ b/packaging/macports/Makefile @@ -39,7 +39,7 @@ macports: dirs $(DEST)/Portfile $(DEST)/files/rabbitmq-script-wrapper cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files if [ -n "$(MACPORTS_USERHOST)" ] ; then \ - tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \ + tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) $(MACPORTS_USERHOST) ' \ d="/tmp/mkportindex.$$$$" ; \ mkdir $$d \ && cd $$d \ diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index b4fd9156..a7ca20c8 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -43,7 +43,7 @@ -include("rabbit.hrl"). --record(iv_state, { queue, qname, len, pending_ack }). +-record(iv_state, { queue, qname, durable, len, pending_ack }). -record(tx, { pending_messages, pending_acks, is_persistent }). -ifdef(use_specs). @@ -66,18 +66,23 @@ init(QName, IsDurable, Recover) -> true -> rabbit_persister:queue_content(QName); false -> [] end), - #iv_state { queue = Q, qname = QName, len = queue:len(Q), + #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = queue:len(Q), pending_ack = dict:new() }. terminate(State) -> State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. -delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, dict:fetch_keys(PA), PA), +delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA), {_PLen, State1} = purge(State), terminate(State1). -purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> +purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( @@ -85,57 +90,63 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> Acc; ({Msg = #basic_message { guid = Guid }, IsDelivered}, {AckTagsN, PAN}) -> - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} end, {[], dict:new()}, Q), - ok = persist_acks(none, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) -> - ok = persist_message(none, QName, Msg), +publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> + ok = persist_message(QName, IsDurable, none, Msg), State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. publish_delivered(false, _Msg, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, - State = #iv_state { qname = QName, len = 0, - pending_ack = PA }) -> - ok = persist_message(none, QName, Msg), - ok = persist_delivery(QName, Msg, false), + State = #iv_state { qname = QName, durable = IsDurable, + len = 0, pending_ack = PA }) -> + ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len, +fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, + durable = IsDurable, pending_ack = PA }) -> {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = queue:out(Q), Len1 = Len - 1, - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = dict:store(Guid, Msg, PA), {AckTag, PA2} = case AckRequired of true -> {Guid, PA1}; - false -> ok = persist_acks(none, QName, [Guid], PA1), + false -> ok = persist_acks(QName, IsDurable, none, + [Guid], PA1), {blank_ack, PA} end, {{Msg, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. -ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, AckTags, PA), +ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, AckTags, PA), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, State = #iv_state { qname = QName }) -> +tx_publish(Txn, Msg, State = #iv_state { qname = QName, + durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - ok = persist_message(Txn, QName, Msg), + ok = persist_message(QName, IsDurable, Txn, Msg), State. -tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> +tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - ok = persist_acks(Txn, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, Txn, AckTags, PA), State. tx_rollback(Txn, State = #iv_state { qname = QName }) -> @@ -228,32 +239,33 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- -persist_message(_Txn, _QName, #basic_message { is_persistent = false }) -> - ok; -persist_message(Txn, QName, Msg) -> +persist_message(QName, true, Txn, Msg = #basic_message { + is_persistent = true }) -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties, %% rebuild from properties_bin on restore content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]). + [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg) -> + ok. -persist_delivery(_QName, #basic_message { is_persistent = false }, - _IsDelivered) -> - ok; -persist_delivery(_QName, _Message, true) -> - ok; -persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]). +persist_delivery(QName, true, false, #basic_message { is_persistent = true, + guid = Guid }) -> + persist_work(none, QName, [{deliver, {QName, Guid}}]); +persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) -> + ok. -persist_acks(Txn, QName, AckTags, PA) -> +persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin {ok, Msg} = dict:find(Guid, PA), Msg #basic_message.is_persistent - end]). + end]); +persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> + ok. persist_work(_Txn,_QName, []) -> ok; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 723b818b..9a911ab1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -537,18 +537,24 @@ pid_to_string(Pid) when is_pid(Pid) -> %% inverse of above string_to_pid(Str) -> + Err = {error, {invalid_pid_syntax, Str}}, %% The \ before the trailing $ is only there to keep emacs %% font-lock from getting confused. case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$", [{capture,all_but_first,list}]) of {match, [NodeStr, IdStr, SerStr]} -> - %% turn the triple into a pid - see pid_to_string - <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)), + %% the NodeStr atom might be quoted, so we have to parse + %% it rather than doing a simple list_to_atom + NodeAtom = case erl_scan:string(NodeStr) of + {ok, [{atom, _, X}], _} -> X; + {error, _, _} -> throw(Err) + end, + <<131,NodeEnc/binary>> = term_to_binary(NodeAtom), Id = list_to_integer(IdStr), Ser = list_to_integer(SerStr), binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); nomatch -> - throw({error, {invalid_pid_syntax, Str}}) + throw(Err) end. version_compare(A, B, lte) -> diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 3b23daa5..cc4982c9 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -75,6 +75,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", [inet_parse:ntoa(Address), Port, inet_parse:ntoa(PeerAddress), PeerPort]), + %% In the event that somebody floods us with connections we can spew + %% the above message at error_logger faster than it can keep up. + %% So error_logger's mailbox grows unbounded until we eat all the + %% memory available and crash. So here's a meaningless synchronous call + %% to the underlying gen_event mechanism - when it returns the mailbox + %% is drained. + gen_event:which_handlers(error_logger), %% handle file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> |