From b40dbcd46d002a044896783d47faa58ddab9dad2 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Fri, 7 Jan 2011 01:26:41 +0000 Subject: Generate API functions corresponding to most synchronous methods There's a lot of boilerplate code in amqp_api.c for API functions corresponding to synchronous AMQP methods. And some of the functions that should be there are missing (e.g. basic.qos, basic.cancel). Instead, we can generate these functions from the protocol definition, plus a little information to describe where the function arguments do not reflect all of the method fields. --- librabbitmq/amqp.h | 62 ------------- librabbitmq/amqp_api.c | 232 ------------------------------------------------- librabbitmq/codegen.py | 119 +++++++++++++++++++++++-- 3 files changed, 112 insertions(+), 301 deletions(-) (limited to 'librabbitmq') diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 1541583..144dc83 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -326,10 +326,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, int heartbeat, amqp_sasl_method_enum sasl_method, ...); -RABBITMQ_EXPORT struct amqp_channel_open_ok_t_ *amqp_channel_open( - amqp_connection_state_t state, - amqp_channel_t channel); - struct amqp_basic_properties_t_; RABBITMQ_EXPORT int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, @@ -348,58 +344,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_connection_close( amqp_connection_state_t state, int code); -RABBITMQ_EXPORT struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t type, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_declare_ok_t_ *amqp_queue_declare( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_boolean_t exclusive, - amqp_boolean_t auto_delete, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_delete_ok_t_ *amqp_queue_delete( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t if_unused, - amqp_boolean_t if_empty); - -RABBITMQ_EXPORT struct amqp_queue_bind_ok_t_ *amqp_queue_bind( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_basic_consume_ok_t_ *amqp_basic_consume( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t consumer_tag, - amqp_boolean_t no_local, - amqp_boolean_t no_ack, - amqp_boolean_t exclusive, - amqp_table_t filter); - RABBITMQ_EXPORT int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, @@ -410,12 +354,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_bytes_t queue, amqp_boolean_t no_ack); -RABBITMQ_EXPORT struct amqp_queue_purge_ok_t_ *amqp_queue_purge( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_wait); - RABBITMQ_EXPORT struct amqp_tx_select_ok_t_ *amqp_tx_select( amqp_connection_state_t state, amqp_channel_t channel); diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index bf19761..456987d 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -119,23 +119,6 @@ const amqp_array_t amqp_empty_array = { 0, NULL }; ? (replytype *) state->most_recent_api_result.reply.decoded \ : NULL) -amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_CHANNEL_OPEN_OK_METHOD, 0}; - amqp_channel_open_t req; - req.out_of_band.bytes = NULL; - req.out_of_band.len = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_CHANNEL_OPEN_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, @@ -238,162 +221,6 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, replies, &req); } -amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t type, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_EXCHANGE_DECLARE_OK_METHOD, 0}; - amqp_exchange_declare_t req; - req.exchange = exchange; - req.type = type; - req.passive = passive; - req.durable = durable; - req.auto_delete = 0; - req.internal = 0; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_EXCHANGE_DECLARE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_boolean_t exclusive, - amqp_boolean_t auto_delete, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_DECLARE_OK_METHOD, 0}; - amqp_queue_declare_t req; - req.queue = queue; - req.passive = passive; - req.durable = durable; - req.exclusive = exclusive; - req.auto_delete = auto_delete; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_DECLARE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t if_unused, - amqp_boolean_t if_empty) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_DELETE_OK_METHOD, 0}; - amqp_queue_delete_t req; - req.queue = queue; - req.if_unused = if_unused; - req.if_empty = if_empty; - req.nowait = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_DELETE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_BIND_OK_METHOD, 0}; - amqp_queue_bind_t req; - req.ticket = 0; - req.queue = queue; - req.exchange = exchange; - req.routing_key = routing_key; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_BIND_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_UNBIND_OK_METHOD, 0}; - amqp_queue_unbind_t req; - req.ticket = 0; - req.queue = queue; - req.exchange = exchange; - req.routing_key = routing_key; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_UNBIND_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t consumer_tag, - amqp_boolean_t no_local, - amqp_boolean_t no_ack, - amqp_boolean_t exclusive, - amqp_table_t filter) -{ - amqp_method_number_t replies[2] = { AMQP_BASIC_CONSUME_OK_METHOD, 0}; - amqp_basic_consume_t req; - req.ticket = 0; - req.queue = queue; - req.consumer_tag = consumer_tag; - req.no_local = no_local; - req.no_ack = no_ack; - req.exclusive = exclusive; - req.nowait = 0; - req.arguments = filter; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_BASIC_CONSUME_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, @@ -405,26 +232,6 @@ int amqp_basic_ack(amqp_connection_state_t state, return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m); } -amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_wait) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_PURGE_OK_METHOD, 0}; - amqp_queue_purge_t req; - req.ticket = 0; - req.queue = queue; - req.nowait = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_PURGE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, @@ -444,45 +251,6 @@ amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, return state->most_recent_api_result; } -amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_SELECT_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_SELECT_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_COMMIT_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_COMMIT_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_ROLLBACK_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_ROLLBACK_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { return state->most_recent_api_result; diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py index 6fd149e..cded64a 100644 --- a/librabbitmq/codegen.py +++ b/librabbitmq/codegen.py @@ -112,7 +112,7 @@ class BitEncoder(object): self.emitter.emit(line) def encode_bit(self, value): - """Generate code to ebcode a value of the AMQP bit type from + """Generate code to encode a value of the AMQP bit type from the given value.""" if self.bit == 0: self.emitter.emit("bit_buffer = 0;") @@ -138,6 +138,8 @@ class SimpleType(object): def encode(self, emitter, value): emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, value)) + def literal(self, value): + return value class StrType(object): """The AMQP shortstr or longstr types.""" @@ -159,6 +161,11 @@ class StrType(object): emitter.emit(" || !amqp_encode_bytes(encoded, &offset, %s))" % (value,)) emitter.emit(" return -ERROR_BAD_AMQP_DATA;") + def literal(self, value): + if value != '': + raise NotImplementedError() + + return "amqp_empty_bytes" class BitType(object): """The AMQP bit type.""" @@ -172,6 +179,8 @@ class BitType(object): def encode(self, emitter, value): emitter.encode_bit(value) + def literal(self, value): + return {True: 1, False: 0}[value] class TableType(object): """The AMQP table type.""" @@ -191,6 +200,8 @@ class TableType(object): emitter.emit(" if (res < 0) return res;") emitter.emit("}") + def literal(self, value): + raise NotImplementedError() types = { 'octet': SimpleType(8), @@ -213,11 +224,54 @@ def c_ize(s): s = s.replace(' ', '_') return s +# When generating API functions corresponding to synchronous methods, +# we need some information that isn't in the protocol def: Some +# methods should not be exposed, indicated here by a False value. +# Some methods should be exposed but certain fields should not be +# exposed as parameters. +apiMethodInfo = { + "amqp_connection_start": False, # application code should not use this + "amqp_connection_secure": False, # application code should not use this + "amqp_connection_tune": False, # application code should not use this + "amqp_connection_open": False, # application code should not use this + "amqp_connection_close": False, # needs special handling + "amqp_channel_open": ["out_of_band"], + "amqp_channel_close": False, # needs special handling + "amqp_access_request": False, # huh? + "amqp_exchange_declare": ["auto_delete", "internal"], + "amqp_basic_get": False, # get-ok has content +} + +# When generating API functions corresponding to synchronous methods, +# some fields should be suppressed everywhere. This dict names those +# fields, and the fixed values to use for them. +apiMethodsSuppressArgs = {"ticket": 0, "nowait": False} + AmqpMethod.defName = lambda m: cConstantName(c_ize(m.klass.name) + '_' + c_ize(m.name) + "_method") -AmqpMethod.structName = lambda m: "amqp_" + c_ize(m.klass.name) + '_' + c_ize(m.name) + "_t" +AmqpMethod.fullName = lambda m: "amqp_%s_%s" % (c_ize(m.klass.name), c_ize(m.name)) +AmqpMethod.structName = lambda m: m.fullName() + "_t" AmqpClass.structName = lambda c: "amqp_" + c_ize(c.name) + "_properties_t" +def methodApiPrototype(m): + fn = m.fullName() + info = apiMethodInfo.get(fn, []) + + args = [] + for f in m.arguments: + n = c_ize(f.name) + if n in apiMethodsSuppressArgs or n in info: + continue + + args.append(", ") + args.append(typeFor(m.klass.spec, f).ctype) + args.append(" ") + args.append(n) + + return "%s_ok_t *%s(amqp_connection_state_t state, amqp_channel_t channel%s)" % (fn, fn, ''.join(args)) + +AmqpMethod.apiPrototype = methodApiPrototype + def cConstantName(s): return 'AMQP_' + '_'.join(re.split('[- ]', s.upper())) @@ -424,13 +478,56 @@ int amqp_encode_properties(uint16_t class_id, remaining_flags = remainder; } while (remaining_flags != 0); } - + switch (class_id) {""" for c in spec.allClasses(): genEncodeProperties(c) print """ default: return -ERROR_UNKNOWN_CLASS; } }""" + for m in methods: + if not m.isSynchronous: + continue + + info = apiMethodInfo.get(m.fullName(), []) + if info is False: + continue + + reply = cConstantName(c_ize(m.klass.name) + '_' + c_ize(m.name) + + "_ok_method") + + print + print m.apiPrototype() + print "{" + print " amqp_method_number_t replies[2] = { %s, 0};" % (reply,) + print " %s req;" % (m.structName(),) + + for f in m.arguments: + n = c_ize(f.name) + + val = apiMethodsSuppressArgs.get(n) + if val is None and n in info: + val = f.defaultvalue + + if val is None: + val = n + else: + val = typeFor(spec, f).literal(val) + + + print " req.%s = %s;" % (n, val) + + print """ + state->most_recent_api_result = amqp_simple_rpc(state, channel, + %s, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; +} +""" % (m.defName(),) + def genHrl(spec): def fieldDeclList(fields): if fields: @@ -439,7 +536,7 @@ def genHrl(spec): for f in fields]) else: return " char dummy; /* Dummy field to avoid empty struct */\n" - + def propDeclList(fields): return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, c_ize(f.name)) for f in fields @@ -465,6 +562,7 @@ extern "C" { print print """/* Function prototypes. */ + extern char const *amqp_constant_name(int constantNumber); extern amqp_boolean_t amqp_constant_is_hard_error(int constantNumber); RABBITMQ_EXPORT char const *amqp_method_name(amqp_method_number_t methodNumber); @@ -485,7 +583,7 @@ extern int amqp_encode_properties(uint16_t class_id, amqp_bytes_t encoded); """ - print "/* Method field records. */" + print "/* Method field records. */\n" for m in methods: methodid = m.klass.index << 16 | m.index print "#define %s ((amqp_method_number_t) 0x%.08X) /* %d, %d; %d */" % \ @@ -515,7 +613,14 @@ extern int amqp_encode_properties(uint16_t class_id, fieldDeclList(c.fields), c.structName()) - print """#ifdef __cplusplus + print "/* API functions for methods */\n" + + for m in methods: + if m.isSynchronous and apiMethodInfo.get(m.fullName()) is not False: + print "RABBITMQ_EXPORT %s;" % (m.apiPrototype(),) + + print """ +#ifdef __cplusplus } #endif @@ -526,6 +631,6 @@ def generateErl(specPath): def generateHrl(specPath): genHrl(AmqpSpec(specPath)) - + if __name__ == "__main__": do_main(generateHrl, generateErl) -- cgit v1.2.1