# 基于WebSocket和Python服务端的CSI检测可视化网页上位机 ## 0. 前言 我最近开始研究基于WiFi的室内定位和运动检测。CSI(Channel Status Information)是一种非常常用的技术,每个子载波会包含特定的幅度和相位信息。我想把CSI的检测结果可视化,正好我还在研究射箭图传观靶系统(这是另一个坑)的时候了解了WebSocket协议,可以用来实现服务端到网页前端的通信。 和CSI有关的技术,比如怎么根据CSI提取运动信息,又是[另一个坑](csi_esp.html)。这篇文章先假设我的WiFi模块能回传一个是否检测到运动的布尔值,然后在网页上显示出来。 --- ## 1. 系统框图 ![系统框图](csi.png) 上图是整个系统的框图。WiFi模组和AP之间建立WiFi连接,模组把CSI信息传给一块嵌入式Linux开发板,开发板再通过串口把信息传给PC。当然如果MCU本身就带有WiFi功能,Linux开发板和WiFi模组就可以合二为一了。 PC上的服务端是用Python写的。服务端首先通过串口收到CSI信息,然后通过WebSocket把CSI的结果传给客户端。 客户端,即一个网页,收到服务端发来的信息后,实时画出检测曲线。 --- ## 2. Python服务端 Python服务端需要做两件事:用串口和Linux开发板通信,用WebSocket和网页客户端通信。 ### 串口通信 pySerial是用于串口通信的库。首先列出电脑的所有串口并连上Linux开发板。 ``` python import serial import serial.tools.list_ports ports_list=list(serial.tools.list_ports.comports()) for comport in ports_list: print(list(comport)[0],list(comport)[1]) ser = serial.Serial("PORTNAME",115200) if ser.is_open: print(ser.name+ "opened") else: print("Failed to open port") ``` 通过pySerial的write功能我们可以给开发板下指令,但是这部分交互有点麻烦。所以我另外开一个窗口,在开发板的命令行下执行完所有CSI有关的指令后,才回到PC上的Python服务端。这时,开发板会不停地给PC发送CSI的检测结果,服务端只需要不停地读取串口内容即可: ``` python while (True): line=str(ser.readline()) print(line) line_data_parsing() #some processing, eg. apply a moving average filter ``` ### 通过WebSocket发送数据给网页前端 WebSocket是一种基于TCP的轻量协议,这里在Python创建一个server,当有client连进来或者离开时都有对应的函数来处理它。因为这个应用里只有服务端给客户端发,所以没有设置接收到消息时的动作,在其他应用里有需要的话也可以加上。 ``` python def start_server(): server = WebsocketServer(port=4200, host='127.0.0.1', loglevel=logging.INFO) server.set_fn_new_client(new_client) server.set_fn_client_left(client_left) server.run_forever() ``` 另外再准备一个函数用来给客户端发数据,其中x是代表时间的横坐标,y是运动检测的结果。数据会用json打一个包,客户端收到后也会用json解包。 ``` python def updateStatus(client,server): data={'x':x,'y':motion} server.send_message(client,json.dumps(data)) ``` ### 定时器 ![线程时序](threading.png) 服务端既要不停地从串口读东西,又要通过WebSocket传东西,所以需要两个线程互相不阻塞。此外,串口的速度是没法通过PC控制的,是Linux开发板有多快打多快。如果每次从串口读到东西就立刻更新的话,服务端和客户端的负担都会增加。所以我们用上打拍子的概念,每0.1秒给客户端推送一次最新的串口的状态。上图就是最后的线程时序框图。 用Python实现这两个线程也是相当直觉的,当有新客户端连入时就启动定时器线程。 ``` python def new_client(client, server): print("Client(%d) has joined." % client['id']) timer_thread=threading.Thread(target=timer_loop,args=(stop_event,client,server)) timer_thread.start() def timer_loop(stop_event,client,server): while not stop_event.is_set(): updateStatus(client,server) stop_event.wait(0.1) # Start Thread 1 and Thread 2 in main() stop_event=threading.Event() t1=threading.Thread(target=start_server) t2=threading.Thread(target=read_serialport) t1.start() t2.start() ``` ### 番外:用AI写代码 定时0.1秒发送数据的模块是Deepseek帮我写的。首先我让他帮我写一段Python代码,每0.1秒执行某段函数。他的第一版回答是个简单的sleep。 ```python import time def my_function(): print("执行函数") count = 0 while count < 10: my_function() time.sleep(0.1) # 暂停0.1秒 count += 1 ``` 我告诉他我不想阻塞其他功能的运行,不要用sleep,于是他给了另一个版本: ``` python import threading def my_function(): print("执行函数") # 设置下一次执行 threading.Timer(0.1, my_function).start() # 启动定时任务 my_function() # 主程序可以继续运行其他任务 print("主程序继续运行...") # 模拟主程序的其他任务 while True: print("主程序正在运行...") time.sleep(1) # 这里用 sleep 只是为了演示主程序的其他任务 ``` 我试着按他的回答写了一个服务端并且成功运行了,然而过了一段时间就会报错。 > 使用threading.Timer(0.1,my_function)运行一段时间以后会报错,显示maximum recursion depth exceeded,怎么办 Deepseek最后给出了本文里用的方案。他的完整回答在[这里](../cat0_env/python_timer.html)。 ## 3. Javascript前端 我用echarts来实现在网页里实时画图。客户端从WebSocket收到数据后更新x轴和y轴并重新画图。 ``` html CSI Motion Detection
``` --- ## 参考资料 1. [Echarts + Websocket 实现页面图表实时更新](https://cloud.baidu.com/article/2927344) 2. [Writing WebSocket client applications](https://zhaoxuhui.top/blog/2018/05/05/WebSocket&Client&Server.html) 3. [基于WebSocket的网页(JS)与服务器(Python)数据交互](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_client_applications) 4. [如何实现JS前端与Python后台的结合](https://blog.csdn.net/a312863063/article/details/87898349)