Introduction to Main Program Architecture

From Waveshare Wiki

File Structure and Functionality

  • ugv_pt_rpi
    • [Folder] AccessPopup (Used for network connection-related functions)
    • [Folder] sounds (Used to store audio files, where you can configure voice packages)
    • [Folder] static (Used to store captured photos)
    • [Folder] templates (Related resources for the web application)
    • [Folder] tutorial_cn (Chinese version of interactive tutorials)
    • [Folder] tutorial_en (English version of interactive tutorials)
    • [Folder] videos (Used to store recorded videos)
    • app.py (Main program of the product, including web-socket and Flask-related functionalities)
    • asound.conf (Sound card configuration file)
    • audio_ctrl.py (Library related to audio functionalities)
    • autorun.sh (Script to configure automatic startup of the main program and JupyterLab)
    • base_camera.py (Library for flask real-time video streaming with underlying multithreaded capture, original project is flask-video-streaming)
    • base_ctrl.py (Library for communication with the lower computer, communicates with the lower computer via serial port)
    • config.yaml (Configuration file used to configure some parameters)
    • requirements.txt (Python project dependencies)
    • robot_ctrl.py (Library for robot actions and visual processing)
    • serial_simple_ctrl.py (Standalone program used for testing serial communication)
    • setup.sh (Automatic installation script)
    • start_jupyter.sh (Script to start the JupyterLab server)

Installation Script

In the project folder there is a shell script named setup.sh, which automates configuration of the upper computer for the robot product: setting up the serial port, configuring the camera, creating a project virtual environment, and installing dependencies. These steps are already done in the SD card image we provide.
Note on using the installation script: the installation process downloads and installs many dependencies from the internet. In regions with restricted network access, we recommend downloading the image file directly from our official website to install the product instead.

Automatic Program Execution

The autorun.sh script in the project folder configures the main program (app.py) and JupyterLab (start_jupyter.sh) to start automatically (as a regular user, not root), and generates the configuration file for JupyterLab.

app.py Introduction (v0.89)

The following code blocks are excerpts for demonstration purposes only and cannot be executed on their own.
Import the Flask application and libraries related to JSON for building a web application.

from importlib import import_module
import os, socket, psutil
import subprocess, re, netifaces
from flask import Flask, render_template, Response, jsonify, request, send_from_directory, send_file, redirect, url_for
from werkzeug.utils import secure_filename
import json

Import libraries related to web-socket.

from flask_socketio import SocketIO, emit

Import libraries related to robot control, including visual functions and action control.

from robot_ctrl import Camera
from robot_ctrl import RobotCtrlMiddleWare

Import other libraries.

import time    # Time function library
import logging    # Used to set output information for Flask application
import threading    # Threading function library

import yaml    # Used to read .yaml configuration files

Open the config.yaml configuration file and read the parameters from it.

curpath = os.path.realpath(__file__)
thisPath = os.path.dirname(curpath)
with open(thisPath + '/config.yaml', 'r') as yaml_file:
    f = yaml.safe_load(yaml_file)

robot_name  = f['base_config']['robot_name']
sbc_version = f['base_config']['sbc_version']

Instantiate the Flask application and configure output (turn off debug output).

app = Flask(__name__)
log = logging.getLogger('werkzeug')
log.disabled = True

Instantiate WebSocket functionality (for communication between the web client and the server), video-related functionality (real-time video, OpenCV), and robot action control (movement, lighting, gimbal control, obtaining chassis feedback, etc.).

socketio = SocketIO(app)
camera = Camera()
robot = RobotCtrlMiddleWare()

Network-related settings

net_interface = "wlan0"  # Wireless network interface name; the onboard interface is wlan0, a USB adapter is numbered differently
wifi_mode = "None"  # Stores the Wi-Fi mode; this variable is shown on the OLED screen
eth0_ip = None  # IP address of the Ethernet port, shown on the OLED screen
wlan_ip = None  # IP address of the wireless interface (net_interface), shown on the OLED screen

Storage path for audio files uploaded via drag-and-drop on the webpage.

UPLOAD_FOLDER = thisPath + '/sounds/others'

Variables used to store information about the upper computer (these variables are updated by other functions while the main program runs).

pic_size = 0
vid_size = 0
cpu_read = 0
cpu_temp = 0
ram_read = 0
rssi_read = 0

A dictionary mapping executable command codes to their corresponding functions. Each command has a unique code stored in config.yaml. A dictionary is used to select the command to execute because there are too many commands for a long chain of if-else statements, which would severely hurt readability.

cmd_actions = {
    f['code']['min_res']: lambda: camera.set_video_resolution("240P"),
    f['code']['mid_res']: lambda: camera.set_video_resolution("480P"),
    f['code']['max_res']: lambda: camera.set_video_resolution("960P"),
    f['code']['zoom_x1']: lambda: camera.scale_frame(1),
    f['code']['zoom_x2']: lambda: camera.scale_frame(2),
    f['code']['zoom_x4']: lambda: camera.scale_frame(4),
    f['code']['pic_cap']: lambda: camera.capture_frame(thisPath + '/static/'),
    f['code']['vid_sta']: lambda: camera.record_video(1, thisPath + '/videos/'),
    f['code']['vid_end']: lambda: camera.record_video(0, thisPath + '/videos/'),
    f['code']['cv_none']: lambda: camera.set_cv_mode(f['code']['cv_none']),
    f['code']['cv_moti']: lambda: camera.set_cv_mode(f['code']['cv_moti']),
    f['code']['cv_face']: lambda: camera.set_cv_mode(f['code']['cv_face']),
    f['code']['cv_objs']: lambda: camera.set_cv_mode(f['code']['cv_objs']),
    f['code']['cv_clor']: lambda: camera.set_cv_mode(f['code']['cv_clor']),
    f['code']['cv_hand']: lambda: camera.set_cv_mode(f['code']['cv_hand']),
    f['code']['cv_auto']: lambda: camera.set_cv_mode(f['code']['cv_auto']),
    f['code']['mp_face']: lambda: camera.set_cv_mode(f['code']['mp_face']),
    f['code']['mp_pose']: lambda: camera.set_cv_mode(f['code']['mp_pose']),
    f['code']['re_none']: lambda: camera.set_detection_reaction(f['code']['re_none']),
    f['code']['re_capt']: lambda: camera.set_detection_reaction(f['code']['re_capt']),
    f['code']['re_reco']: lambda: camera.set_detection_reaction(f['code']['re_reco']),
    f['code']['mc_lock']: lambda: camera.set_movtion_lock(f['code']['mc_lock']),
    f['code']['mc_unlo']: lambda: camera.set_movtion_lock(f['code']['mc_unlo']),
    f['code']['led_off']: robot.set_led_mode_off,
    f['code']['led_aut']: robot.set_led_mode_auto,
    f['code']['led_ton']: robot.set_led_mode_on,
    f['code']['base_of']: robot.set_base_led_off,
    f['code']['base_on']: robot.set_base_led_on,
    f['code']['head_ct']: robot.head_led_ctrl,
    f['code']['base_ct']: robot.base_led_ctrl,
    f['code']['s_panid']: camera.set_pan_id,
    f['code']['release']: camera.release_torque,
    f['code']['set_mid']: camera.middle_set,
    f['code']['s_tilid']: camera.set_tilt_id
}
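
The dispatch pattern above can be sketched in isolation. The command codes and handler names below are illustrative stand-ins, not the real values from config.yaml:

```python
# Minimal sketch of dictionary-based command dispatch, with made-up codes.
def set_resolution(res):
    return f"resolution set to {res}"

def capture_photo():
    return "photo captured"

# Map numeric command codes to zero-argument callables.
cmd_actions = {
    1001: lambda: set_resolution("480P"),
    1002: capture_photo,
}

def run_command(code):
    # dict.get() returns None for unknown codes instead of raising KeyError.
    action = cmd_actions.get(code)
    if action is None:
        return "unknown command"
    return action()

print(run_command(1001))  # resolution set to 480P
print(run_command(9999))  # unknown command
```

Lambdas are used when a handler needs a fixed argument; handlers that take no arguments can be stored directly, as the real table above does for the LED functions.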

When the webpage loads, it also requests the config.yaml file from the server, both to configure information shown on the page (such as the product name) and to keep the command codes consistent between the client and the server. The client sends this request to the "/config" route, and the server returns the contents of config.yaml.

@app.route('/config')
def get_config():
    with open(thisPath + '/config.yaml', 'r') as file:
        yaml_content = file.read()
    return yaml_content

Obtaining the Wi-Fi signal strength; the parameter is the name of the wireless network interface (a device may have multiple wireless interfaces).

def get_signal_strength(interface):
    try:
        output = subprocess.check_output(["/sbin/iwconfig", interface]).decode("utf-8")
        signal_strength = re.search(r"Signal level=(-\d+)", output)
        if signal_strength:
            return int(signal_strength.group(1))
        return 0
    except FileNotFoundError:
        print("iwconfig command not found. Please ensure it's installed and in your PATH.")
        return -1
    except subprocess.CalledProcessError as e:
        print(f"Error executing iwconfig: {e}")
        return -1
    except Exception as e:
        print(f"An error occurred: {e}")
        return -1
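
The regular expression used above can be exercised on a captured sample of iwconfig output. The sample text below is fabricated for illustration:

```python
import re

# A fabricated fragment of typical iwconfig output.
sample = ('wlan0  IEEE 802.11  ESSID:"MyNet"\n'
          '       Link Quality=58/70  Signal level=-52 dBm\n')

# Same pattern as get_signal_strength(): capture the negative dBm value.
match = re.search(r"Signal level=(-\d+)", sample)
rssi = int(match.group(1)) if match else 0
print(rssi)  # -52
```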

Obtaining the Wi-Fi mode: determines whether Wi-Fi is in AP (Access Point) or STA (Station) mode.

def get_wifi_mode():
    global wifi_mode
    try:
        result = subprocess.check_output(['/sbin/iwconfig', 'wlan0'], encoding='utf-8')

        if "Mode:Master" in result or "Mode:AP" in result:
            wifi_mode = "AP"
            return "AP"

        if "Mode:Managed" in result:
            wifi_mode = "STA"
            return "STA"

    except subprocess.CalledProcessError as e:
        print(f"Error checking Wi-Fi mode: {e}")
        return None

    return None

Obtaining the IP address, with the required parameter being the network interface name.

def get_ip_address(interface):
    try:
        interface_info = netifaces.ifaddresses(interface)

        ipv4_info = interface_info.get(netifaces.AF_INET, [{}])
        return ipv4_info[0].get('addr')
    except ValueError:
        print(f"Interface {interface} not found.")
        return None
    except IndexError:
        print(f"No IPv4 address assigned to {interface}.")
        return None

Obtaining CPU usage percentage. This function will block, with the blocking time being the interval parameter of cpu_percent().

def get_cpu_usage():
    return psutil.cpu_percent(interval=2)

Obtaining CPU temperature.

def get_cpu_temperature():
    try:
        temperature_str = os.popen('vcgencmd measure_temp').readline()
        temperature = float(temperature_str.replace("temp=", "").replace("'C\n", ""))
        return temperature
    except Exception as e:
        print("Error reading CPU temperature:", str(e))
        return None
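
The string parsing above can be checked against a sample line in the format vcgencmd prints (the sample string below is hard-coded for illustration):

```python
# vcgencmd measure_temp prints a line like "temp=48.3'C"
temperature_str = "temp=48.3'C\n"

# Strip the prefix and the unit suffix, leaving only the number.
temperature = float(temperature_str.replace("temp=", "").replace("'C\n", ""))
print(temperature)  # 48.3
```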

Obtaining the usage percentage of the running memory (RAM).

def get_memory_usage():
    return psutil.virtual_memory().percent

This function integrates the above functions to obtain various information and assign them to corresponding variables, making it convenient for other parts of the program to access those variables.

def update_device_info():
    global pic_size, vid_size, cpu_read, ram_read, rssi_read, cpu_temp
    cpu_read = get_cpu_usage()
    cpu_temp = get_cpu_temperature()
    ram_read = get_memory_usage()
    rssi_read= get_signal_strength(net_interface)

This function continuously gathers the various pieces of feedback, merges them, and sends them to the web client. It runs in a separate thread created with threading when the main program starts (when the client first establishes a connection), so it does not affect the execution of the main program. It calls camera.get_status() to obtain chassis feedback and other vision-related information, and emits the merged JSON to the web client over the "/ctrl" namespace.
This loop runs at 10 Hz, but not every piece of information is refreshed at 10 Hz: that is the rate at which camera.get_status() refreshes, while more expensive values such as folder sizes and CPU/RAM usage are refreshed far less often.

def update_data_websocket():
    while True:
        try:
            fb_json = camera.get_status()
        except Exception:
            continue
        socket_data = {
                    f['fb']['picture_size']:pic_size,
                    f['fb']['video_size']:  vid_size,
                    f['fb']['cpu_load']:    cpu_read,
                    f['fb']['cpu_temp']:    cpu_temp,
                    f['fb']['ram_usage']:   ram_read,
                    f['fb']['wifi_rssi']:   rssi_read
                    }
        try:
            socket_data.update(fb_json)
            socketio.emit('update', socket_data, namespace='/ctrl')
        except Exception:
            pass
        time.sleep(0.1)
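
The merge step can be illustrated with dict.update(): device statistics and the camera's feedback JSON are combined into one payload before being emitted. The keys below are illustrative, not the real keys from config.yaml:

```python
# Device statistics gathered by the main program (illustrative keys).
socket_data = {"cpu_load": 12.5, "ram_usage": 41.0}

# Feedback JSON returned by camera.get_status() (illustrative keys).
fb_json = {"base_voltage": 11.9, "pan_angle": 30}

# update() merges fb_json into socket_data in place; on a key collision,
# the value from fb_json would win.
socket_data.update(fb_json)
print(socket_data)
```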

All the @app.route() decorators below are Flask application route function decorators. Routes are used to distinguish the types of requests sent by the client, and different functions are used to handle different types of requests.
This is the main route. When a client connects (i.e., when someone opens the IP:5000 page), a random audio file from the sounds/connected folder is played, and the server returns the main control interface of the web application. The HTML file for the main control interface is index.html.

@app.route('/')
def index():
    """Video streaming home page."""
    robot.play_random_audio("connected", False)
    return render_template('index.html')

Routes used to send various files to the client: CSS, JavaScript, photos, and videos.

@app.route('/<path:filename>')
def serve_static(filename):
    return send_from_directory('templates', filename)


@app.route('/photo/<path:filename>')
def serve_static_photo(filename):
    return send_from_directory('templates', filename)


@app.route('/video/<path:filename>')
def serve_static_video(filename):
    return send_from_directory('templates', filename)

Route used to open the settings page.

@app.route('/settings/<path:filename>')
def serve_static_settings(filename):
    return send_from_directory('templates', filename)

Route used to return from the settings page to the homepage.

@app.route('/index')
def serve_static_home():
    return redirect(url_for('index'))

Function used to obtain real-time video frames, sourced from the open-source project Flask Video Streaming.

def gen(cameraInput):
    """Video streaming generator function."""
    yield b'--frame\r\n'
    while True:
        frame = cameraInput.get_frame()
        yield b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n--frame\r\n'
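
The generator's output framing can be checked with a stub camera that returns fixed bytes in place of a JPEG frame (the stub class below is an illustrative stand-in for the real Camera):

```python
def gen(cameraInput):
    """Yield an endless multipart/x-mixed-replace stream of JPEG frames."""
    yield b'--frame\r\n'
    while True:
        frame = cameraInput.get_frame()
        yield b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n--frame\r\n'

class StubCamera:
    # Stand-in for the real Camera class: always returns the same bytes.
    def get_frame(self):
        return b'JPEGDATA'

stream = gen(StubCamera())
print(next(stream))  # b'--frame\r\n' -- the opening boundary
print(next(stream))  # one frame part, terminated by the next boundary
```

Each chunk the generator yields carries one frame plus the boundary for the next part, which is why the browser can replace the image in place as new chunks arrive.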

Route used to display real-time video frames on the webpage.

@app.route('/video_feed')
def video_feed():
    """Video streaming route. Put this in the src attribute of an img tag."""
    return Response(gen(camera),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

Route used to obtain a list of photo names in the photo folder.

@app.route('/get_photo_names')
def get_photo_names():
    photo_files = sorted(os.listdir(thisPath + '/static'), key=lambda x: os.path.getmtime(os.path.join(thisPath + '/static', x)), reverse=True)
    return jsonify(photo_files)
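
The newest-first ordering can be reproduced on a temporary folder. The files and timestamps below are created on the fly purely for the check:

```python
import os, tempfile

tmp = tempfile.mkdtemp()
for name, mtime in (("old.jpg", 1000.0), ("new.jpg", 2000.0)):
    path = os.path.join(tmp, name)
    with open(path, "w") as fh:
        fh.write("x")
    os.utime(path, (mtime, mtime))  # pin the access/modification time

# Same sort as get_photo_names(): newest modification time first.
photo_files = sorted(os.listdir(tmp),
                     key=lambda x: os.path.getmtime(os.path.join(tmp, x)),
                     reverse=True)
print(photo_files)  # ['new.jpg', 'old.jpg']
```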

Route used to send images to the webpage.

@app.route('/get_photo/<filename>')
def get_photo(filename):
    return send_from_directory(thisPath + '/static', filename)

Route used to delete images.

@app.route('/delete_photo', methods=['POST'])
def delete_photo():
    filename = request.form.get('filename')
    try:
        os.remove(os.path.join(thisPath + '/static', filename))
        return jsonify(success=True)
    except Exception as e:
        print(e)
        return jsonify(success=False)

The following functions perform similar functions but are used to manipulate videos.

@app.route('/delete_video', methods=['POST'])
def delete_video():
    filename = request.form.get('filename')
    try:
        os.remove(os.path.join(thisPath + '/videos', filename))
        return jsonify(success=True)
    except Exception as e:
        print(e)
        return jsonify(success=False)


@app.route('/get_video_names')
def get_video_names():
    video_files = sorted(
        [filename for filename in os.listdir(thisPath + '/videos/') if filename.endswith('.mp4')],
        key=lambda filename: os.path.getctime(os.path.join(thisPath + '/videos/', filename)),
        reverse=True
    )
    return jsonify(video_files)


@app.route('/videos/<path:filename>')
def videos(filename):
    return send_from_directory(thisPath + '/videos', filename)

Obtaining a folder's size requires iterating over every file inside it, which consumes significant resources, so this function is only called occasionally.

def get_folder_size(folder_path):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(folder_path):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            total_size += os.path.getsize(file_path)
    # Convert total_size to MB
    size_in_mb = total_size / (1024 * 1024)
    return round(size_in_mb,2)
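
The walk-and-sum approach can be verified on a small temporary tree, created below just for the check:

```python
import os, tempfile

def get_folder_size(folder_path):
    """Sum the size of every file under folder_path, in MB (rounded)."""
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(folder_path):
        for filename in filenames:
            total_size += os.path.getsize(os.path.join(dirpath, filename))
    return round(total_size / (1024 * 1024), 2)

tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, "a.bin"), "wb") as fh:
    fh.write(b"\x00" * (512 * 1024))  # a single 0.5 MB file
print(get_folder_size(tmp))  # 0.5
```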

WebSocket routes are used to receive JSON instructions from the client. Some instructions are high-frequency and require low latency, so WebSocket is used here instead of HTTP. WebSocket is connection-oriented, allowing multiple communications per connection, while HTTP is connectionless, requiring connection establishment-communication-connection destruction for each request. HTTP is not suitable for high-frequency low-latency communication (HTTP's advantage is simplicity).

@socketio.on('json', namespace='/json')
def handle_socket_json(json):
    try:
        robot.json_command_handler(json)
    except Exception as e:
        print("Error handling JSON data:", e)
        return

Function that updates the information displayed on the OLED. It runs in a separate thread created with threading while the main program executes. Another function feeds information to the webpage at 10 Hz, and some of the variables it reads are updated by this function. The information shown on the OLED does not need to be highly real-time, so this function refreshes at a lower rate. (It does not use time.sleep() for its delay; instead it relies on the blocking get_cpu_usage() call inside update_device_info().)

def oled_update():
    global eth0_ip, wlan_ip, pic_size, vid_size
    robot.base_oled(0, f"E: No Ethernet")
    robot.base_oled(1, f"W: NO {net_interface}")
    robot.base_oled(2, "F/J:5000/8888")
    get_wifi_mode()
    start_time = time.time()
    last_folder_check_time = 0

    while True:
        current_time = time.time()

        if current_time - last_folder_check_time > 600:
            pic_size = get_folder_size(thisPath + '/static')
            vid_size = get_folder_size(thisPath + '/videos')
            last_folder_check_time = current_time
        
        update_device_info() # the interval of this loop is set in here
        get_wifi_mode()

        if get_ip_address('eth0') != eth0_ip:
            eth0_ip = get_ip_address('eth0')
            if eth0_ip:
                robot.base_oled(0, f"E:{eth0_ip}")
            else:
                robot.base_oled(0, f"E: No Ethernet")

        if get_ip_address(net_interface) != wlan_ip:
            wlan_ip = get_ip_address(net_interface)
            if wlan_ip:
                robot.base_oled(1, f"W:{wlan_ip}")
            else:
                robot.base_oled(1, f"W: NO {net_interface}")

        elapsed_time = current_time - start_time
        hours = int(elapsed_time // 3600)
        minutes = int((elapsed_time % 3600) // 60)
        seconds = int(elapsed_time % 60)
        robot.base_oled(3, f"{wifi_mode} {hours:02d}:{minutes:02d}:{seconds:02d} {rssi_read}dBm")
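
The uptime arithmetic in the loop above can be checked on a fixed elapsed time:

```python
def format_uptime(elapsed_time):
    # Same split used for the OLED's fourth line: HH:MM:SS.
    hours = int(elapsed_time // 3600)
    minutes = int((elapsed_time % 3600) // 60)
    seconds = int(elapsed_time % 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

# 3723.4 seconds is 1 hour, 2 minutes, 3 seconds (fraction truncated).
print(format_uptime(3723.4))  # 01:02:03
```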

This route is used to handle command-line information sent by the client.

@app.route('/send_command', methods=['POST'])
def handle_command():
    command = request.form['command']
    print("Received command:", command)
    # camera.info_update("CMD:" + command, (0,255,255), 0.36)
    camera.cmd_process(command)
    return jsonify({"status": "success", "message": "Command received"})

Route used to obtain a list of audio files stored in the sounds/others folder.

@app.route('/getAudioFiles', methods=['GET'])
def get_audio_files():
    files = [f for f in os.listdir(UPLOAD_FOLDER) if os.path.isfile(os.path.join(UPLOAD_FOLDER, f))]
    return jsonify(files)

Route used to implement drag-and-drop upload functionality on the webpage. The uploaded audio files are saved in the sounds/others folder.

@app.route('/uploadAudio', methods=['POST'])
def upload_audio():
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'})
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'})
    if file:
        filename = secure_filename(file.filename)
        file.save(os.path.join(UPLOAD_FOLDER, filename))
        return jsonify({'success': 'File uploaded successfully'})
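
Werkzeug's secure_filename() strips directory components and unsafe characters before the file is saved, preventing path traversal. A rough stdlib stand-in (not Werkzeug's actual algorithm) behaves like this:

```python
import os, re

def rough_secure_filename(filename):
    """Rough stand-in for werkzeug.utils.secure_filename (illustrative only)."""
    filename = os.path.basename(filename)  # drop any directory part
    filename = filename.replace(" ", "_")
    return re.sub(r"[^A-Za-z0-9_.-]", "", filename)  # keep a safe charset

print(rough_secure_filename("../../etc/passwd"))  # passwd
print(rough_secure_filename("my song.mp3"))       # my_song.mp3
```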

Route used to play audio files stored in the sounds/others folder.

@app.route('/playAudio', methods=['POST'])
def play_audio():
    audio_file = request.form['audio_file']
    print(thisPath + '/sounds/others/' + audio_file)
    robot.audio_play(thisPath + '/sounds/others/' + audio_file)
    return jsonify({'success': 'Audio is playing'})

Route used to stop playback.

@app.route('/stop_audio', methods=['POST'])
def audio_stop():
    robot.audio_stop()
    return jsonify({'success': 'Audio stop'})

This function is used to execute certain command-line instructions automatically at startup. It will be executed automatically when the main program starts. You can freely add more instructions. Here are some examples:

  • base -c {"T":142,"cmd":50}: Sets the feedback interval for the base. The default loop of the base has no delay. Adding a parameter of 50ms helps to improve the efficiency of the host controller (decoding the serial port information from the lower computer also consumes resources).
  • base -c {"T":131,"cmd":1}: Enables continuous feedback from the base, so that feedback information is automatically sent continuously instead of in a request-response manner.
  • base -c {"T":143,"cmd":0}: Disables echo, so the base does not echo the original information sent to it. This saves resources, especially when controlling the base with high-frequency instructions.
  • base -c {"T":4,"cmd":2}: Sets the type of peripheral for the base. 0 means no peripheral, 1 means a manipulator, and 2 means a gimbal.
  • base -c {"T":300,"mode":0,"mac":"EF:EF:EF:EF:EF:EF"}: Configures the base to not be controlled by ESP-NOW broadcast signals, but only by ESP-NOW instructions sent from the MAC address EF:EF:EF:EF:EF:EF. You can change this MAC address as needed.
  • send -a -b: Adds the broadcast address to the ESP-NOW peer list of the lower computer, facilitating inter-device communication functionalities.
def cmd_on_boot():
    cmd_list = [
        'base -c {"T":142,"cmd":50}',   # set feedback interval
        'base -c {"T":131,"cmd":1}',    # serial feedback flow on
        'base -c {"T":143,"cmd":0}',    # serial echo off
        'base -c {"T":4,"cmd":2}',      # select the module - 0:None 1:RoArm-M2-S 2:Gimbal
        'base -c {"T":300,"mode":0,"mac":"EF:EF:EF:EF:EF:EF"}',  # the base won't be ctrl by esp-now broadcast cmd, but it can still recv broadcast msgs.
        'send -a -b'    # add broadcast mac addr to peer
    ]
    for cmd in cmd_list:
        camera.cmd_process(cmd)
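
Each entry pairs a "base -c" prefix with a JSON payload destined for the lower computer. The structure can be illustrated by splitting one command back apart (an illustrative parse; the real parsing happens inside camera.cmd_process()):

```python
import json

cmd = 'base -c {"T":142,"cmd":50}'

# Split the command keyword and the "-c" flag from the JSON payload.
prefix, payload = cmd.split(" -c ", 1)
data = json.loads(payload)
print(prefix, data["T"], data["cmd"])  # base 142 50
```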

When the main program is running, it will perform the following tasks:

if __name__ == '__main__':
    # Randomly play an audio file from the sounds/robot_started folder
    robot.play_random_audio("robot_started", False)
    
    # Turn on the LED lights
    robot.set_led_mode_on()
    
    # Create a separate thread for update_data_websocket()
    data_update_thread = threading.Thread(target=update_data_websocket, daemon=True)
    data_update_thread.start()

    # Create another thread for oled_update()
    oled_update_thread = threading.Thread(target=oled_update, daemon=True)
    oled_update_thread.start()
    
    # Get the size of the photo folder
    pic_size = get_folder_size(thisPath + '/static')
    
    # Get the size of the video folder
    vid_size = get_folder_size(thisPath + '/videos')
    
    # Turn off the LED lights
    robot.set_led_mode_off()
    
    # Execute commands on boot
    cmd_on_boot()
    
    # Start the Flask application server
    socketio.run(app, host='0.0.0.0', port=5000, allow_unsafe_werkzeug=True)