123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
import json
from random import random
from time import sleep

from bs4 import BeautifulSoup
from requests import request
from requests.exceptions import ReadTimeout, Timeout
# Crawl-wide bookkeeping shared by every fetch function below.
# total: number of entries found at each level over the whole crawl.
total = dict.fromkeys(("province", "city", "county", "town", "village"), 0)
# run: size of the listing currently being iterated at each level.
run = dict.fromkeys(("province", "city", "county", "town"), 0)
# cur: zero-based position inside the current listing at each level
# (a "town" entry is added at runtime by the town loop).
cur = dict.fromkeys(("province", "city", "county"), 0)
# errs: one {"type", "reason", "url"} record per page that failed.
errs = list()
def save(filename: "str", data: "dict or list"):
    """Write *data* to *filename* as pretty-printed UTF-8 JSON.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``) and the
    output is indented by two spaces. An existing file is overwritten.

    :param filename: path of the file to write.
    :param data: JSON-serializable dict or list.
    """
    # The ``with`` block closes the file on exit, so no explicit close() call
    # is needed.
    with open(filename, "w", encoding="utf-8") as fp:
        json.dump(data, fp, ensure_ascii=False, indent=2)
def get_village_of(root: "str", url: "str") -> "dict":  # root: 2022/2/2/2/9.html
    """Fetch one village listing page and return its rows keyed by first cell.

    Requests ``{root}/{url}`` and reads every ``tr.villagetr`` row. The text of
    the first ``td`` becomes the key (presumably the division code -- confirm
    against the page); the second and third cells are stored as ``"type"`` and
    ``"name"``.

    Failures are recorded rather than raised: a timed-out request or a page
    without village rows appends a record to the module-level ``errs`` list
    and yields ``{}``. On success ``total["village"]`` is increased by the
    number of rows.

    :param root: URL prefix that *url* is appended to.
    :param url: relative link of the page to fetch.
    :return: ``{key: {"type": ..., "name": ...}}``, or ``{}`` on failure.
    """
    page = f"{root}/{url}"
    try:
        resp = request("get", page, timeout=2)
    except Timeout:
        # Timeout is the common base of ConnectTimeout and ReadTimeout, so a
        # slow connect is logged like a slow read instead of ending the crawl.
        errs.append({"type": "village", "reason": "timeout", "url": page})
        print("village timeout")
        return {}

    dom = BeautifulSoup(resp.content.decode("utf-8"), "lxml")
    rows = dom.select("tr.villagetr")
    size = len(rows)
    if size == 0:
        errs.append({"type": "village", "reason": "zero", "url": page})
        print("village zero")
        return {}

    total["village"] += size
    villages = {}
    for row in rows:
        cells = row.select("td")
        villages[cells[0].text] = {"type": cells[1].text, "name": cells[2].text}
    # Completes the progress line that the town loop left open with end=" => ".
    print(f"{size:02} villages")
    return villages
