#!/usr/bin/python # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Library General Public License as published by # the Free Software Foundation; version 2 only # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Library General Public License for more details. # # You should have received a copy of the GNU Library General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # Copyright 2004, 2005 Red Hat, Inc. # # Author: Paul Nasrat, Florian La Roche, Phil Knirsch # # # Read .rpm packages from python. Implemented completely in python without # using the rpmlib C library. Extensive checks have been added and this # script can be used to check the binary packages for unusual format/content. # # Possible options: # - "--strict" should only be used for the FC development tree # - "--nodigest" to skip sha1/md5sum check for the header+payload # - "--nopayload" to not read in the compressed filedata (payload) # # Example usage: # find /mirror/ -name "*.rpm" -type f -print0 2>/dev/null | xargs -0 ./pyrpm.py # locate '*.rpm' | xargs ./pyrpm.py --nodigest --nopayload # ./pyrpm.py --strict /mirror/fedora/development/i386/Fedora/RPMS/*.rpm # XXX TODO: # - check those unknown tags (content of them) # "immutable", "header_signatures" # - allow reading unknown tags in non-strict mode, only warn about them # - evrSplit(): why not 'epoch = ""' # - add streaming support to bzip2 compressed payload # - add support to verify signatures # dsa = self[["dsaheader"]] # header # gpg = self[["gpg"]] # header + payload # - define a bigger rpmtagrequired for strict? 
#   or make sure no code depends
#   on certain headers to be always present
# - how to check some content for correct utf-8 encoding?
# - check more tags to have the same length (file-related tags) and check
#   what python does if they don't have the same length
# - verifyindex handling
# possible rpm format changes:
# - deps could be left away from src.rpms
# - "cookie" could go away
# - "rhnplatform" could go away
# - can hardlinks go into a new rpm tag?
# - should deps "rpmlib()" and "config()" go away?
# - can requires for the same rpm be deleted?
# - can requires be reduced by putting several equal ones
#   which only differ in "flag" go into one requirement?
#

import os.path, gzip, md5, sha, pwd, grp, re
from types import StringType, IntType, ListType
from struct import unpack
from stat import S_ISREG, S_ISLNK, S_ISDIR

# rpm tag types: the numeric type code stored in every header index entry.
#RPM_NULL = 0
RPM_CHAR = 1
RPM_INT8 = 2
RPM_INT16 = 3
RPM_INT32 = 4
RPM_INT64 = 5 # currently unused
RPM_STRING = 6
RPM_BIN = 7
RPM_STRING_ARRAY = 8
RPM_I18NSTRING = 9
# new type internal to this tool:
# RPM_STRING_ARRAY for app + params, otherwise a single RPM_STRING
RPM_ARGSTRING = 12

# RPMSENSEFLAGS: bit flags stored in the *flags dependency tags.
RPMSENSE_ANY = 0
RPMSENSE_SERIAL = (1 << 0) # legacy
RPMSENSE_LESS = (1 << 1)
RPMSENSE_GREATER = (1 << 2)
RPMSENSE_EQUAL = (1 << 3)
RPMSENSE_PROVIDES = (1 << 4) # only used internally by builds
RPMSENSE_CONFLICTS = (1 << 5) # only used internally by builds
RPMSENSE_PREREQ = (1 << 6) # legacy
RPMSENSE_OBSOLETES = (1 << 7) # only used internally by builds
RPMSENSE_INTERP = (1 << 8) # Interpreter used by scriptlet.
RPMSENSE_SCRIPT_PRE = ((1 << 9) | RPMSENSE_PREREQ) # %pre dependency
RPMSENSE_SCRIPT_POST = ((1 << 10)|RPMSENSE_PREREQ) # %post dependency
RPMSENSE_SCRIPT_PREUN = ((1 << 11)|RPMSENSE_PREREQ) # %preun dependency
RPMSENSE_SCRIPT_POSTUN = ((1 << 12)|RPMSENSE_PREREQ) # %postun dependency
RPMSENSE_SCRIPT_VERIFY = (1 << 13) # %verify dependency
RPMSENSE_FIND_REQUIRES = (1 << 14) # find-requires generated dependency
RPMSENSE_FIND_PROVIDES = (1 << 15) # find-provides generated dependency
RPMSENSE_TRIGGERIN = (1 << 16) # %triggerin dependency
RPMSENSE_TRIGGERUN = (1 << 17) # %triggerun dependency
RPMSENSE_TRIGGERPOSTUN = (1 << 18) # %triggerpostun dependency
RPMSENSE_MISSINGOK = (1 << 19) # suggests/enhances/recommends hint
RPMSENSE_SCRIPT_PREP = (1 << 20) # %prep build dependency
RPMSENSE_SCRIPT_BUILD = (1 << 21) # %build build dependency
RPMSENSE_SCRIPT_INSTALL = (1 << 22) # %install build dependency
RPMSENSE_SCRIPT_CLEAN = (1 << 23) # %clean build dependency
RPMSENSE_RPMLIB = ((1 << 24) | RPMSENSE_PREREQ) # rpmlib(feature) dependency
RPMSENSE_TRIGGERPREIN = (1 << 25) # @todo Implement %triggerprein
RPMSENSE_KEYRING = (1 << 26)
RPMSENSE_PATCHES = (1 << 27)
RPMSENSE_CONFIG = (1 << 28)

RPMSENSE_SENSEMASK = 15 # Mask to get senses: serial, less, greater, equal.

# RPM file attributes: bit flags stored per file in the "fileflags" tag.
RPMFILE_NONE = 0
RPMFILE_CONFIG = (1 << 0) # from %%config
RPMFILE_DOC = (1 << 1) # from %%doc
RPMFILE_ICON = (1 << 2) # from %%donotuse.
RPMFILE_MISSINGOK = (1 << 3) # from %%config(missingok)
RPMFILE_NOREPLACE = (1 << 4) # from %%config(noreplace)
RPMFILE_SPECFILE = (1 << 5) # .spec file in source rpm
RPMFILE_GHOST = (1 << 6) # from %%ghost
RPMFILE_LICENSE = (1 << 7) # from %%license
RPMFILE_README = (1 << 8) # from %%readme
RPMFILE_EXCLUDE = (1 << 9) # from %%exclude, internal
RPMFILE_UNPATCHED = (1 << 10) # placeholder (SuSE)
RPMFILE_PUBKEY = (1 << 11) # from %%pubkey
RPMFILE_POLICY = (1 << 12) # from %%policy

