普通视图

发现新文章,点击刷新页面。
昨天以前首页

南墙 WAF 系列(三)– API SDK

2025年4月16日 17:47

手里乱七八糟的域名实在是有点多,而这些域名,多数都解析到了同样的站点。或者说是回源是同样的站点,最开始其实并没有套 cdn,中间有段时间连这种 301 302 跳转都被打,就变得很尴尬。301 跳转都能让沙雕给打挂,也的确什么好办法了。

后来发现盾云的 scdn性价比的确不错,于是一股脑的全部套了 scdn,不过,有的也的确没什么流量,于是一些没那么重要的东西放哪里就有些浪费:

所以,思索再三,几天申请了一台 99 块钱的海外轻量,每个月 1t 的流量,感觉应该足够应付这些乱七八糟的服务了。当然,最主要的还是要套上南墙 waf,直接裸奔感觉也不大行。

当然,waf 后面的回源多数还是家里的服务器。这就需要有个工具来更新 waf 的回源地址,今天又完善了一下南墙的 api 接口,代码直接提交 github 的,有需要可以自取。

功能特性

 

  • 证书管理
    • 获取证书列表
    • 检查证书
    • 提交证书配置
    • 删除证书
  • 站点配置
    • 获取站点列表
    • 更新站点配置
  • 用户认证
    • 用户名密码登录
    • 支持双因素认证(OTP)

环境要求

 

  • Python 3.x
  • 必需的 Python 包:
    • requests
    • jwt
    • urllib3

安装说明

 

  1. 克隆仓库:
git clone https://github.com/obaby/baby-nanqiang-waf-api-toos.git 
cd baby-nanqiang-waf-api-toos
  1. 安装依赖包:
pip install -r requirements.txt

 

使用说明

基础用法

from baby_nanqiang_api_tools import NanQiangAPI

# 初始化 API 客户端
api = NanQiangAPI(base_url="https://lang.bi:443")

# 登录
result = api.login(username="your_username", password="your_password")
if result:
    print("登录成功")
    
    # 获取证书列表
    cert_list = api.get_cert_list()
    if cert_list:
        parsed_certs = api.parse_cert_list(cert_list)
        print("证书列表:", parsed_certs)
证书管理
# 从文件检查证书
cert_result = api.check_cert_from_files("cert.pem", "key.pem")
if cert_result:
    # 提交证书配置
    submit_result = api.submit_cert_config(cert_result)
    
# 删除证书
delete_result = api.delete_cert(cert_id=123)

站点配置

 

# 获取站点列表
site_list = api.get_site_list()
if site_list:
    parsed_sites = api.parse_site_list(site_list)
    print("站点列表:", parsed_sites)

# 更新站点配置
update_result = api.update_site_config(
    ip="192.168.1.1",
    port=443,
    site_id=123,
    uid=456,
    description="我的网站"
)

仓库地址:
https://github.com/obaby/baby-nanqiang-waf-api-tools

The post 南墙 WAF 系列(三)– API SDK appeared first on obaby@mars.

南墙 WAF 系列(二)– 网站证书自动更新

2025年4月2日 14:48

相对管理后台的 ssl来说,其实网站的 ssl 证书才是正事,毕竟这个关系到网站的访问。按照官方的说法在开放 80 端口的情况下,南墙可以自动申请更新证书,不过后台没找到配置的地方,我的 v4 的 80 也是不通的,所以就需要自己去维护管理证书了。

然而,上午在问了管理之后,得到的答复是没有 api,可以自己抓包进行修改。

嗐,这么看来其实也没啥,最起码说明后台的 api 接口是可以直接拿来用的。即使是有 api 文档,也是得自己去看,去写,没有的话 curl 抓包一样能解决问题。按照之前的方法,只直接复制 curl 给 cursor 就可以了。

api 文件baby_nanqiang_api_tools.py内容:

#!/usr/bin/env python3
import requests
import json
import jwt
from datetime import datetime
import os
import urllib3