def get_town_of(root: "str", url: "str") -> "dict":  # root: 2022/2/2/6.html
    """Fetch one town listing page and crawl the villages of every town on it.

    Sleeps between 0.5 and 1.5 seconds, requests ``{root}/{url}`` and collects
    the anchors matched by ``tr.towntr > td:last-child > a``. For each anchor
    the village page is fetched through ``get_village_of``; the result is
    keyed by ``link[3:12]`` (presumably the code embedded in the link's file
    name -- confirm against real links).

    Failures are recorded rather than raised: a timed-out request or a page
    without town links appends a record to the module-level ``errs`` list and
    yields ``{}``. The function also updates ``run["town"]``, ``cur["town"]``
    and ``total["town"]`` for progress reporting.

    :param root: URL prefix that *url* is appended to.
    :param url: relative link of the page to fetch.
    :return: ``{key: {"name": ..., "children": {...}}}``, or ``{}`` on failure.
    """
    # Randomised pause before every request of this level.
    sleep(random() + 0.5)
    page = f"{root}/{url}"
    try:
        resp = request("get", page, timeout=2)
    except Timeout:
        # Timeout is the common base of ConnectTimeout and ReadTimeout, so a
        # slow connect is logged like a slow read instead of ending the crawl.
        errs.append({"type": "town", "reason": "timeout", "url": page})
        print("|------town timeout")
        return {}

    dom = BeautifulSoup(resp.content.decode("utf-8"), "lxml")
    anchors = dom.select("tr.towntr > td:last-child > a")
    run["town"] = len(anchors)
    if run["town"] == 0:
        errs.append({"type": "town", "reason": "zero", "url": page})
        print("|------town zero")
        return {}

    total["town"] += run["town"]
    # Child links are relative to the directory named by the first two
    # characters of this page's own link; it is the same for every town.
    child_root = f"{root}/{url[0:2]}"
    towns = {}
    for index, town in enumerate(anchors):
        cur["town"] = index
        name, link = town.text, town["href"]  # link: 2/9.html
        # The line is left open; get_village_of prints its outcome after "=>".
        print(
            f"|------town [{cur['town'] + 1:02}/{run['town']:02}]"
            f" county [{cur['county'] + 1:02}/{run['county']:02}]"
            f" city [{cur['city'] + 1:02}/{run['city']:02}]"
            f" province [{cur['province'] + 1:02}/{run['province']:02}]",
            end=" => "
        )
        towns[link[3:12]] = {"name": name, "children": get_village_of(child_root, link)}
    return towns
def get_county_of(root: "str", url: "str") -> "dict":  # root: 2022/2/4.html
    """Fetch one county listing page and crawl the towns of every county on it.

    Requests ``{root}/{url}`` and collects the anchors matched by
    ``tr.countytr > td:last-child > a``. For each anchor the town page is
    crawled through ``get_town_of``, followed by a two-second pause; the
    result is keyed by ``link[3:9]`` (presumably the code embedded in the
    link's file name -- confirm against real links).

    Failures are recorded rather than raised: a timed-out request or a page
    without county links appends a record to the module-level ``errs`` list
    and yields ``{}``. The function also updates ``run["county"]``,
    ``cur["county"]`` and ``total["county"]`` for progress reporting.

    :param root: URL prefix that *url* is appended to.
    :param url: relative link of the page to fetch.
    :return: ``{key: {"name": ..., "children": {...}}}``, or ``{}`` on failure.
    """
    page = f"{root}/{url}"
    try:
        resp = request("get", page, timeout=2)
    except Timeout:
        # Timeout is the common base of ConnectTimeout and ReadTimeout, so a
        # slow connect is logged like a slow read instead of ending the crawl.
        errs.append({"type": "county", "reason": "timeout", "url": page})
        print("|----county timeout")
        return {}

    dom = BeautifulSoup(resp.content.decode("utf-8"), "lxml")
    anchors = dom.select("tr.countytr > td:last-child > a")
    run["county"] = len(anchors)
    if run["county"] == 0:
        errs.append({"type": "county", "reason": "zero", "url": page})
        print("|----county zero")
        return {}

    total["county"] += run["county"]
    # Child links are relative to the directory named by the first two
    # characters of this page's own link; it is the same for every county.
    child_root = f"{root}/{url[0:2]}"
    counties = {}
    for index, county in enumerate(anchors):
        cur["county"] = index
        name, link = county.text, county["href"]  # link: 2/6.html
        print(
            f"|----county [{cur['county'] + 1:02}/{run['county']:02}]"
            f" city [{cur['city'] + 1:02}/{run['city']:02}]"
            f" province [{cur['province'] + 1:02}/{run['province']:02}]"
        )
        counties[link[3:9]] = {"name": name, "children": get_town_of(child_root, link)}
        # Fixed pause after each county.
        sleep(2)
    return counties
