Files
game_server/scripts/concurrency_region_select.py

253 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
并发选区压测脚本
功能:
- 批量生成登录链接(拿到 codeNo
- 并发对这些 codeNo 执行“选区”操作
- 统计是否出现多个 codeNo 选到同一设备(设备串号)
使用前请在“配置区”按实际接口改动:
- BASE_URL、GENERATE_PATH、SELECT_PATH
- 请求方法/参数/头部及字段名CODE_NO_FIELD、DEVICE_FIELD、STATUS_FIELD
运行示例:
python scripts/concurrency_region_select.py --base http://localhost:18080 --gen-path /api/link/generate --sel-path /api/region/select --links 100 --concurrency 100 --region Q
依赖pip install httpx anyio
"""
from __future__ import annotations
import argparse
import asyncio
import json
import random
import sys
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import anyio
import httpx
# ===================== Configuration (can be overridden from the command line) =====================
# Response field names
CODE_NO_FIELD = "codeNo"  # codeNo field returned by the link-generation endpoint
DEVICE_FIELD = "device"  # device field returned by the region-select endpoint
STATUS_FIELD = "status"  # status field returned by the region-select endpoint (optional)
# Link generation: request method and construction (default: POST without a body)
DEFAULT_GEN_METHOD = "POST"  # allowed values: GET/POST
DEFAULT_GEN_BODY: Optional[Dict[str, Any]] = None  # set this if the endpoint needs a request body
DEFAULT_GEN_HEADERS: Dict[str, str] = {}  # for authentication, e.g. {"Authorization": "Bearer ..."}
# Region select: request method and construction (default: POST with a body holding codeNo and region)
DEFAULT_SEL_METHOD = "POST"
DEFAULT_SEL_HEADERS: Dict[str, str] = {}
# ===================== Implementation =====================
@dataclass
class Config:
    """Settings for one stress-test run.

    The JSON-valued options are kept as raw strings and decoded on demand by
    the ``parsed_*`` accessors, which fall back to the module-level defaults
    when the corresponding option is empty or unset.
    """

    base_url: str
    generate_path: str
    select_path: str
    links: int
    concurrency: int
    region: str
    gen_method: str = DEFAULT_GEN_METHOD
    sel_method: str = DEFAULT_SEL_METHOD
    # Raw JSON text supplied on the command line; decoded when accessed.
    gen_body: Optional[str] = None
    # Raw JSON text for the link-generation request headers.
    gen_headers: Optional[str] = None
    # Raw JSON text for the region-select request headers.
    sel_headers: Optional[str] = None
    timeout: float = 10.0
    # Upper bound (ms) of the random delay added before each request so that
    # requests do not all fire at exactly the same instant.
    jitter_ms: int = 10

    def parsed_gen_body(self) -> Optional[Dict[str, Any]]:
        """Return the decoded link-generation body, or ``DEFAULT_GEN_BODY``."""
        if not self.gen_body:
            return DEFAULT_GEN_BODY
        return json.loads(self.gen_body)

    def parsed_gen_headers(self) -> Dict[str, str]:
        """Return the decoded link-generation headers, or ``DEFAULT_GEN_HEADERS``."""
        return json.loads(self.gen_headers) if self.gen_headers else DEFAULT_GEN_HEADERS

    def parsed_sel_headers(self) -> Dict[str, str]:
        """Return the decoded region-select headers, or ``DEFAULT_SEL_HEADERS``."""
        return json.loads(self.sel_headers) if self.sel_headers else DEFAULT_SEL_HEADERS
def _extract_code_no(data: Any) -> Optional[str]:
    """Pull the codeNo out of a decoded link-generation response.

    Looks for ``CODE_NO_FIELD`` when the response is a JSON object; otherwise
    (or when that field is empty) falls back to parsing ``codeNo=...`` out of
    a link, taken from the response itself when it is a plain string or from
    its ``"link"`` entry when it is an object.

    Returns:
        The codeNo, or ``None`` when none could be found.
    """
    # Only a JSON object has ``.get``; a string or list response must skip
    # straight to the link fallback instead of raising AttributeError.
    code_no = data.get(CODE_NO_FIELD) if isinstance(data, dict) else None
    if code_no:
        return code_no

    if isinstance(data, str):
        link = data
    elif isinstance(data, dict):
        link = data.get("link")
    else:
        link = None

    # e.g. ...?codeNo=XXXX&other=1  ->  XXXX
    if isinstance(link, str) and "codeNo=" in link:
        return link.split("codeNo=")[-1].split("&")[0] or None
    return None


async def generate_links(cfg: Config, client: httpx.AsyncClient) -> List[str]:
    """Concurrently request login links and return the codeNos obtained.

    Sends ``cfg.links`` requests to ``cfg.base_url + cfg.generate_path`` with
    at most ``cfg.concurrency`` of them holding the semaphore at a time. A
    request that fails (transport error, non-success status, or a response
    without a codeNo) is logged and skipped rather than aborting the batch.

    Args:
        cfg: Run settings (URL parts, method, headers/body, timeout, jitter).
        client: Shared HTTP client used for every request.

    Returns:
        The codeNos of the successful requests, ordered by request index.
        May be shorter than ``cfg.links``; a warning is printed in that case.
    """
    url = cfg.base_url.rstrip("/") + cfg.generate_path
    sem = anyio.Semaphore(cfg.concurrency)
    # Pre-sized so each task writes its own slot; failed slots stay None.
    code_nos: List[Optional[str]] = [None] * cfg.links

    async def one(idx: int) -> None:
        async with sem:
            # Small random delay so requests are not released in lock-step.
            await anyio.sleep(random.random() * cfg.jitter_ms / 1000.0)
            try:
                if cfg.gen_method.upper() == "GET":
                    resp = await client.get(
                        url,
                        headers=cfg.parsed_gen_headers(),
                        timeout=cfg.timeout,
                    )
                else:
                    resp = await client.post(
                        url,
                        headers=cfg.parsed_gen_headers(),
                        json=cfg.parsed_gen_body(),
                        timeout=cfg.timeout,
                    )
                resp.raise_for_status()
                data = resp.json()
                code_no = _extract_code_no(data)
                if not code_no:
                    raise ValueError(f"生成链接响应缺少 {CODE_NO_FIELD} 字段: {data}")
                code_nos[idx] = code_no
            except Exception as e:
                # Best effort: one failed link must not cancel the others.
                print(f"[ERR] 生成链接失败 idx={idx}: {e}")

    async with anyio.create_task_group() as tg:
        for i in range(cfg.links):
            tg.start_soon(one, i)

    ok = [c for c in code_nos if c]
    if len(ok) != cfg.links:
        print(f"[WARN] 生成成功 {len(ok)}/{cfg.links}")
    return ok
async def select_region_for_codes(cfg: Config, client: httpx.AsyncClient, code_nos: List[str]) -> List[Tuple[str, Optional[str], Optional[str], int]]:
    """Concurrently run the region-select call for every codeNo.

    Args:
        cfg: Run settings (URL parts, method, headers, region, timeout, jitter).
        client: Shared HTTP client used for every request.
        code_nos: The codeNos to select a region for.

    Returns:
        One ``(codeNo, device, status, http_status)`` tuple per codeNo, in
        input order. ``device`` and ``status`` are ``None`` when the response
        is not a JSON object or lacks the field; a request that raised is
        recorded with ``http_status`` 0.
    """
    endpoint = cfg.base_url.rstrip("/") + cfg.select_path
    limiter = anyio.Semaphore(cfg.concurrency)
    outcomes: List[Optional[Tuple[str, Optional[str], Optional[str], int]]] = [None] * len(code_nos)

    async def run_one(slot: int, code_no: str):
        async with limiter:
            # Small random delay so requests are not released in lock-step.
            await anyio.sleep(random.random() * cfg.jitter_ms / 1000.0)
            try:
                # Adjust to the real service contract as needed; the same
                # fields travel as query parameters (GET) or JSON body (POST).
                fields: Dict[str, Any] = {"codeNo": code_no, "region": cfg.region}
                if cfg.sel_method.upper() == "GET":
                    resp = await client.get(
                        endpoint,
                        headers=cfg.parsed_sel_headers(),
                        params=fields,
                        timeout=cfg.timeout,
                    )
                else:
                    resp = await client.post(
                        endpoint,
                        headers=cfg.parsed_sel_headers(),
                        json=fields,
                        timeout=cfg.timeout,
                    )

                # A body that is not JSON is tolerated: the HTTP status is
                # still recorded, just without device/status details.
                try:
                    decoded = resp.json()
                except Exception:
                    decoded = None

                device = status = None
                if isinstance(decoded, dict):
                    device = decoded.get(DEVICE_FIELD)
                    status = decoded.get(STATUS_FIELD)
                outcomes[slot] = (code_no, device, status, resp.status_code)
            except Exception as e:
                print(f"[ERR] 选区失败 codeNo={code_no}: {e}")
                outcomes[slot] = (code_no, None, None, 0)

    async with anyio.create_task_group() as tg:
        for slot, code_no in enumerate(code_nos):
            tg.start_soon(run_one, slot, code_no)

    return list(filter(None, outcomes))
def _device_key(device: Any) -> Any:
    """Return a value usable as a dict key for grouping by *device*.

    Hashable values are returned unchanged. Unhashable ones (a JSON object or
    array) are rendered as JSON text with sorted keys, so equal objects map to
    the same key regardless of key order.
    """
    try:
        hash(device)
    except TypeError:
        return json.dumps(device, sort_keys=True, ensure_ascii=False, default=repr)
    return device


def analyze_collisions(pairs: List[Tuple[str, Optional[str], Optional[str], int]]) -> None:
    """Print request statistics and any device assigned to several codeNos.

    Args:
        pairs: ``(codeNo, device, status, http_status)`` tuples, one per
            region-select request. Entries with a falsy ``device`` are
            counted in the total but take no part in collision detection.

    Returns:
        None. The report is written to standard output.
    """
    # device -> codeNos that were given that device
    device_map: Dict[Any, List[str]] = {}
    success = 0
    http_ok = 0
    for code_no, device, _status, http_status in pairs:
        if http_status == 200:
            http_ok += 1
        if device:
            success += 1
            # The device value is taken as-is from the service response and
            # may be a nested object; normalise it so grouping cannot raise
            # TypeError and discard the whole report.
            device_map.setdefault(_device_key(device), []).append(code_no)

    total = len(pairs)
    print("\n===== 结果统计 =====")
    print(f"请求总数: {total}")
    print(f"HTTP 200: {http_ok}")
    print(f"解析出设备: {success}")

    collisions = {d: cs for d, cs in device_map.items() if len(cs) > 1}
    if collisions:
        print("\n[ALERT] 检测到设备串号(同一设备被多个 codeNo 绑定):")
        # Worst offenders first; show at most ten codeNos per device.
        for device, codes in sorted(collisions.items(), key=lambda x: len(x[1]), reverse=True):
            sample = ", ".join(codes[:10]) + (" ..." if len(codes) > 10 else "")
            print(f"- device={device}: {len(codes)} codeNos -> {sample}")
    else:
        print("\n[OK] 未发现同一设备被多个 codeNo 同时绑定的情况。")
async def main_async(cfg: Config) -> int:
    """Run the whole scenario: generate links, select regions, print the report.

    Args:
        cfg: Run settings.

    Returns:
        2 when no codeNo could be generated, otherwise 0.
    """
    async with httpx.AsyncClient(http2=False) as http:
        print(f"生成链接 {cfg.links} 个 ...")
        generated = await generate_links(cfg, http)
        if generated:
            print(f"开始并发选区(并发={cfg.concurrency}, region={cfg.region}...")
            outcomes = await select_region_for_codes(cfg, http, generated)
            analyze_collisions(outcomes)
            return 0
        # Nothing to select a region for, so stop early.
        print("未成功生成 codeNo退出。")
        return 2
def parse_args(argv: List[str]) -> Config:
    """Parse command-line arguments into a ``Config``.

    ``--concurrency`` must be a positive integer, and the JSON-valued options
    (``--gen-body``, ``--gen-headers``, ``--sel-headers``) must hold valid
    JSON when non-empty. Violations are reported by argparse up front instead
    of surfacing later as a hang or as a failure of every single request.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        The populated ``Config``. JSON options are stored as the raw strings.

    Raises:
        SystemExit: Raised by argparse for invalid or missing arguments and
            for ``--help``.
    """

    def positive_int(text: str) -> int:
        # The value sizes a semaphore: zero slots would leave every request
        # waiting on it, and a negative count is meaningless.
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
        if value < 1:
            raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
        return value

    def json_text(text: str) -> str:
        # Validate only; the raw string is what Config stores. An empty
        # string keeps its meaning of "use the built-in default".
        if text:
            try:
                json.loads(text)
            except ValueError as exc:
                raise argparse.ArgumentTypeError(f"not valid JSON: {exc}") from None
        return text

    p = argparse.ArgumentParser(description="并发选区压测脚本")
    p.add_argument("--base", dest="base_url", required=True, help="服务基础地址,如 http://localhost:8080")
    p.add_argument("--gen-path", dest="generate_path", required=True, help="生成链接接口路径,如 /api/link/generate")
    p.add_argument("--sel-path", dest="select_path", required=True, help="选区接口路径,如 /api/region/select")
    p.add_argument("--links", dest="links", type=int, default=100, help="生成链接数量")
    p.add_argument("--concurrency", dest="concurrency", type=positive_int, default=100, help="并发度(同时进行的请求数)")
    p.add_argument("--region", dest="region", default="CN", help="选区标识,如 CN/US/EU")
    p.add_argument("--gen-method", dest="gen_method", default=DEFAULT_GEN_METHOD, choices=["GET", "POST"], help="生成链接的 HTTP 方法")
    p.add_argument("--sel-method", dest="sel_method", default=DEFAULT_SEL_METHOD, choices=["GET", "POST"], help="选区的 HTTP 方法")
    p.add_argument("--gen-body", dest="gen_body", type=json_text, help="生成链接的 JSON 请求体(字符串)")
    p.add_argument("--gen-headers", dest="gen_headers", type=json_text, help="生成链接的请求头(JSON字符串)")
    p.add_argument("--sel-headers", dest="sel_headers", type=json_text, help="选区的请求头(JSON字符串)")
    p.add_argument("--timeout", dest="timeout", type=float, default=10.0, help="单请求超时时间(秒)")
    p.add_argument("--jitter-ms", dest="jitter_ms", type=int, default=10, help="请求抖动(毫秒)")
    args = p.parse_args(argv)

    return Config(
        base_url=args.base_url,
        generate_path=args.generate_path,
        select_path=args.select_path,
        links=args.links,
        concurrency=args.concurrency,
        region=args.region,
        gen_method=args.gen_method,
        sel_method=args.sel_method,
        gen_body=args.gen_body,
        gen_headers=args.gen_headers,
        sel_headers=args.sel_headers,
        timeout=args.timeout,
        jitter_ms=args.jitter_ms,
    )
def main() -> int:
    """Command-line entry point.

    Returns:
        The exit status from ``main_async``, or 130 when the run is
        interrupted with Ctrl-C.
    """
    cfg = parse_args(sys.argv[1:])
    try:
        exit_code = asyncio.run(main_async(cfg))
    except KeyboardInterrupt:
        print("用户中断。")
        exit_code = 130
    return exit_code
# Run only when executed as a script; the status from main() becomes the
# process exit code.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)