"""Python Cloudflare DDNS Updater.

Using the Cloudflare API, this script updates a DNS record with the current
public IP address. It also sends a notification to Uptime Kuma via a webhook.
"""
import requests
import argparse
import socket
def send_webhook_notification(webhook_url, status_code, message, timeout=10):
    """Report the outcome of a run to the monitoring webhook.

    POSTs a JSON body of the form ``{"status": ..., "message": ...}`` to
    *webhook_url*. Delivery is best-effort: any ``requests`` failure
    (connection error, timeout, non-2xx response) is printed and swallowed so
    that a broken webhook never crashes the updater.

    Args:
        webhook_url: URL that receives the notification.
        status_code: Value sent under the ``"status"`` key.
        message: Human-readable text sent under the ``"message"`` key.
        timeout: Seconds to wait for the webhook host before giving up.
    """
    payload = {"status": status_code, "message": message}
    try:
        # Without an explicit timeout, requests waits indefinitely on an
        # unresponsive host, which would hang an unattended (cron) run.
        response = requests.post(webhook_url, json=payload, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to send webhook notification: {e}")
def get_current_ip(ip_type, timeout=10):
    """Look up this host's public IP address via the ipify service.

    Args:
        ip_type: ``"ipv4"`` or ``"ipv6"``; selects which ipify endpoint is
            queried.
        timeout: Seconds to wait for ipify before giving up.

    Returns:
        The address as a string with surrounding whitespace removed, or
        ``None`` if the lookup failed (network error, timeout, non-2xx
        response, or an empty body).

    Raises:
        ValueError: If *ip_type* is neither ``"ipv4"`` nor ``"ipv6"``.
    """
    if ip_type == "ipv4":
        url = "https://api.ipify.org"
    elif ip_type == "ipv6":
        url = "https://api6.ipify.org"
    else:
        raise ValueError("Unsupported IP type!")

    try:
        response = requests.get(url, timeout=timeout)
        # An HTTP error page must not be mistaken for an IP address.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Report and return a falsy value so the caller can send its own
        # failure notification instead of dying with a traceback.
        print(f"Failed to determine current IP address: {e}")
        return None

    return response.text.strip() or None
def update_dns_record(api_token, zone_id, dns_record_id, ip_address, ip_type, timeout=10):
    """Point an existing Cloudflare DNS record at *ip_address*.

    Args:
        api_token: Cloudflare API token, sent as a Bearer token.
        zone_id: ID of the zone that contains the record.
        dns_record_id: ID of the DNS record to update.
        ip_address: New record content (the IP address).
        ip_type: ``"ipv4"`` writes an ``A`` record; any other value writes
            an ``AAAA`` record.
        timeout: Seconds to wait for the Cloudflare API before giving up.

    Returns:
        The decoded JSON response on success, or ``None`` if the request
        failed or the response body was not valid JSON.
    """
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    }
    # No "name" field on purpose: this function only knows the record's ID,
    # and sending the ID as the name would rename the record. PATCH applies a
    # partial update, so the existing record name is left untouched.
    data = {
        "type": "A" if ip_type == "ipv4" else "AAAA",
        "content": ip_address,
        "ttl": 1,  # Automatic TTL (Time To Live)
        "proxied": False,
    }
    url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{dns_record_id}"
    try:
        response = requests.patch(url, json=data, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException subclass.
        print(f"Failed to update DNS record: {e}")
        return None
def main():
    """Parse CLI arguments, update the DNS record and report the outcome.

    Steps: determine the current public IP for the requested IP type, write
    it to the given Cloudflare DNS record, then send a "success" or "fail"
    notification to the webhook. Every failure path sends a "fail"
    notification before returning.
    """
    parser = argparse.ArgumentParser(description="Cloudflare DDNS Updater")
    parser.add_argument("--api-token", required=True, help="Cloudflare API token")
    parser.add_argument("--zone-id", required=True, help="Cloudflare Zone ID")
    parser.add_argument("--dns-record-id", required=True, help="Cloudflare DNS Record ID")
    parser.add_argument("--ip-type", choices=["ipv4", "ipv6"], required=True, help="Type of IP (ipv4 or ipv6)")
    parser.add_argument("--webhook", required=True, help="Uptime Kuma Webhook URL")
    args = parser.parse_args()

    try:
        ip_address = get_current_ip(args.ip_type)
    except requests.exceptions.RequestException as e:
        # A failed lookup must still reach the failure notification below
        # rather than ending the run with an unhandled traceback.
        print(f"Failed to determine current IP address: {e}")
        ip_address = None

    if not ip_address:
        send_webhook_notification(args.webhook, "fail", "Could not determine IP address.")
        return

    result = update_dns_record(args.api_token, args.zone_id, args.dns_record_id, ip_address, args.ip_type)
    if result and result.get("success"):
        send_webhook_notification(args.webhook, "success", "DNS updated successfully.")
    else:
        send_webhook_notification(args.webhook, "fail", "Failed to update DNS.")


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()