#!/usr/bin/env python
"""
 xml_producer.py

 Publishes messages to an XML exchange, using
 the routing key "weather"
"""

import qpid
import sys
import os
from qpid.util import connect
from qpid.connection import Connection
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty

#----- Functions ----------------------------------------

# Data for weather reports. The tuples have different lengths, so indexing
# each of them with the same counter (modulo its own length) cycles through
# varying combinations of values.

station = ("Raleigh-Durham International Airport (KRDU)",
           "New Bern, Craven County Regional Airport (KEWN)",
           "Boone, Watauga County Hospital Heliport (KTNB)",
           "Hatteras, Mitchell Field (KHSE)")
wind_speed_mph = (0, 2, 5, 10, 16, 22, 28, 35, 42, 51, 61, 70, 80)
temperature_f = (30, 40, 50, 60, 70, 80, 90, 100)
dewpoint = (35, 40, 45, 50)

# Layout of one weather report. The element names mirror the names of the
# data tuples above.
# NOTE(review): the string literals in report() had been reduced to empty
# strings, so the message body was not XML at all; the element names used
# here are reconstructed from the field names - confirm them against the
# XQuery bindings used by the consumers of the "xml" exchange.
_REPORT_TEMPLATE = ("<weather>"
                    "<station>%s</station>"
                    "<wind_speed_mph>%s</wind_speed_mph>"
                    "<temperature_f>%s</temperature_f>"
                    "<dewpoint>%s</dewpoint>"
                    "</weather>")


def pick_one(list, i):
    """Return element ``i`` of ``list`` as a string, wrapping around its end.

    The parameter keeps its original name ``list`` so that existing callers
    are unaffected, even though it shadows the builtin inside this function.
    """
    return str(list[i % len(list)])


def report(i):
    """Return the XML text of weather report number ``i``.

    Each field is taken from its data tuple at index ``i`` (modulo the
    tuple's length) and wrapped in an element named after that field.
    """
    return _REPORT_TEMPLATE % (pick_one(station, i),
                               pick_one(wind_speed_mph, i),
                               pick_one(temperature_f, i),
                               pick_one(dewpoint, i))


#----- Initialization -----------------------------------

#  Set parameters for login

host = "127.0.0.1"
port = 5672
user = "guest"
password = "guest"

# If an alternate host or port has been specified, use that instead
# (this is used in our unit tests)
if len(sys.argv) > 1:
    host = sys.argv[1]
if len(sys.argv) > 2:
    port = int(sys.argv[2])

# If AMQP_SPEC is defined, use it to locate the spec file instead of
# looking for it in the default location.
amqp_spec = os.environ.get("AMQP_SPEC", "/usr/share/amqp/amqp.0-10.xml")

#  Create a connection.
socket = connect(host, port)
connection = Connection(sock=socket, spec=qpid.spec.load(amqp_spec))
connection.start()
session = connection.session(str(uuid4()))

#----- Publish some messages ------------------------------

# Create some messages and put them on the broker.
try:
    props = session.delivery_properties(routing_key="weather")

    for i in range(10):
        # Build the report once so the text that is printed is exactly the
        # text that is published.
        body = report(i)
        print(body)
        session.message_transfer(destination="xml",
                                 message=Message(props, body))
finally:
    #----- Cleanup --------------------------------------------

    # Clean up before exiting so there are no open threads. Doing this in
    # a finally clause closes the session even when publishing fails.
    session.close()