import sys
import os
import time
import subprocess
import threading
import warnings
import re
import queue
class RunCmdException(Exception):
"""
Raised by function :func:`run_cmd`.
"""
pass
def get_interpreter_path():
"""
Returns the interpreter path.
"""
if sys.platform.startswith("win"):
return sys.executable.replace("pythonw.exe", "python.exe")
else:
return sys.executable
def split_cmp_command(cmd, remove_quotes=True):
"""
Splits a command line.
:param cmd: command line
:param remove_quotes: True by default
:return: list
"""
if isinstance(cmd, str):
spl = cmd.split()
res = []
for s in spl:
if len(res) == 0:
res.append(s)
elif res[-1].startswith('"') and not res[-1].endswith('"'):
res[-1] += " " + s
else:
res.append(s)
if remove_quotes:
nres = []
for _ in res:
if _.startswith('"') and _.endswith('"'):
nres.append(_.strip('"'))
else:
nres.append(_)
return nres
else:
return res
else:
return cmd
def decode_outerr(outerr, encoding, encerror, msg):
"""
Decodes the output or the error after running a command line instructions.
:param outerr: output or error
:param encoding: encoding (if None, it is replaced by ascii)
:param encerror: how to handle errors
:param msg: to add to the exception message
:return: converted string
"""
if encoding is None:
encoding = "ascii"
typstr = str
if not isinstance(outerr, bytes):
raise TypeError("only able to decode bytes, not " + typstr(type(outerr)))
try:
out = outerr.decode(encoding, errors=encerror)
return out
except UnicodeDecodeError as exu:
try:
out = outerr.decode(
"utf8" if encoding != "utf8" else "latin-1", errors=encerror
)
return out
except Exception as e:
out = outerr.decode(encoding, errors="ignore")
raise RuntimeError(
f"issue with cmd ({encoding}):{typstr(msg)}\n{typstr(exu)}"
"\n-----\n{out}"
) from e
raise RuntimeError("complete issue with cmd:" + typstr(msg))
def skip_run_cmd(
cmd,
sin="",
shell=True,
wait=False,
log_error=True,
stop_running_if=None,
encerror="ignore",
encoding="utf8",
change_path=None,
communicate=True,
preprocess=True,
timeout=None,
catch_exit=False,
logf=None,
timeout_listen=None,
tell_if_no_output=None,
prefix_log=None,
):
"""
Has the same signature as :func:`run_cmd` but does nothing.
"""
return "", ""
[docs]def run_cmd(
cmd,
sin="",
shell=sys.platform.startswith("win"),
wait=False,
log_error=True,
stop_running_if=None,
encerror="ignore",
encoding="utf8",
change_path=None,
communicate=True,
preprocess=True,
timeout=None,
catch_exit=False,
logf=None,
tell_if_no_output=None,
prefix_log=None,
):
"""
Runs a command line and wait for the result.
:param cmd: command line
:param sin: sin: what must be written on the standard input
:param shell: if True, cmd is a shell command (and no command window is opened)
:param wait: call ``proc.wait``
:param log_error: if log_error, call logf (error)
:param stop_running_if: the function stops waiting if some condition is fulfilled.
The function received the last line from the logs.
Signature: ``stop_waiting_if(last_out, last_err) -> bool``.
The function must return True to stop waiting.
This function can also be used to intercept the standard output
and the standard error while running.
:param encerror: encoding errors (ignore by default)
while converting the output into a string
:param encoding: encoding of the output
:param change_path: change the current path if not None
(put it back after the execution)
:param communicate: use method `communicate
<https://docs.python.org/3/library/subprocess.html
#subprocess.Popen.communicate>`_
which is supposed to be safer, parameter ``wait`` must be True
:param preprocess: preprocess the command line
if necessary (not available on Windows)
(False to disable that option)
:param timeout: when data is sent to stdin (``sin``), a timeout is needed to avoid
waiting for ever (*timeout* is in seconds)
:param catch_exit: catch *SystemExit* exception
:param logf: logging function (if not None, bypass others parameters)
:param tell_if_no_output: tells if there is no output every
*tell_if_no_output* seconds
:param prefix_log: add a prefix to a line before printing it
:return: content of stdout, stdres (only if wait is True)
::
from sphinx_runpython.runpython import run_cmd
out, err = run_cmd("python setup.py install", wait=True)
If you are using this function to run :epkg:`git` function, parameter
``shell`` must be True.
The function catches *SystemExit* exception.
See `Constantly print Subprocess output while process is running
<http://stackoverflow.com/questions/4417546/
constantly-print-subprocess-output-while-process-is-running/4417735>`_.
If *wait* is False, the function returns the started process.
``__exit__`` should be called if wait if False.
Parameter *prefix_log* was added.
"""
if prefix_log is None:
prefix_log = ""
if logf is not None:
if isinstance(cmd, (list, tuple)):
logf(prefix_log + "[run_cmd] execute", " ".join(cmd))
else:
logf(prefix_log + "[run_cmd] execute", cmd)
if change_path is not None:
current = os.getcwd()
os.chdir(change_path)
if sys.platform.startswith("win"):
cmdl = cmd
else:
cmdl = split_cmp_command(cmd) if preprocess else cmd
if catch_exit:
try:
pproc = subprocess.Popen(
cmdl,
shell=shell,
stdin=subprocess.PIPE if sin is not None and len(sin) > 0 else None,
stdout=subprocess.PIPE if wait else None,
stderr=subprocess.PIPE if wait else None,
)
except SystemExit as e:
if change_path is not None:
os.chdir(current)
raise RunCmdException("SystemExit raised (1)") from e
else:
pproc = subprocess.Popen(
cmdl,
shell=shell,
stdin=subprocess.PIPE if sin is not None and len(sin) > 0 else None,
stdout=subprocess.PIPE if wait else None,
stderr=subprocess.PIPE if wait else None,
)
pproc.__enter__()
if isinstance(cmd, list):
cmd = " ".join(cmd)
if wait:
skip_out_err = False
out = []
err = []
err_read = False
skip_waiting = False
if communicate:
# communicate is True
if tell_if_no_output is not None:
raise NotImplementedError(
"tell_if_no_output is not implemented when communicate is True"
)
if stop_running_if is not None:
raise NotImplementedError(
"stop_running_if is not implemented when communicate is True"
)
input = sin if sin is None else sin.encode()
if input is not None and len(input) > 0:
if logf is not None:
logf(prefix_log + "[run_cmd] input", [input])
if catch_exit:
try:
stdoutdata, stderrdata = pproc.communicate(
input=input, timeout=timeout
)
except SystemExit as e:
if change_path is not None:
os.chdir(current)
raise RunCmdException("SystemExit raised (2)") from e
else:
stdoutdata, stderrdata = pproc.communicate(input=input, timeout=timeout)
out = decode_outerr(stdoutdata, encoding, encerror, cmd)
err = decode_outerr(stderrdata, encoding, encerror, cmd)
else:
# communicate is False: use of threads
if sin is not None and len(sin) > 0:
if change_path is not None:
os.chdir(current)
pproc.__exit__(None, None, None)
raise RuntimeError(
"Argument 'communicate' should be True to send "
"something on stdin."
)
stdout, stderr = pproc.stdout, pproc.stderr
begin = time.perf_counter()
last_update = begin
# with threads
(stdoutReader, stdoutQueue) = _AsyncLineReader.getForFd(
stdout, catch_exit=catch_exit
)
(stderrReader, stderrQueue) = _AsyncLineReader.getForFd(
stderr, catch_exit=catch_exit
)
runloop = True
while (not stdoutReader.eof() or not stderrReader.eof()) and runloop:
while not stdoutQueue.empty():
line = stdoutQueue.get()
decol = decode_outerr(line, encoding, encerror, cmd)
sdecol = decol.strip("\n\r")
if logf is not None:
logf(prefix_log + sdecol)
out.append(sdecol)
last_update = time.perf_counter()
if stop_running_if is not None and stop_running_if(decol, None):
runloop = False
break
while not stderrQueue.empty():
line = stderrQueue.get()
decol = decode_outerr(line, encoding, encerror, cmd)
sdecol = decol.strip("\n\r")
if logf is not None:
logf(prefix_log + sdecol)
err.append(sdecol)
last_update = time.perf_counter()
if stop_running_if is not None and stop_running_if(None, decol):
runloop = False
break
time.sleep(0.05)
delta = time.perf_counter() - last_update
if tell_if_no_output is not None and delta >= tell_if_no_output:
logf(
prefix_log
+ "[run_cmd] No update in {0} seconds for cmd: {1}".format(
"%5.1f" % (last_update - begin), cmd
)
)
last_update = time.perf_counter()
full_delta = time.perf_counter() - begin
if timeout is not None and full_delta > timeout:
runloop = False
logf(
prefix_log
+ "[run_cmd] Timeout after {0} seconds for cmd: {1}".format(
"%5.1f" % full_delta, cmd
)
)
break
if runloop:
# Waiting for async readers to finish...
stdoutReader.join()
stderrReader.join()
# Waiting for process to exit...
returnCode = pproc.wait()
err_read = True
if returnCode != 0:
if change_path is not None:
os.chdir(current)
try:
# we try to close the ressources
stdout.close()
stderr.close()
except Exception:
warnings.warn(
"Unable to close stdout and sterr.", RuntimeWarning
)
if catch_exit:
mes = (
"SystemExit raised with error code {0}\nCMD:\n{1}\n"
"CWD:\n{2}\n#---OUT---#\n{3}\n#---ERR---#\n{4}"
)
raise RunCmdException(
mes.format(
returnCode,
cmd,
os.getcwd(),
"\n".join(out),
"\n".join(err),
)
)
raise subprocess.CalledProcessError(returnCode, cmd)
if not skip_waiting:
pproc.wait()
else:
out.append("[run_cmd] killing process.")
logf(
prefix_log
+ "[run_cmd] killing process because stop_running_if returned True."
)
pproc.kill()
err_read = True
logf(prefix_log + "[run_cmd] process killed.")
skip_out_err = True
out = "\n".join(out)
if skip_out_err:
err = "Process killed."
else:
if err_read:
err = "\n".join(err)
else:
temp = err = stderr.read()
try:
err = decode_outerr(temp, encoding, encerror, cmd)
except Exception:
err = decode_outerr(temp, encoding, "ignore", cmd)
stdout.close()
stderr.close()
# same path for whether communicate is False or True
err = err.replace("\r\n", "\n")
if logf is not None:
logf(prefix_log + "end of execution", cmd)
if len(err) > 0 and log_error and logf is not None:
if "\n" in err:
logf(prefix_log + "[run_cmd] stderr (log)")
for eline in err.split("\n"):
logf(prefix_log + eline)
else:
logf(prefix_log + f"[run_cmd] stderr (log)\n{err}")
if change_path is not None:
os.chdir(current)
pproc.__exit__(None, None, None)
if sys.platform.startswith("win"):
if err is not None:
err = err.strip("\n\r\t ")
return out.replace("\r\n", "\n"), err.replace("\r\n", "\n")
else:
if err is not None:
err = err.strip("\n\r\t ")
return out, err
else:
if change_path is not None:
os.chdir(current)
return pproc, None
def parse_exception_message(exc):
"""
Parses the message embedded in an exception and returns the
standard output and error if it can be found.
:param exc: exception coming :func:`run_cmd`
:return: out, err
"""
mes = str(exc)
reg = re.compile(".*#---OUT---#(.*)#---ERR---#(.*)", re.DOTALL)
find = reg.search(mes.replace("\r", ""))
if find:
gr = find.groups()
out, err = gr[0], gr[1]
return out.strip("\n "), err.strip("\n ")
else:
return None, None
def run_script(script, *args, **kwargs):
"""
Runs a script.
:param script: script to execute or command line starting with ``-m``
:param args: other parameters
:param kwargs: sent to :func:`run_cmd`
:return: out,err content of stdout stream and stderr stream
Allows command line starting with ``-m``.
"""
if not script.startswith("-m") and not os.path.exists(script):
raise FileNotFoundError(f"File {script!r} not found.")
py = get_interpreter_path()
cmd = f"{py} {script}"
if len(args) > 0:
typstr = str
cmd += " " + " ".join([typstr(x) for x in args])
out, err = run_cmd(cmd, **kwargs)
return out, err
class _AsyncLineReader(threading.Thread):
def __init__(self, fd, outputQueue, catch_exit):
threading.Thread.__init__(self)
assert isinstance(outputQueue, queue.Queue)
assert callable(fd.readline)
self.fd = fd
self.catch_exit = catch_exit
self.outputQueue = outputQueue
def run(self):
if self.catch_exit:
try:
for _ in map(self.outputQueue.put, iter(self.fd.readline, b"")):
pass
except SystemExit as e:
self.outputQueue.put(str(e))
raise RunCmdException("SystemExit raised (3)") from e
else:
for _ in map(self.outputQueue.put, iter(self.fd.readline, b"")):
pass
def eof(self):
return not self.is_alive() and self.outputQueue.empty()
@classmethod
def getForFd(cls, fd, start=True, catch_exit=False):
q = queue.Queue()
reader = cls(fd, q, catch_exit)
if start:
reader.start()
return reader, q