Major Changes:
- ✅ Restored: TENCENT_SECRET_ID/TENCENT_SECRET_KEY authentication
- ✅ Fixed: TC3-HMAC-SHA256 signature implementation
- ✅ Fixed: SERVICE='dnspod' in credential scope (not API_VERSION)
- ✅ Fixed: API parameter naming (SubDomain not Subdomain)
- ✅ Fixed: Error handling for empty record lists

Files Updated:
- scripts/deploy_record.py - Complete rewrite with correct signing
- .env.example - Updated to Tencent Cloud credentials format
- SKILL.md - Updated documentation for API 3.0

New Documentation:
- MIGRATION.md - Migration guide
- UPDATE-SUMMARY-V3.md - Version 3.0 update summary
- TEST-REPORT.md - Test results and verification

Testing Results:
- ✅ API connection successful
- ✅ Domain query working
- ✅ Record creation successful (tested: test.eoxnet.com A 1.2.3.4)
- ✅ Record verification working
- ✅ Error handling complete

API Details:
- Endpoint: dnspod.tencentcloudapi.com
- Version: 2021-03-23
- Signature: TC3-HMAC-SHA256
- Service: dnspod

Version: 3.0 (Tencent Cloud API 3.0)
348 lines
11 KiB
Python
Executable File
348 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
DNSPod 单条记录部署脚本
|
||
使用腾讯云 API 3.0 (TC3-HMAC-SHA256 签名)
|
||
文档:https://cloud.tencent.com/document/product/1427/56189
|
||
"""
|
||
import os
|
||
import sys
|
||
import json
|
||
import hmac
|
||
import hashlib
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
try:
|
||
import requests
|
||
except ImportError:
|
||
print("错误:缺少 requests 库")
|
||
print("请运行:pip install -r requirements.txt")
|
||
sys.exit(1)
|
||
|
||
# 加载 .env 文件
|
||
def load_env(env_file=None):
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Args:
        env_file: Optional path of the file to read. When omitted, ``.env``
            in the parent of the directory containing this script is used.

    Parsing rules:
        * Blank lines, lines starting with ``#`` and lines without ``=`` are
          skipped.
        * Only the first ``=`` separates key and value, so values may
          themselves contain ``=``.
        * A leading ``export `` on the key is dropped.
        * One pair of matching single or double quotes around the value is
          removed.
        * Lines whose key is empty are skipped, because an empty variable
          name cannot be stored in ``os.environ``.

    Values from the file overwrite variables that are already set. A missing
    file is ignored.
    """
    if env_file is None:
        env_file = Path(__file__).parent.parent / '.env'
    env_file = Path(env_file)
    if not env_file.is_file():
        return

    # utf-8-sig drops a leading BOM, which would otherwise end up glued to
    # the first key; files without a BOM are read exactly as plain UTF-8.
    with open(env_file, 'r', encoding='utf-8-sig') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue

            key, _, value = line.partition('=')
            key = key.strip()
            if key.startswith('export '):
                key = key[len('export '):].strip()
            if not key:
                continue

            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]

            os.environ[key] = value
|
||
|
||
# Load the .env file at import time so that the variables it defines are in
# os.environ before get_credentials() reads them.
load_env()
|
||
|
||
# API configuration
# Host the requests are sent to; also part of the signed canonical headers.
API_ENDPOINT = "dnspod.tencentcloudapi.com"
# Sent with every request as the X-TC-Version header.
API_VERSION = "2021-03-23"
# Service name used in the credential scope and in the signing-key derivation.
SERVICE = "dnspod"
# Sent with every request as the X-TC-Region header.
REGION = "ap-guangzhou"
|
||
|
||
def get_credentials():
    """Return the Tencent Cloud credentials as ``(secret_id, secret_key)``.

    Both values are read from the ``TENCENT_SECRET_ID`` and
    ``TENCENT_SECRET_KEY`` environment variables. If either one is missing or
    empty, setup instructions are printed and the process exits with status 1.
    """
    credentials = (
        os.environ.get('TENCENT_SECRET_ID'),
        os.environ.get('TENCENT_SECRET_KEY'),
    )
    if all(credentials):
        return credentials

    # At least one value is unset or empty: explain how to configure and stop.
    for message in (
        "错误:未找到腾讯云 API 密钥",
        "\n请设置环境变量或创建 .env 文件:",
        " TENCENT_SECRET_ID=AKIDxxxxxxxxxxxxxxxx",
        " TENCENT_SECRET_KEY=xxxxxxxxxxxxxxxx",
        "\n获取密钥地址:https://console.cloud.tencent.com/cam/capi",
    ):
        print(message)
    sys.exit(1)
