Coverage for hiperta_stream/hiperta_stream_start.py: 29%

82 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-07-16 10:16 +0000

1# P. Aubert & E. Garcia - Nov 2020, lock-down Chap. II 

2 

3 

4import argparse 

5import logging 

6import pathlib 

7import shlex 

8import subprocess as sp 

9from collections import namedtuple 

10from pathlib import Path 

11 

12from hiperta_stream.config.configuration import ( 

13 DL1toDL3PipelineParameters, 

14 ObservationParameters, 

15 ReconstructionManagerConfiguration, 

16 _delete_extra, 

17 write_yaml_config, 

18) 

19from hiperta_stream.utils.file_handling_utils import copy_file_into_dir 

20from hiperta_stream.utils.logging_utils import init_logging 

21from hiperta_stream.utils.subprocess_utils import ( 

22 subprocess_run_and_raise_exception_on_error, 

23) 

24from hiperta_stream.utils.substring_substitution_utils import ( 

25 dict_defined_substring_substitution, 

26) 

27from ruamel.yaml import YAML 

28 

29 

def submit_r0_dl1_job(
    config,
    config_paths,
    process_idx,
):
    """Submit one R0->DL1 job to slurm (or run it locally under gdb).

    Builds the ``hiperta_stream_r0_dl1`` command line from the configuration,
    wraps it into an ``sbatch`` command and submits it. If ``config.no_slurm``
    is set, the command is instead run directly under gdb for debugging.

    Notes
    -----
    The R0->DL1 job is submitted on the `process_idx % total_nb_nodes` node.

    Parameters
    ----------
    config : hiperta_stream.config.configuration.ReconstructionManagerConfiguration
        Reconstruction manager configuration
    config_paths : dict
        Dictionary containing the paths to the written configuration for the processing pipeline steps
    process_idx : int
        Index of the R0->DL1 job (eg when using 4 lines: 0, 1, 2 or 3)
    """

    # "§{processIndex}" is a placeholder in the configured log file name and
    # job name, replaced here by the actual process index.
    r0_dl1_log_filename = Path(
        config.r0_dl1_params.r0_dl1_log_dir,
        config.r0_dl1_params.r0_dl1_log_file.replace("§{processIndex}", str(process_idx)),
    ).absolute()
    r0_dl1_job_name = config.r0_dl1_params.r0_dl1_job_name.replace("§{processIndex}", str(process_idx))
    # flattened list of all available nodes and their reservation
    NodeInfo = namedtuple("NodeInfo", ["nodename", "reservation"])
    slurm_nodes_info = [
        NodeInfo(node, reservation)
        for reservation, reservation_nodelist in config.observation_config.slurm_nodelists.items()
        for node in reservation_nodelist
    ]

    # Command run by the job. The empty strings produced by disabled optional
    # flags are harmless: consecutive spaces are collapsed by shlex.split (gdb
    # branch) or by the shell when executed through sbatch --wrap.
    r0dl1_cmd = " ".join(
        [
            "hiperta_stream_r0_dl1",
            "--input",
            config.observation_config.data_stream_connections[process_idx].hostname,
            "--port",
            str(config.observation_config.data_stream_connections[process_idx].port),
            "--output",
            str(Path(config.observation_config.dl1_dir, config.r0_dl1_params.dl1_filename).absolute()),
            "--confighdf5",
            str(Path(config.r0_dl1_params.base_structure_file_path).absolute()),
            "--outputFileIndexOffset",
            str(0),
            "--nbeventperhdf5",
            str(config.r0_dl1_params.nb_event_per_hdf5),
            "--nbmaxmessage",
            str(config.r0_dl1_params.nbMaxMessage),
            "--configcut",
            str(Path(config_paths["r0_dl1_yaml_config_path"]).absolute()),
            "--totalNbProcess",
            str(len(config.observation_config.data_stream_connections)),
            "--processindex",
            str(process_idx),
            "--nbthread",
            str(config.r0_dl1_params.nb_processing_threads),
            "--nbeventperblock",
            str(config.r0_dl1_params.nb_event_per_block),
            "--rtalog",
            str(r0_dl1_log_filename),
            "--loglevel",
            config.logging_level.name,
            "--zfits" if config.r0_dl1_params.is_stream_read_zfit else "",
            "--pedperslice" if config.r0_dl1_params.is_stream_real_data else "",
            config.r0_dl1_params.hiperta_extra_args,
            "--verbose" if config.r0_dl1_params.debug_parameters.verbose else "",
        ]
    )

    sbatch_cmd = " ".join(
        [
            "sbatch",
            "--parsable",  # sbatch will only return the job ID
            "--mem={}G".format(config.r0_dl1_params.r0_dl1_mem),
            "-o",
            str(r0_dl1_log_filename.with_suffix(".o").absolute()),
            "-e",
            str(r0_dl1_log_filename.with_suffix(".e").absolute()),
            # Account / queue / reservation are optional; when unset an empty
            # token is joined in, which disappears at shlex.split time.
            *(["-A", config.pipeline_config.slurm_account] if config.pipeline_config.slurm_account else [""]),
            *(["-p", config.pipeline_config.slurm_queue] if config.pipeline_config.slurm_queue else [""]),
            *(
                ["--reservation", slurm_nodes_info[process_idx % len(slurm_nodes_info)].reservation]
                if config.pipeline_config.slurm_reservation
                else [""]
            ),
            "--ntasks={}".format(config.r0_dl1_params.nb_tasks),
            # Round-robin distribution of the jobs over the available nodes.
            "--nodelist={}".format(slurm_nodes_info[process_idx % len(slurm_nodes_info)].nodename),
            "-J",
            r0_dl1_job_name,
            '--wrap="{}"'.format(r0dl1_cmd),
        ]
    )

    if config.no_slurm:
        # Debug mode: run the R0->DL1 process locally under gdb instead of slurm.
        logging.info(f"Running in {config.observation_config.log_dir}: gdb --args {r0dl1_cmd}")
        sp.run(["gdb", "--args", *shlex.split(r0dl1_cmd)], cwd=str(config.observation_config.log_dir))
    else:
        logging.info(f"Submitting R0->DL1 job: {sbatch_cmd}")
        # Submission failure is fatal for the pipeline, hence CRITICAL level.
        job_id = subprocess_run_and_raise_exception_on_error(
            shlex.split(sbatch_cmd),
            "R0->DL1 job submission successful!",
            "R0->DL1 job submission failed!",
            error_level=logging.CRITICAL,
        ).stdout.strip()
        logging.info(f"R0->DL1 job id: {job_id}")

