2025-08-29 20:23:13 +08:00

437 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package database
import (
"database/sql"
"fmt"
"log"
"time"
"weatherstation/pkg/types"
)
// GetOnlineDevicesCount returns the number of distinct station_id values in
// rs485_weather_data whose timestamp lies within the last five minutes
// (evaluated by the database via NOW()).
//
// The signature has no error return, so a failed query or scan is logged and
// reported as 0; callers cannot distinguish "no devices online" from a
// database failure without looking at the log.
func GetOnlineDevicesCount(db *sql.DB) int {
	const query = `
SELECT COUNT(DISTINCT station_id)
FROM rs485_weather_data
WHERE timestamp > NOW() - INTERVAL '5 minutes'`

	var count int
	if err := db.QueryRow(query).Scan(&count); err != nil {
		// Keep the best-effort contract (return 0) but leave a trace of the failure.
		log.Printf("查询在线设备数量失败: %v", err)
		return 0
	}
	return count
}
// GetStations lists every row of the stations table whose station_id starts
// with "RS485-", ordered by station_id. The query reports all of them with the
// constant device type 'WH65LP'.
//
// For each station:
//   - LastUpdate is the newest rs485_weather_data timestamp for that station,
//     formatted as "2006-01-02 15:04:05"; stations without any data rows get
//     the 1970-01-01 placeholder.
//   - Latitude/Longitude default to 0 and Name/Location to "" when NULL.
//   - StationName is filled from the stations.password column.
//     NOTE(review): exposing the password column as a display name looks
//     unintended; confirm against the schema and the callers.
//
// Rows that fail to scan are logged and skipped. A failure of the query itself
// or of the row iteration is returned as an error.
func GetStations(db *sql.DB) ([]types.Station, error) {
	const query = `
SELECT DISTINCT s.station_id,
COALESCE(s.password, '') as station_name,
'WH65LP' as device_type,
COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update,
COALESCE(s.latitude, 0) as latitude,
COALESCE(s.longitude, 0) as longitude,
COALESCE(s.name, '') as name,
COALESCE(s.location, '') as location
FROM stations s
LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id
WHERE s.station_id LIKE 'RS485-%'
GROUP BY s.station_id, s.password, s.latitude, s.longitude, s.name, s.location
ORDER BY s.station_id`

	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var stations []types.Station
	for rows.Next() {
		var station types.Station
		var lastUpdate time.Time
		if err := rows.Scan(
			&station.StationID,
			&station.StationName,
			&station.DeviceType,
			&lastUpdate,
			&station.Latitude,
			&station.Longitude,
			&station.Name,
			&station.Location,
		); err != nil {
			// Best effort: one bad row must not hide the remaining stations.
			log.Printf("数据扫描错误: %v", err)
			continue
		}
		station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05")
		stations = append(stations, station)
	}
	// rows.Next also returns false when iteration breaks (e.g. a dropped
	// connection); without this check a truncated list would look like success.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return stations, nil
}
// GetWeatherData returns the historical weather series of one station,
// aggregated from rs485_weather_data into fixed-width time buckets.
//
// interval selects the bucket width: "10min" and "30min" are recognised, any
// other value falls back to one hour. startTime and endTime bound the raw
// samples (inclusive, SQL BETWEEN).
//
// The SQL comes from buildWeatherDataQuery: scalar readings are averaged per
// bucket, wind direction is a vector (sin/cos) average, and rainfall is the
// sum of the positive increments of the cumulative rainfall reading.
//
// Rows that fail to scan are logged and skipped. A failure of the query itself
// or of the row iteration is returned as an error.
func GetWeatherData(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) {
	// Map the API-level interval name to a PostgreSQL interval literal. It is
	// bound as parameter $1 and cast with ::interval inside the query.
	var intervalStr string
	switch interval {
	case "10min":
		intervalStr = "10 minutes"
	case "30min":
		intervalStr = "30 minutes"
	default: // 1hour
		intervalStr = "1 hour"
	}

	query := buildWeatherDataQuery(intervalStr)
	rows, err := db.Query(query, intervalStr, stationID, startTime, endTime)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var points []types.WeatherPoint
	for rows.Next() {
		var point types.WeatherPoint
		if err := rows.Scan(
			&point.DateTime,
			&point.Temperature,
			&point.Humidity,
			&point.Pressure,
			&point.WindSpeed,
			&point.WindDir,
			&point.Rainfall,
			&point.Light,
			&point.UV,
		); err != nil {
			// Best effort: skip the bucket that cannot be decoded, but say so.
			log.Printf("数据扫描错误: %v", err)
			continue
		}
		points = append(points, point)
	}
	// Distinguish "no more rows" from an iteration failure so that a
	// truncated series is never returned as a success.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return points, nil
}
// GetSeriesFrom10Min returns a 10-minute, 30-minute or 1-hour series for one
// station, read from the pre-aggregated rs485_weather_10min table.
//
// interval "10min" returns the stored buckets directly, converting the scaled
// integer columns back to physical units (the *_x100 columns are divided by
// 100, the *_x1000 columns by 1000). "30min" re-aggregates the buckets with
// buildAggFrom10MinQuery("30 minutes"); any other value uses
// buildAggFrom10MinQuery("1 hour").
//
// Only buckets with startTime <= bucket_start <= endTime are considered.
// Besides the usual readings, every point carries RainTotal, taken from
// rain_total_mm_x1000.
//
// Rows that fail to scan are logged and skipped. A failure of the query itself
// or of the row iteration is returned as an error.
func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) {
	log.Printf("查询数据: stationID=%s, start=%v, end=%v, interval=%s",
		stationID, startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"), interval)

	var query string
	switch interval {
	case "10min":
		query = `
SELECT
to_char(bucket_start, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
ROUND(temp_c_x100/100.0, 2) AS temperature,
ROUND(humidity_pct::numeric, 2) AS humidity,
ROUND(pressure_hpa_x100/100.0, 2) AS pressure,
ROUND(wind_speed_ms_x1000/1000.0, 3) AS wind_speed,
ROUND(wind_dir_deg::numeric, 2) AS wind_direction,
ROUND(rain_10m_mm_x1000/1000.0, 3) AS rainfall,
ROUND(solar_wm2_x100/100.0, 2) AS light,
ROUND(uv_index::numeric, 2) AS uv,
ROUND(rain_total_mm_x1000/1000.0, 3) AS rain_total
FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
ORDER BY bucket_start`
	case "30min":
		query = buildAggFrom10MinQuery("30 minutes")
	default: // 1hour
		query = buildAggFrom10MinQuery("1 hour")
	}

	// Debug aid, disabled: print the complete SQL together with its parameters.
	// debugSQL := fmt.Sprintf("-- SQL for %s\n%s\n-- Params: stationID=%s, start=%v, end=%v",
	// interval, query, stationID, startTime, endTime)
	// log.Println(debugSQL)

	rows, err := db.Query(query, stationID, startTime, endTime)
	if err != nil {
		log.Printf("查询失败: %v", err)
		return nil, err
	}
	defer rows.Close()

	var points []types.WeatherPoint
	for rows.Next() {
		var p types.WeatherPoint
		if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV, &p.RainTotal); err != nil {
			// Best effort: skip the bucket that cannot be decoded, but say so.
			log.Printf("数据扫描错误: %v", err)
			continue
		}
		points = append(points, p)
	}
	// rows.Next returning false can also mean the iteration failed; surface
	// that instead of returning a silently truncated series.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return points, nil
}
// buildAggFrom10MinQuery returns the SQL that re-aggregates rs485_weather_10min
// buckets into coarser groups. interval must be '30 minutes' or '1 hour'; the
// CASE in the query has no ELSE branch, so any other value yields a NULL group
// key.
//
// interval is spliced into the SQL text as a quoted literal, not bound as a
// parameter, so only trusted constants may be passed. The bound parameters are
// $1 station_id, $2 lower and $3 upper bound of bucket_start (both inclusive).
//
// Per group the query computes:
//   - temperature, humidity, pressure, wind speed, solar and UV as averages
//     weighted by sample_count;
//   - wind direction as atan2 of the sample_count-weighted sin/cos sums,
//     shifted into the non-negative range by adding 360 to negative angles;
//   - rainfall as the sum of rain_10m_mm_x1000 and rain_total as the maximum
//     of rain_total_mm_x1000, both divided by 1000.
//
// gust_max is computed in the CTE but not selected by the final projection.
//
// NOTE(review): the weighted sums are cast to bigint before being divided by
// n_sum; depending on the column type of sample_count this may be an integer
// division that truncates the average. Confirm against the schema.
func buildAggFrom10MinQuery(interval string) string {
	// Everything up to the opening quote of the interval literal.
	const head = `
WITH base AS (
SELECT * FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
), g AS (
SELECT
CASE '`

	// Everything from the closing quote of the interval literal onwards.
	const tail = `'
WHEN '1 hour' THEN date_trunc('hour', bucket_start)
WHEN '30 minutes' THEN
date_trunc('hour', bucket_start) +
CASE WHEN date_part('minute', bucket_start) >= 30
THEN '30 minutes'::interval
ELSE '0 minutes'::interval
END
END AS grp,
SUM(temp_c_x100 * sample_count)::bigint AS w_temp,
SUM(humidity_pct * sample_count)::bigint AS w_hum,
SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p,
SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar,
SUM(uv_index * sample_count)::bigint AS w_uv,
SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws,
MAX(wind_gust_ms_x1000) AS gust_max,
SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum,
SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum,
SUM(rain_10m_mm_x1000) AS rain_sum,
SUM(sample_count) AS n_sum,
MAX(rain_total_mm_x1000) AS rain_total_max
FROM base
GROUP BY 1
)
SELECT
to_char(grp, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature,
ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity,
ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure,
ROUND((w_ws/NULLIF(n_sum,0))/1000.0, 3) AS wind_speed,
ROUND((CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0
THEN degrees(atan2(sin_sum, cos_sum)) + 360
ELSE degrees(atan2(sin_sum, cos_sum)) END)::numeric, 2) AS wind_direction,
ROUND((rain_sum/1000.0)::numeric, 3) AS rainfall,
ROUND((w_solar/NULLIF(n_sum,0))/100.0, 2) AS light,
ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv,
ROUND((rain_total_max/1000.0)::numeric, 3) AS rain_total
FROM g
ORDER BY grp`

	return head + interval + tail
}
// buildWeatherDataQuery returns the SQL that aggregates raw rs485_weather_data
// samples into fixed-width time buckets.
//
// Bound parameters: $1 bucket width as a PostgreSQL interval literal (for
// example '10 minutes'), $2 station_id, $3 and $4 the inclusive timestamp
// range. The interval argument of this function is not used in the SQL text;
// the bucket width is taken from $1 only, so the returned string is the same
// for every argument.
//
// Bucketing: time_group is the sample's hour plus
// floor(minute / bucket-width-in-minutes) * bucket width.
//
// Per bucket the query returns the plain average of temperature, humidity,
// pressure, wind speed, light and UV, and a vector average of wind direction
// (atan2 of the mean sin/cos, with 360 added to negative angles).
//
// Rainfall is treated as a cumulative reading: each sample contributes
// max(rainfall - previous rainfall, 0), and the contributions are summed per
// bucket. The previous sample is taken over the whole series ordered by
// timestamp, so the increment between the last sample of one bucket and the
// first sample of the next is credited to the later bucket. Partitioning the
// LAG by bucket would leave that first sample without a predecessor, and
// GREATEST ignores the resulting NULL, silently dropping that increment.
func buildWeatherDataQuery(interval string) string {
	return `
WITH base AS (
SELECT
date_trunc('hour', timestamp) +
(floor(date_part('minute', timestamp) / extract(epoch from $1::interval) * 60) * $1::interval) as time_group,
timestamp as ts,
temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv
FROM rs485_weather_data
WHERE station_id = $2 AND timestamp BETWEEN $3 AND $4
),
rain_inc AS (
SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (ORDER BY ts), 0) AS inc
FROM base
),
rain_sum AS (
SELECT time_group, SUM(inc) AS rainfall
FROM rain_inc
GROUP BY time_group
),
grouped_data AS (
SELECT
time_group,
AVG(temperature) as temperature,
AVG(humidity) as humidity,
AVG(pressure) as pressure,
AVG(wind_speed) as wind_speed,
DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw,
AVG(light) as light,
AVG(uv) as uv
FROM base
GROUP BY time_group
)
SELECT
to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time,
ROUND(g.temperature::numeric, 2) as temperature,
ROUND(g.humidity::numeric, 2) as humidity,
ROUND(g.pressure::numeric, 2) as pressure,
ROUND(g.wind_speed::numeric, 2) as wind_speed,
ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction,
ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall,
ROUND(g.light::numeric, 2) as light,
ROUND(g.uv::numeric, 2) as uv
FROM grouped_data g
LEFT JOIN rain_sum r ON r.time_group = g.time_group
ORDER BY g.time_group`
}
// GetForecastData returns forecast points of one station from forecast_hourly
// for forecast_time between startTime and endTime (inclusive). For every
// forecast_time it can return several versions, i.e. rows with different
// issued_at, newest first.
//
// provider selects the data source:
//   - "" returns all providers; versions are ranked per (provider, forecast_time).
//   - "open-meteo" merges the providers 'open-meteo' and 'open-meteo_historical'
//     and ranks their rows together per forecast_time.
//   - any other value matches that provider exactly.
//
// versions is the maximum number of issued_at versions per forecast_time;
// values <= 0 are treated as 1.
//
// Every returned point has Source set to "forecast". Rows that fail to scan
// are logged and skipped. A failure of the query itself or of the row
// iteration is returned as a wrapped error.
func GetForecastData(db *sql.DB, stationID string, startTime, endTime time.Time, provider string, versions int) ([]types.ForecastPoint, error) {
	if versions <= 0 {
		versions = 1
	}

	query, bindProvider := buildForecastQuery(provider)

	// Argument order must match the placeholder numbering produced by
	// buildForecastQuery: station, [provider], start, end, versions.
	args := []interface{}{stationID}
	if bindProvider {
		args = append(args, provider)
	}
	args = append(args,
		startTime.Format(forecastTimeLayout),
		endTime.Format(forecastTimeLayout),
		versions,
	)

	rows, err := db.Query(query, args...)
	if err != nil {
		return nil, fmt.Errorf("查询预报数据失败: %w", err)
	}
	defer rows.Close()

	var points []types.ForecastPoint
	for rows.Next() {
		var point types.ForecastPoint
		if err := rows.Scan(
			&point.DateTime,
			&point.Provider,
			&point.IssuedAt,
			&point.Temperature,
			&point.Humidity,
			&point.Pressure,
			&point.WindSpeed,
			&point.WindDir,
			&point.Rainfall,
			&point.PrecipProb,
			&point.UV,
			&point.LeadHours,
		); err != nil {
			// Best effort: one undecodable row must not hide the others.
			log.Printf("数据扫描错误: %v", err)
			continue
		}
		point.Source = "forecast"
		points = append(points, point)
	}
	// rows.Next returning false can also mean the iteration failed; do not
	// report a truncated result as success.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("查询预报数据失败: %w", err)
	}
	return points, nil
}

