Posted on :: 268 Words :: Tags: , ,

Using the Cloudflare API, this script updates a DNS record (A or AAAA) with the machine's current public IP address. It then reports success or failure to Uptime Kuma via a webhook.

import requests
import argparse
import socket

def send_webhook_notification(webhook_url, status_code, message, timeout=10):
    """Send a status notification to a webhook as a JSON POST.

    The request body is ``{"status": status_code, "message": message}``.
    Delivery is best-effort: any ``requests`` failure (connection error,
    timeout, non-2xx response) is printed and swallowed, so this function
    never raises a ``requests`` exception to its caller.

    Args:
        webhook_url: URL that receives the notification.
        status_code: Value placed under the ``"status"`` key.
        message: Human-readable text placed under the ``"message"`` key.
        timeout: Seconds to wait for the webhook before giving up.

    Returns:
        None.
    """
    payload = {"status": status_code, "message": message}
    try:
        # requests has no default timeout; without one an unresponsive
        # webhook server would block this call indefinitely.
        response = requests.post(webhook_url, json=payload, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to send webhook notification: {e}")

def get_current_ip(ip_type, timeout=10):
    """Look up this host's public IP address through the ipify service.

    Args:
        ip_type: ``"ipv4"`` or ``"ipv6"``; selects which ipify endpoint
            is queried.
        timeout: Seconds to wait for the lookup before giving up.

    Returns:
        The address as a string, or ``None`` when the lookup fails or the
        response body is not a valid address of the requested family.

    Raises:
        ValueError: If *ip_type* is neither ``"ipv4"`` nor ``"ipv6"``.
    """
    import ipaddress  # stdlib; imported locally so the block is self-contained

    # Validate the argument before doing any network work.
    if ip_type == "ipv4":
        url, expected_version = "https://api.ipify.org", 4
    elif ip_type == "ipv6":
        url, expected_version = "https://api6.ipify.org", 6
    else:
        raise ValueError("Unsupported IP type!")

    try:
        # requests has no default timeout; set one so a stalled lookup
        # cannot hang the script.
        response = requests.get(url, timeout=timeout)
        # Without this check an HTTP error page would be returned as the "IP".
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to determine current IP address: {e}")
        return None

    candidate = response.text.strip()
    try:
        parsed = ipaddress.ip_address(candidate)
    except ValueError:
        print(f"IP lookup returned an invalid address: {candidate!r}")
        return None
    if parsed.version != expected_version:
        print(f"IP lookup returned an address of the wrong family: {candidate!r}")
        return None
    return candidate

def update_dns_record(api_token, zone_id, dns_record_id, ip_address, ip_type, timeout=10):
    """Point an existing Cloudflare DNS record at *ip_address*.

    Sends a PATCH (partial update) to the record's endpoint so that only the
    fields listed in the payload change. The record's hostname is
    deliberately not sent: the only identifier available here is the record
    ID, and submitting that as ``"name"`` in a full overwrite would rename
    the record to its own ID.

    Args:
        api_token: Cloudflare API token, sent as a Bearer credential.
        zone_id: ID of the zone that contains the record.
        dns_record_id: ID of the DNS record to update.
        ip_address: New record content (the IP address).
        ip_type: ``"ipv4"`` yields an ``A`` record; any other value yields
            ``AAAA``.
        timeout: Seconds to wait for the API before giving up.

    Returns:
        The decoded JSON response on an HTTP success status, or ``None``
        when the request fails or the response body is not valid JSON.
    """
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    }
    data = {
        "type": "A" if ip_type == "ipv4" else "AAAA",
        "content": ip_address,
        "ttl": 1,  # Automatic TTL (Time To Live)
        "proxied": False,
    }
    url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{dns_record_id}"
    try:
        # requests has no default timeout; set one so a stalled API call
        # cannot hang the script.
        response = requests.patch(url, json=data, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # decode error is not a RequestException subclass.
        print(f"Failed to update DNS record: {e}")
        return None

def main():
    """Parse command-line arguments, update the DNS record, and report the result.

    Steps:
      1. Determine the current public IP of the requested family.
      2. Write it to the given Cloudflare DNS record.
      3. Send a ``"success"`` or ``"fail"`` notification to the webhook.

    A failure to determine the IP (an empty result or a ``requests`` error)
    is reported through the webhook and ends the run without touching DNS.
    """
    parser = argparse.ArgumentParser(description="Cloudflare DDNS Updater")
    parser.add_argument("--api-token", required=True, help="Cloudflare API token")
    parser.add_argument("--zone-id", required=True, help="Cloudflare Zone ID")
    parser.add_argument("--dns-record-id", required=True, help="Cloudflare DNS Record ID")
    parser.add_argument("--ip-type", choices=["ipv4", "ipv6"], required=True, help="Type of IP (ipv4 or ipv6)")
    parser.add_argument("--webhook", required=True, help="Uptime Kuma Webhook URL")

    args = parser.parse_args()

    try:
        ip_address = get_current_ip(args.ip_type)
    except requests.exceptions.RequestException as e:
        # A network error during lookup must still reach the failure
        # notification below instead of crashing the script silently.
        print(f"Failed to determine current IP address: {e}")
        ip_address = None

    if not ip_address:
        send_webhook_notification(args.webhook, "fail", "Could not determine IP address.")
        return

    result = update_dns_record(args.api_token, args.zone_id, args.dns_record_id, ip_address, args.ip_type)

    # Only a response whose "success" field is truthy counts as a success;
    # a None result means the request itself failed.
    if result and result.get("success"):
        send_webhook_notification(args.webhook, "success", "DNS updated successfully.")
    else:
        send_webhook_notification(args.webhook, "fail", "Failed to update DNS.")

# Run the updater only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()