diff --git a/fsspec/spec.py b/fsspec/spec.py index da120e525..07d8cdca5 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -70,9 +70,12 @@ def __call__(cls, *args, **kwargs): strip_tokenize_options = { k: kwargs.pop(k) for k in cls._strip_tokenize_options if k in kwargs } - token = tokenize( - cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs - ) + if getattr(cls, "async_impl", False) and not kwargs.get("asynchronous", False): + token = tokenize(cls, cls._pid, *args, *extra_tokens, **kwargs) + else: + token = tokenize( + cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs + ) skip = kwargs.pop("skip_instance_cache", False) if os.getpid() != cls._pid: cls._cache.clear() diff --git a/fsspec/tests/test_cache_threads.py b/fsspec/tests/test_cache_threads.py new file mode 100644 index 000000000..eb279ea82 --- /dev/null +++ b/fsspec/tests/test_cache_threads.py @@ -0,0 +1,94 @@ +import threading + +import pytest + +import fsspec +from fsspec.asyn import AsyncFileSystem +from fsspec.registry import register_implementation +from fsspec.spec import AbstractFileSystem + + +class SyncDummyFS(AbstractFileSystem): + cachable = True + async_impl = False + protocol = "syncdummy" + + +class AsyncDummyFS(AsyncFileSystem): + cachable = True + protocol = "asyncdummy" + + +@pytest.fixture(autouse=True) +def dummy_fs_setup(): + register_implementation("syncdummy", SyncDummyFS) + register_implementation("asyncdummy", AsyncDummyFS) + SyncDummyFS.clear_instance_cache() + AsyncDummyFS.clear_instance_cache() + yield + SyncDummyFS.clear_instance_cache() + AsyncDummyFS.clear_instance_cache() + + +def test_async_fs_sync_mode_shares_instance(): + results = {} + lock = threading.Lock() + + def worker(thread_id): + with lock: + fs = fsspec.filesystem("asyncdummy", asynchronous=False) + results[thread_id] = id(fs) + + t1 = threading.Thread(target=worker, args=(1,)) + t2 = threading.Thread(target=worker, args=(2,)) + + t1.start() + t2.start() + t1.join() + t2.join() + + assert results[1] == results[2] + + +def test_async_fs_async_mode_does_not_share(): + results = {} + lock = threading.Lock() + barrier = threading.Barrier(2) + + def worker(thread_id): + barrier.wait() + with lock: + fs = fsspec.filesystem("asyncdummy", asynchronous=True) + results[thread_id] = id(fs) + + t1 = threading.Thread(target=worker, args=(1,)) + t2 = threading.Thread(target=worker, args=(2,)) + + t1.start() + t2.start() + t1.join() + t2.join() + + assert results[1] != results[2] + + +def test_sync_fs_does_not_share(): + results = {} + lock = threading.Lock() + barrier = threading.Barrier(2) + + def worker(thread_id): + barrier.wait() + with lock: + fs = fsspec.filesystem("syncdummy") + results[thread_id] = id(fs) + + t1 = threading.Thread(target=worker, args=(1,)) + t2 = threading.Thread(target=worker, args=(2,)) + + t1.start() + t2.start() + t1.join() + t2.join() + + assert results[1] != results[2]