Compare commits
3 Commits
7bc2337549
...
65f95662ca
| Author | SHA1 | Date | |
|---|---|---|---|
| 65f95662ca | |||
| 19ea80614c | |||
| d1be656e13 |
8
.idea/.gitignore
generated
vendored
Normal file
8
.idea/.gitignore
generated
vendored
Normal file
@ -0,0 +1,8 @@
|
||||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Editor-based HTTP Client requests
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
||||
@ -131,7 +131,7 @@ func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Ti
|
||||
case "10min":
|
||||
query = `
|
||||
SELECT
|
||||
to_char(bucket_start, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
||||
to_char(bucket_start + interval '10 minutes', 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
||||
ROUND(temp_c_x100/100.0, 2) AS temperature,
|
||||
ROUND(humidity_pct::numeric, 2) AS humidity,
|
||||
ROUND(pressure_hpa_x100/100.0, 2) AS pressure,
|
||||
@ -143,7 +143,7 @@ func GetSeriesFrom10Min(db *sql.DB, stationID string, startTime, endTime time.Ti
|
||||
ROUND(rain_total_mm_x1000/1000.0, 3) AS rain_total
|
||||
FROM rs485_weather_10min
|
||||
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start <= $3
|
||||
ORDER BY bucket_start`
|
||||
ORDER BY bucket_start + interval '10 minutes'`
|
||||
case "30min":
|
||||
query = buildAggFrom10MinQuery("30 minutes")
|
||||
default: // 1hour
|
||||
@ -205,8 +205,8 @@ func buildAggFrom10MinQuery(interval string) string {
|
||||
FROM base
|
||||
GROUP BY 1
|
||||
)
|
||||
SELECT
|
||||
to_char(grp, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
||||
SELECT
|
||||
to_char(grp + '` + interval + `'::interval, 'YYYY-MM-DD HH24:MI:SS') AS date_time,
|
||||
ROUND((w_temp/NULLIF(n_sum,0))/100.0, 2) AS temperature,
|
||||
ROUND((w_hum/NULLIF(n_sum,0))::numeric, 2) AS humidity,
|
||||
ROUND((w_p/NULLIF(n_sum,0))/100.0, 2) AS pressure,
|
||||
@ -219,7 +219,7 @@ func buildAggFrom10MinQuery(interval string) string {
|
||||
ROUND((w_uv/NULLIF(n_sum,0))::numeric, 2) AS uv,
|
||||
ROUND((rain_total_max/1000.0)::numeric, 3) AS rain_total
|
||||
FROM g
|
||||
ORDER BY grp`
|
||||
ORDER BY grp + '` + interval + `'::interval`
|
||||
}
|
||||
|
||||
// buildWeatherDataQuery 构建天气数据查询SQL
|
||||
|
||||
@ -65,7 +65,10 @@ func RunCaiyunFetch(ctx context.Context, token string) error {
|
||||
|
||||
issuedAt := time.Now().In(loc)
|
||||
startHour := issuedAt.Truncate(time.Hour)
|
||||
targets := []time.Time{startHour.Add(1 * time.Hour), startHour.Add(2 * time.Hour), startHour.Add(3 * time.Hour)}
|
||||
// 彩云小时接口返回的是“左端点”时刻(例如 13:00 表示 13:00-14:00 区间)。
|
||||
// 我们将左端点列表保留为 startHour, startHour+1h, startHour+2h,并在写库时统一 +1h
|
||||
// 使得 forecast_time 表示区间右端,与实测聚合对齐。
|
||||
leftEdges := []time.Time{startHour, startHour.Add(1 * time.Hour), startHour.Add(2 * time.Hour)}
|
||||
|
||||
for _, s := range stations {
|
||||
if !s.lat.Valid || !s.lon.Valid {
|
||||
@ -161,27 +164,30 @@ func RunCaiyunFetch(ctx context.Context, token string) error {
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("处理时间点: %v", targets)
|
||||
for _, ft := range targets {
|
||||
if v, ok := table[ft]; ok {
|
||||
log.Printf("写入预报点: station=%s time=%s rain=%.3f temp=%.2f rh=%.1f ws=%.3f wdir=%.1f prob=%.1f pres=%.2f",
|
||||
s.id, ft.Format(time.RFC3339), v.rain, v.temp, v.rh, v.ws, v.wdir, v.prob, v.pres)
|
||||
if err := upsertForecastCaiyun(ctx, db, s.id, issuedAt, ft,
|
||||
int64(v.rain*1000.0), // mm → x1000
|
||||
int64(v.temp*100.0), // °C → x100
|
||||
int64(v.rh), // %
|
||||
int64(v.ws*1000.0), // m/s → x1000
|
||||
int64(0), // gust: 彩云小时接口无阵风,置0
|
||||
int64(v.wdir), // 度
|
||||
int64(v.prob), // %
|
||||
int64(v.pres*100.0), // hPa → x100
|
||||
); err != nil {
|
||||
log.Printf("写入forecast失败(caiyun) station=%s time=%s err=%v", s.id, ft.Format(time.RFC3339), err)
|
||||
} else {
|
||||
log.Printf("写入forecast成功(caiyun) station=%s time=%s", s.id, ft.Format(time.RFC3339))
|
||||
}
|
||||
log.Printf("处理时间点(彩云左端): %v", leftEdges)
|
||||
for _, left := range leftEdges {
|
||||
v, ok := table[left]
|
||||
if !ok {
|
||||
log.Printf("时间点无数据: %s", left.Format(time.RFC3339))
|
||||
continue
|
||||
}
|
||||
ft := left.Add(1 * time.Hour)
|
||||
log.Printf("写入预报点: station=%s forecast_time=%s (source=%s) rain=%.3f temp=%.2f rh=%.1f ws=%.3f wdir=%.1f prob=%.1f pres=%.2f",
|
||||
s.id, ft.Format(time.RFC3339), left.Format(time.RFC3339), v.rain, v.temp, v.rh, v.ws, v.wdir, v.prob, v.pres)
|
||||
err := upsertForecastCaiyun(ctx, db, s.id, issuedAt, ft,
|
||||
int64(v.rain*1000.0), // mm → x1000
|
||||
int64(v.temp*100.0), // °C → x100
|
||||
int64(v.rh), // %
|
||||
int64(v.ws*1000.0), // m/s → x1000
|
||||
int64(0), // gust: 彩云小时接口无阵风,置0
|
||||
int64(v.wdir), // 度
|
||||
int64(v.prob), // %
|
||||
int64(v.pres*100.0), // hPa → x100
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("写入forecast失败(caiyun) station=%s time=%s err=%v", s.id, ft.Format(time.RFC3339), err)
|
||||
} else {
|
||||
log.Printf("时间点无数据: %s", ft.Format(time.RFC3339))
|
||||
log.Printf("写入forecast成功(caiyun) station=%s time=%s", s.id, ft.Format(time.RFC3339))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -93,9 +93,15 @@ func RunBackfill10Min(ctx context.Context, opts BackfillOptions) error {
|
||||
prevTS = time.Time{}
|
||||
}
|
||||
|
||||
// 计算该样本所在桶(CST对齐)
|
||||
bucketLocal := ts.In(loc).Truncate(time.Duration(opts.BucketMinutes) * time.Minute)
|
||||
bucketStart := time.Date(bucketLocal.Year(), bucketLocal.Month(), bucketLocal.Day(), bucketLocal.Hour(), bucketLocal.Minute(), 0, 0, loc)
|
||||
// 计算该样本所在桶(CST对齐)
|
||||
// 改为左开右闭 (left-open, right-closed):恰好落在边界的样本归入“结束于该边界”的桶
|
||||
localTs := ts.In(loc)
|
||||
floor := localTs.Truncate(time.Duration(opts.BucketMinutes) * time.Minute)
|
||||
bucketStart := floor
|
||||
if localTs.Equal(floor) {
|
||||
bucketStart = floor.Add(-time.Duration(opts.BucketMinutes) * time.Minute)
|
||||
}
|
||||
bucketStart = time.Date(bucketStart.Year(), bucketStart.Month(), bucketStart.Day(), bucketStart.Hour(), bucketStart.Minute(), 0, 0, loc)
|
||||
|
||||
if _, ok := buckets[stationID]; !ok {
|
||||
buckets[stationID] = make(map[time.Time]*agg)
|
||||
|
||||
236
main.go
Normal file
236
main.go
Normal file
@ -0,0 +1,236 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"weatherstation/config"
|
||||
"weatherstation/model"
|
||||
)
|
||||
|
||||
type UTF8Writer struct {
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
func NewUTF8Writer(w io.Writer) *UTF8Writer {
|
||||
return &UTF8Writer{w: w}
|
||||
}
|
||||
|
||||
func (w *UTF8Writer) Write(p []byte) (n int, err error) {
|
||||
if utf8.Valid(p) {
|
||||
return w.w.Write(p)
|
||||
}
|
||||
s := string(p)
|
||||
s = strings.ToValidUTF8(s, "")
|
||||
return w.w.Write([]byte(s))
|
||||
}
|
||||
|
||||
var (
|
||||
logFile *os.File
|
||||
logFileMutex sync.Mutex
|
||||
currentLogDay int
|
||||
)
|
||||
|
||||
func getLogFileName() string {
|
||||
currentTime := time.Now()
|
||||
return filepath.Join("log", fmt.Sprintf("%s.log", currentTime.Format("2006-01-02")))
|
||||
}
|
||||
|
||||
func openLogFile() (*os.File, error) {
|
||||
logDir := "log"
|
||||
if _, err := os.Stat(logDir); os.IsNotExist(err) {
|
||||
os.MkdirAll(logDir, 0755)
|
||||
}
|
||||
|
||||
logFileName := getLogFileName()
|
||||
return os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||||
}
|
||||
|
||||
func setupLogger() {
|
||||
var err error
|
||||
logFile, err = openLogFile()
|
||||
if err != nil {
|
||||
log.Fatalf("无法创建日志文件: %v", err)
|
||||
}
|
||||
|
||||
currentLogDay = time.Now().Day()
|
||||
|
||||
bufferedWriter := bufio.NewWriter(logFile)
|
||||
utf8Writer := NewUTF8Writer(bufferedWriter)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
logFileMutex.Lock()
|
||||
bufferedWriter.Flush()
|
||||
|
||||
now := time.Now()
|
||||
if now.Day() != currentLogDay {
|
||||
oldLogFile := logFile
|
||||
logFile, err = openLogFile()
|
||||
if err != nil {
|
||||
log.Printf("无法创建新日志文件: %v", err)
|
||||
} else {
|
||||
oldLogFile.Close()
|
||||
currentLogDay = now.Day()
|
||||
bufferedWriter = bufio.NewWriter(logFile)
|
||||
utf8Writer = NewUTF8Writer(bufferedWriter)
|
||||
log.SetOutput(io.MultiWriter(os.Stdout, utf8Writer))
|
||||
log.Println("日志文件已轮转")
|
||||
}
|
||||
}
|
||||
logFileMutex.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
multiWriter := io.MultiWriter(os.Stdout, utf8Writer)
|
||||
log.SetOutput(multiWriter)
|
||||
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
|
||||
}
|
||||
|
||||
func startUDP() {
|
||||
cfg := config.GetConfig()
|
||||
err := model.InitDB()
|
||||
if err != nil {
|
||||
log.Fatalf("初始化数据库失败: %v", err)
|
||||
}
|
||||
defer model.CloseDB()
|
||||
addr := fmt.Sprintf(":%d", cfg.Server.UDPPort)
|
||||
conn, err := net.ListenPacket("udp", addr)
|
||||
if err != nil {
|
||||
log.Fatalf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err)
|
||||
}
|
||||
defer conn.Close()
|
||||
log.Printf("UDP服务器已启动,监听端口 %d...", cfg.Server.UDPPort)
|
||||
buffer := make([]byte, 2048)
|
||||
for {
|
||||
n, addr, err := conn.ReadFrom(buffer)
|
||||
if err != nil {
|
||||
log.Printf("读取数据错误: %v", err)
|
||||
continue
|
||||
}
|
||||
rawData := buffer[:n]
|
||||
log.Printf("从 %s 接收到 %d 字节数据", addr.String(), n)
|
||||
|
||||
hexDump := hexDump(rawData)
|
||||
log.Printf("原始码流(十六进制):\n%s", hexDump)
|
||||
asciiDump := asciiDump(rawData)
|
||||
log.Printf("ASCII码:\n%s", asciiDump)
|
||||
|
||||
// 首先尝试解析为WH65LP数据
|
||||
if len(rawData) == 25 && rawData[0] == 0x24 {
|
||||
wh65lpData, err := model.ParseWH65LPData(rawData)
|
||||
if err != nil {
|
||||
log.Printf("解析WH65LP数据失败: %v", err)
|
||||
} else {
|
||||
log.Println("成功解析WH65LP气象站数据:")
|
||||
log.Println(wh65lpData)
|
||||
|
||||
// 更新内存中的设备信息
|
||||
model.UpdateDeviceInMemory(wh65lpData.StationID, addr, model.DeviceTypeWH65LP)
|
||||
// 注册设备到数据库
|
||||
err = model.RegisterDeviceInDB(wh65lpData.StationID, addr)
|
||||
if err != nil {
|
||||
log.Printf("注册设备失败: %v", err)
|
||||
}
|
||||
log.Printf("设备 %s 已注册,IP: %s", wh65lpData.StationID, addr.String())
|
||||
|
||||
// 保存数据
|
||||
err = model.SaveWH65LPData(wh65lpData, rawData)
|
||||
if err != nil {
|
||||
log.Printf("保存数据到数据库失败: %v", err)
|
||||
} else {
|
||||
log.Printf("数据已成功保存到数据库")
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// 如果不是WH65LP数据,尝试解析为ECOWITT数据
|
||||
data := string(rawData)
|
||||
weatherData, err := model.ParseWeatherData(data)
|
||||
if err != nil {
|
||||
log.Printf("解析ECOWITT数据失败: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Println("成功解析ECOWITT气象站数据:")
|
||||
log.Println(weatherData)
|
||||
|
||||
if weatherData.StationID != "" {
|
||||
// 更新内存中的设备信息
|
||||
model.UpdateDeviceInMemory(weatherData.StationID, addr, model.DeviceTypeEcowitt)
|
||||
// 注册设备到数据库
|
||||
err = model.RegisterDeviceInDB(weatherData.StationID, addr)
|
||||
if err != nil {
|
||||
log.Printf("注册设备失败: %v", err)
|
||||
}
|
||||
log.Printf("设备 %s 已注册,IP: %s", weatherData.StationID, addr.String())
|
||||
} else {
|
||||
log.Printf("警告: 收到的数据没有站点ID")
|
||||
}
|
||||
|
||||
err = model.SaveWeatherData(weatherData, data)
|
||||
if err != nil {
|
||||
log.Printf("保存数据到数据库失败: %v", err)
|
||||
} else {
|
||||
log.Printf("数据已成功保存到数据库")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
setupLogger()
|
||||
startUDP() // 直接运行UDP服务器,不再使用goroutine
|
||||
}
|
||||
|
||||
func hexDump(data []byte) string {
|
||||
var result strings.Builder
|
||||
for i := 0; i < len(data); i += 16 {
|
||||
end := i + 16
|
||||
if end > len(data) {
|
||||
end = len(data)
|
||||
}
|
||||
chunk := data[i:end]
|
||||
hexStr := hex.EncodeToString(chunk)
|
||||
for j := 0; j < len(hexStr); j += 2 {
|
||||
if j+2 <= len(hexStr) {
|
||||
result.WriteString(strings.ToUpper(hexStr[j : j+2]))
|
||||
result.WriteString(" ")
|
||||
}
|
||||
}
|
||||
result.WriteString("\n")
|
||||
}
|
||||
return result.String()
|
||||
}
|
||||
|
||||
func asciiDump(data []byte) string {
|
||||
var result strings.Builder
|
||||
for i := 0; i < len(data); i += 64 {
|
||||
end := i + 64
|
||||
if end > len(data) {
|
||||
end = len(data)
|
||||
}
|
||||
chunk := data[i:end]
|
||||
for _, b := range chunk {
|
||||
if b >= 32 && b <= 126 {
|
||||
result.WriteByte(b)
|
||||
} else {
|
||||
result.WriteString(".")
|
||||
}
|
||||
}
|
||||
result.WriteString("\n")
|
||||
}
|
||||
return result.String()
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user