From e4b1c19064cc6f48c620b0216a08c79c11da2770 Mon Sep 17 00:00:00 2001 From: yarnom Date: Fri, 22 Aug 2025 10:03:34 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E4=BF=AE=E6=AD=A3=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2SQL?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/weatherstation/main.go | 15 +++++++ internal/database/models.go | 22 +++++++++- internal/selftest/selftest.go | 80 +++++++++++++++++++++++++++++++++++ internal/server/gin.go | 6 ++- 4 files changed, 120 insertions(+), 3 deletions(-) create mode 100644 internal/selftest/selftest.go diff --git a/cmd/weatherstation/main.go b/cmd/weatherstation/main.go index 4c701cd..6801bc7 100644 --- a/cmd/weatherstation/main.go +++ b/cmd/weatherstation/main.go @@ -7,6 +7,7 @@ import ( "sync" "time" "weatherstation/internal/database" + "weatherstation/internal/selftest" "weatherstation/internal/server" "weatherstation/internal/tools" ) @@ -21,6 +22,9 @@ func main() { var bfFrom = flag.String("from", "", "回填起始时间,格式:YYYY-MM-DD HH:MM:SS") var bfTo = flag.String("to", "", "回填结束时间,格式:YYYY-MM-DD HH:MM:SS") var bfWrap = flag.Float64("wrap", 0, "回绕一圈对应毫米值(mm),<=0 则降级为仅计当前值") + // 自检控制 + var noSelftest = flag.Bool("no-selftest", false, "跳过启动自检") + var selftestOnly = flag.Bool("selftest_only", false, "仅执行自检后退出") flag.Parse() // 设置日志 @@ -30,6 +34,17 @@ func main() { _ = database.GetDB() // 确保数据库连接已初始化 defer database.Close() + // 启动前自检 + if !*noSelftest { + if err := selftest.Run(context.Background()); err != nil { + log.Fatalf("启动自检失败: %v", err) + } + if *selftestOnly { + log.Println("自检完成,按 --selftest_only 要求退出") + return + } + } + // Backfill 调试路径 if *doBackfill { if *bfFrom == "" || *bfTo == "" { diff --git a/internal/database/models.go b/internal/database/models.go index 5985629..f0fee5e 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -2,6 +2,7 @@ package database import ( "database/sql" + "log" "time" "weatherstation/pkg/types" ) @@ -113,6 +114,9 @@ func GetWeatherData(db *sql.DB, stationID string, startTime, endTime time.Time, // GetSeriesFrom10Min 基于10分钟聚合表返回 10m/30m/1h 数据(风向向量平均、降雨求和、加权平均) func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Time, interval string) ([]types.WeatherPoint, error) { + log.Printf("查询数据: stationID=%s, start=%v, end=%v, interval=%s", + stationID, startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"), interval) + var query string switch interval { case "10min": @@ -136,8 +140,14 @@ func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Ti query = buildAggFrom10MinQuery("1 hour") } + // // 调试输出完整SQL + // debugSQL := fmt.Sprintf("-- SQL for %s\n%s\n-- Params: stationID=%s, start=%v, end=%v", + // interval, query, stationID, startTime, endTime) + // log.Println(debugSQL) + rows, err := db.Query(query, stationID, startTime, endTime) if err != nil { + log.Printf("查询失败: %v", err) return nil, err } defer rows.Close() @@ -158,10 +168,18 @@ func buildAggFrom10MinQuery(interval string) string { return ` WITH base AS ( SELECT * FROM rs485_weather_10min - WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3 + '` + interval + `'::interval + WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3 ), g AS ( SELECT - date_trunc('` + interval + `', bucket_start) AS grp, + CASE '` + interval + `' + WHEN '1 hour' THEN date_trunc('hour', bucket_start) + WHEN '30 minutes' THEN + date_trunc('hour', bucket_start) + + CASE WHEN date_part('minute', bucket_start) >= 30 + THEN '30 minutes'::interval + ELSE '0 minutes'::interval + END + END AS grp, SUM(temp_c_x100 * sample_count)::bigint AS w_temp, SUM(humidity_pct * sample_count)::bigint AS w_hum, SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p, diff --git a/internal/selftest/selftest.go b/internal/selftest/selftest.go new file mode 100644 index 0000000..a5e7cd7 --- /dev/null +++ b/internal/selftest/selftest.go @@ -0,0 +1,80 @@ +package selftest + +import ( + "context" + "database/sql" + "fmt" + "time" + + "weatherstation/internal/database" +) + +// Run 执行启动前自检 +// 1) 数据库可用 2) 关键表存在 3) 基础查询可执行 +func Run(ctx context.Context) error { + db := database.GetDB() + if err := pingDB(ctx, db); err != nil { + return fmt.Errorf("数据库连通性失败: %w", err) + } + if err := ensureTables(ctx, db); err != nil { + return fmt.Errorf("关键表缺失: %w", err) + } + if err := basicQuery(ctx, db); err != nil { + return fmt.Errorf("基础查询失败: %w", err) + } + return nil +} + +func pingDB(ctx context.Context, db *sql.DB) error { + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + return db.PingContext(ctx) +} + +func ensureTables(ctx context.Context, db *sql.DB) error { + required := []string{ + "public.stations", + "public.rs485_weather_data", + "public.rs485_weather_10min", + "public.forecast_hourly", + } + for _, t := range required { + var exists bool + if err := db.QueryRowContext(ctx, "SELECT to_regclass($1) IS NOT NULL", t).Scan(&exists); err != nil { + return err + } + if !exists { + return fmt.Errorf("缺少表: %s", t) + } + } + return nil +} + +func basicQuery(ctx context.Context, db *sql.DB) error { + // 若无站点则跳过后续测试 + var stationID string + _ = db.QueryRowContext(ctx, "SELECT station_id FROM stations LIMIT 1").Scan(&stationID) + + // 10分钟表可被查询(允许为空) + var cnt int + if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM rs485_weather_10min").Scan(&cnt); err != nil { + return err + } + + // 若有站点,做一次安全时间窗聚合验证(不要求有数据,只要语句可执行) + if stationID != "" { + from := time.Now().Add(-2 * time.Hour).Truncate(time.Hour) + to := time.Now().Truncate(time.Hour) + _, err := db.QueryContext(ctx, + `WITH base AS ( + SELECT * FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start <= $3 + ), g AS ( + SELECT date_trunc('hour', bucket_start) AS grp, SUM(sample_count) AS n_sum FROM base GROUP BY 1 + ) SELECT COUNT(*) FROM g`, stationID, from, to, + ) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/server/gin.go b/internal/server/gin.go index 88c8272..6d06da2 100644 --- a/internal/server/gin.go +++ b/internal/server/gin.go @@ -2,6 +2,7 @@ package server import ( "fmt" + "log" "net/http" "strconv" "time" @@ -126,7 +127,10 @@ func getDataHandler(c *gin.Context) { // 获取数据(改为基于10分钟聚合表的再聚合) points, err := database.GetSeriesFrom10Min(database.GetDB(), stationID, start, end, interval) if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "查询数据失败"}) + log.Printf("查询数据失败: %v", err) // 记录具体错误到服务端日志 + c.JSON(http.StatusInternalServerError, gin.H{ + "error": fmt.Sprintf("查询数据失败: %v", err), + }) return }