Coverage for mfutil/cli.py: 31%

191 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-13 15:33 +0000

1# -*- coding: utf-8 -*- 

2"""Utility functions to build CLI (Python >= 3.6 only).""" 

3 

4from __future__ import print_function 

5import six 

6import sys 

7try: 

8 from rich.progress import Progress, BarColumn, ProgressColumn, \ 

9 TimeRemainingColumn 

10 from rich.text import Text 

11 from rich.bar import Bar 

12 from rich.table import Table 

13 from rich.console import Console 

14except ImportError: 

15 class Dummy(): 

16 pass 

17 TimeRemainingColumn = Dummy 

18 BarColumn = Dummy 

19 ProgressColumn = Dummy 

20 Progress = Dummy 

21 Bar = Dummy 

22 

23from mfutil.exc import MFUtilException 

24 

25__pdoc__ = { 

26 "MFTimeRemainingColumn": False, 

27 "StateColumn": False 

28} 

29 

30_STDOUT_CONSOLE = None 

31_STDERR_CONSOLE = None 

32 

33 

def _get_console(**kwargs):
    """Return a rich Console for the given keyword arguments.

    A module-level console is created lazily and reused when the requested
    ``file`` compares equal to ``sys.stdout`` or ``sys.stderr`` and no other
    keyword argument is given. Every other call builds a new
    ``Console(**kwargs)``.

    Args:
        **kwargs: keyword arguments forwarded to the Console constructor.

    Returns:
        a Console object (shared or freshly built).

    """
    global _STDOUT_CONSOLE, _STDERR_CONSOLE
    # Several options: never share, the caller wants a specific setup.
    if len(kwargs) > 1:
        return Console(**kwargs)
    stream = kwargs.get('file', None)
    # A single option which is not one of the standard streams.
    if kwargs and stream not in (sys.stdout, sys.stderr):
        return Console(**kwargs)
    if stream == sys.stdout:
        if _STDOUT_CONSOLE is None:
            _STDOUT_CONSOLE = Console(**kwargs)
        return _STDOUT_CONSOLE
    if stream == sys.stderr:
        if _STDERR_CONSOLE is None:
            _STDERR_CONSOLE = Console(**kwargs)
        return _STDERR_CONSOLE
    return Console(**kwargs)

50 

51 

def _is_interactive(f):
    """Return the ``is_terminal`` flag of the console bound to stream f."""
    return _get_console(file=f).is_terminal

55 

56 

def is_interactive(target=None):
    """Return True if we are in an interactive terminal.

    Args:
        target (string): can be None (stdout AND stderr are both checked),
            "stdout" (only stdout is checked) or "stderr" (only stderr is
            checked).

    Returns:
        boolean: True (interactive) or False (non-interactive).

    Raises:
        MFUtilException: if target is invalid

    """
    if target is None:
        streams = (sys.stdout, sys.stderr)
    elif target == "stdout":
        streams = (sys.stdout,)
    elif target == "stderr":
        streams = (sys.stderr,)
    else:
        raise MFUtilException("invalid target parameter: %s" % target)
    # all() stops at the first non-interactive stream, in the given order.
    return all(_is_interactive(stream) for stream in streams)

80 

81 

def echo_ok(message=""):
    """Write an [ OK ] marker on stdout, followed by an optional message.

    On an interactive stdout the pending status is cleaned first and the
    marker is written in green at column 60; otherwise a plain text line
    is printed.

    Args:
        message (string): little optional message.

    """
    if not is_interactive("stdout"):
        print(" [ OK ] %s" % message)
        return
    echo_clean()
    # Pad short messages with spaces up to 13 characters.
    padding = " " * (13 - len(message))
    print("\033[60G[ \033[32mOK\033[0;0m ] %s%s" % (message, padding))

96 

97 

def echo_nok(message=""):
    """Write an [ ERROR ] marker on stdout, followed by an optional message.

    On an interactive stdout the pending status is cleaned first and the
    marker is written in red at column 60; otherwise a plain text line
    is printed.

    Args:
        message (string): little optional message.

    """
    if not is_interactive("stdout"):
        print(" [ ERROR ] %s" % message)
        return
    echo_clean()
    # Pad short messages with spaces up to 10 characters.
    padding = " " * (10 - len(message))
    print("\033[60G[ \033[31mERROR\033[0;0m ] %s%s" % (message, padding))

112 

113 

