Files
DNSPod-Skill/scripts/deploy_record.py
OpenClaw 7abea390ad Initial commit: Tencent DNSPod DNS deployment skill
- Support for single record deployment
- Batch deployment from JSON config
- Service quick deployment (Web/API/CDN)
- .env file support for secure credentials
- Complete documentation
2026-03-01 11:44:05 +08:00

338 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
DNSPod单条记录部署脚本
用于快速添加或更新DNS记录
支持 .env 文件配置敏感信息
"""
import os
import sys
import json
import hmac
import hashlib
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import urlencode, quote
try:
import requests
except ImportError:
print("错误: 缺少 requests 库")
print("请运行: pip install -r requirements.txt")
sys.exit(1)
# 加载 .env 文件
def load_env(env_file=None):
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. A leading ``export `` on the key is dropped and one pair of
    matching single or double quotes around the value is removed, so both
    ``KEY=value`` and ``export KEY="value"`` work. Values from the file
    overwrite variables that are already set in the environment.

    Args:
        env_file: Path of the file to read. Defaults to ``.env`` in the
            parent of the directory containing this script.

    Returns:
        None. A missing file is silently ignored.
    """
    if env_file is None:
        env_file = Path(__file__).parent.parent / '.env'
    env_file = Path(env_file)
    # is_file() also rejects a directory that happens to be named ".env".
    if not env_file.is_file():
        return

    # utf-8-sig reads plain UTF-8 too, but drops a BOM written by some editors
    # that would otherwise become part of the first key.
    with open(env_file, 'r', encoding='utf-8-sig') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue

            key, value = line.split('=', 1)
            key = key.strip()
            if key.startswith('export '):
                key = key[len('export '):].strip()

            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]

            # An empty name is not a valid environment variable.
            if key:
                os.environ[key] = value
# Load the .env file as soon as the module is imported, so its values are
# in os.environ before any credentials are read.
load_env()
# API configuration
API_ENDPOINT = "dnspod.tencentcloudapi.com"  # host used for the request URL and the Host header
API_VERSION = "2021-03-23"  # sent as the X-TC-Version header
REGION = "ap-guangzhou"  # sent as the X-TC-Region header
def get_credentials():
    """Return the Tencent Cloud API key pair from the process environment.

    Reads ``TENCENT_SECRET_ID`` and ``TENCENT_SECRET_KEY``. If either is
    missing or empty, prints setup instructions and exits with status 1.

    Returns:
        A ``(secret_id, secret_key)`` tuple of strings.
    """
    key_id = os.getenv('TENCENT_SECRET_ID')
    key_secret = os.getenv('TENCENT_SECRET_KEY')
    if key_id and key_secret:
        return key_id, key_secret

    # Each entry is printed as its own line, in this order.
    help_lines = (
        "错误: 未找到腾讯云API密钥",
        "\n请设置环境变量或创建 .env 文件:",
        " 方式1: 设置环境变量",
        ' export TENCENT_SECRET_ID="你的SecretId"',
        ' export TENCENT_SECRET_KEY="你的SecretKey"',
        "\n 方式2: 创建 .env 文件",
        " cp .env.example .env",
        " 然后编辑 .env 文件,填入你的密钥",
        "\n获取密钥地址: https://console.cloud.tencent.com/cam/capi",
    )
    for text in help_lines:
        print(text)
    sys.exit(1)
