下载 ERA5 数据时如何实现断点续传
我们在下载 ERA5 数据的时候,大部分人应该都会遵守 cds 网站的指导,在配置完页面以后使用网站自动生成的 Python 示例代码进行下载(或者做一些外围修改,比如改为以日为循环或者改为多线程/进程,而核心的下载仍然使用 cdsapi 的方法)。
但是这种下载方法有一个问题,那就是它做不到断点续传。如果 cds 按照我们请求的表单生成的单个文件比较大的话,那么就很容易出现因为网络不稳定而导致的中断。一旦出现中断,如果直接在代码里增加简单的重试逻辑再次请求下载,那么 cds 就会重新生成一个新的链接给我们下载那么我们就需要从头再来(如果以几百 K 的速度辛辛苦苦下载到 99% 然后连接中断需要重新下载,是不是就很崩溃?)。如果反复出现网络波动,那么程序就会一次又一次地重新提交请求然后重新下载,这样不但浪费时间,也会给 cds 的服务器带来额外的压力。
所以我们需要用一个方法来实现断点续传。即使在下载的过程中链接中断了我们重试时,也能使用原有的链接从中断点继续下载,直到文件下载完成。
要实现断点续传功能,我们可以使用 Python 自己编写函数实现。但我觉得麻烦,其实可以使用现成的成熟工具 wget 来完成下载的断点续传。wget 是跨平台的工具,它既既支持 Linux 也支持 Windows 系统。具体安装细节可以自行查阅相关资料。
我们需要的工具包:
- wget:不同平台安装方式不一样请自行搜索,注意不是
pip install wget
这种方式安装。 - retrying:
pip install retrying
- cdsapi:
pip install cdsapi
下面这个例子是下载 ERA5 高空某些变量(用于跑盘古)数据的脚本,单次请求下载的文件大约 2.7GB 左右。当然 .cdsapirc
文件的配置还是需要的,这里就不再赘述了。
import os
import cdsapi
import zipfile
from multiprocessing import Pool
from subprocess import run
from retrying import retry
dataset = "reanalysis-era5-pressure-levels"
request_template = {
"product_type": ["reanalysis"],
"variable": [
"geopotential",
"specific_humidity",
"temperature",
"u_component_of_wind",
"v_component_of_wind"
],
"year": ["2024"],
"month": [
"01", "02", "03",
"04", "05", "06",
"07", "08", "09",
"10", "11", "12"
],
"day": [
"01", "02", "03",
"04", "05", "06",
"07", "08", "09",
"10", "11", "12",
"13", "14", "15",
"16", "17", "18",
"19", "20", "21",
"22", "23", "24",
"25", "26", "27",
"28", "29", "30",
"31"
],
"time": [
"00:00", "01:00", "02:00",
"03:00", "04:00", "05:00",
"06:00", "07:00", "08:00",
"09:00", "10:00", "11:00",
"12:00", "13:00", "14:00",
"15:00", "16:00", "17:00",
"18:00", "19:00", "20:00",
"21:00", "22:00", "23:00"
],
"pressure_level": [
"50", "100", "150",
"200", "250", "300",
"400", "500", "600",
"700", "850", "925",
"1000"
],
"data_format": "grib",
"download_format": "zip"
}
def check(zip_file):
"""通过 zip 解压的方式验证下载的文件是完整的"""
directory = os.path.dirname(zip_file)
try:
with zipfile.ZipFile(zip_file, "r") as zip_ref:
zip_ref.extractall(directory)
except zipfile.BadZipFile:
# 如果文件无效则删除
os.remove(zip_file)
return False
else:
return True
@retry # 使用 retrying 库的 retry 装饰器,可以在函数执行失败时自动重试
def download_by_wget(url, savefp):
"""使用 wget 下载文件,支持断点续传"""
cmd = f"wget -O {savefp} -c {url}" # 这是一个简单的 wget 命令,-O 参数指定保存的文件名,-c 参数表示断点续传
run(cmd, shell=True, check=True) # 使用 run 函数执行命令,shell=True 表示使用 shell 执行,check=True 表示如果命令执行失败则抛出异常
def download_data(month, day):
client = cdsapi.Client()
try:
request = request_template.copy()
request["month"] = [month]
request["day"] = [f"{day:02d}"]
savedir = "/data/reanalysis/era5/upper/grib/zqtuv/2024/"
os.makedirs(savedir, exist_ok=True)
savefp = os.path.join(savedir, f"era5_{month}_{day:02d}.grib.zip")
if os.path.exists(savefp):
if os.path.getsize(savefp) < 2600 * 1024 * 1024: # 如果文件小于 2.6GB 则删除,因为这个文件大小是正常的文件大小
os.remove(savefp)
else:
print(f"File {savefp} already exists and is of sufficient size, skipping")
return
# client.retrieve(dataset, request).download(savefp) # 使用官方示例脚本的下载方法,该方法无法完成断点续传的功能
response = client.retrieve(dataset, request)
url = response.location # response 对象的 location 属性保存的其实是下载的 URL,也不知道为什么要这样取名
print(f"URL: {url}") # 打印下载链接
download_by_wget(url, savefp)
check_result = check(savefp)
if not check_result:
print(f"{savefp} abnormal, removed")
except Exception as e:
print(f"Failed to download data for {month}-{day:02d}: {e}")
else:
print(f"Downloaded data for {month}-{day:02d}")
if __name__ == "__main__":
tasks = [(month, day) for month in request_template["month"] for day in range(1, 32)]
with Pool(processes=4) as pool: # 使用多进程下载,可以加快下载速度
pool.starmap(download_data, tasks)
这个脚本主要就是在发送数据请求之后,不直接用 cdsapi 提供的下载方法,而是获取到下载链接(cds 给我们生成下载链接在一定时效内是可以反复下载的,cdsapi 的下载本质上也是从这个链接进行下载)后使用 wget
命令下载。wget
命令的 -c
参数即是启动断点续传,它能够自动计算本地已存在文件的数据位长度(断点位置),然后从服务端下载时自动寻址到断点位置并进行后续的下载,该命令具有幂等性,可以反复执行直到文件下载完整为止。
上述代码是在 Linux 环境下运行的,未在 Windows 环境下做过测试,但原理相通。