#
# DEPRECATED: implementation for ffi.verify()
#
import sys, os, binascii, shutil, io
from . import __version_verifier_modules__
from . import ffiplatform
from .error import VerificationError

if sys.version_info >= (3, 3):
    import importlib.machinery

    def _extension_suffixes():
        """Return a fresh list of the suffixes used by C extension modules."""
        return importlib.machinery.EXTENSION_SUFFIXES[:]
else:
    import imp

    def _extension_suffixes():
        """Return the suffixes that 'imp' reports for C extension modules."""
        return [suffix for suffix, _, kind in imp.get_suffixes()
                if kind == imp.C_EXTENSION]


if sys.version_info >= (3,):
    NativeIO = io.StringIO
else:
    class NativeIO(io.BytesIO):
        """In-memory file that also accepts ASCII-only unicode strings."""

        def write(self, s):
            """Write 's', encoding it to ASCII first if it is unicode.

            Returns whatever the underlying BytesIO.write() returns.
            """
            if isinstance(s, unicode):
                s = s.encode('ascii')
            return super(NativeIO, self).write(s)


class Verifier(object):
    """Write, compile and load the C module behind a call to ffi.verify().

    The generation of the C source and the loading of the compiled
    module are delegated to an engine object (see _locate_engine_class());
    this class keeps track of the file names and of what was already done.
    """

    def __init__(self, ffi, preamble, tmpdir=None, modulename=None,
                 ext_package=None, tag='', force_generic_engine=False,
                 source_extension='.c', flags=None, relative_to=None, **kwds):
        """Prepare the verifier; nothing is written or compiled yet.

        ffi: the FFI object; its _parser, _cdefsources, _backend and
            _lock attributes are used.
        preamble: stored as 'self.preamble'; also part of the hash that
            gives the default module name.
        tmpdir: directory of the source and module files.  Defaults to
            the result of _caller_dir_pycache().
        modulename: explicit module name.  If not given, a name is
            derived from the Python version, the verifier version, the
            preamble, the flattened keywords and the cdef sources.
        ext_package: name of a package in which _locate_module() looks
            for an already-compiled module.
        tag: extra string inserted in the generated module name; cannot
            be combined with 'modulename'.
        force_generic_engine: passed to _locate_engine_class().
        source_extension: suffix of the generated source file.
        flags: if not None, passed to the engine's load_library().
        relative_to: see make_relative_to().
        kwds: extra keywords, forwarded to ffiplatform.get_extension().

        Raises VerificationError if the cdef uses a feature that is only
        supported with ffi.set_source(), and TypeError if both
        'modulename' and 'tag' are given.
        """
        if ffi._parser._uses_new_feature:
            raise VerificationError(
                "feature not supported with ffi.verify(), but only "
                "with ffi.set_source(): %s" % (ffi._parser._uses_new_feature,))
        self.ffi = ffi
        self.preamble = preamble
        if not modulename:
            # only needed for the hash below; computed here, before the
            # engine is given 'kwds' to patch
            flattened_kwds = ffiplatform.flatten(kwds)
        vengine_class = _locate_engine_class(ffi, force_generic_engine)
        self._vengine = vengine_class(self)
        self._vengine.patch_extension_kwds(kwds)
        self.flags = flags
        self.kwds = self.make_relative_to(kwds, relative_to)
        #
        if modulename:
            if tag:
                raise TypeError("can't specify both 'modulename' and 'tag'")
        else:
            key = '\x00'.join(['%d.%d' % sys.version_info[:2],
                               __version_verifier_modules__,
                               preamble, flattened_kwds] +
                              ffi._cdefsources)
            if sys.version_info >= (3,):
                key = key.encode('utf-8')
            # Two CRC32 checksums, one over each half of the interleaved
            # key.  NOTE: lstrip() strips a set of characters, not a
            # prefix.  k1 loses its whole '0x' prefix whereas k2 only
            # loses the '0' and keeps the 'x', which ends up between the
            # two checksums in the module name.  Do not "fix" this: it
            # would change the names of the generated modules.
            k1 = hex(binascii.crc32(key[0::2]) & 0xffffffff)
            k1 = k1.lstrip('0x').rstrip('L')
            k2 = hex(binascii.crc32(key[1::2]) & 0xffffffff)
            k2 = k2.lstrip('0').rstrip('L')
            modulename = '_cffi_%s_%s%s%s' % (tag, self._vengine._class_key,
                                              k1, k2)
        suffix = _get_so_suffixes()[0]
        # _caller_dir_pycache() inspects the call stack: it must be
        # called directly from here, not through a helper
        self.tmpdir = tmpdir or _caller_dir_pycache()
        self.sourcefilename = os.path.join(self.tmpdir,
                                           modulename + source_extension)
        self.modulefilename = os.path.join(self.tmpdir, modulename + suffix)
        self.ext_package = ext_package
        self._has_source = False
        self._has_module = False

    def write_source(self, file=None):
        """Write the C source code.  It is produced in 'self.sourcefilename',
        which can be tweaked beforehand.

        If 'file' is given, the source is written to that file object
        instead.  Raises VerificationError if the source was already
        written and no 'file' is given.
        """
        with self.ffi._lock:
            if self._has_source and file is None:
                raise VerificationError(
                    "source code already written")
            self._write_source(file)

    def compile_module(self):
        """Write the C source code (if not done already) and compile it.
        This produces a dynamic link library in 'self.modulefilename'.

        Raises VerificationError if the module was already compiled.
        """
        with self.ffi._lock:
            if self._has_module:
                raise VerificationError("module already compiled")
            if not self._has_source:
                self._write_source()
            self._compile_module()

    def load_library(self):
        """Get a C module from this Verifier instance.
        Returns an instance of a FFILibrary class that behaves like the
        objects returned by ffi.dlopen(), but that delegates all
        operations to the C module.  If necessary, the C code is written
        and compiled first.
        """
        with self.ffi._lock:
            if not self._has_module:
                # first try to find an already-compiled module
                self._locate_module()
                if not self._has_module:
                    if not self._has_source:
                        self._write_source()
                    self._compile_module()
            return self._load_library()

    def get_module_name(self):
        """Return the module name, computed from 'self.modulefilename'."""
        basename = os.path.basename(self.modulefilename)
        # kill both the .so extension and the other .'s, as introduced
        # by Python 3: 'basename.cpython-33m.so'
        basename = basename.split('.', 1)[0]
        # and the _d added in Python 2 debug builds --- but try to be
        # conservative and not kill a legitimate _d
        if basename.endswith('_d') and hasattr(sys, 'gettotalrefcount'):
            basename = basename[:-2]
        return basename

    def get_extension(self):
        """Return the result of ffiplatform.get_extension() for this module.

        The C source is written first if that was not done already.
        """
        ffiplatform._hack_at_distutils() # backward compatibility hack
        if not self._has_source:
            with self.ffi._lock:
                # check again now that we hold the lock
                if not self._has_source:
                    self._write_source()
        sourcename = ffiplatform.maybe_relative_path(self.sourcefilename)
        modname = self.get_module_name()
        return ffiplatform.get_extension(sourcename, modname, **self.kwds)

    def generates_python_module(self):
        """Return the engine's '_gen_python_module' attribute."""
        return self._vengine._gen_python_module

    def make_relative_to(self, kwds, relative_to):
        """Return 'kwds' with its file names joined to dirname(relative_to).

        Only the keys listed in ffiplatform.LIST_OF_FILE_NAMES are
        changed, and only in a copy of 'kwds'.  If 'relative_to' is empty
        or has no directory part, 'kwds' itself is returned unchanged.

        Raises TypeError if one of these keys is not a list or tuple.
        """
        if relative_to and os.path.dirname(relative_to):
            dirname = os.path.dirname(relative_to)
            kwds = kwds.copy()
            for key in ffiplatform.LIST_OF_FILE_NAMES:
                if key in kwds:
                    lst = kwds[key]
                    if not isinstance(lst, (list, tuple)):
                        raise TypeError("keyword '%s' should be a list or tuple"
                                        % (key,))
                    lst = [os.path.join(dirname, fn) for fn in lst]
                    kwds[key] = lst
        return kwds

    # ----------

    def _locate_module(self):
        """Look for an already-compiled module; set '_has_module' if found.

        If 'self.modulefilename' does not exist, the engine is asked to
        find the module (inside 'ext_package', if any) and
        'self.modulefilename' is updated accordingly.
        """
        if not os.path.isfile(self.modulefilename):
            if self.ext_package:
                try:
                    pkg = __import__(self.ext_package, None, None, ['__doc__'])
                except ImportError:
                    return      # cannot import the package itself, give up
                    # (e.g. it might be called differently before installation)
                path = pkg.__path__
            else:
                path = None
            filename = self._vengine.find_module(self.get_module_name(), path,
                                                 _get_so_suffixes())
            if filename is None:
                return
            self.modulefilename = filename
        self._vengine.collect_types()
        self._has_module = True

    def _write_source_to(self, file):
        """Make the engine write the C source to the file object 'file'."""
        self._vengine._f = file
        try:
            self._vengine.write_source_to_f()
        finally:
            del self._vengine._f

    def _write_source(self, file=None):
        """Write the C source to 'file', or else to 'self.sourcefilename'.

        In the second case the file on disk is only rewritten if its
        content differs, and '_has_source' is set.  Writing to an
        explicit 'file' does not set '_has_source'.
        """
        if file is not None:
            self._write_source_to(file)
        else:
            # Write our source file to an in memory file.
            f = NativeIO()
            self._write_source_to(f)
            source_data = f.getvalue()

            # Determine if this matches the current file
            if os.path.exists(self.sourcefilename):
                with open(self.sourcefilename, "r") as fp:
                    needs_written = not (fp.read() == source_data)
            else:
                needs_written = True

            # Actually write the file out if it doesn't match
            if needs_written:
                _ensure_dir(self.sourcefilename)
                with open(self.sourcefilename, "w") as fp:
                    fp.write(source_data)

            # Set this flag
            self._has_source = True

    def _compile_module(self):
        """Compile the C source and move the result to 'self.modulefilename'."""
        # compile this C source
        tmpdir = os.path.dirname(self.sourcefilename)
        outputfilename = ffiplatform.compile(tmpdir, self.get_extension())
        try:
            same = ffiplatform.samefile(outputfilename, self.modulefilename)
        except OSError:
            same = False
        if not same:
            _ensure_dir(self.modulefilename)
            shutil.move(outputfilename, self.modulefilename)
        self._has_module = True

    def _load_library(self):
        """Load the module with the engine, passing 'flags' if not None."""
        assert self._has_module
        if self.flags is not None:
            return self._vengine.load_library(self.flags)
        else:
            return self._vengine.load_library()

