Coverage for mfutil/cli.py: 32%

192 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-18 16:04 +0000

1# -*- coding: utf-8 -*- 

2"""Utility functions to build CLI (Python >= 3.6 only).""" 

3 

4from __future__ import print_function 

5import six 

6import sys 

7try: 

8 from rich.progress import Progress, BarColumn, ProgressColumn, \ 

9 TimeRemainingColumn 

10 from rich.text import Text 

11 from rich.bar import Bar 

12 from rich.table import Table 

13 from rich.console import Console 

14except ImportError: 

15 class Dummy(): 

16 pass 

17 TimeRemainingColumn = Dummy 

18 BarColumn = Dummy 

19 ProgressColumn = Dummy 

20 Progress = Dummy 

21 Bar = Dummy 

22 

23from mfutil.exc import MFUtilException 

24 

# Maps the two helper column classes defined later in this module to False.
# NOTE(review): presumably consumed by the pdoc documentation generator to
# leave these classes out of the generated docs -- confirm.
25__pdoc__ = { 

26 "MFTimeRemainingColumn": False, 

27 "StateColumn": False 

28} 

29 

# Lazily created Console objects, one per standard stream. Both start as None
# and are assigned by _get_console() the first time the stream is requested.
30_STDOUT_CONSOLE = None 

31_STDERR_CONSOLE = None 

32 

33 

def _get_console(**kwargs):
    """Return a rich Console, reusing a cached one for the standard streams.

    Only the exact call shape `_get_console(file=sys.stdout)` or
    `_get_console(file=sys.stderr)` is served from the module-level cache.
    Any other combination of keyword arguments builds a fresh Console.

    The cache is refreshed when the cached Console is no longer bound to the
    requested stream. This happens when `sys.stdout` / `sys.stderr` has been
    rebound (redirection, test capture...) since the Console was created:
    without the refresh, callers would keep getting a Console attached to the
    previous stream.

    Args:
        **kwargs: keyword arguments forwarded unchanged to `Console`.

    Returns:
        a Console object (cached or newly created).

    """
    global _STDOUT_CONSOLE, _STDERR_CONSOLE
    stream = kwargs.get('file', None)
    # Extra options would make a shared instance unsafe, so only the
    # single-argument `file=<standard stream>` form is cacheable.
    if len(kwargs) == 1 and stream is not None:
        if stream is sys.stdout:
            if _STDOUT_CONSOLE is None or _STDOUT_CONSOLE.file is not stream:
                _STDOUT_CONSOLE = Console(**kwargs)
            return _STDOUT_CONSOLE
        if stream is sys.stderr:
            if _STDERR_CONSOLE is None or _STDERR_CONSOLE.file is not stream:
                _STDERR_CONSOLE = Console(**kwargs)
            return _STDERR_CONSOLE
    return Console(**kwargs)

50 

51 

def _is_interactive(f):
    """Return the `is_terminal` flag of the console bound to stream `f`."""
    return _get_console(file=f).is_terminal

55 

56 

def is_interactive(target=None):
    """Tell whether we are running in an interactive terminal.

    Args:
        target (string): which stream(s) to check: None (both stdout AND
            stderr must be interactive), "stdout" (stdout only) or
            "stderr" (stderr only).

    Returns:
        boolean: True (interactive) or False (non-interactive).

    Raises:
        MFUtilException: if target is not one of the values above.

    """
    if target is None:
        stdout_ok = _is_interactive(sys.stdout)
        # stderr is only examined when stdout is already interactive
        return stdout_ok and _is_interactive(sys.stderr)
    if target == "stdout":
        return _is_interactive(sys.stdout)
    if target == "stderr":
        return _is_interactive(sys.stderr)
    raise MFUtilException("invalid target parameter: %s" % target)

80 

81 

def echo_ok(message=""):
    """Write [OK] (with colors if supported) plus a little optional message.

    Args:
        message (string): little optional message.

    """
    if not is_interactive("stdout"):
        print(" [ OK ] %s" % message)
        return
    echo_clean()
    # A negative count yields an empty string, so long messages get no padding
    padding = " " * (13 - len(message))
    print("\033[60G[ \033[32mOK\033[0;0m ] %s%s" % (message, padding))

96 

97 

def echo_nok(message=""):
    """Write [ERROR] (with colors if supported) plus a little optional message.

    Args:
        message (string): little optional message.

    """
    if not is_interactive("stdout"):
        print(" [ ERROR ] %s" % message)
        return
    echo_clean()
    # A negative count yields an empty string, so long messages get no padding
    padding = " " * (10 - len(message))
    print("\033[60G[ \033[31mERROR\033[0;0m ] %s%s" % (message, padding))

112 

113 