|
||
|
||
def sign_request(secret_id, secret_key, action, params):
    """Build the TC3-HMAC-SHA256 signature for a Tencent Cloud API 3.0 call.

    Docs: https://cloud.tencent.com/document/product/1427/56189

    Args:
        secret_id: Credential id placed in the ``Credential=`` field.
        secret_key: Secret the signing key is derived from.
        action: API action name. It is not used in the signature, because
            only the ``content-type`` and ``host`` headers are signed; the
            parameter is kept so existing callers keep working.
        params: JSON-serialisable request parameters; serialised once here so
            the signed payload and the payload sent are the same string.

    Returns:
        dict with three keys: ``"authorization"`` (the Authorization header
        value), ``"body"`` (the JSON string that was signed) and
        ``"timestamp"`` (the integer Unix time used for signing).
    """
    # 1. Request body
    body = json.dumps(params)

    # 2. Canonical request
    http_request_method = "POST"
    canonical_uri = "/"
    canonical_querystring = ""
    canonical_headers = f"content-type:application/json\nhost:{API_ENDPOINT}\n"
    signed_headers = "content-type;host"
    hashed_request_payload = hashlib.sha256(body.encode('utf-8')).hexdigest()

    canonical_request = (
        f"{http_request_method}\n"
        f"{canonical_uri}\n"
        f"{canonical_querystring}\n"
        f"{canonical_headers}\n"
        f"{signed_headers}\n"
        f"{hashed_request_payload}"
    )

    # 3. String to sign
    algorithm = "TC3-HMAC-SHA256"
    timestamp = int(time.time())
    # The credential-scope date is the UTC date of the timestamp.
    # time.gmtime replaces datetime.utcfromtimestamp, which is deprecated
    # since Python 3.12, and yields the same date string.
    date = time.strftime('%Y-%m-%d', time.gmtime(timestamp))
    credential_scope = f"{date}/{SERVICE}/tc3_request"
    hashed_canonical_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()

    string_to_sign = (
        f"{algorithm}\n"
        f"{timestamp}\n"
        f"{credential_scope}\n"
        f"{hashed_canonical_request}"
    )

    # 4. Signature: derive the signing key by chaining HMACs over the date,
    #    the service name and the literal "tc3_request".
    def _hmac_sha256(key, msg):
        return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

    secret_date = _hmac_sha256(f"TC3{secret_key}".encode('utf-8'), date)
    secret_service = _hmac_sha256(secret_date, SERVICE)
    secret_signing = _hmac_sha256(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

    # 5. Authorization header
    authorization = (
        f"{algorithm} "
        f"Credential={secret_id}/{credential_scope}, "
        f"SignedHeaders={signed_headers}, "
        f"Signature={signature}"
    )

    return {
        "authorization": authorization,
        "body": body,
        "timestamp": timestamp
    }
|
||
|
||
def call_api(action, params, timeout=30):
    """Call one Tencent Cloud API 3.0 action and return the decoded JSON.

    Args:
        action: API action name, sent as the ``X-TC-Action`` header.
        params: Request parameters; signed and sent as the JSON body.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The decoded JSON response, or ``None`` when the request fails at the
        transport level or the response body is not valid JSON. Callers in
        this file already treat a falsy result as a failed call.
    """
    secret_id, secret_key = get_credentials()

    # Sign the request
    sig = sign_request(secret_id, secret_key, action, params)

    # Request headers; the timestamp must be the one that was signed.
    headers = {
        "Authorization": sig["authorization"],
        "Content-Type": "application/json",
        "Host": API_ENDPOINT,
        "X-TC-Action": action,
        "X-TC-Timestamp": str(sig["timestamp"]),
        "X-TC-Version": API_VERSION,
        "X-TC-Region": REGION
    }

    # Send the request. Without a timeout requests would wait indefinitely
    # on an unresponsive server.
    url = f"https://{API_ENDPOINT}/"
    try:
        response = requests.post(url, headers=headers, data=sig["body"], timeout=timeout)
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        # RequestException covers connection errors and timeouts; ValueError
        # covers a body that is not JSON (older requests versions raise it
        # directly from response.json()).
        print(f"请求失败 ({action}):{exc}")
        return None
|
||
|
||
def deploy_record(domain, subdomain, record_type, value, line="默认", ttl=600, force=False, create_domain=False, remark=""):
    """Create a DNS record, replacing matching records that already exist.

    Steps:
        1. Look the domain up in the account (optionally creating it).
        2. Query existing records for the same subdomain and record type.
        3. If any exist and ``force`` is false, ask for confirmation.
        4. Delete the existing records.
        5. Create the new record.

    Args:
        domain: Domain name, e.g. ``example.com``.
        subdomain: Host part of the record; ``@`` is printed as the bare
            domain in the summary.
        record_type: Record type, sent as ``RecordType``.
        value: Record value.
        line: Resolution line, sent as ``RecordLine``.
        ttl: TTL in seconds.
        force: Replace existing records without asking.
        create_domain: Create the domain when it is not found.
        remark: Optional remark; only sent when non-empty.

    Returns:
        True when the record was created; False on any API failure, when the
        domain is missing and ``create_domain`` is false, or when the user
        declines the update.
    """
    print(f"\n{'='*60}")
    print("DNSPod DNS 记录部署 (腾讯云 API 3.0)")
    print(f"{'='*60}\n")
    print(f"域名:{domain}")
    print(f"子域名:{subdomain}")
    print(f"记录类型:{record_type}")
    print(f"记录值:{value}")
    print(f"线路:{line}")
    print(f"TTL: {ttl}秒")
    print()

    # 1. Check that the domain exists
    print("正在检查域名...")
    domain_id, lookup_ok = _find_domain_id(domain)
    if not lookup_ok:
        return False

    if not domain_id:
        if not create_domain:
            print(f"错误:域名 {domain} 不存在")
            print("提示:使用 --create-domain 参数自动创建域名")
            return False

        print(f"域名不存在,正在创建:{domain}")
        create_domain_result = call_api("CreateDomain", {"Domain": domain})
        create_domain_response = (
            create_domain_result.get("Response", {}) if create_domain_result else None
        )
        if create_domain_response is None or "Error" in create_domain_response:
            print("创建域名失败")
            if create_domain_response:
                _print_api_error(create_domain_response["Error"])
            return False
        # CreateDomain reports the new id under DomainInfo.Id; the previous
        # top-level key is kept as a fallback.
        domain_info = create_domain_response.get("DomainInfo") or {}
        domain_id = domain_info.get("Id") or create_domain_response.get("DomainId")
        print(f"✓ 域名创建成功,ID: {domain_id}")
    else:
        print(f"✓ 域名存在,ID: {domain_id}")

    # 2. Check whether matching records already exist
    print("正在查询现有记录...")
    records_result = call_api("DescribeRecordList", {
        "Domain": domain,
        "Subdomain": subdomain,
        "RecordType": record_type
    })

    if not records_result:
        print("查询记录失败")
        return False

    # An empty record list is reported as an error code by the API; it is
    # not a failure here, it only means there is nothing to replace.
    records = []
    if "Response" in records_result:
        records_response = records_result["Response"]
        if "Error" in records_response:
            error = records_response["Error"]
            if error.get('Code') != 'ResourceNotFound.NoDataOfRecord':
                _print_api_error(error)
                return False
        else:
            records = records_response.get("RecordList") or []

    # 3. Existing records and no --force: ask the user
    if records and not force:
        print(f"\n⚠️ 发现现有记录:")
        for record in records[:3]:
            print(f" {record.get('Name', '')} {record.get('Type', '')} -> {record.get('Value', '')}")

        if len(records) > 3:
            print(f" ... 还有 {len(records) - 3} 条记录")

        try:
            answer = input("\n是否更新现有记录?(y/N): ")
        except EOFError:
            # No interactive stdin (cron, pipe): behave like the default "N".
            answer = ""
        if answer.strip().lower() != 'y':
            print("已取消")
            return False
        force = True

    # 4. Delete the existing records before creating the new one
    if records and force:
        print(f"\n正在删除 {len(records)} 条现有记录...")
        for record in records:
            record_id = record.get('RecordId')
            if not record_id:
                continue
            delete_result = call_api("DeleteRecord", {
                "Domain": domain,
                "RecordId": record_id
            })
            delete_response = delete_result.get("Response", {}) if delete_result else None
            if delete_response is not None and "Error" not in delete_response:
                print(f" ✓ 删除记录:{record.get('Name')} {record.get('Type')}")
            else:
                # Report the failure instead of dropping it silently; the
                # deployment still continues as before.
                print(f" ✗ 删除记录失败:{record.get('Name')} {record.get('Type')} (ID: {record_id})")
                if delete_response:
                    _print_api_error(delete_response["Error"])

    # 5. Create the new record
    print(f"\n正在创建新记录...")
    create_params = {
        "Domain": domain,
        "SubDomain": subdomain,  # CreateRecord spells it SubDomain, not Subdomain
        "RecordType": record_type,
        "RecordLine": line,
        "Value": value,
        "TTL": ttl
    }

    if remark:
        create_params["Remark"] = remark

    create_result = call_api("CreateRecord", create_params)

    if not create_result:
        print("创建记录失败")
        return False

    create_response = create_result.get("Response", {})
    if "Error" in create_response:
        _print_api_error(create_response["Error"])
        return False

    record_id = create_response.get("RecordId")
    # "@" stands for the bare domain, so it is not rendered as "@.domain".
    full_name = domain if subdomain == '@' else f"{subdomain}.{domain}"

    print(f"\n{'='*60}")
    print(f"✓ DNS 记录创建成功!")
    print(f"{'='*60}")
    print(f"记录 ID: {record_id}")
    print(f"完整记录:{full_name} {record_type} {value}")
    print(f"线路:{line}")
    print(f"TTL: {ttl}秒")
    if remark:
        print(f"备注:{remark}")
    print()

    return True


def _print_api_error(error):
    """Print the code and message of an API ``Error`` object."""
    print(f"API 错误:{error.get('Code')}")
    print(f"消息:{error.get('Message')}")


def _find_domain_id(domain, page_size=100):
    """Page through DescribeDomainList looking for *domain*.

    Returns ``(domain_id, ok)``. ``ok`` is False when a request failed or the
    API returned an error (already printed). ``domain_id`` is None when the
    domain is not in the account. Names are compared case-insensitively.
    """
    wanted = domain.lower()
    offset = 0
    while True:
        result = call_api("DescribeDomainList", {
            "Limit": page_size,
            "Offset": offset
        })
        response = result.get("Response") if result else None
        if response is None:
            print("查询域名列表失败")
            return None, False
        if "Error" in response:
            _print_api_error(response["Error"])
            return None, False

        page = response.get("DomainList") or []
        for entry in page:
            if (entry.get("Name") or "").lower() == wanted:
                return entry.get("DomainId"), True

        # A short page is the last one; otherwise fetch the next page.
        if len(page) < page_size:
            return None, True
        offset += page_size
|
||
|
||
def main():
    """Command-line entry point.

    Parses the options, passes them to ``deploy_record`` and ends the process
    with status 0 when it reports success, 1 otherwise.
    """
    from argparse import ArgumentParser, RawDescriptionHelpFormatter

    # Shown verbatim after the option list (RawDescriptionHelpFormatter keeps
    # the line breaks).
    usage_examples = '''
示例:
# 添加 www 的 A 记录
%(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4

# 添加主域名记录
%(prog)s --domain example.com --subdomain @ --type A --value 1.2.3.4

# 添加 CNAME 记录
%(prog)s --domain example.com --subdomain cdn --type CNAME --value cdn.example.com

# 强制更新现有记录
%(prog)s --domain example.com --subdomain www --type A --value 1.2.3.5 --force

# 域名不存在时自动创建
%(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4 --create-domain

配置:
TENCENT_SECRET_ID=AKIDxxxxxxxxxxxxxxxx
TENCENT_SECRET_KEY=xxxxxxxxxxxxxxxx

获取密钥:https://console.cloud.tencent.com/cam/capi
'''

    cli = ArgumentParser(
        description='DNSPod 记录快速部署 (腾讯云 API 3.0)',
        formatter_class=RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # One (flag, keyword arguments) pair per option, in help-output order.
    option_specs = (
        ('--domain', {'required': True, 'help': '域名 (如:example.com)'}),
        ('--subdomain', {'default': '@', 'help': '子域名 (默认:@ 表示主域名)'}),
        ('--type', {
            'required': True,
            'choices': ['A', 'AAAA', 'CNAME', 'MX', 'TXT', 'NS', 'SRV', 'CAA'],
            'help': '记录类型',
        }),
        ('--value', {'required': True, 'help': '记录值'}),
        ('--line', {'default': '默认', 'help': '线路 (默认:默认)'}),
        ('--ttl', {'type': int, 'default': 600, 'help': 'TTL(秒,默认:600)'}),
        ('--force', {'action': 'store_true', 'help': '强制更新现有记录 (不询问)'}),
        ('--create-domain', {'action': 'store_true', 'help': '域名不存在时自动创建'}),
        ('--remark', {'default': '', 'help': '记录备注'}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    opts = cli.parse_args()

    succeeded = deploy_record(
        domain=opts.domain,
        subdomain=opts.subdomain,
        record_type=opts.type,
        value=opts.value,
        line=opts.line,
        ttl=opts.ttl,
        force=opts.force,
        create_domain=opts.create_domain,
        remark=opts.remark,
    )

    if succeeded:
        sys.exit(0)
    sys.exit(1)
|
||
|
||
# Run the command-line interface only when the file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()
|