#!/usr/bin/env python3 """ DNSPod单条记录部署脚本 用于快速添加或更新DNS记录 支持 .env 文件配置敏感信息 """ import os import sys import json import hmac import hashlib import time from datetime import datetime from pathlib import Path from urllib.parse import urlencode, quote try: import requests except ImportError: print("错误: 缺少 requests 库") print("请运行: pip install -r requirements.txt") sys.exit(1) # 加载 .env 文件 def load_env(): """加载.env文件""" env_file = Path(__file__).parent.parent / '.env' if env_file.exists(): with open(env_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() # 加载 .env load_env() # API配置 API_ENDPOINT = "dnspod.tencentcloudapi.com" API_VERSION = "2021-03-23" REGION = "ap-guangzhou" def get_credentials(): """从环境变量或.env文件获取腾讯云凭证""" secret_id = os.getenv('TENCENT_SECRET_ID') secret_key = os.getenv('TENCENT_SECRET_KEY') if not secret_id or not secret_key: print("错误: 未找到腾讯云API密钥") print("\n请设置环境变量或创建 .env 文件:") print(" 方式1: 设置环境变量") print(" export TENCENT_SECRET_ID=\"你的SecretId\"") print(" export TENCENT_SECRET_KEY=\"你的SecretKey\"") print("\n 方式2: 创建 .env 文件") print(" cp .env.example .env") print(" 然后编辑 .env 文件,填入你的密钥") print("\n获取密钥地址: https://console.cloud.tencent.com/cam/capi") sys.exit(1) return secret_id, secret_key def sign_request(secret_id, secret_key, action, params): """ 生成腾讯云API签名 文档: https://cloud.tencent.com/document/product/1427/56152 """ # 1. 构造请求体 body = json.dumps(params) # 2. 构造规范请求串 # 请求方法 http_request_method = "POST" # 请求URI canonical_uri = "/" # 请求查询字符串(空) canonical_querystring = "" # 请求头 canonical_headers = f"content-type:application/json\nhost:{API_ENDPOINT}\n" signed_headers = "content-type;host" # 请求哈希值 hashed_request_payload = hashlib.sha256(body.encode('utf-8')).hexdigest() canonical_request = ( http_request_method + "\n" + canonical_uri + "\n" + canonical_querystring + "\n" + canonical_headers + "\n" + signed_headers + "\n" + hashed_request_payload ) # 3. 构造待签名字符串 algorithm = "TC3-HMAC-SHA256" timestamp = int(time.time()) date = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d') credential_scope = f"{date}/{API_VERSION}/tc3_request" hashed_canonical_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest() string_to_sign = ( algorithm + "\n" + str(timestamp) + "\n" + credential_scope + "\n" + hashed_canonical_request ) # 4. 计算签名 def _hmac_sha256(key, msg): return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() secret_date = _hmac_sha256(f"TC3{secret_key}".encode('utf-8'), date) secret_service = _hmac_sha256(secret_date, API_VERSION) secret_signing = _hmac_sha256(secret_service, "tc3_request") signature = hmac.new(secret_signing, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest() # 5. 构造Authorization头 authorization = ( algorithm + " " + "Credential=" + secret_id + "/" + credential_scope + ", " + "SignedHeaders=" + signed_headers + ", " + "Signature=" + signature ) return { "authorization": authorization, "body": body, "timestamp": timestamp } def call_api(action, params): """调用腾讯云API""" secret_id, secret_key = get_credentials() # 生成签名 sig = sign_request(secret_id, secret_key, action, params) # 构造请求头 headers = { "Authorization": sig["authorization"], "Content-Type": "application/json", "Host": API_ENDPOINT, "X-TC-Action": action, "X-TC-Timestamp": str(sig["timestamp"]), "X-TC-Version": API_VERSION, "X-TC-Region": REGION } # 发送请求 url = f"https://{API_ENDPOINT}/" response = requests.post(url, headers=headers, data=sig["body"]) return response.json() def check_domain_exists(domain): """检查域名是否存在""" try: result = call_api("DescribeDomainList", {}) domains = [d["Name"] for d in result.get("Response", {}).get("DomainList", [])] return domain in domains except Exception as e: print(f"检查域名失败: {e}") return False def create_domain(domain): """创建域名""" print(f"正在创建域名: {domain}") try: result = call_api("CreateDomain", {"Domain": domain}) print(f"✓ 域名创建成功: {domain}") return True except Exception as e: print(f"✗ 域名创建失败: {e}") return False def find_record(domain, subdomain, record_type): """查找现有记录""" try: params = { "Domain": domain, "Subdomain": subdomain, "RecordType": record_type } result = call_api("DescribeRecordList", params) records = result.get("Response", {}).get("RecordList", []) for record in records: if record.get("Name") == subdomain and record.get("Type") == record_type: return record return None except Exception as e: print(f"查找记录失败: {e}") return None def create_record(domain, subdomain, record_type, value, line="默认", ttl=600, remark=""): """创建DNS记录""" params = { "Domain": domain, "RecordType": record_type, "RecordLine": line, "Value": value, "TTL": ttl } # 添加子域名 if subdomain and subdomain != "@": params["SubDomain"] = subdomain # 添加备注 if remark: params["Remark"] = remark try: result = call_api("CreateRecord", params) record_id = result.get("Response", {}).get("RecordId") print(f"✓ 记录创建成功: {subdomain or '@'}.{domain} ({record_type}) → {value}") return record_id except Exception as e: print(f"✗ 记录创建失败: {e}") return None def modify_record(domain, record_id, subdomain, record_type, value, line="默认", ttl=600, remark=""): """修改DNS记录""" params = { "Domain": domain, "RecordId": record_id, "RecordType": record_type, "RecordLine": line, "Value": value, "TTL": ttl } if subdomain and subdomain != "@": params["SubDomain"] = subdomain if remark: params["Remark"] = remark try: result = call_api("ModifyRecord", params) print(f"✓ 记录更新成功: {subdomain or '@'}.{domain} ({record_type}) → {value}") return True except Exception as e: print(f"✗ 记录更新失败: {e}") return False def deploy_record(domain, subdomain, record_type, value, line="默认", ttl=600, force=False, create_domain_flag=False, remark=""): """部署DNS记录(创建或更新)""" print(f"\n部署DNS记录: {subdomain or '@'}.{domain}") print(f" 类型: {record_type}") print(f" 值: {value}") print(f" 线路: {line}") print(f" TTL: {ttl}s") # 检查域名是否存在 if not check_domain_exists(domain): if create_domain_flag: if not create_domain(domain): return False else: print(f"✗ 域名不存在: {domain}") print(f" 提示: 使用 --create-domain 自动创建域名") return False # 查找现有记录 existing = find_record(domain, subdomain, record_type) if existing: if not force: old_value = existing.get("Value", "") print(f"\n记录已存在:") print(f" 当前值: {old_value}") print(f" 新值: {value}") # 检查是否需要更新 if old_value == value: print(" 值未变化,无需更新") return True response = input("\n是否更新? (y/N): ") if response.lower() != 'y': print(" 已取消") return False record_id = existing.get("RecordId") return modify_record(domain, record_id, subdomain, record_type, value, line, ttl, remark) else: return create_record(domain, subdomain, record_type, value, line, ttl, remark) def main(): import argparse parser = argparse.ArgumentParser(description='DNSPod记录快速部署', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=''' 示例: # 添加www的A记录 %(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4 # 添加主域名记录 %(prog)s --domain example.com --subdomain @ --type A --value 1.2.3.4 # 添加CNAME记录 %(prog)s --domain example.com --subdomain cdn --type CNAME --value cdn.example.com # 强制更新现有记录 %(prog)s --domain example.com --subdomain www --type A --value 1.2.3.5 --force # 域名不存在时自动创建 %(prog)s --domain example.com --subdomain www --type A --value 1.2.3.4 --create-domain ''') parser.add_argument('--domain', required=True, help='域名(如: example.com)') parser.add_argument('--subdomain', default='@', help='子域名(默认: @ 表示主域名)') parser.add_argument('--type', required=True, choices=['A', 'AAAA', 'CNAME', 'MX', 'TXT', 'NS', 'SRV'], help='记录类型') parser.add_argument('--value', required=True, help='记录值') parser.add_argument('--line', default='默认', help='线路(默认: 默认)') parser.add_argument('--ttl', type=int, default=600, help='TTL(秒, 默认: 600)') parser.add_argument('--force', action='store_true', help='强制更新现有记录(不询问)') parser.add_argument('--create-domain', action='store_true', help='域名不存在时自动创建') parser.add_argument('--remark', default='', help='记录备注') args = parser.parse_args() # 部署记录 success = deploy_record( domain=args.domain, subdomain=args.subdomain, record_type=args.type, value=args.value, line=args.line, ttl=args.ttl, force=args.force, create_domain_flag=args.create_domain, remark=args.remark ) sys.exit(0 if success else 1) if __name__ == '__main__': main()