Coverage for mfutil/cli.py: 31%
191 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-13 15:33 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-13 15:33 +0000
1# -*- coding: utf-8 -*-
2"""Utility functions to build CLI (Python >= 3.6 only)."""
4from __future__ import print_function
5import six
6import sys
7try:
8 from rich.progress import Progress, BarColumn, ProgressColumn, \
9 TimeRemainingColumn
10 from rich.text import Text
11 from rich.bar import Bar
12 from rich.table import Table
13 from rich.console import Console
14except ImportError:
15 class Dummy():
16 pass
17 TimeRemainingColumn = Dummy
18 BarColumn = Dummy
19 ProgressColumn = Dummy
20 Progress = Dummy
21 Bar = Dummy
23from mfutil.exc import MFUtilException
# pdoc configuration: a False value is set for the two helper column classes
# defined in this module.
__pdoc__ = {"MFTimeRemainingColumn": False, "StateColumn": False}

# Console objects cached by _get_console() for sys.stdout and sys.stderr;
# both stay None until first requested.
_STDOUT_CONSOLE = _STDERR_CONSOLE = None
def _get_console(**kwargs):
    """Return a rich Console built from the given keyword arguments.

    When the only argument is `file=sys.stdout` or `file=sys.stderr`, a
    module-level Console is created on first use and reused afterwards.
    Any other combination of arguments gets a brand new Console.

    Args:
        **kwargs: keyword arguments forwarded to the Console constructor.

    Returns:
        Console object (cached or freshly created).

    """
    global _STDOUT_CONSOLE, _STDERR_CONSOLE
    stream = kwargs.get('file', None)
    argc = len(kwargs)
    # Caching is only considered with no argument at all, or with a single
    # `file` argument pointing at one of the two standard streams.
    cacheable = argc <= 1 and (
        argc == 0 or stream in (sys.stdout, sys.stderr))
    if not cacheable:
        return Console(**kwargs)
    if stream == sys.stdout:
        if _STDOUT_CONSOLE is None:
            _STDOUT_CONSOLE = Console(**kwargs)
        return _STDOUT_CONSOLE
    if stream == sys.stderr:
        if _STDERR_CONSOLE is None:
            _STDERR_CONSOLE = Console(**kwargs)
        return _STDERR_CONSOLE
    return Console(**kwargs)
def _is_interactive(f):
    """Return the `is_terminal` flag of the console bound to file object f."""
    return _get_console(file=f).is_terminal
def is_interactive(target=None):
    """Tell whether the selected standard stream(s) are interactive.

    Args:
        target (string): None to check stdout AND stderr, "stdout" to check
            stdout only, "stderr" to check stderr only.

    Returns:
        boolean: True (interactive) or False (non-interactive).

    Raises:
        MFUtilException: if target is not one of the accepted values.

    """
    if target is None:
        # Both streams must be terminals.
        return _is_interactive(sys.stdout) and _is_interactive(sys.stderr)
    if target == "stdout":
        return _is_interactive(sys.stdout)
    if target == "stderr":
        return _is_interactive(sys.stderr)
    raise MFUtilException("invalid target parameter: %s" % target)
def echo_ok(message=""):
    """Print an [OK] marker (colored on a terminal) and an optional message.

    Args:
        message (string): little optional message.

    """
    if not is_interactive("stdout"):
        print(" [ OK ] %s" % message)
        return
    echo_clean()
    # Pad short messages with spaces up to 13 characters.
    padding = " " * (13 - len(message))
    print("\033[60G[ \033[32mOK\033[0;0m ] %s%s" % (message, padding))
def echo_nok(message=""):
    """Print an [ERROR] marker (colored on a terminal) and an optional message.

    Args:
        message (string): little optional message.

    """
    if not is_interactive("stdout"):
        print(" [ ERROR ] %s" % message)
        return
    echo_clean()
    # Pad short messages with spaces up to 10 characters.
    padding = " " * (10 - len(message))
    print("\033[60G[ \033[31mERROR\033[0;0m ] %s%s" % (message, padding))
def echo_warning(message=""):
    """Print a [WARNING] marker (colored on a terminal) and an optional message.

    Args:
        message (string): little optional message.

    """
    if not is_interactive("stdout"):
        print("[ WARNING ] %s" % message)
        return
    echo_clean()
    print("\033[60G[ \033[33mWARNING\033[0;0m ] %s" % message)
def echo_bold(message):
    """Print a message, in bold when stdout is an interactive terminal.

    Args:
        message (string): message to write in bold.

    """
    if is_interactive("stdout"):
        rendered = "\033[1m%s\033[0m" % message
    else:
        rendered = message
    print(rendered)