# 禁用 SSL 验证警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class NanQiangAPI:
    def __init__(self, base_url="https://lang.bi:443"):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.verify = False  # 忽略SSL证书验证
        self.token = None
        self._setup_headers()

    def _setup_headers(self):
        """设置请求头"""
        self.headers = {
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'cache-control': 'no-cache',
            'content-type': 'application/json',
            'origin': self.base_url,
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'referer': f'{self.base_url}/',
            'sec-ch-ua': '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
        }

    def _update_headers_with_token(self):
        """更新请求头,添加token"""
        if self.token:
            self.headers['Authorization'] = self.token  # 直接使用token,不添加'Bearer '前缀

    def delete_cert(self, cert_id):
        """
        删除指定ID的证书
        :param cert_id: 证书ID
        :return: 删除结果
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        url = f"{self.base_url}/api/v1/certs/{cert_id}"

        try:
            response = self.session.delete(
                url,
                headers=self.headers
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"删除证书失败: {response_data['err']}")
                return None
            
            # 检查删除是否成功
            if response_data.get('result') == 'success' and response_data.get('RowsAffected') > 0:
                print(f"证书 {cert_id} 删除成功")
                return True
            else:
                print(f"证书 {cert_id} 删除失败: 未找到证书或删除操作未生效")
                return False
            
        except requests.exceptions.RequestException as e:
            print(f"删除证书请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def parse_cert_list(self, cert_list):
        """
        解析证书列表数据
        :param cert_list: 证书列表数据
        :return: 解析后的证书信息列表
        """
        if not cert_list:
            return None

        parsed_certs = []
        for cert in cert_list:
            try:
                # 解析SNI字段(JSON字符串)
                sni_list = json.loads(cert.get('sni', '[]'))
                
                parsed_cert = {
                    'id': cert.get('id'),
                    'sni': sni_list,
                    'expire_time': cert.get('expire_time'),
                    'update_time': cert.get('update_time')
                }
                parsed_certs.append(parsed_cert)
            except json.JSONDecodeError as e:
                print(f"解析SNI字段失败: {str(e)}")
                continue
            except Exception as e:
                print(f"解析证书数据失败: {str(e)}")
                continue

        return parsed_certs

    def get_cert_list(self):
        """
        获取证书列表
        :return: 证书列表
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        url = f"{self.base_url}/api/v1/certs/"

        try:
            response = self.session.get(
                url,
                headers=self.headers
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"获取证书列表失败: {response_data['err']}")
                return None
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"获取证书列表请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def login(self, username, password, otp=""):
        """
        登录接口
        :param username: 用户名
        :param password: 密码
        :param otp: 双因素认证码(可选)
        :return: 登录响应
        """
        url = f"{self.base_url}/api/v1/users/login"
        data = {
            "usr": username,
            "pwd": password,
            "otp": otp
        }

        try:
            response = self.session.post(
                url,
                headers=self.headers,
                json=data
            )
            
            # 获取响应数据
            response_data = response.json()
            
            # 检查是否有错误信息
            if 'err' in response_data:
                print(f"登录失败: {response_data['err']}")
                return None
            
            # 保存token
            if 'token' in response_data:
                self.token = response_data['token']
                self._update_headers_with_token()
                
                # # 解析token信息
                # try:
                #     # 使用 jwt.decode 替代 jwt.decode_complete
                #     token_data = jwt.decode(self.token, options={"verify_signature": False})
                #     exp_timestamp = token_data.get('exp')
                #     if exp_timestamp:
                #         exp_date = datetime.fromtimestamp(exp_timestamp)
                #         print(f"Token 有效期至: {exp_date}")
                # except Exception as e:
                #     print(f"无法解析token信息: {str(e)}")
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"登录请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def check_cert(self, cert_content, key_content, mode=0):
        """
        检查证书
        :param cert_content: 证书内容
        :param key_content: 私钥内容
        :param mode: 模式,默认为0
        :return: 检查结果
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        url = f"{self.base_url}/api/v1/certs/check"
        
        # 准备multipart/form-data数据
        files = {
            'mode': (None, str(mode)),
            'cert': (None, cert_content),
            'key': (None, key_content)
        }

        try:
            # 临时移除content-type,让requests自动设置
            headers = self.headers.copy()
            headers.pop('content-type', None)
            
            response = self.session.post(
                url,
                headers=headers,
                files=files
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"证书检查失败: {response_data['err']}")
                return None
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"证书检查请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def check_cert_from_files(self, cert_file_path, key_file_path, mode=0):
        """
        从文件检查证书
        :param cert_file_path: 证书文件路径
        :param key_file_path: 私钥文件路径
        :param mode: 模式,默认为0
        :return: 检查结果
        """
        try:
            with open(cert_file_path, 'r') as f:
                cert_content = f.read()
            with open(key_file_path, 'r') as f:
                key_content = f.read()
                
            return self.check_cert(cert_content, key_content, mode)
            
        except FileNotFoundError as e:
            print(f"文件不存在: {str(e)}")
            return None
        except Exception as e:
            print(f"读取文件失败: {str(e)}")
            return None

    def submit_cert_config(self, check_result):
        """
        提交证书配置
        :param check_result: 证书检查的结果数据
        :return: 提交结果
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        if not check_result:
            print("无效的证书检查结果")
            return None

        url = f"{self.base_url}/api/v1/certs/config"
        
        # 准备提交数据
        data = {
            "id": check_result.get("id", 0),
            "sni": check_result.get("sni", "[]"),
            "cert": check_result.get("cert", ""),
            "key": check_result.get("key", ""),
            "expire_time": check_result.get("expire_time", ""),
            "update_time": check_result.get("update_time", "")
        }

        try:
            response = self.session.post(
                url,
                headers=self.headers,
                json=data
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"证书配置提交失败: {response_data['err']}")
                return None
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"证书配置提交请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def is_logged_in(self):
        """
        检查是否已登录
        :return: bool
        """
        return self.token is not None

def main():
    # 使用示例
    api = NanQiangAPI()
    
    # 登录信息
    username = "obaby"
    password = "obaby@mars"
    
    # 执行登录
    result = api.login(username, password)
    
    if result:
        print("登录成功:")
        print(json.dumps(result, indent=2, ensure_ascii=False))
        print(f"Token: {api.token}")
        
        # 获取证书列表
        cert_list = api.get_cert_list()
        if cert_list:
            # 解析证书列表
            parsed_certs = api.parse_cert_list(cert_list)
            if parsed_certs:
                print("解析后的证书列表:")
                print(json.dumps(parsed_certs, indent=2, ensure_ascii=False))
                
                # # 删除证书示例
                # cert_id = 4  # 要删除的证书ID
                # delete_result = api.delete_cert(cert_id)
                # if delete_result:
                #     print(f"证书 {cert_id} 删除成功")
                # else:
                #     print(f"证书 {cert_id} 删除失败")
        
        # 证书检查示例
        cert_file = "path/to/cert.pem"
        key_file = "path/to/key.pem"
        
        if os.path.exists(cert_file) and os.path.exists(key_file):
            return
            # 先检查证书
            cert_result = api.check_cert_from_files(cert_file, key_file)
            if cert_result:
                print("证书检查结果:")
                print(json.dumps(cert_result, indent=2, ensure_ascii=False))
                
                # 提交证书配置
                submit_result = api.submit_cert_config(cert_result)
                if submit_result:
                    print("证书配置提交成功:")
                    print(json.dumps(submit_result, indent=2, ensure_ascii=False))
                else:
                    print("证书配置提交失败")
    else:
        print("登录失败")

