
Charmed MySQL
Channel | Revision | Published
---|---|---
8.0/stable | 366 | 31 Mar 2025
8.0/stable | 367 | 31 Mar 2025
8.0/candidate | 366 | 24 Mar 2025
8.0/candidate | 367 | 24 Mar 2025
8.0/beta | 366 | 11 Mar 2025
8.0/beta | 367 | 11 Mar 2025
8.0/edge | 375 | 07 May 2025
8.0/edge | 374 | 07 May 2025
juju deploy mysql --channel 8.0/stable
Deploy universal operators easily with Juju, the Universal Operator Lifecycle Manager.
Platform: 22.04
Library version: 0.11
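To pull this library into another charm, it can typically be fetched with charmcraft. The import path below is an assumption based on the module docstring and the LIBAPI value in the source (i.e. that the library is published as s3_helpers under the mysql charm):
charmcraft fetch-lib charms.mysql.v0.s3_helpers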
# Copyright 2022 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""S3 helper functions for the MySQL charms."""
import base64
import logging
import pathlib
import tempfile
import time
from contextlib import nullcontext
from io import BytesIO
from typing import Dict, List, Tuple
import boto3
import botocore
import botocore.exceptions
logger = logging.getLogger(__name__)
# The unique Charmhub library identifier, never change it
LIBID = "bb85c0c01b454bd48898d680e3f3ce4d"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 11
S3_GROUP_REPLICATION_ID_FILE = "group_replication_id.txt"
# botocore/urllib3 clutter the logs when on debug
logging.getLogger("botocore").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
def _construct_endpoint(s3_parameters: dict) -> str:
"""Construct the S3 service endpoint using the region.
This is needed when the provided endpoint is from AWS, and it doesn't contain the region.
"""
# Use the provided endpoint if a region is not needed.
endpoint = s3_parameters["endpoint"]
# Load endpoints data.
loader = botocore.loaders.create_loader()
data = loader.load_data("endpoints")
# Construct the endpoint using the region.
resolver = botocore.regions.EndpointResolver(data)
endpoint_data = resolver.construct_endpoint("s3", s3_parameters["region"])
# Use the built endpoint if it is an AWS endpoint.
if endpoint_data and endpoint.endswith(endpoint_data["dnsSuffix"]):
endpoint = f"{endpoint.split('://')[0]}://{endpoint_data['hostname']}"
return endpoint
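# Illustrative behaviour (not part of the library): for an AWS endpoint such as
# "https://s3.amazonaws.com" with region "us-east-2", the resolver yields the regional
# hostname (e.g. "https://s3.us-east-2.amazonaws.com"), while a non-AWS endpoint such as
# "https://minio.example.com:9000" is returned unchanged because it does not end with an
# AWS dnsSuffix.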
def _get_bucket(s3_parameters: Dict) -> boto3.resources.base.ServiceResource:
"""Get an S3 bucket resource.
Args:
s3_parameters: A dictionary containing the S3 parameters
The following are expected keys in the dictionary: bucket, region,
endpoint, access-key and secret-key
Returns: an S3 bucket resource
"""
session = boto3.session.Session(
aws_access_key_id=s3_parameters["access-key"],
aws_secret_access_key=s3_parameters["secret-key"],
region_name=s3_parameters["region"] or None,
)
ca_chain = s3_parameters.get("tls-ca-chain")
with tempfile.NamedTemporaryFile() if ca_chain else nullcontext() as ca_file:
if ca_file:
ca = "\n".join([base64.b64decode(s).decode() for s in ca_chain])
ca_file.write(ca.encode())
ca_file.flush()
s3 = session.resource(
"s3",
endpoint_url=_construct_endpoint(s3_parameters),
verify=ca_file.name if ca_file else True,
)
return s3.Bucket(s3_parameters["bucket"])
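# Illustrative shape of s3_parameters (placeholder values, mirroring the keys used above):
#
#     {
#         "bucket": "mysql-backups",
#         "region": "us-east-2",
#         "endpoint": "https://s3.amazonaws.com",
#         "access-key": "<access-key>",
#         "secret-key": "<secret-key>",
#         "tls-ca-chain": ["<base64-encoded PEM certificate>"],  # optional
#     }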
def upload_content_to_s3(content: str, content_path: str, s3_parameters: Dict) -> bool:
"""Uploads the provided contents to the provided S3 bucket.
Args:
content: The content to upload to S3
content_path: The path to which to upload the content
s3_parameters: A dictionary containing the S3 parameters
The following are expected keys in the dictionary: bucket, region,
endpoint, access-key and secret-key
Returns: a boolean indicating success.
"""
try:
logger.info(f"Uploading content to bucket={s3_parameters['bucket']}, path={content_path}")
bucket = _get_bucket(s3_parameters)
with tempfile.NamedTemporaryFile() as temp_file:
temp_file.write(content.encode("utf-8"))
temp_file.flush()
bucket.upload_file(temp_file.name, content_path)
except Exception as e:
logger.exception(
f"Failed to upload content to S3 bucket={s3_parameters['bucket']}, path={content_path}",
exc_info=e,
)
return False
return True
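# Example call (illustrative; the path and content are placeholders):
#
#     ok = upload_content_to_s3("some-content", "mysql/group_replication_id.txt", s3_parameters)
#     if not ok:
#         ...  # e.g. surface the failure in the charm status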
def _read_content_from_s3(content_path: str, s3_parameters: dict) -> str | None:
"""Reads specified content from the provided S3 bucket.
Args:
content_path: The S3 path from which to download the content
s3_parameters: A dictionary containing the S3 parameters
The following are expected keys in the dictionary: bucket, region,
endpoint, access-key and secret-key
Returns:
a string with the content if the object is successfully downloaded, or None if the file does not exist or an error occurred during the download.
"""
try:
logger.info(f"Reading content from bucket={s3_parameters['bucket']}, path={content_path}")
bucket = _get_bucket(s3_parameters)
with BytesIO() as buf:
bucket.download_fileobj(content_path, buf)
return buf.getvalue().decode("utf-8")
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "404":
logger.info(
f"No such object to read from S3 bucket={s3_parameters['bucket']}, path={content_path}"
)
else:
logger.exception(
f"Failed to read content from S3 bucket={s3_parameters['bucket']}, path={content_path}",
exc_info=e,
)
except Exception as e:
logger.exception(
f"Failed to read content from S3 bucket={s3_parameters['bucket']}, path={content_path}",
exc_info=e,
)
return None
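# Example call (illustrative): a missing object and a failed download both return None.
#
#     content = _read_content_from_s3("mysql/group_replication_id.txt", s3_parameters)
#     if content is None:
#         ...  # nothing stored yet, or the download failed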
def _compile_backups_from_file_ids(
metadata_ids: List[str], md5_ids: List[str], log_ids: List[str]
) -> List[Tuple[str, str]]:
"""Helper function that compiles tuples of (backup_id, status) from file ids."""
backups = []
for backup_id in metadata_ids:
backup_status = "in progress"
if backup_id in md5_ids:
backup_status = "finished"
elif backup_id in log_ids:
backup_status = "failed"
backups.append((backup_id, backup_status))
return backups
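# Illustrative mapping (a backup ID is "finished" once its .md5 file exists, "failed" if only
# a .backup.log exists, and "in progress" otherwise):
#
#     _compile_backups_from_file_ids(
#         metadata_ids=["a", "b", "c"], md5_ids=["a"], log_ids=["b"]
#     )
#     # -> [("a", "finished"), ("b", "failed"), ("c", "in progress")]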
def list_backups_in_s3_path(s3_parameters: Dict) -> List[Tuple[str, str]]: # noqa: C901
"""Retrieve subdirectories in an S3 path.
Args:
s3_parameters: A dictionary containing the S3 parameters
The following are expected keys in the dictionary: bucket, path,
region, endpoint, access-key and secret-key
Returns: a list of (backup_id, status) tuples for the backups found under the S3 path.
Raises: any exception raised by boto3
"""
try:
logger.info(
f"Listing subdirectories from S3 bucket={s3_parameters['bucket']}, path={s3_parameters['path']}"
)
s3_client = boto3.client(
"s3",
aws_access_key_id=s3_parameters["access-key"],
aws_secret_access_key=s3_parameters["secret-key"],
endpoint_url=_construct_endpoint(s3_parameters),
region_name=s3_parameters["region"] or None,
)
list_objects_v2_paginator = s3_client.get_paginator("list_objects_v2")
s3_path_directory = (
s3_parameters["path"]
if s3_parameters["path"][-1] == "/"
else f"{s3_parameters['path']}/"
)
metadata_ids = []
md5_ids = []
log_ids = []
for page in list_objects_v2_paginator.paginate(
Bucket=s3_parameters["bucket"],
Prefix=s3_path_directory,
Delimiter="/",
):
for content in page.get("Contents", []):
key = content["Key"]
filename = key.removeprefix(s3_path_directory)
if ".metadata" in filename:
try:
backup_id = filename.split(".metadata")[0]
time.strptime(backup_id, "%Y-%m-%dT%H:%M:%SZ")
metadata_ids.append(backup_id)
except ValueError:
pass
elif ".md5" in key:
md5_ids.append(filename.split(".md5")[0])
elif ".backup.log" in key:
log_ids.append(filename.split(".backup.log")[0])
return _compile_backups_from_file_ids(metadata_ids, md5_ids, log_ids)
except Exception as e:
try:
# botocore raises dynamically generated exceptions
# with a response attribute. We can use this to
# set a more meaningful error message.
if e.response["Error"]["Code"] == "NoSuchBucket":
message = f"Bucket {s3_parameters['bucket']} does not exist"
setattr(e, "message", message)
raise
except (KeyError, AttributeError):
pass
# default handling exposes exception
logger.exception(
f"Failed to list subdirectories in S3 bucket={s3_parameters['bucket']}, path={s3_parameters['path']}"
)
raise
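# Illustrative layout under s3_parameters["path"] that this function interprets (backup IDs
# use the "%Y-%m-%dT%H:%M:%SZ" format):
#
#     <path>/2025-01-01T00:00:00Z.metadata    -> backup registered (at least "in progress")
#     <path>/2025-01-01T00:00:00Z.md5         -> marks the backup as "finished"
#     <path>/2025-01-01T00:00:00Z.backup.log  -> without a matching .md5, marks it "failed"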
def fetch_and_check_existence_of_s3_path(path: str, s3_parameters: Dict[str, str]) -> bool:
"""Checks the existence of a provided S3 path by fetching the object.
Args:
s3_parameters: A dictionary containing the S3 parameters
The following are expected keys in the dictionary: bucket, region,
endpoint, access-key and secret-key
path: The path to check the existence of
Returns: a boolean indicating the existence of the s3 path
Raises: any exceptions raised by boto3
"""
s3_client = boto3.client(
"s3",
aws_access_key_id=s3_parameters["access-key"],
aws_secret_access_key=s3_parameters["secret-key"],
endpoint_url=_construct_endpoint(s3_parameters),
region_name=s3_parameters["region"] or None,
)
try:
response = s3_client.get_object(Bucket=s3_parameters["bucket"], Key=path, Range="0-1")
return "ContentLength" in response # return True even if object is empty
except s3_client.exceptions.NoSuchKey:
return False
except Exception as e:
logger.exception(
f"Failed to fetch and check existence of path {path} in S3 bucket {s3_parameters['bucket']}",
exc_info=e,
)
raise
def ensure_s3_compatible_group_replication_id(
group_replication_id: str, s3_parameters: Dict[str, str]
) -> bool:
"""Checks if group replication id is equal to the one in the provided S3 repository.
If S3 doesn't have this claim (so it's not initialized),
then it will be populated automatically with the provided id.
Args:
group_replication_id: group replication id of the current cluster
s3_parameters: A dictionary containing the S3 parameters
The following are expected keys in the dictionary: bucket, region,
endpoint, access-key and secret-key
Returns: a boolean indicating whether the group replication IDs match (also True when the repository was just initialized with the provided ID)
"""
s3_id_path = str(pathlib.Path(s3_parameters["path"]) / S3_GROUP_REPLICATION_ID_FILE)
s3_id = _read_content_from_s3(s3_id_path, s3_parameters)
if s3_id and s3_id != group_replication_id:
logger.info(
f"s3 repository is not compatible based on group replication id: {group_replication_id} != {s3_id}"
)
return False
upload_content_to_s3(group_replication_id, s3_id_path, s3_parameters)
return True
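# Putting it together (illustrative sketch; in the MySQL charms the S3 parameters typically
# come from an s3-integrator relation):
#
#     if ensure_s3_compatible_group_replication_id(group_replication_id, s3_parameters):
#         backups = list_backups_in_s3_path(s3_parameters)
#     else:
#         ...  # refuse to use this bucket for backups/restores of this cluster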