Coverage for mfutil/cli.py: 32%
192 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-18 16:04 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-18 16:04 +0000
1# -*- coding: utf-8 -*-
2"""Utility functions to build CLI (Python >= 3.6 only)."""
4from __future__ import print_function
5import six
6import sys
7try:
8 from rich.progress import Progress, BarColumn, ProgressColumn, \
9 TimeRemainingColumn
10 from rich.text import Text
11 from rich.bar import Bar
12 from rich.table import Table
13 from rich.console import Console
14except ImportError:
15 class Dummy():
16 pass
17 TimeRemainingColumn = Dummy
18 BarColumn = Dummy
19 ProgressColumn = Dummy
20 Progress = Dummy
21 Bar = Dummy
23from mfutil.exc import MFUtilException
# pdoc overrides: a False value asks pdoc to leave the name out of the
# generated documentation.
__pdoc__ = dict.fromkeys(("MFTimeRemainingColumn", "StateColumn"), False)

# Module-level rich Console objects, filled on first use by _get_console()
# (one for sys.stdout, one for sys.stderr).
_STDOUT_CONSOLE = _STDERR_CONSOLE = None
def _get_console(**kwargs):
    """Return a rich Console for ``kwargs``, reusing the std-stream ones.

    When the ``file`` keyword argument is ``sys.stdout`` or ``sys.stderr``
    and no other keyword argument is given, a module-level Console is
    created on first use and returned on every later call. Any other
    combination of arguments yields a brand new ``Console(**kwargs)``.

    Args:
        **kwargs: keyword arguments forwarded to the Console constructor.

    Returns:
        a Console object (shared for plain stdout/stderr requests).

    """
    global _STDOUT_CONSOLE, _STDERR_CONSOLE
    stream = kwargs.get('file', None)
    std_streams = (sys.stdout, sys.stderr)
    # Extra options => never share: the caller wants a custom console.
    if len(kwargs) > 1:
        return Console(**kwargs)
    # A single argument which does not designate a standard stream.
    if len(kwargs) == 1 and stream not in std_streams:
        return Console(**kwargs)
    if stream == sys.stdout:
        if _STDOUT_CONSOLE is None:
            _STDOUT_CONSOLE = Console(**kwargs)
        return _STDOUT_CONSOLE
    if stream == sys.stderr:
        if _STDERR_CONSOLE is None:
            _STDERR_CONSOLE = Console(**kwargs)
        return _STDERR_CONSOLE
    return Console(**kwargs)
def _is_interactive(f):
    """Return the ``is_terminal`` flag of the Console bound to file ``f``."""
    return _get_console(file=f).is_terminal
def is_interactive(target=None):
    """Tell whether the standard output stream(s) are interactive terminals.

    Args:
        target (string): None (both stdout AND stderr must be interactive),
            "stdout" (only stdout is checked) or "stderr" (only stderr is
            checked).

    Returns:
        boolean: True (interactive) or False (non-interactive).

    Raises:
        MFUtilException: if target is invalid.

    """
    if target is None:
        return _is_interactive(sys.stdout) and _is_interactive(sys.stderr)
    for name, stream in (("stdout", sys.stdout), ("stderr", sys.stderr)):
        if target == name:
            return _is_interactive(stream)
    raise MFUtilException("invalid target parameter: %s" % target)
def echo_ok(message=""):
    """Print an [ OK ] tag (colored on a terminal) and an optional message.

    Args:
        message (string): short optional message written after the tag.

    """
    if not is_interactive("stdout"):
        print(" [ OK ] %s" % message)
        return
    echo_clean()
    # Pad short messages with spaces up to a fixed width.
    padding = " " * (13 - len(message))
    print("\033[60G[ \033[32mOK\033[0;0m ] %s%s" % (message, padding))
def echo_nok(message=""):
    """Print an [ ERROR ] tag (colored on a terminal) and an optional message.

    Args:
        message (string): short optional message written after the tag.

    """
    if not is_interactive("stdout"):
        print(" [ ERROR ] %s" % message)
        return
    echo_clean()
    # Pad short messages with spaces up to a fixed width.
    padding = " " * (10 - len(message))
    print("\033[60G[ \033[31mERROR\033[0;0m ] %s%s" % (message, padding))
def echo_warning(message=""):
    """Print a [ WARNING ] tag (colored on a terminal) and an optional message.

    Args:
        message (string): short optional message written after the tag.

    """
    if not is_interactive("stdout"):
        print("[ WARNING ] %s" % message)
        return
    echo_clean()
    print("\033[60G[ \033[33mWARNING\033[0;0m ] %s" % message)
