#!/usr/bin/env python3 """ DNSPod 单条记录部署脚本 使用腾讯云 API 3.0 (TC3-HMAC-SHA256 签名) 文档:https://cloud.tencent.com/document/product/1427/56189 """ import os import sys import json import hmac import hashlib import time from datetime import datetime from pathlib import Path try: import requests except ImportError: print("错误:缺少 requests 库") print("请运行:pip install -r requirements.txt") sys.exit(1) # 加载 .env 文件 def load_env(): """加载.env 文件""" env_file = Path(__file__).parent.parent / '.env' if env_file.exists(): with open(env_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() # 加载 .env load_env() # API 配置 API_ENDPOINT = "dnspod.tencentcloudapi.com" API_VERSION = "2021-03-23" SERVICE = "dnspod" REGION = "ap-guangzhou" def get_credentials(): """从环境变量或.env 文件获取腾讯云凭证""" secret_id = os.getenv('TENCENT_SECRET_ID') secret_key = os.getenv('TENCENT_SECRET_KEY') if not secret_id or not secret_key: print("错误:未找到腾讯云 API 密钥") print("\n请设置环境变量或创建 .env 文件:") print(" TENCENT_SECRET_ID=AKIDxxxxxxxxxxxxxxxx") print(" TENCENT_SECRET_KEY=xxxxxxxxxxxxxxxx") print("\n获取密钥地址:https://console.cloud.tencent.com/cam/capi") sys.exit(1) return secret_id, secret_key def sign_request(secret_id, secret_key, action, params): """ 生成腾讯云 API 3.0 签名 (TC3-HMAC-SHA256) 文档:https://cloud.tencent.com/document/product/1427/56189 """ # 1. 构造请求体 body = json.dumps(params) # 2. 构造规范请求串 http_request_method = "POST" canonical_uri = "/" canonical_querystring = "" canonical_headers = f"content-type:application/json\nhost:{API_ENDPOINT}\n" signed_headers = "content-type;host" hashed_request_payload = hashlib.sha256(body.encode('utf-8')).hexdigest() canonical_request = ( f"{http_request_method}\n" f"{canonical_uri}\n" f"{canonical_querystring}\n" f"{canonical_headers}\n" f"{signed_headers}\n" f"{hashed_request_payload}" ) # 3. 构造待签名字符串 algorithm = "TC3-HMAC-SHA256" timestamp = int(time.time()) date = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d') credential_scope = f"{date}/{SERVICE}/tc3_request" hashed_canonical_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest() string_to_sign = ( f"{algorithm}\n" f"{timestamp}\n" f"{credential_scope}\n" f"{hashed_canonical_request}" ) # 4. 计算签名 def _hmac_sha256(key, msg): return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() secret_date = _hmac_sha256(f"TC3{secret_key}".encode('utf-8'), date) secret_service = _hmac_sha256(secret_date, SERVICE) secret_signing = _hmac_sha256(secret_service, "tc3_request") signature = hmac.new(secret_signing, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest() # 5. 构造 Authorization 头 authorization = ( f"{algorithm} " f"Credential={secret_id}/{credential_scope}, " f"SignedHeaders={signed_headers}, " f"Signature={signature}" ) return { "authorization": authorization, "body": body, "timestamp": timestamp } def call_api(action, params): """调用腾讯云 API 3.0""" secret_id, secret_key = get_credentials() # 生成签名 sig = sign_request(secret_id, secret_key, action, params) # 构造请求头 headers = { "Authorization": sig["authorization"], "Content-Type": "application/json", "Host": API_ENDPOINT, "X-TC-Action": action, "X-TC-Timestamp": str(sig["timestamp"]), "X-TC-Version": API_VERSION, "X-TC-Region": REGION } # 发送请求 url = f"https://{API_ENDPOINT}/" response = requests.post(url, headers=headers, data=sig["body"]) return response.json() def deploy_record(domain, subdomain, record_type, value, line="默认", ttl=600, force=False, create_domain=False, remark=""): """部署 DNS 记录""" print(f"\n{'='*60}") print(f"DNSPod DNS 记录部署 (腾讯云 API 3.0)") print(f"{'='*60}\n") print(f"域名:{domain}") print(f"子域名:{subdomain}") print(f"记录类型:{record_type}") print(f"记录值:{value}") print(f"线路:{line}") print(f"TTL: {ttl}秒") print() # 1. 检查域名是否存在 print("正在检查域名...") domain_result = call_api("DescribeDomainList", { "Limit": 100, "Offset": 0 }) domain_id = None if domain_result and "Response" in domain_result: if "Error" in domain_result["Response"]: error = domain_result["Response"]["Error"] print(f"API 错误:{error['Code']}") print(f"消息:{error['Message']}") return False domain_list = domain_result["Response"].get("DomainList", []) for d in domain_list: if d.get("Name") == domain: domain_id = d.get("DomainId") break if not domain_id: if create_domain: print(f"域名不存在,正在创建:{domain}") create_result = call_api("CreateDomain", {"Domain": domain}) if not create_result or "Error" in create_result.get("Response", {}): print("创建域名失败") return False domain_id = create_result["Response"].get("DomainId") print(f"✓ 域名创建成功,ID: {domain_id}") else: print(f"错误:域名 {domain} 不存在") print("提示:使用 --create-domain 参数自动创建域名") return False print(f"✓ 域名存在,ID: {domain_id}") # 2. 检查记录是否已存在 print("正在查询现有记录...") records_result = call_api("DescribeRecordList", { "Domain": domain, "Subdomain": subdomain, "RecordType": record_type }) if not records_result: print("查询记录失败") return False # 处理"记录列表为空"的情况 (这不是错误) records = [] if "Response" in records_result: if "Error" in records_result["Response"]: error = records_result["Response"]["Error"] # 如果是"记录列表为空",继续处理 (没有现有记录) if error['Code'] != 'ResourceNotFound.NoDataOfRecord': print(f"API 错误:{error['Code']}") print(f"消息:{error['Message']}") return False else: records = records_result["Response"].get("RecordList", []) # 3. 如果记录存在且不是强制更新,提示用户 if records and not force: print(f"\n⚠️ 发现现有记录:") for record in records[:3]: print(f" {record.get('Name', '')} {record.get('Type', '')} -> {record.get('Value', '')}") if len(records) > 3: print(f" ... 还有 {len(records) - 3} 条记录") response = input("\n是否更新现有记录?(y/N): ") if response.lower() != 'y': print("已取消") return False force = True # 4. 如果强制更新,先删除现有记录 if records and force: print(f"\n正在删除 {len(records)} 条现有记录...") for record in records: record_id = record.get('RecordId') if record_id: delete_result = call_api("DeleteRecord", { "Domain": domain, "RecordId": record_id }) if delete_result and "Error" not in delete_result.get("Response", {}): print(f" ✓ 删除记录:{record.get('Name')} {record.get('Type')}") # 5. 创建新记录 print(f"\n正在创建新记录...") create_params = { "Domain": domain, "SubDomain": subdomain, # 注意:是 SubDomain 不是 Subdomain "RecordType": record_type, "RecordLine": line, "Value": value, "TTL": ttl } if remark: create_params["Remark"] = remark create_result = call_api("CreateRecord", create_params) if not create_result: print("创建记录失败") return False if "Error" in create_result.get("Response", {}): error = create_result["Response"]["Error"] print(f"API 错误:{error['Code']}") print(f"消息:{error['Message']}") return False record_id = create_result["Response"].get("RecordId") print(f"\n{'='*60}") print(f"✓ DNS 记录创建成功!") print(f"{'='*60}") print(f"记录 ID: {record_id}") print(f"完整记录:{subdomain}.{domain} {record_type} {value}") print(f"线路:{line}") print(f"TTL: {ttl}秒") if remark: print(f"备注:{remark}") print() return True def main(): import argparse parser = argparse.ArgumentParser( description='DNSPod 记录快速部署 (腾讯云 API 3.0)', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=''' 示例: # 添加 www 的 A 记录 %(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4 # 添加主域名记录 %(prog)s --domain example.com --subdomain @ --type A --value 1.2.3.4 # 添加 CNAME 记录 %(prog)s --domain example.com --subdomain cdn --type CNAME --value cdn.example.com # 强制更新现有记录 %(prog)s --domain example.com --subdomain www --type A --value 1.2.3.5 --force # 域名不存在时自动创建 %(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4 --create-domain 配置: TENCENT_SECRET_ID=AKIDxxxxxxxxxxxxxxxx TENCENT_SECRET_KEY=xxxxxxxxxxxxxxxx 获取密钥:https://console.cloud.tencent.com/cam/capi ''' ) parser.add_argument('--domain', required=True, help='域名 (如:example.com)') parser.add_argument('--subdomain', default='@', help='子域名 (默认:@ 表示主域名)') parser.add_argument('--type', required=True, choices=['A', 'AAAA', 'CNAME', 'MX', 'TXT', 'NS', 'SRV', 'CAA'], help='记录类型') parser.add_argument('--value', required=True, help='记录值') parser.add_argument('--line', default='默认', help='线路 (默认:默认)') parser.add_argument('--ttl', type=int, default=600, help='TTL(秒,默认:600)') parser.add_argument('--force', action='store_true', help='强制更新现有记录 (不询问)') parser.add_argument('--create-domain', action='store_true', help='域名不存在时自动创建') parser.add_argument('--remark', default='', help='记录备注') args = parser.parse_args() success = deploy_record( domain=args.domain, subdomain=args.subdomain, record_type=args.type, value=args.value, line=args.line, ttl=args.ttl, force=args.force, create_domain=args.create_domain, remark=args.remark ) sys.exit(0 if success else 1) if __name__ == '__main__': main()