# ============================================================ # 全链路串联测试 — 跨角色业务流程 # 模拟真实的业务协作场景 # ============================================================ import pytest import json from config import SELLER_SHOP_ID, KEEP_TEST_DATA class TestFullChain: def test_chain_01_admin_entry_review(self, client, admin_token): """管理员查看并审核入驻申请""" resp = client.get("/agri/merchantEntryApply/list", params={ "pageSize": 5, "pageNum": 1 }) client.assert_ok() data = client.extract_data() or {} total = data.get("total", 0) rows = data.get("rows", []) print(f" [INFO] 入驻申请共 {total} 条") pending = [r for r in rows if r.get("applyStatus") == "PENDING"] if pending: apply = pending[0] aid = apply.get("applyId") resp = client.put(f"/agri/merchantEntryApply/approve/{aid}", json={}) client.assert_ok("审核通过失败") print(f" [OK] 审核通过 applyId={aid}") else: print(f" [SKIP] 无待审核申请") def test_chain_02_admin_audit_goods(self, client, admin_token): """管理员审核待上架商品""" resp = client.get("/agri/goodsAudit/list", params={ "pageSize": 5, "pageNum": 1 }) client.assert_ok() data = client.extract_data() or {} total = data.get("total", 0) rows = data.get("rows", []) print(f" [INFO] 待审核商品共 {total} 条") pending = [r for r in rows if r.get("goodsStatus") in ("SUBMITTED", "PENDING")] if pending: g = pending[0] gid = g.get("goodsId") resp = client.put("/agri/goodsAudit/audit", json={ "goodsIds": [gid], "auditStatus": "APPROVED", "auditRemark": "E2E测试自动审核通过" }) data = client.assert_ok("审核失败") print(f" [OK] 审核通过 goodsId={gid}") else: print(f" [SKIP] 无待审核商品") def test_chain_03_seller_switch_shop(self, client, seller_token): """商家切换店铺上下文""" resp = client.put("/agri/seller/context/shop", json={ "shopId": SELLER_SHOP_ID }) client.assert_ok("切换店铺失败") print(f" [OK] 店铺已切换 shopId={SELLER_SHOP_ID}") def test_chain_04_seller_view_goods(self, client, seller_token): """商家查看商品并检查待提交商品""" resp = client.get("/agri/seller/goods/list") client.assert_ok() data = client.extract_data() or {} total = data.get("total", 0) rows = data.get("rows", []) print(f" [INFO] 商家商品共 {total} 条") draft = [r for r in rows if r.get("goodsStatus") in ("DRAFT",)] submitted = [r for r in rows if r.get("goodsStatus") in ("SUBMITTED", "PENDING")] on_shelf = [r for r in rows if r.get("goodsStatus") == "ON_SHELF"] if draft: g = draft[0] gid = g.get("goodsId") resp = client.put(f"/agri/seller/goods/{gid}/submit") client.assert_ok("提交上架失败") print(f" [OK] 提交上架 goodsId={gid}") else: print(f" [SKIP] 无草稿商品需提交") if on_shelf: print(f" [INFO] 在售商品 {len(on_shelf)} 个") def test_chain_05_seller_view_orders(self, client, seller_token): """商家查看订单并发货""" resp = client.get("/agri/seller/order/list") client.assert_ok() data = client.extract_data() or {} total = data.get("total", 0) rows = data.get("rows", []) print(f" [INFO] 商家订单共 {total} 条") unpaid = [r for r in rows if r.get("orderStatus") in ("WAIT_PAY", "UNPAID")] shipped = [r for r in rows if r.get("orderStatus") in ("WAIT_SHIP", "UNSHIPPED")] print(f" [INFO] 待付款 {len(unpaid)}, 待发货 {len(shipped)}") def test_chain_06_seller_view_stock(self, client, seller_token): """商家查看库存""" resp = client.get("/agri/seller/stock/query/list") client.assert_ok() data = client.extract_data() or {} total = data.get("total", 0) rows = data.get("rows", []) low_stock = [r for r in rows if (r.get("stock") or 0) < 10] print(f" [INFO] 库存商品 {total} 个, 低库存 {len(low_stock)} 个") if low_stock: g = low_stock[0] gid = g.get("goodsId") print(f" [INFO] 需补货: goodsId={gid} stock={g.get('stock','?')}") def test_chain_07_consumer_register_browse(self, client, member_token): """消费者注册后浏览首页""" resp = client.get("/api/home/banners") client.assert_ok() banners = client.extract_data() or [] resp = client.get("/api/home/categories") client.assert_ok() cats = client.extract_data() or [] resp = client.get("/api/home/hot-goods") client.assert_ok() goods = client.extract_data() or [] print(f" [OK] 首页浏览: Banner={len(banners)} 分类={len(cats)} 热销={len(goods)}") if goods: client._test_goods_id = goods[0].get("goodsId") or goods[0].get("id") resp = client.get(f"/api/goods/{client._test_goods_id}") client.assert_ok() detail = client.extract_data() or {} print(f" [OK] 商品详情: {detail.get('goodsName','')[:20]}") def test_chain_08_consumer_add_to_cart(self, client, member_token): """消费者加购并下单""" gid = getattr(client, '_test_goods_id', None) resp = client.get("/api/home/hot-goods") data = client.assert_ok() goods = data.get("data", []) if not goods: pytest.skip("无在售商品") gid = goods[0].get("goodsId") or goods[0].get("id") # 加购 resp = client.post("/api/cart/items", json={"goodsId": gid, "quantity": 1}) client.assert_ok("加购失败") print(f" [OK] 加购 goodsId={gid}") # 需要地址才能下单 resp = client.post("/api/member/address", json={ "receiverName": "测试用户", "receiverMobile": "13800138000", "province": "北京市", "city": "北京市", "district": "朝阳区", "detailAddress": "E2E测试地址100号", "fullAddress": "北京市北京市朝阳区E2E测试地址100号", "isDefault": True, }) data = client.assert_ok() addr_id = data.get("data") # 预览订单 resp = client.post("/api/checkout/preview", json={ "source": "BUY_NOW", "items": [{"goodsId": gid, "quantity": 1}], }) client.assert_ok("预览订单失败") print(f" [OK] 订单预览通过") # 提交订单 resp = client.post("/api/checkout/submit", json={ "source": "BUY_NOW", "addressId": addr_id, "payType": "WEIXIN", "items": [{"goodsId": gid, "quantity": 1}], }) data = client.assert_ok("提交订单失败") order_data = data.get("data", {}) if isinstance(data.get("data"), dict) else data order_id = order_data.get("orderId") if not order_id: order_ids = order_data.get("orderIds", []) order_id = order_ids[0] if order_ids else None if order_id: print(f" [OK] 订单已提交 orderId={order_id}") def test_chain_09_consumer_view_after_sale(self, client, member_token): """消费者查看售后入口""" resp = client.get("/api/order/list") client.assert_ok() data = client.extract_data() or {} total = data.get("total", 0) print(f" [INFO] 我的订单共 {total} 条") # 售后列表 resp = client.get("/api/order/aftersale/list") client.assert_ok() data = client.extract_data() or {} total_as = data.get("total", 0) if isinstance(data, dict) else 0 print(f" [INFO] 售后单共 {total_as} 条") def test_chain_10_admin_stats_overview(self, client, admin_token): """平台端统一数据概览""" modules = [ ("商品待审", "/agri/goodsAudit/pendingCount", "pendingCount"), ("入驻待审", "/agri/merchantEntryApply/pendingCount", "pendingCount"), ("提现待审", "/agri/finance/withdrawAudit/pendingCount", "pendingCount"), ] for label, path, field in modules: resp = client.get(path) client.assert_ok() val = (client.extract_data() or {}).get(field, "?") print(f" [INFO] {label}: {val}")