summaryrefslogtreecommitdiff
path: root/trunk/qpid/python/perftest
blob: f4d3c95e964766a2a1dd2239804816cb81d6bdae (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

def publisher(n):
	import qpid
	import sys
	from qpid.client import Client
	from qpid.content import Content    
	if len(sys.argv) >= 3:
		n = int(sys.argv[2])	
    	client = Client("127.0.0.1", 5672)
	client.start({"LOGIN": "guest", "PASSWORD": "guest"})
	channel = client.channel(1)
	channel.session_open()
	message = Content("message")
	message["routing_key"] = "message_queue"
	print "producing ", n, " messages" 
	for i in range(n):  			  
	  channel.message_transfer(destination="amq.direct", content=message)		
	
	print "producing final message" 
	message = Content("That's done")
	message["routing_key"] = "message_queue"
	channel.message_transfer(destination="amq.direct", content=message)

	print "consuming sync message"
	consumer = "consumer"
	queue = client.queue(consumer)	
	channel.message_subscribe(queue="sync_queue", destination=consumer)
	channel.message_flow(consumer, 0, 0xFFFFFFFF)
	channel.message_flow(consumer, 1, 0xFFFFFFFF)
        queue.get(block = True)
	print "done"
	channel.session_close()

def consumer():
	import sys
	import qpid
	from qpid.client import Client
	from qpid.content import Content    	
	client = Client("127.0.0.1", 5672)
	client.start({"LOGIN": "guest", "PASSWORD": "guest"})
	channel = client.channel(1)
	channel.session_open()
	consumer = "consumer"
	queue = client.queue(consumer)	
	channel.message_subscribe(queue="message_queue", destination=consumer)
	channel.message_flow(consumer, 0, 0xFFFFFFFF)
	channel.message_flow(consumer, 1, 0xFFFFFFFF)
        final = "That's done"   
	content = ""	
	message = None
	print "getting messages"
	while content != final:
		message = queue.get(block = True)
		content = message.content.body				
	message.complete(cumulative=True)
	
	print "consumed all messages"
	message = Content("message")
	message["routing_key"] = "sync_queue"
	channel.message_transfer(destination="amq.direct", content=message)
	print "done"
	channel.session_close()

if __name__=='__main__':	
	import sys
	import qpid
    	from timeit import Timer
	from qpid.client import Client
	from qpid.content import Content    
	client = Client("127.0.0.1", 5672)
	client.start({"LOGIN": "guest", "PASSWORD": "guest"})	
	channel = client.channel(1)
	channel.session_open()
	channel.queue_declare(queue="message_queue")
	channel.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="message_queue")
	channel.queue_declare(queue="sync_queue")
	channel.queue_bind(exchange="amq.direct", queue="sync_queue", routing_key="sync_queue")
	channel.session_close()

	numMess = 100
	if len(sys.argv) >= 3:
		numMess = int(sys.argv[2])	
	if len(sys.argv) == 1:
		print "error: please specify prod or cons"
	elif sys.argv[1] == 'prod':		
		tprod = Timer("publisher(100)", "from __main__ import publisher")
		tp = tprod.timeit(1)
		print "produced and consumed" , numMess + 2 ,"messages in: ", tp
	elif sys.argv[1] == 'cons':
		tcons = Timer("consumer()", "from __main__ import consumer")
		tc = tcons.timeit(1)
		print "consumed " , numMess ," in: ", tc
	else:
	        print "please specify prod or cons"