diff --git a/cmd/weatherstation/main.go b/cmd/weatherstation/main.go index 3d1f2cc..5f48d81 100644 --- a/cmd/weatherstation/main.go +++ b/cmd/weatherstation/main.go @@ -32,6 +32,10 @@ func main() { var forecastOnly = flag.Bool("forecast_only", false, "仅执行一次open-meteo拉取并退出") var caiyunOnly = flag.Bool("caiyun_only", false, "仅执行一次彩云拉取并退出") var forecastDay = flag.String("forecast_day", "", "按日期抓取当天0点到当前时间+3h(格式YYYY-MM-DD)") + // 历史数据补完 + var historicalOnly = flag.Bool("historical_only", false, "仅执行历史数据补完并退出") + var historicalStart = flag.String("historical_start", "", "历史数据开始日期(格式YYYY-MM-DD)") + var historicalEnd = flag.String("historical_end", "", "历史数据结束日期(格式YYYY-MM-DD)") flag.Parse() // 设置日志 @@ -90,6 +94,18 @@ func main() { return } + // 历史数据补完 + if *historicalOnly { + if *historicalStart == "" || *historicalEnd == "" { + log.Fatalln("历史数据补完需要提供 --historical_start 与 --historical_end 日期(格式YYYY-MM-DD)") + } + if err := forecast.RunOpenMeteoHistoricalFetch(context.Background(), *historicalStart, *historicalEnd); err != nil { + log.Fatalf("历史数据补完失败: %v", err) + } + log.Println("历史数据补完完成") + return + } + // Backfill 调试路径 if *doBackfill { if *bfFrom == "" || *bfTo == "" { diff --git a/internal/database/models.go b/internal/database/models.go index ef8cef5..3f6c6e8 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -267,32 +267,92 @@ func GetForecastData(db *sql.DB, stationID string, startTime, endTime time.Time, if provider != "" { // 指定预报提供商 - query = ` - WITH latest_forecasts AS ( - SELECT DISTINCT ON (forecast_time) - station_id, provider, issued_at, forecast_time, - temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, - wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100 - FROM forecast_hourly - WHERE station_id = $1 AND provider = $2 - AND forecast_time BETWEEN $3 AND $4 - ORDER BY forecast_time, issued_at DESC - ) - SELECT - to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time, - provider, - to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at, - ROUND(temp_c_x100::numeric / 100.0, 2) as temperature, - humidity_pct as humidity, - ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure, - ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed, - wind_dir_deg as wind_direction, - ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall, - precip_prob_pct as precip_prob, - uv_index as uv - FROM latest_forecasts - ORDER BY forecast_time` - args = []interface{}{stationID, provider, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")} + if provider == "open-meteo" { + // 合并实时与历史,优先实时的最新issued_at + query = ` + WITH latest_forecasts AS ( + SELECT DISTINCT ON (forecast_time) + station_id, provider, issued_at, forecast_time, + temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, + wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100 + FROM forecast_hourly + WHERE station_id = $1 AND provider IN ('open-meteo','open-meteo_historical') + AND forecast_time BETWEEN $2 AND $3 + ORDER BY forecast_time, + CASE WHEN provider='open-meteo' THEN 0 ELSE 1 END, + issued_at DESC + ) + SELECT + to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time, + provider, + to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at, + ROUND(temp_c_x100::numeric / 100.0, 2) as temperature, + humidity_pct as humidity, + ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure, + ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed, + wind_dir_deg as wind_direction, + ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall, + precip_prob_pct as precip_prob, + uv_index as uv + FROM latest_forecasts + ORDER BY forecast_time` + args = []interface{}{stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")} + + // 调试日志 + log.Printf("执行open-meteo合并查询: stationID=%s, start=%s, end=%s", + stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")) + + // 检查是否有历史数据 + var histCount int + err := db.QueryRow(`SELECT COUNT(*) FROM forecast_hourly + WHERE station_id = $1 AND provider = 'open-meteo_historical' + AND forecast_time BETWEEN $2 AND $3`, + stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")).Scan(&histCount) + if err != nil { + log.Printf("查询历史数据计数失败: %v", err) + } else { + log.Printf("时间范围内历史数据计数: %d 条", histCount) + } + + // 检查是否有实时数据 + var rtCount int + err = db.QueryRow(`SELECT COUNT(*) FROM forecast_hourly + WHERE station_id = $1 AND provider = 'open-meteo' + AND forecast_time BETWEEN $2 AND $3`, + stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")).Scan(&rtCount) + if err != nil { + log.Printf("查询实时数据计数失败: %v", err) + } else { + log.Printf("时间范围内实时数据计数: %d 条", rtCount) + } + } else { + query = ` + WITH latest_forecasts AS ( + SELECT DISTINCT ON (forecast_time) + station_id, provider, issued_at, forecast_time, + temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, + wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100 + FROM forecast_hourly + WHERE station_id = $1 AND provider = $2 + AND forecast_time BETWEEN $3 AND $4 + ORDER BY forecast_time, issued_at DESC + ) + SELECT + to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time, + provider, + to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at, + ROUND(temp_c_x100::numeric / 100.0, 2) as temperature, + humidity_pct as humidity, + ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure, + ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed, + wind_dir_deg as wind_direction, + ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall, + precip_prob_pct as precip_prob, + uv_index as uv + FROM latest_forecasts + ORDER BY forecast_time` + args = []interface{}{stationID, provider, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")} + } } else { // 不指定预报提供商,取所有 query = ` @@ -322,7 +382,7 @@ func GetForecastData(db *sql.DB, stationID string, startTime, endTime time.Time, args = []interface{}{stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")} } - // log.Printf("执行预报数据查询: %s, args: %v", query, args) + // 执行查询 rows, err := db.Query(query, args...) if err != nil { return nil, fmt.Errorf("查询预报数据失败: %v", err) @@ -351,7 +411,6 @@ func GetForecastData(db *sql.DB, stationID string, startTime, endTime time.Time, } point.Source = "forecast" points = append(points, point) - // log.Printf("成功扫描预报数据: %+v", point) } return points, nil diff --git a/internal/forecast/open_meteo.go b/internal/forecast/open_meteo.go index 7086f5f..52feca6 100644 --- a/internal/forecast/open_meteo.go +++ b/internal/forecast/open_meteo.go @@ -184,3 +184,158 @@ func upsertForecast(ctx context.Context, db *sql.DB, stationID string, issuedAt, rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100) return err } + +// 新增:支持自定义provider的upsert +func upsertForecastWithProvider(ctx context.Context, db *sql.DB, stationID, provider string, issuedAt, forecastTime time.Time, + rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100 int64, +) error { + // 调试日志 + if provider == "open-meteo_historical" { + log.Printf("写入历史数据: station=%s, time=%s, temp=%.2f, humidity=%d", + stationID, forecastTime.Format("2006-01-02 15:04:05"), float64(tempCx100)/100.0, humidityPct) + } + + _, err := db.ExecContext(ctx, ` + INSERT INTO forecast_hourly ( + station_id, provider, issued_at, forecast_time, + rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, + wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100 + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + ON CONFLICT (station_id, provider, issued_at, forecast_time) + DO UPDATE SET + rain_mm_x1000 = EXCLUDED.rain_mm_x1000, + temp_c_x100 = EXCLUDED.temp_c_x100, + humidity_pct = EXCLUDED.humidity_pct, + wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000, + wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000, + wind_dir_deg = EXCLUDED.wind_dir_deg, + precip_prob_pct = EXCLUDED.precip_prob_pct, + pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100 + `, stationID, provider, issuedAt, forecastTime, + rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100) + return err +} + +// RunOpenMeteoHistoricalFetch 拉取指定时间段的历史数据并写入 forecast_hourly(provider=open-meteo_historical) +func RunOpenMeteoHistoricalFetch(ctx context.Context, startDate, endDate string) error { + db := database.GetDB() + stations, err := loadStationsWithLatLon(ctx, db) + if err != nil { + return fmt.Errorf("加载站点失败: %v", err) + } + + client := &http.Client{Timeout: 30 * time.Second} + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + + log.Printf("开始补完历史数据: %s 到 %s,共 %d 个站点", startDate, endDate, len(stations)) + + for i, s := range stations { + log.Printf("处理站点 %d/%d: %s", i+1, len(stations), s.id) + + apiURL := buildOpenMeteoHistoricalURL(s.lat, s.lon, startDate, endDate) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil) + resp, err := client.Do(req) + if err != nil { + log.Printf("历史数据请求失败 station=%s err=%v", s.id, err) + continue + } + + var data openMeteoResponse + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + resp.Body.Close() + log.Printf("历史数据解码失败 station=%s err=%v", s.id, err) + continue + } + resp.Body.Close() + + // 处理并写入forecast_hourly(历史) + count := 0 + issuedAt := time.Now().In(loc) + for i := range data.Hourly.Time { + // 解析时间(使用CST时区) + t, err := time.ParseInLocation("2006-01-02T15:04", data.Hourly.Time[i], loc) + if err != nil { + log.Printf("解析时间失败: %s, err=%v", data.Hourly.Time[i], err) + continue + } + + // 收集并转换(与forecast_hourly缩放一致) + rainMmX1000 := int64(0) + if i < len(data.Hourly.Rain) { + rainMmX1000 = int64(data.Hourly.Rain[i] * 1000.0) + } + tempCx100 := int64(0) + if i < len(data.Hourly.Temperature) { + tempCx100 = int64(data.Hourly.Temperature[i] * 100.0) + } + humidityPct := int64(0) + if i < len(data.Hourly.Humidity) { + humidityPct = int64(data.Hourly.Humidity[i]) + } + wsMsX1000 := int64(0) + if i < len(data.Hourly.WindSpeed) { + wsMsX1000 = int64((data.Hourly.WindSpeed[i] / 3.6) * 1000.0) + } + gustMsX1000 := int64(0) // ERA5此接口未提供阵风,置0 + wdirDeg := int64(0) + if i < len(data.Hourly.WindDir) { + wdirDeg = int64(data.Hourly.WindDir[i]) + } + probPct := int64(0) // 历史无降水概率,置0 + pressureHpaX100 := int64(0) + if i < len(data.Hourly.SurfacePres) { + pressureHpaX100 = int64(data.Hourly.SurfacePres[i] * 100.0) + } + + if err := upsertForecastWithProvider( + ctx, db, s.id, "open-meteo_historical", issuedAt, t, + rainMmX1000, tempCx100, humidityPct, wsMsX1000, gustMsX1000, wdirDeg, probPct, pressureHpaX100, + ); err != nil { + log.Printf("写入历史forecast失败 station=%s time=%s err=%v", s.id, t.Format(time.RFC3339), err) + } else { + count++ + } + } + log.Printf("站点 %s 成功写入 %d 条历史forecast记录", s.id, count) + // 防止请求过频 + time.Sleep(100 * time.Millisecond) + } + + return nil +} + +func buildOpenMeteoHistoricalURL(lat, lon sql.NullFloat64, startDate, endDate string) string { + q := url.Values{} + q.Set("latitude", fmt.Sprintf("%f", lat.Float64)) + q.Set("longitude", fmt.Sprintf("%f", lon.Float64)) + q.Set("start_date", startDate) + q.Set("end_date", endDate) + q.Set("hourly", "temperature_2m,relative_humidity_2m,surface_pressure,wind_speed_10m,wind_direction_10m,rain") + q.Set("timezone", "Asia/Shanghai") + return "https://archive-api.open-meteo.com/v1/era5?" + q.Encode() +} + +func insertHistoricalData(ctx context.Context, db *sql.DB, stationID string, timestamp time.Time, + temp, humidity, pressure, windSpeed, windDir, rainfall *float64) error { + + _, err := db.ExecContext(ctx, ` + INSERT INTO rs485_weather_data ( + station_id, timestamp, temperature, humidity, pressure, + wind_speed, wind_direction, rainfall, raw_data + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (station_id, timestamp) DO UPDATE SET + temperature = EXCLUDED.temperature, + humidity = EXCLUDED.humidity, + pressure = EXCLUDED.pressure, + wind_speed = EXCLUDED.wind_speed, + wind_direction = EXCLUDED.wind_direction, + rainfall = EXCLUDED.rainfall, + raw_data = EXCLUDED.raw_data + `, stationID, timestamp, temp, humidity, pressure, windSpeed, windDir, rainfall, + fmt.Sprintf("open-meteo-historical:%s", timestamp.Format(time.RFC3339))) + + return err +} diff --git a/internal/server/gin.go b/internal/server/gin.go index 302d192..c97a906 100644 --- a/internal/server/gin.go +++ b/internal/server/gin.go @@ -192,5 +192,11 @@ func getForecastHandler(c *gin.Context) { } log.Printf("查询到预报数据: %d 条", len(points)) + // 调试:打印前几条记录 + for i, p := range points { + if i < 5 { + log.Printf("预报数据 #%d: time=%s, provider=%s, issued=%s", i, p.DateTime, p.Provider, p.IssuedAt) + } + } c.JSON(http.StatusOK, points) } diff --git a/templates/index.html b/templates/index.html index bd576d4..67f158e 100644 --- a/templates/index.html +++ b/templates/index.html @@ -103,7 +103,7 @@ } .map-container.collapsed { - height: 100px; + height: 20vh; } #map { @@ -139,6 +139,29 @@ background-color: #007bff; color: white; } + + .map-toggle-btn { + position: absolute; + top: 10px; + right: 10px; + z-index: 1001; + border-radius: 4px; + padding: 5px 10px; + cursor: pointer; + font-size: 12px; + font-weight: bold; + color: white; + background-color: #007bff; + } + + .station-info-title { + text-align: center; + margin-bottom: 15px; + padding: 10px; + font-size: 14px; + line-height: 1.6; + } + .chart-container { margin-bottom: 20px; @@ -181,7 +204,7 @@ th, td { border: 1px solid #ddd; padding: 12px 8px; - text-align: left; + text-align: center; } th { @@ -476,10 +499,12 @@
| 时间 | 温度 (°C) | 湿度 (%) | @@ -497,7 +528,6 @@风速 (m/s) | 风向 (°) | 雨量 (mm) | -降水概率 (%) | 光照 (lux) | 紫外线 | ${item.date_time} [${item.source}] | -${item.temperature !== null && item.temperature !== undefined ? item.temperature.toFixed(2) : '-'} | -${item.humidity !== null && item.humidity !== undefined ? item.humidity.toFixed(2) : '-'} | -${item.pressure !== null && item.pressure !== undefined ? item.pressure.toFixed(2) : '-'} | -${item.wind_speed !== null && item.wind_speed !== undefined ? item.wind_speed.toFixed(2) : '-'} | -${item.wind_direction !== null && item.wind_direction !== undefined ? item.wind_direction.toFixed(2) : '-'} | -${item.rainfall !== null && item.rainfall !== undefined ? item.rainfall.toFixed(3) : '-'} | -${item.source === '预报' && item.precip_prob !== null && item.precip_prob !== undefined ? item.precip_prob : '-'} | -${item.light !== null && item.light !== undefined ? item.light.toFixed(2) : '-'} | -${item.uv !== null && item.uv !== undefined ? item.uv.toFixed(2) : '-'} | - `; + // 移除错误的覆盖逻辑,让实测数据正常显示 + const overrideDash = false; // 不再覆盖实测数据为'-' + + const fmt2 = v => (v === null || v === undefined ? '-' : Number(v).toFixed(2)); + const fmt3 = v => (v === null || v === undefined ? '-' : Number(v).toFixed(3)); + + // 构建基础列 + let columns = [ + `${item.date_time}${hasForecast ? ` [${item.source}]` : ''} | `, + `${overrideDash ? '-' : fmt2(item.temperature)} | `, + `${overrideDash ? '-' : fmt2(item.humidity)} | `, + `${overrideDash ? '-' : fmt2(item.pressure)} | `, + `${overrideDash ? '-' : fmt2(item.wind_speed)} | `, + `${overrideDash ? '-' : fmt2(item.wind_direction)} | `, + `${overrideDash ? '-' : fmt3(item.rainfall)} | ` + ]; + + // 如果显示预报,添加降水概率列 + if (hasForecast) { + columns.push(`${item.source === '预报' && item.precip_prob !== null && item.precip_prob !== undefined ? item.precip_prob : '-'} | `); + } + + // 添加剩余列 + columns.push( + `${overrideDash ? '-' : (item.light !== null && item.light !== undefined ? Number(item.light).toFixed(2) : '-')} | `, + `${overrideDash ? '-' : (item.uv !== null && item.uv !== undefined ? Number(item.uv).toFixed(2) : '-')} | ` + ); + + row.innerHTML = columns.join(''); tbody.appendChild(row); }); }
|---|