"""Crawl the 2022 administrative-division code tables from stats.gov.cn.

The site is walked through five nested levels: province, city, county,
town and village.  Every level is one HTML page whose table rows link to the
pages of the level below.  The result is a nested dict keyed by division
code; it is written to ``main.json`` together with per-level totals
(``total.json``) and the list of pages that could not be crawled
(``error.json``).
"""
import json
from random import random
from time import sleep

from bs4 import BeautifulSoup
from requests import request
from requests.exceptions import ReadTimeout  # noqa: F401  (kept: pre-existing import)
from requests.exceptions import RequestException, Timeout

# Seconds to wait for any single HTTP request before giving up on the page.
REQUEST_TIMEOUT = 2

# Number of entries discovered so far, per level, over the whole crawl.
total = {"province": 0, "city": 0, "county": 0, "town": 0, "village": 0}
# Number of entries on the page currently being walked, per level.
run = {"province": 0, "city": 0, "county": 0, "town": 0}
# Zero-based index of the entry currently being walked, per level.
cur = {"province": 0, "city": 0, "county": 0}
# Pages that failed: {"type": level, "reason": "timeout"|"error"|"zero", "url": ...}
errs = []


def save(filename: str, data: "dict | list") -> None:
    """Write *data* to *filename* as indented, non-ASCII-escaped UTF-8 JSON."""
    # The ``with`` block closes the file; no explicit close() is needed.
    with open(filename, "w", encoding="utf-8") as fp:
        json.dump(data, fp, ensure_ascii=False, indent=2)


def _record_failure(level: str, reason: str, page_url: str, indent: str) -> None:
    """Append a failure entry to ``errs`` and print it as ``<indent><level> <reason>``."""
    errs.append({"type": level, "reason": reason, "url": page_url})
    print(f"{indent}{level} {reason}")


def _counter(level: str) -> str:
    """Return the one-based progress counter of *level*, e.g. ``[03/17]``."""
    return f"[{cur[level] + 1:02}/{run[level]:02}]"


def _select_children(level: str, page_url: str, selector: str, indent: str = "") -> list:
    """Download *page_url* and return the elements matching *selector*.

    Bookkeeping shared by every level is done here: ``run[level]`` (when the
    level has a ``run`` entry) is set to the number of matches and
    ``total[level]`` is increased by it.

    Returns an empty list, after recording the failure in ``errs``, when the
    request times out, when it fails for another network reason, or when the
    page contains no matching element.
    """
    try:
        resp = request("get", page_url, timeout=REQUEST_TIMEOUT)
    except Timeout:
        # Covers both connect and read timeouts.
        _record_failure(level, "timeout", page_url, indent)
        return []
    except RequestException:
        # A dropped connection must not abort the crawl: results are only
        # saved at the very end, so a crash here would lose everything.
        _record_failure(level, "error", page_url, indent)
        return []

    dom = BeautifulSoup(resp.content.decode("utf-8"), "lxml")
    targets = dom.select(selector)
    if level in run:
        run[level] = len(targets)
    if not targets:
        _record_failure(level, "zero", page_url, indent)
        return []
    total[level] += len(targets)
    return targets


def get_village_of(root: str, url: str) -> dict:
    """Return the villages listed on the town page ``{root}/{url}``.

    The result maps the text of each row's first cell to a dict holding the
    second cell as ``"type"`` and the third cell as ``"name"``.  An empty dict
    is returned when the page could not be fetched or has no village rows.
    """
    rows = _select_children("village", f"{root}/{url}", "tr.villagetr")
    villages = {}
    for row in rows:
        cells = row.select("td")
        villages[cells[0].text] = {"type": cells[1].text, "name": cells[2].text}
    if rows:
        # Completes the progress line started by get_town_of (end=" => ").
        print(f"{len(rows):02} villages")
    return villages