def sign_request(secret_id, secret_key, action, params):
    """
    Build a Tencent Cloud API 3.0 (TC3-HMAC-SHA256) signature for a JSON POST.

    Docs: https://cloud.tencent.com/document/product/1427/56152

    Args:
        secret_id: API SecretId, embedded in the ``Credential=`` field.
        secret_key: API SecretKey, used to derive the signing key.
        action: API action name. Not used in the signature (only
            ``content-type`` and ``host`` are signed); kept so the call
            signature stays stable.
        params: JSON-serialisable dict that becomes the request body.

    Returns:
        A dict with:
          * ``"authorization"``: value for the ``Authorization`` header,
          * ``"body"``: the exact JSON string that was hashed and must be sent,
          * ``"timestamp"``: the Unix timestamp (int) used for signing, which
            must be sent as ``X-TC-Timestamp``.
    """
    algorithm = "TC3-HMAC-SHA256"
    signed_headers = "content-type;host"
    # The credential scope and the signing-key derivation are bound to the
    # service name (first label of the endpoint host, i.e. "dnspod"), not to
    # the API version.
    service = API_ENDPOINT.split(".", 1)[0]

    # 1. Request body. json.dumps' default ensure_ascii=True keeps it pure
    #    ASCII, so the hashed bytes equal the transmitted bytes.
    body = json.dumps(params)

    # 2. Canonical request: method, URI, (empty) query string, headers,
    #    signed header names, payload hash.
    canonical_headers = f"content-type:application/json\nhost:{API_ENDPOINT}\n"
    hashed_request_payload = hashlib.sha256(body.encode('utf-8')).hexdigest()
    canonical_request = "\n".join([
        "POST",
        "/",
        "",
        canonical_headers,
        signed_headers,
        hashed_request_payload,
    ])

    # 3. String to sign. The date is the UTC date of the timestamp.
    timestamp = int(time.time())
    date = time.strftime('%Y-%m-%d', time.gmtime(timestamp))
    credential_scope = f"{date}/{service}/tc3_request"
    hashed_canonical_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
    string_to_sign = "\n".join([
        algorithm,
        str(timestamp),
        credential_scope,
        hashed_canonical_request,
    ])

    # 4. Derive the signing key (date -> service -> "tc3_request") and sign.
    def _hmac_sha256(key, msg):
        return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

    secret_date = _hmac_sha256(f"TC3{secret_key}".encode('utf-8'), date)
    secret_service = _hmac_sha256(secret_date, service)
    secret_signing = _hmac_sha256(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

    # 5. Authorization header value.
    authorization = (
        f"{algorithm} "
        f"Credential={secret_id}/{credential_scope}, "
        f"SignedHeaders={signed_headers}, "
        f"Signature={signature}"
    )
    return {
        "authorization": authorization,
        "body": body,
        "timestamp": timestamp
    }
def call_api(action, params, timeout=10):
    """Send one signed POST request to the DNSPod API and return the JSON reply.

    Args:
        action: API action name, sent as the ``X-TC-Action`` header.
        params: JSON-serialisable dict of action parameters (request body).
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` may block indefinitely.

    Returns:
        The decoded JSON response. This function does not inspect the
        payload; callers must check ``Response.Error`` themselves.

    Raises:
        requests.exceptions.RequestException: on connection errors/timeouts.
        ValueError: if the response body is not valid JSON.
        SystemExit: if credentials are missing (raised by get_credentials).
    """
    secret_id, secret_key = get_credentials()
    # Sign the request
    sig = sign_request(secret_id, secret_key, action, params)
    # Headers; Content-Type and Host must match the values that were signed,
    # and the timestamp must be the one used for signing.
    headers = {
        "Authorization": sig["authorization"],
        "Content-Type": "application/json",
        "Host": API_ENDPOINT,
        "X-TC-Action": action,
        "X-TC-Timestamp": str(sig["timestamp"]),
        "X-TC-Version": API_VERSION,
        "X-TC-Region": REGION
    }
    url = f"https://{API_ENDPOINT}/"
    # Send exactly the bytes that were hashed for the signature.
    response = requests.post(
        url,
        headers=headers,
        data=sig["body"].encode('utf-8'),
        timeout=timeout,
    )
    return response.json()
def check_domain_exists(domain):
    """Return True if ``domain`` appears in the account's domain list.

    Calls ``DescribeDomainList`` and compares each entry's ``Name`` with
    ``domain``. Any failure (network error, or an error object in the API
    response such as an authentication failure) is printed and reported
    as False, so the cause is visible rather than looking like a missing
    domain.

    Args:
        domain: Domain name to look for, e.g. ``example.com``.

    Returns:
        True if found; False if not found or if the lookup failed.
    """
    try:
        result = call_api("DescribeDomainList", {})
        response = result.get("Response", {})
        error = response.get("Error")
        if error:
            # The API reports failures inside the body rather than by raising.
            raise RuntimeError(f"{error.get('Code')}: {error.get('Message')}")
        return any(d["Name"] == domain for d in response.get("DomainList", []))
    except Exception as e:
        print(f"检查域名失败: {e}")
        return False
def create_domain(domain):
    """Create ``domain`` in the account via the ``CreateDomain`` action.

    Args:
        domain: Domain name to create.

    Returns:
        True if the API accepted the request; False if the call raised or
        the response contained an ``Error`` object.
    """
    print(f"正在创建域名: {domain}")
    try:
        result = call_api("CreateDomain", {"Domain": domain})
        # The API reports failures inside the body, so a call that did not
        # raise is not necessarily a success.
        error = result.get("Response", {}).get("Error")
        if error:
            raise RuntimeError(f"{error.get('Code')}: {error.get('Message')}")
    except Exception as e:
        print(f"✗ 域名创建失败: {e}")
        return False
    print(f"✓ 域名创建成功: {domain}")
    return True
def find_record(domain, subdomain, record_type):
    """Look up an existing record by host name and type.

    Queries ``DescribeRecordList`` filtered by ``subdomain`` and
    ``record_type`` and returns the first entry whose ``Name`` and ``Type``
    match exactly.

    Args:
        domain: Domain that owns the record.
        subdomain: Host part to match against the record's ``Name``.
        record_type: Record type to match against the record's ``Type``.

    Returns:
        The matching record dict, or None if nothing matched or the lookup
        raised (the exception is printed).
    """
    query = {
        "Domain": domain,
        "Subdomain": subdomain,
        "RecordType": record_type
    }
    try:
        reply = call_api("DescribeRecordList", query)
        candidates = reply.get("Response", {}).get("RecordList", [])
        return next(
            (
                entry for entry in candidates
                if entry.get("Name") == subdomain and entry.get("Type") == record_type
            ),
            None,
        )
    except Exception as e:
        print(f"查找记录失败: {e}")
        return None
def create_record(domain, subdomain, record_type, value, line="默认", ttl=600, remark=""):
    """Create a DNS record via the ``CreateRecord`` action.

    Args:
        domain: Domain that will own the record.
        subdomain: Host part; ``"@"`` or an empty value means the
            ``SubDomain`` parameter is not sent.
        record_type: Record type, e.g. ``"A"`` or ``"CNAME"``.
        value: Record value.
        line: Resolution line name sent as ``RecordLine``.
        ttl: TTL in seconds.
        remark: Optional remark; only sent when non-empty.

    Returns:
        The new record's ``RecordId`` on success, otherwise None (the
        failure reason is printed).
    """
    params = {
        "Domain": domain,
        "RecordType": record_type,
        "RecordLine": line,
        "Value": value,
        "TTL": ttl
    }
    # Only send the sub-domain for non-apex hosts.
    if subdomain and subdomain != "@":
        params["SubDomain"] = subdomain
    # Optional remark
    if remark:
        params["Remark"] = remark
    try:
        result = call_api("CreateRecord", params)
        response = result.get("Response", {})
        # The API reports failures inside the body, so check it before
        # announcing success.
        error = response.get("Error")
        if error:
            raise RuntimeError(f"{error.get('Code')}: {error.get('Message')}")
        record_id = response.get("RecordId")
        if record_id is None:
            raise RuntimeError("响应中缺少 RecordId")
    except Exception as e:
        print(f"✗ 记录创建失败: {e}")
        return None
    print(f"✓ 记录创建成功: {subdomain or '@'}.{domain} ({record_type}) → {value}")
    return record_id
def modify_record(domain, record_id, subdomain, record_type, value, line="默认", ttl=600, remark=""):
    """Update an existing DNS record via the ``ModifyRecord`` action.

    Args:
        domain: Domain that owns the record.
        record_id: ``RecordId`` of the record to change.
        subdomain: Host part; ``"@"`` or an empty value means the
            ``SubDomain`` parameter is not sent.
        record_type: Record type, e.g. ``"A"`` or ``"CNAME"``.
        value: New record value.
        line: Resolution line name sent as ``RecordLine``.
        ttl: TTL in seconds.
        remark: Optional remark; only sent when non-empty.

    Returns:
        True if the API accepted the change; False if the call raised or
        the response contained an ``Error`` object.
    """
    params = {
        "Domain": domain,
        "RecordId": record_id,
        "RecordType": record_type,
        "RecordLine": line,
        "Value": value,
        "TTL": ttl
    }
    if subdomain and subdomain != "@":
        params["SubDomain"] = subdomain
    if remark:
        params["Remark"] = remark
    try:
        result = call_api("ModifyRecord", params)
        # The API reports failures inside the body; without this check a
        # rejected update would be reported as success.
        error = result.get("Response", {}).get("Error")
        if error:
            raise RuntimeError(f"{error.get('Code')}: {error.get('Message')}")
    except Exception as e:
        print(f"✗ 记录更新失败: {e}")
        return False
    print(f"✓ 记录更新成功: {subdomain or '@'}.{domain} ({record_type}) → {value}")
    return True
def deploy_record(domain, subdomain, record_type, value, line="默认", ttl=600, force=False, create_domain_flag=False, remark=""):
    """Deploy a DNS record: create it, or update the existing one.

    Steps:
      1. Print a summary of the requested record.
      2. Ensure the domain exists; create it when ``create_domain_flag``
         is set, otherwise fail.
      3. If no record with the same host and type exists, create one.
      4. Otherwise update it. Unless ``force`` is set, an unchanged value is
         treated as success without an API call, and a changed value needs
         interactive confirmation (``y``).

    Returns:
        A truthy value on success (True, or the new record id when a record
        was created) and a falsy value (False/None) on failure or when the
        user declines.
    """
    print(f"\n部署DNS记录: {subdomain or '@'}.{domain}")
    print(f" 类型: {record_type}")
    print(f" 值: {value}")
    print(f" 线路: {line}")
    print(f" TTL: {ttl}s")

    # Step 2: the domain must exist before records can be managed.
    if not check_domain_exists(domain):
        if not create_domain_flag:
            print(f"✗ 域名不存在: {domain}")
            print(" 提示: 使用 --create-domain 自动创建域名")
            return False
        if not create_domain(domain):
            return False

    # Step 3: nothing to update -> create.
    current = find_record(domain, subdomain, record_type)
    if not current:
        return create_record(domain, subdomain, record_type, value, line, ttl, remark)

    # Step 4: confirm the overwrite unless forced.
    if not force:
        previous_value = current.get("Value", "")
        print("\n记录已存在:")
        print(f" 当前值: {previous_value}")
        print(f" 新值: {value}")
        if previous_value == value:
            print(" 值未变化,无需更新")
            return True
        answer = input("\n是否更新? (y/N): ")
        if answer.lower() != 'y':
            print(" 已取消")
            return False

    return modify_record(
        domain, current.get("RecordId"), subdomain, record_type, value, line, ttl, remark
    )
def main():
    """Command-line entry point: parse arguments, deploy one record, exit.

    Exits with status 0 when ``deploy_record`` returns a truthy value and
    with status 1 otherwise.
    """
    import argparse

    # Shown verbatim after the option list (RawDescriptionHelpFormatter).
    usage_examples = '''
示例:
# 添加www的A记录
%(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4
# 添加主域名记录
%(prog)s --domain example.com --subdomain @ --type A --value 1.2.3.4
# 添加CNAME记录
%(prog)s --domain example.com --subdomain cdn --type CNAME --value cdn.example.com
# 强制更新现有记录
%(prog)s --domain example.com --subdomain www --type A --value 1.2.3.5 --force
# 域名不存在时自动创建
%(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4 --create-domain
'''
    cli = argparse.ArgumentParser(
        description='DNSPod记录快速部署',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    record_types = ['A', 'AAAA', 'CNAME', 'MX', 'TXT', 'NS', 'SRV']
    cli.add_argument('--domain', required=True, help='域名(如: example.com)')
    cli.add_argument('--subdomain', default='@', help='子域名(默认: @ 表示主域名)')
    cli.add_argument('--type', required=True, choices=record_types, help='记录类型')
    cli.add_argument('--value', required=True, help='记录值')
    cli.add_argument('--line', default='默认', help='线路(默认: 默认)')
    cli.add_argument('--ttl', type=int, default=600, help='TTL(秒, 默认: 600)')
    cli.add_argument('--force', action='store_true', help='强制更新现有记录(不询问)')
    cli.add_argument('--create-domain', action='store_true', help='域名不存在时自动创建')
    cli.add_argument('--remark', default='', help='记录备注')
    opts = cli.parse_args()

    # Deploy the record and map the outcome to the process exit status.
    ok = deploy_record(
        domain=opts.domain,
        subdomain=opts.subdomain,
        record_type=opts.type,
        value=opts.value,
        line=opts.line,
        ttl=opts.ttl,
        force=opts.force,
        create_domain_flag=opts.create_domain,
        remark=opts.remark,
    )
    sys.exit(int(not ok))
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()