// forecastTimeLayout is the layout used to pass the time bounds to the
// database as text. The offset is written as hours and minutes: an hour-only
// offset ("-07") would drop the minutes of zones such as +05:30 and shift the
// queried window.
const forecastTimeLayout = "2006-01-02 15:04:05-07:00"

// buildForecastQuery returns the SQL for GetForecastData and reports whether
// provider has to be bound as parameter $2. Placeholders are numbered
// $1 station_id, [$2 provider], then start, end and the version limit.
func buildForecastQuery(provider string) (query string, bindProvider bool) {
	partitionBy := "forecast_time"
	orderBy := "forecast_time, issued_at DESC"
	providerFilter := ""

	switch provider {
	case "":
		// No provider requested: rank versions separately for each provider.
		partitionBy = "provider, forecast_time"
		orderBy = "forecast_time, provider, issued_at DESC"
	case "open-meteo":
		// Merge realtime and historical rows and rank them together.
		providerFilter = " AND provider IN ('open-meteo','open-meteo_historical')"
	default:
		providerFilter = " AND provider = $2"
		bindProvider = true
	}

	// First placeholder number after station_id and the optional provider.
	next := 2
	if bindProvider {
		next = 3
	}
	startArg := fmt.Sprintf("$%d", next)
	endArg := fmt.Sprintf("$%d", next+1)
	versionsArg := fmt.Sprintf("$%d", next+2)

	query = `
WITH ranked AS (
SELECT
station_id, provider, issued_at, forecast_time,
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100,
ROW_NUMBER() OVER (PARTITION BY ` + partitionBy + ` ORDER BY issued_at DESC) AS rn,
CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours
FROM forecast_hourly
WHERE station_id = $1` + providerFilter + `
AND forecast_time BETWEEN ` + startArg + ` AND ` + endArg + `
)
SELECT
to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time,
provider,
to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at,
ROUND(temp_c_x100::numeric / 100.0, 2) as temperature,
humidity_pct as humidity,
ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure,
ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed,
wind_dir_deg as wind_direction,
ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall,
precip_prob_pct as precip_prob,
uv_index as uv,
lead_hours
FROM ranked
WHERE rn <= ` + versionsArg + `
ORDER BY ` + orderBy

	return query, bindProvider
}
// GetSeriesRaw returns the unaggregated rs485_weather_data samples of one
// station with startTime <= timestamp <= endTime, ordered by timestamp.
//
// NULL readings are replaced by 0. Both Rainfall and RainTotal are filled from
// the same rainfall column.
//
// Rows that fail to scan are logged and skipped. A failure of the query itself
// or of the row iteration is returned as an error.
func GetSeriesRaw(db *sql.DB, stationID string, startTime, endTime time.Time) ([]types.WeatherPoint, error) {
	const query = `
SELECT
to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
COALESCE(temperature, 0) AS temperature,
COALESCE(humidity, 0) AS humidity,
COALESCE(pressure, 0) AS pressure,
COALESCE(wind_speed, 0) AS wind_speed,
COALESCE(wind_direction, 0) AS wind_direction,
COALESCE(rainfall, 0) AS rainfall,
COALESCE(light, 0) AS light,
COALESCE(uv, 0) AS uv,
COALESCE(rainfall, 0) AS rain_total
FROM rs485_weather_data
WHERE station_id = $1 AND timestamp >= $2 AND timestamp <= $3
ORDER BY timestamp`

	rows, err := db.Query(query, stationID, startTime, endTime)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var points []types.WeatherPoint
	for rows.Next() {
		var p types.WeatherPoint
		if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV, &p.RainTotal); err != nil {
			// Best effort: skip the sample that cannot be decoded, but say so.
			log.Printf("数据扫描错误: %v", err)
			continue
		}
		points = append(points, p)
	}
	// rows.Next returning false can also mean the iteration failed; surface
	// that instead of returning a silently truncated series.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return points, nil
}