package database import ( "database/sql" "fmt" "log" "time" "weatherstation/pkg/types" ) // GetOnlineDevicesCount 获取在线设备数量 func GetOnlineDevicesCount(db *sql.DB) int { query := ` SELECT COUNT(DISTINCT station_id) FROM rs485_weather_data WHERE timestamp > NOW() - INTERVAL '5 minutes'` var count int if err := db.QueryRow(query).Scan(&count); err != nil { return 0 } return count } // GetStations 获取所有WH65LP站点列表 func GetStations(db *sql.DB) ([]types.Station, error) { query := ` SELECT DISTINCT s.station_id, COALESCE(s.station_alias, '') as station_alias, COALESCE(s.password, '') as station_name, 'WH65LP' as device_type, COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update, COALESCE(s.latitude, 0) as latitude, COALESCE(s.longitude, 0) as longitude, COALESCE(s.name, '') as name, COALESCE(s.location, '') as location, COALESCE(s.z, 0) as z, COALESCE(s.y, 0) as y, COALESCE(s.x, 0) as x FROM stations s LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id WHERE s.station_id LIKE 'RS485-%' GROUP BY s.station_id, s.station_alias, s.password, s.latitude, s.longitude, s.name, s.location, s.z, s.y, s.x ORDER BY s.station_id` rows, err := db.Query(query) if err != nil { return nil, err } defer rows.Close() var stations []types.Station for rows.Next() { var station types.Station var lastUpdate time.Time err := rows.Scan( &station.StationID, &station.StationAlias, &station.StationName, &station.DeviceType, &lastUpdate, &station.Latitude, &station.Longitude, &station.Name, &station.Location, &station.Z, &station.Y, &station.X, ) if err != nil { continue } station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05") stations = append(stations, station) } return stations, nil } // GetWeatherData 获取指定站点的历史天气数据 func GetWeatherData(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) { // 构建查询SQL(统一风向矢量平均,雨量为累计量的正增量求和) var query string var intervalStr string switch interval { case "10min": intervalStr = "10 minutes" case "30min": intervalStr = "30 minutes" default: // 1hour intervalStr = "1 hour" } query = buildWeatherDataQuery(intervalStr) rows, err := db.Query(query, intervalStr, stationID, startTime, endTime) if err != nil { return nil, err } defer rows.Close() var points []types.WeatherPoint for rows.Next() { var point types.WeatherPoint err := rows.Scan( &point.DateTime, &point.Temperature, &point.Humidity, &point.Pressure, &point.WindSpeed, &point.WindDir, &point.Rainfall, &point.Light, &point.UV, ) if err != nil { continue } points = append(points, point) } return points, nil } // GetSeriesFrom10Min 基于10分钟聚合表返回 10m/30m/1h 数据(风向向量平均、降雨求和、加权平均) func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) { log.Printf("查询数据: stationID=%s, start=%v, end=%v, interval=%s", stationID, startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"), interval) var query string switch interval { case "10min": query = ` SELECT to_char(bucket_start + interval '10 minutes', 'YYYY-MM-DD HH24:MI:SS') AS date_time, ROUND(temp_c_x100/100.0, 2) AS temperature, ROUND(humidity_pct::numeric, 2) AS humidity, ROUND(pressure_hpa_x100/100.0, 2) AS pressure, ROUND(wind_speed_ms_x1000/1000.0, 3) AS wind_speed, ROUND(wind_dir_deg::numeric, 2) AS wind_direction, ROUND(rain_10m_mm_x1000/1000.0, 3) AS rainfall, ROUND(solar_wm2_x100/100.0, 2) AS light, ROUND(uv_index::numeric, 2) AS uv, ROUND(rain_total_mm_x1000/1000.0, 3) AS rain_total FROM rs485_weather_10min WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3 ORDER BY bucket_start + interval '10 minutes'` case "30min": query = buildAggFrom10MinQuery("30 minutes") default: // 1hour query = buildAggFrom10MinQuery("1 hour") } // // 调试输出完整SQL // debugSQL := fmt.Sprintf("-- SQL for %s\n%s\n-- Params: stationID=%s, start=%v, end=%v", // interval, query, stationID, startTime, endTime) // log.Println(debugSQL) rows, err := db.Query(query, stationID, startTime, endTime) if err != nil { log.Printf("查询失败: %v", err) return nil, err } defer rows.Close() var points []types.WeatherPoint for rows.Next() { var p types.WeatherPoint if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV, &p.RainTotal); err != nil { continue } points = append(points, p) } return points, nil } // buildAggFrom10MinQuery 返回从10分钟表再聚合的SQL(interval 支持 '30 minutes' 或 '1 hour') func buildAggFrom10MinQuery(interval string) string { return ` WITH base AS ( SELECT * FROM rs485_weather_10min WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3 ), g AS ( SELECT CASE '` + interval + `' WHEN '1 hour' THEN date_trunc('hour', bucket_start) WHEN '30 minutes' THEN date_trunc('hour', bucket_start) + CASE WHEN date_part('minute', bucket_start) >= 30 THEN '30 minutes'::interval ELSE '0 minutes'::interval END END AS grp, SUM(temp_c_x100 * sample_count)::bigint AS w_temp, SUM(humidity_pct * sample_count)::bigint AS w_hum, SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p, SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar, SUM(uv_index * sample_count)::bigint AS w_uv, SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws, MAX(wind_gust_ms_x1000) AS gust_max, SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum, SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum, SUM(rain_10m_mm_x1000) AS rain_sum, SUM(sample_count) AS n_sum, MAX(rain_total_mm_x1000) AS rain_total_max FROM base GROUP BY 1 ) SELECT to_char(grp + '` + interval + `'::interval, 'YYYY-MM-DD HH24:MI:SS') AS date_time, ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature, ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity, ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure, ROUND((w_ws/NULLIF(n_sum,0))/1000.0, 3) AS wind_speed, ROUND((CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0 THEN degrees(atan2(sin_sum, cos_sum)) + 360 ELSE degrees(atan2(sin_sum, cos_sum)) END)::numeric, 2) AS wind_direction, ROUND((rain_sum/1000.0)::numeric, 3) AS rainfall, ROUND((w_solar/NULLIF(n_sum,0))/100.0, 2) AS light, ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv, ROUND((rain_total_max/1000.0)::numeric, 3) AS rain_total FROM g ORDER BY grp + '` + interval + `'::interval` } // buildWeatherDataQuery 构建天气数据查询SQL func buildWeatherDataQuery(interval string) string { return ` WITH base AS ( SELECT date_trunc('hour', timestamp) + (floor(date_part('minute', timestamp) / extract(epoch from $1::interval) * 60) * $1::interval) as time_group, timestamp as ts, temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv FROM rs485_weather_data WHERE station_id = $2 AND timestamp BETWEEN $3 AND $4 ), rain_inc AS ( SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc FROM base ), rain_sum AS ( SELECT time_group, SUM(inc) AS rainfall FROM rain_inc GROUP BY time_group ), grouped_data AS ( SELECT time_group, AVG(temperature) as temperature, AVG(humidity) as humidity, AVG(pressure) as pressure, AVG(wind_speed) as wind_speed, DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw, AVG(light) as light, AVG(uv) as uv FROM base GROUP BY time_group ) SELECT to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time, ROUND(g.temperature::numeric, 2) as temperature, ROUND(g.humidity::numeric, 2) as humidity, ROUND(g.pressure::numeric, 2) as pressure, ROUND(g.wind_speed::numeric, 2) as wind_speed, ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction, ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall, ROUND(g.light::numeric, 2) as light, ROUND(g.uv::numeric, 2) as uv FROM grouped_data g LEFT JOIN rain_sum r ON r.time_group = g.time_group ORDER BY g.time_group` } // GetForecastData 获取指定站点的预报数据(支持返回每个forecast_time的多版本issued_at) func GetForecastData(db *sql.DB, stationID string, startTime, endTime time.Time, provider string, versions int) ([]types.ForecastPoint, error) { var query string var args []interface{} if versions <= 0 { versions = 1 } if provider != "" { if provider == "open-meteo" { // 合并实时与历史,按 issued_at 降序为每个 forecast_time 取前 N 个版本 query = ` WITH ranked AS ( SELECT station_id, provider, issued_at, forecast_time, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100, ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn, CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours FROM forecast_hourly WHERE station_id = $1 AND provider IN ('open-meteo','open-meteo_historical') AND forecast_time BETWEEN $2 AND $3 ) SELECT to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time, provider, to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at, ROUND(temp_c_x100::numeric / 100.0, 2) as temperature, humidity_pct as humidity, ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure, ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed, wind_dir_deg as wind_direction, ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall, precip_prob_pct as precip_prob, uv_index as uv, lead_hours FROM ranked WHERE rn <= $4 ORDER BY forecast_time, issued_at DESC` args = []interface{}{stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07"), versions} } else { query = ` WITH ranked AS ( SELECT station_id, provider, issued_at, forecast_time, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100, ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn, CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours FROM forecast_hourly WHERE station_id = $1 AND provider = $2 AND forecast_time BETWEEN $3 AND $4 ) SELECT to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time, provider, to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at, ROUND(temp_c_x100::numeric / 100.0, 2) as temperature, humidity_pct as humidity, ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure, ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed, wind_dir_deg as wind_direction, ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall, precip_prob_pct as precip_prob, uv_index as uv, lead_hours FROM ranked WHERE rn <= $5 ORDER BY forecast_time, issued_at DESC` args = []interface{}{stationID, provider, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07"), versions} } } else { // 不指定预报提供商:对每个 provider,forecast_time 返回前 N 个 issued_at 版本 query = ` WITH ranked AS ( SELECT station_id, provider, issued_at, forecast_time, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100, ROW_NUMBER() OVER (PARTITION BY provider, forecast_time ORDER BY issued_at DESC) AS rn, CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours FROM forecast_hourly WHERE station_id = $1 AND forecast_time BETWEEN $2 AND $3 ) SELECT to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time, provider, to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at, ROUND(temp_c_x100::numeric / 100.0, 2) as temperature, humidity_pct as humidity, ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure, ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed, wind_dir_deg as wind_direction, ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall, precip_prob_pct as precip_prob, uv_index as uv, lead_hours FROM ranked WHERE rn <= $4 ORDER BY forecast_time, provider, issued_at DESC` args = []interface{}{stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07"), versions} } rows, err := db.Query(query, args...) if err != nil { return nil, fmt.Errorf("查询预报数据失败: %v", err) } defer rows.Close() var points []types.ForecastPoint for rows.Next() { var point types.ForecastPoint err := rows.Scan( &point.DateTime, &point.Provider, &point.IssuedAt, &point.Temperature, &point.Humidity, &point.Pressure, &point.WindSpeed, &point.WindDir, &point.Rainfall, &point.PrecipProb, &point.UV, &point.LeadHours, ) if err != nil { log.Printf("数据扫描错误: %v", err) continue } point.Source = "forecast" points = append(points, point) } return points, nil } func GetSeriesRaw(db *sql.DB, stationID string, startTime, endTime time.Time) ([]types.WeatherPoint, error) { query := ` SELECT to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') AS date_time, COALESCE(temperature, 0) AS temperature, COALESCE(humidity, 0) AS humidity, COALESCE(pressure, 0) AS pressure, COALESCE(wind_speed, 0) AS wind_speed, COALESCE(wind_direction, 0) AS wind_direction, COALESCE(rainfall, 0) AS rainfall, COALESCE(light, 0) AS light, COALESCE(uv, 0) AS uv, COALESCE(rainfall, 0) AS rain_total FROM rs485_weather_data WHERE station_id = $1 AND timestamp >= $2 AND timestamp <= $3 ORDER BY timestamp` rows, err := db.Query(query, stationID, startTime, endTime) if err != nil { return nil, err } defer rows.Close() var points []types.WeatherPoint for rows.Next() { var p types.WeatherPoint if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV, &p.RainTotal); err != nil { continue } points = append(points, p) } return points, nil }