Coverage for hiperta_stream/config/configuration.py: 84%

139 statements  

coverage.py v7.4.3, created at 2024-07-16 10:16 +0000

import pathlib
import warnings
from typing import Dict, List, Literal, Union

import numpy as np
from hiperta_stream.utils.logging_utils import LOGGING_LEVELS
from hiperta_stream.utils.substring_substitution_utils import (
    dict_defined_substring_substitution,
)
from pydantic import BaseModel, Field, model_validator
from ruamel.yaml import YAML
from typing_extensions import Annotated

_CONFIG_PLACEHOLDER_STR = "@@@@@"


class ReconstructionDL1DL2ModelProperties(BaseModel, extra="allow"):
    """Trained DL1->DL2 model properties, used to choose a model in the catalog for inference at run-time."""

    zenith: float = Field(
        ge=0.0, le=90.0, title="zenith", description="Zenith of the model's training data, in degrees.", examples=[20.0]
    )


class ReconstructionDL2DL3ModelProperties(BaseModel, extra="allow"):
    """Trained DL2->DL3 model properties, used to choose a model in the catalog for inference at run-time."""

    gammaness_cut: float = Field(
        ge=0.0,
        le=1.0,
        title="Gammaness cut",
        description="Events with a gammaness score lower than this value are excluded from DL3 (between 0.0 and 1.0).",
        examples=[0.5, 0.65],
    )


class DL2DL3ModelArchive(BaseModel, extra="allow"):
    """DL2->DL3 model archive, including the path to the IRF and its associated configuration."""

    relative_path: str = Field(
        title="IRF directory",
        description="Path to the directory containing the IRF file and configuration",
        examples=["standard_gammaness_0_5"],
    )
    properties: ReconstructionDL2DL3ModelProperties = Field(
        title="DL2->DL3 properties", description="DL2->DL3 model properties", examples=[{"gammaness_cut": 0.65}]
    )
    irf_file: str = Field(
        title="IRF file path",
        description="Path to the IRF file (relative to the IRF directory)",
        examples=["irf_RTA_ghcut0.5.fits.gz"],
    )
    dl2_dl3_config: str = Field(
        title="DL2->DL3 configuration",
        description="Path to the DL2->DL3 configuration file (relative to the IRF directory)",
        examples=["lstchain_config_dl2_dl3_rta_0_5.json"],
    )


class ReconstructionModelArchive(BaseModel, extra="allow"):
    """Trained model archive, including the path to the model files, the model properties and the available inference modes."""

    relative_path: str = Field(
        title="Random Forest Directory",
        description="Path to the directory containing the random forest files, "
        "relative to the archive catalog base path.",
        examples=["RF1"],
    )
    properties: ReconstructionDL1DL2ModelProperties = Field(
        title="Model properties", description="Set of values describing the model's nominal inference conditions"
    )
    r0_dl1_config: str = Field(
        title="R0->DL1 training config",
        description="Path to the R0->DL1 configuration used during the model training (relative to the RF directory).",
        examples=["r0_dl1_training_config.yml"],
    )
    dl1_dl2_config: str = Field(
        title="DL1->DL2 training config",
        description="Path to the DL1->DL2 configuration used during the model training (relative to the RF directory).",
        examples=["lstchain_standard_config_v063-RTA.json"],
    )
    dl2_dl3_archives: List[DL2DL3ModelArchive] = Field(
        title="DL2->DL3 archives",
        description="DL2->DL3 model archives with information about the IRF, configuration, etc.",
        examples=[
            [
                {
                    "relative_path": "standard_gammaness_0_5",
                    "properties": {"gammaness_cut": 0.65},
                    "irf_file": "irf_RTA_ghcut0.5.fits.gz",
                    "dl2_dl3_config": "lstchain_config_dl2_dl3_rta_0_5.json",
                }
            ]
        ],
    )


class ReconstructionModelCatalog(BaseModel, extra="allow"):
    """Model catalog containing the trained model archives."""

    archives_base_path: str = Field(
        title="Catalog Directory",
        description="Path to the directory containing all the trained model archives.",
        examples=["~/trained_models/"],
    )
    archives: List[ReconstructionModelArchive] = Field(
        title="Archived models", description="List of available archived models"
    )
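
# A minimal usage sketch (illustrative, not part of the original module): building a
# one-entry catalog from the example values given in the field definitions above.
def _example_model_catalog() -> ReconstructionModelCatalog:
    return ReconstructionModelCatalog(
        archives_base_path="~/trained_models/",
        archives=[
            ReconstructionModelArchive(
                relative_path="RF1",
                properties=ReconstructionDL1DL2ModelProperties(zenith=20.0),
                r0_dl1_config="r0_dl1_training_config.yml",
                dl1_dl2_config="lstchain_standard_config_v063-RTA.json",
                dl2_dl3_archives=[
                    DL2DL3ModelArchive(
                        relative_path="standard_gammaness_0_5",
                        properties=ReconstructionDL2DL3ModelProperties(gammaness_cut=0.65),
                        irf_file="irf_RTA_ghcut0.5.fits.gz",
                        dl2_dl3_config="lstchain_config_dl2_dl3_rta_0_5.json",
                    )
                ],
            )
        ],
    )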


class ProtoBufFieldParsingConfiguration(BaseModel, extra="allow"):
    """Description of a Protocol Buffer Event sub-field."""

    id: int = Field(gt=0, title="ID", description="Field ID", examples=[1, 4])
    type: str = Field(title="Field Type", description="Field data type", examples=["uint64", "int8", "submessage"])

    # With Python > 3.7, `from __future__ import annotations` would allow using the type directly
    # instead of a string literal, as supported by pydantic:
    # https://docs.pydantic.dev/latest/concepts/postponed_annotations/
    # However, its final integration in the Python standard is uncertain:
    # https://mail.python.org/archives/list/python-dev@python.org/message/VIZEBX5EYMSYIJNDBF6DMUMZOCWHARSO/
    # https://docs.python.org/3/library/__future__.html#id2
    sub_message: Union[None, Dict[str, "ProtoBufFieldParsingConfiguration"]] = Field(
        default=None,
        title="Field sub-message",
        description="Optional named sub-fields (named fields inside this field)",
    )
    is_array: Union[None, bool] = Field(
        default=None,
        title="Field scalarity",
        description="If true, the field contains an array of values; otherwise a single scalar.",
        examples=[True, False],
    )


class EventParsingConfiguration(BaseModel, extra="allow"):
    """Description of the Protocol Buffer Event field."""

    Event: ProtoBufFieldParsingConfiguration = Field(
        title="Event Field", description="Event field in the protocol buffer messages"
    )


class DataStreamParsingConfiguration(BaseModel, extra="allow"):
    """Description of the Protocol Buffer messages."""

    protobuf_stream_parsing_config: EventParsingConfiguration = Field(
        title="Protobuf Messages Configuration",
        description="Field IDs and types required to parse the protocol buffer messages.",
    )
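
# A minimal sketch (illustrative) of a nested parsing configuration. The sub-field
# names ("eventId", "waveform") and IDs are hypothetical, not taken from a real schema.
def _example_parsing_config() -> DataStreamParsingConfiguration:
    return DataStreamParsingConfiguration(
        protobuf_stream_parsing_config=EventParsingConfiguration(
            Event=ProtoBufFieldParsingConfiguration(
                id=1,
                type="submessage",
                sub_message={
                    "eventId": ProtoBufFieldParsingConfiguration(id=1, type="uint64"),
                    "waveform": ProtoBufFieldParsingConfiguration(id=4, type="int8", is_array=True),
                },
            )
        )
    )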


class DataStreamConnectionConfiguration(BaseModel, extra="allow"):
    """Parameters to connect to the data streamers."""

    hostname: str = Field(
        title="hostname",
        description="Hostname of the data streamer on the network",
        examples=["localhost", "tcs05-ib0"],
    )
    port: int = Field(
        gt=0, le=65535, title="Port", description="Port of the data streamer on the network", examples=[25000, 3391]
    )


class ObservationParameters(BaseModel, extra="allow"):
    """Observation run-time parameters received from the SAG-Supervisor."""

    sb_id: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Scheduling Block ID",
        description="ID of the observation's scheduling block.",
        examples=[12345],
    )
    obs_id: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Observation ID",
        description="ID of the observation.",
        examples=[12345],
    )
    tel_id: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Telescope ID",
        description="ID of the telescope.",
        examples=[1],
    )
    RA_pointing: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[float, Field(ge=0.0, le=360.0)]] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Pointing Right Ascension",
        description="Pointing right ascension during the observation.",
        examples=[20.0],
    )
    DEC_pointing: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[float, Field(ge=-90.0, le=90.0)]] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Pointing Declination",
        description="Pointing declination during the observation.",
        examples=[60.0],
    )
    dl1_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="DL1 directory",
        description="Path to the directory where to write the DL1 files.",
        examples=["~/DL1"],
    )
    dl2_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="DL2 directory",
        description="Path to the directory where to write the DL2 files.",
        examples=["~/DL2"],
    )
    dl3_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="DL3 directory",
        description="Path to the directory where to write the DL3 files.",
        examples=["~/DL3"],
    )
    log_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Log directory",
        description="Path to the directory where to write the log files.",
        examples=["~/logs"],
    )
    reco_manager_log_file: str = Field(
        default="hiperta_stream.log",
        title="hiperta_stream_start log file",
        description="Log file for the reco-manager entrypoint (hiperta_stream_start). "
        "This path must NOT contain any bash-like string substitution "
        "(opening the log file is the first thing the reco-manager does, before substituting strings).",
        examples=["/fefs/user/test/hiperta_stream_test.log", "hiperta_stream_start_obs_id_12345"],
    )
    data_stream_connections: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], List[DataStreamConnectionConfiguration]] = (
        Field(
            default=_CONFIG_PLACEHOLDER_STR,
            title="Stream Connections",
            description="Parameters of the connections to the data streamers",
            examples=[[{"hostname": "tcs06", "port": 25000}]],
        )
    )
    slurm_nodelists: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Dict[str, List[str]]] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Slurm Node Lists",
        description="Lists of the slurm nodes to use, per slurm reservation",
        examples=[{"acada-daily": ["cp15", "cp16"], "rta-one-node": ["cp19"]}],
    )
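
# A minimal sketch (illustrative) of the placeholder pattern used above: each field typed
# Union[Literal["@@@@@"], <actual type>] either keeps the "@@@@@" placeholder, to be
# filled at run-time from the SAG-Supervisor, or validates a concrete value.
def _example_observation_parameters() -> None:
    params = ObservationParameters()  # every placeholder field defaults to "@@@@@"
    assert params.obs_id == _CONFIG_PLACEHOLDER_STR
    params = ObservationParameters(obs_id=12345, tel_id=1)  # concrete values are validated
    assert params.obs_id == 12345
    # ObservationParameters(obs_id=-1) would raise a pydantic ValidationError (ge=0).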


class R0toDL1DebugParameters(BaseModel, extra="allow"):
    """Debugging parameters of the R0->DL1 program."""

    keep_random_trigger_threshold: float = Field(
        ge=0.0,
        le=1.0,
        default=1.0,
        title="Random Trigger Probability",
        description="Probability to keep events: if 0.0 all events are discarded, if 1.0 all events are kept.",
        examples=[1.0],
    )
    verbose: bool = Field(default=False, title="Verbose Mode", description="If True, activate verbose output")
    is_flush_log_disable: bool = Field(
        title="Disable log flush", description="If true, the flushing threads will not write logs.", examples=[False]
    )


class R0toDL1PipelineParameters(BaseModel, extra="allow"):
    """R0->DL1 program parameters."""

    dl1_filename: str = Field(
        title="DL1 filename",
        description="DL1 data file name, relative to the DL1 directory. "
        "HiPeRTA can substitute some variables at run-time.",
        examples=[
            "dl1_sb_id_@{sb_id}_obs_id_@{obs_id}_tel_id_@{tel_id}_line_idx_§{processIndex}_"
            "thread_idx_§{threadIndex}_th_file_idx_§{processFileIndex}_file_idx_§{fileIndex}.h5"
        ],
    )
    r0_dl1_log_dir: str = Field(
        title="R0->DL1 log directory",
        description="Directory where to write the R0->DL1 processing logs",
        examples=["~/logs/r0_dl1/"],
    )
    r0_dl1_log_file: str = Field(
        title="R0->DL1 log filename",
        description="R0->DL1 log filename, relative to the R0->DL1 log directory. "
        "HiPeRTA can substitute some variables at run-time.",
        examples=["r0_dl1_sb_id_@{sb_id}_obs_id_@{obs_id}_tel_id_@{tel_id}_§{processIndex}.log"],
    )
    r0_dl1_job_name: str = Field(
        title="R0->DL1 job name for slurm",
        description="Job name of the R0->DL1 slurm jobs.",
        examples=["@{tel_id}_§{processIndex}_r0_dl1"],
    )
    r0_dl1_mem: Union[None, int] = Field(
        title="R0->DL1 memory",
        description="Required memory for the R0->DL1 jobs, in GB (passed to slurm)",
        examples=[1],
        default=None,
    )
    nb_event_per_hdf5: int = Field(
        gt=0, title="Number of Events", description="Number of events to write per DL1 file.", examples=[20000]
    )
    is_stream_read_zfit: bool = Field(
        title="Zfit mode", description="If true, expect the incoming data to be in zfit format.", examples=[True]
    )
    is_stream_real_data: bool = Field(
        title="Real Calibration Data mode",
        description="If true, set the --pedperslice option of the R0->DL1 programs",
        examples=[True],
    )
    is_stream_r1_zfit: bool = Field(
        title="R1 mode",
        description="If true, expect the incoming data to be calibrated 1-gain R1 data.",
        examples=[True],
    )
    r1_scale: float = Field(
        title="R1 scale",
        description="Scaling value to apply to R1 data when transforming it back to float from unsigned int"
        " (which is used for compression reasons)",
        examples=[5.0],
    )
    r1_offset: float = Field(
        title="R1 offset",
        description="Offset value to apply to R1 data when transforming it back to float from unsigned int"
        " (which is used for compression reasons)",
        examples=[60.0],
    )
    nb_processing_threads: int = Field(
        gt=0,
        title="Number of Processing Threads",
        description="Number of processing threads to start for each R0->DL1 process",
        examples=[2],
    )
    nb_event_per_block: int = Field(
        gt=0,
        title="Number of Events per Processing Block",
        description="Number of events per processing block, before dividing them among the threads.",
        examples=[2],
    )
    nb_tasks: int = Field(
        gt=0,
        title="Number of slurm tasks",
        description="Number of CPUs slurm should allocate to each R0->DL1 program",
        examples=[3],
    )
    zmq_nb_thread: int = Field(
        gt=0,
        title="Number of ZMQ threads",
        description="Number of threads ZMQ will start to receive data.",
        examples=[1],
    )
    zmq_thread_affinity: int = Field(
        ge=0,
        title="ZMQ Thread affinity",
        description="ZMQ thread affinity, setting which thread will handle re-connection",
        examples=[1],
    )
    zmq_buffer_size: int = Field(
        gt=0, title="ZMQ buffer size", description="ZMQ buffer size in bytes", examples=[40000000]
    )
    zmq_max_nb_message_per_buffer: int = Field(
        gt=0,
        title="ZMQ buffer max message number",
        description="Maximum number of messages to put in the ZMQ buffer",
        examples=[100],
    )
    base_structure_file_path: str = Field(
        title="Base Structure File",
        description="Path to the base structure hdf5 file with the instrument model",
        examples=["~/base_structure.hdf5"],
    )
    refresh_stop_ms: int = Field(
        gt=0,
        title="Refresh Signal Period",
        description="Amount of time between two checks for the stop signal, in ms",
        examples=[1000],
    )
    refresh_perf_ms: int = Field(
        gt=0,
        title="Performance Stat Period",
        description="Amount of time between two sendings of statistics messages, in ms",
        examples=[1000],
    )
    is_detach_dl1_flush: bool = Field(
        title="Detach flush mode", description="If true, detach a new process to flush the DL1 files.", examples=[True]
    )
    keep_rejected_event: bool = Field(
        title="Keep Rejected Events Mode",
        description="If true, also write the events flagged as bad quality to files (ignored if nb_threads > 1)",
        examples=[True],
    )
    nbMaxMessage: int = Field(
        gt=0,
        title="Maximum Number of Messages",
        description="The R0->DL1 program will stop after receiving this many messages.",
        examples=[999999999999],
    )
    wait_detach_thread_to_finish: int = Field(
        title="Timeout for flushing threads",
        description="Amount of time to wait for the flushing threads when stopping the analysis, in microseconds",
        examples=[10000000],
    )
    debug_parameters: R0toDL1DebugParameters = Field(
        title="Debugging Parameters", description="Debugging parameters of the R0->DL1 program"
    )
    hiperta_extra_args: Union[None, str] = Field(
        title="Extra arguments for hiperta",
        description="Extra arguments for the HiPeRTA process",
        default=None,
        examples=["--verbose", "--loglevel DEBUG"],
    )
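
    # Note on the template examples above (an interpretation, not confirmed by this module):
    # two substitution syntaxes appear in the file-name examples. "@{...}" tokens such as
    # @{obs_id} match the observation-parameter names and are presumably resolved by the
    # reco-manager, while "§{...}" tokens such as §{processIndex} are presumably filled per
    # process/thread/file by HiPeRTA at run-time, as the field descriptions suggest
    # ("HiPeRTA can substitute some variables at run-time").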


    @model_validator(mode="after")
    def check_min_nb_event_per_block_and_n_tasks(self):
        """Check that the numbers of processing threads, slurm tasks and events per block are consistent."""
        if self.nb_event_per_block < self.nb_processing_threads:
            warnings.warn(
                "nb_event_per_block {} is less than the number of processing threads {}!\n"
                "Setting it to {}!".format(
                    self.nb_event_per_block, self.nb_processing_threads, self.nb_processing_threads
                )
            )
            self.nb_event_per_block = self.nb_processing_threads
        if self.nb_tasks < self.zmq_nb_thread + self.nb_processing_threads:
            warnings.warn(
                "nb_tasks (number of slurm tasks for r0->dl1) {} is less than zmq_nb_thread {} + nb_processing_threads {}!\n"
                "Setting it to {}!".format(
                    self.nb_tasks,
                    self.zmq_nb_thread,
                    self.nb_processing_threads,
                    self.zmq_nb_thread + self.nb_processing_threads,
                )
            )
            self.nb_tasks = self.zmq_nb_thread + self.nb_processing_threads
        return self

    @model_validator(mode="after")
    def compute_r0_dl1_mem_if_not_set(self):
        """Compute a value for the memory requirement of the R0->DL1 jobs if it was not set."""
        if self.r0_dl1_mem is None:
            self.r0_dl1_mem = (
                int(abs(np.ceil((self.zmq_max_nb_message_per_buffer + self.nb_event_per_hdf5 * 2) * 360e3 / 1e9))) + 2
            )
        return self
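
    # Worked example of the heuristic above (illustrative): with the field examples
    # zmq_max_nb_message_per_buffer=100 and nb_event_per_hdf5=20000, the estimate is
    # (100 + 2 * 20000) * 360e3 / 1e9 = 14.436 GB, rounded up to 15 GB, plus a 2 GB
    # margin: r0_dl1_mem = 17 (GB). The 360e3 factor is presumably an estimate of the
    # per-event memory footprint in bytes.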


class DL1toDL3PipelineParameters(BaseModel, extra="allow"):
    """Parameters for the DL1->DL3 jobs."""

    reco_random_forests_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Random Forest Directory",
        description="Path to the directory containing the random forest files",
        examples=["~/RFS"],
    )
    lstchain_dl1_dl2_config_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="DL1->DL2 configuration file",
        description="Path to the configuration file for lstchain DL1->DL2",
        examples=["~/RFS/lstchain_standard.json"],
    )
    lstchain_dl2_dl3_config_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="DL2->DL3 configuration file",
        description="Path to the configuration file for lstchain DL2->DL3",
        examples=["~/RFS/lstchain_dl2_dl3.json"],
    )
    irf_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="IRF File",
        description="Path to the IRF file to add to the DL3 files.",
        examples=["~/RFS/irf.fits"],
    )


class SlurmJobConfiguration(BaseModel, extra="allow"):
    """Slurm job description."""

    job_cmd: str = Field(
        title="Job Command",
        description="Job command passed to slurm using --wrap",
        examples=["echo hostname"],
    )
    slurm_job_parameters: str = Field(
        title="Slurm Job Parameters",
        description="String passed to slurm to set the job parameters",
        examples=["-A ctao-n-acada -p short"],
    )
    name_prefix: str = Field(
        title="Slurm Job Name prefix",
        description="Prefix for the slurm job name. It is prepended to the SlurmJobConfiguration name by HiPeRTA.",
        examples=["@{tel_id}_§{processIndex}_"],
    )
    dependencies: Union[None, List[str]] = Field(
        default=None,
        title="Job Dependencies",
        description="Pipeline stage names that are dependencies of this job",
        examples=[["dl1reorg", "dl1dl2"]],
    )


class SlurmPipelineConfiguration(BaseModel, extra="allow"):
    """Pipeline definition."""

    slurm_account: str = Field(
        title="Slurm account", description="Slurm account to use to submit jobs", examples=["ctao-n-acada"]
    )
    slurm_queue: str = Field(
        title="Slurm queue", description="Slurm queue to use when submitting jobs", examples=["short"]
    )
    slurm_reservation: str = Field(
        title="Slurm reservation",
        description="Slurm reservation to use when submitting jobs",
        examples=["acada_daily"],
    )

    slurm_jobs: Dict[str, SlurmJobConfiguration] = Field(
        title="Slurm Pipeline", description="Pipeline description, mapping each stage name to a slurm job"
    )
    output_directories: Dict[str, str] = Field(
        title="Pipeline's steps output directories",
        description="The directories where the pipeline jobs' output should be saved. "
        "Note: the mandatory DL1, DL2 and DL3 directories are specified in the observation parameters. "
        "This dictionary exists to allow adding other directories if required by pipeline steps.",
        examples=[{}],
    )
    log_directories: Dict[str, str] = Field(
        title="Pipeline's steps log directories",
        description="The directories where the pipeline jobs' logs should be saved.",
        examples=[
            {
                "dl1_alt_az_log_dir": "@{log_dir}/dl1_alt_az",
                "dl1_reorg_log_dir": "@{log_dir}/dl1_reorg",
                "dl1_dl2_log_dir": "@{log_dir}/dl1_dl2",
                "dl2_dl3_log_dir": "@{log_dir}/dl2_dl3",
            }
        ],
    )
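
# A minimal sketch (illustrative) of a two-stage pipeline where "dl1dl2" depends on
# "dl1reorg". The commands and directories are hypothetical placeholders.
def _example_pipeline_config() -> SlurmPipelineConfiguration:
    return SlurmPipelineConfiguration(
        slurm_account="ctao-n-acada",
        slurm_queue="short",
        slurm_reservation="acada_daily",
        slurm_jobs={
            "dl1reorg": SlurmJobConfiguration(
                job_cmd="echo dl1reorg",
                slurm_job_parameters="-A ctao-n-acada -p short",
                name_prefix="@{tel_id}_§{processIndex}_",
            ),
            "dl1dl2": SlurmJobConfiguration(
                job_cmd="echo dl1dl2",
                slurm_job_parameters="-A ctao-n-acada -p short",
                name_prefix="@{tel_id}_§{processIndex}_",
                dependencies=["dl1reorg"],
            ),
        },
        output_directories={},
        log_directories={"dl1_reorg_log_dir": "@{log_dir}/dl1_reorg", "dl1_dl2_log_dir": "@{log_dir}/dl1_dl2"},
    )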


class ReconstructionManagerConfiguration(BaseModel, extra="allow"):
    """Complete SAG-Reconstruction configuration."""

    logging_level: LOGGING_LEVELS = Field(
        title="Logging level",
        description="Set the logging level of the reco-manager",
        examples=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
    )
    no_slurm: bool = Field(
        title="Deactivate slurm",
        description="If True, the sub-systems' programs are started as subprocesses instead of slurm jobs",
        examples=[False],
    )
    observation_config: ObservationParameters = Field(
        title="Observation Parameters", description="Configuration of the observation parameters"
    )
    dl1_dl3_params: DL1toDL3PipelineParameters = Field(
        title="DL1->DL3 Configuration", description="DL1->DL3 pipeline parameters"
    )
    reco_model_catalog: ReconstructionModelCatalog = Field(
        title="Reconstruction Model Catalog", description="Description of the available reconstruction models"
    )
    r0_dl1_params: R0toDL1PipelineParameters = Field(
        title="R0->DL1 Pipeline Parameters", description="Parameters of the R0->DL1 programs"
    )
    protobuf_parsing_conf: DataStreamParsingConfiguration = Field(
        title="ProtocolBuffer Message Parsing Configuration",
        description="Parameters used to parse the stream's protocol buffer messages",
    )
    pipeline_config: SlurmPipelineConfiguration = Field(
        title="Slurm Pipeline Configuration", description="DL1->DL3 slurm jobs configuration."
    )


def write_yaml_config(yaml_config_filename, reco_manager_config, r0_dl1_training_config):
    """Dump the configuration into a YAML file to use with SAG-Reconstruction."""
    yaml_dict = {
        k: v
        for config in [
            {"logging_level": reco_manager_config.logging_level.name, "no_slurm": reco_manager_config.no_slurm},
            reco_manager_config.observation_config.model_dump(exclude_none=True),
            reco_manager_config.dl1_dl3_params.model_dump(exclude_none=True),
            reco_manager_config.reco_model_catalog.model_dump(exclude_none=True),
            r0_dl1_training_config,
            reco_manager_config.r0_dl1_params.model_dump(exclude_none=True),
            reco_manager_config.protobuf_parsing_conf.model_dump(exclude_none=True, exclude_unset=True),
            reco_manager_config.pipeline_config.model_dump(exclude_none=True),
        ]
        for k, v in config.items()
    }
    with open(yaml_config_filename, "w") as yaml_config_file:
        yaml = YAML()
        yaml.indent(mapping=4, sequence=4, offset=2)
        yaml.width = 2**16  # very large, to avoid wrapping lines
        yaml.allow_unicode = True
        yaml.sort_keys = False
        yaml.dump(yaml_dict, yaml_config_file)
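
# A minimal usage sketch (illustrative): validating a YAML configuration file against the
# top-level model, then re-dumping the merged configuration. The file names are hypothetical.
def _example_config_round_trip() -> None:
    with open("reco_manager_config.yml") as config_file:
        config_data = YAML(typ="safe").load(config_file)
    reco_manager_config = ReconstructionManagerConfiguration.model_validate(config_data)
    write_yaml_config("merged_config.yml", reco_manager_config, r0_dl1_training_config={})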


def _delete_extra(model):
    """Delete all extra fields from a pydantic BaseModel.

    This is useful to dump a model without the extra parameters, when the model accepts extras.
    Required until https://github.com/pydantic/pydantic/issues/6150 or equivalent is implemented.
    """
    model.__pydantic_extra__ = {}
    for _, field in model:
        if isinstance(field, BaseModel):
            _delete_extra(field)
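
# A minimal usage sketch (illustrative): since the models are declared with extra="allow",
# unknown keys survive validation; _delete_extra drops them (recursively) before dumping.
def _example_delete_extra() -> None:
    properties = ReconstructionDL1DL2ModelProperties(zenith=20.0, comment="not in the schema")
    _delete_extra(properties)
    assert properties.model_dump() == {"zenith": 20.0}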