summaryrefslogtreecommitdiff
path: root/test/ftp-client
blob: 595fb19a49ec6cac448fe4b71661ec04c50841d1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/python

import gobject

import sys
import dbus
import dbus.service
import dbus.mainloop.glib
import os.path
from optparse import OptionParser

class Agent(dbus.service.Object):
    def __init__(self, conn=None, obj_path=None, verbose=False):
        dbus.service.Object.__init__(self, conn, obj_path)
        self.verbose = verbose

    @dbus.service.method("org.openobex.Agent",
                    in_signature="ot", out_signature="")
    def Progress(self, path, transferred):
        if self.verbose:
            print "Transfer progress (%d bytes)" % (transferred)
        return

    @dbus.service.method("org.openobex.Agent",
                    in_signature="o", out_signature="")
    def Complete(self, path):
        if self.verbose:
            print "Transfer finished"
        mainloop.quit()

    @dbus.service.method("org.openobex.Agent",
                    in_signature="os", out_signature="")
    def Error(self, path, error):
        print "Transfer finished with an error: %s" % (error)
        mainloop.quit()

    @dbus.service.method("org.openobex.Agent",
                    in_signature="", out_signature="")
    def Release(self):
        mainloop.quit()


def parse_options():
    parser.add_option("-d", "--device", dest="device",
                      help="Device to connect", metavar="DEVICE")
    parser.add_option("-c", "--chdir", dest="new_dir",
                      help="Change current directory to DIR", metavar="DIR")
    parser.add_option("-l", "--list", action="store_true", dest="list_dir",
                      help="List the current directory")
    parser.add_option("-g", "--get", dest="get_file",
                      help="Get FILE", metavar="FILE")
    parser.add_option("-p", "--put", dest="put_file",
                      help="Put FILE", metavar="FILE")
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose")

    return parser.parse_args()


def change_folder(session, new_dir):
    for node in new_dir.split("/"):
        session.ChangeFolder(node)

def list_folder(session):
    for i in session.ListFolder():
        if i["Type"] == "folder":
            print "%s/" % (i["Name"])
        else:
            print "%s" % (i["Name"])

def put_file(session, filename):
    session.PutFile(os.path.abspath(filename),
                    os.path.basename(filename))

def get_file(session, filename):
    session.GetFile(os.path.abspath(filename),
                    os.path.basename(filename))


if  __name__ == '__main__':

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    parser = OptionParser()

    (options, args) = parse_options()

    if not options.device:
        parser.print_help()
        sys.exit(0)

    bus = dbus.SessionBus()
    mainloop = gobject.MainLoop()

    need_mainloop = False

    path = "/test/agent"
    agent = Agent(bus, path, options.verbose)

    client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
                            "org.openobex.Client")

    session_path = client.CreateSession({ "Destination": options.device,
                                          "Target": "ftp"})

    session = dbus.Interface(bus.get_object("org.openobex.client", session_path),
                 "org.openobex.Session")

    session.AssignAgent(path)

    ftp = dbus.Interface(bus.get_object("org.openobex.client", session_path),
                 "org.openobex.FileTransfer")

    if options.new_dir:
        change_folder(ftp, options.new_dir)

    if options.list_dir:
        list_folder(ftp)

    if options.get_file:
        need_mainloop = True
        get_file(ftp, options.get_file)

    if options.put_file:
        need_mainloop = True
        put_file(ftp, options.put_file)

    if need_mainloop:
        mainloop.run()