# ============================================================ # HTTP 客户端封装 — 统一处理 Token、会话、请求/响应日志 # ============================================================ import requests import json import time from config import API_BASE_URL, REQUEST_TIMEOUT class ApiClient: """封装 HTTP 请求,自动管理 Token 和会话""" def __init__(self, base_url=None): self.base_url = base_url or API_BASE_URL self.session = requests.Session() self.session.headers.update({ "Content-Type": "application/json;charset=utf-8", "User-Agent": "E2ETest/1.0", }) self._last_response = None self._admin_token = None # 管理员 Token(Authorization 头) self._member_token = None # 会员 Token(具体头部看接口) self._seller_token = None # 商家 Token # ---------- 基础请求方法 ---------- def request(self, method, path, **kwargs): url = f"{self.base_url}{path}" if not path.startswith("http") else path kwargs.setdefault("timeout", REQUEST_TIMEOUT) kwargs.setdefault("headers", {}) resp = self.session.request(method, url, **kwargs) self._last_response = resp # 打印简略日志 _log_req(method, url, kwargs) _log_resp(resp) return resp def get(self, path, **kwargs): return self.request("GET", path, **kwargs) def post(self, path, **kwargs): return self.request("POST", path, **kwargs) def put(self, path, **kwargs): return self.request("PUT", path, **kwargs) def delete(self, path, **kwargs): return self.request("DELETE", path, **kwargs) # ---------- Token 管理 ---------- def set_admin_token(self, token): self._admin_token = token self.session.headers["Authorization"] = f"Bearer {token}" def set_member_token(self, token): self._member_token = token # 根据 MemberContext 的实现,会员 Token 也是 Authorization 头 # 但区分会员和管理员,我们用不同的头 self.session.headers["Authorization"] = f"Bearer {token}" def set_seller_token(self, token): self._seller_token = token self.session.headers["Authorization"] = f"Bearer {token}" def clear_token(self): self._admin_token = None self._member_token = None self._seller_token = None self.session.headers.pop("Authorization", None) # ---------- 响应解析 ---------- def parse_json(self): if self._last_response is None: return None try: return self._last_response.json() except Exception: return None def assert_ok(self, err_msg=None): """断言接口返回 code == 200(AjaxResult.success)""" data = self.parse_json() if data is None: raise AssertionError(f"{err_msg or '响应不是 JSON'}: {self._last_response.text[:200]}") code = data.get("code") if isinstance(data, dict) else None if code != 200: msg = data.get("msg", "未知错误") raise AssertionError(f"{err_msg or '接口错误'}: code={code}, msg={msg}") return data def extract_data(self): """从 AjaxResult 中提取 data 字段""" data = self.assert_ok() return data.get("data") # ---------- 工具方法 ---------- def get_captcha(self): """ 获取登录验证码(若依标准) 返回 (uuid, math_answer) 或 (None, None) """ resp = self.get("/captchaImage") body = self.assert_ok("获取验证码失败") return body.get("uuid"), None # math 验证码需要 OCR,建议测试环境关闭 # ---------- 日志辅助 ---------- def _log_req(method, url, kwargs): body = kwargs.get("json") or kwargs.get("data") snippet = "" if body: s = json.dumps(body, ensure_ascii=False) snippet = f" | body={s[:120]}" print(f" >>> {method} {url}{snippet}") def _log_resp(resp): status = resp.status_code body = "" try: data = resp.json() if isinstance(data, dict): code = data.get("code", "") msg = data.get("msg", "") body = f"code={code} msg={msg}" except Exception: body = resp.text[:80] print(f" <<< {status} | {body}")