# ____________________________________________________________

_FORCE_GENERIC_ENGINE = False      # for tests

def _locate_engine_class(ffi, force_generic_engine):
    """Return the engine class to use: VGenericEngine or VCPythonEngine.

    The generic engine is picked if it is forced (by the argument or by
    _FORCE_GENERIC_ENGINE), on PyPy, or if 'ffi._backend' is not the
    '_cffi_backend' module.
    """
    if _FORCE_GENERIC_ENGINE:
        force_generic_engine = True
    if not force_generic_engine:
        if '__pypy__' in sys.builtin_module_names:
            force_generic_engine = True
        else:
            try:
                import _cffi_backend
            except ImportError:
                # a placeholder that no backend can be identical to
                _cffi_backend = '?'
            if ffi._backend is not _cffi_backend:
                force_generic_engine = True
    if force_generic_engine:
        from . import vengine_gen
        return vengine_gen.VGenericEngine
    else:
        from . import vengine_cpy
        return vengine_cpy.VCPythonEngine

# ____________________________________________________________

_TMPDIR = None

def _caller_dir_pycache():
    """Return the default temporary directory.

    This is, in order: the directory given to set_tmpdir(); the
    'CFFI_TMPDIR' environment variable; or the '__pycache__' directory
    next to the source file of the frame two levels up the call stack
    (i.e. the caller of the function that calls this one).
    """
    if _TMPDIR:
        return _TMPDIR
    result = os.environ.get('CFFI_TMPDIR')
    if result:
        return result
    filename = sys._getframe(2).f_code.co_filename
    return os.path.abspath(os.path.join(os.path.dirname(filename),
                           '__pycache__'))

