# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from qpid.codec010 import StringCodec from qpid.ops import PRIMITIVE def codec(name): type = PRIMITIVE[name] def encode(x): sc = StringCodec() sc.write_primitive(type, x) return sc.encoded def decode(x): sc = StringCodec(x) return sc.read_primitive(type) return encode, decode # XXX: need to correctly parse the mime type and deal with # content-encoding header TYPE_MAPPINGS={ dict: "amqp/map", list: "amqp/list", unicode: "text/plain; charset=utf8", unicode: "text/plain", buffer: None, str: None, None.__class__: None } DEFAULT_CODEC = (lambda x: x, lambda x: x) def encode_text_plain(x): if x is None: return None else: return x.encode("utf8") def decode_text_plain(x): if x is None: return None else: return x.decode("utf8") TYPE_CODEC={ "amqp/map": codec("map"), "amqp/list": codec("list"), "text/plain; charset=utf8": (encode_text_plain, decode_text_plain), "text/plain": (encode_text_plain, decode_text_plain), "": DEFAULT_CODEC, None: DEFAULT_CODEC } def get_type(content): return TYPE_MAPPINGS[content.__class__] def get_codec(content_type): return TYPE_CODEC.get(content_type, DEFAULT_CODEC) UNSPECIFIED = object() class Message: """ A message consists of a standard set of fields, an application defined set of properties, and some content. @type id: str @ivar id: the message id @type subject: str @ivar subject: message subject @type user_id: str @ivar user_id: the user-id of the message producer @type reply_to: str @ivar reply_to: the address to send replies @type correlation_id: str @ivar correlation_id: a correlation-id for the message @type durable: bool @ivar durable: message durability @type priority: int @ivar priority: message priority @type ttl: float @ivar ttl: time-to-live measured in seconds @type properties: dict @ivar properties: application specific message properties @type content_type: str @ivar content_type: the content-type of the message @type content: str, unicode, buffer, dict, list @ivar content: the message content """ def __init__(self, content=None, content_type=UNSPECIFIED, id=None, subject=None, user_id=None, reply_to=None, correlation_id=None, durable=None, priority=None, ttl=None, properties=None): """ Construct a new message with the supplied content. The content-type of the message will be automatically inferred from type of the content parameter. @type content: str, unicode, buffer, dict, list @param content: the message content @type content_type: str @param content_type: the content-type of the message """ self.id = id self.subject = subject self.user_id = user_id self.reply_to = reply_to self.correlation_id = correlation_id self.durable = durable self.priority = priority self.ttl = ttl self.redelivered = False if properties is None: self.properties = {} else: self.properties = properties if content_type is UNSPECIFIED: self.content_type = get_type(content) else: self.content_type = content_type self.content = content def __repr__(self): args = [] for name in ["id", "subject", "user_id", "reply_to", "correlation_id", "priority", "ttl"]: value = self.__dict__[name] if value is not None: args.append("%s=%r" % (name, value)) for name in ["durable", "redelivered", "properties"]: value = self.__dict__[name] if value: args.append("%s=%r" % (name, value)) if self.content_type != get_type(self.content): args.append("content_type=%r" % self.content_type) if self.content is not None: if args: args.append("content=%r" % self.content) else: args.append(repr(self.content)) return "Message(%s)" % ", ".join(args) class Disposition: def __init__(self, type, **options): self.type = type self.options = options def __repr__(self): args = [str(self.type)] + \ ["%s=%r" % (k, v) for k, v in self.options.items()] return "Disposition(%s)" % ", ".join(args) __all__ = ["Message", "Disposition"]