rain_forcast/src/database/db_connector.py
2025-05-21 11:33:39 +08:00

236 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import pymysql
from datetime import datetime, timedelta
class DatabaseConnector:
    """Read records from the RTK database and the weather database.

    Two independent MySQL connections are managed: one to the database that
    holds the ``rtk_data`` table and one to the database that holds the
    ``sensor_data`` table. Connections are opened lazily by the ``get_*``
    methods and stay open until :meth:`disconnect` is called. The instance
    can also be used as a context manager, which calls :meth:`disconnect`
    on exit.

    Failures are reported with ``print`` and signalled through the return
    value (``False`` / ``None``); the public methods do not raise for
    database errors.
    """

    # Column renames applied to frames read from ``rtk_data``.
    _RTK_COLUMN_NAMES = {
        'timestamp': 'datetime',
        'pwv': 'PWV',
        'ztd': 'ZTD',
    }

    # Column renames applied to frames read from ``sensor_data``. Keys that
    # are absent from a frame are ignored by ``DataFrame.rename``.
    _RAIN_COLUMN_NAMES = {
        'timestamp': 'datetime',
        'humidity': 'rh',
        'temperature': 'Temp',
        'atm_pressure': 'Press',
        'wind_speed': 'ws',
    }

    _LATEST_RTK_QUERY = """
        SELECT
            timestamp,
            pwv,
            ztd,
            ztd_gradient_east,
            ztd_gradient_north
        FROM rtk_data
        ORDER BY timestamp DESC
        LIMIT 1
    """

    _LATEST_RAIN_QUERY = """
        SELECT
            wind_speed,
            wind_force,
            wind_direction_360 as wind_direction,
            humidity,
            temperature,
            atm_pressure,
            solar_radiation,
            rainfall
        FROM sensor_data
        ORDER BY timestamp DESC
        LIMIT 1
    """

    _RANGE_RTK_QUERY = """
        SELECT
            timestamp,
            pwv,
            ztd,
            ztd_gradient_east,
            ztd_gradient_north
        FROM rtk_data
        WHERE timestamp BETWEEN %s AND %s
        ORDER BY timestamp
    """

    _RANGE_RAIN_QUERY = """
        SELECT
            timestamp,
            wind_speed,
            wind_force,
            wind_direction_360 as wind_direction,
            humidity,
            temperature,
            atm_pressure,
            solar_radiation,
            rainfall
        FROM sensor_data
        WHERE timestamp BETWEEN %s AND %s
        ORDER BY timestamp
    """

    def __init__(self, rtk_host='localhost', rtk_user='root', rtk_password='root',
                 rtk_database='rtk_data',
                 rain_host='8.134.185.53', rain_user='remote', rain_password='root',
                 rain_database='rain_db'):
        """Store the connection settings; no connection is opened here.

        Args:
            rtk_host: Host of the RTK database server.
            rtk_user: User name for the RTK database.
            rtk_password: Password for the RTK database.
            rtk_database: Schema name of the RTK database.
            rain_host: Host of the weather database server.
            rain_user: User name for the weather database.
            rain_password: Password for the weather database.
            rain_database: Schema name of the weather database.
        """
        # NOTE(review): the default credentials are kept for backward
        # compatibility; consider loading them from configuration instead
        # of keeping them in source.

        # RTK database settings. The arguments are honoured here; the
        # previous implementation ignored them and always used the defaults.
        self.rtk_host = rtk_host
        self.rtk_user = rtk_user
        self.rtk_password = rtk_password
        self.rtk_database = rtk_database
        self.rtk_connection = None

        # Weather database settings.
        self.rain_host = rain_host
        self.rain_user = rain_user
        self.rain_password = rain_password
        self.rain_database = rain_database
        self.rain_connection = None

    def __enter__(self):
        """Return the connector itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close both connections; exceptions from the block are not suppressed."""
        self.disconnect()
        return False

    def connect_rtk(self):
        """Open the RTK database connection.

        Returns:
            ``True`` when the connection was opened, ``False`` otherwise
            (the error is printed).
        """
        try:
            self.rtk_connection = pymysql.connect(
                host=self.rtk_host,
                user=self.rtk_user,
                password=self.rtk_password,
                database=self.rtk_database,
            )
        except Exception as e:  # deliberate boundary: report and signal failure
            print(f"RTK数据库连接失败: {str(e)}")
            return False
        return True

    def connect_rain(self):
        """Open the weather database connection.

        Returns:
            ``True`` when the connection was opened, ``False`` otherwise
            (the error is printed).
        """
        try:
            self.rain_connection = pymysql.connect(
                host=self.rain_host,
                user=self.rain_user,
                password=self.rain_password,
                database=self.rain_database,
            )
        except Exception as e:  # deliberate boundary: report and signal failure
            print(f"气象数据库连接失败: {str(e)}")
            return False
        return True

    def disconnect(self):
        """Close every open database connection.

        Each connection is closed independently: a failure while closing one
        is printed and does not prevent the other from being closed. The
        stored reference is always reset to ``None``.
        """
        for attr in ('rtk_connection', 'rain_connection'):
            connection = getattr(self, attr)
            if not connection:
                continue
            try:
                connection.close()
            except Exception as e:
                print(f"关闭数据库连接失败: {str(e)}")
            finally:
                setattr(self, attr, None)

    def _ensure_rtk_connection(self):
        """Return True if an RTK connection exists or could be opened."""
        if self.rtk_connection:
            return True
        return self.connect_rtk()

    def _ensure_rain_connection(self):
        """Return True if a weather connection exists or could be opened."""
        if self.rain_connection:
            return True
        return self.connect_rain()

    @staticmethod
    def _merge_on_datetime(rtk_df, rain_df):
        """Outer-join two frames on ``datetime``, sort by time and forward-fill gaps."""
        rtk_df['datetime'] = pd.to_datetime(rtk_df['datetime'])
        rain_df['datetime'] = pd.to_datetime(rain_df['datetime'])
        merged_df = pd.merge(rtk_df, rain_df, on='datetime', how='outer')
        merged_df = merged_df.sort_values('datetime')
        # ``fillna(method='ffill')`` is deprecated in recent pandas;
        # ``ffill()`` is the equivalent supported spelling.
        return merged_df.ffill()

    def get_latest_weather_data(self):
        """Return the newest RTK record combined with the newest weather record.

        The RTK row supplies the ``datetime`` column; the weather row is
        concatenated next to it column-wise.

        Returns:
            A one-row DataFrame with the merged columns. When the weather
            database cannot be reached or has no rows, the RTK row alone is
            returned (with the same renamed columns as in the merged case).
            ``None`` when the RTK database cannot be reached, has no rows,
            or any query fails.
        """
        if not self._ensure_rtk_connection():
            return None

        try:
            rtk_df = pd.read_sql(self._LATEST_RTK_QUERY, self.rtk_connection)
            if rtk_df.empty:
                print("RTK数据库中没有数据")
                return None

            # Rename before any fallback return so callers always see the
            # same column names, whether or not weather data is available.
            rtk_df = rtk_df.rename(columns=self._RTK_COLUMN_NAMES)

            if not self._ensure_rain_connection():
                print("无法连接到气象数据库")
                return rtk_df

            rain_df = pd.read_sql(self._LATEST_RAIN_QUERY, self.rain_connection)
            if rain_df.empty:
                print("气象数据库中没有数据")
                return rtk_df

            rain_df = rain_df.rename(columns=self._RAIN_COLUMN_NAMES)
            rtk_df['datetime'] = pd.to_datetime(rtk_df['datetime'])

            # Both frames hold exactly one row, so a column-wise concat
            # lines them up; the RTK timestamp is the one that is kept.
            merged_df = pd.concat([rtk_df, rain_df], axis=1)
            # Drop any column name that appears twice, keeping the first.
            return merged_df.loc[:, ~merged_df.columns.duplicated()]
        except Exception as e:  # deliberate boundary: report and signal failure
            print(f"获取数据失败: {str(e)}")
            return None

    def get_historical_data(self, start_date, end_date):
        """Return RTK and weather records between two timestamps, merged by time.

        Args:
            start_date: Lower bound passed to ``timestamp BETWEEN %s AND %s``.
            end_date: Upper bound passed to ``timestamp BETWEEN %s AND %s``.

        Returns:
            A DataFrame outer-joined on ``datetime``, sorted by time, with
            missing values forward-filled. When the weather database cannot
            be reached or yields no rows, the renamed RTK frame alone is
            returned. ``None`` when the RTK database cannot be reached or
            any query fails.
        """
        if not self._ensure_rtk_connection():
            return None

        try:
            bounds = [start_date, end_date]

            rtk_df = pd.read_sql(self._RANGE_RTK_QUERY, self.rtk_connection, params=bounds)
            rtk_df = rtk_df.rename(columns=self._RTK_COLUMN_NAMES)

            # Without a weather connection the RTK data is still useful.
            if not self._ensure_rain_connection():
                return rtk_df

            rain_df = pd.read_sql(self._RANGE_RAIN_QUERY, self.rain_connection, params=bounds)
            rain_df = rain_df.rename(columns=self._RAIN_COLUMN_NAMES)

            if rain_df.empty:
                return rtk_df
            return self._merge_on_datetime(rtk_df, rain_df)
        except Exception as e:  # deliberate boundary: report and signal failure
            print(f"获取历史数据失败: {str(e)}")
            return None