普通视图

发现新文章,点击刷新页面。
昨天以前首页

南墙 WAF 系列(二)– 网站证书自动更新

2025年4月2日 14:48

相对管理后台的 ssl来说,其实网站的 ssl 证书才是正事,毕竟这个关系到网站的访问。按照官方的说法在开放 80 端口的情况下,南墙可以自动申请更新证书,不过后台没找到配置的地方,我的 v4 的 80 也是不通的,所以就需要自己去维护管理证书了。

然而,上午在问了管理之后,得到的答复是没有 api,可以自己抓包进行修改。

嗐,这么看来其实也没啥,最起码说明后台的 api 接口是可以直接拿来用的。即使是有 api 文档,也是得自己去看,去写,没有的话 curl 抓包一样能解决问题。按照之前的方法,只直接复制 curl 给 cursor 就可以了。

api 文件baby_nanqiang_api_tools.py内容:

#!/usr/bin/env python3
import requests
import json
import jwt
from datetime import datetime
import os
import urllib3

# 禁用 SSL 验证警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class NanQiangAPI:
    def __init__(self, base_url="https://lang.bi:443"):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.verify = False  # 忽略SSL证书验证
        self.token = None
        self._setup_headers()

    def _setup_headers(self):
        """设置请求头"""
        self.headers = {
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'cache-control': 'no-cache',
            'content-type': 'application/json',
            'origin': self.base_url,
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'referer': f'{self.base_url}/',
            'sec-ch-ua': '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
        }

    def _update_headers_with_token(self):
        """更新请求头,添加token"""
        if self.token:
            self.headers['Authorization'] = self.token  # 直接使用token,不添加'Bearer '前缀

    def delete_cert(self, cert_id):
        """
        删除指定ID的证书
        :param cert_id: 证书ID
        :return: 删除结果
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        url = f"{self.base_url}/api/v1/certs/{cert_id}"

        try:
            response = self.session.delete(
                url,
                headers=self.headers
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"删除证书失败: {response_data['err']}")
                return None
            
            # 检查删除是否成功
            if response_data.get('result') == 'success' and response_data.get('RowsAffected') > 0:
                print(f"证书 {cert_id} 删除成功")
                return True
            else:
                print(f"证书 {cert_id} 删除失败: 未找到证书或删除操作未生效")
                return False
            
        except requests.exceptions.RequestException as e:
            print(f"删除证书请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def parse_cert_list(self, cert_list):
        """
        解析证书列表数据
        :param cert_list: 证书列表数据
        :return: 解析后的证书信息列表
        """
        if not cert_list:
            return None

        parsed_certs = []
        for cert in cert_list:
            try:
                # 解析SNI字段(JSON字符串)
                sni_list = json.loads(cert.get('sni', '[]'))
                
                parsed_cert = {
                    'id': cert.get('id'),
                    'sni': sni_list,
                    'expire_time': cert.get('expire_time'),
                    'update_time': cert.get('update_time')
                }
                parsed_certs.append(parsed_cert)
            except json.JSONDecodeError as e:
                print(f"解析SNI字段失败: {str(e)}")
                continue
            except Exception as e:
                print(f"解析证书数据失败: {str(e)}")
                continue

        return parsed_certs

    def get_cert_list(self):
        """
        获取证书列表
        :return: 证书列表
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        url = f"{self.base_url}/api/v1/certs/"

        try:
            response = self.session.get(
                url,
                headers=self.headers
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"获取证书列表失败: {response_data['err']}")
                return None
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"获取证书列表请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def login(self, username, password, otp=""):
        """
        登录接口
        :param username: 用户名
        :param password: 密码
        :param otp: 双因素认证码(可选)
        :return: 登录响应
        """
        url = f"{self.base_url}/api/v1/users/login"
        data = {
            "usr": username,
            "pwd": password,
            "otp": otp
        }

        try:
            response = self.session.post(
                url,
                headers=self.headers,
                json=data
            )
            
            # 获取响应数据
            response_data = response.json()
            
            # 检查是否有错误信息
            if 'err' in response_data:
                print(f"登录失败: {response_data['err']}")
                return None
            
            # 保存token
            if 'token' in response_data:
                self.token = response_data['token']
                self._update_headers_with_token()
                
                # # 解析token信息
                # try:
                #     # 使用 jwt.decode 替代 jwt.decode_complete
                #     token_data = jwt.decode(self.token, options={"verify_signature": False})
                #     exp_timestamp = token_data.get('exp')
                #     if exp_timestamp:
                #         exp_date = datetime.fromtimestamp(exp_timestamp)
                #         print(f"Token 有效期至: {exp_date}")
                # except Exception as e:
                #     print(f"无法解析token信息: {str(e)}")
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"登录请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def check_cert(self, cert_content, key_content, mode=0):
        """
        检查证书
        :param cert_content: 证书内容
        :param key_content: 私钥内容
        :param mode: 模式,默认为0
        :return: 检查结果
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        url = f"{self.base_url}/api/v1/certs/check"
        
        # 准备multipart/form-data数据
        files = {
            'mode': (None, str(mode)),
            'cert': (None, cert_content),
            'key': (None, key_content)
        }

        try:
            # 临时移除content-type,让requests自动设置
            headers = self.headers.copy()
            headers.pop('content-type', None)
            
            response = self.session.post(
                url,
                headers=headers,
                files=files
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"证书检查失败: {response_data['err']}")
                return None
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"证书检查请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def check_cert_from_files(self, cert_file_path, key_file_path, mode=0):
        """
        从文件检查证书
        :param cert_file_path: 证书文件路径
        :param key_file_path: 私钥文件路径
        :param mode: 模式,默认为0
        :return: 检查结果
        """
        try:
            with open(cert_file_path, 'r') as f:
                cert_content = f.read()
            with open(key_file_path, 'r') as f:
                key_content = f.read()
                
            return self.check_cert(cert_content, key_content, mode)
            
        except FileNotFoundError as e:
            print(f"文件不存在: {str(e)}")
            return None
        except Exception as e:
            print(f"读取文件失败: {str(e)}")
            return None

    def submit_cert_config(self, check_result):
        """
        提交证书配置
        :param check_result: 证书检查的结果数据
        :return: 提交结果
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        if not check_result:
            print("无效的证书检查结果")
            return None

        url = f"{self.base_url}/api/v1/certs/config"
        
        # 准备提交数据
        data = {
            "id": check_result.get("id", 0),
            "sni": check_result.get("sni", "[]"),
            "cert": check_result.get("cert", ""),
            "key": check_result.get("key", ""),
            "expire_time": check_result.get("expire_time", ""),
            "update_time": check_result.get("update_time", "")
        }

        try:
            response = self.session.post(
                url,
                headers=self.headers,
                json=data
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"证书配置提交失败: {response_data['err']}")
                return None
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"证书配置提交请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def is_logged_in(self):
        """
        检查是否已登录
        :return: bool
        """
        return self.token is not None

def main():
    # 使用示例
    api = NanQiangAPI()
    
    # 登录信息
    username = "obaby"
    password = "obaby@mars"
    
    # 执行登录
    result = api.login(username, password)
    
    if result:
        print("登录成功:")
        print(json.dumps(result, indent=2, ensure_ascii=False))
        print(f"Token: {api.token}")
        
        # 获取证书列表
        cert_list = api.get_cert_list()
        if cert_list:
            # 解析证书列表
            parsed_certs = api.parse_cert_list(cert_list)
            if parsed_certs:
                print("解析后的证书列表:")
                print(json.dumps(parsed_certs, indent=2, ensure_ascii=False))
                
                # # 删除证书示例
                # cert_id = 4  # 要删除的证书ID
                # delete_result = api.delete_cert(cert_id)
                # if delete_result:
                #     print(f"证书 {cert_id} 删除成功")
                # else:
                #     print(f"证书 {cert_id} 删除失败")
        
        # 证书检查示例
        cert_file = "path/to/cert.pem"
        key_file = "path/to/key.pem"
        
        if os.path.exists(cert_file) and os.path.exists(key_file):
            return
            # 先检查证书
            cert_result = api.check_cert_from_files(cert_file, key_file)
            if cert_result:
                print("证书检查结果:")
                print(json.dumps(cert_result, indent=2, ensure_ascii=False))
                
                # 提交证书配置
                submit_result = api.submit_cert_config(cert_result)
                if submit_result:
                    print("证书配置提交成功:")
                    print(json.dumps(submit_result, indent=2, ensure_ascii=False))
                else:
                    print("证书配置提交失败")
    else:
        print("登录失败")

