Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ services:
addons:
postgresql: "9.2"
python:
- "2.7"
- "3.6"
- "3.14"
install:
- |
if [[ $TRAVIS_PYTHON_VERSION = 2.7 ]]; then
pip install -r requirements/dev.txt
else
pip install -r requirements/dev-3.txt
fi
pip install -r requirements/dev-3.txt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also prepare environment management with uv?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will look into it :)

script:
- if [[ $TRAVIS_PYTHON_VERSION = 2.7 ]]; then flake8; fi
- if [[ $TRAVIS_PYTHON_VERSION != 2.7 ]]; then mypy flask_phpbb3; fi
- if [[ $TRAVIS_PYTHON_VERSION = 2.7 ]]; then python setup.py test; fi
- if [[ $TRAVIS_PYTHON_VERSION = 3.14 ]]; then flake8; fi
- if [[ $TRAVIS_PYTHON_VERSION != 3.14 ]]; then mypy flask_phpbb3; fi
- if [[ $TRAVIS_PYTHON_VERSION = 3.14 ]]; then python setup.py test; fi
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ services:
image: postgres:9.6
ports:
- 5432:5432
environment:
POSTGRES_HOST_AUTH_METHOD: trust
10 changes: 3 additions & 7 deletions flask_phpbb3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from __future__ import absolute_import
import importlib.metadata

import pkg_resources

__version__ = pkg_resources.get_distribution(__name__).version
__version__ = importlib.metadata.distribution(__name__).version

from .extension import PhpBB3

__all__ = (
'PhpBB3',
)
__all__ = ("PhpBB3",)
233 changes: 107 additions & 126 deletions flask_phpbb3/backends/base.py
Original file line number Diff line number Diff line change
@@ -1,150 +1,74 @@
from __future__ import absolute_import

import abc
import typing

import werkzeug.contrib.cache
import cachelib

ACL_OPTIONS_CACHE_TTL = 3600 * 1

ACL_OPTIONS_CACHE_TTL: int = 3600 * 1

class BaseBackend(object):
KNOWN_OPERATIONS = (
'fetch',
'get',
'has',
'set',
)
KNOWN_DRIVERS = (
'psycopg2',
)

class UserAcl:
def __init__(
self,
cache, # type: werkzeug.contrib.cache.BaseCache
config # type: typing.Dict[str, str]
self, raw_acl_options: typing.List[typing.Dict], raw_user_permissions: str
):
# type: (...) -> None
self._functions = {} # type: typing.Dict[str, str]
self._connection = None
self._cache = cache
self._config = config

self._prepare_statements()
custom_statements = self._config.get('CUSTOM_STATEMENTS', {})
if not isinstance(custom_statements, dict):
custom_statements = {}
self._functions.update(custom_statements)

def _setup_connection(self):
# type: () -> None
raise NotImplementedError

def _prepare_statements(self):
# type: () -> None
raise NotImplementedError

@property
def _db(self):
# type: () -> typing.Any
raise NotImplementedError

def execute(
self,
command, # type: str
cache=False, # type: bool
cache_ttl=None, # type: typing.Optional[int]
**kwargs # type: typing.Any
):
# type: (...) -> typing.Any
raise NotImplementedError

def close(self):
# type: () -> None
raise NotImplementedError

@property
def is_closed(self):
# type: () -> bool
raise NotImplementedError

def get_user_acl(self, raw_user_permissions):
# type: (str) -> UserAcl
raw_acl_options = self.execute(
'fetch_acl_options',
cache=True,
cache_ttl=ACL_OPTIONS_CACHE_TTL,
limit=None,
)

return UserAcl(raw_acl_options, raw_user_permissions)


class UserAcl(object):
def __init__(self, raw_acl_options, raw_user_permissions):
# type: (typing.List[dict], str) -> None
self._acl_options = self._parse_acl_options(raw_acl_options)
self._acl = self._parse_user_permissions(raw_user_permissions)
self._acl_lookup_cache = {} # type: dict
self._acl_lookup_cache: typing.Dict[str, typing.Any] = {}

@classmethod
def _parse_acl_options(cls, raw_acl_options):
# type: (typing.List[dict]) -> dict
def _parse_acl_options(
cls, raw_acl_options: typing.List[typing.Dict[str, typing.Any]]
) -> typing.Dict[str, typing.Dict[str, int]]:
# Load ACL options, so we can decode the user ACL
acl_options = {
'local': {},
'global': {}
} # type: dict
local_index = 0
global_index = 0
acl_options: typing.Dict[str, typing.Dict[str, int]] = {
"local": {},
"global": {},
}
local_index: int = 0
global_index: int = 0

for opt in raw_acl_options or []:
if opt['is_local'] == 1:
acl_options['local'][opt['auth_option']] =\
local_index
if opt["is_local"] == 1:
acl_options["local"][opt["auth_option"]] = local_index
local_index += 1
if opt['is_global'] == 1:
acl_options['global'][opt['auth_option']] =\
global_index
if opt["is_global"] == 1:
acl_options["global"][opt["auth_option"]] = global_index
global_index += 1
# TODO By looking phpbb3 code, here also comes translation
# option <=> id

