summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile10
-rw-r--r--include/rabbit.hrl3
-rw-r--r--packaging/macports/Makefile2
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl26
-rw-r--r--src/rabbit_channel.erl119
-rw-r--r--src/rabbit_invariable_queue.erl82
-rw-r--r--src/rabbit_misc.erl12
-rw-r--r--src/rabbit_reader.erl4
-rw-r--r--src/tcp_acceptor.erl7
10 files changed, 147 insertions, 129 deletions
diff --git a/Makefile b/Makefile
index 3d39ccb0..982780c7 100644
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
@@ -81,11 +81,11 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/%.beam:
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
+$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@
-$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
+$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index cf33b6fd..0d75310b 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -51,7 +51,8 @@
-record(exchange, {name, type, durable, auto_delete, arguments}).
--record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}).
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
+ arguments, pid}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
index 0ef7dd5e..4ad4c30b 100644
--- a/packaging/macports/Makefile
+++ b/packaging/macports/Makefile
@@ -39,7 +39,7 @@ macports: dirs $(DEST)/Portfile
$(DEST)/files/rabbitmq-script-wrapper
cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files
if [ -n "$(MACPORTS_USERHOST)" ] ; then \
- tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \
+ tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) $(MACPORTS_USERHOST) ' \
d="/tmp/mkportindex.$$$$" ; \
mkdir $$d \
&& cd $$d \
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7c2d5581..483b5a93 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -65,8 +65,8 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) ->
- amqqueue()).
+-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
+ maybe(pid())) -> amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
@@ -101,8 +101,7 @@
-spec(basic_consume/7 ::
(amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(),
boolean(), any()) ->
- 'ok' | {'error', 'queue_owned_by_another_connection' |
- 'exclusive_consume_unavailable'}).
+ 'ok' | {'error', 'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
@@ -320,7 +319,7 @@ flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
-internal_delete2(QueueName) ->
+internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
ok = mnesia:delete({rabbit_durable_queue, QueueName}),
%% we want to execute some things, as
@@ -334,7 +333,7 @@ internal_delete(QueueName) ->
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [_] -> internal_delete2(QueueName)
+ [_] -> internal_delete1(QueueName)
end
end) of
Err = {error, _} -> Err;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 92e2e1c0..27e69498 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -484,8 +484,8 @@ i(pid, _) ->
self();
i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
-i(owner_pid, #q{q = #amqqueue{exclusive_owner = ReaderPid}}) ->
- ReaderPid;
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
+ ExclusiveOwner;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
@@ -533,7 +533,7 @@ handle_call({init, Recover}, From,
noreply(
State#q{backing_queue_state =
BQ:init(QName, IsDurable, Recover)});
- Q1 ->
+ Q1 ->
{stop, normal, Q1, State}
end
end,
@@ -642,18 +642,16 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ok ->
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
- ack_required = not(NoAck)},
+ ack_required = not NoAck},
store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
- if ConsumerCount == 0 ->
- ok = rabbit_limiter:register(LimiterPid, self());
- true ->
- ok
- end,
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
+ ok = case ConsumerCount of
+ 0 -> rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
+ end,
+ ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
@@ -817,7 +815,7 @@ handle_cast({set_maximum_since_use, Age}, State) ->
noreply(State).
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
- State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) ->
+ State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
%% Exclusively owned queues must disappear with their owner.
%% In the case of clean shutdown we delete the queue synchronously in the
%% reader - although not required by the spec this seems to match what
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index eeab1fb4..66326396 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -299,13 +299,24 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
-exclusive_access_or_locked(ReaderPid, Q) ->
- case Q of
- #amqqueue{ exclusive_owner = none} -> Q;
- #amqqueue{ exclusive_owner = ReaderPid } -> Q;
- _ -> rabbit_misc:protocol_error(resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)])
+with_exclusive_access_or_die(QName, ReaderPid, F) ->
+ case rabbit_amqqueue:with_or_die(
+ QName, fun(Q) -> case Q of
+ #amqqueue{exclusive_owner = none} ->
+ F(Q);
+ #amqqueue{exclusive_owner = ReaderPid} ->
+ F(Q);
+ _ ->
+ {error, wrong_exclusive_owner}
+ end
+ end) of
+ {error, wrong_exclusive_owner} ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QName)]);
+ Else ->
+ Else
end.
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
@@ -493,12 +504,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% In order to ensure that the consume_ok gets sent before
%% any messages are sent to the consumer, we get the queue
%% process to send the consume_ok on our behalf.
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- exclusive_access_or_locked(ReaderPid, Q),
- NoAck, self(), LimiterPid,
+ Q, NoAck, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -677,16 +687,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = Durable,
- exclusive = ExclusiveDeclare,
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = Durable,
+ exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args},
- _, State = #ch { virtual_host = VHostPath,
- reader_pid = ReaderPid,
- queue_collector_pid = CollectorPid }) ->
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid,
+ queue_collector_pid = CollectorPid}) ->
Owner = case ExclusiveDeclare of
true -> ReaderPid;
false -> none
@@ -694,17 +704,13 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% We use this in both branches, because queue_declare may yet return an
%% existing queue.
Finish =
- fun(Q) ->
+ fun(Q = #amqqueue{name = QueueName}) ->
case Q of
%% "equivalent" rule. NB: we don't pay attention to
%% anything in the arguments table, so for the sake of the
%% "equivalent" rule, all tables of arguments are
%% semantically equivalant.
- Matched = #amqqueue{name = QueueName,
- durable = Durable, %% i.e., as supplied
- exclusive_owner = Owner,
- auto_delete = AutoDelete %% i.e,. as supplied
- } ->
+ #amqqueue{exclusive_owner = Owner} ->
check_configure_permitted(QueueName, State),
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
@@ -712,21 +718,17 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% connection shuts down.
case Owner of
none -> ok;
- _ -> rabbit_reader_queue_collector:register_exclusive_queue(
- CollectorPid, Matched)
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(
+ CollectorPid, Q)
end,
- Matched;
+ Q;
%% exclusivity trumps non-equivalence arbitrarily
- #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner}
- when ExclusiveOwner =/= Owner ->
- rabbit_misc:protocol_error(resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(QueueName)]);
- #amqqueue{name = QueueName} ->
- rabbit_misc:protocol_error(channel_error,
- "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end
+ #amqqueue{} ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)])
+ end
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -738,37 +740,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner));
- Found -> Found
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner));
+ #amqqueue{} = Other ->
+ Other
end,
return_queue_declare_ok(State, NoWait, Q);
-handle_method(#'queue.declare'{queue = QueueNameBin,
+handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
- nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath,
- reader_pid = ReaderPid }) ->
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- CheckExclusive = fun(Q) -> exclusive_access_or_locked(ReaderPid, Q) end,
- Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
+ Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun(Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
- nowait = NoWait
- },
- _, State = #ch{ reader_pid = ReaderPid }) ->
+ nowait = NoWait},
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
- fun (Q) ->
- rabbit_amqqueue:delete(exclusive_access_or_locked(ReaderPid, Q),
- IfUnused, IfEmpty)
- end) of
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
+ fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
@@ -800,15 +798,12 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State = #ch{ reader_pid = ReaderPid }) ->
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
- QueueName,
- fun (Q) ->
- exclusive_access_or_locked(ReaderPid, Q),
- rabbit_amqqueue:purge(Q)
- end),
+ {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ QueueName, ReaderPid,
+ fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index b4fd9156..a7ca20c8 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -43,7 +43,7 @@
-include("rabbit.hrl").
--record(iv_state, { queue, qname, len, pending_ack }).
+-record(iv_state, { queue, qname, durable, len, pending_ack }).
-record(tx, { pending_messages, pending_acks, is_persistent }).
-ifdef(use_specs).
@@ -66,18 +66,23 @@ init(QName, IsDurable, Recover) ->
true -> rabbit_persister:queue_content(QName);
false -> []
end),
- #iv_state { queue = Q, qname = QName, len = queue:len(Q),
+ #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = queue:len(Q),
pending_ack = dict:new() }.
terminate(State) ->
State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
-delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, dict:fetch_keys(PA), PA),
+delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA),
{_PLen, State1} = purge(State),
terminate(State1).
-purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
+purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
@@ -85,57 +90,63 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
Acc;
({Msg = #basic_message { guid = Guid }, IsDelivered},
{AckTagsN, PAN}) ->
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
{[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
end, {[], dict:new()}, Q),
- ok = persist_acks(none, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
- ok = persist_message(none, QName, Msg),
+publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
publish_delivered(false, _Msg, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
- State = #iv_state { qname = QName, len = 0,
- pending_ack = PA }) ->
- ok = persist_message(none, QName, Msg),
- ok = persist_delivery(QName, Msg, false),
+ State = #iv_state { qname = QName, durable = IsDurable,
+ len = 0, pending_ack = PA }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
+fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
+ durable = IsDurable,
pending_ack = PA }) ->
{{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
queue:out(Q),
Len1 = Len - 1,
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
PA1 = dict:store(Guid, Msg, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
- false -> ok = persist_acks(none, QName, [Guid], PA1),
+ false -> ok = persist_acks(QName, IsDurable, none,
+ [Guid], PA1),
{blank_ack, PA}
end,
{{Msg, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
-ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, AckTags, PA),
+ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
+tx_publish(Txn, Msg, State = #iv_state { qname = QName,
+ durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- ok = persist_message(Txn, QName, Msg),
+ ok = persist_message(QName, IsDurable, Txn, Msg),
State.
-tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- ok = persist_acks(Txn, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, Txn, AckTags, PA),
State.
tx_rollback(Txn, State = #iv_state { qname = QName }) ->
@@ -228,32 +239,33 @@ do_if_persistent(F, Txn, QName) ->
%%----------------------------------------------------------------------------
-persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
- ok;
-persist_message(Txn, QName, Msg) ->
+persist_message(QName, true, Txn, Msg = #basic_message {
+ is_persistent = true }) ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties,
%% rebuild from properties_bin on restore
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
- [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]).
+ [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]);
+persist_message(_QName, _IsDurable, _Txn, _Msg) ->
+ ok.
-persist_delivery(_QName, #basic_message { is_persistent = false },
- _IsDelivered) ->
- ok;
-persist_delivery(_QName, _Message, true) ->
- ok;
-persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, Guid}}]).
+persist_delivery(QName, true, false, #basic_message { is_persistent = true,
+ guid = Guid }) ->
+ persist_work(none, QName, [{deliver, {QName, Guid}}]);
+persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) ->
+ ok.
-persist_acks(Txn, QName, AckTags, PA) ->
+persist_acks(QName, true, Txn, AckTags, PA) ->
persist_work(Txn, QName,
[{ack, {QName, Guid}} || Guid <- AckTags,
begin
{ok, Msg} = dict:find(Guid, PA),
Msg #basic_message.is_persistent
- end]).
+ end]);
+persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
+ ok.
persist_work(_Txn,_QName, []) ->
ok;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 723b818b..9a911ab1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -537,18 +537,24 @@ pid_to_string(Pid) when is_pid(Pid) ->
%% inverse of above
string_to_pid(Str) ->
+ Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
[{capture,all_but_first,list}]) of
{match, [NodeStr, IdStr, SerStr]} ->
- %% turn the triple into a pid - see pid_to_string
- <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
+ %% the NodeStr atom might be quoted, so we have to parse
+ %% it rather than doing a simple list_to_atom
+ NodeAtom = case erl_scan:string(NodeStr) of
+ {ok, [{atom, _, X}], _} -> X;
+ {error, _, _} -> throw(Err)
+ end,
+ <<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
Id = list_to_integer(IdStr),
Ser = list_to_integer(SerStr),
binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
nomatch ->
- throw({error, {invalid_pid_syntax, Str}})
+ throw(Err)
end.
version_compare(A, B, lte) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f573ca6f..73a58f13 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -52,7 +52,7 @@
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
--define(SLEEP_BEFORE_SILENT_CLOSE, 3000).
+-define(SILENT_CLOSE_DELAY, 3).
%---------------------------------------------------------------------------
@@ -590,7 +590,7 @@ handle_method0(MethodName, FieldsBin, State) ->
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
- Other -> timer:sleep(?SLEEP_BEFORE_SILENT_CLOSE),
+ Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
throw({channel0_error, Other, CompleteReason})
end
end.
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 3b23daa5..cc4982c9 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -75,6 +75,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
[inet_parse:ntoa(Address), Port,
inet_parse:ntoa(PeerAddress), PeerPort]),
+ %% In the event that somebody floods us with connections we can spew
+ %% the above message at error_logger faster than it can keep up.
+ %% So error_logger's mailbox grows unbounded until we eat all the
+ %% memory available and crash. So here's a meaningless synchronous call
+ %% to the underlying gen_event mechanism - when it returns the mailbox
+ %% is drained.
+ gen_event:which_handlers(error_logger),
%% handle
file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->