feat: 新增中央气象台数据

This commit is contained in:
yarnom 2025-09-11 14:57:07 +08:00
parent a612b511b2
commit 93533aa76c
4 changed files with 421 additions and 2 deletions

View File

@ -31,6 +31,8 @@ func main() {
// 预报抓取
var forecastOnly = flag.Bool("forecast_only", false, "仅执行一次open-meteo拉取并退出")
var caiyunOnly = flag.Bool("caiyun_only", false, "仅执行一次彩云拉取并退出")
var cmaCLI = flag.Bool("cma_cli", false, "仅执行一次CMA接口抓取并打印未来三小时")
var cmaOnly = flag.Bool("cma_only", false, "仅执行一次CMA拉取并退出")
var forecastDay = flag.String("forecast_day", "", "按日期抓取当天0点到当前时间+3h格式YYYY-MM-DD")
// 历史数据补完
var historicalOnly = flag.Bool("historical_only", false, "仅执行历史数据补完并退出")
@ -91,6 +93,23 @@ func main() {
return
}
// 单次 CMA 拉取(固定参数)写库并退出
if *cmaOnly {
if err := forecast.RunCMAFetch(context.Background()); err != nil {
log.Fatalf("CMA 拉取失败: %v", err)
}
log.Println("CMA 拉取完成")
return
}
// 单次 CMA 拉取(固定参数)并打印三小时
if *cmaCLI {
if err := forecast.RunCMACLI(context.Background()); err != nil {
log.Fatalf("CMA 拉取失败: %v", err)
}
return
}
// 历史CSV范围导出
if *exportRangeOnly {
if *exportStart == "" || *exportEnd == "" {

385
internal/forecast/cma.go Normal file
View File

@ -0,0 +1,385 @@
package forecast
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"sort"
"strings"
"time"
"weatherstation/internal/database"
)
// RunCMACLI 调用中国气象数据网 getSKStationInfo 接口,
// 以固定参数获取数据,并在控制台输出“当前时间的未来三小时”(整点)的要素值。
// 仅打印,不写库。
func RunCMACLI(ctx context.Context) error {
// 固定参数(可按需调整)
staID := "59238"
funitem := "1150101020"
typeCode := "NWST"
// 目标时间默认从“下一个整点”开始但后续会根据接口可用时次选择“接下来可用的3个未来时次”
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
now := time.Now().In(loc)
base := now.Truncate(time.Hour)
// 构造请求
form := url.Values{}
form.Set("staId", staID)
form.Set("funitemmenuid", funitem)
form.Set("typeCode", typeCode)
// 使用 getStationInfo包含更长时段数据含未来小时
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://data.cma.cn/dataGis/exhibitionData/getStationInfo", strings.NewReader(form.Encode()))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Referer", "https://data.cma.cn/dataGis/static/gridgis/")
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
client := &http.Client{Timeout: 15 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("CMA 响应状态码异常: %d", resp.StatusCode)
}
// 响应结构hover 字段标注各要素list 为多要素时间序列
var payload struct {
Hover string `json:"hover"`
List [][][]interface{} `json:"list"`
Value string `json:"value"`
}
if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
return err
}
// 解析 hover确定各要素索引若 hover 异常则回退为固定顺序0雨,1温,2湿,3压,4风
labels := strings.Split(payload.Hover, ",")
useHover := len(labels) == len(payload.List) && len(payload.List) >= 4
idxRain, idxTemp, idxRHU, idxPRS, idxWIN := 0, 1, 2, 3, 4
if useHover {
r := findLabelIndex(labels, []string{"PRE_1h", "PRE"})
t := findLabelIndex(labels, []string{"TEM", "temperature"})
h := findLabelIndex(labels, []string{"RHU", "humidity"})
p := findLabelIndex(labels, []string{"PRS", "pressure"})
w := findLabelIndex(labels, []string{"WIN_S_Avg_10mi", "WIN_SN_S", "WIN_S", "wind"})
if r >= 0 && t >= 0 && h >= 0 && w >= 0 { // PRS 可能为空表,索引允许>=0
idxRain, idxTemp, idxRHU, idxPRS, idxWIN = r, t, h, p, w
}
}
// 构建时间 -> 值 的查找表(按 Asia/Shanghai 解析时间戳 YYYYMMDDHHMMSS
// 每个要素一张表
tables := make([]map[time.Time]float64, len(payload.List))
for i := range payload.List {
tables[i] = map[time.Time]float64{}
for _, pair := range payload.List[i] {
if len(pair) != 2 {
continue
}
ts, _ := pair[0].(string)
val := toFloat(pair[1])
if len(ts) != 14 {
continue
}
t, err := time.ParseInLocation("20060102150405", ts, loc)
if err != nil {
continue
}
tables[i][t] = val
}
}
// 统计可用时间范围(用于提示)和收集未来时次
var latest time.Time
timeSet := map[time.Time]struct{}{}
for i := range tables {
for t := range tables[i] {
if t.After(latest) {
latest = t
}
if t.After(base) { // 未来时次
timeSet[t] = struct{}{}
}
}
}
// 选择“接下来可用的三个未来时次”(不强制为连续每小时)
var targets []time.Time
if len(timeSet) > 0 {
// 排序并取前3
targets = make([]time.Time, 0, len(timeSet))
for t := range timeSet {
targets = append(targets, t)
}
// 简单选择排序(避免引入 sort 包也没问题,但使用标准库更好)
}
if len(timeSet) > 0 {
// 使用标准库排序
targets = sortTimesAsc(targets)
if len(targets) > 3 {
targets = targets[:3]
}
} else {
// 回退到严格的下三个整点(若无则输出 NA
targets = []time.Time{base.Add(1 * time.Hour), base.Add(2 * time.Hour), base.Add(3 * time.Hour)}
}
// 输出
fmt.Fprintf(os.Stdout, "CMA next 3 hours (CST) station=%s\n", staID)
fmt.Fprintln(os.Stdout, "Time, Rain(mm), Temp(°C), RHU(%), PRS(hPa), Wind(m/s)")
for _, tt := range targets {
var rain, temp, rhu, prs, wind string
if idxRain >= 0 && idxRain < len(tables) {
if v, ok := tables[idxRain][tt]; ok {
rain = formatOrNA(v)
} else {
rain = "NA"
}
} else {
rain = "NA"
}
if idxTemp >= 0 && idxTemp < len(tables) {
if v, ok := tables[idxTemp][tt]; ok {
temp = formatOrNA(v)
} else {
temp = "NA"
}
} else {
temp = "NA"
}
if idxRHU >= 0 && idxRHU < len(tables) {
if v, ok := tables[idxRHU][tt]; ok {
rhu = formatOrNA(v)
} else {
rhu = "NA"
}
} else {
rhu = "NA"
}
if idxPRS >= 0 && idxPRS < len(tables) {
if v, ok := tables[idxPRS][tt]; ok {
prs = formatOrNA(v)
} else {
prs = "NA"
}
} else {
prs = "NA"
}
if idxWIN >= 0 && idxWIN < len(tables) {
if v, ok := tables[idxWIN][tt]; ok {
wind = formatOrNA(v)
} else {
wind = "NA"
}
} else {
wind = "NA"
}
fmt.Fprintf(os.Stdout, "%s, %s, %s, %s, %s, %s\n", tt.Format("2006-01-02 15:00"), rain, temp, rhu, prs, wind)
}
if len(targets) > 0 && latest.Before(targets[0]) {
fmt.Fprintf(os.Stdout, "Note: latest CMA data time is %s; no future values returned by this endpoint.\n", latest.Format("2006-01-02 15:04:05"))
}
return nil
}
func findLabelIndex(labels []string, keys []string) int {
for i, s := range labels {
for _, k := range keys {
if strings.Contains(s, k) {
return i
}
}
}
return -1
}
func toFloat(v interface{}) float64 {
switch t := v.(type) {
case float64:
return t
case float32:
return float64(t)
case int:
return float64(t)
case int64:
return float64(t)
case json.Number:
f, _ := t.Float64()
return f
default:
return 0
}
}
func formatOrNA(v float64) string {
// 使用默认格式保留最多1-2位小数按需要精简这里用 %.2f
// 但如果值为0仍然输出 0
return strings.TrimRight(strings.TrimRight(fmt.Sprintf("%.2f", v), "0"), ".")
}
func sortTimesAsc(ts []time.Time) []time.Time {
sort.Slice(ts, func(i, j int) bool { return ts[i].Before(ts[j]) })
return ts
}
// RunCMAFetch 拉取一次CMA数据并将“下一个整点开始的3个小时”写入 forecast_hourly。
// 特殊规则所有站点共用同一份数据缺失项以0填充provider='cma'。
func RunCMAFetch(ctx context.Context) error {
// 固定参数
staID := "59238"
funitem := "1150101020"
typeCode := "NWST"
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
issuedAt := time.Now().In(loc)
startHour := issuedAt.Truncate(time.Hour)
targets := []time.Time{startHour.Add(1 * time.Hour), startHour.Add(2 * time.Hour), startHour.Add(3 * time.Hour)}
// 请求CMA
form := url.Values{}
form.Set("staId", staID)
form.Set("funitemmenuid", funitem)
form.Set("typeCode", typeCode)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://data.cma.cn/dataGis/exhibitionData/getStationInfo", strings.NewReader(form.Encode()))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Referer", "https://data.cma.cn/dataGis/static/gridgis/")
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
client := &http.Client{Timeout: 15 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("CMA status=%d", resp.StatusCode)
}
var payload struct {
Hover string `json:"hover"`
List [][][]interface{} `json:"list"`
Value string `json:"value"`
}
if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
return err
}
// 索引映射(容错)
labels := strings.Split(payload.Hover, ",")
useHover := len(labels) == len(payload.List) && len(payload.List) >= 4
idxRain, idxTemp, idxRHU, idxPRS, idxWIN := 0, 1, 2, 3, 4
if useHover {
r := findLabelIndex(labels, []string{"PRE_1h", "PRE"})
t := findLabelIndex(labels, []string{"TEM", "temperature"})
h := findLabelIndex(labels, []string{"RHU", "humidity"})
p := findLabelIndex(labels, []string{"PRS", "pressure"})
w := findLabelIndex(labels, []string{"WIN_S_Avg_10mi", "WIN_SN_S", "WIN_S", "wind"})
if r >= 0 && t >= 0 && h >= 0 && w >= 0 {
idxRain, idxTemp, idxRHU, idxPRS, idxWIN = r, t, h, p, w
}
}
// 建表:要素 -> 时间 -> 值
tables := make([]map[time.Time]float64, len(payload.List))
for i := range payload.List {
tables[i] = map[time.Time]float64{}
for _, pair := range payload.List[i] {
if len(pair) != 2 {
continue
}
ts, _ := pair[0].(string)
val := toFloat(pair[1])
if len(ts) != 14 {
continue
}
t, err := time.ParseInLocation("20060102150405", ts, loc)
if err != nil {
continue
}
tables[i][t] = val
}
}
// 读取站点列表(全部)
db := database.GetDB()
stationIDs, err := loadAllStationIDs(ctx, db)
if err != nil {
return err
}
// 写库缺失置0单位转换同其他provider
for _, ft := range targets {
rain := getOrZero(tables, idxRain, ft) // mm
temp := getOrZero(tables, idxTemp, ft) // °C
rhu := getOrZero(tables, idxRHU, ft) // %
prs := getOrZero(tables, idxPRS, ft) // hPa
ws := getOrZero(tables, idxWIN, ft) // m/s
gust := 0.0 // 未提供
wdir := 0.0 // 未提供
prob := 0.0 // 未提供
rainMmX1000 := int64(rain * 1000.0)
tempCx100 := int64(temp * 100.0)
humidityPct := int64(rhu)
wsMsX1000 := int64(ws * 1000.0)
gustMsX1000 := int64(gust * 1000.0)
wdirDeg := int64(wdir)
probPct := int64(prob)
pressureHpaX100 := int64(prs * 100.0)
for _, sid := range stationIDs {
_ = upsertForecastWithProvider(ctx, db, sid, "cma", issuedAt, ft,
rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100)
}
}
return nil
}
func getOrZero(tables []map[time.Time]float64, idx int, t time.Time) float64 {
if idx >= 0 && idx < len(tables) {
if v, ok := tables[idx][t]; ok {
return v
}
}
return 0
}
// 加载所有站点ID
func loadAllStationIDs(ctx context.Context, db *sql.DB) ([]string, error) {
rows, err := db.QueryContext(ctx, `SELECT station_id FROM stations`)
if err != nil {
return nil, err
}
defer rows.Close()
var list []string
for rows.Next() {
var s string
if err := rows.Scan(&s); err == nil {
list = append(list, s)
}
}
return list, nil
}

View File

@ -178,6 +178,20 @@ func StartUDPServer() error {
}
}()
// 后台定时每小时拉取CMA全站共用一份数据缺失填0
go func() {
for {
now := time.Now()
next := now.Truncate(time.Hour).Add(time.Hour)
time.Sleep(time.Until(next))
if err := forecast.RunCMAFetch(context.Background()); err != nil {
log.Printf("cma 定时拉取失败: %v", err)
} else {
log.Printf("cma 定时拉取完成")
}
}
}()
for {
n, addr, err := conn.ReadFrom(buffer)
if err != nil {

View File

@ -479,8 +479,9 @@
<select id="forecastProvider" class="px-2 py-1 border border-gray-300 rounded text-sm">
<option value="">不显示预报</option>
<option value="open-meteo">欧洲气象局</option>
<option value="caiyun" selected>中央气象台</option>
<option value="caiyun" selected>彩云</option>
<option value="imdroid">英卓</option>
<option value="cma">中央气象台</option>
</select>
</div>