def echo_warning(message=""):
    """Write [WARNING] (with colors if supported) plus a little optional message.

    Args:
        message (string): little optional message.

    """
    if not is_interactive("stdout"):
        print("[ WARNING ] %s" % message)
        return
    echo_clean()
    print("\033[60G[ \033[33mWARNING\033[0;0m ] %s" % message)

126 

127 

def echo_bold(message):
    """Write a message in bold when stdout is interactive, plainly otherwise.

    Args:
        message (string): message to write in bold.

    """
    if not is_interactive("stdout"):
        print(message)
        return
    print("\033[1m%s\033[0m" % message)

139 

140 

def echo_running(message=None):
    """Write [RUNNING] with colors if supported.

    When a message is given, it is written first, on the same line and
    without a trailing newline. The [RUNNING] marker itself is only written
    when stdout is interactive.

    Args:
        message (string): little optional message.

    """
    if message is not None:
        if not is_interactive("stdout"):
            print(message, end="")
        elif six.PY2:
            # Python 2's print function has no `flush` keyword
            print(message, end="")
            sys.stdout.flush()
        else:
            print(message, end="", flush=True)
    if not is_interactive("stdout"):
        return
    echo_clean()
    print("\033[60G[ \033[33mRUNNING\033[0;0m ]", end="")

163 

164 

def echo_clean():
    """Clean waiting status (does nothing when stdout is not interactive)."""
    if not is_interactive("stdout"):
        return
    print("\033[60G[ \033 ", end="")

169 

170 

class StateColumn(ProgressColumn):
    """Progress column rendering the task state as a `[ STATUS ]` marker."""

    def render(self, task):
        """Build the status text for `task`.

        Unfinished tasks are shown as RUNNING. Finished tasks are shown as
        ERROR, WARNING or OK according to the task's `status` field (OK when
        the field is absent or holds any other value), followed by the
        `status_extra` field in parentheses when it is not empty.
        """
        if not task.finished:
            return Text.assemble(Text("[ "),
                                 Text("RUNNING", style="yellow"),
                                 Text(" ]"))
        suffix = task.fields.get('status_extra', '')
        if len(suffix) > 0:
            suffix = " (%s)" % suffix
        status = task.fields.get('status', 'OK')
        if status == 'ERROR':
            label, color = "ERROR", "red"
        elif status == 'WARNING':
            label, color = "WARNING", "yellow"
        else:
            label, color = "OK", "green"
        return Text.assemble(Text("[ "),
                             Text(label, style=color),
                             Text(" ]%s" % suffix))

194 

195 

class MFTimeRemainingColumn(TimeRemainingColumn):
    """Time-remaining column rendered as empty text for finished tasks."""

    def render(self, task):
        """Return an empty Text when `task` is finished, else the parent's."""
        if not task.finished:
            return TimeRemainingColumn.render(self, task)
        return Text("")

203 

204 

