summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-06-14 15:36:28 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-06-14 15:36:28 +0100
commitfc47e9132cbc30fd61e9c6dcdfbf7c8607d21e16 (patch)
treeab8b6a8047e033522aaacdd76a135285967e8609
parentfce9ba707699382fec51f68fc061f8ecaa4283b7 (diff)
parentdafdcc8bb119f93b1b8576a4674ef1632c0685ca (diff)
downloadrabbitmq-server-fc47e9132cbc30fd61e9c6dcdfbf7c8607d21e16.tar.gz
Merged bug 21842.
-rw-r--r--.hgignore1
-rw-r--r--Makefile9
-rw-r--r--codegen.py102
-rw-r--r--include/rabbit_framing_spec.hrl60
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl25
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_channel.erl19
-rw-r--r--src/rabbit_writer.erl10
9 files changed, 135 insertions, 98 deletions
diff --git a/.hgignore b/.hgignore
index caaa3ace..7b796b66 100644
--- a/.hgignore
+++ b/.hgignore
@@ -10,6 +10,7 @@ syntax: regexp
^cover/
^dist/
^include/rabbit_framing\.hrl$
+^include/rabbit_framing_spec\.hrl$
^src/rabbit_framing\.erl$
^src/.*\_usage.erl$
^rabbit\.plt$
diff --git a/Makefile b/Makefile
index 5eb030a1..725f20a6 100644
--- a/Makefile
+++ b/Makefile
@@ -11,10 +11,10 @@ SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
DOCS_DIR=docs
-INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
+INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
-TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
+TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
@@ -90,6 +90,9 @@ $(EBIN_DIR)/%.beam:
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
$(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@
+$(INCLUDE_DIR)/rabbit_framing_spec.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py spec $(AMQP_SPEC_JSON_FILES) $@
+
$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
$(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@
@@ -116,7 +119,7 @@ $(BASIC_PLT): $(BEAM_TARGETS)
clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
- rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
+ rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL)
rm -f $(RABBIT_PLT)
rm -f $(DEPS_FILE)
diff --git a/codegen.py b/codegen.py
index 91c70e81..0d6d9d56 100644
--- a/codegen.py
+++ b/codegen.py
@@ -93,6 +93,27 @@ class PackedMethodBitField:
def full(self):
return self.count() == 8
+def multiLineFormat(things, prologue, separator, lineSeparator, epilogue, thingsPerLine = 4):
+ r = [prologue]
+ i = 0
+ for t in things:
+ if i != 0:
+ if i % thingsPerLine == 0:
+ r += [lineSeparator]
+ else:
+ r += [separator]
+ r += [t]
+ i += 1
+ r += [epilogue]
+ return "".join(r)
+
+def prettyType(typeName, subTypes, typesPerLine = 4):
+ """Pretty print a type signature made up of many alternative subtypes"""
+ sTs = multiLineFormat(subTypes,
+ "( ", " | ", "\n | ", " )",
+ thingsPerLine = typesPerLine)
+ return "-type(%s ::\n %s)." % (typeName, sTs)
+
def printFileHeader():
print """%% Autogenerated code. Do not edit.
%%
@@ -314,6 +335,22 @@ def genErl(spec):
bitvalue(true) -> 1;
bitvalue(false) -> 0;
bitvalue(undefined) -> 0.
+
+%% Method signatures
+-ifdef(use_specs).
+-spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()).
+-spec(method_id/1 :: (amqp_method_name()) -> amqp_method()).
+-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()).
+-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()).
+-spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()).
+-spec(method_fieldnames/1 :: (amqp_method_name()) -> [amqp_method_field_name()]).
+-spec(decode_method_fields/2 :: (amqp_method_name(), binary()) -> amqp_method_record()).
+-spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()).
+-spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()).
+-spec(encode_properties/1 :: (amqp_method_record()) -> binary()).
+-spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}).
+-spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()).
+-endif. % use_specs
"""
for m in methods: genLookupMethodName(m)
print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})."
@@ -388,12 +425,75 @@ def genHrl(spec):
for c in spec.allClasses():
print "-record('P_%s', {%s})." % (erlangize(c.name), fieldNameList(c.fields))
+ print "-ifdef(use_specs)."
+ print "%% Various types"
+ print prettyType("amqp_method_name()",
+ [m.erlangName() for m in methods])
+ print prettyType("amqp_method()",
+ ["{%s, %s}" % (m.klass.index, m.index) for m in methods],
+ 6)
+ print prettyType("amqp_method_record()",
+ ["#%s{}" % (m.erlangName()) for m in methods])
+ fieldNames = set()
+ for m in methods:
+ fieldNames.update(m.arguments)
+ fieldNames = [erlangize(f.name) for f in fieldNames]
+ print prettyType("amqp_method_field_name()",
+ fieldNames)
+ print prettyType("amqp_property_record()",
+ ["#'P_%s'{}" % erlangize(c.name) for c in spec.allClasses()])
+ print prettyType("amqp_exception()",
+ ["'%s'" % erlangConstantName(c).lower() for (c, v, cls) in spec.constants])
+ print prettyType("amqp_exception_code()",
+ ["%i" % v for (c, v, cls) in spec.constants])
+ print "-endif. % use_specs"
+
+def genSpec(spec):
+ methods = spec.allMethods()
+
+ printFileHeader()
+ print """% Hard-coded types
+-type(amqp_field_type() ::
+ 'longstr' | 'signedint' | 'decimal' | 'timestamp' |
+ 'table' | 'byte' | 'double' | 'float' | 'long' |
+ 'short' | 'bool' | 'binary' | 'void').
+-type(amqp_property_type() ::
+ 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
+ 'longlongint' | 'timestamp' | 'bit' | 'table').
+%% we could make this more precise but ultimately are limited by
+%% dialyzer's lack of support for recursive types
+-type(amqp_table() :: [{binary(), amqp_field_type(), any()}]).
+%% TODO: make this more precise
+-type(amqp_properties() :: tuple()).
+
+-type(channel_number() :: non_neg_integer()).
+-type(resource_name() :: binary()).
+-type(routing_key() :: binary()).
+-type(username() :: binary()).
+-type(password() :: binary()).
+-type(vhost() :: binary()).
+-type(ctag() :: binary()).
+-type(exchange_type() :: atom()).
+-type(binding_key() :: binary()).
+"""
+ print "% Auto-generated types"
+ classIds = set()
+ for m in spec.allMethods():
+ classIds.add(m.klass.index)
+ print prettyType("amqp_class_id()",
+ ["%i" % ci for ci in classIds])
+
def generateErl(specPath):
genErl(AmqpSpec(specPath))
def generateHrl(specPath):
genHrl(AmqpSpec(specPath))
+def generateSpec(specPath):
+ genSpec(AmqpSpec(specPath))
+
if __name__ == "__main__":
- do_main(generateHrl, generateErl)
+ do_main_dict({"header": generateHrl,
+ "spec": generateSpec,
+ "body": generateErl})
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
deleted file mode 100644
index 1a979899..00000000
--- a/include/rabbit_framing_spec.hrl
+++ /dev/null
@@ -1,60 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
-%% TODO: much of this should be generated
-
--type(amqp_field_type() ::
- 'longstr' | 'signedint' | 'decimal' | 'timestamp' |
- 'table' | 'byte' | 'double' | 'float' | 'long' |
- 'short' | 'bool' | 'binary' | 'void').
--type(amqp_property_type() ::
- 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
- 'longlongint' | 'timestamp' | 'bit' | 'table').
-%% we could make this more precise but ultimately are limited by
-%% dialyzer's lack of support for recursive types
--type(amqp_table() :: [{binary(), amqp_field_type(), any()}]).
-%% TODO: make this more precise
--type(amqp_class_id() :: non_neg_integer()).
-%% TODO: make this more precise
--type(amqp_properties() :: tuple()).
-%% TODO: make this more precise
--type(amqp_method() :: tuple()).
-%% TODO: make this more precise
--type(amqp_method_name() :: atom()).
--type(channel_number() :: non_neg_integer()).
--type(resource_name() :: binary()).
--type(routing_key() :: binary()).
--type(username() :: binary()).
--type(password() :: binary()).
--type(vhost() :: binary()).
--type(ctag() :: binary()).
--type(exchange_type() :: atom()).
--type(binding_key() :: binary()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 76488255..3c9c41bd 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -268,7 +268,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
@@ -398,9 +398,6 @@ delegate_pcall(Pid, Pri, Msg, Timeout) ->
delegate:invoke(Pid,
fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
-delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
-
delegate_pcast(Pid, Pri, Msg) ->
delegate:invoke_no_result(Pid,
fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3283cb66..5fdf0ffa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -716,6 +716,19 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
+handle_call({requeue, AckTags, ChPid}, From, State) ->
+ gen_server2:reply(From, ok),
+ case lookup_ch(ChPid) of
+ not_found ->
+ rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
+ [ChPid]),
+ noreply(State);
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(requeue_and_run(AckTags, State))
+ end;
+
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
@@ -743,18 +756,6 @@ handle_cast({ack, Txn, AckTags, ChPid},
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
-handle_cast({requeue, AckTags, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found ->
- rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
- [ChPid]),
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(requeue_and_run(AckTags, State))
- end;
-
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 27a1275a..81cf3cee 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -57,7 +57,7 @@
-type(frame() :: [binary()]).
-spec(build_simple_method_frame/2 ::
- (channel_number(), amqp_method()) -> frame()).
+ (channel_number(), amqp_method_record()) -> frame()).
-spec(build_simple_content_frames/3 ::
(channel_number(), content(), non_neg_integer()) -> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 387064bd..1ab34f86 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -75,8 +75,8 @@
-spec(start_link/6 ::
(channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
--spec(do/2 :: (pid(), amqp_method()) -> 'ok').
--spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
+-spec(do/2 :: (pid(), amqp_method_record()) -> 'ok').
+-spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
@@ -463,13 +463,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
- next_tag = NextDeliveryTag,
unacked_message_q = UAMQ}) ->
- if DeliveryTag >= NextDeliveryTag ->
- rabbit_misc:protocol_error(
- command_invalid, "unknown delivery tag ~w", [DeliveryTag]);
- true -> ok
- end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
@@ -531,9 +525,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
Other -> Other
end,
- %% In order to ensure that the consume_ok gets sent before
- %% any messages are sent to the consumer, we get the queue
- %% process to send the consume_ok on our behalf.
+ %% We get the queue process to send the consume_ok on our
+ %% behalf. This is for symmetry with basic.cancel - see
+ %% the comment in that method for why.
case with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
@@ -980,7 +974,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
- {ToAcc, PrefixAcc}
+ rabbit_misc:protocol_error(
+ not_found, "unknown delivery tag ~w", [DeliveryTag])
end.
add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 54c60f5b..3d10dc12 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -50,17 +50,17 @@
-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
--spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok').
+-spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok').
+-spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok').
-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
-spec(send_command_and_signal_back/4 ::
(pid(), amqp_method(), content(), pid()) -> 'ok').
-spec(send_command_and_notify/5 ::
- (pid(), pid(), pid(), amqp_method(), content()) -> 'ok').
+ (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok').
-spec(internal_send_command/3 ::
- (socket(), channel_number(), amqp_method()) -> 'ok').
+ (socket(), channel_number(), amqp_method_record()) -> 'ok').
-spec(internal_send_command/5 ::
- (socket(), channel_number(), amqp_method(),
+ (socket(), channel_number(), amqp_method_record(),
content(), non_neg_integer()) -> 'ok').
-endif.