138 

139 

def write_configuration(config, r0_dl1_training_config):
    """Write or copy all configuration files into the log directory.

    Parameters
    ----------
    config : hiperta_stream.config.configuration.ReconstructionManagerConfiguration
        Reconstruction manager configuration
    r0_dl1_training_config : dict
        R0->DL1 training parameters

    Returns
    -------
    dict
        Paths to the written configurations.
    """

    log_dir = Path(config.observation_config.log_dir)

    # Dump the complete json configuration for traceability of the run.
    json_config_path = log_dir / "hiperta_stream_start_config.json"
    with open(json_config_path, "w") as json_conf_file:
        json_conf_file.write(config.model_dump_json(indent=4))
    logging.info(f"Logging json configuration at {json_config_path}")

    # HiPeRTA expects a yaml configuration stripped of the pydantic extras.
    minimal_config = config.model_copy(deep=True)
    _delete_extra(minimal_config)
    yaml_config_path = log_dir / "hiperta_stream_configuration.yml"
    write_yaml_config(yaml_config_path, minimal_config, r0_dl1_training_config)
    logging.info(f"Using R0->DL1 yaml configuration at {yaml_config_path}")

    # The lstchain configurations for the DL1->DL3 steps are copied verbatim.
    dl1_dl2_path = copy_file_into_dir(config.dl1_dl3_params.lstchain_dl1_dl2_config_file, log_dir)
    logging.info(f"Using DL1->DL2 json configuration at {dl1_dl2_path}")
    dl2_dl3_path = copy_file_into_dir(config.dl1_dl3_params.lstchain_dl2_dl3_config_file, log_dir)
    logging.info(f"Using DL2->DL3 json configuration at {dl2_dl3_path}")

    # So is the base structure file used by the R0->DL1 step.
    structure_path = copy_file_into_dir(config.r0_dl1_params.base_structure_file_path, log_dir)
    logging.info(f"Using base structure file at {structure_path}")

    return {
        "r0_dl1_yaml_config_path": yaml_config_path,
        "lstchain_dl1_dl2_config_path": dl1_dl2_path,
        "lstchain_dl2_dl3_config_path": dl2_dl3_path,
        "base_structure_path": structure_path,
    }

186 

187 

