From 75b29dbc9a82535a74d96ce95cebb1d23ef96e1b Mon Sep 17 00:00:00 2001 From: lhhyung Date: Fri, 18 Oct 2024 13:40:44 +0900 Subject: [PATCH] feat: Implement OTP for multi-factor authentication (MFA) --- pkg/pip_requirements.txt | 1 + src/setup.py | 1 + src/spaceone/identity/conf/global_conf.py | 1 + .../identity/manager/mfa_manager/__init__.py | 1 + .../identity/manager/mfa_manager/base.py | 32 ++++- .../manager/mfa_manager/email_mfa_manger.py | 21 ++-- .../manager/mfa_manager/otp_mfa_manager.py | 112 ++++++++++++++++++ .../identity/manager/secret_manager.py | 14 +++ .../token_manager/mfa_token_manager.py | 28 +++-- .../identity/service/token_service.py | 23 +++- .../identity/service/user_profile_service.py | 65 ++++++---- 11 files changed, 246 insertions(+), 53 deletions(-) create mode 100644 src/spaceone/identity/manager/mfa_manager/otp_mfa_manager.py diff --git a/pkg/pip_requirements.txt b/pkg/pip_requirements.txt index 0bf76ba6..08f1d9c1 100644 --- a/pkg/pip_requirements.txt +++ b/pkg/pip_requirements.txt @@ -4,3 +4,4 @@ bcrypt jinja2 fakeredis pytz +pyotp diff --git a/src/setup.py b/src/setup.py index c8cb2751..a503bb82 100644 --- a/src/setup.py +++ b/src/setup.py @@ -34,6 +34,7 @@ "jinja2", "fakeredis", "pytz", + "pyotp", ], package_data={ "spaceone": [ diff --git a/src/spaceone/identity/conf/global_conf.py b/src/spaceone/identity/conf/global_conf.py index cba27e3c..0a8c99ce 100644 --- a/src/spaceone/identity/conf/global_conf.py +++ b/src/spaceone/identity/conf/global_conf.py @@ -1,6 +1,7 @@ # Email Settings EMAIL_CONSOLE_DOMAIN = "" EMAIL_SERVICE_NAME = "Cloudforet" +MFA_OTP_ISSUER_NAME = "Cloudforet" # Enums: ACCESS_TOKEN (default) | PASSWORD RESET_PASSWORD_TYPE = "ACCESS_TOKEN" diff --git a/src/spaceone/identity/manager/mfa_manager/__init__.py b/src/spaceone/identity/manager/mfa_manager/__init__.py index 78298c92..d0f8c7d7 100644 --- a/src/spaceone/identity/manager/mfa_manager/__init__.py +++ b/src/spaceone/identity/manager/mfa_manager/__init__.py @@ -1 +1,2 @@ from spaceone.identity.manager.mfa_manager.email_mfa_manger import EmailMFAManager +from spaceone.identity.manager.mfa_manager.otp_mfa_manager import OTPMFAManager diff --git a/src/spaceone/identity/manager/mfa_manager/base.py b/src/spaceone/identity/manager/mfa_manager/base.py index c543ed2b..7e5fe7bc 100644 --- a/src/spaceone/identity/manager/mfa_manager/base.py +++ b/src/spaceone/identity/manager/mfa_manager/base.py @@ -1,5 +1,6 @@ import logging import random +import pyotp from abc import abstractmethod, ABC, ABCMeta from collections import OrderedDict @@ -26,12 +27,16 @@ def disable_mfa(self, **kwargs): def confirm_mfa(self, **kwargs): pass + def set_mfa_options(self, **kwargs): + pass + def _load_conf(self): identity_conf = config.get_global("IDENTITY") or {} mfa_conf = identity_conf.get("mfa", {}) self.CONST_MFA_VERIFICATION_CODE_TIMEOUT = mfa_conf.get( "verify_code_timeout", 300 ) + self.CONST_MFA_OTP_ISSUER_NAME = config.get_global("MFA_OTP_ISSUER_NAME", "Cloudforet") class MFAManager(BaseMFAManager, metaclass=ABCMeta): @@ -50,7 +55,10 @@ def disable_mfa(self, **kwargs): def confirm_mfa(self, **kwargs): raise NotImplementedError("MFAManager.confirm_mfa not implemented!") - def create_mfa_verify_code(self, user_id: str, domain_id: str, credentials: dict): + def set_mfa_options(self, **kwargs): + raise NotImplementedError("MFAManager.set_mfa_options not implemented!") + + def create_mfa_verify_code(self, user_id: str, domain_id: str, credentials: dict, user_mfa: dict = None): if cache.is_set(): verify_code = self._generate_verify_code() ordered_credentials = OrderedDict(sorted(credentials.items())) @@ -62,6 +70,7 @@ def create_mfa_verify_code(self, user_id: str, domain_id: str, credentials: dict "verify_code": verify_code, "user_id": user_id, "domain_id": domain_id, + "user_mfa": user_mfa }, expire=self.CONST_MFA_VERIFICATION_CODE_TIMEOUT, ) @@ -72,18 +81,23 @@ def get_manager_by_mfa_type(cls, mfa_type): for subclass in cls.__subclasses__(): if subclass.mfa_type == mfa_type: return subclass() - raise ERROR_NOT_SUPPORTED_MFA_TYPE(support_mfa_types=["EMAIL"]) + raise ERROR_NOT_SUPPORTED_MFA_TYPE(support_mfa_types=["EMAIL", "OTP"]) - @staticmethod - def check_mfa_verify_code(credentials: dict, verify_code: str) -> bool: + def check_mfa_verify_code(self, credentials: dict, verify_code: str) -> bool: if cache.is_set(): ordered_credentials = OrderedDict(sorted(credentials.items())) hashed_credentials = utils.dict_to_hash(ordered_credentials) cached_mfa_info = cache.get(f"identity:mfa:{hashed_credentials}") - if cached_mfa_info["verify_code"] == verify_code: + if self.mfa_type == "OTP": + otp = self._generate_otp(cached_mfa_info["otp_secret_key"]) + is_verified = otp.verify(verify_code) + else: + is_verified = True if cached_mfa_info["verify_code"] == verify_code else False + + if is_verified: cache.delete(f"identity:mfa:{hashed_credentials}") return True - raise ERROR_INVALID_VERIFY_CODE(verify_code=credentials["verify_code"]) + raise ERROR_INVALID_VERIFY_CODE(verify_code=verify_code) @staticmethod def get_mfa_info(credentials: dict): @@ -97,3 +111,9 @@ def get_mfa_info(credentials: dict): @staticmethod def _generate_verify_code(): return str(random.randint(100000, 999999)) + + @staticmethod + def _generate_otp(otp_secret_key: str): + otp = pyotp.TOTP(otp_secret_key) + return otp + diff --git a/src/spaceone/identity/manager/mfa_manager/email_mfa_manger.py b/src/spaceone/identity/manager/mfa_manager/email_mfa_manger.py index e4c772dd..229ff62b 100644 --- a/src/spaceone/identity/manager/mfa_manager/email_mfa_manger.py +++ b/src/spaceone/identity/manager/mfa_manager/email_mfa_manger.py @@ -41,24 +41,31 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.smtp_connector = SMTPConnector() - def enable_mfa(self, user_id, domain_id, user_mfa, language): + def enable_mfa(self, user_id: str, domain_id: str, user_mfa: dict, user_vo): self.send_mfa_verify_email( - user_id, domain_id, user_mfa["options"].get("email"), language + user_id, domain_id, user_mfa["options"].get("email"), user_vo.language, user_mfa ) + return user_mfa - def disable_mfa(self, user_id, domain_id, user_mfa, language): + def disable_mfa(self, user_id: str, domain_id: str, user_mfa: dict, user_vo): self.send_mfa_verify_email( - user_id, domain_id, user_mfa["options"].get("email"), language + user_id, domain_id, user_mfa["options"].get("email"), user_vo.language ) def confirm_mfa(self, credentials: dict, verify_code: str): - return self.check_mfa_verify_code(credentials, verify_code) - def send_mfa_verify_email(self, user_id, domain_id, email, language): + confirm_result = self.check_mfa_verify_code(credentials, verify_code) + + return confirm_result + + def set_mfa_options(self, user_mfa: dict, credentials: dict): + return user_mfa + + def send_mfa_verify_email(self, user_id: str, domain_id: str, email: str, language: str, user_mfa: dict = None): service_name = self._get_service_name() language_map_info = LANGUAGE_MAPPER.get(language, "default") credentials = {"user_id": user_id, "domain_id": domain_id} - verify_code = self.create_mfa_verify_code(user_id, domain_id, credentials) + verify_code = self.create_mfa_verify_code(user_id, domain_id, credentials, user_mfa) template = JINJA_ENV.get_template(f"verification_MFA_code_{language}.html") email_contents = template.render( diff --git a/src/spaceone/identity/manager/mfa_manager/otp_mfa_manager.py b/src/spaceone/identity/manager/mfa_manager/otp_mfa_manager.py new file mode 100644 index 00000000..b52c5d24 --- /dev/null +++ b/src/spaceone/identity/manager/mfa_manager/otp_mfa_manager.py @@ -0,0 +1,112 @@ +import logging +import pyotp +from collections import OrderedDict + +from spaceone.core import utils, cache + +from spaceone.identity.manager import SecretManager +from spaceone.identity.manager.mfa_manager.base import MFAManager + +from spaceone.identity.error.error_user import ERROR_INVALID_VERIFY_CODE + +_LOGGER = logging.getLogger(__name__) + + +class OTPMFAManager(MFAManager): + mfa_type = "OTP" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def enable_mfa(self, user_id: str, domain_id: str, user_mfa: dict, user_vo): + credentials = { + "user_id": user_id, + "domain_id": domain_id + } + + otp_secret_key = self._generate_otp_secret_key() + otp_qrcode_uri = self._generate_otp_qrcode_uri(self._generate_otp(otp_secret_key), user_id) + + self.set_cache_otp_mfa_secret_key(otp_secret_key, user_id, domain_id, credentials, user_mfa) + + user_mfa["options"]["otp_qrcode_uri"] = otp_qrcode_uri + + return user_mfa + + def disable_mfa(self, user_id: str, domain_id: str, user_mfa: dict, user_vo): + credentials = { + "user_id": user_id, + "domain_id": domain_id + } + + secret_manager: SecretManager = self.locator.get_manager(SecretManager) + user_secret_id = user_mfa["options"].get("user_secret_id") + otp_secret_key = secret_manager.get_user_otp_secret_key(user_secret_id, domain_id) + + self.set_cache_otp_mfa_secret_key(otp_secret_key, user_id, domain_id, credentials) + + def confirm_mfa(self, credentials: dict, verify_code: str): + + confirm_result = self.check_otp_mfa_verify_code(credentials, verify_code) + + return confirm_result + + def set_mfa_options(self, user_mfa: dict, credentials: dict): + mfa_state = user_mfa.get("state", "DISABLED") + + secret_manager: SecretManager = self.locator.get_manager(SecretManager) + + if mfa_state == "ENABLED": + user_secret_id = user_mfa["options"]["user_secret_id"] + secret_manager.delete_user_secret(user_secret_id) + + elif mfa_state == "DISABLED": + otp_secret_key = self.get_cached_otp_secret_key(credentials) + user_secret_info = secret_manager.create_user_secret(otp_secret_key) + user_mfa["options"]["user_secret_id"] = user_secret_info.get("user_secret_id") + + return user_mfa + + def set_cache_otp_mfa_secret_key(self, otp_secret_key: str, user_id: str, domain_id: str, credentials: dict, user_mfa: dict = None): + if cache.is_set(): + ordered_credentials = OrderedDict(sorted(credentials.items())) + hashed_credentials = utils.dict_to_hash(ordered_credentials) + cache.delete(f"identity:mfa:{hashed_credentials}") + cache.set( + f"identity:mfa:{hashed_credentials}", + { + "otp_secret_key": otp_secret_key, + "user_id": user_id, + "domain_id": domain_id, + "user_mfa": user_mfa + }, + expire=self.CONST_MFA_VERIFICATION_CODE_TIMEOUT, + ) + + def check_otp_mfa_verify_code(self, credentials: dict, verify_code: str) -> bool: + if cache.is_set(): + ordered_credentials = OrderedDict(sorted(credentials.items())) + hashed_credentials = utils.dict_to_hash(ordered_credentials) + cached_mfa_info = cache.get(f"identity:mfa:{hashed_credentials}") + otp = self._generate_otp(cached_mfa_info["otp_secret_key"]) + if otp.verify(verify_code): + return True + raise ERROR_INVALID_VERIFY_CODE(verify_code=verify_code) + + @staticmethod + def get_cached_otp_secret_key(credentials: dict): + if cache.is_set(): + ordered_credentials = OrderedDict(sorted(credentials.items())) + hashed_credentials = utils.dict_to_hash(ordered_credentials) + cached_mfa_info = cache.get(f"identity:mfa:{hashed_credentials}") + cache.delete(f"identity:mfa:{hashed_credentials}") + return cached_mfa_info["otp_secret_key"] + + @staticmethod + def _generate_otp_secret_key() -> str: + return pyotp.random_base32() + + def _generate_otp_qrcode_uri(self, otp, user_id: str) -> str: + otp_qrcode_uri = otp.provisioning_uri(name=user_id, issuer_name=self.CONST_MFA_OTP_ISSUER_NAME) + return otp_qrcode_uri + diff --git a/src/spaceone/identity/manager/secret_manager.py b/src/spaceone/identity/manager/secret_manager.py index d93ac92f..b21d6bd6 100644 --- a/src/spaceone/identity/manager/secret_manager.py +++ b/src/spaceone/identity/manager/secret_manager.py @@ -118,3 +118,17 @@ def list_secrets(self, params: dict, domain_id: str = None) -> dict: ) else: return self.secret_conn.dispatch("Secret.list", params) + + def create_user_secret(self, params: dict) -> dict: + return self.secret_conn.dispatch("UserSecret.create", params) + + def get_user_secret_data(self, user_secret_id: str, domain_id: str = None) -> dict: + response = self.secret_conn.dispatch("UserSecret.get_data", {"user_secret_id": user_secret_id, "domain_id": domain_id}) + return response["data"] + + def delete_user_secret(self, user_secret_id: str) -> None: + self.secret_conn.dispatch("UserSecret.delete", {"user_secret_id": user_secret_id}) + + def get_user_otp_secret_key(self, user_secret_id: str, domain_id: str = None) -> str: + user_secret_info = self.get_user_secret_data(user_secret_id, domain_id) + return user_secret_info["otp_secret_key"] diff --git a/src/spaceone/identity/manager/token_manager/mfa_token_manager.py b/src/spaceone/identity/manager/token_manager/mfa_token_manager.py index 3cdf6611..e4d9d3f2 100644 --- a/src/spaceone/identity/manager/token_manager/mfa_token_manager.py +++ b/src/spaceone/identity/manager/token_manager/mfa_token_manager.py @@ -8,6 +8,7 @@ from spaceone.identity.manager.user_manager import UserManager from spaceone.identity.manager.mfa_manager.base import MFAManager from spaceone.identity.manager.token_manager.base import TokenManager +from spaceone.identity.manager import SecretManager from spaceone.identity.model.domain.database import Domain from spaceone.identity.model.user.database import User @@ -36,20 +37,29 @@ def authenticate(self, domain_id: str, **kwargs): self.user: User = self.user_mgr.get_user(user_id, domain_id) self._check_user_state() + user_mfa = self.user.mfa.to_dict() + mfa_type = user_mfa.get("mfa_type") + mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type) + if verify_code := kwargs.get("verify_code"): - if MFAManager.check_mfa_verify_code(credentials, verify_code): + if mfa_manager.check_mfa_verify_code(credentials, verify_code): self.is_authenticated = True else: raise ERROR_INVALID_CREDENTIALS() + else: - user_mfa = self.user.mfa.to_dict() - mfa_email = user_mfa["options"].get("email") - - mfa_manager = MFAManager.get_manager_by_mfa_type(user_mfa.get("mfa_type")) - mfa_manager.send_mfa_authentication_email( - self.user.user_id, domain_id, mfa_email, self.user.language, credentials - ) - raise ERROR_MFA_REQUIRED(user_id=mfa_email) + if mfa_type == "EMAIL": + mfa_email = user_mfa["options"].get("email") + + mfa_manager.send_mfa_authentication_email( + self.user.user_id, domain_id, mfa_email, self.user.language, credentials + ) + elif mfa_type == "OTP": + secret_manager: SecretManager = self.locator.get_manager(SecretManager) + user_secret_id = user_mfa["options"].get("user_secret_id") + otp_secret_key = secret_manager.get_user_otp_secret_key(user_secret_id, domain_id) + + mfa_manager.set_cache_otp_mfa_secret_key(otp_secret_key, self.user.user_id, domain_id, credentials) def _check_user_state(self) -> None: if self.user.state not in ["ENABLED", "PENDING"]: diff --git a/src/spaceone/identity/service/token_service.py b/src/spaceone/identity/service/token_service.py index ece2a1f9..4dbc397a 100644 --- a/src/spaceone/identity/service/token_service.py +++ b/src/spaceone/identity/service/token_service.py @@ -12,6 +12,7 @@ from spaceone.identity.error.error_workspace import ERROR_WORKSPACE_STATE from spaceone.identity.manager.app_manager import AppManager from spaceone.identity.manager.domain_manager import DomainManager +from spaceone.identity.manager import SecretManager from spaceone.identity.manager.domain_secret_manager import DomainSecretManager from spaceone.identity.manager.mfa_manager.base import MFAManager from spaceone.identity.manager.project_group_manager import ProjectGroupManager @@ -82,15 +83,25 @@ def issue(self, params: TokenIssueRequest) -> Union[TokenResponse, dict]: user_vo = token_mgr.user user_mfa = user_vo.mfa.to_dict() if user_vo.mfa else {} + mfa_type = user_mfa.get('mfa_type') permissions = self._get_permissions_from_required_actions(user_vo) if user_mfa.get("state", "DISABLED") == "ENABLED" and params.auth_type != "MFA": - mfa_manager = MFAManager.get_manager_by_mfa_type(user_mfa.get("mfa_type")) - mfa_email = user_mfa["options"].get("email") - mfa_manager.send_mfa_authentication_email( - user_vo.user_id, domain_id, mfa_email, user_vo.language, credentials - ) - raise ERROR_MFA_REQUIRED(user_id=mfa_email) + mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type) + if mfa_type == "EMAIL": + mfa_email = user_mfa["options"].get("email") + mfa_manager.send_mfa_authentication_email( + user_vo.user_id, domain_id, mfa_email, user_vo.language, credentials + ) + + elif mfa_type == "OTP": + secret_manager: SecretManager = self.locator.get_manager(SecretManager) + user_secret_id = user_mfa["options"].get("user_secret_id") + otp_secret_key = secret_manager.get_user_otp_secret_key(user_secret_id, domain_id) + + mfa_manager.set_cache_otp_mfa_secret_key(otp_secret_key, user_vo.user_id, domain_id, credentials) + + raise ERROR_MFA_REQUIRED(user_id=user_vo.user_id) token_info = token_mgr.issue_token( private_jwk, diff --git a/src/spaceone/identity/service/user_profile_service.py b/src/spaceone/identity/service/user_profile_service.py index 7b81556a..16eb8d73 100644 --- a/src/spaceone/identity/service/user_profile_service.py +++ b/src/spaceone/identity/service/user_profile_service.py @@ -226,22 +226,21 @@ def enable_mfa( user_vo = self.user_mgr.get_user(user_id, domain_id) user_mfa = user_vo.mfa.to_dict() if user_vo.mfa else {} - if not options: - raise ERROR_REQUIRED_PARAMETER(key="options") + self._check_mfa_options(options, mfa_type) if user_mfa.get("state", "DISABLED") == "ENABLED": raise ERROR_MFA_ALREADY_ENABLED(user_id=user_id) mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type) - if mfa_type == "EMAIL": - user_mfa["mfa_type"] = mfa_type - user_mfa["options"] = options - user_mfa["state"] = user_mfa.get("state", "DISABLED") - mfa_manager.enable_mfa(user_id, domain_id, user_mfa, user_vo.language) - user_vo = self.user_mgr.update_user_by_vo({"mfa": user_mfa}, user_vo) + user_mfa["mfa_type"] = mfa_type + user_mfa["state"] = user_mfa.get("state", "DISABLED") + user_mfa["options"] = options + + if mfa_type in ["EMAIL", "OTP"]: + user_vo.mfa = mfa_manager.enable_mfa(user_id, domain_id, user_mfa, user_vo) else: - raise ERROR_NOT_SUPPORTED_MFA_TYPE(support_mfa_types=["EMAIL"]) + raise ERROR_NOT_SUPPORTED_MFA_TYPE(support_mfa_types=["EMAIL", "OTP"]) return UserResponse(**user_vo.to_dict()) @@ -271,7 +270,7 @@ def disable_mfa( raise ERROR_MFA_ALREADY_DISABLED(user_id=user_id) mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type) - mfa_manager.disable_mfa(user_id, domain_id, user_mfa, user_vo.language) + mfa_manager.disable_mfa(user_id, domain_id, user_mfa, user_vo) return UserResponse(**user_vo.to_dict()) @@ -294,28 +293,39 @@ def confirm_mfa( domain_id = params.domain_id verify_code = params.verify_code + credentials = { + "user_id": user_id, + "domain_id": domain_id, + } + user_vo = self.user_mgr.get_user(user_id, domain_id) - mfa_type = user_vo.mfa.mfa_type + user_mfa = user_vo.mfa.to_dict() if user_vo.mfa else {} + mfa_state = user_mfa.get("state", "DISABLED") + + if mfa_state == "DISABLED": + user_mfa = MFAManager.get_mfa_info(credentials)["user_mfa"] + + mfa_type = user_mfa["mfa_type"] if not mfa_type: raise ERROR_MFA_NOT_ENABLED(user_id=user_id) mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type) - if mfa_type == "EMAIL": - credentials = { - "user_id": user_id, - "domain_id": domain_id, - } - if mfa_manager.confirm_mfa(credentials, verify_code): - user_mfa = user_vo.mfa.to_dict() if user_vo.mfa else {} - if user_mfa.get("state", "DISABLED") == "ENABLED": - user_mfa = {"state": "DISABLED"} - elif user_mfa.get("state", "DISABLED") == "DISABLED": - user_mfa["state"] = "ENABLED" - self.user_mgr.update_user_by_vo({"mfa": user_mfa}, user_vo) - else: - raise ERROR_INVALID_VERIFY_CODE(verify_code=verify_code) + if mfa_manager.confirm_mfa(credentials, verify_code): + + user_mfa = mfa_manager.set_mfa_options(user_mfa, credentials) + + if mfa_state == "ENABLED": + user_mfa = {"state": "DISABLED"} + elif mfa_state == "DISABLED": + user_mfa["state"] = "ENABLED" + + user_vo = self.user_mgr.update_user_by_vo({"mfa": user_mfa}, user_vo) + + else: + raise ERROR_INVALID_VERIFY_CODE(verify_code=verify_code) + return UserResponse(**user_vo.to_dict()) @transaction(permission="identity:UserProfile.read", role_types=["USER"]) @@ -605,3 +615,8 @@ def _get_my_workspace_groups_info( my_workspace_groups_info.append(workspace_group_info) return my_workspace_groups_info + + @staticmethod + def _check_mfa_options(options, mfa_type): + if mfa_type in ["EMAIL"] and not options: + raise ERROR_REQUIRED_PARAMETER(key="options.email")