ParallelSSHClient

Package containing ParallelSSHClient class

class pssh.pssh_client.ParallelSSHClient(hosts, user=None, password=None, port=None, pkey=None, forward_ssh_agent=True, num_retries=3, timeout=120, pool_size=10, proxy_host=None, proxy_port=22, proxy_user=None, proxy_password=None, proxy_pkey=None, agent=None, allow_agent=True, host_config=None, channel_timeout=None)

Uses pssh.ssh_client.SSHClient, performs tasks over SSH on multiple hosts in parallel.

Connections to hosts are established in parallel when run_command is called, therefor any connection and/or authentication exceptions will happen on the call to run_command and need to be handled there.

Parameters:
  • hosts (list(str)) – Hosts to connect to
  • user (str) – (Optional) User to login as. Defaults to logged in user or user from ~/.ssh/config or /etc/ssh/ssh_config if set
  • password (str) – (Optional) Password to use for login. Defaults to no password
  • port (int) – (Optional) Port number to use for SSH connection. Defaults to None which uses SSH default
  • pkey (paramiko.pkey.PKey) – (Optional) Client’s private key to be used to connect with
  • num_retries (int) – (Optional) Number of retries for connection attempts before the client gives up. Defaults to 3.
  • timeout (int) – (Optional) Number of seconds to wait before connection and authentication attempt times out. Note that total time before timeout will be timeout * num_retries + (5 * (num_retries-1)) number of seconds, where (5 * (num_retries-1)) refers to a five (5) second delay between retries.
  • forward_ssh_agent (bool) – (Optional) Turn on/off SSH agent forwarding - equivalent to ssh -A from the ssh command line utility. Defaults to True if not set.
  • pool_size (int) – (Optional) Greenlet pool size. Controls on how many hosts to execute tasks in parallel. Defaults to 10. Overhead in event loop will determine how high this can be set to, see scaling guide lines in project’s readme.
  • proxy_host (str) – (Optional) SSH host to tunnel connection through so that SSH clients connect to host via client -> proxy_host -> host
  • proxy_port (int) – (Optional) SSH port to use to login to proxy host if set. Defaults to 22.
  • proxy_user (str) – (Optional) User to login to proxy_host as. Defaults to logged in user.
  • proxy_password (str) – (Optional) Password to login to proxy_host with. Defaults to no password
  • proxy_pkey (paramiko.pkey.PKey) – (Optional) Private key to be used for authentication with proxy_host. Defaults to available keys from SSHAgent and user’s home directory keys
  • agent (pssh.agent.SSHAgent) – (Optional) SSH agent object to programmatically supply an agent to override system SSH agent with
  • host_config (dict) – (Optional) Per-host configuration for cases where not all hosts use the same configuration values.
  • channel_timeout (int) – (Optional) Time in seconds before reading from an SSH channel times out. For example with channel timeout set to one, trying to immediately gather output from a command producing no output for more than one second will timeout.
  • allow_agent (bool) – (Optional) set to False to disable connecting to the system’s SSH agent

Example Usage

from __future__ import print_function
from pprint import pprint

from pssh.pssh_client import ParallelSSHClient
from pssh.exceptions import AuthenticationException, \
    UnknownHostException, ConnectionErrorException

client = ParallelSSHClient(['myhost1', 'myhost2'])
try:
    output = client.run_command('ls -ltrh /tmp/aasdfasdf', sudo=True)
except (AuthenticationException, UnknownHostException,
        ConnectionErrorException):
    pass

Commands have started executing at this point. Exit code(s) will not be available immediately.

pprint(output)
  {'myhost1':
        host=myhost1
        exit_code=None
        cmd=<Greenlet>
        channel=<channel>
        stdout=<generator>
        stderr=<generator>
        stdin=<channel>
        exception=None
   'myhost2':
        host=myhost2
        exit_code=None
        cmd=<Greenlet>
        channel=<channel>
        stdout=<generator>
        stderr=<generator>
        stdin=<channel>
        exception=None
  }
