diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-06-14 15:36:28 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-06-14 15:36:28 +0100 |
commit | fc47e9132cbc30fd61e9c6dcdfbf7c8607d21e16 (patch) | |
tree | ab8b6a8047e033522aaacdd76a135285967e8609 | |
parent | fce9ba707699382fec51f68fc061f8ecaa4283b7 (diff) | |
parent | dafdcc8bb119f93b1b8576a4674ef1632c0685ca (diff) | |
download | rabbitmq-server-fc47e9132cbc30fd61e9c6dcdfbf7c8607d21e16.tar.gz |
Merged bug 21842.
-rw-r--r-- | .hgignore | 1 | ||||
-rw-r--r-- | Makefile | 9 | ||||
-rw-r--r-- | codegen.py | 102 | ||||
-rw-r--r-- | include/rabbit_framing_spec.hrl | 60 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 19 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 10 |
9 files changed, 135 insertions, 98 deletions
@@ -10,6 +10,7 @@ syntax: regexp ^cover/ ^dist/ ^include/rabbit_framing\.hrl$ +^include/rabbit_framing_spec\.hrl$ ^src/rabbit_framing\.erl$ ^src/.*\_usage.erl$ ^rabbit\.plt$ @@ -11,10 +11,10 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include DOCS_DIR=docs -INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl +INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) -TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) +TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) @@ -90,6 +90,9 @@ $(EBIN_DIR)/%.beam: $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ +$(INCLUDE_DIR)/rabbit_framing_spec.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py spec $(AMQP_SPEC_JSON_FILES) $@ + $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ @@ -116,7 +119,7 @@ $(BASIC_PLT): $(BEAM_TARGETS) clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel - rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc + rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) rm -f $(DEPS_FILE) @@ -93,6 +93,27 @@ class PackedMethodBitField: def full(self): return self.count() == 8 +def multiLineFormat(things, prologue, separator, lineSeparator, epilogue, thingsPerLine = 4): + r = [prologue] + i = 0 + for t in things: + if i != 0: + if i % thingsPerLine == 0: + r += [lineSeparator] + else: + r += [separator] + r += [t] + i += 1 + r += [epilogue] + return "".join(r) + +def prettyType(typeName, subTypes, typesPerLine = 4): + """Pretty print a type signature made up of many alternative subtypes""" + sTs = multiLineFormat(subTypes, + "( ", " | ", "\n | ", " )", + thingsPerLine = typesPerLine) + return "-type(%s ::\n %s)." % (typeName, sTs) + def printFileHeader(): print """%% Autogenerated code. Do not edit. %% @@ -314,6 +335,22 @@ def genErl(spec): bitvalue(true) -> 1; bitvalue(false) -> 0; bitvalue(undefined) -> 0. + +%% Method signatures +-ifdef(use_specs). +-spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()). +-spec(method_id/1 :: (amqp_method_name()) -> amqp_method()). +-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()). +-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()). +-spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()). +-spec(method_fieldnames/1 :: (amqp_method_name()) -> [amqp_method_field_name()]). +-spec(decode_method_fields/2 :: (amqp_method_name(), binary()) -> amqp_method_record()). +-spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()). +-spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()). +-spec(encode_properties/1 :: (amqp_method_record()) -> binary()). +-spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}). +-spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()). +-endif. % use_specs """ for m in methods: genLookupMethodName(m) print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})." @@ -388,12 +425,75 @@ def genHrl(spec): for c in spec.allClasses(): print "-record('P_%s', {%s})." % (erlangize(c.name), fieldNameList(c.fields)) + print "-ifdef(use_specs)." + print "%% Various types" + print prettyType("amqp_method_name()", + [m.erlangName() for m in methods]) + print prettyType("amqp_method()", + ["{%s, %s}" % (m.klass.index, m.index) for m in methods], + 6) + print prettyType("amqp_method_record()", + ["#%s{}" % (m.erlangName()) for m in methods]) + fieldNames = set() + for m in methods: + fieldNames.update(m.arguments) + fieldNames = [erlangize(f.name) for f in fieldNames] + print prettyType("amqp_method_field_name()", + fieldNames) + print prettyType("amqp_property_record()", + ["#'P_%s'{}" % erlangize(c.name) for c in spec.allClasses()]) + print prettyType("amqp_exception()", + ["'%s'" % erlangConstantName(c).lower() for (c, v, cls) in spec.constants]) + print prettyType("amqp_exception_code()", + ["%i" % v for (c, v, cls) in spec.constants]) + print "-endif. % use_specs" + +def genSpec(spec): + methods = spec.allMethods() + + printFileHeader() + print """% Hard-coded types +-type(amqp_field_type() :: + 'longstr' | 'signedint' | 'decimal' | 'timestamp' | + 'table' | 'byte' | 'double' | 'float' | 'long' | + 'short' | 'bool' | 'binary' | 'void'). +-type(amqp_property_type() :: + 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' | + 'longlongint' | 'timestamp' | 'bit' | 'table'). +%% we could make this more precise but ultimately are limited by +%% dialyzer's lack of support for recursive types +-type(amqp_table() :: [{binary(), amqp_field_type(), any()}]). +%% TODO: make this more precise +-type(amqp_properties() :: tuple()). + +-type(channel_number() :: non_neg_integer()). +-type(resource_name() :: binary()). +-type(routing_key() :: binary()). +-type(username() :: binary()). +-type(password() :: binary()). +-type(vhost() :: binary()). +-type(ctag() :: binary()). +-type(exchange_type() :: atom()). +-type(binding_key() :: binary()). +""" + print "% Auto-generated types" + classIds = set() + for m in spec.allMethods(): + classIds.add(m.klass.index) + print prettyType("amqp_class_id()", + ["%i" % ci for ci in classIds]) + def generateErl(specPath): genErl(AmqpSpec(specPath)) def generateHrl(specPath): genHrl(AmqpSpec(specPath)) +def generateSpec(specPath): + genSpec(AmqpSpec(specPath)) + if __name__ == "__main__": - do_main(generateHrl, generateErl) + do_main_dict({"header": generateHrl, + "spec": generateSpec, + "body": generateErl}) diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl deleted file mode 100644 index 1a979899..00000000 --- a/include/rabbit_framing_spec.hrl +++ /dev/null @@ -1,60 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - -%% TODO: much of this should be generated - --type(amqp_field_type() :: - 'longstr' | 'signedint' | 'decimal' | 'timestamp' | - 'table' | 'byte' | 'double' | 'float' | 'long' | - 'short' | 'bool' | 'binary' | 'void'). --type(amqp_property_type() :: - 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' | - 'longlongint' | 'timestamp' | 'bit' | 'table'). -%% we could make this more precise but ultimately are limited by -%% dialyzer's lack of support for recursive types --type(amqp_table() :: [{binary(), amqp_field_type(), any()}]). -%% TODO: make this more precise --type(amqp_class_id() :: non_neg_integer()). -%% TODO: make this more precise --type(amqp_properties() :: tuple()). -%% TODO: make this more precise --type(amqp_method() :: tuple()). -%% TODO: make this more precise --type(amqp_method_name() :: atom()). --type(channel_number() :: non_neg_integer()). --type(resource_name() :: binary()). --type(routing_key() :: binary()). --type(username() :: binary()). --type(password() :: binary()). --type(vhost() :: binary()). --type(ctag() :: binary()). --type(exchange_type() :: atom()). --type(binding_key() :: binary()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 76488255..3c9c41bd 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -268,7 +268,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate_cast(QPid, {requeue, MsgIds, ChPid}). + delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). @@ -398,9 +398,6 @@ delegate_pcall(Pid, Pri, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). -delegate_cast(Pid, Msg) -> - delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). - delegate_pcast(Pid, Pri, Msg) -> delegate:invoke_no_result(Pid, fun (P) -> gen_server2:pcast(P, Pri, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3283cb66..5fdf0ffa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -716,6 +716,19 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); +handle_call({requeue, AckTags, ChPid}, From, State) -> + gen_server2:reply(From, ok), + case lookup_ch(ChPid) of + not_found -> + rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", + [ChPid]), + noreply(State); + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(requeue_and_run(AckTags, State)) + end; + handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). @@ -743,18 +756,6 @@ handle_cast({ack, Txn, AckTags, ChPid}, handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); -handle_cast({requeue, AckTags, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> - rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", - [ChPid]), - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(requeue_and_run(AckTags, State)) - end; - handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 27a1275a..81cf3cee 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -57,7 +57,7 @@ -type(frame() :: [binary()]). -spec(build_simple_method_frame/2 :: - (channel_number(), amqp_method()) -> frame()). + (channel_number(), amqp_method_record()) -> frame()). -spec(build_simple_content_frames/3 :: (channel_number(), content(), non_neg_integer()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 387064bd..1ab34f86 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -75,8 +75,8 @@ -spec(start_link/6 :: (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). --spec(do/2 :: (pid(), amqp_method()) -> 'ok'). --spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). +-spec(do/2 :: (pid(), amqp_method_record()) -> 'ok'). +-spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). @@ -463,13 +463,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, - next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> - if DeliveryTag >= NextDeliveryTag -> - rabbit_misc:protocol_error( - command_invalid, "unknown delivery tag ~w", [DeliveryTag]); - true -> ok - end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), Participants = ack(TxnKey, Acked), {noreply, case TxnKey of @@ -531,9 +525,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, Other -> Other end, - %% In order to ensure that the consume_ok gets sent before - %% any messages are sent to the consumer, we get the queue - %% process to send the consume_ok on our behalf. + %% We get the queue process to send the consume_ok on our + %% behalf. This is for symmetry with basic.cancel - see + %% the comment in that method for why. case with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> @@ -980,7 +974,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> QTail, DeliveryTag, Multiple) end; {empty, _} -> - {ToAcc, PrefixAcc} + rabbit_misc:protocol_error( + not_found, "unknown delivery tag ~w", [DeliveryTag]) end. add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 54c60f5b..3d10dc12 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -50,17 +50,17 @@ -spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). -spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). +-spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok'). +-spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok'). -spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). -spec(send_command_and_signal_back/4 :: (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: - (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). + (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok'). -spec(internal_send_command/3 :: - (socket(), channel_number(), amqp_method()) -> 'ok'). + (socket(), channel_number(), amqp_method_record()) -> 'ok'). -spec(internal_send_command/5 :: - (socket(), channel_number(), amqp_method(), + (socket(), channel_number(), amqp_method_record(), content(), non_neg_integer()) -> 'ok'). -endif. |