import subprocess
import json
def generate_curl_command(conf, page):
    """
    Build the shell command line that asks the Cloudflare API for one page
    of a zone's DNS records.

    `conf` supplies the "zone_id", "email" and "api_key" entries; `page` is
    the page number placed in the query string. The finished command is
    returned as a single string.
    """
    zone_id = conf["zone_id"]
    email = conf["email"]
    api_key = conf["api_key"]
    # Adjacent literals are concatenated into one single-line template.
    template = (
        'curl -X GET "https://api.cloudflare.com/client/v4/zones/%s/dns_records?page=%d" '
        '-H "X-Auth-Email:%s" -H "X-Auth-Key:%s" -H "Content-Type: application/json" '
    )
    return template % (zone_id, page, email, api_key)
def load_user_config_file():
    """
    Load user configuration file in order to use Cloudflare API.

    Reads "config.json" from the current working directory and replaces the
    "proxied" entry with the string "true" or "false" according to its
    truthiness. A missing "proxied" entry is treated as false.

    Returns the parsed configuration dict.
    Raises OSError if the file cannot be opened and json.JSONDecodeError if
    its content is not valid JSON.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open("config.json", encoding="utf-8") as config_file:
        conf = json.load(config_file)
    # .get() keeps a config without "proxied" usable instead of raising KeyError.
    conf["proxied"] = "true" if conf.get("proxied") else "false"
    return conf
def invoke_curl_command(command):
    """
    Run the given "curl" command line through the shell and decode what it
    writes to standard output as JSON.

    Standard error is captured but not used. Returns the decoded JSON value.
    """
    finished = subprocess.run(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return json.loads(finished.stdout)
def print_error_message(response):
    """
    Report a failed Cloudflare API call on standard output.

    Prints two fixed header lines, then the "message" field of every entry
    in response["errors"], one per line.
    """
    print(
        "Error occured when communicating to Cloudflare API.",
        "The API returned the message below:",
        sep="\n",
    )
    for failure in response["errors"]:
        print(failure["message"])
def find_record_id(page_num, conf):
    """
    Walk result pages 1 through page_num and return the id of the first
    record of type AAAA whose name equals conf["name"].

    Returns None when no page contains such a record.
    """
    wanted_name = conf["name"]
    for page in range(1, page_num + 1):
        listing = invoke_curl_command(generate_curl_command(conf, page))
        for record in listing["result"]:
            if record["name"] == wanted_name and record["type"] == "AAAA":
                # First hit wins; no further pages are requested.
                return record["id"]
    return None
def main():
    """
    Look up and print the id of the AAAA record named in the configuration.

    Loads the configuration, requests the first page of DNS records to learn
    the total number of pages, then searches every page for the record.
    Exits with status 1 if the first API response reports failure.
    """
    # Imported locally so this function does not depend on the file's import header.
    import sys

    conf = load_user_config_file()
    dns_name = conf["name"]
    first_page = invoke_curl_command(generate_curl_command(conf, 1))
    if not first_page["success"]:
        print_error_message(first_page)
        # sys.exit is always available; the bare exit() builtin is injected by
        # the `site` module and is missing under `python -S` or frozen builds.
        sys.exit(1)
    page_num = first_page["result_info"]["total_pages"]
    record_id = find_record_id(page_num, conf)
    if record_id is None:
        print("No DNS record of type AAAA is found given the name %s" % dns_name)
    else:
        print("Record id of name \"%s\" is: %s" % (dns_name, record_id))
# Run the lookup only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
# The code below has been verified.
# Verified code
import time
import re
import requests
import pydnspod
import socket
import os
# NOTE: a `global` statement at module level has no effect; `old_ip` is
# simply a module-level name that is assigned further down in this script.
global old_ip
def get_local_ipv6():
    """
    Return the first IPv6-looking address found in the output of `ifconfig`.

    The pattern matches a run that starts with "24" and contains seven
    colons separating (possibly empty) lowercase-hex groups, so compressed
    ("::") or upper-case addresses are not recognised.

    Raises IndexError when the output contains no such address.
    """
    # Read the output inside `with` so the pipe to the child process is
    # closed deterministically instead of being left open.
    with os.popen("ifconfig") as pipe:
        output = pipe.read()
    candidates = re.findall(
        "24[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*:[0-9a-f]*",
        output,
    )
    # Deliberately unguarded: an empty result surfaces as IndexError.
    return candidates[0]
# DNSPod API credentials and the two records this script keeps up to date.
# NOTE(review): the token is hard-coded in source; consider loading it from
# the environment or an untracked config file instead.
user_id = "189498"  # replace with your Token ID
user_token = "e0a8e5b1b2f1d0d88f7d3a9e9a7e07f4"  # replace with your Token
domain = "carbin.press"  # replace with your domain
sub_domain = "@"  # replace with your record
sub_domain_id = "683917156"  # replace with your record ID
sub_domain1 = "www"
sub_domain_id1="688381119"
dp = pydnspod.connect(user_id,user_token)
# Seed old_ip with the value currently stored for the first record, encoded
# to bytes so it can be compared with the raw bytes read from the socket
# in the loop below.
old_ip=bytes(dp.record.info(domain,sub_domain_id)['value'],encoding='utf8')
# Poll the address reported by ns1.dnspod.net:6666 and, whenever it differs
# from the value last read back from DNSPod, write it to both A records.
while True:
    try:
        # The context manager closes the connection on every pass, so the
        # loop no longer leaks one socket per iteration.
        with socket.create_connection(('ns1.dnspod.net', 6666), 20) as sock:
            # Presumably the service replies with the caller's public IPv4
            # address as raw bytes (TODO confirm against DNSPod docs).
            ipv4 = sock.recv(16)
        time.sleep(5)
        if old_ip != ipv4:
            print("当前记录:"+bytes.decode(old_ip))
            print("公网ip:"+bytes.decode(ipv4))
            time.sleep(2)
            return_ = dp.record.modify(domain, sub_domain_id, sub_domain, "A", ipv4)
            return_1 = dp.record.modify(domain, sub_domain_id1, sub_domain1, "A", ipv4)
            # Re-read the stored value so the next comparison uses what
            # DNSPod actually holds rather than what was just sent.
            old_ip = bytes(dp.record.info(domain, sub_domain_id)['value'], encoding='utf8')
            print(return_)
            print(return_1)
            if return_ is None or return_1 is None:
                # At least one modify call returned nothing: retry soon.
                print("未更新")
                time.sleep(2)
            else:
                print("已更新,等待300秒")
                time.sleep(300)
    except IndexError:
        print("网络未连接")
        time.sleep(2)
    except OSError:
        # Covers connection failures and socket timeouts.
        print("网络中断")
        time.sleep(2)