| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- # ============================================================
- # 全链路串联测试 — 跨角色业务流程
- # 模拟真实的业务协作场景
- # ============================================================
- import pytest
- import json
- from config import SELLER_SHOP_ID, KEEP_TEST_DATA
class TestFullChain:
    """Full-chain tests that walk through cross-role business flows.

    Every test drives the ``client`` fixture against the backend and prints a
    short progress line. The ``admin_token`` / ``seller_token`` /
    ``member_token`` fixtures are requested but never read in the test
    bodies; presumably they authenticate ``client`` as the matching role
    (they are defined outside this file -- confirm in conftest).
    """

    @staticmethod
    def _read_page(client):
        """Assert the last response succeeded and return ``(total, rows)``.

        ``total`` defaults to 0 and ``rows`` to an empty list when the
        extracted payload is empty or the key is missing / null.
        """
        client.assert_ok()
        data = client.extract_data() or {}
        # ``or []`` also covers a payload that carries ``"rows": null``.
        return data.get("total", 0), data.get("rows") or []

    def test_chain_01_admin_entry_review(self, client, admin_token):
        """Admin lists merchant entry applications and approves the first pending one."""
        client.get("/agri/merchantEntryApply/list", params={
            "pageSize": 5, "pageNum": 1
        })
        total, rows = self._read_page(client)
        print(f" [INFO] 入驻申请共 {total} 条")
        pending = [r for r in rows if r.get("applyStatus") == "PENDING"]
        if pending:
            aid = pending[0].get("applyId")
            client.put(f"/agri/merchantEntryApply/approve/{aid}", json={})
            client.assert_ok("审核通过失败")
            print(f" [OK] 审核通过 applyId={aid}")
        else:
            print(" [SKIP] 无待审核申请")

    def test_chain_02_admin_audit_goods(self, client, admin_token):
        """Admin lists goods awaiting audit and approves the first one found."""
        client.get("/agri/goodsAudit/list", params={
            "pageSize": 5, "pageNum": 1
        })
        total, rows = self._read_page(client)
        print(f" [INFO] 待审核商品共 {total} 条")
        pending = [r for r in rows if r.get("goodsStatus") in ("SUBMITTED", "PENDING")]
        if pending:
            gid = pending[0].get("goodsId")
            client.put("/agri/goodsAudit/audit", json={
                "goodsIds": [gid],
                "auditStatus": "APPROVED",
                "auditRemark": "E2E测试自动审核通过"
            })
            client.assert_ok("审核失败")
            print(f" [OK] 审核通过 goodsId={gid}")
        else:
            print(" [SKIP] 无待审核商品")

    def test_chain_03_seller_switch_shop(self, client, seller_token):
        """Seller switches the shop context to ``SELLER_SHOP_ID``."""
        client.put("/agri/seller/context/shop", json={
            "shopId": SELLER_SHOP_ID
        })
        client.assert_ok("切换店铺失败")
        print(f" [OK] 店铺已切换 shopId={SELLER_SHOP_ID}")

    def test_chain_04_seller_view_goods(self, client, seller_token):
        """Seller lists goods and submits the first draft item for listing."""
        client.get("/agri/seller/goods/list")
        total, rows = self._read_page(client)
        print(f" [INFO] 商家商品共 {total} 条")
        draft = [r for r in rows if r.get("goodsStatus") == "DRAFT"]
        on_shelf = [r for r in rows if r.get("goodsStatus") == "ON_SHELF"]
        if draft:
            gid = draft[0].get("goodsId")
            client.put(f"/agri/seller/goods/{gid}/submit")
            client.assert_ok("提交上架失败")
            print(f" [OK] 提交上架 goodsId={gid}")
        else:
            print(" [SKIP] 无草稿商品需提交")
        if on_shelf:
            print(f" [INFO] 在售商品 {len(on_shelf)} 个")

    def test_chain_05_seller_view_orders(self, client, seller_token):
        """Seller lists orders and reports how many await payment or shipment.

        Only counts are printed; no shipment request is sent.
        """
        client.get("/agri/seller/order/list")
        total, rows = self._read_page(client)
        print(f" [INFO] 商家订单共 {total} 条")
        unpaid = [r for r in rows if r.get("orderStatus") in ("WAIT_PAY", "UNPAID")]
        to_ship = [r for r in rows if r.get("orderStatus") in ("WAIT_SHIP", "UNSHIPPED")]
        print(f" [INFO] 待付款 {len(unpaid)}, 待发货 {len(to_ship)}")

    def test_chain_06_seller_view_stock(self, client, seller_token):
        """Seller lists stock and reports items whose stock is below 10."""
        client.get("/agri/seller/stock/query/list")
        total, rows = self._read_page(client)
        # A missing or null stock value is counted as 0, i.e. as low stock.
        low_stock = [r for r in rows if (r.get("stock") or 0) < 10]
        print(f" [INFO] 库存商品 {total} 个, 低库存 {len(low_stock)} 个")
        if low_stock:
            g = low_stock[0]
            gid = g.get("goodsId")
            print(f" [INFO] 需补货: goodsId={gid} stock={g.get('stock','?')}")

    def test_chain_07_consumer_register_browse(self, client, member_token):
        """Consumer browses the home page and opens the first hot item.

        The id of that item is stored on ``client._test_goods_id`` so that
        ``test_chain_08_consumer_add_to_cart`` can pick it up.
        """
        client.get("/api/home/banners")
        client.assert_ok()
        banners = client.extract_data() or []
        client.get("/api/home/categories")
        client.assert_ok()
        cats = client.extract_data() or []
        client.get("/api/home/hot-goods")
        client.assert_ok()
        goods = client.extract_data() or []
        print(f" [OK] 首页浏览: Banner={len(banners)} 分类={len(cats)} 热销={len(goods)}")
        if goods:
            client._test_goods_id = goods[0].get("goodsId") or goods[0].get("id")
            client.get(f"/api/goods/{client._test_goods_id}")
            client.assert_ok()
            detail = client.extract_data() or {}
            # ``or ''`` keeps the slice safe when goodsName is present but null.
            name = detail.get('goodsName') or ''
            print(f" [OK] 商品详情: {name[:20]}")

    def test_chain_08_consumer_add_to_cart(self, client, member_token):
        """Consumer adds an item to the cart, creates an address and submits an order.

        Reuses the goods id cached on ``client`` by the browse test when it
        is available; otherwise takes the first hot item, skipping the test
        when there is none.
        """
        gid = getattr(client, '_test_goods_id', None)
        if not gid:
            # No cached id (e.g. this test ran alone): look one up.
            client.get("/api/home/hot-goods")
            data = client.assert_ok()
            goods = data.get("data", [])
            if not goods:
                pytest.skip("无在售商品")
            gid = goods[0].get("goodsId") or goods[0].get("id")
        # Add to cart
        client.post("/api/cart/items", json={"goodsId": gid, "quantity": 1})
        client.assert_ok("加购失败")
        print(f" [OK] 加购 goodsId={gid}")
        # An address is required before an order can be placed
        client.post("/api/member/address", json={
            "receiverName": "测试用户",
            "receiverMobile": "13800138000",
            "province": "北京市", "city": "北京市", "district": "朝阳区",
            "detailAddress": "E2E测试地址100号",
            "fullAddress": "北京市北京市朝阳区E2E测试地址100号",
            "isDefault": True,
        })
        data = client.assert_ok()
        addr_id = data.get("data")
        # Preview the order
        client.post("/api/checkout/preview", json={
            "source": "BUY_NOW",
            "items": [{"goodsId": gid, "quantity": 1}],
        })
        client.assert_ok("预览订单失败")
        print(" [OK] 订单预览通过")
        # Submit the order
        client.post("/api/checkout/submit", json={
            "source": "BUY_NOW",
            "addressId": addr_id,
            "payType": "WEIXIN",
            "items": [{"goodsId": gid, "quantity": 1}],
        })
        data = client.assert_ok("提交订单失败")
        # The order info may sit under "data" or directly on the response body.
        payload = data.get("data")
        order_data = payload if isinstance(payload, dict) else data
        order_id = order_data.get("orderId")
        if not order_id:
            # Fall back to the first entry of a multi-order response.
            order_ids = order_data.get("orderIds") or []
            order_id = order_ids[0] if order_ids else None
        # NOTE(review): a response without any order id passes silently here;
        # consider asserting on order_id once the response shape is confirmed.
        if order_id:
            print(f" [OK] 订单已提交 orderId={order_id}")

    def test_chain_09_consumer_view_after_sale(self, client, member_token):
        """Consumer lists their orders and their after-sale tickets."""
        client.get("/api/order/list")
        total, _ = self._read_page(client)
        print(f" [INFO] 我的订单共 {total} 条")
        # After-sale list
        client.get("/api/order/aftersale/list")
        client.assert_ok()
        data = client.extract_data() or {}
        # This payload is not guaranteed to be a dict; report 0 otherwise.
        total_as = data.get("total", 0) if isinstance(data, dict) else 0
        print(f" [INFO] 售后单共 {total_as} 条")

    def test_chain_10_admin_stats_overview(self, client, admin_token):
        """Admin reads the pending-count endpoints and prints each value."""
        modules = [
            ("商品待审", "/agri/goodsAudit/pendingCount", "pendingCount"),
            ("入驻待审", "/agri/merchantEntryApply/pendingCount", "pendingCount"),
            ("提现待审", "/agri/finance/withdrawAudit/pendingCount", "pendingCount"),
        ]
        for label, path, field in modules:
            client.get(path)
            client.assert_ok()
            payload = client.extract_data() or {}
            # A non-dict payload (e.g. a bare count) is printed as-is instead
            # of failing on ``.get``.
            val = payload.get(field, "?") if isinstance(payload, dict) else payload
            print(f" [INFO] {label}: {val}")
|