mirror of
https://github.com/netinvent/npbackup.git
synced 2025-09-10 15:04:55 +08:00
Refactor restic_metrics parser, and add tests
This commit is contained in:
parent
e3d6db7ed1
commit
29581e6944
2 changed files with 424 additions and 30 deletions
|
@ -4,19 +4,20 @@
|
|||
|
||||
__intname__ = "restic_metrics"
|
||||
__author__ = "Orsiris de Jong"
|
||||
__copyright__ = "Copyright (C) 2022-2023 Orsiris de Jong - NetInvent"
|
||||
__copyright__ = "Copyright (C) 2022-2024 Orsiris de Jong - NetInvent"
|
||||
__licence__ = "BSD-3-Clause"
|
||||
__version__ = "1.4.4"
|
||||
__build__ = "2023052701"
|
||||
__version__ = "2.0.0"
|
||||
__build__ = "2024010101"
|
||||
__description__ = (
|
||||
"Converts restic command line output to a text file node_exporter can scrape"
|
||||
)
|
||||
__compat__ = "python2.7+"
|
||||
__compat__ = "python3.6+"
|
||||
|
||||
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
import json
|
||||
from typing import Union, List, Tuple
|
||||
import logging
|
||||
import platform
|
||||
|
@ -28,24 +29,193 @@ from ofunctions.misc import BytesConverter, convert_time_to_seconds
|
|||
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def restic_str_output_to_json(restic_exit_status: Union[bool, int], output: str) -> dict:
    """
    Parse restic output produced without the `--json` flag into a dict shaped
    like restic's own `--json` summary object.

    Args:
        restic_exit_status: True or 0 means success; False or any non-zero
            int marks the run as failed.
        output: full restic text output (one or more lines).

    Returns:
        dict with the same keys as restic's `--json` summary, plus an
        "errors" bool set when the exit status or any output line indicates
        a failure.
    """
    if restic_exit_status is False or (restic_exit_status is not True and restic_exit_status != 0):
        errors = True
    else:
        errors = False
    metrics = {
        "files_new": None,
        "files_changed": None,
        "files_unmodified": None,
        "dirs_new": None,
        "dirs_changed": None,
        "dirs_unmodified": None,
        "data_blobs": None,  # Not present in standard output
        "tree_blobs": None,  # Not present in standard output
        "data_added": None,  # Is "4.425" in Added to the repository: 4.425 MiB (1.431 MiB stored)
        "data_stored": None,  # Not present in json output, is "1.431" in Added to the repository: 4.425 MiB (1.431 MiB stored)
        "total_files_processed": None,
        "total_bytes_processed": None,
        "total_duration": None,
        # type bool:
        "errors": None,
    }
    for line in output.splitlines():
        matches = re.match(
            r"Files:\s+(\d+)\snew,\s+(\d+)\schanged,\s+(\d+)\sunmodified",
            line,
            re.IGNORECASE,
        )
        if matches:
            try:
                # Cast to int so value types match restic's native --json summary
                metrics["files_new"] = int(matches.group(1))
                metrics["files_changed"] = int(matches.group(2))
                metrics["files_unmodified"] = int(matches.group(3))
            except (IndexError, ValueError):
                logger.warning("Cannot parse restic log for files")
                errors = True

        matches = re.match(
            r"Dirs:\s+(\d+)\snew,\s+(\d+)\schanged,\s+(\d+)\sunmodified",
            line,
            re.IGNORECASE,
        )
        if matches:
            try:
                metrics["dirs_new"] = int(matches.group(1))
                metrics["dirs_changed"] = int(matches.group(2))
                metrics["dirs_unmodified"] = int(matches.group(3))
            except (IndexError, ValueError):
                logger.warning("Cannot parse restic log for dirs")
                errors = True

        matches = re.match(
            r"Added to the repo.*:\s([-+]?(?:\d*\.\d+|\d+))\s(\w+)\s+\((.*)\sstored\)",
            line,
            re.IGNORECASE,
        )
        if matches:
            try:
                size = matches.group(1)
                unit = matches.group(2)
                try:
                    value = int(BytesConverter("{} {}".format(size, unit)))
                    metrics["data_added"] = value
                except TypeError:
                    logger.warning(
                        "Cannot parse restic values from added to repo size log line"
                    )
                    errors = True
                stored_size = matches.group(3)  # TODO: add unit detection in regex
                try:
                    stored_size = int(BytesConverter(stored_size))
                    metrics["data_stored"] = stored_size
                except TypeError:
                    logger.warning(
                        "Cannot parse restic values from added to repo stored_size log line"
                    )
                    errors = True
            except IndexError as exc:
                logger.warning(
                    "Cannot parse restic log for added data: {}".format(exc)
                )
                errors = True

        matches = re.match(
            r"processed\s(\d+)\sfiles,\s([-+]?(?:\d*\.\d+|\d+))\s(\w+)\sin\s((\d+:\d+:\d+)|(\d+:\d+)|(\d+))",
            line,
            re.IGNORECASE,
        )
        if matches:
            try:
                metrics["total_files_processed"] = int(matches.group(1))
                size = matches.group(2)
                unit = matches.group(3)
                try:
                    value = int(BytesConverter("{} {}".format(size, unit)))
                    metrics["total_bytes_processed"] = value
                except TypeError:
                    logger.warning("Cannot parse restic values for total repo size")
                    errors = True

                seconds_elapsed = convert_time_to_seconds(matches.group(4))
                try:
                    metrics["total_duration"] = int(seconds_elapsed)
                except ValueError:
                    logger.warning("Cannot parse restic elapsed time")
                    errors = True
            except IndexError as exc:
                logger.error("Trace:", exc_info=True)
                logger.warning(
                    "Cannot parse restic log for repo size: {}".format(exc)
                )
                errors = True

        # re.search, not re.match: with re.match only the first alternative
        # could match, and only at line start — the "s there a repository..."
        # alternative could never match "Is there a repository..." at position 0
        matches = re.search(
            r"Failure|Fatal|Unauthorized|no such host|s there a repository at the following location\?",
            line,
            re.IGNORECASE,
        )
        if matches:
            try:
                logger.debug(
                    'Matcher found error: "{}" in line "{}".'.format(
                        matches.group(), line
                    )
                )
            except IndexError as exc:
                logger.error("Trace:", exc_info=True)
            # Any matched error line flags the whole run as failed
            errors = True

    metrics["errors"] = errors
    return metrics