def echo_warning(message=""):
    """Write a [ WARNING ] marker on stdout, followed by an optional message.

    On an interactive stdout the pending status is cleaned first and the
    marker is written in yellow at column 60; otherwise a plain text line
    is printed.

    Args:
        message (string): little optional message.

    """
    if not is_interactive("stdout"):
        print("[ WARNING ] %s" % message)
        return
    echo_clean()
    print("\033[60G[ \033[33mWARNING\033[0;0m ] %s" % message)

126 

127 

def echo_bold(message):
    """Print a message on stdout, in bold when stdout is interactive.

    Args:
        message (string): message to write in bold.

    """
    if is_interactive("stdout"):
        # Wrap the message between the "bold" and "reset" ANSI sequences.
        output = "\033[1m%s\033[0m" % message
    else:
        output = message
    print(output)

139 

140 

def echo_running(message=None):
    """Write a [ RUNNING ] marker on stdout when it is interactive.

    The optional message is printed first, on the same line and without a
    trailing newline (even when stdout is not interactive).

    Args:
        message (string): little optional message.

    """
    if message is not None:
        interactive = is_interactive("stdout")
        if interactive and not six.PY2:
            print(message, end="", flush=True)
        else:
            print(message, end="")
            if interactive:
                # Python 2 print() has no flush argument: flush by hand.
                sys.stdout.flush()
    if not is_interactive("stdout"):
        return
    echo_clean()
    print("\033[60G[ \033[33mRUNNING\033[0;0m ]", end="")

163 

164 

def echo_clean():
    """Clean waiting status (does nothing when stdout is not interactive)."""
    if not is_interactive("stdout"):
        return
    print("\033[60G[ \033 ", end="")

169 

170 

class StateColumn(ProgressColumn):
    """Progress column rendering the task state between square brackets."""

    def render(self, task):
        """Build the "[ STATE ]" text for the given task.

        An unfinished task is rendered as RUNNING. A finished one is
        rendered as ERROR, WARNING or OK depending on its `status` field
        (OK when missing or unknown), followed by the `status_extra` field
        between parentheses when it is not empty.
        """
        if not task.finished:
            label, style, suffix = "RUNNING", "yellow", " ]"
        else:
            extra = task.fields.get('status_extra', '')
            if len(extra) > 0:
                extra = " (%s)" % extra
            status = task.fields.get('status', 'OK')
            if status == 'ERROR':
                label, style = "ERROR", "red"
            elif status == 'WARNING':
                label, style = "WARNING", "yellow"
            else:
                label, style = "OK", "green"
            suffix = " ]%s" % extra
        return Text.assemble(Text("[ "),
                             Text(label, style=style),
                             Text(suffix))

194 

195 

class MFTimeRemainingColumn(TimeRemainingColumn):
    """Time remaining column which renders nothing for finished tasks."""

    def render(self, task):
        """Return an empty Text for a finished task, else the parent one."""
        if not task.finished:
            return TimeRemainingColumn.render(self, task)
        return Text("")

203 

204 

