package forecast

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/url"
	"os"
	"sort"
	"strings"
	"time"

	"weatherstation/internal/database"
)

// Fixed request parameters for the CMA getStationInfo endpoint (adjust as needed).
const (
	cmaEndpoint  = "https://data.cma.cn/dataGis/exhibitionData/getStationInfo"
	cmaStationID = "59238"
	cmaFunItem   = "1150101020"
	cmaTypeCode  = "NWST"

	// cmaTimeLayout is the timestamp layout used in the response (YYYYMMDDHHMMSS).
	cmaTimeLayout = "20060102150405"

	// cmaMaxTargets is how many upcoming time slots are printed / stored.
	cmaMaxTargets = 3
)

// cmaHTTPClient is shared by all CMA requests so the timeout is configured once.
var cmaHTTPClient = &http.Client{Timeout: 15 * time.Second}

// cmaPayload mirrors the JSON response: Hover is a comma-separated list of
// element labels and List holds one [timestamp, value] series per element.
type cmaPayload struct {
	Hover string            `json:"hover"`
	List  [][][]interface{} `json:"list"`
	Value string            `json:"value"`
}

// cmaColumns holds the index into cmaPayload.List for each element.
// An index may be -1 (or out of range) when the element is unavailable.
type cmaColumns struct {
	rain, temp, rhu, prs, wind int
}

// cmaLocation returns Asia/Shanghai, falling back to a fixed UTC+8 zone when
// the tz database is not available on the host.
func cmaLocation() *time.Location {
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil || loc == nil {
		return time.FixedZone("CST", 8*3600)
	}
	return loc
}

// cmaPost sends the fixed-parameter POST request to the CMA endpoint.
// On success the caller owns the response and must close its body.
func cmaPost(ctx context.Context) (*http.Response, error) {
	form := url.Values{}
	form.Set("staId", cmaStationID)
	form.Set("funitemmenuid", cmaFunItem)
	form.Set("typeCode", cmaTypeCode)

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, cmaEndpoint, strings.NewReader(form.Encode()))
	if err != nil {
		return nil, fmt.Errorf("cma: build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	req.Header.Set("Referer", "https://data.cma.cn/dataGis/static/gridgis/")
	req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")

	resp, err := cmaHTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("cma: send request: %w", err)
	}
	return resp, nil
}

// cmaDecode decodes the JSON response body into a cmaPayload.
func cmaDecode(r io.Reader) (cmaPayload, error) {
	var p cmaPayload
	if err := json.NewDecoder(r).Decode(&p); err != nil {
		return cmaPayload{}, fmt.Errorf("cma: decode response: %w", err)
	}
	return p, nil
}

// cmaResolveColumns maps each element to its series index using the hover
// labels. When the labels do not line up with the series (or a required label
// is missing) it falls back to the fixed order 0 rain, 1 temperature,
// 2 humidity, 3 pressure, 4 wind.
func cmaResolveColumns(p cmaPayload) cmaColumns {
	cols := cmaColumns{rain: 0, temp: 1, rhu: 2, prs: 3, wind: 4}

	labels := strings.Split(p.Hover, ",")
	if len(labels) != len(p.List) || len(p.List) < 4 {
		return cols
	}

	r := findLabelIndex(labels, []string{"PRE_1h", "PRE"})
	t := findLabelIndex(labels, []string{"TEM", "temperature"})
	h := findLabelIndex(labels, []string{"RHU", "humidity"})
	prs := findLabelIndex(labels, []string{"PRS", "pressure"})
	w := findLabelIndex(labels, []string{"WIN_S_Avg_10mi", "WIN_SN_S", "WIN_S", "wind"})

	// Pressure is optional: its series may be absent, so prs is allowed to be -1.
	if r >= 0 && t >= 0 && h >= 0 && w >= 0 {
		cols = cmaColumns{rain: r, temp: t, rhu: h, prs: prs, wind: w}
	}
	return cols
}

// cmaBuildTables builds one time -> value lookup table per element series.
// Timestamps are parsed in loc; malformed entries are skipped.
func cmaBuildTables(p cmaPayload, loc *time.Location) []map[time.Time]float64 {
	tables := make([]map[time.Time]float64, len(p.List))
	for i, series := range p.List {
		tbl := make(map[time.Time]float64, len(series))
		for _, pair := range series {
			if len(pair) != 2 {
				continue
			}
			ts, ok := pair[0].(string)
			if !ok || len(ts) != len(cmaTimeLayout) {
				continue
			}
			t, err := time.ParseInLocation(cmaTimeLayout, ts, loc)
			if err != nil {
				continue
			}
			tbl[t] = toFloat(pair[1])
		}
		tables[i] = tbl
	}
	return tables
}