return acl_options

@classmethod
def _parse_user_permissions(cls, raw_user_permissions):
# type: (str) -> dict
seq_cache = {} # type: dict
acl = {}

split_user_permissions = raw_user_permissions\
.rstrip()\
.splitlines()
def _parse_user_permissions(
cls, raw_user_permissions: str
) -> typing.Dict[str, str]:
seq_cache: typing.Dict[str, str] = {}
acl: typing.Dict[str, str] = {}

split_user_permissions = raw_user_permissions.rstrip().splitlines()
for forum_id, perms in enumerate(split_user_permissions):
if not perms:
continue

# Do the conversion magic
acl[str(forum_id)] = ''
for sub in [perms[j:j + 6] for j in range(0, len(perms), 6)]:
acl[str(forum_id)] = ""
for sub in [perms[j : j + 6] for j in range(0, len(perms), 6)]:
if sub in seq_cache:
converted = seq_cache[sub]
else:
converted = bin(int(sub, 36))[2:]
converted = seq_cache[sub] = '0'\
* (31 - len(converted))\
+ converted
converted = seq_cache[sub] = "0" * (31 - len(converted)) + converted

acl[str(forum_id)] += converted

return acl

def has_privilege(self, privilege, forum_id=0):
# type: (str, int) -> bool
def has_privilege(self, privilege: str, forum_id: int = 0) -> bool:
# Parse negation
str_forum_id = str(forum_id)

negated = privilege.startswith('!')
negated = privilege.startswith("!")
if negated:
option = privilege[1:]
else:
Expand All @@ -153,38 +77,95 @@ def has_privilege(self, privilege, forum_id=0):
self._acl_lookup_cache.setdefault(str_forum_id, {})[option] = False

# Global permissions
if option in self._acl_options['global']\
and '0' in self._acl:
if option in self._acl_options["global"] and "0" in self._acl:
try:
acl_option = self._acl_options['global'][option]
permission = self._acl['0'][acl_option]
self._acl_lookup_cache[str_forum_id][option] =\
bool(int(permission))
acl_option = self._acl_options["global"][option]
permission = self._acl["0"][acl_option]
self._acl_lookup_cache[str_forum_id][option] = bool(int(permission))
except IndexError:
pass

# Local permissions
if str_forum_id != '0'\
and option in self._acl_options['local']:
if str_forum_id != "0" and option in self._acl_options["local"]:
try:
acl_option = self._acl_options['local'][option]
permission =\
self._acl.get(str_forum_id, '0' * 31)[acl_option]
self._acl_lookup_cache[str_forum_id][option] |=\
bool(int(permission))
acl_option = self._acl_options["local"][option]
permission = self._acl.get(str_forum_id, "0" * 31)[acl_option]
self._acl_lookup_cache[str_forum_id][option] |= bool(int(permission))
except IndexError:
pass

output = (
negated ^ self._acl_lookup_cache[str_forum_id][option]
) # type: bool
output: bool = negated ^ self._acl_lookup_cache[str_forum_id][option]
return output

def has_privileges(self, *privileges, **kwargs):
# type: (*str, **int) -> bool
forum_id = kwargs.get('forum_id', 0)
def has_privileges(self, *privileges: str, **kwargs: int) -> bool:
forum_id = kwargs.get("forum_id", 0)

output = False
for option in privileges:
output |= self.has_privilege(option, forum_id=forum_id)
return output


class BaseBackend:
KNOWN_OPERATIONS = (
"fetch",
"get",
"has",
"set",
)
KNOWN_DRIVERS = ("psycopg2",)

def __init__(self, cache: cachelib.BaseCache, config: typing.Dict[str, typing.Any]):
self._functions: typing.Dict[str, str] = {}
self._connection = None
self._cache = cache
self._config = config

self._prepare_statements()
custom_statements = self._config.get("CUSTOM_STATEMENTS", {})
if not isinstance(custom_statements, dict):
custom_statements = {}
self._functions.update(custom_statements)

@abc.abstractmethod
def _setup_connection(self) -> None:
raise NotImplementedError

@abc.abstractmethod
def _prepare_statements(self) -> None:
raise NotImplementedError

@property
@abc.abstractmethod
def _db(self) -> typing.Any:
raise NotImplementedError

@abc.abstractmethod
def execute(
self,
command: str,
cache: bool = False,
cache_ttl: typing.Optional[int] = None,
skip: int = 0,
limit: typing.Optional[int] = 10,
**kwargs: typing.Union[int, str],
) -> typing.Any:
raise NotImplementedError

@abc.abstractmethod
def close(self) -> None:
raise NotImplementedError

@property
@abc.abstractmethod
def is_closed(self) -> bool:
raise NotImplementedError

def get_user_acl(self, raw_user_permissions: str) -> UserAcl:
raw_acl_options = self.execute(
"fetch_acl_options",
cache=True,
cache_ttl=ACL_OPTIONS_CACHE_TTL,
limit=None,
)
return UserAcl(raw_acl_options, raw_user_permissions)
Loading