feat: 优化气象数据时间桶
This commit is contained in:
parent
3152c6bb14
commit
337ee06caf
@ -1,17 +1,26 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
"weatherstation/internal/database"
|
"weatherstation/internal/database"
|
||||||
"weatherstation/internal/server"
|
"weatherstation/internal/server"
|
||||||
|
"weatherstation/internal/tools"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// 命令行参数
|
// 命令行参数
|
||||||
var webOnly = flag.Bool("web", false, "只启动Web服务器(Gin)")
|
var webOnly = flag.Bool("web", false, "只启动Web服务器(Gin)")
|
||||||
var udpOnly = flag.Bool("udp", false, "只启动UDP服务器")
|
var udpOnly = flag.Bool("udp", false, "只启动UDP服务器")
|
||||||
|
// 调试:回填10分钟表
|
||||||
|
var doBackfill = flag.Bool("backfill", false, "将16s原始数据聚合写入10分钟表(调试用途)")
|
||||||
|
var bfStation = flag.String("station", "", "指定站点ID(为空则全站回填)")
|
||||||
|
var bfFrom = flag.String("from", "", "回填起始时间,格式:YYYY-MM-DD HH:MM:SS")
|
||||||
|
var bfTo = flag.String("to", "", "回填结束时间,格式:YYYY-MM-DD HH:MM:SS")
|
||||||
|
var bfWrap = flag.Float64("wrap", 0, "回绕一圈对应毫米值(mm),<=0 则降级为仅计当前值")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
// 设置日志
|
// 设置日志
|
||||||
@ -21,6 +30,33 @@ func main() {
|
|||||||
_ = database.GetDB() // 确保数据库连接已初始化
|
_ = database.GetDB() // 确保数据库连接已初始化
|
||||||
defer database.Close()
|
defer database.Close()
|
||||||
|
|
||||||
|
// Backfill 调试路径
|
||||||
|
if *doBackfill {
|
||||||
|
if *bfFrom == "" || *bfTo == "" {
|
||||||
|
log.Fatalln("backfill 需要提供 --from 与 --to 时间")
|
||||||
|
}
|
||||||
|
fromT, err := time.Parse("2006-01-02 15:04:05", *bfFrom)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("解析from失败: %v", err)
|
||||||
|
}
|
||||||
|
toT, err := time.Parse("2006-01-02 15:04:05", *bfTo)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("解析to失败: %v", err)
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := tools.RunBackfill10Min(ctx, tools.BackfillOptions{
|
||||||
|
StationID: *bfStation,
|
||||||
|
FromTime: fromT,
|
||||||
|
ToTime: toT,
|
||||||
|
WrapCycleMM: *bfWrap,
|
||||||
|
BucketMinutes: 10,
|
||||||
|
}); err != nil {
|
||||||
|
log.Fatalf("回填失败: %v", err)
|
||||||
|
}
|
||||||
|
log.Println("回填完成")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 根据命令行参数启动服务
|
// 根据命令行参数启动服务
|
||||||
if *webOnly {
|
if *webOnly {
|
||||||
// 只启动Web服务器
|
// 只启动Web服务器
|
||||||
|
|||||||
539
db/schema.sql
Normal file
539
db/schema.sql
Normal file
@ -0,0 +1,539 @@
|
|||||||
|
--
|
||||||
|
-- PostgreSQL database dump
|
||||||
|
--
|
||||||
|
|
||||||
|
-- Dumped from database version 14.18 (Ubuntu 14.18-0ubuntu0.22.04.1)
|
||||||
|
-- Dumped by pg_dump version 17.5
|
||||||
|
|
||||||
|
SET statement_timeout = 0;
|
||||||
|
SET lock_timeout = 0;
|
||||||
|
SET idle_in_transaction_session_timeout = 0;
|
||||||
|
SET transaction_timeout = 0;
|
||||||
|
SET client_encoding = 'UTF8';
|
||||||
|
SET standard_conforming_strings = on;
|
||||||
|
SELECT pg_catalog.set_config('search_path', '', false);
|
||||||
|
SET check_function_bodies = false;
|
||||||
|
SET xmloption = content;
|
||||||
|
SET client_min_messages = warning;
|
||||||
|
SET row_security = off;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: public; Type: SCHEMA; Schema: -; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
-- *not* creating schema, since initdb creates it
|
||||||
|
|
||||||
|
|
||||||
|
SET default_tablespace = '';
|
||||||
|
|
||||||
|
SET default_table_access_method = heap;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: rs485_weather_data; Type: TABLE; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE TABLE public.rs485_weather_data (
|
||||||
|
id integer NOT NULL,
|
||||||
|
station_id character varying(50) NOT NULL,
|
||||||
|
"timestamp" timestamp with time zone NOT NULL,
|
||||||
|
temperature double precision,
|
||||||
|
humidity double precision,
|
||||||
|
wind_speed double precision,
|
||||||
|
wind_direction double precision,
|
||||||
|
rainfall double precision,
|
||||||
|
light double precision,
|
||||||
|
uv double precision,
|
||||||
|
pressure double precision,
|
||||||
|
raw_data text
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: rs485_weather_data_bak; Type: TABLE; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE TABLE public.rs485_weather_data_bak (
|
||||||
|
id integer,
|
||||||
|
station_id character varying(50),
|
||||||
|
"timestamp" timestamp without time zone,
|
||||||
|
temperature numeric(5,2),
|
||||||
|
humidity numeric(5,2),
|
||||||
|
wind_speed numeric(5,2),
|
||||||
|
wind_direction numeric(5,2),
|
||||||
|
rainfall numeric(5,2),
|
||||||
|
light numeric(15,2),
|
||||||
|
uv numeric(8,2),
|
||||||
|
pressure numeric(7,2),
|
||||||
|
raw_data text
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: rs485_weather_data_id_seq; Type: SEQUENCE; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE SEQUENCE public.rs485_weather_data_id_seq
|
||||||
|
AS integer
|
||||||
|
START WITH 1
|
||||||
|
INCREMENT BY 1
|
||||||
|
NO MINVALUE
|
||||||
|
NO MAXVALUE
|
||||||
|
CACHE 1;
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: rs485_weather_data_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER SEQUENCE public.rs485_weather_data_id_seq OWNED BY public.rs485_weather_data.id;
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: stations; Type: TABLE; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE TABLE public.stations (
|
||||||
|
station_id character varying(50) NOT NULL,
|
||||||
|
password character varying(50) NOT NULL,
|
||||||
|
name character varying(100),
|
||||||
|
location character varying(100),
|
||||||
|
latitude numeric(10,6),
|
||||||
|
longitude numeric(10,6),
|
||||||
|
altitude numeric(8,3),
|
||||||
|
created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
last_update timestamp with time zone,
|
||||||
|
software_type character varying(100),
|
||||||
|
device_type character varying(20) DEFAULT 'UNKNOWN'::character varying NOT NULL,
|
||||||
|
CONSTRAINT check_device_type CHECK (((device_type)::text = ANY ((ARRAY['ECOWITT'::character varying, 'WH65LP'::character varying, 'UNKNOWN'::character varying])::text[])))
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: TABLE stations; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON TABLE public.stations IS '气象站设备信息表,存储设备的基本信息和认证信息';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN stations.device_type; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.stations.device_type IS 'ECOWITT: WIFI型, WH65LP: 485型';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: weather_data; Type: TABLE; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE TABLE public.weather_data (
|
||||||
|
id integer NOT NULL,
|
||||||
|
station_id character varying(50) NOT NULL,
|
||||||
|
"timestamp" timestamp with time zone NOT NULL,
|
||||||
|
temp_f integer,
|
||||||
|
humidity integer,
|
||||||
|
dewpoint_f integer,
|
||||||
|
windchill_f integer,
|
||||||
|
wind_dir integer,
|
||||||
|
wind_speed_mph integer,
|
||||||
|
wind_gust_mph integer,
|
||||||
|
rain_in integer,
|
||||||
|
daily_rain_in integer,
|
||||||
|
weekly_rain_in integer,
|
||||||
|
monthly_rain_in integer,
|
||||||
|
yearly_rain_in integer,
|
||||||
|
total_rain_in integer,
|
||||||
|
solar_radiation integer,
|
||||||
|
uv integer,
|
||||||
|
indoor_temp_f integer,
|
||||||
|
indoor_humidity integer,
|
||||||
|
abs_barometer_in integer,
|
||||||
|
barometer_in integer,
|
||||||
|
low_battery boolean,
|
||||||
|
raw_data text
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: TABLE weather_data; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON TABLE public.weather_data IS '气象站数据表,存储所有气象观测数据,数值型数据以整数形式存储,查询时需进行转换';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.id; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.id IS '自增主键ID';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.station_id; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.station_id IS '气象站ID,外键关联stations表';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data."timestamp"; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data."timestamp" IS '数据记录时间,使用UTC+8时区(中国标准时间)';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.temp_f; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.temp_f IS '室外温度,存储值=实际值×10,单位:华氏度,查询时需除以10,如768表示76.8°F';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.humidity; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.humidity IS '室外湿度,单位:百分比,如53表示53%';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.dewpoint_f; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.dewpoint_f IS '露点温度,存储值=实际值×10,单位:华氏度,查询时需除以10,如585表示58.5°F';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.windchill_f; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.windchill_f IS '风寒指数,存储值=实际值×10,单位:华氏度,查询时需除以10,如768表示76.8°F';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.wind_dir; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.wind_dir IS '风向,单位:角度(0-359),如44表示东北风(44°)';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.wind_speed_mph; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.wind_speed_mph IS '风速,存储值=实际值×100,单位:英里/小时,查询时需除以100,如100表示1.00mph';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.wind_gust_mph; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.wind_gust_mph IS '阵风速度,存储值=实际值×100,单位:英里/小时,查询时需除以100,如100表示1.00mph';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.rain_in; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.rain_in IS '当前降雨速率,存储值=实际值×1000,单位:英寸/小时,查询时需除以1000,如500表示0.500英寸/小时';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.daily_rain_in; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.daily_rain_in IS '日降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如500表示0.500英寸';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.weekly_rain_in; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.weekly_rain_in IS '周降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如500表示0.500英寸';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.monthly_rain_in; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.monthly_rain_in IS '月降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如79表示0.079英寸';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.yearly_rain_in; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.yearly_rain_in IS '年降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如79表示0.079英寸';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.total_rain_in; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.total_rain_in IS '总降雨量,存储值=实际值×1000,单位:英寸,查询时需除以1000,如79表示0.079英寸';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.solar_radiation; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.solar_radiation IS '太阳辐射,存储值=实际值×100,单位:W/m²,查询时需除以100,如172表示1.72W/m²';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.uv; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.uv IS '紫外线指数,整数值,如0表示无紫外线';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.indoor_temp_f; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.indoor_temp_f IS '室内温度,存储值=实际值×10,单位:华氏度,查询时需除以10,如837表示83.7°F';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.indoor_humidity; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.indoor_humidity IS '室内湿度,单位:百分比,如48表示48%';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.abs_barometer_in; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.abs_barometer_in IS '绝对气压,存储值=实际值×1000,单位:英寸汞柱,查询时需除以1000,如29320表示29.320英寸汞柱';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.barometer_in; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.barometer_in IS '相对气压,存储值=实际值×1000,单位:英寸汞柱,查询时需除以1000,如29805表示29.805英寸汞柱';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.low_battery; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.low_battery IS '低电量标志,布尔值,true表示电量低';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: COLUMN weather_data.raw_data; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON COLUMN public.weather_data.raw_data IS '原始数据字符串';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: weather_data_id_seq; Type: SEQUENCE; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE SEQUENCE public.weather_data_id_seq
|
||||||
|
AS integer
|
||||||
|
START WITH 1
|
||||||
|
INCREMENT BY 1
|
||||||
|
NO MINVALUE
|
||||||
|
NO MAXVALUE
|
||||||
|
CACHE 1;
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: weather_data_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER SEQUENCE public.weather_data_id_seq OWNED BY public.weather_data.id;
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: rs485_weather_data id; Type: DEFAULT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.rs485_weather_data ALTER COLUMN id SET DEFAULT nextval('public.rs485_weather_data_id_seq'::regclass);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: weather_data id; Type: DEFAULT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.weather_data ALTER COLUMN id SET DEFAULT nextval('public.weather_data_id_seq'::regclass);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: rs485_weather_data rs485_udx; Type: CONSTRAINT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.rs485_weather_data
|
||||||
|
ADD CONSTRAINT rs485_udx UNIQUE (station_id, "timestamp");
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: rs485_weather_data rs485_weather_data_pkey; Type: CONSTRAINT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.rs485_weather_data
|
||||||
|
ADD CONSTRAINT rs485_weather_data_pkey PRIMARY KEY (id);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: stations stations_pkey; Type: CONSTRAINT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.stations
|
||||||
|
ADD CONSTRAINT stations_pkey PRIMARY KEY (station_id);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: weather_data weather_data_pkey; Type: CONSTRAINT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.weather_data
|
||||||
|
ADD CONSTRAINT weather_data_pkey PRIMARY KEY (id);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: idx_rwd_station_time; Type: INDEX; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE INDEX idx_rwd_station_time ON public.rs485_weather_data USING btree (station_id, "timestamp");
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: idx_rwd_time; Type: INDEX; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE INDEX idx_rwd_time ON public.rs485_weather_data USING btree ("timestamp");
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: idx_weather_data_station_timestamp; Type: INDEX; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE INDEX idx_weather_data_station_timestamp ON public.weather_data USING btree (station_id, "timestamp");
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: INDEX idx_weather_data_station_timestamp; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON INDEX public.idx_weather_data_station_timestamp IS '气象站ID和时间戳的复合索引';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: idx_weather_data_timestamp; Type: INDEX; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
CREATE INDEX idx_weather_data_timestamp ON public.weather_data USING btree ("timestamp");
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: INDEX idx_weather_data_timestamp; Type: COMMENT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
COMMENT ON INDEX public.idx_weather_data_timestamp IS '时间戳索引';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: rs485_weather_data rs485_weather_data_station_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.rs485_weather_data
|
||||||
|
ADD CONSTRAINT rs485_weather_data_station_id_fkey FOREIGN KEY (station_id) REFERENCES public.stations(station_id);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: weather_data weather_data_station_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.weather_data
|
||||||
|
ADD CONSTRAINT weather_data_station_id_fkey FOREIGN KEY (station_id) REFERENCES public.stations(station_id);
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- PostgreSQL database dump complete
|
||||||
|
--
|
||||||
|
|
||||||
|
|
||||||
|
-- Name: rs485_weather_10min; Type: TABLE; Schema: public; Owner: -
|
||||||
|
-- 用途:10分钟粒度聚合(长期保留,缩放整数存储)
|
||||||
|
--
|
||||||
|
CREATE TABLE IF NOT EXISTS public.rs485_weather_10min (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
station_id character varying(50) NOT NULL,
|
||||||
|
"bucket_start" timestamp with time zone NOT NULL,
|
||||||
|
temp_c_x100 integer,
|
||||||
|
humidity_pct integer,
|
||||||
|
wind_speed_ms_x1000 integer,
|
||||||
|
wind_gust_ms_x1000 integer,
|
||||||
|
wind_dir_deg integer,
|
||||||
|
rain_10m_mm_x1000 integer,
|
||||||
|
rain_total_mm_x1000 integer,
|
||||||
|
solar_wm2_x100 integer,
|
||||||
|
uv_index integer,
|
||||||
|
pressure_hpa_x100 integer,
|
||||||
|
sample_count integer DEFAULT 0 NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 约束与索引
|
||||||
|
ALTER TABLE ONLY public.rs485_weather_10min
|
||||||
|
ADD CONSTRAINT r10_udx UNIQUE (station_id, "bucket_start");
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.rs485_weather_10min
|
||||||
|
ADD CONSTRAINT rs485_weather_10min_station_id_fkey FOREIGN KEY (station_id) REFERENCES public.stations(station_id);
|
||||||
|
|
||||||
|
CREATE INDEX idx_r10_station_time ON public.rs485_weather_10min USING btree (station_id, "bucket_start");
|
||||||
|
|
||||||
|
COMMENT ON TABLE public.rs485_weather_10min IS '10分钟聚合数据表,数值型以缩放整数存储(温度×100、风速×1000等)';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min."bucket_start" IS '10分钟桶开始时间(与CST对齐分桶,存储为timestamptz)';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.temp_c_x100 IS '10分钟平均温度,单位℃×100';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.humidity_pct IS '10分钟平均湿度,单位%';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.wind_speed_ms_x1000 IS '10分钟平均风速,单位m/s×1000';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.wind_gust_ms_x1000 IS '10分钟最大阵风,单位m/s×1000';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.wind_dir_deg IS '10分钟风向向量平均,单位度(0-359)';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.rain_10m_mm_x1000 IS '10分钟降雨量,按“带回绕正增量”计算,单位mm×1000';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.rain_total_mm_x1000 IS '桶末设备累计降雨(自开机起累加,0..FFFF回绕),单位mm×1000';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.solar_wm2_x100 IS '10分钟平均太阳辐射,单位W/m²×100';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.uv_index IS '10分钟平均紫外线指数';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.pressure_hpa_x100 IS '10分钟平均气压,单位hPa×100';
|
||||||
|
COMMENT ON COLUMN public.rs485_weather_10min.sample_count IS '10分钟样本数量';
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Name: forecast_hourly; Type: TABLE; Schema: public; Owner: -
|
||||||
|
-- 用途:小时级预报(版本化:issued_at 为预报方案发布时间)
|
||||||
|
--
|
||||||
|
CREATE TABLE IF NOT EXISTS public.forecast_hourly (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
station_id character varying(50) NOT NULL,
|
||||||
|
provider character varying(50) NOT NULL,
|
||||||
|
issued_at timestamp with time zone NOT NULL,
|
||||||
|
forecast_time timestamp with time zone NOT NULL,
|
||||||
|
temp_c_x100 integer,
|
||||||
|
humidity_pct integer,
|
||||||
|
wind_speed_ms_x1000 integer,
|
||||||
|
wind_gust_ms_x1000 integer,
|
||||||
|
wind_dir_deg integer,
|
||||||
|
rain_mm_x1000 integer,
|
||||||
|
precip_prob_pct integer,
|
||||||
|
uv_index integer,
|
||||||
|
pressure_hpa_x100 integer
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 约束与索引
|
||||||
|
ALTER TABLE ONLY public.forecast_hourly
|
||||||
|
ADD CONSTRAINT forecast_hourly_udx UNIQUE (station_id, provider, issued_at, forecast_time);
|
||||||
|
|
||||||
|
ALTER TABLE ONLY public.forecast_hourly
|
||||||
|
ADD CONSTRAINT forecast_hourly_station_id_fkey FOREIGN KEY (station_id) REFERENCES public.stations(station_id);
|
||||||
|
|
||||||
|
CREATE INDEX idx_fcast_station_time ON public.forecast_hourly USING btree (station_id, forecast_time);
|
||||||
|
|
||||||
|
-- 注释
|
||||||
|
COMMENT ON TABLE public.forecast_hourly IS '小时级预报表,按issued_at版本化;要素使用缩放整数存储';
|
||||||
|
COMMENT ON COLUMN public.forecast_hourly.issued_at IS '预报方案发布时间(版本时间)';
|
||||||
|
COMMENT ON COLUMN public.forecast_hourly.forecast_time IS '目标小时时间戳';
|
||||||
|
COMMENT ON COLUMN public.forecast_hourly.rain_mm_x1000 IS '该小时降雨量,单位mm×1000';
|
||||||
@ -111,6 +111,87 @@ func GetWeatherData(db *sql.DB, stationID string, startTime, endTime time.Time,
|
|||||||
return points, nil
|
return points, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetSeriesFrom10Min 基于10分钟聚合表返回 10m/30m/1h 数据(风向向量平均、降雨求和、加权平均)
|
||||||
|
func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) {
|
||||||
|
var query string
|
||||||
|
switch interval {
|
||||||
|
case "10min":
|
||||||
|
query = `
|
||||||
|
SELECT
|
||||||
|
to_char(bucket_start, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
||||||
|
ROUND(temp_c_x100/100.0, 2) AS temperature,
|
||||||
|
ROUND(humidity_pct::numeric, 2) AS humidity,
|
||||||
|
ROUND(pressure_hpa_x100/100.0, 2) AS pressure,
|
||||||
|
ROUND(wind_speed_ms_x1000/1000.0, 3) AS wind_speed,
|
||||||
|
ROUND(wind_dir_deg::numeric, 2) AS wind_direction,
|
||||||
|
ROUND(rain_10m_mm_x1000/1000.0, 3) AS rainfall,
|
||||||
|
ROUND(solar_wm2_x100/100.0, 2) AS light,
|
||||||
|
ROUND(uv_index::numeric, 2) AS uv
|
||||||
|
FROM rs485_weather_10min
|
||||||
|
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
|
||||||
|
ORDER BY bucket_start`
|
||||||
|
case "30min":
|
||||||
|
query = buildAggFrom10MinQuery("30 minutes")
|
||||||
|
default: // 1hour
|
||||||
|
query = buildAggFrom10MinQuery("1 hour")
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := db.Query(query, stationID, startTime, endTime)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var points []types.WeatherPoint
|
||||||
|
for rows.Next() {
|
||||||
|
var p types.WeatherPoint
|
||||||
|
if err := rows.Scan(&p.DateTime, &p.Temperature, &p.Humidity, &p.Pressure, &p.WindSpeed, &p.WindDir, &p.Rainfall, &p.Light, &p.UV); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
points = append(points, p)
|
||||||
|
}
|
||||||
|
return points, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildAggFrom10MinQuery 返回从10分钟表再聚合的SQL(interval 支持 '30 minutes' 或 '1 hour')
|
||||||
|
func buildAggFrom10MinQuery(interval string) string {
|
||||||
|
return `
|
||||||
|
WITH base AS (
|
||||||
|
SELECT * FROM rs485_weather_10min
|
||||||
|
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
|
||||||
|
), g AS (
|
||||||
|
SELECT
|
||||||
|
date_trunc('hour', bucket_start) + floor(extract(epoch from (bucket_start - date_trunc('hour', bucket_start)))/extract(epoch from '` + interval + `'::interval)) * '` + interval + `'::interval AS grp,
|
||||||
|
SUM(temp_c_x100 * sample_count)::bigint AS w_temp,
|
||||||
|
SUM(humidity_pct * sample_count)::bigint AS w_hum,
|
||||||
|
SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p,
|
||||||
|
SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar,
|
||||||
|
SUM(uv_index * sample_count)::bigint AS w_uv,
|
||||||
|
SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws,
|
||||||
|
MAX(wind_gust_ms_x1000) AS gust_max,
|
||||||
|
SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum,
|
||||||
|
SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum,
|
||||||
|
SUM(rain_10m_mm_x1000) AS rain_sum,
|
||||||
|
SUM(sample_count) AS n_sum
|
||||||
|
FROM base
|
||||||
|
GROUP BY 1
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
to_char(grp, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
||||||
|
ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature,
|
||||||
|
ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity,
|
||||||
|
ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure,
|
||||||
|
ROUND((w_ws/NULLIF(n_sum,0))/1000.0, 3) AS wind_speed,
|
||||||
|
ROUND((CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0
|
||||||
|
THEN degrees(atan2(sin_sum, cos_sum)) + 360
|
||||||
|
ELSE degrees(atan2(sin_sum, cos_sum)) END)::numeric, 2) AS wind_direction,
|
||||||
|
ROUND((rain_sum/1000.0)::numeric, 3) AS rainfall,
|
||||||
|
ROUND((w_solar/NULLIF(n_sum,0))/100.0, 2) AS light,
|
||||||
|
ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv
|
||||||
|
FROM g
|
||||||
|
ORDER BY grp`
|
||||||
|
}
|
||||||
|
|
||||||
// buildWeatherDataQuery 构建天气数据查询SQL
|
// buildWeatherDataQuery 构建天气数据查询SQL
|
||||||
func buildWeatherDataQuery(interval string) string {
|
func buildWeatherDataQuery(interval string) string {
|
||||||
return `
|
return `
|
||||||
|
|||||||
@ -119,8 +119,8 @@ func getDataHandler(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取数据
|
// 获取数据(改为基于10分钟聚合表的再聚合)
|
||||||
points, err := database.GetWeatherData(database.GetDB(), stationID, start, end, interval)
|
points, err := database.GetSeriesFrom10Min(database.GetDB(), stationID, start, end, interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "查询数据失败"})
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "查询数据失败"})
|
||||||
return
|
return
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -14,6 +15,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
"weatherstation/internal/config"
|
"weatherstation/internal/config"
|
||||||
|
"weatherstation/internal/tools"
|
||||||
"weatherstation/model"
|
"weatherstation/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -117,6 +119,24 @@ func StartUDPServer() error {
|
|||||||
log.Printf("UDP服务器已启动,监听端口 %d...", cfg.Server.UDPPort)
|
log.Printf("UDP服务器已启动,监听端口 %d...", cfg.Server.UDPPort)
|
||||||
buffer := make([]byte, 2048)
|
buffer := make([]byte, 2048)
|
||||||
|
|
||||||
|
// 后台定时:每分钟回刷最近30分钟(全站)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
now := time.Now()
|
||||||
|
from := now.Add(-30 * time.Minute)
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := tools.RunBackfill10Min(ctx, tools.BackfillOptions{
|
||||||
|
FromTime: from,
|
||||||
|
ToTime: now,
|
||||||
|
WrapCycleMM: 0,
|
||||||
|
BucketMinutes: 10,
|
||||||
|
}); err != nil {
|
||||||
|
log.Printf("定时回填失败: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Minute)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
n, addr, err := conn.ReadFrom(buffer)
|
n, addr, err := conn.ReadFrom(buffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
257
internal/tools/backfill.go
Normal file
257
internal/tools/backfill.go
Normal file
@ -0,0 +1,257 @@
|
|||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"weatherstation/internal/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BackfillOptions struct {
|
||||||
|
StationID string // 为空则处理所有站点
|
||||||
|
FromTime time.Time // 含
|
||||||
|
ToTime time.Time // 含
|
||||||
|
WrapCycleMM float64 // 设备累计回绕一圈对应的毫米值;<=0 则按“回绕后仅记当次值”降级处理
|
||||||
|
BucketMinutes int // 默认10
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunBackfill10Min 将 rs485_weather_data 的16秒数据汇总写入 rs485_weather_10min
|
||||||
|
func RunBackfill10Min(ctx context.Context, opts BackfillOptions) error {
|
||||||
|
if opts.BucketMinutes <= 0 {
|
||||||
|
opts.BucketMinutes = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
db := database.GetDB()
|
||||||
|
// 取时序数据
|
||||||
|
query := `
|
||||||
|
SELECT station_id, timestamp, temperature, humidity, wind_speed, wind_direction,
|
||||||
|
rainfall, light, uv, pressure
|
||||||
|
FROM rs485_weather_data
|
||||||
|
WHERE timestamp >= $1 AND timestamp <= $2
|
||||||
|
%s
|
||||||
|
ORDER BY station_id, timestamp`
|
||||||
|
|
||||||
|
stationFilter := ""
|
||||||
|
args := []any{opts.FromTime, opts.ToTime}
|
||||||
|
if opts.StationID != "" {
|
||||||
|
stationFilter = "AND station_id = $3"
|
||||||
|
args = append(args, opts.StationID)
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := db.QueryContext(ctx, fmt.Sprintf(query, stationFilter), args...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("query raw failed: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
// 载入上海时区用于分桶
|
||||||
|
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||||||
|
if loc == nil {
|
||||||
|
loc = time.FixedZone("CST", 8*3600)
|
||||||
|
}
|
||||||
|
|
||||||
|
type agg struct {
|
||||||
|
// sums
|
||||||
|
sumTemp float64
|
||||||
|
sumHum float64
|
||||||
|
sumWS float64
|
||||||
|
sumLight float64
|
||||||
|
sumUV float64
|
||||||
|
sumP float64
|
||||||
|
sinSum float64
|
||||||
|
cosSum float64
|
||||||
|
gustMax float64
|
||||||
|
rainIncSum float64
|
||||||
|
count int
|
||||||
|
lastTotal float64 // 桶末累计
|
||||||
|
lastTS time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
currentStation := ""
|
||||||
|
var prevTotal float64
|
||||||
|
var prevTS time.Time
|
||||||
|
|
||||||
|
buckets := make(map[string]map[time.Time]*agg) // station -> bucketStart -> agg
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var (
|
||||||
|
stationID string
|
||||||
|
ts time.Time
|
||||||
|
t, h, ws, wd, rf, light, uv, p sql.NullFloat64
|
||||||
|
)
|
||||||
|
if err := rows.Scan(&stationID, &ts, &t, &h, &ws, &wd, &rf, &light, &uv, &p); err != nil {
|
||||||
|
return fmt.Errorf("scan failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 切换设备时重置 prevTotal
|
||||||
|
if stationID != currentStation {
|
||||||
|
currentStation = stationID
|
||||||
|
prevTotal = math.NaN()
|
||||||
|
prevTS = time.Time{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 计算该样本所在桶(CST对齐)
|
||||||
|
bucketLocal := ts.In(loc).Truncate(time.Duration(opts.BucketMinutes) * time.Minute)
|
||||||
|
bucketStart := time.Date(bucketLocal.Year(), bucketLocal.Month(), bucketLocal.Day(), bucketLocal.Hour(), bucketLocal.Minute(), 0, 0, loc)
|
||||||
|
|
||||||
|
if _, ok := buckets[stationID]; !ok {
|
||||||
|
buckets[stationID] = make(map[time.Time]*agg)
|
||||||
|
}
|
||||||
|
ag := buckets[stationID][bucketStart]
|
||||||
|
if ag == nil {
|
||||||
|
ag = &agg{gustMax: -1}
|
||||||
|
buckets[stationID][bucketStart] = ag
|
||||||
|
}
|
||||||
|
|
||||||
|
// 累加平均项
|
||||||
|
if t.Valid {
|
||||||
|
ag.sumTemp += t.Float64
|
||||||
|
}
|
||||||
|
if h.Valid {
|
||||||
|
ag.sumHum += h.Float64
|
||||||
|
}
|
||||||
|
if ws.Valid {
|
||||||
|
ag.sumWS += ws.Float64
|
||||||
|
if ws.Float64 > ag.gustMax {
|
||||||
|
ag.gustMax = ws.Float64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if light.Valid {
|
||||||
|
ag.sumLight += light.Float64
|
||||||
|
}
|
||||||
|
if uv.Valid {
|
||||||
|
ag.sumUV += uv.Float64
|
||||||
|
}
|
||||||
|
if p.Valid {
|
||||||
|
ag.sumP += p.Float64
|
||||||
|
}
|
||||||
|
if wd.Valid {
|
||||||
|
rad := wd.Float64 * math.Pi / 180.0
|
||||||
|
ag.sinSum += math.Sin(rad)
|
||||||
|
ag.cosSum += math.Cos(rad)
|
||||||
|
}
|
||||||
|
ag.count++
|
||||||
|
|
||||||
|
// 雨量增量:按时间比例切分到跨越的各个桶,避免边界全部被计入后一桶
|
||||||
|
if rf.Valid {
|
||||||
|
curr := rf.Float64
|
||||||
|
if !math.IsNaN(prevTotal) && !prevTS.IsZero() {
|
||||||
|
// 计算增量(带回绕)
|
||||||
|
inc := 0.0
|
||||||
|
if curr >= prevTotal {
|
||||||
|
inc = curr - prevTotal
|
||||||
|
} else {
|
||||||
|
if opts.WrapCycleMM > 0 {
|
||||||
|
inc = (opts.WrapCycleMM - prevTotal) + curr
|
||||||
|
} else {
|
||||||
|
// 降级:仅计当前值
|
||||||
|
inc = curr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 将 [prevTS, ts] 区间按10分钟边界切分,按时长比例分配增量
|
||||||
|
startLocal := prevTS.In(loc)
|
||||||
|
endLocal := ts.In(loc)
|
||||||
|
if endLocal.After(startLocal) && inc > 0 {
|
||||||
|
totalSec := endLocal.Sub(startLocal).Seconds()
|
||||||
|
segStart := startLocal
|
||||||
|
for segStart.Before(endLocal) {
|
||||||
|
segBucketStart := segStart.Truncate(time.Duration(opts.BucketMinutes) * time.Minute)
|
||||||
|
segBucketEnd := segBucketStart.Add(time.Duration(opts.BucketMinutes) * time.Minute)
|
||||||
|
segEnd := endLocal
|
||||||
|
if segBucketEnd.Before(segEnd) {
|
||||||
|
segEnd = segBucketEnd
|
||||||
|
}
|
||||||
|
portion := inc * (segEnd.Sub(segStart).Seconds() / totalSec)
|
||||||
|
// 确保段对应桶存在
|
||||||
|
if _, ok := buckets[stationID][segBucketStart]; !ok {
|
||||||
|
buckets[stationID][segBucketStart] = &agg{gustMax: -1}
|
||||||
|
}
|
||||||
|
buckets[stationID][segBucketStart].rainIncSum += portion
|
||||||
|
segStart = segEnd
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 记录桶末累计到当前样本所在桶
|
||||||
|
ag.lastTotal = curr
|
||||||
|
prevTotal = curr
|
||||||
|
prevTS = ts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return fmt.Errorf("iterate rows failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 写入/更新到 10 分钟表
|
||||||
|
upsert := `
|
||||||
|
INSERT INTO rs485_weather_10min (
|
||||||
|
station_id, bucket_start,
|
||||||
|
temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000,
|
||||||
|
wind_dir_deg, rain_10m_mm_x1000, rain_total_mm_x1000,
|
||||||
|
solar_wm2_x100, uv_index, pressure_hpa_x100, sample_count
|
||||||
|
) VALUES (
|
||||||
|
$1, $2, $3, $4, $5, $6,
|
||||||
|
$7, $8, $9,
|
||||||
|
$10, $11, $12, $13
|
||||||
|
) ON CONFLICT (station_id, bucket_start) DO UPDATE SET
|
||||||
|
temp_c_x100 = EXCLUDED.temp_c_x100,
|
||||||
|
humidity_pct = EXCLUDED.humidity_pct,
|
||||||
|
wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000,
|
||||||
|
wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000,
|
||||||
|
wind_dir_deg = EXCLUDED.wind_dir_deg,
|
||||||
|
rain_10m_mm_x1000 = EXCLUDED.rain_10m_mm_x1000,
|
||||||
|
rain_total_mm_x1000 = EXCLUDED.rain_total_mm_x1000,
|
||||||
|
solar_wm2_x100 = EXCLUDED.solar_wm2_x100,
|
||||||
|
uv_index = EXCLUDED.uv_index,
|
||||||
|
pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100,
|
||||||
|
sample_count = EXCLUDED.sample_count`
|
||||||
|
|
||||||
|
for stationID, m := range buckets {
|
||||||
|
for bucketStart, ag := range m {
|
||||||
|
if ag.count == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
avgTemp := ag.sumTemp / float64(ag.count)
|
||||||
|
avgHum := ag.sumHum / float64(ag.count)
|
||||||
|
avgWS := ag.sumWS / float64(ag.count)
|
||||||
|
avgLight := ag.sumLight / float64(ag.count)
|
||||||
|
avgUV := ag.sumUV / float64(ag.count)
|
||||||
|
avgP := ag.sumP / float64(ag.count)
|
||||||
|
// 风向向量平均
|
||||||
|
windDir := 0.0
|
||||||
|
if ag.sinSum != 0 || ag.cosSum != 0 {
|
||||||
|
ang := math.Atan2(ag.sinSum/float64(ag.count), ag.cosSum/float64(ag.count)) * 180 / math.Pi
|
||||||
|
if ang < 0 {
|
||||||
|
ang += 360
|
||||||
|
}
|
||||||
|
windDir = ang
|
||||||
|
}
|
||||||
|
|
||||||
|
// 缩放整数
|
||||||
|
tempScaled := int(math.Round(avgTemp * 100))
|
||||||
|
humScaled := int(math.Round(avgHum))
|
||||||
|
wsScaled := int(math.Round(avgWS * 1000))
|
||||||
|
gustScaled := int(math.Round(ag.gustMax * 1000))
|
||||||
|
wdScaled := int(math.Round(windDir))
|
||||||
|
rain10mScaled := int(math.Round(ag.rainIncSum * 1000))
|
||||||
|
rainTotalScaled := int(math.Round(ag.lastTotal * 1000))
|
||||||
|
solarScaled := int(math.Round(avgLight * 100))
|
||||||
|
uvScaled := int(math.Round(avgUV))
|
||||||
|
pScaled := int(math.Round(avgP * 100))
|
||||||
|
|
||||||
|
if _, err := db.ExecContext(ctx, upsert,
|
||||||
|
stationID, bucketStart,
|
||||||
|
tempScaled, humScaled, wsScaled, gustScaled,
|
||||||
|
wdScaled, rain10mScaled, rainTotalScaled,
|
||||||
|
solarScaled, uvScaled, pScaled, ag.count,
|
||||||
|
); err != nil {
|
||||||
|
return fmt.Errorf("upsert 10min failed: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user