class MFProgress(Progress):
    """[Rich Progress]
    (https://rich.readthedocs.io/en/latest/progress.html) child class.

    This class adds the following features to the original one:

    - support (basic) rendering in non-terminal
    - task status management
    - different default columns setup (but you can override this)
    - rendering on stdout by default (but you can change this by providing
        a custom Console object)

    You can use it exactly like the original [Progress class]
    (https://rich.readthedocs.io/en/latest/reference/progress.html):

    ```python
    import time
    from mfutil.cli import MFProgress

    with MFProgress() as progress:
        t1 = progress.add_task("Foo task")
        t2 = progress.add_task("Foo task")
        while not progress.finished:
            progress.update(t1, advance=10)
            progress.update(t2, advance=10)
            time.sleep(1)
    ```

    For status management:

    - if you leave MFProgress context manager, not finished tasks are
        automatically set to `ERROR` state, finished tasks are automatically
        set to `OK` state
    - you have 3 new methods to manually override this behaviour:
        (complete_task(), complete_task_nok(), complete_task_warning())

    Example:

    ```python
    import time
    from mfutil.cli import MFProgress

    with MFProgress() as progress:
        t1 = progress.add_task("Foo task")
        t2 = progress.add_task("Foo task")
        i = 0
        while not progress.finished:
            progress.update(t1, advance=10)
            if i < 5:
                progress.update(t2, advance=10)
            elif i == 5:
                progress.complete_task_nok(t2, "unknown error")
            time.sleep(1)
            i = i + 1
    ```

    """

    def __init__(self, *args, **kwargs):
        """Build the progress object.

        Args:
            *args: custom columns; when empty, the MFProgress default
                columns (description, bar, state, time remaining) are used.
            **kwargs: forwarded to the Progress constructor; when no
                `console` is given, a default Console() is injected.

        """
        console = kwargs.get("console", None)
        if not console:
            self._interactive = _is_interactive(sys.stdout)
            kwargs["console"] = Console()
        else:
            self._interactive = _is_interactive(console.file)
        if len(args) == 0:
            columns = ["[progress.description]{task.description}",
                       BarColumn(12),
                       StateColumn(),
                       MFTimeRemainingColumn()]
            self.mfprogress_columns = True
        else:
            columns = args
            self.mfprogress_columns = False
        Progress.__init__(self, *columns, **kwargs)

    def make_tasks_table(self, tasks):
        """Get a table to render the Progress display.

        The custom layout is only used with the default MFProgress columns;
        with user-provided columns the parent implementation is used.
        """
        if self.mfprogress_columns:
            return self._mfprogress_make_tasks_table(tasks)
        # Delegate to the parent class: calling MFProgress.make_tasks_table
        # here would recurse forever.
        return Progress.make_tasks_table(self, tasks)

    def _mfprogress_make_tasks_table(self, tasks):
        """Get a table to render the Progress display."""
        table = Table.grid()
        table.pad_edge = True
        table.padding = (0, 1, 0, 0)
        try:
            finished = self.finished
        except Exception:
            # Best effort: fall back to the "running" layout.
            finished = False
        if finished:
            # Wider description column and a zero-width second column.
            table.add_column(width=57, no_wrap=True)
            table.add_column(width=0)
            table.add_column()
            table.add_column()
        else:
            table.add_column(width=45, no_wrap=True)
            table.add_column()
            table.add_column()
            table.add_column()
        # Maximum length of a text cell and length kept when truncating.
        if finished:
            max_length, kept_length = 79, 74
        else:
            max_length, kept_length = 67, 62
        for task in tasks:
            if not task.visible:
                continue
            row = []
            for index, column in enumerate(self.columns):
                if isinstance(column, str):
                    txt = column.format(task=task)
                    if len(txt) > max_length:
                        txt = txt[0:kept_length] + "[[...]]"
                    row.append(txt)
                    table.columns[index].no_wrap = True
                else:
                    widget = column(task)
                    row.append(widget)
                    if isinstance(widget, (str, Text)):
                        table.columns[index].no_wrap = True
            table.add_row(*row)
        return table

    def __exit__(self, *args, **kwargs):
        """Set not finished tasks to ERROR, then leave the parent context."""
        with self._lock:
            for tid, task in self._tasks.items():
                if not task.finished:
                    self.complete_task_nok(tid)
        Progress.__exit__(self, *args, **kwargs)

    def start(self, *args, **kwargs):
        """Start the display (does nothing when not interactive)."""
        if self._interactive:
            return Progress.start(self, *args, **kwargs)

    def __complete_task_nok_nolock(self, task_id, status_extra=""):
        # Same as complete_task_nok() but without taking self._lock.
        task = self._tasks[task_id]
        self.update(task_id, completed=task.total, status="ERROR",
                    refresh=False, status_extra=status_extra)

    def complete_task_nok(self, task_id, status_extra=""):
        """Complete a task with ERROR status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            self.__complete_task_nok_nolock(task_id, status_extra=status_extra)

    def complete_task_warning(self, task_id, status_extra=""):
        """Complete a task with WARNING status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            task = self._tasks[task_id]
            self.update(task_id, completed=task.total, status="WARNING",
                        refresh=False, status_extra=status_extra)

    def complete_task(self, task_id, status_extra=""):
        """Complete a task with OK status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            task = self._tasks[task_id]
            self.update(task_id, completed=task.total, status="OK",
                        refresh=False, status_extra=status_extra)

    def stop(self):
        """Stop the display.

        When not interactive, one plain "description [ STATUS ] (extra)"
        line is printed per task on the console file instead.
        """
        if self._interactive:
            return Progress.stop(self)
        with self._lock:
            for task in self._tasks.values():
                # NOTE: a task which is not finished is printed with an
                # empty status.
                status = ""
                extra = task.fields.get('status_extra', '')
                if len(extra) > 0:
                    extra = " (%s)" % extra
                if task.finished:
                    status = task.fields.get('status', 'OK')
                    if status not in ('OK', 'WARNING'):
                        # Anything else is reported as an error.
                        status = "ERROR"
                print("%s [ %s ]%s" % (task.description, status, extra),
                      file=self.console.file)

    def refresh(self, *args, **kwargs):
        """Refresh the display (does nothing when not interactive)."""
        if self._interactive:
            return Progress.refresh(self, *args, **kwargs)