Source code for sqlalchemy_utils.types.encrypted

# -*- coding: utf-8 -*-
import base64
import datetime

import six
from sqlalchemy.types import LargeBinary, String, TypeDecorator

from ..exceptions import ImproperlyConfigured
from .scalar_coercible import ScalarCoercible

cryptography = None
try:
    import cryptography
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.ciphers import (
        Cipher, algorithms, modes
    )
    from cryptography.fernet import Fernet
except ImportError:
    pass


dateutil = None
try:
    import dateutil
    from dateutil.parser import parse as datetime_parse
except ImportError:
    pass


class EncryptionDecryptionBaseEngine(object):
    """A base encryption and decryption engine.

    This class must be sub-classed in order to create
    new engines.
    """

    def _update_key(self, key):
        if isinstance(key, six.string_types):
            key = key.encode()
        digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
        digest.update(key)
        engine_key = digest.finalize()

        self._initialize_engine(engine_key)

    def encrypt(self, value):
        raise NotImplementedError('Subclasses must implement this!')

    def decrypt(self, value):
        raise NotImplementedError('Subclasses must implement this!')


class AesEngine(EncryptionDecryptionBaseEngine):
    """Provide AES encryption and decryption methods."""

    BLOCK_SIZE = 16
    PADDING = six.b('*')

    def _initialize_engine(self, parent_class_key):
        self.secret_key = parent_class_key
        self.iv = self.secret_key[:16]
        self.cipher = Cipher(
            algorithms.AES(self.secret_key),
            modes.CBC(self.iv),
            backend=default_backend()
        )

    def _pad(self, value):
        """Pad the message to be encrypted, if needed."""
        BS = self.BLOCK_SIZE
        P = self.PADDING
        padded = (value + (BS - len(value) % BS) * P)
        return padded

    def encrypt(self, value):
        if not isinstance(value, six.string_types):
            value = repr(value)
        if isinstance(value, six.text_type):
            value = str(value)
        value = value.encode()
        value = self._pad(value)
        encryptor = self.cipher.encryptor()
        encrypted = encryptor.update(value) + encryptor.finalize()
        encrypted = base64.b64encode(encrypted)
        return encrypted

    def decrypt(self, value):
        if isinstance(value, six.text_type):
            value = str(value)
        decryptor = self.cipher.decryptor()
        decrypted = base64.b64decode(value)
        decrypted = decryptor.update(decrypted) + decryptor.finalize()
        decrypted = decrypted.rstrip(self.PADDING)
        if not isinstance(decrypted, six.string_types):
            try:
                decrypted = decrypted.decode('utf-8')
            except UnicodeDecodeError:
                raise ValueError('Invalid decryption key')
        return decrypted


class FernetEngine(EncryptionDecryptionBaseEngine):
    """Provide Fernet encryption and decryption methods."""

    def _initialize_engine(self, parent_class_key):
        self.secret_key = base64.urlsafe_b64encode(parent_class_key)
        self.fernet = Fernet(self.secret_key)

    def encrypt(self, value):
        if not isinstance(value, six.string_types):
            value = repr(value)
        if isinstance(value, six.text_type):
            value = str(value)
        value = value.encode()
        encrypted = self.fernet.encrypt(value)
        return encrypted

    def decrypt(self, value):
        if isinstance(value, six.text_type):
            value = str(value)
        decrypted = self.fernet.decrypt(value)
        if not isinstance(decrypted, six.string_types):
            decrypted = decrypted.decode('utf-8')
        return decrypted


