diff --git a/cmd/weatherstation/main.go b/cmd/weatherstation/main.go index b86cca3..b380726 100644 --- a/cmd/weatherstation/main.go +++ b/cmd/weatherstation/main.go @@ -31,6 +31,8 @@ func main() { // 预报抓取 var forecastOnly = flag.Bool("forecast_only", false, "仅执行一次open-meteo拉取并退出") var caiyunOnly = flag.Bool("caiyun_only", false, "仅执行一次彩云拉取并退出") + var cmaCLI = flag.Bool("cma_cli", false, "仅执行一次CMA接口抓取并打印未来三小时") + var cmaOnly = flag.Bool("cma_only", false, "仅执行一次CMA拉取并退出") var forecastDay = flag.String("forecast_day", "", "按日期抓取当天0点到当前时间+3h(格式YYYY-MM-DD)") // 历史数据补完 var historicalOnly = flag.Bool("historical_only", false, "仅执行历史数据补完并退出") @@ -91,6 +93,23 @@ func main() { return } + // 单次 CMA 拉取(固定参数)写库并退出 + if *cmaOnly { + if err := forecast.RunCMAFetch(context.Background()); err != nil { + log.Fatalf("CMA 拉取失败: %v", err) + } + log.Println("CMA 拉取完成") + return + } + + // 单次 CMA 拉取(固定参数)并打印三小时 + if *cmaCLI { + if err := forecast.RunCMACLI(context.Background()); err != nil { + log.Fatalf("CMA 拉取失败: %v", err) + } + return + } + // 历史CSV范围导出 if *exportRangeOnly { if *exportStart == "" || *exportEnd == "" { diff --git a/internal/forecast/cma.go b/internal/forecast/cma.go new file mode 100644 index 0000000..036d2f1 --- /dev/null +++ b/internal/forecast/cma.go @@ -0,0 +1,385 @@ +package forecast + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "net/http" + "net/url" + "os" + "sort" + "strings" + "time" + "weatherstation/internal/database" +) + +// RunCMACLI 调用中国气象数据网 getSKStationInfo 接口, +// 以固定参数获取数据,并在控制台输出“当前时间的未来三小时”(整点)的要素值。 +// 仅打印,不写库。 +func RunCMACLI(ctx context.Context) error { + // 固定参数(可按需调整) + staID := "59238" + funitem := "1150101020" + typeCode := "NWST" + + // 目标时间:默认从“下一个整点”开始,但后续会根据接口可用时次选择“接下来可用的3个未来时次” + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + now := time.Now().In(loc) + base := now.Truncate(time.Hour) + + // 构造请求 + form := url.Values{} + form.Set("staId", staID) + form.Set("funitemmenuid", funitem) + form.Set("typeCode", typeCode) + + // 使用 getStationInfo(包含更长时段数据,含未来小时) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://data.cma.cn/dataGis/exhibitionData/getStationInfo", strings.NewReader(form.Encode())) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Referer", "https://data.cma.cn/dataGis/static/gridgis/") + req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36") + + client := &http.Client{Timeout: 15 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("CMA 响应状态码异常: %d", resp.StatusCode) + } + + // 响应结构:hover 字段标注各要素,list 为多要素时间序列 + var payload struct { + Hover string `json:"hover"` + List [][][]interface{} `json:"list"` + Value string `json:"value"` + } + if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { + return err + } + + // 解析 hover,确定各要素索引;若 hover 异常则回退为固定顺序:0雨,1温,2湿,3压,4风 + labels := strings.Split(payload.Hover, ",") + useHover := len(labels) == len(payload.List) && len(payload.List) >= 4 + idxRain, idxTemp, idxRHU, idxPRS, idxWIN := 0, 1, 2, 3, 4 + if useHover { + r := findLabelIndex(labels, []string{"PRE_1h", "PRE"}) + t := findLabelIndex(labels, []string{"TEM", "temperature"}) + h := findLabelIndex(labels, []string{"RHU", "humidity"}) + p := findLabelIndex(labels, []string{"PRS", "pressure"}) + w := findLabelIndex(labels, []string{"WIN_S_Avg_10mi", "WIN_SN_S", "WIN_S", "wind"}) + if r >= 0 && t >= 0 && h >= 0 && w >= 0 { // PRS 可能为空表,索引允许>=0 + idxRain, idxTemp, idxRHU, idxPRS, idxWIN = r, t, h, p, w + } + } + + // 构建时间 -> 值 的查找表(按 Asia/Shanghai 解析时间戳 YYYYMMDDHHMMSS) + // 每个要素一张表 + tables := make([]map[time.Time]float64, len(payload.List)) + for i := range payload.List { + tables[i] = map[time.Time]float64{} + for _, pair := range payload.List[i] { + if len(pair) != 2 { + continue + } + ts, _ := pair[0].(string) + val := toFloat(pair[1]) + if len(ts) != 14 { + continue + } + t, err := time.ParseInLocation("20060102150405", ts, loc) + if err != nil { + continue + } + tables[i][t] = val + } + } + + // 统计可用时间范围(用于提示)和收集未来时次 + var latest time.Time + timeSet := map[time.Time]struct{}{} + for i := range tables { + for t := range tables[i] { + if t.After(latest) { + latest = t + } + if t.After(base) { // 未来时次 + timeSet[t] = struct{}{} + } + } + } + + // 选择“接下来可用的三个未来时次”(不强制为连续每小时) + var targets []time.Time + if len(timeSet) > 0 { + // 排序并取前3 + targets = make([]time.Time, 0, len(timeSet)) + for t := range timeSet { + targets = append(targets, t) + } + // 简单选择排序(避免引入 sort 包也没问题,但使用标准库更好) + } + + if len(timeSet) > 0 { + // 使用标准库排序 + targets = sortTimesAsc(targets) + if len(targets) > 3 { + targets = targets[:3] + } + } else { + // 回退到严格的下三个整点(若无则输出 NA) + targets = []time.Time{base.Add(1 * time.Hour), base.Add(2 * time.Hour), base.Add(3 * time.Hour)} + } + + // 输出 + fmt.Fprintf(os.Stdout, "CMA next 3 hours (CST) station=%s\n", staID) + fmt.Fprintln(os.Stdout, "Time, Rain(mm), Temp(°C), RHU(%), PRS(hPa), Wind(m/s)") + for _, tt := range targets { + var rain, temp, rhu, prs, wind string + if idxRain >= 0 && idxRain < len(tables) { + if v, ok := tables[idxRain][tt]; ok { + rain = formatOrNA(v) + } else { + rain = "NA" + } + } else { + rain = "NA" + } + if idxTemp >= 0 && idxTemp < len(tables) { + if v, ok := tables[idxTemp][tt]; ok { + temp = formatOrNA(v) + } else { + temp = "NA" + } + } else { + temp = "NA" + } + if idxRHU >= 0 && idxRHU < len(tables) { + if v, ok := tables[idxRHU][tt]; ok { + rhu = formatOrNA(v) + } else { + rhu = "NA" + } + } else { + rhu = "NA" + } + if idxPRS >= 0 && idxPRS < len(tables) { + if v, ok := tables[idxPRS][tt]; ok { + prs = formatOrNA(v) + } else { + prs = "NA" + } + } else { + prs = "NA" + } + if idxWIN >= 0 && idxWIN < len(tables) { + if v, ok := tables[idxWIN][tt]; ok { + wind = formatOrNA(v) + } else { + wind = "NA" + } + } else { + wind = "NA" + } + fmt.Fprintf(os.Stdout, "%s, %s, %s, %s, %s, %s\n", tt.Format("2006-01-02 15:00"), rain, temp, rhu, prs, wind) + } + + if len(targets) > 0 && latest.Before(targets[0]) { + fmt.Fprintf(os.Stdout, "Note: latest CMA data time is %s; no future values returned by this endpoint.\n", latest.Format("2006-01-02 15:04:05")) + } + + return nil +} + +func findLabelIndex(labels []string, keys []string) int { + for i, s := range labels { + for _, k := range keys { + if strings.Contains(s, k) { + return i + } + } + } + return -1 +} + +func toFloat(v interface{}) float64 { + switch t := v.(type) { + case float64: + return t + case float32: + return float64(t) + case int: + return float64(t) + case int64: + return float64(t) + case json.Number: + f, _ := t.Float64() + return f + default: + return 0 + } +} + +func formatOrNA(v float64) string { + // 使用默认格式,保留最多1-2位小数(按需要精简,这里用 %.2f) + // 但如果值为0,仍然输出 0 + return strings.TrimRight(strings.TrimRight(fmt.Sprintf("%.2f", v), "0"), ".") +} + +func sortTimesAsc(ts []time.Time) []time.Time { + sort.Slice(ts, func(i, j int) bool { return ts[i].Before(ts[j]) }) + return ts +} + +// RunCMAFetch 拉取一次CMA数据,并将“下一个整点开始的3个小时”写入 forecast_hourly。 +// 特殊规则:所有站点共用同一份数据;缺失项以0填充;provider='cma'。 +func RunCMAFetch(ctx context.Context) error { + // 固定参数 + staID := "59238" + funitem := "1150101020" + typeCode := "NWST" + + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + issuedAt := time.Now().In(loc) + startHour := issuedAt.Truncate(time.Hour) + targets := []time.Time{startHour.Add(1 * time.Hour), startHour.Add(2 * time.Hour), startHour.Add(3 * time.Hour)} + + // 请求CMA + form := url.Values{} + form.Set("staId", staID) + form.Set("funitemmenuid", funitem) + form.Set("typeCode", typeCode) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://data.cma.cn/dataGis/exhibitionData/getStationInfo", strings.NewReader(form.Encode())) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Referer", "https://data.cma.cn/dataGis/static/gridgis/") + req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36") + client := &http.Client{Timeout: 15 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("CMA status=%d", resp.StatusCode) + } + + var payload struct { + Hover string `json:"hover"` + List [][][]interface{} `json:"list"` + Value string `json:"value"` + } + if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { + return err + } + + // 索引映射(容错) + labels := strings.Split(payload.Hover, ",") + useHover := len(labels) == len(payload.List) && len(payload.List) >= 4 + idxRain, idxTemp, idxRHU, idxPRS, idxWIN := 0, 1, 2, 3, 4 + if useHover { + r := findLabelIndex(labels, []string{"PRE_1h", "PRE"}) + t := findLabelIndex(labels, []string{"TEM", "temperature"}) + h := findLabelIndex(labels, []string{"RHU", "humidity"}) + p := findLabelIndex(labels, []string{"PRS", "pressure"}) + w := findLabelIndex(labels, []string{"WIN_S_Avg_10mi", "WIN_SN_S", "WIN_S", "wind"}) + if r >= 0 && t >= 0 && h >= 0 && w >= 0 { + idxRain, idxTemp, idxRHU, idxPRS, idxWIN = r, t, h, p, w + } + } + + // 建表:要素 -> 时间 -> 值 + tables := make([]map[time.Time]float64, len(payload.List)) + for i := range payload.List { + tables[i] = map[time.Time]float64{} + for _, pair := range payload.List[i] { + if len(pair) != 2 { + continue + } + ts, _ := pair[0].(string) + val := toFloat(pair[1]) + if len(ts) != 14 { + continue + } + t, err := time.ParseInLocation("20060102150405", ts, loc) + if err != nil { + continue + } + tables[i][t] = val + } + } + + // 读取站点列表(全部) + db := database.GetDB() + stationIDs, err := loadAllStationIDs(ctx, db) + if err != nil { + return err + } + + // 写库(缺失置0;单位转换同其他provider) + for _, ft := range targets { + rain := getOrZero(tables, idxRain, ft) // mm + temp := getOrZero(tables, idxTemp, ft) // °C + rhu := getOrZero(tables, idxRHU, ft) // % + prs := getOrZero(tables, idxPRS, ft) // hPa + ws := getOrZero(tables, idxWIN, ft) // m/s + gust := 0.0 // 未提供 + wdir := 0.0 // 未提供 + prob := 0.0 // 未提供 + + rainMmX1000 := int64(rain * 1000.0) + tempCx100 := int64(temp * 100.0) + humidityPct := int64(rhu) + wsMsX1000 := int64(ws * 1000.0) + gustMsX1000 := int64(gust * 1000.0) + wdirDeg := int64(wdir) + probPct := int64(prob) + pressureHpaX100 := int64(prs * 100.0) + + for _, sid := range stationIDs { + _ = upsertForecastWithProvider(ctx, db, sid, "cma", issuedAt, ft, + rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100) + } + } + return nil +} + +func getOrZero(tables []map[time.Time]float64, idx int, t time.Time) float64 { + if idx >= 0 && idx < len(tables) { + if v, ok := tables[idx][t]; ok { + return v + } + } + return 0 +} + +// 加载所有站点ID +func loadAllStationIDs(ctx context.Context, db *sql.DB) ([]string, error) { + rows, err := db.QueryContext(ctx, `SELECT station_id FROM stations`) + if err != nil { + return nil, err + } + defer rows.Close() + var list []string + for rows.Next() { + var s string + if err := rows.Scan(&s); err == nil { + list = append(list, s) + } + } + return list, nil +} diff --git a/internal/server/udp.go b/internal/server/udp.go index 23431b8..5a71ddb 100644 --- a/internal/server/udp.go +++ b/internal/server/udp.go @@ -178,6 +178,20 @@ func StartUDPServer() error { } }() + // 后台定时:每小时拉取CMA(全站,共用一份数据,缺失填0) + go func() { + for { + now := time.Now() + next := now.Truncate(time.Hour).Add(time.Hour) + time.Sleep(time.Until(next)) + if err := forecast.RunCMAFetch(context.Background()); err != nil { + log.Printf("cma 定时拉取失败: %v", err) + } else { + log.Printf("cma 定时拉取完成") + } + } + }() + for { n, addr, err := conn.ReadFrom(buffer) if err != nil { diff --git a/templates/index.html b/templates/index.html index c34e574..80f1d24 100644 --- a/templates/index.html +++ b/templates/index.html @@ -479,8 +479,9 @@ @@ -569,4 +570,4 @@ - \ No newline at end of file +