| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- # ============================================================
- # HTTP 客户端封装 — 统一处理 Token、会话、请求/响应日志
- # ============================================================
- import requests
- import json
- import time
- from config import API_BASE_URL, REQUEST_TIMEOUT
- class ApiClient:
- """封装 HTTP 请求,自动管理 Token 和会话"""
- def __init__(self, base_url=None):
- self.base_url = base_url or API_BASE_URL
- self.session = requests.Session()
- self.session.headers.update({
- "Content-Type": "application/json;charset=utf-8",
- "User-Agent": "E2ETest/1.0",
- })
- self._last_response = None
- self._admin_token = None # 管理员 Token(Authorization 头)
- self._member_token = None # 会员 Token(具体头部看接口)
- self._seller_token = None # 商家 Token
- # ---------- 基础请求方法 ----------
- def request(self, method, path, **kwargs):
- url = f"{self.base_url}{path}" if not path.startswith("http") else path
- kwargs.setdefault("timeout", REQUEST_TIMEOUT)
- kwargs.setdefault("headers", {})
- resp = self.session.request(method, url, **kwargs)
- self._last_response = resp
- # 打印简略日志
- _log_req(method, url, kwargs)
- _log_resp(resp)
- return resp
- def get(self, path, **kwargs):
- return self.request("GET", path, **kwargs)
- def post(self, path, **kwargs):
- return self.request("POST", path, **kwargs)
- def put(self, path, **kwargs):
- return self.request("PUT", path, **kwargs)
- def delete(self, path, **kwargs):
- return self.request("DELETE", path, **kwargs)
- # ---------- Token 管理 ----------
- def set_admin_token(self, token):
- self._admin_token = token
- self.session.headers["Authorization"] = f"Bearer {token}"
- def set_member_token(self, token):
- self._member_token = token
- # 根据 MemberContext 的实现,会员 Token 也是 Authorization 头
- # 但区分会员和管理员,我们用不同的头
- self.session.headers["Authorization"] = f"Bearer {token}"
- def set_seller_token(self, token):
- self._seller_token = token
- self.session.headers["Authorization"] = f"Bearer {token}"
- def clear_token(self):
- self._admin_token = None
- self._member_token = None
- self._seller_token = None
- self.session.headers.pop("Authorization", None)
- # ---------- 响应解析 ----------
- def parse_json(self):
- if self._last_response is None:
- return None
- try:
- return self._last_response.json()
- except Exception:
- return None
- def assert_ok(self, err_msg=None):
- """断言接口返回 code == 200(AjaxResult.success)"""
- data = self.parse_json()
- if data is None:
- raise AssertionError(f"{err_msg or '响应不是 JSON'}: {self._last_response.text[:200]}")
- code = data.get("code") if isinstance(data, dict) else None
- if code != 200:
- msg = data.get("msg", "未知错误")
- raise AssertionError(f"{err_msg or '接口错误'}: code={code}, msg={msg}")
- return data
- def extract_data(self):
- """从 AjaxResult 中提取 data 字段"""
- data = self.assert_ok()
- return data.get("data")
- # ---------- 工具方法 ----------
- def get_captcha(self):
- """
- 获取登录验证码(若依标准)
- 返回 (uuid, math_answer) 或 (None, None)
- """
- resp = self.get("/captchaImage")
- body = self.assert_ok("获取验证码失败")
- return body.get("uuid"), None # math 验证码需要 OCR,建议测试环境关闭
- # ---------- 日志辅助 ----------
- def _log_req(method, url, kwargs):
- body = kwargs.get("json") or kwargs.get("data")
- snippet = ""
- if body:
- s = json.dumps(body, ensure_ascii=False)
- snippet = f" | body={s[:120]}"
- print(f" >>> {method} {url}{snippet}")
- def _log_resp(resp):
- status = resp.status_code
- body = ""
- try:
- data = resp.json()
- if isinstance(data, dict):
- code = data.get("code", "")
- msg = data.get("msg", "")
- body = f"code={code} msg={msg}"
- except Exception:
- body = resp.text[:80]
- print(f" <<< {status} | {body}")
|