|
||||
|
||||
|
||||
def restic_json_to_prometheus(restic_json, labels: dict = None) -> List[str]:
    """
    Transform a restic JSON result into prometheus text-format metric lines.

    Args:
        restic_json: the restic summary, either as a dict, a JSON string,
            or the full multi-line output of `restic backup --json` (the
            last line containing "message_type":"summary" is used).
        labels: optional prometheus labels added to every metric line.

    Returns:
        List of prometheus text-format metric lines.

    Raises:
        ValueError: when no summary object can be decoded from the input.
    """
    # Build a label prefix that ends with "," when labels exist, so metric
    # lines stay syntactically valid even when no labels are given
    if labels:
        labels = ",".join(f'{key}="{value}"' for key, value in labels.items()) + ","
    else:
        labels = ""

    if isinstance(restic_json, str):
        # Take the last summary line of a full restic --json output
        for line in reversed(restic_json.split("\n")):
            if '"message_type":"summary"' in line:
                restic_json = line
                break
        else:
            raise ValueError("Bogus data given. No message_type: summary found")
    if not isinstance(restic_json, dict):
        # The extracted summary line (or a raw JSON string) still needs decoding
        try:
            restic_json = json.loads(restic_json)
        except json.JSONDecodeError as exc:
            logger.error("Cannot decode JSON")
            raise ValueError("Bogus data given, expected a json dict or string") from exc

    prom_metrics = []
    for key, value in restic_json.items():
        skip = False
        # files_new / files_changed / ... become restic_files{state="..."},
        # same scheme for dirs_*
        for starters in ("files", "dirs"):
            if key.startswith(starters):
                for enders in ("new", "changed", "unmodified"):
                    if key.endswith(enders):
                        prom_metrics.append(
                            f'restic_{starters}{{{labels}state="{enders}",action="backup"}} {value}'
                        )
                        skip = True
        if skip:
            continue
        if key == "total_files_processed":
            prom_metrics.append(
                f'restic_files{{{labels}state="total",action="backup"}} {value}'
            )
            continue
        if key == "total_bytes_processed":
            # NOTE: "snasphot" typo kept on purpose — the metric name is
            # the published interface and scrapers already rely on it
            prom_metrics.append(
                f'restic_snasphot_size_bytes{{{labels}action="backup",type="processed"}} {value}'
            )
            continue
        if "duration" in key:
            key += "_seconds"
        prom_metrics.append(f'restic_{key}{{{labels}action="backup"}} {value}')
    return prom_metrics
