Source code for ez_crypto

# encoding=utf-8 ==============================================================#
#                                  ez_crypto                                   #
#==============================================================================#

#============#
#  Includes  #
#============#
from Crypto.Hash import SHA256 # considered more secure than SHA1
from Crypto.Hash import HMAC
from Crypto.Cipher import PKCS1_OAEP, AES
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_PSS
from Crypto import Random
import os.path as path
import ez_preferences as ep
import ez_user as eu

#==============================================================================#
#                               PrivateKeyError                                #
#==============================================================================#

[docs]class PrivateKeyError(Exception): """ PrivateKeyError exception is raised if the private key is not found or corrupted. """ def __init__(self, value): self.value = value def __str__(self): return repr(self.value) #==============================================================================# # PublicKeyError # #==============================================================================#
[docs]class PublicKeyError(Exception): """ PublicKeyError exception is raised if the public key is not found or corrupted. """ def __init__(self, value): self.value = value def __str__(self): return repr(self.value) #==============================================================================# # class CryptoBaseClass # #==============================================================================#
[docs]class CryptoBaseClass(object): """ Base class defining common functions. """
[docs] def attribute_setter(self, **kwargs): """ Sets kwargs as instance attributes. """ for key, value in kwargs.iteritems(): setattr(self, key, value)
[docs] def return_dict(self, return_list): """ Extracts instance attributes into dictionary from return_list. """ return {k:v for k, v in self.__dict__.iteritems() if k in return_list} #==============================================================================# # class eZ_CryptoScheme # #==============================================================================#
[docs]class eZ_CryptoScheme(CryptoBaseClass): """ Outer crypto API to encrypt+sign and decrypt+verify message objects. Encryption must be provided as dictionary with following keys: ['etime', 'sender', 'recipient', 'content'] """ def __init__(self, **kwargs): self.attribute_setter(**kwargs)
[docs] def encrypt_sign(self): """ Pack content, exact time and sender to plaintext block. Sign and encrypt plaintext block. Return crypto items as dictionary. """ encrypt_items = ['ciphered_key', 'iv', 'crypt_mode', 'cipher', 'recipient', 'ciphered_mac'] # Encode with AES: _plain_block = "\1".join([self.etime, self.sender, self.content]) _aes_output = eZ_AES(_plain_block).encrypt() self.attribute_setter(**_aes_output) # encode AES-key and HMAC with public RSA key: _public_key = eZ_RSA().get_public_key(self.recipient) self.ciphered_key = eZ_RSA().encrypt(_public_key, self.key) self.ciphered_mac = eZ_RSA().encrypt(_public_key, self.hmac) return self.return_dict(encrypt_items)
[docs] def decrypt_verify(self): """ Decrypt and unpack cipher block, check HMAC. Return HMAC check result in 'authorized' key, as well as the other plaintext attributes. """ _aes_items = ['key', 'iv', 'crypt_mode', 'cipher', 'hmac'] decrypt_items = ['etime', 'content', 'sender', 'recipient', 'authorized'] # Decrypt AES key and HMAC: _private_key = eZ_RSA().get_private_key(self.recipient) self.key = eZ_RSA().decrypt(_private_key, self.ciphered_key) self.hmac = eZ_RSA().decrypt(_private_key, self.ciphered_mac) # Decrypt cipher block (and HMAC check inside AES class): _aes_input = self.return_dict(_aes_items) _aes_output = eZ_AES(**_aes_input).decrypt() self.attribute_setter(**_aes_output) # Unpack plaintext block (self.etime, self.sender, self.content) = self.plain.split("\1") return self.return_dict(decrypt_items) #==============================================================================# # class eZ_RSA # #==============================================================================#
[docs]class eZ_RSA(CryptoBaseClass): """ RSA cipher object. Provides asymmetric encrytpion. Recommended minimal keylength: 2048 bit. """ RSA_KEY_SIZE = 2048
[docs] def priv_key_loc(self, user): """ Sets the path for the private keyfiles. Base path retrieved from the user preferences. """ return ep.join(ep.location['key'], 'ez_rsa_' + user + '.priv')
[docs] def get_private_key(self, user): """ Import the senders keypair from Harddisk. """ try: with open(self.priv_key_loc(user), 'r') as keypairfile: keypair = RSA.importKey(keypairfile.read()) return keypair except IOError: raise PrivateKeyError('Could not get private key from file!')
[docs] def get_public_key(self, user): """ Get recipient public key from database. """ try: pub_key_stored = eu.user_database.get_entry(name=user).public_key return RSA.importKey(pub_key_stored) except: raise PublicKeyError("Could not get public key from database!") #self.shutdown()
[docs] def generate_keys(self, user, testing=False): """ Create RSA keypair, return the exported public key, which will be stored in the database, and write the exported private key to disc. """ fresh_key = RSA.generate(eZ_RSA.RSA_KEY_SIZE) private_key = fresh_key public_key = fresh_key.publickey() if testing: return private_key, public_key else: #pragma : no cover try: with open(self.priv_key_loc(user), 'w') as priv_file: priv_file.write(private_key.exportKey()) return public_key.exportKey() except IOError: return None
[docs] def encrypt(self, public_key, plaintext): """ RSA encrypt method, PKCS1_OAEP. (See PyCrypto documentation for further information.) """ cipher_scheme = PKCS1_OAEP.new(public_key) cipher = cipher_scheme.encrypt(plaintext) return cipher.encode('base64')
[docs] def decrypt(self, private_key, ciphertext): """ RSA decrypt method, PKCS1_OAEP. (See PyCrypto documentation for further information.) """ decipher_scheme = PKCS1_OAEP.new(private_key) try: plaintext = decipher_scheme.decrypt(ciphertext.decode('base64')) return plaintext except ValueError: print("Warning: Could not decrypt, wrong format.")
[docs] def sign(self, private_key, plaintext): """ Sign plaintext with private key. """ msg_hash = SHA256.new(plaintext) signer = PKCS1_PSS.new(private_key) signature = signer.sign(msg_hash) return signature.encode('base64')
[docs] def verify(self, public_key, plaintext, signature): """ Verify signature against plaintext with public key. Return True if successful, false otherwise. """ msg_hash = SHA256.new(plaintext) verifier = PKCS1_PSS.new(public_key) try: return verifier.verify(msg_hash, signature.decode('base64')) except: return False #==============================================================================# # class eZ_AES # #==============================================================================#
[docs]class eZ_AES(CryptoBaseClass): """ AES cipher object. Provides symmetric encryption. Requires plaintext string or ciphered dictionary object. Dictionary object must contain following keys: ['iv', 'key', 'cipher'] Encryption parameters as of crypt_mode_1: keylength = 32 Bytes, padding = '\01\00\00...', AES cipher mode = Cipher Block Chain. """ def __init__(self, plaintext=None, **kwargs): crypt_parameters_mode_1 = {'KEY_LENGTH':32, 'INTERRUPT':"\1", 'PAD':"\0", 'MODE': AES.MODE_CBC} self.attribute_setter(**crypt_parameters_mode_1) if type(plaintext) is str: self.plain = plaintext self.crypt_mode = 0 if kwargs: self.attribute_setter(**kwargs)
[docs] def encrypt(self): """ Creates random IV (Injection Vector) and random symmetric key. Encrypts padded text. Returns dictionary with base64 encoded ciphertext, key, IV and the crypt_mode used. """ assert self.crypt_mode is 0, "Can not encrypt. Data already encrypted" # changed to EtA # !! check for iv and mac still needed # http://crypto.stackexchange.com/questions/202/should-we-mac-then-encrypt-or-encrypt-then-mac # http://cseweb.ucsd.edu/~mihir/papers/oem.html _iv = RNG.read(AES.block_size) _key = RNG.read(self.KEY_LENGTH) _crypter = AES.new(_key, mode=self.MODE, IV=_iv) padded_text = self.add_padding(self.plain) self.crypt_mode = 1 self.cipher = _crypter.encrypt(padded_text).encode('base64') self.key = _key.encode('base64') self.iv = _iv.encode('base64') self.hmac = self.hmac_digest(_key, self.cipher) # Create HMAC encrypt_items = ['key', 'iv', 'crypt_mode', 'cipher', 'hmac'] return self.return_dict(encrypt_items)
[docs] def decrypt(self): """ Produces plaintext from ciphertext, if provided with correct key and encryption parameters. """ assert self.crypt_mode is not 0, "Can not decrypt. Data is not encrypted" _key = self.key.decode('base64') _iv = self.iv.decode('base64') _cipher = self.cipher.decode('base64') self.authorized = self.hmac_verify(_key, self.cipher, self.hmac) if self.authorized: decrypter = AES.new(_key, mode=self.MODE, IV=_iv) padded_text = decrypter.decrypt(_cipher) self.plain = self.remove_padding(padded_text) self.crypt_mode = 0 else: raise ValueError("HMAC Authentification failed") self.crypt_mode = 1 self.plain = None plain_items = ['plain', 'crypt_mode', 'authorized'] return self.return_dict(plain_items)
[docs] def hmac_verify(self, key, plaintext, hexmac_to_verify): """ Return bool. True if verification sucessfull, False otherwise. """ mac_object = HMAC.new(key, digestmod=SHA256) mac_object.update(plaintext) if mac_object.hexdigest() == hexmac_to_verify: authorized = True else: authorized = False return authorized
[docs] def hmac_digest(self, key, plaintext): """ Returns the hexdigest of a message, if provided with key. """ mac_object = HMAC.new(key, digestmod=SHA256) mac_object.update(plaintext) return mac_object.hexdigest()
[docs] def add_padding(self, text): """ Pads text to whole blocks (AES blocksize = 16). Padding scheme is binary '100000...'. If message length is multiple of blocksize, a whole additional block will be padded. """ pad_length = AES.block_size - len(text) % AES.block_size if pad_length: pass else: pad_length = AES.block_size return text + self.INTERRUPT + (pad_length - 1) * self.PAD
[docs] def remove_padding(self, text): """ Unpads decrypted text. Removes rightmost zeros and one (interrupt) byte. """ return text.rstrip(self.PAD)[:-1] #==============================================================================# # GLOBAL INSTANCES # #==============================================================================# # Strong random generator as file object:
RNG = Random.new()