之前的逆地理编码需要使用高德等API,受限于请求次数和请求频率,对于大批量的需求无法满足。因此基于中国行政区划的GIS数据,开发了一套离线版python逆地理编码程序。
运行前需要将中国行政区划的shp文件(省、市、区县等)放置在区划文件夹下。读取的内容为excel的经度纬度,输出“省份-市-区县”三级数据
代码如下:
import geopandas as gpd
import sys
import pandas as pd
from shapely.geometry import Point
def check_column_names(gdf, file_name):
"""检查数据框的列名"""
print(f"{file_name} 的列名: {list(gdf.columns)}")
def get_location_by_coordinate(longitude, latitude, province_gdf, city_gdf, county_gdf):
"""根据经纬度获取对应的省、市、县"""
# 创建点对象
point = Point(longitude, latitude)
# 初始化结果
province = None
city = None
county = None
# 1. 查找省份
for idx, row in province_gdf.iterrows():
polygon = row['geometry']
if polygon.contains(point):
province = row.get('省', None)
break
# 2. 查找市
if province:
for idx, row in city_gdf.iterrows():
polygon = row['geometry']
if polygon.contains(point):
city = row.get('市', None)
break
# 3. 查找县
if city:
for idx, row in county_gdf.iterrows():
polygon = row['geometry']
if polygon.contains(point):
# 使用正确的列名'NAME'获取县名
county = row.get('NAME', None)
break
# 构建完整地理位置字符串
location_parts = []
if province:
location_parts.append(province)
if city:
location_parts.append(city)
if county:
location_parts.append(county)
full_location = '、'.join(location_parts) if location_parts else '未知'
return province, city, county, full_location
def process_excel_file(excel_path, province_gdf, city_gdf, county_gdf):
"""处理Excel文件中的坐标数据"""
try:
# 读取Excel文件
print(f"正在读取Excel文件: {excel_path}")
# 使用openpyxl引擎读取Excel文件
df = pd.read_excel(excel_path, engine='openpyxl')
print(f"Excel文件读取成功! 共包含 {len(df)} 行数据")
print(f"Excel文件列名: {list(df.columns)}")
# 检查'经度'和'纬度'列是否存在
if '经度' not in df.columns or '纬度' not in df.columns:
print("错误: Excel文件中缺少'经度'列或'纬度'列")
return False
# 初始化L列
df['完整地理位置'] = ''
print("已初始化'完整地理位置'列")
# 批量处理每一行的坐标
total_rows = len(df)
print(f"开始处理 {total_rows} 行数据...")
# 定义保存间隔
save_interval = 10000
# 保存结果文件路径
output_path = excel_path.replace('.xlsx', '_结果.xlsx')
for idx, row in df.iterrows():
# 显示进度
if (idx + 1) % 100 == 0 or (idx + 1) == total_rows:
print(f"处理进度: {idx + 1}/{total_rows}")
# 每处理save_interval行保存一次结果
if (idx + 1) % save_interval == 0:
print(f"\n已处理 {idx + 1} 行,正在保存中间结果...")
df.to_excel(output_path, index=False, engine='openpyxl')
print(f"中间结果已保存到: {output_path}")
try:
longitude = float(row['经度'])
latitude = float(row['纬度'])
# 获取地理位置
_, _, _, full_location = get_location_by_coordinate(longitude, latitude, province_gdf, city_gdf, county_gdf)
# 写入结果到L列
df.at[idx, '完整地理位置'] = full_location
except Exception as e:
# 处理错误,跳过该行
if (idx + 1) % 1000 == 0:
print(f"处理第 {idx + 1} 行时出错: {e}")
df.at[idx, '完整地理位置'] = '坐标无效'
continue
# 最终保存结果
print(f"\n处理完成! 正在保存最终结果...")
df.to_excel(output_path, index=False, engine='openpyxl')
print(f"最终结果已保存到: {output_path}")
return True
except Exception as e:
print(f"处理Excel文件时出错: {e}")
import traceback
traceback.print_exc()
return False
def main():
# 文件路径
province_shp = r'C:\Users\47215\Desktop\逆地理编码\区划\省.shp'
city_shp = r'C:\Users\47215\Desktop\逆地理编码\区划\市.shp'
county_shp = r'C:\Users\47215\Desktop\逆地理编码\区划\县.shp'
try:
# 读取shp文件
print("正在加载地理边界数据...")
print("加载省份数据...")
province_gdf = gpd.read_file(province_shp)
print(f"省份数据加载成功! 共包含 {len(province_gdf)} 个省份")
print("加载城市数据...")
city_gdf = gpd.read_file(city_shp)
print(f"城市数据加载成功! 共包含 {len(city_gdf)} 个城市")
print("加载县数据...")
county_gdf = gpd.read_file(county_shp)
print(f"县数据加载成功! 共包含 {len(county_gdf)} 个县")
print("数据加载完成!")
# 检查命令行参数
if len(sys.argv) > 1 and sys.argv[1] == '--batch':
# 批量处理模式
excel_path = r'C:\Users\47215\Desktop\逆地理编码\全国矿产地分布.xlsx'
process_excel_file(excel_path, province_gdf, city_gdf, county_gdf)
else:
# 单坐标查询模式
# 获取用户输入的经纬度
while True:
try:
longitude = float(input("\n请输入经度: "))
latitude = float(input("请输入纬度: "))
break
except ValueError:
print("输入错误,请输入有效的数字")
# 判断点落在哪个地理区域
province, city, county, full_location = get_location_by_coordinate(longitude, latitude, province_gdf, city_gdf, county_gdf)
# 输出结果
print(f"\n坐标: ({longitude}, {latitude})")
print(f"所属省份: {province if province else '未知'}")
print(f"所属城市: {city if city else '未知'}")
print(f"所属县: {county if county else '未知'}")
print(f"完整地理位置: {full_location}")
except Exception as e:
print(f"错误: {e}")
sys.exit(1)
if __name__ == "__main__":
main()