修改时间戳为右端点

This commit is contained in:
zms 2025-10-23 12:33:46 +08:00
parent 7bc2337549
commit d1be656e13
6 changed files with 320 additions and 8 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -131,7 +131,7 @@ func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Ti
case "10min": case "10min":
query = ` query = `
SELECT SELECT
to_char(bucket_start, 'YYYY-MM-DD HH24:MI:SS') AS date_time, to_char(bucket_start + interval '10 minutes', 'YYYY-MM-DD HH24:MI:SS') AS date_time,
ROUND(temp_c_x100/100.0, 2) AS temperature, ROUND(temp_c_x100/100.0, 2) AS temperature,
ROUND(humidity_pct::numeric, 2) AS humidity, ROUND(humidity_pct::numeric, 2) AS humidity,
ROUND(pressure_hpa_x100/100.0, 2) AS pressure, ROUND(pressure_hpa_x100/100.0, 2) AS pressure,
@ -143,7 +143,7 @@ func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Ti
ROUND(rain_total_mm_x1000/1000.0, 3) AS rain_total ROUND(rain_total_mm_x1000/1000.0, 3) AS rain_total
FROM rs485_weather_10min FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3 WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
ORDER BY bucket_start` ORDER BY bucket_start + interval '10 minutes'`
case "30min": case "30min":
query = buildAggFrom10MinQuery("30 minutes") query = buildAggFrom10MinQuery("30 minutes")
default: // 1hour default: // 1hour
@ -206,7 +206,7 @@ func buildAggFrom10MinQuery(interval string) string {
GROUP BY 1 GROUP BY 1
) )
SELECT SELECT
to_char(grp, 'YYYY-MM-DD HH24:MI:SS') AS date_time, to_char(grp + '` + interval + `'::interval, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature, ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature,
ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity, ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity,
ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure, ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure,
@ -219,7 +219,7 @@ func buildAggFrom10MinQuery(interval string) string {
ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv, ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv,
ROUND((rain_total_max/1000.0)::numeric, 3) AS rain_total ROUND((rain_total_max/1000.0)::numeric, 3) AS rain_total
FROM g FROM g
ORDER BY grp` ORDER BY grp + '` + interval + `'::interval`
} }
// buildWeatherDataQuery 构建天气数据查询SQL // buildWeatherDataQuery 构建天气数据查询SQL

View File

@ -94,8 +94,14 @@ func RunBackfill10Min(ctx context.Context, opts BackfillOptions) error {
} }
// 计算该样本所在桶CST对齐 // 计算该样本所在桶CST对齐
bucketLocal := ts.In(loc).Truncate(time.Duration(opts.BucketMinutes) * time.Minute) // 改为左开右闭 (left-open, right-closed):恰好落在边界的样本归入“结束于该边界”的桶
bucketStart := time.Date(bucketLocal.Year(), bucketLocal.Month(), bucketLocal.Day(), bucketLocal.Hour(), bucketLocal.Minute(), 0, 0, loc) localTs := ts.In(loc)
floor := localTs.Truncate(time.Duration(opts.BucketMinutes) * time.Minute)
bucketStart := floor
if localTs.Equal(floor) {
bucketStart = floor.Add(-time.Duration(opts.BucketMinutes) * time.Minute)
}
bucketStart = time.Date(bucketStart.Year(), bucketStart.Month(), bucketStart.Day(), bucketStart.Hour(), bucketStart.Minute(), 0, 0, loc)
if _, ok := buckets[stationID]; !ok { if _, ok := buckets[stationID]; !ok {
buckets[stationID] = make(map[time.Time]*agg) buckets[stationID] = make(map[time.Time]*agg)

236
main.go Normal file
View File

@ -0,0 +1,236 @@
package main
import (
"bufio"
"encoding/hex"
"fmt"
"io"
"log"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"
"unicode/utf8"
"weatherstation/config"
"weatherstation/model"
)
type UTF8Writer struct {
w io.Writer
}
func NewUTF8Writer(w io.Writer) *UTF8Writer {
return &UTF8Writer{w: w}
}
func (w *UTF8Writer) Write(p []byte) (n int, err error) {
if utf8.Valid(p) {
return w.w.Write(p)
}
s := string(p)
s = strings.ToValidUTF8(s, "")
return w.w.Write([]byte(s))
}
var (
logFile *os.File
logFileMutex sync.Mutex
currentLogDay int
)
func getLogFileName() string {
currentTime := time.Now()
return filepath.Join("log", fmt.Sprintf("%s.log", currentTime.Format("2006-01-02")))
}
func openLogFile() (*os.File, error) {
logDir := "log"
if _, err := os.Stat(logDir); os.IsNotExist(err) {
os.MkdirAll(logDir, 0755)
}
logFileName := getLogFileName()
return os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
}
func setupLogger() {
var err error
logFile, err = openLogFile()
if err != nil {
log.Fatalf("无法创建日志文件: %v", err)
}
currentLogDay = time.Now().Day()
bufferedWriter := bufio.NewWriter(logFile)
utf8Writer := NewUTF8Writer(bufferedWriter)
go func() {
for {
time.Sleep(1 * time.Second)
logFileMutex.Lock()
bufferedWriter.Flush()
now := time.Now()
if now.Day() != currentLogDay {
oldLogFile := logFile
logFile, err = openLogFile()
if err != nil {
log.Printf("无法创建新日志文件: %v", err)
} else {
oldLogFile.Close()
currentLogDay = now.Day()
bufferedWriter = bufio.NewWriter(logFile)
utf8Writer = NewUTF8Writer(bufferedWriter)
log.SetOutput(io.MultiWriter(os.Stdout, utf8Writer))
log.Println("日志文件已轮转")
}
}
logFileMutex.Unlock()
}
}()
multiWriter := io.MultiWriter(os.Stdout, utf8Writer)
log.SetOutput(multiWriter)
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
}
func startUDP() {
cfg := config.GetConfig()
err := model.InitDB()
if err != nil {
log.Fatalf("初始化数据库失败: %v", err)
}
defer model.CloseDB()
addr := fmt.Sprintf(":%d", cfg.Server.UDPPort)
conn, err := net.ListenPacket("udp", addr)
if err != nil {
log.Fatalf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err)
}
defer conn.Close()
log.Printf("UDP服务器已启动监听端口 %d...", cfg.Server.UDPPort)
buffer := make([]byte, 2048)
for {
n, addr, err := conn.ReadFrom(buffer)
if err != nil {
log.Printf("读取数据错误: %v", err)
continue
}
rawData := buffer[:n]
log.Printf("从 %s 接收到 %d 字节数据", addr.String(), n)
hexDump := hexDump(rawData)
log.Printf("原始码流(十六进制):\n%s", hexDump)
asciiDump := asciiDump(rawData)
log.Printf("ASCII码:\n%s", asciiDump)
// 首先尝试解析为WH65LP数据
if len(rawData) == 25 && rawData[0] == 0x24 {
wh65lpData, err := model.ParseWH65LPData(rawData)
if err != nil {
log.Printf("解析WH65LP数据失败: %v", err)
} else {
log.Println("成功解析WH65LP气象站数据:")
log.Println(wh65lpData)
// 更新内存中的设备信息
model.UpdateDeviceInMemory(wh65lpData.StationID, addr, model.DeviceTypeWH65LP)
// 注册设备到数据库
err = model.RegisterDeviceInDB(wh65lpData.StationID, addr)
if err != nil {
log.Printf("注册设备失败: %v", err)
}
log.Printf("设备 %s 已注册IP: %s", wh65lpData.StationID, addr.String())
// 保存数据
err = model.SaveWH65LPData(wh65lpData, rawData)
if err != nil {
log.Printf("保存数据到数据库失败: %v", err)
} else {
log.Printf("数据已成功保存到数据库")
}
continue
}
}
// 如果不是WH65LP数据尝试解析为ECOWITT数据
data := string(rawData)
weatherData, err := model.ParseWeatherData(data)
if err != nil {
log.Printf("解析ECOWITT数据失败: %v", err)
continue
}
log.Println("成功解析ECOWITT气象站数据:")
log.Println(weatherData)
if weatherData.StationID != "" {
// 更新内存中的设备信息
model.UpdateDeviceInMemory(weatherData.StationID, addr, model.DeviceTypeEcowitt)
// 注册设备到数据库
err = model.RegisterDeviceInDB(weatherData.StationID, addr)
if err != nil {
log.Printf("注册设备失败: %v", err)
}
log.Printf("设备 %s 已注册IP: %s", weatherData.StationID, addr.String())
} else {
log.Printf("警告: 收到的数据没有站点ID")
}
err = model.SaveWeatherData(weatherData, data)
if err != nil {
log.Printf("保存数据到数据库失败: %v", err)
} else {
log.Printf("数据已成功保存到数据库")
}
}
}
func main() {
setupLogger()
startUDP() // 直接运行UDP服务器不再使用goroutine
}
func hexDump(data []byte) string {
var result strings.Builder
for i := 0; i < len(data); i += 16 {
end := i + 16
if end > len(data) {
end = len(data)
}
chunk := data[i:end]
hexStr := hex.EncodeToString(chunk)
for j := 0; j < len(hexStr); j += 2 {
if j+2 <= len(hexStr) {
result.WriteString(strings.ToUpper(hexStr[j : j+2]))
result.WriteString(" ")
}
}
result.WriteString("\n")
}
return result.String()
}
func asciiDump(data []byte) string {
var result strings.Builder
for i := 0; i < len(data); i += 64 {
end := i + 64
if end > len(data) {
end = len(data)
}
chunk := data[i:end]
for _, b := range chunk {
if b >= 32 && b <= 126 {
result.WriteByte(b)
} else {
result.WriteString(".")
}
}
result.WriteString("\n")
}
return result.String()
}

61
server/udp_server.go Normal file
View File

@ -0,0 +1,61 @@
package server
import (
"log"
"net"
"weatherstation/model"
)
func StartUDPServer(port string) error {
addr, err := net.ResolveUDPAddr("udp", ":"+port)
if err != nil {
return err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return err
}
defer conn.Close()
log.Printf("UDP服务器已启动监听端口 %s", port)
buffer := make([]byte, 1024)
for {
n, remoteAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Printf("读取UDP数据失败: %v", err)
continue
}
// 处理接收到的数据
go handleUDPData(buffer[:n], remoteAddr)
}
}
func handleUDPData(data []byte, addr *net.UDPAddr) {
// 尝试解析为WH65LP数据
weatherData, err := model.ParseUDP10006Data(data)
if err != nil {
log.Printf("解析数据失败: %v", err)
return
}
// 更新内存中的设备信息
model.UpdateDeviceInMemory(weatherData.StationID, addr, model.DeviceTypeWH65LP)
// 注册设备到数据库
err = model.RegisterDeviceInDB(weatherData.StationID, addr)
if err != nil {
log.Printf("注册设备失败: %v", err)
}
// 保存数据
err = model.SaveWH65LPData(weatherData, data)
if err != nil {
log.Printf("保存数据失败: %v", err)
return
}
log.Printf("成功接收并保存来自 %s 的WH65LP数据:\n%s", addr.String(), weatherData.String())
}

1
参考页面.html Normal file
View File

@ -0,0 +1 @@
Hello