Skip to content

Commit ba8802a

Browse files
committed
feat: 为所有存储后端实现流式文件下载
将文件下载方式从全量读取改为流式传输,减少内存占用并提高大文件下载性能 支持S3、OneDrive、OpenDAL和WebDAV存储后端的流式下载 添加错误处理和HTTP异常捕获
1 parent bf43727 commit ba8802a

File tree

1 file changed

+105
-49
lines changed

1 file changed

+105
-49
lines changed

core/storage.py

Lines changed: 105 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from core.settings import data_root, settings
2424
from apps.base.models import FileCodes, UploadChunk
2525
from core.utils import get_file_url, sanitize_filename
26-
from fastapi.responses import FileResponse
26+
from fastapi.responses import FileResponse, StreamingResponse
2727

2828

2929
class FileStorageInterface:
@@ -310,20 +310,34 @@ async def get_file_response(self, file_code: FileCodes):
310310
},
311311
ExpiresIn=3600,
312312
)
313-
tmp = io.BytesIO()
314-
async with aiohttp.ClientSession() as session:
315-
async with session.get(link) as resp:
316-
tmp.write(await resp.read())
317-
tmp.seek(0)
318-
content = tmp.read()
319-
tmp.close()
320-
return Response(
321-
content,
313+
314+
async def stream_generator():
315+
async with aiohttp.ClientSession() as session:
316+
async with session.get(link) as resp:
317+
if resp.status != 200:
318+
raise HTTPException(
319+
status_code=resp.status,
320+
detail=f"从S3获取文件失败: {resp.status}"
321+
)
322+
# 设置块大小(例如64KB)
323+
chunk_size = 65536
324+
while True:
325+
chunk = await resp.content.read(chunk_size)
326+
if not chunk:
327+
break
328+
yield chunk
329+
330+
from fastapi.responses import StreamingResponse
331+
headers = {
332+
"Content-Disposition": f'attachment; filename="{filename.encode("utf-8").decode("latin-1")}"'
333+
}
334+
return StreamingResponse(
335+
stream_generator(),
322336
media_type="application/octet-stream",
323-
headers={
324-
"Content-Disposition": f'attachment; filename="{filename.encode("utf-8").decode("latin-1")}"'
325-
},
337+
headers=headers
326338
)
339+
except HTTPException:
340+
raise
327341
except Exception:
328342
raise HTTPException(status_code=503, detail="服务代理下载异常,请稍后再试")
329343

@@ -602,20 +616,32 @@ async def get_file_response(self, file_code: FileCodes):
602616
link = await asyncio.to_thread(
603617
self._get_file_url, await file_code.get_file_path(), filename
604618
)
605-
tmp = io.BytesIO()
606-
async with aiohttp.ClientSession() as session:
607-
async with session.get(link) as resp:
608-
tmp.write(await resp.read())
609-
tmp.seek(0)
610-
content = tmp.read()
611-
tmp.close()
612-
return Response(
613-
content,
619+
620+
async def stream_generator():
621+
async with aiohttp.ClientSession() as session:
622+
async with session.get(link) as resp:
623+
if resp.status != 200:
624+
raise HTTPException(
625+
status_code=resp.status,
626+
detail=f"从OneDrive获取文件失败: {resp.status}"
627+
)
628+
chunk_size = 65536
629+
while True:
630+
chunk = await resp.content.read(chunk_size)
631+
if not chunk:
632+
break
633+
yield chunk
634+
635+
headers = {
636+
"Content-Disposition": f'attachment; filename="{filename.encode("utf-8").decode("latin-1")}"'
637+
}
638+
return StreamingResponse(
639+
stream_generator(),
614640
media_type="application/octet-stream",
615-
headers={
616-
"Content-Disposition": f'attachment; filename="{filename.encode("utf-8").decode("latin-1")}"'
617-
},
641+
headers=headers
618642
)
643+
except HTTPException:
644+
raise
619645
except Exception:
620646
raise HTTPException(status_code=503, detail="服务代理下载异常,请稍后再试")
621647

@@ -776,11 +802,35 @@ async def get_file_url(self, file_code: FileCodes):
776802
async def get_file_response(self, file_code: FileCodes):
777803
try:
778804
filename = file_code.prefix + file_code.suffix
779-
content = await self.operator.read(await file_code.get_file_path())
805+
# 尝试使用流式读取器
806+
try:
807+
# OpenDAL 可能提供 reader 方法返回一个异步读取器
808+
reader = await self.operator.reader(await file_code.get_file_path())
809+
except AttributeError:
810+
# 如果 reader 方法不存在,回退到全量读取(兼容旧版本)
811+
content = await self.operator.read(await file_code.get_file_path())
812+
headers = {
813+
"Content-Disposition": f'attachment; filename="{filename}"'
814+
}
815+
return Response(
816+
content, headers=headers, media_type="application/octet-stream"
817+
)
818+
819+
async def stream_generator():
820+
chunk_size = 65536
821+
while True:
822+
chunk = await reader.read(chunk_size)
823+
if not chunk:
824+
break
825+
yield chunk
826+
780827
headers = {
781-
"Content-Disposition": f'attachment; filename="{filename}"'}
782-
return Response(
783-
content, headers=headers, media_type="application/octet-stream"
828+
"Content-Disposition": f'attachment; filename="{filename}"'
829+
}
830+
return StreamingResponse(
831+
stream_generator(),
832+
media_type="application/octet-stream",
833+
headers=headers
784834
)
785835
except Exception as e:
786836
logger.info(e)
@@ -969,26 +1019,32 @@ async def get_file_response(self, file_code: FileCodes):
9691019
try:
9701020
filename = file_code.prefix + file_code.suffix
9711021
url = self._build_url(await file_code.get_file_path())
972-
async with aiohttp.ClientSession(headers={
973-
"Authorization": f"Basic {base64.b64encode(f'{settings.webdav_username}:{settings.webdav_password}'.encode()).decode()}"
974-
}) as session:
975-
async with session.get(url) as resp:
976-
if resp.status != 200:
977-
raise HTTPException(
978-
status_code=resp.status,
979-
detail=f"文件获取失败{resp.status}: {await resp.text()}",
980-
)
981-
# 读取内容到内存
982-
content = await resp.read()
983-
return Response(
984-
content=content,
985-
media_type=resp.headers.get(
986-
"Content-Type", "application/octet-stream"
987-
),
988-
headers={
989-
"Content-Disposition": f'attachment; filename="{filename.encode("utf-8").decode()}"'
990-
},
991-
)
1022+
1023+
async def stream_generator():
1024+
async with aiohttp.ClientSession(headers={
1025+
"Authorization": f"Basic {base64.b64encode(f'{settings.webdav_username}:{settings.webdav_password}'.encode()).decode()}"
1026+
}) as session:
1027+
async with session.get(url) as resp:
1028+
if resp.status != 200:
1029+
raise HTTPException(
1030+
status_code=resp.status,
1031+
detail=f"文件获取失败{resp.status}: {await resp.text()}",
1032+
)
1033+
chunk_size = 65536
1034+
while True:
1035+
chunk = await resp.content.read(chunk_size)
1036+
if not chunk:
1037+
break
1038+
yield chunk
1039+
1040+
headers = {
1041+
"Content-Disposition": f'attachment; filename="{filename.encode("utf-8").decode()}"'
1042+
}
1043+
return StreamingResponse(
1044+
stream_generator(),
1045+
media_type="application/octet-stream",
1046+
headers=headers
1047+
)
9921048
except aiohttp.ClientError as e:
9931049
raise HTTPException(
9941050
status_code=503, detail=f"WebDAV连接异常: {str(e)}")

0 commit comments

Comments
 (0)