feat:新增导出日志,方便查找问题

This commit is contained in:
yarnom 2025-08-22 20:30:44 +08:00
parent 24dca2f489
commit 1defe32470

View File

@ -18,9 +18,10 @@ import (
// Exporter 负责每10分钟导出 CSV含ZTD融合
type Exporter struct {
pg *sql.DB
my *sql.DB
loc *time.Location // Asia/Shanghai
pg *sql.DB
my *sql.DB
loc *time.Location // Asia/Shanghai
logger *log.Logger // 专用日志记录器
}
func NewExporter() *Exporter {
@ -28,10 +29,25 @@ func NewExporter() *Exporter {
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
// 创建导出专用日志文件(追加模式)
if err := os.MkdirAll("export_data", 0o755); err != nil {
log.Printf("创建导出日志目录失败: %v", err)
}
f, err := os.OpenFile(filepath.Join("export_data", "export.log"),
os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
log.Printf("创建导出日志文件失败: %v", err)
}
// 使用自定义日志格式,包含时间戳
logger := log.New(f, "", log.Ldate|log.Ltime|log.Lmicroseconds)
return &Exporter{
pg: database.GetDB(),
my: database.GetMySQL(),
loc: loc,
pg: database.GetDB(),
my: database.GetMySQL(),
loc: loc,
logger: logger,
}
}
@ -46,7 +62,7 @@ func (e *Exporter) Start(ctx context.Context) error {
now := time.Now().In(e.loc)
next := alignToNextBucketEnd(now, 10).Add(30 * time.Second)
log.Printf("exporter: now=%s next_run=%s", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05"))
e.logger.Printf("调度: 当前=%s, 下次执行=%s", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05"))
delay := time.Until(next)
if delay > 0 {
timer := time.NewTimer(delay)
@ -63,7 +79,7 @@ func (e *Exporter) Start(ctx context.Context) error {
start := end.Add(-10 * time.Minute)
if err := e.exportBucket(ctx, start, end); err != nil {
log.Printf("export bucket %s-%s failed: %v", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), err)
e.logger.Printf("导出桶 %s-%s 失败: %v", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), err)
}
}
}
@ -77,15 +93,39 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
return err
}
activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay))
log.Printf("exporter: begin bucket start=%s end=%s -> %s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath)
e.logger.Printf("开始导出: 桶=%s-%s, 文件=%s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath)
// 先查询所有符合条件的站点,用于后续比对缺失
var eligibleStations []string
stationsRows, err := e.pg.QueryContext(ctx, `
SELECT device_id
FROM stations
WHERE device_type = 'WH65LP'
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude <> 0 AND longitude <> 0
ORDER BY device_id
`)
if err != nil {
e.logger.Printf("查询合格站点失败: %v", err)
} else {
defer stationsRows.Close()
for stationsRows.Next() {
var deviceID string
if err := stationsRows.Scan(&deviceID); err == nil {
eligibleStations = append(eligibleStations, deviceID)
}
}
e.logger.Printf("合格站点总数: %d, 站点列表: %v", len(eligibleStations), eligibleStations)
}
// 轮转上一 UTC 日的文件(若存在且未压缩)
if err := rotatePreviousUTC(outDir, histDir, utcDay); err != nil {
log.Printf("rotate previous day failed: %v", err)
e.logger.Printf("轮转上一日文件失败: %v", err)
}
needHeader := ensureFileWithHeader(activePath)
// 查询该桶的数据
rows, err := e.pg.QueryContext(ctx, `
SELECT
s.latitude,
@ -106,7 +146,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
ORDER BY s.device_id
`, bucketStart)
if err != nil {
return err
return fmt.Errorf("查询10分钟数据失败: %v", err)
}
defer rows.Close()
@ -122,6 +162,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
}
var total, ztdHit, ztdMiss int
processedStations := make(map[string]bool)
for rows.Next() {
var (
@ -134,11 +175,13 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
bucketStartTS time.Time
)
if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS); err != nil {
log.Printf("scan row failed: %v", err)
e.logger.Printf("扫描行失败: %v", err)
continue
}
// date_time 使用 Go 端按 CST 格式化
processedStations[deviceID] = true
e.logger.Printf("处理站点 %s: 经度=%.6f, 纬度=%.6f", deviceID, lon.Float64, lat.Float64)
dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05")
var pressureStr, tempStr, wsStr, wdStr, rhStr string
@ -161,8 +204,10 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
if ztdStr != "" {
ztdHit++
e.logger.Printf("站点 %s: ZTD 数据正常, 值=%s", deviceID, ztdStr)
} else {
ztdMiss++
e.logger.Printf("站点 %s: ZTD 数据缺失或超出5分钟窗口", deviceID)
}
var b strings.Builder
@ -192,14 +237,27 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
b.WriteByte(',') // pwv 留空
if _, err := f.WriteString(b.String() + "\n"); err != nil {
log.Printf("write csv failed: %v", err)
e.logger.Printf("写入CSV失败: %v", err)
}
total++
}
if err := rows.Err(); err != nil {
return err
}
log.Printf("exporter: done bucket start=%s end=%s, rows=%d, ztd_hit=%d, ztd_miss=%d", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), total, ztdHit, ztdMiss)
// 检查哪些站点在这个桶完全没有数据
for _, station := range eligibleStations {
if !processedStations[station] {
e.logger.Printf("站点 %s: 本桶完全无数据rs485_weather_10min 表中未找到记录)", station)
}
}
e.logger.Printf("导出完成: 桶=%s-%s, 总行数=%d, ZTD命中=%d, ZTD未命中=%d, 缺失站点数=%d",
bucketStart.Format("2006-01-02 15:04:05"),
bucketEnd.Format("2006-01-02 15:04:05"),
total, ztdHit, ztdMiss,
len(eligibleStations)-len(processedStations))
return nil
}
@ -217,11 +275,12 @@ func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd tim
`, deviceID, bucketEnd).Scan(&ztd, &ts)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Printf("mysql query ztd failed: %v", err)
e.logger.Printf("查询ZTD失败: station_id=%s, error=%v", deviceID, err)
}
return ""
}
if !ztd.Valid {
e.logger.Printf("站点 %s: ZTD值为NULL", deviceID)
return ""
}
return fmtFloat(ztd.Float64*1000.0, -1)