418 lines
14 KiB
Go
418 lines
14 KiB
Go
package database
|
||
|
||
import (
|
||
"database/sql"
|
||
"fmt"
|
||
"log"
|
||
"time"
|
||
"weatherstation/pkg/types"
|
||
)
|
||
|
||
// GetOnlineDevicesCount 获取在线设备数量
|
||
func GetOnlineDevicesCount(db *sql.DB) int {
|
||
query := `
|
||
SELECT COUNT(DISTINCT station_id)
|
||
FROM rs485_weather_data
|
||
WHERE timestamp > NOW() - INTERVAL '5 minutes'`
|
||
|
||
var count int
|
||
if err := db.QueryRow(query).Scan(&count); err != nil {
|
||
return 0
|
||
}
|
||
return count
|
||
}
|
||
|
||
// GetStations 获取所有WH65LP站点列表
|
||
func GetStations(db *sql.DB) ([]types.Station, error) {
|
||
query := `
|
||
SELECT DISTINCT s.station_id,
|
||
COALESCE(s.password, '') as station_name,
|
||
'WH65LP' as device_type,
|
||
COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update,
|
||
COALESCE(s.latitude, 0) as latitude,
|
||
COALESCE(s.longitude, 0) as longitude,
|
||
COALESCE(s.name, '') as name,
|
||
COALESCE(s.location, '') as location
|
||
FROM stations s
|
||
LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id
|
||
WHERE s.station_id LIKE 'RS485-%'
|
||
GROUP BY s.station_id, s.password, s.latitude, s.longitude, s.name, s.location
|
||
ORDER BY s.station_id`
|
||
|
||
rows, err := db.Query(query)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
var stations []types.Station
|
||
for rows.Next() {
|
||
var station types.Station
|
||
var lastUpdate time.Time
|
||
err := rows.Scan(
|
||
&station.StationID,
|
||
&station.StationName,
|
||
&station.DeviceType,
|
||
&lastUpdate,
|
||
&station.Latitude,
|
||
&station.Longitude,
|
||
&station.Name,
|
||
&station.Location,
|
||
)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05")
|
||
stations = append(stations, station)
|
||
}
|
||
|
||
return stations, nil
|
||
}
|
||
|
||
// GetWeatherData 获取指定站点的历史天气数据
|
||
func GetWeatherData(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) {
|
||
// 构建查询SQL(统一风向矢量平均,雨量为累计量的正增量求和)
|
||
var query string
|
||
var intervalStr string
|
||
switch interval {
|
||
case "10min":
|
||
intervalStr = "10 minutes"
|
||
case "30min":
|
||
intervalStr = "30 minutes"
|
||
default: // 1hour
|
||
intervalStr = "1 hour"
|
||
}
|
||
query = buildWeatherDataQuery(intervalStr)
|
||
|
||
rows, err := db.Query(query, intervalStr, stationID, startTime, endTime)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
var points []types.WeatherPoint
|
||
for rows.Next() {
|
||
var point types.WeatherPoint
|
||
err := rows.Scan(
|
||
&point.DateTime,
|
||
&point.Temperature,
|
||
&point.Humidity,
|
||
&point.Pressure,
|
||
&point.WindSpeed,
|
||
&point.WindDir,
|
||
&point.Rainfall,
|
||
&point.Light,
|
||
&point.UV,
|
||
)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
points = append(points, point)
|
||
}
|
||
|
||
return points, nil
|
||
}
|
||
|
||
// GetSeriesFrom10Min 基于10分钟聚合表返回 10m/30m/1h 数据(风向向量平均、降雨求和、加权平均)
|
||
func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) {
|
||
log.Printf("查询数据: stationID=%s, start=%v, end=%v, interval=%s",
|
||
stationID, startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"), interval)
|
||
|
||
var query string
|
||
switch interval {
|
||
case "10min":
|
||
query = `
|
||
SELECT
|
||
to_char(bucket_start, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
||
ROUND(temp_c_x100/100.0, 2) AS temperature,
|
||
ROUND(humidity_pct::numeric, 2) AS humidity,
|
||
ROUND(pressure_hpa_x100/100.0, 2) AS pressure,
|
||
ROUND(wind_speed_ms_x1000/1000.0, 3) AS wind_speed,
|
||
ROUND(wind_dir_deg::numeric, 2) AS wind_direction,
|
||
ROUND(rain_10m_mm_x1000/1000.0, 3) AS rainfall,
|
||
ROUND(solar_wm2_x100/100.0, 2) AS light,
|
||
ROUND(uv_index::numeric, 2) AS uv
|
||
FROM rs485_weather_10min
|
||
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
|
||
ORDER BY bucket_start`
|
||
case "30min":
|
||
query = buildAggFrom10MinQuery("30 minutes")
|
||
default: // 1hour
|
||
query = buildAggFrom10MinQuery("1 hour")
|
||
}
|
||
|
||
// // 调试输出完整SQL
|
||
// debugSQL := fmt.Sprintf("-- SQL for %s\n%s\n-- Params: stationID=%s, start=%v, end=%v",
|
||
// interval, query, stationID, startTime, endTime)
|
||
// log.Println(debugSQL)
|
||
|
||
rows, err := db.Query(query, stationID, startTime, endTime)
|
||
if err != nil {
|
||
log.Printf("查询失败: %v", err)
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
var points []types.WeatherPoint
|
||
for rows.Next() {
|
||
var p types.WeatherPoint
|
||
if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV); err != nil {
|
||
continue
|
||
}
|
||
points = append(points, p)
|
||
}
|
||
return points, nil
|
||
}
|
||
|
||
// buildAggFrom10MinQuery 返回从10分钟表再聚合的SQL(interval 支持 '30 minutes' 或 '1 hour')
|
||
func buildAggFrom10MinQuery(interval string) string {
|
||
return `
|
||
WITH base AS (
|
||
SELECT * FROM rs485_weather_10min
|
||
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
|
||
), g AS (
|
||
SELECT
|
||
CASE '` + interval + `'
|
||
WHEN '1 hour' THEN date_trunc('hour', bucket_start)
|
||
WHEN '30 minutes' THEN
|
||
date_trunc('hour', bucket_start) +
|
||
CASE WHEN date_part('minute', bucket_start) >= 30
|
||
THEN '30 minutes'::interval
|
||
ELSE '0 minutes'::interval
|
||
END
|
||
END AS grp,
|
||
SUM(temp_c_x100 * sample_count)::bigint AS w_temp,
|
||
SUM(humidity_pct * sample_count)::bigint AS w_hum,
|
||
SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p,
|
||
SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar,
|
||
SUM(uv_index * sample_count)::bigint AS w_uv,
|
||
SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws,
|
||
MAX(wind_gust_ms_x1000) AS gust_max,
|
||
SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum,
|
||
SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum,
|
||
SUM(rain_10m_mm_x1000) AS rain_sum,
|
||
SUM(sample_count) AS n_sum
|
||
FROM base
|
||
GROUP BY 1
|
||
)
|
||
SELECT
|
||
to_char(grp, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
||
ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature,
|
||
ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity,
|
||
ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure,
|
||
ROUND((w_ws/NULLIF(n_sum,0))/1000.0, 3) AS wind_speed,
|
||
ROUND((CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0
|
||
THEN degrees(atan2(sin_sum, cos_sum)) + 360
|
||
ELSE degrees(atan2(sin_sum, cos_sum)) END)::numeric, 2) AS wind_direction,
|
||
ROUND((rain_sum/1000.0)::numeric, 3) AS rainfall,
|
||
ROUND((w_solar/NULLIF(n_sum,0))/100.0, 2) AS light,
|
||
ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv
|
||
FROM g
|
||
ORDER BY grp`
|
||
}
|
||
|
||
// buildWeatherDataQuery 构建天气数据查询SQL
|
||
func buildWeatherDataQuery(interval string) string {
|
||
return `
|
||
WITH base AS (
|
||
SELECT
|
||
date_trunc('hour', timestamp) +
|
||
(floor(date_part('minute', timestamp) / extract(epoch from $1::interval) * 60) * $1::interval) as time_group,
|
||
timestamp as ts,
|
||
temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv
|
||
FROM rs485_weather_data
|
||
WHERE station_id = $2 AND timestamp BETWEEN $3 AND $4
|
||
),
|
||
rain_inc AS (
|
||
SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc
|
||
FROM base
|
||
),
|
||
rain_sum AS (
|
||
SELECT time_group, SUM(inc) AS rainfall
|
||
FROM rain_inc
|
||
GROUP BY time_group
|
||
),
|
||
grouped_data AS (
|
||
SELECT
|
||
time_group,
|
||
AVG(temperature) as temperature,
|
||
AVG(humidity) as humidity,
|
||
AVG(pressure) as pressure,
|
||
AVG(wind_speed) as wind_speed,
|
||
DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw,
|
||
AVG(light) as light,
|
||
AVG(uv) as uv
|
||
FROM base
|
||
GROUP BY time_group
|
||
)
|
||
SELECT
|
||
to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time,
|
||
ROUND(g.temperature::numeric, 2) as temperature,
|
||
ROUND(g.humidity::numeric, 2) as humidity,
|
||
ROUND(g.pressure::numeric, 2) as pressure,
|
||
ROUND(g.wind_speed::numeric, 2) as wind_speed,
|
||
ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction,
|
||
ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall,
|
||
ROUND(g.light::numeric, 2) as light,
|
||
ROUND(g.uv::numeric, 2) as uv
|
||
FROM grouped_data g
|
||
LEFT JOIN rain_sum r ON r.time_group = g.time_group
|
||
ORDER BY g.time_group`
|
||
}
|
||
|
||
// GetForecastData 获取指定站点的预报数据(优先最新issued_at)
|
||
func GetForecastData(db *sql.DB, stationID string, startTime, endTime time.Time, provider string) ([]types.ForecastPoint, error) {
|
||
var query string
|
||
var args []interface{}
|
||
|
||
if provider != "" {
|
||
// 指定预报提供商
|
||
if provider == "open-meteo" {
|
||
// 合并实时与历史,优先实时的最新issued_at
|
||
query = `
|
||
WITH latest_forecasts AS (
|
||
SELECT DISTINCT ON (forecast_time)
|
||
station_id, provider, issued_at, forecast_time,
|
||
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
|
||
wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100
|
||
FROM forecast_hourly
|
||
WHERE station_id = $1 AND provider IN ('open-meteo','open-meteo_historical')
|
||
AND forecast_time BETWEEN $2 AND $3
|
||
ORDER BY forecast_time,
|
||
CASE WHEN provider='open-meteo' THEN 0 ELSE 1 END,
|
||
issued_at DESC
|
||
)
|
||
SELECT
|
||
to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time,
|
||
provider,
|
||
to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at,
|
||
ROUND(temp_c_x100::numeric / 100.0, 2) as temperature,
|
||
humidity_pct as humidity,
|
||
ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure,
|
||
ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed,
|
||
wind_dir_deg as wind_direction,
|
||
ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall,
|
||
precip_prob_pct as precip_prob,
|
||
uv_index as uv
|
||
FROM latest_forecasts
|
||
ORDER BY forecast_time`
|
||
args = []interface{}{stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")}
|
||
|
||
// 调试日志
|
||
log.Printf("执行open-meteo合并查询: stationID=%s, start=%s, end=%s",
|
||
stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07"))
|
||
|
||
// 检查是否有历史数据
|
||
var histCount int
|
||
err := db.QueryRow(`SELECT COUNT(*) FROM forecast_hourly
|
||
WHERE station_id = $1 AND provider = 'open-meteo_historical'
|
||
AND forecast_time BETWEEN $2 AND $3`,
|
||
stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")).Scan(&histCount)
|
||
if err != nil {
|
||
log.Printf("查询历史数据计数失败: %v", err)
|
||
} else {
|
||
log.Printf("时间范围内历史数据计数: %d 条", histCount)
|
||
}
|
||
|
||
// 检查是否有实时数据
|
||
var rtCount int
|
||
err = db.QueryRow(`SELECT COUNT(*) FROM forecast_hourly
|
||
WHERE station_id = $1 AND provider = 'open-meteo'
|
||
AND forecast_time BETWEEN $2 AND $3`,
|
||
stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")).Scan(&rtCount)
|
||
if err != nil {
|
||
log.Printf("查询实时数据计数失败: %v", err)
|
||
} else {
|
||
log.Printf("时间范围内实时数据计数: %d 条", rtCount)
|
||
}
|
||
} else {
|
||
query = `
|
||
WITH latest_forecasts AS (
|
||
SELECT DISTINCT ON (forecast_time)
|
||
station_id, provider, issued_at, forecast_time,
|
||
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
|
||
wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100
|
||
FROM forecast_hourly
|
||
WHERE station_id = $1 AND provider = $2
|
||
AND forecast_time BETWEEN $3 AND $4
|
||
ORDER BY forecast_time, issued_at DESC
|
||
)
|
||
SELECT
|
||
to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time,
|
||
provider,
|
||
to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at,
|
||
ROUND(temp_c_x100::numeric / 100.0, 2) as temperature,
|
||
humidity_pct as humidity,
|
||
ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure,
|
||
ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed,
|
||
wind_dir_deg as wind_direction,
|
||
ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall,
|
||
precip_prob_pct as precip_prob,
|
||
uv_index as uv
|
||
FROM latest_forecasts
|
||
ORDER BY forecast_time`
|
||
args = []interface{}{stationID, provider, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")}
|
||
}
|
||
} else {
|
||
// 不指定预报提供商,取所有
|
||
query = `
|
||
WITH latest_forecasts AS (
|
||
SELECT DISTINCT ON (provider, forecast_time)
|
||
station_id, provider, issued_at, forecast_time,
|
||
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
|
||
wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100
|
||
FROM forecast_hourly
|
||
WHERE station_id = $1 AND forecast_time BETWEEN $2 AND $3
|
||
ORDER BY provider, forecast_time, issued_at DESC
|
||
)
|
||
SELECT
|
||
to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time,
|
||
provider,
|
||
to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at,
|
||
ROUND(temp_c_x100::numeric / 100.0, 2) as temperature,
|
||
humidity_pct as humidity,
|
||
ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure,
|
||
ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed,
|
||
wind_dir_deg as wind_direction,
|
||
ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall,
|
||
precip_prob_pct as precip_prob,
|
||
uv_index as uv
|
||
FROM latest_forecasts
|
||
ORDER BY forecast_time, provider`
|
||
args = []interface{}{stationID, startTime.Format("2006-01-02 15:04:05-07"), endTime.Format("2006-01-02 15:04:05-07")}
|
||
}
|
||
|
||
// 执行查询
|
||
rows, err := db.Query(query, args...)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询预报数据失败: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
var points []types.ForecastPoint
|
||
for rows.Next() {
|
||
var point types.ForecastPoint
|
||
err := rows.Scan(
|
||
&point.DateTime,
|
||
&point.Provider,
|
||
&point.IssuedAt,
|
||
&point.Temperature,
|
||
&point.Humidity,
|
||
&point.Pressure,
|
||
&point.WindSpeed,
|
||
&point.WindDir,
|
||
&point.Rainfall,
|
||
&point.PrecipProb,
|
||
&point.UV,
|
||
)
|
||
if err != nil {
|
||
log.Printf("数据扫描错误: %v", err)
|
||
continue
|
||
}
|
||
point.Source = "forecast"
|
||
points = append(points, point)
|
||
}
|
||
|
||
return points, nil
|
||
}
|