我将为你创建一个自动排水程序,这个程序可以根据水位传感器的输入来控制排水泵的工作,实现自动排水功能。程序将包含水位检测、泵控制、超时保护和状态指示等功能。
import RPi.GPIO as GPIO
import time
import logging
from datetime import datetime
import signal
import sys
# 配置日志
logging.basicConfig(
filename='drainage_system.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# GPIO引脚定义
HIGH_LEVEL_SENSOR_PIN = 17 # 高水位传感器
LOW_LEVEL_SENSOR_PIN = 27 # 低水位传感器
PUMP_CONTROL_PIN = 22 # 排水泵控制
ALARM_PIN = 23 # 报警装置
# 系统参数
PUMP_MAX_RUN_TIME = 300 # 泵最大运行时间(秒),防止持续运行导致故障
CHECK_INTERVAL = 5 # 状态检查间隔(秒)
PUMP_DELAY = 2 # 泵启动/停止延迟(秒),防止频繁切换
SENSOR_ERROR_THRESHOLD = 3 # 传感器错误阈值
class AutoDrainSystem:
def __init__(self):
# 初始化GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
# 设置引脚模式
GPIO.setup(HIGH_LEVEL_SENSOR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(LOW_LEVEL_SENSOR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(PUMP_CONTROL_PIN, GPIO.OUT, initial=GPIO.LOW)
GPIO.setup(ALARM_PIN, GPIO.OUT, initial=GPIO.LOW)
# 系统状态变量
self.pump_running = False
self.pump_start_time = 0
self.system_active = True
self.sensor_error_count = 0
self.alarm_active = False
# 注册信号处理
signal.signal(signal.SIGINT, self.handle_shutdown)
signal.signal(signal.SIGTERM, self.handle_shutdown)
logging.info("自动排水系统初始化完成")
print("自动排水系统启动...")
print("按Ctrl+C停止系统")
def read_water_level(self):
"""读取水位传感器状态"""
# 多次读取以减少误判
high_level_readings = []
low_level_readings = []
for _ in range(3):
high_level_readings.append(GPIO.input(HIGH_LEVEL_SENSOR_PIN))
low_level_readings.append(GPIO.input(LOW_LEVEL_SENSOR_PIN))
time.sleep(0.1)
# 取多数值作为最终结果
high_level = sum(high_level_readings) > 1
low_level = sum(low_level_readings) > 1
# 假设传感器输出低电平表示检测到水
high_water = not high_level
low_water = not low_level
return high_water, low_water
def start_pump(self):
"""启动排水泵"""
if not self.pump_running:
GPIO.output(PUMP_CONTROL_PIN, GPIO.HIGH)
self.pump_running = True
self.pump_start_time = time.time()
logging.info("排水泵启动")
print("排水泵启动")
def stop_pump(self):
"""停止排水泵"""
if self.pump_running:
GPIO.output(PUMP_CONTROL_PIN, GPIO.LOW)
self.pump_running = False
run_time = time.time() - self.pump_start_time
logging.info(f"排水泵停止,运行时间: {run_time:.1f}秒")
print(f"排水泵停止,运行时间: {run_time:.1f}秒")
def activate_alarm(self):
"""激活报警"""
if not self.alarm_active:
GPIO.output(ALARM_PIN, GPIO.HIGH)
self.alarm_active = True
logging.warning("系统报警激活")
print("警告: 系统报警激活")
def deactivate_alarm(self):
"""解除报警"""
if self.alarm_active:
GPIO.output(ALARM_PIN, GPIO.LOW)
self.alarm_active = False
logging.info("系统报警解除")
print("系统报警解除")
def check_pump_overrun(self):
"""检查泵是否超时运行"""
if self.pump_running:
run_time = time.time() - self.pump_start_time
if run_time > PUMP_MAX_RUN_TIME:
self.stop_pump()
self.activate_alarm()
logging.warning(f"排水泵超时运行,已自动停止。最大允许时间: {PUMP_MAX_RUN_TIME}秒")
print(f"警告: 排水泵超时运行,已自动停止")
return True
return False
def check_sensor_consistency(self, high_water, low_water):
"""检查传感器读数一致性"""
# 高水位传感器未检测到水,低水位传感器却检测到水,这是矛盾的
if not high_water and low_water:
self.sensor_error_count += 1
logging.warning(f"传感器读数不一致,错误计数: {self.sensor_error_count}")
print(f"警告: 传感器读数不一致,错误计数: {self.sensor_error_count}")
if self.sensor_error_count >= SENSOR_ERROR_THRESHOLD:
self.activate_alarm()
return False
return True
else:
# 重置错误计数
if self.sensor_error_count > 0:
self.sensor_error_count -= 1
if self.sensor_error_count == 0:
self.deactivate_alarm()
return True
def handle_shutdown(self, signum, frame):
"""处理系统关闭"""
logging.info("接收到关闭信号,系统正在停止...")
print("\n系统正在停止...")
self.system_active = False
self.stop_pump()
self.deactivate_alarm()
这个自动排水程序具有以下特点和功能:
使用时,只需将程序部署到树莓派等支持 GPIO 的开发板上,正确连接硬件后运行即可实现自动排水功能。