def create_pipeline_output_and_log_directories(config):
    """Create the output directories as defined in config."""

    pipeline_cfg = config.pipeline_config
    dirs_to_create = [
        *pipeline_cfg.log_directories.values(),
        *pipeline_cfg.output_directories.values(),
        # r0_dl1 log is not part of pipeline config as r0_dl1 submits the slurm pipeline
        config.r0_dl1_params.r0_dl1_log_dir,
    ]
    for directory in dirs_to_create:
        Path(directory).mkdir(parents=True, exist_ok=True)
        logging.info("Created directory {}".format(directory))

200 

201 

def select_observation_model(config):
    """Chose the reconstruction model to use for the observation.

    Warnings
    --------
    The first model of the catalog (and its first DL2->DL3 archive) is always
    selected. Smarter decision will be implemented in a future release.

    Parameters
    ----------
    config : hiperta_stream.config.configuration.ReconstructionManagerConfiguration
        Complete configuration; only its `reco_model_catalog` field is read here.

    Returns
    -------
    dl1_to_dl3_pipeline_params : DL1toDL3PipelineParameters
        Paths to models and configuration expected by the slurm pipeline part of the config
    r0_dl1_training_config : dict
        R0 DL1 configuration dict during the model training.
    """

    catalog = config.reco_model_catalog
    model = catalog.archives[0]
    model_dir = pathlib.Path(catalog.archives_base_path) / model.relative_path

    dl2_dl3_archive = model.dl2_dl3_archives[0]
    dl2_dl3_dir = model_dir / dl2_dl3_archive.relative_path

    dl1_to_dl3_pipeline_params = DL1toDL3PipelineParameters.model_validate(
        {
            "reco_random_forests_dir": str(model_dir),
            "lstchain_dl1_dl2_config_file": str(model_dir / model.dl1_dl2_config),
            "lstchain_dl2_dl3_config_file": str(dl2_dl3_dir / dl2_dl3_archive.dl2_dl3_config),
            "irf_file": str(dl2_dl3_dir / dl2_dl3_archive.irf_file),
        }
    )

    # The chosen model's R0->DL1 training configuration is loaded to be re-used
    # by the observation's R0->DL1 step.
    with open(model_dir / model.r0_dl1_config, "r") as r0_dl1_config_file:
        r0_dl1_training_config = YAML(typ="safe").load(r0_dl1_config_file)

    return dl1_to_dl3_pipeline_params, r0_dl1_training_config

247 

248 

def _parse_cli_arguments():
    """Parse and return the command-line arguments of hiperta_stream_start."""
    parser = argparse.ArgumentParser(
        description="Launches the SAG reconstruction pipeline slurm jobs: "
        "permanent r0dl1 jobs, and intermittent dl1dl2 and dl2dl3 jobs."
    )
    parser.add_argument(
        "--static_config",
        "-c",
        action="store",
        type=str,
        dest="static_config",
        help="Path to a JSON static configuration file.",
        required=True,
    )
    parser.add_argument(
        "--observation_config",
        "-d",
        action="store",
        type=str,
        dest="obs_config",
        help="Path to the observation JSON configuration file.",
        required=True,
    )
    return parser.parse_args()


def main():
    """Entry point: read configurations and launch the R0->DL1 slurm jobs."""
    init_logging(log_filename="hiperta_stream_start.log")

    args = _parse_cli_arguments()

    # Read static and observation configuration
    with open(args.static_config, "r") as static_config_f, open(args.obs_config, "r") as obs_config_f:
        config = ReconstructionManagerConfiguration.model_validate_json(static_config_f.read())
        config.observation_config = ObservationParameters.model_validate_json(obs_config_f.read())

    # Re-init logging to write into specified log file.
    init_logging(config.logging_level.name, config.observation_config.reco_manager_log_file)

    # Chose the reconstruction model to use based on the observation parameters
    # and update the corresponding paths in config.
    config.dl1_dl3_params, r0_dl1_training_config = select_observation_model(config)

    # Perform bash variable substitution in config strings.
    config = ReconstructionManagerConfiguration.model_validate(
        dict_defined_substring_substitution(config.model_dump())
    )

    create_pipeline_output_and_log_directories(config)

    # Write and log all configuration
    config_paths = write_configuration(config, r0_dl1_training_config)

    # Launch one R0->DL1 job per data stream connection.
    for stream_idx, _ in enumerate(config.observation_config.data_stream_connections):
        submit_r0_dl1_job(config, config_paths, stream_idx)

    logging.info("Done starting R0->DL1 jobs.")

307 

308 

# Allow running this module directly as a script.
if __name__ == "__main__":
    main()