[docs]class EncryptedType(TypeDecorator, ScalarCoercible): """ EncryptedType provides a way to encrypt and decrypt values, to and from databases, that their type is a basic SQLAlchemy type. For example Unicode, String or even Boolean. On the way in, the value is encrypted and on the way out the stored value is decrypted. EncryptedType needs Cryptography_ library in order to work. A simple example is given below. .. _Cryptography: https://cryptography.io/en/latest/ :: import sqlalchemy as sa from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy_utils import EncryptedType secret_key = 'secretkey1234' # setup engine = create_engine('sqlite:///:memory:') connection = engine.connect() Base = declarative_base() class User(Base): __tablename__ = "user" id = sa.Column(sa.Integer, primary_key=True) username = sa.Column(EncryptedType(sa.Unicode, secret_key)) access_token = sa.Column(EncryptedType(sa.String, secret_key)) is_active = sa.Column(EncryptedType(sa.Boolean, secret_key)) number_of_accounts = sa.Column(EncryptedType(sa.Integer, secret_key)) sa.orm.configure_mappers() Base.metadata.create_all(connection) # create a configured "Session" class Session = sessionmaker(bind=connection) # create a Session session = Session() # example user_name = u'secret_user' test_token = 'atesttoken' active = True num_of_accounts = 2 user = User(username=user_name, access_token=test_token, is_active=active, accounts_num=accounts) session.add(user) session.commit() print('id: {}'.format(user.id)) print('username: {}'.format(user.username)) print('token: {}'.format(user.access_token)) print('active: {}'.format(user.is_active)) print('accounts: {}'.format(user.accounts_num)) # teardown session.close_all() Base.metadata.drop_all(connection) connection.close() engine.dispose() The key parameter accepts a callable to allow for the key to change per-row instead of be fixed for the whole table. :: def get_key(): return 'dynamic-key' class User(Base): __tablename__ = 'user' id = sa.Column(sa.Integer, primary_key=True) username = sa.Column(EncryptedType( sa.Unicode, get_key)) """ impl = LargeBinary def __init__(self, type_in=None, key=None, engine=None, **kwargs): """Initialization.""" if not cryptography: raise ImproperlyConfigured( "'cryptography' is required to use EncryptedType" ) super(EncryptedType, self).__init__(**kwargs) # set the underlying type if type_in is None: type_in = String() elif isinstance(type_in, type): type_in = type_in() self.underlying_type = type_in self._key = key if not engine: engine = AesEngine self.engine = engine() @property def key(self): return self._key @key.setter def key(self, value): self._key = value def _update_key(self): key = self._key() if callable(self._key) else self._key self.engine._update_key(key) def process_bind_param(self, value, dialect): """Encrypt a value on the way in.""" if value is not None: self._update_key() try: value = self.underlying_type.process_bind_param( value, dialect ) except AttributeError: # Doesn't have 'process_bind_param' # Handle 'boolean' and 'dates' type_ = self.underlying_type.python_type if issubclass(type_, bool): value = 'true' if value else 'false' elif issubclass(type_, (datetime.date, datetime.time)): value = value.isoformat() return self.engine.encrypt(value) def process_result_value(self, value, dialect): """Decrypt value on the way out.""" if value is not None: self._update_key() decrypted_value = self.engine.decrypt(value) try: return self.underlying_type.process_result_value( decrypted_value, dialect ) except AttributeError: # Doesn't have 'process_result_value' # Handle 'boolean' and 'dates' type_ = self.underlying_type.python_type date_types = [datetime.datetime, datetime.time, datetime.date] if issubclass(type_, bool): return decrypted_value == 'true' elif type_ in date_types: return DatetimeHandler.process_value( decrypted_value, type_ ) # Handle all others return self.underlying_type.python_type(decrypted_value) def _coerce(self, value): if isinstance(self.underlying_type, ScalarCoercible): return self.underlying_type._coerce(value) return value
class DatetimeHandler(object): """ DatetimeHandler is responsible for parsing strings and returning the appropriate date, datetime or time objects. """ @classmethod def process_value(cls, value, python_type): """ process_value returns a datetime, date or time object according to a given string value and a python type. """ if not dateutil: raise ImproperlyConfigured( "'python-dateutil' is required to process datetimes" ) return_value = datetime_parse(value) if issubclass(python_type, datetime.datetime): return return_value elif issubclass(python_type, datetime.time): return return_value.time() elif issubclass(python_type, datetime.date): return return_value.date()