From bfcbacd3a3f5349a30ef65bc9aa69b6ba4bcb854 Mon Sep 17 00:00:00 2001 From: fox0 <15684995+fox0@users.noreply.github.com> Date: Wed, 31 Dec 2025 14:36:25 +0700 Subject: [PATCH 1/2] Add argument --not-check-update `get_stats_display` now width default terminal size 80 --- src/main.py | 156 ++++++++++++++++++++++++++++------------------------ 1 file changed, 83 insertions(+), 73 deletions(-) diff --git a/src/main.py b/src/main.py index 022b646..4897631 100644 --- a/src/main.py +++ b/src/main.py @@ -32,8 +32,6 @@ __version__ = "2.1" -os.system("") - class ConnectionInfo: """Class to store connection information""" @@ -64,6 +62,7 @@ def __init__(self): self.no_blacklist = False self.auto_blacklist = False self.quiet = False + self.not_check_update = True class IBlacklistManager(ABC): @@ -285,7 +284,7 @@ def emit(self, record): ) error_handler.setFormatter( logging.Formatter( - "[%(asctime)s][%(levelname)s]: %(message)s", "%Y-%m-%d %H:%M:%S" + "[%(asctime)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S" ) ) error_handler.setLevel(logging.ERROR) @@ -414,6 +413,12 @@ def update_speeds(self) -> None: def get_stats_display(self) -> str: """Get formatted statistics display""" + if sys.stdout.isatty(): + console_width = os.get_terminal_size().columns + else: + console_width = 80 + # self.logger.debug("console_width=", console_width) + col_width = 30 conns_stat = f"\033[97mTotal: \033[93m{self.total_connections}\033[0m".ljust( @@ -470,17 +475,15 @@ def get_stats_display(self) -> str: ) ) - title = "STATISTICS" + title = " STATISTICS " - top_border = f"\033[92m{'═' * 36} {title} {'═' * 36}\033[0m" - line_conns = f"\033[92m {'Conns'.ljust(8)}:\033[0m {conns_stat}\033[0m" - line_traffic = f"\033[92m {'Traffic'.ljust(8)}:\033[0m {traffic_stat}\033[0m" - line_speed = f"\033[92m {'Speed'.ljust(8)}:\033[0m {speed_stat}\033[0m" - bottom_border = f"\033[92m{'═' * (36*2+len(title)+2)}\033[0m" + top_border = f"\033[92m{title.center(console_width, '=')}\033[0m" + line_conns = f"\033[92m{'Conns'.ljust(8)}:\033[0m {conns_stat}\033[0m" + line_traffic = f"\033[92m{'Traffic'.ljust(8)}:\033[0m {traffic_stat}\033[0m" + line_speed = f"\033[92m{'Speed'.ljust(8)}:\033[0m {speed_stat}\033[0m" + bottom_border = f"\033[92m{'═' * console_width}\033[0m" - return ( - f"{top_border}\n{line_conns}\n{line_traffic}\n{line_speed}\n{bottom_border}" - ) + return '\n'.join((top_border, line_conns, line_traffic, line_speed, bottom_border)) @staticmethod def format_size(size: int) -> str: @@ -910,6 +913,7 @@ async def check_for_updates(self): def sync_check(): try: + # self.logger.debug("Check for updates") req = Request( "https://gvcoder09.github.io/nodpi_site/api/v1/update_info.json", ) @@ -929,7 +933,7 @@ def sync_check(): if latest_version: self.update_available = latest_version self.update_event.set() - return f"\033[93m[UPDATE]: Available new version: v{latest_version} \033[97m" + return f"\033[93m[UPDATE] Available new version: v{latest_version} \033[97m" except Exception: pass finally: @@ -939,19 +943,20 @@ def sync_check(): async def print_banner(self) -> None: """Print startup banner""" - self.update_check_task = asyncio.create_task(self.check_for_updates()) + if not self.config.not_check_update: + self.update_check_task = asyncio.create_task(self.check_for_updates()) - try: - await asyncio.wait_for(self.update_event.wait(), timeout=2.0) - except asyncio.TimeoutError: - if self.update_check_task and not self.update_check_task.done(): - self.update_check_task.cancel() - try: - await self.update_check_task - except asyncio.CancelledError: - pass + try: + await asyncio.wait_for(self.update_event.wait(), timeout=2.0) + except asyncio.TimeoutError: + if self.update_check_task and not self.update_check_task.done(): + self.update_check_task.cancel() + try: + await self.update_check_task + except asyncio.CancelledError: + pass - self.logger.info("\033]0;NoDPI\007") + # self.logger.info("\033]0;NoDPI\007") if sys.platform == "win32": os.system("mode con: lines=33") @@ -960,6 +965,7 @@ async def print_banner(self) -> None: console_width = os.get_terminal_size().columns else: console_width = 80 + # self.logger.debug("console_width=", console_width) disclaimer = ( "DISCLAIMER. The developer and/or supplier of this software " @@ -979,7 +985,7 @@ async def print_banner(self) -> None: left_padding = (console_width - 76) // 2 - self.logger.info("\n\n\n") + # self.logger.info("\n\n\n") self.logger.info( "\033[91m" + " " * left_padding + "╔" + "═" * 72 + "╗" + "\033[0m" ) @@ -997,83 +1003,83 @@ async def print_banner(self) -> None: time.sleep(1) - update_message = None - if self.update_check_task and self.update_check_task.done(): - try: - update_message = self.update_check_task.result() - except (asyncio.CancelledError, Exception): - pass - - self.logger.info("\033[2J\033[H") + # self.logger.info("\033[2J\033[H") self.logger.info( - """ -\033[92m ██████ █████ ██████████ ███████████ █████ + """\033[92m + ██████ █████ ██████████ ███████████ █████ ░░██████ ░░███ ░░███░░░░███ ░░███░░░░░███░░███ ░███░███ ░███ ██████ ░███ ░░███ ░███ ░███ ░███ ░███░░███░███ ███░░███ ░███ ░███ ░██████████ ░███ ░███ ░░██████ ░███ ░███ ░███ ░███ ░███░░░░░░ ░███ ░███ ░░█████ ░███ ░███ ░███ ███ ░███ ░███ █████ ░░█████░░██████ ██████████ █████ █████ - ░░░░░ ░░░░░ ░░░░░░ ░░░░░░░░░░ ░░░░░ ░░░░░\033[0m - """ + ░░░░░ ░░░░░ ░░░░░░ ░░░░░░░░░░ ░░░░░ ░░░░░ +\033[0m""" ) - self.logger.info(f"\033[92mVersion: {__version__}".center(50)) + self.logger.info(f"\033[92mVersion: {__version__}".center(console_width)) self.logger.info( "\033[97m" + - "Enjoy watching! / Наслаждайтесь просмотром!".center(50) + "Enjoy watching! / Наслаждайтесь просмотром!".center(console_width) ) - self.logger.info("\n") + # self.logger.info("\n") - if update_message: - self.logger.info(update_message) + if not self.config.not_check_update: + update_message = None + if self.update_check_task and self.update_check_task.done(): + try: + update_message = self.update_check_task.result() + except (asyncio.CancelledError, Exception): + pass + if update_message: + self.logger.info(update_message) self.logger.info( - f"\033[92m[INFO]:\033[97m Proxy is running on {self.config.host}:{self.config.port} at {datetime.now().strftime('%H:%M on %Y-%m-%d')}" + f"\033[92m[INFO]\033[97m Proxy is running on {self.config.host}:{self.config.port} at {datetime.now().strftime('%H:%M on %Y-%m-%d')}" ) self.logger.info( - f"\033[92m[INFO]:\033[97m The selected fragmentation method: {self.config.fragment_method}" + f"\033[92m[INFO]\033[97m The selected fragmentation method: {self.config.fragment_method}" ) - self.logger.info("") + # self.logger.info("") if isinstance(self.blacklist_manager, NoBlacklistManager): self.logger.info( - "\033[92m[INFO]:\033[97m Blacklist is disabled. All domains will be subject to unblocking." + "\033[92m[INFO]\033[97m Blacklist is disabled. All domains will be subject to unblocking." ) elif isinstance(self.blacklist_manager, AutoBlacklistManager): self.logger.info( - "\033[92m[INFO]:\033[97m Auto-blacklist is enabled") + "\033[92m[INFO]\033[97m Auto-blacklist is enabled") else: self.logger.info( - f"\033[92m[INFO]:\033[97m Blacklist contains {len(self.blacklist_manager.blocked)} domains" + f"\033[92m[INFO]\033[97m Blacklist contains {len(self.blacklist_manager.blocked)} domains" ) self.logger.info( - f"\033[92m[INFO]:\033[97m Path to blacklist: '{os.path.normpath(self.config.blacklist_file)}'" + f"\033[92m[INFO]\033[97m Path to blacklist: '{os.path.normpath(self.config.blacklist_file)}'" ) - self.logger.info("") + # self.logger.info("") if self.config.log_error_file: self.logger.info( - f"\033[92m[INFO]:\033[97m Error logging is enabled. Path to error log: '{self.config.log_error_file}'" + f"\033[92m[INFO]\033[97m Error logging is enabled. Path to error log: '{self.config.log_error_file}'" ) else: self.logger.info( - "\033[92m[INFO]:\033[97m Error logging is disabled") + "\033[92m[INFO]\033[97m Error logging is disabled") if self.config.log_access_file: self.logger.info( - f"\033[92m[INFO]:\033[97m Access logging is enabled. Path to access log: '{self.config.log_access_file}'" + f"\033[92m[INFO]\033[97m Access logging is enabled. Path to access log: '{self.config.log_access_file}'" ) else: self.logger.info( - "\033[92m[INFO]:\033[97m Access logging is disabled") + "\033[92m[INFO]\033[97m Access logging is disabled") - self.logger.info("") + # self.logger.info("") self.logger.info( - "\033[92m[INFO]:\033[97m To stop the proxy, press Ctrl+C twice" + "\033[92m[INFO]\033[97m To stop the proxy, press Ctrl+C twice" ) - self.logger.info("") + # self.logger.info("") async def display_stats(self) -> None: """Display live statistics""" @@ -1100,7 +1106,7 @@ async def run(self) -> None: ) except OSError: self.logger.error( - f"\033[91m[ERROR]: Failed to start proxy on this address ({self.config.host}:{self.config.port}). It looks like the port is already in use\033[0m" + f"\033[91m[ERROR] Failed to start proxy on this address ({self.config.host}:{self.config.port}). It looks like the port is already in use\033[0m" ) sys.exit(1) @@ -1133,7 +1139,7 @@ def create(config: ProxyConfig, logger: ILogger) -> IBlacklistManager: try: return FileBlacklistManager(config) except FileNotFoundError as e: - logger.error(f"\033[91m[ERROR]: {e}\033[0m") + logger.error(f"\033[91m[ERROR] {e}\033[0m") sys.exit(1) @@ -1155,6 +1161,7 @@ def load_from_args(args) -> ProxyConfig: config.no_blacklist = args.no_blacklist config.auto_blacklist = args.autoblacklist config.quiet = args.quiet + config.not_check_update = args.not_check_update return config @@ -1182,20 +1189,20 @@ def manage_autostart(action: str = "install") -> None: f'"{exe_path}" --blacklist "{os.path.dirname(exe_path)}/blacklist.txt"', ) print( - f"\033[92m[INFO]:\033[97m Added to autostart: {exe_path}") + f"\033[92m[INFO]\033[97m Added to autostart: {exe_path}") elif action == "uninstall": try: with winreg.OpenKey(key, reg_path, 0, winreg.KEY_WRITE) as regkey: winreg.DeleteValue(regkey, app_name) - print("\033[92m[INFO]:\033[97m Removed from autostart") + print("\033[92m[INFO]\033[97m Removed from autostart") except FileNotFoundError: - print("\033[91m[ERROR]: Not found in autostart\033[0m") + print("\033[91m[ERROR] Not found in autostart\033[0m") except PermissionError: - print("\033[91m[ERROR]: Access denied. Run as administrator\033[0m") + print("\033[91m[ERROR] Access denied. Run as administrator\033[0m") except Exception as e: - print(f"\033[91m[ERROR]: Autostart operation failed: {e}\033[0m") + print(f"\033[91m[ERROR] Autostart operation failed: {e}\033[0m") class LinuxAutostartManager(IAutostartManager): @@ -1248,15 +1255,15 @@ def manage_autostart(action: str = "install") -> None: ) print( - f"\033[92m[INFO]:\033[97m Service installed and started: {service_name}" + f"\033[92m[INFO]\033[97m Service installed and started: {service_name}" ) - print("\033[93m[NOTE]:\033[97m Service will auto-start on login") + print("\033[93m[NOTE]\033[97m Service will auto-start on login") except subprocess.CalledProcessError as e: - print(f"\033[91m[ERROR]: Systemd command failed: {e}\033[0m") + print(f"\033[91m[ERROR] Systemd command failed: {e}\033[0m") except Exception as e: print( - f"\033[91m[ERROR]: Autostart operation failed: {e}\033[0m") + f"\033[91m[ERROR] Autostart operation failed: {e}\033[0m") elif action == "uninstall": try: @@ -1277,13 +1284,13 @@ def manage_autostart(action: str = "install") -> None: subprocess.run( ["systemctl", "--user", "daemon-reload"], check=True) - print("\033[92m[INFO]:\033[97m Service removed from autostart") + print("\033[92m[INFO]\033[97m Service removed from autostart") except subprocess.CalledProcessError as e: - print(f"\033[91m[ERROR]: Systemd command failed: {e}\033[0m") + print(f"\033[91m[ERROR] Systemd command failed: {e}\033[0m") except Exception as e: print( - f"\033[91m[ERROR]: Autostart operation failed: {e}\033[0m") + f"\033[91m[ERROR] Autostart operation failed: {e}\033[0m") class ProxyApplication: @@ -1335,6 +1342,9 @@ def parse_args(): parser.add_argument( "-q", "--quiet", action="store_true", help="Remove UI output" ) + parser.add_argument( + "--not-check-update", action="store_true", help="Do not check for updates" + ) autostart_group = parser.add_mutually_exclusive_group() autostart_group.add_argument( @@ -1373,7 +1383,7 @@ async def run(cls): sys.exit(0) else: print( - "\033[91m[ERROR]: Autostart works only in executable version\033[0m" + "\033[91m[ERROR] Autostart works only in executable version\033[0m" ) sys.exit(1) @@ -1395,7 +1405,7 @@ async def run(cls): except asyncio.CancelledError: await proxy.shutdown() logger.info( - "\n" * 6 + "\033[92m[INFO]:\033[97m Shutting down proxy...") + "\n" * 6 + "\033[92m[INFO]\033[97m Shutting down proxy...") try: if sys.platform == "win32": os.system("mode con: lines=3000") From 3df09748f11b327b5a009890316a2ca91c4b1c9a Mon Sep 17 00:00:00 2001 From: fox0 <15684995+fox0@users.noreply.github.com> Date: Thu, 1 Jan 2026 18:43:19 +0700 Subject: [PATCH 2/2] Add argument --hide-banner --- src/main.py | 125 +++++++++++++++++++++++----------------------------- 1 file changed, 55 insertions(+), 70 deletions(-) diff --git a/src/main.py b/src/main.py index 4897631..3104a42 100644 --- a/src/main.py +++ b/src/main.py @@ -62,7 +62,8 @@ def __init__(self): self.no_blacklist = False self.auto_blacklist = False self.quiet = False - self.not_check_update = True + self.not_check_update = False + self.hide_banner = False class IBlacklistManager(ABC): @@ -421,30 +422,9 @@ def get_stats_display(self) -> str: col_width = 30 - conns_stat = f"\033[97mTotal: \033[93m{self.total_connections}\033[0m".ljust( - col_width - ) + "\033[97m| " + f"\033[97mMiss: \033[96m{self.allowed_connections}\033[0m".ljust( - col_width - ) + "\033[97m| " + f"\033[97mUnblock: \033[92m{self.blocked_connections}\033[0m".ljust( - col_width - ) + "\033[97m| " f"\033[97mErrors: \033[91m{self.errors_connections}\033[0m".ljust( - col_width - ) + conns_stat = f"\033[97mUnblock: \033[92m{self.blocked_connections}\033[0m".ljust(col_width) + "\033[97m| " + f"\033[97mMiss: \033[96m{self.allowed_connections}\033[0m".ljust(col_width) + "\033[97m| " + f"\033[97mTotal: \033[93m{self.total_connections}\033[0m".ljust(col_width) + "\033[97m| " + f"\033[97mErrors: \033[91m{self.errors_connections}\033[0m".ljust(col_width) - traffic_stat = ( - f"\033[97mTotal: \033[96m{self.format_size(self.traffic_out + self.traffic_in)}\033[0m".ljust( - col_width - ) - + "\033[97m| " - + f"\033[97mDL: \033[96m{self.format_size(self.traffic_in)}\033[0m".ljust( - col_width - ) - + "\033[97m| " - + f"\033[97mUL: \033[96m{self.format_size(self.traffic_out)}\033[0m".ljust( - col_width - ) - + "\033[97m| " - ) + traffic_stat = f"\033[97mDL: \033[96m{self.format_size(self.traffic_in)}\033[0m".ljust(col_width) + "\033[97m| " + f"\033[97mUL: \033[96m{self.format_size(self.traffic_out)}\033[0m".ljust(col_width) + "\033[97m| " + f"\033[97mTotal: \033[96m{self.format_size(self.traffic_out + self.traffic_in)}\033[0m".ljust(col_width) + "\033[97m| " avg_speed_in = ( self.average_speed_in[0] / self.average_speed_in[1] @@ -477,7 +457,7 @@ def get_stats_display(self) -> str: title = " STATISTICS " - top_border = f"\033[92m{title.center(console_width, '=')}\033[0m" + top_border = f"\033[92m{title.center(console_width, '═')}\033[0m" line_conns = f"\033[92m{'Conns'.ljust(8)}:\033[0m {conns_stat}\033[0m" line_traffic = f"\033[92m{'Traffic'.ljust(8)}:\033[0m {traffic_stat}\033[0m" line_speed = f"\033[92m{'Speed'.ljust(8)}:\033[0m {speed_stat}\033[0m" @@ -943,7 +923,7 @@ def sync_check(): async def print_banner(self) -> None: """Print startup banner""" - if not self.config.not_check_update: + if not (self.config.not_check_update or self.config.quiet): self.update_check_task = asyncio.create_task(self.check_for_updates()) try: @@ -961,52 +941,53 @@ async def print_banner(self) -> None: if sys.platform == "win32": os.system("mode con: lines=33") - if sys.stdout.isatty(): - console_width = os.get_terminal_size().columns - else: - console_width = 80 - # self.logger.debug("console_width=", console_width) + if not self.config.hide_banner: + if sys.stdout.isatty(): + console_width = os.get_terminal_size().columns + else: + console_width = 80 + # self.logger.debug("console_width=", console_width) + + disclaimer = ( + "DISCLAIMER. The developer and/or supplier of this software " + "shall not be liable for any loss or damage, including but " + "not limited to direct, indirect, incidental, punitive or " + "consequential damages arising out of the use of or inability " + "to use this software, even if the developer or supplier has been " + "advised of the possibility of such damages. The developer and/or " + "supplier of this software shall not be liable for any legal " + "consequences arising out of the use of this software. This includes, " + "but is not limited to, violation of laws, rules or regulations, " + "as well as any claims or suits arising out of the use of this software. " + "The user is solely responsible for compliance with all applicable laws " + "and regulations when using this software." + ) + wrapped_text = textwrap.TextWrapper(width=70).wrap(disclaimer) - disclaimer = ( - "DISCLAIMER. The developer and/or supplier of this software " - "shall not be liable for any loss or damage, including but " - "not limited to direct, indirect, incidental, punitive or " - "consequential damages arising out of the use of or inability " - "to use this software, even if the developer or supplier has been " - "advised of the possibility of such damages. The developer and/or " - "supplier of this software shall not be liable for any legal " - "consequences arising out of the use of this software. This includes, " - "but is not limited to, violation of laws, rules or regulations, " - "as well as any claims or suits arising out of the use of this software. " - "The user is solely responsible for compliance with all applicable laws " - "and regulations when using this software." - ) - wrapped_text = textwrap.TextWrapper(width=70).wrap(disclaimer) + left_padding = (console_width - 76) // 2 - left_padding = (console_width - 76) // 2 + # self.logger.info("\n\n\n") + self.logger.info( + "\033[91m" + " " * left_padding + "╔" + "═" * 72 + "╗" + "\033[0m" + ) - # self.logger.info("\n\n\n") - self.logger.info( - "\033[91m" + " " * left_padding + "╔" + "═" * 72 + "╗" + "\033[0m" - ) + for line in wrapped_text: + padded_line = line.ljust(70) + self.logger.info( + "\033[91m" + " " * left_padding + + "║ " + padded_line + " ║" + "\033[0m" + ) - for line in wrapped_text: - padded_line = line.ljust(70) self.logger.info( - "\033[91m" + " " * left_padding + - "║ " + padded_line + " ║" + "\033[0m" + "\033[91m" + " " * left_padding + "╚" + "═" * 72 + "╝" + "\033[0m" ) - self.logger.info( - "\033[91m" + " " * left_padding + "╚" + "═" * 72 + "╝" + "\033[0m" - ) + time.sleep(1) - time.sleep(1) + # self.logger.info("\033[2J\033[H") - # self.logger.info("\033[2J\033[H") - - self.logger.info( - """\033[92m + self.logger.info( + """\033[92m ██████ █████ ██████████ ███████████ █████ ░░██████ ░░███ ░░███░░░░███ ░░███░░░░░███░░███ ░███░███ ░███ ██████ ░███ ░░███ ░███ ░███ ░███ @@ -1016,14 +997,14 @@ async def print_banner(self) -> None: █████ ░░█████░░██████ ██████████ █████ █████ ░░░░░ ░░░░░ ░░░░░░ ░░░░░░░░░░ ░░░░░ ░░░░░ \033[0m""" - ) - self.logger.info(f"\033[92mVersion: {__version__}".center(console_width)) - self.logger.info( - "\033[97m" + - "Enjoy watching! / Наслаждайтесь просмотром!".center(console_width) - ) + ) + self.logger.info(f"\033[92mVersion: {__version__}".center(console_width)) + self.logger.info( + "\033[97m" + + "Enjoy watching! / Наслаждайтесь просмотром!".center(console_width) + ) - # self.logger.info("\n") + # self.logger.info("\n") if not self.config.not_check_update: update_message = None @@ -1162,6 +1143,7 @@ def load_from_args(args) -> ProxyConfig: config.auto_blacklist = args.autoblacklist config.quiet = args.quiet config.not_check_update = args.not_check_update + config.hide_banner = args.hide_banner return config @@ -1345,6 +1327,9 @@ def parse_args(): parser.add_argument( "--not-check-update", action="store_true", help="Do not check for updates" ) + parser.add_argument( + "--hide-banner", action="store_true", help="Suppress printing banner" + ) autostart_group = parser.add_mutually_exclusive_group() autostart_group.add_argument(