UART 串口通讯

环境搭建

使用pyserial库

pyserial 库封装了对串口资源的访问方法,该库兼容多种平台对串口资源使用, 有许多平台特性相关的方法。

安装 pyserial

# 在板卡使用如下命令安装
sudo pip3 install pyserial

使用 pyserial

import serial
import serial.tools.list_ports
import time
import argparse
import binascii

def list_available_ports():
    """列出所有可用的串口设备"""
    ports = serial.tools.list_ports.comports()
    print("可用的串口设备:")
    for i, port in enumerate(ports):
        print(f"{i+1}. {port.device} - {port.description}")
    return ports

def configure_serial_port(port, baudrate=115200, timeout=1):
    """配置并打开串口连接"""
    try:
        ser = serial.Serial(
            port=port,
            baudrate=baudrate,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=timeout
        )
        print(f"串口 {port} 已打开,波特率: {baudrate}")
        return ser
    except serial.SerialException as e:
        print(f"无法打开串口 {port}: {e}")
        return None

def send_data(ser, data, encoding='utf-8'):
    """发送数据到串口"""
    try:
        # 根据数据类型选择发送方式
        if isinstance(data, str):
            # 字符串需要编码为字节
            bytes_data = data.encode(encoding)
        elif isinstance(data, bytes):
            bytes_data = data
        else:
            # 其他类型转换为字符串再编码
            bytes_data = str(data).encode(encoding)
        
        # 发送数据
        bytes_sent = ser.write(bytes_data)
        print(f"已发送 {bytes_sent} 字节数据: {data}")
        return bytes_sent
    except Exception as e:
        print(f"发送数据时出错: {e}")
        return 0

def receive_data(ser, max_bytes=1024, encoding='utf-8', display_mode='text'):
    """从串口接收数据"""
    try:
        # 等待数据到达
        if ser.in_waiting > 0:
            # 读取可用数据
            data = ser.read(min(ser.in_waiting, max_bytes))
            
            if data:
                print(f"接收到 {len(data)} 字节数据")
                
                # 根据显示模式展示数据
                if display_mode == 'hex':
                    # 十六进制显示
                    hex_data = binascii.hexlify(data).decode('ascii')
                    print(f"十六进制: {hex_data}")
                elif display_mode == 'raw':
                    # 原始字节显示
                    print(f"原始数据: {data}")
                else:
                    # 文本显示 (默认)
                    try:
                        text_data = data.decode(encoding)
                        print(f"文本数据: {text_data}")
                    except UnicodeDecodeError:
                        print("无法解码为文本,尝试十六进制显示:")
                        hex_data = binascii.hexlify(data).decode('ascii')
                        print(f"十六进制: {hex_data}")
                
                return data
            else:
                print("未接收到数据")
                return None
        else:
            print("没有可用数据")
            return None
    except Exception as e:
        print(f"接收数据时出错: {e}")
        return None

def interactive_mode(ser, encoding='utf-8', display_mode='text'):
    """交互式串口通信模式"""
    print("\n=== 进入交互式模式 ===")
    print("输入 'exit' 退出")
    print("输入 'hex' 切换到十六进制显示模式")
    print("输入 'text' 切换到文本显示模式")
    print("输入 'raw' 切换到原始数据显示模式")
    
    try:
        while True:
            # 接收并显示数据
            receive_data(ser, encoding=encoding, display_mode=display_mode)
            
            # 获取用户输入
            user_input = input("\n请输入要发送的数据: ")
            
            # 处理特殊命令
            if user_input.lower() == 'exit':
                break
            elif user_input.lower() == 'hex':
                display_mode = 'hex'
                print("已切换到十六进制显示模式")
                continue
            elif user_input.lower() == 'text':
                display_mode = 'text'
                print("已切换到文本显示模式")
                continue
            elif user_input.lower() == 'raw':
                display_mode = 'raw'
                print("已切换到原始数据显示模式")
                continue
            
            # 发送用户输入的数据
            send_data(ser, user_input, encoding=encoding)
            
            # 等待一小段时间,让接收设备有时间响应
            time.sleep(0.1)
            
    except KeyboardInterrupt:
        print("\n用户中断,退出交互式模式")