if __name__ == "__main__":
    main()

账号不要设置动态密码,如果设置了,那就创建一个新账号。

获取证书的脚本参考上一篇文章,对应的路径自己调整。更新证书的代码site_cert_auto_update_tool.py:

#!/usr/bin/env python3
import os
import subprocess
import hashlib
import json
from datetime import datetime
import logging
from baby_nanqiang_api_tools import NanQiangAPI

# Configuration
CERT_SOURCE_DIR = "/root/.acme.sh/h4ck.org.cn_ecc"
CERT_FILE = "fullchain.cer"
KEY_FILE = "h4ck.org.cn.key"
HASH_FILE = "web_cert_hash.json"
CERT_SCRIPT = "get_web_cert.sh"

def setup_logging():
    """设置日志"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('web_cert_update.log'),
            logging.StreamHandler()
        ]
    )

def get_file_hash(file_path):
    """计算文件的SHA-256哈希值"""
    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()

def save_cert_hash(cert_hash, key_hash):
    """保存证书和私钥的哈希值到JSON文件"""
    with open(HASH_FILE, 'w') as f:
        json.dump({
            'cert_hash': cert_hash,
            'key_hash': key_hash
        }, f)

def load_cert_hash():
    """从JSON文件加载证书和私钥的哈希值"""
    try:
        with open(HASH_FILE, 'r') as f:
            data = json.load(f)
            return data.get('cert_hash'), data.get('key_hash')
    except (FileNotFoundError, json.JSONDecodeError):
        return None, None

def run_get_cert_script(script_path=None):
    """
    执行获取证书的脚本
    :param script_path: 脚本路径,如果为None则使用默认的get_web_cert.sh
    :return: bool 是否执行成功
    """
    try:
        # 如果没有指定脚本路径,使用默认的get_web_cert.sh
        if script_path is None:
            script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), CERT_SCRIPT)
        
        # 检查脚本是否存在
        if not os.path.exists(script_path):
            logging.error(f"错误: 脚本文件 {script_path} 不存在")
            return False
            
        # 检查脚本是否可执行
        if not os.access(script_path, os.X_OK):
            logging.error(f"错误: 脚本文件 {script_path} 没有执行权限")
            return False
            
        # 执行脚本
        result = subprocess.run(['sh', script_path], 
                              capture_output=True, 
                              text=True)
        
        # 检查执行结果
        if result.returncode == 0:
            logging.info("证书获取脚本执行成功")
            if result.stdout:
                logging.info("脚本输出:\n%s", result.stdout)
            return True
        else:
            logging.error(f"证书获取脚本执行异常,返回码: {result.returncode}")
            if result.stderr:
                logging.error("异常输出:\n%s", result.stderr)
            return True
            
    except Exception as e:
        logging.error(f"执行证书获取脚本时发生错误: {str(e)}")
        return False

def read_file_content(file_path):
    """读取文件内容"""
    try:
        with open(file_path, 'r') as f:
            return f.read()
    except Exception as e:
        logging.error(f"读取文件 {file_path} 失败: {str(e)}")
        return None

def is_cert_expired(expire_time_str):
    """
    检查证书是否过期或即将过期(7天内)
    :param expire_time_str: 过期时间字符串
    :return: bool 是否过期或即将过期
    """
    try:
        expire_time = datetime.strptime(expire_time_str, "%Y-%m-%d %H:%M:%S")
        now = datetime.now()
        days_until_expire = (expire_time - now).days
        return days_until_expire <= 7
    except Exception as e:
        logging.error(f"解析过期时间失败: {str(e)}")
        return False

def process_same_sni_certs(api, parsed_certs, current_sni, current_cert_id):
    """
    处理具有相同SNI的证书
    :param api: API实例
    :param parsed_certs: 解析后的证书列表
    :param current_sni: 当前证书的SNI
    :param current_cert_id: 当前证书的ID
    :return: None
    """
    # 筛选出相同SNI的证书
    same_sni_certs = [cert for cert in parsed_certs 
                     if cert['sni'] == current_sni and cert['id'] != current_cert_id]
    
    if not same_sni_certs:
        return
        
    # 按过期时间排序(从早到晚)
    same_sni_certs.sort(key=lambda x: datetime.strptime(x['expire_time'], "%Y-%m-%d %H:%M:%S"))
    
    # 检查是否有过期或即将过期的证书
    for cert in same_sni_certs:
        if is_cert_expired(cert['expire_time']):
            logging.info(f"删除过期证书 ID: {cert['id']}")
            if not api.delete_cert(cert['id']):
                logging.error(f"删除证书 {cert['id']} 失败")
    
    # 检查是否有过期时间相同的证书
    if len(same_sni_certs) > 1:
        # 获取第一个证书的过期时间作为基准
        base_expire_time = same_sni_certs[0]['expire_time']
        
        # 删除过期时间相同的证书(保留第一个)
        for cert in same_sni_certs[1:]:
            if cert['expire_time'] == base_expire_time:
                logging.info(f"删除重复过期时间的证书 ID: {cert['id']}")
                if not api.delete_cert(cert['id']):
                    logging.error(f"删除证书 {cert['id']} 失败")

def main():
    # 设置日志
    setup_logging()
    
    try:
        # 执行证书获取脚本
        if not run_get_cert_script():
            logging.error("获取证书失败,退出程序")
            return
            
        # 检查证书文件是否存在
        cert_path = os.path.join(CERT_SOURCE_DIR, CERT_FILE)
        key_path = os.path.join(CERT_SOURCE_DIR, KEY_FILE)
        
        if not (os.path.exists(cert_path) and os.path.exists(key_path)):
            logging.error("证书文件不存在,退出程序")
            return
            
        # 计算新文件的哈希值
        new_cert_hash = get_file_hash(cert_path)
        new_key_hash = get_file_hash(key_path)
        
        # 获取旧的哈希值
        old_cert_hash, old_key_hash = load_cert_hash()
        
        # 检查文件是否发生变化
        if new_cert_hash != old_cert_hash or new_key_hash != old_key_hash:
            logging.info("证书文件已发生变化,开始更新流程")
            
            # 读取证书和私钥内容
            cert_content = read_file_content(cert_path)
            key_content = read_file_content(key_path)
            
            if not cert_content or not key_content:
                logging.error("读取证书文件失败")
                return
                
            # 初始化API
            api = NanQiangAPI()
            
            # 登录
            if not api.login("obaby", "obaby@mars"):
                logging.error("登录失败")
                return
                
            # 检查证书
            check_result = api.check_cert(cert_content, key_content)
            if not check_result:
                logging.error("证书检查失败")
                return
                
            # 提交证书配置
            if not api.submit_cert_config(check_result):
                logging.error("提交证书配置失败")
                return
                
            # 获取证书列表
            cert_list = api.get_cert_list()
            if not cert_list:
                logging.error("获取证书列表失败")
                return
                
            # 解析证书列表
            parsed_certs = api.parse_cert_list(cert_list)
            if not parsed_certs:
                logging.error("解析证书列表失败")
                return
                
            # 获取当前证书的SNI
            current_sni = check_result.get('sni', '[]')
            try:
                current_sni = json.loads(current_sni)
            except json.JSONDecodeError:
                logging.error("解析当前证书SNI失败")
                return
                
            # 处理相同SNI的证书
            process_same_sni_certs(api, parsed_certs, current_sni, check_result.get('id'))
            
            # 保存新的哈希值
            save_cert_hash(new_cert_hash, new_key_hash)
            logging.info("证书更新完成")
        else:
            logging.info("证书文件未发生变化,无需更新")
            
    except Exception as e:
        logging.error(f"程序执行出错: {str(e)}", exc_info=True)

if __name__ == "__main__":
    main()

添加定时任务,每天,或者每几天:

0 2 * * * /usr/bin/python3 /home/soft/baby-nanqiang-cert-tools/site_cert_auto_update_tool.py >> /home/soft/baby-nanqiang-cert-tools/web_cert_manager.log 2>&1

最终效果:

The post 南墙 WAF 系列(二)– 网站证书自动更新 appeared first on obaby@mars.

是 IPv6 吖 — V6 重生记

2025年3月31日 10:27

上周五的时候,在杜老师的聊天室聊天,hehe 提到一个关于家里 ipv6 地址申请的问题。这时候才想到自己家里的网络应该也是有 ipv6 的地址的。至于地址是不是公网的那就不知道了。

而至于想要弄这个东西,其实还有一个原因是 he.net 的 ipv6 徽章已经很久没更新了,还差最后一步 ipv6 only 的网络访问测试,而测试的域名就是 h4ck.org.cn。
IPv6 Certification Badge for obaby

为了通过这个测试,自然要折腾一下。通过之后,he说会免费邮寄一个 T 恤衫,尺码和地址都更新了。不过这跨国的快递,不知道能不能收到。

至于能不能收到,这就只能耐心等待啦。

远程登录路由器,直接访问 ip 地址,然后高级的一幕就出现了,竟然直接打开了路由器的登录页面:

那么也就是说联通在 v6 协议下竟然没有封禁 80 端口,这样的话我忽然就有了个大胆的想法。如果路由器将 v6 的映射打开,直接访问 v6 的地址,忽略证书错误。然后网站就顺利打开了:

既然如此,那么这一来也解决了自己的 cdn 流量超限的问题。

这个月流量超限之后,买了 100G 的扩展包,结果就用了四天就又没了。为了解决流量问题,文章中的视频,直接通过 url 转发了。而至于首页右下角的图片就直接去掉了。不知道是访问量还是神马问题,这些图片一天跑十几个 G 的流量。

然而,到现在就出现了另外几个问题,要想让网站直接在互联网上访问,没有任何的防护措施,的确感觉不怎么靠谱。

1.家里的 V6 地址也是动态的,需要能够动态更新 ipv6 的 AAAA 记录。

2.在家里的主机上安装 waf 系统,提供基础的防御功能。

3.其他的未知问题。

AAAA 记录

在测试的时候,添加 AAAA 记录,会因为存在 cname 记录而导致添加失败,AAAA 记录和 CNAME 记录有冲突,请先删除或暂停现有的 CNAME 记录后重试:

此时针对不同的线路分别添加解析就 ok 了:

那么,在这之后就来到了第二个问题,怎么获取本地的公网 ipv6地址。

最直接的想法是直接通过获取 ipv4 地址类似的写法,来获取 ipv6 的地址,让 cursor 给写了类似的代码:

def get_ipv6_by_httpbin():
    """通过 httpbin.org 获取 IPv6 地址"""
    url = 'https://api6.ipify.org?format=json'
    try:
        resp = request.urlopen(url=url, timeout=10).read()
        data = json.loads(resp.decode("utf-8"))
        if 'ip' in data:
            return data['ip']
        return None
    except Exception as e:
        logging.warning("get_ipv6_by_httpbin FAILED, error: %s", str(e))
        return None

def get_ipv6_by_icanhazip():
    """通过 icanhazip.com 获取 IPv6 地址"""
    url = 'https://icanhazip.com'
    try:
        resp = request.urlopen(url=url, timeout=10).read()
        ip = resp.decode("utf-8").strip()
        if regex_ipv6.match(ip):
            return ip
        return None
    except Exception as e:
        logging.warning("get_ipv6_by_icanhazip FAILED, error: %s", str(e))
        return None

def get_ipv6_by_ident_me():
    """通过 ident.me 获取 IPv6 地址"""
    url = 'https://v6.ident.me'
    try:
        resp = request.urlopen(url=url, timeout=10).read()
        ip = resp.decode("utf-8").strip()
        if regex_ipv6.match(ip):
            return ip
        return None
    except Exception as e:
        logging.warning("get_ipv6_by_ident_me FAILED, error: %s", str(e))
        return None

实际证明,代码写的不错,在自己的 mac 电脑上的确可以获取到 ipv6 的地址。

然而,在家里的服务器上却使用无法获取 ip 地址,所有 v6 协议的服务都是超时状态。搜索了一堆,问了一大圈的 ai,最终都没解决问题。

后来猜测是不是路由器的问题,于是重新登录路由器的 v6 配置页面,来回切换配置:

看网上有文章会所需要改为 slaac 模式,改过去之后无效,切换成原来的自动,继续沿用上面的配置。断线重连结果网络就好啦。注意,这两个 dns 是腾讯的 dns,不是联通默认的 dns。

然而,此时就出现了另外一个问题,直到这时候我才发现,获取到的这个地址是本地的 v6 地址,而不是路由器的 v6 地址,当然,更恐怖的是这个 v6 地址也是可以在互联网直接访问的。

那么怎么自动更新这个 dns 记录就成了问题,总不能自己去天天改啊。

问小杜无果之后,继续尝试通过路由或者 tracerout 的方式获取,最终都以失败告终。至此,简单的方法算是彻底没了招了,那么就剩下一条路了,之计通过路由器获取,然鹅,tplink 的企业路由器并没有开放相关的 api。只能自己去找接口。

结果在登录页面就被来了个下马威,获取到接口,让 cursor 写完代码之后登录不了。

看起来页面很简单不是,但是这个东西恶心的地方在于登录的密码是加密过得,直接使用明文密码是登录不了的。不过好在这个密码不是动态加密的,直接使用密码登录,截取登录的加密后密码进行登录就 ok 了。剩下的就是获取 ipv6 地址,更新 dnspod 的aaaa 记录:

tplink 相关代码:

import requests
import json
import urllib.parse

def login_tplink(ip, username, password):
    """
    Login to TP-Link router
    :param ip: Router IP address
    :param username: Login username
    :param password: Login password
    :return: Response from the router and stok if successful
    """
    url = f"http://{ip}/"
    
    headers = {
        'Accept': 'text/plain, */*; q=0.01',
        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json; charset=UTF-8',
        'Origin': f'http://{ip}',
        'Pragma': 'no-cache',
        'Referer': f'http://{ip}/login.htm',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest'
    }
    
    data = {
        "method": "do",
        "login": {
            "username": username,
            "password": password
        }
    }
    
    try:
        response = requests.post(
            url,
            headers=headers,
            json=data,
            verify=False
        )
        if response.status_code == 200:
            try:
                response_data = response.json()
                if 'stok' in response_data:
                    return response_data['stok']
            except json.JSONDecodeError:
                print("Failed to parse login response as JSON")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Login error occurred: {e}")
        return None

def get_network_info(ip, stok):
    """
    Get network information from TP-Link router
    :param ip: Router IP address
    :param stok: Session token from login
    :return: Network information response
    """
    url = f"http://{ip}/stok={stok}/ds"
    
    headers = {
        'Accept': 'text/plain, */*; q=0.01',
        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json; charset=UTF-8',
        'Origin': f'http://{ip}',
        'Pragma': 'no-cache',
        'Referer': f'http://{ip}/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest'
    }
    
    data = {
        "method": "get",
        "network": {
            "table": "if_info"
        }
    }
    
    try:
        response = requests.post(
            url,
            headers=headers,
            json=data,
            verify=False
        )
        return response
    except requests.exceptions.RequestException as e:
        print(f"Network info error occurred: {e}")
        return None

def get_wan1_pppoe_addresses(response_data):
    """
    Parse IPv4 and IPv6 addresses from network info response
    :param response_data: JSON response data
    :return: Dictionary containing IPv4 and IPv6 addresses
    """
    addresses = {
        'ipv4': None,
        'ipv6': None
    }
    
    try:
        if_info = response_data.get('network', {}).get('if_info', [])
        for interface in if_info:
            if 'wan1_pppoe' in interface:
                wan_data = interface['wan1_pppoe']
                if 'ipaddr' in wan_data:
                    addresses['ipv4'] = wan_data['ipaddr']
                if 'ip6addr' in wan_data:
                    addresses['ipv6'] = urllib.parse.unquote(wan_data['ip6addr'])
                break
    except Exception as e:
        print(f"Error parsing wan1_pppoe addresses: {e}")
    
    return addresses

def update_ipv6_nat_mapping(ip, stok, dest_ip):
    """
    Update IPv6 NAT mapping on TP-Link router
    :param ip: Router IP address
    :param stok: Session token from login
    :param dest_ip: Destination IPv6 address
    :return: Response from the router
    """
    url = f"http://{ip}/stok={stok}/ds"
    
    headers = {
        'Accept': 'text/plain, */*; q=0.01',
        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json; charset=UTF-8',
        'Origin': f'http://{ip}',
        'Pragma': 'no-cache',
        'Referer': f'http://{ip}/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest'
    }
    
    # URL encode the IPv6 address
    encoded_dest_ip = urllib.parse.quote(dest_ip)
    
    data = {
        "method": "set",
        "firewall": {
            "redirect_4313056213": {
                "name": "mac_server_v6",
                "ip_proto": "IPv6",
                "if": ["WAN"],
                "src_dport": "443",
                "dest_port": "443",
                "dest_ip": encoded_dest_ip,
                "proto": "ALL",
                "loopback_ipaddr": "",
                "enable": "on",
                "src_dport_start": "65536",
                "src_dport_end": "65536",
                "dest_port_start": "65536",
                "dest_port_end": "65536"
            }
        }
    }
    
    try:
        response = requests.post(
            url,
            headers=headers,
            json=data,
            verify=False
        )
        return response
    except requests.exceptions.RequestException as e:
        print(f"Error updating IPv6 NAT mapping: {e}")
        return None

if __name__ == "__main__":
    # Disable SSL warnings
    requests.packages.urllib3.disable_warnings()
    
    # Router credentials
    ip = '192.168.1.1'
    username = 'obaby'
    password = '123456***加密后密码'
    
    # First login to get stok
    stok = login_tplink(ip, username, password)
    
    if stok:
        print(f"Login successful! Got stok: {stok}")
        
        # Get network information using the stok
        network_response = get_network_info(ip, stok)
        
        if network_response:
            try:
                response_data = network_response.json()
                addresses = get_wan1_pppoe_addresses(response_data)
                
                print("\nWAN1 PPPoE Addresses:")
                if addresses['ipv4']:
                    print(f"IPv4: {addresses['ipv4']}")
                if addresses['ipv6']:
                    print(f"IPv6: {addresses['ipv6']}")
                    
                    # Update NAT mapping with the IPv6 address
                    nat_response = update_ipv6_nat_mapping(ip, stok, addresses['ipv6'])
                    if nat_response:
                        print(f"NAT mapping update response: {nat_response.text}")
                    else:
                        print("Failed to update NAT mapping")
            except json.JSONDecodeError:
                print("Failed to parse network response as JSON")
        else:
            print("Failed to get network information")
    else:
        print("Login failed!")

至此第一个问题解决了。

开始第二个小问题,更新 aaaa 记录,这个就比较简单了,直接让 curosr 写就行了:

def get_record_id(domain, sub_domain, record_type='A', record_line='默认'):
    """获取记录ID,支持A和AAAA记录,以及不同的记录线路"""
    url = 'https://dnsapi.cn/Record.List'
    params = parse.urlencode({
        'login_token': cfg['login_token'],
        'format': 'json',
        'domain': domain
    })
    req = request.Request(url=url, data=params.encode('utf-8'), method='POST', headers=header())
    try:
        resp = request.urlopen(req).read().decode()
    except (error.HTTPError, error.URLError, socket.timeout):
        return None
    records = json.loads(resp).get('records', {})
    for item in records:
        if (item.get('name') == sub_domain and 
            item.get('type') == record_type and 
            item.get('line') == record_line):
            return item.get('id')
    return None

def update_ipv6_record(current_ipv6):
    """更新IPv6记录,支持多个记录和不同的记录线路"""
    ipv6_count = int(cfg.get('ipv6_count', '1'))
    ipv6_pool = cfg.get('ipv6_pool', '').split(',')[:ipv6_count]
    cfg['current_ipv6'] = current_ipv6
    
    if current_ipv6 not in ipv6_pool:
        logging.info("new IPv6 found: %s", current_ipv6)
        ipv6_pool.insert(0, current_ipv6)
        cfg['ipv6_pool'] = ','.join([str(x) for x in ipv6_pool[:ipv6_count]])
        
        # 获取所有需要更新的AAAA记录配置
        aaaa_records = cfg.get('aaaa_records', '').split(',')
        for record in aaaa_records:
            if not record.strip():
                continue
            try:
                sub_domain, record_line = record.strip().split(':')
                if update_record('AAAA', current_ipv6, record_line, sub_domain):
                    logging.info(f"成功更新AAAA记录: {sub_domain}.{cfg['domain']} 线路: {record_line}")
                else:
                    logging.error(f"更新AAAA记录失败: {sub_domain}.{cfg['domain']} 线路: {record_line}")
            except ValueError:
                logging.error(f"无效的AAAA记录配置: {record}")
        save_config()
    else:
        logging.info('IPv6 地址无变化,跳过更新')

到这里网站就能正常访问了。

WAF:雷池&南墙

至于 waf 系统,其实自己之前也没怎么系统了解过,也是杜老师推荐了这两个。首先尝试的是雷池,也是杜老师最开始推荐的。

雷池:

个人版是免费的,相对来说配置也比较简单。

官网地址:https://waf-ce.chaitin.cn

自动安装一行命令即可:

bash -c "$(curl -fsSLk https://waf-ce.chaitin.cn/release/latest/manager.sh)"

安装为 docker 模式,相对来说侵入性比较小一些。并且不需要占用 80,443 端口,这一点其实相对比南墙安装配置要求要低一些。

安装之后就可以通过 9443 端口登录了。相关功能示例:

系统概览,不知道是不是因为是 v6 地址的原因,左侧地图都是空白的。

同样,这个地球上也是空白的,底部的功能都需要专业版才能查看

防护模块是全部可用的

加强防御需要专业版

通用配置模块也是 ok 的。

整体来说安装过程比较顺畅也没遇到什么问题,不过访问 ip 由于是通过路由转发进来的需要从 x-forward-for中取这个信息。

南墙

开源免费的 waf 系统

官网地址:https://waf.uusec.com/#/

在使用过程中遇到点问题,不过最后在他们的技术帮助下顺利解决了。在安装之后,首先遇到的问题就是获取的 ip 地址有问题,都是本地的地址。并且不管怎么选择地址,最后都是同一个 ip 地址。

使用测速工具测速之后,ip 地址还是一个,这肯定是有问题的。在群里问了一下,给了个指令修复这个问题:

firewall-cmd --permanent --zone=internal --change-interface=docker0
systemctl restart firewalld && systemctl daemon-reload && systemctl restart docker

不过这么执行之后可能会出现的问题就是所有的服务都访问不了了,需要在 public 区域重新开放相关服务:

sudo firewall-cmd --zone=public --permanent --add-port=10043/tcp
sudo firewall-cmd --zone=public --permanent --add-port=14443/tcp
sudo firewall-cmd --zone=public --permanent --add-port=880/tcp
sudo firewall-cmd --zone=public --permanent --add-port=3306/tcp
sudo firewall-cmd --zone=public --permanent --add-port=9443/tcp
sudo firewall-cmd --zone=public --permanent --add-port=8443/tcp

其他需要开放的服务和端口自行添加即可。

然而,这个命令并没有解决问题。包括卸载重装,其实重装这件事情对我来说有些麻烦,因为服务器的默认 80 和 443 都映射到公网了,如果直接改了也比较麻烦,只能去工控机上停掉 80 的监听,443 的修改端口重新添加映射,毕竟这台主机上相对服务少一些。

重新安装依然没解决问题,这时候提议安装主机版。

然而,更尴尬的事情粗线了,那就是主机版不支持 ubuntu,只能作罢继续使用 docker 版本。

并且安装主机版,需要提前备份数据库,安装脚本会重装 mysql。这一点一定要注意!

这时候管理员提议远程协助,于是将端口映射出去,提供账号密码,等管理员修复。

管理说可能是映射的问题,然而,雷池的没问题,那么说明一定是有解决办法的,管理提到 docker 的网络配置不一样的,于是提议修改网络配置。

最终,亲爱的管理员,成功的修复了问题:

这样这个问题算是解决了,整体而言,感觉雷池的在 v6 测速的时候更绿一些。

好啦,相对来说雷池基本所有的模块都是开放的,除了机器学习部分:

安全态势

系统信息

用户管理

日志

证书管理,这个证书管理直接上传即可,不需要去进行绑定。

cdn 加速,其实感觉更像缓存配置。

规则管理

网站管理,得添加多个。

整体来说体验还是不错的,然而,刚才去看了配置文件感觉还是 bridge 啊。奇怪了。

不过既然问题解决了,那也就不纠结了。

官方文档说明:

https://waf.uusec.com/#/guide/problems?id=%f0%9f%8d%8b-%e5%a6%82%e4%bd%95%e8%a7%a3%e5%86%b3%e5%8d%97%e5%a2%99docker%e7%89%88%e8%8e%b7%e5%8f%96%e7%9a%84%e5%ae%a2%e6%88%b7%e7%ab%afip%e4%b8%ba172%e7%9a%84%e9%97%ae%e9%a2%98%ef%bc%9f

 

🍋 解决部分南墙容器版获取的客户端ip为网桥ip的问题?

1.将Docker网桥加入到防火墙的internal区域,以便获取到真实的IP地址, 假设Docker网桥名称为docker0

firewall-cmd --permanent --zone=internal --change-interface=docker0
systemctl restart firewalld && systemctl daemon-reload && systemctl restart docker

2.如果方法1无效,可以修改docker-compose.yml文件,将uuwaf容器的网络设置为network_mode: host,同时修改数据库连接环境变量UUWAF_DB_DSN中的wafdb为127.0.0.1,并映射wafdb容器的3306端口,重启后生效。

 

其他问题

鉴于主机获取的 ipv6 地址能直接访问,其实我一度想直接把主机的地址更新到 dns aaaa 记录上,但是这么一搞,暴露主机的确不是我最终想要的。

于是想着映射本地的链路地址,然而,端口映射通过链路地址通过路由器的 v6 地址却打不开网站,但是这个链路地址在内网的主机上又能打开网站,于是只能放弃这个做法。获取 ipv6 地址的代码:

import re
import logging
import json
import subprocess
import socket
import os
from urllib import request, error, parse

# 匹配合法 IPv6 地址
regex_ipv6 = re.compile(
    r"(?:inet6\s+)?(fe80:[0-9a-fA-F:]+|"  # 特别处理链路本地地址格式
    + r"(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}|"  # 标准格式
    + r"(?:[0-9a-fA-F]{1,4}:){6}:[0-9a-fA-F]{1,4}|"  # 压缩格式
    + r"(?:[0-9a-fA-F]{1,4}:){5}(?::[0-9a-fA-F]{1,4}){1,2}|"
    + r"(?:[0-9a-fA-F]{1,4}:){4}(?::[0-9a-fA-F]{1,4}){1,3}|"
    + r"(?:[0-9a-fA-F]{1,4}:){3}(?::[0-9a-fA-F]{1,4}){1,4}|"
    + r"(?:[0-9a-fA-F]{1,4}:){2}(?::[0-9a-fA-F]{1,4}){1,5}|"
    + r"(?:[0-9a-fA-F]{1,4}:){1}(?::[0-9a-fA-F]{1,4}){1,6}|"
    + r"(?::[0-9a-fA-F]{1,4}){1,7}|"
    + r"::"
    + r")")

# 特别匹配链路本地 IPv6 地址,确保能匹配到 fe80:: 开头的地址
regex_link_local_ipv6 = re.compile(r"inet6\s+(fe80:[0-9a-fA-F:]+)")

def get_ipv6():
    """获取公网 IPv6 地址,使用多个备选方法"""
    return (get_ipv6_by_ifconfig()  # 优先使用本地接口地址
        or get_ipv6_by_httpbin()
        or get_ipv6_by_icanhazip()
        or get_ipv6_by_ident_me()
        or get_ipv6_by_socket())

def get_ipv6_by_ifconfig():
    """通过 ifconfig 命令获取本地 IPv6 地址"""
    try:
        # Windows 系统使用 ipconfig
        if os.name == 'nt':
            cmd = ['ipconfig']
            output = subprocess.check_output(cmd, text=True)
        # Linux/Unix 系统使用 ifconfig
        else:
            cmd = ['ifconfig']
            output = subprocess.check_output(cmd, text=True)
            
        # 按行分割输出
        lines = output.split('\n')
        for line in lines:
            # 查找包含 inet6 的行
            if 'inet6' in line:
                # 使用正则表达式提取 IPv6 地址
                matches = regex_ipv6.findall(line)
                if matches:
                    ipv6 = matches[0]
                    # 排除本地回环地址
                    if not ipv6.startswith('::1'):
                        logging.info(f"Found IPv6 address: {ipv6}")
                        return ipv6
    except Exception as e:
        logging.warning("get_ipv6_by_ifconfig FAILED, error: %s", str(e))
    return None

def get_ipv6_by_socket():
    """通过 Python socket 库获取本地 IPv6 地址"""
    try:
        # 创建一个 IPv6 socket
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
        # 连接到一个外部地址(这里使用 Google 的 IPv6 DNS)
        s.connect(('2001:4860:4860::8888', 80))
        # 获取本地地址
        local_addr = s.getsockname()[0]
        s.close()
        return local_addr
    except Exception as e:
        logging.warning("get_ipv6_by_socket FAILED, error: %s", str(e))
        return None

def get_ipv6_by_httpbin():
    """通过 httpbin.org 获取 IPv6 地址"""
    url = 'https://api6.ipify.org?format=json'
    try:
        resp = request.urlopen(url=url, timeout=10).read()
        data = json.loads(resp.decode("utf-8"))
        if 'ip' in data:
            return data['ip']
        return None
    except Exception as e:
        logging.warning("get_ipv6_by_httpbin FAILED, error: %s", str(e))
        return None

def get_ipv6_by_icanhazip():
    """通过 icanhazip.com 获取 IPv6 地址"""
    url = 'https://icanhazip.com'
    try:
        resp = request.urlopen(url=url, timeout=10).read()
        ip = resp.decode("utf-8").strip()
        if regex_ipv6.match(ip):
            return ip
        return None
    except Exception as e:
        logging.warning("get_ipv6_by_icanhazip FAILED, error: %s", str(e))
        return None

def get_ipv6_by_ident_me():
    """通过 ident.me 获取 IPv6 地址"""
    url = 'https://v6.ident.me'
    try:
        resp = request.urlopen(url=url, timeout=10).read()
        ip = resp.decode("utf-8").strip()
        if regex_ipv6.match(ip):
            return ip
        return None
    except Exception as e:
        logging.warning("get_ipv6_by_ident_me FAILED, error: %s", str(e))
        return None

def get_link_local_ipv6():
    """专门获取链路本地 IPv6 地址"""
    try:
        # Windows 系统使用 ipconfig
        if os.name == 'nt':
            cmd = ['ipconfig']
            output = subprocess.check_output(cmd, text=True)
        # Linux/Unix 系统使用 ifconfig
        else:
            cmd = ['ifconfig']
            output = subprocess.check_output(cmd, text=True)
            
        # 按行分割输出
        lines = output.split('\n')
        for line in lines:
            # 查找包含 inet6 的行
            if 'inet6' in line:
                # 提取 IPv6 地址和 prefixlen
                if 'prefixlen 64' in line and 'scopeid 0x20<link>' in line:
                    # 调试输出
                    logging.debug(f"Processing line: {line}")
                    
                    # 使用特定正则表达式提取链路本地 IPv6 地址
                    matches = regex_link_local_ipv6.findall(line)
                    if matches:
                        ipv6 = matches[0]
                        logging.info(f"Found link-local IPv6 address with new regex: {ipv6}")
                        return ipv6
                    
                    # 如果特定正则表达式没有匹配到,尝试使用一般性正则表达式
                    matches = regex_ipv6.findall(line)
                    if matches:
                        ipv6 = matches[0]
                        logging.debug(f"Original regex matched: {ipv6}")
                        # 只返回链路本地地址
                        if ipv6.startswith('fe80'):
                            logging.info(f"Found link-local IPv6 address with original regex: {ipv6}")
                            return ipv6
                        elif 'fe80' in line:
                            # 如果行中包含fe80但匹配失败,记录错误
                            logging.warning(f"Regex failed to match fe80 in: {line}")
    except Exception as e:
        logging.warning("get_link_local_ipv6 FAILED, error: %s", str(e))
    return None

# 测试
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    
    # 测试特定的 IPv6 地址匹配
    test_line = "inet6 fe80::e4d:e9ff:fec9:9de3  prefixlen 64  scopeid 0x20<link>"
    print("Testing regex with line:", test_line)
    
    # 测试链路本地特定正则
    matches = regex_link_local_ipv6.findall(test_line)
    if matches:
        print("Link-local regex matched:", matches[0])
    else:
        print("Link-local regex failed to match")
    
    # 测试一般 IPv6 正则
    matches = regex_ipv6.findall(test_line)
    if matches:
        print("General IPv6 regex matched:", matches[0])
    else:
        print("General IPv6 regex failed to match")
    
    print("\n--- Regular program output ---")
    print("Method 1 (httpbin):", get_ipv6_by_httpbin())
    print("Method 2 (icanhazip):", get_ipv6_by_icanhazip())
    print("Method 3 (ident.me):", get_ipv6_by_ident_me())
    print("Method 4 (ifconfig):", get_ipv6_by_ifconfig())
    print("Method 5 (socket):", get_ipv6_by_socket())
    print("Link-local IPv6:", get_link_local_ipv6())

获取到 v6 地址,就可以通过 tplink 的映射代码update_ipv6_nat_mapping进行地址映射了。

如果要用这个代码,需要根据自己的路由器配置获取映射的 id。

整体来说,速度还是可以的: 

 

The post 是 IPv6 吖 — V6 重生记 appeared first on obaby@mars.

❌
❌