返回主站|会员中心|保存桌面

AB罗克韦尔服务商    

PLC、变频器、人机界面、传感器

联系方式
  • 联系人:吴经理
  • 电话:18576370666
新闻分类
  • 暂无分类
首页 > 新闻中心 > 自动排水程序!
新闻中心
自动排水程序!
发布时间:2025-09-15        浏览次数:1        返回列表

我将为你创建一个自动排水程序,这个程序可以根据水位传感器的输入来控制排水泵的工作,实现自动排水功能。程序将包含水位检测、泵控制、超时保护和状态指示等功能。

import RPi.GPIO as GPIO

import time

import logging

from datetime import datetime

import signal

import sys


# 配置日志

logging.basicConfig(

    filename='drainage_system.log',

    level=logging.INFO,

    format='%(asctime)s - %(levelname)s - %(message)s'

)


# GPIO引脚定义

HIGH_LEVEL_SENSOR_PIN = 17  # 高水位传感器

LOW_LEVEL_SENSOR_PIN = 27   # 低水位传感器

PUMP_CONTROL_PIN = 22       # 排水泵控制

ALARM_PIN = 23              # 报警装置


# 系统参数

PUMP_MAX_RUN_TIME = 300     # 泵最大运行时间(秒),防止持续运行导致故障

CHECK_INTERVAL = 5          # 状态检查间隔(秒)

PUMP_DELAY = 2              # 泵启动/停止延迟(秒),防止频繁切换

SENSOR_ERROR_THRESHOLD = 3  # 传感器错误阈值


class AutoDrainSystem:

    def __init__(self):

        # 初始化GPIO

        GPIO.setmode(GPIO.BCM)

        GPIO.setwarnings(False)

        

        # 设置引脚模式

        GPIO.setup(HIGH_LEVEL_SENSOR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        GPIO.setup(LOW_LEVEL_SENSOR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        GPIO.setup(PUMP_CONTROL_PIN, GPIO.OUT, initial=GPIO.LOW)

        GPIO.setup(ALARM_PIN, GPIO.OUT, initial=GPIO.LOW)

        

        # 系统状态变量

        self.pump_running = False

        self.pump_start_time = 0

        self.system_active = True

        self.sensor_error_count = 0

        self.alarm_active = False

        

        # 注册信号处理

        signal.signal(signal.SIGINT, self.handle_shutdown)

        signal.signal(signal.SIGTERM, self.handle_shutdown)

        

        logging.info("自动排水系统初始化完成")

        print("自动排水系统启动...")

        print("按Ctrl+C停止系统")


    def read_water_level(self):

        """读取水位传感器状态"""

        # 多次读取以减少误判

        high_level_readings = []

        low_level_readings = []

        

        for _ in range(3):

            high_level_readings.append(GPIO.input(HIGH_LEVEL_SENSOR_PIN))

            low_level_readings.append(GPIO.input(LOW_LEVEL_SENSOR_PIN))

            time.sleep(0.1)

        

        # 取多数值作为最终结果

        high_level = sum(high_level_readings) > 1

        low_level = sum(low_level_readings) > 1

        

        # 假设传感器输出低电平表示检测到水

        high_water = not high_level

        low_water = not low_level

        

        return high_water, low_water


    def start_pump(self):

        """启动排水泵"""

        if not self.pump_running:

            GPIO.output(PUMP_CONTROL_PIN, GPIO.HIGH)

            self.pump_running = True

            self.pump_start_time = time.time()

            logging.info("排水泵启动")

            print("排水泵启动")


    def stop_pump(self):

        """停止排水泵"""

        if self.pump_running:

            GPIO.output(PUMP_CONTROL_PIN, GPIO.LOW)

            self.pump_running = False

            run_time = time.time() - self.pump_start_time

            logging.info(f"排水泵停止,运行时间: {run_time:.1f}秒")

            print(f"排水泵停止,运行时间: {run_time:.1f}秒")


    def activate_alarm(self):

        """激活报警"""

        if not self.alarm_active:

            GPIO.output(ALARM_PIN, GPIO.HIGH)

            self.alarm_active = True

            logging.warning("系统报警激活")

            print("警告: 系统报警激活")


    def deactivate_alarm(self):

        """解除报警"""

        if self.alarm_active:

            GPIO.output(ALARM_PIN, GPIO.LOW)

            self.alarm_active = False

            logging.info("系统报警解除")

            print("系统报警解除")


    def check_pump_overrun(self):

        """检查泵是否超时运行"""

        if self.pump_running:

            run_time = time.time() - self.pump_start_time

            if run_time > PUMP_MAX_RUN_TIME:

                self.stop_pump()

                self.activate_alarm()

                logging.warning(f"排水泵超时运行,已自动停止。最大允许时间: {PUMP_MAX_RUN_TIME}秒")

                print(f"警告: 排水泵超时运行,已自动停止")

                return True

        return False


    def check_sensor_consistency(self, high_water, low_water):

        """检查传感器读数一致性"""

        # 高水位传感器未检测到水,低水位传感器却检测到水,这是矛盾的

        if not high_water and low_water:

            self.sensor_error_count += 1

            logging.warning(f"传感器读数不一致,错误计数: {self.sensor_error_count}")

            print(f"警告: 传感器读数不一致,错误计数: {self.sensor_error_count}")

            

            if self.sensor_error_count >= SENSOR_ERROR_THRESHOLD:

                self.activate_alarm()

                return False

            return True

        else:

            # 重置错误计数

            if self.sensor_error_count > 0:

                self.sensor_error_count -= 1

            if self.sensor_error_count == 0:

                self.deactivate_alarm()

            return True


    def handle_shutdown(self, signum, frame):

        """处理系统关闭"""

        logging.info("接收到关闭信号,系统正在停止...")

        print("\n系统正在停止...")

        self.system_active = False

        self.stop_pump()

        self.deactivate_alarm()

这个自动排水程序具有以下特点和功能:


  1. 核心功能

    • 通过高 / 低两个水位传感器检测水位状态

    • 当水位达到高水位时自动启动排水泵

    • 当水位降至低水位以下时自动停止排水泵

    • 实时监测系统状态并记录日志

  2. 安全保护机制

    • 排水泵超时运行保护(防止水泵空转或故障)

    • 传感器一致性检查(检测传感器异常)

    • 报警系统(异常情况时激活)

    • 优雅的系统 shutdown 处理

  3. 使用说明

    • 需要连接两个水位传感器、一个排水泵控制端和一个报警装置

    • 程序中定义的 GPIO 引脚可以根据实际硬件连接进行修改

    • 系统参数(如最大运行时间、检查间隔)可以根据实际需求调整

    • 运行日志会保存在 drainage_system.log 文件中


使用时,只需将程序部署到树莓派等支持 GPIO 的开发板上,正确连接硬件后运行即可实现自动排水功能。

收缩
  • 电话咨询

  • 18576370666
  • 添加微信客服