Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ def __call__(cls, *args, **kwargs):
strip_tokenize_options = {
k: kwargs.pop(k) for k in cls._strip_tokenize_options if k in kwargs
}
token = tokenize(
cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
)
if getattr(cls, "async_impl", False) and not kwargs.get("asynchronous", False):
token = tokenize(cls, cls._pid, *args, *extra_tokens, **kwargs)
else:
token = tokenize(
cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
)
skip = kwargs.pop("skip_instance_cache", False)
if os.getpid() != cls._pid:
cls._cache.clear()
Expand Down
94 changes: 94 additions & 0 deletions fsspec/tests/test_cache_threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import threading

import pytest

import fsspec
from fsspec.asyn import AsyncFileSystem
from fsspec.registry import register_implementation
from fsspec.spec import AbstractFileSystem


class SyncDummyFS(AbstractFileSystem):
cachable = True
async_impl = False
protocol = "syncdummy"


class AsyncDummyFS(AsyncFileSystem):
cachable = True
protocol = "asyncdummy"


@pytest.fixture(autouse=True)
def dummy_fs_setup():
register_implementation("syncdummy", SyncDummyFS)
register_implementation("asyncdummy", AsyncDummyFS)
SyncDummyFS.clear_instance_cache()
AsyncDummyFS.clear_instance_cache()
yield
SyncDummyFS.clear_instance_cache()
AsyncDummyFS.clear_instance_cache()


def test_async_fs_sync_mode_shares_instance():
results = {}
lock = threading.Lock()

def worker(thread_id):
with lock:
fs = fsspec.filesystem("asyncdummy", asynchronous=False)
results[thread_id] = id(fs)

t1 = threading.Thread(target=worker, args=(1,))
t2 = threading.Thread(target=worker, args=(2,))

t1.start()
t2.start()
t1.join()
t2.join()

assert results[1] == results[2]


def test_async_fs_async_mode_does_not_share():
results = {}
lock = threading.Lock()
barrier = threading.Barrier(2)

def worker(thread_id):
barrier.wait()
with lock:
fs = fsspec.filesystem("asyncdummy", asynchronous=True)
results[thread_id] = id(fs)

t1 = threading.Thread(target=worker, args=(1,))
t2 = threading.Thread(target=worker, args=(2,))

t1.start()
t2.start()
t1.join()
t2.join()

assert results[1] != results[2]


def test_sync_fs_does_not_share():
results = {}
lock = threading.Lock()
barrier = threading.Barrier(2)

def worker(thread_id):
barrier.wait()
with lock:
fs = fsspec.filesystem("syncdummy")
results[thread_id] = id(fs)

t1 = threading.Thread(target=worker, args=(1,))
t2 = threading.Thread(target=worker, args=(2,))

t1.start()
t2.start()
t1.join()
t2.join()

assert results[1] != results[2]