diff --git a/compliance/utils/credentials.py b/compliance/utils/credentials.py index 76b649c5..17821447 100644 --- a/compliance/utils/credentials.py +++ b/compliance/utils/credentials.py @@ -14,6 +14,9 @@ """Compliance credentials configuration.""" import logging +import os +import shlex +import subprocess # nosec B404 from collections import OrderedDict, namedtuple from configparser import RawConfigParser from os import environ @@ -26,18 +29,82 @@ logger = logging.getLogger("compliance.utils.credentials") +class OnePasswordBackend: + def __init__(self, **kwargs): + self._url = kwargs.get("url") + self._vault = kwargs.get("vault", "auditree") + + def get_section(self, section): + cmd = f"op item get --vault {self._vault} --format json {section}" + subprocess.run( # nosec B603 + shlex.split(cmd), + env=os.environ, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) + + def attribute_error_msg(self, section, attr): + return ( + f'Unable to locate field "{attr}" ' + f'in secure note "{section}" ' + f'at 1Password vault "{self._vault}"' + ) + + +class ConfigParserBackend: + def __init__(self, **kwargs): + self._cfg = RawConfigParser() + self._cfg_file = kwargs.get("cfg_file") + self._cfg.read(str(Path(self._cfg_file).expanduser())) + + def get_section(self, section): + params = [] + values = [] + if self._cfg.has_section(section): + params = self._cfg.options(section) + values = [self._cfg.get(section, x) for x in self._cfg.options(section)] + return OrderedDict(zip(params, values)) + + def attribute_error_msg(self, section, attr): + return ( + f'Unable to locate attribute "{attr}" ' + f'in section "{section}" ' + f'at config file "{self._cfg_file}"' + ) + + class Config: """Handle credentials configuration.""" - def __init__(self, cfg_file="~/.credentials"): + BACKENDS = {"1password": OnePasswordBackend, "configparser": ConfigParserBackend} + + def __init__(self, cfg_file="~/.credentials", backend_cfg=None): + """ + Create an instance of a dictionary-like configuration object. + + :param cfg_file: The path to a config file for building a ConfigParserBackend. + :param backend_cfg: A dictionary with the backend config + """ + + if backend_cfg is None: + backend_cfg = {"name": "configparser", "cfg_file": cfg_file} + self._init_backend(backend_cfg) + + def _init_backend(self, backend_cfg): """ Create an instance of a dictionary-like configuration object. :param cfg_file: The path to the RawConfigParser compatible config file """ - self._cfg = RawConfigParser() - self._cfg.read(str(Path(cfg_file).expanduser())) - self._cfg_file = cfg_file + name = backend_cfg.get("name") + if backend_cfg.get("name") not in Config.BACKENDS: + raise ValueError(f"Invalid credentials backend name: {name}") + self._backend = Config.BACKENDS[name](**backend_cfg) + + @classmethod + def from_auditree_config(cls, at_cfg): + return cls(backend_cfg=at_cfg.get("creds.backend")) def __getitem__(self, section): """ @@ -60,13 +127,7 @@ def _getattr_wrapper(t, attr): try: return t.__getattribute__(attr) except AttributeError as exc: - exc.args = ( - ( - f'Unable to locate attribute "{attr}" ' - f'in section "{type(t).__name__}" ' - f'at config file "{self._cfg_file}"' - ), - ) + exc.args = (self._backend.attribute_error_msg(type(t).__name__, attr),) raise exc env_vars = [k for k in environ.keys() if k.startswith(f"{section.upper()}_")] @@ -74,12 +135,8 @@ def _getattr_wrapper(t, attr): env_values = [environ[e] for e in env_vars] if env_vars: logger.debug(f'Loading credentials from ENV vars: {", ".join(env_vars)}') - params = [] - if self._cfg.has_section(section): - params = self._cfg.options(section) - values = [self._cfg.get(section, x) for x in params] - d = OrderedDict(zip(params, values)) + d = self._backend.get_section(section) if env_vars: d.update(zip(env_keys, env_values)) diff --git a/test/t_compliance/t_utils/test_credentials.py b/test/t_compliance/t_utils/test_credentials.py new file mode 100644 index 00000000..98857870 --- /dev/null +++ b/test/t_compliance/t_utils/test_credentials.py @@ -0,0 +1,28 @@ +# Copyright (c) 2023 EnterpriseDB. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Compliance automation utilities tests module.""" + +import unittest + + +from compliance.utils.credentials import Config + + +class ConfigTest(unittest.TestCase): + """Test Config class.""" + + def test_default(self): + """Test default credentials.""" + cfg = Config() + self.assertTrue(cfg)