diff options
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/amqp.h | 62 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 232 | ||||
-rw-r--r-- | librabbitmq/codegen.py | 119 |
3 files changed, 112 insertions, 301 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 1541583..144dc83 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -326,10 +326,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, int heartbeat, amqp_sasl_method_enum sasl_method, ...); -RABBITMQ_EXPORT struct amqp_channel_open_ok_t_ *amqp_channel_open( - amqp_connection_state_t state, - amqp_channel_t channel); - struct amqp_basic_properties_t_; RABBITMQ_EXPORT int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, @@ -348,58 +344,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_connection_close( amqp_connection_state_t state, int code); -RABBITMQ_EXPORT struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t type, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_declare_ok_t_ *amqp_queue_declare( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_boolean_t exclusive, - amqp_boolean_t auto_delete, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_delete_ok_t_ *amqp_queue_delete( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t if_unused, - amqp_boolean_t if_empty); - -RABBITMQ_EXPORT struct amqp_queue_bind_ok_t_ *amqp_queue_bind( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_queue_unbind_ok_t_ *amqp_queue_unbind( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments); - -RABBITMQ_EXPORT struct amqp_basic_consume_ok_t_ *amqp_basic_consume( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t consumer_tag, - amqp_boolean_t no_local, - amqp_boolean_t no_ack, - amqp_boolean_t exclusive, - amqp_table_t filter); - RABBITMQ_EXPORT int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, @@ -410,12 +354,6 @@ RABBITMQ_EXPORT amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_bytes_t queue, amqp_boolean_t no_ack); -RABBITMQ_EXPORT struct amqp_queue_purge_ok_t_ *amqp_queue_purge( - amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_wait); - RABBITMQ_EXPORT struct amqp_tx_select_ok_t_ *amqp_tx_select( amqp_connection_state_t state, amqp_channel_t channel); diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index bf19761..456987d 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -119,23 +119,6 @@ const amqp_array_t amqp_empty_array = { 0, NULL }; ? (replytype *) state->most_recent_api_result.reply.decoded \ : NULL) -amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_CHANNEL_OPEN_OK_METHOD, 0}; - amqp_channel_open_t req; - req.out_of_band.bytes = NULL; - req.out_of_band.len = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_CHANNEL_OPEN_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, @@ -238,162 +221,6 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, replies, &req); } -amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t type, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_EXCHANGE_DECLARE_OK_METHOD, 0}; - amqp_exchange_declare_t req; - req.exchange = exchange; - req.type = type; - req.passive = passive; - req.durable = durable; - req.auto_delete = 0; - req.internal = 0; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_EXCHANGE_DECLARE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t passive, - amqp_boolean_t durable, - amqp_boolean_t exclusive, - amqp_boolean_t auto_delete, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_DECLARE_OK_METHOD, 0}; - amqp_queue_declare_t req; - req.queue = queue; - req.passive = passive; - req.durable = durable; - req.exclusive = exclusive; - req.auto_delete = auto_delete; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_DECLARE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_delete_ok_t *amqp_queue_delete(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t if_unused, - amqp_boolean_t if_empty) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_DELETE_OK_METHOD, 0}; - amqp_queue_delete_t req; - req.queue = queue; - req.if_unused = if_unused; - req.if_empty = if_empty; - req.nowait = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_DELETE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_BIND_OK_METHOD, 0}; - amqp_queue_bind_t req; - req.ticket = 0; - req.queue = queue; - req.exchange = exchange; - req.routing_key = routing_key; - req.nowait = 0; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_BIND_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_table_t arguments) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_UNBIND_OK_METHOD, 0}; - amqp_queue_unbind_t req; - req.ticket = 0; - req.queue = queue; - req.exchange = exchange; - req.routing_key = routing_key; - req.arguments = arguments; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_UNBIND_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_bytes_t consumer_tag, - amqp_boolean_t no_local, - amqp_boolean_t no_ack, - amqp_boolean_t exclusive, - amqp_table_t filter) -{ - amqp_method_number_t replies[2] = { AMQP_BASIC_CONSUME_OK_METHOD, 0}; - amqp_basic_consume_t req; - req.ticket = 0; - req.queue = queue; - req.consumer_tag = consumer_tag; - req.no_local = no_local; - req.no_ack = no_ack; - req.exclusive = exclusive; - req.nowait = 0; - req.arguments = filter; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_BASIC_CONSUME_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, @@ -405,26 +232,6 @@ int amqp_basic_ack(amqp_connection_state_t state, return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m); } -amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_wait) -{ - amqp_method_number_t replies[2] = { AMQP_QUEUE_PURGE_OK_METHOD, 0}; - amqp_queue_purge_t req; - req.ticket = 0; - req.queue = queue; - req.nowait = 0; - - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_QUEUE_PURGE_METHOD, - replies, &req); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, @@ -444,45 +251,6 @@ amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, return state->most_recent_api_result; } -amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_SELECT_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_SELECT_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_COMMIT_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_COMMIT_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - -amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state, - amqp_channel_t channel) -{ - amqp_method_number_t replies[2] = { AMQP_TX_ROLLBACK_OK_METHOD, 0}; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_TX_ROLLBACK_METHOD, - replies, NULL); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) - return state->most_recent_api_result.reply.decoded; - else - return NULL; -} - amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { return state->most_recent_api_result; diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py index 6fd149e..cded64a 100644 --- a/librabbitmq/codegen.py +++ b/librabbitmq/codegen.py @@ -112,7 +112,7 @@ class BitEncoder(object): self.emitter.emit(line) def encode_bit(self, value): - """Generate code to ebcode a value of the AMQP bit type from + """Generate code to encode a value of the AMQP bit type from the given value.""" if self.bit == 0: self.emitter.emit("bit_buffer = 0;") @@ -138,6 +138,8 @@ class SimpleType(object): def encode(self, emitter, value): emitter.emit("if (!amqp_encode_%d(encoded, &offset, %s)) return -ERROR_BAD_AMQP_DATA;" % (self.bits, value)) + def literal(self, value): + return value class StrType(object): """The AMQP shortstr or longstr types.""" @@ -159,6 +161,11 @@ class StrType(object): emitter.emit(" || !amqp_encode_bytes(encoded, &offset, %s))" % (value,)) emitter.emit(" return -ERROR_BAD_AMQP_DATA;") + def literal(self, value): + if value != '': + raise NotImplementedError() + + return "amqp_empty_bytes" class BitType(object): """The AMQP bit type.""" @@ -172,6 +179,8 @@ class BitType(object): def encode(self, emitter, value): emitter.encode_bit(value) + def literal(self, value): + return {True: 1, False: 0}[value] class TableType(object): """The AMQP table type.""" @@ -191,6 +200,8 @@ class TableType(object): emitter.emit(" if (res < 0) return res;") emitter.emit("}") + def literal(self, value): + raise NotImplementedError() types = { 'octet': SimpleType(8), @@ -213,11 +224,54 @@ def c_ize(s): s = s.replace(' ', '_') return s +# When generating API functions corresponding to synchronous methods, +# we need some information that isn't in the protocol def: Some +# methods should not be exposed, indicated here by a False value. +# Some methods should be exposed but certain fields should not be +# exposed as parameters. +apiMethodInfo = { + "amqp_connection_start": False, # application code should not use this + "amqp_connection_secure": False, # application code should not use this + "amqp_connection_tune": False, # application code should not use this + "amqp_connection_open": False, # application code should not use this + "amqp_connection_close": False, # needs special handling + "amqp_channel_open": ["out_of_band"], + "amqp_channel_close": False, # needs special handling + "amqp_access_request": False, # huh? + "amqp_exchange_declare": ["auto_delete", "internal"], + "amqp_basic_get": False, # get-ok has content +} + +# When generating API functions corresponding to synchronous methods, +# some fields should be suppressed everywhere. This dict names those +# fields, and the fixed values to use for them. +apiMethodsSuppressArgs = {"ticket": 0, "nowait": False} + AmqpMethod.defName = lambda m: cConstantName(c_ize(m.klass.name) + '_' + c_ize(m.name) + "_method") -AmqpMethod.structName = lambda m: "amqp_" + c_ize(m.klass.name) + '_' + c_ize(m.name) + "_t" +AmqpMethod.fullName = lambda m: "amqp_%s_%s" % (c_ize(m.klass.name), c_ize(m.name)) +AmqpMethod.structName = lambda m: m.fullName() + "_t" AmqpClass.structName = lambda c: "amqp_" + c_ize(c.name) + "_properties_t" +def methodApiPrototype(m): + fn = m.fullName() + info = apiMethodInfo.get(fn, []) + + args = [] + for f in m.arguments: + n = c_ize(f.name) + if n in apiMethodsSuppressArgs or n in info: + continue + + args.append(", ") + args.append(typeFor(m.klass.spec, f).ctype) + args.append(" ") + args.append(n) + + return "%s_ok_t *%s(amqp_connection_state_t state, amqp_channel_t channel%s)" % (fn, fn, ''.join(args)) + +AmqpMethod.apiPrototype = methodApiPrototype + def cConstantName(s): return 'AMQP_' + '_'.join(re.split('[- ]', s.upper())) @@ -424,13 +478,56 @@ int amqp_encode_properties(uint16_t class_id, remaining_flags = remainder; } while (remaining_flags != 0); } - + switch (class_id) {""" for c in spec.allClasses(): genEncodeProperties(c) print """ default: return -ERROR_UNKNOWN_CLASS; } }""" + for m in methods: + if not m.isSynchronous: + continue + + info = apiMethodInfo.get(m.fullName(), []) + if info is False: + continue + + reply = cConstantName(c_ize(m.klass.name) + '_' + c_ize(m.name) + + "_ok_method") + + print + print m.apiPrototype() + print "{" + print " amqp_method_number_t replies[2] = { %s, 0};" % (reply,) + print " %s req;" % (m.structName(),) + + for f in m.arguments: + n = c_ize(f.name) + + val = apiMethodsSuppressArgs.get(n) + if val is None and n in info: + val = f.defaultvalue + + if val is None: + val = n + else: + val = typeFor(spec, f).literal(val) + + + print " req.%s = %s;" % (n, val) + + print """ + state->most_recent_api_result = amqp_simple_rpc(state, channel, + %s, + replies, &req); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + return state->most_recent_api_result.reply.decoded; + else + return NULL; +} +""" % (m.defName(),) + def genHrl(spec): def fieldDeclList(fields): if fields: @@ -439,7 +536,7 @@ def genHrl(spec): for f in fields]) else: return " char dummy; /* Dummy field to avoid empty struct */\n" - + def propDeclList(fields): return ''.join([" %s %s;\n" % (typeFor(spec, f).ctype, c_ize(f.name)) for f in fields @@ -465,6 +562,7 @@ extern "C" { print print """/* Function prototypes. */ + extern char const *amqp_constant_name(int constantNumber); extern amqp_boolean_t amqp_constant_is_hard_error(int constantNumber); RABBITMQ_EXPORT char const *amqp_method_name(amqp_method_number_t methodNumber); @@ -485,7 +583,7 @@ extern int amqp_encode_properties(uint16_t class_id, amqp_bytes_t encoded); """ - print "/* Method field records. */" + print "/* Method field records. */\n" for m in methods: methodid = m.klass.index << 16 | m.index print "#define %s ((amqp_method_number_t) 0x%.08X) /* %d, %d; %d */" % \ @@ -515,7 +613,14 @@ extern int amqp_encode_properties(uint16_t class_id, fieldDeclList(c.fields), c.structName()) - print """#ifdef __cplusplus + print "/* API functions for methods */\n" + + for m in methods: + if m.isSynchronous and apiMethodInfo.get(m.fullName()) is not False: + print "RABBITMQ_EXPORT %s;" % (m.apiPrototype(),) + + print """ +#ifdef __cplusplus } #endif @@ -526,6 +631,6 @@ def generateErl(specPath): def generateHrl(specPath): genHrl(AmqpSpec(specPath)) - + if __name__ == "__main__": do_main(generateHrl, generateErl) |