// cmaFutureTimes returns the earliest (at most limit) distinct timestamps that
// are strictly after base across all tables, in ascending order, together with
// the latest timestamp seen in any table (zero if the tables are empty).
// The returned slots are whatever the endpoint provides; they are not forced
// to be consecutive hours.
func cmaFutureTimes(tables []map[time.Time]float64, base time.Time, limit int) ([]time.Time, time.Time) {
	var latest time.Time
	seen := make(map[time.Time]struct{})
	for _, tbl := range tables {
		for t := range tbl {
			if t.After(latest) {
				latest = t
			}
			if t.After(base) {
				seen[t] = struct{}{}
			}
		}
	}

	future := make([]time.Time, 0, len(seen))
	for t := range seen {
		future = append(future, t)
	}
	future = sortTimesAsc(future)
	if len(future) > limit {
		future = future[:limit]
	}
	return future, latest
}

// cmaCell renders the value of series idx at time t for console output,
// or "NA" when the series or the time slot is missing.
func cmaCell(tables []map[time.Time]float64, idx int, t time.Time) string {
	if idx < 0 || idx >= len(tables) {
		return "NA"
	}
	v, ok := tables[idx][t]
	if !ok {
		return "NA"
	}
	return formatOrNA(v)
}

// cmaScale converts v to a fixed-point integer with the given factor, rounding
// to nearest so binary floating-point artifacts (e.g. 28.9*100 evaluating to
// 2889.9999...) do not drop a unit the way plain truncation would.
func cmaScale(v, factor float64) int64 {
	return int64(math.Round(v * factor))
}

// RunCMACLI calls the CMA getStationInfo endpoint with fixed parameters and
// prints the element values for the next available future time slots (up to
// three) to stdout. It only prints; it never writes to the database.
func RunCMACLI(ctx context.Context) error {
	loc := cmaLocation()
	base := time.Now().In(loc).Truncate(time.Hour)

	resp, err := cmaPost(ctx)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("CMA 响应状态码异常: %d", resp.StatusCode)
	}
	payload, err := cmaDecode(resp.Body)
	if err != nil {
		return err
	}

	cols := cmaResolveColumns(payload)
	tables := cmaBuildTables(payload, loc)

	targets, latest := cmaFutureTimes(tables, base, cmaMaxTargets)
	if len(targets) == 0 {
		// No future slots returned: fall back to the next three whole hours,
		// which will all print as NA.
		targets = []time.Time{base.Add(1 * time.Hour), base.Add(2 * time.Hour), base.Add(3 * time.Hour)}
	}

	fmt.Fprintf(os.Stdout, "CMA next 3 hours (CST) station=%s\n", cmaStationID)
	fmt.Fprintln(os.Stdout, "Time, Rain(mm), Temp(°C), RHU(%), PRS(hPa), Wind(m/s)")
	for _, tt := range targets {
		// Print real minutes: slots come from the endpoint and are not
		// guaranteed to fall on the hour.
		fmt.Fprintf(os.Stdout, "%s, %s, %s, %s, %s, %s\n",
			tt.Format("2006-01-02 15:04"),
			cmaCell(tables, cols.rain, tt),
			cmaCell(tables, cols.temp, tt),
			cmaCell(tables, cols.rhu, tt),
			cmaCell(tables, cols.prs, tt),
			cmaCell(tables, cols.wind, tt))
	}

	switch {
	case latest.IsZero():
		fmt.Fprintln(os.Stdout, "Note: CMA endpoint returned no parsable data.")
	case latest.Before(targets[0]):
		fmt.Fprintf(os.Stdout, "Note: latest CMA data time is %s; no future values returned by this endpoint.\n", latest.Format("2006-01-02 15:04:05"))
	}
	return nil
}

// findLabelIndex returns the index of the first label that contains one of
// keys, or -1 if none matches. Keys are tried in order, so an earlier (more
// specific) key wins over a later (more generic) one regardless of where the
// labels appear.
func findLabelIndex(labels []string, keys []string) int {
	for _, k := range keys {
		for i, s := range labels {
			if strings.Contains(s, k) {
				return i
			}
		}
	}
	return -1
}