def echo_bold(message):
    """Print a message, in bold when stdout is an interactive terminal.

    Args:
        message (string): message to write.

    """
    bold = is_interactive("stdout")
    print("\033[1m%s\033[0m" % message if bold else message)
def echo_running(message=None):
    """Write [RUNNING] with colors if supported.

    You can pass an optional message which will be rendered before [RUNNING]
    on the same line. On a non-interactive stdout, only the message (if any)
    is written, without a trailing newline and without the [RUNNING] tag.

    Args:
        message (string): little optional message.

    """
    interactive = is_interactive("stdout")
    if message is not None:
        # No newline: the status tag written later must land on the same
        # line. On a terminal, flush so the message shows up immediately.
        print(message, end="", flush=interactive)
    if interactive:
        echo_clean()
        # Nothing here ends with a newline, so flush explicitly: otherwise a
        # line-buffered terminal would keep the tag in the buffer until the
        # final status is printed.
        print("\033[60G[ \033[33mRUNNING\033[0;0m ]", end="", flush=True)
def echo_clean():
    """Overwrite the waiting status tag (interactive stdout only)."""
    if not is_interactive("stdout"):
        return
    print("\033[60G[ \033 ", end="")
class StateColumn(ProgressColumn):
    """Progress column showing the task state as a colored ``[ STATE ]`` tag."""

    def render(self, task):
        """Build the tag for ``task``.

        A task which is not finished is shown as RUNNING. A finished one is
        shown as ERROR, WARNING or OK according to its ``status`` field (OK
        when missing or unknown), followed by its ``status_extra`` field in
        parentheses when that field is not empty.
        """
        if not task.finished:
            label, color, suffix = "RUNNING", "yellow", ""
        else:
            suffix = task.fields.get('status_extra', '')
            if len(suffix) > 0:
                suffix = " (%s)" % suffix
            status = task.fields.get('status', 'OK')
            if status == 'ERROR':
                label, color = "ERROR", "red"
            elif status == 'WARNING':
                label, color = "WARNING", "yellow"
            else:
                label, color = "OK", "green"
        return Text.assemble(Text("[ "),
                             Text(label, style=color),
                             Text(" ]%s" % suffix))
class MFTimeRemainingColumn(TimeRemainingColumn):
    """Time remaining column which renders nothing for finished tasks."""

    def render(self, task):
        """Return an empty Text for a finished task, the parent output else."""
        if not task.finished:
            return TimeRemainingColumn.render(self, task)
        return Text("")
