miércoles, 8 de abril de 2026

Python ASYNC (III). Ejecutando otros procesos

 1. Llamada SÍNCRONA a comandos de la shell

Veamos cómo se llama a un proceso de forma síncrona, ya sea con el comando entero o pasándole parámetros:

def run_text(cmd):
    '''
    Execute a command synchronously and capture its standard output as text.

    Parameters:
        cmd: A list of command arguments to execute
             (e.g., ["certutil", "-L", "-d", "sql:/home/user/.pki/nssdb"]).
    Returns:
        The standard output of the command as a string if it exits with
        status 0. None if the command exits with a non-zero status or if it
        cannot be started at all (e.g. the executable does not exist); in
        both cases a diagnostic is printed to sys.stderr.
    Example usage:
        # List certificates in the NSS database
        output = run_text(["certutil", "-L", "-d", "sql:/home/user/.pki/nssdb"])
    '''
    try:
        res = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True
        )
    except subprocess.CalledProcessError as e:
        # The process ran but reported failure: show what it wrote to stderr,
        # which would otherwise be lost because the output is captured.
        print(f"Error ejecutando: {' '.join(cmd)}", file=sys.stderr)
        if e.stderr:
            print(e.stderr.strip(), file=sys.stderr)
        return None
    except OSError as e:
        # The process could not be launched (missing binary, no permission...).
        # Without this branch the exception would escape despite the
        # documented "None on error" contract.
        print(f"Excepción ejecutando {' '.join(cmd)}: {e}", file=sys.stderr)
        return None
    return res.stdout


def run_bytes(cmd, input_data=None):
    '''
    Execute a command synchronously and capture its standard output as bytes.

    Parameters:
        cmd: A list of command arguments to execute
             (e.g., ["openssl", "x509", "-in", "cert.pem", "-outform", "DER"]).
        input_data: Optional bytes to be sent to the command's standard input.
    Returns:
        The standard output of the command as bytes if it exits with status 0.
        None if the command exits with a non-zero status or if it cannot be
        started at all (e.g. the executable does not exist); in both cases a
        diagnostic is printed to sys.stderr.
    Example usage:
        # Get the PEM-encoded certificate for a given nickname from the NSS database
        pem_bytes = run_bytes(["certutil", "-L", "-d", "sql:/home/user/.pki/nssdb", "-n", "My Certificate", "-a"])
    '''
    try:
        res = subprocess.run(
            cmd,
            input=input_data,
            capture_output=True,
            check=True
        )
    except subprocess.CalledProcessError as e:
        # Report the failure instead of swallowing it silently; stderr is
        # bytes here (no text=True), so decode it leniently for display only.
        print(f"Error ejecutando: {' '.join(cmd)}", file=sys.stderr)
        if e.stderr:
            print(e.stderr.decode(errors="ignore").strip(), file=sys.stderr)
        return None
    except OSError as e:
        # The process could not be launched (missing binary, no permission...).
        print(f"Excepción ejecutando {' '.join(cmd)}: {e}", file=sys.stderr)
        return None
    return res.stdout

 2. Llamada ASÍNCRONA a comandos de la shell

Veamos cómo se llama a un proceso de forma asíncrona, ya sea con el comando entero o pasándole parámetros:

async def run_text_async(cmd: list[str]) -> str | None:
    '''
    Executes a command asynchronously and captures its output as text.
    Parameters:
        cmd: A list of command arguments to execute (e.g., ["ls", "-l"]).
    Returns:
        The standard output of the command as a string if successful, or None if an error occurs.
    Example usage:
        # List files in the current directory asynchronously
        output = await run_text_async(["ls", "-l"])    
    '''
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            print(f"Error ejecutando: {' '.join(cmd)}", file=sys.stderr)
            if stderr:
                print(stderr.decode(errors="ignore").strip(), file=sys.stderr)
            return None
        return stdout.decode(errors="ignore")
    except Exception as e:
        print(f"Excepción ejecutando {' '.join(cmd)}: {e}", file=sys.stderr)
        return None


async def run_bytes_async(cmd: list[str], input_data: bytes | None = None) -> bytes | None:
    '''
    Executes a command asynchronously and captures its output as bytes.
    Parameters:
        cmd: A list of command arguments to execute (e.g., ["openssl", "x509", "-in", "cert.pem", "-outform", "DER"]).
        input_data: Optional bytes to be sent to the command's standard input.
    Returns:
        The standard output of the command as bytes if successful, or None if an error occurs.
    Example usage:
        # Get the PEM-encoded certificate for a given nickname from the NSS database asynchronously
        pem_bytes = await run_bytes_async(["certutil", "-L", "-d", "    
    '''
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE if input_data is not None else None,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate(input=input_data)
        if proc.returncode != 0:
            print(f"Error ejecutando: {' '.join(cmd)}", file=sys.stderr)
            if stderr:
                print(stderr.decode(errors="ignore").strip(), file=sys.stderr)
            return None
        return stdout
    except Exception as e:
        print(f"Excepción ejecutando {' '.join(cmd)}: {e}", file=sys.stderr)
        return None




No hay comentarios :

Publicar un comentario