diff --git a/DEVICE_ALLOCATION_IMPROVEMENTS.md b/DEVICE_ALLOCATION_IMPROVEMENTS.md new file mode 100644 index 0000000..4a8ec10 --- /dev/null +++ b/DEVICE_ALLOCATION_IMPROVEMENTS.md @@ -0,0 +1,130 @@ +# 设备分配并发竞争问题解决方案 + +## 问题描述 + +在游戏平台服务器中发现了设备分配的并发竞争条件问题,导致多个用户同时选区时可能被分配到同一个设备。 + +### 具体表现 +- 两个用户(L789AT98、LPUEL6XM)几乎同时(相差400ms)进行选区操作 +- 都被分配到了相同的设备 `xx1` +- 由于时序问题,冷却机制未能有效防止重复分配 + +## 根本原因分析 + +1. **并发竞争条件**:设备检查和占用不是原子操作 +2. **选择算法单一**:总是选择第一个可用设备,增加了冲突概率 +3. **时序问题**:冷却机制在设备分配之后才生效 +4. **缺少验证机制**:分配后没有冲突检测和回滚 + +## 解决方案 + +### 1. 原子设备分配机制 + +#### 新增方法:`MemoryMachineCooldownService.tryAllocateDevice()` +- **功能**:原子方式检查设备状态并立即加入冷却队列 +- **特点**:使用设备级锁确保线程安全 +- **效果**:防止多个请求同时分配同一设备 + +```java +public boolean tryAllocateDevice(String machineId, String reason, Long linkTaskId) +``` + +#### 新增方法:`MemoryMachineCooldownService.releaseDeviceAllocation()` +- **功能**:释放设备分配(用于失败回滚) +- **验证**:确保只有原分配任务才能释放 + +### 2. 设备分配服务 (DeviceAllocationService) + +#### 核心功能 +- **负载均衡**:随机打乱设备列表,避免总是选择第一个设备 +- **原子分配**:使用新的原子分配方法 +- **冲突检测**:过滤被其他任务占用的设备 +- **验证机制**:分配后双重检查是否存在冲突 + +#### 关键方法 +```java +public String allocateDevice(List availableDevices, Long linkTaskId, String reason) +public boolean validateDeviceAllocation(String deviceId, Long linkTaskId) +public void releaseDeviceAllocation(String deviceId, Long linkTaskId) +``` + +### 3. 增强的安全检查 + +#### 多层防护机制 +1. **分配前检查**:过滤冷却期和被占用设备 +2. **原子分配**:确保分配操作的原子性 +3. **分配后验证**:检测是否存在分配冲突 +4. **异常回滚**:失败时自动释放设备分配 + +#### 异常处理改进 +- 脚本调用失败时自动释放设备分配 +- 数据库更新失败时回滚设备状态 +- 冲突检测失败时完整回滚操作 + +## 改进效果 + +### 性能优化 +- **负载均衡**:设备选择更均匀,减少热点设备竞争 +- **并发安全**:使用设备级锁,避免全局锁竞争 +- **快速失败**:原子检查快速排除不可用设备 + +### 可靠性提升 +- **原子操作**:消除设备分配的竞争条件 +- **冲突检测**:多层验证确保分配唯一性 +- **自动回滚**:异常情况下自动恢复一致状态 + +### 可维护性增强 +- **清晰日志**:详细记录分配过程和冲突检测 +- **模块化设计**:设备分配逻辑独立为专门服务 +- **故障恢复**:支持手动释放设备分配 + +## 使用场景 + +### 首次选区 +1. 获取空闲设备列表 +2. 调用 `DeviceAllocationService.allocateDevice()` 进行原子分配 +3. 进行脚本调用和数据库更新 +4. 验证分配结果,失败时自动回滚 + +### 重复选区 +- 复用已分配设备,不重复分配 +- 验证设备是否仍然可用 + +## 测试验证 + +### 并发测试 +- 模拟多个用户同时选区 +- 验证不会出现设备重复分配 +- 检查冲突检测和回滚机制 + +### 异常测试 +- 脚本调用失败场景 +- 数据库更新失败场景 +- 验证回滚机制有效性 + +## 监控建议 + +### 关键指标 +- 设备分配成功率 +- 冲突检测次数 +- 回滚操作频率 +- 设备利用率分布 + +### 告警设置 +- 设备分配冲突告警 +- 回滚操作频繁告警 +- 设备池不足告警 + +## 注意事项 + +1. **向后兼容**:保留了原有的冷却机制 +2. **性能影响**:增加了验证步骤,但提升了可靠性 +3. **内存管理**:定期清理过期的冷却记录 +4. **并发限制**:基于内存的锁机制,适用于单实例部署 + +## 后续优化建议 + +1. **分布式锁**:如需多实例部署,考虑使用Redis分布式锁 +2. **设备预分配**:预先为用户分配设备,减少实时分配压力 +3. **智能调度**:基于历史数据和设备负载进行智能分配 +4. **故障转移**:设备故障时自动切换到备用设备 diff --git a/scripts/concurrency_region_select.py b/scripts/concurrency_region_select.py new file mode 100644 index 0000000..3db5a76 --- /dev/null +++ b/scripts/concurrency_region_select.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python3 +""" +并发选区压测脚本 + +功能: +- 批量生成登录链接(拿到 codeNo) +- 并发对这些 codeNo 执行“选区”操作 +- 统计是否出现多个 codeNo 选到同一设备(设备串号) + +使用前请在“配置区”按实际接口改动: +- BASE_URL、GENERATE_PATH、SELECT_PATH +- 请求方法/参数/头部及字段名(CODE_NO_FIELD、DEVICE_FIELD、STATUS_FIELD) + +运行示例: + python scripts/concurrency_region_select.py --base http://localhost:18080 --gen-path /api/link/generate --sel-path /api/region/select --links 100 --concurrency 100 --region Q + +依赖:pip install httpx anyio +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import random +import sys +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple + +import anyio +import httpx + + +# ===================== 配置区(可通过命令行覆盖) ===================== + +# 响应字段名 +CODE_NO_FIELD = "codeNo" # 生成链接接口返回的 codeNo 字段 +DEVICE_FIELD = "device" # 选区接口返回的 设备字段 +STATUS_FIELD = "status" # 选区接口返回的 状态字段(可选) + +# 生成链接:请求方法与构造(默认 POST,无体) +DEFAULT_GEN_METHOD = "POST" # 可选:GET/POST +DEFAULT_GEN_BODY: Optional[Dict[str, Any]] = None # 如需传 body,可在此设置 +DEFAULT_GEN_HEADERS: Dict[str, str] = {} # 如需鉴权:{"Authorization": "Bearer ..."} + +# 选区:请求方法与构造(默认 POST,body 含 codeNo 与 region) +DEFAULT_SEL_METHOD = "POST" +DEFAULT_SEL_HEADERS: Dict[str, str] = {} + + +# ===================== 实现 ===================== + +@dataclass +class Config: + base_url: str + generate_path: str + select_path: str + links: int + concurrency: int + region: str + gen_method: str = DEFAULT_GEN_METHOD + sel_method: str = DEFAULT_SEL_METHOD + gen_body: Optional[str] = None # JSON 字符串,命令行传入后解析 + gen_headers: Optional[str] = None # JSON 字符串 + sel_headers: Optional[str] = None # JSON 字符串 + timeout: float = 10.0 + jitter_ms: int = 10 # 并发抖动,避免完全同一时刻 + + def parsed_gen_body(self) -> Optional[Dict[str, Any]]: + return json.loads(self.gen_body) if self.gen_body else DEFAULT_GEN_BODY + + def parsed_gen_headers(self) -> Dict[str, str]: + if self.gen_headers: + return json.loads(self.gen_headers) + return DEFAULT_GEN_HEADERS + + def parsed_sel_headers(self) -> Dict[str, str]: + if self.sel_headers: + return json.loads(self.sel_headers) + return DEFAULT_SEL_HEADERS + + +async def generate_links(cfg: Config, client: httpx.AsyncClient) -> List[str]: + """并发生成链接,返回 codeNo 列表。""" + url = cfg.base_url.rstrip("/") + cfg.generate_path + sem = anyio.Semaphore(cfg.concurrency) + code_nos: List[Optional[str]] = [None] * cfg.links + + async def one(idx: int): + async with sem: + # 抖动 + await anyio.sleep(random.random() * cfg.jitter_ms / 1000.0) + try: + if cfg.gen_method.upper() == "GET": + resp = await client.get(url, headers=cfg.parsed_gen_headers(), timeout=cfg.timeout) + else: + resp = await client.post(url, headers=cfg.parsed_gen_headers(), json=cfg.parsed_gen_body(), timeout=cfg.timeout) + resp.raise_for_status() + data = resp.json() + code_no = data.get(CODE_NO_FIELD) + if not code_no: + # 尝试从 URL 中解析(如果返回的是完整 link) + # 例如: ...?codeNo=XXXX + link = data if isinstance(data, str) else data.get("link") if isinstance(data, dict) else None + if isinstance(link, str) and "codeNo=" in link: + code_no = link.split("codeNo=")[-1].split("&")[0] + if not code_no: + raise ValueError(f"生成链接响应缺少 {CODE_NO_FIELD} 字段: {data}") + code_nos[idx] = code_no + except Exception as e: + print(f"[ERR] 生成链接失败 idx={idx}: {e}") + + async with anyio.create_task_group() as tg: + for i in range(cfg.links): + tg.start_soon(one, i) + + ok = [c for c in code_nos if c] + if len(ok) != cfg.links: + print(f"[WARN] 生成成功 {len(ok)}/{cfg.links}") + return ok + + +async def select_region_for_codes(cfg: Config, client: httpx.AsyncClient, code_nos: List[str]) -> List[Tuple[str, Optional[str], Optional[str], int]]: + """并发对 codeNos 执行选区,返回 (codeNo, device, status, http_status) 列表。""" + url = cfg.base_url.rstrip("/") + cfg.select_path + sem = anyio.Semaphore(cfg.concurrency) + results: List[Optional[Tuple[str, Optional[str], Optional[str], int]]] = [None] * len(code_nos) + + def build_select_body(code_no: str) -> Dict[str, Any]: + # 按需修改为你的服务契约 + return {"codeNo": code_no, "region": cfg.region} + + async def one(idx: int, code_no: str): + async with sem: + await anyio.sleep(random.random() * cfg.jitter_ms / 1000.0) + try: + if cfg.sel_method.upper() == "GET": + # GET 方式:拼接 query + params = {"codeNo": code_no, "region": cfg.region} + resp = await client.get(url, headers=cfg.parsed_sel_headers(), params=params, timeout=cfg.timeout) + else: + body = build_select_body(code_no) + resp = await client.post(url, headers=cfg.parsed_sel_headers(), json=body, timeout=cfg.timeout) + status_code = resp.status_code + device = None + status = None + try: + data = resp.json() + if isinstance(data, dict): + device = data.get(DEVICE_FIELD) + status = data.get(STATUS_FIELD) + except Exception: + pass + results[idx] = (code_no, device, status, status_code) + except Exception as e: + print(f"[ERR] 选区失败 codeNo={code_no}: {e}") + results[idx] = (code_no, None, None, 0) + + async with anyio.create_task_group() as tg: + for i, c in enumerate(code_nos): + tg.start_soon(one, i, c) + + return [r for r in results if r] + + +def analyze_collisions(pairs: List[Tuple[str, Optional[str], Optional[str], int]]) -> None: + # 汇总设备 -> codeNos + device_map: Dict[str, List[str]] = {} + success = 0 + http_ok = 0 + for code_no, device, status, http_status in pairs: + if http_status == 200: + http_ok += 1 + if device: + success += 1 + device_map.setdefault(device, []).append(code_no) + + total = len(pairs) + print("\n===== 结果统计 =====") + print(f"请求总数: {total}") + print(f"HTTP 200: {http_ok}") + print(f"解析出设备: {success}") + + collisions = {d: cs for d, cs in device_map.items() if len(cs) > 1} + if collisions: + print("\n[ALERT] 检测到设备串号(同一设备被多个 codeNo 绑定):") + for device, codes in sorted(collisions.items(), key=lambda x: len(x[1]), reverse=True): + sample = ", ".join(codes[:10]) + (" ..." if len(codes) > 10 else "") + print(f"- device={device}: {len(codes)} codeNos -> {sample}") + else: + print("\n[OK] 未发现同一设备被多个 codeNo 同时绑定的情况。") + + +async def main_async(cfg: Config) -> int: + async with httpx.AsyncClient(http2=False) as client: + print(f"生成链接 {cfg.links} 个 ...") + code_nos = await generate_links(cfg, client) + if not code_nos: + print("未成功生成 codeNo,退出。") + return 2 + print(f"开始并发选区(并发={cfg.concurrency}, region={cfg.region})...") + pairs = await select_region_for_codes(cfg, client, code_nos) + analyze_collisions(pairs) + return 0 + + +def parse_args(argv: List[str]) -> Config: + p = argparse.ArgumentParser(description="并发选区压测脚本") + p.add_argument("--base", dest="base_url", required=True, help="服务基础地址,如 http://localhost:8080") + p.add_argument("--gen-path", dest="generate_path", required=True, help="生成链接接口路径,如 /api/link/generate") + p.add_argument("--sel-path", dest="select_path", required=True, help="选区接口路径,如 /api/region/select") + p.add_argument("--links", dest="links", type=int, default=100, help="生成链接数量") + p.add_argument("--concurrency", dest="concurrency", type=int, default=100, help="并发度(同时进行的请求数)") + p.add_argument("--region", dest="region", default="CN", help="选区标识,如 CN/US/EU") + p.add_argument("--gen-method", dest="gen_method", default=DEFAULT_GEN_METHOD, choices=["GET", "POST"], help="生成链接的 HTTP 方法") + p.add_argument("--sel-method", dest="sel_method", default=DEFAULT_SEL_METHOD, choices=["GET", "POST"], help="选区的 HTTP 方法") + p.add_argument("--gen-body", dest="gen_body", help="生成链接的 JSON 请求体(字符串)") + p.add_argument("--gen-headers", dest="gen_headers", help="生成链接的请求头(JSON字符串)") + p.add_argument("--sel-headers", dest="sel_headers", help="选区的请求头(JSON字符串)") + p.add_argument("--timeout", dest="timeout", type=float, default=10.0, help="单请求超时时间(秒)") + p.add_argument("--jitter-ms", dest="jitter_ms", type=int, default=10, help="请求抖动(毫秒)") + + args = p.parse_args(argv) + return Config( + base_url=args.base_url, + generate_path=args.generate_path, + select_path=args.select_path, + links=args.links, + concurrency=args.concurrency, + region=args.region, + gen_method=args.gen_method, + sel_method=args.sel_method, + gen_body=args.gen_body, + gen_headers=args.gen_headers, + sel_headers=args.sel_headers, + timeout=args.timeout, + jitter_ms=args.jitter_ms, + ) + + +def main() -> int: + cfg = parse_args(sys.argv[1:]) + try: + return asyncio.run(main_async(cfg)) + except KeyboardInterrupt: + print("用户中断。") + return 130 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/src/main/java/com/gameplatform/server/service/cooldown/MemoryMachineCooldownService.java b/src/main/java/com/gameplatform/server/service/cooldown/MemoryMachineCooldownService.java new file mode 100644 index 0000000..a866c5b --- /dev/null +++ b/src/main/java/com/gameplatform/server/service/cooldown/MemoryMachineCooldownService.java @@ -0,0 +1,381 @@ +package com.gameplatform.server.service.cooldown; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * 基于内存的机器冷却服务 - 高性能版本 + * 实现同一台机器在10分钟内不会重复调用的机制 + * + * 优化点: + * 1. 纯内存操作,无数据库IO,性能极高 + * 2. 使用ReentrantLock确保线程安全 + * 3. 自动清理过期记录,避免内存泄漏 + * 4. 支持高并发场景 + */ +@Service +public class MemoryMachineCooldownService { + private static final Logger log = LoggerFactory.getLogger(MemoryMachineCooldownService.class); + + // 冷却时间:10分钟 + private static final int COOLDOWN_MINUTES = 10; + + // 内存缓存:machineId -> 冷却信息 + private final ConcurrentMap cooldownMap = new ConcurrentHashMap<>(); + + // 为每个设备提供独立的锁,避免全局锁竞争 + private final ConcurrentMap deviceLocks = new ConcurrentHashMap<>(); + + // 定期清理过期记录的阈值 + private static final int CLEANUP_THRESHOLD = 1000; + private volatile int lastCleanupSize = 0; + + /** + * 冷却信息内部类 + */ + private static class CooldownInfo { + private final LocalDateTime cooldownStartTime; + private final LocalDateTime cooldownEndTime; + private final String reason; + private final Long linkTaskId; + + public CooldownInfo(LocalDateTime cooldownStartTime, LocalDateTime cooldownEndTime, + String reason, Long linkTaskId) { + this.cooldownStartTime = cooldownStartTime; + this.cooldownEndTime = cooldownEndTime; + this.reason = reason; + this.linkTaskId = linkTaskId; + } + + public boolean isActive() { + return LocalDateTime.now().isBefore(cooldownEndTime); + } + + public long getRemainingMinutes() { + if (!isActive()) { + return 0; + } + return java.time.Duration.between(LocalDateTime.now(), cooldownEndTime).toMinutes() + 1; + } + + public LocalDateTime getCooldownStartTime() { return cooldownStartTime; } + public LocalDateTime getCooldownEndTime() { return cooldownEndTime; } + public String getReason() { return reason; } + public Long getLinkTaskId() { return linkTaskId; } + } + + /** + * 获取设备锁(双重检查锁定模式) + */ + private ReentrantLock getDeviceLock(String machineId) { + ReentrantLock lock = deviceLocks.get(machineId); + if (lock == null) { + synchronized (deviceLocks) { + lock = deviceLocks.get(machineId); + if (lock == null) { + lock = new ReentrantLock(); + deviceLocks.put(machineId, lock); + } + } + } + return lock; + } + + /** + * 检查机器是否在冷却期内 + * @param machineId 机器ID + * @return true表示在冷却期内,false表示可以操作 + */ + public boolean isMachineInCooldown(String machineId) { + if (machineId == null || machineId.trim().isEmpty()) { + return false; + } + + CooldownInfo cooldownInfo = cooldownMap.get(machineId); + if (cooldownInfo == null) { + return false; + } + + // 检查是否已过期 + if (!cooldownInfo.isActive()) { + // 异步清理过期记录 + cooldownMap.remove(machineId); + return false; + } + + long remainingMinutes = cooldownInfo.getRemainingMinutes(); + log.debug("机器{}在冷却期内,剩余冷却时间:{}分钟,原因:{}", + machineId, remainingMinutes, cooldownInfo.getReason()); + + return true; + } + + /** + * 将机器加入冷却队列(线程安全) + * @param machineId 机器ID + * @param reason 加入冷却的原因 + * @param linkTaskId 关联的链接任务ID(可选) + * @return true表示成功加入冷却,false表示设备已在冷却中 + */ + public boolean addMachineToCooldown(String machineId, String reason, Long linkTaskId) { + if (machineId == null || machineId.trim().isEmpty()) { + log.warn("尝试添加空的机器ID到冷却队列"); + return false; + } + + ReentrantLock lock = getDeviceLock(machineId); + lock.lock(); + try { + // 双重检查:在锁内再次检查是否已在冷却中 + CooldownInfo existingCooldown = cooldownMap.get(machineId); + if (existingCooldown != null && existingCooldown.isActive()) { + long remainingMinutes = existingCooldown.getRemainingMinutes(); + log.info("机器{}已在冷却期内,剩余时间:{}分钟,原因:{},跳过重复添加", + machineId, remainingMinutes, existingCooldown.getReason()); + return false; + } + + // 创建新的冷却记录 + LocalDateTime now = LocalDateTime.now(); + LocalDateTime cooldownEndTime = now.plusMinutes(COOLDOWN_MINUTES); + CooldownInfo newCooldown = new CooldownInfo(now, cooldownEndTime, reason, linkTaskId); + + cooldownMap.put(machineId, newCooldown); + + log.info("机器{}已加入冷却队列,原因:{},冷却时间:{}分钟,冷却结束时间:{},关联任务:{}", + machineId, reason, COOLDOWN_MINUTES, cooldownEndTime, linkTaskId); + + // 定期清理过期记录 + cleanupExpiredCooldownsIfNeeded(); + + return true; + + } finally { + lock.unlock(); + } + } + + /** + * 重载方法:不指定linkTaskId + */ + public boolean addMachineToCooldown(String machineId, String reason) { + return addMachineToCooldown(machineId, reason, null); + } + + /** + * 获取机器剩余冷却时间(分钟) + * @param machineId 机器ID + * @return 剩余冷却时间,如果不在冷却期则返回0 + */ + public long getRemainingCooldownMinutes(String machineId) { + if (machineId == null || machineId.trim().isEmpty()) { + return 0; + } + + CooldownInfo cooldownInfo = cooldownMap.get(machineId); + if (cooldownInfo == null) { + return 0; + } + + if (!cooldownInfo.isActive()) { + // 清理过期记录 + cooldownMap.remove(machineId); + return 0; + } + + return cooldownInfo.getRemainingMinutes(); + } + + /** + * 手动移除机器的冷却状态(用于测试或管理员操作) + * @param machineId 机器ID + * @return true表示成功移除,false表示设备不在冷却中 + */ + public boolean removeMachineFromCooldown(String machineId) { + if (machineId == null || machineId.trim().isEmpty()) { + return false; + } + + ReentrantLock lock = getDeviceLock(machineId); + lock.lock(); + try { + CooldownInfo removed = cooldownMap.remove(machineId); + if (removed != null) { + log.info("手动移除机器{}的冷却状态,原冷却原因:{}", machineId, removed.getReason()); + return true; + } + return false; + } finally { + lock.unlock(); + } + } + + /** + * 获取当前冷却队列大小 + * @return 冷却队列中的设备数量 + */ + public int getCooldownQueueSize() { + // 先清理过期记录再返回大小 + cleanupExpiredCooldownsIfNeeded(); + return cooldownMap.size(); + } + + /** + * 清理过期的冷却记录 + */ + public void cleanupExpiredCooldowns() { + int sizeBefore = cooldownMap.size(); + + // 使用迭代器安全地移除过期记录 + cooldownMap.entrySet().removeIf(entry -> { + CooldownInfo cooldownInfo = entry.getValue(); + if (!cooldownInfo.isActive()) { + log.debug("清理过期冷却记录:机器{},原因:{}", entry.getKey(), cooldownInfo.getReason()); + return true; + } + return false; + }); + + int removedCount = sizeBefore - cooldownMap.size(); + if (removedCount > 0) { + log.info("清理了{}个过期的冷却记录,当前冷却队列大小:{}", removedCount, cooldownMap.size()); + } + } + + /** + * 在需要时清理过期记录(避免频繁清理影响性能) + */ + private void cleanupExpiredCooldownsIfNeeded() { + int currentSize = cooldownMap.size(); + if (currentSize > CLEANUP_THRESHOLD && currentSize > lastCleanupSize * 1.5) { + cleanupExpiredCooldowns(); + lastCleanupSize = currentSize; + } + } + + /** + * 原子方式尝试分配设备(检查+分配一体化,防止并发竞争) + * @param machineId 要分配的机器ID + * @param reason 分配原因 + * @param linkTaskId 关联的链接任务ID + * @return true表示分配成功,false表示设备已被占用或在冷却中 + */ + public boolean tryAllocateDevice(String machineId, String reason, Long linkTaskId) { + if (machineId == null || machineId.trim().isEmpty()) { + log.warn("尝试分配空的机器ID"); + return false; + } + + ReentrantLock lock = getDeviceLock(machineId); + lock.lock(); + try { + // 原子检查:确保设备不在冷却期内 + CooldownInfo existingCooldown = cooldownMap.get(machineId); + if (existingCooldown != null && existingCooldown.isActive()) { + long remainingMinutes = existingCooldown.getRemainingMinutes(); + log.debug("设备{}在冷却期内,无法分配,剩余时间:{}分钟,原因:{}", + machineId, remainingMinutes, existingCooldown.getReason()); + return false; + } + + // 原子分配:立即将设备加入冷却队列 + LocalDateTime now = LocalDateTime.now(); + LocalDateTime cooldownEndTime = now.plusMinutes(COOLDOWN_MINUTES); + CooldownInfo newCooldown = new CooldownInfo(now, cooldownEndTime, reason, linkTaskId); + + cooldownMap.put(machineId, newCooldown); + + log.info("设备{}原子分配成功,原因:{},冷却时间:{}分钟,冷却结束时间:{},关联任务:{}", + machineId, reason, COOLDOWN_MINUTES, cooldownEndTime, linkTaskId); + + return true; + + } finally { + lock.unlock(); + } + } + + /** + * 释放设备分配(如果后续操作失败时回滚) + * @param machineId 要释放的机器ID + * @param linkTaskId 关联的链接任务ID(用于验证) + * @return true表示释放成功 + */ + public boolean releaseDeviceAllocation(String machineId, Long linkTaskId) { + if (machineId == null || machineId.trim().isEmpty()) { + return false; + } + + ReentrantLock lock = getDeviceLock(machineId); + lock.lock(); + try { + CooldownInfo cooldownInfo = cooldownMap.get(machineId); + if (cooldownInfo == null) { + return false; + } + + // 验证是否是同一个任务的分配 + if (linkTaskId != null && !linkTaskId.equals(cooldownInfo.getLinkTaskId())) { + log.warn("尝试释放设备{}失败:任务ID不匹配,当前任务:{},请求任务:{}", + machineId, cooldownInfo.getLinkTaskId(), linkTaskId); + return false; + } + + cooldownMap.remove(machineId); + log.info("设备{}分配已释放,原关联任务:{}", machineId, linkTaskId); + return true; + + } finally { + lock.unlock(); + } + } + + /** + * 获取冷却统计信息 + */ + public CooldownStats getCooldownStats() { + int totalDevices = cooldownMap.size(); + int activeDevices = 0; + long totalRemainingMinutes = 0; + + for (CooldownInfo cooldownInfo : cooldownMap.values()) { + if (cooldownInfo.isActive()) { + activeDevices++; + totalRemainingMinutes += cooldownInfo.getRemainingMinutes(); + } + } + + return new CooldownStats(totalDevices, activeDevices, totalRemainingMinutes); + } + + /** + * 冷却统计信息 + */ + public static class CooldownStats { + private final int totalDevices; + private final int activeDevices; + private final long totalRemainingMinutes; + + public CooldownStats(int totalDevices, int activeDevices, long totalRemainingMinutes) { + this.totalDevices = totalDevices; + this.activeDevices = activeDevices; + this.totalRemainingMinutes = totalRemainingMinutes; + } + + public int getTotalDevices() { return totalDevices; } + public int getActiveDevices() { return activeDevices; } + public long getTotalRemainingMinutes() { return totalRemainingMinutes; } + + @Override + public String toString() { + return String.format("CooldownStats{total=%d, active=%d, totalRemainingMinutes=%d}", + totalDevices, activeDevices, totalRemainingMinutes); + } + } +} diff --git a/src/main/java/com/gameplatform/server/service/link/DeviceAllocationService.java b/src/main/java/com/gameplatform/server/service/link/DeviceAllocationService.java new file mode 100644 index 0000000..42fcc5b --- /dev/null +++ b/src/main/java/com/gameplatform/server/service/link/DeviceAllocationService.java @@ -0,0 +1,154 @@ +package com.gameplatform.server.service.link; + +import com.gameplatform.server.mapper.agent.LinkTaskMapper; +import com.gameplatform.server.model.entity.agent.LinkTask; +import com.gameplatform.server.service.cooldown.MemoryMachineCooldownService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * 设备分配服务 - 提供线程安全的设备分配机制 + * 解决并发环境下多个请求争抢同一设备的问题 + */ +@Service +public class DeviceAllocationService { + private static final Logger log = LoggerFactory.getLogger(DeviceAllocationService.class); + + private final MemoryMachineCooldownService machineCooldownService; + private final LinkTaskMapper linkTaskMapper; + + public DeviceAllocationService(MemoryMachineCooldownService machineCooldownService, + LinkTaskMapper linkTaskMapper) { + this.machineCooldownService = machineCooldownService; + this.linkTaskMapper = linkTaskMapper; + } + + /** + * 原子方式分配设备,避免并发竞争 + * @param availableDevices 可用设备列表 + * @param linkTaskId 链接任务ID + * @param reason 分配原因 + * @return 分配成功的设备ID,如果所有设备都被占用则返回null + */ + public String allocateDevice(List availableDevices, Long linkTaskId, String reason) { + if (availableDevices == null || availableDevices.isEmpty()) { + log.warn("设备分配失败:没有可用设备"); + return null; + } + + log.info("开始设备分配流程:候选设备数={}, 任务ID={}, 原因={}", + availableDevices.size(), linkTaskId, reason); + + // 1. 过滤掉被其他任务占用的设备 + List filteredDevices = filterOccupiedDevices(availableDevices); + if (filteredDevices.isEmpty()) { + log.warn("设备分配失败:所有候选设备都被其他任务占用"); + return null; + } + + log.info("设备占用检查完成:原候选设备数={}, 过滤后设备数={}, 可用设备={}", + availableDevices.size(), filteredDevices.size(), filteredDevices); + + // 2. 打乱设备列表,实现负载均衡 + List shuffledDevices = new ArrayList<>(filteredDevices); + Collections.shuffle(shuffledDevices, ThreadLocalRandom.current()); + + log.info("设备列表已随机化:{}", shuffledDevices); + + // 3. 尝试原子分配设备(按随机顺序) + for (String deviceId : shuffledDevices) { + if (machineCooldownService.tryAllocateDevice(deviceId, reason, linkTaskId)) { + log.info("设备分配成功:设备={}, 任务ID={}, 原因={}", deviceId, linkTaskId, reason); + return deviceId; + } else { + log.debug("设备{}分配失败(在冷却期或已被占用),尝试下一个设备", deviceId); + } + } + + log.warn("设备分配失败:所有候选设备都在冷却期内"); + return null; + } + + /** + * 过滤掉被其他链接任务占用的设备 + * @param devices 设备列表 + * @return 未被占用的设备列表 + */ + private List filterOccupiedDevices(List devices) { + List availableDevices = new ArrayList<>(); + + for (String deviceId : devices) { + // 检查设备是否被其他链接任务占用(USING、LOGGED_IN状态) + List occupiedTasks = linkTaskMapper.findByMachineIdAndStatus(deviceId, "USING"); + occupiedTasks.addAll(linkTaskMapper.findByMachineIdAndStatus(deviceId, "LOGGED_IN")); + + if (occupiedTasks.isEmpty()) { + availableDevices.add(deviceId); + } else { + log.debug("设备{}被其他链接任务占用,占用任务数={}", deviceId, occupiedTasks.size()); + for (LinkTask occupiedTask : occupiedTasks) { + log.debug("占用设备{}的链接:codeNo={}, status={}, 任务ID={}", + deviceId, occupiedTask.getCodeNo(), occupiedTask.getStatus(), occupiedTask.getId()); + } + } + } + + return availableDevices; + } + + /** + * 验证设备分配结果(分配后的双重检查) + * @param deviceId 设备ID + * @param linkTaskId 链接任务ID + * @return true表示分配有效,false表示存在冲突 + */ + public boolean validateDeviceAllocation(String deviceId, Long linkTaskId) { + if (deviceId == null || linkTaskId == null) { + return false; + } + + // 检查是否有其他任务也占用了这个设备 + List conflictTasks = linkTaskMapper.findByMachineIdAndStatus(deviceId, "USING"); + conflictTasks.addAll(linkTaskMapper.findByMachineIdAndStatus(deviceId, "LOGGED_IN")); + + // 过滤掉自己的任务 + conflictTasks.removeIf(task -> task.getId().equals(linkTaskId)); + + if (!conflictTasks.isEmpty()) { + log.error("设备分配冲突检测:设备{}被多个任务占用,当前任务ID={},冲突任务数={}", + deviceId, linkTaskId, conflictTasks.size()); + + for (LinkTask conflictTask : conflictTasks) { + log.error("冲突任务详情:任务ID={}, codeNo={}, status={}, 设备={}", + conflictTask.getId(), conflictTask.getCodeNo(), + conflictTask.getStatus(), conflictTask.getMachineId()); + } + return false; + } + + log.debug("设备分配验证通过:设备={}, 任务ID={}", deviceId, linkTaskId); + return true; + } + + /** + * 释放设备分配(失败回滚时使用) + * @param deviceId 设备ID + * @param linkTaskId 链接任务ID + */ + public void releaseDeviceAllocation(String deviceId, Long linkTaskId) { + if (deviceId != null && linkTaskId != null) { + boolean released = machineCooldownService.releaseDeviceAllocation(deviceId, linkTaskId); + if (released) { + log.info("设备分配已释放:设备={}, 任务ID={}", deviceId, linkTaskId); + } else { + log.warn("设备分配释放失败:设备={}, 任务ID={}", deviceId, linkTaskId); + } + } + } +}