Coverage for hiperta_stream/hiperta_stream_start.py: 0%
82 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-12-11 13:29 +0000
1# P. Aubert & E. Garcia - Nov 2020, lock-down Chap. II
4import argparse
5import logging
6import pathlib
7import shlex
8import subprocess as sp
9from collections import namedtuple
10from pathlib import Path
12from hiperta_stream.config.configuration import (
13 DL1toDL3PipelineParameters,
14 ObservationParameters,
15 ReconstructionManagerConfiguration,
16 _delete_extra,
17 write_yaml_config,
18)
19from hiperta_stream.utils.file_handling_utils import copy_file_into_dir
20from hiperta_stream.utils.logging_utils import init_logging
21from hiperta_stream.utils.subprocess_utils import (
22 subprocess_run_and_raise_exception_on_error,
23)
24from hiperta_stream.utils.substring_substitution_utils import (
25 dict_defined_substring_substitution,
26)
27from ruamel.yaml import YAML
def submit_r0_dl1_job(
    config,
    config_paths,
    process_idx,
):
    """Submit the r0dl1 jobs to slurm.

    Notes
    -----
    The R0->DL1 job is submitted on the `process_idx % total_nb_nodes` node.

    Parameters
    ----------
    config : hiperta_stream.config.configuration.ReconstructionManagerConfiguration
        Reconstruction manager configuration
    config_paths : dict
        Dictionary containing the paths to the written configuration for the processing pipeline steps
    process_idx : int
        Index of the R0->DL1 job (eg when using 4 lines: 0, 1, 2 or 3)
    """
    r0_dl1_log_filename = Path(
        config.r0_dl1_params.r0_dl1_log_dir,
        config.r0_dl1_params.r0_dl1_log_file.replace("§{processIndex}", str(process_idx)),
    ).absolute()
    r0_dl1_job_name = config.r0_dl1_params.r0_dl1_job_name.replace("§{processIndex}", str(process_idx))
    # flattened list of all available nodes and their reservation
    NodeInfo = namedtuple("NodeInfo", ["nodename", "reservation"])
    slurm_nodes_info = [
        NodeInfo(node, reservation)
        for reservation, reservation_nodelist in config.observation_config.slurm_nodelists.items()
        for node in reservation_nodelist
    ]
    # Round-robin assignment of this process onto the available nodes.
    node_info = slurm_nodes_info[process_idx % len(slurm_nodes_info)]

    # Build the r0->dl1 command line. Optional flags are spliced in as lists
    # (not "" placeholders) so the joined command contains no empty tokens or
    # double spaces; empty parts (eg a blank hiperta_extra_args) are filtered out.
    r0dl1_cmd_parts = [
        "hiperta_stream_r0_dl1",
        "--input",
        config.observation_config.data_stream_connections[process_idx].hostname,
        "--port",
        str(config.observation_config.data_stream_connections[process_idx].port),
        "--output",
        str(Path(config.observation_config.dl1_dir, config.r0_dl1_params.dl1_filename).absolute()),
        "--confighdf5",
        str(Path(config.r0_dl1_params.base_structure_file_path).absolute()),
        "--outputFileIndexOffset",
        str(0),
        "--nbeventperhdf5",
        str(config.r0_dl1_params.nb_event_per_hdf5),
        "--nbmaxmessage",
        str(config.r0_dl1_params.nbMaxMessage),
        "--configcut",
        str(Path(config_paths["r0_dl1_yaml_config_path"]).absolute()),
        "--totalNbProcess",
        str(len(config.observation_config.data_stream_connections)),
        "--processindex",
        str(process_idx),
        "--nbthread",
        str(config.r0_dl1_params.nb_processing_threads),
        "--nbeventperblock",
        str(config.r0_dl1_params.nb_event_per_block),
        "--rtalog",
        str(r0_dl1_log_filename),
        "--loglevel",
        config.logging_level.name,
        *(["--zfits"] if config.r0_dl1_params.is_stream_read_zfit else []),
        *(["--pedperslice"] if config.r0_dl1_params.is_stream_real_data else []),
        config.r0_dl1_params.hiperta_extra_args,
        *(["--verbose"] if config.r0_dl1_params.debug_parameters.verbose else []),
    ]
    r0dl1_cmd = " ".join(part for part in r0dl1_cmd_parts if part)

    sbatch_cmd = " ".join(
        [
            "sbatch",
            "--parsable",  # sbatch will only return the job ID
            "--mem={}G".format(config.r0_dl1_params.r0_dl1_mem),
            "-o",
            str(r0_dl1_log_filename.with_suffix(".o").absolute()),
            "-e",
            str(r0_dl1_log_filename.with_suffix(".e").absolute()),
            *(["-A", config.pipeline_config.slurm_account] if config.pipeline_config.slurm_account else []),
            *(["-p", config.pipeline_config.slurm_queue] if config.pipeline_config.slurm_queue else []),
            *(
                ["--reservation", node_info.reservation]
                if config.pipeline_config.slurm_reservation
                else []
            ),
            "--ntasks={}".format(config.r0_dl1_params.nb_tasks),
            "--nodelist={}".format(node_info.nodename),
            "-J",
            r0_dl1_job_name,
            '--wrap="{}"'.format(r0dl1_cmd),
        ]
    )

    if config.no_slurm:
        # Debug mode: run the r0->dl1 process locally under gdb instead of slurm.
        logging.info(f"Running in {config.observation_config.log_dir}: gdb --args {r0dl1_cmd}")
        sp.run(["gdb", "--args", *shlex.split(r0dl1_cmd)], cwd=str(config.observation_config.log_dir))
    else:
        logging.info(f"Submitting R0->DL1 job: {sbatch_cmd}")
        job_id = subprocess_run_and_raise_exception_on_error(
            shlex.split(sbatch_cmd),
            "R0->DL1 job submission successful!",
            "R0->DL1 job submission failed!",
            error_level=logging.CRITICAL,
        ).stdout.strip()
        logging.info(f"R0->DL1 job id: {job_id}")
