Unverified Commit 6457e76d by Jeremy Bowman Committed by GitHub

Merge pull request #16612 from edx/jmbowman/PLAT-1199

PLAT-1199 Stop using pycrypto
parents 80d7cefd 43a11af7
......@@ -103,11 +103,9 @@ class NotificationPrefViewTest(UrlResetMixin, TestCase):
self.assertRaises(PermissionDenied, ajax_enable, request)
self.assertNotPrefExists(self.user)
@patch("Crypto.Random.new")
def test_ajax_enable_success(self, mock_random_new):
mock_stream = Mock()
mock_stream.read.return_value = self.INITIALIZATION_VECTOR
mock_random_new.return_value = mock_stream
@patch("os.urandom")
def test_ajax_enable_success(self, mock_urandom):
mock_urandom.return_value = self.INITIALIZATION_VECTOR
def test_user(user):
request = self.request_factory.post("dummy")
......
"""
Views to support notification preferences.
"""
from __future__ import division
import json
import os
from base64 import urlsafe_b64decode, urlsafe_b64encode
from hashlib import sha256
from Crypto import Random
from Crypto.Cipher import AES
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.ciphers.modes import CBC
from cryptography.hazmat.primitives.padding import PKCS7
from django.conf import settings
from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied
......@@ -15,6 +25,8 @@ from notification_prefs import NOTIFICATION_PREF_KEY
from openedx.core.djangoapps.user_api.models import UserPreference
from openedx.core.djangoapps.user_api.preferences.api import delete_user_preference
AES_BLOCK_SIZE_BYTES = int(AES.block_size / 8)
class UsernameDecryptionException(Exception):
pass
......@@ -39,35 +51,20 @@ class UsernameCipher(object):
decryption cipher
5. base64url encode the result
"""
@staticmethod
def _get_aes_cipher(initialization_vector):
hash_ = sha256()
hash_.update(settings.SECRET_KEY)
return AES.new(hash_.digest(), AES.MODE_CBC, initialization_vector)
@staticmethod
def _add_padding(input_str):
"""Return `input_str` with PKCS#7 padding added to match AES block length"""
padding_len = AES.block_size - len(input_str) % AES.block_size
return input_str + padding_len * chr(padding_len)
@staticmethod
def _remove_padding(input_str):
"""Return `input_str` with PKCS#7 padding trimmed to match AES block length"""
num_pad_bytes = ord(input_str[-1])
if num_pad_bytes < 1 or num_pad_bytes > AES.block_size or num_pad_bytes >= len(input_str):
raise UsernameDecryptionException("padding")
return input_str[:-num_pad_bytes]
return Cipher(AES(hash_.digest()), CBC(initialization_vector), backend=default_backend())
@staticmethod
def encrypt(username):
initialization_vector = Random.new().read(AES.block_size)
initialization_vector = os.urandom(AES_BLOCK_SIZE_BYTES)
aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
return urlsafe_b64encode(
initialization_vector +
aes_cipher.encrypt(UsernameCipher._add_padding(username.encode("utf-8")))
)
encryptor = aes_cipher.encryptor()
padder = PKCS7(AES.block_size).padder()
padded = padder.update(username.encode("utf-8")) + padder.finalize()
return urlsafe_b64encode(initialization_vector + encryptor.update(padded) + encryptor.finalize())
@staticmethod
def decrypt(token):
......@@ -76,19 +73,27 @@ class UsernameCipher(object):
except TypeError:
raise UsernameDecryptionException("base64url")
if len(base64_decoded) < AES.block_size:
if len(base64_decoded) < AES_BLOCK_SIZE_BYTES:
raise UsernameDecryptionException("initialization_vector")
initialization_vector = base64_decoded[:AES.block_size]
aes_encrypted = base64_decoded[AES.block_size:]
initialization_vector = base64_decoded[:AES_BLOCK_SIZE_BYTES]
aes_encrypted = base64_decoded[AES_BLOCK_SIZE_BYTES:]
aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
decryptor = aes_cipher.decryptor()
unpadder = PKCS7(AES.block_size).unpadder()
try:
decrypted = aes_cipher.decrypt(aes_encrypted)
decrypted = decryptor.update(aes_encrypted) + decryptor.finalize()
except ValueError:
raise UsernameDecryptionException("aes")
return UsernameCipher._remove_padding(decrypted)
try:
unpadded = unpadder.update(decrypted) + unpadder.finalize()
if len(unpadded) == 0:
raise UsernameDecryptionException("padding")
return unpadded
except ValueError:
raise UsernameDecryptionException("padding")
def enable_notifications(user):
......
......@@ -608,7 +608,7 @@ class SoftwareSecurePhotoVerification(PhotoVerification):
both Software Secure and edx-platform.
2. The snapshot of a user's photo ID is also encrypted using AES-256, but
the key is randomly generated using pycrypto's Random. Every verification
the key is randomly generated using os.urandom. Every verification
attempt has a new key. The AES key is then encrypted using a public key
provided by Software Secure. We store only the RSA-encryped AES key.
Since edx-platform does not have Software Secure's private RSA key, it
......
......@@ -6,8 +6,6 @@ words, passing the `str` value of
You want to pass in the result of calling .decode('hex') on that, so this instead:
"'2\xfer\xaa\xf2\xab\xb4M\xe9\xe1a\x13\x1bT5\xc8\xd3|\xbd\xb6\xf5\xdf$*\xe8`\xb2\x83\x11_-\xae'"
The RSA functions take any key format that RSA.importKey() accepts, so...
An RSA public key can be in any of the following formats:
* X.509 subjectPublicKeyInfo DER SEQUENCE (binary or PEM encoding)
* PKCS#1 RSAPublicKey DER SEQUENCE (binary or PEM encoding)
......@@ -16,27 +14,32 @@ An RSA public key can be in any of the following formats:
An RSA private key can be in any of the following formats:
* PKCS#1 RSAPrivateKey DER SEQUENCE (binary or PEM encoding)
* PKCS#8 PrivateKeyInfo DER SEQUENCE (binary or PEM encoding)
* OpenSSH (textual public key only)
In case of PEM encoding, the private key can be encrypted with DES or 3TDES
according to a certain pass phrase. Only OpenSSL-compatible pass phrases are
supported.
"""
from __future__ import division
import base64
import binascii
import hmac
import logging
import os
from hashlib import md5, sha256
from Crypto import Random
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.ciphers.modes import CBC
from cryptography.hazmat.primitives.hashes import SHA1
from cryptography.hazmat.primitives.padding import PKCS7
log = logging.getLogger(__name__)
AES_BLOCK_SIZE_BYTES = int(AES.block_size / 8)
def encrypt_and_encode(data, key):
""" Encrypts and endcodes `data` using `key' """
""" Encrypts and encodes `data` using `key' """
return base64.urlsafe_b64encode(aes_encrypt(data, key))
......@@ -51,7 +54,8 @@ def aes_encrypt(data, key):
"""
cipher = aes_cipher_from_key(key)
padded_data = pad(data)
return cipher.encrypt(padded_data)
encryptor = cipher.encryptor()
return encryptor.update(padded_data) + encryptor.finalize()
def aes_decrypt(encrypted_data, key):
......@@ -59,17 +63,18 @@ def aes_decrypt(encrypted_data, key):
Decrypt `encrypted_data` using `key`
"""
cipher = aes_cipher_from_key(key)
padded_data = cipher.decrypt(encrypted_data)
decryptor = cipher.decryptor()
padded_data = decryptor.update(encrypted_data) + decryptor.finalize()
return unpad(padded_data)
def aes_cipher_from_key(key):
"""
Given an AES key, return a Cipher object that has `encrypt()` and
`decrypt()` methods. It will create the cipher to use CBC mode, and create
Given an AES key, return a Cipher object that has `encryptor()` and
`decryptor()` methods. It will create the cipher to use CBC mode, and create
the initialization vector as Software Secure expects it.
"""
return AES.new(key, AES.MODE_CBC, generate_aes_iv(key))
return Cipher(AES(key), CBC(generate_aes_iv(key)), backend=default_backend())
def generate_aes_iv(key):
......@@ -77,42 +82,47 @@ def generate_aes_iv(key):
Return the initialization vector Software Secure expects for a given AES
key (they hash it a couple of times and take a substring).
"""
return md5(key + md5(key).hexdigest()).hexdigest()[:AES.block_size]
return md5(key + md5(key).hexdigest()).hexdigest()[:AES_BLOCK_SIZE_BYTES]
def random_aes_key():
return Random.new().read(32)
return os.urandom(32)
def pad(data):
""" Pad the given `data` such that it fits into the proper AES block size """
bytes_to_pad = AES.block_size - len(data) % AES.block_size
return data + (bytes_to_pad * chr(bytes_to_pad))
padder = PKCS7(AES.block_size).padder()
return padder.update(data) + padder.finalize()
def unpad(padded_data):
""" remove all padding from `padded_data` """
num_padded_bytes = ord(padded_data[-1])
return padded_data[:-num_padded_bytes]
unpadder = PKCS7(AES.block_size).unpadder()
return unpadder.update(padded_data) + unpadder.finalize()
def rsa_encrypt(data, rsa_pub_key_str):
def rsa_encrypt(data, rsa_pub_key_bytes):
"""
`rsa_pub_key` is a string with the public key
`rsa_pub_key_bytes` is a byte sequence with the public key
"""
key = RSA.importKey(rsa_pub_key_str)
cipher = PKCS1_OAEP.new(key)
encrypted_data = cipher.encrypt(data)
return encrypted_data
if rsa_pub_key_bytes.startswith(b'-----'):
key = serialization.load_pem_public_key(rsa_pub_key_bytes, backend=default_backend())
elif rsa_pub_key_bytes.startswith(b'ssh-rsa '):
key = serialization.load_ssh_public_key(rsa_pub_key_bytes, backend=default_backend())
else:
key = serialization.load_der_public_key(rsa_pub_key_bytes, backend=default_backend())
return key.encrypt(data, OAEP(MGF1(SHA1()), SHA1(), label=None))
def rsa_decrypt(data, rsa_priv_key_str):
def rsa_decrypt(data, rsa_priv_key_bytes):
"""
When given some `data` and an RSA private key, decrypt the data
"""
key = RSA.importKey(rsa_priv_key_str)
cipher = PKCS1_OAEP.new(key)
return cipher.decrypt(data)
if rsa_priv_key_bytes.startswith(b'-----'):
key = serialization.load_pem_private_key(rsa_priv_key_bytes, password=None, backend=default_backend())
else:
key = serialization.load_der_private_key(rsa_priv_key_bytes, password=None, backend=default_backend())
return key.decrypt(data, OAEP(MGF1(SHA1()), SHA1(), label=None))
def has_valid_signature(method, headers_dict, body_dict, access_key, secret_key):
......
......@@ -7,7 +7,7 @@ import unittest
import ddt
import httpretty
from Crypto.PublicKey import RSA
from Cryptodome.PublicKey import RSA
from django.conf import settings
from django.core.urlresolvers import reverse
from django.test import RequestFactory, TestCase, override_settings
......
......@@ -8,7 +8,7 @@ from __future__ import unicode_literals
import hashlib
import json
from Crypto.PublicKey import RSA
from Cryptodome.PublicKey import RSA
from django.conf import settings
from django.core.urlresolvers import reverse
from django.http import JsonResponse
......
......@@ -47,7 +47,7 @@ edx-lint==0.4.3
astroid==1.3.8
edx-django-oauth2-provider==1.2.5
edx-django-sites-extensions==2.3.0
edx-enterprise==0.53.14
edx-enterprise==0.53.15
edx-oauth2-provider==1.2.2
edx-opaque-keys==0.4.0
edx-organizations==0.4.8
......@@ -76,7 +76,7 @@ piexif==1.0.2
Pillow==3.4
polib==1.0.3
psutil==1.2.1
pycrypto>=2.6
pycryptodomex==3.4.7
pygments==2.2.0
pygraphviz==1.1
pyjwkest==1.3.2
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment