2025-08-22 10:03:34 +08:00

261 lines
8.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package database
import (
"database/sql"
"log"
"time"
"weatherstation/pkg/types"
)
// GetOnlineDevicesCount returns the number of distinct station_id values that
// have at least one row in rs485_weather_data newer than five minutes, as
// measured by the database's NOW().
//
// The function has no error return, so a failed query or scan is logged and
// reported as 0 online devices.
func GetOnlineDevicesCount(db *sql.DB) int {
	query := `
SELECT COUNT(DISTINCT station_id)
FROM rs485_weather_data
WHERE timestamp > NOW() - INTERVAL '5 minutes'`

	var count int
	if err := db.QueryRow(query).Scan(&count); err != nil {
		// Make the failure visible; otherwise a broken query is
		// indistinguishable from "no devices online".
		log.Printf("查询在线设备数量失败: %v", err)
		return 0
	}
	return count
}
// GetStations lists every row of the stations table whose station_id starts
// with "RS485-", ordered by station_id, together with the timestamp of its
// newest row in rs485_weather_data.
//
// Each station is reported with the fixed device type "WH65LP". A station
// with no weather rows gets the last-update time 1970-01-01 00:00:00, and NULL
// latitude/longitude/name/location are returned as 0 or "".
//
// NOTE(review): StationName is filled from stations.password. Confirm this
// column reuse is intentional and does not expose a real credential.
//
// Rows that fail to scan are logged and skipped. A query error, or an error
// raised while iterating the result set, is returned with a nil slice.
func GetStations(db *sql.DB) ([]types.Station, error) {
	query := `
SELECT DISTINCT s.station_id,
COALESCE(s.password, '') as station_name,
'WH65LP' as device_type,
COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update,
COALESCE(s.latitude, 0) as latitude,
COALESCE(s.longitude, 0) as longitude,
COALESCE(s.name, '') as name,
COALESCE(s.location, '') as location
FROM stations s
LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id
WHERE s.station_id LIKE 'RS485-%'
GROUP BY s.station_id, s.password, s.latitude, s.longitude, s.name, s.location
ORDER BY s.station_id`

	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var stations []types.Station
	for rows.Next() {
		var station types.Station
		var lastUpdate time.Time
		if err := rows.Scan(
			&station.StationID,
			&station.StationName,
			&station.DeviceType,
			&lastUpdate,
			&station.Latitude,
			&station.Longitude,
			&station.Name,
			&station.Location,
		); err != nil {
			// Best effort: keep the remaining stations, but leave a trace.
			log.Printf("扫描站点数据失败: %v", err)
			continue
		}
		station.LastUpdate = lastUpdate.Format("2006-01-02 15:04:05")
		stations = append(stations, station)
	}
	// rows.Next also returns false when iteration fails; without this check
	// a truncated list would be returned as if it were complete.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return stations, nil
}
// GetWeatherData returns time-bucketed weather history for one station, using
// the SQL produced by buildWeatherDataQuery against rs485_weather_data.
//
// interval selects the bucket width: "10min" maps to "10 minutes", "30min" to
// "30 minutes", and any other value falls back to "1 hour". The width is bound
// as query parameter $1, followed by stationID, startTime and endTime.
//
// Rows that fail to scan are logged and skipped. A query error, or an error
// raised while iterating the result set, is returned with a nil slice.
func GetWeatherData(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) {
	// Translate the API interval name into the SQL interval text.
	var intervalStr string
	switch interval {
	case "10min":
		intervalStr = "10 minutes"
	case "30min":
		intervalStr = "30 minutes"
	default: // 1hour
		intervalStr = "1 hour"
	}

	// The query applies one policy to all widths: vector-averaged wind
	// direction, and rainfall as the sum of positive increments of the
	// cumulative counter.
	query := buildWeatherDataQuery(intervalStr)
	rows, err := db.Query(query, intervalStr, stationID, startTime, endTime)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var points []types.WeatherPoint
	for rows.Next() {
		var point types.WeatherPoint
		if err := rows.Scan(
			&point.DateTime,
			&point.Temperature,
			&point.Humidity,
			&point.Pressure,
			&point.WindSpeed,
			&point.WindDir,
			&point.Rainfall,
			&point.Light,
			&point.UV,
		); err != nil {
			// Best effort: keep the remaining points, but leave a trace.
			log.Printf("扫描天气数据失败: %v", err)
			continue
		}
		points = append(points, point)
	}
	// rows.Next also returns false when iteration fails; report that instead
	// of handing back a silently truncated series.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return points, nil
}
// GetSeriesFrom10Min returns a 10-minute, 30-minute or hourly series for one
// station, read from the pre-aggregated rs485_weather_10min table.
//
// For interval "10min" the buckets are returned as stored, with the
// fixed-point columns scaled back (…_x100 / 100, …_x1000 / 1000). "30min" and
// any other value (treated as one hour) use buildAggFrom10MinQuery to
// re-aggregate the 10-minute buckets. Both bounds are inclusive on
// bucket_start.
//
// Every call logs its parameters. Rows that fail to scan are logged and
// skipped. A query error, or an error raised while iterating the result set,
// is returned with a nil slice.
func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) {
	log.Printf("查询数据: stationID=%s, start=%v, end=%v, interval=%s",
		stationID, startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"), interval)

	var query string
	switch interval {
	case "10min":
		query = `
SELECT
to_char(bucket_start, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
ROUND(temp_c_x100/100.0, 2) AS temperature,
ROUND(humidity_pct::numeric, 2) AS humidity,
ROUND(pressure_hpa_x100/100.0, 2) AS pressure,
ROUND(wind_speed_ms_x1000/1000.0, 3) AS wind_speed,
ROUND(wind_dir_deg::numeric, 2) AS wind_direction,
ROUND(rain_10m_mm_x1000/1000.0, 3) AS rainfall,
ROUND(solar_wm2_x100/100.0, 2) AS light,
ROUND(uv_index::numeric, 2) AS uv
FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
ORDER BY bucket_start`
	case "30min":
		query = buildAggFrom10MinQuery("30 minutes")
	default: // 1hour
		query = buildAggFrom10MinQuery("1 hour")
	}

	// // Debug: print the full SQL
	// debugSQL := fmt.Sprintf("-- SQL for %s\n%s\n-- Params: stationID=%s, start=%v, end=%v",
	// interval, query, stationID, startTime, endTime)
	// log.Println(debugSQL)

	rows, err := db.Query(query, stationID, startTime, endTime)
	if err != nil {
		log.Printf("查询失败: %v", err)
		return nil, err
	}
	defer rows.Close()

	var points []types.WeatherPoint
	for rows.Next() {
		var p types.WeatherPoint
		if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV); err != nil {
			// Best effort: keep the remaining points, but leave a trace.
			log.Printf("扫描数据失败: %v", err)
			continue
		}
		points = append(points, p)
	}
	// rows.Next also returns false when iteration fails; report that instead
	// of handing back a silently truncated series.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return points, nil
}
// buildAggFrom10MinQuery returns the SQL that re-aggregates rows of
// rs485_weather_10min into coarser buckets.
//
// interval must be "30 minutes" or "1 hour". Any other value is treated as
// "1 hour", because the value is spliced into the SQL text as a literal and
// must never carry caller-controlled content.
//
// The query expects three parameters: $1 station_id, $2 and $3 the inclusive
// bucket_start range. Per output bucket it produces:
//   - temperature, humidity, pressure, wind speed, light and uv as averages
//     weighted by sample_count;
//   - wind direction as the angle of the sample_count-weighted sin/cos sums,
//     shifted into the 0–360 range;
//   - rainfall as the sum of the 10-minute rainfall values.
//
// gust_max is computed in the intermediate CTE but not selected.
//
// NOTE(review): the weighted sums are cast to bigint before being divided by
// n_sum. If n_sum is an integer type the division truncates, which would make
// the humidity and uv averages whole numbers — confirm the intended precision.
func buildAggFrom10MinQuery(interval string) string {
	// Only the two literals the CASE expression understands may reach the
	// SQL text; everything else falls back to hourly buckets.
	switch interval {
	case "30 minutes", "1 hour":
	default:
		interval = "1 hour"
	}

	return `
WITH base AS (
SELECT * FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
), g AS (
SELECT
CASE '` + interval + `'
WHEN '1 hour' THEN date_trunc('hour', bucket_start)
WHEN '30 minutes' THEN
date_trunc('hour', bucket_start) +
CASE WHEN date_part('minute', bucket_start) >= 30
THEN '30 minutes'::interval
ELSE '0 minutes'::interval
END
END AS grp,
SUM(temp_c_x100 * sample_count)::bigint AS w_temp,
SUM(humidity_pct * sample_count)::bigint AS w_hum,
SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p,
SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar,
SUM(uv_index * sample_count)::bigint AS w_uv,
SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws,
MAX(wind_gust_ms_x1000) AS gust_max,
SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum,
SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum,
SUM(rain_10m_mm_x1000) AS rain_sum,
SUM(sample_count) AS n_sum
FROM base
GROUP BY 1
)
SELECT
to_char(grp, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature,
ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity,
ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure,
ROUND((w_ws/NULLIF(n_sum,0))/1000.0, 3) AS wind_speed,
ROUND((CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0
THEN degrees(atan2(sin_sum, cos_sum)) + 360
ELSE degrees(atan2(sin_sum, cos_sum)) END)::numeric, 2) AS wind_direction,
ROUND((rain_sum/1000.0)::numeric, 3) AS rainfall,
ROUND((w_solar/NULLIF(n_sum,0))/100.0, 2) AS light,
ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv
FROM g
ORDER BY grp`
}
// buildWeatherDataQuery returns the SQL used to bucket raw rows of
// rs485_weather_data. The interval argument is not used by the function: the
// bucket width reaches the query as parameter $1, followed by $2 station_id
// and the $3/$4 bounds of a BETWEEN on timestamp.
//
// The statement is assembled from one fragment per CTE:
//   - base: assigns each row to a bucket start (time_group);
//   - rain_inc / rain_sum: per bucket, sums the non-negative differences
//     between consecutive rainfall readings;
//   - grouped_data: plain averages, plus wind direction as the angle of the
//     averaged sin/cos components;
//   - final SELECT: rounds the values, shifts negative wind angles by +360
//     and orders the buckets by time.
//
// NOTE(review): LAG is partitioned by time_group, so the first reading of a
// bucket has no predecessor and appears to contribute no increment — confirm
// that rain accumulated across a bucket boundary is meant to be left out.
func buildWeatherDataQuery(interval string) string {
	const (
		baseCTE = `
WITH base AS (
SELECT
date_trunc('hour', timestamp) +
(floor(date_part('minute', timestamp) / extract(epoch from $1::interval) * 60) * $1::interval) as time_group,
timestamp as ts,
temperature, humidity, pressure, wind_speed, wind_direction, rainfall, light, uv
FROM rs485_weather_data
WHERE station_id = $2 AND timestamp BETWEEN $3 AND $4
),`
		rainIncCTE = `
rain_inc AS (
SELECT time_group, GREATEST(rainfall - LAG(rainfall) OVER (PARTITION BY time_group ORDER BY ts), 0) AS inc
FROM base
),`
		rainSumCTE = `
rain_sum AS (
SELECT time_group, SUM(inc) AS rainfall
FROM rain_inc
GROUP BY time_group
),`
		groupedCTE = `
grouped_data AS (
SELECT
time_group,
AVG(temperature) as temperature,
AVG(humidity) as humidity,
AVG(pressure) as pressure,
AVG(wind_speed) as wind_speed,
DEGREES(ATAN2(AVG(SIN(RADIANS(wind_direction))), AVG(COS(RADIANS(wind_direction))))) AS wind_direction_raw,
AVG(light) as light,
AVG(uv) as uv
FROM base
GROUP BY time_group
)`
		finalSelect = `
SELECT
to_char(g.time_group, 'YYYY-MM-DD HH24:MI:SS') as date_time,
ROUND(g.temperature::numeric, 2) as temperature,
ROUND(g.humidity::numeric, 2) as humidity,
ROUND(g.pressure::numeric, 2) as pressure,
ROUND(g.wind_speed::numeric, 2) as wind_speed,
ROUND((CASE WHEN g.wind_direction_raw < 0 THEN g.wind_direction_raw + 360 ELSE g.wind_direction_raw END)::numeric, 2) AS wind_direction,
ROUND(COALESCE(r.rainfall, 0)::numeric, 3) as rainfall,
ROUND(g.light::numeric, 2) as light,
ROUND(g.uv::numeric, 2) as uv
FROM grouped_data g
LEFT JOIN rain_sum r ON r.time_group = g.time_group
ORDER BY g.time_group`
	)

	return baseCTE + rainIncCTE + rainSumCTE + groupedCTE + finalSelect
}