def echo_running(message=None):
    """Print a [RUNNING] marker (colored) when stdout is a terminal.

    An optional message can be given; it is written first, on the same line
    and without a trailing newline. On a non-interactive stdout only the
    message (if any) is written.

    Args:
        message (string): little optional message.

    """
    if message is not None:
        if not is_interactive("stdout"):
            print(message, end="")
        elif six.PY2:
            # Python 2 print() has no flush keyword: flush by hand.
            print(message, end="")
            sys.stdout.flush()
        else:
            print(message, end="", flush=True)
    if not is_interactive("stdout"):
        return
    echo_clean()
    print("\033[60G[ \033[33mRUNNING\033[0;0m ]", end="")
def echo_clean():
    """Clean waiting status (only when stdout is an interactive terminal)."""
    if not is_interactive("stdout"):
        return
    print("\033[60G[ \033 ", end="")
class StateColumn(ProgressColumn):
    """Progress column showing the state of a task between brackets.

    Unfinished tasks render as `[ RUNNING ]`; finished ones as `[ OK ]`,
    `[ WARNING ]` or `[ ERROR ]` depending on the `status` task field,
    followed by the optional `status_extra` task field in parentheses.
    """

    def render(self, task):
        """Build the rich Text shown in this column for the given task."""
        if not task.finished:
            return Text.assemble(Text("[ "),
                                 Text("RUNNING", style="yellow"),
                                 Text(" ]"))
        suffix = task.fields.get('status_extra', '')
        if len(suffix) > 0:
            suffix = " (%s)" % suffix
        # A missing status field counts as OK, and so does any value other
        # than ERROR or WARNING.
        status = task.fields.get('status', 'OK')
        if status == 'ERROR':
            label, color = "ERROR", "red"
        elif status == 'WARNING':
            label, color = "WARNING", "yellow"
        else:
            label, color = "OK", "green"
        return Text.assemble(Text("[ "),
                             Text(label, style=color),
                             Text(" ]%s" % suffix))
class MFTimeRemainingColumn(TimeRemainingColumn):
    """Time-remaining column rendering an empty text for finished tasks."""

    def render(self, task):
        """Return the parent rendering, or an empty Text once finished."""
        if not task.finished:
            return TimeRemainingColumn.render(self, task)
        return Text("")
