Skip to content
Draft

CSCS #530

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 104 additions & 49 deletions lstmcpipe/config/paths_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ class PathConfig:
Base class to generate a Path configuration for a production
"""

def __init__(self, prod_id):
def __init__(self, prod_id, base_dir="/fefs/aswg/data/mc"):
self.prod_id = prod_id
self.base_dir = base_dir
self.paths = {}
self.stages = []

Expand Down Expand Up @@ -84,12 +85,13 @@ class PathConfigProd5Trans80(PathConfig):
Standard paths configuration for a prod5_trans_80 MC production
"""

def __init__(self, prod_id, zenith='zenith_20deg'):
super().__init__(prod_id)
def __init__(self, prod_id, zenith='zenith_20deg', base_dir="/fefs/aswg/data/mc"):
super().__init__(prod_id, base_dir)
self.prod_id = prod_id
self.zenith = zenith
self.base_dir = (
'/fefs/aswg/data/mc/{data_level}/20200629_prod5_trans_80/{particle}/{zenith}/south_pointing/{prod_id}'
self.prod_dir = os.path.join(
self.base_dir,
'{data_level}/20200629_prod5_trans_80/{particle}/{zenith}/south_pointing/{prod_id}'
)
self.training_particles = ['gamma-diffuse', 'proton']
self.testing_particles = ['gamma', 'electron', 'proton', 'gamma-diffuse']
Expand Down Expand Up @@ -124,7 +126,7 @@ def _data_level_dir(self, prod_id, data_level, particle, gamma_src_offset='off0.
-------
str: path to directory
"""
base = self.base_dir.format(data_level=data_level, particle=particle, zenith=self.zenith, prod_id=prod_id)
base = self.prod_dir.format(data_level=data_level, particle=particle, zenith=self.zenith, prod_id=prod_id)
if particle == 'gamma':
base = os.path.join(base, gamma_src_offset)
return base
Expand Down Expand Up @@ -228,7 +230,7 @@ def merge_dl1(self):
return paths

def models_path(self):
p = self.base_dir.format(data_level='models', particle='', zenith=self.zenith, prod_id=self.prod_id).replace(
p = self.prod_dir.format(data_level='models', particle='', zenith=self.zenith, prod_id=self.prod_id).replace(
'/mc/', '/'
)

Expand Down Expand Up @@ -340,8 +342,8 @@ def path_dict(offset):


class PathConfigProd5Trans80DL1ab(PathConfigProd5Trans80):
def __init__(self, prod_id, source_prod_id, zenith='zenith_20deg', run_checker=True):
super(PathConfigProd5Trans80DL1ab, self).__init__(prod_id=prod_id, zenith=zenith)
def __init__(self, prod_id, source_prod_id, zenith='zenith_20deg', base_dir="/fefs/aswg/data/mc", run_checker=True):
super(PathConfigProd5Trans80DL1ab, self).__init__(prod_id=prod_id, zenith=zenith, base_dir=base_dir)
self.source_prod_id = source_prod_id
self.stages.remove('r0_to_dl1')
self.stages.remove('train_test_split')
Expand Down Expand Up @@ -388,11 +390,14 @@ class PathConfigAllSkyBase(PathConfig):
dataset_type: 'Training' or 'Testing'
"""

def __init__(self, prod_id, dec):
super().__init__(prod_id)
def __init__(self, prod_id, dec, base_dir="/fefs/aswg/data/mc"):
super().__init__(prod_id, base_dir)
self.prod_id = prod_id
self.dec = dec
self.base_dir = "/fefs/aswg/data/mc/{data_level}/AllSky/{prod_id}/{dataset_type}/{particle}/{dec}/{pointing}/"
self.prod_dir = os.path.join(
self.base_dir,
"{data_level}/AllSky/{prod_id}/{dataset_type}/{particle}/{dec}/{pointing}/"
)

self.paths = {}
self.stages = []
Expand Down Expand Up @@ -422,7 +427,7 @@ def _data_level_dir(self, prod_id, data_level, particle, pointing, dec, dataset_
if data_level not in ['models', 'DL1', 'DL2', 'IRF']:
raise ValueError("data_level should be DL1, DL2 or IRF")
return os.path.realpath(
self.base_dir.format(
self.prod_dir.format(
data_level=data_level,
particle=particle,
pointing=pointing,
Expand Down Expand Up @@ -466,7 +471,7 @@ def irf_dir(self, particle, pointing, dataset_type, dec):
)

def models_dir(self):
p = self.base_dir.format(
p = self.prod_dir.format(
data_level='models', particle='', pointing='', prod_id=self.prod_id, dataset_type='', dec=self.dec
).replace('/mc/', '/')
return os.path.realpath(p)
Expand Down Expand Up @@ -498,11 +503,12 @@ class PathConfigAllSkyTraining(PathConfigAllSkyBase):
Handles a single declination from R0 up to RF generation.
"""

def __init__(self, prod_id, dec):
super().__init__(prod_id, dec)
def __init__(self, prod_id, dec, base_dir="/fefs/aswg/data/mc"):
super().__init__(prod_id, dec, base_dir)
# dec must be read here and not later as a f-string, hence the + dec +
self.training_dir = (
"/fefs/aswg/data/mc/DL0/LSTProd2/TrainingDataset/{particle}/"
self.training_dir = os.path.join(
self.base_dir,
"DL0/LSTProd2/TrainingDataset/{particle}/"
+ dec
+ "/sim_telarray/{pointing}/output_v1.4/"
)
Expand Down Expand Up @@ -675,8 +681,8 @@ def train_pipe(self):


class PathConfigAllSkyTrainingWithSplit(PathConfigAllSkyTraining):
def __init__(self, prod_id, dec):
super().__init__(prod_id, dec)
def __init__(self, prod_id, dec, base_dir="/fefs/aswg/data/mc"):
super().__init__(prod_id, dec, base_dir)
self.stages.insert(1, 'train_test_split')

def dl1_diffuse_test_dir(self, pointing):
Expand Down Expand Up @@ -715,9 +721,12 @@ def merge_dl1(self):


class PathConfigAllSkyTesting(PathConfigAllSkyBase):
def __init__(self, prod_id, dec):
super().__init__(prod_id, dec)
self.testing_dir = "/fefs/aswg/data/mc/DL0/LSTProd2/TestDataset/sim_telarray/{pointing}/output_v1.4/"
def __init__(self, prod_id, dec, base_dir="/fefs/aswg/data/mc"):
super().__init__(prod_id, dec, base_dir)
self.testing_dir = os.path.join(
self.base_dir,
"DL0/LSTProd2/TestDataset/sim_telarray/{pointing}/output_v1.4/"
)
self.dataset_type = 'TestingDataset'
self.particle = 'Gamma'
self.stages = ['r0_to_dl1', 'merge_dl1', 'dl1_to_dl2', 'dl2_to_irfs']
Expand Down Expand Up @@ -883,15 +892,15 @@ def dl2_to_irfs(self):


class PathConfigAllSkyTestingGammaDiffuse(PathConfigAllSkyTesting):
def __init__(self, prod_id, dec):
def __init__(self, prod_id, dec, base_dir="/fefs/aswg/data/mc"):
"""
This config must be used after a PathConfigAllSkyTrainingWithSplit has been generated and run.
It uses the test dataset of GammaDiffuse created by the train_test_split stage of PathConfigAllSkyTrainingWithSplit,
merges the nodes and runs the dl1_to_dl2 and dl2_to_irfs stages.
"""
super().__init__(prod_id, dec)
super().__init__(prod_id, dec, base_dir)
self.stages = ['merge_dl1', 'dl1_to_dl2', 'dl2_to_irfs']
self.train_config = PathConfigAllSkyTrainingWithSplit(prod_id, dec)
self.train_config = PathConfigAllSkyTrainingWithSplit(prod_id, dec, base_dir)
# self.pointings = self.train_config.pointings
self.particle = 'GammaDiffuse'

Expand Down Expand Up @@ -940,22 +949,23 @@ def dl2_output_file(self, pointing):


class PathConfigAllSkyFull(PathConfig):
def __init__(self, prod_id, dec_list):
def __init__(self, prod_id, dec_list, base_dir="/fefs/aswg/data/mc"):
"""
Does training and testing for a list of declinations

Parameters
----------
prod_id: str
dec_list: [str]
base_dir: str
"""
super().__init__(prod_id)
super().__init__(prod_id, base_dir)
self.prod_id = prod_id
self.dec_list = dec_list
self.stages = ['r0_to_dl1', 'merge_dl1', 'train_pipe', 'dl1_to_dl2', 'dl2_to_irfs']

self.train_configs = {dec: PathConfigAllSkyTraining(prod_id, dec) for dec in dec_list}
self.test_configs = {dec: PathConfigAllSkyTesting(prod_id, dec) for dec in dec_list}
self.train_configs = {dec: PathConfigAllSkyTraining(prod_id, dec, base_dir) for dec in dec_list}
self.test_configs = {dec: PathConfigAllSkyTesting(prod_id, dec, base_dir) for dec in dec_list}

@property
def r0_to_dl1(self):
Expand Down Expand Up @@ -1031,7 +1041,7 @@ def plot_pointings(self, ax=None, projection='polar', add_grid3d=False, train_kw


class PathConfigAllSkyTrainingDL1ab(PathConfigAllSkyTraining):
def __init__(self, prod_id, source_prod_id, dec, run_checker=True):
def __init__(self, prod_id, source_prod_id, dec, base_dir="/fefs/aswg/data/mc", run_checker=True):
"""
Parameters
----------
Expand All @@ -1041,13 +1051,14 @@ def __init__(self, prod_id, source_prod_id, dec, run_checker=True):
the source prod ID (must exist)
dec: str
the declination
base_dir: str
run_checker: boolean
True to check if the source prod exists
"""
super().__init__(prod_id, dec)
super().__init__(prod_id, dec, base_dir)
self.stages = ['dl1ab', 'merge_dl1', 'train_pipe']
self.source_prod_id = source_prod_id
self.source_config = PathConfigAllSkyTraining(source_prod_id, dec)
self.source_config = PathConfigAllSkyTraining(source_prod_id, dec, base_dir)
if run_checker:
self.check_source_prod()

Expand Down Expand Up @@ -1077,7 +1088,7 @@ def dl1ab(self):


class PathConfigAllSkyTestingDL1ab(PathConfigAllSkyTesting):
def __init__(self, prod_id, source_prod_id, dec, run_checker=True):
def __init__(self, prod_id, source_prod_id, dec, base_dir="/fefs/aswg/data/mc", run_checker=True):
"""
Parameters
----------
Expand All @@ -1087,13 +1098,14 @@ def __init__(self, prod_id, source_prod_id, dec, run_checker=True):
the source prod ID (must exist)
dec: str
the declination
base_dir: str
run_checker: boolean
True to check if the source prod exists
"""
super().__init__(prod_id, dec)
super().__init__(prod_id, dec, base_dir)
self.stages = ['dl1ab', 'merge_dl1', 'dl1_to_dl2', 'dl2_to_irfs']
self.source_prod_id = source_prod_id
self.source_config = PathConfigAllSkyTesting(source_prod_id, dec)
self.source_config = PathConfigAllSkyTesting(source_prod_id, dec, base_dir)
if run_checker:
self.check_source_prod()

Expand Down Expand Up @@ -1121,7 +1133,7 @@ def dl1ab(self):


class PathConfigAllSkyFullDL1ab(PathConfigAllSkyFull):
def __init__(self, prod_id, source_prod_id, dec_list, run_checker=True):
def __init__(self, prod_id, source_prod_id, dec_list, base_dir="/fefs/aswg/data/mc", run_checker=True):
"""
Parameters
----------
Expand All @@ -1131,18 +1143,19 @@ def __init__(self, prod_id, source_prod_id, dec_list, run_checker=True):
the source prod ID (must exist)
dec_list: [str]
list of declinations
base_dir: str
run_checker: boolean
True to check if the source prod exists
"""
super().__init__(prod_id, dec_list)
super().__init__(prod_id, dec_list, base_dir)
self.source_prod_id = source_prod_id
self.stages = ['dl1ab', 'merge_dl1', 'train_pipe', 'dl1_to_dl2', 'dl2_to_irfs']
self.train_configs = {
dec: PathConfigAllSkyTrainingDL1ab(prod_id, source_prod_id, dec, run_checker=run_checker)
dec: PathConfigAllSkyTrainingDL1ab(prod_id, source_prod_id, dec, base_dir, run_checker=run_checker)
for dec in dec_list
}
self.test_configs = {
dec: PathConfigAllSkyTestingDL1ab(prod_id, source_prod_id, dec, run_checker=run_checker) for dec in dec_list
dec: PathConfigAllSkyTestingDL1ab(prod_id, source_prod_id, dec, base_dir, run_checker=run_checker) for dec in dec_list
}

@property
Expand All @@ -1156,7 +1169,7 @@ def dl1ab(self):


class PathConfigAllTrainTestDL1b(PathConfigAllSkyFullDL1ab):
def __init__(self, prod_id, source_prod_id, dec_list, run_checker=True):
def __init__(self, prod_id, source_prod_id, dec_list, base_dir="/fefs/aswg/data/mc", run_checker=True):
"""
Config for an allsky train-test analysis from an existing source prod.
It runs:
Expand All @@ -1165,13 +1178,13 @@ def __init__(self, prod_id, source_prod_id, dec_list, run_checker=True):
Note that in case of source-dependent analysis,
missing src-dep parameters are recomputed on the fly during the train stage by lstchain.
"""
super().__init__(prod_id, source_prod_id, dec_list)
super().__init__(prod_id, source_prod_id, dec_list, base_dir, run_checker)
self.dec_list = dec_list
self.source_prod_id = source_prod_id
self.source_configs = PathConfigAllSkyFullDL1ab(
source_prod_id, source_prod_id, dec_list, run_checker=run_checker
source_prod_id, source_prod_id, dec_list, base_dir, run_checker=run_checker
)
self.target_configs = PathConfigAllSkyFullDL1ab(prod_id, source_prod_id, dec_list, run_checker=run_checker)
self.target_configs = PathConfigAllSkyFullDL1ab(prod_id, source_prod_id, dec_list, base_dir, run_checker=run_checker)
self.stages = ['train_pipe', 'dl1_to_dl2']
if run_checker:
self.check_source_prod()
Expand Down Expand Up @@ -1221,13 +1234,13 @@ def check_source_prod(self):


class PathConfigAllSkyFullSplitDiffuse(PathConfigAllSkyFull):
def __init__(self, prod_id, dec_list):
super().__init__(prod_id, dec_list)
def __init__(self, prod_id, dec_list, base_dir="/fefs/aswg/data/mc"):
super().__init__(prod_id, dec_list, base_dir)
self.stages = ['r0_to_dl1', 'train_test_split', 'merge_dl1', 'train_pipe', 'dl1_to_dl2', 'dl2_to_irfs']

self.train_configs = {dec: PathConfigAllSkyTrainingWithSplit(prod_id, dec) for dec in dec_list}
self.test_configs = {dec: PathConfigAllSkyTesting(prod_id, dec) for dec in dec_list}
self.test_diffuse_config = {dec: PathConfigAllSkyTestingGammaDiffuse(prod_id, dec) for dec in dec_list}
self.train_configs = {dec: PathConfigAllSkyTrainingWithSplit(prod_id, dec, base_dir) for dec in dec_list}
self.test_configs = {dec: PathConfigAllSkyTesting(prod_id, dec, base_dir) for dec in dec_list}
self.test_diffuse_config = {dec: PathConfigAllSkyTestingGammaDiffuse(prod_id, dec, base_dir) for dec in dec_list}

@property
def train_test_split(self):
Expand Down Expand Up @@ -1255,4 +1268,46 @@ def dl2_to_irfs(self):
paths = super().dl2_to_irfs
for dec in self.dec_list:
paths.extend(self.test_diffuse_config[dec].dl2_to_irfs)
return paths


class PathConfigAllSkyFullCSCS(PathConfigAllSkyFull):
    """
    AllSky full (training + testing) paths configuration with dCache staging.

    Adds a 'dcache_download' stage before the stages inherited from
    PathConfigAllSkyFull and a 'dcache_upload' stage after them. Each of the
    two stages is described by a list of ``{'input': ..., 'output': ...}``
    directory pairs.
    """

    def __init__(self, prod_id, dec_list, base_dir="$SCRATCH/data/mc"):
        """
        Parameters
        ----------
        prod_id: str
        dec_list: [str]
            list of declinations
        base_dir: str
            local base directory of the MC data. Environment variables
            (e.g. ``$SCRATCH``) are expanded when the config is created.
        """
        # The parent configs pass paths built from base_dir through
        # os.path.realpath, which would treat an unexpanded "$SCRATCH" as a
        # literal directory name relative to the current working directory.
        # Variables that are not set are left untouched by expandvars.
        base_dir = os.path.expandvars(base_dir)
        super().__init__(prod_id, dec_list, base_dir)
        self.stages.insert(0, 'dcache_download')
        self.stages.append('dcache_upload')
        self.remote_base_dir = "/pnfs/cta.cscs.ch/lst/MC"

    @property
    def dcache_download(self):
        """
        Directories to copy from the remote storage to the local base dir.

        Returns
        -------
        list of dict: one ``{'input': remote_dir, 'output': local_dir}`` entry
            per (declination, training particle), plus one for the test dataset
        """
        paths = []
        # Download Training Data
        for dec in self.dec_list:
            tc = self.train_configs[dec]
            for particle in tc.training_particles:
                relative_dir = f"DL0/LSTProd2/TrainingDataset/{particle}/{dec}"
                paths.append(
                    {
                        'input': os.path.join(self.remote_base_dir, relative_dir),
                        'output': os.path.join(self.base_dir, relative_dir),
                    }
                )

        # Download Testing Data
        test_relative_dir = "DL0/LSTProd2/TestDataset"
        paths.append(
            {
                'input': os.path.join(self.remote_base_dir, test_relative_dir),
                'output': os.path.join(self.base_dir, test_relative_dir),
            }
        )

        return paths

    @property
    def dcache_upload(self):
        """
        Directories to copy from the local base dir to the remote storage.

        Returns
        -------
        list of dict: one ``{'input': local_dir, 'output': remote_dir}`` entry
            for each of DL1, DL2 and IRF, plus one for the models
        """
        paths = []
        # Upload DL1, DL2, IRF
        for data_level in ['DL1', 'DL2', 'IRF']:
            # Local products are written under
            # <base_dir>/<data_level>/AllSky/<prod_id>: base_dir already is the
            # "mc" directory, so no extra "mc/" component on the local side.
            local_path = os.path.join(self.base_dir, f"{data_level}/AllSky/{self.prod_id}")
            # NOTE(review): remote layout kept as originally written
            # (<remote_base_dir>/mc/...); it differs from the layout used by
            # dcache_download - confirm against the actual dCache tree.
            remote_path = os.path.join(self.remote_base_dir, f"mc/{data_level}/AllSky/{self.prod_id}")
            paths.append({'input': local_path, 'output': remote_path})

        # Upload Models
        # Same '/mc/' -> '/' substitution as models_dir, so that the uploaded
        # directory is the one the models are actually written to.
        local_models = os.path.join(self.base_dir, f"models/AllSky/{self.prod_id}").replace('/mc/', '/')
        # NOTE(review): remote models location kept as originally written - confirm.
        remote_models = os.path.join(self.remote_base_dir, f"models/AllSky/{self.prod_id}")
        paths.append({'input': local_models, 'output': remote_models})

        return paths
Loading
Loading