def auto_detect_port():
    """自动检测可能的串口设备"""
    ports = serial.tools.list_ports.comports()
    if not ports:
        print("未检测到任何串口设备")
        return None
    
    # 优先选择可能的USB转串口设备
    for port in ports:
        if "USB" in port.description or "Serial" in port.description:
            return port.device
    
    # 如果没有USB设备,返回第一个找到的串口
    return ports[0].device

def main():
    """主函数"""
    # 设置命令行参数解析器
    parser = argparse.ArgumentParser(description='串口通信示例程序')
    parser.add_argument('-p', '--port', help='串口设备路径 (例如: COM3, /dev/ttyUSB0)')
    parser.add_argument('-b', '--baudrate', type=int, default=115200, help='波特率 (默认: 115200)')
    parser.add_argument('-e', '--encoding', default='utf-8', help='字符编码 (默认: utf-8)')
    parser.add_argument('-t', '--timeout', type=float, default=1.0, help='超时时间 (秒, 默认: 1.0)')
    parser.add_argument('-m', '--mode', choices=['interactive', 'send', 'receive'], 
                        default='interactive', help='运行模式 (默认: interactive)')
    parser.add_argument('-d', '--data', help='要发送的数据 (在send模式下使用)')
    parser.add_argument('--list', action='store_true', help='列出所有可用串口并退出')
    
    args = parser.parse_args()
    
    # 列出可用串口并退出
    if args.list:
        list_available_ports()
        return
    
    # 如果未指定端口,尝试自动检测
    if not args.port:
        args.port = auto_detect_port()
        if not args.port:
            print("无法自动检测串口,请手动指定")
            list_available_ports()
            return
    
    # 配置并打开串口
    ser = configure_serial_port(args.port, args.baudrate, args.timeout)
    if not ser:
        return
    
    try:
        # 根据选择的模式执行不同操作
        if args.mode == 'send':
            if args.data is None:
                print("错误: 在发送模式下必须提供数据 (-d/--data)")
                return
            send_data(ser, args.data, args.encoding)
            # 给接收设备一些时间响应
            time.sleep(0.5)
            receive_data(ser, encoding=args.encoding)
            
        elif args.mode == 'receive':
            print(f"等待从 {args.port} 接收数据...")
            while True:
                receive_data(ser, encoding=args.encoding)
                time.sleep(0.1)
                
        else:  # interactive 模式
            interactive_mode(ser, args.encoding)
            
    finally:
        # 确保关闭串口
        if ser and ser.is_open:
            ser.close()
            print(f"串口 {args.port} 已关闭")

if __name__ == "__main__":
    main()

这个示例提供了一个功能完整的串口通信程序,具有以下特点:

  1. 自动检测串口设备:能够列出并自动选择可用的串口

  2. 灵活的配置选项

    • 可自定义波特率、超时时间和字符编码

    • 支持命令行参数配置

  3. 多种通信模式

    • 交互式模式:允许用户实时发送和接收数据

    • 发送模式:发送指定数据并接收响应

    • 接收模式:持续监听并显示接收到的数据

  4. 数据显示选项

    • 文本模式:显示解码后的文本

    • 十六进制模式:显示原始字节的十六进制表示

    • 原始模式:显示原始字节数据

  5. 完善的错误处理:捕获并显示可能的异常情况

  6. 清晰的代码结构:功能模块化,便于理解和扩展

使用方法

  1. 安装依赖

    pip install pyserial
    
  2. 运行程序

    • 列出所有可用串口:

      python uart_communication.py --list
      
    • 交互式模式(自动检测串口):

      python uart_communication.py
      
    • 指定串口和波特率:

      python uart_communication.py -p COM3 -b 9600
      
    • 发送固定数据:

      python uart_communication.py -p COM3 -m send -d "Hello World!"
      
    • 持续接收数据:

      python uart_communication.py -p COM3 -m receive