def get_cities_of(root: "str", url: "str") -> "dict":  # root: 2022/2.html
    """Fetch one city listing page and crawl the counties of every city on it.

    Requests ``{root}/{url}`` and collects the anchors matched by
    ``tr.citytr > td:last-child > a``. For each anchor the county page is
    crawled through ``get_county_of`` (with the same *root*), followed by a
    three-second pause; the result is keyed by ``link[3:7]`` (presumably the
    code embedded in the link's file name -- confirm against real links).

    Failures are recorded rather than raised: a timed-out request or a page
    without city links appends a record to the module-level ``errs`` list and
    yields ``{}``. The function also updates ``run["city"]``, ``cur["city"]``
    and ``total["city"]`` for progress reporting.

    :param root: URL prefix that *url* is appended to.
    :param url: relative link of the page to fetch.
    :return: ``{key: {"name": ..., "children": {...}}}``, or ``{}`` on failure.
    """
    page = f"{root}/{url}"
    try:
        resp = request("get", page, timeout=2)
    except Timeout:
        # Timeout is the common base of ConnectTimeout and ReadTimeout, so a
        # slow connect is logged like a slow read instead of ending the crawl.
        errs.append({"type": "city", "reason": "timeout", "url": page})
        print("|--city timeout")
        return {}

    dom = BeautifulSoup(resp.content.decode("utf-8"), "lxml")
    anchors = dom.select("tr.citytr > td:last-child > a")
    run["city"] = len(anchors)
    if run["city"] == 0:
        errs.append({"type": "city", "reason": "zero", "url": page})
        print("|--city zero")
        return {}

    total["city"] += run["city"]
    cities = {}
    for index, city in enumerate(anchors):
        cur["city"] = index
        name, link = city.text, city["href"]  # link: 2/4.html
        print(
            f"|--city <{name}> [{cur['city'] + 1:02}/{run['city']:02}]"
            f" of province [{cur['province'] + 1:02}/{run['province']:02}]"
        )
        cities[link[3:7]] = {"name": name, "children": get_county_of(root, link)}
        # Fixed pause after each city.
        sleep(3)
    return cities
def get_provinces(root: "str") -> "dict":
    """Fetch the index page and crawl the whole tree below every province.

    Requests ``{root}/index.html`` and collects the anchors matched by
    ``tr.provincetr a``. For each anchor the city page is crawled through
    ``get_cities_of``, followed by a five-second pause; the result is keyed
    by the first two characters of the link. The function also updates
    ``run["province"]``, ``cur["province"]`` and ``total["province"]`` for
    progress reporting.

    :param root: base URL of the yearly listing.
    :return: ``{key: {"name": ..., "children": {...}}}``.
    :raises AssertionError: if the index page contains no province links.
    :raises requests.exceptions.Timeout: if the index page does not answer in
        time; it is not caught here because nothing can be crawled without
        the province list.
    """
    index_url = f"{root}/index.html"
    # Without a timeout, requests may wait on a stalled server indefinitely.
    resp = request("get", index_url, timeout=10)
    dom = BeautifulSoup(resp.content.decode("utf-8"), "lxml")
    anchors = dom.select("tr.provincetr a")
    run["province"] = len(anchors)
    if run["province"] == 0:
        # Raised explicitly (same exception type as the former ``assert``) so
        # the check is not stripped when Python runs with -O.
        raise AssertionError(f"no province links found at {index_url}")

    total["province"] += run["province"]
    provinces = {}
    for index, province in enumerate(anchors):
        cur["province"] = index
        name, link = province.text, province["href"]  # link: 2.html
        print(f"province <{name}> [{cur['province'] + 1:02}/{run['province']:02}] from {root}/{link}")
        provinces[link[0:2]] = {"name": name, "children": get_cities_of(root, link)}
        # Fixed pause after each province.
        sleep(5)
    return provinces
def main():
    """Run the full crawl, then print and save the counters, errors and tree.

    Output files are written to the current working directory:
    ``total.json`` (per-level counters), ``error.json`` (failed pages) and
    ``main.json`` (the nested result).
    """
    root = "http://www.stats.gov.cn/sj/tjbz/tjyqhdmhcxhfdm/2022"
    # back = "http://www.tcmap.com.cn/"
    tree = get_provinces(root)

    # Each payload is echoed to stdout and then persisted, in this order.
    outputs = (
        ("total.json", total),
        ("error.json", errs),
        ("main.json", tree),
    )
    for target, payload in outputs:
        print(payload)
        save(target, payload)
if __name__ == "__main__":
    # Levels crawled, outermost first: province, city, county, town, village.
    main()
|