Tag: traefik

2 notes found.

traefik 中生成的证书如果已经不再使用 (表现其实就是过期了) 的证书,经查询发现有人写了相关的脚本来处理,稍作修改直接使用 uv 来管理避免需要手动安装依赖。

更简单的使用方法 (heredoc):

uv run - <<EOF                                                          
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "cryptography",
# ]
# ///
import json
import sys
import base64
from datetime import datetime, timedelta, timezone
from cryptography import x509
import os

DEBUG = os.environ.get("DEBUG", False)


def cleanup_certs(acme_file="acme.json"):
    """
    Deletes all expired certificates from acme.json.
    """
    try:
        with open(acme_file, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: {acme_file} not found.")
        return

    now = datetime.now()
    if DEBUG:
        print(f"DEBUG: current time: {now}")

    cleaned_count = 0

    def process_certificates(certificates, version):
        nonlocal cleaned_count
        if not certificates:
            return [], 0

        original_cert_count = len(certificates)
        certs_to_keep = []
        removed_certs = []
        for cert in certificates:
            cert_b64 = ""
            domain = "N/A"
            try:
                if version == 1:
                    cert_b64 = cert.get("Certificate", "")
                    domain = cert.get("Domain", {}).get("Main", "N/A")
                elif version in [2, 3]:
                    cert_b64 = cert.get("certificate", "")
                    domain = cert.get("domain", {}).get("main", "N/A")

                if cert_b64:
                    cert_b64_decoded = cert_b64.replace("-", "+").replace("_", "/")
                    padding_needed = len(cert_b64_decoded) % 4
                    if padding_needed != 0:
                        cert_b64_decoded += "=" * (4 - padding_needed)

                    cert_pem_bytes = base64.b64decode(cert_b64_decoded)
                    x509_cert = x509.load_pem_x509_certificate(cert_pem_bytes)
                    not_after_date = x509_cert.not_valid_after_utc

                    if DEBUG:
                        print(
                            f"DEBUG: Processing domain {domain}, not_valid_after: {not_after_date}"
                        )

                    if not_after_date > datetime.now(timezone.utc):
                        certs_to_keep.append(cert)
                    else:
                        removed_certs.append(domain)
                else:
                    certs_to_keep.append(cert)
            except Exception as e:
                print(f"Could not process certificate for domain {domain}: {e}")
                certs_to_keep.append(cert)

        removed_count = original_cert_count - len(certs_to_keep)

        # We only want to add to the cleaned_count if we are in a v2/v3 file,
        # because the v1 file has only one list of certs and we will get the
        # count from the return value of this function.
        if version in [2, 3]:
            cleaned_count += removed_count

        for domain_name in removed_certs:
            print(f"Removed certificate for: {domain_name}")

        return certs_to_keep, removed_count

    # Detect acme.json version
    version = None
    if (
        isinstance(data, dict)
        and "Certificates" in data
        and isinstance(data.get("Certificates"), list)
    ):
        version = 1
    elif isinstance(data, dict):
        for key, value in data.items():
            if isinstance(value, dict) and "Certificates" in value:
                version = 2  # Treat as v2/v3 generic
                break

    if DEBUG:
        print(f"DEBUG: Detected version: {version}")

    if version == 1:
        certs_to_keep, removed_count = process_certificates(
            data["Certificates"], version
        )
        if removed_count > 0:
            data["Certificates"] = certs_to_keep
            cleaned_count = removed_count
    elif version == 2:
        for resolver, resolver_data in data.items():
            if (
                isinstance(resolver_data, dict)
                and "Certificates" in resolver_data
                and resolver_data["Certificates"] is not None
            ):
                if DEBUG:
                    print(f"DEBUG: Processing resolver: '{resolver}'")
                certs_to_keep, _ = process_certificates(
                    resolver_data["Certificates"], version
                )
                resolver_data["Certificates"] = certs_to_keep
            else:
                if DEBUG:
                    print(f"DEBUG: Resolver '{resolver}': No certificates to process.")

    else:
        print("Could not determine the structure of the acme.json file.")
        return

    if cleaned_count == 0:
        print("No expired certificates to remove.")
    else:
        try:
            with open(acme_file, "w") as f:
                json.dump(data, f, indent=4)
            print(
                f"Successfully cleaned up {cleaned_count} certificates in {acme_file}."
            )
        except Exception as e:
            print(f"Error writing to {acme_file}: {e}")


if __name__ == "__main__":
    help_message = """
Usage: python3 cleanup_certs.py [--help] [acme_file]

Deletes all expired certificates from an acme.json file.
The script attempts to auto-detect the acme.json file version (v1 or v2/v3).

Arguments:
  acme_file   Optional. Path to the acme.json file. Defaults to "acme.json".
  --help      Display this help message and exit.
"""

    if "--help" in sys.argv:
        print(help_message)
        sys.exit(0)

    if len(sys.argv) > 1 and sys.argv[1] != "--help":
        file_path = sys.argv[1]
    else:
        file_path = "acme.json"

    cleanup_certs(file_path)
EOF

Traefik 底层使用的是 lego 库来处理 ACME (Let’s Encrypt) 协议。在标准的 DNS-01 验证流程中,Traefik 会严格遵循 “创建 -> 验证 -> 清理” 的生命周期。所以如果有 _acme-challenge 之类的 dns 记录存在,除非正在签发过程中,否则是可以删除掉的。

签发流程:LIVE EDITOR

flowchart TD
    Start["开始 DNS-01 挑战"]
    
    Start --> GetInfo["获取挑战信息<br/>GetChallengeInfo()"]
    GetInfo --> CNAMECheck{"检查 CNAME"}
    
    CNAMECheck -->|有 CNAME| FollowCNAME["跟随 CNAME 链<br/>getChallengeFQDN()"]
    CNAMECheck -->|无 CNAME| UseFQDN["使用原始 FQDN"]
    
    FollowCNAME --> EffectiveFQDN["获得 EffectiveFQDN"]
    UseFQDN --> EffectiveFQDN
    
    EffectiveFQDN --> Present["创建 TXT 记录<br/>Present()"]
    
    Present --> ProviderCheck{"DNS 提供商类型"}
    
    ProviderCheck -->|标准提供商| StandardProvider["标准 DNS 提供商<br/>直接创建 TXT 记录"]
    ProviderCheck -->|ACME-DNS| ACMEDNSProvider["ACME-DNS 提供商"]
    
    ACMEDNSProvider --> AccountCheck{"检查账户"}
    AccountCheck -->|账户存在| UpdateTXT["更新 TXT 记录"]
    AccountCheck -->|账户不存在| CreateAccount["创建新账户"]
    
    CreateAccount --> CNAMERequired["返回 ErrCNAMERequired<br/>需要手动创建 CNAME"]
    CNAMERequired --> ManualCNAME["手动创建 CNAME 记录"]
    ManualCNAME --> UpdateTXT
    
    StandardProvider --> Propagation["等待 DNS 传播"]
    UpdateTXT --> Propagation
    
    Propagation --> Polling["轮询检查<br/>wait.For()"]
    Polling --> Validate{"验证成功?"}
    
    Validate -->|是| CertIssued["证书签发成功"]
    Validate -->|否| Timeout{"超时?"}
    
    Timeout -->|是| Error["签发失败"]
    Timeout -->|否| Polling
    
    CertIssued --> Cleanup["清理 DNS 记录<br/>CleanUp()"]
    Error --> Cleanup
    
    Cleanup --> DeleteTXT["删除 TXT 记录"]
    DeleteTXT --> End["流程结束"]
    
    %% 样式
    classDef default fill:#f9f9f9,stroke:#333,stroke-width:2px
    classDef process fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef decision fill:#fff3e0,stroke:#e65100,stroke-width:2px
    classDef error fill:#ffebee,stroke:#c62828,stroke-width:2px
    classDef success fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    
    class GetInfo,FollowCNAME,UseFQDN,EffectiveFQDN,Present,StandardProvider,UpdateTXT,Propagation,Polling,Cleanup,DeleteTXT process
    class CNAMECheck,ProviderCheck,AccountCheck,Validate,Timeout decision
    class CNAMERequired,ManualCNAME,Error error
    class CertIssued,End success