Enabling host logger:
 

There is a host logger in parallel-ssh that can be enabled to show stdout from remote commands on hosts as it comes in.

This allows for stdout to be automatically logged without having to print it serially per host. pssh.utils.host_logger is a standard library logger and may be configured to log to anywhere else.

For host logger to log output, join must be called with consume_output=True

import pssh.utils
pssh.utils.enable_host_logger()

output = client.run_command('ls -ltrh')
client.join(output, consume_output=True)
[myhost1]     drwxrwxr-x 6 user group 4.0K Jan 1 HH:MM x
[myhost2]     drwxrwxr-x 6 user group 4.0K Jan 1 HH:MM x

Retrieve exit codes after commands have finished as below.

exit_code in output will be None immediately after call to run_command.

parallel-ssh starts commands asynchronously to enable starting multiple commands in parallel without blocking.

Because of this, exit codes will not be immediately available even for commands that exit immediately.

Waiting for command completion:
 

At least one of

  • Iterating over stdout/stderr to completion
  • Calling client.join(output)

is necessary to cause parallel-ssh to wait for commands to finish and be able to gather exit codes.

An individual command’s exit code can be gathered by get_exit_code(host_output)

Note

Joining on client’s gevent pool

client.pool.join() only blocks until greenlets have been spawned which will be immediately as long as pool is not full.

Checking command completion:
 

To check if commands have finished without blocking use

client.finished(output)
False

which returns True if and only if all commands in output have finished.

For individual commands the status of channel can be checked

output[host].channel.closed
False

which returns True if command has finished.

Either iterating over stdout/stderr or client.join(output) will cause exit codes to become available in output without explicitly calling get_exit_codes.

Use client.join(output) to block until all commands have finished and gather exit codes at same time.

In versions prior to 1.0.0 only, client.join would consume output.

Exit code retrieval

get_exit_codes is not a blocking function and will not wait for commands to finish.

output parameter is modified in-place.

client.get_exit_codes(output)
for host in output:
    print(output[host].exit_code)
0
0

Stdout from each host

for host in output:
    for line in output[host].stdout:
        print(line)
ls: cannot access /tmp/aasdfasdf: No such file or directory
ls: cannot access /tmp/aasdfasdf: No such file or directory

Example with specified private key

from pssh.utils import load_private_key
client_key = load_private_key('user.key')
client = ParallelSSHClient(['myhost1', 'myhost2'], pkey=client_key)

Multiple commands

for cmd in ['uname', 'whoami']:
    client.run_command(cmd)

Per-Host configuration

Per host configuration can be provided for any or all of user, password port and private key. Private key value is a paramiko.pkey.PKey object as returned by pssh.utils.load_private_key().

pssh.utils.load_private_key() accepts both file names and file-like objects and will attempt to load all available key types, returning None if they all fail.

from pssh.utils import load_private_key

host_config = { 'host1' : {'user': 'user1', 'password': 'pass',
                           'port': 2222,
                           'private_key': load_private_key(
                               'my_key.pem')},
                'host2' : {'user': 'user2', 'password': 'pass',
                           'port': 2223,
                           'private_key': load_private_key(
                               open('my_other_key.pem'))},
                }
hosts = host_config.keys()

client = ParallelSSHClient(hosts, host_config=host_config)
client.run_command('uname')
<..>

Note

Connection persistence

Connections to hosts will remain established for the duration of the object’s life. To close them, just del or reuse the object reference

client = ParallelSSHClient(['localhost'])
output = client.run_command('ls')
netstat:tcp  0  0 127.0.0.1:53054    127.0.0.1:22    ESTABLISHED

Connection remains active after commands have finished executing. Any additional commands will reuse the same connection.

del client

Connection is terminated.

copy_file(local_file, remote_file, recurse=False)

Copy local file to remote file in parallel

