Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion aiomultiprocess/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def __init__(
self.context = get_context()

self.scheduler = scheduler or RoundRobin()
self.process_count = max(1, processes or os.cpu_count() or 2)
self.process_count = max(1, processes or self._cpu_count() or 2)
self.queue_count = max(1, queuecount or 1)

if self.queue_count > self.process_count:
Expand Down Expand Up @@ -214,6 +214,39 @@ def init(self) -> None:
self.processes[self.create_worker(qid)] = qid
self.scheduler.register_process(qid)

def _cpu_count(self):

"""get the real cpu count considering container environment"""

def get_cpu_use_cgroup(cgroup_file_dir):
"""use cgroup"""
with open(cgroup_file_dir + '/cpu/cpu.cfs_quota_us') as f:
use_cpu_time = int(f.read())
if use_cpu_time == -1:
core_num = os.cpu_count()
else:
with open(cgroup_file_dir + '/cpu/cpu.cfs_period_us') as file:
per_cpu_time = int(file.read())
core_num = use_cpu_time / per_cpu_time
return round(core_num)

def get_cpu_use_os():
"""use os.cpu_count"""
return os.cpu_count()

cgroup_file_dir = '/sys/fs/cgroup'
docker_judge = '/.dockerenv'
if os.path.exists(docker_judge) and os.path.exists(cgroup_file_dir):
# under container environment
try:
core_num = get_cpu_use_cgroup(cgroup_file_dir)
except Exception:
core_num = get_cpu_use_os()
else:
core_num = get_cpu_use_os()

return core_num

async def loop(self) -> None:
"""
Maintain the pool of workers while open.
Expand Down