if __name__ == "__main__":
    main()

账号不要设置动态密码,如果设置了,那就创建一个新账号。

获取证书的脚本参考上一篇文章,对应的路径自己调整。更新证书的代码site_cert_auto_update_tool.py:

#!/usr/bin/env python3
import os
import subprocess
import hashlib
import json
from datetime import datetime
import logging
from baby_nanqiang_api_tools import NanQiangAPI

# Configuration
CERT_SOURCE_DIR = "/root/.acme.sh/h4ck.org.cn_ecc"
CERT_FILE = "fullchain.cer"
KEY_FILE = "h4ck.org.cn.key"
HASH_FILE = "web_cert_hash.json"
CERT_SCRIPT = "get_web_cert.sh"

def setup_logging():
    """设置日志"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('web_cert_update.log'),
            logging.StreamHandler()
        ]
    )

def get_file_hash(file_path):
    """计算文件的SHA-256哈希值"""
    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()

def save_cert_hash(cert_hash, key_hash):
    """保存证书和私钥的哈希值到JSON文件"""
    with open(HASH_FILE, 'w') as f:
        json.dump({
            'cert_hash': cert_hash,
            'key_hash': key_hash
        }, f)

def load_cert_hash():
    """从JSON文件加载证书和私钥的哈希值"""
    try:
        with open(HASH_FILE, 'r') as f:
            data = json.load(f)
            return data.get('cert_hash'), data.get('key_hash')
    except (FileNotFoundError, json.JSONDecodeError):
        return None, None

def run_get_cert_script(script_path=None):
    """
    执行获取证书的脚本
    :param script_path: 脚本路径,如果为None则使用默认的get_web_cert.sh
    :return: bool 是否执行成功
    """
    try:
        # 如果没有指定脚本路径,使用默认的get_web_cert.sh
        if script_path is None:
            script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), CERT_SCRIPT)
        
        # 检查脚本是否存在
        if not os.path.exists(script_path):
            logging.error(f"错误: 脚本文件 {script_path} 不存在")
            return False
            
        # 检查脚本是否可执行
        if not os.access(script_path, os.X_OK):
            logging.error(f"错误: 脚本文件 {script_path} 没有执行权限")
            return False
            
        # 执行脚本
        result = subprocess.run(['sh', script_path], 
                              capture_output=True, 
                              text=True)
        
        # 检查执行结果
        if result.returncode == 0:
            logging.info("证书获取脚本执行成功")
            if result.stdout:
                logging.info("脚本输出:\n%s", result.stdout)
            return True
        else:
            logging.error(f"证书获取脚本执行异常,返回码: {result.returncode}")
            if result.stderr:
                logging.error("异常输出:\n%s", result.stderr)
            return True
            
    except Exception as e:
        logging.error(f"执行证书获取脚本时发生错误: {str(e)}")
        return False

def read_file_content(file_path):
    """读取文件内容"""
    try:
        with open(file_path, 'r') as f:
            return f.read()
    except Exception as e:
        logging.error(f"读取文件 {file_path} 失败: {str(e)}")
        return None

def is_cert_expired(expire_time_str):
    """
    检查证书是否过期或即将过期(7天内)
    :param expire_time_str: 过期时间字符串
    :return: bool 是否过期或即将过期
    """
    try:
        expire_time = datetime.strptime(expire_time_str, "%Y-%m-%d %H:%M:%S")
        now = datetime.now()
        days_until_expire = (expire_time - now).days
        return days_until_expire <= 7
    except Exception as e:
        logging.error(f"解析过期时间失败: {str(e)}")
        return False

def process_same_sni_certs(api, parsed_certs, current_sni, current_cert_id):
    """
    处理具有相同SNI的证书
    :param api: API实例
    :param parsed_certs: 解析后的证书列表
    :param current_sni: 当前证书的SNI
    :param current_cert_id: 当前证书的ID
    :return: None
    """
    # 筛选出相同SNI的证书
    same_sni_certs = [cert for cert in parsed_certs 
                     if cert['sni'] == current_sni and cert['id'] != current_cert_id]
    
    if not same_sni_certs:
        return
        
    # 按过期时间排序(从早到晚)
    same_sni_certs.sort(key=lambda x: datetime.strptime(x['expire_time'], "%Y-%m-%d %H:%M:%S"))
    
    # 检查是否有过期或即将过期的证书
    for cert in same_sni_certs:
        if is_cert_expired(cert['expire_time']):
            logging.info(f"删除过期证书 ID: {cert['id']}")
            if not api.delete_cert(cert['id']):
                logging.error(f"删除证书 {cert['id']} 失败")
    
    # 检查是否有过期时间相同的证书
    if len(same_sni_certs) > 1:
        # 获取第一个证书的过期时间作为基准
        base_expire_time = same_sni_certs[0]['expire_time']
        
        # 删除过期时间相同的证书(保留第一个)
        for cert in same_sni_certs[1:]:
            if cert['expire_time'] == base_expire_time:
                logging.info(f"删除重复过期时间的证书 ID: {cert['id']}")
                if not api.delete_cert(cert['id']):
                    logging.error(f"删除证书 {cert['id']} 失败")

def main():
    # 设置日志
    setup_logging()
    
    try:
        # 执行证书获取脚本
        if not run_get_cert_script():
            logging.error("获取证书失败,退出程序")
            return
            
        # 检查证书文件是否存在
        cert_path = os.path.join(CERT_SOURCE_DIR, CERT_FILE)
        key_path = os.path.join(CERT_SOURCE_DIR, KEY_FILE)
        
        if not (os.path.exists(cert_path) and os.path.exists(key_path)):
            logging.error("证书文件不存在,退出程序")
            return
            
        # 计算新文件的哈希值
        new_cert_hash = get_file_hash(cert_path)
        new_key_hash = get_file_hash(key_path)
        
        # 获取旧的哈希值
        old_cert_hash, old_key_hash = load_cert_hash()
        
        # 检查文件是否发生变化
        if new_cert_hash != old_cert_hash or new_key_hash != old_key_hash:
            logging.info("证书文件已发生变化,开始更新流程")
            
            # 读取证书和私钥内容
            cert_content = read_file_content(cert_path)
            key_content = read_file_content(key_path)
            
            if not cert_content or not key_content:
                logging.error("读取证书文件失败")
                return
                
            # 初始化API
            api = NanQiangAPI()
            
            # 登录
            if not api.login("obaby", "obaby@mars"):
                logging.error("登录失败")
                return
                
            # 检查证书
            check_result = api.check_cert(cert_content, key_content)
            if not check_result:
                logging.error("证书检查失败")
                return
                
            # 提交证书配置
            if not api.submit_cert_config(check_result):
                logging.error("提交证书配置失败")
                return
                
            # 获取证书列表
            cert_list = api.get_cert_list()
            if not cert_list:
                logging.error("获取证书列表失败")
                return
                
            # 解析证书列表
            parsed_certs = api.parse_cert_list(cert_list)
            if not parsed_certs:
                logging.error("解析证书列表失败")
                return
                
            # 获取当前证书的SNI
            current_sni = check_result.get('sni', '[]')
            try:
                current_sni = json.loads(current_sni)
            except json.JSONDecodeError:
                logging.error("解析当前证书SNI失败")
                return
                
            # 处理相同SNI的证书
            process_same_sni_certs(api, parsed_certs, current_sni, check_result.get('id'))
            
            # 保存新的哈希值
            save_cert_hash(new_cert_hash, new_key_hash)
            logging.info("证书更新完成")
        else:
            logging.info("证书文件未发生变化,无需更新")
            
    except Exception as e:
        logging.error(f"程序执行出错: {str(e)}", exc_info=True)

if __name__ == "__main__":
    main()

添加定时任务,每天,或者每几天:

0 2 * * * /usr/bin/python3 /home/soft/baby-nanqiang-cert-tools/site_cert_auto_update_tool.py >> /home/soft/baby-nanqiang-cert-tools/web_cert_manager.log 2>&1

最终效果:

The post 南墙 WAF 系列(二)– 网站证书自动更新 appeared first on obaby@mars.

南墙 WAF 系列(一)– 管理后台证书自动更新

2025年4月2日 10:13

管理后台这个东西,当然可以不在公网暴露,但是如果一旦在公网允许访问配置,此时就会出现一个很尴尬的问题,各种证书错误提示。

在上一篇文章提过,waf 是通过 docker 部署的。相对来说管理倒是也算方便,按照官方文档的说法,管理后台的证书在下面的位置:

南墙管理后台的配置位于/uuwaf/web/conf/config.json中,addr字段值即为ip地址和端口。替换SSL证书可以替换/uuwaf/web/conf/目录中的server.crt和server.key文件,之后执行systemctl restart uuwaf重启服务使配置生效。

那么要更新证书,只需要解决下面几个问题就行了,由于不想付费买证书,那么现在最好的思路就是直接通过 acme.sh 自动申请证书,让后写个小工具自动将相关的文件复制到指定的目录下,重启 docker 服务就可以了。

1.acme.sh 自动申请证书。

a.安装 acme.sh:

curl https://get.acme.sh | sh -s email=my@example.com

b.配置 dnspod的 api key 和 secret(使用子账号):

创建策略,输入以下内容保存:

{
 "statement": [
     {
         "action": [
             "dnspod:DescribeRecordFilterList",
             "dnspod:DescribeRecordList",
             "dnspod:CreateRecord",
             "dnspod:DeleteRecord"
         ],
         "effect": "allow",
         "resource": [
             "*"
         ]
     }
 ],
 "version": "2.0"
}

登录 腾讯云控制台,进入 访问管理 页面,单击左侧菜单栏的 用户列表,进入用户列表页面,并单击新建用户

创建 api 访问账号之后,写一个获取证书的脚本:

export Tencent_SecretId="key"
export Tencent_SecretKey="secret"
"/usr/local/acme.sh"/acme.sh --issue --dns dns_tencent -d lang.bi -d *.lang.bi

到这里第一步就完成了,不过需要注意的事有的扩展名不支持,例如 by,本来想用 oba.by 域名的,结果提示失败了:

sh cert_get.sh 
[Wed Apr  2 08:51:41 AM CST 2025] Using CA: https://acme.zerossl.com/v2/DV90
[Wed Apr  2 08:51:41 AM CST 2025] Account key creation OK.
[Wed Apr  2 08:51:41 AM CST 2025] No EAB credentials found for ZeroSSL, let's obtain them
[Wed Apr  2 08:51:43 AM CST 2025] Registering account: https://acme.zerossl.com/v2/DV90
[Wed Apr  2 08:52:14 AM CST 2025] Registered
[Wed Apr  2 08:52:14 AM CST 2025] ACCOUNT_THUMBPRINT='mri378DxKFRt5hzNd_P7HBLV1zo4c7n1g7HBVNAKG-s'
[Wed Apr  2 08:52:14 AM CST 2025] Creating domain key
[Wed Apr  2 08:52:14 AM CST 2025] The domain key is here: /root/.acme.sh/oba.by_ecc/oba.by.key
[Wed Apr  2 08:52:14 AM CST 2025] Multi domain='DNS:oba.by,DNS:*.oba.by'
[Wed Apr  2 08:52:16 AM CST 2025] Error creating new order. Le_OrderFinalize not found. {"type":"urn:ietf:params:acme:error:rejectedIdentifier","status":400,"detail":"DNS identifier is disallowed [oba.by]"}
[Wed Apr  2 08:52:16 AM CST 2025] Please add '--debug' or '--log' to see more information.
[Wed Apr  2 08:52:16 AM CST 2025] See: https://github.com/acmesh-official/acme.sh/wiki/How-to-debug-acme.sh
h4ck# vim cert_get.sh
h4ck# sh cert_get.sh 
[Wed Apr  2 08:54:34 AM CST 2025] Using CA: https://acme.zerossl.com/v2/DV90
[Wed Apr  2 08:54:34 AM CST 2025] Multi domain='DNS:oba.by,DNS:www.oba.by,DNS:nas.oba.by'
[Wed Apr  2 08:55:23 AM CST 2025] Error creating new order. Le_OrderFinalize not found. {"type":"urn:ietf:params:acme:error:rejectedIdentifier","status":400,"detail":"DNS identifier is disallowed [oba.by]"}
[Wed Apr  2 08:55:23 AM CST 2025] Please add '--debug' or '--log' to see more information.
[Wed Apr  2 08:55:23 AM CST 2025] See: https://github.com/acmesh-official/acme.sh/wiki/How-to-debug-acme.sh

2.复制文件重启服务

至于第二步就更简单了,直接让 cursor 给写一个:

参考下面的内容给我编写一个 Python3 的文件,实现以下功能:首先需要调用 get_cert.sh 脚本,自动获取证书文件文件,如果获取成功,证书会保存在下面的路径:/root/.acme.sh/lang.bi_ecc/fullchain.cer 私钥会保存在下面的路径/root/.acme.sh/lang.bi_ecc/lang.bi.key;获取到这两个文件之后,需要根据判断证书文件是否变化(记录旧证书内容,用于判断文件变更),如果变化则需要更新对应 docker 下的证书和私钥文件;docker 对应container Id 为f7dd0b0a990b,对应的证书文件路径为/uuwaf/web/conf/目录中的server.crt和server.key文件,在替换文件之后,需要重启对应的 docker 容器。按照步骤实现上面的内容,并且完成代码编写

最终代码:

#!/usr/bin/env python3
import os
import subprocess
import hashlib
import json
from pathlib import Path

# Configuration
DOCKER_CONTAINER_ID = "f7dd0b0a990b"
CERT_SOURCE_DIR = "/root/.acme.sh/lang.bi_ecc"
CERT_DEST_DIR = "/uuwaf/web/conf"
CERT_FILE = "fullchain.cer"
KEY_FILE = "lang.bi.key"
DEST_CERT_FILE = "server.crt"
DEST_KEY_FILE = "server.key"
HASH_FILE = "cert_hash.json"
CERT_SCRIPT = "get_cert.sh"

def get_cert_hash(file_path):
    """Calculate SHA-256 hash of a file."""
    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()

def save_cert_hash(cert_hash):
    """Save certificate hash to a JSON file."""
    with open(HASH_FILE, 'w') as f:
        json.dump({'cert_hash': cert_hash}, f)

def load_cert_hash():
    """Load certificate hash from JSON file."""
    try:
        with open(HASH_FILE, 'r') as f:
            data = json.load(f)
            return data.get('cert_hash')
    except (FileNotFoundError, json.JSONDecodeError):
        return None

def run_get_cert_script():
    """Run the get_cert.sh script."""
    script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), CERT_SCRIPT)
    
    # Check if script exists
    if not os.path.exists(script_path):
        print(f"Error: {CERT_SCRIPT} not found in the current directory")
        print(f"Expected path: {script_path}")
        return False
    
    # Check if script is executable
    if not os.access(script_path, os.X_OK):
        print(f"Error: {CERT_SCRIPT} is not executable")
        print("Attempting to make it executable...")
        try:
            os.chmod(script_path, 0o755)
            print("Successfully made the script executable")
        except Exception as e:
            print(f"Failed to make script executable: {str(e)}")
            return False
    
    try:
        # Use absolute path to the script
        result = subprocess.run(['sh', script_path], capture_output=True, text=True)
        if result.returncode == 0:
            print("Certificate generation successful")
            return True
        else:
            print(f"Certificate generation ignored: {result.stderr}")
            return True
    except Exception as e:
        print(f"Error running {CERT_SCRIPT}: {str(e)}")
        return False

def copy_cert_files():
    """Copy certificate files to Docker container."""
    try:
        # Copy certificate
        subprocess.run([
            'docker', 'cp',
            f"{CERT_SOURCE_DIR}/{CERT_FILE}",
            f"{DOCKER_CONTAINER_ID}:{CERT_DEST_DIR}/{DEST_CERT_FILE}"
        ], check=True)
        
        # Copy private key
        subprocess.run([
            'docker', 'cp',
            f"{CERT_SOURCE_DIR}/{KEY_FILE}",
            f"{DOCKER_CONTAINER_ID}:{CERT_DEST_DIR}/{DEST_KEY_FILE}"
        ], check=True)
        
        print("Certificate files copied successfully")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error copying files: {str(e)}")
        return False

def restart_docker_container():
    """Restart the Docker container."""
    try:
        subprocess.run(['docker', 'restart', DOCKER_CONTAINER_ID], check=True)
        print("Docker container restarted successfully")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error restarting container: {str(e)}")
        return False

def main():
    # Step 1: Run get_cert.sh script
    if not run_get_cert_script():
        print("Failed to generate certificates")
        return

    # Step 2: Check if certificate files exist
    cert_path = os.path.join(CERT_SOURCE_DIR, CERT_FILE)
    key_path = os.path.join(CERT_SOURCE_DIR, KEY_FILE)
    
    if not (os.path.exists(cert_path) and os.path.exists(key_path)):
        print("Certificate files not found")
        return

    # Step 3: Calculate new certificate hash
    new_cert_hash = get_cert_hash(cert_path)
    old_cert_hash = load_cert_hash()

    # Step 4: Check if certificate has changed
    if new_cert_hash != old_cert_hash:
        print("Certificate has changed, updating...")
        
        # Copy new certificate files
        if copy_cert_files():
            # Restart Docker container
            if restart_docker_container():
                # Save new certificate hash
                save_cert_hash(new_cert_hash)
                print("Certificate update completed successfully")
            else:
                print("Failed to restart Docker container")
        else:
            print("Failed to copy certificate files")
    else:
        print("Certificate has not changed, no update needed")

if __name__ == "__main__":
    main()

除了运行sh 脚本有点问题,需要改一下,其他的基本都没啥问题。最终执行效果:

此时刷新页面,一切就都 ok 了:

The post 南墙 WAF 系列(一)– 管理后台证书自动更新 appeared first on obaby@mars.

418 I’m a teapot

2025年3月20日 09:57

这几天偶尔会折腾下 wp 的插件和数据的问题,更新了一下。偶然间充新打开了 wp 邮件的发送报告,看了下发送记录每天几千封邮件,一大半是失败的。

猜测肯定就是各种 bot 尝试注册账号导致发送注册确认邮件失败了。

看了下用户表,结果这个用户数量的确是无敌了。如果闺蜜圈目前能有这么多用户就好了。

昨天晚上回去连上数据库,直接代码删除了未激活用户,然后令人崩溃的事情出现了,博客直接挂了。哈哈哈。好在之前备份了数据库,直接数据回滚,不过问题就是丢失了部分评论数据。

另外一个问题就是 oba.by 的cdn 感觉也有问题,直接红了一大片:

这个目前自己解决不了,只能等别人解决啦。

说到这里,过年的时候喵呜酱免费赞助了一年的 cdn,现在另外一个域名 obaby.org.cn 是是用的这个 cdn,感兴趣的可以看一下。

网址:https://waf.pro

我用的国外的节点,整体感觉速度还算可以。

今天早上在看昨天红了一片的域名的时候发现了另外一个神奇的玩意儿,右侧有个图片不显示了:

调试发现了这么个错误:418 I’m a teapot

神 tm teapot。哈哈哈,来了解下这个 teapot:

HTTP错误代码418表示服务器拒绝尝试用一个茶壶冲泡咖啡,这一代码在1998年作为愚人节笑话被加入到超文本传输协议(HTTP)中。418 I’m a teapot 是由一份名为 “Hyper Text Coffee Pot Control Protocol”(HTCPCP)的网络通信协议草案引入的,该协议用来控制、监视和诊断咖啡壶。尽管418是个玩笑,但它在某些软件和服务中得到了实现,成为了一个文化现象,并在互联网社区被反复提及。

一、418 I’M A TEAPOT 的起源

HTTP状态码418的典故源自一个名为HTCPCP的愚人节笑话文档,RFC 2324,它在1998年由互联网工程任务组(IETF)发布。HTCPCP以幽默的方式描述了控制咖啡壶的扩展协议,而418则是在此基础上增加的一个状态代码,介绍一个遵循HTCPCP的茶壶是如何处理咖啡冲泡请求的。这个错误代码是对通常用于网络通信的严肃的规范文档的一种戏仿。

该协议详细说明了如何通过网络给咖啡壶发送指令,例如BREW或POST命令用以开始咖啡冲泡过程。418状态码成为了这个协议的一部分,象征着一个茶壶无法完成冲泡咖啡的任务,因为它是用来烧水做茶用的。

二、418状态码的实际应用

虽然在HTTP中定义为一个梗,418 I’m a teapot 状态码确实在某些实现中被采用了。例如,一些网页服务器和HTTP库支持返回这个玩笑的状态码。某些API和网络服务为了幽默或彩蛋也会使用418状态码回应特定的请求。为了庆祝这个状态码以及互联网文化的一部分,一些开发者甚至会在愚人节这一天专门返回418状态码。

回馈这个文化现象,418状态码逐渐成为了一个有趣的象征,指代那些不按常理出牌或者需要特殊处理的情况。它表明互联网社区对于HTTP协议和其他表面严肃的标准中潜入的幽默元素表示赞赏。

三、418状态码的文化意义

418状态码超出了它最初的设定,成为了互联网文化中的一块标志。它体现了互联网社区的幽默感和对传统技术规范的一种轻松对待方式。这个状态码经常被用来表明对规范的玩世不恭态度,或者是为了给用户带来一点乐趣。它代表了一个防止技术沦为无聊和机械过程的行动,鼓励人们在日常的技术工作中保持创造力和幽默感。

相关问答FAQs:

【FAQ 1】:什么是Http错误代码418?它有什么特殊典故或意义吗?

Http错误代码418是一种非常少见且有趣的错误代码,它指代“我是一个茶壶”(I’m a teapot)。这个错误代码的起源可以追溯到1998年,当时IETF(Internet工程任务组)的一位成员在一个恶搞提案中加入了这个错误代码作为一种玩笑。

【FAQ 2】:Http错误代码418的含义是什么?为什么会用“我是一个茶壶”来表示错误?

Http错误代码418的含义是表明服务器拒绝请求,因为它是一个茶壶,并且无法咖啡加热或者冲泡咖啡。虽然这个错误代码在实际应用中并没有太多意义,但它被创造出来主要是为了幽默和调侃,给人们一些欢乐和轻松的氛围。

【FAQ 3】:Http错误代码418在实际应用中有什么作用?哪些网站或服务曾经使用过它?

实际上,Http错误代码418并不被广泛使用,因为它并没有特别实用的作用。一些网站或服务曾经在特殊的场合使用过它来制造一些幽默和趣味性,例如谷歌曾在愚人节时将其用作错误页。

虽然Http错误代码418并不常见且不具有重要实际功能,但它作为一种有趣的元素,赋予了网络世界更多的乐趣和创意。

但是,这个图片正常访问是可以打开的,为什么会返回茶壶呢?

目前没修复这个错误,感兴趣的可以访问看看:https://oba.by

20250321更新:

知道为什么会是茶壶啦。昨天博客又被打了,需要滑动认证,在没认证的时候,cdn 就返回了个茶壶,哈哈哈

The post 418 I’m a teapot appeared first on obaby@mars.

再谈 Python自动生成 pdf 文件

2025年3月5日 17:25

pdf 这个东西,不得不说,真的是受人欢迎,任何时候下载个文件或者报告之类的,都想弄个 pdf 版本。当然,这个东西的好处是不管在哪里看,样式基本都是一样的。

然而,缺点也很明显,没有办法直接生成 pdf 文件,当然,通过各种库可以直接将图片转为 pdf。然而,对于复杂格式或者需要使用模板来创建 pdf 的时候,就变得有些麻烦了。

def converImageToPdf(img_list):
    pdf = fitz.open() PyMuPDF
    pdf_document = fitz.open()  Creates a new PDF
    #遍历图片文件夹中的所有图片文件
    for img_url in img_list:
        img_local_file = download_image(img_url, 'confirmd_images')
        img_path = os.path.join(img_folder, img_file)
        img = fitz.open(img_local_file)
        img_rect = img[0].rect  Get the rectangle of the first page of the image
    #Create a new page with the same dimensions as the image
        pdf_page = pdf_document.new_page(width=img_rect.width, height=img_rect.height)
    #Insert the image into the new page
        pdf_page.insert_image(pdf_page.rect, filename=img_local_file)
    #保存PDF文件
        img.close()
    
    file_name = random_file_name('pdf')
    if not os.path.exists('confirmd_receipt'):
        os.mkdir('confirmd_receipt')
    pdf_document.save(os.path.join('confirmd_receipt/') + file_name)
    pdf_document.close()

依赖于fitz

pip install fitz

之前写过基于 oss 的:

Python生成Pdf报告

那么没有 oss 呢?其实此时最简单的办法就是基于 liboffice 了。

# 安装 LibreOffice(Ubuntu/Debian)
sudo apt-get install libreoffice

# 验证安装
libreoffice --version

代码:

import subprocess
import os

def convert_to_pdf(input_docx, output_dir):
    try:
        # 创建输出目录(如果不存在)
        os.makedirs(output_dir, exist_ok=True)
        
        # 执行转换命令
        cmd = [
            'libreoffice', '--headless', '--convert-to', 'pdf',
            '--outdir', output_dir, input_docx
        ]
        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
        
        print(f"转换成功: {input_docx} → {output_dir}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"转换失败: {e.stderr}")
        return False
    except Exception as e:
        print(f"发生错误: {str(e)}")
        return False

# 使用示例
convert_to_pdf(
    input_docx="/path/to/document.docx",
    output_dir="/path/to/output"
)

不多此时大概率得到的 PDF 文件会是乱码:

这一堆框就很专业,应该是没有字体导致的,安装字体文件:

# Ubuntu/Debian
sudo apt-get install fonts-wqy-zenhei fonts-wqy-microhei fonts-noto-cjk

# CentOS/RHEL
sudo yum install wqy-zenhei-fonts wqy-microhei-fonts google-noto-cjk-fonts

# 刷新字体缓存
sudo fc-cache -fv

验证安装:

# 查看已安装的中文字体
fc-list :lang=zh | grep -E "WenQuanYi|Noto"

# 预期输出示例(显示已安装字体路径):
# /usr/share/fonts/truetype/wqy/wqy-zenhei.ttc: WenQuanYi Zen Hei,文泉驛正黑:style=Regular
# /usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc: Noto Sans CJK JP,Noto Sans CJK JP Regular:style=Regular

如果依然有问题,那就修改系统配置:

# 检查当前 locale
locale

# 生成中文环境配置(如未安装)
sudo locale-gen zh_CN.UTF-8

# 临时设置环境变量(测试用)
export LANG=zh_CN.UTF-8

# 永久设置(修改 /etc/default/locale)
sudo nano /etc/default/locale
# 添加内容:
LANG="zh_CN.UTF-8"
LC_ALL="zh_CN.UTF-8"

或者转换的之后指定字体:

def convert_with_font(input_docx, output_dir):
    cmd = [
        'libreoffice', '--headless',
        '--env:UserInstallation=file:///tmp/libreoffice-altprofile', # 使用独立配置
        '--convert-to', 'pdf:writer_pdf_Export:{"Watermark":{"type":"string","value":" "},'
                         '"SelectPdfVersion":{"type":"long","value":"1"},'
                         '"UseTaggedPDF":{"type":"boolean","value":"true"},'
                         '"ExportBookmarks":{"type":"boolean","value":"true"},'
                         '"EmbedStandardFonts":{"type":"boolean","value":"true"}}',
        '--outdir', output_dir,
        input_docx
    ]
    subprocess.run(cmd, check=True)

此时多数就能解决问题了:

django TimescaleDB

2025年1月17日 15:34

平时项目使用的都是 mysql 数据库,少数时候会用到 mariadb,而至于时序数据库那基本没用过。而现在,对于一些监控数值需要更高的写入效率,查询效率,所以想着迁移到时序数据库上。

搜了一下,推荐的基本都是 timescadb:

TimescaleDB 是一个基于 PostgreSQL 的开源时序数据库扩展。它将 PostgreSQL 的强大功能与时序数据的优化存储和查询相结合,特别适合处理时间序列数据(如传感器数据、监控数据、金融数据等)。TimescaleDB 提供了高性能的时序数据存储、压缩、自动分区(hypertables)和高效的查询功能。

以下是关于 TimescaleDB 的详细介绍,以及如何在 Django 项目中集成和使用它。


TimescaleDB 的核心特性

  1. 基于 PostgreSQL
    • TimescaleDB 是 PostgreSQL 的扩展,因此你可以使用 PostgreSQL 的所有功能(如 ACID 事务、SQL 语法、JSONB 支持等)。
    • 兼容现有的 PostgreSQL 工具和生态系统。
  2. Hypertables
    • TimescaleDB 引入了 hypertables,这是一种自动分区的表结构,专门为时序数据优化。
    • 数据按时间维度自动分区,支持高效的数据插入和查询。
  3. 时间序列优化
    • 支持高效的时间范围查询、降采样(downsampling)、数据压缩和连续聚合(continuous aggregates)。
    • 提供专门的时序函数和窗口函数。
  4. 可扩展性
    • 支持分布式架构(TimescaleDB 2.0+),可以水平扩展以处理大规模数据。
  5. 开源
    • TimescaleDB 是开源的,社区版免费使用,企业版提供额外的高级功能。

而这个东西也提供了一个 django 的粗件:https://pypi.com.cn/project/django-timescaledb/

网上搜一下相关的文章都简单的 1b,但是如果简单的按照文章中的内容操作,很可能直接就卡在数据库连接上了。

建议在开始配置之前不要安装任何的 postgresql 数据库以及客户端,直接参考官方文档:https://docs.timescale.com/self-hosted/latest/install/installation-macos/#install-and-configure-timescaledb-on-postgresql

在开发电脑以及服务器上都要安装相关的postgresql timescaledb插件。否则就会提示找不到相关的组件。

postgres=# CREATE EXTENSION IF NOT EXISTS timescaledb;
ERROR:  could not open extension control file "/usr/share/postgresql/12/extension/timescaledb.control": No such file or directory

同样在开发电脑上也要安装相关的组件,如果安装了其他版本的postgresql删除掉旧版本,或者直接全新安装。

执行下面的命令创建用户以及数据库:

sudo -i -u postgres
psql
CREATE USER data_db WITH PASSWORD '1qaz@WSX';
CREATE DATABASE data_db OWNER data_db;

修改配置文件,让 data_db 用于允许远程登录:

vim /etc/postgresql/12/main/pg_hba.conf
host    data_db             data_db             0.0.0.0/0   md5
root@2gcc5hbhemlhjejc:~# sudo service postgresql restart

创建扩展,并且查看加载情况:

postgres=# CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION
postgres=# \dx
                                                List of installed extensions
    Name     | Version |   Schema   |                                      Description                                      
-------------+---------+------------+---------------------------------------------------------------------------------------
 plpgsql     | 1.0     | pg_catalog | PL/pgSQL procedural language
 timescaledb | 2.17.2  | public     | Enables scalable inserts and complex queries for time-series data (Community Edition)
(2 rows)

如果看到timescaledb 就表示成功了:

安装相关的组件之后,如果要在 django 中使用,还需要安装psycopg2,不建议安装psycopg2-binary因为安装这个东西,在 mac 下同样会爆上面的错误。psycopg2  需要编译安装,安装过程可能会提示找不到 ssl 库,通过brew info openssl 定位 ssl 库位置。

brew info openssl
==> openssl@3: stable 3.4.0 (bottled)
Cryptography and SSL/TLS Toolkit
https://openssl-library.org
Installed
/opt/homebrew/Cellar/openssl@3/3.4.0 (7,236 files, 33.4MB) *
  Poured from bottle using the formulae.brew.sh API on 2024-11-22 at 09:36:14
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/o/openssl@3.rb
License: Apache-2.0
==> Dependencies
Required: ca-certificates ✔
==> Caveats
A CA file has been bootstrapped using certificates from the system
keychain. To add additional certificates, place .pem files in
  /opt/homebrew/etc/openssl@3/certs

and run
  /opt/homebrew/opt/openssl@3/bin/c_rehash
==> Analytics
install: 318,996 (30 days), 1,205,908 (90 days), 4,903,860 (365 days)
install-on-request: 39,060 (30 days), 168,471 (90 days), 637,515 (365 days)
build-error: 6,326 (30 days)

修改.bash_prfile文件添加上述位置:

export LDFLAGS="-L/opt/homebrew/Cellar/openssl@3/3.4.0/lib"
export CPPFLAGS="-I/opt/homebrew/Cellar/openssl@3/3.4.0/include"

再次执行source ~/.bash_profile & pip install psycopg2即可。

剩下的就可以按照https://pypi.com.cn/project/django-timescaledb/里面的步骤实施了。

相关示例工程:https://gitcode.com/gh_mirrors/dj/django-timescaledb

注意连接引擎为timescale.db.backends.postgresql:

'ENGINE': 'timescale.db.backends.postgresql',
        'NAME': 'data_db',
        'USER': 'data_db',
        'PASSWORD': '1qaz@WSX',
        'HOST': '113.125.1.1',
        'PORT': '5432',
    },

 

 

django 直接运行目录下py 文件

2025年1月7日 10:16

为了处理数据,直接写了一个文件用来处理解析数据。然而比较诡异的一点是,使用 pycharm 可以直接运行这个文件,不会报错。但是,如果用命令运行就直接报错了。

上面是 pycharm 的运行效果,下面是直接命令运行的效果。

(venv) PS E:\Pycharm_Projects\powersystem> E:\Pycharm_Projects\powersystem\venv\Scripts\python.exe E:\Pycharm_Projects\powersystem\application\data_process_test.py 
Traceback (most recent call last):
  File "E:\Pycharm_Projects\powersystem\application\data_process_test.py", line 17, in <module>
    django.setup()
  File "E:\Pycharm_Projects\powersystem\venv\lib\site-packages\django\__init__.py", line 19, in setup
    configure_logging(settings.LOGGING_CONFIG, settings.LOGGING)
  File "E:\Pycharm_Projects\powersystem\venv\lib\site-packages\django\conf\__init__.py", line 82, in __getattr__
    self._setup(name)
  File "E:\Pycharm_Projects\powersystem\venv\lib\site-packages\django\conf\__init__.py", line 69, in _setup
    self._wrapped = Settings(settings_module)
  File "E:\Pycharm_Projects\powersystem\venv\lib\site-packages\django\conf\__init__.py", line 170, in __init__
    mod = importlib.import_module(self.SETTINGS_MODULE)
  File "G:\Python3.10.6\lib\importlib\__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 992, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'application'

提示的错误信息是找不到 application,但是这个文件是作为 django 的一部分存在的,按理也不需要去设置什么东西。之前的时候不能运行也就算了,但是现在有台服务器在内网,无法链接内网的的数据库进行数据处理,这就比较麻烦。

不过既然 pycharm 能运行,那肯定是有些东西不一样,猜测是 pycharm 将当前的目录加入 lib 目录了。添加下面的代码重新运行。

import os,sys

if __name__ == '__main__':
    # 获取当前脚本所在目录的绝对路径
    current_directory = os.path.abspath(os.path.dirname(__file__))

    # 将当前目录添加到sys.path
    sys.path.append("E:/Pycharm_Projects/powersystem/")
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "application.settings")
    import django

    django.setup()
    label =get_device_label(msg['devvar'])
    new_msg = rebuild_msg(msg, label)
    print(new_msg)

现在一切就 ok 了。

其他的运行脚本方式:

https://django-extensions-zh.readthedocs.io/zh-cn/latest/runscript.html

https://www.jb51.net/article/236739.htm

哪吒监控 1.5.1 自定义头像

2025年1月1日 20:01

对于哪吒监控自带的授权登录,最近不知道是github问题还是什么问题,一直登录失败。醒着直接升级下监控版本,结果,效果很不错,成功升级了。

感觉是换了一个全新的版本,但是这个版本确登录不了。

多次尝试更新之后,终于默认账号登录成功了,但是,伴随而来的是另外的一个问题。所有的监控项都没了,并且首页提示websocket建立失败。查阅文档才发现,新的版本需要将websocket一块进行代理:

server {
    listen 443 ssl http2;
    listen [::]:443 ssl http2;
    # http2 on; # Nginx > 1.25.1,请注释上面两行,启用此行

    server_name dashboard.example.com; # 替换为你的域名
    ssl_certificate          /data/letsencrypt/fullchain.pem; # 域名证书路径
    ssl_certificate_key      /data/letsencrypt/key.pem;       # 域名私钥路径
    ssl_stapling on;
    ssl_session_timeout 1d;
    ssl_session_cache shared:SSL:10m; # 如果与其他配置冲突,请注释此项
    ssl_protocols TLSv1.2 TLSv1.3;

    underscores_in_headers on;
    set_real_ip_from 0.0.0.0/0; # 替换为你的 CDN 回源 IP 地址段
    real_ip_header CF-Connecting-IP; # 替换为你的 CDN 提供的私有 header,此处为 CloudFlare 默认
    # 如果你使用nginx作为最外层,把上面两行注释掉

    # grpc 相关    
    location ^~ /proto.NezhaService/ {
        grpc_set_header Host $host;
        grpc_set_header nz-realip $http_CF_Connecting_IP; # 替换为你的 CDN 提供的私有 header,此处为 CloudFlare 默认
        # grpc_set_header nz-realip $remote_addr; # 如果你使用nginx作为最外层,就把上面一行注释掉,启用此行
        grpc_read_timeout 600s;
        grpc_send_timeout 600s;
        grpc_socket_keepalive on;
        client_max_body_size 10m;
        grpc_buffer_size 4m;
        grpc_pass grpc://dashboard;
    }
    # websocket 相关
    location ~* ^/api/v1/ws/(server|terminal|file)(.*)$ {
        proxy_set_header Host $host;
        proxy_set_header nz-realip $http_cf_connecting_ip; # 替换为你的 CDN 提供的私有 header,此处为 CloudFlare 默认
        # proxy_set_header nz-realip $remote_addr; # 如果你使用nginx作为最外层,就把上面一行注释掉,启用此行
        proxy_set_header Origin https://$host;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        proxy_pass http://127.0.0.1:8008;
    }
    # web
    location / {
        proxy_set_header Host $host;
        proxy_set_header nz-realip $http_cf_connecting_ip; # 替换为你的 CDN 提供的私有 header,此处为 CloudFlare 默认
        # proxy_set_header nz-realip $remote_addr; # 如果你使用nginx作为最外层,就把上面一行注释掉,启用此行
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        proxy_buffer_size 128k;
        proxy_buffers 4 256k;
        proxy_busy_buffers_size 256k;
        proxy_max_temp_file_size 0;
        proxy_pass http://127.0.0.1:8008;
    }
}

upstream dashboard {
    server 127.0.0.1:8008;
    keepalive 512;
}

而至于数据丢失,官网也写了:

这就很棒。只能重新添加所有服务器,添加之后感觉一切正常了,但是后台那个头像实在是无法忍受。

头像

尼玛,这苦大仇深的表情,作为小仙女坚决不能忍啊。在杜老师的聊天室,交流了一下,他也不知道怎么改。只能尝试js暴力修改src,代码如下:

var images = document.querySelectorAll('img[alt="obaby"]');
 images.forEach(function(image) {
        console.log(image);
         console.log('first inject to replace avatar!');
        image.src = "https://g.h4ck.org.cn/avatar/3a78942c4ddcda86242f20abdacee082?s=256&d=identicon&r=g";
    });

然而,将这段代码加入到系统设置的自定义代码内,会发现多数情况下都不能调用,只能通过差距js的方式进行调用,并且哪吒还有个问题,那就是对于其他域名下的js会加载失败,所以只能把js放到同一个服务器下。

后台添加配置:

此时js代码多数情况都能正常调用,为了能够加载这个js,需要修改nginx配置文件将js路径加入nginx配置文件:

location /inject/{
alias /home/wwwroot/s.h4ck.org.cn/inject/;
}

此时,顶部的头像已经修改成功了,但是下面的头像还是旧的,就很蛋疼。如果查看页面源码会发现基本都是js绘制的,本身并没有任何的内容。

 

遇事尝试使用nginx的替换功能进行内容替换,

location / {
        proxy_set_header Host $host;
        #proxy_set_header nz-realip $http_cf_connecting_ip; # 替换为你的 CDN 提供的私有 header,此处为 CloudFlare 默认
        proxy_set_header nz-realip $remote_addr; # 如果你使用nginx作为最外层,就把上面一行注释掉,启用此行
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        proxy_buffer_size 128k;
        proxy_buffers 4 256k;
        proxy_busy_buffers_size 256k;
        proxy_max_temp_file_size 0;
        proxy_pass http://127.0.0.1:8008;
sub_filter 'https://api.dicebear.com/7.x/notionists/svg?seed=' 'https://g.h4ck.org.cn/avatar/3a78942c4ddcda86242f20abdacee082?s=256&d=identicon&r=g&name=';
sub_filter_once off;

    }

然而,这种替换方式竟然只有首次生效,不知道是不是nginx配置问题,最终还是回到了直接修改js文件的方法。

拉出app文件,这个东西是编译的的elf文件,看了下字符串长度过长,替换有些麻烦,也没趁手的elf编辑器,就很麻烦。

转换思路,不好改二进制,那就改对应的js文件,编辑/dashboard/assets/index-CeBwNjOv.js 替换内部的https://api.dicebear.com/7.x/notionists/svg?seed=。

同时将文件头的import改为绝对路径,参考https://s.h4ck.org.cn/inject/index-obaby.js:

修改之前的内容替换代码为:

location / {
        proxy_set_header Host $host;
        #proxy_set_header nz-realip $http_cf_connecting_ip; # 替换为你的 CDN 提供的私有 header,此处为 CloudFlare 默认
        proxy_set_header nz-realip $remote_addr; # 如果你使用nginx作为最外层,就把上面一行注释掉,启用此行
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        proxy_buffer_size 128k;
        proxy_buffers 4 256k;
        proxy_busy_buffers_size 256k;
        proxy_max_temp_file_size 0;
        proxy_pass http://127.0.0.1:8008;
sub_filter '/dashboard/assets/index-CeBwNjOv.js' '/inject/index-obaby.js';
sub_filter_once off;

    }

重启nginx

通过下面的命令查看是否支持sub_filter ,如果不支持重新编译nginx:

否则会爆下面的错误:

此时头像就ok啦:

另外一个,就是那个命令窗口,V0版本是个全屏的,这V1弄了个小窗口,看着是真tm蛋疼:

这是纯粹为了难看(手动狗头)?

同样通过js修改页面样式:

setTimeout(function () {

if (window.location.pathname.includes('terminal')) {
        console.log("这段代码将在3秒后执行");
    var terminals = document.getElementsByClassName('xterm-screen');
    for (var i = 0; i < terminals.length; i++) {
        console.log('change screen size');
        terminals[i].setAttribute("style", "width: 941px; height: 900px;")
    }

    var divs = document.getElementsByClassName('max-w-5xl mx-auto');
    for (var i = 0; i < divs.length; i++) {
        console.log('change screen size');
        divs[i].setAttribute("style", "max-width: 100%;max-height: 100%")
    }
}
    // ... 执行其他操作
}, 500); // 3000毫秒等于3秒

在引入的js文件中使用windows.onload无法触发,直接延迟执行即可,调整后效果。

看起来顺眼多了,修改顶部图标以及favicon:

var imgElements = document.getElementsByClassName('h-7 mr-1');
for (var i = 0; i < imgElements.length; i++) {
    imgElements[i].src = 'https://image.h4ck.org.cn/support/uugai.com-1661691272754.png';
}

var images = document.querySelectorAll('img[alt="apple-touch-icon"]');
images.forEach(function (image) {
    console.log(image);
    console.log('change logo!');
    image.src = "https://image.h4ck.org.cn/support/uugai.com-1661691272754.png";
});

var faviconurl = "https://image.h4ck.org.cn/support/uugai.com-1661691272754.png"; /* 替换为自己的 Logo 图片链接 */
var link = document.querySelector("link[rel*='icon']") || document.createElement('link');
link.type = 'image/png';
link.rel = 'shortcut icon';
link.href = faviconurl;
document.getElementsByTagName('head')[0].appendChild(link);

最终效果:

看起来好多了,最终js文件:

https://s.h4ck.org.cn/inject/index-obaby.js

https://s.h4ck.org.cn/inject/avatar.js

哪吒监控-多服务器监控与运维工具

2024年11月14日 13:32

前几天在杜老师的聊天室,聊到那些闲着没事到处攻击人的 cc 狗的问题。杜老师扔出来一个链接,并且说道:可以装个监控玩玩,看实时流量神马的。

于是呢,请杜老师写个教程:

杜老师信誓旦旦,表示晚上八点二十发。然后呢,我等了好几个八点二十了也没看到。虽然昨晚杜老师更新了一篇,但是不是教程,哼唧唧。就离谱啊。

终于,实在忍不住了,然后自己照着官网的文档直接安装了一套:

果然男人都靠不整,哼。

具体教程还是等杜老师更新吧,我就不写了。

不过,在安装过程中发现一个问题,不知道是兼容信问题还是神马问题。在 ubuntu 20.04上,通过独立安装的面板,无法正常启动服务,所以建议还是直接 docker 安装,如果是这个系统的话,安装倒是也简单,一行命令,傻瓜式操作:

curl -L https://gitee.com/naibahq/scripts/raw/main/install.sh -o nezha.sh && chmod +x nezha.sh && sudo CN=true ./nezha.sh

剩下的各种配置就更傻瓜化了,不过安装之后发现一个问题。就是自己的网站首页没有 favicon 的小图标,但是杜老师的有:

要解决这个问题也简单,后台修改前台自定义 css,添加以下代码:

<link rel="icon" type="image/x-icon" href="https://s.h4ck.org.cn/static/logo.svg?v20210804">
<link rel="apple-touch-icon" sizes="180x180" href="https://s.h4ck.org.cn/static/logo.svg?v20210804">

然后就促来啦:

完美,嘻嘻。

WP-UserAgent [纯真增强版] 15.01.01

2024年11月8日 10:45

之前为了下载纯真的ip 地址数据库订阅了他们的公众号,前几天的时候看到推送说什么数据库格式更新了,有了 czdb 的格式,并且提供了各种语言的 sdk。

不过这个东西应该不是最近才推的,因为印象里貌似很久之前就看到皇家园林写的数据库迁移的文章。官方给的sdk 地址是这个:https://github.com/tagphi/czdb_searcher_php

按照文档操作,感觉也不复杂,直接:

composer require czdb/searcher

composer导入,就一行命令的事,但是为了弄个插件,需要在服务器上装这么个东西?那插件安装到别的地方也麻烦啊。想着一次性解决这个问题,直接下载源码,修改导入方式,按照网上的教程一通改,并不好使,最后 还是请教杜郎,才解决了这个问题:

composer

真不错,直接小花花+1.

下载 copmoser 导出的包,直接扔到插件目录下,

因为最终要修改的是 ip2text.php 文件中的convertip函数,所以直接扔到 show-useragent 目录下,在代码中导入代码,并且初始化:

require_once __DIR__ . '/vendor/autoload.php';

use Czdb\DbSearcher;

$v4databasePath = dirname(__FILE__).'/czdb/db/cz88_public_v4.czdb';
$v6databasePath = dirname(__FILE__).'/czdb/db/cz88_public_v6.czdb';

$queryType = 'MEMORY';
$key = 'n2pf2******************==';

// Initialize the DbSearcher with the command line arguments
// $instance = new SomeNamespace\SomeClass();

$v4dbSearcher = new DbSearcher($v4databasePath, $queryType, $key);
$v6dbSearcher = new DbSearcher($v6databasePath, $queryType, $key);

// $dbSearcher = new DbSearcher($databasePath, $queryType, $key);

function convertip($ip) {
    global $v4dbSearcher;
    global $v6dbSearcher;
    try{
        if(strpos($ip, ':') != false){
            $region = $v6dbSearcher->search($ip);
        }else if (strpos($ip, '.')!= false)
        {
            $region = $v4dbSearcher->search($ip);
        }else{
            $region='Unknown';
        }
    }catch (Exception $e) {
        // Handle the exception and inform the user
        $region = 'Exception';
    }
   
    return $region;
}

这里初始化了两个DbSearcher,分别对应 v4 和v6的查询。查询代码也很简单,就上面这几行。

同样,既然有了国家代码,那剩下的就是去掉原来通过接口查询所属国家的问题了,之前用接口是因为qqwry.dat 旧版本没有 v6 的数据,后来也一直没更新,所以归属地现实国旗是通过接口实现的,现在既然 46 都有了,那就可以直接本地解析了,不过比较坑爹的是 v4 的地址是“-”拼接的,v6 的地址感觉是空格,实际上是个制表符’\t’,为了这个制表符废了半天的劲,一直解析不出来,直接头大:

function getCountryName($str) {
    $parts = explode('–', $str);
    $name = count($parts) > 0 ? $parts[0] : '';
    // print($name);
    if (strpos($name, " ")!==false){
        $parts = explode(" ", $str);
        $name = count($parts) > 0 ? $parts[0] : '';
        // print($name);
    }
    if (strpos($name, "\t")!==false){
        $parts = explode("\t", $str);
        $name = count($parts) > 0 ? $parts[0] : '';
        // print($name);
    }
    return $name;
}

之所以解析不出来是最开始的if (strpos($name, “\t”)!==false)用的单引号,后来才发现,单引号下转义字符无效,这尼玛是凭什么啊,果然 php 是最好的语言。

后面就是讲国家名转换为 2 位国家代码了:

function getCountryCode($countryName) {
    $countryMap = array(
        '中国' => 'CN',
        '美国' => 'US',
        '日本' => 'JP',
        '韩国' => 'KR',
        '俄罗斯' => 'RU',
        '法国' => 'FR',
        '德国' => 'DE',
        '英国' => 'GB',
        '意大利' => 'IT',
        '加拿大' => 'CA',
        // 省略部分国家地区
        '瓦利斯和富图纳' => 'WF',
        '也门' => 'YE',
        '赞比亚' => 'ZM',
        '津巴布韦' => 'ZW',
        );
    $countryName = removeWhitespace($countryName);
    $countryCode = 'unknown';
    if (isset($countryMap[$countryName])) {
        $countryCode = $countryMap[$countryName];
    }
    // ; return $countryCode;
    return strtolower($countryCode);
}

到这里改造基本就全部完成了。

更新日志:

= v15.01.01 =
* 替换本地IP归属地查询数据库为纯真CZDB格式
* 替换IPv6归属地查询,替换为本地数据库,去掉查询服务器配置功能
* 鉴于纯真数据库需要授权码,需要去 https://cz88.com/geo-public 获取授权密钥以及数据库文件
* 密钥配置文件,ip2c-text.php $key = 'n2pf2******************pg==';
* 数据库下载之后放入show-useragent\czdb\db 目录下,文件名分别为: cz88_public_v4.czdb cz88_public_v6.czdb

插件安装无法直接使用,请按照下面的步骤操作:

* 需要去 https://cz88.com/geo-public 获取授权密钥以及数据库文件

* 密钥配置文件,ip2c-text.php $key = ‘n2pf2******************pg==’;

* 数据库下载之后放入show-useragent\czdb\db 目录下,文件名分别为: cz88_public_v4.czdb cz88_public_v6.czdb

实际效果:

插件下载地址:

温馨提示: 此处隐藏内容需要发表评论,并且审核通过后才能查看。
(发表评论请勾选 在此浏览器中保存我的显示名称、邮箱地址和网站地址,以便下次评论时使用。
(请仔细检查自己的昵称和评论内容,以免被识别为垃圾评论而导致无法正常审核。)

姐姐,你也不想让别人知道你的秘密吧? — 浅谈 Python 代码加密

2024年8月23日 11:13

像 python 这种非编译型的语言,在代码加密上有这先天性的弱势,虽然java 之类的编译成 jar 依然比较容易反编译回来,但是毕竟也算是提升了那么一点点门槛,再加上混淆神马的,基本就能避免一些入门级的破解了。

但是对于 python 这种,如果发布不想直接让别人看代码,最简单的办法就是打包成二进制。通常的做法就是 py2exe.

官网地址:https://www.py2exe.org

py2exe

py2exe is a Python Distutils extension which converts Python scripts into executable Windows programs, able to run without requiring a Python installation.Spice

Development is hosted on GitHub. You can find the mailing listsvn, and downloads for Python 2 there. Downloads for Python 3 are on PyPI.

py2exe was originally developed by Thomas Heller who still makes contributions. Jimmy Retzlaff, Mark Hammond, and Alberto Sottile have also made contributions. Code contributions are always welcome from the community and many people provide invaluable help on the mailing list and the Wiki.

py2exe is used by BitTorrentSpamBayes, and thousands more – py2exe averages over 5,000 downloads per month.

In an effort to limit Wiki spam, this front page is not editable. Feel free to edit other pages with content relevant to py2exe. You will need an account to edit (but not to read) and your account information will be used only for the purposes of administering this Wiki.

The old py2exe web site is still available until that information has found its way into this wiki.

之前发布的各种美女爬虫基本都是通过 py2exe 打包的,虽然体积比较大,但是整体来说效果还算不错。

但是对于 web 框架,例如 flask django 之类的该怎么打包?这个就稍显麻烦一些了。

搜索一下,也能找到一些工具,例如 https://github.com/amchii/encryptpy 这个东西底层还是通过 cython 来实现的,如果不想使用这个工具,那么直接使用 cython 也是可以的,至于原理,本质上是直接把 py代码编译成了二进制文件。

下面直接用 cython 来实现:

pip install cython

编写编译脚本,叫什么无所谓,这里我的名称是cython_build.py:

from distutils.core import setup
from Cython.Build import cythonize

setup(
    ext_modules=cythonize(["application/settings.py",
                           "PowerManagement/models.py",
                           "PowerManagement/views/meter.py",
                           "PowerManagement/views/meter_remote.py",
                           "PowerManagement/views/substation_picture.py",
                           "PowerManagement/views/circuit.py",
                           ])
)

建议将上面的代码放在项目的根目录下,要处理的 modules 使用相对路径来实现。

通过下面的命令编译 py 文件:

python3 cython_build.py build_ext --inplace

但是上面的代码有个问题,那就是–inplace 并没有吧所有的 so文件放到原来的目录下,编译之后,一些文件放到了项目根目录下:

扩展名为 so 的文件就是编译生成的二进制文件,此时如果直接运行项目会提示各种组件找不到,还需要将处理后的文件复制到原来的目录下:

mv *.so PowerManagement/views/

最后一步就是删除原来的 py 文件:

cd "PowerManagement/views/"
rm  *.py

到这里整个编译流程就算完成了,可以尝试重新启动服务了。

毕竟姐姐,你也不想你的代码被人随便给抄走吧?

❌
❌