This function returns a list of greenlets which can be join-ed on to wait for completion.

gevent.joinall() function may be used to join on all greenlets and will also raise exceptions from them if called with raise_error=True - default is False.

Alternatively call .get on each greenlet to raise any exceptions from it.

Exceptions listed here are raised when either gevent.joinall(<greenlets>, raise_error=True) is called or .get is called on each greenlet, not this function itself.

Parameters:
  • local_file (str) – Local filepath to copy to remote host
  • remote_file (str) – Remote filepath on remote host to copy file to
  • recurse (bool) – Whether or not to descend into directories recursively.
Return type:

List(gevent.Greenlet) of greenlets for remote copy commands

Raises:

ValueError when a directory is supplied to local_file and recurse is not set

Raises:

IOError on I/O errors writing files

Raises:

OSError on OS errors like permission denied

Note

Remote directories in remote_file that do not exist will be created as long as permissions allow.

copy_remote_file(remote_file, local_file, recurse=False, suffix_separator='_')

Copy remote file(s) in parallel as <local_file><suffix_separator><host>

With a local_file value of myfile and default separator _ the resulting filename will be myfile_myhost for the file from host myhost.

This function, like ParallelSSHClient.copy_file(), returns a list of greenlets which can be join-ed on to wait for completion.

gevent.joinall() function may be used to join on all greenlets and will also raise exceptions if called with raise_error=True - default is False.

Alternatively call .get on each greenlet to raise any exceptions from it.

Exceptions listed here are raised when either gevent.joinall(<greenlets>, raise_error=True) is called or .get is called on each greenlet, not this function itself.

Parameters:
  • remote_file (str) – remote filepath to copy to local host
  • local_file (str) – local filepath on local host to copy file to
  • recurse (bool) – whether or not to recurse
  • suffix_separator (str) – (Optional) Separator string between filename and host, defaults to _. For example, for a local_file value of myfile and default separator the resulting filename will be myfile_myhost for the file from host myhost
Return type:

list(gevent.Greenlet) of greenlets for remote copy commands

Raises:

ValueError when a directory is supplied to local_file and recurse is not set

Raises:

IOError on I/O errors writing files

Raises:

OSError on OS errors like permission denied

Note

Local directories in local_file that do not exist will be created as long as permissions allow.

Note

File names will be de-duplicated by appending the hostname to the filepath separated by suffix_separator.

finished(output)

Check if commands have finished without blocking

Parameters:output – As returned by pssh.pssh_client.ParallelSSHClient.get_output()
Return type:bool
get_exit_code(host_output)

Get exit code from host output if available.

Parameters:host_output – Per host output as returned by pssh.pssh_client.ParallelSSHClient.get_output()
Return type:int or None if exit code not ready
get_exit_codes(output)

Get exit code for all hosts in output if available. Output parameter is modified in-place.

Parameters:output – As returned by pssh.pssh_client.ParallelSSHClient.get_output()
Return type:None
get_output(cmd, output, encoding='utf-8')

Get output from command.

Parameters:
Return type:

None

output parameter is modified in-place and has the following structure

{'myhost1':
      exit_code=exit code if ready else None
      channel=SSH channel of command
      stdout=<iterable>
      stderr=<iterable>
      cmd=<greenlet>
      exception=<exception object if applicable>
}

Stdout and stderr are also logged via the logger named host_logger which can be enabled by calling enable_host_logger

Example usage:

output = client.get_output()
for host in output:
    for line in output[host].stdout:
        print(line)
<stdout>
# Get exit code for a particular host's output after command
# has finished
self.get_exit_code(output[host])
0
join(output, consume_output=False)

Block until all remote commands in output have finished and retrieve exit codes

Parameters:
  • output (dict as returned by pssh.pssh_client.ParallelSSHClient.get_output()) – Output of commands to join on
  • consume_output (bool) – Whether or not join should consume output buffers. Output buffers will be empty after join if set to True. Must be set to True to allow host logger to log output on call to join.
