Coverage for hiperta_stream/config/configuration.py: 83%
139 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-12-11 13:29 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-12-11 13:29 +0000
1import pathlib
2import warnings
3from typing import Dict, List, Literal, Union
5import numpy as np
6from pydantic import BaseModel, Field, model_validator
7from ruamel.yaml import YAML
8from typing_extensions import Annotated
10from hiperta_stream.utils.logging_utils import LOGGING_LEVELS
11from hiperta_stream.utils.substring_substitution_utils import (
12 dict_defined_substring_substitution,
13)
15_CONFIG_PLACEHOLDER_STR = "@@@@@"
18class ReconstructionDL1DL2ModelProperties(BaseModel, extra="allow"):
19 """Trained DL1->DL2 model properties useful to chose a model in the catalog for inference at run-time"""
21 zenith: float = Field(
22 ge=0.0,
23 le=90.0,
24 title="zenith",
25 description="Model's training data's zenith in degree.",
26 examples=[20.0],
27 )
30class ReconstructionDL2DL3ModelProperties(BaseModel, extra="allow"):
31 """Trained DL2->DL3 model properties usefull to chose a model in the catalog for inference at run-time."""
33 gammaness_cut: float = Field(
34 ge=0.0,
35 le=1.0,
36 title="Gammaness cut",
37 description="Events with a gammaness score lower than this value will be excluded from DL3 (between 0.0 and 1.0)",
38 examples=[0.5, 0.65],
39 )
42class DL2DL3ModelArchive(BaseModel, extra="allow"):
43 """DL2 DL3 model archive, including path to irf and associated configuration."""
45 relative_path: str = Field(
46 title="IRF directory",
47 description="Path to the directory containing the IRF file and configuration",
48 examples=["standard_gammaness_0_5"],
49 )
50 properties: ReconstructionDL2DL3ModelProperties = Field(
51 title="DL2->DL3 properties",
52 description="DL2->DL3 model properties",
53 examples=[{"gammaness_cut": 0.65}],
54 )
55 irf_file: str = Field(
56 title="IRF file path",
57 description="Path to IRF file (relative to IRF directory)",
58 examples=["irf_RTA_ghcut0.5.fits.gz"],
59 )
60 dl2_dl3_config: str = Field(
61 title="DL2->DL3 configuration",
62 description="Path to the DL2->DL3 configuration file (relative to IRF directory)",
63 examples=["lstchain_config_dl2_dl3_rta_0_5.json"],
64 )
67class ReconstructionModelArchive(BaseModel, extra="allow"):
68 """Trained model archive, including path to model files, models properties and available inference modes."""
70 relative_path: str = Field(
71 title="Random Forest Directory",
72 description="Path to the directory containing the random forests files, "
73 "relative to the archive catalog base path.",
74 examples=["RF1"],
75 )
76 properties: ReconstructionDL1DL2ModelProperties = Field(
77 title="Model properties",
78 description="Set of values describing the model's nominal inference conditions",
79 )
80 r0_dl1_config: str = Field(
81 title="R0->DL1 training config",
82 description="Path to the R0->DL1 configuration used during the model training. (relative to RF directory)",
83 examples=["r0_dl1_training_config.yml"],
84 )
85 dl1_dl2_config: str = Field(
86 title="DL1->DL2 training config",
87 description="Path to the DL1->DL2 configuration used during the model training. (relative to RF directory)",
88 examples=["lstchain_standard_config_v063-RTA.json"],
89 )
90 dl2_dl3_archives: List[DL2DL3ModelArchive] = Field(
91 title="DL2->DL3 archive",
92 description="DL2->DL3 model archive with information about IRF, configuration, etc.",
93 examples=[
94 [
95 {
96 "relative_path": "standard_gammaness_0_5",
97 "properties": {"gammaness_cut": 0.65},
98 "irf_file": "irf_RTA_ghcut0.5.fits.gz",
99 "dl2_dl3_config": "lstchain_config_dl2_dl3_rta_0_5.json",
100 }
101 ]
102 ],
103 )
106class ReconstructionModelCatalog(BaseModel, extra="allow"):
107 """Model catalog containing trained model archives"""
109 archives_base_path: str = Field(
110 title="Catalog Directory",
111 description="Path to the directory containing all the trained models archives.",
112 examples=["~/trained_models/"],
113 )
114 archives: List[ReconstructionModelArchive] = Field(
115 title="Archived models", description="List of available archived models"
116 )
119class ProtoBufFieldParsingConfiguration(BaseModel, extra="allow"):
120 """Description of Protocol Buffer Event sub-fields"""
122 id: int = Field(gt=0, title="ID", description="Field ID", examples=[1, 4])
123 type: str = Field(
124 title="Field Type",
125 description="Field data type",
126 examples=["uint64", "int8", "submessage"],
127 )
129 # With python > 3.7, from __future__ import annotations allows to the type directly instead of a literal string
130 # supported by pydantic: https://docs.pydantic.dev/latest/concepts/postponed_annotations/
131 # However, final integration in python standard is uncertain:
132 # https://mail.python.org/archives/list/python-dev@python.org/message/VIZEBX5EYMSYIJNDBF6DMUMZOCWHARSO/
133 # https://docs.python.org/3/library/__future__.html#id2
134 sub_message: Union[None, Dict[str, "ProtoBufFieldParsingConfiguration"]] = Field(
135 default=None,
136 title="Field sub-message",
137 description="Optional named sub-field (a named field inside this field)",
138 )
139 is_array: Union[None, bool] = Field(
140 default=None,
141 title="Field scalarity",
142 description="If true, Field contains a array of values. Otherwise a single scalar.",
143 examples=[True, False],
144 )
147class EventParsingConfiguration(BaseModel, extra="allow"):
148 """Description of Protocol Buffer Event field"""
150 Event: ProtoBufFieldParsingConfiguration = Field(
151 title="Event Field", description="Event Field in the protocol buffer messages"
152 )
155class DataStreamParsingConfiguration(BaseModel, extra="allow"):
156 """Description of Protocol Buffer messages"""
158 protobuf_stream_parsing_config: EventParsingConfiguration = Field(
159 title="Protobuf Messages Configuration",
160 description="Field ID's and types required to parse protocol buffer messages.",
161 )
164class DataStreamConnectionConfiguration(BaseModel, extra="allow"):
165 """Parameters to connect to data streamers"""
167 hostname: str = Field(
168 title="hostname",
169 description="Hostname on the network of the data streamer",
170 examples=["localhost", "tcs05-ib0"],
171 )
172 port: int = Field(
173 gt=0,
174 le=65535,
175 title="Port",
176 description="Port on the network of the data streamer",
177 examples=[25000, 3391],
178 )
181class ObservationParameters(BaseModel, extra="allow"):
182 """Observation run-time parameters received from SAG-Supervisor"""
184 sb_id: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]] = (
185 Field(
186 default=_CONFIG_PLACEHOLDER_STR,
187 title="Scheduling Block ID",
188 description="ID of the observation's scheduling block.",
189 examples=[12345],
190 )
191 )
193 obs_id: Union[
194 Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]
195 ] = Field(
196 default=_CONFIG_PLACEHOLDER_STR,
197 title="Observation ID",
198 description="Id of the observation",
199 examples=[12345],
200 )
201 tel_id: Union[
202 Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]
203 ] = Field(
204 default=_CONFIG_PLACEHOLDER_STR,
205 title="Telescope ID",
206 description="Telescope ID",
207 examples=[1],
208 )
210 RA_pointing: Union[
211 Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[float, Field(ge=0.0, le=360.0)]
212 ] = Field(
213 default=_CONFIG_PLACEHOLDER_STR,
214 title="Pointing Rate Ascension",
215 description="Pointing Rate Ascension during the observation.",
216 examples=[20.0],
217 )
218 DEC_pointing: Union[
219 Literal[f"{_CONFIG_PLACEHOLDER_STR}"],
220 Annotated[float, Field(ge=-90.0, le=90.0)],
221 ] = Field(
222 default=_CONFIG_PLACEHOLDER_STR,
223 title="Pointing Declination",
224 description="Pointing declination during the observation",
225 examples=[60.0],
226 )
227 dl1_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
228 default=_CONFIG_PLACEHOLDER_STR,
229 title="DL1 directory",
230 description="Path to the directory where to write the DL1 files.",
231 examples=["~/DL1"],
232 )
233 dl2_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
234 default=_CONFIG_PLACEHOLDER_STR,
235 title="DL2 directory",
236 description="Path to the directory where to write the DL2 files.",
237 examples=["~/DL2"],
238 )
239 dl3_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
240 default=_CONFIG_PLACEHOLDER_STR,
241 title="DL3 directory",
242 description="Path to the directory where to write the DL3 files.",
243 examples=["~/DL3"],
244 )
245 log_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
246 default=_CONFIG_PLACEHOLDER_STR,
247 title="Log directory",
248 description="Path to the directory where to write the log files.",
249 examples=["~/logs"],
250 )
251 reco_manager_log_file: str = Field(
252 default="hiperta_stream.log",
253 title="hiperta_stream_start log file",
254 description="Log file for reco manager entrypoint (hiperta_stream_start). "
255 "This path must NOT contain any string bash like string substitution"
256 "(opening the log file is the first thing reco-manager will do, before substituting strings)",
257 examples=[
258 "/fefs/user/test/hiperta_stream_test.log",
259 "hiperta_stream_start_obs_id_12345",
260 ],
261 )
262 data_stream_connections: Union[
263 Literal[f"{_CONFIG_PLACEHOLDER_STR}"], List[DataStreamConnectionConfiguration]
264 ] = Field(
265 default=_CONFIG_PLACEHOLDER_STR,
266 title="Stream Connections",
267 description="Parameters of the connections to data streamers",
268 examples=[[{"hostname": "tcs06", "port": 25000}]],
269 )
270 slurm_nodelists: Union[
271 Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Dict[str, List[str]]
272 ] = Field(
273 default=_CONFIG_PLACEHOLDER_STR,
274 title="Slurm Node List",
275 description="List of the slurm nodes to use, per slurm reservation",
276 examples=[{"acada-daily": ["cp15", "cp16"], "rta-one-node": ["cp19"]}],
277 )
280class R0toDL1DebugParameters(BaseModel, extra="allow"):
281 """Debuging parameters of R0->DL1 program"""
283 keep_random_trigger_threshold: float = Field(
284 ge=0.0,
285 le=1.0,
286 default=1.0,
287 title="Random Trigger Probability",
288 description="Probability to keep events. If 0.0 all events are discarded, if 1.0 all events are kept.",
289 examples=[1.0],
290 )
291 verbose: bool = Field(
292 default=False,
293 title="Verbose Mode",
294 description="If True: activate verbose output",
295 )
296 is_flush_log_disable: bool = Field(
297 title="Disable log flush",
298 description="If true, flushing threads will not write logs.",
299 examples=[False],
300 )
303class R0toDL1PipelineParameters(BaseModel, extra="allow"):
304 """R0->DL1 programs parameters"""
306 dl1_filename: str = Field(
307 title="DL1 filename",
308 description="DL1 data file name, relative to the DL1 directory. "
309 "HiPeRTA can substitute some variables at run-time.",
310 examples=[
311 "dl1_sb_id_@{sb_id}_obs_id_@{obs_id}_tel_id_@{tel_id}_line_idx_§{processIndex}_"
312 "thread_idx_§{threadIndex}_th_file_idx_§{processFileIndex}_file_idx_§{fileIndex}.h5"
313 ],
314 )
315 r0_dl1_log_dir: str = Field(
316 title="R0->DL1 log directory",
317 description="Directory where to write the R0->DL1 processing logs",
318 examples=["~/logs/r0_dl1/"],
319 )
320 r0_dl1_log_file: str = Field(
321 title="R0->DL1 log filename",
322 description="R0->DL1 log filename, relative to the R0->DL1 log directory."
323 "HiPeRTA can substitute some variables at run-time.",
324 examples=[
325 "r0_dl1_sb_id_@{sb_id}_obs_id_@{obs_id}_tel_id_@{tel_id}_§{processIndex}.log"
326 ],
327 )
328 r0_dl1_job_name: str = Field(
329 title="R0->DL1 jobname for slurm",
330 description="Job name of the R0->DL1 slurm jobs.",
331 examples=["@{tel_id}_§{processIndex}_r0_dl1"],
332 )
333 r0_dl1_mem: Union[None, int] = Field(
334 title="R0->DL1 memory",
335 description="Required memory for R0->DL1 jobs in GB (passed to slurm)",
336 examples=["1"],
337 default=None,
338 )
339 nb_event_per_hdf5: int = Field(
340 gt=0,
341 title="Number of Events",
342 description="Number of events to write per DL1 files.",
343 examples=[20000],
344 )
345 is_stream_read_zfit: bool = Field(
346 title="Zfit mode",
347 description="If true, expect the incoming data to be in zfit format.",
348 examples=[True],
349 )
350 is_stream_real_data: bool = Field(
351 title="Real Calibration Data mode",
352 description="If true, set the --pedperslice option to r0->dl1 programs",
353 examples=[True],
354 )
355 is_stream_r1_zfit: bool = Field(
356 title="R1 mode",
357 description="If true, expect the incoming data to be calibrated 1 gain R1 data.",
358 examples=[True],
359 )
360 r1_scale: float = Field(
361 title="R1 scale",
362 description="Scaling value to apply to R1 data, when transforming it back to float from unsigned int"
363 " (which are used for compression reasons)",
364 examples=[5.0],
365 )
366 r1_offset: float = Field(
367 title="R1 offset",
368 description="Offset value to apply to R1 data, when transforming it back to float from unsigned int"
369 " (which are used for compression reasons)",
370 examples=[60.0],
371 )
372 nb_processing_threads: int = Field(
373 gt=0,
374 title="Number of Processing Threads",
375 description="Number of threads to start for each R0->DL1 process for processing",
376 examples=[2],
377 )
378 nb_event_per_block: int = Field(
379 gt=0,
380 title="Number of events per Processing Block",
381 description="Number of events per processing block before dividing them to each threads.",
382 examples=[2],
383 )
384 nb_tasks: int = Field(
385 gt=0,
386 title="Number of slurm tasks",
387 description="Number of cpu slurm should allocate to each R0->DL1 program",
388 examples=[3],
389 )
390 zmq_nb_thread: int = Field(
391 gt=0,
392 title="Numbe of ZMQ threads",
393 description="Number of threads ZMQ will start to receive data.",
394 examples=[1],
395 )
396 zmq_thread_affinity: int = Field(
397 ge=0,
398 title="ZQM Thread affinity",
399 description="ZMQ thread affinity to set which thread will handle re-connection",
400 examples=[1],
401 )
402 zmq_buffer_size: int = Field(
403 gt=0,
404 title="ZMQ buffer size",
405 description="ZMQ buffer size in bytes",
406 examples=[40000000],
407 )
408 zmq_max_nb_message_per_buffer: int = Field(
409 gt=0,
410 title="ZMQ buffer max message number",
411 description="Maximum of messages to put in ZMQ buffer",
412 examples=[100],
413 )
414 base_structure_file_path: str = Field(
415 title="Base Structure File",
416 description="Path to base structure hdf5 file with instrument model",
417 examples=["~/base_structure.hdf5"],
418 )
419 refresh_stop_ms: int = Field(
420 gt=0,
421 title="Refresh Signal Period",
422 description="Amount of time between 2 checks of receiving stop signal, in ms",
423 examples=[1000],
424 )
425 refresh_perf_ms: int = Field(
426 gt=0,
427 title="Performance Stat Period",
428 description="Amount of time between to sending of statistics messages, in ms",
429 examples=[1000],
430 )
431 is_detach_dl1_flush: bool = Field(
432 title="Detach flush mode",
433 description="If true, detach a new process to flush the DL1 files.",
434 examples=[True],
435 )
436 keep_rejected_event: bool = Field(
437 title="Keep Rejected Events Mode",
438 description="If true, write even the events flagged as bad quality to files (ignored if nb_threads > 1)",
439 examples=[True],
440 )
441 nbMaxMessage: int = Field(
442 gt=0,
443 title="Maximum Number of Messages",
444 description="R0->DL1 program will stop after receiving this many messages.",
445 examples=[999999999999],
446 )
447 wait_detach_thread_to_finish: int = Field(
448 title="TimeOut for flushing threads",
449 description="Amount of time to wait for flushing threads, when stopping the analysis, in microseconds",
450 examples=[10000000],
451 )
452 debug_parameters: R0toDL1DebugParameters = Field(
453 title="Debugging Parameters",
454 description="Debuging parameters of R0->DL1 program",
455 )
456 hiperta_extra_args: Union[None, str] = Field(
457 title="Extra arguments for hiperta",
458 description="List of extra arguments for HiPeRTA process",
459 default=None,
460 examples=["--verbose", "--loglevel DEBUG"],
461 )
463 @model_validator(mode="after")
464 def check_min_nb_event_per_block_and_n_tasks(self):
465 """Check that the number of threads, slurm tasks and event per blocks is consistent"""
466 if self.nb_event_per_block < self.nb_processing_threads: 466 ↛ 467line 466 didn't jump to line 467, because the condition on line 466 was never true
467 warnings.warn(
468 "nb_event_per_block {} is less than the number of processing threads {}!\n"
469 "Setting it to {}!".format(
470 self.nb_event_per_block,
471 self.nb_processing_threads,
472 self.nb_processing_threads,
473 )
474 )
475 self.nb_event_per_block = self.nb_processing_threads
476 if self.nb_tasks < self.zmq_nb_thread + self.nb_processing_threads: 476 ↛ 477line 476 didn't jump to line 477, because the condition on line 476 was never true
477 warnings.warn(
478 "nb_tasks (number of slurm tasks for r0->dl1) {} is less than zmq_nb_thread {} + nb_processing_threads {}!\n"
479 "Setting it to {}!".format(
480 self.nb_tasks,
481 self.zmq_nb_thread,
482 self.nb_processing_threads,
483 self.zmq_nb_thread + self.nb_processing_threads,
484 )
485 )
486 self.nb_tasks = self.zmq_nb_thread + self.nb_processing_threads
487 return self
489 @model_validator(mode="after")
490 def compute_r0_dl1_mem_if_not_set(self):
491 """Compute a value for the memory requirement of R0->DL1 jobs if it was not set."""
492 if self.r0_dl1_mem is None: 492 ↛ 493line 492 didn't jump to line 493
493 self.r0_dl1_mem = (
494 int(
495 abs(
496 np.ceil(
497 (
498 self.zmq_max_nb_message_per_buffer
499 + self.nb_event_per_hdf5 * 2
500 )
501 * 360e3
502 / 1e9
503 )
504 )
505 )
506 + 2
507 )
508 return self
511class DL1toDL3PipelineParameters(BaseModel, extra="allow"):
512 """Parameters for the DL1->DL3 jobs"""
514 reco_random_forests_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
515 default=_CONFIG_PLACEHOLDER_STR,
516 title="Random Forest Directory",
517 description="Path to the directory containing the random forest files",
518 examples=["~/RFS"],
519 )
520 lstchain_dl1_dl2_config_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = (
521 Field(
522 default=_CONFIG_PLACEHOLDER_STR,
523 title="DL1->DL2 configuration file",
524 description="Path to the configuration file for lstchain DL1->DL2",
525 examples=["~/RFS/lstchain_standard.json"],
526 )
527 )
528 lstchain_dl2_dl3_config_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = (
529 Field(
530 default=_CONFIG_PLACEHOLDER_STR,
531 title="DL2->DL3 configuration file",
532 description="Path to the configuration file for lstchain DL2->DL3",
533 examples=["~/RFS/lstchain_dl2_dl3.json"],
534 )
535 )
536 irf_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
537 default=_CONFIG_PLACEHOLDER_STR,
538 title="IRF File",
539 description="Path to the IRF file to add in DL3 files.",
540 examples=["~/RFS/irf.fits"],
541 )
544class SlurmJobConfiguration(BaseModel, extra="allow"):
545 """Slurm Pipeline Description"""
547 job_cmd: str = Field(
548 title="Job Command passed to slurm using --wrap",
549 description="Job command passed to slurm using --wrap",
550 examples=["echo hostname"],
551 )
552 slurm_job_parameters: str = Field(
553 title="Slurm Job Parameters",
554 description="String passed to slurm to set job parameters",
555 examples=["-A ctao-n-acada -p short"],
556 )
557 name_prefix: str = Field(
558 title="Slurm Job Name prefix",
559 description="Prefix for the slurm job name. It is prefixed to the SlurmJobConfiguration name by HiPeRTA.",
560 examples=["@{tel_id}_§{processIndex}_"],
561 )
562 dependencies: Union[None, List[str]] = Field(
563 default=None,
564 title="Job Dependencies",
565 description="Pipeline stage name that are dependencies of this job",
566 examples=["dl1reorg", "dl1dl2"],
567 )
570class SlurmPipelineConfiguration(BaseModel, extra="allow"):
571 """Pipeline Definition"""
573 slurm_account: str = Field(
574 title="Slurm account",
575 description="Slurm account to use to submit jobs",
576 examples=["ctao-n-acada"],
577 )
578 slurm_queue: str = Field(
579 title="Slurm queue",
580 description="Slurm queue to use when submitting jobs",
581 examples=["short"],
582 )
583 slurm_reservation: str = Field(
584 title="Slurm reservation",
585 description="Slurm reservation to use when submitting jobs",
586 examples=["acada_daily"],
587 )
589 slurm_jobs: Dict[str, SlurmJobConfiguration] = Field(
590 title="Slurm Pipeline",
591 description="Pipeline description: stage_name: Slurm job",
592 )
593 output_directories: Dict[str, str] = Field(
594 title="Pipeline's steps output directories",
595 description="The directories where the pipeline jobs' output should be saved. "
596 "Note: DL1, DL2 and DL3 mandatory directories are specified in observation parameters. "
597 "This dictionary exists to allow to add other directories if required by pipeline steps.",
598 examples=[{}],
599 )
600 log_directories: Dict[str, str] = Field(
601 title="Pipeline's steps log directories",
602 description="The directories where the pipeline jobs' logs should be saved.",
603 examples=[
604 {
605 "dl1_alt_az_log_dir": "@{log_dir}/dl1_alt_az",
606 "dl1_reorg_log_dir": "@{log_dir}/dl1_reorg",
607 "dl1_dl2_log_dir": "@{log_dir}/dl1_dl2",
608 "dl2_dl3_log_dir": "@{log_dir}/dl2_dl3",
609 }
610 ],
611 )
614class ReconstructionManagerConfiguration(BaseModel, extra="allow"):
615 """Complete SAG-Reconstruction configuration"""
617 logging_level: LOGGING_LEVELS = Field(
618 title="Logging level",
619 description="Set the logging level of the reco-manager",
620 examples=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
621 )
622 no_slurm: bool = Field(
623 title="Deactivate slurm",
624 description="If True, sub-systems programs will be started with as subprocesses instead of slurm jobs",
625 examples=[False],
626 )
627 observation_config: ObservationParameters = Field(
628 title="Observation Parameters",
629 description="Configuration of the observation parameters",
630 )
631 dl1_dl3_params: DL1toDL3PipelineParameters = Field(
632 title="DL1->DL3 Configuration", description="DL1->DL3 Pipeline parameters"
633 )
634 reco_model_catalog: ReconstructionModelCatalog = Field(
635 title="Reconstruction Model Catalog",
636 description="Description of the available reconstruction models",
637 )
638 r0_dl1_params: R0toDL1PipelineParameters = Field(
639 title="R0->DL1 Pipeline Parameters",
640 description="Parameters of the R0->DL1 programs",
641 )
642 protobuf_parsing_conf: DataStreamParsingConfiguration = Field(
643 title="ProtocolBuffer Message Parsing Configuration",
644 description="Parameters used to parse the stream's protocol buffers messages",
645 )
646 pipeline_config: SlurmPipelineConfiguration = Field(
647 title="Slurm Pipeline Configuration",
648 description="DL1->DL3 slurm jobs configuration.",
649 )
652def write_yaml_config(
653 yaml_config_filename, reco_manager_config, r0_dl1_training_config
654):
655 """Dump the configuration into a yaml file to use with SAG-Reconstruction"""
656 yaml_dict = {
657 k: v
658 for config in [
659 {
660 "logging_level": reco_manager_config.logging_level.name,
661 "no_slurm": reco_manager_config.no_slurm,
662 },
663 reco_manager_config.observation_config.model_dump(exclude_none=True),
664 reco_manager_config.dl1_dl3_params.model_dump(exclude_none=True),
665 reco_manager_config.reco_model_catalog.model_dump(exclude_none=True),
666 r0_dl1_training_config,
667 reco_manager_config.r0_dl1_params.model_dump(exclude_none=True),
668 reco_manager_config.protobuf_parsing_conf.model_dump(
669 exclude_none=True, exclude_unset=True
670 ),
671 reco_manager_config.pipeline_config.model_dump(exclude_none=True),
672 ]
673 for k, v in config.items()
674 }
675 with open(yaml_config_filename, "w") as yaml_config_file:
676 yaml = YAML()
677 yaml.indent(mapping=4, sequence=4, offset=2)
678 yaml.width = 2**16 # very large to not wrap lines
679 yaml.allow_unicode = True
680 yaml.sort_keys = False
681 yaml.dump(yaml_dict, yaml_config_file)
684def _delete_extra(model):
685 """Deletes all extra fields from a pydantic BaseModel.
687 This is usefull to dump a model without the extra parameters, when the model accepts the extra.
688 Required until https://github.com/pydantic/pydantic/issues/6150 or equivalent is implemented.
689 """
690 model.__pydantic_extra__ = {}
691 for _, field in model:
692 if isinstance(field, BaseModel):
693 _delete_extra(field)