# rpmpython.at: test rpm python bindings # TODO: conditionalize on python availability AT_BANNER([Python bindings]) RPMPY_TEST([module import],[ myprint(rpm.__version__) ], [AT_PACKAGE_VERSION] ) RPMPY_TEST([macro manipulation],[ def prexp(m): myprint(rpm.expandMacro('%%{?%s}' % m)) mname = '__no_such_macro_yet' prexp(mname) rpm.addMacro(mname, '/bin/sh') prexp(mname) rpm.addMacro(mname, '/bin/false') prexp(mname) rpm.delMacro(mname) prexp(mname) rpm.delMacro(mname) prexp(mname) ], [ /bin/sh /bin/false /bin/sh ], []) RPMPY_TEST([basic rpmio],[ msg = b'Killroy was here\n' data = msg * 10 # TODO: test other compression types too if built in for iot in [ 'fpio', 'fdio', 'ufdio', 'gzdio' ]: fn = 'pyio.%s' % iot fd = rpm.fd(fn, 'w', iot) pos = fd.tell() if pos != -2 and pos != 0: myprint('bad start pos %d' % fd.tell()) if fd.write(data) != len(data): myprint('%s write fail' % iot) if fn != fd.name: myprint('bad file name %s' % fd.name) fd.flush() pos = fd.tell() if pos != -2 and pos != len(data): myprint('bad end pos %d' % fd.tell()) fd = rpm.fd(fn, 'r', iot) rdata = fd.read() if rdata != data: myprint('%s read fail (got %d bytes)\n%s' % (iot, len(rdata), rdata)) # compressed io types can't seek if iot == 'ufdio': fd.seek(0) else: fd = rpm.fd(fn, 'r', iot) if fn != fd.name: myprint('bad file name %s' % fd.name) rdata = fd.read(len(msg)) if rdata != msg: myprint('%s sized read fail (got %d bytes)\n%s' % (iot, len(rdata), rdata)) ], []) RPMPY_TEST([spec parse 1],[ # TODO: add a better test spec with sub-packages etc spec = rpm.spec('${RPMDATA}/SPECS/hello.spec') for (name, num, flags) in spec.sources: myprint('src %s %d %d' % (name, num, flags)) for pkg in spec.packages: myprint(pkg.header.format('%{nvr}')) myprint(spec.sourceHeader.format('%{nvr}')) ], [src hello-1.0-modernize.patch 0 2 src hello-1.0.tar.gz 0 1 hello-1.0-1 hello-1.0-1 ]) RPMPY_TEST([spec parse 2],[ spec = rpm.spec('${RPMDATA}/SPECS/mini.spec') rpm.reloadConfig() ]) RPMPY_TEST([basic header manipulation],[ h = rpm.hdr() h['name'] = 'testpkg' h['version'] = '1.0' h['release'] = '1' h['epoch'] = 5 h['arch'] = 'noarch' myprint(h['nevra']) del h['epoch'] myprint(h['nevra']) for a in ['name', 'bugurl', '__foo__', ]: try: x = getattr(h, a) myprint(x) except AttributeError as exc: myprint(exc) ], [testpkg-5:1.0-1.noarch testpkg-1.0-1.noarch testpkg None 'rpm.hdr' object has no attribute '__foo__'] ) RPMPY_TEST([non-utf8 data in header],[ str = u'älämölö' enc = 'iso-8859-1' b = str.encode(enc) h = rpm.hdr() h['group'] = b d = h['group'] try: # python 3 t = bytes(d, 'utf-8', 'surrogateescape') except TypeError: # python 2 t = bytes(d) res = t.decode(enc) myprint(str == res) ], [True] ) RPMPY_TEST([invalid header data],[ h1 = rpm.hdr() h1['basenames'] = ['bing', 'bang', 'bong'] h1['dirnames'] = ['/opt/', '/flopt/'] h1['dirindexes'] = [ 1, 0, 3 ] h2 = rpm.hdr() h2['basenames'] = ['bing', 'bang', 'bong'] h2['dirnames'] = ['/opt/', '/flopt/'] h2['dirindexes'] = [ 0, 0, 1 ] for h in [h1, h2]: try: myprint(','.join(h['filenames'])) except rpm.error as exc: myprint(exc) ], [invalid header data /opt/bing,/opt/bang,/flopt/bong] ) RPMPY_TEST([labelCompare],[ v = '1.0' r = '1' e = 3 h = rpm.hdr() h['name'] = 'testpkg' h['version'] = v h['release'] = r h['epoch'] = e myprint(rpm.labelCompare((str(h['epoch']), h['version'], h['release']), (str(e), v, r))) myprint(rpm.labelCompare((None, '1.0', '1'), (None, '1.1', '2'))) myprint(rpm.labelCompare((None, '1.0', '1'), (None, '1.1', '1'))) myprint(rpm.labelCompare((None, '1.1', '1'), (None, '1.0', '1'))) myprint(rpm.labelCompare((None, '1.1', '1'), (None, '1.0', '2'))) myprint(rpm.labelCompare(('1', '1.0', '1'), ('0', '1.0', '1'))) myprint(rpm.labelCompare(('0', '1.0', '1'), ('1', '1.0', '1'))) myprint(rpm.labelCompare(('0', '1.0', '1'), (None, '1.0', '1'))) myprint(rpm.labelCompare("1.0-1", "2.0-1")) myprint(rpm.labelCompare("2:1.0-1", "2.0-1")) ], [0 -1 -1 1 1 1 -1 0 -1 1] ) RPMPY_TEST([vfyflags API],[ ts = rpm.ts() dlv = ts.getVfyFlags() tlv = dlv | rpm.RPMVSF_NOSHA1HEADER olv = ts.setVfyFlags(tlv) nlv = ts.getVfyFlags() myprint('%s %s %s' % (hex(dlv), hex(olv), hex(nlv))) ], [0x0 0x0 0x100] ) RPMPY_TEST([vfylevel API],[ ts = rpm.ts() dlv = ts.getVfyLevel() olv = ts.setVfyLevel(rpm.RPMSIG_SIGNATURE_TYPE|rpm.RPMSIG_DIGEST_TYPE) nlv = ts.getVfyLevel() myprint('%s %s %s' % (hex(dlv), hex(olv), hex(nlv))) ], [0x1 0x1 0x3] ) RPMPY_TEST([reading a package file],[ ts = rpm.ts() h = ts.hdrFromFdno('${RPMDATA}/RPMS/hello-1.0-1.ppc64.rpm') myprint(h['arch']) ], [ppc64] ) RPMPY_TEST([reading a signed package file 1],[ ts = rpm.ts() # avoid rpmlog spew with absolute path to package sink = open('/dev/null', 'w') rpm.setLogFile(sink) try: h = ts.hdrFromFdno('${RPMDATA}/RPMS/hello-2.0-1.x86_64-signed.rpm') myprint(h['arch']) except rpm.error as e: myprint(e) ], [public key not available ], ) RPMPY_TEST([reading a signed package file 2],[ keydata = open('${RPMDATA}/keys/rpm.org-rsa-2048-test.pub', 'rb').read() pubkey = rpm.pubkey(keydata) keyring = rpm.keyring() keyring.addKey(pubkey) ts = rpm.ts() ts.setKeyring(keyring) try: h = ts.hdrFromFdno('${RPMDATA}/RPMS/hello-2.0-1.x86_64-signed.rpm') myprint(h['arch']) except rpm.error as e: myprint(e) ], [x86_64] ) RPMPY_TEST([add package to transaction],[ ts = rpm.ts() ts.addInstall('${RPMDATA}/RPMS/foo-1.0-1.noarch.rpm', 'u') for e in ts: myprint(e.NEVRA()) ts.clear() for e in ts: myprint(e.NEVRA()) ], [foo-1.0-1.noarch] ) RPMPY_TEST([add erasure to transaction],[ ts = rpm.ts() for i in ['foo', 1234]: myprint('addErase %s' % i) try: ts.addErase(i) except rpm.error as err: myprint(err) myprint('addErase mi') mi = ts.dbMatch('name', 'foo') try: ts.addErase(mi) except rpm.error as err: myprint(err) ], [addErase foo package not installed addErase 1234 package not installed addErase mi] ) RPMPY_TEST([add restore to transaction],[ ts = rpm.ts() for i in ['foo', 1234]: myprint('addRestore %s' % i) try: ts.addRestore(i) except rpm.error as err: myprint(err) myprint('addRestore mi') mi = ts.dbMatch('name', 'foo') try: ts.addRestore(mi) except rpm.error as err: myprint(err) ], [addRestore foo package not installed addRestore 1234 package not installed addRestore mi] ) RPMPY_TEST([add bogus package to transaction 1],[ ts = rpm.ts() h = rpm.hdr() h['name'] = "foo" try: ts.addInstall(h, 'foo', 'i') except rpm.error as err: myprint(err) for e in ts: myprint(e.NEVRA()) ], [adding install to transaction failed] ) RPMPY_TEST([add bogus package to transaction 2],[ ts = rpm.ts() h = rpm.hdr() h['name'] = 'foo' h['version'] = '1.0' h['release'] = '1' h['os'] = 'linux' h['arch'] = 'noarch' h['basenames'] = ['bing', 'bang', 'bong'] h['dirnames'] = ['/opt' '/flopt'] h['dirindexes'] = [ 1, 2, 3 ] try: ts.addInstall(h, 'foo', 'u') except rpm.error as err: myprint(err) for e in ts: myprint(e.NEVRA()) ], [adding upgrade to transaction failed] ) RPMPY_TEST([transaction element userdata],[ mydata = { 'foo': 'bar', 'capstest': 'lock' } ts = rpm.ts() ts.addInstall('${RPMDATA}/RPMS/foo-1.0-1.noarch.rpm', 'u') ts.addInstall('${RPMDATA}/RPMS/capstest-1.0-1.noarch.rpm', 'u') for e in ts: e.SetUserdata(mydata[e.N()]) for e in ts: myprint(e.Userdata()) ], [bar lock] ) RPMPY_TEST([transaction callback 1],[ def ocb(what, amount, total, key, data): print(what, amount, total, type(key), data) return -1 def ncb(arg, what, amount, total, data): print(type(arg), what, amount, total, data) return -1 cbd = "mine" ts = rpm.ts() ts.addInstall('${RPMDATA}/RPMS/foo-1.0-1.noarch.rpm', 'mykey', 'u') ts.setFlags(rpm.RPMTRANS_FLAG_TEST) print("- old -") ts.run(callback=ocb, data=cbd) ts.cbStyle = 1 print("- new -") ts.run(callback=ncb, data=cbd) ], [- old - 2097152 0 1 mine 1048576 0 1 mine 4 0 0 mine 4194304 1 1 mine 32 6 1 mine 16 0 1 mine 64 6 1 mine - new - 2097152 0 1 mine 1048576 0 1 mine 4 0 0 mine 4194304 1 1 mine 32 6 1 mine 16 0 1 mine 64 6 1 mine] ) AT_SETUP([database iterators]) AT_KEYWORDS([python rpmdb]) RPMDB_INIT AT_CHECK([ runroot rpm -i \ --justdb --nodeps --ignorearch --ignoreos \ /data/RPMS/foo-1.0-1.noarch.rpm \ /data/RPMS/hello-2.0-1.i686.rpm ], [0], [], []) RPMPY_CHECK([ ts = rpm.ts() ix = 0 mi = ts.dbMatch() mi.pattern('name', rpm.RPMMIRE_STRCMP, 'hello') for h in mi: ix = h['dbinstance'] break del mi for h in ts.dbMatch('packages', ix): myprint(h['nevra']) ], [hello-2.0-1.i686 ], []) RPMPY_CHECK([ ts = rpm.ts() mi = ts.dbMatch() mi.pattern('name', rpm.RPMMIRE_DEFAULT, 'f*') for h in mi: myprint(h['nevra']) ], [foo-1.0-1.noarch ], []) RPMPY_CHECK([ ts = rpm.ts() for h in ts.dbMatch('name'): myprint(h['nevra']) ], [foo-1.0-1.noarch hello-2.0-1.i686 ], []) RPMPY_CHECK([ ts = rpm.ts() for h in ts.dbMatch('obsoletes'): myprint(h['nevra']) ], [foo-1.0-1.noarch ], []) RPMPY_CHECK([ ts = rpm.ts() for h in ts.dbMatch('provides', 'hi'): myprint(h['nevra']) ], [foo-1.0-1.noarch ], []) RPMPY_CHECK([ ts = rpm.ts() for h in ts.dbMatch('basenames', '/usr/share/doc/hello-2.0/FAQ'): myprint(h['nevra']) ], [hello-2.0-1.i686 ], []) RPMPY_CHECK([ ts = rpm.ts() for di in sorted(ts.dbIndex('obsoletes')): myprint(di) ], [howdy ], []) RPMPY_CHECK([ ts = rpm.ts() for di in sorted(ts.dbIndex('provides')): myprint(di) ], [foo hello hello(x86-32) hi ], []) AT_CLEANUP AT_SETUP([database cookies]) AT_KEYWORDS([python rpmdb]) RPMDB_INIT AT_CHECK([ ], [0], [], []) RPMPY_CHECK([ ts = rpm.ts() ts.openDB() c1 = ts.dbCookie() ts.closeDB() ts.openDB() c2 = ts.dbCookie() myprint(c1 == c2 != None) open("dbcookie", "w+").write(c1) ], [True ], []) AT_CHECK([ runroot rpm -i \ --justdb --nodeps --ignorearch --ignoreos \ /data/RPMS/foo-1.0-1.noarch.rpm \ /data/RPMS/hello-2.0-1.i686.rpm \ ], [0], [], []) RPMPY_CHECK([ ts = rpm.ts() ts.openDB() c1 = ts.dbCookie() c2 = open("dbcookie", "r").read() myprint(c1 != c2) open("dbcookie", "w+").write(c1) ], [True ], []) AT_CHECK([ runroot rpm -i \ --justdb --nodeps --ignorearch --ignoreos \ --define "_transaction_color 3" \ --define "_prefer_color 2" \ /data/RPMS/hello-2.0-1.x86_64.rpm ], [0], [], []) RPMPY_CHECK([ ts = rpm.ts() ts.openDB() c1 = ts.dbCookie() c2 = open("dbcookie", "r").read() myprint(c1 != c2) ], [True ], []) AT_CLEANUP RPMPY_TEST([dependency sets 1],[ ts = rpm.ts() h = ts.hdrFromFdno('${RPMDATA}/RPMS/hello-1.0-1.ppc64.rpm') for dep in rpm.ds(h, 'requires'): myprint(dep.DNEVR()) ], [R /bin/sh R /bin/sh R /bin/sh R /bin/sh R libc.so.6 R libc.so.6(GLIBC_2.0) R rpmlib(CompressedFileNames) <= 3.0.4-1 R rpmlib(PayloadFilesHavePrefix) <= 4.0-1 R rtld(GNU_HASH)] ) RPMPY_TEST([dependency sets 2],[ ts = rpm.ts() h = ts.hdrFromFdno('${RPMDATA}/RPMS/hello-2.0-1.i686.rpm') ds = rpm.ds(h, 'provides') myprint('%d %d' % (ds.Instance(), ds.Count())) ], [0 2 ], []) RPMPY_TEST([dependency sets 3],[ deps = [] deps.append(rpm.ds(('foo', rpm.RPMSENSE_EQUAL, '2.0'), rpm.RPMTAG_REQUIRENAME)) deps.append(rpm.ds(('bar', rpm.RPMSENSE_EQUAL, '2.0'), rpm.RPMTAG_RECOMMENDNAME)) deps.append(rpm.ds(('bar', rpm.RPMSENSE_EQUAL|rpm.RPMSENSE_MISSINGOK, '2.0'), rpm.RPMTAG_REQUIRENAME)) deps.append(rpm.ds(('(foo if bar)',), rpm.RPMTAG_REQUIRENAME)) deps.append(rpm.ds(('(foo if bar)',), rpm.RPMTAG_SUGGESTNAME)) deps.append(rpm.ds(('(foo if bar)',), rpm.RPMTAG_SUPPLEMENTNAME)) for d in deps: myprint("%s %s %s %s" % (d.DNEVR(), d.IsWeak(), d.IsRich(), d.IsReverse())) ], [R foo = 2.0 False False False r bar = 2.0 True False False R bar = 2.0 True False False R (foo if bar) False True False s (foo if bar) True True False S (foo if bar) True True True ], []) RPMPY_TEST([file info sets 1],[ ts = rpm.ts() h = ts.hdrFromFdno('${RPMDATA}/RPMS/hello-2.0-1.i686.rpm') fi = rpm.files(h) myprint(len(fi)) for f in fi: myprint('%x: %s' % (f.fflags, f.name)) ], [5 0: /usr/bin/hello 0: /usr/share/doc/hello-2.0 2: /usr/share/doc/hello-2.0/COPYING 2: /usr/share/doc/hello-2.0/FAQ 2: /usr/share/doc/hello-2.0/README ], []) RPMPY_TEST([file sets 1],[ ts = rpm.ts() for p in ['imatest-1.0-1.fc34.noarch.rpm', 'capstest-1.0-1.noarch.rpm']: h = ts.hdrFromFdno('${RPMDATA}/RPMS/%s' % p) files = rpm.files(h) for f in files: sig = f.imasig if sig: sig = sig.hex() myprint('%s: %s' % (f.name, sig)) files = rpm.files(h, flags = rpm.RPMFI_FLAGS_ONLY_FILENAMES) for f in files: sig = f.imasig if sig: sig = sig.hex() myprint('%s: %s' % (f.name, sig)) ], [/usr/share/example1: 030204a598255400483046022100e5117bdafa73baaeb1f1dc46ecaa46981a62d417745a33532572b63dc6d95d16022100c789107ac5b91e2d915e1df3c7b78414f6b3f50899d44c1de381d0e938dfc82b /usr/share/example2: 030204a598255400473045022100c10943795bff5d9c0db53dd4f8e4b845615fd08a2be295c30a80f5bdb4e6a41302203038840cc6abaab92acb56cb3e3ce520b17f22ff7444a8d5d0f703a44d5307a3 /usr/share/example1: None /usr/share/example2: None /a/emptyCaps1: None /a/emptyCaps2: None /a/noCaps: None /a/emptyCaps1: None /a/emptyCaps2: None /a/noCaps: None ], []) RPMPY_TEST([string pool 1],[ p = rpm.strpool() for s in ['foo', 'bar', 'foo', 'zoo']: p.str2id(s) myprint('%s' % len(p)) for i in range(1, len(p)+1): myprint('%s: %s' % (i, p[i])) ], [3 1: foo 2: bar 3: zoo ], []) RPMPY_TEST([string pool 2],[ p = rpm.strpool() d1 = rpm.ds(('foo', rpm.RPMSENSE_EQUAL, '2.0'), rpm.RPMTAG_PROVIDES, pool=p) d2 = rpm.ds(('bar', rpm.RPMSENSE_EQUAL, '2.0'), rpm.RPMTAG_PROVIDES, pool=p) d3 = rpm.ds(('bar', rpm.RPMSENSE_EQUAL, '2.0'), rpm.RPMTAG_PROVIDES, pool=p) myprint('%s' % len(p)) del p myprint(d1.DNEVR()) myprint(d2.DNEVR()) myprint(d3.DNEVR()) ], [3 P foo = 2.0 P bar = 2.0 P bar = 2.0 ], []) RPMPY_TEST([archive 1],[ import hashlib ts = rpm.ts() fd = rpm.fd.open('${RPMDATA}/SRPMS/hello-1.0-1.src.rpm') h = ts.hdrFromFdno(fd) payload = rpm.fd.open(fd, flags=h['payloadcompressor']) files = rpm.files(h) archive = files.archive(payload) for f in archive: if not f.fflags & rpm.RPMFILE_SPECFILE: continue spec = archive.read() hash = hashlib.md5(spec) if f.digest != hash.hexdigest(): myprint('%s should be %s' % (hash.hexdigest(), f.digest)) break ], [], []) RPMPY_TEST([header unload],[ ts = rpm.ts() h = ts.hdrFromFdno('${RPMDATA}/RPMS/hello-2.0-1.i686.rpm') # Add some garbage to header to make it non-sorted h['installtime'] = 0 # RhBug:1061730 causes export of non-sorted header to be larger than it should len1 = len(h.unload()) # Accessing the header before export forces sorting to take place even on # buggy versions t = h['installtime'] len2 = len(h.unload()) myprint(len1 == len2) ], [True ], []) RPMPY_TEST([version objects 1],[ pv = None for vs in [ '1.0', '1.0', '4:1.0', '1.5-1', '3:2.0-1']: v = rpm.ver(vs) myprint('(%s, %s, %s)' % (v.e, v.v, v.r)) if pv: myprint('%s < %s: %s' % (pv.evr, v.evr, pv < v)) myprint('%s > %s: %s' % (pv.evr, v.evr, pv > v)) myprint('%s == %s: %s' % (pv.evr, v.evr, pv == v)) pv = v ], [(None, 1.0, None) (None, 1.0, None) 1.0 < 1.0: False 1.0 > 1.0: False 1.0 == 1.0: True (4, 1.0, None) 1.0 < 4:1.0: True 1.0 > 4:1.0: False 1.0 == 4:1.0: False (None, 1.5, 1) 4:1.0 < 1.5-1: False 4:1.0 > 1.5-1: True 4:1.0 == 1.5-1: False (3, 2.0, 1) 1.5-1 < 3:2.0-1: True 1.5-1 > 3:2.0-1: False 1.5-1 == 3:2.0-1: False ], []) RPMPY_TEST([version objects 2],[ for vt in [ (None, "1.0", None), (None, "1.0", "2"), ("1", "1.0", "3") ]: v = rpm.ver(vt) myprint(v) ], [1.0 1.0-2 1:1.0-3 ], [])