Enabling host logger:
 
from pssh.utils import enable_host_logger
enable_host_logger()
output = client.run_command(<..>)
client.join(output, consume_output=True)

# Output buffers now empty
len(list(output[client.hosts[0]].stdout)) == 0

With consume_output=True, host logger logs output.

[my_host1] <..>

With consume_output=False, the default, iterating over output is needed for host logger to log anything.

output = client.run_command(<..>)
client.join(output, consume_output=False)
for host, host_out in output.items():
    for line in host_out.stdout:
        pass
[my_host1] <..>
run_command(command, sudo=False, user=None, stop_on_errors=True, shell=None, use_shell=True, use_pty=True, host_args=None, encoding='utf-8', **paramiko_kwargs)

Run command on all hosts in parallel, honoring self.pool_size, and return output buffers.

This function will block until all commands have been sent to remote servers and then return immediately

More explicitly, function will return after connection and authentication establishment and after commands have been sent to successfully established SSH channels.

Any connection and/or authentication exceptions will be raised here and need catching unless run_command is called with stop_on_errors=False in which case exceptions are added to host output instead.

Parameters:
  • command (str) – Command to run
  • sudo (bool) – (Optional) Run with sudo. Defaults to False
  • user (str) – (Optional) User to run command as. Requires sudo access for that user from the logged in user account.
  • stop_on_errors (bool) – (Optional) Raise exception on errors running command. Defaults to True. With stop_on_errors set to False, exceptions are instead added to output of run_command. See example usage below.
  • shell (str) – (Optional) Override shell to use to run command with. Defaults to login user’s defined shell. Use the shell’s command syntax, eg shell=’bash -c’ or shell=’zsh -c’.
  • use_shell (bool) – (Optional) Run command with or without shell. Defaults to True - use shell defined in user login to run command string
  • use_pty (bool) – (Optional) Enable/Disable use of pseudo terminal emulation. Disabling it will prohibit capturing standard input/output. This is required in majority of cases, exceptions being where a shell is not used and/or input/output is not required. In particular when running a command which deliberately closes input/output pipes, such as a daemon process, you may want to disable use_pty. Defaults to True
  • host_args (tuple or list) – (Optional) Format command string with per-host arguments in host_args. host_args length must equal length of host list - pssh.exceptions.HostArgumentException is raised otherwise
  • encoding (str) – Encoding to use for output. Must be valid Python codec
  • paramiko_kwargs (dict) – (Optional) Extra keyword arguments to be passed on to paramiko.client.SSHClient.connect()
Return type:

Dictionary with host as key and pssh.output.HostOutput as value as per pssh.pssh_client.ParallelSSHClient.get_output()

Raises:

pssh.exceptions.AuthenticationException on authentication error

Raises:

pssh.exceptions.UnknownHostException on DNS resolution error

Raises:

pssh.exceptions.ConnectionErrorException on error connecting

Raises:

pssh.exceptions.SSHException on other undefined SSH errors

Raises:

pssh.exceptions.HostArgumentException on number of host arguments not equal to number of hosts

Raises:

TypeError on not enough host arguments for cmd string format

Raises:

KeyError on no host argument key in arguments dict for cmd string format

Example Usage

Simple run command:
 
output = client.run_command('ls -ltrh')
Print stdout for each command:
 
from __future__ import print_function

for host in output:
    for line in output[host].stdout:
        print(line)
Get exit codes after command has finished:
 
from __future__ import print_function

client.get_exit_codes(output)
for host in output:
    print(output[host].exit_code)
0
0
Wait for completion, print exit codes:
 
client.join(output)
print(output[host].exit_code)
0
for line in output[host].stdout:
    print(line)
Run with sudo:
output = client.run_command('ls -ltrh', sudo=True)
Capture stdout:

Warning

This will store the entirety of stdout into memory and may exhaust available memory if command output is large enough.