def write_configuration(config, r0_dl1_training_config):
    """Write or copy all configuration in log directory.

    Parameters
    ----------
    config : hiperta_stream.config.configuration.ReconstructionManagerConfiguration
        Reconstruction manager configuration
    r0_dl1_training_config : dict
        R0->DL1 training parameters

    Returns
    -------
    dict
        Paths to the written configurations.
    """
    log_dir = Path(config.observation_config.log_dir)

    # Dump the complete json configuration for traceability.
    json_config_path = log_dir / "hiperta_stream_start_config.json"
    json_config_path.write_text(config.model_dump_json(indent=4))
    logging.info(f"Logging json configuration at {json_config_path}")

    # Write the yaml configuration (pydantic extra removed) consumed by HiPeRTA.
    minimal_config = config.model_copy(deep=True)
    _delete_extra(minimal_config)
    yaml_config_path = log_dir / "hiperta_stream_configuration.yml"
    write_yaml_config(yaml_config_path, minimal_config, r0_dl1_training_config)
    logging.info(f"Using R0->DL1 yaml configuration at {yaml_config_path}")

    # Copy the lstchain configurations used by the DL1->DL3 steps.
    dl1_dl2_config_path = copy_file_into_dir(config.dl1_dl3_params.lstchain_dl1_dl2_config_file, log_dir)
    logging.info(f"Using DL1->DL2 json configuration at {dl1_dl2_config_path}")
    dl2_dl3_config_path = copy_file_into_dir(config.dl1_dl3_params.lstchain_dl2_dl3_config_file, log_dir)
    logging.info(f"Using DL2->DL3 json configuration at {dl2_dl3_config_path}")

    # Copy the base structure file next to the other configurations.
    base_structure_path = copy_file_into_dir(config.r0_dl1_params.base_structure_file_path, log_dir)
    logging.info(f"Using base structure file at {base_structure_path}")

    return {
        "r0_dl1_yaml_config_path": yaml_config_path,
        "lstchain_dl1_dl2_config_path": dl1_dl2_config_path,
        "lstchain_dl2_dl3_config_path": dl2_dl3_config_path,
        "base_structure_path": base_structure_path,
    }
