1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
#!/usr/bin/env python
# Copyright 2008 Mathias Hasselmann and/or Openismus
# Copyright 2008-2016 Collabora Ltd.
# SPDX-License-Identifier: MIT
from dbus.mainloop.glib import DBusGMainLoop
import dbus
import dbus.connection
import dbus.service
import dbus.server
from gi.repository import GLib
import os, sys
class TestService(dbus.service.Object):
NAME = 'org.freedesktop.dbus.TestService'
PATH = '/org/freedesktop/dbus/TestService'
def __init__(self, conn, path=PATH):
super(TestService, self).__init__(conn, path)
@dbus.service.method(dbus_interface=NAME,
out_signature='s',
in_signature='s')
def reverse(self, text):
text = list(text)
text.reverse()
return ''.join(text)
pin, pout = os.pipe()
child = os.fork()
if 0 == child:
DBusGMainLoop(set_as_default=True)
server = dbus.server.Server('unix:tmpdir=/tmp')
def new_connection(conn):
print "new connection, %r" % conn
TestService(conn)
def connection_gone(conn):
print "goodbye, %r" % conn
# Instantiate a TestService every time a connection is created
server.on_connection_added.append(new_connection)
server.on_connection_removed.append(connection_gone)
os.write(pout, server.address)
os.close(pout)
os.close(pin)
print 'server running: %s' % server.address
GLib.MainLoop().run()
print 'server quit'
print 'done?'
else:
os.waitpid(child, os.WNOHANG)
os.close(pout)
address = os.read(pin, 128)
os.close(pin)
client = dbus.connection.Connection(address)
proxy = client.get_object(TestService.NAME, TestService.PATH)
object = dbus.Interface(proxy, TestService.NAME)
while True:
line = sys.stdin.readline()
if not line: break
text = line.strip()
print 'reverse(%s): %s' % (text, object.reverse(text))
client.close()
|