Coverage for hiperta_stream/config/configuration.py: 83%

139 statements  

coverage.py v7.4.3, created at 2024-12-11 13:29 +0000

import pathlib
import warnings
from typing import Dict, List, Literal, Union

import numpy as np
from pydantic import BaseModel, Field, model_validator
from ruamel.yaml import YAML
from typing_extensions import Annotated

from hiperta_stream.utils.logging_utils import LOGGING_LEVELS
from hiperta_stream.utils.substring_substitution_utils import (
    dict_defined_substring_substitution,
)

_CONFIG_PLACEHOLDER_STR = "@@@@@"


class ReconstructionDL1DL2ModelProperties(BaseModel, extra="allow"):
    """Trained DL1->DL2 model properties, useful to choose a model in the catalog for inference at run-time."""

    zenith: float = Field(
        ge=0.0,
        le=90.0,
        title="zenith",
        description="Zenith of the model's training data, in degrees.",
        examples=[20.0],
    )

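# Illustrative sketch (not part of the source): pydantic enforces the bounds
# declared on the field above at parse time. The values are hypothetical.
#
#   ReconstructionDL1DL2ModelProperties(zenith=20.0)   # OK
#   ReconstructionDL1DL2ModelProperties(zenith=95.0)   # raises ValidationError (zenith > 90)
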

class ReconstructionDL2DL3ModelProperties(BaseModel, extra="allow"):
    """Trained DL2->DL3 model properties, useful to choose a model in the catalog for inference at run-time."""

    gammaness_cut: float = Field(
        ge=0.0,
        le=1.0,
        title="Gammaness cut",
        description="Events with a gammaness score lower than this value will be excluded from DL3 (between 0.0 and 1.0).",
        examples=[0.5, 0.65],
    )


class DL2DL3ModelArchive(BaseModel, extra="allow"):
    """DL2->DL3 model archive, including the path to the IRF and the associated configuration."""

    relative_path: str = Field(
        title="IRF directory",
        description="Path to the directory containing the IRF file and configuration.",
        examples=["standard_gammaness_0_5"],
    )
    properties: ReconstructionDL2DL3ModelProperties = Field(
        title="DL2->DL3 properties",
        description="DL2->DL3 model properties.",
        examples=[{"gammaness_cut": 0.65}],
    )
    irf_file: str = Field(
        title="IRF file path",
        description="Path to the IRF file (relative to the IRF directory).",
        examples=["irf_RTA_ghcut0.5.fits.gz"],
    )
    dl2_dl3_config: str = Field(
        title="DL2->DL3 configuration",
        description="Path to the DL2->DL3 configuration file (relative to the IRF directory).",
        examples=["lstchain_config_dl2_dl3_rta_0_5.json"],
    )


class ReconstructionModelArchive(BaseModel, extra="allow"):
    """Trained model archive, including the path to the model files, the model properties and the available inference modes."""

    relative_path: str = Field(
        title="Random Forest Directory",
        description="Path to the directory containing the random forest files, "
        "relative to the archive catalog base path.",
        examples=["RF1"],
    )
    properties: ReconstructionDL1DL2ModelProperties = Field(
        title="Model properties",
        description="Set of values describing the model's nominal inference conditions.",
    )
    r0_dl1_config: str = Field(
        title="R0->DL1 training config",
        description="Path to the R0->DL1 configuration used during the model training (relative to the RF directory).",
        examples=["r0_dl1_training_config.yml"],
    )
    dl1_dl2_config: str = Field(
        title="DL1->DL2 training config",
        description="Path to the DL1->DL2 configuration used during the model training (relative to the RF directory).",
        examples=["lstchain_standard_config_v063-RTA.json"],
    )
    dl2_dl3_archives: List[DL2DL3ModelArchive] = Field(
        title="DL2->DL3 archives",
        description="DL2->DL3 model archives with information about the IRF, configuration, etc.",
        examples=[
            [
                {
                    "relative_path": "standard_gammaness_0_5",
                    "properties": {"gammaness_cut": 0.65},
                    "irf_file": "irf_RTA_ghcut0.5.fits.gz",
                    "dl2_dl3_config": "lstchain_config_dl2_dl3_rta_0_5.json",
                }
            ]
        ],
    )


class ReconstructionModelCatalog(BaseModel, extra="allow"):
    """Model catalog containing trained model archives."""

    archives_base_path: str = Field(
        title="Catalog Directory",
        description="Path to the directory containing all the trained model archives.",
        examples=["~/trained_models/"],
    )
    archives: List[ReconstructionModelArchive] = Field(
        title="Archived models", description="List of available archived models."
    )

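# Illustrative sketch (not part of the source): the docstrings above say the
# properties are used to choose a model in the catalog at run-time. One
# plausible selection rule, assuming a `catalog` instance and a pointing
# zenith in degrees, is to pick the archive trained closest to it:
#
#   best = min(catalog.archives, key=lambda a: abs(a.properties.zenith - pointing_zenith))
#   model_dir = pathlib.Path(catalog.archives_base_path) / best.relative_path
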

class ProtoBufFieldParsingConfiguration(BaseModel, extra="allow"):
    """Description of Protocol Buffer Event sub-fields."""

    id: int = Field(gt=0, title="ID", description="Field ID.", examples=[1, 4])
    type: str = Field(
        title="Field Type",
        description="Field data type.",
        examples=["uint64", "int8", "submessage"],
    )

    # With python > 3.7, `from __future__ import annotations` allows using the type
    # directly instead of a literal string supported by pydantic:
    # https://docs.pydantic.dev/latest/concepts/postponed_annotations/
    # However, final integration in the python standard is uncertain:
    # https://mail.python.org/archives/list/python-dev@python.org/message/VIZEBX5EYMSYIJNDBF6DMUMZOCWHARSO/
    # https://docs.python.org/3/library/__future__.html#id2
    sub_message: Union[None, Dict[str, "ProtoBufFieldParsingConfiguration"]] = Field(
        default=None,
        title="Field sub-message",
        description="Optional named sub-fields (named fields nested inside this field).",
    )
    is_array: Union[None, bool] = Field(
        default=None,
        title="Field scalarity",
        description="If true, the field contains an array of values; otherwise a single scalar.",
        examples=[True, False],
    )

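# Illustrative sketch (not part of the source): a nested field description as
# it could appear in the configuration, parsed through the self-referencing
# sub_message field. The IDs, types and the "waveform" name are hypothetical.
#
#   ProtoBufFieldParsingConfiguration.model_validate(
#       {
#           "id": 1,
#           "type": "submessage",
#           "sub_message": {"waveform": {"id": 4, "type": "uint64", "is_array": True}},
#       }
#   )
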

class EventParsingConfiguration(BaseModel, extra="allow"):
    """Description of the Protocol Buffer Event field."""

    Event: ProtoBufFieldParsingConfiguration = Field(
        title="Event Field", description="Event field in the protocol buffer messages."
    )


class DataStreamParsingConfiguration(BaseModel, extra="allow"):
    """Description of Protocol Buffer messages."""

    protobuf_stream_parsing_config: EventParsingConfiguration = Field(
        title="Protobuf Messages Configuration",
        description="Field IDs and types required to parse protocol buffer messages.",
    )


class DataStreamConnectionConfiguration(BaseModel, extra="allow"):
    """Parameters to connect to data streamers."""

    hostname: str = Field(
        title="hostname",
        description="Hostname of the data streamer on the network.",
        examples=["localhost", "tcs05-ib0"],
    )
    port: int = Field(
        gt=0,
        le=65535,
        title="Port",
        description="Port of the data streamer on the network.",
        examples=[25000, 3391],
    )


class ObservationParameters(BaseModel, extra="allow"):
    """Observation run-time parameters received from the SAG-Supervisor."""

    sb_id: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]] = (
        Field(
            default=_CONFIG_PLACEHOLDER_STR,
            title="Scheduling Block ID",
            description="ID of the observation's scheduling block.",
            examples=[12345],
        )
    )
    obs_id: Union[
        Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]
    ] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Observation ID",
        description="ID of the observation.",
        examples=[12345],
    )
    tel_id: Union[
        Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[int, Field(ge=0)]
    ] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Telescope ID",
        description="Telescope ID.",
        examples=[1],
    )
    RA_pointing: Union[
        Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Annotated[float, Field(ge=0.0, le=360.0)]
    ] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Pointing Right Ascension",
        description="Pointing right ascension during the observation, in degrees.",
        examples=[20.0],
    )
    DEC_pointing: Union[
        Literal[f"{_CONFIG_PLACEHOLDER_STR}"],
        Annotated[float, Field(ge=-90.0, le=90.0)],
    ] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Pointing Declination",
        description="Pointing declination during the observation, in degrees.",
        examples=[60.0],
    )
    dl1_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="DL1 directory",
        description="Path to the directory where to write the DL1 files.",
        examples=["~/DL1"],
    )
    dl2_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="DL2 directory",
        description="Path to the directory where to write the DL2 files.",
        examples=["~/DL2"],
    )
    dl3_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="DL3 directory",
        description="Path to the directory where to write the DL3 files.",
        examples=["~/DL3"],
    )
    log_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Log directory",
        description="Path to the directory where to write the log files.",
        examples=["~/logs"],
    )
    reco_manager_log_file: str = Field(
        default="hiperta_stream.log",
        title="hiperta_stream_start log file",
        description="Log file for the reco-manager entrypoint (hiperta_stream_start). "
        "This path must NOT contain any bash-like string substitution "
        "(opening the log file is the first thing the reco-manager does, before substituting strings).",
        examples=[
            "/fefs/user/test/hiperta_stream_test.log",
            "hiperta_stream_start_obs_id_12345",
        ],
    )
    data_stream_connections: Union[
        Literal[f"{_CONFIG_PLACEHOLDER_STR}"], List[DataStreamConnectionConfiguration]
    ] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Stream Connections",
        description="Parameters of the connections to the data streamers.",
        examples=[[{"hostname": "tcs06", "port": 25000}]],
    )
    slurm_nodelists: Union[
        Literal[f"{_CONFIG_PLACEHOLDER_STR}"], Dict[str, List[str]]
    ] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Slurm Node List",
        description="List of the slurm nodes to use, per slurm reservation.",
        examples=[{"acada-daily": ["cp15", "cp16"], "rta-one-node": ["cp19"]}],
    )

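# Illustrative sketch (not part of the source): run-time fields default to the
# placeholder and only receive real values once the SAG-Supervisor parameters
# are known. The obs_id value is hypothetical.
#
#   ObservationParameters().sb_id                  # "@@@@@" (to be substituted)
#   ObservationParameters(obs_id=12345).obs_id     # 12345
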

class R0toDL1DebugParameters(BaseModel, extra="allow"):
    """Debugging parameters of the R0->DL1 program."""

    keep_random_trigger_threshold: float = Field(
        ge=0.0,
        le=1.0,
        default=1.0,
        title="Random Trigger Probability",
        description="Probability to keep events. If 0.0, all events are discarded; if 1.0, all events are kept.",
        examples=[1.0],
    )
    verbose: bool = Field(
        default=False,
        title="Verbose Mode",
        description="If true, activate verbose output.",
    )
    is_flush_log_disable: bool = Field(
        title="Disable log flush",
        description="If true, flushing threads will not write logs.",
        examples=[False],
    )


class R0toDL1PipelineParameters(BaseModel, extra="allow"):
    """R0->DL1 program parameters."""

    dl1_filename: str = Field(
        title="DL1 filename",
        description="DL1 data file name, relative to the DL1 directory. "
        "HiPeRTA can substitute some variables at run-time.",
        examples=[
            "dl1_sb_id_@{sb_id}_obs_id_@{obs_id}_tel_id_@{tel_id}_line_idx_§{processIndex}_"
            "thread_idx_§{threadIndex}_th_file_idx_§{processFileIndex}_file_idx_§{fileIndex}.h5"
        ],
    )
    r0_dl1_log_dir: str = Field(
        title="R0->DL1 log directory",
        description="Directory where to write the R0->DL1 processing logs.",
        examples=["~/logs/r0_dl1/"],
    )
    r0_dl1_log_file: str = Field(
        title="R0->DL1 log filename",
        description="R0->DL1 log filename, relative to the R0->DL1 log directory. "
        "HiPeRTA can substitute some variables at run-time.",
        examples=[
            "r0_dl1_sb_id_@{sb_id}_obs_id_@{obs_id}_tel_id_@{tel_id}_§{processIndex}.log"
        ],
    )
    r0_dl1_job_name: str = Field(
        title="R0->DL1 job name for slurm",
        description="Job name of the R0->DL1 slurm jobs.",
        examples=["@{tel_id}_§{processIndex}_r0_dl1"],
    )
    r0_dl1_mem: Union[None, int] = Field(
        title="R0->DL1 memory",
        description="Required memory for R0->DL1 jobs in GB (passed to slurm).",
        examples=[1],
        default=None,
    )
    nb_event_per_hdf5: int = Field(
        gt=0,
        title="Number of Events",
        description="Number of events to write per DL1 file.",
        examples=[20000],
    )

    is_stream_read_zfit: bool = Field(
        title="Zfit mode",
        description="If true, expect the incoming data to be in zfit format.",
        examples=[True],
    )
    is_stream_real_data: bool = Field(
        title="Real Calibration Data mode",
        description="If true, set the --pedperslice option of the R0->DL1 programs.",
        examples=[True],
    )
    is_stream_r1_zfit: bool = Field(
        title="R1 mode",
        description="If true, expect the incoming data to be calibrated 1-gain R1 data.",
        examples=[True],
    )
    r1_scale: float = Field(
        title="R1 scale",
        description="Scaling value to apply to R1 data when transforming it back to float from unsigned int"
        " (used for compression reasons).",
        examples=[5.0],
    )
    r1_offset: float = Field(
        title="R1 offset",
        description="Offset value to apply to R1 data when transforming it back to float from unsigned int"
        " (used for compression reasons).",
        examples=[60.0],
    )

    nb_processing_threads: int = Field(
        gt=0,
        title="Number of Processing Threads",
        description="Number of processing threads to start for each R0->DL1 process.",
        examples=[2],
    )
    nb_event_per_block: int = Field(
        gt=0,
        title="Number of events per Processing Block",
        description="Number of events per processing block, before dividing them among the threads.",
        examples=[2],
    )
    nb_tasks: int = Field(
        gt=0,
        title="Number of slurm tasks",
        description="Number of CPUs slurm should allocate to each R0->DL1 program.",
        examples=[3],
    )
    zmq_nb_thread: int = Field(
        gt=0,
        title="Number of ZMQ threads",
        description="Number of threads ZMQ will start to receive data.",
        examples=[1],
    )
    zmq_thread_affinity: int = Field(
        ge=0,
        title="ZMQ Thread affinity",
        description="ZMQ thread affinity, to set which thread will handle re-connection.",
        examples=[1],
    )
    zmq_buffer_size: int = Field(
        gt=0,
        title="ZMQ buffer size",
        description="ZMQ buffer size in bytes.",
        examples=[40000000],
    )
    zmq_max_nb_message_per_buffer: int = Field(
        gt=0,
        title="ZMQ buffer max message number",
        description="Maximum number of messages to put in the ZMQ buffer.",
        examples=[100],
    )

    base_structure_file_path: str = Field(
        title="Base Structure File",
        description="Path to the base structure hdf5 file with the instrument model.",
        examples=["~/base_structure.hdf5"],
    )
    refresh_stop_ms: int = Field(
        gt=0,
        title="Refresh Signal Period",
        description="Amount of time between two checks for the stop signal, in ms.",
        examples=[1000],
    )
    refresh_perf_ms: int = Field(
        gt=0,
        title="Performance Stat Period",
        description="Amount of time between two sendings of statistics messages, in ms.",
        examples=[1000],
    )
    is_detach_dl1_flush: bool = Field(
        title="Detach flush mode",
        description="If true, detach a new process to flush the DL1 files.",
        examples=[True],
    )
    keep_rejected_event: bool = Field(
        title="Keep Rejected Events Mode",
        description="If true, also write the events flagged as bad quality to files (ignored if nb_threads > 1).",
        examples=[True],
    )
    nbMaxMessage: int = Field(
        gt=0,
        title="Maximum Number of Messages",
        description="The R0->DL1 program will stop after receiving this many messages.",
        examples=[999999999999],
    )
    wait_detach_thread_to_finish: int = Field(
        title="Timeout for flushing threads",
        description="Amount of time to wait for the flushing threads when stopping the analysis, in microseconds.",
        examples=[10000000],
    )
    debug_parameters: R0toDL1DebugParameters = Field(
        title="Debugging Parameters",
        description="Debugging parameters of the R0->DL1 program.",
    )
    hiperta_extra_args: Union[None, str] = Field(
        title="Extra arguments for hiperta",
        description="Extra arguments for the HiPeRTA process.",
        default=None,
        examples=["--verbose", "--loglevel DEBUG"],
    )


    @model_validator(mode="after")
    def check_min_nb_event_per_block_and_n_tasks(self):
        """Check that the numbers of processing threads, slurm tasks and events per block are consistent."""
        if self.nb_event_per_block < self.nb_processing_threads:
            warnings.warn(
                "nb_event_per_block {} is less than the number of processing threads {}!\n"
                "Setting it to {}!".format(
                    self.nb_event_per_block,
                    self.nb_processing_threads,
                    self.nb_processing_threads,
                )
            )
            self.nb_event_per_block = self.nb_processing_threads
        if self.nb_tasks < self.zmq_nb_thread + self.nb_processing_threads:
            warnings.warn(
                "nb_tasks (number of slurm tasks for r0->dl1) {} is less than zmq_nb_thread {} + nb_processing_threads {}!\n"
                "Setting it to {}!".format(
                    self.nb_tasks,
                    self.zmq_nb_thread,
                    self.nb_processing_threads,
                    self.zmq_nb_thread + self.nb_processing_threads,
                )
            )
            self.nb_tasks = self.zmq_nb_thread + self.nb_processing_threads
        return self

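    # Worked example of the rule above (hypothetical values): with
    # zmq_nb_thread=1 and nb_processing_threads=2, a configured nb_tasks=2 is
    # too small, so the validator warns and raises it to 1 + 2 = 3.
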

    @model_validator(mode="after")
    def compute_r0_dl1_mem_if_not_set(self):
        """Compute a value for the memory requirement of R0->DL1 jobs if it was not set."""
        if self.r0_dl1_mem is None:
            self.r0_dl1_mem = (
                int(
                    abs(
                        np.ceil(
                            (
                                self.zmq_max_nb_message_per_buffer
                                + self.nb_event_per_hdf5 * 2
                            )
                            * 360e3
                            / 1e9
                        )
                    )
                )
                + 2
            )
        return self

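# Worked example of the default memory estimate above, using the fields' own
# example values: (100 + 20000 * 2) * 360e3 / 1e9 = 14.436, ceil -> 15.0,
# int + 2 GB margin -> r0_dl1_mem = 17 (GB).
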

class DL1toDL3PipelineParameters(BaseModel, extra="allow"):
    """Parameters for the DL1->DL3 jobs."""

    reco_random_forests_dir: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="Random Forest Directory",
        description="Path to the directory containing the random forest files.",
        examples=["~/RFS"],
    )
    lstchain_dl1_dl2_config_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = (
        Field(
            default=_CONFIG_PLACEHOLDER_STR,
            title="DL1->DL2 configuration file",
            description="Path to the configuration file for lstchain DL1->DL2.",
            examples=["~/RFS/lstchain_standard.json"],
        )
    )
    lstchain_dl2_dl3_config_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = (
        Field(
            default=_CONFIG_PLACEHOLDER_STR,
            title="DL2->DL3 configuration file",
            description="Path to the configuration file for lstchain DL2->DL3.",
            examples=["~/RFS/lstchain_dl2_dl3.json"],
        )
    )
    irf_file: Union[Literal[f"{_CONFIG_PLACEHOLDER_STR}"], str] = Field(
        default=_CONFIG_PLACEHOLDER_STR,
        title="IRF File",
        description="Path to the IRF file to add to the DL3 files.",
        examples=["~/RFS/irf.fits"],
    )


class SlurmJobConfiguration(BaseModel, extra="allow"):
    """Slurm Pipeline Description."""

    job_cmd: str = Field(
        title="Job Command",
        description="Job command passed to slurm using --wrap.",
        examples=["echo hostname"],
    )
    slurm_job_parameters: str = Field(
        title="Slurm Job Parameters",
        description="String passed to slurm to set the job parameters.",
        examples=["-A ctao-n-acada -p short"],
    )
    name_prefix: str = Field(
        title="Slurm Job Name prefix",
        description="Prefix for the slurm job name. It is prefixed to the SlurmJobConfiguration name by HiPeRTA.",
        examples=["@{tel_id}_§{processIndex}_"],
    )
    dependencies: Union[None, List[str]] = Field(
        default=None,
        title="Job Dependencies",
        description="Pipeline stage names that are dependencies of this job.",
        examples=["dl1reorg", "dl1dl2"],
    )


class SlurmPipelineConfiguration(BaseModel, extra="allow"):
    """Pipeline Definition."""

    slurm_account: str = Field(
        title="Slurm account",
        description="Slurm account to use to submit jobs.",
        examples=["ctao-n-acada"],
    )
    slurm_queue: str = Field(
        title="Slurm queue",
        description="Slurm queue to use when submitting jobs.",
        examples=["short"],
    )
    slurm_reservation: str = Field(
        title="Slurm reservation",
        description="Slurm reservation to use when submitting jobs.",
        examples=["acada_daily"],
    )

    slurm_jobs: Dict[str, SlurmJobConfiguration] = Field(
        title="Slurm Pipeline",
        description="Pipeline description: stage_name -> slurm job.",
    )
    output_directories: Dict[str, str] = Field(
        title="Pipeline's steps output directories",
        description="The directories where the pipeline jobs' output should be saved. "
        "Note: the mandatory DL1, DL2 and DL3 directories are specified in the observation parameters. "
        "This dictionary exists to allow adding other directories if required by pipeline steps.",
        examples=[{}],
    )
    log_directories: Dict[str, str] = Field(
        title="Pipeline's steps log directories",
        description="The directories where the pipeline jobs' logs should be saved.",
        examples=[
            {
                "dl1_alt_az_log_dir": "@{log_dir}/dl1_alt_az",
                "dl1_reorg_log_dir": "@{log_dir}/dl1_reorg",
                "dl1_dl2_log_dir": "@{log_dir}/dl1_dl2",
                "dl2_dl3_log_dir": "@{log_dir}/dl2_dl3",
            }
        ],
    )

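# Illustrative sketch (not part of the source): a minimal two-stage pipeline as
# it could appear in the configuration, the second stage depending on the
# first. Stage names, commands and directories are hypothetical.
#
#   SlurmPipelineConfiguration.model_validate(
#       {
#           "slurm_account": "ctao-n-acada",
#           "slurm_queue": "short",
#           "slurm_reservation": "acada_daily",
#           "slurm_jobs": {
#               "dl1dl2": {
#                   "job_cmd": "echo hostname",
#                   "slurm_job_parameters": "-A ctao-n-acada -p short",
#                   "name_prefix": "@{tel_id}_§{processIndex}_",
#               },
#               "dl2dl3": {
#                   "job_cmd": "echo hostname",
#                   "slurm_job_parameters": "-A ctao-n-acada -p short",
#                   "name_prefix": "@{tel_id}_§{processIndex}_",
#                   "dependencies": ["dl1dl2"],
#               },
#           },
#           "output_directories": {},
#           "log_directories": {"dl1_dl2_log_dir": "@{log_dir}/dl1_dl2"},
#       }
#   )
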

class ReconstructionManagerConfiguration(BaseModel, extra="allow"):
    """Complete SAG-Reconstruction configuration."""

    logging_level: LOGGING_LEVELS = Field(
        title="Logging level",
        description="Set the logging level of the reco-manager.",
        examples=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
    )
    no_slurm: bool = Field(
        title="Deactivate slurm",
        description="If true, sub-system programs will be started as subprocesses instead of slurm jobs.",
        examples=[False],
    )
    observation_config: ObservationParameters = Field(
        title="Observation Parameters",
        description="Configuration of the observation parameters.",
    )
    dl1_dl3_params: DL1toDL3PipelineParameters = Field(
        title="DL1->DL3 Configuration", description="DL1->DL3 pipeline parameters."
    )
    reco_model_catalog: ReconstructionModelCatalog = Field(
        title="Reconstruction Model Catalog",
        description="Description of the available reconstruction models.",
    )
    r0_dl1_params: R0toDL1PipelineParameters = Field(
        title="R0->DL1 Pipeline Parameters",
        description="Parameters of the R0->DL1 programs.",
    )
    protobuf_parsing_conf: DataStreamParsingConfiguration = Field(
        title="ProtocolBuffer Message Parsing Configuration",
        description="Parameters used to parse the stream's protocol buffer messages.",
    )
    pipeline_config: SlurmPipelineConfiguration = Field(
        title="Slurm Pipeline Configuration",
        description="DL1->DL3 slurm jobs configuration.",
    )


def write_yaml_config(
    yaml_config_filename, reco_manager_config, r0_dl1_training_config
):
    """Dump the configuration into a yaml file to use with SAG-Reconstruction."""
    yaml_dict = {
        k: v
        for config in [
            {
                "logging_level": reco_manager_config.logging_level.name,
                "no_slurm": reco_manager_config.no_slurm,
            },
            reco_manager_config.observation_config.model_dump(exclude_none=True),
            reco_manager_config.dl1_dl3_params.model_dump(exclude_none=True),
            reco_manager_config.reco_model_catalog.model_dump(exclude_none=True),
            r0_dl1_training_config,
            reco_manager_config.r0_dl1_params.model_dump(exclude_none=True),
            reco_manager_config.protobuf_parsing_conf.model_dump(
                exclude_none=True, exclude_unset=True
            ),
            reco_manager_config.pipeline_config.model_dump(exclude_none=True),
        ]
        for k, v in config.items()
    }
    with open(yaml_config_filename, "w") as yaml_config_file:
        yaml = YAML()
        yaml.indent(mapping=4, sequence=4, offset=2)
        yaml.width = 2**16  # very large, to avoid wrapping lines
        yaml.allow_unicode = True
        yaml.sort_keys = False
        yaml.dump(yaml_dict, yaml_config_file)

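# Note on the merge above: the dict comprehension flattens the per-model dumps
# into one mapping, so when two dumps share a key the later entry in the list
# wins. A minimal sketch of the same semantics:
#
#   merged = {k: v for d in [{"a": 1}, {"a": 2, "b": 3}] for k, v in d.items()}
#   assert merged == {"a": 2, "b": 3}
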


def _delete_extra(model):
    """Delete all extra fields from a pydantic BaseModel, recursively.

    This is useful to dump a model without the extra parameters, when the model accepts extras.
    Required until https://github.com/pydantic/pydantic/issues/6150 or equivalent is implemented.
    """
    model.__pydantic_extra__ = {}
    for _, field in model:
        if isinstance(field, BaseModel):
            _delete_extra(field)
        # Also recurse into models nested in lists (e.g. dl2_dl3_archives),
        # which would otherwise keep their extra fields.
        elif isinstance(field, list):
            for item in field:
                if isinstance(item, BaseModel):
                    _delete_extra(item)
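
# Illustrative sketch (not part of the source): dumping a model without the
# extra keys it tolerated at parse time. The "unknown" key is hypothetical.
#
#   props = ReconstructionDL1DL2ModelProperties(zenith=20.0, unknown="dropped")
#   _delete_extra(props)
#   props.model_dump()   # {"zenith": 20.0}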