Prometheus
- Canonical Observability
Channel | Revision | Published | Runs on |
---|---|---|---|
latest/stable | 210 | 19 Nov 2024 | |
latest/candidate | 216 | 03 Dec 2024 | |
latest/beta | 221 | 03 Dec 2024 | |
latest/edge | 226 | 19 Dec 2024 | |
1.0/stable | 159 | 16 Feb 2024 | |
1.0/candidate | 159 | 12 Dec 2023 | |
1.0/beta | 159 | 12 Dec 2023 | |
1.0/edge | 159 | 12 Dec 2023 |
juju deploy prometheus-k8s
Deploy Kubernetes operators easily with Juju, the Universal Operator Lifecycle Manager. Need a Kubernetes cluster? Install MicroK8s to create a full CNCF-certified Kubernetes system in under 60 seconds.
Platform:
charms.prometheus_k8s.v0.prometheus_scrape
-
- Last updated 30 Apr 2024
- Revision Library version 0.47
# Copyright 2021 Canonical Ltd.
# See LICENSE file for licensing details.
"""Prometheus Scrape Library.
## Overview
This document explains how to integrate with the Prometheus charm
for the purpose of providing a metrics endpoint to Prometheus. It
also explains how alternative implementations of the Prometheus charms
may maintain the same interface and be backward compatible with all
currently integrated charms. Finally this document is the
authoritative reference on the structure of relation data that is
shared between Prometheus charms and any other charm that intends to
provide a scrape target for Prometheus.
## Source code
Source code can be found on GitHub at:
https://github.com/canonical/prometheus-k8s-operator/tree/main/lib/charms/prometheus_k8s
## Provider Library Usage
This Prometheus charm interacts with its scrape targets using its
charm library. Charms seeking to expose metric endpoints for the
Prometheus charm, must do so using the `MetricsEndpointProvider`
object from this charm library. For the simplest use cases, using the
`MetricsEndpointProvider` object only requires instantiating it,
typically in the constructor of your charm (the one which exposes a
metrics endpoint). The `MetricsEndpointProvider` constructor requires
the name of the relation over which a scrape target (metrics endpoint)
is exposed to the Prometheus charm. This relation must use the
`prometheus_scrape` interface. By default address of the metrics
endpoint is set to the unit IP address, by each unit of the
`MetricsEndpointProvider` charm. These units set their address in
response to the `PebbleReady` event of each container in the unit,
since container restarts of Kubernetes charms can result in change of
IP addresses. The default name for the metrics endpoint relation is
`metrics-endpoint`. It is strongly recommended to use the same
relation name for consistency across charms and doing so obviates the
need for an additional constructor argument. The
`MetricsEndpointProvider` object may be instantiated as follows
from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider
def __init__(self, *args):
super().__init__(*args)
...
self.metrics_endpoint = MetricsEndpointProvider(self)
...
Note that the first argument (`self`) to `MetricsEndpointProvider` is
always a reference to the parent (scrape target) charm.
An instantiated `MetricsEndpointProvider` object will ensure that each
unit of its parent charm, is a scrape target for the
`MetricsEndpointConsumer` (Prometheus) charm. By default
`MetricsEndpointProvider` assumes each unit of the consumer charm
exports its metrics at a path given by `/metrics` on port 80. These
defaults may be changed by providing the `MetricsEndpointProvider`
constructor an optional argument (`jobs`) that represents a
Prometheus scrape job specification using Python standard data
structures. This job specification is a subset of Prometheus' own
[scrape
configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config)
format but represented using Python data structures. More than one job
may be provided using the `jobs` argument. Hence `jobs` accepts a list
of dictionaries where each dictionary represents one `<scrape_config>`
object as described in the Prometheus documentation. The currently
supported configuration subset is: `job_name`, `metrics_path`,
`static_configs`
Suppose it is required to change the port on which scraped metrics are
exposed to 8000. This may be done by providing the following data
structure as the value of `jobs`.
```
[
{
"static_configs": [
{
"targets": ["*:8000"]
}
]
}
]
```
The wildcard ("*") host specification implies that the scrape targets
will automatically be set to the host addresses advertised by each
unit of the consumer charm.
It is also possible to change the metrics path and scrape multiple
ports, for example
```
[
{
"metrics_path": "/my-metrics-path",
"static_configs": [
{
"targets": ["*:8000", "*:8081"],
}
]
}
]
```
More complex scrape configurations are possible. For example
```
[
{
"static_configs": [
{
"targets": ["10.1.32.215:7000", "*:8000"],
"labels": {
"some_key": "some-value"
}
}
]
}
]
```
This example scrapes the target "10.1.32.215" at port 7000 in addition
to scraping each unit at port 8000. There is however one difference
between wildcard targets (specified using "*") and fully qualified
targets (such as "10.1.32.215"). The Prometheus charm automatically
associates labels with metrics generated by each target. These labels
localise the source of metrics within the Juju topology by specifying
its "model name", "model UUID", "application name" and "unit
name". However unit name is associated only with wildcard targets but
not with fully qualified targets.
Multiple jobs with different metrics paths and labels are allowed, but
each job must be given a unique name:
```
[
{
"job_name": "my-first-job",
"metrics_path": "one-path",
"static_configs": [
{
"targets": ["*:7000"],
"labels": {
"some_key": "some-value"
}
}
]
},
{
"job_name": "my-second-job",
"metrics_path": "another-path",
"static_configs": [
{
"targets": ["*:8000"],
"labels": {
"some_other_key": "some-other-value"
}
}
]
}
]
```
**Important:** `job_name` should be a fixed string (e.g. hardcoded literal).
For instance, if you include variable elements, like your `unit.name`, it may break
the continuity of the metrics time series gathered by Prometheus when the leader unit
changes (e.g. on upgrade or rescale).
Additionally, it is also technically possible, but **strongly discouraged**, to
configure the following scrape-related settings, which behave as described by the
[Prometheus documentation](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config):
- `static_configs`
- `scrape_interval`
- `scrape_timeout`
- `proxy_url`
- `relabel_configs`
- `metric_relabel_configs`
- `sample_limit`
- `label_limit`
- `label_name_length_limit`
- `label_value_length_limit`
The settings above are supported by the `prometheus_scrape` library only for the sake of
specialized facilities like the [Prometheus Scrape Config](https://charmhub.io/prometheus-scrape-config-k8s)
charm. Virtually no charms should use these settings, and charmers definitely **should not**
expose them to the Juju administrator via configuration options.
## Consumer Library Usage
The `MetricsEndpointConsumer` object may be used by Prometheus
charms to manage relations with their scrape targets. For this
purposes a Prometheus charm needs to do two things
1. Instantiate the `MetricsEndpointConsumer` object by providing it a
reference to the parent (Prometheus) charm and optionally the name of
the relation that the Prometheus charm uses to interact with scrape
targets. This relation must confirm to the `prometheus_scrape`
interface and it is strongly recommended that this relation be named
`metrics-endpoint` which is its default value.
For example a Prometheus charm may instantiate the
`MetricsEndpointConsumer` in its constructor as follows
from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointConsumer
def __init__(self, *args):
super().__init__(*args)
...
self.metrics_consumer = MetricsEndpointConsumer(self)
...
2. A Prometheus charm also needs to respond to the
`TargetsChangedEvent` event of the `MetricsEndpointConsumer` by adding itself as
an observer for these events, as in
self.framework.observe(
self.metrics_consumer.on.targets_changed,
self._on_scrape_targets_changed,
)
In responding to the `TargetsChangedEvent` event the Prometheus
charm must update the Prometheus configuration so that any new scrape
targets are added and/or old ones removed from the list of scraped
endpoints. For this purpose the `MetricsEndpointConsumer` object
exposes a `jobs()` method that returns a list of scrape jobs. Each
element of this list is the Prometheus scrape configuration for that
job. In order to update the Prometheus configuration, the Prometheus
charm needs to replace the current list of jobs with the list provided
by `jobs()` as follows
def _on_scrape_targets_changed(self, event):
...
scrape_jobs = self.metrics_consumer.jobs()
for job in scrape_jobs:
prometheus_scrape_config.append(job)
...
## Alerting Rules
This charm library also supports gathering alerting rules from all
related `MetricsEndpointProvider` charms and enabling corresponding alerts within the
Prometheus charm. Alert rules are automatically gathered by `MetricsEndpointProvider`
charms when using this library, from a directory conventionally named
`prometheus_alert_rules`. This directory must reside at the top level
in the `src` folder of the consumer charm. Each file in this directory
is assumed to be in one of two formats:
- the official prometheus alert rule format, conforming to the
[Prometheus docs](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/)
- a single rule format, which is a simplified subset of the official format,
comprising a single alert rule per file, using the same YAML fields.
The file name must have one of the following extensions:
- `.rule`
- `.rules`
- `.yml`
- `.yaml`
An example of the contents of such a file in the custom single rule
format is shown below.
```
alert: HighRequestLatency
expr: job:request_latency_seconds:mean5m{my_key=my_value} > 0.5
for: 10m
labels:
severity: Medium
type: HighLatency
annotations:
summary: High request latency for {{ $labels.instance }}.
```
The `MetricsEndpointProvider` will read all available alert rules and
also inject "filtering labels" into the alert expressions. The
filtering labels ensure that alert rules are localised to the metrics
provider charm's Juju topology (application, model and its UUID). Such
a topology filter is essential to ensure that alert rules submitted by
one provider charm generates alerts only for that same charm. When
alert rules are embedded in a charm, and the charm is deployed as a
Juju application, the alert rules from that application have their
expressions automatically updated to filter for metrics coming from
the units of that application alone. This remove risk of spurious
evaluation, e.g., when you have multiple deployments of the same charm
monitored by the same Prometheus.
Not all alerts one may want to specify can be embedded in a
charm. Some alert rules will be specific to a user's use case. This is
the case, for example, of alert rules that are based on business
constraints, like expecting a certain amount of requests to a specific
API every five minutes. Such alert rules can be specified via the
[COS Config Charm](https://charmhub.io/cos-configuration-k8s),
which allows importing alert rules and other settings like dashboards
from a Git repository.
Gathering alert rules and generating rule files within the Prometheus
charm is easily done using the `alerts()` method of
`MetricsEndpointConsumer`. Alerts generated by Prometheus will
automatically include Juju topology labels in the alerts. These labels
indicate the source of the alert. The following labels are
automatically included with each alert
- `juju_model`
- `juju_model_uuid`
- `juju_application`
## Relation Data
The Prometheus charm uses both application and unit relation data to
obtain information regarding its scrape jobs, alert rules and scrape
targets. This relation data is in JSON format and it closely resembles
the YAML structure of Prometheus [scrape configuration]
(https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config).
Units of Metrics provider charms advertise their names and addresses
over unit relation data using the `prometheus_scrape_unit_name` and
`prometheus_scrape_unit_address` keys. While the `scrape_metadata`,
`scrape_jobs` and `alert_rules` keys in application relation data
of Metrics provider charms hold eponymous information.
""" # noqa: W505
import copy
import hashlib
import ipaddress
import json
import logging
import os
import platform
import re
import socket
import subprocess
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
import yaml
from cosl import JujuTopology
from cosl.rules import AlertRules
from ops.charm import CharmBase, RelationRole
from ops.framework import (
BoundEvent,
EventBase,
EventSource,
Object,
ObjectEvents,
StoredDict,
StoredList,
StoredState,
)
from ops.model import Relation
# The unique Charmhub library identifier, never change it
LIBID = "bc84295fef5f4049878f07b131968ee2"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 47
PYDEPS = ["cosl"]
logger = logging.getLogger(__name__)
ALLOWED_KEYS = {
"job_name",
"metrics_path",
"static_configs",
"scrape_interval",
"scrape_timeout",
"proxy_url",
"relabel_configs",
"metric_relabel_configs",
"sample_limit",
"label_limit",
"label_name_length_limit",
"label_value_length_limit",
"scheme",
"basic_auth",
"tls_config",
"authorization",
"params",
}
DEFAULT_JOB = {
"metrics_path": "/metrics",
"static_configs": [{"targets": ["*:80"]}],
}
DEFAULT_RELATION_NAME = "metrics-endpoint"
RELATION_INTERFACE_NAME = "prometheus_scrape"
DEFAULT_ALERT_RULES_RELATIVE_PATH = "./src/prometheus_alert_rules"
class PrometheusConfig:
"""A namespace for utility functions for manipulating the prometheus config dict."""
# relabel instance labels so that instance identifiers are globally unique
# stable over unit recreation
topology_relabel_config = {
"source_labels": ["juju_model", "juju_model_uuid", "juju_application"],
"separator": "_",
"target_label": "instance",
"regex": "(.*)",
}
topology_relabel_config_wildcard = {
"source_labels": ["juju_model", "juju_model_uuid", "juju_application", "juju_unit"],
"separator": "_",
"target_label": "instance",
"regex": "(.*)",
}
@staticmethod
def sanitize_scrape_config(job: dict) -> dict:
"""Restrict permissible scrape configuration options.
If job is empty then a default job is returned. The
default job is
```
{
"metrics_path": "/metrics",
"static_configs": [{"targets": ["*:80"]}],
}
```
Args:
job: a dict containing a single Prometheus job
specification.
Returns:
a dictionary containing a sanitized job specification.
"""
sanitized_job = DEFAULT_JOB.copy()
sanitized_job.update({key: value for key, value in job.items() if key in ALLOWED_KEYS})
return sanitized_job
@staticmethod
def sanitize_scrape_configs(scrape_configs: List[dict]) -> List[dict]:
"""A vectorized version of `sanitize_scrape_config`."""
return [PrometheusConfig.sanitize_scrape_config(job) for job in scrape_configs]
@staticmethod
def prefix_job_names(scrape_configs: List[dict], prefix: str) -> List[dict]:
"""Adds the given prefix to all the job names in the given scrape_configs list."""
modified_scrape_configs = []
for scrape_config in scrape_configs:
job_name = scrape_config.get("job_name")
modified = scrape_config.copy()
modified["job_name"] = prefix + "_" + job_name if job_name else prefix
modified_scrape_configs.append(modified)
return modified_scrape_configs
@staticmethod
def expand_wildcard_targets_into_individual_jobs(
scrape_jobs: List[dict],
hosts: Dict[str, Tuple[str, str]],
topology: Optional[JujuTopology] = None,
) -> List[dict]:
"""Extract wildcard hosts from the given scrape_configs list into separate jobs.
Args:
scrape_jobs: list of scrape jobs.
hosts: a dictionary mapping host names to host address for
all units of the relation for which this job configuration
must be constructed.
topology: optional arg for adding topology labels to scrape targets.
"""
# hosts = self._relation_hosts(relation)
modified_scrape_jobs = []
for job in scrape_jobs:
static_configs = job.get("static_configs")
if not static_configs:
continue
# When a single unit specified more than one wildcard target, then they are expanded
# into a static_config per target
non_wildcard_static_configs = []
for static_config in static_configs:
targets = static_config.get("targets")
if not targets:
continue
# All non-wildcard targets remain in the same static_config
non_wildcard_targets = []
# All wildcard targets are extracted to a job per unit. If multiple wildcard
# targets are specified, they remain in the same static_config (per unit).
wildcard_targets = []
for target in targets:
match = re.compile(r"\*(?:(:\d+))?").match(target)
if match:
# This is a wildcard target.
# Need to expand into separate jobs and remove it from this job here
wildcard_targets.append(target)
else:
# This is not a wildcard target. Copy it over into its own static_config.
non_wildcard_targets.append(target)
# All non-wildcard targets remain in the same static_config
if non_wildcard_targets:
non_wildcard_static_config = static_config.copy()
non_wildcard_static_config["targets"] = non_wildcard_targets
if topology:
# When non-wildcard targets (aka fully qualified hostnames) are specified,
# there is no reliable way to determine the name (Juju topology unit name)
# for such a target. Therefore labeling with Juju topology, excluding the
# unit name.
non_wildcard_static_config["labels"] = {
**topology.label_matcher_dict,
**non_wildcard_static_config.get("labels", {}),
}
non_wildcard_static_configs.append(non_wildcard_static_config)
# Extract wildcard targets into individual jobs
if wildcard_targets:
for unit_name, (unit_hostname, unit_path) in hosts.items():
modified_job = job.copy()
modified_job["static_configs"] = [static_config.copy()]
modified_static_config = modified_job["static_configs"][0]
modified_static_config["targets"] = [
target.replace("*", unit_hostname) for target in wildcard_targets
]
unit_num = unit_name.split("/")[-1]
job_name = modified_job.get("job_name", "unnamed-job") + "-" + unit_num
modified_job["job_name"] = job_name
modified_job["metrics_path"] = unit_path + (
job.get("metrics_path") or "/metrics"
)
if topology:
# Add topology labels
modified_static_config["labels"] = {
**topology.label_matcher_dict,
**{"juju_unit": unit_name},
**modified_static_config.get("labels", {}),
}
# Instance relabeling for topology should be last in order.
modified_job["relabel_configs"] = modified_job.get(
"relabel_configs", []
) + [PrometheusConfig.topology_relabel_config_wildcard]
modified_scrape_jobs.append(modified_job)
if non_wildcard_static_configs:
modified_job = job.copy()
modified_job["static_configs"] = non_wildcard_static_configs
modified_job["metrics_path"] = modified_job.get("metrics_path") or "/metrics"
if topology:
# Instance relabeling for topology should be last in order.
modified_job["relabel_configs"] = modified_job.get("relabel_configs", []) + [
PrometheusConfig.topology_relabel_config
]
modified_scrape_jobs.append(modified_job)
return modified_scrape_jobs
@staticmethod
def render_alertmanager_static_configs(alertmanagers: List[str]):
"""Render the alertmanager static_configs section from a list of URLs.
Each target must be in the hostname:port format, and prefixes are specified in a separate
key. Therefore, with ingress in place, would need to extract the path into the
`path_prefix` key, which is higher up in the config hierarchy.
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#alertmanager_config
Args:
alertmanagers: List of alertmanager URLs.
Returns:
A dict representation for the static_configs section.
"""
# Make sure it's a valid url so urlparse could parse it.
scheme = re.compile(r"^https?://")
sanitized = [am if scheme.search(am) else "http://" + am for am in alertmanagers]
# Create a mapping from paths to netlocs
# Group alertmanager targets into a dictionary of lists:
# {path: [netloc1, netloc2]}
paths = defaultdict(list) # type: Dict[Tuple[str, str], List[str]]
for parsed in map(urlparse, sanitized):
path = parsed.path or "/"
paths[(parsed.scheme, path)].append(parsed.netloc)
return {
"alertmanagers": [
{
# For https we still do not render a `tls_config` section because
# certs are expected to be made available by the charm via the
# `update-ca-certificates` mechanism.
"scheme": scheme,
"path_prefix": path_prefix,
"static_configs": [{"targets": netlocs}],
}
for (scheme, path_prefix), netlocs in paths.items()
]
}
class RelationNotFoundError(Exception):
"""Raised if there is no relation with the given name is found."""
def __init__(self, relation_name: str):
self.relation_name = relation_name
self.message = "No relation named '{}' found".format(relation_name)
super().__init__(self.message)
class RelationInterfaceMismatchError(Exception):
"""Raised if the relation with the given name has a different interface."""
def __init__(
self,
relation_name: str,
expected_relation_interface: str,
actual_relation_interface: str,
):
self.relation_name = relation_name
self.expected_relation_interface = expected_relation_interface
self.actual_relation_interface = actual_relation_interface
self.message = (
"The '{}' relation has '{}' as interface rather than the expected '{}'".format(
relation_name, actual_relation_interface, expected_relation_interface
)
)
super().__init__(self.message)
class RelationRoleMismatchError(Exception):
"""Raised if the relation with the given name has a different role."""
def __init__(
self,
relation_name: str,
expected_relation_role: RelationRole,
actual_relation_role: RelationRole,
):
self.relation_name = relation_name
self.expected_relation_interface = expected_relation_role
self.actual_relation_role = actual_relation_role
self.message = "The '{}' relation has role '{}' rather than the expected '{}'".format(
relation_name, repr(actual_relation_role), repr(expected_relation_role)
)
super().__init__(self.message)
class InvalidAlertRuleEvent(EventBase):
"""Event emitted when alert rule files are not parsable.
Enables us to set a clear status on the provider.
"""
def __init__(self, handle, errors: str = "", valid: bool = False):
super().__init__(handle)
self.errors = errors
self.valid = valid
def snapshot(self) -> Dict:
"""Save alert rule information."""
return {
"valid": self.valid,
"errors": self.errors,
}
def restore(self, snapshot):
"""Restore alert rule information."""
self.valid = snapshot["valid"]
self.errors = snapshot["errors"]
class InvalidScrapeJobEvent(EventBase):
"""Event emitted when alert rule files are not valid."""
def __init__(self, handle, errors: str = ""):
super().__init__(handle)
self.errors = errors
def snapshot(self) -> Dict:
"""Save error information."""
return {"errors": self.errors}
def restore(self, snapshot):
"""Restore error information."""
self.errors = snapshot["errors"]
class MetricsEndpointProviderEvents(ObjectEvents):
"""Events raised by :class:`InvalidAlertRuleEvent`s."""
alert_rule_status_changed = EventSource(InvalidAlertRuleEvent)
invalid_scrape_job = EventSource(InvalidScrapeJobEvent)
def _type_convert_stored(obj):
"""Convert Stored* to their appropriate types, recursively."""
if isinstance(obj, StoredList):
return list(map(_type_convert_stored, obj))
if isinstance(obj, StoredDict):
rdict = {} # type: Dict[Any, Any]
for k in obj.keys():
rdict[k] = _type_convert_stored(obj[k])
return rdict
return obj
def _validate_relation_by_interface_and_direction(
charm: CharmBase,
relation_name: str,
expected_relation_interface: str,
expected_relation_role: RelationRole,
):
"""Verifies that a relation has the necessary characteristics.
Verifies that the `relation_name` provided: (1) exists in metadata.yaml,
(2) declares as interface the interface name passed as `relation_interface`
and (3) has the right "direction", i.e., it is a relation that `charm`
provides or requires.
Args:
charm: a `CharmBase` object to scan for the matching relation.
relation_name: the name of the relation to be verified.
expected_relation_interface: the interface name to be matched by the
relation named `relation_name`.
expected_relation_role: whether the `relation_name` must be either
provided or required by `charm`.
Raises:
RelationNotFoundError: If there is no relation in the charm's metadata.yaml
with the same name as provided via `relation_name` argument.
RelationInterfaceMismatchError: The relation with the same name as provided
via `relation_name` argument does not have the same relation interface
as specified via the `expected_relation_interface` argument.
RelationRoleMismatchError: If the relation with the same name as provided
via `relation_name` argument does not have the same role as specified
via the `expected_relation_role` argument.
"""
if relation_name not in charm.meta.relations:
raise RelationNotFoundError(relation_name)
relation = charm.meta.relations[relation_name]
actual_relation_interface = relation.interface_name
if actual_relation_interface != expected_relation_interface:
raise RelationInterfaceMismatchError(
relation_name, expected_relation_interface, actual_relation_interface or "None"
)
if expected_relation_role == RelationRole.provides:
if relation_name not in charm.meta.provides:
raise RelationRoleMismatchError(
relation_name, RelationRole.provides, RelationRole.requires
)
elif expected_relation_role == RelationRole.requires:
if relation_name not in charm.meta.requires:
raise RelationRoleMismatchError(
relation_name, RelationRole.requires, RelationRole.provides
)
else:
raise Exception("Unexpected RelationDirection: {}".format(expected_relation_role))
class InvalidAlertRulePathError(Exception):
"""Raised if the alert rules folder cannot be found or is otherwise invalid."""
def __init__(
self,
alert_rules_absolute_path: Path,
message: str,
):
self.alert_rules_absolute_path = alert_rules_absolute_path
self.message = message
super().__init__(self.message)
def _is_official_alert_rule_format(rules_dict: dict) -> bool:
"""Are alert rules in the upstream format as supported by Prometheus.
Alert rules in dictionary format are in "official" form if they
contain a "groups" key, since this implies they contain a list of
alert rule groups.
Args:
rules_dict: a set of alert rules in Python dictionary format
Returns:
True if alert rules are in official Prometheus file format.
"""
return "groups" in rules_dict
def _is_single_alert_rule_format(rules_dict: dict) -> bool:
"""Are alert rules in single rule format.
The Prometheus charm library supports reading of alert rules in a
custom format that consists of a single alert rule per file. This
does not conform to the official Prometheus alert rule file format
which requires that each alert rules file consists of a list of
alert rule groups and each group consists of a list of alert
rules.
Alert rules in dictionary form are considered to be in single rule
format if in the least it contains two keys corresponding to the
alert rule name and alert expression.
Returns:
True if alert rule is in single rule file format.
"""
# one alert rule per file
return set(rules_dict) >= {"alert", "expr"}
class TargetsChangedEvent(EventBase):
"""Event emitted when Prometheus scrape targets change."""
def __init__(self, handle, relation_id):
super().__init__(handle)
self.relation_id = relation_id
def snapshot(self):
"""Save scrape target relation information."""
return {"relation_id": self.relation_id}
def restore(self, snapshot):
"""Restore scrape target relation information."""
self.relation_id = snapshot["relation_id"]
class MonitoringEvents(ObjectEvents):
"""Event descriptor for events raised by `MetricsEndpointConsumer`."""
targets_changed = EventSource(TargetsChangedEvent)
class MetricsEndpointConsumer(Object):
"""A Prometheus based Monitoring service."""
on = MonitoringEvents() # pyright: ignore
def __init__(self, charm: CharmBase, relation_name: str = DEFAULT_RELATION_NAME):
"""A Prometheus based Monitoring service.
Args:
charm: a `CharmBase` instance that manages this
instance of the Prometheus service.
relation_name: an optional string name of the relation between `charm`
and the Prometheus charmed service. The default is "metrics-endpoint".
It is strongly advised not to change the default, so that people
deploying your charm will have a consistent experience with all
other charms that consume metrics endpoints.
Raises:
RelationNotFoundError: If there is no relation in the charm's metadata.yaml
with the same name as provided via `relation_name` argument.
RelationInterfaceMismatchError: The relation with the same name as provided
via `relation_name` argument does not have the `prometheus_scrape` relation
interface.
RelationRoleMismatchError: If the relation with the same name as provided
via `relation_name` argument does not have the `RelationRole.requires`
role.
"""
_validate_relation_by_interface_and_direction(
charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.requires
)
super().__init__(charm, relation_name)
self._charm = charm
self._relation_name = relation_name
self._tool = CosTool(self._charm)
events = self._charm.on[relation_name]
self.framework.observe(events.relation_changed, self._on_metrics_provider_relation_changed)
self.framework.observe(
events.relation_departed, self._on_metrics_provider_relation_departed
)
def _on_metrics_provider_relation_changed(self, event):
"""Handle changes with related metrics providers.
Anytime there are changes in relations between Prometheus
and metrics provider charms the Prometheus charm is informed,
through a `TargetsChangedEvent` event. The Prometheus charm can
then choose to update its scrape configuration.
Args:
event: a `CharmEvent` in response to which the Prometheus
charm must update its scrape configuration.
"""
rel_id = event.relation.id
self.on.targets_changed.emit(relation_id=rel_id)
def _on_metrics_provider_relation_departed(self, event):
"""Update job config when a metrics provider departs.
When a metrics provider departs the Prometheus charm is informed
through a `TargetsChangedEvent` event so that it can update its
scrape configuration to ensure that the departed metrics provider
is removed from the list of scrape jobs and
Args:
event: a `CharmEvent` that indicates a metrics provider
unit has departed.
"""
rel_id = event.relation.id
self.on.targets_changed.emit(relation_id=rel_id)
def jobs(self) -> list:
"""Fetch the list of scrape jobs.
Returns:
A list consisting of all the static scrape configurations
for each related `MetricsEndpointProvider` that has specified
its scrape targets.
"""
scrape_jobs = []
for relation in self._charm.model.relations[self._relation_name]:
static_scrape_jobs = self._static_scrape_config(relation)
if static_scrape_jobs:
# Duplicate job names will cause validate_scrape_jobs to fail.
# Therefore we need to dedupe here and after all jobs are collected.
static_scrape_jobs = _dedupe_job_names(static_scrape_jobs)
try:
self._tool.validate_scrape_jobs(static_scrape_jobs)
except subprocess.CalledProcessError as e:
if self._charm.unit.is_leader():
data = json.loads(relation.data[self._charm.app].get("event", "{}"))
data["scrape_job_errors"] = str(e)
relation.data[self._charm.app]["event"] = json.dumps(data)
else:
scrape_jobs.extend(static_scrape_jobs)
scrape_jobs = _dedupe_job_names(scrape_jobs)
return scrape_jobs
@property
def alerts(self) -> dict:
"""Fetch alerts for all relations.
A Prometheus alert rules file consists of a list of "groups". Each
group consists of a list of alerts (`rules`) that are sequentially
executed. This method returns all the alert rules provided by each
related metrics provider charm. These rules may be used to generate a
separate alert rules file for each relation since the returned list
of alert groups are indexed by that relations Juju topology identifier.
The Juju topology identifier string includes substrings that identify
alert rule related metadata such as the Juju model, model UUID and the
application name from where the alert rule originates. Since this
topology identifier is globally unique, it may be used for instance as
the name for the file into which the list of alert rule groups are
written. For each relation, the structure of data returned is a dictionary
representation of a standard prometheus rules file:
{"groups": [{"name": ...}, ...]}
per official prometheus documentation
https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/
The value of the `groups` key is such that it may be used to generate
a Prometheus alert rules file directly using `yaml.dump` but the
`groups` key itself must be included as this is required by Prometheus.
For example the list of alert rule groups returned by this method may
be written into files consumed by Prometheus as follows
```
for topology_identifier, alert_rule_groups in self.metrics_consumer.alerts().items():
filename = "juju_" + topology_identifier + ".rules"
path = os.path.join(PROMETHEUS_RULES_DIR, filename)
rules = yaml.safe_dump(alert_rule_groups)
container.push(path, rules, make_dirs=True)
```
Returns:
A dictionary mapping the Juju topology identifier of the source charm to
its list of alert rule groups.
"""
alerts = {} # type: Dict[str, dict] # mapping b/w juju identifiers and alert rule files
for relation in self._charm.model.relations[self._relation_name]:
if not relation.units or not relation.app:
continue
alert_rules = json.loads(relation.data[relation.app].get("alert_rules", "{}"))
if not alert_rules:
continue
alert_rules = self._inject_alert_expr_labels(alert_rules)
identifier, topology = self._get_identifier_by_alert_rules(alert_rules)
if not topology:
try:
scrape_metadata = json.loads(relation.data[relation.app]["scrape_metadata"])
identifier = JujuTopology.from_dict(scrape_metadata).identifier
except KeyError as e:
logger.debug(
"Relation %s has no 'scrape_metadata': %s",
relation.id,
e,
)
if not identifier:
logger.error(
"Alert rules were found but no usable group or identifier was present."
)
continue
# We need to append the relation info to the identifier. This is to allow for cases for there are two
# relations which eventually scrape the same application. Issue #551.
identifier = f"{identifier}_{relation.name}_{relation.id}"
alerts[identifier] = alert_rules
_, errmsg = self._tool.validate_alert_rules(alert_rules)
if errmsg:
if alerts[identifier]:
del alerts[identifier]
if self._charm.unit.is_leader():
data = json.loads(relation.data[self._charm.app].get("event", "{}"))
data["errors"] = errmsg
relation.data[self._charm.app]["event"] = json.dumps(data)
continue
return alerts
def _get_identifier_by_alert_rules(
self, rules: dict
) -> Tuple[Union[str, None], Union[JujuTopology, None]]:
"""Determine an appropriate dict key for alert rules.
The key is used as the filename when writing alerts to disk, so the structure
and uniqueness is important.
Args:
rules: a dict of alert rules
Returns:
A tuple containing an identifier, if found, and a JujuTopology, if it could
be constructed.
"""
if "groups" not in rules:
logger.debug("No alert groups were found in relation data")
return None, None
# Construct an ID based on what's in the alert rules if they have labels
for group in rules["groups"]:
try:
labels = group["rules"][0]["labels"]
topology = JujuTopology(
# Don't try to safely get required constructor fields. There's already
# a handler for KeyErrors
model_uuid=labels["juju_model_uuid"],
model=labels["juju_model"],
application=labels["juju_application"],
unit=labels.get("juju_unit", ""),
charm_name=labels.get("juju_charm", ""),
)
return topology.identifier, topology
except KeyError:
logger.debug("Alert rules were found but no usable labels were present")
continue
logger.warning(
"No labeled alert rules were found, and no 'scrape_metadata' "
"was available. Using the alert group name as filename."
)
try:
for group in rules["groups"]:
return group["name"], None
except KeyError:
logger.debug("No group name was found to use as identifier")
return None, None
def _inject_alert_expr_labels(self, rules: Dict[str, Any]) -> Dict[str, Any]:
"""Iterate through alert rules and inject topology into expressions.
Args:
rules: a dict of alert rules
"""
if "groups" not in rules:
return rules
modified_groups = []
for group in rules["groups"]:
# Copy off rules, so we don't modify an object we're iterating over
rules_copy = group["rules"]
for idx, rule in enumerate(rules_copy):
labels = rule.get("labels")
if labels:
try:
topology = JujuTopology(
# Don't try to safely get required constructor fields. There's already
# a handler for KeyErrors
model_uuid=labels["juju_model_uuid"],
model=labels["juju_model"],
application=labels["juju_application"],
unit=labels.get("juju_unit", ""),
charm_name=labels.get("juju_charm", ""),
)
# Inject topology and put it back in the list
rule["expr"] = self._tool.inject_label_matchers(
re.sub(r"%%juju_topology%%,?", "", rule["expr"]),
topology.alert_expression_dict,
)
except KeyError:
# Some required JujuTopology key is missing. Just move on.
pass
group["rules"][idx] = rule
modified_groups.append(group)
rules["groups"] = modified_groups
return rules
def _static_scrape_config(self, relation) -> list:
"""Generate the static scrape configuration for a single relation.
If the relation data includes `scrape_metadata` then the value
of this key is used to annotate the scrape jobs with Juju
Topology labels before returning them.
Args:
relation: an `ops.model.Relation` object whose static
scrape configuration is required.
Returns:
A list (possibly empty) of scrape jobs. Each job is a
valid Prometheus scrape configuration for that job,
represented as a Python dictionary.
"""
if not relation.units:
return []
scrape_configs = json.loads(relation.data[relation.app].get("scrape_jobs", "[]"))
if not scrape_configs:
return []
scrape_metadata = json.loads(relation.data[relation.app].get("scrape_metadata", "{}"))
if not scrape_metadata:
return scrape_configs
topology = JujuTopology.from_dict(scrape_metadata)
job_name_prefix = "juju_{}_prometheus_scrape".format(topology.identifier)
scrape_configs = PrometheusConfig.prefix_job_names(scrape_configs, job_name_prefix)
scrape_configs = PrometheusConfig.sanitize_scrape_configs(scrape_configs)
hosts = self._relation_hosts(relation)
scrape_configs = PrometheusConfig.expand_wildcard_targets_into_individual_jobs(
scrape_configs, hosts, topology
)
# For https scrape targets we still do not render a `tls_config` section because certs
# are expected to be made available by the charm via the `update-ca-certificates` mechanism.
return scrape_configs
def _relation_hosts(self, relation: Relation) -> Dict[str, Tuple[str, str]]:
"""Returns a mapping from unit names to (address, path) tuples, for the given relation."""
hosts = {}
for unit in relation.units:
# TODO deprecate and remove unit.name
unit_name = relation.data[unit].get("prometheus_scrape_unit_name") or unit.name
# TODO deprecate and remove "prometheus_scrape_host"
unit_address = relation.data[unit].get(
"prometheus_scrape_unit_address"
) or relation.data[unit].get("prometheus_scrape_host")
unit_path = relation.data[unit].get("prometheus_scrape_unit_path", "")
if unit_name and unit_address:
hosts.update({unit_name: (unit_address, unit_path)})
return hosts
def _target_parts(self, target) -> list:
"""Extract host and port from a wildcard target.
Args:
target: a string specifying a scrape target. A
scrape target is expected to have the format
"host:port". The host part may be a wildcard
"*" and the port part can be missing (along
with ":") in which case port is set to 80.
Returns:
a list with target host and port as in [host, port]
"""
if ":" in target:
parts = target.split(":")
else:
parts = [target, "80"]
return parts
def _dedupe_job_names(jobs: List[dict]):
"""Deduplicate a list of dicts by appending a hash to the value of the 'job_name' key.
Additionally, fully de-duplicate any identical jobs.
Args:
jobs: A list of prometheus scrape jobs
"""
jobs_copy = copy.deepcopy(jobs)
# Convert to a dict with job names as keys
# I think this line is O(n^2) but it should be okay given the list sizes
jobs_dict = {
job["job_name"]: list(filter(lambda x: x["job_name"] == job["job_name"], jobs_copy))
for job in jobs_copy
}
# If multiple jobs have the same name, convert the name to "name_<hash-of-job>"
for key in jobs_dict:
if len(jobs_dict[key]) > 1:
for job in jobs_dict[key]:
job_json = json.dumps(job)
hashed = hashlib.sha256(job_json.encode()).hexdigest()
job["job_name"] = "{}_{}".format(job["job_name"], hashed)
new_jobs = []
for key in jobs_dict:
new_jobs.extend(list(jobs_dict[key]))
# Deduplicate jobs which are equal
# Again this in O(n^2) but it should be okay
deduped_jobs = []
seen = []
for job in new_jobs:
job_json = json.dumps(job)
hashed = hashlib.sha256(job_json.encode()).hexdigest()
if hashed in seen:
continue
seen.append(hashed)
deduped_jobs.append(job)
return deduped_jobs
def _resolve_dir_against_charm_path(charm: CharmBase, *path_elements: str) -> str:
"""Resolve the provided path items against the directory of the main file.
Look up the directory of the `main.py` file being executed. This is normally
going to be the charm.py file of the charm including this library. Then, resolve
the provided path elements and, if the result path exists and is a directory,
return its absolute path; otherwise, raise en exception.
Raises:
InvalidAlertRulePathError, if the path does not exist or is not a directory.
"""
charm_dir = Path(str(charm.charm_dir))
if not charm_dir.exists() or not charm_dir.is_dir():
# Operator Framework does not currently expose a robust
# way to determine the top level charm source directory
# that is consistent across deployed charms and unit tests
# Hence for unit tests the current working directory is used
# TODO: updated this logic when the following ticket is resolved
# https://github.com/canonical/operator/issues/643
charm_dir = Path(os.getcwd())
alerts_dir_path = charm_dir.absolute().joinpath(*path_elements)
if not alerts_dir_path.exists():
raise InvalidAlertRulePathError(alerts_dir_path, "directory does not exist")
if not alerts_dir_path.is_dir():
raise InvalidAlertRulePathError(alerts_dir_path, "is not a directory")
return str(alerts_dir_path)
class MetricsEndpointProvider(Object):
"""A metrics endpoint for Prometheus."""
on = MetricsEndpointProviderEvents() # pyright: ignore
def __init__(
self,
charm,
relation_name: str = DEFAULT_RELATION_NAME,
jobs=None,
alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None,
external_url: str = "",
lookaside_jobs_callable: Optional[Callable] = None,
):
"""Construct a metrics provider for a Prometheus charm.
If your charm exposes a Prometheus metrics endpoint, the
`MetricsEndpointProvider` object enables your charm to easily
communicate how to reach that metrics endpoint.
By default, a charm instantiating this object has the metrics
endpoints of each of its units scraped by the related Prometheus
charms. The scraped metrics are automatically tagged by the
Prometheus charms with Juju topology data via the
`juju_model_name`, `juju_model_uuid`, `juju_application_name`
and `juju_unit` labels. To support such tagging `MetricsEndpointProvider`
automatically forwards scrape metadata to a `MetricsEndpointConsumer`
(Prometheus charm).
Scrape targets provided by `MetricsEndpointProvider` can be
customized when instantiating this object. For example in the
case of a charm exposing the metrics endpoint for each of its
units on port 8080 and the `/metrics` path, the
`MetricsEndpointProvider` can be instantiated as follows:
self.metrics_endpoint_provider = MetricsEndpointProvider(
self,
jobs=[{
"static_configs": [{"targets": ["*:8080"]}],
}])
The notation `*:<port>` means "scrape each unit of this charm on port
`<port>`.
In case the metrics endpoints are not on the standard `/metrics` path,
a custom path can be specified as follows:
self.metrics_endpoint_provider = MetricsEndpointProvider(
self,
jobs=[{
"metrics_path": "/my/strange/metrics/path",
"static_configs": [{"targets": ["*:8080"]}],
}])
Note how the `jobs` argument is a list: this allows you to expose multiple
combinations of paths "metrics_path" and "static_configs" in case your charm
exposes multiple endpoints, which could happen, for example, when you have
multiple workload containers, with applications in each needing to be scraped.
The structure of the objects in the `jobs` list is one-to-one with the
`scrape_config` configuration item of Prometheus' own configuration (see
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
), but with only a subset of the fields allowed. The permitted fields are
listed in `ALLOWED_KEYS` object in this charm library module.
It is also possible to specify alert rules. By default, this library will look
into the `<charm_parent_dir>/prometheus_alert_rules`, which in a standard charm
layouts resolves to `src/prometheus_alert_rules`. Each alert rule goes into a
separate `*.rule` file. If the syntax of a rule is invalid,
the `MetricsEndpointProvider` logs an error and does not load the particular
rule.
To avoid false positives and negatives in the evaluation of alert rules,
all ingested alert rule expressions are automatically qualified using Juju
Topology filters. This ensures that alert rules provided by your charm, trigger
alerts based only on data scrapped from your charm. For example an alert rule
such as the following
alert: UnitUnavailable
expr: up < 1
for: 0m
will be automatically transformed into something along the lines of the following
alert: UnitUnavailable
expr: up{juju_model=<model>, juju_model_uuid=<uuid-prefix>, juju_application=<app>} < 1
for: 0m
An attempt will be made to validate alert rules prior to loading them into Prometheus.
If they are invalid, an event will be emitted from this object which charms can respond
to in order to set a meaningful status for administrators.
This can be observed via `consumer.on.alert_rule_status_changed` which contains:
- The error(s) encountered when validating as `errors`
- A `valid` attribute, which can be used to reset the state of charms if alert rules
are updated via another mechanism (e.g. `cos-config`) and refreshed.
Args:
charm: a `CharmBase` object that manages this
`MetricsEndpointProvider` object. Typically, this is
`self` in the instantiating class.
relation_name: an optional string name of the relation between `charm`
and the Prometheus charmed service. The default is "metrics-endpoint".
It is strongly advised not to change the default, so that people
deploying your charm will have a consistent experience with all
other charms that provide metrics endpoints.
jobs: an optional list of dictionaries where each
dictionary represents the Prometheus scrape
configuration for a single job. When not provided, a
default scrape configuration is provided for the
`/metrics` endpoint polling all units of the charm on port `80`
using the `MetricsEndpointProvider` object.
alert_rules_path: an optional path for the location of alert rules
files. Defaults to "./prometheus_alert_rules",
resolved relative to the directory hosting the charm entry file.
The alert rules are automatically updated on charm upgrade.
refresh_event: an optional bound event or list of bound events which
will be observed to re-set scrape job data (IP address and others)
external_url: an optional argument that represents an external url that
can be generated by an Ingress or a Proxy.
lookaside_jobs_callable: an optional `Callable` which should be invoked
when the job configuration is built as a secondary mapping. The callable
should return a `List[Dict]` which is syntactically identical to the
`jobs` parameter, but can be updated out of step initialization of
this library without disrupting the 'global' job spec.
Raises:
RelationNotFoundError: If there is no relation in the charm's metadata.yaml
with the same name as provided via `relation_name` argument.
RelationInterfaceMismatchError: The relation with the same name as provided
via `relation_name` argument does not have the `prometheus_scrape` relation
interface.
RelationRoleMismatchError: If the relation with the same name as provided
via `relation_name` argument does not have the `RelationRole.provides`
role.
"""
_validate_relation_by_interface_and_direction(
charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides
)
try:
alert_rules_path = _resolve_dir_against_charm_path(charm, alert_rules_path)
except InvalidAlertRulePathError as e:
logger.debug(
"Invalid Prometheus alert rules folder at %s: %s",
e.alert_rules_absolute_path,
e.message,
)
super().__init__(charm, relation_name)
self.topology = JujuTopology.from_charm(charm)
self._charm = charm
self._alert_rules_path = alert_rules_path
self._relation_name = relation_name
# sanitize job configurations to the supported subset of parameters
jobs = [] if jobs is None else jobs
self._jobs = PrometheusConfig.sanitize_scrape_configs(jobs)
if external_url:
external_url = (
external_url if urlparse(external_url).scheme else ("http://" + external_url)
)
self.external_url = external_url
self._lookaside_jobs = lookaside_jobs_callable
events = self._charm.on[self._relation_name]
self.framework.observe(events.relation_changed, self._on_relation_changed)
if not refresh_event:
# FIXME remove once podspec charms are verified.
# `self.set_scrape_job_spec()` is called every re-init so this should not be needed.
if len(self._charm.meta.containers) == 1:
if "kubernetes" in self._charm.meta.series:
# This is a podspec charm
refresh_event = [self._charm.on.update_status]
else:
# This is a sidecar/pebble charm
container = list(self._charm.meta.containers.values())[0]
refresh_event = [self._charm.on[container.name.replace("-", "_")].pebble_ready]
else:
logger.warning(
"%d containers are present in metadata.yaml and "
"refresh_event was not specified. Defaulting to update_status. "
"Metrics IP may not be set in a timely fashion.",
len(self._charm.meta.containers),
)
refresh_event = [self._charm.on.update_status]
else:
if not isinstance(refresh_event, list):
refresh_event = [refresh_event]
self.framework.observe(events.relation_joined, self.set_scrape_job_spec)
for ev in refresh_event:
self.framework.observe(ev, self.set_scrape_job_spec)
def _on_relation_changed(self, event):
"""Check for alert rule messages in the relation data before moving on."""
if self._charm.unit.is_leader():
ev = json.loads(event.relation.data[event.app].get("event", "{}"))
if ev:
valid = bool(ev.get("valid", True))
errors = ev.get("errors", "")
if valid and not errors:
self.on.alert_rule_status_changed.emit(valid=valid)
else:
self.on.alert_rule_status_changed.emit(valid=valid, errors=errors)
scrape_errors = ev.get("scrape_job_errors", None)
if scrape_errors:
self.on.invalid_scrape_job.emit(errors=scrape_errors)
def update_scrape_job_spec(self, jobs):
"""Update scrape job specification."""
self._jobs = PrometheusConfig.sanitize_scrape_configs(jobs)
self.set_scrape_job_spec()
def set_scrape_job_spec(self, _=None):
"""Ensure scrape target information is made available to prometheus.
When a metrics provider charm is related to a prometheus charm, the
metrics provider sets specification and metadata related to its own
scrape configuration. This information is set using Juju application
data. In addition, each of the consumer units also sets its own
host address in Juju unit relation data.
"""
self._set_unit_ip()
if not self._charm.unit.is_leader():
return
alert_rules = AlertRules(query_type="promql", topology=self.topology)
alert_rules.add_path(self._alert_rules_path, recursive=True)
alert_rules_as_dict = alert_rules.as_dict()
for relation in self._charm.model.relations[self._relation_name]:
relation.data[self._charm.app]["scrape_metadata"] = json.dumps(self._scrape_metadata)
relation.data[self._charm.app]["scrape_jobs"] = json.dumps(self._scrape_jobs)
# Update relation data with the string representation of the rule file.
# Juju topology is already included in the "scrape_metadata" field above.
# The consumer side of the relation uses this information to name the rules file
# that is written to the filesystem.
relation.data[self._charm.app]["alert_rules"] = json.dumps(alert_rules_as_dict)
def _set_unit_ip(self, _=None):
"""Set unit host address.
Each time a metrics provider charm container is restarted it updates its own
host address in the unit relation data for the prometheus charm.
The only argument specified is an event, and it ignored. This is for expediency
to be able to use this method as an event handler, although no access to the
event is actually needed.
"""
for relation in self._charm.model.relations[self._relation_name]:
unit_ip = str(self._charm.model.get_binding(relation).network.bind_address)
# TODO store entire url in relation data, instead of only select url parts.
if self.external_url:
parsed = urlparse(self.external_url)
unit_address = parsed.hostname
path = parsed.path
elif self._is_valid_unit_address(unit_ip):
unit_address = unit_ip
path = ""
else:
unit_address = socket.getfqdn()
path = ""
relation.data[self._charm.unit]["prometheus_scrape_unit_address"] = unit_address
relation.data[self._charm.unit]["prometheus_scrape_unit_path"] = path
relation.data[self._charm.unit]["prometheus_scrape_unit_name"] = str(
self._charm.model.unit.name
)
def _is_valid_unit_address(self, address: str) -> bool:
"""Validate a unit address.
At present only IP address validation is supported, but
this may be extended to DNS addresses also, as needed.
Args:
address: a string representing a unit address
"""
try:
_ = ipaddress.ip_address(address)
except ValueError:
return False
return True
@property
def _scrape_jobs(self) -> list:
"""Fetch list of scrape jobs.
Returns:
A list of dictionaries, where each dictionary specifies a
single scrape job for Prometheus.
"""
jobs = self._jobs or []
if callable(self._lookaside_jobs):
jobs.extend(PrometheusConfig.sanitize_scrape_configs(self._lookaside_jobs()))
return jobs or [DEFAULT_JOB]
@property
def _scrape_metadata(self) -> dict:
"""Generate scrape metadata.
Returns:
Scrape configuration metadata for this metrics provider charm.
"""
return self.topology.as_dict()
class PrometheusRulesProvider(Object):
"""Forward rules to Prometheus.
This object may be used to forward rules to Prometheus. At present it only supports
forwarding alert rules. This is unlike :class:`MetricsEndpointProvider`, which
is used for forwarding both scrape targets and associated alert rules. This object
is typically used when there is a desire to forward rules that apply globally (across
all deployed charms and units) rather than to a single charm. All rule files are
forwarded using the same 'prometheus_scrape' interface that is also used by
`MetricsEndpointProvider`.
Args:
charm: A charm instance that `provides` a relation with the `prometheus_scrape` interface.
relation_name: Name of the relation in `metadata.yaml` that
has the `prometheus_scrape` interface.
dir_path: Root directory for the collection of rule files.
recursive: Whether to scan for rule files recursively.
"""
def __init__(
self,
charm: CharmBase,
relation_name: str = DEFAULT_RELATION_NAME,
dir_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
recursive=True,
):
super().__init__(charm, relation_name)
self._charm = charm
self._relation_name = relation_name
self._recursive = recursive
try:
dir_path = _resolve_dir_against_charm_path(charm, dir_path)
except InvalidAlertRulePathError as e:
logger.debug(
"Invalid Prometheus alert rules folder at %s: %s",
e.alert_rules_absolute_path,
e.message,
)
self.dir_path = dir_path
events = self._charm.on[self._relation_name]
event_sources = [
events.relation_joined,
events.relation_changed,
self._charm.on.leader_elected,
self._charm.on.upgrade_charm,
]
for event_source in event_sources:
self.framework.observe(event_source, self._update_relation_data)
def _reinitialize_alert_rules(self):
"""Reloads alert rules and updates all relations."""
self._update_relation_data(None)
def _update_relation_data(self, _):
"""Update application relation data with alert rules for all relations."""
if not self._charm.unit.is_leader():
return
alert_rules = AlertRules(query_type="promql")
alert_rules.add_path(self.dir_path, recursive=self._recursive)
alert_rules_as_dict = alert_rules.as_dict()
logger.info("Updating relation data with rule files from disk")
for relation in self._charm.model.relations[self._relation_name]:
relation.data[self._charm.app]["alert_rules"] = json.dumps(
alert_rules_as_dict,
sort_keys=True, # sort, to prevent unnecessary relation_changed events
)
class MetricsEndpointAggregator(Object):
"""Aggregate metrics from multiple scrape targets.
`MetricsEndpointAggregator` collects scrape target information from one
or more related charms and forwards this to a `MetricsEndpointConsumer`
charm, which may be in a different Juju model. However, it is
essential that `MetricsEndpointAggregator` itself resides in the same
model as its scrape targets, as this is currently the only way to
ensure in Juju that the `MetricsEndpointAggregator` will be able to
determine the model name and uuid of the scrape targets.
`MetricsEndpointAggregator` should be used in place of
`MetricsEndpointProvider` in the following two use cases:
1. Integrating one or more scrape targets that do not support the
`prometheus_scrape` interface.
2. Integrating one or more scrape targets through cross model
relations. Although the [Scrape Config Operator](https://charmhub.io/cos-configuration-k8s)
may also be used for the purpose of supporting cross model
relations.
Using `MetricsEndpointAggregator` to build a Prometheus charm client
only requires instantiating it. Instantiating
`MetricsEndpointAggregator` is similar to `MetricsEndpointProvider` except
that it requires specifying the names of three relations: the
relation with scrape targets, the relation for alert rules, and
that with the Prometheus charms. For example
```python
self._aggregator = MetricsEndpointAggregator(
self,
{
"prometheus": "monitoring",
"scrape_target": "prometheus-target",
"alert_rules": "prometheus-rules"
}
)
```
`MetricsEndpointAggregator` assumes that each unit of a scrape target
sets in its unit-level relation data two entries with keys
"hostname" and "port". If it is required to integrate with charms
that do not honor these assumptions, it is always possible to
derive from `MetricsEndpointAggregator` overriding the `_get_targets()`
method, which is responsible for aggregating the unit name, host
address ("hostname") and port of the scrape target.
`MetricsEndpointAggregator` also assumes that each unit of a
scrape target sets in its unit-level relation data a key named
"groups". The value of this key is expected to be the string
representation of list of Prometheus Alert rules in YAML format.
An example of a single such alert rule is
```yaml
- alert: HighRequestLatency
expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
for: 10m
labels:
severity: page
annotations:
summary: High request latency
```
Once again if it is required to integrate with charms that do not
honour these assumptions about alert rules then an object derived
from `MetricsEndpointAggregator` may be used by overriding the
`_get_alert_rules()` method.
`MetricsEndpointAggregator` ensures that Prometheus scrape job
specifications and alert rules are annotated with Juju topology
information, just like `MetricsEndpointProvider` and
`MetricsEndpointConsumer` do.
By default, `MetricsEndpointAggregator` ensures that Prometheus
"instance" labels refer to Juju topology. This ensures that
instance labels are stable over unit recreation. While it is not
advisable to change this option, if required it can be done by
setting the "relabel_instance" keyword argument to `False` when
constructing an aggregator object.
"""
_stored = StoredState()
def __init__(
self,
charm,
relation_names: Optional[dict] = None,
relabel_instance=True,
resolve_addresses=False,
):
"""Construct a `MetricsEndpointAggregator`.
Args:
charm: a `CharmBase` object that manages this
`MetricsEndpointAggregator` object. Typically, this is
`self` in the instantiating class.
relation_names: a dictionary with three keys. The value
of the "scrape_target" and "alert_rules" keys are
the relation names over which scrape job and alert rule
information is gathered by this `MetricsEndpointAggregator`.
And the value of the "prometheus" key is the name of
the relation with a `MetricsEndpointConsumer` such as
the Prometheus charm.
relabel_instance: A boolean flag indicating if Prometheus
scrape job "instance" labels must refer to Juju Topology.
resolve_addresses: A boolean flag indiccating if the aggregator
should attempt to perform DNS lookups of targets and append
a `dns_name` label
"""
self._charm = charm
relation_names = relation_names or {}
self._prometheus_relation = relation_names.get(
"prometheus", "downstream-prometheus-scrape"
)
self._target_relation = relation_names.get("scrape_target", "prometheus-target")
self._alert_rules_relation = relation_names.get("alert_rules", "prometheus-rules")
super().__init__(charm, self._prometheus_relation)
self._stored.set_default(jobs=[], alert_rules=[])
self._relabel_instance = relabel_instance
self._resolve_addresses = resolve_addresses
# manage Prometheus charm relation events
prometheus_events = self._charm.on[self._prometheus_relation]
self.framework.observe(prometheus_events.relation_joined, self._set_prometheus_data)
# manage list of Prometheus scrape jobs from related scrape targets
target_events = self._charm.on[self._target_relation]
self.framework.observe(target_events.relation_changed, self._on_prometheus_targets_changed)
self.framework.observe(
target_events.relation_departed, self._on_prometheus_targets_departed
)
# manage alert rules for Prometheus from related scrape targets
alert_rule_events = self._charm.on[self._alert_rules_relation]
self.framework.observe(alert_rule_events.relation_changed, self._on_alert_rules_changed)
self.framework.observe(alert_rule_events.relation_departed, self._on_alert_rules_departed)
def _set_prometheus_data(self, event):
"""Ensure every new Prometheus instances is updated.
Any time a new Prometheus unit joins the relation with
`MetricsEndpointAggregator`, that Prometheus unit is provided
with the complete set of existing scrape jobs and alert rules.
"""
if not self._charm.unit.is_leader():
return
jobs = [] + _type_convert_stored(
self._stored.jobs # pyright: ignore
) # list of scrape jobs, one per relation
for relation in self.model.relations[self._target_relation]:
targets = self._get_targets(relation)
if targets and relation.app:
jobs.append(self._static_scrape_job(targets, relation.app.name))
groups = [] + _type_convert_stored(
self._stored.alert_rules # pyright: ignore
) # list of alert rule groups
for relation in self.model.relations[self._alert_rules_relation]:
unit_rules = self._get_alert_rules(relation)
if unit_rules and relation.app:
appname = relation.app.name
rules = self._label_alert_rules(unit_rules, appname)
group = {"name": self.group_name(appname), "rules": rules}
groups.append(group)
event.relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs)
event.relation.data[self._charm.app]["alert_rules"] = json.dumps({"groups": groups})
def _on_prometheus_targets_changed(self, event):
"""Update scrape jobs in response to scrape target changes.
When there is any change in relation data with any scrape
target, the Prometheus scrape job, for that specific target is
updated.
"""
targets = self._get_targets(event.relation)
if not targets:
return
# new scrape job for the relation that has changed
self.set_target_job_data(targets, event.relation.app.name)
def set_target_job_data(self, targets: dict, app_name: str, **kwargs) -> None:
"""Update scrape jobs in response to scrape target changes.
When there is any change in relation data with any scrape
target, the Prometheus scrape job, for that specific target is
updated. Additionally, if this method is called manually, do the
same.
Args:
targets: a `dict` containing target information
app_name: a `str` identifying the application
kwargs: a `dict` of the extra arguments passed to the function
"""
if not self._charm.unit.is_leader():
return
# new scrape job for the relation that has changed
updated_job = self._static_scrape_job(targets, app_name, **kwargs)
for relation in self.model.relations[self._prometheus_relation]:
jobs = json.loads(relation.data[self._charm.app].get("scrape_jobs", "[]"))
# list of scrape jobs that have not changed
jobs = [job for job in jobs if updated_job["job_name"] != job["job_name"]]
jobs.append(updated_job)
relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs)
if not _type_convert_stored(self._stored.jobs) == jobs: # pyright: ignore
self._stored.jobs = jobs
def _on_prometheus_targets_departed(self, event):
"""Remove scrape jobs when a target departs.
Any time a scrape target departs, any Prometheus scrape job
associated with that specific scrape target is removed.
"""
job_name = self._job_name(event.relation.app.name)
unit_name = event.unit.name
self.remove_prometheus_jobs(job_name, unit_name)
def remove_prometheus_jobs(self, job_name: str, unit_name: Optional[str] = ""):
"""Given a job name and unit name, remove scrape jobs associated.
The `unit_name` parameter is used for automatic, relation data bag-based
generation, where the unit name in labels can be used to ensure that jobs with
similar names (which are generated via the app name when scanning relation data
bags) are not accidentally removed, as their unit name labels will differ.
For NRPE, the job name is calculated from an ID sent via the NRPE relation, and is
sufficient to uniquely identify the target.
"""
if not self._charm.unit.is_leader():
return
for relation in self.model.relations[self._prometheus_relation]:
jobs = json.loads(relation.data[self._charm.app].get("scrape_jobs", "[]"))
if not jobs:
continue
changed_job = [j for j in jobs if j.get("job_name") == job_name]
if not changed_job:
continue
changed_job = changed_job[0]
# list of scrape jobs that have not changed
jobs = [job for job in jobs if job.get("job_name") != job_name]
# list of scrape jobs for units of the same application that still exist
configs_kept = [
config
for config in changed_job["static_configs"] # type: ignore
if config.get("labels", {}).get("juju_unit") != unit_name
]
if configs_kept:
changed_job["static_configs"] = configs_kept # type: ignore
jobs.append(changed_job)
relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs)
if not _type_convert_stored(self._stored.jobs) == jobs: # pyright: ignore
self._stored.jobs = jobs
def _job_name(self, appname) -> str:
"""Construct a scrape job name.
Each relation has its own unique scrape job name. All units in
the relation are scraped as part of the same scrape job.
Args:
appname: string name of a related application.
Returns:
a string Prometheus scrape job name for the application.
"""
return "juju_{}_{}_{}_prometheus_scrape".format(
self.model.name, self.model.uuid[:7], appname
)
def _get_targets(self, relation) -> dict:
"""Fetch scrape targets for a relation.
Scrape target information is returned for each unit in the
relation. This information contains the unit name, network
hostname (or address) for that unit, and port on which a
metrics endpoint is exposed in that unit.
Args:
relation: an `ops.model.Relation` object for which scrape
targets are required.
Returns:
a dictionary whose keys are names of the units in the
relation. There values associated with each key is itself
a dictionary of the form
```
{"hostname": hostname, "port": port}
```
"""
targets = {}
for unit in relation.units:
port = relation.data[unit].get("port", 80)
hostname = relation.data[unit].get("hostname")
if hostname:
targets.update({unit.name: {"hostname": hostname, "port": port}})
return targets
def _static_scrape_job(self, targets, application_name, **kwargs) -> dict:
"""Construct a static scrape job for an application.
Args:
targets: a dictionary providing hostname and port for all
scrape target. The keys of this dictionary are unit
names. Values corresponding to these keys are
themselves a dictionary with keys "hostname" and
"port".
application_name: a string name of the application for
which this static scrape job is being constructed.
kwargs: a `dict` of the extra arguments passed to the function
Returns:
A dictionary corresponding to a Prometheus static scrape
job configuration for one application. The returned
dictionary may be transformed into YAML and appended to
the list of any existing list of Prometheus static configs.
"""
juju_model = self.model.name
juju_model_uuid = self.model.uuid
job = {
"job_name": self._job_name(application_name),
"static_configs": [
{
"targets": ["{}:{}".format(target["hostname"], target["port"])],
"labels": {
"juju_model": juju_model,
"juju_model_uuid": juju_model_uuid,
"juju_application": application_name,
"juju_unit": unit_name,
"host": target["hostname"],
# Expanding this will merge the dicts and replace the
# topology labels if any were present/found
**self._static_config_extra_labels(target),
},
}
for unit_name, target in targets.items()
],
"relabel_configs": self._relabel_configs + kwargs.get("relabel_configs", []),
}
job.update(kwargs.get("updates", {}))
return job
def _static_config_extra_labels(self, target: Dict[str, str]) -> Dict[str, str]:
"""Build a list of extra static config parameters, if specified."""
extra_info = {}
if self._resolve_addresses:
try:
dns_name = socket.gethostbyaddr(target["hostname"])[0]
except OSError:
logger.debug("Could not perform DNS lookup for %s", target["hostname"])
dns_name = target["hostname"]
extra_info["dns_name"] = dns_name
return extra_info
@property
def _relabel_configs(self) -> list:
"""Create Juju topology relabeling configuration.
Using Juju topology for instance labels ensures that these
labels are stable across unit recreation.
Returns:
a list of Prometheus relabeling configurations. Each item in
this list is one relabel configuration.
"""
return (
[
{
"source_labels": [
"juju_model",
"juju_model_uuid",
"juju_application",
"juju_unit",
],
"separator": "_",
"target_label": "instance",
"regex": "(.*)",
}
]
if self._relabel_instance
else []
)
def _on_alert_rules_changed(self, event):
"""Update alert rules in response to scrape target changes.
When there is any change in alert rule relation data for any
scrape target, the list of alert rules for that specific
target is updated.
"""
unit_rules = self._get_alert_rules(event.relation)
if not unit_rules:
return
app_name = event.relation.app.name
self.set_alert_rule_data(app_name, unit_rules)
def set_alert_rule_data(self, name: str, unit_rules: dict, label_rules: bool = True) -> None:
"""Update alert rule data.
The unit rules should be a dict, which is has additional Juju topology labels added. For
rules generated by the NRPE exporter, they are pre-labeled so lookups can be performed.
"""
if not self._charm.unit.is_leader():
return
if label_rules:
rules = self._label_alert_rules(unit_rules, name)
else:
rules = [unit_rules]
updated_group = {"name": self.group_name(name), "rules": rules}
for relation in self.model.relations[self._prometheus_relation]:
alert_rules = json.loads(relation.data[self._charm.app].get("alert_rules", "{}"))
groups = alert_rules.get("groups", [])
# list of alert rule groups that have not changed
for group in groups:
if group["name"] == updated_group["name"]:
group["rules"] = [r for r in group["rules"] if r not in updated_group["rules"]]
group["rules"].extend(updated_group["rules"])
if updated_group["name"] not in [g["name"] for g in groups]:
groups.append(updated_group)
relation.data[self._charm.app]["alert_rules"] = json.dumps({"groups": groups})
if not _type_convert_stored(self._stored.alert_rules) == groups: # pyright: ignore
self._stored.alert_rules = groups
def _on_alert_rules_departed(self, event):
"""Remove alert rules for departed targets.
Any time a scrape target departs any alert rules associated
with that specific scrape target is removed.
"""
group_name = self.group_name(event.relation.app.name)
unit_name = event.unit.name
self.remove_alert_rules(group_name, unit_name)
def remove_alert_rules(self, group_name: str, unit_name: str) -> None:
"""Remove an alert rule group from relation data."""
if not self._charm.unit.is_leader():
return
for relation in self.model.relations[self._prometheus_relation]:
alert_rules = json.loads(relation.data[self._charm.app].get("alert_rules", "{}"))
if not alert_rules:
continue
groups = alert_rules.get("groups", [])
if not groups:
continue
changed_group = [group for group in groups if group["name"] == group_name]
if not changed_group:
continue
changed_group = changed_group[0]
# list of alert rule groups that have not changed
groups = [group for group in groups if group["name"] != group_name]
# list of alert rules not associated with departing unit
rules_kept = [
rule
for rule in changed_group.get("rules") # type: ignore
if rule.get("labels").get("juju_unit") != unit_name
]
if rules_kept:
changed_group["rules"] = rules_kept # type: ignore
groups.append(changed_group)
relation.data[self._charm.app]["alert_rules"] = (
json.dumps({"groups": groups}) if groups else "{}"
)
if not _type_convert_stored(self._stored.alert_rules) == groups: # pyright: ignore
self._stored.alert_rules = groups
def _get_alert_rules(self, relation) -> dict:
"""Fetch alert rules for a relation.
Each unit of the related scrape target may have its own
associated alert rules. Alert rules for all units are returned
indexed by unit name.
Args:
relation: an `ops.model.Relation` object for which alert
rules are required.
Returns:
a dictionary whose keys are names of the units in the
relation. There values associated with each key is a list
of alert rules. Each rule is in dictionary format. The
structure "rule dictionary" corresponds to single
Prometheus alert rule.
"""
rules = {}
for unit in relation.units:
unit_rules = yaml.safe_load(relation.data[unit].get("groups", ""))
if unit_rules:
rules.update({unit.name: unit_rules})
return rules
def group_name(self, unit_name: str) -> str:
"""Construct name for an alert rule group.
Each unit in a relation may define its own alert rules. All
rules, for all units in a relation are grouped together and
given a single alert rule group name.
Args:
unit_name: string name of a related application.
Returns:
a string Prometheus alert rules group name for the unit.
"""
unit_name = re.sub(r"/", "_", unit_name)
return "juju_{}_{}_{}_alert_rules".format(self.model.name, self.model.uuid[:7], unit_name)
def _label_alert_rules(self, unit_rules, app_name: str) -> list:
"""Apply juju topology labels to alert rules.
Args:
unit_rules: a list of alert rules, where each rule is in
dictionary format.
app_name: a string name of the application to which the
alert rules belong.
Returns:
a list of alert rules with Juju topology labels.
"""
labeled_rules = []
for unit_name, rules in unit_rules.items():
for rule in rules:
# the new JujuTopology removed this, so build it up by hand
matchers = {
"juju_{}".format(k): v
for k, v in JujuTopology(self.model.name, self.model.uuid, app_name, unit_name)
.as_dict(excluded_keys=["charm_name"])
.items()
}
rule["labels"].update(matchers.items())
labeled_rules.append(rule)
return labeled_rules
class CosTool:
"""Uses cos-tool to inject label matchers into alert rule expressions and validate rules."""
_path = None
_disabled = False
def __init__(self, charm):
self._charm = charm
@property
def path(self):
"""Lazy lookup of the path of cos-tool."""
if self._disabled:
return None
if not self._path:
self._path = self._get_tool_path()
if not self._path:
logger.debug("Skipping injection of juju topology as label matchers")
self._disabled = True
return self._path
def apply_label_matchers(self, rules) -> dict:
"""Will apply label matchers to the expression of all alerts in all supplied groups."""
if not self.path:
return rules
for group in rules["groups"]:
rules_in_group = group.get("rules", [])
for rule in rules_in_group:
topology = {}
# if the user for some reason has provided juju_unit, we'll need to honor it
# in most cases, however, this will be empty
for label in [
"juju_model",
"juju_model_uuid",
"juju_application",
"juju_charm",
"juju_unit",
]:
if label in rule["labels"]:
topology[label] = rule["labels"][label]
rule["expr"] = self.inject_label_matchers(rule["expr"], topology)
return rules
def validate_alert_rules(self, rules: dict) -> Tuple[bool, str]:
"""Will validate correctness of alert rules, returning a boolean and any errors."""
if not self.path:
logger.debug("`cos-tool` unavailable. Not validating alert correctness.")
return True, ""
with tempfile.TemporaryDirectory() as tmpdir:
rule_path = Path(tmpdir + "/validate_rule.yaml")
rule_path.write_text(yaml.dump(rules))
args = [str(self.path), "validate", str(rule_path)]
# noinspection PyBroadException
try:
self._exec(args)
return True, ""
except subprocess.CalledProcessError as e:
logger.debug("Validating the rules failed: %s", e.output)
return False, ", ".join(
[
line
for line in e.output.decode("utf8").splitlines()
if "error validating" in line
]
)
def validate_scrape_jobs(self, jobs: list) -> bool:
"""Validate scrape jobs using cos-tool."""
if not self.path:
logger.debug("`cos-tool` unavailable. Not validating scrape jobs.")
return True
conf = {"scrape_configs": jobs}
with tempfile.NamedTemporaryFile() as tmpfile:
with open(tmpfile.name, "w") as f:
f.write(yaml.safe_dump(conf))
try:
self._exec([str(self.path), "validate-config", tmpfile.name])
except subprocess.CalledProcessError as e:
logger.error("Validating scrape jobs failed: {}".format(e.output))
raise
return True
def inject_label_matchers(self, expression, topology) -> str:
"""Add label matchers to an expression."""
if not topology:
return expression
if not self.path:
logger.debug("`cos-tool` unavailable. Leaving expression unchanged: %s", expression)
return expression
args = [str(self.path), "transform"]
args.extend(
["--label-matcher={}={}".format(key, value) for key, value in topology.items()]
)
args.extend(["{}".format(expression)])
# noinspection PyBroadException
try:
return self._exec(args)
except subprocess.CalledProcessError as e:
logger.debug('Applying the expression failed: "%s", falling back to the original', e)
return expression
def _get_tool_path(self) -> Optional[Path]:
arch = platform.machine()
arch = "amd64" if arch == "x86_64" else arch
res = "cos-tool-{}".format(arch)
try:
path = Path(res).resolve()
path.chmod(0o777)
return path
except NotImplementedError:
logger.debug("System lacks support for chmod")
except FileNotFoundError:
logger.debug('Could not locate cos-tool at: "{}"'.format(res))
return None
def _exec(self, cmd) -> str:
result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return result.stdout.decode("utf-8").strip()