2020import json
2121import logging
2222import os
23+ import threading
2324from copy import copy
2425from functools import lru_cache , partial
2526from typing import (
@@ -370,7 +371,7 @@ class FsspecFileIO(FileIO):
370371 def __init__ (self , properties : Properties ):
371372 self ._scheme_to_fs = {}
372373 self ._scheme_to_fs .update (SCHEME_TO_FS )
373- self .get_fs : Callable [[ str ], AbstractFileSystem ] = lru_cache ( self . _get_fs )
374+ self ._thread_locals = threading . local ( )
374375 super ().__init__ (properties = properties )
375376
376377 def new_input (self , location : str ) -> FsspecInputFile :
@@ -416,6 +417,13 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
416417 fs = self .get_fs (uri .scheme )
417418 fs .rm (str_location )
418419
420+ def get_fs (self , scheme : str ) -> AbstractFileSystem :
421+ """Get a filesystem for a specific scheme, cached per thread."""
422+ if not hasattr (self ._thread_locals , "get_fs_cached" ):
423+ self ._thread_locals .get_fs_cached = lru_cache (self ._get_fs )
424+
425+ return self ._thread_locals .get_fs_cached (scheme )
426+
419427 def _get_fs (self , scheme : str ) -> AbstractFileSystem :
420428 """Get a filesystem for a specific scheme."""
421429 if scheme not in self ._scheme_to_fs :
@@ -425,10 +433,10 @@ def _get_fs(self, scheme: str) -> AbstractFileSystem:
425433 def __getstate__ (self ) -> Dict [str , Any ]:
426434 """Create a dictionary of the FsSpecFileIO fields used when pickling."""
427435 fileio_copy = copy (self .__dict__ )
428- fileio_copy ["get_fs" ] = None
436+ del fileio_copy ["_thread_locals" ]
429437 return fileio_copy
430438
431439 def __setstate__ (self , state : Dict [str , Any ]) -> None :
432440 """Deserialize the state into a FsSpecFileIO instance."""
433441 self .__dict__ = state
434- self .get_fs = lru_cache ( self . _get_fs )
442+ self ._thread_locals = threading . local ( )
0 commit comments