"""M2Crypto wrapper for OpenSSL EVP API.
Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved.
Portions Copyright (c) 2004-2007 Open Source Applications Foundation.
Author: Heikki Toivonen
"""
import logging
from M2Crypto import BIO, Err, RSA, EC, m2, util, types as C
from typing import Optional, Callable, Union # noqa
log = logging.getLogger("EVP")
[docs]
class EVPError(ValueError):
pass
m2.evp_init(EVPError)
[docs]
def pbkdf2(password: bytes, salt: bytes, iter: int, keylen: int) -> bytes:
"""
Derive a key from password using PBKDF2 algorithm specified in RFC 2898.
:param password: Derive the key from this password.
:param salt: Salt.
:param iter: Number of iterations to perform.
:param keylen: Length of key to produce.
:return: Key.
"""
return m2.pkcs5_pbkdf2_hmac_sha1(password, salt, iter, keylen)
[docs]
class MessageDigest(object):
"""
Message Digest
"""
[docs]
@staticmethod
def m2_md_ctx_free(ctx: C.EVP_MD_CTX) -> None:
m2.md_ctx_free(ctx)
def __init__(self, algo: str) -> None:
md: Optional[Callable] = getattr(m2, algo, None)
if md is None:
# if the digest algorithm isn't found as an attribute of the m2
# module, try to look up the digest using get_digestbyname()
self.md = m2.get_digestbyname(algo)
if self.md is None:
raise EVPError("unknown algorithm %s" % (algo,))
else:
self.md = md()
self.ctx = m2.md_ctx_new()
m2.digest_init(self.ctx, self.md)
def __del__(self) -> None:
if getattr(self, "ctx", None):
self.m2_md_ctx_free(self.ctx)
[docs]
def update(self, data: bytes) -> int:
"""
Add data to be digested.
:return: -1 for Python error, 1 for success, 0 for OpenSSL failure.
"""
return m2.digest_update(self.ctx, data)
[docs]
def final(self):
return m2.digest_final(self.ctx)
# Deprecated.
digest = final
[docs]
class HMAC(object):
[docs]
@staticmethod
def m2_hmac_ctx_free(ctx: C.HMAC_CTX) -> None:
m2.hmac_ctx_free(ctx)
def __init__(self, key: bytes, algo: str = "sha256") -> None:
md = getattr(m2, algo, None)
if md is None:
raise ValueError("unknown algorithm", algo)
self.md = md()
self.ctx = m2.hmac_ctx_new()
m2.hmac_init(self.ctx, key, self.md)
def __del__(self) -> None:
if getattr(self, "ctx", None):
self.m2_hmac_ctx_free(self.ctx)
[docs]
def reset(self, key: bytes) -> None:
m2.hmac_init(self.ctx, key, self.md)
[docs]
def update(self, data: bytes) -> None:
m2.hmac_update(self.ctx, data)
[docs]
def final(self) -> bytes:
return m2.hmac_final(self.ctx)
digest = final
[docs]
def hmac(key: bytes, data: bytes, algo: str = "sha256") -> bytes:
md = getattr(m2, algo, None)
if md is None:
raise ValueError("unknown algorithm", algo)
return m2.hmac(key, data, md())
[docs]
class Cipher(object):
[docs]
@staticmethod
def m2_cipher_ctx_free(ctx: C.EVP_CIPHER_CTX) -> None:
m2.cipher_ctx_free(ctx)
def __init__(
self,
alg: str,
key: bytes,
iv: bytes,
op: int,
key_as_bytes: int = 0,
d: str = "md5",
salt: bytes = b"12345678",
i: int = 1,
padding: int = 1,
) -> None:
cipher = getattr(m2, alg, None)
if cipher is None:
raise ValueError("unknown cipher", alg)
self.cipher = cipher()
if key_as_bytes:
kmd = getattr(m2, d, None)
if kmd is None:
raise ValueError("unknown message digest", d)
key = m2.bytes_to_key(self.cipher, kmd(), key, salt, iv, i)
self.ctx = m2.cipher_ctx_new()
m2.cipher_init(self.ctx, self.cipher, key, iv, op)
self.set_padding(padding)
del key
def __del__(self) -> None:
if getattr(self, "ctx", None):
self.m2_cipher_ctx_free(self.ctx)
[docs]
def update(self, data: bytes) -> bytes:
return m2.cipher_update(self.ctx, data)
[docs]
def final(self) -> bytes:
return m2.cipher_final(self.ctx)
[docs]
def set_padding(self, padding: int = 1) -> int:
"""
Actually always return 1
"""
return m2.cipher_set_padding(self.ctx, padding)
[docs]
class PKey(object):
"""
Object to hold diverse types of asymmetric keys (also known
as "key pairs").
"""
def __init__(
self,
pkey: Optional[C.EVP_PKEY] = None,
_pyfree: int = 0,
md: str = "sha256",
) -> None:
if pkey is not None:
self.pkey = pkey
self._pyfree = _pyfree
else:
self.pkey = m2.pkey_new()
self._pyfree = 1
self._set_context(md)
@staticmethod
def _m2_pkey_free(pkey: C.EVP_PKEY) -> None:
m2.pkey_free(pkey)
@staticmethod
def _m2_md_ctx_free(ctx: C.EVP_MD_CTX) -> None:
m2.md_ctx_free(ctx)
def __del__(self) -> None:
if getattr(self, "_pyfree", 0):
self._m2_pkey_free(self.pkey)
if getattr(self, "ctx", None):
self._m2_md_ctx_free(self.ctx)
def _ptr(self):
return self.pkey
def _set_context(self, md: str) -> None:
if not md:
self.md = None
else:
mda: Optional[Callable] = getattr(m2, md, None)
if mda is None:
raise ValueError("unknown message digest", md)
self.md = mda()
self.ctx: C.EVP_MD_CTX = m2.md_ctx_new()
[docs]
def reset_context(self, md: str = "sha256") -> None:
"""
Reset internal message digest context.
:param md: The message digest algorithm.
"""
self._set_context(md)
[docs]
def sign_init(self) -> None:
"""
Initialise signing operation with self.
"""
if self.md is None:
raise EVPError("Digest algorithm not set")
m2.sign_init(self.ctx, self.md)
[docs]
def sign_update(self, data: bytes) -> None:
"""
Feed data to signing operation.
:param data: Data to be signed.
"""
m2.sign_update(self.ctx, data)
[docs]
def sign_final(self) -> bytes:
"""
Return signature.
:return: The signature.
"""
return m2.sign_final(self.ctx, self.pkey)
# Deprecated
update = sign_update
final = sign_final
[docs]
def verify_init(self) -> None:
"""
Initialise signature verification operation with self.
"""
if self.md is None:
raise EVPError("Digest algorithm not set")
m2.verify_init(self.ctx, self.md)
[docs]
def verify_update(self, data: bytes) -> int:
"""
Feed data to verification operation.
:param data: Data to be verified.
:return: -1 on Python error, 1 for success, 0 for OpenSSL error
"""
return m2.verify_update(self.ctx, data)
[docs]
def verify_final(self, sign: bytes) -> int:
"""
Return result of verification.
:param sign: Signature to use for verification
:return: Result of verification: 1 for success, 0 for failure, -1 on
other error.
"""
return m2.verify_final(self.ctx, sign, self.pkey)
[docs]
def digest_sign_init(self) -> None:
"""
Initialise digest signing operation with self.
"""
if self.md is None:
m2.digest_sign_init(self.ctx, self.pkey)
else:
m2.digest_sign_init(self.ctx, None, self.md, None, self.pkey)
[docs]
def digest_sign_update(self, data: bytes) -> None:
"""
Feed data to digest signing operation.
:param data: Data to be signed.
"""
m2.digest_sign_update(self.ctx, data)
[docs]
def digest_sign_final(self) -> bytes:
"""
Return signature.
:return: The signature.
"""
return m2.digest_sign_final(self.ctx)
[docs]
def digest_sign(self, data: bytes) -> bytes:
"""
Return signature.
:return: The signature.
"""
if not hasattr(m2, "digest_sign"):
raise NotImplemented(
"This method requires OpenSSL version " + "1.1.1 or greater."
) # type: ignore[misc]
return m2.digest_sign(self.ctx, data)
[docs]
def digest_verify_init(self) -> None:
"""
Initialise verification operation with self.
"""
if self.md is None:
m2.digest_verify_init(self.ctx, self.pkey)
else:
m2.digest_verify_init(self.ctx, None, self.md, None, self.pkey)
[docs]
def digest_verify_update(self, data: bytes) -> int:
"""
Feed data to verification operation.
:param data: Data to be verified.
:return: -1 on Python error, 1 for success, 0 for OpenSSL error
"""
return m2.digest_verify_update(self.ctx, data)
[docs]
def digest_verify_final(self, sign: bytes) -> int:
"""
Feed data to digest verification operation.
:param sign: Signature to use for verification
:return: Result of verification: 1 for success, 0 for failure, -1 on
other error.
"""
return m2.digest_verify_final(self.ctx, sign)
[docs]
def digest_verify(self, sign: bytes, data: bytes) -> int:
"""
Return result of verification.
:param sign: Signature to use for verification
:param data: Data to be verified.
:return: Result of verification: 1 for success, 0 for failure, -1 on
other error.
"""
if not hasattr(m2, "digest_verify"):
raise NotImplemented(
"This method requires OpenSSL version " + "1.1.1 or greater."
) # type: ignore[misc]
return m2.digest_verify(self.ctx, sign, data)
[docs]
def assign_rsa(self, rsa: RSA.RSA, capture: int = 1) -> int:
"""
Assign the RSA key pair to self.
:param rsa: M2Crypto.RSA.RSA object to be assigned to self.
:param capture: If true (default), this PKey object will own the RSA
object, meaning that once the PKey object gets
deleted it is no longer safe to use the RSA object.
:return: Return 1 for success and 0 for failure.
"""
if capture:
ret = m2.pkey_assign_rsa(self.pkey, rsa.rsa)
if ret:
rsa._pyfree = 0
else:
ret = m2.pkey_set1_rsa(self.pkey, rsa.rsa)
return ret
[docs]
def get_rsa(self) -> RSA.RSA_pub:
"""
Return the underlying RSA key if that is what the EVP
instance is holding.
"""
rsa_ptr = m2.pkey_get1_rsa(self.pkey)
if rsa_ptr is None:
raise EVPError("Not an RSA key")
rsa = RSA.RSA_pub(rsa_ptr, 1)
return rsa
[docs]
def assign_ec(self, ec: "EC.EC", capture: int = 1) -> int:
"""
Assign the EC key pair to self.
:param ec: M2Crypto.EC.EC object to be assigned to self.
:param capture: If true (default), this PKey object will own the EC
object, meaning that once the PKey object gets
deleted it is no longer safe to use the EC object.
:return: Return 1 for success and 0 for failure.
"""
if capture:
ret = m2.pkey_assign_ec(self.pkey, ec.ec)
if ret:
ec._pyfree = 0
else:
ret = m2.pkey_set1_ec(self.pkey, ec.ec)
return ret
[docs]
def get_ec(self) -> "EC.EC_pub":
"""
Return the underlying EC key if that is what the EVP
instance is holding.
"""
ec_ptr = m2.pkey_get1_ec(self.pkey)
if ec_ptr is None:
raise ValueError("Not an EC key")
ec = EC.EC_pub(ec_ptr, 1)
return ec
[docs]
def save_key(
self,
file: Union[str, bytes],
cipher: Optional[str] = "aes_128_cbc",
callback: Callable = util.passphrase_callback,
) -> int:
"""
Save the key pair to a file in PEM format.
:param file: Name of file to save key to.
:param cipher: Symmetric cipher to protect the key. The default
cipher is 'aes_128_cbc'. If cipher is None, then
the key is saved in the clear.
:param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect
the key. The default is
util.passphrase_callback.
"""
with BIO.openfile(file, "wb") as bio:
return self.save_key_bio(bio, cipher, callback)
[docs]
def save_key_bio(
self,
bio: BIO.BIO,
cipher: Optional[str] = "aes_128_cbc",
callback: Callable = util.passphrase_callback,
) -> int:
"""
Save the key pair to the M2Crypto.BIO object 'bio' in PEM format.
:param bio: M2Crypto.BIO object to save key to.
:param cipher: Symmetric cipher to protect the key. The default
cipher is 'aes_128_cbc'. If cipher is None, then
the key is saved in the clear.
:param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect
the key. The default is
util.passphrase_callback.
"""
if cipher is None:
return m2.pkey_write_pem_no_cipher(self.pkey, bio._ptr(), callback)
else:
proto = getattr(m2, cipher, None)
if proto is None:
raise ValueError("no such cipher %s" % cipher)
return m2.pkey_write_pem(self.pkey, bio._ptr(), proto(), callback)
[docs]
def as_pem(
self,
cipher: Optional[str] = "aes_128_cbc",
callback: Callable = util.passphrase_callback,
) -> bytes:
"""
Return key in PEM format in a string.
:param cipher: Symmetric cipher to protect the key. The default
cipher is ``'aes_128_cbc'``. If cipher is None,
then the key is saved in the clear.
:param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect
the key. The default is
util.passphrase_callback.
"""
bio = BIO.MemoryBuffer()
self.save_key_bio(bio, cipher, callback)
return bio.read_all()
[docs]
def as_der(self) -> bytes:
"""
Return key in DER format in a string
"""
buf = m2.pkey_as_der(self.pkey)
bio = BIO.MemoryBuffer(buf)
return bio.read_all()
[docs]
def size(self) -> int:
"""
Return the size of the key in bytes.
"""
return m2.pkey_size(self.pkey)
[docs]
def get_modulus(self) -> Optional[bytes]:
"""
Return the modulus in hex format.
"""
return m2.pkey_get_modulus(self.pkey)
[docs]
def get_key_identifier(self) -> bytes:
"""
Return the Subject Key Identifier (SKID) calculated as the SHA-1
hash of the raw public key bytes (method 1 in RFC 5280, section 4.2.1.2).
:return: SHA1 digest (20 bytes) of the public key.
"""
return m2.pkey_get_raw_pub_key_sha1(self.pkey)
[docs]
def load_key(
file: Union[str, bytes],
callback: Callable = util.passphrase_callback,
) -> PKey:
"""
Load an M2Crypto.EVP.PKey from file.
:param file: Name of file containing the key in PEM format.
:param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the
key.
:return: M2Crypto.EVP.PKey object.
"""
with BIO.openfile(file, "r") as bio:
cptr = m2.pkey_read_pem(bio.bio, callback)
return PKey(cptr, 1)
[docs]
def load_key_pubkey(
file: Union[str, bytes],
callback: Callable = util.passphrase_callback,
) -> PKey:
"""
Load an M2Crypto.EVP.PKey from a public key as a file.
:param file: Name of file containing the key in PEM format.
:param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the
key.
:return: M2Crypto.EVP.PKey object.
"""
with BIO.openfile(file, "r") as bio:
cptr = m2.pkey_read_pem_pubkey(bio._ptr(), callback)
if cptr is None:
raise EVPError(Err.get_error())
return PKey(cptr, 1)
[docs]
def load_key_bio(bio: BIO.BIO, callback: Callable = util.passphrase_callback) -> PKey:
"""
Load an M2Crypto.EVP.PKey from an M2Crypto.BIO object.
:param bio: M2Crypto.BIO object containing the key in PEM format.
:param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the
key.
:return: M2Crypto.EVP.PKey object.
"""
cptr = m2.pkey_read_pem(bio._ptr(), callback)
return PKey(cptr, 1)
[docs]
def load_key_bio_pubkey(
bio: BIO.BIO, callback: Callable = util.passphrase_callback
) -> PKey:
"""
Load an M2Crypto.EVP.PKey from a public key as a M2Crypto.BIO object.
:param bio: M2Crypto.BIO object containing the key in PEM format.
:param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the
key.
:return: M2Crypto.EVP.PKey object.
"""
cptr = m2.pkey_read_pem_pubkey(bio._ptr(), callback)
if cptr is None:
raise EVPError(Err.get_error())
return PKey(cptr, 1)
[docs]
def load_key_string(
string: Union[str, bytes],
callback: Callable = util.passphrase_callback,
) -> PKey:
"""
Load an M2Crypto.EVP.PKey from a string.
:param string: String containing the key in PEM format.
:param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the
key.
:return: M2Crypto.EVP.PKey object.
"""
data = string.encode("utf8") if isinstance(string, str) else string
bio = BIO.MemoryBuffer(data)
return load_key_bio(bio, callback)
[docs]
def load_key_string_pubkey(
string: Union[str, bytes],
callback: Callable = util.passphrase_callback,
) -> PKey:
"""
Load an M2Crypto.EVP.PKey from a public key as a string.
:param string: String containing the key in PEM format.
:param callback: A Python callable object that is invoked
to acquire a passphrase with which to protect the
key.
:return: M2Crypto.EVP.PKey object.
"""
data = string.encode("utf8") if isinstance(string, str) else string
bio = BIO.MemoryBuffer(data)
return load_key_bio_pubkey(bio, callback)