Iterating over stdout/stderr to completion by definition implies blocking until command has finished. To only log output as it comes in without blocking the host logger can be enabled - see Enabling Host Logger above.

from __future__ import print_function

for host in output:
    stdout = list(output[host].stdout)
    print("Complete stdout for host %s is %s" % (host, stdout,))
Command with per-host arguments:
 

host_args keyword parameter can be used to provide arguments to use to format the command string.

Number of host_args should be at least as many as number of hosts.

Any string format specification characters may be used in command string.

Examples:
# Tuple
#
# First host in hosts list will use cmd 'host1_cmd',
# second host 'host2_cmd' and so on
output = client.run_command('%s', host_args=('host1_cmd',
                                             'host2_cmd',
                                             'host3_cmd',))

# Multiple arguments
#
output = client.run_command('%s %s',
                            host_args=(('host1_cmd1', 'host1_cmd2'),
                                       ('host2_cmd1', 'host2_cmd2'),
                                       ('host3_cmd1', 'host3_cmd2'),))

# List of dict
#
# First host in host list will use cmd 'host-index-0',
# second host 'host-index-1' and so on
output = client.run_command(
  '%(cmd)s', host_args=[{'cmd': 'host-index-%s' % (i,))
                        for i in range(len(client.hosts))])
Expression as host list:
 

Any type of iterator may be used as host list, including generator and list comprehension expressions.

hosts = ['dc1.myhost1', 'dc2.myhost2']
# List comprehension
client = ParallelSSHClient([h for h in hosts if h.find('dc1')])
# Generator
client = ParallelSSHClient((h for h in hosts if h.find('dc1')))
# Filter
client = ParallelSSHClient(filter(lambda h: h.find('dc1'), hosts))
client.run_command(<..>)

Note

Since generators by design only iterate over a sequence once then stop, client.hosts should be re-assigned after each call to run_command when using generators as target of client.hosts.

Overriding host list:
 

Host list can be modified in place. Call to run_command will create new connections as necessary and output will only contain output for the hosts run_command executed on.

client.hosts = ['otherhost']
print(client.run_command('exit 0'))
{'otherhost': exit_code=None, <..>}
Run multiple commands in parallel:
 

This short example demonstrates running multiple long running commands in parallel on the same host, how long it takes for all commands to start, blocking until they complete and how long it takes for all commands to complete.

See examples directory for complete script.

output = []
host = 'localhost'

# Run 10 five second sleeps
cmds = ['sleep 5' for _ in xrange(10)]
start = datetime.datetime.now()
for cmd in cmds:
    output.append(client.run_command(cmd, stop_on_errors=False))
end = datetime.datetime.now()
print("Started %s commands in %s" % (len(cmds), end-start,))
start = datetime.datetime.now()
for _output in output:
    for line in _output[host].stdout:
        print(line)
end = datetime.datetime.now()
print("All commands finished in %s" % (end-start,))

Output

Started 10 commands in 0:00:00.428629
All commands finished in 0:00:05.014757
Output format:
{'myhost1':
      host=myhost1
      exit_code=exit code if ready else None
      channel=SSH channel of command
      stdout=<iterable>
      stderr=<iterable>
      stdin=<file-like writable channel>
      cmd=<greenlet>
      exception=None}
Do not stop on errors, return per-host exceptions in output:
 
output = client.run_command('ls -ltrh', stop_on_errors=False)
client.join(output)
print(output)
{'myhost1':
      host=myhost1
      exit_code=None
      channel=None
      stdout=None
      stderr=None
      cmd=None
      exception=ConnectionErrorException(
                  "Error connecting to host '%s:%s' - %s - "
                  "retry %s/%s",
                   host, port, 'Connection refused', 3, 3)}
Using stdin:
output = client.run_command('read')
stdin = output['localhost'].stdin
stdin.write("writing to stdin\n")
stdin.flush()
for line in output['localhost'].stdout:
    print(line)

writing to stdin