class MFProgress(Progress):
    """[Rich Progress]
    (https://rich.readthedocs.io/en/latest/progress.html) child class.

    This class adds four features to the original one:

    - support (basic) rendering in non-terminal
    - task status management
    - different default columns setup (but you can override this)
    - rendering on stdout by default (but you can change this by providing
        a custom Console object)

    You can use it exactly like the original [Progress class]
    (https://rich.readthedocs.io/en/latest/reference/progress.html):

    ```python
    import time
    from mfutil.cli import MFProgress

    with MFProgress() as progress:
        t1 = progress.add_task("Foo task")
        t2 = progress.add_task("Foo task")
        while not progress.finished:
            progress.update(t1, advance=10)
            progress.update(t2, advance=10)
            time.sleep(1)
    ```

    For status management:

    - if you leave MFProgress context manager, not finished tasks are
        automatically set to `ERROR` state, finished tasks are automatically
        set to `OK` state
    - you have 3 new methods to manually override this behaviour:
        (complete_task(), complete_task_nok(), complete_task_warning())

    Example:

    ```python
    import time
    from mfutil.cli import MFProgress

    with MFProgress() as progress:
        t1 = progress.add_task("Foo task")
        t2 = progress.add_task("Foo task")
        i = 0
        while not progress.finished:
            progress.update(t1, advance=10)
            if i < 5:
                progress.update(t2, advance=10)
            elif i == 5:
                progress.complete_task_nok(t2, "unknown error")
            time.sleep(1)
            i = i + 1
    ```

    """

    def __init__(self, *args, **kwargs):
        """Build the progress object.

        Args:
            *args: columns forwarded to Progress; when empty, the default
                MFProgress columns (description, bar, state, time
                remaining) are used.
            **kwargs: keyword arguments forwarded to Progress; when no
                `console` is given, a default Console() is created.

        """
        console = kwargs.get("console", None)
        if not console:
            self._interactive = _is_interactive(sys.stdout)
            kwargs["console"] = Console()
        else:
            self._interactive = _is_interactive(console.file)
        if len(args) == 0:
            columns = ["[progress.description]{task.description}",
                       BarColumn(12),
                       StateColumn(),
                       MFTimeRemainingColumn()]
            self.mfprogress_columns = True
        else:
            columns = args
            self.mfprogress_columns = False
        Progress.__init__(self, *columns, **kwargs)

    def make_tasks_table(self, tasks):
        """Get a table to render the Progress display.

        The custom layout is only used with the default MFProgress columns;
        with user-provided columns the parent implementation is used.
        """
        if self.mfprogress_columns:
            return self._mfprogress_make_tasks_table(tasks)
        # Delegate to the parent class: calling MFProgress.make_tasks_table
        # here would call this very method again and recurse forever.
        return Progress.make_tasks_table(self, tasks)

    def _mfprogress_make_tasks_table(self, tasks):
        """Get a table to render the Progress display (default columns)."""
        table = Table.grid()
        table.pad_edge = True
        table.padding = (0, 1, 0, 0)
        try:
            finished = self.finished
        except Exception:
            # Best effort kept from the original code: if the finished
            # state cannot be read, render as "not finished".
            finished = False
        if finished:
            # Wider description column; the second column (the bar with the
            # default columns) is given a zero width.
            table.add_column(width=57, no_wrap=True)
            table.add_column(width=0)
            table.add_column()
            table.add_column()
        else:
            table.add_column(width=45, no_wrap=True)
            table.add_column()
            table.add_column()
            table.add_column()
        # Maximum length of a string column before truncation; the last
        # five characters allowed are replaced by the "[[...]]" marker.
        max_len = 79 if finished else 67
        for task in tasks:
            if not task.visible:
                continue
            row = []
            for index, column in enumerate(self.columns):
                if isinstance(column, str):
                    txt = column.format(task=task)
                    if len(txt) > max_len:
                        txt = txt[0:max_len - 5] + "[[...]]"
                    row.append(txt)
                    table.columns[index].no_wrap = True
                else:
                    widget = column(task)
                    row.append(widget)
                    if isinstance(widget, (str, Text)):
                        table.columns[index].no_wrap = True
            table.add_row(*row)
        return table

    def __exit__(self, *args, **kwargs):
        """Mark every unfinished task as ERROR, then leave the context."""
        with self._lock:
            for tid, task in self._tasks.items():
                if not task.finished:
                    self.complete_task_nok(tid)
        Progress.__exit__(self, *args, **kwargs)

    def start(self, *args, **kwargs):
        """Start the display (does nothing on a non-interactive output)."""
        if self._interactive:
            return Progress.start(self, *args, **kwargs)

    def __complete_task_nok_nolock(self, task_id, status_extra=""):
        """Complete a task with ERROR status without taking self._lock."""
        task = self._tasks[task_id]
        self.update(task_id, completed=task.total, status="ERROR",
                    refresh=False, status_extra=status_extra)

    def complete_task_nok(self, task_id, status_extra=""):
        """Complete a task with ERROR status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            self.__complete_task_nok_nolock(task_id, status_extra=status_extra)

    def complete_task_warning(self, task_id, status_extra=""):
        """Complete a task with WARNING status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            task = self._tasks[task_id]
            self.update(task_id, completed=task.total, status="WARNING",
                        refresh=False, status_extra=status_extra)

    def complete_task(self, task_id, status_extra=""):
        """Complete a task with OK status.

        Args:
            task_id (TaskID): a task ID.
            status_extra (str): a little extra message to add.

        """
        with self._lock:
            task = self._tasks[task_id]
            self.update(task_id, completed=task.total, status="OK",
                        refresh=False, status_extra=status_extra)

    def stop(self):
        """Stop the display.

        On a non-interactive output, nothing was rendered so far: one
        plain line per task ("description [ STATUS ] (extra)") is written
        to the console file instead.
        """
        if self._interactive:
            return Progress.stop(self)
        with self._lock:
            for tid, task in self._tasks.items():
                # The status stays empty for a task which is not finished.
                status = ""
                extra = task.fields.get('status_extra', '')
                if len(extra) > 0:
                    extra = " (%s)" % extra
                if task.finished:
                    task_status = task.fields.get('status', 'OK')
                    if task_status in ('OK', 'WARNING'):
                        status = task_status
                    else:
                        status = "ERROR"
                print("%s [ %s ]%s" % (task.description, status, extra),
                      file=self.console.file)

    def refresh(self, *args, **kwargs):
        """Refresh the display (does nothing on a non-interactive output)."""
        if self._interactive:
            return Progress.refresh(self, *args, **kwargs)