def set_tmpdir(dirname):
    """Set the temporary directory to use instead of __pycache__."""
    global _TMPDIR
    _TMPDIR = dirname

def cleanup_tmpdir(tmpdir=None, keep_so=False):
    """Clean up the temporary directory by removing all files in it
    called `_cffi_*.{c,so}` as well as all files found under its
    `build` subdirectory.

    If 'keep_so' is true, only the `_cffi_*.c` files are removed from
    'tmpdir' itself.  Directories are never removed, only files.  Files
    that cannot be removed are silently skipped.
    """
    # _caller_dir_pycache() inspects the call stack: it must be called
    # directly from here, not through a helper
    tmpdir = tmpdir or _caller_dir_pycache()
    try:
        filelist = os.listdir(tmpdir)
    except OSError:
        return
    if keep_so:
        suffix = '.c'   # only remove .c files
    else:
        suffix = _get_so_suffixes()[0].lower()
    for fn in filelist:
        lowered = fn.lower()
        if lowered.startswith('_cffi_') and lowered.endswith((suffix, '.c')):
            try:
                os.unlink(os.path.join(tmpdir, fn))
            except OSError:
                pass
    # 'pending_dirs' grows while we iterate over it: subdirectories are
    # appended and visited in turn
    pending_dirs = [os.path.join(tmpdir, 'build')]
    for dirname in pending_dirs:
        try:
            entries = os.listdir(dirname)
        except OSError:
            continue
        for fn in entries:
            fn = os.path.join(dirname, fn)
            if os.path.isdir(fn):
                pending_dirs.append(fn)
            else:
                # best-effort per file: one file that cannot be removed
                # must not prevent the removal of the following ones
                try:
                    os.unlink(fn)
                except OSError:
                    pass

def _get_so_suffixes():
    """Return the list of suffixes of extension modules, never empty."""
    suffixes = _extension_suffixes()
    if not suffixes:
        # bah, no C_EXTENSION available.  Occurs on pypy without cpyext
        if sys.platform == 'win32':
            suffixes = [".pyd"]
        else:
            suffixes = [".so"]

    return suffixes

def _ensure_dir(filename):
    """Create the directory that contains 'filename', if it is missing."""
    dirname = os.path.dirname(filename)
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname)