fix: 修正时间
This commit is contained in:
parent
bc8027a3d8
commit
75cb5722f8
@ -61,6 +61,7 @@ func (e *Exporter) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now().In(e.loc)
|
now := time.Now().In(e.loc)
|
||||||
|
// 下一个10分钟边界 + 30s
|
||||||
next := alignToNextBucketEnd(now, 10).Add(30 * time.Second)
|
next := alignToNextBucketEnd(now, 10).Add(30 * time.Second)
|
||||||
e.logger.Printf("调度: 当前=%s, 下次执行=%s", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05"))
|
e.logger.Printf("调度: 当前=%s, 下次执行=%s", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05"))
|
||||||
delay := time.Until(next)
|
delay := time.Until(next)
|
||||||
@ -74,12 +75,19 @@ func (e *Exporter) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
end := time.Now().In(e.loc).Truncate(time.Minute)
|
// 修正:当前时间是桶末,需要导出的是刚结束的上一个桶
|
||||||
end = alignToPrevBucketEnd(end, 10)
|
// 例如:现在是20:50:30,应该导出的是20:40桶(表示20:40-20:50的数据)
|
||||||
start := end.Add(-10 * time.Minute)
|
currentTime := time.Now().In(e.loc)
|
||||||
|
bucketEnd := alignToPrevBucketEnd(currentTime, 10) // 上一个桶末,如20:50
|
||||||
|
bucketStart := bucketEnd.Add(-10 * time.Minute) // 上一个桶开始,如20:40
|
||||||
|
|
||||||
if err := e.exportBucket(ctx, start, end); err != nil {
|
e.logger.Printf("当前时间=%s, 导出桶=%s-%s",
|
||||||
e.logger.Printf("导出桶 %s-%s 失败: %v", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), err)
|
currentTime.Format("2006-01-02 15:04:05"),
|
||||||
|
bucketStart.Format("2006-01-02 15:04:05"),
|
||||||
|
bucketEnd.Format("2006-01-02 15:04:05"))
|
||||||
|
|
||||||
|
if err := e.exportBucket(ctx, bucketStart, bucketEnd); err != nil {
|
||||||
|
e.logger.Printf("导出桶 %s-%s 失败: %v", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -95,7 +103,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
|
|||||||
activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay))
|
activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay))
|
||||||
e.logger.Printf("开始导出: 桶=%s-%s, 文件=%s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath)
|
e.logger.Printf("开始导出: 桶=%s-%s, 文件=%s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath)
|
||||||
|
|
||||||
// 先查询所有符合条件的站点,用于后续比对缺失 - 修正:使用station_id和device_id
|
// 先查询所有符合条件的站点,用于后续比对缺失
|
||||||
var eligibleStations []struct {
|
var eligibleStations []struct {
|
||||||
StationID string
|
StationID string
|
||||||
DeviceID string
|
DeviceID string
|
||||||
@ -134,7 +142,8 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
|
|||||||
|
|
||||||
needHeader := ensureFileWithHeader(activePath)
|
needHeader := ensureFileWithHeader(activePath)
|
||||||
|
|
||||||
// 查询该桶的数据 - 修正:使用正确的JOIN条件
|
// 查询该桶的数据 - 使用桶开始时间查询
|
||||||
|
e.logger.Printf("查询数据: 使用bucket_start=%s", bucketStart.Format("2006-01-02 15:04:05"))
|
||||||
rows, err := e.pg.QueryContext(ctx, `
|
rows, err := e.pg.QueryContext(ctx, `
|
||||||
SELECT
|
SELECT
|
||||||
s.latitude,
|
s.latitude,
|
||||||
@ -193,7 +202,9 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
|
|||||||
processedStationIDs[stationID] = true
|
processedStationIDs[stationID] = true
|
||||||
e.logger.Printf("处理站点: station_id=%s, device_id=%s, 经度=%.6f, 纬度=%.6f", stationID, deviceID, lon.Float64, lat.Float64)
|
e.logger.Printf("处理站点: station_id=%s, device_id=%s, 经度=%.6f, 纬度=%.6f", stationID, deviceID, lon.Float64, lat.Float64)
|
||||||
|
|
||||||
|
// 使用桶开始时间作为date_time
|
||||||
dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05")
|
dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05")
|
||||||
|
e.logger.Printf("站点 %s: 使用时间=%s", stationID, dateTimeStr)
|
||||||
|
|
||||||
var pressureStr, tempStr, wsStr, wdStr, rhStr string
|
var pressureStr, tempStr, wsStr, wdStr, rhStr string
|
||||||
if pX100.Valid {
|
if pX100.Valid {
|
||||||
@ -212,7 +223,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
|
|||||||
rhStr = fmtFloat(float64(rh.Int64), 0)
|
rhStr = fmtFloat(float64(rh.Int64), 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 使用device_id查询ZTD
|
// 使用device_id查询ZTD,使用桶末时间
|
||||||
ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
|
ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
|
||||||
if ztdStr != "" {
|
if ztdStr != "" {
|
||||||
ztdHit++
|
ztdHit++
|
||||||
@ -257,7 +268,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查哪些站点在这个桶完全没有数据 - 修正:使用station_id比较
|
// 检查哪些站点在这个桶完全没有数据
|
||||||
var missingCount int
|
var missingCount int
|
||||||
for _, station := range eligibleStations {
|
for _, station := range eligibleStations {
|
||||||
if !processedStationIDs[station.StationID] {
|
if !processedStationIDs[station.StationID] {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user