280 lines
6.8 KiB
Go

package main
import (
"bufio"
"encoding/csv"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"sort"
"strings"
"time"
)
// caiyunHourly is a minimal model of the Caiyun hourly forecast response.
// It declares only the fields this tool reads; the decoder in main does not
// enable DisallowUnknownFields, so any other keys in the payload are ignored.
type caiyunHourly struct {
	// Status is the top-level response status string.
	Status string `json:"status"`
	Result struct {
		Hourly struct {
			// Status is the status string reported for the hourly section.
			Status string `json:"status"`
			// Scalar series: each entry pairs a timestamp with one value.
			Temperature []valTime `json:"temperature"`
			Humidity []valTime `json:"humidity"`
			Visibility []valTime `json:"visibility"`
			Dswrf []valTime `json:"dswrf"`
			Pressure []valTime `json:"pressure"`
			// Wind entries carry speed and direction together.
			Wind []windTime `json:"wind"`
		} `json:"hourly"`
	} `json:"result"`
}
// valTime is one sample of a scalar hourly series: a timestamp string and the
// numeric value reported for it.
type valTime struct {
	// Datetime is the raw timestamp text; it is parsed later by parseTime.
	Datetime string `json:"datetime"`
	// Value is the sample value as decoded from JSON.
	Value float64 `json:"value"`
}
// windTime is one sample of the hourly wind series: a timestamp string with
// the wind speed and direction reported for it.
type windTime struct {
	// Datetime is the raw timestamp text; it is parsed later by parseTime.
	Datetime string `json:"datetime"`
	// Speed is the decoded wind speed; the output code divides it by 3.6
	// (km/h -> m/s) before writing it.
	Speed float64 `json:"speed"`
	// Direction is the decoded wind direction, written out unchanged.
	Direction float64 `json:"direction"`
}
// row is the merged record for a single timestamp, combining the samples of
// every series that reported a value at that time. A nil pointer means the
// corresponding series had no sample for this timestamp; it is written as an
// empty CSV cell or as SQL NULL.
type row struct {
	// t is the sample time as returned by parseTime (converted to the output location).
	t time.Time
	temperature *float64
	humidity *float64
	// windSpeed holds the decoded speed; the km/h -> m/s conversion is applied on output.
	windSpeed *float64
	windDir *float64
	pressure *float64
	visibility *float64
	dswrf *float64
}
func main() {
var file string
var tz string
var mode string
var alias string
var lat float64
var lon float64
var sqlTable string
flag.StringVar(&file, "file", "", "Path to Caiyun hourly JSON; if empty, read from stdin")
flag.StringVar(&tz, "tz", "Asia/Shanghai", "Timezone for output timestamps")
flag.StringVar(&mode, "mode", "csv", "Output mode: csv | sql")
flag.StringVar(&alias, "alias", "", "Station alias for SQL output (required for mode=sql)")
flag.Float64Var(&lat, "lat", 0, "Latitude for SQL output")
flag.Float64Var(&lon, "lon", 0, "Longitude for SQL output")
flag.StringVar(&sqlTable, "sqltable", "radar_weather", "SQL table name for inserts")
flag.Parse()
var r io.Reader
if file == "" {
r = bufio.NewReader(os.Stdin)
} else {
f, err := os.Open(file)
if err != nil {
fatalf("open file: %v", err)
}
defer f.Close()
r = f
}
var payload caiyunHourly
dec := json.NewDecoder(r)
if err := dec.Decode(&payload); err != nil {
fatalf("decode json: %v", err)
}
if strings.ToLower(payload.Status) != "ok" && payload.Status != "" {
fatalf("top-level status not ok: %s", payload.Status)
}
loc, _ := time.LoadLocation(tz)
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
// Merge series by timestamp
rowsByTime := map[time.Time]*row{}
upsert := func(ts string) *row {
t, ok := parseTime(ts, loc)
if !ok {
fatalf("parse time failed: %s", ts)
}
if v, exists := rowsByTime[t]; exists {
return v
}
nr := &row{t: t}
rowsByTime[t] = nr
return nr
}
for _, v := range payload.Result.Hourly.Temperature {
rr := upsert(v.Datetime)
rr.temperature = ptr(v.Value)
}
for _, v := range payload.Result.Hourly.Humidity {
rr := upsert(v.Datetime)
rr.humidity = ptr(v.Value)
}
for _, v := range payload.Result.Hourly.Visibility {
rr := upsert(v.Datetime)
rr.visibility = ptr(v.Value)
}
for _, v := range payload.Result.Hourly.Dswrf {
rr := upsert(v.Datetime)
rr.dswrf = ptr(v.Value)
}
for _, v := range payload.Result.Hourly.Pressure {
rr := upsert(v.Datetime)
rr.pressure = ptr(v.Value)
}
for _, w := range payload.Result.Hourly.Wind {
rr := upsert(w.Datetime)
rr.windSpeed = ptr(w.Speed)
rr.windDir = ptr(w.Direction)
}
// Sort by time
times := make([]time.Time, 0, len(rowsByTime))
for t := range rowsByTime {
times = append(times, t)
}
sort.Slice(times, func(i, j int) bool { return times[i].Before(times[j]) })
if mode == "sql" {
if alias == "" {
fatalf("-alias is required for mode=sql")
}
// Emit upserts into radar_weather; convert wind_speed km/h -> m/s, keep humidity as ratio (0..1)
fmt.Println("BEGIN;")
for _, t := range times {
rr := rowsByTime[t]
// wind speed conversion
var ws string
if rr.windSpeed != nil {
v := *rr.windSpeed / 3.6
ws = trimZeros(fmt.Sprintf("%.6f", v))
}
// Build SQL with NULLs where missing
q := fmt.Sprintf(
"INSERT INTO %s (alias, lat, lon, dt, temperature, humidity, cloudrate, visibility, dswrf, wind_speed, wind_direction, pressure) "+
"VALUES (%s, %s, %s, %s, %s, %s, NULL, %s, %s, %s, %s, %s) "+
"ON CONFLICT (alias, dt) DO UPDATE SET "+
"lat=EXCLUDED.lat, lon=EXCLUDED.lon, temperature=EXCLUDED.temperature, humidity=EXCLUDED.humidity, "+
"visibility=EXCLUDED.visibility, dswrf=EXCLUDED.dswrf, wind_speed=EXCLUDED.wind_speed, "+
"wind_direction=EXCLUDED.wind_direction, pressure=EXCLUDED.pressure;",
sqlTable,
sqlQuote(alias),
sqlNum(lat),
sqlNum(lon),
sqlTime(t),
sqlOpt(rr.temperature),
sqlOpt(rr.humidity),
sqlOpt(rr.visibility),
sqlOpt(rr.dswrf),
sqlStrOrNull(ws),
sqlOpt(rr.windDir),
sqlOpt(rr.pressure),
)
fmt.Println(q)
}
fmt.Println("COMMIT;")
return
}
// CSV output
w := csv.NewWriter(os.Stdout)
_ = w.Write([]string{"datetime", "temperature", "humidity", "wind_speed", "wind_direction", "pressure", "visibility", "dswrf"})
for _, t := range times {
rr := rowsByTime[t]
var ws string
if rr.windSpeed != nil {
v := *rr.windSpeed / 3.6
ws = trimZeros(fmt.Sprintf("%.6f", v))
}
rec := []string{
t.Format("2006-01-02 15:04:05"),
optf(rr.temperature),
optf(rr.humidity),
ws,
optf(rr.windDir),
optf(rr.pressure),
optf(rr.visibility),
optf(rr.dswrf),
}
_ = w.Write(rec)
}
w.Flush()
if err := w.Error(); err != nil {
fatalf("write csv: %v", err)
}
}
// ptr returns a pointer to a fresh copy of f, so each call yields a distinct
// address.
func ptr(f float64) *float64 {
	v := f
	return &v
}
// optf renders an optional value for a CSV cell: the empty string when p is
// nil, otherwise the value with six decimals and trailing zeros removed.
func optf(p *float64) string {
	if p != nil {
		fixed := fmt.Sprintf("%.6f", *p)
		return trimZeros(fixed)
	}
	return ""
}
// trimZeros strips trailing zeros, and then any trailing dots, from a decimal
// string. Strings without a '.' are returned unchanged so integers keep their
// zeros.
func trimZeros(s string) string {
	if strings.IndexByte(s, '.') < 0 {
		return s
	}
	withoutZeros := strings.TrimRight(s, "0")
	return strings.TrimRight(withoutZeros, ".")
}
// parseTime converts a Caiyun timestamp string into a time.Time expressed in
// loc.
//
// Layouts are tried in this order:
//   - RFC 3339, e.g. "2006-01-02T15:04:05+08:00" or "2006-01-02T15:04:05Z"
//   - minute precision with an offset or "Z", e.g. "2006-01-02T15:04+08:00"
//   - minute precision without an offset, "2006-01-02 15:04", read as
//     wall-clock time in loc
//
// A nil loc is treated as UTC. The boolean result is false, and the time is
// the zero value, when s matches none of the layouts.
func parseTime(s string, loc *time.Location) (time.Time, bool) {
	if loc == nil {
		// Time.In and ParseInLocation both panic on a nil location.
		loc = time.UTC
	}
	// "Z07:00" accepts a literal "Z" as well as a numeric offset, matching
	// what RFC 3339 allows; a plain "-07:00" layout would reject "Z".
	for _, layout := range []string{time.RFC3339, "2006-01-02T15:04Z07:00"} {
		if t, err := time.Parse(layout, s); err == nil {
			return t.In(loc), true
		}
	}
	// No offset in the input: interpret the wall-clock value in loc.
	if t, err := time.ParseInLocation("2006-01-02 15:04", s, loc); err == nil {
		return t.In(loc), true
	}
	return time.Time{}, false
}
// fatalf formats a message printf-style, writes it to stderr followed by a
// newline, and terminates the process with exit status 1. It never returns.
func fatalf(format string, args ...any) {
	const exitFailure = 1
	line := fmt.Sprintf(format+"\n", args...)
	fmt.Fprint(os.Stderr, line)
	os.Exit(exitFailure)
}
// sqlQuote wraps s in single quotes as a SQL string literal, doubling every
// embedded single quote.
func sqlQuote(s string) string {
	var b strings.Builder
	b.Grow(len(s) + 2)
	b.WriteByte('\'')
	for i := 0; i < len(s); i++ {
		if s[i] == '\'' {
			// Escape by emitting the quote twice.
			b.WriteByte('\'')
		}
		b.WriteByte(s[i])
	}
	b.WriteByte('\'')
	return b.String()
}
// sqlNum renders f as a SQL numeric literal with up to eight decimals,
// dropping trailing zeros.
func sqlNum(f float64) string {
	fixed := fmt.Sprintf("%.8f", f)
	return trimZeros(fixed)
}
// sqlTime renders t as a quoted "YYYY-MM-DD HH:MM:SS" SQL literal using t's
// own location; no zone offset is included in the output.
func sqlTime(t time.Time) string {
	const layout = "2006-01-02 15:04:05"
	stamp := t.Format(layout)
	return sqlQuote(stamp)
}
// sqlOpt renders an optional value for SQL: NULL when p is nil, otherwise the
// value with six decimals and trailing zeros removed.
func sqlOpt(p *float64) string {
	if p != nil {
		fixed := fmt.Sprintf("%.6f", *p)
		return trimZeros(fixed)
	}
	return "NULL"
}
// sqlStrOrNull maps the empty string to the SQL keyword NULL and passes any
// other string through unchanged (it does not add quotes).
func sqlStrOrNull(s string) string {
	switch s {
	case "":
		return "NULL"
	default:
		return s
	}
}