def create_pipeline_output_and_log_directories(config):
    """Create the output directories as defined in config."""
    pipeline_cfg = config.pipeline_config
    # r0_dl1 log is not part of pipeline config as r0_dl1 submits the slurm pipeline
    target_dirs = [
        *pipeline_cfg.log_directories.values(),
        *pipeline_cfg.output_directories.values(),
        config.r0_dl1_params.r0_dl1_log_dir,
    ]
    for target in target_dirs:
        Path(target).mkdir(parents=True, exist_ok=True)
        logging.info("Created directory {}".format(target))
def select_observation_model(config):
    """Chose the reconstruction model to use for the observation.

    Warnings
    --------
    This function choses the first model available in the model catalog. Smarter decision will
    be implemented in a future release.

    Parameters
    ----------
    config : dict
        Complete configuration dictionary. Its "dl1_dl3_params" field will be updated with the
        chosen model parameters.

    Returns
    -------
    dl1_to_dl3_pipeline_params : dict
        Dictionary containing the path to models and configuration expected by the slurm pipeline part of the config
    r0_dl1_training_config : dict
        R0 DL1 configuration dict during the model training.
    """
    catalog = config.reco_model_catalog
    # Take the first archive of the catalog (and its first dl2->dl3 archive).
    model = catalog.archives[0]
    model_dir = pathlib.Path(catalog.archives_base_path) / model.relative_path
    dl2_dl3_archive = model.dl2_dl3_archives[0]
    dl2_dl3_dir = model_dir / dl2_dl3_archive.relative_path

    dl1_to_dl3_pipeline_params = DL1toDL3PipelineParameters.model_validate(
        {
            "reco_random_forests_dir": str(model_dir),
            "lstchain_dl1_dl2_config_file": str(model_dir / model.dl1_dl2_config),
            "lstchain_dl2_dl3_config_file": str(dl2_dl3_dir / dl2_dl3_archive.dl2_dl3_config),
            "irf_file": str(dl2_dl3_dir / dl2_dl3_archive.irf_file),
        }
    )

    # Load the r0->dl1 parameters that were used when training the chosen model.
    with open(model_dir / model.r0_dl1_config, "r") as r0_dl1_config_file:
        r0_dl1_training_config = YAML(typ="safe").load(r0_dl1_config_file)

    return dl1_to_dl3_pipeline_params, r0_dl1_training_config
def main():
    """Entry point: parse CLI args, build configuration, submit R0->DL1 jobs."""
    init_logging(log_filename="hiperta_stream_start.log")

    parser = argparse.ArgumentParser(
        description="Launches the SAG reconstruction pipeline slurm jobs: "
        "permanent r0dl1 jobs, and intermittent dl1dl2 and dl2dl3 jobs."
    )
    parser.add_argument(
        "--static_config",
        "-c",
        action="store",
        type=str,
        dest="static_config",
        help="Path to a JSON static configuration file.",
        required=True,
    )
    parser.add_argument(
        "--observation_config",
        "-d",
        action="store",
        type=str,
        dest="obs_config",
        help="Path to the observation JSON configuration file.",
        required=True,
    )
    cli_args = parser.parse_args()

    # Read static and observation configuration
    with open(cli_args.static_config, "r") as static_config_f, open(cli_args.obs_config, "r") as obs_config_f:
        config = ReconstructionManagerConfiguration.model_validate_json(static_config_f.read())
        config.observation_config = ObservationParameters.model_validate_json(obs_config_f.read())

    # Re-init logging to write into specified log file.
    init_logging(config.logging_level.name, config.observation_config.reco_manager_log_file)

    # Chose the reconstruction model to use based on the observation parameters
    # and update the corresponding paths in config.
    config.dl1_dl3_params, r0_dl1_training_config = select_observation_model(config)

    # Perform bash variable substitution in config strings.
    config = ReconstructionManagerConfiguration.model_validate(
        dict_defined_substring_substitution(config.model_dump())
    )

    create_pipeline_output_and_log_directories(config)

    # Write and log all configuration
    config_paths = write_configuration(config, r0_dl1_training_config)

    # Launch one R0->DL1 job per configured data stream connection.
    nb_streams = len(config.observation_config.data_stream_connections)
    for process_idx in range(nb_streams):
        submit_r0_dl1_job(
            config,
            config_paths,
            process_idx,
        )

    logging.info("Done starting R0->DL1 jobs.")
309if __name__ == "__main__":
310 main()