From 337ee06cafc801412439fce654d638421704d531 Mon Sep 17 00:00:00 2001 From: yarnom Date: Thu, 21 Aug 2025 18:20:10 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E6=B0=94=E8=B1=A1?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=97=B6=E9=97=B4=E6=A1=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/weatherstation/main.go | 36 +++ db/schema.sql | 539 ++++++++++++++++++++++++++++++++++++ internal/database/models.go | 81 ++++++ internal/server/gin.go | 4 +- internal/server/udp.go | 20 ++ internal/tools/backfill.go | 257 +++++++++++++++++ 6 files changed, 935 insertions(+), 2 deletions(-) create mode 100644 db/schema.sql create mode 100644 internal/tools/backfill.go diff --git a/cmd/weatherstation/main.go b/cmd/weatherstation/main.go index 371d2d2..4c701cd 100644 --- a/cmd/weatherstation/main.go +++ b/cmd/weatherstation/main.go @@ -1,17 +1,26 @@ package main import ( + "context" "flag" "log" "sync" + "time" "weatherstation/internal/database" "weatherstation/internal/server" + "weatherstation/internal/tools" ) func main() { // 命令行参数 var webOnly = flag.Bool("web", false, "只启动Web服务器(Gin)") var udpOnly = flag.Bool("udp", false, "只启动UDP服务器") + // 调试:回填10分钟表 + var doBackfill = flag.Bool("backfill", false, "将16s原始数据聚合写入10分钟表(调试用途)") + var bfStation = flag.String("station", "", "指定站点ID(为空则全站回填)") + var bfFrom = flag.String("from", "", "回填起始时间,格式:YYYY-MM-DD HH:MM:SS") + var bfTo = flag.String("to", "", "回填结束时间,格式:YYYY-MM-DD HH:MM:SS") + var bfWrap = flag.Float64("wrap", 0, "回绕一圈对应毫米值(mm),<=0 则降级为仅计当前值") flag.Parse() // 设置日志 @@ -21,6 +30,33 @@ func main() { _ = database.GetDB() // 确保数据库连接已初始化 defer database.Close() + // Backfill 调试路径 + if *doBackfill { + if *bfFrom == "" || *bfTo == "" { + log.Fatalln("backfill 需要提供 --from 与 --to 时间") + } + fromT, err := time.Parse("2006-01-02 15:04:05", *bfFrom) + if err != nil { + log.Fatalf("解析from失败: %v", err) + } + toT, err := time.Parse("2006-01-02 15:04:05", *bfTo) + if err != nil { + log.Fatalf("解析to失败: %v", err) + } + ctx := context.Background() + if err := tools.RunBackfill10Min(ctx, tools.BackfillOptions{ + StationID: *bfStation, + FromTime: fromT, + ToTime: toT, + WrapCycleMM: *bfWrap, + BucketMinutes: 10, + }); err != nil { + log.Fatalf("回填失败: %v", err) + } + log.Println("回填完成") + return + } + // 根据命令行参数启动服务 if *webOnly { // 只启动Web服务器 diff --git a/db/schema.sql b/db/schema.sql new file mode 100644 index 0000000..d7e79dc --- /dev/null +++ b/db/schema.sql @@ -0,0 +1,539 @@ +-- +-- PostgreSQL database dump +-- + +-- Dumped from database version 14.18 (Ubuntu 14.18-0ubuntu0.22.04.1) +-- Dumped by pg_dump version 17.5 + +SET statement_timeout = 0; +SET lock_timeout = 0; +SET idle_in_transaction_session_timeout = 0; +SET transaction_timeout = 0; +SET client_encoding = 'UTF8'; +SET standard_conforming_strings = on; +SELECT pg_catalog.set_config('search_path', '', false); +SET check_function_bodies = false; +SET xmloption = content; +SET client_min_messages = warning; +SET row_security = off; + +-- +-- Name: public; Type: SCHEMA; Schema: -; Owner: - +-- + +-- *not* creating schema, since initdb creates it + + +SET default_tablespace = ''; + +SET default_table_access_method = heap; + +-- +-- Name: rs485_weather_data; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.rs485_weather_data ( + id integer NOT NULL, + station_id character varying(50) NOT NULL, + "timestamp" timestamp with time zone NOT NULL, + temperature double precision, + humidity double precision, + wind_speed double precision, + wind_direction double precision, + rainfall double precision, + light double precision, + uv double precision, + pressure double precision, + raw_data text +); + + +-- +-- Name: rs485_weather_data_bak; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.rs485_weather_data_bak ( + id integer, + station_id character varying(50), + "timestamp" timestamp without time zone, + temperature numeric(5,2), + humidity numeric(5,2), + wind_speed numeric(5,2), + wind_direction numeric(5,2), + rainfall numeric(5,2), + light numeric(15,2), + uv numeric(8,2), + pressure numeric(7,2), + raw_data text +); + + +-- +-- Name: rs485_weather_data_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.rs485_weather_data_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: rs485_weather_data_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.rs485_weather_data_id_seq OWNED BY public.rs485_weather_data.id; + + +-- +-- Name: stations; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.stations ( + station_id character varying(50) NOT NULL, + password character varying(50) NOT NULL, + name character varying(100), + location character varying(100), + latitude numeric(10,6), + longitude numeric(10,6), + altitude numeric(8,3), + created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP, + last_update timestamp with time zone, + software_type character varying(100), + device_type character varying(20) DEFAULT 'UNKNOWN'::character varying NOT NULL, + CONSTRAINT check_device_type CHECK (((device_type)::text = ANY ((ARRAY['ECOWITT'::character varying, 'WH65LP'::character varying, 'UNKNOWN'::character varying])::text[]))) +); + + +-- +-- Name: TABLE stations; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.stations IS '气象站设备信息表,存储设备的基本信息和认证信息'; + + +-- +-- Name: COLUMN stations.device_type; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.stations.device_type IS 'ECOWITT: WIFI型, WH65LP: 485型'; + + +-- +-- Name: weather_data; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.weather_data ( + id integer NOT NULL, + station_id character varying(50) NOT NULL, + "timestamp" timestamp with time zone NOT NULL, + temp_f integer, + humidity integer, + dewpoint_f integer, + windchill_f integer, + wind_dir integer, + wind_speed_mph integer, + wind_gust_mph integer, + rain_in integer, + daily_rain_in integer, + weekly_rain_in integer, + monthly_rain_in integer, + yearly_rain_in integer, + total_rain_in integer, + solar_radiation integer, + uv integer, + indoor_temp_f integer, + indoor_humidity integer, + abs_barometer_in integer, + barometer_in integer, + low_battery boolean, + raw_data text +); + + +-- +-- Name: TABLE weather_data; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.weather_data IS '气象站数据表,存储所有气象观测数据,数值型数据以整数形式存储,查询时需进行转换'; + + +-- +-- Name: COLUMN weather_data.id; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.id IS '自增主键ID'; + + +-- +-- Name: COLUMN weather_data.station_id; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.station_id IS '气象站ID,外键关联stations表'; + + +-- +-- Name: COLUMN weather_data."timestamp"; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data."timestamp" IS '数据记录时间,使用UTC+8时区(中国标准时间)'; + + +-- +-- Name: COLUMN weather_data.temp_f; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.temp_f IS '室外温度,存储值=实际值×10,单位:华氏度,查询时需除以10,如768表示76.8°F'; + + +-- +-- Name: COLUMN weather_data.humidity; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.humidity IS '室外湿度,单位:百分比,如53表示53%'; + + +-- +-- Name: COLUMN weather_data.dewpoint_f; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.dewpoint_f IS '露点温度,存储值=实际值×10,单位:华氏度,查询时需除以10,如585表示58.5°F'; + + +-- +-- Name: COLUMN weather_data.windchill_f; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.windchill_f IS '风寒指数,存储值=实际值×10,单位:华氏度,查询时需除以10,如768表示76.8°F'; + + +-- +-- Name: COLUMN weather_data.wind_dir; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.wind_dir IS '风向,单位:角度(0-359),如44表示东北风(44°)'; + + +-- +-- Name: COLUMN weather_data.wind_speed_mph; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.wind_speed_mph IS '风速,存储值=实际值×100,单位:英里/小时,查询时需除以100,如100表示1.00mph'; + + +-- +-- Name: COLUMN weather_data.wind_gust_mph; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.wind_gust_mph IS '阵风速度,存储值=实际值×100,单位:英里/小时,查询时需除以100,如100表示1.00mph'; + + +-- +-- Name: COLUMN weather_data.rain_in; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.rain_in IS '当前降雨速率,存储值=实际值×1000,单位:英寸/小时,查询时需除以1000,如500表示0.500英寸/小时'; + + +-- +-- Name: COLUMN weather_data.daily_rain_in; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.daily_rain_in IS '日降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如500表示0.500英寸'; + + +-- +-- Name: COLUMN weather_data.weekly_rain_in; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.weekly_rain_in IS '周降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如500表示0.500英寸'; + + +-- +-- Name: COLUMN weather_data.monthly_rain_in; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.monthly_rain_in IS '月降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如79表示0.079英寸'; + + +-- +-- Name: COLUMN weather_data.yearly_rain_in; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.yearly_rain_in IS '年降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如79表示0.079英寸'; + + +-- +-- Name: COLUMN weather_data.total_rain_in; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.total_rain_in IS '总降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如79表示0.079英寸'; + + +-- +-- Name: COLUMN weather_data.solar_radiation; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.solar_radiation IS '太阳辐射,存储值=实际值×100,单位:W/m²,查询时需除以100,如172表示1.72W/m²'; + + +-- +-- Name: COLUMN weather_data.uv; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.uv IS '紫外线指数,整数值,如0表示无紫外线'; + + +-- +-- Name: COLUMN weather_data.indoor_temp_f; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.indoor_temp_f IS '室内温度,存储值=实际值×10,单位:华氏度,查询时需除以10,如837表示83.7°F'; + + +-- +-- Name: COLUMN weather_data.indoor_humidity; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.indoor_humidity IS '室内湿度,单位:百分比,如48表示48%'; + + +-- +-- Name: COLUMN weather_data.abs_barometer_in; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.abs_barometer_in IS '绝对气压,存储值=实际值×1000,单位:英寸汞柱,查询时需除以1000,如29320表示29.320英寸汞柱'; + + +-- +-- Name: COLUMN weather_data.barometer_in; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.barometer_in IS '相对气压,存储值=实际值×1000,单位:英寸汞柱,查询时需除以1000,如29805表示29.805英寸汞柱'; + + +-- +-- Name: COLUMN weather_data.low_battery; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.low_battery IS '低电量标志,布尔值,true表示电量低'; + + +-- +-- Name: COLUMN weather_data.raw_data; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.weather_data.raw_data IS '原始数据字符串'; + + +-- +-- Name: weather_data_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.weather_data_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: weather_data_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.weather_data_id_seq OWNED BY public.weather_data.id; + + +-- +-- Name: rs485_weather_data id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.rs485_weather_data ALTER COLUMN id SET DEFAULT nextval('public.rs485_weather_data_id_seq'::regclass); + + +-- +-- Name: weather_data id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.weather_data ALTER COLUMN id SET DEFAULT nextval('public.weather_data_id_seq'::regclass); + + +-- +-- Name: rs485_weather_data rs485_udx; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.rs485_weather_data + ADD CONSTRAINT rs485_udx UNIQUE (station_id, "timestamp"); + + +-- +-- Name: rs485_weather_data rs485_weather_data_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.rs485_weather_data + ADD CONSTRAINT rs485_weather_data_pkey PRIMARY KEY (id); + + +-- +-- Name: stations stations_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.stations + ADD CONSTRAINT stations_pkey PRIMARY KEY (station_id); + + +-- +-- Name: weather_data weather_data_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.weather_data + ADD CONSTRAINT weather_data_pkey PRIMARY KEY (id); + + +-- +-- Name: idx_rwd_station_time; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_rwd_station_time ON public.rs485_weather_data USING btree (station_id, "timestamp"); + + +-- +-- Name: idx_rwd_time; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_rwd_time ON public.rs485_weather_data USING btree ("timestamp"); + + +-- +-- Name: idx_weather_data_station_timestamp; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_weather_data_station_timestamp ON public.weather_data USING btree (station_id, "timestamp"); + + +-- +-- Name: INDEX idx_weather_data_station_timestamp; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON INDEX public.idx_weather_data_station_timestamp IS '气象站ID和时间戳的复合索引'; + + +-- +-- Name: idx_weather_data_timestamp; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_weather_data_timestamp ON public.weather_data USING btree ("timestamp"); + + +-- +-- Name: INDEX idx_weather_data_timestamp; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON INDEX public.idx_weather_data_timestamp IS '时间戳索引'; + + +-- +-- Name: rs485_weather_data rs485_weather_data_station_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.rs485_weather_data + ADD CONSTRAINT rs485_weather_data_station_id_fkey FOREIGN KEY (station_id) REFERENCES public.stations(station_id); + + +-- +-- Name: weather_data weather_data_station_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.weather_data + ADD CONSTRAINT weather_data_station_id_fkey FOREIGN KEY (station_id) REFERENCES public.stations(station_id); + + +-- +-- PostgreSQL database dump complete +-- + + +-- Name: rs485_weather_10min; Type: TABLE; Schema: public; Owner: - +-- 用途:10分钟粒度聚合(长期保留,缩放整数存储) +-- +CREATE TABLE IF NOT EXISTS public.rs485_weather_10min ( + id SERIAL PRIMARY KEY, + station_id character varying(50) NOT NULL, + "bucket_start" timestamp with time zone NOT NULL, + temp_c_x100 integer, + humidity_pct integer, + wind_speed_ms_x1000 integer, + wind_gust_ms_x1000 integer, + wind_dir_deg integer, + rain_10m_mm_x1000 integer, + rain_total_mm_x1000 integer, + solar_wm2_x100 integer, + uv_index integer, + pressure_hpa_x100 integer, + sample_count integer DEFAULT 0 NOT NULL +); + +-- 约束与索引 +ALTER TABLE ONLY public.rs485_weather_10min + ADD CONSTRAINT r10_udx UNIQUE (station_id, "bucket_start"); + +ALTER TABLE ONLY public.rs485_weather_10min + ADD CONSTRAINT rs485_weather_10min_station_id_fkey FOREIGN KEY (station_id) REFERENCES public.stations(station_id); + +CREATE INDEX idx_r10_station_time ON public.rs485_weather_10min USING btree (station_id, "bucket_start"); + +COMMENT ON TABLE public.rs485_weather_10min IS '10分钟聚合数据表,数值型以缩放整数存储(温度×100、风速×1000等)'; +COMMENT ON COLUMN public.rs485_weather_10min."bucket_start" IS '10分钟桶开始时间(与CST对齐分桶,存储为timestamptz)'; +COMMENT ON COLUMN public.rs485_weather_10min.temp_c_x100 IS '10分钟平均温度,单位℃×100'; +COMMENT ON COLUMN public.rs485_weather_10min.humidity_pct IS '10分钟平均湿度,单位%'; +COMMENT ON COLUMN public.rs485_weather_10min.wind_speed_ms_x1000 IS '10分钟平均风速,单位m/s×1000'; +COMMENT ON COLUMN public.rs485_weather_10min.wind_gust_ms_x1000 IS '10分钟最大阵风,单位m/s×1000'; +COMMENT ON COLUMN public.rs485_weather_10min.wind_dir_deg IS '10分钟风向向量平均,单位度(0-359)'; +COMMENT ON COLUMN public.rs485_weather_10min.rain_10m_mm_x1000 IS '10分钟降雨量,按“带回绕正增量”计算,单位mm×1000'; +COMMENT ON COLUMN public.rs485_weather_10min.rain_total_mm_x1000 IS '桶末设备累计降雨(自开机起累加,0..FFFF回绕),单位mm×1000'; +COMMENT ON COLUMN public.rs485_weather_10min.solar_wm2_x100 IS '10分钟平均太阳辐射,单位W/m²×100'; +COMMENT ON COLUMN public.rs485_weather_10min.uv_index IS '10分钟平均紫外线指数'; +COMMENT ON COLUMN public.rs485_weather_10min.pressure_hpa_x100 IS '10分钟平均气压,单位hPa×100'; +COMMENT ON COLUMN public.rs485_weather_10min.sample_count IS '10分钟样本数量'; + + +-- +-- Name: forecast_hourly; Type: TABLE; Schema: public; Owner: - +-- 用途:小时级预报(版本化:issued_at 为预报方案发布时间) +-- +CREATE TABLE IF NOT EXISTS public.forecast_hourly ( + id SERIAL PRIMARY KEY, + station_id character varying(50) NOT NULL, + provider character varying(50) NOT NULL, + issued_at timestamp with time zone NOT NULL, + forecast_time timestamp with time zone NOT NULL, + temp_c_x100 integer, + humidity_pct integer, + wind_speed_ms_x1000 integer, + wind_gust_ms_x1000 integer, + wind_dir_deg integer, + rain_mm_x1000 integer, + precip_prob_pct integer, + uv_index integer, + pressure_hpa_x100 integer +); + +-- 约束与索引 +ALTER TABLE ONLY public.forecast_hourly + ADD CONSTRAINT forecast_hourly_udx UNIQUE (station_id, provider, issued_at, forecast_time); + +ALTER TABLE ONLY public.forecast_hourly + ADD CONSTRAINT forecast_hourly_station_id_fkey FOREIGN KEY (station_id) REFERENCES public.stations(station_id); + +CREATE INDEX idx_fcast_station_time ON public.forecast_hourly USING btree (station_id, forecast_time); + +-- 注释 +COMMENT ON TABLE public.forecast_hourly IS '小时级预报表,按issued_at版本化;要素使用缩放整数存储'; +COMMENT ON COLUMN public.forecast_hourly.issued_at IS '预报方案发布时间(版本时间)'; +COMMENT ON COLUMN public.forecast_hourly.forecast_time IS '目标小时时间戳'; +COMMENT ON COLUMN public.forecast_hourly.rain_mm_x1000 IS '该小时降雨量,单位mm×1000'; \ No newline at end of file diff --git a/internal/database/models.go b/internal/database/models.go index db013fa..3a9112b 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -111,6 +111,87 @@ func GetWeatherData(db *sql.DB, stationID string, startTime, endTime time.Time, return points, nil } +// GetSeriesFrom10Min 基于10分钟聚合表返回 10m/30m/1h 数据(风向向量平均、降雨求和、加权平均) +func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) { + var query string + switch interval { + case "10min": + query = ` + SELECT + to_char(bucket_start, 'YYYY-MM-DD HH24:MI:SS') AS date_time, + ROUND(temp_c_x100/100.0, 2) AS temperature, + ROUND(humidity_pct::numeric, 2) AS humidity, + ROUND(pressure_hpa_x100/100.0, 2) AS pressure, + ROUND(wind_speed_ms_x1000/1000.0, 3) AS wind_speed, + ROUND(wind_dir_deg::numeric, 2) AS wind_direction, + ROUND(rain_10m_mm_x1000/1000.0, 3) AS rainfall, + ROUND(solar_wm2_x100/100.0, 2) AS light, + ROUND(uv_index::numeric, 2) AS uv + FROM rs485_weather_10min + WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3 + ORDER BY bucket_start` + case "30min": + query = buildAggFrom10MinQuery("30 minutes") + default: // 1hour + query = buildAggFrom10MinQuery("1 hour") + } + + rows, err := db.Query(query, stationID, startTime, endTime) + if err != nil { + return nil, err + } + defer rows.Close() + + var points []types.WeatherPoint + for rows.Next() { + var p types.WeatherPoint + if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV); err != nil { + continue + } + points = append(points, p) + } + return points, nil +} + +// buildAggFrom10MinQuery 返回从10分钟表再聚合的SQL(interval 支持 '30 minutes' 或 '1 hour') +func buildAggFrom10MinQuery(interval string) string { + return ` + WITH base AS ( + SELECT * FROM rs485_weather_10min + WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3 + ), g AS ( + SELECT + date_trunc('hour', bucket_start) + floor(extract(epoch from (bucket_start - date_trunc('hour', bucket_start)))/extract(epoch from '` + interval + `'::interval)) * '` + interval + `'::interval AS grp, + SUM(temp_c_x100 * sample_count)::bigint AS w_temp, + SUM(humidity_pct * sample_count)::bigint AS w_hum, + SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p, + SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar, + SUM(uv_index * sample_count)::bigint AS w_uv, + SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws, + MAX(wind_gust_ms_x1000) AS gust_max, + SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum, + SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum, + SUM(rain_10m_mm_x1000) AS rain_sum, + SUM(sample_count) AS n_sum + FROM base + GROUP BY 1 + ) + SELECT + to_char(grp, 'YYYY-MM-DD HH24:MI:SS') AS date_time, + ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature, + ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity, + ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure, + ROUND((w_ws/NULLIF(n_sum,0))/1000.0, 3) AS wind_speed, + ROUND((CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0 + THEN degrees(atan2(sin_sum, cos_sum)) + 360 + ELSE degrees(atan2(sin_sum, cos_sum)) END)::numeric, 2) AS wind_direction, + ROUND((rain_sum/1000.0)::numeric, 3) AS rainfall, + ROUND((w_solar/NULLIF(n_sum,0))/100.0, 2) AS light, + ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv + FROM g + ORDER BY grp` +} + // buildWeatherDataQuery 构建天气数据查询SQL func buildWeatherDataQuery(interval string) string { return ` diff --git a/internal/server/gin.go b/internal/server/gin.go index 8a1ffa0..4319880 100644 --- a/internal/server/gin.go +++ b/internal/server/gin.go @@ -119,8 +119,8 @@ func getDataHandler(c *gin.Context) { return } - // 获取数据 - points, err := database.GetWeatherData(database.GetDB(), stationID, start, end, interval) + // 获取数据(改为基于10分钟聚合表的再聚合) + points, err := database.GetSeriesFrom10Min(database.GetDB(), stationID, start, end, interval) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "查询数据失败"}) return diff --git a/internal/server/udp.go b/internal/server/udp.go index 41cc302..8d25632 100644 --- a/internal/server/udp.go +++ b/internal/server/udp.go @@ -2,6 +2,7 @@ package server import ( "bufio" + "context" "encoding/hex" "fmt" "io" @@ -14,6 +15,7 @@ import ( "time" "unicode/utf8" "weatherstation/internal/config" + "weatherstation/internal/tools" "weatherstation/model" ) @@ -117,6 +119,24 @@ func StartUDPServer() error { log.Printf("UDP服务器已启动,监听端口 %d...", cfg.Server.UDPPort) buffer := make([]byte, 2048) + // 后台定时:每分钟回刷最近30分钟(全站) + go func() { + for { + now := time.Now() + from := now.Add(-30 * time.Minute) + ctx := context.Background() + if err := tools.RunBackfill10Min(ctx, tools.BackfillOptions{ + FromTime: from, + ToTime: now, + WrapCycleMM: 0, + BucketMinutes: 10, + }); err != nil { + log.Printf("定时回填失败: %v", err) + } + time.Sleep(1 * time.Minute) + } + }() + for { n, addr, err := conn.ReadFrom(buffer) if err != nil { diff --git a/internal/tools/backfill.go b/internal/tools/backfill.go new file mode 100644 index 0000000..1d473ee --- /dev/null +++ b/internal/tools/backfill.go @@ -0,0 +1,257 @@ +package tools + +import ( + "context" + "database/sql" + "fmt" + "math" + "time" + + "weatherstation/internal/database" +) + +type BackfillOptions struct { + StationID string // 为空则处理所有站点 + FromTime time.Time // 含 + ToTime time.Time // 含 + WrapCycleMM float64 // 设备累计回绕一圈对应的毫米值;<=0 则按“回绕后仅记当次值”降级处理 + BucketMinutes int // 默认10 +} + +// RunBackfill10Min 将 rs485_weather_data 的16秒数据汇总写入 rs485_weather_10min +func RunBackfill10Min(ctx context.Context, opts BackfillOptions) error { + if opts.BucketMinutes <= 0 { + opts.BucketMinutes = 10 + } + + db := database.GetDB() + // 取时序数据 + query := ` + SELECT station_id, timestamp, temperature, humidity, wind_speed, wind_direction, + rainfall, light, uv, pressure + FROM rs485_weather_data + WHERE timestamp >= $1 AND timestamp <= $2 + %s + ORDER BY station_id, timestamp` + + stationFilter := "" + args := []any{opts.FromTime, opts.ToTime} + if opts.StationID != "" { + stationFilter = "AND station_id = $3" + args = append(args, opts.StationID) + } + + rows, err := db.QueryContext(ctx, fmt.Sprintf(query, stationFilter), args...) + if err != nil { + return fmt.Errorf("query raw failed: %w", err) + } + defer rows.Close() + + // 载入上海时区用于分桶 + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + + type agg struct { + // sums + sumTemp float64 + sumHum float64 + sumWS float64 + sumLight float64 + sumUV float64 + sumP float64 + sinSum float64 + cosSum float64 + gustMax float64 + rainIncSum float64 + count int + lastTotal float64 // 桶末累计 + lastTS time.Time + } + + currentStation := "" + var prevTotal float64 + var prevTS time.Time + + buckets := make(map[string]map[time.Time]*agg) // station -> bucketStart -> agg + + for rows.Next() { + var ( + stationID string + ts time.Time + t, h, ws, wd, rf, light, uv, p sql.NullFloat64 + ) + if err := rows.Scan(&stationID, &ts, &t, &h, &ws, &wd, &rf, &light, &uv, &p); err != nil { + return fmt.Errorf("scan failed: %w", err) + } + + // 切换设备时重置 prevTotal + if stationID != currentStation { + currentStation = stationID + prevTotal = math.NaN() + prevTS = time.Time{} + } + + // 计算该样本所在桶(CST对齐) + bucketLocal := ts.In(loc).Truncate(time.Duration(opts.BucketMinutes) * time.Minute) + bucketStart := time.Date(bucketLocal.Year(), bucketLocal.Month(), bucketLocal.Day(), bucketLocal.Hour(), bucketLocal.Minute(), 0, 0, loc) + + if _, ok := buckets[stationID]; !ok { + buckets[stationID] = make(map[time.Time]*agg) + } + ag := buckets[stationID][bucketStart] + if ag == nil { + ag = &agg{gustMax: -1} + buckets[stationID][bucketStart] = ag + } + + // 累加平均项 + if t.Valid { + ag.sumTemp += t.Float64 + } + if h.Valid { + ag.sumHum += h.Float64 + } + if ws.Valid { + ag.sumWS += ws.Float64 + if ws.Float64 > ag.gustMax { + ag.gustMax = ws.Float64 + } + } + if light.Valid { + ag.sumLight += light.Float64 + } + if uv.Valid { + ag.sumUV += uv.Float64 + } + if p.Valid { + ag.sumP += p.Float64 + } + if wd.Valid { + rad := wd.Float64 * math.Pi / 180.0 + ag.sinSum += math.Sin(rad) + ag.cosSum += math.Cos(rad) + } + ag.count++ + + // 雨量增量:按时间比例切分到跨越的各个桶,避免边界全部被计入后一桶 + if rf.Valid { + curr := rf.Float64 + if !math.IsNaN(prevTotal) && !prevTS.IsZero() { + // 计算增量(带回绕) + inc := 0.0 + if curr >= prevTotal { + inc = curr - prevTotal + } else { + if opts.WrapCycleMM > 0 { + inc = (opts.WrapCycleMM - prevTotal) + curr + } else { + // 降级:仅计当前值 + inc = curr + } + } + + // 将 [prevTS, ts] 区间按10分钟边界切分,按时长比例分配增量 + startLocal := prevTS.In(loc) + endLocal := ts.In(loc) + if endLocal.After(startLocal) && inc > 0 { + totalSec := endLocal.Sub(startLocal).Seconds() + segStart := startLocal + for segStart.Before(endLocal) { + segBucketStart := segStart.Truncate(time.Duration(opts.BucketMinutes) * time.Minute) + segBucketEnd := segBucketStart.Add(time.Duration(opts.BucketMinutes) * time.Minute) + segEnd := endLocal + if segBucketEnd.Before(segEnd) { + segEnd = segBucketEnd + } + portion := inc * (segEnd.Sub(segStart).Seconds() / totalSec) + // 确保段对应桶存在 + if _, ok := buckets[stationID][segBucketStart]; !ok { + buckets[stationID][segBucketStart] = &agg{gustMax: -1} + } + buckets[stationID][segBucketStart].rainIncSum += portion + segStart = segEnd + } + } + } + // 记录桶末累计到当前样本所在桶 + ag.lastTotal = curr + prevTotal = curr + prevTS = ts + } + } + if err := rows.Err(); err != nil { + return fmt.Errorf("iterate rows failed: %w", err) + } + + // 写入/更新到 10 分钟表 + upsert := ` + INSERT INTO rs485_weather_10min ( + station_id, bucket_start, + temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, + wind_dir_deg, rain_10m_mm_x1000, rain_total_mm_x1000, + solar_wm2_x100, uv_index, pressure_hpa_x100, sample_count + ) VALUES ( + $1, $2, $3, $4, $5, $6, + $7, $8, $9, + $10, $11, $12, $13 + ) ON CONFLICT (station_id, bucket_start) DO UPDATE SET + temp_c_x100 = EXCLUDED.temp_c_x100, + humidity_pct = EXCLUDED.humidity_pct, + wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000, + wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000, + wind_dir_deg = EXCLUDED.wind_dir_deg, + rain_10m_mm_x1000 = EXCLUDED.rain_10m_mm_x1000, + rain_total_mm_x1000 = EXCLUDED.rain_total_mm_x1000, + solar_wm2_x100 = EXCLUDED.solar_wm2_x100, + uv_index = EXCLUDED.uv_index, + pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100, + sample_count = EXCLUDED.sample_count` + + for stationID, m := range buckets { + for bucketStart, ag := range m { + if ag.count == 0 { + continue + } + avgTemp := ag.sumTemp / float64(ag.count) + avgHum := ag.sumHum / float64(ag.count) + avgWS := ag.sumWS / float64(ag.count) + avgLight := ag.sumLight / float64(ag.count) + avgUV := ag.sumUV / float64(ag.count) + avgP := ag.sumP / float64(ag.count) + // 风向向量平均 + windDir := 0.0 + if ag.sinSum != 0 || ag.cosSum != 0 { + ang := math.Atan2(ag.sinSum/float64(ag.count), ag.cosSum/float64(ag.count)) * 180 / math.Pi + if ang < 0 { + ang += 360 + } + windDir = ang + } + + // 缩放整数 + tempScaled := int(math.Round(avgTemp * 100)) + humScaled := int(math.Round(avgHum)) + wsScaled := int(math.Round(avgWS * 1000)) + gustScaled := int(math.Round(ag.gustMax * 1000)) + wdScaled := int(math.Round(windDir)) + rain10mScaled := int(math.Round(ag.rainIncSum * 1000)) + rainTotalScaled := int(math.Round(ag.lastTotal * 1000)) + solarScaled := int(math.Round(avgLight * 100)) + uvScaled := int(math.Round(avgUV)) + pScaled := int(math.Round(avgP * 100)) + + if _, err := db.ExecContext(ctx, upsert, + stationID, bucketStart, + tempScaled, humScaled, wsScaled, gustScaled, + wdScaled, rain10mScaled, rainTotalScaled, + solarScaled, uvScaled, pScaled, ag.count, + ); err != nil { + return fmt.Errorf("upsert 10min failed: %w", err) + } + } + } + + return nil +}