407 lines
11 KiB
Go
407 lines
11 KiB
Go
package forecast
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"encoding/json"
|
||
"fmt"
|
||
"net/http"
|
||
"net/url"
|
||
"os"
|
||
"sort"
|
||
"strings"
|
||
"time"
|
||
"weatherstation/internal/database"
|
||
)
|
||
|
||
// RunCMACLI 调用中国气象数据网 getSKStationInfo 接口,
|
||
// 以固定参数获取数据,并在控制台输出“当前时间的未来三小时”(整点)的要素值。
|
||
// 仅打印,不写库。
|
||
func RunCMACLI(ctx context.Context) error {
|
||
// 固定参数(可按需调整)
|
||
staID := "59238"
|
||
funitem := "1150101020"
|
||
typeCode := "NWST"
|
||
|
||
// 目标时间:默认从“下一个整点”开始,但后续会根据接口可用时次选择“接下来可用的3个未来时次”
|
||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||
if loc == nil {
|
||
loc = time.FixedZone("CST", 8*3600)
|
||
}
|
||
now := time.Now().In(loc)
|
||
base := now.Truncate(time.Hour)
|
||
|
||
// 构造请求
|
||
form := url.Values{}
|
||
form.Set("staId", staID)
|
||
form.Set("funitemmenuid", funitem)
|
||
form.Set("typeCode", typeCode)
|
||
|
||
// 使用 getStationInfo(包含更长时段数据,含未来小时)
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://data.cma.cn/dataGis/exhibitionData/getStationInfo", strings.NewReader(form.Encode()))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||
req.Header.Set("Referer", "https://data.cma.cn/dataGis/static/gridgis/")
|
||
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
|
||
|
||
client := &http.Client{Timeout: 15 * time.Second}
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
return fmt.Errorf("CMA 响应状态码异常: %d", resp.StatusCode)
|
||
}
|
||
|
||
// 响应结构:hover 字段标注各要素,list 为多要素时间序列
|
||
var payload struct {
|
||
Hover string `json:"hover"`
|
||
List [][][]interface{} `json:"list"`
|
||
Value string `json:"value"`
|
||
}
|
||
if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 解析 hover,确定各要素索引;若 hover 异常则回退为固定顺序:0雨,1温,2湿,3压,4风
|
||
labels := strings.Split(payload.Hover, ",")
|
||
useHover := len(labels) == len(payload.List) && len(payload.List) >= 4
|
||
idxRain, idxTemp, idxRHU, idxPRS, idxWIN := 0, 1, 2, 3, 4
|
||
if useHover {
|
||
r := findLabelIndex(labels, []string{"PRE_1h", "PRE"})
|
||
t := findLabelIndex(labels, []string{"TEM", "temperature"})
|
||
h := findLabelIndex(labels, []string{"RHU", "humidity"})
|
||
p := findLabelIndex(labels, []string{"PRS", "pressure"})
|
||
w := findLabelIndex(labels, []string{"WIN_S_Avg_10mi", "WIN_SN_S", "WIN_S", "wind"})
|
||
if r >= 0 && t >= 0 && h >= 0 && w >= 0 { // PRS 可能为空表,索引允许>=0
|
||
idxRain, idxTemp, idxRHU, idxPRS, idxWIN = r, t, h, p, w
|
||
}
|
||
}
|
||
|
||
// 构建时间 -> 值 的查找表(按 Asia/Shanghai 解析时间戳 YYYYMMDDHHMMSS)
|
||
// 每个要素一张表
|
||
tables := make([]map[time.Time]float64, len(payload.List))
|
||
for i := range payload.List {
|
||
tables[i] = map[time.Time]float64{}
|
||
for _, pair := range payload.List[i] {
|
||
if len(pair) != 2 {
|
||
continue
|
||
}
|
||
ts, _ := pair[0].(string)
|
||
val := toFloat(pair[1])
|
||
if len(ts) != 14 {
|
||
continue
|
||
}
|
||
t, err := time.ParseInLocation("20060102150405", ts, loc)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
tables[i][t] = val
|
||
}
|
||
}
|
||
|
||
// 统计可用时间范围(用于提示)和收集未来时次
|
||
var latest time.Time
|
||
timeSet := map[time.Time]struct{}{}
|
||
for i := range tables {
|
||
for t := range tables[i] {
|
||
if t.After(latest) {
|
||
latest = t
|
||
}
|
||
if t.After(base) { // 未来时次
|
||
timeSet[t] = struct{}{}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 选择“接下来可用的三个未来时次”(不强制为连续每小时)
|
||
var targets []time.Time
|
||
if len(timeSet) > 0 {
|
||
// 排序并取前3
|
||
targets = make([]time.Time, 0, len(timeSet))
|
||
for t := range timeSet {
|
||
targets = append(targets, t)
|
||
}
|
||
// 简单选择排序(避免引入 sort 包也没问题,但使用标准库更好)
|
||
}
|
||
|
||
if len(timeSet) > 0 {
|
||
// 使用标准库排序
|
||
targets = sortTimesAsc(targets)
|
||
if len(targets) > 3 {
|
||
targets = targets[:3]
|
||
}
|
||
} else {
|
||
// 回退到严格的下三个整点(若无则输出 NA)
|
||
targets = []time.Time{base.Add(1 * time.Hour), base.Add(2 * time.Hour), base.Add(3 * time.Hour)}
|
||
}
|
||
|
||
// 输出
|
||
fmt.Fprintf(os.Stdout, "CMA next 3 hours (CST) station=%s\n", staID)
|
||
fmt.Fprintln(os.Stdout, "Time, Rain(mm), Temp(°C), RHU(%), PRS(hPa), Wind(m/s)")
|
||
for _, tt := range targets {
|
||
var rain, temp, rhu, prs, wind string
|
||
if idxRain >= 0 && idxRain < len(tables) {
|
||
if v, ok := tables[idxRain][tt]; ok {
|
||
rain = formatOrNA(v)
|
||
} else {
|
||
rain = "NA"
|
||
}
|
||
} else {
|
||
rain = "NA"
|
||
}
|
||
if idxTemp >= 0 && idxTemp < len(tables) {
|
||
if v, ok := tables[idxTemp][tt]; ok {
|
||
temp = formatOrNA(v)
|
||
} else {
|
||
temp = "NA"
|
||
}
|
||
} else {
|
||
temp = "NA"
|
||
}
|
||
if idxRHU >= 0 && idxRHU < len(tables) {
|
||
if v, ok := tables[idxRHU][tt]; ok {
|
||
rhu = formatOrNA(v)
|
||
} else {
|
||
rhu = "NA"
|
||
}
|
||
} else {
|
||
rhu = "NA"
|
||
}
|
||
if idxPRS >= 0 && idxPRS < len(tables) {
|
||
if v, ok := tables[idxPRS][tt]; ok {
|
||
prs = formatOrNA(v)
|
||
} else {
|
||
prs = "NA"
|
||
}
|
||
} else {
|
||
prs = "NA"
|
||
}
|
||
if idxWIN >= 0 && idxWIN < len(tables) {
|
||
if v, ok := tables[idxWIN][tt]; ok {
|
||
wind = formatOrNA(v)
|
||
} else {
|
||
wind = "NA"
|
||
}
|
||
} else {
|
||
wind = "NA"
|
||
}
|
||
fmt.Fprintf(os.Stdout, "%s, %s, %s, %s, %s, %s\n", tt.Format("2006-01-02 15:00"), rain, temp, rhu, prs, wind)
|
||
}
|
||
|
||
if len(targets) > 0 && latest.Before(targets[0]) {
|
||
fmt.Fprintf(os.Stdout, "Note: latest CMA data time is %s; no future values returned by this endpoint.\n", latest.Format("2006-01-02 15:04:05"))
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// findLabelIndex returns the position of the first entry in labels that
// contains any of keys as a substring, or -1 when no label matches.
// Labels are scanned in order, so label position takes precedence over the
// order in which keys are listed.
func findLabelIndex(labels []string, keys []string) int {
	// matchesAny reports whether label contains at least one key.
	matchesAny := func(label string) bool {
		for j := 0; j < len(keys); j++ {
			if strings.Index(label, keys[j]) >= 0 {
				return true
			}
		}
		return false
	}

	for pos := 0; pos < len(labels); pos++ {
		if matchesAny(labels[pos]) {
			return pos
		}
	}
	return -1
}
|
||
|
||
// toFloat converts a loosely typed value to float64. It understands float64,
// float32, int, int64 and json.Number; any other dynamic type, including nil
// and strings, yields 0. A json.Number that fails to parse contributes
// whatever value its Float64 method returns, with the error ignored.
func toFloat(v interface{}) float64 {
	if f, ok := v.(float64); ok {
		return f
	}
	if f, ok := v.(float32); ok {
		return float64(f)
	}
	if n, ok := v.(int); ok {
		return float64(n)
	}
	if n, ok := v.(int64); ok {
		return float64(n)
	}
	if num, ok := v.(json.Number); ok {
		parsed, _ := num.Float64()
		return parsed
	}
	return 0
}
|
||
|
||
// formatOrNA renders v with at most two decimal places, dropping trailing
// zeros and a dangling decimal point (1.50 -> "1.5", 10 -> "10", 0 -> "0").
// Despite its name it never returns "NA"; callers emit "NA" themselves when
// no value is available.
func formatOrNA(v float64) string {
	s := strings.TrimRight(strings.TrimRight(fmt.Sprintf("%.2f", v), "0"), ".")
	// Small negative values round to "-0.00" and would otherwise be trimmed
	// to "-0"; report them as plain zero.
	if s == "-0" {
		return "0"
	}
	return s
}
|
||
|
||
// sortTimesAsc orders ts chronologically, earliest first. The slice is sorted
// in place and the same slice is returned for convenience.
func sortTimesAsc(ts []time.Time) []time.Time {
	// earlier reports whether element a is strictly before element b.
	earlier := func(a, b int) bool {
		return ts[b].After(ts[a])
	}
	sort.Slice(ts, earlier)
	return ts
}
|
||
|
||
// RunCMAFetch 拉取一次CMA数据,并将“下一个整点开始的3个小时”写入 forecast_hourly。
|
||
// 特殊规则:所有站点共用同一份数据;缺失项以0填充;provider='cma'。
|
||
func RunCMAFetch(ctx context.Context) error {
|
||
// 固定参数
|
||
staID := "59238"
|
||
funitem := "1150101020"
|
||
typeCode := "NWST"
|
||
|
||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||
if loc == nil {
|
||
loc = time.FixedZone("CST", 8*3600)
|
||
}
|
||
issuedAt := time.Now().In(loc)
|
||
base := issuedAt.Truncate(time.Hour)
|
||
|
||
// 请求CMA
|
||
form := url.Values{}
|
||
form.Set("staId", staID)
|
||
form.Set("funitemmenuid", funitem)
|
||
form.Set("typeCode", typeCode)
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://data.cma.cn/dataGis/exhibitionData/getStationInfo", strings.NewReader(form.Encode()))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||
req.Header.Set("Referer", "https://data.cma.cn/dataGis/static/gridgis/")
|
||
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
|
||
client := &http.Client{Timeout: 15 * time.Second}
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer resp.Body.Close()
|
||
if resp.StatusCode != http.StatusOK {
|
||
return fmt.Errorf("CMA status=%d", resp.StatusCode)
|
||
}
|
||
|
||
var payload struct {
|
||
Hover string `json:"hover"`
|
||
List [][][]interface{} `json:"list"`
|
||
Value string `json:"value"`
|
||
}
|
||
if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 索引映射(容错)
|
||
labels := strings.Split(payload.Hover, ",")
|
||
useHover := len(labels) == len(payload.List) && len(payload.List) >= 4
|
||
idxRain, idxTemp, idxRHU, idxPRS, idxWIN := 0, 1, 2, 3, 4
|
||
if useHover {
|
||
r := findLabelIndex(labels, []string{"PRE_1h", "PRE"})
|
||
t := findLabelIndex(labels, []string{"TEM", "temperature"})
|
||
h := findLabelIndex(labels, []string{"RHU", "humidity"})
|
||
p := findLabelIndex(labels, []string{"PRS", "pressure"})
|
||
w := findLabelIndex(labels, []string{"WIN_S_Avg_10mi", "WIN_SN_S", "WIN_S", "wind"})
|
||
if r >= 0 && t >= 0 && h >= 0 && w >= 0 {
|
||
idxRain, idxTemp, idxRHU, idxPRS, idxWIN = r, t, h, p, w
|
||
}
|
||
}
|
||
|
||
// 建表:要素 -> 时间 -> 值
|
||
tables := make([]map[time.Time]float64, len(payload.List))
|
||
for i := range payload.List {
|
||
tables[i] = map[time.Time]float64{}
|
||
for _, pair := range payload.List[i] {
|
||
if len(pair) != 2 {
|
||
continue
|
||
}
|
||
ts, _ := pair[0].(string)
|
||
val := toFloat(pair[1])
|
||
if len(ts) != 14 {
|
||
continue
|
||
}
|
||
t, err := time.ParseInLocation("20060102150405", ts, loc)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
tables[i][t] = val
|
||
}
|
||
}
|
||
|
||
// 从可用数据中选择“接下来可用的三个未来时次”(按接口实际返回)
|
||
timeSet := map[time.Time]struct{}{}
|
||
for i := range tables {
|
||
for t := range tables[i] {
|
||
if t.After(base) {
|
||
timeSet[t] = struct{}{}
|
||
}
|
||
}
|
||
}
|
||
var targets []time.Time
|
||
for t := range timeSet {
|
||
targets = append(targets, t)
|
||
}
|
||
targets = sortTimesAsc(targets)
|
||
if len(targets) > 3 {
|
||
targets = targets[:3]
|
||
}
|
||
if len(targets) == 0 {
|
||
// 无未来时次则不写库,避免写入“假想整点”。
|
||
return nil
|
||
}
|
||
|
||
// 读取站点列表(全部)
|
||
db := database.GetDB()
|
||
stationIDs, err := loadAllStationIDs(ctx, db)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 写库(缺失置0;单位转换同其他provider)
|
||
for _, ft := range targets {
|
||
rain := getOrZero(tables, idxRain, ft) // mm
|
||
temp := getOrZero(tables, idxTemp, ft) // °C
|
||
rhu := getOrZero(tables, idxRHU, ft) // %
|
||
prs := getOrZero(tables, idxPRS, ft) // hPa
|
||
ws := getOrZero(tables, idxWIN, ft) // m/s
|
||
gust := 0.0 // 未提供
|
||
wdir := 0.0 // 未提供
|
||
prob := 0.0 // 未提供
|
||
|
||
rainMmX1000 := int64(rain * 1000.0)
|
||
tempCx100 := int64(temp * 100.0)
|
||
humidityPct := int64(rhu)
|
||
wsMsX1000 := int64(ws * 1000.0)
|
||
gustMsX1000 := int64(gust * 1000.0)
|
||
wdirDeg := int64(wdir)
|
||
probPct := int64(prob)
|
||
pressureHpaX100 := int64(prs * 100.0)
|
||
|
||
for _, sid := range stationIDs {
|
||
_ = upsertForecastWithProvider(ctx, db, sid, "cma", issuedAt, ft,
|
||
rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// getOrZero looks up the value recorded at time t in tables[idx]. It returns
// 0 when idx is outside the bounds of tables or when that table has no entry
// for t.
func getOrZero(tables []map[time.Time]float64, idx int, t time.Time) float64 {
	if idx < 0 || idx >= len(tables) {
		return 0
	}
	// Indexing a map (even a nil one) with an absent key yields the zero value.
	return tables[idx][t]
}
|
||
|
||
// loadAllStationIDs returns the station_id of every row in the stations
// table.
//
// Rows whose station_id cannot be scanned into a string are skipped (best
// effort). An error is returned when the query itself fails or when row
// iteration stops because of an error, so that a truncated list is never
// mistaken for the full set.
func loadAllStationIDs(ctx context.Context, db *sql.DB) ([]string, error) {
	rows, err := db.QueryContext(ctx, `SELECT station_id FROM stations`)
	if err != nil {
		return nil, fmt.Errorf("querying station IDs: %w", err)
	}
	defer rows.Close()

	var list []string
	for rows.Next() {
		var s string
		if err := rows.Scan(&s); err != nil {
			// Deliberately tolerant: one unreadable row must not block the rest.
			continue
		}
		list = append(list, s)
	}
	// rows.Next returns false both at the end of the result set and on
	// failure; only rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating station IDs: %w", err)
	}
	return list, nil
}
|