def get_town_of(root: str, url: str) -> dict:
    """Return the towns listed on the county page ``{root}/{url}``.

    Each town is keyed by characters 3..11 of its link (the part after the
    leading ``NN/`` directory, presumably the 9-digit town code) and carries
    its ``"name"`` and its villages under ``"children"``.
    """
    # Randomised 0.5-1.5 s pause before every county page to throttle the crawl.
    sleep(random() + 0.5)
    anchors = _select_children(
        "town", f"{root}/{url}", "tr.towntr > td:last-child > a", "|------"
    )
    towns = {}
    # Village pages live one directory deeper, named by the first two
    # characters of this page's relative URL.
    child_root = f"{root}/{url[0:2]}"
    for index, anchor in enumerate(anchors):
        cur["town"] = index
        name, link = anchor.text, anchor["href"]
        print(
            f"|------town {_counter('town')}"
            f" county {_counter('county')}"
            f" city {_counter('city')}"
            f" province {_counter('province')}",
            end=" => ",
        )
        towns[link[3:12]] = {"name": name, "children": get_village_of(child_root, link)}
    return towns


def get_county_of(root: str, url: str) -> dict:
    """Return the counties listed on the city page ``{root}/{url}``.

    Each county is keyed by characters 3..8 of its link (presumably the
    6-digit county code) and carries its ``"name"`` and its towns under
    ``"children"``.  Sleeps 2 s after each county.
    """
    anchors = _select_children(
        "county", f"{root}/{url}", "tr.countytr > td:last-child > a", "|----"
    )
    counties = {}
    child_root = f"{root}/{url[0:2]}"
    for index, anchor in enumerate(anchors):
        cur["county"] = index
        name, link = anchor.text, anchor["href"]
        print(
            f"|----county {_counter('county')}"
            f" city {_counter('city')}"
            f" province {_counter('province')}"
        )
        counties[link[3:9]] = {"name": name, "children": get_town_of(child_root, link)}
        sleep(2)
    return counties


def get_cities_of(root: str, url: str) -> dict:
    """Return the cities listed on the province page ``{root}/{url}``.

    Each city is keyed by characters 3..6 of its link (presumably the 4-digit
    city code) and carries its ``"name"`` and its counties under
    ``"children"``.  Sleeps 3 s after each city.
    """
    anchors = _select_children(
        "city", f"{root}/{url}", "tr.citytr > td:last-child > a", "|--"
    )
    cities = {}
    for index, anchor in enumerate(anchors):
        cur["city"] = index
        name, link = anchor.text, anchor["href"]
        print(
            f"|--city <{name}> {_counter('city')}"
            f" of province {_counter('province')}"
        )
        # City links are already relative to the year root, so it is passed on as is.
        cities[link[3:7]] = {"name": name, "children": get_county_of(root, link)}
        sleep(3)
    return cities


def get_provinces(root: str) -> dict:
    """Crawl everything below ``{root}/index.html`` and return the full tree.

    Each province is keyed by the first two characters of its link and
    carries its ``"name"`` and its cities under ``"children"``.  Sleeps 5 s
    after each province.

    Raises:
        requests.exceptions.RequestException: the index page could not be
            fetched (nothing can be crawled without it, so this is fatal).
        RuntimeError: the index page contains no province links.
    """
    resp = request("get", f"{root}/index.html", timeout=REQUEST_TIMEOUT)
    dom = BeautifulSoup(resp.content.decode("utf-8"), "lxml")
    anchors = dom.select("tr.provincetr a")
    run["province"] = len(anchors)
    if not anchors:
        # Explicit raise instead of assert: asserts vanish under ``python -O``.
        raise RuntimeError(f"no province links found at {root}/index.html")
    total["province"] += run["province"]

    provinces = {}
    for index, anchor in enumerate(anchors):
        cur["province"] = index
        name, link = anchor.text, anchor["href"]
        print(f"province <{name}> {_counter('province')} from {root}/{link}")
        provinces[link[0:2]] = {"name": name, "children": get_cities_of(root, link)}
        sleep(5)
    return provinces


def main():
    """Run the crawl and dump totals, errors and the result tree as JSON."""
    root = "http://www.stats.gov.cn/sj/tjbz/tjyqhdmhcxhfdm/2022"
    # Alternative source noted by the author (unused): http://www.tcmap.com.cn/
    res_file, total_file, err_file = "main.json", "total.json", "error.json"
    res = get_provinces(root)
    print(total)
    save(total_file, total)
    print(errs)
    save(err_file, errs)
    print(res)
    save(res_file, res)


if __name__ == "__main__":
    main()