aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMikel Astiz <mikel.astiz@bmw-carit.de>2012-06-04 11:37:56 +0200
committerLuiz Augusto von Dentz <luiz.von.dentz@intel.com>2012-06-04 23:57:27 +0300
commitc5c8053fae0d1a79ba6ff83830c997af8aa23523 (patch)
tree909d3f7c1014a0115147526cb1cac2fdb93ac07a
parent6510ff727fd4716de2e2e341d2f227ddb4db7a7b (diff)
downloadobexd-c5c8053fae0d1a79ba6ff83830c997af8aa23523.tar.gz
client-test: pbap-client uses new API
-rwxr-xr-xtest/pbap-client183
1 files changed, 147 insertions, 36 deletions
diff --git a/test/pbap-client b/test/pbap-client
index ecca14f..0ed8b1e 100755
--- a/test/pbap-client
+++ b/test/pbap-client
@@ -1,41 +1,152 @@
#!/usr/bin/python
+import gobject
+
import sys
+import os
import dbus
+import dbus.service
+import dbus.mainloop.glib
+
+class Transfer:
+ def __init__(self, callback_func):
+ self.callback_func = callback_func
+ self.path = None
+ self.filename = None
+
+class PbapClient:
+ def __init__(self, session_path):
+ self.pending_transfers = 0
+ self.transfer_dict = dict()
+ self.flush_func = None
+ bus = dbus.SessionBus()
+ obj = bus.get_object("org.openobex.client", session_path)
+ self.session = dbus.Interface(obj, "org.openobex.Session")
+ self.pbap = dbus.Interface(obj, "org.openobex.PhonebookAccess")
+ bus.add_signal_receiver(self.transfer_complete,
+ dbus_interface="org.openobex.Transfer",
+ signal_name="Complete",
+ path_keyword="path")
+ bus.add_signal_receiver(self.transfer_error,
+ dbus_interface="org.openobex.Transfer",
+ signal_name="Error",
+ path_keyword="path")
+
+ def register(self, reply, transfer):
+ (path, properties) = reply
+ transfer.path = path
+ transfer.filename = properties["Filename"]
+ self.transfer_dict[path] = transfer
+ print "Transfer created: %s (file %s)" % (path,
+ transfer.filename)
+
+ def error(self, err):
+ print err
+ mainloop.quit()
+
+ def transfer_complete(self, path):
+ req = self.transfer_dict.get(path)
+ if req == None:
+ return
+ self.pending_transfers -= 1
+ print "Transfer %s finished" % path
+ f = open(req.filename, "r")
+ os.remove(req.filename)
+ lines = f.readlines()
+ del self.transfer_dict[path]
+ req.callback_func(lines)
+
+ if (len(self.transfer_dict) == 0) and (self.pending_transfers == 0):
+ if self.flush_func != None:
+ f = self.flush_func
+ self.flush_func = None
+ f()
+
+ def transfer_error(self, code, message, path):
+ req = self.transfer_dict.get(path)
+ if req == None:
+ return
+ print "Transfer finished with error %s: %s" % (code, message)
+ mainloop.quit()
+
+ def pull(self, vcard, func):
+ req = Transfer(func)
+ self.pbap.Pull(vcard, "",
+ reply_handler=lambda r: self.register(r, req),
+ error_handler=self.error)
+ self.pending_transfers += 1
+
+ def pull_all(self, func):
+ req = Transfer(func)
+ self.pbap.PullAll("",
+ reply_handler=lambda r: self.register(r, req),
+ error_handler=self.error)
+ self.pending_transfers += 1
+
+ def flush_transfers(self, func):
+ if (len(self.transfer_dict) == 0) and (self.pending_transfers == 0):
+ return
+ self.flush_func = func
+
+ def interface(self):
+ return self.pbap
+
+if __name__ == '__main__':
+
+ dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+ bus = dbus.SessionBus()
+ mainloop = gobject.MainLoop()
+
+ client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
+ "org.openobex.Client")
+
+ if (len(sys.argv) < 2):
+ print "Usage: %s <device>" % (sys.argv[0])
+ sys.exit(1)
+
+ print "Creating Session"
+ session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
+
+ pbap_client = PbapClient(session_path)
+
+ def process_result(lines, header):
+ if header != None:
+ print header
+ for line in lines:
+ print line,
+ print
+
+ def test_paths(paths):
+ if len(paths) == 0:
+ print
+ print "FINISHED"
+ mainloop.quit()
+ return
+
+ path = paths[0]
+
+ print "\n--- Select Phonebook %s ---\n" % (path)
+ pbap_client.interface().Select("int", path)
+
+ print "\n--- GetSize ---\n"
+ ret = pbap_client.interface().GetSize()
+ print "Size = %d\n" % (ret)
+
+ print "\n--- List vCard ---\n"
+ ret = pbap_client.interface().List()
+ for item in ret:
+ print "%s : %s" % (item[0], item[1])
+ pbap_client.interface().SetFormat("vcard30")
+ pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]);
+ pbap_client.pull(item[0], lambda x: process_result(x, None))
+
+ pbap_client.interface().SetFormat("vcard30")
+ pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]);
+ pbap_client.pull_all(lambda x: process_result(x, "\n--- PullAll ---\n"))
+
+ pbap_client.flush_transfers(lambda: test_paths(paths[1:]))
+
+ test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])
-bus = dbus.SessionBus()
-
-client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
- "org.openobex.Client")
-
-print "Creating Session"
-session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
-pbap = dbus.Interface(bus.get_object("org.openobex.client", session_path),
- "org.openobex.PhonebookAccess")
-session = dbus.Interface(bus.get_object("org.openobex.client", session_path),
- "org.openobex.Session")
-
-paths = ["PB", "ICH", "OCH", "MCH", "CCH"]
-
-for path in paths:
- print "\n--- Select Phonebook %s ---\n" % (path)
- pbap.Select("int", path)
-
- print "\n--- GetSize ---\n"
- ret = pbap.GetSize()
- print "Size = %d\n" % (ret)
-
- print "\n--- List vCard ---\n"
- ret = pbap.List()
- for item in ret:
- print "%s : %s" % (item[0], item[1])
- pbap.SetFormat("vcard30")
- pbap.SetFilter(["VERSION", "FN", "TEL"]);
- ret = pbap.Pull(item[0])
- print "%s" % (ret)
-
- print "\n--- PullAll ---\n"
- pbap.SetFormat("vcard30")
- pbap.SetFilter(["VERSION", "FN", "TEL"]);
- ret = pbap.PullAll()
- print "%s" % (ret)
+ mainloop.run()