#!/usr/bin/env python
"""
 server.py

 Server for a client/server example
"""

import qpid
import sys
import os
from random import randint
from qpid.util import connect
from qpid.connection import Connection
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty

#----- Functions -------------------------------------------

def getProperty(msg, name):
    """Return the value of the first header attribute called *name*.

    Walks ``msg.headers`` in order and returns the attribute from the
    first header object that has one with that name.  Returns ``None``
    when no header carries the attribute.
    """
    for h in msg.headers:
        if hasattr(h, name):
            return getattr(h, name)
    return None


def respond(session, request):
    """Send the upper-cased body of *request* back to its reply-to address.

    The routing key and exchange for the response are taken from the
    request's ``reply_to`` property (indexed as ``reply_to["routing_key"]``
    and ``reply_to["exchange"]``).  The response body is the request body
    converted to upper case.

    Raises:
        Exception: if the request carries no ``reply_to`` property.
    """
    reply_to = getProperty(request, "reply_to")
    if reply_to is None:
        raise Exception("reply to property needs to be there")

    props = session.delivery_properties(routing_key=reply_to["routing_key"])
    session.message_transfer(reply_to["exchange"], None, None,
                             Message(props, request.body.upper()))

#----- Initialization --------------------------------------

#  Set parameters for login.  An empty host argument or a port of 0 falls
#  back to the default, exactly as the previous "and/or" expressions did.
host = "127.0.0.1"
port = 5672
if len(sys.argv) > 1 and sys.argv[1]:
    host = sys.argv[1]
if len(sys.argv) > 2:
    port = int(sys.argv[2]) or port
user = "guest"
password = "guest"

#  The AMQP spec file can be overridden through the environment.
amqp_spec = os.environ.get("AMQP_SPEC", "/usr/share/amqp/amqp.0-10.xml")

#  Create a connection.
conn = Connection(connect(host, port), qpid.spec.load(amqp_spec))
conn.start()

session_id = str(uuid4())
session = conn.session(session_id)

#----- Main Body -- ----------------------------------------

# Everything that uses the session runs inside try/finally so the session
# is closed even when a request fails (for example, one without reply-to).
try:
    # Create a request queue and subscribe to it

    session.queue_declare(queue="request", exclusive=True)
    session.exchange_bind(exchange="amq.direct", queue="request",
                          binding_key="request")

    dest = "request_destination"

    session.message_subscribe(queue="request", destination=dest)
    # NOTE(review): units 0 and 1 are presumably the AMQP 0-10 message and
    # byte credit units; 0xFFFFFFFF would then mean "unlimited" - confirm.
    session.message_flow(dest, 0, 0xFFFFFFFF)
    session.message_flow(dest, 1, 0xFFFFFFFF)

    # Remind the user to start the client program.  The parenthesised
    # single-argument form prints the same text as a Python 2 statement.

    print("Request server running - run your client now.")
    print("(Times out after 100 seconds ...)")
    sys.stdout.flush()

    # Respond to each request

    queue = session.incoming(dest)

    # If we get a message, send it back to the user (as indicated in the
    # ReplyTo property)

    while True:
        try:
            request = queue.get(timeout=100)
            respond(session, request)
            session.message_accept(RangedSet(request.id))
        except Empty:
            print("No more messages!")
            break

finally:
    #----- Cleanup ------------------------------------------------

    # Clean up before exiting so there are no open threads.

    session.close(timeout=10)