Skip to content

Commit 308dee2

Browse files
committed
Fix FsspecFileIO.get_fs thread safety
`FsspecFileIO.get_fs` can be called by multiple threads when `ExecutorFactory` is used (for example by `DataScan.plan_files`). The base class of `fsspec` filesystem objects, `fsspec.spec.AbstractFileSystem`, internally caches instances through the `fsspec.spec._Cached` metaclass. The caching key used includes `threading.get_ident()`, making entries thread-local: https://github.com/fsspec/filesystem_spec/blame/f84b99f0d1f079f990db1a219b74df66ab3e7160/fsspec/spec.py#L71 The `FsspecFileIO.get_fs` LRU cache (around `FsspecFileIO._get_fs`) breaks the thread-locality of the filesystem instances as it will return the same instance for different threads. One consequence of this is that for `s3fs.S3FileSystem`, HTTP connection pooling no longer occurs per thread (as is normal with `aiobotocore`), as the `aiobotocore` client object (containing the `aiohttp.ClientSession`) is stored on the `s3fs.S3FileSystem`. This change addresses this by making the `FsspecFileIO.get_fs` cache thread-local.
1 parent 83789f0 commit 308dee2

File tree

2 files changed

+61
-3
lines changed

2 files changed

+61
-3
lines changed

pyiceberg/io/fsspec.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import json
2121
import logging
2222
import os
23+
import threading
2324
from copy import copy
2425
from functools import lru_cache, partial
2526
from typing import (
@@ -370,7 +371,7 @@ class FsspecFileIO(FileIO):
370371
def __init__(self, properties: Properties):
371372
self._scheme_to_fs = {}
372373
self._scheme_to_fs.update(SCHEME_TO_FS)
373-
self.get_fs: Callable[[str], AbstractFileSystem] = lru_cache(self._get_fs)
374+
self._thread_locals = threading.local()
374375
super().__init__(properties=properties)
375376

376377
def new_input(self, location: str) -> FsspecInputFile:
@@ -416,6 +417,13 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
416417
fs = self.get_fs(uri.scheme)
417418
fs.rm(str_location)
418419

420+
def get_fs(self, scheme: str) -> AbstractFileSystem:
421+
"""Get a filesystem for a specific scheme, cached per thread."""
422+
if not hasattr(self._thread_locals, "get_fs_cached"):
423+
self._thread_locals.get_fs_cached = lru_cache(self._get_fs)
424+
425+
return self._thread_locals.get_fs_cached(scheme)
426+
419427
def _get_fs(self, scheme: str) -> AbstractFileSystem:
420428
"""Get a filesystem for a specific scheme."""
421429
if scheme not in self._scheme_to_fs:
@@ -425,10 +433,10 @@ def _get_fs(self, scheme: str) -> AbstractFileSystem:
425433
def __getstate__(self) -> Dict[str, Any]:
426434
"""Create a dictionary of the FsSpecFileIO fields used when pickling."""
427435
fileio_copy = copy(self.__dict__)
428-
fileio_copy["get_fs"] = None
436+
del fileio_copy["_thread_locals"]
429437
return fileio_copy
430438

431439
def __setstate__(self, state: Dict[str, Any]) -> None:
432440
"""Deserialize the state into a FsSpecFileIO instance."""
433441
self.__dict__ = state
434-
self.get_fs = lru_cache(self._get_fs)
442+
self._thread_locals = threading.local()

tests/io/test_fsspec.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
import os
1919
import pickle
2020
import tempfile
21+
import threading
2122
import uuid
23+
from typing import List
2224
from unittest import mock
2325

2426
import pytest
2527
from botocore.awsrequest import AWSRequest
2628
from fsspec.implementations.local import LocalFileSystem
29+
from fsspec.spec import AbstractFileSystem
2730
from requests_mock import Mocker
2831

2932
from pyiceberg.exceptions import SignError
@@ -54,6 +57,53 @@ def test_fsspec_local_fs_can_create_path_without_parent_dir(fsspec_fileio: Fsspe
5457
pytest.fail("Failed to write to file without parent directory")
5558

5659

60+
def test_fsspec_get_fs_instance_per_thread_caching(fsspec_fileio: FsspecFileIO) -> None:
61+
"""Test that filesystem instances are cached per-thread by `FsspecFileIO.get_fs`"""
62+
fs_instances: List[AbstractFileSystem] = []
63+
64+
thread_started_events: List[threading.Event] = [threading.Event() for _ in range(2)]
65+
start_work_events: List[threading.Event] = [threading.Event() for _ in range(2)]
66+
67+
def get_fs(thread_started_event: threading.Event, start_work_event: threading.Event) -> None:
68+
# Tell the main thread the worker thread has started running
69+
thread_started_event.set()
70+
71+
# Wait to be told to actually retrieve the filesystem instances
72+
start_work_event.wait()
73+
74+
# Call twice to ensure caching within the same thread
75+
for _ in range(2):
76+
fs_instances.append(fsspec_fileio.get_fs("file"))
77+
78+
threads = [
79+
threading.Thread(target=get_fs, args=[thread_started_event, start_work_event])
80+
for thread_started_event, start_work_event in zip(thread_started_events, start_work_events)
81+
]
82+
83+
threads[0].start()
84+
threads[1].start()
85+
86+
# Wait for both threads to start as we want to ensure distinct `threading.get_ident()` values that are used in
87+
# `fsspec.spec.AbstractFileSystem`s cache keys.
88+
for thread_started_event in thread_started_events:
89+
thread_started_event.wait()
90+
91+
# Get the filesystem instances in the first thread and wait for completion
92+
start_work_events[0].set()
93+
threads[0].join()
94+
95+
# Get the filesystem instances in the second thread and wait for completion
96+
start_work_events[1].set()
97+
threads[1].join()
98+
99+
# Same thread, same instance
100+
assert fs_instances[0] is fs_instances[1]
101+
assert fs_instances[2] is fs_instances[3]
102+
103+
# Different threads, different instances
104+
assert fs_instances[0] is not fs_instances[2]
105+
106+
57107
@pytest.mark.s3
58108
def test_fsspec_new_input_file(fsspec_fileio: FsspecFileIO) -> None:
59109
"""Test creating a new input file from a fsspec file-io"""

0 commit comments

Comments
 (0)