基于WebSocket和Python服务端的CSI检测可视化网页上位机

0. 前言

我最近开始研究基于WiFi的室内定位和运动检测。CSI(Channel Status Information)是一种非常常用的技术,每个子载波会包含特定的幅度和相位信息。我想把CSI的检测结果可视化,正好我还在研究射箭图传观靶系统(这是另一个坑)的时候了解了WebSocket协议,可以用来实现服务端到网页前端的通信。

和CSI有关的技术,比如怎么根据CSI提取运动信息,又是另一个坑。这篇文章先假设我的WiFi模块能回传一个是否检测到运动的布尔值,然后在网页上显示出来。


1. 系统框图

系统框图 上图是整个系统的框图。WiFi模组和AP之间建立WiFi连接,模组把CSI信息传给一块嵌入式Linux开发板,开发板再通过串口把信息传给PC。当然如果MCU本身就带有WiFi功能,Linux开发板和WiFi模组就可以合二为一了。

PC上的服务端是用Python写的。服务端首先通过串口收到CSI信息,然后通过WebSocket把CSI的结果传给客户端。

客户端,即一个网页,收到服务端发来的信息后,实时画出检测曲线。


2. Python服务端

Python服务端需要做两件事:用串口和Linux开发板通信,用WebSocket和网页客户端通信。

串口通信

pySerial是用于串口通信的库。首先列出电脑的所有串口并连上Linux开发板。

import serial
import serial.tools.list_ports

ports_list=list(serial.tools.list_ports.comports())
for comport in ports_list:
    print(list(comport)[0],list(comport)[1])

ser = serial.Serial("PORTNAME",115200)
if ser.is_open:
    print(ser.name+ "opened")
else:
    print("Failed to open port")

通过pySerial的write功能我们可以给开发板下指令,但是这部分交互有点麻烦。所以我另外开一个窗口,在开发板的命令行下执行完所有CSI有关的指令后,才回到PC上的Python服务端。这时,开发板会不停地给PC发送CSI的检测结果,服务端只需要不停地读取串口内容即可:

while (True):
    line=str(ser.readline())
    print(line)
    line_data_parsing()  #some processing, eg. apply a moving average filter

通过WebSocket发送数据给网页前端

WebSocket是一种基于TCP的轻量协议,这里在Python创建一个server,当有client连进来或者离开时都有对应的函数来处理它。因为这个应用里只有服务端给客户端发,所以没有设置接收到消息时的动作,在其他应用里有需要的话也可以加上。

def start_server():
    server = WebsocketServer(port=4200, host='127.0.0.1', loglevel=logging.INFO)
    server.set_fn_new_client(new_client)
    server.set_fn_client_left(client_left)
    server.run_forever()

另外再准备一个函数用来给客户端发数据,其中x是代表时间的横坐标,y是运动检测的结果。数据会用json打一个包,客户端收到后也会用json解包。

def updateStatus(client,server):
    data={'x':x,'y':motion}
    server.send_message(client,json.dumps(data))

定时器

线程时序 服务端既要不停地从串口读东西,又要通过WebSocket传东西,所以需要两个线程互相不阻塞。此外,串口的速度是没法通过PC控制的,是Linux开发板有多快打多快。如果每次从串口读到东西就立刻更新的话,服务端和客户端的负担都会增加。所以我们用上打拍子的概念,每0.1秒给客户端推送一次最新的串口的状态。上图就是最后的线程时序框图。

用Python实现这两个线程也是相当直觉的,当有新客户端连入时就启动定时器线程。

def new_client(client, server):
    print("Client(%d) has joined." % client['id'])
    timer_thread=threading.Thread(target=timer_loop,args=(stop_event,client,server))
    timer_thread.start()
    
def timer_loop(stop_event,client,server):
    while not stop_event.is_set():
        updateStatus(client,server)
        stop_event.wait(0.1)

# Start Thread 1 and Thread 2 in main()
stop_event=threading.Event()

t1=threading.Thread(target=start_server)
t2=threading.Thread(target=read_serialport)

t1.start()
t2.start()

番外:用AI写代码

定时0.1秒发送数据的模块是Deepseek帮我写的。首先我让他帮我写一段Python代码,每0.1秒执行某段函数。他的第一版回答是个简单的sleep。

import time

def my_function():
    print("执行函数")

count = 0
while count < 10:
    my_function()
    time.sleep(0.1)  # 暂停0.1秒
    count += 1

我告诉他我不想阻塞其他功能的运行,不要用sleep,于是他给了另一个版本:

import threading

def my_function():
    print("执行函数")
    # 设置下一次执行
    threading.Timer(0.1, my_function).start()

# 启动定时任务
my_function()

# 主程序可以继续运行其他任务
print("主程序继续运行...")

# 模拟主程序的其他任务
while True:
    print("主程序正在运行...")
    time.sleep(1)  # 这里用 sleep 只是为了演示主程序的其他任务

我试着按他的回答写了一个服务端并且成功运行了,然而过了一段时间就会报错。

使用threading.Timer(0.1,my_function)运行一段时间以后会报错,显示maximum recursion depth exceeded,怎么办

Deepseek最后给出了本文里用的方案。他的完整回答在这里

3. Javascript前端

我用echarts来实现在网页里实时画图。客户端从WebSocket收到数据后更新x轴和y轴并重新画图。

<script src='https://cdn.bootcdn.net/ajax/libs/echarts/5.2.2/echarts.min.js'></script>
<title>CSI Motion Detection</title>
<div id='chart' style='width: 1200px; height: 800px;'></div>
<script type="text/javascript">
    var chart = echarts.init(document.getElementById('chart'));
    var option = {
    title: {
    text: '实时数据',
    backgroundColor: 'rgb(128,128,128)',
    textStyle:{fontSize:48},
    left: 'right'
    },
    tooltip: {
    trigger: 'axis'
    },
    xAxis: {
    type: 'category',
    data: [] // 这里用于存放X轴数据
    },
    yAxis: {
    type: 'value',
    min:0,
    max:1
    },
    series: [{
    data: [], // 这里用于存放Y轴数据
    type: 'line'
    }]
    };
    chart.setOption(option);

    function updateChart(data) {
        
        option.xAxis.data.push(data.x);
        option.series[0].data.push(data.y); 
        if (option.xAxis.data.length >= 20){
            option.xAxis.data.shift();
            option.series[0].data.shift();
        }
        
        if (data.y > 0.5){
            option.title.text='Motion Detected';
            option.title.backgroundColor='rgb(0,200,0)';
        }
        else {
            option.title.text='No Motion';
            option.title.backgroundColor='rgb(128,128,128)';
        }

        chart.setOption(option); 
    }


    var socket = new WebSocket("ws://localhost:4200"); // 替换为你的WebSocket服务器地址
    socket.onopen = function(event) {
    console.log('WebSocket connected');
    };
    socket.onmessage = function(event) {
    var data = JSON.parse(event.data); // 假设服务器发送的数据是JSON格式的字符串
    updateChart(data); // 调用函数更新图表数据
    };
    socket.onclose = function(event) {
    console.log('WebSocket closed');
    };
    socket.onerror = function(error) {
    console.log('WebSocket error: ' + error);
    };
</script>

参考资料

  1. Echarts + Websocket 实现页面图表实时更新

  2. Writing WebSocket client applications

  3. 基于WebSocket的网页(JS)与服务器(Python)数据交互

  4. 如何实现JS前端与Python后台的结合