diff --git a/AddonCatalogCacheCreator.py b/AddonCatalogCacheCreator.py index 32ae8fef..9716868b 100644 --- a/AddonCatalogCacheCreator.py +++ b/AddonCatalogCacheCreator.py @@ -308,22 +308,31 @@ def generate_cache_entry_from_package_xml( print(f"ERROR: Unknown error while reading icon file {absolute_icon_path}") print(e) icon_data_is_good = False - if absolute_icon_path.lower().endswith(".svg"): - try: - if not icon_utils.is_svg_bytes(icon_data): + if icon_data is not None: + if absolute_icon_path.lower().endswith(".svg"): + try: + if not icon_utils.is_svg_bytes(icon_data): + self.icon_errors[metadata.name] = { + "valid_icon_path": relative_icon_path, + "error_message": "SVG file does not have valid XML header", + } + icon_data_is_good = False + except icon_utils.BadIconData as e: self.icon_errors[metadata.name] = { "valid_icon_path": relative_icon_path, - "error_message": "SVG file does not have valid XML header", + "error_message": str(e), } icon_data_is_good = False - except icon_utils.BadIconData as e: - self.icon_errors[metadata.name] = { - "valid_icon_path": relative_icon_path, - "error_message": str(e), - } - icon_data_is_good = False - if icon_data_is_good: - cache_entry.icon_data = base64.b64encode(icon_data).decode("utf-8") + elif absolute_icon_path.lower().endswith(".png"): + if icon_utils.png_has_duplicate_iccp(icon_data): + self.icon_errors[metadata.name] = { + "valid_icon_path": relative_icon_path, + "error_message": "PNG data has duplicate iCCP chunk", + } + icon_data_is_good = False + + if icon_data_is_good: + cache_entry.icon_data = base64.b64encode(icon_data).decode("utf-8") else: self.icon_errors[metadata.name] = {"bad_icon_path": relative_icon_path} print(f"ERROR: Could not find icon file {absolute_icon_path}") diff --git a/AddonManagerTest/app/test_freecad_interface.py b/AddonManagerTest/app/test_freecad_interface.py index 2088bab1..a8646062 100644 --- a/AddonManagerTest/app/test_freecad_interface.py +++ b/AddonManagerTest/app/test_freecad_interface.py @@ -75,39 +75,34 @@ def test_log_no_freecad(self): """Test that if the FreeCAD import fails, the logger is set up correctly, and implements PrintLog""" sys.modules["FreeCAD"] = None - with patch("addonmanager_freecad_interface.logging", new=MagicMock()) as mock_logging: - import addonmanager_freecad_interface as fc + import addonmanager_freecad_interface as fc + with self.assertLogs("addonmanager", level="DEBUG") as cm: fc.Console.PrintLog("Test output") - self.assertTrue(isinstance(fc.Console, fc.ConsoleReplacement)) - self.assertTrue(mock_logging.log.called) def test_message_no_freecad(self): """Test that if the FreeCAD import fails the logger implements PrintMessage""" sys.modules["FreeCAD"] = None - with patch("addonmanager_freecad_interface.logging", new=MagicMock()) as mock_logging: - import addonmanager_freecad_interface as fc + import addonmanager_freecad_interface as fc + with self.assertLogs("addonmanager", level="INFO") as cm: fc.Console.PrintMessage("Test output") - self.assertTrue(mock_logging.info.called) def test_warning_no_freecad(self): """Test that if the FreeCAD import fails the logger implements PrintWarning""" sys.modules["FreeCAD"] = None - with patch("addonmanager_freecad_interface.logging", new=MagicMock()) as mock_logging: - import addonmanager_freecad_interface as fc + import addonmanager_freecad_interface as fc + with self.assertLogs("addonmanager", level="WARNING") as cm: fc.Console.PrintWarning("Test output") - self.assertTrue(mock_logging.warning.called) def test_error_no_freecad(self): """Test that if the FreeCAD import fails the logger implements PrintError""" sys.modules["FreeCAD"] = None - with patch("addonmanager_freecad_interface.logging", new=MagicMock()) as mock_logging: - import addonmanager_freecad_interface as fc + import addonmanager_freecad_interface as fc + with self.assertLogs("addonmanager", level="ERROR") as cm: fc.Console.PrintError("Test output") - self.assertTrue(mock_logging.error.called) class TestParameters(WrapTestFreeCADImports): diff --git a/AddonManagerTest/app/test_macro_cache_creator.py b/AddonManagerTest/app/test_macro_cache_creator.py index 9d01638d..d471f5f6 100644 --- a/AddonManagerTest/app/test_macro_cache_creator.py +++ b/AddonManagerTest/app/test_macro_cache_creator.py @@ -23,6 +23,7 @@ from unittest.mock import patch, MagicMock from MacroCacheCreator import MacroCatalog, CacheWriter +import addonmanager_freecad_interface as fci class TestMacroCatalog(unittest.TestCase): @@ -84,3 +85,30 @@ def test_retrieve_macros_fetch_failure(self, mock_console, mock_add_macro, mock_ instance.retrieve_macros_from_wiki() mock_add_macro.assert_not_called() + + def test_log_error(self): + instance = MacroCatalog() + instance.log_error("macro", "Test error") + instance.log_error("macro", "Test error 2") + self.assertIn("macro", instance.macro_errors) + self.assertEqual(len(instance.macro_errors["macro"]), 2) + + def test_fetch_macros_logs_errors(self): + + def fake_git(self): + fci.Console.PrintError("git failure") + + def fake_wiki(self): + fci.Console.PrintWarning("wiki failure") + + instance = MacroCatalog() + with patch.object(type(instance), "retrieve_macros_from_git", fake_git), patch.object( + type(instance), "retrieve_macros_from_wiki", fake_wiki + ): + instance.fetch_macros() + + messages = [record["msg"] for record in instance.log_buffer] + + self.assertIn("git failure", messages) + self.assertIn("wiki failure", messages) + self.assertEqual(len(messages), 2) diff --git a/AddonManagerTest/gui/test_icon_utilities.py b/AddonManagerTest/gui/test_icon_utilities.py index ceacee8d..7264d70d 100644 --- a/AddonManagerTest/gui/test_icon_utilities.py +++ b/AddonManagerTest/gui/test_icon_utilities.py @@ -3,9 +3,11 @@ import gzip import io +import struct import unittest from types import SimpleNamespace from unittest.mock import patch +import zlib import addonmanager_icon_utilities as iu @@ -435,3 +437,85 @@ def test_defaults_initialized_once_and_selected_by_repo_type(self): ) a3 = iu.get_icon_for_addon(addon3) # type: ignore[arg-type] self.assertIs(a3, iu.cached_default_icons["package"]) + + +def build_png_data(chunk_types: list[bytes]): + + def chunk(typ, data): + return ( + struct.pack(">I", len(data)) + + typ + + data + + struct.pack(">I", zlib.crc32(typ + data) & 0xFFFFFFFF) + ) + + png = b"\x89PNG\r\n\x1a\n" + + for chunk_type in chunk_types: + if chunk_type == b"IHDR": + # IHDR: 1x1 RGBA + ihdr = struct.pack(">IIBBBBB", 1, 1, 8, 6, 0, 0, 0) + png += chunk(b"IHDR", ihdr) + + elif chunk_type == b"iCCP": + # iCCP: profile name + compression method + compressed ICC data + icc_profile = b"FakeICCProfile" + icc_compressed = zlib.compress(icc_profile) + iccp = b"icc\x00" + b"\x00" + icc_compressed + png += chunk(b"iCCP", iccp) + + elif chunk_type == b"pHYs": + # 72 DPI + phys = struct.pack(">IIB", 2835, 2835, 1) + png += chunk(b"pHYs", phys) + + elif chunk_type == b"tEXt": + text = b"Comment\x00PNG test file" + png += chunk(b"tEXt", text) + + elif chunk_type == b"IDAT": + # IDAT: white pixel + raw = b"\x00\xff\xff\xff\xff" # filter + RGBA + png += chunk(b"IDAT", zlib.compress(raw)) + + elif chunk_type == b"IEND": + # IEND + png += chunk(b"IEND", b"") + + return png + + +class TestPNGAnalysis(unittest.TestCase): + + def test_get_chunk_types_simplest_possible_png(self): + simple_chunks = [b"IHDR", b"IDAT", b"IEND"] + png_data = build_png_data(simple_chunks) + chunks = iu.get_png_chunk_types(png_data) + self.assertEqual(chunks, simple_chunks) + + def test_get_chunk_types_more_complete_png(self): + more_chunks = [b"IHDR", b"iCCP", b"pHYs", b"tEXt", b"IDAT", b"IEND"] + png_data = build_png_data(more_chunks) + chunks = iu.get_png_chunk_types(png_data) + self.assertEqual(chunks, more_chunks) + + def test_get_chunk_types_extra_elements(self): + more_chunks = [b"IHDR", b"iCCP", b"iCCP", b"pHYs", b"tEXt", b"tEXt", b"IDAT", b"IEND"] + png_data = build_png_data(more_chunks) + chunks = iu.get_png_chunk_types(png_data) + self.assertEqual(chunks, more_chunks) + + def test_valid_no_iccp(self): + simple_chunks = [b"IHDR", b"IDAT", b"IEND"] + png_data = build_png_data(simple_chunks) + self.assertFalse(iu.png_has_duplicate_iccp(png_data)) + + def test_good_iccp(self): + chunks_with_iccp = [b"IHDR", b"iCCP", b"pHYs", b"tEXt", b"IDAT", b"IEND"] + png_data = build_png_data(chunks_with_iccp) + self.assertFalse(iu.png_has_duplicate_iccp(png_data)) + + def test_duplicate_iccp(self): + chunks_with_iccp = [b"IHDR", b"iCCP", b"iCCP", b"pHYs", b"tEXt", b"IDAT", b"IEND"] + png_data = build_png_data(chunks_with_iccp) + self.assertTrue(iu.png_has_duplicate_iccp(png_data)) diff --git a/MacroCacheCreator.py b/MacroCacheCreator.py index 05790b7f..556ead0f 100644 --- a/MacroCacheCreator.py +++ b/MacroCacheCreator.py @@ -22,10 +22,12 @@ """The MacroCacheCreator is an independent script run server-side to generate a cache of the macros and their metadata. Supports both git-based and wiki-based macros.""" +from collections import deque import contextlib import hashlib import io import json +import logging import os.path import re import sys @@ -70,21 +72,39 @@ def __init__(self): "macros_on_wiki": 0, "macros_on_git": 0, "duplicated_macros": 0, - "errors": 0, + "macros_with_errors": 0, } + self.log_buffer = deque(maxlen=100) def fetch_macros(self): - print("Retrieving macros from git...") - self.retrieve_macros_from_git() - print("Retrieving macros from wiki...") - self.retrieve_macros_from_wiki() - print("Downloading icons...") - for macro in self.macros.values(): - try: - self.get_icon(macro) - except RuntimeError as e: - self.macro_errors[macro.name] = str(e) - self.macro_stats["errors"] = len(self.macro_errors) + logger = logging.getLogger("addonmanager") + with capture_console_output( + logger, + handlers=[DequeHandler(self.log_buffer)], + level=logging.INFO, + propagate=False, + ): + print("Retrieving macros from git...") + self.retrieve_macros_from_git() + print("Retrieving macros from wiki...") + self.retrieve_macros_from_wiki() + print("Downloading icons...") + for number, macro in enumerate(self.macros.values()): + try: + if macro.icon.startswith("/* XPM */"): + macro.xpm = macro.icon + macro.icon = "" + print( + f"{number+1}/{len(self.macros)}: {macro.name} has an XPM icon, skipping download..." + ) + continue + print( + f"{number+1}/{len(self.macros)}: Downloading icon for {macro.name} from {macro.icon}..." + ) + self.get_icon(macro) + except RuntimeError as e: + self.log_error(macro.name, str(e)) + self.macro_stats["macros_with_errors"] = len(self.macro_errors) def create_cache(self) -> str: """Create a cache from the macros in this catalog""" @@ -101,7 +121,7 @@ def retrieve_macros_from_git(self): writer.clone_or_update(GIT_MACROS_CLONE_NAME, GIT_MACROS_URL, GIT_MACROS_BRANCH) except RuntimeError as e: print(f"Failed to clone git macros from {GIT_MACROS_URL}: {e}") - self.macro_errors["retrieve_macros_from_git"] = str(e) + self.log_error("**INTERNAL**", str(e)) return for dirpath, _, filenames in os.walk(os.path.join(os.getcwd(), GIT_MACROS_CLONE_NAME)): @@ -115,13 +135,19 @@ def retrieve_macros_from_git(self): def add_git_macro_to_cache(self, dirpath: str, filename: str): macro = Macro(filename[:-8]) # Remove ".FCMacro". if macro.name in self.macros: - print(f"Ignoring second macro named {macro.name} (found on git)\n") + self.log_error(macro.name, f"Ignoring second macro named {macro.name} (found on git)") return macro.on_git = True absolute_path_to_fcmacro = os.path.join(dirpath, filename) + self.log_buffer.clear() macro.fill_details_from_file(absolute_path_to_fcmacro) macro.src_filename = os.path.relpath(absolute_path_to_fcmacro, os.getcwd()) self.macros[macro.name] = macro + for log_entry in self.log_buffer: + level = log_entry.get("level", logging.INFO) + if level >= logging.WARNING: + self.log_error(macro.name, log_entry["msg"]) + self.log_buffer.clear() def retrieve_macros_from_wiki(self): """Retrieve macros from the wiki @@ -134,16 +160,17 @@ def retrieve_macros_from_wiki(self): p = requests.get(WIKI_MACROS_URL, headers=headers, timeout=10.0) except requests.exceptions.RequestException as e: message = f"Failed to fetch {WIKI_MACROS_URL}: {e}" - self.macro_errors["retrieve_macros_from_wiki"] = message + self.log_error("**INTERNAL**", message) return if not p.status_code == 200: message = f"Failed to fetch {WIKI_MACROS_URL}, response code was {p.status_code}" - self.macro_errors["retrieve_macros_from_wiki"] = message + self.log_error("**INTERNAL**", message) return macros = re.findall(r'title="(Macro.*?)"', p.text) macros = [mac for mac in macros if "translated" not in mac] - for _, wiki_page_name in enumerate(macros): + for number, wiki_page_name in enumerate(macros): + print(f"{number+1}/{len(macros)}: {wiki_page_name}") macro_name = wiki_page_name[6:] # Remove "Macro ". macro_name = macro_name.replace("&", "&") if not macro_name: @@ -156,7 +183,7 @@ def add_wiki_macro_to_cache(self, macro_name): macro = Macro(macro_name) if macro.name in self.macros: self.macro_stats["duplicated_macros"] += 1 - print(f"Ignoring duplicate of '{macro.name}' (using git repo copy instead of wiki)") + self.log_error(macro.name, "Using git repo copy instead of duplicate found on wiki") return macro.on_wiki = True macro.parsed = False @@ -165,7 +192,13 @@ def add_wiki_macro_to_cache(self, macro_name): wiki_page_name = wiki_page_name.replace("&", "%26") wiki_page_name = wiki_page_name.replace("+", "%2B") url = "https://wiki.freecad.org/Macro_" + wiki_page_name + self.log_buffer.clear() macro.fill_details_from_wiki(url) + for log_entry in self.log_buffer: + level = log_entry.get("level", logging.INFO) + if level >= logging.WARNING: + self.log_error(macro.name, log_entry["msg"]) + self.log_buffer.clear() def get_icon(self, macro: Macro): """Downloads the macro's icon from whatever source is specified and stores its binary @@ -178,7 +211,7 @@ def get_icon(self, macro: Macro): p = requests.get(macro.icon, headers=headers, timeout=10.0) except requests.exceptions.RequestException as e: message = f"Failed to get data from icon URL {macro.icon}: {e}" - self.macro_errors[macro.name] = message + self.log_error(macro.name, message) macro.icon = "" return if p.status_code == 200: @@ -186,17 +219,23 @@ def get_icon(self, macro: Macro): base, _, extension = filename.rpartition(".") if base.lower().startswith("file:"): message = f"Cannot use specified icon for {macro.name}, {macro.icon} is not a direct download link" - self.macro_errors[macro.name] = message + self.log_error(macro.name, message) macro.icon = "" return macro.icon_data = p.content macro.icon_extension = extension + + if icon_utils.png_has_duplicate_iccp(macro.icon_data): + message = f"MACRO DEVELOPER WARNING: multiple iCCP chunks found in PNG icon for {macro.name}" + self.log_error(macro.name, message) + macro.icon_data = None + macro.icon = "" else: message = ( f"MACRO DEVELOPER WARNING: failed to download icon from {macro.icon}" + f" for macro {macro.name}. Status code returned: {p.status_code}\n" ) - self.macro_errors[macro.name] = message + self.log_error(macro.name, message) macro.icon = "" elif macro.on_git: relative_path_to_macro_directory = os.path.dirname(macro.src_filename) @@ -219,24 +258,64 @@ def write(self, s): # Do some tests on the icon data to make sure it's valid throw_on_write = StderrAsError() if macro.icon and not macro.icon_data: - if macro.name not in self.macro_errors: - self.macro_errors[macro.name] = "There is no data for the icon" + self.log_error(macro.name, "There is no data for the icon") elif macro.icon.lower().endswith(".svg"): try: if not icon_utils.is_svg_bytes(macro.icon_data): - self.macro_errors[macro.name] = "SVG file does not have valid XML header" + self.log_error(macro.name, "SVG file does not have valid XML header") except icon_utils.BadIconData as e: - self.macro_errors[macro.name] = str(e) + self.log_error(macro.name, str(e)) elif macro.icon: try: with contextlib.redirect_stderr(throw_on_write): test_icon = icon_utils.icon_from_bytes(macro.icon_data) if test_icon.isNull(): - self.macro_errors[macro.name] = "Icon data is invalid" - except icon_utils.BadIconData as e: - self.macro_errors[macro.name] = str(e) - except RuntimeError as e: - self.macro_errors[macro.name] = str(e) + self.log_error(macro.name, "Icon data is invalid") + except (icon_utils.BadIconData, RuntimeError) as e: + self.log_error(macro.name, str(e)) + + def log_error(self, macro_name: str, error_message: str): + if macro_name not in self.macro_errors: + self.macro_errors[macro_name] = [] + self.macro_errors[macro_name].append(error_message) + + +class DequeHandler(logging.Handler): + def __init__(self, store: deque, level=logging.NOTSET): + super().__init__(level) + self.store = store + + def emit(self, record: logging.LogRecord) -> None: + self.store.append( + { + "name": record.name, + "level": record.levelno, + "msg": record.getMessage(), + "time": record.created, + "pathname": record.pathname, + "lineno": record.lineno, + "funcName": record.funcName, + } + ) + + +@contextlib.contextmanager +def capture_console_output(logger: logging.Logger, *, handlers, level=None, propagate=None): + old_handlers = list(logger.handlers) + old_level = logger.level + old_propagate = logger.propagate + + logger.handlers = list(handlers) + try: + if level is not None: + logger.setLevel(level) + if propagate is not None: + logger.propagate = propagate + yield + finally: + logger.handlers = old_handlers + logger.setLevel(old_level) + logger.propagate = old_propagate if __name__ == "__main__": diff --git a/addonmanager_freecad_interface.py b/addonmanager_freecad_interface.py index 355d3878..4ecd5a12 100644 --- a/addonmanager_freecad_interface.py +++ b/addonmanager_freecad_interface.py @@ -107,6 +107,8 @@ def get_python_exe(): getUserMacroDir = None loadUi = None + logger = logging.getLogger("addonmanager") + try: from PySide6 import QtCore, QtWidgets @@ -154,19 +156,19 @@ class ConsoleReplacement: @staticmethod def PrintLog(arg: str) -> None: - logging.log(logging.DEBUG, arg) + logger.log(logging.DEBUG, arg) @staticmethod def PrintMessage(arg: str) -> None: - logging.info(arg) + logger.info(arg) @staticmethod def PrintWarning(arg: str) -> None: - logging.warning(arg) + logger.warning(arg) @staticmethod def PrintError(arg: str) -> None: - logging.error(arg) + logger.error(arg) Console = ConsoleReplacement() diff --git a/addonmanager_icon_utilities.py b/addonmanager_icon_utilities.py index 7d1d7fdb..1e6bc7b4 100644 --- a/addonmanager_icon_utilities.py +++ b/addonmanager_icon_utilities.py @@ -21,7 +21,8 @@ import re import os -from typing import Optional +import struct +from typing import Optional, List from PySideWrapper import QtCore, QtGui, QtSvg @@ -138,6 +139,33 @@ def scalable_icon_from_svg_bytes(svg_bytes: bytes) -> QtGui.QIcon: cached_default_icons = {} +def get_png_chunk_types(data) -> List[bytes] | None: + PNG_SIG = b"\x89PNG\r\n\x1a\n" + if not data.startswith(PNG_SIG): + return None # not a PNG + i = 8 + out = [] + # PNG structure: [len:4][type:4][data:len][crc:4] repeating + while i + 12 <= len(data): + (length,) = struct.unpack(">I", data[i : i + 4]) + chunk_total = 8 + length + 4 # length+type + data + crc + if i + chunk_total > len(data): + return None + ctype = data[i + 4 : i + 8] + out.append(ctype) + i += chunk_total + if ctype == b"IEND": + break + return out + + +def png_has_duplicate_iccp(data) -> bool: + """Returns True if the PNG contains multiple ICCP chunks. Returns false if the iCCP is OK or + if this is not a PNG at all.""" + chunk_types = get_png_chunk_types(data) + return chunk_types is not None and chunk_types.count(b"iCCP") > 1 + + def get_icon_for_addon(addon: Addon, update: bool = False) -> QtGui.QIcon: """Returns an icon for an Addon. :param addon: The addon to get an icon for. diff --git a/addonmanager_macro_parser.py b/addonmanager_macro_parser.py index f3ae096a..ed3eca91 100644 --- a/addonmanager_macro_parser.py +++ b/addonmanager_macro_parser.py @@ -203,17 +203,23 @@ def _cleanup_comment(self): self.parse_results["comment"] = self.parse_results["comment"][:511] + "…" def _cleanup_license(self): + if len(self.parse_results["license"].strip()) == 0: + fci.Console.PrintWarning( + f"No license specified for macro {self.name} -- assuming 'All rights reserved'.\n" + ) + self.parse_results["license"] = "All rights reserved" + return if get_license_manager is not None: lm = get_license_manager() normed_license = lm.normalize(self.parse_results["license"]) if not normed_license: fci.Console.PrintWarning( - f"Failed to normalize license {self.parse_results['license']} for macro '{self.name}'\n" + f"Failed to normalize license '{self.parse_results['license']}' for macro '{self.name}'\n" ) return elif normed_license != self.parse_results["license"]: fci.Console.PrintMessage( - f"Normalized license {self.parse_results['license']} for macro '{self.name}' to {normed_license}\n" + f"Normalized license '{self.parse_results['license']}' for macro '{self.name}' to {normed_license}\n" ) self.parse_results["license"] = normed_license