# List of all rpm tags we care about.
# We mark older tags which are
# not anymore in newer rpm packages (Fedora Core development tree) as
# "legacy".
# tagname: (tag, type, how-many, flags:legacy=1,src-only=2,bin-only=4)
# "how-many" is the required element count, or None if any count is allowed.
rpmtag = {
    # basic info
    "name": (1000, RPM_STRING, None, 0),
    "epoch": (1003, RPM_INT32, 1, 0),
    "version": (1001, RPM_STRING, None, 0),
    "release": (1002, RPM_STRING, None, 0),
    "arch": (1022, RPM_STRING, None, 0),

    # dependencies: provides, requires, obsoletes, conflicts
    "providename": (1047, RPM_STRING_ARRAY, None, 0),
    "provideflags": (1112, RPM_INT32, None, 0),
    "provideversion": (1113, RPM_STRING_ARRAY, None, 0),
    "requirename": (1049, RPM_STRING_ARRAY, None, 0),
    "requireflags": (1048, RPM_INT32, None, 0),
    "requireversion": (1050, RPM_STRING_ARRAY, None, 0),
    "obsoletename": (1090, RPM_STRING_ARRAY, None, 4),
    "obsoleteflags": (1114, RPM_INT32, None, 4),
    "obsoleteversion": (1115, RPM_STRING_ARRAY, None, 4),
    "conflictname": (1054, RPM_STRING_ARRAY, None, 0),
    "conflictflags": (1053, RPM_INT32, None, 0),
    "conflictversion": (1055, RPM_STRING_ARRAY, None, 0),
    # triggers
    "triggername": (1066, RPM_STRING_ARRAY, None, 4),
    "triggerflags": (1068, RPM_INT32, None, 4),
    "triggerversion": (1067, RPM_STRING_ARRAY, None, 4),
    "triggerscripts": (1065, RPM_STRING_ARRAY, None, 4),
    "triggerscriptprog": (1092, RPM_STRING_ARRAY, None, 4),
    "triggerindex": (1069, RPM_INT32, None, 4),

    # scripts
    "prein": (1023, RPM_STRING, None, 4),
    "preinprog": (1085, RPM_ARGSTRING, None, 4),
    "postin": (1024, RPM_STRING, None, 4),
    "postinprog": (1086, RPM_ARGSTRING, None, 4),
    "preun": (1025, RPM_STRING, None, 4),
    "preunprog": (1087, RPM_ARGSTRING, None, 4),
    "postun": (1026, RPM_STRING, None, 4),
    "postunprog": (1088, RPM_ARGSTRING, None, 4),
    "verifyscript": (1079, RPM_STRING, None, 4),
    "verifyscriptprog": (1091, RPM_ARGSTRING, None, 4),

    # addon information:
    "i18ntable": (100, RPM_STRING_ARRAY, None, 0), # list of available langs
    "summary": (1004, RPM_I18NSTRING, None, 0),
    "description": (1005, RPM_I18NSTRING, None, 0),
    "url": (1020, RPM_STRING, None, 0),
    "license": (1014, RPM_STRING, None, 0),
    "rpmversion": (1064, RPM_STRING, None, 0),
    "sourcerpm": (1044, RPM_STRING, None, 4),
    "changelogtime": (1080, RPM_INT32, None, 0),
    "changelogname": (1081, RPM_STRING_ARRAY, None, 0),
    "changelogtext": (1082, RPM_STRING_ARRAY, None, 0),
    "prefixes": (1098, RPM_STRING_ARRAY, None, 4), # relocatable rpm packages
    "optflags": (1122, RPM_STRING, None, 4), # optimization flags for gcc
    "pubkeys": (266, RPM_STRING_ARRAY, None, 4),
    "sourcepkgid": (1146, RPM_BIN, 16, 4), # md5 from srpm (header+payload)
    "immutable": (63, RPM_BIN, 16, 0),
    # less important information:
    "buildtime": (1006, RPM_INT32, 1, 0), # time of rpm build
    "buildhost": (1007, RPM_STRING, None, 0), # hostname where rpm was built
    "cookie": (1094, RPM_STRING, None, 0), # build host and time
    # ignored now, successor is comps.xml
    # Code allows hardcoded exception to also have type RPM_STRING
    # for RPMTAG_GROUP==1016.
    "group": (1016, RPM_I18NSTRING, None, 0),
    "size": (1009, RPM_INT32, 1, 0), # sum of all file sizes
    "distribution": (1010, RPM_STRING, None, 0),
    "vendor": (1011, RPM_STRING, None, 0),
    "packager": (1015, RPM_STRING, None, 0),
    "os": (1021, RPM_STRING, None, 0), # always "linux"
    "payloadformat": (1124, RPM_STRING, None, 0), # "cpio"
    "payloadcompressor": (1125, RPM_STRING, None, 0),# "gzip" or "bzip2"
    "payloadflags": (1126, RPM_STRING, None, 0), # "9"
    "rhnplatform": (1131, RPM_STRING, None, 4), # == arch
    "platform": (1132, RPM_STRING, None, 0),

    # rpm source packages:
    "source": (1018, RPM_STRING_ARRAY, None, 2),
    "patch": (1019, RPM_STRING_ARRAY, None, 2),
    "buildarchs": (1089, RPM_STRING_ARRAY, None, 2),
    "excludearch": (1059, RPM_STRING_ARRAY, None, 2),
    "exclusivearch": (1061, RPM_STRING_ARRAY, None, 2),
    "exclusiveos": (1062, RPM_STRING_ARRAY, None, 2), # ['Linux'] or ['linux']

    # information about files
    "dirindexes": (1116, RPM_INT32, None, 0),
    "dirnames": (1118, RPM_STRING_ARRAY, None, 0),
    "basenames": (1117, RPM_STRING_ARRAY, None, 0),
    "fileusername": (1039, RPM_STRING_ARRAY, None, 0),
    "filegroupname": (1040, RPM_STRING_ARRAY, None, 0),
    "filemodes": (1030, RPM_INT16, None, 0),
    "filemtimes": (1034, RPM_INT32, None, 0),
    "filedevices": (1095, RPM_INT32, None, 0),
    "fileinodes": (1096, RPM_INT32, None, 0),
    "filesizes": (1028, RPM_INT32, None, 0),
    "filemd5s": (1035, RPM_STRING_ARRAY, None, 0),
    "filerdevs": (1033, RPM_INT16, None, 0),
    "filelinktos": (1036, RPM_STRING_ARRAY, None, 0),
    "fileflags": (1037, RPM_INT32, None, 0),
    "fileverifyflags": (1045, RPM_INT32, None, 0),
    "fileclass": (1141, RPM_INT32, None, 0),
    "filelangs": (1097, RPM_STRING_ARRAY, None, 0),
    "filecolors": (1140, RPM_INT32, None, 0),
    "filedependsx": (1143, RPM_INT32, None, 0),
    "filedependsn": (1144, RPM_INT32, None, 0),
    "classdict": (1142, RPM_STRING_ARRAY, None, 0),
    "dependsdict": (1145, RPM_INT32, None, 0),

    # tags not in Fedora Core development trees anymore:
    "filecontexts": (1147, RPM_STRING_ARRAY, None, 1), # selinux filecontexts
    "capability": (1105, RPM_INT32, None, 1),
    "xpm": (1013, RPM_BIN, None, 1),
    "gif": (1012, RPM_BIN, None, 1),
    "verifyscript2": (15, RPM_STRING, None, 1),
    "nosource": (1051, RPM_INT32, None, 1),
    "nopatch": (1052, RPM_INT32, None, 1),
    "disturl": (1123, RPM_STRING, None, 1),
    "oldfilenames": (1027, RPM_STRING_ARRAY, None, 1),
    "archivesize": (1046, RPM_INT32, 1, 1),
    # only in /var/lib/rpm/Packages
    "triggerin": (1100, RPM_STRING, None, 5),
    "triggerun": (1101, RPM_STRING, None, 5),
    "triggerpostun": (1102, RPM_STRING, None, 5)
}
# Add a reverse mapping for all tags.
# After this loop the dict can be indexed by tag name and by numeric tag id.
for v in rpmtag.values():
    rpmtag[v[0]] = v
