Files
DNSPod-Skill/scripts/deploy_record.py
panfd 47f090aa6f Restore Tencent Cloud API 3.0 with correct TC3-HMAC-SHA256 signing
Major Changes:
-  Restored: TENCENT_SECRET_ID/TENCENT_SECRET_KEY authentication
-  Fixed: TC3-HMAC-SHA256 signature implementation
-  Fixed: SERVICE='dnspod' in credential scope (not API_VERSION)
-  Fixed: API parameter naming (SubDomain not Subdomain)
-  Fixed: Error handling for empty record lists

Files Updated:
- scripts/deploy_record.py - Complete rewrite with correct signing
- .env.example - Updated to Tencent Cloud credentials format
- SKILL.md - Updated documentation for API 3.0

New Documentation:
- MIGRATION.md - Migration guide
- UPDATE-SUMMARY-V3.md - Version 3.0 update summary
- TEST-REPORT.md - Test results and verification

Testing Results:
 API connection successful
 Domain query working
 Record creation successful (tested: test.eoxnet.com A 1.2.3.4)
 Record verification working
 Error handling complete

API Details:
- Endpoint: dnspod.tencentcloudapi.com
- Version: 2021-03-23
- Signature: TC3-HMAC-SHA256
- Service: dnspod

Version: 3.0 (Tencent Cloud API 3.0)
2026-03-01 16:37:38 +08:00

