Fixed an issue that prevented the password reset tokens from working. Added email templates for password reset success and new account creation. Added more dynamic email template support.
306 lines
11 KiB
Python
306 lines
11 KiB
Python
#
|
|
# DEPRECATED: implementation for ffi.verify()
|
|
#
|
|
import sys, os, binascii, shutil, io
|
|
from . import __version_verifier_modules__
|
|
from . import ffiplatform
|
|
from .error import VerificationError
|
|
|
|
if sys.version_info >= (3, 3):
|
|
import importlib.machinery
|
|
def _extension_suffixes():
|
|
return importlib.machinery.EXTENSION_SUFFIXES[:]
|
|
else:
|
|
import imp
|
|
def _extension_suffixes():
|
|
return [suffix for suffix, _, type in imp.get_suffixes()
|
|
if type == imp.C_EXTENSION]
|
|
|
|
|
|
if sys.version_info >= (3,):
|
|
NativeIO = io.StringIO
|
|
else:
|
|
class NativeIO(io.BytesIO):
|
|
def write(self, s):
|
|
if isinstance(s, unicode):
|
|
s = s.encode('ascii')
|
|
super(NativeIO, self).write(s)
|
|
|
|
|
|
class Verifier(object):
|
|
|
|
def __init__(self, ffi, preamble, tmpdir=None, modulename=None,
|
|
ext_package=None, tag='', force_generic_engine=False,
|
|
source_extension='.c', flags=None, relative_to=None, **kwds):
|
|
if ffi._parser._uses_new_feature:
|
|
raise VerificationError(
|
|
"feature not supported with ffi.verify(), but only "
|
|
"with ffi.set_source(): %s" % (ffi._parser._uses_new_feature,))
|
|
self.ffi = ffi
|
|
self.preamble = preamble
|
|
if not modulename:
|
|
flattened_kwds = ffiplatform.flatten(kwds)
|
|
vengine_class = _locate_engine_class(ffi, force_generic_engine)
|
|
self._vengine = vengine_class(self)
|
|
self._vengine.patch_extension_kwds(kwds)
|
|
self.flags = flags
|
|
self.kwds = self.make_relative_to(kwds, relative_to)
|
|
#
|
|
if modulename:
|
|
if tag:
|
|
raise TypeError("can't specify both 'modulename' and 'tag'")
|
|
else:
|
|
key = '\x00'.join(['%d.%d' % sys.version_info[:2],
|
|
__version_verifier_modules__,
|
|
preamble, flattened_kwds] +
|
|
ffi._cdefsources)
|
|
if sys.version_info >= (3,):
|
|
key = key.encode('utf-8')
|
|
k1 = hex(binascii.crc32(key[0::2]) & 0xffffffff)
|
|
k1 = k1.lstrip('0x').rstrip('L')
|
|
k2 = hex(binascii.crc32(key[1::2]) & 0xffffffff)
|
|
k2 = k2.lstrip('0').rstrip('L')
|
|
modulename = '_cffi_%s_%s%s%s' % (tag, self._vengine._class_key,
|
|
k1, k2)
|
|
suffix = _get_so_suffixes()[0]
|
|
self.tmpdir = tmpdir or _caller_dir_pycache()
|
|
self.sourcefilename = os.path.join(self.tmpdir, modulename + source_extension)
|
|
self.modulefilename = os.path.join(self.tmpdir, modulename + suffix)
|
|
self.ext_package = ext_package
|
|
self._has_source = False
|
|
self._has_module = False
|
|
|
|
def write_source(self, file=None):
|
|
"""Write the C source code. It is produced in 'self.sourcefilename',
|
|
which can be tweaked beforehand."""
|
|
with self.ffi._lock:
|
|
if self._has_source and file is None:
|
|
raise VerificationError(
|
|
"source code already written")
|
|
self._write_source(file)
|
|
|
|
def compile_module(self):
|
|
"""Write the C source code (if not done already) and compile it.
|
|
This produces a dynamic link library in 'self.modulefilename'."""
|
|
with self.ffi._lock:
|
|
if self._has_module:
|
|
raise VerificationError("module already compiled")
|
|
if not self._has_source:
|
|
self._write_source()
|
|
self._compile_module()
|
|
|
|
def load_library(self):
|
|
"""Get a C module from this Verifier instance.
|
|
Returns an instance of a FFILibrary class that behaves like the
|
|
objects returned by ffi.dlopen(), but that delegates all
|
|
operations to the C module. If necessary, the C code is written
|
|
and compiled first.
|
|
"""
|
|
with self.ffi._lock:
|
|
if not self._has_module:
|
|
self._locate_module()
|
|
if not self._has_module:
|
|
if not self._has_source:
|
|
self._write_source()
|
|
self._compile_module()
|
|
return self._load_library()
|
|
|
|
def get_module_name(self):
|
|
basename = os.path.basename(self.modulefilename)
|
|
# kill both the .so extension and the other .'s, as introduced
|
|
# by Python 3: 'basename.cpython-33m.so'
|
|
basename = basename.split('.', 1)[0]
|
|
# and the _d added in Python 2 debug builds --- but try to be
|
|
# conservative and not kill a legitimate _d
|
|
if basename.endswith('_d') and hasattr(sys, 'gettotalrefcount'):
|
|
basename = basename[:-2]
|
|
return basename
|
|
|
|
def get_extension(self):
|
|
if not self._has_source:
|
|
with self.ffi._lock:
|
|
if not self._has_source:
|
|
self._write_source()
|
|
sourcename = ffiplatform.maybe_relative_path(self.sourcefilename)
|
|
modname = self.get_module_name()
|
|
return ffiplatform.get_extension(sourcename, modname, **self.kwds)
|
|
|
|
def generates_python_module(self):
|
|
return self._vengine._gen_python_module
|
|
|
|
def make_relative_to(self, kwds, relative_to):
|
|
if relative_to and os.path.dirname(relative_to):
|
|
dirname = os.path.dirname(relative_to)
|
|
kwds = kwds.copy()
|
|
for key in ffiplatform.LIST_OF_FILE_NAMES:
|
|
if key in kwds:
|
|
lst = kwds[key]
|
|
if not isinstance(lst, (list, tuple)):
|
|
raise TypeError("keyword '%s' should be a list or tuple"
|
|
% (key,))
|
|
lst = [os.path.join(dirname, fn) for fn in lst]
|
|
kwds[key] = lst
|
|
return kwds
|
|
|
|
# ----------
|
|
|
|
def _locate_module(self):
|
|
if not os.path.isfile(self.modulefilename):
|
|
if self.ext_package:
|
|
try:
|
|
pkg = __import__(self.ext_package, None, None, ['__doc__'])
|
|
except ImportError:
|
|
return # cannot import the package itself, give up
|
|
# (e.g. it might be called differently before installation)
|
|
path = pkg.__path__
|
|
else:
|
|
path = None
|
|
filename = self._vengine.find_module(self.get_module_name(), path,
|
|
_get_so_suffixes())
|
|
if filename is None:
|
|
return
|
|
self.modulefilename = filename
|
|
self._vengine.collect_types()
|
|
self._has_module = True
|
|
|
|
def _write_source_to(self, file):
|
|
self._vengine._f = file
|
|
try:
|
|
self._vengine.write_source_to_f()
|
|
finally:
|
|
del self._vengine._f
|
|
|
|
def _write_source(self, file=None):
|
|
if file is not None:
|
|
self._write_source_to(file)
|
|
else:
|
|
# Write our source file to an in memory file.
|
|
f = NativeIO()
|
|
self._write_source_to(f)
|
|
source_data = f.getvalue()
|
|
|
|
# Determine if this matches the current file
|
|
if os.path.exists(self.sourcefilename):
|
|
with open(self.sourcefilename, "r") as fp:
|
|
needs_written = not (fp.read() == source_data)
|
|
else:
|
|
needs_written = True
|
|
|
|
# Actually write the file out if it doesn't match
|
|
if needs_written:
|
|
_ensure_dir(self.sourcefilename)
|
|
with open(self.sourcefilename, "w") as fp:
|
|
fp.write(source_data)
|
|
|
|
# Set this flag
|
|
self._has_source = True
|
|
|
|
def _compile_module(self):
|
|
# compile this C source
|
|
tmpdir = os.path.dirname(self.sourcefilename)
|
|
outputfilename = ffiplatform.compile(tmpdir, self.get_extension())
|
|
try:
|
|
same = ffiplatform.samefile(outputfilename, self.modulefilename)
|
|
except OSError:
|
|
same = False
|
|
if not same:
|
|
_ensure_dir(self.modulefilename)
|
|
shutil.move(outputfilename, self.modulefilename)
|
|
self._has_module = True
|
|
|
|
def _load_library(self):
|
|
assert self._has_module
|
|
if self.flags is not None:
|
|
return self._vengine.load_library(self.flags)
|
|
else:
|
|
return self._vengine.load_library()
|
|
|
|
# ____________________________________________________________
|
|
|
|
_FORCE_GENERIC_ENGINE = False # for tests
|
|
|
|
def _locate_engine_class(ffi, force_generic_engine):
|
|
if _FORCE_GENERIC_ENGINE:
|
|
force_generic_engine = True
|
|
if not force_generic_engine:
|
|
if '__pypy__' in sys.builtin_module_names:
|
|
force_generic_engine = True
|
|
else:
|
|
try:
|
|
import _cffi_backend
|
|
except ImportError:
|
|
_cffi_backend = '?'
|
|
if ffi._backend is not _cffi_backend:
|
|
force_generic_engine = True
|
|
if force_generic_engine:
|
|
from . import vengine_gen
|
|
return vengine_gen.VGenericEngine
|
|
else:
|
|
from . import vengine_cpy
|
|
return vengine_cpy.VCPythonEngine
|
|
|
|
# ____________________________________________________________
|
|
|
|
_TMPDIR = None
|
|
|
|
def _caller_dir_pycache():
|
|
if _TMPDIR:
|
|
return _TMPDIR
|
|
result = os.environ.get('CFFI_TMPDIR')
|
|
if result:
|
|
return result
|
|
filename = sys._getframe(2).f_code.co_filename
|
|
return os.path.abspath(os.path.join(os.path.dirname(filename),
|
|
'__pycache__'))
|
|
|
|
def set_tmpdir(dirname):
|
|
"""Set the temporary directory to use instead of __pycache__."""
|
|
global _TMPDIR
|
|
_TMPDIR = dirname
|
|
|
|
def cleanup_tmpdir(tmpdir=None, keep_so=False):
|
|
"""Clean up the temporary directory by removing all files in it
|
|
called `_cffi_*.{c,so}` as well as the `build` subdirectory."""
|
|
tmpdir = tmpdir or _caller_dir_pycache()
|
|
try:
|
|
filelist = os.listdir(tmpdir)
|
|
except OSError:
|
|
return
|
|
if keep_so:
|
|
suffix = '.c' # only remove .c files
|
|
else:
|
|
suffix = _get_so_suffixes()[0].lower()
|
|
for fn in filelist:
|
|
if fn.lower().startswith('_cffi_') and (
|
|
fn.lower().endswith(suffix) or fn.lower().endswith('.c')):
|
|
try:
|
|
os.unlink(os.path.join(tmpdir, fn))
|
|
except OSError:
|
|
pass
|
|
clean_dir = [os.path.join(tmpdir, 'build')]
|
|
for dir in clean_dir:
|
|
try:
|
|
for fn in os.listdir(dir):
|
|
fn = os.path.join(dir, fn)
|
|
if os.path.isdir(fn):
|
|
clean_dir.append(fn)
|
|
else:
|
|
os.unlink(fn)
|
|
except OSError:
|
|
pass
|
|
|
|
def _get_so_suffixes():
|
|
suffixes = _extension_suffixes()
|
|
if not suffixes:
|
|
# bah, no C_EXTENSION available. Occurs on pypy without cpyext
|
|
if sys.platform == 'win32':
|
|
suffixes = [".pyd"]
|
|
else:
|
|
suffixes = [".so"]
|
|
|
|
return suffixes
|
|
|
|
def _ensure_dir(filename):
|
|
dirname = os.path.dirname(filename)
|
|
if dirname and not os.path.isdir(dirname):
|
|
os.makedirs(dirname)
|