Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 73 additions & 16 deletions compliance/utils/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
"""Compliance credentials configuration."""

import logging
import os
import shlex
import subprocess # nosec B404
from collections import OrderedDict, namedtuple
from configparser import RawConfigParser
from os import environ
Expand All @@ -26,18 +29,82 @@
logger = logging.getLogger("compliance.utils.credentials")


class OnePasswordBackend:
def __init__(self, **kwargs):
self._url = kwargs.get("url")
self._vault = kwargs.get("vault", "auditree")

def get_section(self, section):
cmd = f"op item get --vault {self._vault} --format json {section}"
subprocess.run( # nosec B603
shlex.split(cmd),
env=os.environ,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)

def attribute_error_msg(self, section, attr):
return (
f'Unable to locate field "{attr}" '
f'in secure note "{section}" '
f'at 1Password vault "{self._vault}"'
)


class ConfigParserBackend:
def __init__(self, **kwargs):
self._cfg = RawConfigParser()
self._cfg_file = kwargs.get("cfg_file")
self._cfg.read(str(Path(self._cfg_file).expanduser()))

def get_section(self, section):
params = []
values = []
if self._cfg.has_section(section):
params = self._cfg.options(section)
values = [self._cfg.get(section, x) for x in self._cfg.options(section)]
return OrderedDict(zip(params, values))

def attribute_error_msg(self, section, attr):
return (
f'Unable to locate attribute "{attr}" '
f'in section "{section}" '
f'at config file "{self._cfg_file}"'
)


class Config:
"""Handle credentials configuration."""

def __init__(self, cfg_file="~/.credentials"):
BACKENDS = {"1password": OnePasswordBackend, "configparser": ConfigParserBackend}

def __init__(self, cfg_file="~/.credentials", backend_cfg=None):
"""
Create an instance of a dictionary-like configuration object.

:param cfg_file: The path to a config file for building a ConfigParserBackend.
:param backend_cfg: A dictionary with the backend config
"""

if backend_cfg is None:
backend_cfg = {"name": "configparser", "cfg_file": cfg_file}
self._init_backend(backend_cfg)

def _init_backend(self, backend_cfg):
"""
Create an instance of a dictionary-like configuration object.

:param cfg_file: The path to the RawConfigParser compatible config file
"""
self._cfg = RawConfigParser()
self._cfg.read(str(Path(cfg_file).expanduser()))
self._cfg_file = cfg_file
name = backend_cfg.get("name")
if backend_cfg.get("name") not in Config.BACKENDS:
raise ValueError(f"Invalid credentials backend name: {name}")
self._backend = Config.BACKENDS[name](**backend_cfg)

@classmethod
def from_auditree_config(cls, at_cfg):
return cls(backend_cfg=at_cfg.get("creds.backend"))

def __getitem__(self, section):
"""
Expand All @@ -60,26 +127,16 @@ def _getattr_wrapper(t, attr):
try:
return t.__getattribute__(attr)
except AttributeError as exc:
exc.args = (
(
f'Unable to locate attribute "{attr}" '
f'in section "{type(t).__name__}" '
f'at config file "{self._cfg_file}"'
),
)
exc.args = (self._backend.attribute_error_msg(type(t).__name__, attr),)
raise exc

env_vars = [k for k in environ.keys() if k.startswith(f"{section.upper()}_")]
env_keys = [k.split(section.upper())[1].lstrip("_").lower() for k in env_vars]
env_values = [environ[e] for e in env_vars]
if env_vars:
logger.debug(f'Loading credentials from ENV vars: {", ".join(env_vars)}')
params = []
if self._cfg.has_section(section):
params = self._cfg.options(section)
values = [self._cfg.get(section, x) for x in params]

d = OrderedDict(zip(params, values))
d = self._backend.get_section(section)

if env_vars:
d.update(zip(env_keys, env_values))
Expand Down
28 changes: 28 additions & 0 deletions test/t_compliance/t_utils/test_credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright (c) 2023 EnterpriseDB. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compliance automation utilities tests module."""

import unittest


from compliance.utils.credentials import Config


class ConfigTest(unittest.TestCase):
"""Test Config class."""

def test_default(self):
"""Test default credentials."""
cfg = Config()
self.assertTrue(cfg)