del v  # do not leave the loop variable behind in the module namespace

# Required tags in a header.
# The list holds numeric tag ids, not names.
rpmtagrequired = []
for key in ["name", "version", "release", "arch"]:
    rpmtagrequired.append(rpmtag[key][0])
del key

# Info within the sig header.
rpmsigtag = { # size of gpg/dsaheader sums differ between 64/65(contains '\n') "dsaheader": (267, RPM_BIN, None, 0), "gpg": (1005, RPM_BIN, None, 0), "header_signatures": (62, RPM_BIN, 16, 0), "payloadsize": (1007, RPM_INT32, 1, 0), "size_in_sig": (1000, RPM_INT32, 1, 0), "sha1header": (269, RPM_STRING, None, 0), "md5": (1004, RPM_BIN, 16, 0), # legacy entries in older rpm packages: "pgp": (1002, RPM_BIN, None, 1), "badsha1_1": (264, RPM_STRING, None, 1), "badsha1_2": (265, RPM_STRING, None, 1) } # Add a reverse mapping for all tags. for v in rpmsigtag.values(): rpmsigtag[v[0]] = v del v # Required tags in a signature header. rpmsigtagrequired = [] for key in ["md5"]: rpmsigtagrequired.append(rpmsigtag[key][0]) del key # check arch names against this list possible_archs = {'noarch':1, 'i386':1, 'i486':1, 'i586':1, 'i686':1, 'athlon':1, 'pentium3':1, 'pentium4':1, 'x86_64':1, 'ia32e':1, 'ia64':1, 'alpha':1, 'axp':1, 'sparc':1, 'sparc64':1, 's390':1, 's390x':1, 'ia64':1, 'ppc':1, 'ppc64':1, 'ppc64iseries':1, 'ppc64pseries':1, 'ppcpseries':1, 'ppciseries':1, 'ppcmac':1, 'ppc8260':1, 'm68k':1, 'arm':1, 'armv4l':1, 'mips':1, 'mipseb':1, 'mipsel':1, 'hppa':1, 'sh':1 } possible_scripts = { None: 1, "/bin/sh": 1, "/sbin/ldconfig": 1, "/usr/bin/fc-cache": 1, "/usr/bin/scrollkeeper-update": 1, "/usr/sbin/build-locale-archive": 1, "/usr/sbin/glibc_post_upgrade": 1, "/usr/sbin/glibc_post_upgrade.i386": 1, "/usr/sbin/glibc_post_upgrade.i686": 1, "/usr/sbin/libgcc_post_upgrade": 1 } class UGid: """Store a list of user- and groupnames and transform them in uids/gids.""" def __init__(self, names=None): self.ugid = {} if names: self.addUGids(names) def addUGids(self, names): # hack: building up a new array of names makes sure the string # is only stored once per uid/gid ids = [] for name in names: if not self.ugid.has_key(name): self.ugid[name] = name ids.append(self.ugid[name]) return ids def transform(self): pass class Uid(UGid): def transform(self): for uid in self.ugid.keys(): 
if uid == "root": self.ugid[uid] = 0 else: try: self.ugid[uid] = pwd.getpwnam(uid)[2] except: print "warning: user %s not found, using uid 0" % uid self.ugid[uid] = 0 class Gid(UGid): def transform(self): for gid in self.ugid.keys(): if gid == "root": self.ugid[gid] = 0 else: try: self.ugid[gid] = grp.getgrnam(gid)[2] except: print "warning: group %s not found, using gid 0" % gid self.ugid[gid] = 0 class CPIO: """Read a cpio archive.""" def __init__(self, fd, issrc, size=None, verify=None, strict=None): self.fd = fd self.issrc = issrc self.size = size self.verify = verify self.strict = strict def printErr(self, err): print "%s: %s" % ("cpio-header", err) def readDataPad(self, size, pad=0): data = self.fd.read(size) pad = (4 - ((size + pad) % 4)) % 4 self.fd.read(pad) if self.size != None: self.size -= size + pad return data def readEntry(self): # (magic, inode, mode, uid, gid, nlink, mtime, filesize, # devMajor, devMinor, rdevMajor, rdevMinor, namesize, checksum) data = self.fd.read(110) if self.size != None: self.size -= 110 # CPIO ASCII hex, expanded device numbers (070702 with CRC) if data[0:6] not in ["070701", "070702"]: raise IOError, "Bad magic reading CPIO headers %s" % data[0:6] filename = self.readDataPad(int(data[94:102], 16), 110).rstrip("\x00") if filename == "TRAILER!!!": return None if filename.startswith("./"): filename = filename[1:] if not self.issrc and not filename.startswith("/"): filename = "%s%s" % ("/", filename) if filename.endswith("/") and len(filename) > 1: filename = filename[:-1] if self.verify: # printconf-0.3.61-4.1.i386.rpm is an example where paths are # stored like: /usr/share/printconf/tests/../mf.magic # This makes te normapth() check fail and also gives trouble # for the algorithm finding hardlinks as the files are also # included with their normal path. So same dev/inode pairs # can be hardlinks or they can be wrongly packaged rpms. 
if self.strict and filename != os.path.normpath(filename): self.printErr("failed: normpath(%s)" % filename) # XXX: Do not even parse the below items, data should always be used # from rpm tags as too many items in the cpio header have been broken # in older rpm packages. Maybe leave it for the strict case, delete # it otherwise. # (name, inode, mode, nlink, mtime, filesize, dev, rdev) return [filename, int(data[6:14], 16), int(data[14:22], 16), int(data[38:46], 16), int(data[46:54], 16), int(data[54:62], 16), int(data[62:70], 16) * 256 + int(data[70:78], 16), int(data[78:86], 16) * 256 + int(data[86:94], 16)] def readCpio(self, func, filenamehash, devinode): files = [] while 1: filedata = self.readEntry() if filedata == None: if self.size != None and self.size != 0: self.printErr("failed cpiosize check") return files files.append(filedata) # XXX reading the data should be done with streaming # It will then move into verifyCpio() data = self.readDataPad(filedata[5]) func(filedata, data, filenamehash, devinode) return None # limit: does not support all RHL5.x and earlier rpms if verify is enabled class ReadRpm: """Read (Linux) rpm packages.""" def __init__(self, filename, verify=None, fd=None, hdronly=None, strict=1, nodigest=None): self.filename = filename self.issrc = 0 if filename.endswith(".src.rpm") or filename.endswith(".nosrc.rpm"): self.issrc = 1 self.verify = verify # enable/disable more data checking self.fd = fd # filedescriptor self.hdronly = hdronly # if only the header is present from a hdlist # 1 == check if old tags are included, 0 allows all tags # 1 is good for Fedora Core development trees self.strict = strict self.nodigest = nodigest def printErr(self, err): print "%s: %s" % (self.filename, err) def raiseErr(self, err): raise ValueError, "%s: %s" % (self.filename, err) def openFd(self, offset=None): if not self.fd: try: self.fd = open(self.filename, "ro") except: self.printErr("could not open file") return 1 if offset: self.fd.seek(offset, 1) 
return None def closeFd(self): self.fd = None def __repr__(self): return self.hdr.__repr__() def __getitem__(self, key): if isinstance(key, StringType): return self.hdr.get(rpmtag[key][0]) if isinstance(key, IntType): return self.hdr.get(key) # trick to also look at the sig header if isinstance(key, ListType): if isinstance(key[0], StringType): return self.sig.get(rpmsigtag[key[0]][0]) return self.sig.get(key[0]) self.raiseErr("wrong arg") return None def getOne(self, key): value = self[key] if value != None: return value[0] return value def __setitem__(self, key, value): # only limited support compared to __getitem__() from above self.hdr[rpmtag[key][0]] = value return value def parseLead(self, leaddata): (magic, major, minor, rpmtype, arch, name, osnum, sigtype) = \ unpack("!4scchh66shh16x", leaddata) if self.verify: failed = None if major not in ('\x03', '\x04') or minor != '\x00' or \ sigtype != 5 or rpmtype not in (0, 1): failed = 1 # 21 == darwin if osnum not in (1, 21, 255, 256): failed = 1 name = name.rstrip('\x00') if self.strict: if os.path.basename(self.filename)[:len(name)] != name: failed = 1 if failed: print major, minor, rpmtype, arch, name, osnum, sigtype self.printErr("wrong data in rpm lead") return (magic, major, minor, rpmtype, arch, name, osnum, sigtype) def verifyTag(self, index, fmt, issig): (tag, ttype, offset, count) = index if issig: if not rpmsigtag.has_key(tag): self.printErr("rpmsigtag has no tag %d" % tag) else: t = rpmsigtag[tag] if t[1] != None and t[1] != ttype: self.printErr("sigtag %d has wrong type %d" % (tag, ttype)) if t[2] != None and t[2] != count: self.printErr("sigtag %d has wrong count %d" % (tag, count)) if (t[3] & 1) and self.strict: self.printErr("tag %d is old" % tag) if self.issrc: if (t[3] & 4): self.printErr("tag %d should be for binary rpms" % tag) else: if (t[3] & 2): self.printErr("tag %d should be for src rpms" % tag) else: if not rpmtag.has_key(tag): self.printErr("rpmtag has no tag %d" % tag) else: t = 
rpmtag[tag] if t[1] != None and t[1] != ttype: if t[1] == RPM_ARGSTRING and (ttype == RPM_STRING or \ ttype == RPM_STRING_ARRAY): pass # special exception case elif t[0] == 1016 and ttype == RPM_STRING: pass # hardcoded exception for RPMTAG_GROUP=1016 else: self.printErr("tag %d has wrong type %d" % (tag, ttype)) if t[2] != None and t[2] != count: self.printErr("tag %d has wrong count %d" % (tag, count)) if (t[3] & 1) and self.strict: self.printErr("tag %d is old" % tag) if self.issrc: if (t[3] & 4): self.printErr("tag %d should be for binary rpms" % tag) else: if (t[3] & 2): self.printErr("tag %d should be for src rpms" % tag) if count == 0: self.raiseErr("zero length tag") if ttype < 1 or ttype > 9: self.raiseErr("unknown rpmtype %d" % ttype) if ttype == RPM_INT32: count = count * 4 elif ttype == RPM_STRING_ARRAY or \ ttype == RPM_I18NSTRING: size = 0 for i in xrange(count): end = fmt.index('\x00', offset) + 1 size += end - offset offset = end count = size elif ttype == RPM_STRING: if count != 1: self.raiseErr("tag string count wrong") count = fmt.index('\x00', offset) - offset + 1 elif ttype == RPM_CHAR or ttype == RPM_INT8: pass elif ttype == RPM_INT16: count = count * 2 elif ttype == RPM_INT64: count = count * 8 elif ttype == RPM_BIN: pass else: self.raiseErr("unknown tag header") return count def verifyIndex(self, fmt, fmt2, indexNo, storeSize, issig): checkSize = 0 for i in xrange(0, indexNo * 16, 16): index = unpack("!iiii", fmt[i:i + 16]) ttype = index[1] # alignment for some types of data if ttype == RPM_INT16: checkSize += (2 - (checkSize % 2)) % 2 elif ttype == RPM_INT32: checkSize += (4 - (checkSize % 4)) % 4 elif ttype == RPM_INT64: checkSize += (8 - (checkSize % 8)) % 8 checkSize += self.verifyTag(index, fmt2, issig) if checkSize != storeSize: # XXX: add a check for very old rpm versions here, seems this # is triggered for a few RHL5.x rpm packages self.printErr("storeSize/checkSize is %d/%d" % (storeSize, checkSize)) def readIndex(self, pad, 
issig=None): data = self.fd.read(16) if not len(data): return None (magic, indexNo, storeSize) = unpack("!8sii", data) if magic != "\x8e\xad\xe8\x01\x00\x00\x00\x00" or indexNo < 1: self.raiseErr("bad index magic") fmt = self.fd.read(16 * indexNo) fmt2 = self.fd.read(storeSize) padfmt = "" if pad != 1: padfmt = self.fd.read((pad - (storeSize % pad)) % pad) if self.verify: self.verifyIndex(fmt, fmt2, indexNo, storeSize, issig) return (indexNo, storeSize, data, fmt, fmt2, 16 + len(fmt) + \ len(fmt2) + len(padfmt)) def parseTag(self, ttype, fmt, offset, count): if ttype == RPM_INT32: return unpack("!%dI" % count, fmt[offset:offset + count * 4]) elif ttype == RPM_STRING_ARRAY or ttype == RPM_I18NSTRING: data = [] for i in xrange(count): end = fmt.index('\x00', offset) data.append(fmt[offset:end]) offset = end + 1 return data elif ttype == RPM_STRING: return fmt[offset:fmt.index('\x00', offset)] elif ttype == RPM_CHAR: return unpack("!%dc" % count, fmt[offset:offset + count]) elif ttype == RPM_INT8: return unpack("!%dB" % count, fmt[offset:offset + count]) elif ttype == RPM_INT16: return unpack("!%dH" % count, fmt[offset:offset + count * 2]) elif ttype == RPM_INT64: return unpack("!%dQ" % count, fmt[offset:offset + count * 8]) elif ttype == RPM_BIN: return fmt[offset:offset + count] self.raiseErr("unknown tag header") return None def parseIndex(self, indexNo, fmt, fmt2, tags=None): # parseIndex() could be implemented as C function for faster speed hdr = {} for i in xrange(0, indexNo * 16, 16): (tag, ttype, offset, count) = unpack("!4i", fmt[i:i + 16]) # support reading only some tags if tags and tags.get(tag) != None: continue # ignore duplicate entries as long as they are identical if hdr.has_key(tag): if hdr[tag] != self.parseTag(ttype, fmt2, offset, count): self.printErr("tag %d included twice" % tag) else: hdr[tag] = self.parseTag(ttype, fmt2, offset, count) return hdr def verifyHeader(self): if self.hdronly: return self.doVerify() def parseHeader(self, tags=None, 
parsesig=None): if (self.verify or parsesig) and not self.hdronly: (sigindexNo, sigstoreSize, sigdata, sigfmt, sigfmt2, size) = \ self.sigdata self.sig = self.parseIndex(sigindexNo, sigfmt, sigfmt2) if self.verify: for i in rpmsigtagrequired: if not self.sig.has_key(i): self.printErr("sig header is missing: %d" % i) (hdrindexNo, hdrstoreSize, hdrdata, hdrfmt, hdrfmt2, size) = \ self.hdrdata self.hdr = self.parseIndex(hdrindexNo, hdrfmt, hdrfmt2, tags) if self.verify: for i in rpmtagrequired: if not self.hdr.has_key(i): self.printErr("hdr is missing: %d" % i) self.verifyHeader() def readHeader(self, parse=1, tags=None, keepdata=None): if self.openFd(): return 1 leaddata = self.fd.read(96) if leaddata[:4] != '\xed\xab\xee\xdb': self.printErr("no rpm magic found") return 1 if self.verify: self.parseLead(leaddata) self.sigdata = self.readIndex(8, 1) self.hdrdata = self.readIndex(1) if keepdata: self.leaddata = leaddata if parse: self.parseHeader(tags) return None def readHdlist(self, parse=1, tags=None): self.hdrdata = self.readIndex(1) if not self.hdrdata: return None if parse: self.parseHeader(tags) return 1 def verifyCpio(self, filedata, data, filenamehash, devinode): # Overall result is that apart from the filename information # we should not depend on any data from the cpio header. # Data is also stored in rpm tags and the cpio header has # been broken in enough details to ignore it. 
filename = filedata[0] i = filenamehash.get(filename) if i == None: self.printErr("cpio file %s not in rpm header" % filename) return del filenamehash[filename] isreg = S_ISREG(filedata[2]) if isreg and filedata[1] != self["fileinodes"][i]: self.printErr("wrong fileinode for %s" % filename) if self.strict and filedata[2] != self["filemodes"][i]: self.printErr("wrong filemode for %s" % filename) # uid/gid are ignored from cpio # device/inode are only set correctly for regular files di = devinode.get("%d:%d" % (filedata[6], filedata[1])) if di == None: pass # nlink is only set correctly for hardlinks, so disable this check: # if filedata[3] != 1: # self.printErr("wrong number of hardlinks") else: # Search for "normpath" to read why hardlinks might not # be hardlinks, but only double stored files with "/../" # stored in their filename. Broken packages out there... if self.strict and filedata[3] != len(di): self.printErr("wrong number of hardlinks %s, %d / %d" % \ (filename, filedata[3], len(di))) #elif filedata[3] > len(di): # # This case also happens e.g. in RHL6.2: procps-2.0.6-5.i386.rpm # # where nlinks is greater than the number of actual hardlinks. 
# self.printErr("wrong number of hardlinks %s, %d / %d" % \ # (filename, filedata[3], len(di))) if filedata[4] != self["filemtimes"][i]: self.printErr("wrong filemtimes for %s" % filename) if filedata[5] != self["filesizes"][i] and \ not (filedata[5] == 0 and filedata[3] > 1): self.printErr("wrong filesize for %s" % filename) if isreg and filedata[6] != self["filedevices"][i]: self.printErr("wrong filedevice for %s" % filename) if self.strict and filedata[7] != self["filerdevs"][i]: self.printErr("wrong filerdevs for %s" % filename) if S_ISLNK(filedata[2]): if data.rstrip("\x00") != self["filelinktos"][i]: self.printErr("wrong filelinkto for %s" % filename) elif isreg: if not (filedata[5] == 0 and filedata[3] > 1): ctx = md5.new() ctx.update(data) if ctx.hexdigest() != self["filemd5s"][i]: self.printErr("wrong filemd5s for %s" % filename) def getFilenames(self): basenames = self["basenames"] if basenames != None: dirnames = self["dirnames"] dirindexes = self["dirindexes"] return [ "%s%s" % (dirnames[dirindexes[i]], basenames[i]) for i in xrange(len(basenames)) ] else: oldfilenames = self["oldfilenames"] if oldfilenames != None: return oldfilenames return [] def readPayload(self, func): self.openFd(96 + self.sigdata[5] + self.hdrdata[5]) #import zlib #payload = self.fd.read() #if payload[:9] != '\037\213\010\000\000\000\000\000\000': # self.raiseErr("not gzipped data") #cpiodata = zlib.decompress(payload) fileflags = self["fileflags"] filedevices = self["filedevices"] fileinodes = self["fileinodes"] filemodes = self["filemodes"] filenames = self.getFilenames() devinode = {} # this will contain hardlinked files filenamehash = {} # full filename of all files for i in xrange(len(filenames)): if fileflags[i] & (RPMFILE_GHOST | RPMFILE_EXCLUDE): continue filenamehash[filenames[i]] = i if S_ISREG(filemodes[i]): di = "%d:%d" % (filedevices[i], fileinodes[i]) #if not devinode.has_key(di): # devinode[di] = [] #devinode[di].append(i) devinode.setdefault(di, []).append(i) for 
di in devinode.keys(): if len(devinode[di]) <= 1: del devinode[di] # sanity check hardlinks if self.verify: for hardlinks in devinode.values(): for i in xrange(len(hardlinks) - 1): j = hardlinks[i] k = hardlinks[i + 1] # dev/inode are already guaranteed to be the same if self["filemodes"][j] != self["filemodes"][k]: self.raiseErr("mmodes differ for hardlink") if self["filemtimes"][j] != self["filemtimes"][k]: self.raiseErr("mtimes differ for hardlink") if self["filesizes"][j] != self["filesizes"][k]: self.raiseErr("sizes differ for hardlink") if self["filemd5s"][j] != self["filemd5s"][k]: self.raiseErr("md5s differ for hardlink") if self["payloadcompressor"] == "bzip2": import bz2, cStringIO payload = self.fd.read() fd = cStringIO.StringIO(bz2.decompress(payload)) else: #if self["payloadcompressor"] not in [None, "gzip"]: # return fd = gzip.GzipFile(fileobj=self.fd) cpiosize = None if self.verify: cpiosize = self.getOne(["payloadsize"]) archivesize = self.getOne("archivesize") if archivesize != None: if cpiosize == None: cpiosize = archivesize elif cpiosize != archivesize: self.printErr("wrong archive size") c = CPIO(fd, self.issrc, cpiosize, self.verify, self.strict) self.cpiodata = c.readCpio(func, filenamehash, devinode) if self.cpiodata == None: self.raiseErr("Error reading CPIO payload") self.closeFd() for filename in filenamehash.iterkeys(): self.printErr("file not in cpio: %s" % filename) def getSpecfile(self): filenames = self.getFilenames() fileflags = self["fileflags"] for i in xrange(len(filenames)): if fileflags[i] & RPMFILE_SPECFILE: return i for i in xrange(len(filenames)): if filenames[i].endswith(".spec"): return i return None def getScript(self, s, p): script = self[s] # prog can be a string or an string_array (with args to the app) prog = self[p] if self.verify: if script and prog == None: self.raiseErr("no prog") if self.strict: if not possible_scripts.has_key(prog): self.raiseErr("unknown prog: %s" % prog) return (script, prog) def getNVR(self): 
return "%s-%s-%s" % (self["name"], self["version"], self["release"]) def getNA(self): return "%s.%s" % (self["name"], self["arch"]) def getFilename(self): return "%s-%s-%s.%s.rpm" % (self["name"], self["version"], self["release"], self["arch"]) def getDeps(self, name, flags, version): n = self[name] if not n: return None f = self[flags] v = self[version] if f == None or v == None or len(n) != len(f) or len(f) != len(v): if f != None or v != None: self.raiseErr("wrong length of deps") if f != None: return zip(n, f, v) #deps = [] #for d in n: # deps.append( (d, None, None) ) #return deps return [ (d, None, None) for d in n ] def getProvides(self): return self.getDeps("providename", "provideflags", "provideversion") def getRequires(self): return self.getDeps("requirename", "requireflags", "requireversion") def getObsoletes(self): return self.getDeps("obsoletename", "obsoleteflags", "obsoleteversion") def getConflicts(self): return self.getDeps("conflictname", "conflictflags", "conflictversion") def getTriggers(self): return self.getDeps("triggername", "triggerflags", "triggerversion") def buildFileNames(self): """Returns (dir, filename, linksto, flags).""" if self["filemodes"] == None: # detect empty filelist return [] # XXX We loose the class data here completely, move this stuff # for the non-verify (e.g. extraction) case. 
uid = Uid() self["fileusername"] = uid.addUGids(self["fileusername"]) gid = Gid() self["filegroupname"] = gid.addUGids(self["filegroupname"]) basenames = self["basenames"] oldfilenames = self["oldfilenames"] if basenames == None and oldfilenames: basenames = oldfilenames dirnames = [None] * len(oldfilenames) else: d = self["dirnames"] dirnames = [ d[index] for index in self["dirindexes"] ] return zip (dirnames, basenames, self["fileflags"], self["fileusername"], self["filegroupname"], self["filemodes"], self["filemtimes"], self["filedevices"], self["fileinodes"], self["filesizes"], self["filemd5s"], self["filelinktos"], self["filerdevs"]) def doVerify(self): size_in_sig = self.getOne(["size_in_sig"]) if size_in_sig != None: rpmsize = os.stat(self.filename)[6] if rpmsize != 96 + self.sigdata[5] + size_in_sig: self.printErr("wrong size in rpm package") filenames = self.getFilenames() fileflags = self["fileflags"] for i in xrange(len(filenames)): # This might be only valid for self.strict: if fileflags[i] & RPMFILE_EXCLUDE: self.printErr("exclude flag set in rpm") if self.issrc: i = self.getSpecfile() if i == None: self.printErr("no specfile found in src.rpm") else: if not filenames[i].endswith(".spec"): self.printErr("specfile does not end with .spec") if self["triggerscripts"] != None: if len(self["triggerscripts"]) != len(self["triggerscriptprog"]): self.printErr("wrong trigger lengths") if "-" in self["version"] or ":" in self["version"]: self.printErr("version contains wrong char") if ":" in self["release"]: self.printErr("version contains wrong char") if self.strict: if "," in self["version"] or "," in self["release"]: self.printErr("version contains wrong char") if self["payloadformat"] not in [None, "cpio"]: self.printErr("wrong payload format") if self.strict: if self["payloadcompressor"] not in [None, "gzip"]: self.printErr("no gzip compressor: %s" % self["payloadcompressor"]) else: if self["payloadcompressor"] not in [None, "gzip", "bzip2"]: 
self.printErr("no gzip/bzip2 compressor: %s" % \ self["payloadcompressor"]) if self.strict: if self["payloadflags"] not in ["9"]: self.printErr("no payload flags: %s" % self["payloadflags"]) if self.strict and self["os"] not in ["Linux", "linux"]: self.printErr("bad os: %s" % self["os"]) elif self["os"] not in ["Linux", "linux", "darwin"]: self.printErr("bad os: %s" % self["os"]) if self.strict: if self["packager"] not in (None, \ "Red Hat, Inc. "): self.printErr("unknown packager: %s" % self["packager"]) if self["vendor"] not in (None, "Red Hat, Inc."): self.printErr("unknown vendor: %s" % self["vendor"]) if self["distribution"] not in (None, "Red Hat Linux", "Red Hat FC-3", "Red Hat (FC-3)", "Red Hat (RHEL-3)", "Red Hat (FC-4)"): self.printErr("unknown distribution: %s" % self["distribution"]) arch = self["arch"] if self["rhnplatform"] not in (None, arch): self.printErr("unknown arch for rhnplatform") if self.strict: if self["platform"] not in (None, "", arch + "-redhat-linux-gnu", arch + "-redhat-linux", "--target=${target_platform}", arch + "-unknown-linux", "--target=${TARGET_PLATFORM}", "--target=$TARGET_PLATFORM"): self.printErr("unknown arch %s" % self["platform"]) if self["exclusiveos"] not in (None, ['Linux'], ['linux']): self.printErr("unknown os %s" % self["exclusiveos"]) if self.strict: if self["buildarchs"] not in (None, ['noarch']): self.printErr("bad buildarch: %s" % self["buildarchs"]) if self["excludearch"] != None: for i in self["excludearch"]: if not possible_archs.has_key(i): self.printErr("new possible arch %s" % i) if self["exclusivearch"] != None: for i in self["exclusivearch"]: if not possible_archs.has_key(i): self.printErr("new possible arch %s" % i) if self.nodigest: return # sha1 of the header sha1header = self[["sha1header"]] if sha1header: ctx = sha.new() ctx.update(self.hdrdata[2]) ctx.update(self.hdrdata[3]) ctx.update(self.hdrdata[4]) if ctx.hexdigest() != sha1header: self.printErr("wrong sha1: %s / %s" % (sha1header, 
ctx.hexdigest())) # md5sum of header plus payload md5sum = self[["md5"]] if md5sum: ctx = md5.new() ctx.update(self.hdrdata[2]) ctx.update(self.hdrdata[3]) ctx.update(self.hdrdata[4]) data = self.fd.read(65536) while data: ctx.update(data) data = self.fd.read(65536) self.closeFd() if ctx.digest() != md5sum: self.printErr("wrong md5: %s / %s" % (md5sum, ctx.hexdigest())) class RRpm: """Sample class that uses ReadRpm() to get important information about an rpm.""" def __init__(self, rpm): self.filename = rpm.filename self.name = rpm["name"] self.version = rpm["version"] self.release = rpm["release"] self.epoch = rpm.getOne("epoch") if self.epoch != None: evr = "%d:%s-%s" % (self.epoch, self.version, self.release) else: evr = "%s-%s" % (self.version, self.release) self.dep = (self.name, RPMSENSE_EQUAL, evr) self.arch = rpm["arch"] self.fileinfo = rpm.buildFileNames() self.provides = rpm.getProvides() self.requires = rpm.getRequires() self.obsoletes = rpm.getObsoletes() self.conflicts = rpm.getConflicts() (self.pre, self.preprog) = rpm.getScript("prein", "preinprog") (self.post, self.postprog) = rpm.getScript("postin", "postinprog") (self.preun, self.preunprog) = rpm.getScript("preun", "preunprog") (self.postun, self.postunprog) = rpm.getScript("postun", "postunprog") (self.verify, self.verifyprog) = rpm.getScript("verifyscript", "verifyscriptprog") self.triggers = rpm.getTriggers() self.triggerindex = rpm["triggerindex"] self.trigger = rpm["triggerscripts"] self.triggerprog = rpm["triggerscriptprog"] # old tags: self.triggerin = rpm["triggerin"] self.triggerun = rpm["triggerun"] self.triggerpostun = rpm["triggerpostun"] def printErr(self, err): print "%s: %s" % (self.filename, err) def verifyRpm(filename, strict=1, payload=None, nodigest=None): """Read in a complete rpm and verify its integrity.""" rpm = ReadRpm(filename, 1, strict=strict, nodigest=nodigest) if rpm.readHeader(): return None if payload: rpm.readPayload(rpm.verifyCpio) rpm.closeFd() return rpm def 
checkDirs(repo): """Check if any two dirs in a repository differ in user/group/mode.""" dirs = {} # collect all directories for rpm in repo: files = rpm.getFilenames() if not files: continue modes = rpm["filemodes"] users = rpm["fileusername"] groups = rpm["filegroupname"] for (f, mode, user, group) in zip(files, modes, users, groups): if f.startswith("/etc/init.d"): print "init.d:", rpm.filename, f if not S_ISDIR(mode): continue dirs.setdefault(f, []).append( (f, user, group, mode, rpm.filename) ) for d in dirs.values(): if len(d) < 2: continue failed = 0 for i in xrange(len(d) - 1): if d[i][1] != d[i + 1][1]: failed = 1 if d[i][2] != d[i + 1][2]: failed = 1 if d[i][3] != d[i + 1][3]: failed = 1 if failed: print "dir check failed for ", d ########### # Title: Remove duplicates from a sequence # Submitter: Tim Peters # From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52560 def uniq(s): """Return a list of the elements in s, but without duplicates. For example, unique([1,2,3,1,2,3]) is some permutation of [1,2,3], unique("abcabc") some permutation of ["a", "b", "c"], and unique(([1, 2], [2, 3], [1, 2])) some permutation of [[2, 3], [1, 2]]. For best speed, all sequence elements should be hashable. Then unique() will usually work in linear time. If not possible, the sequence elements should enjoy a total ordering, and if list(s).sort() doesn't raise TypeError it's assumed that they do enjoy a total ordering. Then unique() will usually work in O(N*log2(N)) time. If that's not possible either, the sequence elements must support equality-testing. Then unique() will usually work in quadratic time. """ n = len(s) if n == 0: return [] # Try using a dict first, as that's the fastest and will usually # work. If it doesn't work, it will usually fail quickly, so it # usually doesn't cost much to *try* it. It requires that all the # sequence elements be hashable, and support equality comparison. 
u = {} try: for x in s: u[x] = 1 except TypeError: del u # move on to the next method else: return u.keys() # We can't hash all the elements. Second fastest is to sort, # which brings the equal elements together; then duplicates are # easy to weed out in a single pass. # NOTE: Python's list.sort() was designed to be efficient in the # presence of many duplicate elements. This isn't true of all # sort functions in all languages or libraries, so this approach # is more effective in Python than it may be elsewhere. try: t = list(s) t.sort() except TypeError: del t # move on to the next method else: assert n > 0 last = t[0] lasti = i = 1 while i < n: if t[i] != last: t[lasti] = last = t[i] lasti += 1 i += 1 return t[:lasti] # Brute force is all that's left. u = [] for x in s: if x not in u: u.append(x) return u # split EVR string in epoch, version and release def evrSplit(evr): epoch = "0" i = evr.find(":") if i != -1: epoch = evr[:i] j = evr.find("-", i + 1) if j != -1: return (epoch, evr[i + 1:j], evr[j + 1:]) return (epoch, evr[i + 1:], "") fileglobs = ['.*bin\/.*', '^\/etc\/.*', '^\/usr\/lib\/sendmail$'] dirglobs = ['.*bin\/.*', '^\/etc\/.*'] class RepoRpm: """Read one rpm for createrepo data.""" def __init__(self, filename): stats = os.stat(filename) self.size = stats[6] self.mtime = stats[8] self.rpm = ReadRpm(filename) if self.rpm.readHeader(): return self.rpm.closeFd() self.rangestart = 96 + self.rpm.sigdata[5] self.rangeend = self.rangestart + self.rpm.hdrdata[5] - 1 # setup regex objects self.filerc = [] self.dirrc = [] for glob in fileglobs: self.filerc.append(re.compile(glob)) for glob in dirglobs: self.dirrc.append(re.compile(glob)) self.filenames = [] self.dirnames = [] self.ghostnames = [] filenames = self.rpm.getFilenames() if filenames != []: modes = self.rpm["filemodes"] flags = self.rpm["fileflags"] for (filename, mode, flag) in zip(filenames, modes, flags): if S_ISDIR(mode): self.dirnames.append(filename) else: if flag & RPMFILE_GHOST: 
self.ghostnames.append(filename) continue self.filenames.append(filename) self.usefulfiles = [] for f in self.filenames: for glob in self.filerc: if glob.match(f): self.usefulfiles.append(f) self.usefulghosts = [] for f in self.ghostnames: for glob in self.filerc: if glob.match(f): self.usefulghosts.append(f) self.usefuldirs = [] for f in self.dirnames: for glob in self.dirrc: if glob.match(f): self.usefuldirs.append(f) def readHdlist(filename, verify=None): fd = open(filename, "ro") rpms = [] while 1: rpm = ReadRpm(filename, verify, fd, 1) if not rpm.readHdlist(): break rpms.append(rpm) return rpms #rpms = readHdlist("/home/fedora/i386/Fedora/base/hdlist", 1) #for rpm in rpms: # print rpm.getFilename() #rpms = readHdlist("/home/fedora/i386/Fedora/base/hdlist2", 1) if __name__ == "__main__": import sys #import time repo = [] args = sys.argv[1:] strict = 0 nodigest = 0 payload = 1 if args and args[0] == "--strict": strict = 1 args = args[1:] if args and args[0] == "--digest": nodigest = 0 args = args[1:] if args and args[0] == "--nodigest": nodigest = 1 args = args[1:] if args and args[0] == "--payload": payload = 1 args = args[1:] if args and args[0] == "--nopayload": payload = 0 args = args[1:] for a in args: #reporpm = RepoRpm(a) rpm = verifyRpm(a, strict, payload, nodigest) if rpm != None: #f = rpm["oldfilenames"] #if f: # print rpm.getFilename() # print f rrpm = RRpm(rpm) if strict: repo.append(rpm) del rpm if strict: checkDirs(repo) #print "ready" #time.sleep(30) sys.exit(0) # vim:ts=4:sw=4:showmatch:expandtab