284 lines
13 KiB
Go
284 lines
13 KiB
Go
package data
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
"weatherstation/pkg/types"
|
|
)
|
|
|
|
func OnlineDevices() int {
|
|
const query = `SELECT COUNT(DISTINCT station_id) FROM rs485_weather_data WHERE timestamp > NOW() - INTERVAL '5 minutes'`
|
|
var count int
|
|
if err := DB().QueryRow(query).Scan(&count); err != nil {
|
|
return 0
|
|
}
|
|
return count
|
|
}
|
|
|
|
func Stations() ([]types.Station, error) {
|
|
const query = `
|
|
SELECT DISTINCT s.station_id,
|
|
COALESCE(s.station_alias, '') as station_alias,
|
|
COALESCE(s.password, '') as station_name,
|
|
'WH65LP' as device_type,
|
|
COALESCE(MAX(r.timestamp), '1970-01-01'::timestamp) as last_update,
|
|
COALESCE(s.latitude, 0) as latitude,
|
|
COALESCE(s.longitude, 0) as longitude,
|
|
COALESCE(s.name, '') as name,
|
|
COALESCE(s.location, '') as location,
|
|
COALESCE(s.z, 0) as z,
|
|
COALESCE(s.y, 0) as y,
|
|
COALESCE(s.x, 0) as x
|
|
FROM stations s
|
|
LEFT JOIN rs485_weather_data r ON s.station_id = r.station_id
|
|
WHERE s.station_id LIKE 'RS485-%'
|
|
GROUP BY s.station_id, s.station_alias, s.password, s.latitude, s.longitude, s.name, s.location, s.z, s.y, s.x
|
|
ORDER BY s.station_id`
|
|
rows, err := DB().Query(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var stations []types.Station
|
|
for rows.Next() {
|
|
var s types.Station
|
|
var last time.Time
|
|
if err := rows.Scan(&s.StationID, &s.StationAlias, &s.StationName, &s.DeviceType, &last, &s.Latitude, &s.Longitude, &s.Name, &s.Location, &s.Z, &s.Y, &s.X); err != nil {
|
|
continue
|
|
}
|
|
s.LastUpdate = last.Format("2006-01-02 15:04:05")
|
|
stations = append(stations, s)
|
|
}
|
|
return stations, nil
|
|
}
|
|
|
|
func SeriesRaw(stationID string, start, end time.Time) ([]types.WeatherPoint, error) {
|
|
const query = `
|
|
SELECT
|
|
to_char(timestamp, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
|
COALESCE(temperature, 0) AS temperature,
|
|
COALESCE(humidity, 0) AS humidity,
|
|
COALESCE(pressure, 0) AS pressure,
|
|
COALESCE(wind_speed, 0) AS wind_speed,
|
|
COALESCE(wind_direction, 0) AS wind_direction,
|
|
COALESCE(rainfall, 0) AS rainfall,
|
|
COALESCE(light, 0) AS light,
|
|
COALESCE(uv, 0) AS uv,
|
|
COALESCE(rainfall, 0) AS rain_total
|
|
FROM rs485_weather_data
|
|
WHERE station_id = $1 AND timestamp >= $2 AND timestamp <= $3
|
|
ORDER BY timestamp`
|
|
rows, err := DB().Query(query, stationID, start, end)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var points []types.WeatherPoint
|
|
for rows.Next() {
|
|
var p types.WeatherPoint
|
|
if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV, &p.RainTotal); err != nil {
|
|
continue
|
|
}
|
|
points = append(points, p)
|
|
}
|
|
return points, nil
|
|
}
|
|
|
|
func SeriesFrom10Min(stationID string, start, end time.Time, interval string) ([]types.WeatherPoint, error) {
|
|
log.Printf("查询数据: stationID=%s, start=%v, end=%v, interval=%s", stationID, start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), interval)
|
|
var query string
|
|
switch interval {
|
|
case "10min":
|
|
query = `
|
|
SELECT
|
|
to_char(bucket_start + interval '10 minutes', 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
|
ROUND(temp_c_x100/100.0, 2) AS temperature,
|
|
ROUND(humidity_pct::numeric, 2) AS humidity,
|
|
ROUND(pressure_hpa_x100/100.0, 2) AS pressure,
|
|
ROUND(wind_speed_ms_x1000/1000.0, 3) AS wind_speed,
|
|
ROUND(wind_dir_deg::numeric, 2) AS wind_direction,
|
|
ROUND(rain_10m_mm_x1000/1000.0, 3) AS rainfall,
|
|
ROUND(solar_wm2_x100/100.0, 2) AS light,
|
|
ROUND(uv_index::numeric, 2) AS uv,
|
|
ROUND(rain_total_mm_x1000/1000.0, 3) AS rain_total
|
|
FROM rs485_weather_10min
|
|
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
|
|
ORDER BY bucket_start + interval '10 minutes'`
|
|
case "30min":
|
|
query = buildAggFrom10MinQuery("30 minutes")
|
|
default:
|
|
query = buildAggFrom10MinQuery("1 hour")
|
|
}
|
|
rows, err := DB().Query(query, stationID, start, end)
|
|
if err != nil {
|
|
log.Printf("查询失败: %v", err)
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var points []types.WeatherPoint
|
|
for rows.Next() {
|
|
var p types.WeatherPoint
|
|
if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV, &p.RainTotal); err != nil {
|
|
continue
|
|
}
|
|
points = append(points, p)
|
|
}
|
|
return points, nil
|
|
}
|
|
|
|
// buildAggFrom10MinQuery returns the SQL that rolls rs485_weather_10min
// buckets up into coarser groups. The query expects three parameters:
// $1 station_id, $2 and $3 the inclusive bucket_start range.
//
// interval must be "30 minutes" or "1 hour"; these are the only values the
// grouping CASE understands. Because the value is spliced into the SQL text,
// anything else is replaced by "1 hour" (the same fallback SeriesFrom10Min
// uses) so that no unvetted text can reach the statement and the group key
// can never be NULL.
//
// Per group the query produces:
//   - temperature, humidity, pressure, wind speed, light and uv as averages
//     weighted by sample_count (NULLIF guards the division when the total
//     sample count is 0);
//   - wind direction as the direction of the sample-weighted sum of unit
//     vectors, shifted into [0, 360);
//   - rainfall as the sum of rain_10m_mm_x1000 and rain_total as the maximum
//     of rain_total_mm_x1000, both scaled to decimals;
//   - date_time as the group start plus the interval, i.e. the group's end.
//
// gust_max is computed in the grouping step but not selected in the output.
func buildAggFrom10MinQuery(interval string) string {
	switch interval {
	case "30 minutes", "1 hour":
		// Supported as-is.
	default:
		interval = "1 hour"
	}

	return `
	WITH base AS (
		SELECT * FROM rs485_weather_10min
		WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
	), g AS (
		SELECT
			CASE '` + interval + `'
				WHEN '1 hour' THEN date_trunc('hour', bucket_start)
				WHEN '30 minutes' THEN
					date_trunc('hour', bucket_start) +
					CASE WHEN date_part('minute', bucket_start) >= 30
						THEN '30 minutes'::interval
						ELSE '0 minutes'::interval
					END
			END AS grp,
			SUM(temp_c_x100 * sample_count)::bigint AS w_temp,
			SUM(humidity_pct * sample_count)::bigint AS w_hum,
			SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p,
			SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar,
			SUM(uv_index * sample_count)::bigint AS w_uv,
			SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws,
			MAX(wind_gust_ms_x1000) AS gust_max,
			SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum,
			SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum,
			SUM(rain_10m_mm_x1000) AS rain_sum,
			SUM(sample_count) AS n_sum,
			MAX(rain_total_mm_x1000) AS rain_total_max
		FROM base
		GROUP BY 1
	)
	SELECT
		to_char(grp + '` + interval + `'::interval, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
		ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature,
		ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity,
		ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure,
		ROUND((w_ws/NULLIF(n_sum,0))/1000.0, 3) AS wind_speed,
		ROUND((CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0
			THEN degrees(atan2(sin_sum, cos_sum)) + 360
			ELSE degrees(atan2(sin_sum, cos_sum)) END)::numeric, 2) AS wind_direction,
		ROUND((rain_sum/1000.0)::numeric, 3) AS rainfall,
		ROUND((w_solar/NULLIF(n_sum,0))/100.0, 2) AS light,
		ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv,
		ROUND((rain_total_max/1000.0)::numeric, 3) AS rain_total
	FROM g
	ORDER BY grp + '` + interval + `'::interval`
}
|
|
|
|
func Forecast(stationID string, start, end time.Time, provider string, versions int) ([]types.ForecastPoint, error) {
|
|
var query string
|
|
var args []interface{}
|
|
if versions <= 0 {
|
|
versions = 1
|
|
}
|
|
if provider != "" {
|
|
if provider == "open-meteo" {
|
|
query = `
|
|
WITH ranked AS (
|
|
SELECT station_id, provider, issued_at, forecast_time,
|
|
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
|
|
wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100,
|
|
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn,
|
|
CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours
|
|
FROM forecast_hourly
|
|
WHERE station_id = $1 AND provider IN ('open-meteo','open-meteo_historical')
|
|
AND forecast_time BETWEEN $2 AND $3
|
|
)
|
|
SELECT to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time,
|
|
provider,
|
|
to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at,
|
|
ROUND(temp_c_x100::numeric / 100.0, 2) as temperature,
|
|
humidity_pct as humidity,
|
|
ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure,
|
|
ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed,
|
|
wind_dir_deg as wind_direction,
|
|
ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall,
|
|
precip_prob_pct as precip_prob,
|
|
uv_index as uv,
|
|
lead_hours
|
|
FROM ranked WHERE rn <= $4
|
|
ORDER BY forecast_time, issued_at DESC`
|
|
args = []interface{}{stationID, start.Format("2006-01-02 15:04:05-07"), end.Format("2006-01-02 15:04:05-07"), versions}
|
|
} else {
|
|
query = `
|
|
WITH ranked AS (
|
|
SELECT station_id, provider, issued_at, forecast_time,
|
|
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
|
|
wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100,
|
|
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn,
|
|
CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours
|
|
FROM forecast_hourly
|
|
WHERE station_id = $1 AND provider = $2
|
|
AND forecast_time BETWEEN $3 AND $4
|
|
)
|
|
SELECT to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time,
|
|
provider,
|
|
to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at,
|
|
ROUND(temp_c_x100::numeric / 100.0, 2) as temperature,
|
|
humidity_pct as humidity,
|
|
ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure,
|
|
ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed,
|
|
wind_dir_deg as wind_direction,
|
|
ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall,
|
|
precip_prob_pct as precip_prob,
|
|
uv_index as uv,
|
|
lead_hours
|
|
FROM ranked WHERE rn <= $5
|
|
ORDER BY forecast_time, issued_at DESC`
|
|
args = []interface{}{stationID, provider, start.Format("2006-01-02 15:04:05-07"), end.Format("2006-01-02 15:04:05-07"), versions}
|
|
}
|
|
} else {
|
|
query = `
|
|
WITH ranked AS (
|
|
SELECT station_id, provider, issued_at, forecast_time,
|
|
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
|
|
wind_dir_deg, rain_mm_x1000, precip_prob_pct, uv_index, pressure_hpa_x100,
|
|
ROW_NUMBER() OVER (PARTITION BY provider, forecast_time ORDER BY issued_at DESC) AS rn,
|
|
CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours
|
|
FROM forecast_hourly
|
|
WHERE station_id = $1 AND forecast_time BETWEEN $2 AND $3
|
|
)
|
|
SELECT to_char(forecast_time, 'YYYY-MM-DD HH24:MI:SS') as date_time,
|
|
provider,
|
|
to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS') as issued_at,
|
|
ROUND(temp_c_x100::numeric / 100.0, 2) as temperature,
|
|
humidity_pct as humidity,
|
|
ROUND(pressure_hpa_x100::numeric / 100.0, 2) as pressure,
|
|
ROUND(wind_speed_ms_x1000::numeric / 1000.0, 2) as wind_speed,
|
|
wind_dir_deg as wind_direction,
|
|
ROUND(rain_mm_x1000::numeric / 1000.0, 3) as rainfall,
|
|
precip_prob_pct as precip_prob,
|
|
uv_index as uv,
|
|
lead_hours
|
|
FROM ranked WHERE rn <= $4
|
|
ORDER BY forecast_time, provider, issued_at DESC`
|
|
args = []interface{}{stationID, start.Format("2006-01-02 15:04:05-07"), end.Format("2006-01-02 15:04:05-07"), versions}
|
|
}
|
|
rows, err := DB().Query(query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("查询预报数据失败: %v", err)
|
|
}
|
|
defer rows.Close()
|
|
var points []types.ForecastPoint
|
|
for rows.Next() {
|
|
var p types.ForecastPoint
|
|
if err := rows.Scan(&p.DateTime, &p.Provider, &p.IssuedAt, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.PrecipProb, &p.UV, &p.LeadHours); err != nil {
|
|
log.Printf("数据扫描错误: %v", err)
|
|
continue
|
|
}
|
|
p.Source = "forecast"
|
|
points = append(points, p)
|
|
}
|
|
return points, nil
|
|
}
|