class MFProgress(Progress):
    """[Rich Progress](https://rich.readthedocs.io/en/latest/progress.html) child class.

    This class adds the following features to the original one:

    - support (basic) rendering in non-terminal
    - task status management
    - different default columns setup (but you can override this)
    - rendering on stdout by default (but you can change this by providing
        a custom Console object)

    You can use it exactly like the original [Progress class](https://rich.readthedocs.io/en/latest/reference/progress.html):

    ```python
    import time
    from mfutil.cli import MFProgress

    with MFProgress() as progress:
        t1 = progress.add_task("Foo task")
        t2 = progress.add_task("Foo task")
        while not progress.finished:
            progress.update(t1, advance=10)
            progress.update(t2, advance=10)
            time.sleep(1)
    ```

    For status management:

    - if you leave MFProgress context manager, not finished tasks are
        automatically set to `ERROR` state, finished tasks are automatically
        set to `OK` state
    - you have 3 new methods to manually override this behaviour:
        (complete_task(), complete_task_nok(), complete_task_warning())

    Example:

    ```python
    import time
    from mfutil.cli import MFProgress

    with MFProgress() as progress:
        t1 = progress.add_task("Foo task")
        t2 = progress.add_task("Foo task")
        i = 0
        while not progress.finished:
            progress.update(t1, advance=10)
            if i < 5:
                progress.update(t2, advance=10)
            elif i == 5:
                progress.complete_task_nok(t2, "unknown error")
            time.sleep(1)
            i = i + 1
    ```

    """

    def __init__(self, *args, **kwargs):
        """Build the progress object.

        Args:
            *args: custom columns (forwarded to Progress); when empty, the
                MFProgress default columns are used.
            **kwargs: keyword arguments forwarded to Progress; when no
                `console` is given, a new default Console is used.

        """
        console = kwargs.get("console", None)
        if not console:
            self._interactive = _is_interactive(sys.stdout)
            kwargs["console"] = Console()
        else:
            self._interactive = _is_interactive(console.file)
        if len(args) == 0:
            columns = ["[progress.description]{task.description}",
                       BarColumn(12),
                       StateColumn(),
                       MFTimeRemainingColumn()]
            # The custom table layout only fits the default columns.
            self.mfprogress_columns = True
        else:
            columns = args
            self.mfprogress_columns = False
        Progress.__init__(self, *columns, **kwargs)

    def make_tasks_table(self, tasks):
        """Return the table to render: custom layout for default columns."""
        if self.mfprogress_columns:
            return self._mfprogress_make_tasks_table(tasks)
        else:
            # Delegate to the parent class: calling
            # MFProgress.make_tasks_table here would recurse forever.
            return Progress.make_tasks_table(self, tasks)

    def _mfprogress_make_tasks_table(self, tasks):
        """Get a table to render the Progress display."""
        table = Table.grid()
        table.pad_edge = True
        table.padding = (0, 1, 0, 0)
        try:
            finished = self.finished
        except Exception:
            # Best effort: consider the progress as not finished when the
            # state cannot be read.
            # NOTE(review): presumably guards against rendering before the
            # parent class is fully initialized -- confirm.
            finished = False
        if finished:
            # Wider description column and zero-width second column (the
            # bar with the default columns) once everything is finished.
            table.add_column(width=57, no_wrap=True)
            table.add_column(width=0)
            table.add_column()
            table.add_column()
        else:
            table.add_column(width=45, no_wrap=True)
            table.add_column()
            table.add_column()
            table.add_column()
        for task in tasks:
            if task.visible:
                row = []
                append = row.append
                for index, column in enumerate(self.columns):
                    if isinstance(column, str):
                        # Format string column: render it and truncate too
                        # long texts (limits depend on the layout).
                        txt = column.format(task=task)
                        if finished:
                            if len(txt) > 79:
                                txt = txt[0:74] + "[[...]]"
                        else:
                            if len(txt) > 67:
                                txt = txt[0:62] + "[[...]]"
                        append(txt)
                        table.columns[index].no_wrap = True
                    else:
                        # Column object: call it to get the renderable.
                        widget = column(task)
                        append(widget)
                        if isinstance(widget, (str, Text)):
                            table.columns[index].no_wrap = True
                table.add_row(*row)
        return table

    def __exit__(self, *args, **kwargs):
        """Flag not finished tasks as ERROR, then leave the parent context."""
        with self._lock:
            for tid, task in self._tasks.items():
                if not task.finished:
                    # The lock is already held here.
                    self.__complete_task_nok_nolock(tid)
        Progress.__exit__(self, *args, **kwargs)

    def start(self, *args, **kwargs):
        """Start the display (does nothing when not interactive)."""
        if self._interactive:
            return Progress.start(self, *args, **kwargs)

    def __complete_task_nok_nolock(self, task_id, status_extra=""):
        """Complete a task with ERROR status without taking the lock here."""
        task = self._tasks[task_id]
        self.update(task_id, completed=task.total, status="ERROR",
                    refresh=False, status_extra=status_extra)

    def complete_task_nok(self, task_id, status_extra=""):
        """Complete a task with ERROR status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            self.__complete_task_nok_nolock(task_id, status_extra=status_extra)

    def complete_task_warning(self, task_id, status_extra=""):
        """Complete a task with WARNING status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            task = self._tasks[task_id]
            self.update(task_id, completed=task.total, status="WARNING",
                        refresh=False, status_extra=status_extra)

    def complete_task(self, task_id, status_extra=""):
        """Complete a task with OK status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            task = self._tasks[task_id]
            self.update(task_id, completed=task.total, status="OK",
                        refresh=False, status_extra=status_extra)

    def stop(self):
        """Stop the display.

        When not interactive, print one plain
        "description [ STATUS ] (extra)" line per task to the console file
        instead.
        """
        if self._interactive:
            return Progress.stop(self)
        else:
            with self._lock:
                for tid, task in self._tasks.items():
                    # The status stays empty for a not finished task.
                    status = ""
                    extra = task.fields.get('status_extra', '')
                    if len(extra) > 0:
                        extra = " (%s)" % extra
                    if task.finished:
                        if task.fields.get('status', 'OK') == 'OK':
                            status = "OK"
                        elif task.fields.get('status', 'OK') == 'WARNING':
                            status = "WARNING"
                        else:
                            status = "ERROR"
                    print("%s [ %s ]%s" % (task.description, status, extra),
                          file=self.console.file)

    def refresh(self, *args, **kwargs):
        """Refresh the display (does nothing when not interactive)."""
        if self._interactive:
            return Progress.refresh(self, *args, **kwargs)