316 lines
9.6 KiB
Python

# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, division, absolute_import, print_function)
import re
import os
import socket
from powerline.lib.url import urllib_read
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.monotonic import monotonic
from powerline.lib.humanize_bytes import humanize_bytes
from powerline.segments import with_docstring
from powerline.theme import requires_segment_info
@requires_segment_info
def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False):
'''Return the current hostname.
:param bool only_if_ssh:
only return the hostname if currently in an SSH session
:param bool exclude_domain:
return the hostname without domain if there is one
'''
if (
segment_info['environ'].get('_POWERLINE_RUNNING_SHELL_TESTS')
== 'ee5bcdc6-b749-11e7-9456-50465d597777'
):
return 'hostname'
if only_if_ssh and not segment_info['environ'].get('SSH_CLIENT'):
return None
if exclude_domain:
return socket.gethostname().split('.')[0]
return socket.gethostname()
def _external_ip(query_url='http://ipv4.icanhazip.com/'):
return urllib_read(query_url).strip()
class ExternalIpSegment(ThreadedSegment):
interval = 300
def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs):
self.query_url = query_url
super(ExternalIpSegment, self).set_state(**kwargs)
def update(self, old_ip):
return _external_ip(query_url=self.query_url)
def render(self, ip, **kwargs):
if not ip:
return None
return [{'contents': ip, 'divider_highlight_group': 'background:divider'}]
external_ip = with_docstring(ExternalIpSegment(),
'''Return external IP address.
:param str query_url:
URI to query for IP address, should return only the IP address as a text string
Suggested URIs:
* http://ipv4.icanhazip.com/
* http://ipv6.icanhazip.com/
* http://icanhazip.com/ (returns IPv6 address if available, else IPv4)
Divider highlight group used: ``background:divider``.
''')
try:
import netifaces
except ImportError:
def internal_ip(pl, interface='auto', ipv=4):
return None
else:
_interface_starts = {
'eth': 10, # Regular ethernet adapters : eth1
'enp': 10, # Regular ethernet adapters, Gentoo : enp2s0
'en': 10, # OS X : en0
'ath': 9, # Atheros WiFi adapters : ath0
'wlan': 9, # Other WiFi adapters : wlan1
'wlp': 9, # Other WiFi adapters, Gentoo : wlp5s0
'teredo': 1, # miredo interface : teredo
'lo': -10, # Loopback interface : lo
'docker': -5, # Docker bridge interface : docker0
'vmnet': -5, # VMWare bridge interface : vmnet1
'vboxnet': -5, # VirtualBox bridge interface : vboxnet0
}
_interface_start_re = re.compile(r'^([a-z]+?)(\d|$)')
def _interface_key(interface):
match = _interface_start_re.match(interface)
if match:
try:
base = _interface_starts[match.group(1)] * 100
except KeyError:
base = 500
if match.group(2):
return base - int(match.group(2))
else:
return base
else:
return 0
def internal_ip(pl, interface='auto', ipv=4):
family = netifaces.AF_INET6 if ipv == 6 else netifaces.AF_INET
if interface == 'auto':
try:
interface = next(iter(sorted(netifaces.interfaces(), key=_interface_key, reverse=True)))
except StopIteration:
pl.info('No network interfaces found')
return None
elif interface == 'default_gateway':
try:
interface = netifaces.gateways()['default'][family][1]
except KeyError:
pl.info('No default gateway found for IPv{0}', ipv)
return None
addrs = netifaces.ifaddresses(interface)
try:
return addrs[family][0]['addr']
except (KeyError, IndexError):
pl.info("No IPv{0} address found for interface {1}", ipv, interface)
return None
internal_ip = with_docstring(internal_ip,
'''Return internal IP address
Requires ``netifaces`` module to work properly.
:param str interface:
Interface on which IP will be checked. Use ``auto`` to automatically
detect interface. In this case interfaces with lower numbers will be
preferred over interfaces with similar names. Order of preference based on
names:
#. ``eth`` and ``enp`` followed by number or the end of string.
#. ``ath``, ``wlan`` and ``wlp`` followed by number or the end of string.
#. ``teredo`` followed by number or the end of string.
#. Any other interface that is not ``lo*``.
#. ``lo`` followed by number or the end of string.
Use ``default_gateway`` to detect the interface based on the machine's
`default gateway <https://en.wikipedia.org/wiki/Default_gateway>`_ (i.e.,
the router to which it is connected).
:param int ipv:
4 or 6 for ipv4 and ipv6 respectively, depending on which IP address you
need exactly.
''')
try:
import psutil
def _get_bytes(interface):
try:
io_counters = psutil.net_io_counters(pernic=True)
except AttributeError:
io_counters = psutil.network_io_counters(pernic=True)
if_io = io_counters.get(interface)
if not if_io:
return None
return if_io.bytes_recv, if_io.bytes_sent
def _get_interfaces():
try:
io_counters = psutil.net_io_counters(pernic=True)
except AttributeError:
io_counters = psutil.network_io_counters(pernic=True)
for interface, data in io_counters.items():
if data:
yield interface, data.bytes_recv, data.bytes_sent
except ImportError:
def _get_bytes(interface):
with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj:
rx = int(file_obj.read())
with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj:
tx = int(file_obj.read())
return (rx, tx)
def _get_interfaces():
for interface in os.listdir('/sys/class/net'):
x = _get_bytes(interface)
if x is not None:
yield interface, x[0], x[1]
class NetworkLoadSegment(KwThreadedSegment):
interfaces = {}
replace_num_pat = re.compile(r'[a-zA-Z]+')
@staticmethod
def key(interface='auto', **kwargs):
return interface
def compute_state(self, interface):
if interface == 'auto':
proc_exists = getattr(self, 'proc_exists', None)
if proc_exists is None:
proc_exists = self.proc_exists = os.path.exists('/proc/net/route')
if proc_exists:
# Look for default interface in routing table
with open('/proc/net/route', 'rb') as f:
for line in f.readlines():
parts = line.split()
if len(parts) > 1:
iface, destination = parts[:2]
if not destination.replace(b'0', b''):
interface = iface.decode('utf-8')
break
if interface == 'auto':
# Choose interface with most total activity, excluding some
# well known interface names
interface, total = 'eth0', -1
for name, rx, tx in _get_interfaces():
base = self.replace_num_pat.match(name)
if None in (base, rx, tx) or base.group() in ('lo', 'vmnet', 'sit'):
continue
activity = rx + tx
if activity > total:
total = activity
interface = name
try:
idata = self.interfaces[interface]
try:
idata['prev'] = idata['last']
except KeyError:
pass
except KeyError:
idata = {}
if self.run_once:
idata['prev'] = (monotonic(), _get_bytes(interface))
self.shutdown_event.wait(self.interval)
self.interfaces[interface] = idata
idata['last'] = (monotonic(), _get_bytes(interface))
return idata.copy()
def render_one(self, idata, recv_format='DL {value:>8}', sent_format='UL {value:>8}', suffix='B/s', si_prefix=False, **kwargs):
if not idata or 'prev' not in idata:
return None
t1, b1 = idata['prev']
t2, b2 = idata['last']
measure_interval = t2 - t1
if None in (b1, b2):
return None
r = []
for i, key in zip((0, 1), ('recv', 'sent')):
format = locals()[key + '_format']
try:
value = (b2[i] - b1[i]) / measure_interval
except ZeroDivisionError:
self.warn('Measure interval zero.')
value = 0
max_key = key + '_max'
is_gradient = max_key in kwargs
hl_groups = ['network_load_' + key, 'network_load']
if is_gradient:
hl_groups[:0] = (group + '_gradient' for group in hl_groups)
r.append({
'contents': format.format(value=humanize_bytes(value, suffix, si_prefix)),
'divider_highlight_group': 'network_load:divider',
'highlight_groups': hl_groups,
})
if is_gradient:
max = kwargs[max_key]
if value >= max:
r[-1]['gradient_level'] = 100
else:
r[-1]['gradient_level'] = value * 100.0 / max
return r
network_load = with_docstring(NetworkLoadSegment(),
'''Return the network load.
Uses the ``psutil`` module if available for multi-platform compatibility,
falls back to reading
:file:`/sys/class/net/{interface}/statistics/{rx,tx}_bytes`.
:param str interface:
Network interface to measure (use the special value "auto" to have powerline
try to auto-detect the network interface).
:param str suffix:
String appended to each load string.
:param bool si_prefix:
Use SI prefix, e.g. MB instead of MiB.
:param str recv_format:
Format string that determines how download speed should look like. Receives
``value`` as argument.
:param str sent_format:
Format string that determines how upload speed should look like. Receives
``value`` as argument.
:param float recv_max:
Maximum number of received bytes per second. Is only used to compute
gradient level.
:param float sent_max:
Maximum number of sent bytes per second. Is only used to compute gradient
level.
Divider highlight group used: ``network_load:divider``.
Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_load_recv_gradient`` (gradient) or ``network_load_gradient`` (gradient), ``network_load_sent`` or ``network_load_recv`` or ``network_load``.
''')