|
||||
|
||||
|
||||
def restic_output_2_metrics(restic_result, output, labels=None):
|
||||
|
@ -59,6 +229,19 @@ def restic_output_2_metrics(restic_result, output, labels=None):
|
|||
Dirs: 258 new, 714 changed, 37066 unmodified
|
||||
Added to the repo: 493.649 MiB
|
||||
processed 237786 files, 85.487 GiB in 11:12
|
||||
|
||||
Logfile format with restic 0.16 (adds actual stored data size):
|
||||
|
||||
repository 962d5924 opened (version 2, compression level auto)
|
||||
using parent snapshot 8cb0c82d
|
||||
[0:00] 100.00% 2 / 2 index files loaded
|
||||
|
||||
Files: 0 new, 1 changed, 5856 unmodified
|
||||
Dirs: 0 new, 5 changed, 859 unmodified
|
||||
Added to the repository: 27.406 KiB (7.909 KiB stored)
|
||||
|
||||
processed 5857 files, 113.659 MiB in 0:00
|
||||
snapshot 6881b995 saved
|
||||
"""
|
||||
|
||||
metrics = []
|
||||
|
@ -223,23 +406,26 @@ def restic_output_2_metrics(restic_result, output, labels=None):
|
|||
|
||||
metrics.append(
|
||||
'restic_backup_failure{{{},timestamp="{}"}} {}'.format(
|
||||
labels, int(timestamp_get()), 1 if errors else 0
|
||||
labels, int(datetime.utcnow().timestamp()), 1 if errors else 0
|
||||
)
|
||||
)
|
||||
return errors, metrics
|
||||
|
||||
|
||||
def upload_metrics(destination, authentication, no_cert_verify, metrics):
|
||||
def upload_metrics(destination: str, authentication, no_cert_verify: bool, metrics):
|
||||
"""
|
||||
Optional upload of metrics to a pushgateway, when no node_exporter with text_collector is available
|
||||
"""
|
||||
try:
|
||||
headers = {
|
||||
"X-Requested-With": "{} {}".format(__intname__, __version__),
|
||||
"X-Requested-With": f"{__intname__} {__version__}",
|
||||
"Content-type": "text/html",
|
||||
}
|
||||
|
||||
data = ""
|
||||
for metric in metrics:
|
||||
data += "{}\n".format(metric)
|
||||
logger.debug("metrics:\n{}".format(data))
|
||||
data += f"{metric}\n"
|
||||
logger.debug(f"metrics:\n{data}")
|
||||
result = requests.post(
|
||||
destination,
|
||||
headers=headers,
|
||||
|
@ -251,18 +437,19 @@ def upload_metrics(destination, authentication, no_cert_verify, metrics):
|
|||
if result.status_code == 200:
|
||||
logger.info("Metrics pushed succesfully.")
|
||||
else:
|
||||
logger.warning(
|
||||
"Could not push metrics: {}: {}".format(result.reason, result.text)
|
||||
)
|
||||
logger.warning(f"Could not push metrics: {result.reason}: {result.text}")
|
||||
except Exception as exc:
|
||||
logger.error("Cannot upload metrics: {}".format(exc))
|
||||
logger.error(f"Cannot upload metrics: {exc}")
|
||||
logger.debug("Trace:", exc_info=True)
|
||||
|
||||
|
||||
def write_metrics_file(metrics: List[str], filename: str):
    """
    Write prometheus metric lines to a text file, e.g. for node_exporter's
    textfile collector to scrape.

    Args:
        metrics: prometheus text-format metric lines, one metric per entry.
        filename: destination file path (overwritten on each call).
    """
    try:
        with open(filename, "w", encoding="utf-8") as file_handle:
            # One metric per line, each terminated by a newline
            file_handle.writelines(metric + "\n" for metric in metrics)
    except OSError as exc:
        logger.error(f"Cannot write metrics file (unknown): {exc}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
207
tests/test_restic_metrics.py
Normal file
207
tests/test_restic_metrics.py
Normal file
|
@ -0,0 +1,207 @@
|
|||
#! /usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
__intname__ = "restic_metrics_tests"
|
||||
__author__ = "Orsiris de Jong"
|
||||
__copyright__ = "Copyright (C) 2022-2024 Orsiris de Jong - NetInvent SASU"
|
||||
__licence__ = "NetInvent CSE"
|
||||
__build__ = "2024010101"
|
||||
__description__ = "Converts restic command line output to a text file node_exporter can scrape"
|
||||
__compat__ = "python3.6+"
|
||||
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import re
|
||||
import json
|
||||
import tempfile
|
||||
from ofunctions.platform import os_arch
|
||||
from command_runner import command_runner
|
||||
try:
|
||||
from npbackup.restic_metrics import *
|
||||
except ImportError: # would be ModuleNotFoundError in Python 3+
|
||||
# In case we run tests without actually having installed command_runner
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(__file__, os.pardir, os.pardir)))
|
||||
from npbackup.restic_metrics import *
|
||||
from npbackup.core.restic_source_binary import get_restic_internal_binary
|
||||
|
||||
restic_json_outputs = {}
|
||||
restic_json_outputs["v0.16.2"] = \
|
||||
"""{"message_type":"summary","files_new":5,"files_changed":15,"files_unmodified":6058,"dirs_new":0,"dirs_changed":27,"dirs_unmodified":866,"data_blobs":17,"tree_blobs":28,"data_added":281097,"total_files_processed":6078,"total_bytes_processed":122342158,"total_duration":1.2836983,"snapshot_id":"360333437921660a5228a9c1b65a2d97381f0bc135499c6e851acb0ab84b0b0a"}
|
||||
"""
|
||||
|
||||
restic_str_outputs = {}
|
||||
# log file from restic v0.16.2
|
||||
restic_str_outputs["v0.16.2"] = \
|
||||
"""repository 962d5924 opened (version 2, compression level auto)
|
||||
using parent snapshot 325a2fa1
|
||||
[0:00] 100.00% 4 / 4 index files loaded
|
||||
|
||||
Files: 216 new, 21 changed, 5836 unmodified
|
||||
Dirs: 29 new, 47 changed, 817 unmodified
|
||||
Added to the repository: 4.425 MiB (1.431 MiB stored)
|
||||
|
||||
processed 6073 files, 116.657 MiB in 0:03
|
||||
snapshot b28b0901 save
|
||||
d"""
|
||||
|
||||
# log file from restic v0.14.0
|
||||
restic_str_outputs["v0.14.0"] = \
|
||||
"""using parent snapshot df60db01
|
||||
|
||||
Files: 1584 new, 269 changed, 235933 unmodified
|
||||
Dirs: 258 new, 714 changed, 37066 unmodified
|
||||
Added to the repo: 493.649 MiB
|
||||
|
||||
processed 237786 files, 85.487 GiB in 11:12"
|
||||
"""
|
||||
|
||||
# log file from restic v0.9.4
|
||||
restic_str_outputs["v0.9.4"] = \
|
||||
"""
|
||||
Files: 9 new, 32 changed, 110340 unmodified
|
||||
Dirs: 0 new, 2 changed, 0 unmodified
|
||||
Added to the repo: 196.568 MiB
|
||||
processed 110381 files, 107.331 GiB in 0:36
|
||||
"""
|
||||
|
||||
# restic_metrics_v1 prometheus output
|
||||
expected_results_V1 = [
|
||||
r'restic_repo_files{instance="test",backup_job="some_nas",state="new"} (\d+)',
|
||||
r'restic_repo_files{instance="test",backup_job="some_nas",state="changed"} (\d+)',
|
||||
r'restic_repo_files{instance="test",backup_job="some_nas",state="unmodified"} (\d+)',
|
||||
r'restic_repo_dirs{instance="test",backup_job="some_nas",state="new"} (\d+)',
|
||||
r'restic_repo_dirs{instance="test",backup_job="some_nas",state="changed"} (\d+)',
|
||||
r'restic_repo_dirs{instance="test",backup_job="some_nas",state="unmodified"} (\d+)',
|
||||
r'restic_repo_files{instance="test",backup_job="some_nas",state="total"} (\d+)',
|
||||
r'restic_repo_size_bytes{instance="test",backup_job="some_nas",state="total"} (\d+)',
|
||||
r'restic_backup_duration_seconds{instance="test",backup_job="some_nas",action="backup"} (\d+)',
|
||||
]
|
||||
|
||||
# restic_metrics_v2 prometheus output
|
||||
expected_results_V2 = [
|
||||
r'restic_files{instance="test",backup_job="some_nas",state="new",action="backup"} (\d+)',
|
||||
r'restic_files{instance="test",backup_job="some_nas",state="changed",action="backup"} (\d+)',
|
||||
r'restic_files{instance="test",backup_job="some_nas",state="unmodified",action="backup"} (\d+)',
|
||||
r'restic_dirs{instance="test",backup_job="some_nas",state="new",action="backup"} (\d+)',
|
||||
r'restic_dirs{instance="test",backup_job="some_nas",state="changed",action="backup"} (\d+)',
|
||||
r'restic_dirs{instance="test",backup_job="some_nas",state="unmodified",action="backup"} (\d+)',
|
||||
r'restic_files{instance="test",backup_job="some_nas",state="total",action="backup"} (\d+)',
|
||||
r'restic_snasphot_size_bytes{instance="test",backup_job="some_nas",action="backup",type="processed"} (\d+)',
|
||||
r'restic_total_duration_seconds{instance="test",backup_job="some_nas",action="backup"} (\d+)',
|
||||
]
|
||||
|
||||
def test_restic_str_output_2_metrics():
    """Check the V1 text parser against every recorded restic log sample."""
    instance = "test"
    backup_job = "some_nas"
    labels = f'instance="{instance}",backup_job="{backup_job}"'
    for version, output in restic_str_outputs.items():
        print(f"Testing V1 parser restic str output from version {version}")
        errors, prom_metrics = restic_output_2_metrics(True, output, labels)
        assert errors is False
        # Every expected metric pattern must match at least one produced line
        for expected_result in expected_results_V1:
            match_found = any(
                re.match(expected_result, metric) for metric in prom_metrics
            )
            assert match_found is True, 'No match found for {}'.format(expected_result)
|
||||
|
||||
|
||||
def test_restic_str_output_to_json():
    """Check the V2 pipeline (text -> json -> prometheus) on recorded restic logs."""
    labels = {
        "instance": "test",
        "backup_job": "some_nas",
    }
    for version, output in restic_str_outputs.items():
        print(f"Testing V2 parser restic str output from version {version}")
        json_metrics = restic_str_output_to_json(True, output)
        assert json_metrics["errors"] == False
        prom_metrics = restic_json_to_prometheus(json_metrics, labels)

        # Every expected metric pattern must match at least one produced line
        for expected_result in expected_results_V2:
            match_found = any(
                re.match(expected_result, metric) for metric in prom_metrics
            )
            assert match_found is True, 'No match found for {}'.format(expected_result)
|
||||
|
||||
|
||||
def test_restic_json_output():
    """Check the V2 prometheus converter on restic's native --json summary."""
    labels = {
        "instance": "test",
        "backup_job": "some_nas",
    }
    for version, json_output in restic_json_outputs.items():
        print(f"Testing V2 direct restic --json output from version {version}")
        restic_json = json.loads(json_output)
        prom_metrics = restic_json_to_prometheus(restic_json, labels)
        # Every expected metric pattern must match at least one produced line
        for expected_result in expected_results_V2:
            match_found = any(
                re.match(expected_result, metric) for metric in prom_metrics
            )
            assert match_found is True, 'No match found for {}'.format(expected_result)