// toFloat converts a decoded JSON value to float64. Non-numeric values
// (including nil and strings) yield 0.
func toFloat(v interface{}) float64 {
	switch t := v.(type) {
	case float64:
		return t
	case float32:
		return float64(t)
	case int:
		return float64(t)
	case int64:
		return float64(t)
	case json.Number:
		// A malformed json.Number falls back to 0, like other non-numeric input.
		f, _ := t.Float64()
		return f
	default:
		return 0
	}
}

// formatOrNA formats v with at most two decimals and no trailing zeros
// (0 prints as "0"). NaN and infinities print as "NA".
func formatOrNA(v float64) string {
	if math.IsNaN(v) || math.IsInf(v, 0) {
		return "NA"
	}
	s := strings.TrimRight(strings.TrimRight(fmt.Sprintf("%.2f", v), "0"), ".")
	if s == "-0" {
		// Tiny negative values round to "-0.00"; print a plain zero instead.
		return "0"
	}
	return s
}

// sortTimesAsc sorts ts in place in ascending order and returns it.
func sortTimesAsc(ts []time.Time) []time.Time {
	sort.Slice(ts, func(i, j int) bool { return ts[i].Before(ts[j]) })
	return ts
}

// RunCMAFetch fetches CMA data once and stores the next available future time
// slots (up to three) through upsertForecastWithProvider with provider "cma".
// All stations share the same data and missing values are stored as 0.
// When the endpoint returns no future slot nothing is written.
//
// Every upsert is attempted even if an earlier one fails; if any failed, an
// error wrapping the first failure is returned.
func RunCMAFetch(ctx context.Context) error {
	loc := cmaLocation()
	issuedAt := time.Now().In(loc)
	base := issuedAt.Truncate(time.Hour)

	resp, err := cmaPost(ctx)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("CMA status=%d", resp.StatusCode)
	}
	payload, err := cmaDecode(resp.Body)
	if err != nil {
		return err
	}

	cols := cmaResolveColumns(payload)
	tables := cmaBuildTables(payload, loc)

	targets, _ := cmaFutureTimes(tables, base, cmaMaxTargets)
	if len(targets) == 0 {
		// No future slot: write nothing rather than inventing whole-hour rows.
		return nil
	}

	db := database.GetDB()
	stationIDs, err := loadAllStationIDs(ctx, db)
	if err != nil {
		return err
	}

	// Gust, wind direction and precipitation probability are not provided by
	// this endpoint and are stored as 0.
	const notProvided int64 = 0

	var (
		failed   int
		firstErr error
	)
	for _, ft := range targets {
		rainMmX1000 := cmaScale(getOrZero(tables, cols.rain, ft), 1000)    // mm
		tempCx100 := cmaScale(getOrZero(tables, cols.temp, ft), 100)       // °C
		humidityPct := int64(getOrZero(tables, cols.rhu, ft))              // %
		pressureHpaX100 := cmaScale(getOrZero(tables, cols.prs, ft), 100)  // hPa
		wsMsX1000 := cmaScale(getOrZero(tables, cols.wind, ft), 1000)      // m/s

		for _, sid := range stationIDs {
			err := upsertForecastWithProvider(ctx, db, sid, "cma", issuedAt, ft,
				rainMmX1000, tempCx100, humidityPct, wsMsX1000,
				notProvided, notProvided, notProvided, pressureHpaX100)
			if err != nil {
				failed++
				if firstErr == nil {
					firstErr = err
				}
			}
		}
	}
	if firstErr != nil {
		return fmt.Errorf("cma: %d forecast upserts failed, first error: %w", failed, firstErr)
	}
	return nil
}

// getOrZero returns the value of series idx at time t, or 0 when the series
// index is out of range or the time slot is missing.
func getOrZero(tables []map[time.Time]float64, idx int, t time.Time) float64 {
	if idx >= 0 && idx < len(tables) {
		if v, ok := tables[idx][t]; ok {
			return v
		}
	}
	return 0
}

// loadAllStationIDs returns every non-NULL station_id from the stations table.
// Scan and iteration errors are returned instead of yielding a partial list.
func loadAllStationIDs(ctx context.Context, db *sql.DB) ([]string, error) {
	rows, err := db.QueryContext(ctx, `SELECT station_id FROM stations`)
	if err != nil {
		return nil, fmt.Errorf("load station ids: %w", err)
	}
	defer rows.Close()

	var list []string
	for rows.Next() {
		var id sql.NullString
		if err := rows.Scan(&id); err != nil {
			return nil, fmt.Errorf("scan station id: %w", err)
		}
		if id.Valid {
			list = append(list, id.String)
		}
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate station ids: %w", err)
	}
	return list, nil
}