Coverage for hiperta_stream/config/configuration.py: 84%
139 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-07-16 10:16 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-07-16 10:16 +0000
1import pathlib
2import warnings
3from typing import Dict, List, Literal, Union
5import numpy as np
6from hiperta_stream.utils.logging_utils import LOGGING_LEVELS
7from hiperta_stream.utils.substring_substitution_utils import (
8 dict_defined_substring_substitution,
9)
10from pydantic import BaseModel, Field, model_validator
11from ruamel.yaml import YAML
12from typing_extensions import Annotated
14_CONFIG_PLACEHOLDER_STR = "@@@@@"
17class ReconstructionDL1DL2ModelProperties(BaseModel, extra="allow"):
18 """Trained DL1->DL2 model properties useful to chose a model in the catalog for inference at run-time"""
20 zenith: float = Field(
21 ge=0.0, le=90.0, title="zenith", description="Model's training data's zenith in degree.", examples=[20.0]
22 )
25class ReconstructionDL2DL3ModelProperties(BaseModel, extra="allow"):
26 """Trained DL2->DL3 model properties usefull to chose a model in the catalog for inference at run-time."""
28 gammaness_cut: float = Field(
29 ge=0.0,
30 le=1.0,
31 title="Gammaness cut",
32 description="Events with a gammaness score lower than this value will be excluded from DL3 (between 0.0 and 1.0)",
33 examples=[0.5, 0.65],
34 )
37class DL2DL3ModelArchive(BaseModel, extra="allow"):
38 """DL2 DL3 model archive, including path to irf and associated configuration."""
40 relative_path: str = Field(
41 title="IRF directory",
42 description="Path to the directory containing the IRF file and configuration",
43 examples=["standard_gammaness_0_5"],
44 )
45 properties: ReconstructionDL2DL3ModelProperties = Field(
46 title="DL2->DL3 properties", description="DL2->DL3 model properties", examples=[{"gammaness_cut": 0.65}]
47 )
48 irf_file: str = Field(
49 title="IRF file path",
50 description="Path to IRF file (relative to IRF directory)",
51 examples=["irf_RTA_ghcut0.5.fits.gz"],
52 )
53 dl2_dl3_config: str = Field(
54 title="DL2->DL3 configuration",
55 description="Path to the DL2->DL3 configuration file (relative to IRF directory)",
56 examples=["lstchain_config_dl2_dl3_rta_0_5.json"],
57 )
60class ReconstructionModelArchive(BaseModel, extra="allow"):
61 """Trained model archive, including path to model files, models properties and available inference modes."""
63 relative_path: str = Field(
64 title="Random Forest Directory",
65 description="Path to the directory containing the random forests files, "
66 "relative to the archive catalog base path.",
67 examples=["RF1"],
68 )
69 properties: ReconstructionDL1DL2ModelProperties = Field(
70 title="Model properties", description="Set of values describing the model's nominal inference conditions"
71 )
72 r0_dl1_config: str = Field(
73 title="R0->DL1 training config",
74 description="Path to the R0->DL1 configuration used during the model training. (relative to RF directory)",
75 examples=["r0_dl1_training_config.yml"],
76 )
77 dl1_dl2_config: str = Field(
78 title="DL1->DL2 training config",
79 description="Path to the DL1->DL2 configuration used during the model training. (relative to RF directory)",
80 examples=["lstchain_standard_config_v063-RTA.json"],
81 )
82 dl2_dl3_archives: List[DL2DL3ModelArchive] = Field(
83 title="DL2->DL3 archive",
84 description="DL2->DL3 model archive with information about IRF, configuration, etc.",
85 examples=[
86 [
87 {
88 "relative_path": "standard_gammaness_0_5",
89 "properties": {"gammaness_cut": 0.65},
90 "irf_file": "irf_RTA_ghcut0.5.fits.gz",
91 "dl2_dl3_config": "lstchain_config_dl2_dl3_rta_0_5.json",
92 }
93 ]
94 ],
95 )
98class ReconstructionModelCatalog(BaseModel, extra="allow"):
99 """Model catalog containing trained model archives"""
101 archives_base_path: str = Field(
102 title="Catalog Directory",
103 description="Path to the directory containing all the trained models archives.",
104 examples=["~/trained_models/"],
105 )
106 archives: List[ReconstructionModelArchive] = Field(
107 title="Archived models", description="List of available archived models"
108 )
111class ProtoBufFieldParsingConfiguration(BaseModel, extra="allow"):
112 """Description of Protocol Buffer Event sub-fields"""
114 id: int = Field(gt=0, title="ID", description="Field ID", examples=[1, 4])
115 type: str = Field(title="Field Type", description="Field data type", examples=["uint64", "int8", "submessage"])
117 # With python > 3.7, from __future__ import annotations allows to the type directly instead of a literal string
118 # supported by pydantic: https://docs.pydantic.dev/latest/concepts/postponed_annotations/
119 # However, final integration in python standard is uncertain:
120 # https://mail.python.org/archives/list/python-dev@python.org/message/VIZEBX5EYMSYIJNDBF6DMUMZOCWHARSO/
121 # https://docs.python.org/3/library/__future__.html#id2
122 sub_message: Union[None, Dict[str, "ProtoBufFieldParsingConfiguration"]] = Field(
123 default=None,
124 title="Field sub-message",
125 description="Optional named sub-field (a named field inside this field)",
126 )
127 is_array: Union[None, bool] = Field(
128 default=None,
129 title="Field scalarity",
130 description="If true, Field contains a array of values. Otherwise a single scalar.",
131 examples=[True, False],
132 )
135class EventParsingConfiguration(BaseModel, extra="allow"):
136 """Description of Protocol Buffer Event field"""
138 Event: ProtoBufFieldParsingConfiguration = Field(
139 title="Event Field", description="Event Field in the protocol buffer messages"
140 )
143class DataStreamParsingConfiguration(BaseModel, extra="allow"):
144 """Description of Protocol Buffer messages"""
146 protobuf_stream_parsing_config: EventParsingConfiguration = Field(
147 title="Protobuf Messages Configuration",
148 description="Field ID's and types required to parse protocol buffer messages.",
149 )
152class DataStreamConnectionConfiguration(BaseModel, extra="allow"):
153 """Parameters to connect to data streamers"""
155 hostname: str = Field(
156 title="hostname",
157 description="Hostname on the network of the data streamer",
158 examples=["localhost", "tcs05-ib0"],
159 )
160 port: int = Field(
161 gt=0, le=65535, title="Port", description="Port on the network of the data streamer", examples=[25000, 3391]
162 )
165class ObservationParameters(BaseModel, extra="allow"):
166 """Observation run-time parameters received from SAG-Supervisor"""
168 sb_id: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]] = Field(
169 default=_CONFIG_PLACEHOLDER_STR,
170 title="Scheduling Block ID",
171 description="ID of the observation's scheduling block.",
172 examples=[12345],
173 )
175 obs_id: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]] = Field(
176 default=_CONFIG_PLACEHOLDER_STR,
177 title="Observation ID",
178 description="Id of the observation",
179 examples=[12345],
180 )
181 tel_id: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]] = Field(
182 default=_CONFIG_PLACEHOLDER_STR,
183 title="Telescope ID",
184 description="Telescope ID",
185 examples=[1],
186 )
188 RA_pointing: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[float, Field(ge=0.0, le=360.0)]] = Field(
189 default=_CONFIG_PLACEHOLDER_STR,
190 title="Pointing Rate Ascension",
191 description="Pointing Rate Ascension during the observation.",
192 examples=[20.0],
193 )
194 DEC_pointing: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[float, Field(ge=-90.0, le=90.0)]] = Field(
195 default=_CONFIG_PLACEHOLDER_STR,
196 title="Pointing Declination",
197 description="Pointing declination during the observation",
198 examples=[60.0],
199 )
200 dl1_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
201 default=_CONFIG_PLACEHOLDER_STR,
202 title="DL1 directory",
203 description="Path to the directory where to write the DL1 files.",
204 examples=["~/DL1"],
205 )
206 dl2_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
207 default=_CONFIG_PLACEHOLDER_STR,
208 title="DL2 directory",
209 description="Path to the directory where to write the DL2 files.",
210 examples=["~/DL2"],
211 )
212 dl3_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
213 default=_CONFIG_PLACEHOLDER_STR,
214 title="DL3 directory",
215 description="Path to the directory where to write the DL3 files.",
216 examples=["~/DL3"],
217 )
218 log_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
219 default=_CONFIG_PLACEHOLDER_STR,
220 title="Log directory",
221 description="Path to the directory where to write the log files.",
222 examples=["~/logs"],
223 )
224 reco_manager_log_file: str = Field(
225 default="hiperta_stream.log",
226 title="hiperta_stream_start log file",
227 description="Log file for reco manager entrypoint (hiperta_stream_start). "
228 "This path must NOT contain any string bash like string substitution"
229 "(opening the log file is the first thing reco-manager will do, before substituting strings)",
230 examples=["/fefs/user/test/hiperta_stream_test.log", "hiperta_stream_start_obs_id_12345"],
231 )
232 data_stream_connections: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], List[DataStreamConnectionConfiguration]] = (
233 Field(
234 default=_CONFIG_PLACEHOLDER_STR,
235 title="Stream Connections",
236 description="Parameters of the connections to data streamers",
237 examples=[[{"hostname": "tcs06", "port": 25000}]],
238 )
239 )
240 slurm_nodelists: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Dict[str, List[str]]] = Field(
241 default=_CONFIG_PLACEHOLDER_STR,
242 title="Slurm Node List",
243 description="List of the slurm nodes to use, per slurm reservation",
244 examples=[{"acada-daily": ["cp15", "cp16"], "rta-one-node": ["cp19"]}],
245 )
248class R0toDL1DebugParameters(BaseModel, extra="allow"):
249 """Debuging parameters of R0->DL1 program"""
251 keep_random_trigger_threshold: float = Field(
252 ge=0.0,
253 le=1.0,
254 default=1.0,
255 title="Random Trigger Probability",
256 description="Probability to keep events. If 0.0 all events are discarded, if 1.0 all events are kept.",
257 examples=[1.0],
258 )
259 verbose: bool = Field(default=False, title="Verbose Mode", description="If True: activate verbose output")
260 is_flush_log_disable: bool = Field(
261 title="Disable log flush", description="If true, flushing threads will not write logs.", examples=[False]
262 )
265class R0toDL1PipelineParameters(BaseModel, extra="allow"):
266 """R0->DL1 programs parameters"""
268 dl1_filename: str = Field(
269 title="DL1 filename",
270 description="DL1 data file name, relative to the DL1 directory. "
271 "HiPeRTA can substitute some variables at run-time.",
272 examples=[
273 "dl1_sb_id_@{sb_id}_obs_id_@{obs_id}_tel_id_@{tel_id}_line_idx_§{processIndex}_"
274 "thread_idx_§{threadIndex}_th_file_idx_§{processFileIndex}_file_idx_§{fileIndex}.h5"
275 ],
276 )
277 r0_dl1_log_dir: str = Field(
278 title="R0->DL1 log directory",
279 description="Directory where to write the R0->DL1 processing logs",
280 examples=["~/logs/r0_dl1/"],
281 )
282 r0_dl1_log_file: str = Field(
283 title="R0->DL1 log filename",
284 description="R0->DL1 log filename, relative to the R0->DL1 log directory."
285 "HiPeRTA can substitute some variables at run-time.",
286 examples=["r0_dl1_sb_id_@{sb_id}_obs_id_@{obs_id}_tel_id_@{tel_id}_§{processIndex}.log"],
287 )
288 r0_dl1_job_name: str = Field(
289 title="R0->DL1 jobname for slurm",
290 description="Job name of the R0->DL1 slurm jobs.",
291 examples=["@{tel_id}_§{processIndex}_r0_dl1"],
292 )
293 r0_dl1_mem: Union[None, int] = Field(
294 title="R0->DL1 memory",
295 description="Required memory for R0->DL1 jobs in GB (passed to slurm)",
296 examples=["1"],
297 default=None,
298 )
299 nb_event_per_hdf5: int = Field(
300 gt=0, title="Number of Events", description="Number of events to write per DL1 files.", examples=[20000]
301 )
302 is_stream_read_zfit: bool = Field(
303 title="Zfit mode", description="If true, expect the incoming data to be in zfit format.", examples=[True]
304 )
305 is_stream_real_data: bool = Field(
306 title="Real Calibration Data mode",
307 description="If true, set the --pedperslice option to r0->dl1 programs",
308 examples=[True],
309 )
310 is_stream_r1_zfit: bool = Field(
311 title="R1 mode",
312 description="If true, expect the incoming data to be calibrated 1 gain R1 data.",
313 examples=[True],
314 )
315 r1_scale: float = Field(
316 title="R1 scale",
317 description="Scaling value to apply to R1 data, when transforming it back to float from unsigned int"
318 " (which are used for compression reasons)",
319 examples=[5.0],
320 )
321 r1_offset: float = Field(
322 title="R1 offset",
323 description="Offset value to apply to R1 data, when transforming it back to float from unsigned int"
324 " (which are used for compression reasons)",
325 examples=[60.0],
326 )
327 nb_processing_threads: int = Field(
328 gt=0,
329 title="Number of Processing Threads",
330 description="Number of threads to start for each R0->DL1 process for processing",
331 examples=[2],
332 )
333 nb_event_per_block: int = Field(
334 gt=0,
335 title="Number of events per Processing Block",
336 description="Number of events per processing block before dividing them to each threads.",
337 examples=[2],
338 )
339 nb_tasks: int = Field(
340 gt=0,
341 title="Number of slurm tasks",
342 description="Number of cpu slurm should allocate to each R0->DL1 program",
343 examples=[3],
344 )
345 zmq_nb_thread: int = Field(
346 gt=0,
347 title="Numbe of ZMQ threads",
348 description="Number of threads ZMQ will start to receive data.",
349 examples=[1],
350 )
351 zmq_thread_affinity: int = Field(
352 ge=0,
353 title="ZQM Thread affinity",
354 description="ZMQ thread affinity to set which thread will handle re-connection",
355 examples=[1],
356 )
357 zmq_buffer_size: int = Field(
358 gt=0, title="ZMQ buffer size", description="ZMQ buffer size in bytes", examples=[40000000]
359 )
360 zmq_max_nb_message_per_buffer: int = Field(
361 gt=0,
362 title="ZMQ buffer max message number",
363 description="Maximum of messages to put in ZMQ buffer",
364 examples=[100],
365 )
366 base_structure_file_path: str = Field(
367 title="Base Structure File",
368 description="Path to base structure hdf5 file with instrument model",
369 examples=["~/base_structure.hdf5"],
370 )
371 refresh_stop_ms: int = Field(
372 gt=0,
373 title="Refresh Signal Period",
374 description="Amount of time between 2 checks of receiving stop signal, in ms",
375 examples=[1000],
376 )
377 refresh_perf_ms: int = Field(
378 gt=0,
379 title="Performance Stat Period",
380 description="Amount of time between to sending of statistics messages, in ms",
381 examples=[1000],
382 )
383 is_detach_dl1_flush: bool = Field(
384 title="Detach flush mode", description="If true, detach a new process to flush the DL1 files.", examples=[True]
385 )
386 keep_rejected_event: bool = Field(
387 title="Keep Rejected Events Mode",
388 description="If true, write even the events flagged as bad quality to files (ignored if nb_threads > 1)",
389 examples=[True],
390 )
391 nbMaxMessage: int = Field(
392 gt=0,
393 title="Maximum Number of Messages",
394 description="R0->DL1 program will stop after receiving this many messages.",
395 examples=[999999999999],
396 )
397 wait_detach_thread_to_finish: int = Field(
398 title="TimeOut for flushing threads",
399 description="Amount of time to wait for flushing threads, when stopping the analysis, in microseconds",
400 examples=[10000000],
401 )
402 debug_parameters: R0toDL1DebugParameters = Field(
403 title="Debugging Parameters", description="Debuging parameters of R0->DL1 program"
404 )
405 hiperta_extra_args: Union[None, str] = Field(
406 title="Extra arguments for hiperta",
407 description="List of extra arguments for HiPeRTA process",
408 default=None,
409 examples=["--verbose", "--loglevel DEBUG"],
410 )
412 @model_validator(mode="after")
413 def check_min_nb_event_per_block_and_n_tasks(self):
414 """Check that the number of threads, slurm tasks and event per blocks is consistent"""
415 if self.nb_event_per_block < self.nb_processing_threads:
416 warnings.warn(
417 "nb_event_per_block {} is less than the number of processing threads {}!\n"
418 "Setting it to {}!".format(
419 self.nb_event_per_block, self.nb_processing_threads, self.nb_processing_threads
420 )
421 )
422 self.nb_event_per_block = self.nb_processing_threads
423 if self.nb_tasks < self.zmq_nb_thread + self.nb_processing_threads:
424 warnings.warn(
425 "nb_tasks (number of slurm tasks for r0->dl1) is less than zmq_nb_thread {} + nb_processing_threads {}!\n"
426 "Setting it to {}!".format(
427 self.nb_tasks,
428 self.zmq_nb_thread,
429 self.nb_processing_threads,
430 self.zmq_nb_thread + self.nb_processing_threads,
431 )
432 )
433 self.nb_tasks = self.zmq_nb_thread + self.nb_processing_threads
434 return self
436 @model_validator(mode="after")
437 def compute_r0_dl1_mem_if_not_set(self):
438 """Compute a value for the memory requirement of R0->DL1 jobs if it was not set."""
439 if self.r0_dl1_mem is None:
440 self.r0_dl1_mem = (
441 int(abs(np.ceil((self.zmq_max_nb_message_per_buffer + self.nb_event_per_hdf5 * 2) * 360e3 / 1e9))) + 2
442 )
443 return self
446class DL1toDL3PipelineParameters(BaseModel, extra="allow"):
447 """Parameters for the DL1->DL3 jobs"""
449 reco_random_forests_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
450 default=_CONFIG_PLACEHOLDER_STR,
451 title="Random Forest Directory",
452 description="Path to the directory containing the random forest files",
453 examples=["~/RFS"],
454 )
455 lstchain_dl1_dl2_config_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
456 default=_CONFIG_PLACEHOLDER_STR,
457 title="DL1->DL2 configuration file",
458 description="Path to the configuration file for lstchain DL1->DL2",
459 examples=["~/RFS/lstchain_standard.json"],
460 )
461 lstchain_dl2_dl3_config_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
462 default=_CONFIG_PLACEHOLDER_STR,
463 title="DL2->DL3 configuration file",
464 description="Path to the configuration file for lstchain DL2->DL3",
465 examples=["~/RFS/lstchain_dl2_dl3.json"],
466 )
467 irf_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
468 default=_CONFIG_PLACEHOLDER_STR,
469 title="IRF File",
470 description="Path to the IRF file to add in DL3 files.",
471 examples=["~/RFS/irf.fits"],
472 )
475class SlurmJobConfiguration(BaseModel, extra="allow"):
476 """Slurm Pipeline Description"""
478 job_cmd: str = Field(
479 title="Job Command passed to slurm using --wrap",
480 description="Job command passed to slurm using --wrap",
481 examples=["echo hostname"],
482 )
483 slurm_job_parameters: str = Field(
484 title="Slurm Job Parameters",
485 description="String passed to slurm to set job parameters",
486 examples=["-A ctao-n-acada -p short"],
487 )
488 name_prefix: str = Field(
489 title="Slurm Job Name prefix",
490 description="Prefix for the slurm job name. It is prefixed to the SlurmJobConfiguration name by HiPeRTA.",
491 examples=["@{tel_id}_§{processIndex}_"],
492 )
493 dependencies: Union[None, List[str]] = Field(
494 default=None,
495 title="Job Dependencies",
496 description="Pipeline stage name that are dependencies of this job",
497 examples=["dl1reorg", "dl1dl2"],
498 )
501class SlurmPipelineConfiguration(BaseModel, extra="allow"):
502 """Pipeline Definition"""
504 slurm_account: str = Field(
505 title="Slurm account", description="Slurm account to use to submit jobs", examples=["ctao-n-acada"]
506 )
507 slurm_queue: str = Field(
508 title="Slurm queue", description="Slurm queue to use when submitting jobs", examples=["short"]
509 )
510 slurm_reservation: str = Field(
511 title="Slurm reservation",
512 description="Slurm reservation to use when submitting jobs",
513 examples=["acada_daily"],
514 )
516 slurm_jobs: Dict[str, SlurmJobConfiguration] = Field(
517 title="Slurm Pipeline", description="Pipeline description: stage_name: Slurm job"
518 )
519 output_directories: Dict[str, str] = Field(
520 title="Pipeline's steps output directories",
521 description="The directories where the pipeline jobs' output should be saved. "
522 "Note: DL1, DL2 and DL3 mandatory directories are specified in observation parameters. "
523 "This dictionary exists to allow to add other directories if required by pipeline steps.",
524 examples=[{}],
525 )
526 log_directories: Dict[str, str] = Field(
527 title="Pipeline's steps log directories",
528 description="The directories where the pipeline jobs' logs should be saved.",
529 examples=[
530 {
531 "dl1_alt_az_log_dir": "@{log_dir}/dl1_alt_az",
532 "dl1_reorg_log_dir": "@{log_dir}/dl1_reorg",
533 "dl1_dl2_log_dir": "@{log_dir}/dl1_dl2",
534 "dl2_dl3_log_dir": "@{log_dir}/dl2_dl3",
535 }
536 ],
537 )
540class ReconstructionManagerConfiguration(BaseModel, extra="allow"):
541 """Complete SAG-Reconstruction configuration"""
543 logging_level: LOGGING_LEVELS = Field(
544 title="Logging level",
545 description="Set the logging level of the reco-manager",
546 examples=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
547 )
548 no_slurm: bool = Field(
549 title="Deactivate slurm",
550 description="If True, sub-systems programs will be started with as subprocesses instead of slurm jobs",
551 examples=[False],
552 )
553 observation_config: ObservationParameters = Field(
554 title="Observation Parameters", description="Configuration of the observation parameters"
555 )
556 dl1_dl3_params: DL1toDL3PipelineParameters = Field(
557 title="DL1->DL3 Configuration", description="DL1->DL3 Pipeline parameters"
558 )
559 reco_model_catalog: ReconstructionModelCatalog = Field(
560 title="Reconstruction Model Catalog", description="Description of the available reconstruction models"
561 )
562 r0_dl1_params: R0toDL1PipelineParameters = Field(
563 title="R0->DL1 Pipeline Parameters", description="Parameters of the R0->DL1 programs"
564 )
565 protobuf_parsing_conf: DataStreamParsingConfiguration = Field(
566 title="ProtocolBuffer Message Parsing Configuration",
567 description="Parameters used to parse the stream's protocol buffers messages",
568 )
569 pipeline_config: SlurmPipelineConfiguration = Field(
570 title="Slurm Pipeline Configuration", description="DL1->DL3 slurm jobs configuration."
571 )
574def write_yaml_config(yaml_config_filename, reco_manager_config, r0_dl1_training_config):
575 """Dump the configuration into a yaml file to use with SAG-Reconstruction"""
576 yaml_dict = {
577 k: v
578 for config in [
579 {"logging_level": reco_manager_config.logging_level.name, "no_slurm": reco_manager_config.no_slurm},
580 reco_manager_config.observation_config.model_dump(exclude_none=True),
581 reco_manager_config.dl1_dl3_params.model_dump(exclude_none=True),
582 reco_manager_config.reco_model_catalog.model_dump(exclude_none=True),
583 r0_dl1_training_config,
584 reco_manager_config.r0_dl1_params.model_dump(exclude_none=True),
585 reco_manager_config.protobuf_parsing_conf.model_dump(exclude_none=True, exclude_unset=True),
586 reco_manager_config.pipeline_config.model_dump(exclude_none=True),
587 ]
588 for k, v in config.items()
589 }
590 with open(yaml_config_filename, "w") as yaml_config_file:
591 yaml = YAML()
592 yaml.indent(mapping=4, sequence=4, offset=2)
593 yaml.width = 2**16 # very large to not wrap lines
594 yaml.allow_unicode = True
595 yaml.sort_keys = False
596 yaml.dump(yaml_dict, yaml_config_file)
599def _delete_extra(model):
600 """Deletes all extra fields from a pydantic BaseModel.
602 This is usefull to dump a model without the extra parameters, when the model accepts the extra.
603 Required until https://github.com/pydantic/pydantic/issues/6150 or equivalent is implemented.
604 """
605 model.__pydantic_extra__ = {}
606 for _, field in model:
607 if isinstance(field, BaseModel):
608 _delete_extra(field)