Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
openSUSE:Backports:SLE-15-SP6:Update
python-rpmfile
python-rpmfile-1.0.0+git20190702.208ac80.obscpio
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File python-rpmfile-1.0.0+git20190702.208ac80.obscpio of Package python-rpmfile
07070100000000000081A4000003E800000064000000015D1BA0290000016A000000000000000000000000000000000000003200000000python-rpmfile-1.0.0+git20190702.208ac80/.project<?xml version="1.0" encoding="UTF-8"?> <projectDescription> <name>rpmquery</name> <comment></comment> <projects> </projects> <buildSpec> <buildCommand> <name>org.python.pydev.PyDevBuilder</name> <arguments> </arguments> </buildCommand> </buildSpec> <natures> <nature>org.python.pydev.pythonNature</nature> </natures> </projectDescription> 07070100000001000081A4000003E800000064000000015D1BA0290000019E000000000000000000000000000000000000003700000000python-rpmfile-1.0.0+git20190702.208ac80/.pydevproject<?xml version="1.0" encoding="UTF-8" standalone="no"?> <?eclipse-pydev version="1.0"?> <pydev_project> <pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property> <pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property> <pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH"> <path>/rpmquery</path> </pydev_pathproperty> </pydev_project> 07070100000002000041ED000003E800000064000000025D1BA02900000000000000000000000000000000000000000000003300000000python-rpmfile-1.0.0+git20190702.208ac80/.settings07070100000003000081A4000003E800000064000000015D1BA02900000095000000000000000000000000000000000000005400000000python-rpmfile-1.0.0+git20190702.208ac80/.settings/org.eclipse.core.resources.prefseclipse.preferences.version=1 encoding//rpmfile/__init__.py=iso-8859-15 encoding//rpmfile/cpiofile.py=utf-8 encoding//rpmfile/rpmdefs.py=iso-8859-15 07070100000004000081A4000003E800000064000000015D1BA02900000062000000000000000000000000000000000000003500000000python-rpmfile-1.0.0+git20190702.208ac80/.travis.ymllanguage: python python: - "2.7" - "3.4" - "3.5" - "3.6" script: - python setup.py test 07070100000005000081A4000003E800000064000000015D1BA0290000042F000000000000000000000000000000000000003100000000python-rpmfile-1.0.0+git20190702.208ac80/LICENSECopyright (c) 2015 Sean Ross-Ross MIT License Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 07070100000006000081A4000003E800000064000000015D1BA02900000389000000000000000000000000000000000000003300000000python-rpmfile-1.0.0+git20190702.208ac80/README.md# rpmfile [![Build Status](https://travis-ci.org/srossross/rpmfile.svg?branch=master)](https://travis-ci.org/srossross/rpmfile) Tools for inspecting RPM files in python. This module is modeled after the [tarfile](https://docs.python.org/3/library/tarfile.html) module. ## Example ```python import rpmfile with rpmfile.open('file.rpm') as rpm: # Inspect the RPM headers print(rpm.headers.keys()) print(rpm.headers.get('arch', 'noarch')) # Extract a fileobject from the archive fd = rpm.extractfile('./usr/bin/script') print(fd.read()) for member in rpm.getmembers(): print(member) ``` ## Classes * rpmfile.RPMFile: The RPMFile object provides an interface to a RPM archive * rpmfile.RPMInfo: An RPMInfo object represents one member in a RPMFile. ## Code in this module was borrowed from: * https://bitbucket.org/krp/cpiofile * https://github.com/mjvm/pyrpm 07070100000007000041ED000003E800000064000000025D1BA02900000000000000000000000000000000000000000000003100000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile07070100000008000081A4000003E800000064000000015D1BA02900001668000000000000000000000000000000000000003D00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/__init__.py from __future__ import print_function, unicode_literals, absolute_import from .headers import get_headers import sys import io import gzip try: import lzma except ImportError: pass import struct from rpmfile import cpiofile from functools import wraps from rpmfile.io_extra import _SubFile pad = lambda fileobj: (4 - (fileobj.tell() % 4)) % 4 class NoLZMAModuleError(NotImplementedError): pass class RPMInfo(object): ''' Informational class which holds the details about an archive member given by an RPM entry block. RPMInfo objects are returned by RPMFile.getmember() and RPMFile.getmembers() and are usually created internally. ''' _new_coder = struct.Struct(b'8s8s8s8s8s8s8s8s8s8s8s8s8s') def __init__(self, name, file_start, file_size, initial_offset, isdir): self.name = name self.file_start = file_start self.size = file_size self.initial_offset = initial_offset self._isdir = isdir @property def isdir(self): return self._isdir def __repr__(self): return '<RPMMember %r>' % self.name @classmethod def _read(cls, magic, fileobj): if magic == b'070701': return cls._read_new(fileobj, magic=magic) else: raise Exception('bad magic number %r' % magic) @classmethod def _read_new(cls, fileobj, magic=None): coder = cls._new_coder initial_offset = fileobj.tell() d = coder.unpack_from(fileobj.read(coder.size)) namesize = int(d[11], 16) name = fileobj.read(namesize)[:-1].decode('utf-8') fileobj.seek(pad(fileobj), 1) file_start = fileobj.tell() file_size = int(d[6], 16) fileobj.seek(file_size, 1) fileobj.seek(pad(fileobj), 1) nlink = int(d[4], 16) isdir = nlink == 2 and file_size == 0 return cls(name, file_start, file_size, initial_offset, isdir) class RPMFile(object): ''' Open an RPM archive `name'. `mode' must be 'r' to read from an existing archive. If `fileobj' is given, it is used for reading or writing data. If it can be determined, `mode' is overridden by `fileobj's mode. `fileobj' is not closed, when TarFile is closed. ''' def __init__(self, name=None, mode='rb', fileobj=None): if mode != 'rb': raise NotImplementedError("currently the only supported mode is 'rb'") self._fileobj = fileobj or io.open(name, mode) self._header_range, self._headers = get_headers(self._fileobj) self._ownes_fd = fileobj is None @property def data_offset(self): return self._header_range[1] @property def header_range(self): return self._header_range @property def headers(self): 'RPM headers' return self._headers def __enter__(self): return self def __exit__(self, *excinfo): if self._ownes_fd: self._fileobj.close() _members = None def getmembers(self): ''' Return the members of the archive as a list of RPMInfo objects. The list has the same order as the members in the archive. ''' if self._members is None: self._members = _members = [] g = self.data_file magic = g.read(2) while magic: if magic == b'07': magic += g.read(4) member = RPMInfo._read(magic, g) if member.name == 'TRAILER!!!': break if not member.isdir: _members.append(member) magic = g.read(2) return _members return self._members def getmember(self, name): ''' Return an RPMInfo object for member `name'. If `name' can not be found in the archive, KeyError is raised. If a member occurs more than once in the archive, its last occurrence is assumed to be the most up-to-date version. ''' members = self.getmembers() for m in members[::-1]: if m.name == name: return m raise KeyError("member %s could not be found" % name) def extractfile(self, member): ''' Extract a member from the archive as a file object. `member' may be a filename or an RPMInfo object. The file-like object is read-only and provides the following methods: read(), readline(), readlines(), seek() and tell() ''' if not isinstance(member, RPMInfo): member = self.getmember(member) return _SubFile(self.data_file, member.file_start, member.size) _data_file = None @property def data_file(self): """Return the uncompressed raw CPIO data of the RPM archive.""" if self._data_file is None: fileobj = _SubFile(self._fileobj, self.data_offset) if self.headers["archive_compression"] == b"xz": if not getattr(sys.modules[__name__], 'lzma', False): raise NoLZMAModuleError('lzma module not present') self._data_file = lzma.LZMAFile(fileobj) else: self._data_file = gzip.GzipFile(fileobj=fileobj) return self._data_file def open(name=None, mode='rb', fileobj=None): ''' Open an RPM archive for reading. Return an appropriate RPMFile class. ''' return RPMFile(name, mode, fileobj) def main(): print(sys.argv[1]) with open(sys.argv[1]) as rpm: print(rpm.headers) for m in rpm.getmembers(): print(m) print('done') if __name__ == '__main__': main() 07070100000009000081A4000003E800000064000000015D1BA0290000453F000000000000000000000000000000000000003D00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/cpiofile.py#!/usr/bin/env python -3 # -*- coding: utf-8 -*- # # Copyright © 2011, 2013 K Richard Pixley # # See LICENSE for details. # # Time-stamp: <30-Jun-2013 19:07:22 PDT by rich@noir.com> """ Cpiofile is a library which reads and writes unix style 'cpio' format archives. .. todo:: open vs context manager .. todo:: make is_cpiofile work on fileobj """ from __future__ import unicode_literals, print_function __docformat__ = 'restructuredtext en' __all__ = [ 'CheckSumError', 'CpioError', 'CpioFile', 'CpioMember', 'HeaderError', 'InvalidFileFormat', 'InvalidFileFormatNull', 'is_cpiofile', 'valid_magic', ] import abc import io import mmap import os import struct class CpioError(Exception): """Base class for CpioFile exceptions""" pass class CheckSumError(CpioError): """Exception indicating a check sum error""" pass class InvalidFileFormat(CpioError): """Exception indicating a file format error""" pass class InvalidFileFormatNull(InvalidFileFormat): """Exception indicating a null file""" pass class HeaderError(CpioError): """Exception indicating a header error""" pass def valid_magic(block): """predicate indicating whether *block* includes a valid magic number""" return CpioMember.valid_magic(block) def is_cpiofile(name): """predicate indicating whether *name* is a valid cpiofile""" with io.open(name, 'rb') as fff: return valid_magic(fff.read(16)) class StructBase(object): """ An abstract base class representing objects which are inherently based on a struct. """ __metaclass__ = abc.ABCMeta coder = None """ The :py:class:`struct.Struct` used to encode/decode this object into a block of memory. This is expected to be overridden by subclasses. """ # pylint: disable=W0105 @property def size(self): """ Exact size in bytes of a block of memory into which is suitable for packing this instance. """ return self.coder.size def unpack(self, block): """convenience function for unpacking""" return self.unpack_from(block) @abc.abstractmethod def unpack_from(self, block, offset=0): """ Set the values of this instance from an in-memory representation of the struct. :param string block: block of memory from which to unpack :param int offset: optional offset into the memory block from which to start unpacking """ raise NotImplementedError def pack(self): """convenience function for packing""" block = bytearray(self.size) self.pack_into(block) return block @abc.abstractmethod def pack_into(self, block, offset=0): """ Store the values of this instance into an in-memory representation of the file. :param string block: block of memory into which to pack :param int offset: optional offset into the memory block into which to start packing """ raise NotImplementedError __hash__ = None def __eq__(self, other): raise NotImplementedError def __ne__(self, other): return not self.__eq__(other) def close_enough(self, other): """ This is a comparison similar to __eq__ except that here the goal is to determine whether two objects are "close enough" despite perhaps having been produced at different times in different locations in the file system. """ return self == other class CpioFile(StructBase): """Class representing an entire cpio file""" _members = [] def __init__(self): self._members = [] @property def members(self): """accessor for a list of the members of this cpio file""" return self._members @property def names(self): """accessor for a list of names of the members of this cpio file""" return [member.name for member in self.members] def __enter__(self): return self def __exit__(self, thingy, value, traceback): self.close() @classmethod def open(cls, name=None, mode=None): return cls._open(cls(), name) def _open(self, name=None, fileobj=None, mymap=None, block=None): """ The _open function takes some form of file identifier and creates an :py:class:`CpioFile` instance from it. :param :py:class:`str` name: a file name :param :py:class:`file` fileobj: if given, this overrides *name* :param :py:class:`mmap.mmap` mymap: if given, this overrides *fileobj* :param :py:class:`bytes` block: file contents in a block of memory, (if given, this overrides *mymap*) The file to be used can be specified in any of four different forms, (in reverse precedence): #. a file name #. :py:class:`file` object #. :py:mod:`mmap.mmap`, or #. a block of memory """ if block is not None: if not name: name = '<unknown>' self.unpack_from(block) if fileobj: fileobj.close() return self if mymap is not None: block = mymap elif fileobj: try: mymap = mmap.mmap(fileobj.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ) # pylint: disable=W0702 except: mymap = 0 block = fileobj.read() elif name: fileobj = io.open(os.path.normpath(os.path.expanduser(name)), 'rb') else: assert False return self._open(name=name, fileobj=fileobj, mymap=mymap, block=block) def close(self): """noop - here for completeness""" pass def unpack_from(self, block, offset=0): pointer = offset print("unpack_from") while 'TRAILER!!!' not in self.names: cmem = CpioMember.encoded_class(block, pointer)() print(type(cmem)) self.members.append(cmem.unpack_from(block, pointer)) pointer += cmem.size del self.members[-1] def pack_into(self, block, offset=0): pointer = offset for member in self.members: member.pack_into(block, pointer) pointer += member.size cmtype = type(self.members[0]) if self.members else CpioMemberNew cmt = cmtype() cmt.name = 'TRAILER!!!' cmt.pack_into(block, pointer) def get_member(self, name): """return a member by *name*""" for member in self.members: if member.name == name: return member return None def __eq__(self, other): raise NotImplementedError class CpioMember(StructBase): """class representing a member of a cpio archive""" coder = None name = None magic = None devmajor = None devminor = None ino = None mode = None uid = None gid = None nlink = None rdevmajor = None rdevminor = None mtime = None filesize = None content = None @staticmethod def valid_magic(block, offset=0): """ predicate indicating whether a block of memory has a valid magic number """ try: return CpioMember.encoded_class(block, offset) except InvalidFileFormat: return False @staticmethod def encoded_class(block, offset=0): """ predicate indicating whether a block of memory includes a magic number """ if not block: raise InvalidFileFormatNull for key in __magicmap__: if block.find(key, offset, offset + len(key)) > -1: return __magicmap__[key] raise InvalidFileFormat def unpack_from(self, block, offset=0): (self.magic, dev, self.ino, self.mode, self.uid, self.gid, self.nlink, rdev, mtimehigh, mtimelow, namesize, filesizehigh, filesizelow) = self.coder.unpack_from(block, offset) self.devmajor = os.major(dev) self.devminor = os.minor(dev) self.rdevmajor = os.major(rdev) self.rdevminor = os.minor(rdev) self.mtime = (mtimehigh << 16) | mtimelow self.filesize = (filesizehigh << 16) | filesizelow namestart = offset + self.coder.size datastart = namestart + namesize self.name = block[namestart:datastart - 1] # drop the null if isinstance(self, CpioMemberBin) and (namesize & 1): datastart += 1 # skip a pad byte if necessary self.content = block[datastart:datastart + self.filesize] return self def pack_into(self, block, offset=0): namesize = len(self.name) dev = os.makedev(self.devmajor, self.devminor) rdev = os.makedev(self.rdevmajor, self.rdevminor) mtimehigh = self.mtime >> 16 mtimelow = self.mtime & 0xffff filesizehigh = self.filesize >> 16 filesizelow = self.filesize & 0xffff self.coder.pack_into(block, offset, self.magic, dev, self.ino, self.mode, self.uid, self.gid, self.nlink, rdev, mtimehigh, mtimelow, namesize, filesizehigh, filesizelow) namestart = offset + self.coder.size datastart = namestart + namesize + 1 block[namestart:datastart - 1] = self.name block[datastart - 1] = '\x00' if isinstance(self, CpioMemberBin) and (namesize & 1): datastart += 1 block[datastart - 1] = '\x00' block[datastart:datastart + self.filesize] = self.content if isinstance(self, CpioMemberBin) and (self.filesize & 1): block[datastart + self.filesize] = '\x00' return self @property def size(self): return (self.coder.size + len(self.name) + 1 + self.filesize) def __repr__(self): return (b'<{0}@{1}: coder={2}, name=\'{3}\', magic=\'{4}\'' + ', devmajor={5}, devminor={6}, ino={7}, mode={8}' + ', uid={9}, gid={10}, nlink={11}, rdevmajor={12}' + ', rdevmino={13}, mtime={14}, filesize={15}>' .format(self.__class__.__name__, hex(id(self)), self.coder, self.name, self.magic, self.devmajor, self.devminor, self.ino, self.mode, self.uid, self.gid, self.nlink, self.rdevmajor, self.rdevminor, self.mtime, self.filesize)) def __eq__(self, other): return (isinstance(other, self.__class__) and self.coder == other.coder and self.magic == other.magic and self.devmajor == other.devmajor and self.devminor == other.devminor and self.ino == other.ino and self.mode == other.mode and self.uid == other.uid and self.gid == other.gid and self.nlink == other.nlink and self.rdevmajor == other.rdevmajor and self.rdevminor == other.rdevminor and self.mtime == other.mtime and self.filesize == other.filesize) close_enough = __eq__ class CpioMemberBin(CpioMember): """intermediate class indicating binary members - for subclassing only""" @property def size(self): namesize = len(self.name) + 1 # add null retval = self.coder.size retval += namesize if isinstance(self, CpioMemberBin) and (namesize & 1): retval += 1 retval += self.filesize if isinstance(self, CpioMemberBin) and (self.filesize & 1): retval += 1 return retval class CpioMember32b(CpioMemberBin): """ .. todo:: need to pad after name and after content for old binary. """ coder = struct.Struct(b'>2sHHHHHHHHHHHH') class CpioMember32l(CpioMemberBin): """class representing a 32bit little endian binary member""" coder = struct.Struct(b'<2sHHHHHHHHHHHH') class CpioMemberODC(CpioMember): """class representing an ODC member""" coder = struct.Struct(b'=6s6s6s6s6s6s6s6s11s6s11s') def unpack_from(self, block, offset=0): (self.magic, dev, ino, mode, uid, gid, nlink, rdev, mtime, namesize, filesize) = self.coder.unpack_from(block, offset) _namesize = namesize self.ino = int(ino, 8) self.mode = int(mode, 8) self.uid = int(uid, 8) self.gid = int(gid, 8) self.nlink = int(nlink, 8) dev = int(dev, 8) rdev = int(rdev, 8) self.devmajor = os.major(dev) self.devminor = os.minor(dev) self.rdevmajor = os.major(rdev) self.rdevminor = os.minor(rdev) self.mtime = int(mtime, 8) namesize = int(namesize, 8) self.filesize = int(filesize, 8) namestart = offset + self.coder.size datastart = namestart + namesize self.name = block[namestart:datastart - 1] # drop the null print('+', _namesize, self.name) self.content = block[datastart:datastart + self.filesize] return self def pack_into(self, block, offset=0): dev = os.makedev(self.devmajor, self.devminor) ino = str(self.ino) mode = str(self.mode) uid = str(self.uid) gid = str(self.gid) nlink = str(self.nlink) rdev = os.makedev(self.rdevmajor, self.rdevminor) mtime = str(self.mtime) namesize = str(len(self.name) + 1) # add a null filesize = str(self.filesize) self.coder.pack_into(block, offset, self.magic, dev, ino, mode, uid, gid, nlink, rdev, mtime, namesize, filesize) namesize = len(self.name) + 1 namestart = offset + self.coder.size datastart = namestart + namesize block[namestart:datastart - 2] = self.name block[datastart - 1] = '\x00' block[datastart:datastart + self.filesize] = self.content return self class CpioMemberNew(CpioMember): """class representing a new member""" coder = struct.Struct(b'6s8s8s8s8s8s8s8s8s8s8s8s8s8s') # pylint: disable=W0613 @staticmethod def _checksum(block, offset, length): """return a checksum for *block* at *offset* and *length*""" return 0 # pylint: enable=W0613 def unpack_from(self, block, offset=0): unpacks = self.coder.unpack_from(block, offset) self.magic = unpacks[0] self.ino = int(unpacks[1], 16) self.mode = int(unpacks[2], 16) self.uid = int(unpacks[3], 16) self.gid = int(unpacks[4], 16) self.nlink = int(unpacks[5], 16) self.mtime = int(unpacks[6], 16) self.filesize = int(unpacks[7], 16) self.devmajor = int(unpacks[8], 16) self.devminor = int(unpacks[9], 16) self.rdevmajor = int(unpacks[10], 16) self.rdevminor = int(unpacks[11], 16) namesize = int(unpacks[12], 16) check = int(unpacks[13], 16) namestart = offset + self.coder.size nameend = namestart + namesize datastart = nameend + ((4 - (nameend % 4)) % 4) # pad dataend = datastart + self.filesize self.name = block[namestart:nameend - 1] # drop the null print( "name", namesize, self.name) print( 'pad', ((4 - (nameend % 4)) % 4) )# pad self.content = block[datastart:dataend] if check != self._checksum(self.content, 0, self.filesize): raise CheckSumError return self def pack_into(self, block, offset=0): namesize = len(self.name) + 1 # unused: rdev = os.makedev(self.rdevmajor, self.rdevminor) self.coder.pack_into( block, offset, self.magic, str(self.ino), str(self.mode), str(self.uid), str(self.gid), str(self.nlink), str(self.mtime), str(self.filesize), str(self.devmajor), str(self.devminor), str(self.rdevmajor), str(self.rdevminor), str(namesize), self._checksum(self.content, 0, self.filesize)) namestart = offset + self.coder.size nameend = namestart + namesize datastart = nameend + ((4 - (nameend % 4)) % 4) # pad dataend = datastart + self.filesize block[namestart:nameend] = self.name for i in range(nameend, datastart): block[i] = '\x00' block[datastart:dataend] = self.content padend = dataend + ((4 - (datastart % 4)) % 4) # pad for i in range(dataend, padend): block[i] = '\x00' return self @property def size(self): retval = self.coder.size retval += len(self.name) + 1 retval += ((4 - (retval % 4)) % 4) retval += self.filesize retval += ((4 - (retval % 4)) % 4) return retval class CpioMemberCRC(CpioMemberNew): """class representing a cpio archive member with a CRC""" @staticmethod def _checksum(block, offset, length): csum = 0 for i in range(length): csum += ord(block[offset + i]) return csum & 0xffffffff __magicmap__ = { b'\x71\xc7': CpioMember32b, b'\xc7\x71': CpioMember32l, b'070707': CpioMemberODC, b'070701': CpioMemberNew, b'070702': CpioMemberCRC, } 0707010000000A000081A4000003E800000064000000015D1BA02900000052000000000000000000000000000000000000003B00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/errors.py''' Created on Jan 10, 2014 @author: sean ''' class RPMError(Exception): pass0707010000000B000081A4000003E800000064000000015D1BA029000012ED000000000000000000000000000000000000003C00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/headers.py''' Created on Jan 10, 2014 @author: sean ''' from __future__ import print_function, unicode_literals, absolute_import import sys import struct from pprint import pprint from .errors import RPMError tags = {'signature': 267 , 'md5': 269 , 'name': 1000 , 'version': 1001 , 'release': 1002 , 'serial': 1003 , 'summary': 1004 , 'description': 1005 , 'buildtime': 1006 , 'buildhost': 1007 , 'installtime': 1008 , 'size': 1009 , 'distribution': 1010 , 'vendor': 1011 , 'gif': 1012 , 'xpm': 1013 , 'copyright': 1014 , 'packager': 1015 , 'group': 1016 , 'changelog': 1017 , 'source': 1018 , 'patch': 1019 , 'url': 1020 , 'os': 1021 , 'arch': 1022 , 'prein': 1023 , 'postin': 1024 , 'preun': 1025 , 'postun': 1026 , 'filenames': 1027 , 'filesizes': 1028 , 'filestates': 1029 , 'filemodes': 1030 , 'fileuids': 1031 , 'filegids': 1032 , 'filerdevs': 1033 , 'filemtimes': 1034 , 'filemd5s': 1035 , 'filelinktos': 1036 , 'fileflags': 1037 , 'root': 1038 , 'fileusername': 1039 , 'filegroupname': 1040 , 'exclude': 1041 , 'exclusive': 1042 , 'icon': 1043 , 'sourcerpm': 1044 , 'fileverifyflags': 1045 , 'archivesize': 1046 , 'provides': 1047 , 'requireflags': 1048 , 'requirename': 1049 , 'requireversion': 1050 , 'nosource': 1051 , 'nopatch': 1052 , 'conflictflags': 1053 , 'conflictname': 1054 , 'conflictversion': 1055 , 'defaultprefix': 1056 , 'buildroot': 1057 , 'installprefix': 1058 , 'excludearch': 1059 , 'excludeos': 1060 , 'exclusivearch': 1061 , 'exclusiveos': 1062 , 'autoreqprov': 1063 , 'rpmversion': 1064 , 'triggerscripts': 1065 , 'triggername': 1066 , 'triggerversion': 1067 , 'triggerflags': 1068 , 'triggerindex': 1069 , 'verifyscript': 1079 , 'basenames': 1117 , 'archive_format': 1124 , 'archive_compression': 1125 , 'target': 1132 , 'authors':1081 , 'comments':1082 } rtags = {value:key for (key, value) in tags.items()} def extract_string(offset, count, store): assert count == 1 idx = store[offset:].index(b'\x00') return store[offset:offset + idx] def extract_array(offset, count, store): return store[offset:].split(b'\x00', count)[:-1] def extract_bin(offset, count, store): return store[offset:offset + count] def extract_int32(offset, count, store): values = struct.unpack(b'!' + b'i' * count, store[offset:offset + 4 * count]) if count == 1: values = values[0] return values def extract_int16(offset, count, store): values = struct.unpack(b'!' + b'h' * count, store[offset:offset + 2 * count]) if count == 1: values = values[0] return values ty_map = { 3: extract_int16, 4: extract_int32, 6: extract_string, 7: extract_bin, 8: extract_array, 9: extract_string, } def extract_data(ty, offset, count, store): extract = ty_map.get(ty) if extract: return extract(offset, count, store) else: return 'could not extract %s' % ty def _readheader(fileobj): char = fileobj.read(1) while char != b'\x8e': char = fileobj.read(1) if char is None or char == b'': raise RPMError("reached end of file without finding magic char \x8e") magic = b'\x8e' + fileobj.read(2) from binascii import hexlify assert hexlify(magic) == b'8eade8', hexlify(magic) version = ord(fileobj.read(1)) header_start = fileobj.tell() - 4 # -4 for magic _ = fileobj.read(4) num_entries, = struct.unpack(b'!i', fileobj.read(4)) header_structure_size, = struct.unpack(b'!i', fileobj.read(4)) header = struct.Struct(b'!iiii') entries = [] for _ in range(num_entries): entry = header.unpack(fileobj.read(header.size)) entries.append(entry) store = fileobj.read(header_structure_size) store headers = {} for tag, ty, offset, count in entries: key = rtags.get(tag, tag) value = extract_data(ty, offset, count, store) headers[key] = value header_end = fileobj.tell() return (header_start, header_end), headers def get_headers(fileobj): lead = struct.Struct(b'!4sBBhh66shh16s') data = fileobj.read(lead.size) value = lead.unpack(data) # Not sure what the first set of headers are for first_range, first_headers = _readheader(fileobj) second_range, second_headers = _readheader(fileobj) first_headers.update(second_headers) return second_range, first_headers def main(): with open(sys.argv[1]) as fileobj: headers = get_headers(fileobj) pprint(headers) if __name__ == '__main__': main() 0707010000000C000081A4000003E800000064000000015D1BA02900000813000000000000000000000000000000000000003D00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/io_extra.py''' Created on Jan 11, 2014 @author: sean ''' import io def _doc(from_func): '''copy doc from one function to another use as a decorator eg:: @_doc(file.tell) def tell(..): ... ''' def decorator(to_func): to_func.__doc__ = from_func.__doc__ return to_func return decorator class _SubFile(object): """A thin wrapper around an existing file object that provides a part of its data as an individual file object. """ def __init__(self, fileobj, start=0, size=None): self._fileobj = fileobj self._start = start if size is None: fileobj.seek(0, 2) pos = fileobj.tell() self._size = pos - start else: self._size = size self._pos = 0 def __getattr__(self, attr): return getattr(self._fileobj, attr) @_doc(io.FileIO.tell) def tell(self): return self._pos @_doc(io.FileIO.seek) def seek(self, offset, whence=0): if whence == 0: self._pos = offset elif whence == 1: self._pos += offset else: self._pos = self._size + offset self._pos = max(0, self._pos) def _n(self, size=None): if not size: size = self._size return min(size, self._size - self._pos) @_doc(io.FileIO.read) def read(self, size=None): self._fileobj.seek(self._pos + self._start, 0) n = self._n(size) self._pos += n return self._fileobj.read(n) @_doc(io.FileIO.readline) def readline(self, size=None): self._fileobj.seek(self._pos + self._start, 0) n = self._n(size) line = self._fileobj.readline(n) self._pos += len(line) return line @_doc(io.FileIO.readlines) def readlines(self, size=None): n = self._n(size) line = self.readline(n) n -= len(line) while line: yield line line = self.readline(n) n -= len(line) 0707010000000D000081A4000003E800000064000000015D1BA0290000061F000000000000000000000000000000000000003C00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/rpmdefs.py# -*- coding: iso-8859-15 -*- # -*- Mode: Python; py-ident-offset: 4 -*- # vim:ts=4:sw=4:et ''' rpm definitions Mario Morgado (BSD) https://github.com/mjvm/pyrpm ''' __revision__ = '$Rev$'[6:-2] RPM_LEAD_MAGIC_NUMBER = '\xed\xab\xee\xdb' RPM_HEADER_MAGIC_NUMBER = '\x8e\xad\xe8' RPMTAG_MIN_NUMBER = 1000 RPMTAG_MAX_NUMBER = 1146 #signature tags RPMSIGTAG_SIZE = 1000 RPMSIGTAG_LEMD5_1 = 1001 RPMSIGTAG_PGP = 1002 RPMSIGTAG_LEMD5_2 = 1003 RPMSIGTAG_MD5 = 1004 RPMSIGTAG_GPG = 1005 RPMSIGTAG_PGP5 = 1006 MD5_SIZE = 16 #16 bytes long PGP_SIZE = 152 #152 bytes long # data types definition RPM_DATA_TYPE_NULL = 0 RPM_DATA_TYPE_CHAR = 1 RPM_DATA_TYPE_INT8 = 2 RPM_DATA_TYPE_INT16 = 3 RPM_DATA_TYPE_INT32 = 4 RPM_DATA_TYPE_INT64 = 5 RPM_DATA_TYPE_STRING = 6 RPM_DATA_TYPE_BIN = 7 RPM_DATA_TYPE_STRING_ARRAY = 8 RPM_DATA_TYPE_I18NSTRING_TYPE = 9 RPM_DATA_TYPES = (RPM_DATA_TYPE_NULL, RPM_DATA_TYPE_CHAR, RPM_DATA_TYPE_INT8, RPM_DATA_TYPE_INT16, RPM_DATA_TYPE_INT32, RPM_DATA_TYPE_INT64, RPM_DATA_TYPE_STRING, RPM_DATA_TYPE_BIN, RPM_DATA_TYPE_STRING_ARRAY,) RPMTAG_NAME = 1000 RPMTAG_VERSION = 1001 RPMTAG_RELEASE = 1002 RPMTAG_DESCRIPTION = 1005 RPMTAG_COPYRIGHT = 1014 RPMTAG_URL = 1020 RPMTAG_ARCH = 1022 RPMTAGS = (RPMTAG_NAME, RPMTAG_VERSION, RPMTAG_RELEASE, RPMTAG_DESCRIPTION, RPMTAG_COPYRIGHT, RPMTAG_URL, RPMTAG_ARCH,) 0707010000000E000081A4000003E800000064000000015D1BA02900000246000000000000000000000000000000000000003200000000python-rpmfile-1.0.0+git20190702.208ac80/setup.py''' @author: sean ''' from setuptools import setup, find_packages from os.path import isfile if isfile('README.md'): with open('README.md') as readme: long_description = readme.read() else: long_description = '???' setup( name='rpmfile', description='Read rpm archive files', version="0.1.5", author='Sean Ross-Ross', author_email='srossross@gmail.com', url='https://github.com/srossross/rpmfile', license='MIT', long_description=long_description, long_description_content_type='text/markdown', packages=find_packages(), ) 0707010000000F000041ED000003E800000064000000025D1BA02900000000000000000000000000000000000000000000002F00000000python-rpmfile-1.0.0+git20190702.208ac80/tests07070100000010000081A4000003E800000064000000015D1BA02900000000000000000000000000000000000000000000003B00000000python-rpmfile-1.0.0+git20190702.208ac80/tests/__init__.py07070100000011000081A4000003E800000064000000015D1BA02900000A62000000000000000000000000000000000000003F00000000python-rpmfile-1.0.0+git20190702.208ac80/tests/test_extract.pyimport os import sys import shutil import hashlib import tempfile import unittest from functools import wraps try: from urllib2 import urlopen except ImportError: from urllib.request import urlopen import rpmfile def download(url, rpmname): def _downloader(func): @wraps(func) def wrapper(*args, **kwds): args = list(args) rpmpath = os.path.join(args[0].tempdir, rpmname) args.append(rpmpath) download = urlopen(url) with open(rpmpath, 'wb') as target_file: target_file.write(download.read()) download.close() return func(*args, **kwds) return wrapper return _downloader class TempDirTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.prevdir = os.getcwd() cls.tempdir = tempfile.mkdtemp() os.chdir(cls.tempdir) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tempdir) os.chdir(cls.prevdir) @unittest.skipUnless(sys.version_info.major >= 3 \ and sys.version_info.minor >= 3, 'Need lzma module') @download('https://download.clearlinux.org/releases/10540/clear/x86_64/os/Packages/sudo-setuid-1.8.17p1-34.x86_64.rpm', 'sudo.rpm') def test_lzma_sudo(self, rpmpath): with rpmfile.open(rpmpath) as rpm: # Inspect the RPM headers self.assertIn('name', rpm.headers.keys()) self.assertEqual(rpm.headers.get('arch', 'noarch'), b'x86_64') members = list(rpm.getmembers()) self.assertEqual(len(members), 1) fd = rpm.extractfile('./usr/bin/sudo') calculated = hashlib.md5(fd.read()).hexdigest() self.assertEqual(calculated, 'a208f3d9170ecfa69a0f4ccc78d2f8f6') @download('https://download.fedoraproject.org/pub/fedora/linux/releases/30/Everything/source/tree/Packages/r/rpm-4.14.2.1-4.fc30.1.src.rpm', 'sample.rpm') def test_autoclose(self, rpmpath): """Test that RPMFile.open context manager properly closes rpm file""" rpm_ref = None with rpmfile.open(rpmpath) as rpm: rpm_ref = rpm # Inspect the RPM headers self.assertIn('name', rpm.headers.keys()) self.assertEqual(rpm.headers.get('arch', 'noarch'), b'x86_64') members = list(rpm.getmembers()) self.assertEqual(len(members), 13) # Test that RPMFile owned file descriptor and that underlying file is really closed self.assertTrue(rpm_ref._fileobj.closed) self.assertTrue(rpm_ref._ownes_fd) 07070100000012000081A4000003E800000064000000015D1BA029000003E9000000000000000000000000000000000000003F00000000python-rpmfile-1.0.0+git20190702.208ac80/tests/test_subfile.py''' Created on Jan 11, 2014 @author: sean ''' import unittest import rpmfile import io class Test(unittest.TestCase): def test_seek(self): fd = io.BytesIO(b'Hello world') sub = rpmfile._SubFile(fd, start=2, size=4) sub.seek(0) self.assertEqual(sub.tell(), 0) sub.seek(1) self.assertEqual(sub.tell(), 1) sub.seek(1, 1) self.assertEqual(sub.tell(), 2) sub.seek(-1, 1) self.assertEqual(sub.tell(), 1) sub.seek(-10, 1) self.assertEqual(sub.tell(), 0) def test_read(self): fd = io.BytesIO(b'Hello world') sub = rpmfile._SubFile(fd, start=2, size=4) self.assertEqual(sub.read(), b'llo ') self.assertEqual(sub.read(), b'') sub.seek(0) self.assertEqual(sub.read(2), b'll') sub.seek(0) self.assertEqual(sub.read(10), b'llo ') if __name__ == "__main__": # import sys;sys.argv = ['', 'Test.testSeek'] unittest.main() 07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!84 blocks
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor