summaryrefslogtreecommitdiff
path: root/test/test-server.py
blob: 8bb55506f6ef3e9c3b0bdf8df031ecdd4c4f18ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/env python

# Copyright 2008 Mathias Hasselmann and/or Openismus
# Copyright 2008-2016 Collabora Ltd.
# SPDX-License-Identifier: MIT

from dbus.mainloop.glib import DBusGMainLoop

import dbus
import dbus.connection
import dbus.service
import dbus.server

from gi.repository import GLib
import os, sys

class TestService(dbus.service.Object):
    NAME = 'org.freedesktop.dbus.TestService'
    PATH = '/org/freedesktop/dbus/TestService'

    def __init__(self, conn, path=PATH):
        super(TestService, self).__init__(conn, path)

    @dbus.service.method(dbus_interface=NAME,
                         out_signature='s',
                         in_signature='s')
    def reverse(self, text):
        text = list(text)
        text.reverse()

        return ''.join(text)

pin, pout = os.pipe()
child = os.fork()

if 0 == child:
    DBusGMainLoop(set_as_default=True)
    server = dbus.server.Server('unix:tmpdir=/tmp')

    def new_connection(conn):
        print "new connection, %r" % conn
        TestService(conn)

    def connection_gone(conn):
        print "goodbye, %r" % conn

    # Instantiate a TestService every time a connection is created
    server.on_connection_added.append(new_connection)
    server.on_connection_removed.append(connection_gone)

    os.write(pout, server.address)
    os.close(pout)
    os.close(pin)

    print 'server running: %s' % server.address
    GLib.MainLoop().run()

    print 'server quit'

    print 'done?'

else:
    os.waitpid(child, os.WNOHANG)
    os.close(pout)

    address = os.read(pin, 128)
    os.close(pin)

    client = dbus.connection.Connection(address)
    proxy = client.get_object(TestService.NAME, TestService.PATH)
    object = dbus.Interface(proxy, TestService.NAME)

    while True:
        line = sys.stdin.readline()
        if not line: break

        text = line.strip()
        print 'reverse(%s): %s' % (text, object.reverse(text))

    client.close()