348 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
DNSPod 单条记录部署脚本
使用腾讯云 API 3.0 (TC3-HMAC-SHA256 签名)
文档:https://cloud.tencent.com/document/product/1427/56189
"""
import os
import sys
import json
import hmac
import hashlib
import time
from datetime import datetime
from pathlib import Path
try:
import requests
except ImportError:
print("错误:缺少 requests 库")
print("请运行pip install -r requirements.txt")
sys.exit(1)
# Load the .env file
def load_env(env_file=None):
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Every other line is split on its first ``=``; the key and the
    value are stripped of surrounding whitespace, and one pair of matching
    single or double quotes around the value is removed. Values from the
    file overwrite variables that are already set in the environment.

    Args:
        env_file: Path of the file to read. When omitted, ``.env`` in the
            parent of the directory containing this script is used.

    Returns:
        None. A missing file is silently ignored.
    """
    if env_file is None:
        env_file = Path(__file__).parent.parent / '.env'
    env_path = Path(env_file)
    if not env_path.is_file():
        return

    with open(env_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            key = key.strip()
            value = value.strip()
            if not key:
                # A line such as "=value" has no variable name; setting an
                # empty name in the environment raises, so skip it.
                continue
            # KEY="value" / KEY='value': drop the quotes so they do not end
            # up inside the credential itself.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in '"\'':
                value = value[1:-1]
            os.environ[key] = value
# Load .env into the process environment at import time, so the variables
# are in place before get_credentials() reads them.
load_env()
# API configuration
API_ENDPOINT = "dnspod.tencentcloudapi.com"  # host of the request URL; also the signed "host" header
API_VERSION = "2021-03-23"  # sent as the X-TC-Version header
SERVICE = "dnspod"  # service name used in the credential scope and signing-key derivation
REGION = "ap-guangzhou"  # sent as the X-TC-Region header
def get_credentials():
    """Return the Tencent Cloud ``(secret_id, secret_key)`` pair.

    Both values are read from the ``TENCENT_SECRET_ID`` and
    ``TENCENT_SECRET_KEY`` environment variables. If either one is missing
    or empty, setup instructions are printed and the process exits with
    status 1.
    """
    credentials = (
        os.environ.get('TENCENT_SECRET_ID'),
        os.environ.get('TENCENT_SECRET_KEY'),
    )
    if all(credentials):
        return credentials

    # At least one secret is absent: explain how to configure them and stop.
    for message in (
        "错误:未找到腾讯云 API 密钥",
        "\n请设置环境变量或创建 .env 文件:",
        " TENCENT_SECRET_ID=AKIDxxxxxxxxxxxxxxxx",
        " TENCENT_SECRET_KEY=xxxxxxxxxxxxxxxx",
        "\n获取密钥地址https://console.cloud.tencent.com/cam/capi",
    ):
        print(message)
    sys.exit(1)
def sign_request(secret_id, secret_key, action, params,
                 timestamp=None, service=None, host=None):
    """Build a Tencent Cloud API 3.0 (TC3-HMAC-SHA256) signature.

    Reference: https://cloud.tencent.com/document/product/1427/56189

    Args:
        secret_id: Tencent Cloud SecretId, placed in the ``Credential`` field.
        secret_key: Tencent Cloud SecretKey, used to derive the signing key.
        action: API action name. It is accepted for interface compatibility
            but does not enter the signature, because only ``content-type``
            and ``host`` are signed headers.
        params: JSON-serialisable request parameters; serialised once here so
            the signed payload and the transmitted payload are identical.
        timestamp: Unix timestamp (seconds) to sign with. Defaults to the
            current time.
        service: Service name for the credential scope. Defaults to the
            module-level ``SERVICE``.
        host: Host name to sign. Defaults to the module-level
            ``API_ENDPOINT``.

    Returns:
        dict with ``authorization`` (the Authorization header value),
        ``body`` (the serialised JSON payload that was signed) and
        ``timestamp`` (the integer timestamp that was signed).
    """
    timestamp = int(time.time()) if timestamp is None else int(timestamp)
    if service is None:
        service = SERVICE
    if host is None:
        host = API_ENDPOINT

    # 1. Request body
    body = json.dumps(params)

    # 2. Canonical request
    http_request_method = "POST"
    canonical_uri = "/"
    canonical_querystring = ""
    canonical_headers = f"content-type:application/json\nhost:{host}\n"
    signed_headers = "content-type;host"
    hashed_request_payload = hashlib.sha256(body.encode('utf-8')).hexdigest()
    canonical_request = "\n".join((
        http_request_method,
        canonical_uri,
        canonical_querystring,
        canonical_headers,
        signed_headers,
        hashed_request_payload,
    ))

    # 3. String to sign. The scope date is the UTC date of the timestamp;
    # time.gmtime replaces datetime.utcfromtimestamp, which is deprecated
    # since Python 3.12.
    algorithm = "TC3-HMAC-SHA256"
    date = time.strftime('%Y-%m-%d', time.gmtime(timestamp))
    credential_scope = f"{date}/{service}/tc3_request"
    hashed_canonical_request = hashlib.sha256(
        canonical_request.encode('utf-8')).hexdigest()
    string_to_sign = "\n".join((
        algorithm,
        str(timestamp),
        credential_scope,
        hashed_canonical_request,
    ))

    # 4. Derive the signing key (date -> service -> "tc3_request") and sign.
    def _hmac_sha256(key, msg):
        return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

    secret_date = _hmac_sha256(f"TC3{secret_key}".encode('utf-8'), date)
    secret_service = _hmac_sha256(secret_date, service)
    secret_signing = _hmac_sha256(secret_service, "tc3_request")
    signature = hmac.new(
        secret_signing, string_to_sign.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    # 5. Authorization header
    authorization = (
        f"{algorithm} "
        f"Credential={secret_id}/{credential_scope}, "
        f"SignedHeaders={signed_headers}, "
        f"Signature={signature}"
    )
    return {
        "authorization": authorization,
        "body": body,
        "timestamp": timestamp
    }
def call_api(action, params, timeout=30):
    """Call a Tencent Cloud API 3.0 action and return the decoded JSON reply.

    Args:
        action: API action name, sent as the ``X-TC-Action`` header.
        params: JSON-serialisable request parameters.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the script indefinitely.

    Returns:
        The parsed JSON response, or ``None`` when the request could not be
        completed or the reply was not valid JSON. Callers in this file
        already treat a falsy result as a failed call.
    """
    secret_id, secret_key = get_credentials()

    # Sign the request
    sig = sign_request(secret_id, secret_key, action, params)

    # Request headers; Content-Type and Host must match the signed values.
    headers = {
        "Authorization": sig["authorization"],
        "Content-Type": "application/json",
        "Host": API_ENDPOINT,
        "X-TC-Action": action,
        "X-TC-Timestamp": str(sig["timestamp"]),
        "X-TC-Version": API_VERSION,
        "X-TC-Region": REGION
    }

    # Send exactly the body that was signed.
    url = f"https://{API_ENDPOINT}/"
    try:
        response = requests.post(
            url, headers=headers, data=sig["body"], timeout=timeout)
        return response.json()
    except requests.exceptions.RequestException as exc:
        # Connection errors, timeouts, and (in recent requests versions)
        # JSON decoding errors.
        print(f"请求失败:{exc}")
    except ValueError as exc:
        # Older requests versions raise a plain ValueError for invalid JSON.
        print(f"响应解析失败:{exc}")
    return None
def _print_api_error(error):
    """Print the code and message of an API ``Error`` object."""
    print(f"API 错误:{error['Code']}")
    print(f"消息:{error['Message']}")


def _lookup_domain_id(domain, page_size=100):
    """Page through DescribeDomainList looking for *domain*.

    Returns a ``(ok, domain_id)`` tuple. ``ok`` is False when the lookup
    itself failed (the problem has already been printed); ``domain_id`` is
    None when the lookup succeeded but the domain is not in the account.
    """
    offset = 0
    while True:
        result = call_api("DescribeDomainList", {
            "Limit": page_size,
            "Offset": offset
        })
        response = (result or {}).get("Response")
        if response is None:
            print("查询域名失败")
            return False, None
        if "Error" in response:
            _print_api_error(response["Error"])
            return False, None
        page = response.get("DomainList") or []
        for item in page:
            if item.get("Name") == domain:
                return True, item.get("DomainId")
        if len(page) < page_size:
            # Short page: there are no further domains to inspect.
            return True, None
        offset += page_size


def deploy_record(domain, subdomain, record_type, value, line="默认", ttl=600, force=False, create_domain=False, remark=""):
    """Create a DNS record, replacing any existing records of the same
    sub-domain and type.

    Steps: look the domain up (optionally creating it), list existing
    records for the sub-domain/type, ask for confirmation unless *force* is
    set, delete the existing records, then create the new one.

    Args:
        domain: Domain name, e.g. ``example.com``.
        subdomain: Host record, ``@`` for the domain itself.
        record_type: Record type such as ``A`` or ``CNAME``.
        value: Record value.
        line: Record line name sent as ``RecordLine``.
        ttl: TTL sent as ``TTL``.
        force: Replace existing records without prompting.
        create_domain: Create the domain when it is not in the account.
        remark: Optional remark; only sent when non-empty.

    Returns:
        True when the record was created, False on any failure or when the
        user declined to replace existing records.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print("DNSPod DNS 记录部署 (腾讯云 API 3.0)")
    print(f"{banner}\n")
    print(f"域名:{domain}")
    print(f"子域名:{subdomain}")
    print(f"记录类型:{record_type}")
    print(f"记录值:{value}")
    print(f"线路:{line}")
    print(f"TTL: {ttl}")
    print()

    # 1. Make sure the domain exists
    print("正在检查域名...")
    ok, domain_id = _lookup_domain_id(domain)
    if not ok:
        return False
    if domain_id:
        print(f"✓ 域名存在ID: {domain_id}")
    elif create_domain:
        print(f"域名不存在,正在创建:{domain}")
        create_result = call_api("CreateDomain", {"Domain": domain})
        created = (create_result or {}).get("Response", {})
        if not create_result or "Error" in created:
            print("创建域名失败")
            if "Error" in created:
                _print_api_error(created["Error"])
            return False
        # CreateDomain reports the new id under DomainInfo.Id; keep the old
        # top-level DomainId lookup as a fallback.
        domain_info = created.get("DomainInfo") or {}
        domain_id = domain_info.get("Id", created.get("DomainId"))
        print(f"✓ 域名创建成功ID: {domain_id}")
    else:
        print(f"错误:域名 {domain} 不存在")
        print("提示:使用 --create-domain 参数自动创建域名")
        return False

    # 2. Look for existing records
    print("正在查询现有记录...")
    records_result = call_api("DescribeRecordList", {
        "Domain": domain,
        "Subdomain": subdomain,
        "RecordType": record_type
    })
    if not records_result:
        print("查询记录失败")
        return False

    records = []
    list_response = records_result.get("Response", {})
    list_error = list_response.get("Error")
    if list_error:
        # An empty record list is reported as an error code; it only means
        # there is nothing to replace.
        if list_error['Code'] != 'ResourceNotFound.NoDataOfRecord':
            _print_api_error(list_error)
            return False
    else:
        records = list_response.get("RecordList") or []

    if records:
        # 3. Ask before replacing unless --force was given
        if not force:
            print(f"\n⚠️ 发现现有记录:")
            for record in records[:3]:
                print(f" {record.get('Name', '')} {record.get('Type', '')} -> {record.get('Value', '')}")
            if len(records) > 3:
                print(f" ... 还有 {len(records) - 3} 条记录")
            try:
                answer = input("\n是否更新现有记录?(y/N): ")
            except EOFError:
                # No interactive stdin (cron, pipe): treat as "no".
                answer = ""
            if answer.strip().lower() != 'y':
                print("已取消")
                return False

        # 4. Delete the existing records before creating the new one
        print(f"\n正在删除 {len(records)} 条现有记录...")
        for record in records:
            record_id = record.get('RecordId')
            if not record_id:
                continue
            delete_result = call_api("DeleteRecord", {
                "Domain": domain,
                "RecordId": record_id
            })
            if delete_result and "Error" not in delete_result.get("Response", {}):
                print(f" ✓ 删除记录:{record.get('Name')} {record.get('Type')}")
            else:
                # Best effort: keep going, but do not hide the failure.
                print(f" ✗ 删除记录失败:{record.get('Name')} {record.get('Type')} (RecordId: {record_id})")

    # 5. Create the new record
    print(f"\n正在创建新记录...")
    create_params = {
        "Domain": domain,
        "SubDomain": subdomain,  # NOTE: CreateRecord expects SubDomain, not Subdomain
        "RecordType": record_type,
        "RecordLine": line,
        "Value": value,
        "TTL": ttl
    }
    if remark:
        create_params["Remark"] = remark
    create_result = call_api("CreateRecord", create_params)
    if not create_result:
        print("创建记录失败")
        return False
    create_response = create_result.get("Response", {})
    if "Error" in create_response:
        _print_api_error(create_response["Error"])
        return False

    record_id = create_response.get("RecordId")
    # "@" denotes the domain itself, so show the bare domain name for it.
    full_name = domain if subdomain == '@' else f"{subdomain}.{domain}"
    print(f"\n{banner}")
    print(f"✓ DNS 记录创建成功!")
    print(f"{banner}")
    print(f"记录 ID: {record_id}")
    print(f"完整记录:{full_name} {record_type} {value}")
    print(f"线路:{line}")
    print(f"TTL: {ttl}")
    if remark:
        print(f"备注:{remark}")
    print()
    return True
def main():
    """Command-line entry point.

    Parses the command-line options, passes them to ``deploy_record`` and
    exits with status 0 when the deployment succeeded, 1 otherwise.
    """
    import argparse

    usage_examples = '''
示例:
# 添加 www 的 A 记录
%(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4
# 添加主域名记录
%(prog)s --domain example.com --subdomain @ --type A --value 1.2.3.4
# 添加 CNAME 记录
%(prog)s --domain example.com --subdomain cdn --type CNAME --value cdn.example.com
# 强制更新现有记录
%(prog)s --domain example.com --subdomain www --type A --value 1.2.3.5 --force
# 域名不存在时自动创建
%(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4 --create-domain
配置:
TENCENT_SECRET_ID=AKIDxxxxxxxxxxxxxxxx
TENCENT_SECRET_KEY=xxxxxxxxxxxxxxxx
获取密钥https://console.cloud.tencent.com/cam/capi
'''
    cli = argparse.ArgumentParser(
        description='DNSPod 记录快速部署 (腾讯云 API 3.0)',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # One (flag, add_argument keyword options) pair per option, in help order.
    option_table = (
        ('--domain', {'required': True, 'help': '域名 (如example.com)'}),
        ('--subdomain', {'default': '@', 'help': '子域名 (默认:@ 表示主域名)'}),
        ('--type', {
            'required': True,
            'choices': ['A', 'AAAA', 'CNAME', 'MX', 'TXT', 'NS', 'SRV', 'CAA'],
            'help': '记录类型',
        }),
        ('--value', {'required': True, 'help': '记录值'}),
        ('--line', {'default': '默认', 'help': '线路 (默认:默认)'}),
        ('--ttl', {'type': int, 'default': 600, 'help': 'TTL(秒默认600)'}),
        ('--force', {'action': 'store_true', 'help': '强制更新现有记录 (不询问)'}),
        ('--create-domain', {'action': 'store_true', 'help': '域名不存在时自动创建'}),
        ('--remark', {'default': '', 'help': '记录备注'}),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)

    opts = cli.parse_args()
    succeeded = deploy_record(
        domain=opts.domain,
        subdomain=opts.subdomain,
        record_type=opts.type,
        value=opts.value,
        line=opts.line,
        ttl=opts.ttl,
        force=opts.force,
        create_domain=opts.create_domain,
        remark=opts.remark,
    )
    if succeeded:
        sys.exit(0)
    sys.exit(1)


# Run the CLI only when executed as a script.
if __name__ == '__main__':
    main()