Update vyos.py

Migrate to templates
This commit is contained in:
Wieger Bontekoe 2024-04-01 16:30:09 +02:00 committed by GitHub
parent 9c16a0055a
commit d5a4d0d422
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 181 additions and 249 deletions

View File

@ -19,12 +19,15 @@ Read napalm.readthedocs.org for more information.
"""
import logging
import os
import re
import tempfile
import textfsm
import vyattaconfparser
from django.core.cache import cache
cache.clear()
# NAPALM base
import napalm.base.constants as C
@ -37,6 +40,7 @@ from napalm.base.exceptions import (
)
from netmiko import ConnectHandler, SCPConn, __version__ as netmiko_version
class VyOSDriver(NetworkDriver):
_MINUTE_SECONDS = 60
@ -129,24 +133,26 @@ class VyOSDriver(NetworkDriver):
else:
cfg_filename = filename
if os.path.exists(cfg_filename) is True:
self._scp_client.scp_transfer_file(cfg_filename, self._DEST_FILENAME)
self.device.send_command(f"cp {self._BOOT_FILENAME} {self._BACKUP_FILENAME}")
output_loadcmd = self.device.send_config_set([f"load {self._DEST_FILENAME}"])
match_loaded = re.findall("Load complete.", output_loadcmd)
match_notchanged = re.findall(
"No configuration changes to commit", output_loadcmd
)
if match_failed := re.findall(
"Failed to parse specified config file", output_loadcmd
):
raise ReplaceConfigException(f"Failed replace config: {output_loadcmd}")
if not match_loaded and not match_notchanged:
raise ReplaceConfigException(f"Failed replace config: {output_loadcmd}")
else:
if os.path.exists(cfg_filename) is not True:
raise ReplaceConfigException("config file is not found")
self._scp_client.scp_transfer_file(cfg_filename, self._DEST_FILENAME)
self.device.send_command(
f"cp {self._BOOT_FILENAME} {self._BACKUP_FILENAME}"
)
output_loadcmd = self.device.send_config_set(
[f"load {self._DEST_FILENAME}"]
)
match_loaded = re.findall("Load complete.", output_loadcmd)
match_notchanged = re.findall(
"No configuration changes to commit", output_loadcmd
)
if match_failed := re.findall(
"Failed to parse specified config file", output_loadcmd
):
raise ReplaceConfigException(f"Failed replace config: {output_loadcmd}")
if not match_loaded and not match_notchanged:
raise ReplaceConfigException(f"Failed replace config: {output_loadcmd}")
def load_merge_candidate(self, filename=None, config=None):
"""
@ -167,7 +173,9 @@ class VyOSDriver(NetworkDriver):
if os.path.exists(cfg_filename) is not True:
raise MergeConfigException("config file is not found")
with open(cfg_filename) as f:
self.device.send_command(f"cp {self._BOOT_FILENAME} {self._BACKUP_FILENAME}")
self.device.send_command(
f"cp {self._BOOT_FILENAME} {self._BACKUP_FILENAME}"
)
self._new_config = f.read()
cfg = [x for x in self._new_config.split("\n") if x]
output_loadcmd = self.device.send_config_set(cfg)
@ -213,7 +221,9 @@ class VyOSDriver(NetworkDriver):
if match := re.findall("Load complete.", output_loadcmd):
self.device.send_config_set(["commit", "save"])
else:
raise ReplaceConfigException(f"Failed rollback config: {output_loadcmd}")
raise ReplaceConfigException(
f"Failed rollback config: {output_loadcmd}"
)
def get_environment(self):
"""
@ -446,249 +456,172 @@ class VyOSDriver(NetworkDriver):
192.168.1.3 4 64521 7132 7103 0 0 0 4d21h05m 0
192.168.1.4 4 64522 0 0 0 0 0 never Active
"""
output = self.device.send_command("show ip bgp summary")
output = output.split("\n")
match = re.search(
r".* router identifier (\d+\.\d+\.\d+\.\d+), local AS number (\d+)",
output[2],
)
if not match:
return {}
router_id = match[1]
local_as = int(match[2])
current_dir = os.path.dirname(os.path.abspath(__file__))
template_path = os.path.join(current_dir, "templates", "bgp_sum.template")
bgp_neighbor_data = {"global": {}}
bgp_neighbor_data["global"]["router_id"] = router_id
bgp_neighbor_data["global"]["peers"] = {}
# Assuming you've got a TextFSM template ready to parse the `bgp_detail` output
with open(template_path) as template_file:
fsm = textfsm.TextFSM(template_file)
header = fsm.header
result = fsm.ParseText(output)
# delete the header and empty element
bgp_info = [i.strip() for i in output[9:-2] if i]
bgp_neighbor_data = {"global": {"router_id": "", "peers": {}}}
for i in bgp_info:
if len(i) > 0:
values = i.split()
(
peer_id,
bgp_version,
remote_as,
msg_rcvd,
msg_sent,
table_version,
in_queue,
out_queue,
up_time,
state_prefix,
) = values[:10]
bgp_neighbor_data["global"]["router_id"] = result[0][
header.index("BGP_ROUTER_ID")
]
is_enabled = "(Admin)" not in state_prefix
for neighbor in result:
peer_id = neighbor[header.index("NEIGHBOR")]
bgp_neighbor_data["global"]["peers"][peer_id] = []
received_prefixes = None
peer_dict = {
"description": str(neighbor[header.index("DESCRIPTION")]),
"is_enabled": "Admin" not in neighbor[header.index("PREFIX_SENT")],
"local_as": int(neighbor[header.index("LOCAL_AS")]),
"is_up": "Active"
not in neighbor[header.index("STATE_PREFIX_RECEIVED")],
"remote_id": neighbor[header.index("NEIGHBOR")],
"remote_address": neighbor[header.index("NEIGHBOR")],
"uptime": int(
self._bgp_time_conversion(neighbor[header.index("UP_TIME")])
),
"remote_as": int(neighbor[header.index("NEIGHBOR_AS")]),
}
try:
state_prefix = int(state_prefix)
received_prefixes = state_prefix
is_up = True
except ValueError:
state_prefix = -1
received_prefixes = -1
is_up = False
if bgp_version == "4":
address_family = "ipv4"
elif bgp_version == "6":
address_family = "ipv6"
else:
raise ValueError("BGP neighbor parsing failed")
"""
'show ip bgp neighbors 192.168.1.1' output example:
BGP neighbor is 192.168.1.1, remote AS 64519, local AS 64520, external link
BGP version 4, remote router ID 192.168.1.1
For address family: IPv4 Unicast
~~~
Community attribute sent to this neighbor(both)
1 accepted prefixes
~~~
"""
bgp_detail = self.device.send_command(
"show ip bgp neighbors %s" % peer_id
)
match_rid = re.search(
r"remote router ID (\d+\.\d+\.\d+\.\d+).*", bgp_detail
)
remote_rid = match_rid[1]
match_prefix_accepted = re.search(
r"(\d+) accepted prefixes", bgp_detail
)
accepted_prefixes = match_prefix_accepted[1]
bgp_neighbor_data["global"]["peers"].setdefault(peer_id, {})
peer_dict = {
"description": "",
"is_enabled": is_enabled,
"local_as": local_as,
"is_up": bool(is_up),
"remote_id": remote_rid,
"remote_address": peer_id,
"uptime": int(self._bgp_time_conversion(up_time)),
"remote_as": int(remote_as),
}
af_dict = {
address_family: {
"sent_prefixes": -1,
"accepted_prefixes": int(accepted_prefixes),
"received_prefixes": int(received_prefixes),
}
}
peer_dict["address_family"] = af_dict
bgp_neighbor_data["global"]["peers"][peer_id] = peer_dict
bgp_neighbor_data["global"]["peers"][peer_id] = peer_dict
return bgp_neighbor_data
import re
def get_bgp_neighbors_detail(self, neighbor_address=""):
def search_and_group(pattern, text, default=None):
"""Helper function to perform regex search and return the first group, or a default value"""
match = re.search(pattern, text)
return match[1] if match else default
def search_and_int_group(pattern, text, default=0):
"""Helper function to perform regex search and return the first group as an integer, or a default value"""
result = search_and_group(pattern, text, default)
return int(result) if result is not None else default
def safe_int(value, default=0):
try:
return int(value) if value and value.isdigit() else default
except ValueError:
return default
output = self.device.send_command("show ip bgp summary").split("\n")
match = re.search(
r".* router identifier (\d+\.\d+\.\d+\.\d+), local AS number (\d+)",
output[2],
)
if not match:
logger.warning("BGP neighbor parsing failed")
return {}
neighbors = self.get_bgp_neighbors()
_router_id, local_as = match.group(1), int(match[2])
bgp_neighbor_data = {"global": {}}
bgp_info = [i.strip() for i in output[9:-2] if i]
for neighbor in neighbors["global"]["peers"]:
for i in bgp_info:
if len(i) > 0:
values = i.split()
(
peer_id,
bgp_version,
remote_as,
msg_rcvd,
msg_sent,
table_version,
in_queue,
out_queue,
up_time,
state_prefix,
prefix_sent,
peer_desc,
) = values[:12]
output = self.device.send_command(f"show ip bgp neighbor {neighbor}")
if neighbor_address and peer_id != neighbor_address:
self.logger.warning(f"Skipping {peer_id}")
current_dir = os.path.dirname(os.path.abspath(__file__))
template_path = os.path.join(
current_dir, "templates", "bgp_details.template"
)
bgp_neighbor_data = {"global": {}}
with open(template_path) as template_file:
fsm = textfsm.TextFSM(template_file)
result = fsm.ParseText(output)
if not result:
continue
prefix_sent = (
0
if "(Admin)" in state_prefix or "Idle" in state_prefix
else int(prefix_sent) if prefix_sent.isdigit() else 0
)
received_prefixes = int(state_prefix) if state_prefix.isdigit() else -1
bgp_detail = self.device.send_command(f"show ip bgp neighbors {peer_id}")
neighbors_dicts = [
dict(zip(fsm.header, neighbor)) for neighbor in result
]
peer_dict = {
"up": search_and_group(
r" BGP state = (\S+), up for", bgp_detail
)
== "Established",
"local_as": local_as,
"remote_as": int(remote_as),
"router_id": search_and_group(
r"local router ID (\d+\.\d+\.\d+\.\d+)", bgp_detail
),
"local_address": search_and_group(
r"local router ID (\d+\.\d+\.\d+\.\d+)", bgp_detail
),
"routing_table": search_and_group(
r"BGP version (\d+)", bgp_detail
),
"local_address_configured": bool(
re.search(r"Local host: (\d+\.\d+\.\d+\.\d+)", bgp_detail)
),
"local_port": search_and_group(
r"Local port: (\d+)", bgp_detail
),
"remote_address": peer_id,
"remote_port": search_and_group(
r"Foreign port: (\d+)", bgp_detail
),
"multihop": search_and_int_group(
r"External BGP neighbor may be up to (\d+)", bgp_detail
)
> 1,
"multipath": -1,
"remove_private_as": -1,
"import_policy": search_and_group(
r" Route map for incoming advertisements is (\S+)",
bgp_detail,
),
"export_policy": search_and_group(
r" Route map for outgoing advertisements is (\S+)",
bgp_detail,
),
"input_messages": -1,
"output_messages": -1,
"input_updates": -1,
"output_updates": -1,
"connection_state": search_and_group(
r" BGP state = (\S+)", bgp_detail
)
.replace(",", "")
.lower(),
"bgp_state": search_and_group(
r" BGP state = (\S+)", bgp_detail
).replace(",", ""),
"previous_connection_state": -1,
"last_event": -1,
"suppress_4byte_as": -1,
"local_as_prepend": -1,
"holdtime": search_and_group(
r"Hold time is (\d+)", bgp_detail
),
"configured_holdtime": search_and_int_group(
r"Configured hold time is (\d+)", bgp_detail, 0
),
"keepalive": search_and_group(
r"keepalive interval is (\d+)", bgp_detail
),
"configured_keepalive": search_and_int_group(
r"keepalive interval is (\d+)", bgp_detail, 0
),
"active_prefix_count": -1,
"accepted_prefix_count": search_and_int_group(
r"(\d+) accepted prefixes", bgp_detail, -1
),
"suppressed_prefix_count": -1,
"advertised_prefix_count": prefix_sent,
"received_prefix_count": received_prefixes,
"flap_count": -1,
}
for neighbor in neighbors_dicts:
remote_as_int = int(remote_as)
bgp_neighbor_data["global"].setdefault(remote_as_int, []).append(
peer_dict
)
remote_as = neighbor["REMOTE_AS"]
return bgp_neighbor_data
peer_dict = {
"up": neighbor["BGP_STATE"].lower() == "established",
"local_as": int(neighbor["LOCAL_AS"]),
"remote_as": int(neighbor["REMOTE_AS"]),
"router_id": neighbor["LOCAL_ROUTER_ID"],
"local_address": neighbor[
"LOCAL_ROUTER_ID"
], # Adjusted from LOCAL_ROUTER_ID based on context
"routing_table": f"IPv{neighbor['BGP_VERSION']} Unicast", # Constructed value
"local_address_configured": bool(neighbor["LOCAL_ROUTER_ID"]),
"local_port": (
int(neighbor["LOCAL_PORT"])
if neighbor["LOCAL_PORT"].isdigit()
else None
),
"remote_address": neighbor["REMOTE_ROUTER_ID"],
"remote_port": neighbor["FOREIGN_PORT"],
"multipath": neighbor.get(
"DYNAMIC_CAPABILITY", "no"
), # Assuming DYNAMIC_CAPABILITY indicates multipath
"remove_private_as": (
"yes"
if neighbor.get("REMOVE_PRIVATE_AS", "no") != "no"
else "no"
), # Placeholder for actual value
"input_messages": sum(
int(neighbor["MESSAGE_STATISTICS_RECEIVED"][i])
for i in range(len(neighbor["MESSAGE_STATISTICS_TYPE"]))
if neighbor["MESSAGE_STATISTICS_TYPE"][i]
in ["Updates", "Keepalives"]
),
"output_messages": sum(
int(neighbor["MESSAGE_STATISTICS_SENT"][i])
for i in range(len(neighbor["MESSAGE_STATISTICS_TYPE"]))
if neighbor["MESSAGE_STATISTICS_TYPE"][i]
in ["Updates", "Keepalives"]
),
"input_updates": safe_int(
neighbor.get("RECEIVED_PREFIXES_IPV4")
)
+ safe_int(neighbor.get("RECEIVED_PREFIXES_IPV6")),
"output_updates": safe_int(
neighbor.get("ADVERTISED_PREFIX_COUNT")
),
"connection_state": neighbor["BGP_STATE"].lower(),
"bgp_state": neighbor["BGP_STATE"].lower(),
"previous_connection_state": neighbor.get(
"LAST_RESET_REASON", "unknown"
),
"last_event": neighbor.get(
"LAST_EVENT", "Not Available"
), # Assuming LAST_EVENT is available
"suppress_4byte_as": neighbor.get(
"FOUR_BYTE_AS_CAPABILITY", "Not Configured"
),
"local_as_prepend": neighbor.get(
"LOCAL_AS_PREPEND", "Not Configured"
), # Assuming LOCAL_AS_PREPEND is available
"holdtime": int(neighbor["HOLD_TIME"]),
"configured_holdtime": int(neighbor["CONFIGURED_HOLD_TIME"]),
"keepalive": int(neighbor["KEEPALIVE_INTERVAL"]),
"configured_keepalive": int(
neighbor["CONFIGURED_KEEPALIVE_INTERVAL"]
),
"active_prefix_count": int(
neighbor.get("ACTIVE_PREFIX_COUNT", 0)
), # Assuming ACTIVE_PREFIX_COUNT is available
"accepted_prefix_count": int(
neighbor.get("ACCEPTED_PREFIX_COUNT", 0)
), # Assuming ACCEPTED_PREFIX_COUNT is available
"suppressed_prefix_count": int(
neighbor.get("SUPPRESSED_PREFIX_COUNT", 0)
), # Assuming SUPPRESSED_PREFIX_COUNT is available
"advertised_prefix_count": int(
neighbor.get("ADVERTISED_PREFIX_COUNT", 0)
),
"received_prefix_count": safe_int(
neighbor.get("RECEIVED_PREFIXES_IPV4", 0)
)
+ safe_int(neighbor.get("RECEIVED_PREFIXES_IPV6", 0)),
"flap_count": safe_int(
neighbor.get("FLAP_COUNT", 0)
), # Assuming FLAP_COUNT is available
}
bgp_neighbor_data["global"].setdefault(int(remote_as), []).append(
peer_dict
)
return bgp_neighbor_data
def _bgp_time_conversion(self, bgp_uptime):
if "never" in bgp_uptime:
@ -703,10 +636,9 @@ class VyOSDriver(NetworkDriver):
elif "w" in bgp_uptime:
match = re.search(r"(\d+)(\w)(\d+)(\w)(\d+)(\w)", bgp_uptime)
return (
(int(match.group(1)) * self._WEEK_SECONDS)
+ (int(match.group(3)) * self._DAY_SECONDS)
+ (int(match.group(5)) * self._HOUR_SECONDS)
)
int(match[1]) * self._WEEK_SECONDS
+ int(match[3]) * self._DAY_SECONDS
) + int(match[5]) * self._HOUR_SECONDS
elif "d" in bgp_uptime:
match = re.search(r"(\d+)(\w)(\d+)(\w)(\d+)(\w)", bgp_uptime)
return (
@ -831,9 +763,7 @@ class VyOSDriver(NetworkDriver):
ver_str = [line for line in output if "Version" in line][0]
version = self.parse_version(ver_str)
above_1_1 = bool(
not version.startswith("1.0") and not version.startswith("1.1")
)
above_1_1 = not version.startswith("1.0") and not version.startswith("1.1")
if above_1_1:
sn_str = [line for line in output if "Hardware S/N" in line][0]
hwmodel_str = [line for line in output if "Hardware model" in line][0]
@ -993,7 +923,9 @@ class VyOSDriver(NetworkDriver):
# 'loss,', 'time', '3997ms']
packet_info = output_ping.split("\n")
packet_info = packet_info[-2] if len(packet_info[-1]) > 0 else packet_info[-3]
packet_info = (
packet_info[-2] if len(packet_info[-1]) > 0 else packet_info[-3]
)
packet_info = [x.strip() for x in packet_info.split()]
sent = int(packet_info[0])