diff --git a/src/CheckUpdateGui.py b/src/CheckUpdateGui.py index 014bde5..0804faa 100644 --- a/src/CheckUpdateGui.py +++ b/src/CheckUpdateGui.py @@ -1,13 +1,18 @@ -from PyQt5.QtWidgets import QWidget, QDialog, QScrollArea, QLabel, QVBoxLayout, QApplication, QHBoxLayout, QSpacerItem, \ - QSizePolicy, QPushButton, QFrame, QMessageBox, QFormLayout, QProgressDialog, QTextEdit, QComboBox -from githubApi import GitHub, Release, SourceManager, PingThread from PyQt5.QtCore import QObject, pyqtSlot, Qt, pyqtSignal, QUrl, QPropertyAnimation, \ - QRect, QSize, pyqtProperty, QVariantAnimation,QDateTime,QEvent,QEasingCurve -from PyQt5.QtGui import QDesktopServices, QFont, QIcon, QMouseEvent, QPainter, QPixmap, QPaintEvent,QEnterEvent + QSize, QVariantAnimation, QDateTime, QEvent, QEasingCurve +from PyQt5.QtGui import QDesktopServices, QFont, QMouseEvent, QPainter, QPixmap, QEnterEvent +from PyQt5.QtWidgets import QWidget, QDialog, QScrollArea, QLabel, QVBoxLayout, QHBoxLayout, QSpacerItem, \ + QSizePolicy, QPushButton, QFrame, QMessageBox, QFormLayout, QProgressDialog, QTextEdit, QComboBox + +from githubApi import GitHub, ReleaseInfo, PingThread class AnimationButton(QPushButton): def __init__(self, parent=None): + """ + :param parent: QWidget + AnimationButton的构造函数 + .""" super().__init__(parent) self.pixmap: QPixmap = None self.clicked.connect(self.animationStart) @@ -22,6 +27,7 @@ def rotate(self): def setRotate(self, rotate): self.__rotate = rotate self.update() + # rotate = property(int,rotate, setRotate) def paintEvent(self, event): @@ -32,11 +38,16 @@ def paintEvent(self, event): painter.translate(self.width() // 2, self.height() // 2) size = self.width() if self.width() < self.height() else self.height() painter.rotate(self.rotate()) - painter.drawPixmap(-size// 2, -size // - 2, self.pixmap.scaled(size,size)) - self.setMask(self.pixmap.scaled(size,size).mask()) + painter.drawPixmap(-size // 2, -size // + 2, self.pixmap.scaled(size, size)) + self.setMask(self.pixmap.scaled(size, size).mask()) def animationStart(self, check): + """ + :param check: bool + 点击按钮时,开始动画。 + check为True时,从0度到90度,否则从90度到0度,持续时间300ms。 + """ if check: self.animation.setStartValue(0) self.animation.setEndValue(90) @@ -48,15 +59,16 @@ def animationStart(self, check): class ReleaseFrame(QFrame): - downLoadFile = pyqtSignal(Release) + downLoadFile = pyqtSignal(ReleaseInfo) - def __init__(self, release: Release, mode=">", parent=None, r_path=""): + def __init__(self, release: ReleaseInfo, mode=">", parent=None, r_path=""): super().__init__(parent) - self.release: Release = release + self.release: ReleaseInfo = release self.showButton = AnimationButton() self.showButton.setToolTip(self.tr("unfold")) self.showButton.setCheckable(True) - self.showButton.pixmap = QPixmap(str(r_path.with_name('media').joinpath('unfold.png')).replace("\\", "/")) + self.showButton.pixmap = QPixmap(str(r_path.with_name( + 'media').joinpath('unfold.png')).replace("\\", "/")) self.dateTimeLabel = QLabel() self.titleWidget = QWidget() self.formWidget = QWidget() @@ -77,7 +89,8 @@ def initUi(self): row1.addWidget(QLabel(self.release.tag_name)) row1.addItem(QSpacerItem( 20, 20, QSizePolicy.Expanding, QSizePolicy.Minimum)) - self.dateTimeLabel.setText(QDateTime.fromString(self.release.assets_created_at, "yyyy-MM-ddThh:mm:ssZ").toString("yyyy-MM-dd hh:mm:ss")) + self.dateTimeLabel.setText(QDateTime.fromString( + self.release.assets_created_at, "yyyy-MM-ddThh:mm:ssZ").toString("yyyy-MM-dd hh:mm:ss")) row1.addWidget(self.dateTimeLabel) row1.addItem(QSpacerItem( 20, 20, QSizePolicy.Expanding, QSizePolicy.Minimum)) @@ -102,7 +115,8 @@ def initUi(self): formLayout.addRow(QObject.tr(self, "download_count"), QLabel(str(self.release.assets_download_count))) formLayout.addRow(QObject.tr(self, "created_at"), - QLabel(QDateTime.fromString(self.release.assets_created_at, "yyyy-MM-ddThh:mm:ssZ").toString("yyyy-MM-dd hh:mm:ss"))) + QLabel(QDateTime.fromString(self.release.assets_created_at, "yyyy-MM-ddThh:mm:ssZ").toString( + "yyyy-MM-dd hh:mm:ss"))) downloadUrlLabel = QLabel() downloadUrlLabel.setText("" + QObject.tr(self, "open download links") + "") @@ -124,25 +138,23 @@ def initUi(self): layout.addWidget(self.titleWidget) layout.addWidget(self.formWidget) self.setLayout(layout) - rgbStr = "" - if self.mode == ">": - # 样式表绿色 - rgbStr = "rgb(200,255,250)" - elif self.mode == "=": - # 样式表蓝色 - rgbStr = "rgb(200,216,230)" - else: - # 样式表红色背景 - rgbStr = "rgb(249, 179, 163)" - # label字体微软雅黑Ui,大小13,圆角 8 - self.setStyleSheet( - f"QFrame{{background-color:{rgbStr}; font-family:Microsoft YaHei UI; font-size:14px; border-radius: 5px;}}") + self.__setStyleSheet(False) def initConnect(self): self.showButton.clicked.connect(self.showButtonClicked) self.downloadButton.clicked.connect(self.downLoadButtonClicked) def showButtonClicked(self, checked: bool): + """ + 切换显示和隐藏的按钮点击事件处理函数。 + + 根据参数checked的值,切换formWidget的可见性,并执行相应的展开或折叠动画。 + 当checked为True时,展开formWidget并设置按钮提示为"fold", + 否则折叠formWidget并设置按钮提示为"unfold"。 + + Args: + checked (bool): 指示按钮是否被选中的布尔值。 + """ self.formWidget.setVisible(True) animation = QPropertyAnimation(self.formWidget, b"size", self) easingCurveType = QEasingCurve.Type.OutBack if checked else QEasingCurve.Type.InBack @@ -151,10 +163,6 @@ def showButtonClicked(self, checked: bool): animation2.setEasingCurve(easingCurveType) animation.setDuration(300) animation2.setDuration(300) - start: QSize = None - end: QSize = None - start1: QSize = None - end1: QSize = None if checked: self.showButton.setToolTip(QObject.tr(self, "fold")) start = QSize(self.width(), 0) @@ -174,22 +182,23 @@ def showButtonClicked(self, checked: bool): animation2.start(QPropertyAnimation.DeletionPolicy.DeleteWhenStopped) animation.start(QPropertyAnimation.DeletionPolicy.DeleteWhenStopped) animation.finished.connect(lambda: self.formWidget.setVisible(checked)) - animation.finished.connect(lambda: self.resWidth()) + # animation.finished.connect(lambda: self.resWidth()) def downLoadButtonClicked(self): self.downLoadFile.emit(self.release) def resWidth(self): - area:QScrollArea = self.parentWidget().parentWidget().parentWidget() - width = area.verticalScrollBar().width() if area.verticalScrollBar().isVisible() else 0 + area: QScrollArea = self.parentWidget().parentWidget().parentWidget() + width = area.verticalScrollBar().width( + ) if area.verticalScrollBar().isVisible() else 0 self.resize(area.width() - width, self.height()) - + def mousePressEvent(self, a0: QMouseEvent) -> None: super().mousePressEvent(a0) if self.titleWidget.geometry().contains(a0.pos()) and a0.button() == Qt.LeftButton: self.showButton.click() - def __setStyleSheet(self,isEnter:bool): - rgbStr = "" + + def __setStyleSheet(self, isEnter: bool): if self.mode == ">": # 样式表绿色 rgbStr = "rgb(200,255,250)" @@ -200,7 +209,6 @@ def __setStyleSheet(self,isEnter:bool): # 样式表红色背景 rgbStr = "rgb(249, 179, 163)" - if isEnter: # label字体微软雅黑Ui,大小13,圆角 8,1px的蓝色边框,只针对ReleaseFrame self.setStyleSheet( @@ -210,21 +218,26 @@ def __setStyleSheet(self,isEnter:bool): else: # label字体微软雅黑Ui,大小13,圆角 8 self.setStyleSheet( - f"QFrame{{background-color:{rgbStr}; font-family:Microsoft YaHei UI; font-size:14px; border-radius: 5px;}}") + f"""QFrame{{background-color:{rgbStr}; font-family:Microsoft YaHei UI; font-size:14px; border-radius: 5px;}} + ReleaseFrame{{border: 1px solid transparent;}} + """) + def enterEvent(self, a0: QEnterEvent) -> None: self.__setStyleSheet(True) return super().enterEvent(a0) - + def leaveEvent(self, a0: QEvent) -> None: self.__setStyleSheet(False) return super().leaveEvent(a0) - + + class CheckUpdateGui(QDialog): def __init__(self, github: GitHub, parent=None): super().__init__(parent.mainWindow) self.setWindowTitle(QObject.tr(self, "CheckUpdate")) # 去掉问号 - self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint) + self.setWindowFlags(self.windowFlags() & ~ + Qt.WindowContextHelpButtonHint) self.r_path = parent.r_path self.github: GitHub = github self.github.setParent(self) @@ -236,6 +249,9 @@ def __init__(self, github: GitHub, parent=None): self.releaseArea.setHorizontalScrollBarPolicy( Qt.ScrollBarPolicy.ScrollBarAlwaysOff ) + self.releaseArea.setVerticalScrollBarPolicy( + Qt.ScrollBarPolicy.ScrollBarAlwaysOn + ) self.pingThread = None self.sourceSpeedLabel = QLabel() self.sourceCombo = QComboBox() @@ -290,15 +306,19 @@ def initConnect(self): self.github.downloadReleaseAsyncFinishSignal.connect( self.hideDownloadDialog) self.sourceCombo.currentTextChanged.connect(self.changeSource) + def changeSource(self, source: str): - self.pingThread = PingThread(source, self.github.sourceManager.sources[source]) + self.pingThread = PingThread( + source, self.github.sourceManager.sources[source]) self.sourceSpeedLabel.setText("---ms") - self.pingThread.pingSignal.connect(lambda x,y: self.sourceSpeedLabel.setText(f"{int(y)}ms")) + self.pingThread.pingSignal.connect( + lambda x, y: self.sourceSpeedLabel.setText(f"{int(y)}ms")) self.pingThread.start() self.github.sourceManager.currentSource = source self.checkUpdateButton.click() + @pyqtSlot(list) - def checkUpdate(self, releases: list[Release]): + def checkUpdate(self, releases: list[ReleaseInfo]): widget = self.releaseArea.widget() if widget is not None: widget.deleteLater() @@ -328,7 +348,7 @@ def showError(self, msg: str): widget.deleteLater() QMessageBox.critical(self, QObject.tr(self, "Error"), msg) - def showDownloadDialog(self, release: Release): + def showDownloadDialog(self, release: ReleaseInfo): if self.processDialog is not None: self.processDialog.close() self.processDialog = QProgressDialog(self) @@ -344,7 +364,7 @@ def updateDownloadDialog(self, a: int, b: int): self.processDialog.setValue(a) self.processDialog.setMaximum(b) self.processDialog.setLabelText( - f'{a/1000000 : .2f}/{b/1000000 : .2f} MB') + f'{a / 1000000: .2f}/{b / 1000000: .2f} MB') def hideDownloadDialog(self, path: str): if self.processDialog is not None: @@ -358,7 +378,6 @@ def downloadCancel(self): if self.processDialog is not None: self.processDialog.close() self.processDialog = None - # if __name__ == '__main__': # import sys # app = QApplication(sys.argv) diff --git a/src/githubApi.py b/src/githubApi.py index 7b3507f..f1b685d 100644 --- a/src/githubApi.py +++ b/src/githubApi.py @@ -1,69 +1,71 @@ - -from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply -from PyQt5.QtCore import QUrl,QObject,pyqtSignal,QEventLoop,QCoreApplication,QThread,QElapsedTimer import json -import re import os +import re import tempfile -import subprocess import uuid +from PyQt5.QtCore import QUrl, QObject, pyqtSignal, QEventLoop, QCoreApplication, QThread, QElapsedTimer +from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply + + class PingThread(QThread): - pingSignal = pyqtSignal(str,float) - def __init__(self, name: str,url: str,parent : QObject | None = None) -> None: + pingSignal = pyqtSignal(str, float) + + def __init__(self, name: str, url: str, parent: QObject | None = None) -> None: super().__init__(parent) - self.reSet(name,url) - - def reSet(self,name: str,url: str): + self.host = None + self.url = None + self.name = None + self.reSet(name, url) + + def reSet(self, name: str, url: str) -> None: self.name = name self.url = url self.host = self.url.split("/") - - - def ping(self): + + def ping(self) -> float: timer = QElapsedTimer() timer.start() try: - nam = QNetworkAccessManager(self) + nam = QNetworkAccessManager() request = QNetworkRequest(QUrl(self.url)) - + reply = nam.get(request) loop = QEventLoop() reply.finished.connect(loop.quit) loop.exec_() - - except Exception as e: + except Exception: return float('inf') time = timer.elapsed() return time - def run(self): + def run(self) -> None: responseTime = self.ping() self.pingSignal.emit(self.name, responseTime) class SourceManager(QObject): - quickSource = pyqtSignal(str,float) - def __init__(self,sources:dict,parent=None): - super().__init__() - if not isinstance(sources,dict): + quickSource = pyqtSignal(str, float) + + def __init__(self, sources: dict, current: any = None, parent=None): + super().__init__(parent) + if not isinstance(sources, dict): raise TypeError if not sources: raise ValueError self.sources = sources # 第一个的key - self.currentSource = list(sources.keys())[0] + self.currentSource = list(sources.keys())[0] if current is None else current self.speedData = {} self.threads = [] - @property def sources(self): return self.__sources @sources.setter - def sources(self,sources:dict): - if not isinstance(sources,dict): + def sources(self, sources: dict): + if not isinstance(sources, dict): raise TypeError if not sources: raise ValueError @@ -71,16 +73,16 @@ def sources(self,sources:dict): sources[key] = sources[key].strip('/') self.__sources = sources self.currentSource = list(sources.keys())[0] - + @property def currentSource(self): """ 当前使用的源 """ return self.__currentSource - + @currentSource.setter - def currentSource(self,currentSource:str): + def currentSource(self, currentSource: str): """ 设置当前使用的源 @@ -110,40 +112,46 @@ def checkSourceSpeed(self): self.threads.append(thread) thread.start() - def __pingSignal(self,url: str, responseTime: float): + def __pingSignal(self, url: str, responseTime: float): self.speedData[url] = responseTime if len(self.speedData) == len(self.sources): quickTime = float('inf') - quickKey = '' - for key,value in self.speedData.items(): + quickKey = '' + for key, value in self.speedData.items(): if value < quickTime: quickTime = value quickKey = key - self.quickSource.emit(quickKey,quickTime) - -class Release: - def __init__(self,data:dict,versionReStr:str) -> None: - self.tag_name = data['tag_name'] if re.search(versionReStr,data['tag_name']) else data['name'] - self.body = data['body'] - self.html_url = data['html_url'] - m_assert = data['assets'][0] + self.quickSource.emit(quickKey, quickTime) + + +class ReleaseInfo: + def __init__(self, dataJson: dict, versionReStr: str) -> None: + self.tag_name = dataJson['tag_name'] if re.search( + versionReStr, dataJson['tag_name']) else dataJson['name'] + self.body = dataJson['body'] + self.html_url = dataJson['html_url'] + m_assert = dataJson['assets'][0] self.assets_name = m_assert['name'] self.assets_content_type = m_assert['content_type'] self.assets_size = m_assert['size'] self.assets_download_count = m_assert['download_count'] self.assets_created_at = m_assert['created_at'] self.assets_browser_download_url = m_assert['browser_download_url'] + + class GitHub(QObject): isNeedUpdateAsyncSignal = pyqtSignal(bool) - latestReleaseAsyncSignal = pyqtSignal(Release) + latestReleaseAsyncSignal = pyqtSignal(ReleaseInfo) releasesAsyncSignal = pyqtSignal(list) - downloadReleaseAsyncStartSignal = pyqtSignal(Release) - downloadReleaseAsyncProgressSignal = pyqtSignal(int,int) + downloadReleaseAsyncStartSignal = pyqtSignal(ReleaseInfo) + downloadReleaseAsyncProgressSignal = pyqtSignal(int, int) downloadReleaseAsyncFinishSignal = pyqtSignal(str) errorSignal = pyqtSignal(str) - def __init__(self,sourcemanager:SourceManager,owner:str,repo:str,version:str,versionReStr:str,parent=None): + + def __init__(self, sourceManager: SourceManager, owner: str, repo: str, version: str, versionReStr: str, + parent=None): """ - :param sourcemanager: 一个SourceManager实例,提供了多个github api的url + :param sourceManager: 一个SourceManager实例,提供了多个github api的url :param owner: github的owner :param repo: github的仓库名 :param version: 当前的版本号 @@ -151,82 +159,80 @@ def __init__(self,sourcemanager:SourceManager,owner:str,repo:str,version:str,ver :param parent: 一个QObject实例,父对象 """ super().__init__(parent) - self.__sourcemanager = sourcemanager + self.__sourceManager = sourceManager self.__owner = owner self.__repo = repo self.__version = version self.__versionReStr = versionReStr self.__replyDict = {} - + @property - def sourceManager(self): - return self.__sourcemanager + def sourceManager(self) -> SourceManager: + return self.__sourceManager @sourceManager.setter - def sourceManager(self,sourcemanager:SourceManager): - self.__sourcemanager = sourcemanager - + def sourceManager(self, sourceManager: SourceManager) -> None: + if self.__sourceManager is not None: + self.__sourceManager.deleteLater() + self.__sourceManager = sourceManager + self.__sourceManager.setParent(self) + @property - def owner(self): + def owner(self) -> str: return self.__owner @owner.setter - def owner(self,owner:str): + def owner(self, owner: str) -> None: self.__owner = owner @property - def repo(self): + def repo(self) -> str: return self.__repo @repo.setter - def repo(self,repo:str): + def repo(self, repo: str) -> None: self.__repo = repo @property - def version(self): + def version(self) -> str: return self.__version @version.setter - def version(self,version:str): + def version(self, version: str) -> None: self.__version = version @property - def versionReStr(self): + def versionReStr(self) -> str: return self.__versionReStr @versionReStr.setter - def versionReStr(self,versionReStr:str): + def versionReStr(self, versionReStr: str) -> None: self.__versionReStr = versionReStr + @property def latestReleaseUrl(self) -> str: """ 获取最新的release的url """ return f'{self.sourceManager.currentSourceUrl}/{self.__owner}/{self.__repo}/releases/latest' + @property def releasesUrl(self) -> str: """ 获取所有release的url """ return f'{self.sourceManager.currentSourceUrl}/{self.__owner}/{self.__repo}/releases' - @property - def header(self) -> dict: - return { - 'Accept':'application/vnd.github.v3+json', - 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36' - } - def isNeedUpdate(self,isAsync:bool = True) -> bool | str | None: + + def isNeedUpdate(self, isAsync: bool = True) -> bool | str | None: """ 判断是否需要更新 - + :param isAsync: 是否异步 :return: bool :如果不是异步,返回True表示需要更新,False表示不需要更新,None表示网络错误 \n str: 如果是异步,返回一个当前请求的唯一标识,并发射isNeedUpdateAsyncSignal信号 """ - - nam = QNetworkAccessManager(self) - request = QNetworkRequest(QUrl(self.latestReleaseUrl)) - request.setAttribute(QNetworkRequest.Attribute.FollowRedirectsAttribute,True) + + nam, request = self.__createRequest(self.latestReleaseUrl) reply = nam.get(request) reply.setObjectName(str(uuid.uuid1())) self.__replyDict[reply.objectName()] = reply @@ -237,40 +243,39 @@ def isNeedUpdate(self,isAsync:bool = True) -> bool | str | None: if reply.error() != QNetworkReply.NoError: self.error(reply.errorString()) return None - data = reply.readAll().data().decode('utf-8') + dataStr = reply.readAll().data().decode('utf-8') self.__replyDict.pop(reply.objectName()) reply.deleteLater() - return self.__isNeedUpdate(data) + return self.__isNeedUpdate(dataStr) else: nam.finished.connect(self.__isNeedUpdateAsync) return reply.objectName() - - def __isNeedUpdate(self,data)->bool: - release = Release(json.loads(data),self.versionReStr) + + def __isNeedUpdate(self, dataStr: str) -> bool: + release = ReleaseInfo(json.loads(dataStr), self.versionReStr) return self.compareVersion(release.tag_name) == '>' - - def __isNeedUpdateAsync(self,reply:QNetworkReply): + + def __isNeedUpdateAsync(self, reply: QNetworkReply): if reply.error() == QNetworkReply.NetworkError.OperationCanceledError: pass elif reply.error() != QNetworkReply.NoError: self.error(reply.errorString()) else: - self.isNeedUpdateAsyncSignal.emit(self.__isNeedUpdate(reply.readAll().data().decode('utf-8'))) + self.isNeedUpdateAsyncSignal.emit( + self.__isNeedUpdate(reply.readAll().data().decode('utf-8'))) self.__replyDict.pop(reply.objectName()) reply.deleteLater() - def latestRelease(self,isAsync:bool = True) -> Release | str | None: + def latestRelease(self, isAsync: bool = True) -> ReleaseInfo | str | None: """ 获取最新的release - + :param isAsync: 是否异步 :return: bool :如果不是异步,返回一个Release实例 str: 如果是异步,返回一个当前请求的唯一标识,并发射latestReleaseAsyncSignal信号 """ - - nam = QNetworkAccessManager(self) - request = QNetworkRequest(QUrl(self.latestReleaseUrl)) - request.setAttribute(QNetworkRequest.Attribute.FollowRedirectsAttribute,True) + + nam, request = self.__createRequest(self.latestReleaseUrl) reply = nam.get(request) reply.setObjectName(str(uuid.uuid1())) self.__replyDict[reply.objectName()] = reply @@ -281,35 +286,34 @@ def latestRelease(self,isAsync:bool = True) -> Release | str | None: if reply.error() != QNetworkReply.NoError: self.error(reply.errorString()) return None - data = reply.readAll().data().decode('utf-8') + dataStr = reply.readAll().data().decode('utf-8') self.__replyDict.pop(reply.objectName()) reply.deleteLater() - return Release(json.loads(data),self.versionReStr) + return ReleaseInfo(json.loads(dataStr), self.versionReStr) else: nam.finished.connect(self.__lastReleaseAsync) return reply.objectName() - - def __lastReleaseAsync(self,reply:QNetworkReply): + + def __lastReleaseAsync(self, reply: QNetworkReply): if reply.error() == QNetworkReply.NetworkError.OperationCanceledError: pass elif reply.error() != QNetworkReply.NoError: self.error(reply.errorString()) else: - self.latestReleaseAsyncSignal.emit(Release(json.loads(reply.readAll().data().decode('utf-8')),self.versionReStr)) + self.latestReleaseAsyncSignal.emit( + ReleaseInfo(json.loads(reply.readAll().data().decode('utf-8')), self.versionReStr)) self.__replyDict.pop(reply.objectName()) reply.deleteLater() - - def releases(self,isAsync:bool = True) -> list | str | None: + + def releases(self, isAsync: bool = True) -> list | str | None: """ 获取所有release - + :param isAsync: 是否异步 :return: list :如果不是异步,返回一个Release实例的列表 str: 如果是异步,返回一个当前请求的唯一标识,并发射releasesAsyncSignal信号 """ - nam = QNetworkAccessManager(self) - request = QNetworkRequest(QUrl(self.releasesUrl)) - request.setAttribute(QNetworkRequest.Attribute.FollowRedirectsAttribute,True) + nam, request = self.__createRequest(self.releasesUrl) reply = nam.get(request) reply.setObjectName(str(uuid.uuid1())) self.__replyDict[reply.objectName()] = reply @@ -320,19 +324,18 @@ def releases(self,isAsync:bool = True) -> list | str | None: if reply.error() != QNetworkReply.NoError: self.error(reply.errorString()) return None - data = reply.readAll().data().decode('utf-8') + dataStr = reply.readAll().data().decode('utf-8') self.__replyDict.pop(reply.objectName()) reply.deleteLater() - return self.__releases(data) + return self.__releases(dataStr) else: nam.finished.connect(self.__releasesAsync) return reply.objectName() - - - def __releases(self,data) -> list | None: - return [Release(x,self.versionReStr) for x in json.loads(data)] - - def __releasesAsync(self,reply:QNetworkReply): + + def __releases(self, dataStr: str) -> list | None: + return [ReleaseInfo(x, self.versionReStr) for x in json.loads(dataStr)] + + def __releasesAsync(self, reply: QNetworkReply): if reply.error() == QNetworkReply.NetworkError.OperationCanceledError: pass elif reply.error() != QNetworkReply.NoError: @@ -340,44 +343,60 @@ def __releasesAsync(self,reply:QNetworkReply): else: releases = self.__releases(reply.readAll().data().decode('utf-8')) self.releasesAsyncSignal.emit(releases) - self.__replyDict.pop(reply.objectName(),None) + self.__replyDict.pop(reply.objectName(), None) reply.deleteLater() - - def downloadRelease(self,release:Release) -> str: + def downloadRelease(self, release: ReleaseInfo) -> str: """ 下载release - + :param release: 需要下载的release :return: 一个当前请求的唯一标识 - """ + """ self.downloadReleaseAsyncStartSignal.emit(release) - nam = QNetworkAccessManager(self) - request = QNetworkRequest(QUrl(release.assets_browser_download_url)) - request.setAttribute(QNetworkRequest.Attribute.FollowRedirectsAttribute,True) + nam, request = self.__createRequest(release.assets_browser_download_url) reply = nam.get(request) reply.setObjectName(str(uuid.uuid1())) self.__replyDict[reply.objectName()] = reply reply.downloadProgress.connect(self.__downloadProgress) - reply.finished.connect( lambda : self.__saveFile(release)) + reply.finished.connect(lambda: self.__saveFile(release)) return reply.objectName() - - def __downloadProgress(self,bytesReceived:int,bytesTotal:int): + + def __createRequest(self, url: str) -> tuple[QNetworkAccessManager, QNetworkRequest]: + """ + 创建一个QNetworkAccessManager和QNetworkRequest + Parameters + ---------- + url + 请求的url + Returns + ------- + tuple[QNetworkAccessManager, QNetworkRequest] + 一个QNetworkAccessManager和QNetworkRequest + """ + nam = QNetworkAccessManager(self) + request = QNetworkRequest(QUrl(url)) + request.setAttribute( + QNetworkRequest.Attribute.FollowRedirectsAttribute, True) + return nam, request + + def __downloadProgress(self, bytesReceived: int, bytesTotal: int): """ 用于触发downloadReleaseAsyncProgressSignal信号 - + :param bytesReceived: 已经下载的字节数 :param bytesTotal: 需要下载的总字节数 """ - self.downloadReleaseAsyncProgressSignal.emit(bytesReceived,bytesTotal) - def __saveFile(self,release:Release): + self.downloadReleaseAsyncProgressSignal.emit(bytesReceived, bytesTotal) + + def __saveFile(self, release: ReleaseInfo): """ 从reply中读取数据并根据release.assets_name保存到临时文件夹 - + :param release: 需要保存的release :return: None """ - reply:QNetworkReply = self.sender() + reply: QNetworkReply = self.sender() if reply.error() == QNetworkReply.NetworkError.OperationCanceledError: pass elif reply.error() != QNetworkReply.NoError: @@ -385,17 +404,18 @@ def __saveFile(self,release:Release): else: # 临时文件夹 tempDir = tempfile.gettempdir() - path = os.path.join(tempDir,release.assets_name) + path = os.path.join(tempDir, release.assets_name) # 判断是否存在 if os.path.exists(path): os.remove(path) - with open(path,'wb') as f: + with open(path, 'wb') as f: f.write(reply.readAll()) self.downloadReleaseAsyncFinishSignal.emit(path) if reply.objectName() in self.__replyDict: self.__replyDict.pop(reply.objectName()) reply.deleteLater() - def compareVersion(self,v2:str) -> str: + + def compareVersion(self, v2: str) -> str: """和当前版本比较 Args: @@ -404,41 +424,55 @@ def compareVersion(self,v2:str) -> str: Returns: str: "<","=" or ">" """ - v = re.findall(self.__versionReStr,self.__version) + v = re.findall(self.__versionReStr, self.__version) if len(v) == 0: return '<' - v1 = re.findall(self.__versionReStr,v2) + v1 = re.findall(self.__versionReStr, v2) if len(v1) == 0: return '<' v1 = v1[0] v = v[0] - vlist = v.split('.') + vList = v.split('.') v1list = v1.split('.') - for i in range(len(vlist)): - if int(vlist[i]) > int(v1list[i]): + for i in range(len(vList)): + if int(vList[i]) > int(v1list[i]): return '<' - elif int(vlist[i]) < int(v1list[i]): + elif int(vList[i]) < int(v1list[i]): return '>' return '=' - - def error(self,errorStr:str): + + def error(self, errorStr: str): self.errorSignal.emit(errorStr) - + def closeAllRequest(self): """关闭所有的请求""" keys = list(self.__replyDict.keys()) for key in keys: self.__replyDict[key].abort() self.__replyDict.clear() - def closeRequest(self,id:str): + + def closeRequest(self, identification: str): """关闭单个请求 Args: - id (str): 请求的唯一标识 + identification (str): 请求的唯一标识 """ - if id in self.__replyDict: - self.__replyDict[id].abort() - self.__replyDict.pop(id) + if identification in self.__replyDict: + self.__replyDict[identification].abort() + self.__replyDict.pop(identification) + + @staticmethod + def removeAllTempFile(fileNameRe: str): + """删除临时文件夹中的安装包文件""" + tempDir = tempfile.gettempdir() + # 按照项目名称正则匹配 + for file in os.listdir(tempDir): + if re.match(fileNameRe, file): + os.remove(os.path.join(tempDir, file)) + + def removeDownloadFile(self): + """删除下载的文件""" + GitHub.removeAllTempFile(f".*{self.__repo}.*") if __name__ == '__main__': app = QCoreApplication([]) @@ -446,11 +480,11 @@ def closeRequest(self,id:str): "Github": "https://api.github.com/repos/", "gitee": "https://api.gitee.com/repos/", } - github = GitHub(SourceManager(data),"eee555","Metasweeper","3.1.9","(\d+\.\d+\.\d+)") - github.releasesAsyncSignal.connect(lambda x:print(x)) + github = GitHub(SourceManager(data), "eee555", + "Solvable-Minesweeper", "3.1.9", "(\d+\.\d+\.\d+)") + github.releasesAsyncSignal.connect(lambda x: print(x)) github.releases() # manager = SourceManager(data) # manager.quickSource.connect(lambda x,y:print(x,y)) # manager.checkSourceSpeed() app.exec_() - \ No newline at end of file