|
||||
|
||||
|
||||
def test_real_restic_output():
    """End-to-end check: run a real restic backup and convert its output."""
    labels = {
        "instance": "test",
        "backup_job": "some_nas",
    }
    restic_binary = get_restic_internal_binary(os_arch())
    print(f"Testing real restic output, Running with restic {restic_binary}")
    assert restic_binary is not None, "No restic binary found"

    # Exercise both the plain-text and the --json output paths
    for api_arg in ('', ' --json'):

        # Setup repo and run a quick backup
        repo_path = Path(tempfile.gettempdir()) / "repo"
        if repo_path.is_dir():
            shutil.rmtree(repo_path)
        repo_path.mkdir()

        os.environ["RESTIC_REPOSITORY"] = str(repo_path)
        os.environ["RESTIC_PASSWORD"] = "TEST"

        exit_code, output = command_runner(
            f"{restic_binary} init --repository-version 2", live_output=True
        )
        exit_code, output = command_runner(
            f"{restic_binary} backup {api_arg} .", timeout=60, live_output=True
        )
        assert exit_code == 0, "Failed to run restic"
        # Plain-text output needs the V2 text parser first; --json output is
        # passed through as-is
        restic_json = output if api_arg else restic_str_output_to_json(True, output)
        prom_metrics = restic_json_to_prometheus(restic_json, labels)
        for expected_result in expected_results_V2:
            print("Searching for {}".format(expected_result))
            match_found = any(
                re.match(expected_result, metric) for metric in prom_metrics
            )
            assert match_found is True, 'No match found for {}'.format(expected_result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Allow running the test suite directly, without pytest
    for test_fn in (
        test_restic_str_output_2_metrics,
        test_restic_str_output_to_json,
        test_restic_json_output,
        test_real_restic_output,
    ):
        test_fn()
|
Loading…
Add table
Reference in a new issue