#!/usr/bin/env python3 """ 并发选区压测脚本 功能: - 批量生成登录链接(拿到 codeNo) - 并发对这些 codeNo 执行“选区”操作 - 统计是否出现多个 codeNo 选到同一设备(设备串号) 使用前请在“配置区”按实际接口改动: - BASE_URL、GENERATE_PATH、SELECT_PATH - 请求方法/参数/头部及字段名(CODE_NO_FIELD、DEVICE_FIELD、STATUS_FIELD) 运行示例: python scripts/concurrency_region_select.py --base http://localhost:18080 --gen-path /api/link/generate --sel-path /api/region/select --links 100 --concurrency 100 --region Q 依赖:pip install httpx anyio """ from __future__ import annotations import argparse import asyncio import json import random import sys from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple import anyio import httpx # ===================== 配置区(可通过命令行覆盖) ===================== # 响应字段名 CODE_NO_FIELD = "codeNo" # 生成链接接口返回的 codeNo 字段 DEVICE_FIELD = "device" # 选区接口返回的 设备字段 STATUS_FIELD = "status" # 选区接口返回的 状态字段(可选) # 生成链接:请求方法与构造(默认 POST,无体) DEFAULT_GEN_METHOD = "POST" # 可选:GET/POST DEFAULT_GEN_BODY: Optional[Dict[str, Any]] = None # 如需传 body,可在此设置 DEFAULT_GEN_HEADERS: Dict[str, str] = {} # 如需鉴权:{"Authorization": "Bearer ..."} # 选区:请求方法与构造(默认 POST,body 含 codeNo 与 region) DEFAULT_SEL_METHOD = "POST" DEFAULT_SEL_HEADERS: Dict[str, str] = {} # ===================== 实现 ===================== @dataclass class Config: base_url: str generate_path: str select_path: str links: int concurrency: int region: str gen_method: str = DEFAULT_GEN_METHOD sel_method: str = DEFAULT_SEL_METHOD gen_body: Optional[str] = None # JSON 字符串,命令行传入后解析 gen_headers: Optional[str] = None # JSON 字符串 sel_headers: Optional[str] = None # JSON 字符串 timeout: float = 10.0 jitter_ms: int = 10 # 并发抖动,避免完全同一时刻 def parsed_gen_body(self) -> Optional[Dict[str, Any]]: return json.loads(self.gen_body) if self.gen_body else DEFAULT_GEN_BODY def parsed_gen_headers(self) -> Dict[str, str]: if self.gen_headers: return json.loads(self.gen_headers) return DEFAULT_GEN_HEADERS def parsed_sel_headers(self) -> Dict[str, str]: if self.sel_headers: return json.loads(self.sel_headers) return DEFAULT_SEL_HEADERS async def generate_links(cfg: Config, client: httpx.AsyncClient) -> List[str]: """并发生成链接,返回 codeNo 列表。""" url = cfg.base_url.rstrip("/") + cfg.generate_path sem = anyio.Semaphore(cfg.concurrency) code_nos: List[Optional[str]] = [None] * cfg.links async def one(idx: int): async with sem: # 抖动 await anyio.sleep(random.random() * cfg.jitter_ms / 1000.0) try: if cfg.gen_method.upper() == "GET": resp = await client.get(url, headers=cfg.parsed_gen_headers(), timeout=cfg.timeout) else: resp = await client.post(url, headers=cfg.parsed_gen_headers(), json=cfg.parsed_gen_body(), timeout=cfg.timeout) resp.raise_for_status() data = resp.json() code_no = data.get(CODE_NO_FIELD) if not code_no: # 尝试从 URL 中解析(如果返回的是完整 link) # 例如: ...?codeNo=XXXX link = data if isinstance(data, str) else data.get("link") if isinstance(data, dict) else None if isinstance(link, str) and "codeNo=" in link: code_no = link.split("codeNo=")[-1].split("&")[0] if not code_no: raise ValueError(f"生成链接响应缺少 {CODE_NO_FIELD} 字段: {data}") code_nos[idx] = code_no except Exception as e: print(f"[ERR] 生成链接失败 idx={idx}: {e}") async with anyio.create_task_group() as tg: for i in range(cfg.links): tg.start_soon(one, i) ok = [c for c in code_nos if c] if len(ok) != cfg.links: print(f"[WARN] 生成成功 {len(ok)}/{cfg.links}") return ok async def select_region_for_codes(cfg: Config, client: httpx.AsyncClient, code_nos: List[str]) -> List[Tuple[str, Optional[str], Optional[str], int]]: """并发对 codeNos 执行选区,返回 (codeNo, device, status, http_status) 列表。""" url = cfg.base_url.rstrip("/") + cfg.select_path sem = anyio.Semaphore(cfg.concurrency) results: List[Optional[Tuple[str, Optional[str], Optional[str], int]]] = [None] * len(code_nos) def build_select_body(code_no: str) -> Dict[str, Any]: # 按需修改为你的服务契约 return {"codeNo": code_no, "region": cfg.region} async def one(idx: int, code_no: str): async with sem: await anyio.sleep(random.random() * cfg.jitter_ms / 1000.0) try: if cfg.sel_method.upper() == "GET": # GET 方式:拼接 query params = {"codeNo": code_no, "region": cfg.region} resp = await client.get(url, headers=cfg.parsed_sel_headers(), params=params, timeout=cfg.timeout) else: body = build_select_body(code_no) resp = await client.post(url, headers=cfg.parsed_sel_headers(), json=body, timeout=cfg.timeout) status_code = resp.status_code device = None status = None try: data = resp.json() if isinstance(data, dict): device = data.get(DEVICE_FIELD) status = data.get(STATUS_FIELD) except Exception: pass results[idx] = (code_no, device, status, status_code) except Exception as e: print(f"[ERR] 选区失败 codeNo={code_no}: {e}") results[idx] = (code_no, None, None, 0) async with anyio.create_task_group() as tg: for i, c in enumerate(code_nos): tg.start_soon(one, i, c) return [r for r in results if r] def analyze_collisions(pairs: List[Tuple[str, Optional[str], Optional[str], int]]) -> None: # 汇总设备 -> codeNos device_map: Dict[str, List[str]] = {} success = 0 http_ok = 0 for code_no, device, status, http_status in pairs: if http_status == 200: http_ok += 1 if device: success += 1 device_map.setdefault(device, []).append(code_no) total = len(pairs) print("\n===== 结果统计 =====") print(f"请求总数: {total}") print(f"HTTP 200: {http_ok}") print(f"解析出设备: {success}") collisions = {d: cs for d, cs in device_map.items() if len(cs) > 1} if collisions: print("\n[ALERT] 检测到设备串号(同一设备被多个 codeNo 绑定):") for device, codes in sorted(collisions.items(), key=lambda x: len(x[1]), reverse=True): sample = ", ".join(codes[:10]) + (" ..." if len(codes) > 10 else "") print(f"- device={device}: {len(codes)} codeNos -> {sample}") else: print("\n[OK] 未发现同一设备被多个 codeNo 同时绑定的情况。") async def main_async(cfg: Config) -> int: async with httpx.AsyncClient(http2=False) as client: print(f"生成链接 {cfg.links} 个 ...") code_nos = await generate_links(cfg, client) if not code_nos: print("未成功生成 codeNo,退出。") return 2 print(f"开始并发选区(并发={cfg.concurrency}, region={cfg.region})...") pairs = await select_region_for_codes(cfg, client, code_nos) analyze_collisions(pairs) return 0 def parse_args(argv: List[str]) -> Config: p = argparse.ArgumentParser(description="并发选区压测脚本") p.add_argument("--base", dest="base_url", required=True, help="服务基础地址,如 http://localhost:8080") p.add_argument("--gen-path", dest="generate_path", required=True, help="生成链接接口路径,如 /api/link/generate") p.add_argument("--sel-path", dest="select_path", required=True, help="选区接口路径,如 /api/region/select") p.add_argument("--links", dest="links", type=int, default=100, help="生成链接数量") p.add_argument("--concurrency", dest="concurrency", type=int, default=100, help="并发度(同时进行的请求数)") p.add_argument("--region", dest="region", default="CN", help="选区标识,如 CN/US/EU") p.add_argument("--gen-method", dest="gen_method", default=DEFAULT_GEN_METHOD, choices=["GET", "POST"], help="生成链接的 HTTP 方法") p.add_argument("--sel-method", dest="sel_method", default=DEFAULT_SEL_METHOD, choices=["GET", "POST"], help="选区的 HTTP 方法") p.add_argument("--gen-body", dest="gen_body", help="生成链接的 JSON 请求体(字符串)") p.add_argument("--gen-headers", dest="gen_headers", help="生成链接的请求头(JSON字符串)") p.add_argument("--sel-headers", dest="sel_headers", help="选区的请求头(JSON字符串)") p.add_argument("--timeout", dest="timeout", type=float, default=10.0, help="单请求超时时间(秒)") p.add_argument("--jitter-ms", dest="jitter_ms", type=int, default=10, help="请求抖动(毫秒)") args = p.parse_args(argv) return Config( base_url=args.base_url, generate_path=args.generate_path, select_path=args.select_path, links=args.links, concurrency=args.concurrency, region=args.region, gen_method=args.gen_method, sel_method=args.sel_method, gen_body=args.gen_body, gen_headers=args.gen_headers, sel_headers=args.sel_headers, timeout=args.timeout, jitter_ms=args.jitter_ms, ) def main() -> int: cfg = parse_args(sys.argv[1:]) try: return asyncio.run(main_async(cfg)) except KeyboardInterrupt: print("用户中断。") return 130 if __name__ == "__main__": raise SystemExit(main())