class MFProgress(Progress):
    """[Rich Progress](https://rich.readthedocs.io/en/latest/progress.html) child class.

    This class adds four features to the original one:

    - support (basic) rendering in non-terminal
    - task status management
    - different default columns setup (but you can override this)
    - rendering on stdout by default (but you can change this by providing
        a custom Console object)

    You can use it exactly like the original [Progress class](https://rich.readthedocs.io/en/latest/reference/progress.html):

    ```python
    import time
    from mfutil.cli import MFProgress

    with MFProgress() as progress:
        t1 = progress.add_task("Foo task")
        t2 = progress.add_task("Bar task")
        while not progress.finished:
            progress.update(t1, advance=10)
            progress.update(t2, advance=10)
            time.sleep(1)
    ```

    For status management:

    - if you leave MFProgress context manager, not finished tasks are
        automatically set to `ERROR` state, finished tasks are automatically
        set to `OK` state
    - you have 3 new methods to manually override this behaviour:
        (complete_task(), complete_task_nok(), complete_task_warning())

    Example:

    ```python
    import time
    from mfutil.cli import MFProgress

    with MFProgress() as progress:
        t1 = progress.add_task("Foo task")
        t2 = progress.add_task("Bar task")
        i = 0
        while not progress.finished:
            progress.update(t1, advance=10)
            if i < 5:
                progress.update(t2, advance=10)
            elif i == 5:
                progress.complete_task_nok(t2, "unknown error")
            time.sleep(1)
            i = i + 1
    ```

    """

    def __init__(self, *args, **kwargs):
        """Build the progress object.

        Args:
            *args: custom columns (forwarded to Progress); when none are
                given, the MFProgress default columns are used.
            **kwargs: keyword arguments forwarded to Progress; when no
                `console` is given, a new default Console is created.

        """
        console = kwargs.get("console", None)
        if not console:
            self._interactive = _is_interactive(sys.stdout)
            kwargs["console"] = Console()
        else:
            self._interactive = _is_interactive(console.file)
        if len(args) == 0:
            columns = ["[progress.description]{task.description}",
                       BarColumn(12),
                       StateColumn(),
                       MFTimeRemainingColumn()]
            self.mfprogress_columns = True
        else:
            columns = args
            self.mfprogress_columns = False
        Progress.__init__(self, *columns, **kwargs)

    def make_tasks_table(self, tasks):
        """Get a table to render the Progress display.

        The custom layout is only used with the default MFProgress columns;
        with user-provided columns the parent implementation is used.
        """
        if self.mfprogress_columns:
            return self._mfprogress_make_tasks_table(tasks)
        # Delegate to the parent class: calling MFProgress.make_tasks_table
        # here would re-enter this very method and recurse without end.
        return Progress.make_tasks_table(self, tasks)

    def _mfprogress_make_tasks_table(self, tasks):
        """Get a table to render the Progress display (default columns)."""
        table = Table.grid()
        table.pad_edge = True
        table.padding = (0, 1, 0, 0)
        try:
            finished = self.finished
        except Exception:
            # Best effort: if the finished state cannot be computed, render
            # with the "still running" layout.
            finished = False
        if finished:
            table.add_column(width=57, no_wrap=True)
            table.add_column(width=0)
            table.add_column()
            table.add_column()
        else:
            table.add_column(width=45, no_wrap=True)
            table.add_column()
            table.add_column()
            table.add_column()
        # Length limit and cut position for string columns. The limits are
        # the first column width plus 22, which is the length of the
        # "[progress.description]" prefix of the default description column.
        if finished:
            max_len, keep = 79, 74
        else:
            max_len, keep = 67, 62
        for task in tasks:
            if not task.visible:
                continue
            row = []
            for index, column in enumerate(self.columns):
                if isinstance(column, str):
                    txt = column.format(task=task)
                    if len(txt) > max_len:
                        txt = txt[0:keep] + "[[...]]"
                    row.append(txt)
                    table.columns[index].no_wrap = True
                else:
                    widget = column(task)
                    row.append(widget)
                    if isinstance(widget, (str, Text)):
                        table.columns[index].no_wrap = True
            table.add_row(*row)
        return table

    def __exit__(self, *args, **kwargs):
        """Flag unfinished tasks as ERROR, then leave the parent context."""
        with self._lock:
            # Iterate over a snapshot of the task mapping.
            for tid, task in list(self._tasks.items()):
                if not task.finished:
                    self.__complete_task_nok_nolock(tid)
        return Progress.__exit__(self, *args, **kwargs)

    def start(self, *args, **kwargs):
        """Start the display (does nothing in non-interactive mode)."""
        if self._interactive:
            return Progress.start(self, *args, **kwargs)

    def _mfprogress_finish_task(self, task_id, status, status_extra):
        """Set a task as completed and store its status fields."""
        task = self._tasks[task_id]
        self.update(task_id, completed=task.total, status=status,
                    refresh=False, status_extra=status_extra)

    def __complete_task_nok_nolock(self, task_id, status_extra=""):
        """Complete a task with ERROR status without taking the lock here."""
        self._mfprogress_finish_task(task_id, "ERROR", status_extra)

    def complete_task_nok(self, task_id, status_extra=""):
        """Complete a task with ERROR status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            self.__complete_task_nok_nolock(task_id, status_extra=status_extra)

    def complete_task_warning(self, task_id, status_extra=""):
        """Complete a task with WARNING status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            self._mfprogress_finish_task(task_id, "WARNING", status_extra)

    def complete_task(self, task_id, status_extra=""):
        """Complete a task with OK status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            self._mfprogress_finish_task(task_id, "OK", status_extra)

    def stop(self):
        """Stop the display.

        In non-interactive mode, one `description [ STATUS ] (extra)` line
        per task is printed to the console's file instead. The status is left
        empty for a task which is not finished.
        """
        if self._interactive:
            return Progress.stop(self)
        with self._lock:
            for task in self._tasks.values():
                status = ""
                extra = task.fields.get('status_extra', '')
                if len(extra) > 0:
                    extra = " (%s)" % extra
                if task.finished:
                    # Any value other than OK or WARNING is reported as ERROR
                    status = task.fields.get('status', 'OK')
                    if status not in ('OK', 'WARNING'):
                        status = "ERROR"
                print("%s [ %s ]%s" % (task.description, status, extra),
                      file=self.console.file)

    def refresh(self, *args, **kwargs):
        """Refresh the display (does nothing in non-interactive mode)."""
        if self._interactive:
            return Progress.refresh(self, *args, **kwargs)