Notice: Function _load_textdomain_just_in_time was called incorrectly. Translation loading for the simply-static domain was triggered too early. This is usually an indicator for some code in the plugin or theme running too early. Translations should be loaded at the init action or later. Please see Debugging in WordPress for more information. (This message was added in version 6.7.0.) in /var/www/html/wp-includes/functions.php on line 6121
树莓派上基于Python编写按键驱动程序

树莓派上基于Python编写按键驱动程序

import multiprocessing
import time
import RPi.GPIO as GPIO
import uinput

# 初始化输入GPIO引脚,将引脚拉低
GPIO.setmode(GPIO.BCM) # 使用BCM方式编号
up_pin = 5 # 可对照前文中管脚编号定义
down_pin = 6
left_pin = 13
right_pin = 19
left_button_pin = 20
right_button_pin = 12
GPIO.setup(up_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) # 将管脚均设置为输入上拉模式
GPIO.setup(down_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(left_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(right_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(left_button_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(right_button_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

# 创建虚拟输入设备(鼠标)
device = uinput.Device([uinput.BTN_LEFT, uinput.BTN_RIGHT, uinput.REL_X, uinput.REL_Y])

XY_STEP = 1 # 用于调节光标的移动步长

# 定义一个通用四向摇杆按键操作的进程处理函数
def direction_key_process(queue, pin, direction):
    while True:
        GPIO.wait_for_edge(pin, GPIO.BOTH) # 该进程Pending等待四向摇杆按键发生动作
        while GPIO.input(pin) == direction: # 如果是有效的低电平,那么它将持续向队列写入管脚编号
            queue.put(pin)
            time.sleep(0.005) # 此延迟可调节光标灵敏度

# 定义一个通用处理左右按键操作的进程
def leftright_key_process(queue, pin):
    while True:
        GPIO.wait_for_edge(pin, GPIO.BOTH) # 该进程Pending等待左/右按键发生动作
        queue.put(pin) # 一旦有状态变化,则将按键编号写入队列

# 定义一个更新光标位置的进程处理函数
def update_position_device():
    while True:
        pin = position_queue.get() # 该进程Pending等待方向按键队列数据
        if pin == up_pin:
            device.emit(uinput.REL_Y, -XY_STEP) # 根据按键方向移动光标XY轴位置
        elif pin == down_pin:
            device.emit(uinput.REL_Y, XY_STEP)
        elif pin == left_pin:
            device.emit(uinput.REL_X, -XY_STEP)
        elif pin == right_pin:
            device.emit(uinput.REL_X, XY_STEP)

# 定义一个更新左右按钮状态的进程处理函数
def update_button_device():
    while True:
        pin = button_queue.get() # 该进程Pending等待左右按键队列数据
        if pin == left_button_pin:
            if GPIO.input(pin) == GPIO.LOW:
                device.emit(uinput.BTN_LEFT, 1) # 按键按下
            else:
                device.emit(uinput.BTN_LEFT, 0) # 按键释放
        elif pin == right_button_pin:
            if GPIO.input(pin) == GPIO.LOW:
                device.emit(uinput.BTN_RIGHT, 1)
            else:
                device.emit(uinput.BTN_RIGHT, 0)

# 为方向按键、左右建分别创建一个队列来保存按键操作的引脚
position_queue = multiprocessing.Queue()
button_queue = multiprocessing.Queue()

# 创建多个进程来处理按键和按钮事件(每个按钮一个进程,互不影响)
processes = [
    multiprocessing.Process(target=direction_key_process, args=(position_queue, up_pin, GPIO.LOW)),
    multiprocessing.Process(target=direction_key_process, args=(position_queue, down_pin, GPIO.LOW)),
    multiprocessing.Process(target=direction_key_process, args=(position_queue, left_pin, GPIO.LOW)),
    multiprocessing.Process(target=direction_key_process, args=(position_queue, right_pin, GPIO.LOW)),
    multiprocessing.Process(target=leftright_key_process, args=(button_queue, left_button_pin)),
    multiprocessing.Process(target=leftright_key_process, args=(button_queue, right_button_pin))
]

# 启动所有进程
for p in processes:
    p.daemon = True # 设置为守护进程,即在主进程结束时自动结束
    p.start()

# 启动两个进程来更新设备状态
update_position_process = multiprocessing.Process(target=update_position_device)
update_position_process.daemon = True # 设置为守护进程,即在主进程结束时自动结束
update_position_process.start()

update_button_process = multiprocessing.Process(target=update_button_device)
update_button_process.daemon = True # 设置为守护进程,即在主进程结束时自动结束
update_button_process.start()

# 等待所有进程结束
for p in processes:
    p.join()

update_position_process.join()
update_button_process.join()

如果运行错误,请执行如下指令

lsmod | grep uinput
sudo modprobe uinput

http://www.etrd.org/2023/05/14/%E6%A0%91%E8%8E%93%E6%B4%BE%E4%B8%8A%E5%9F%BA%E4%BA%8EPython%E7%BC%96%E5%86%99%E6%8C%89%E9%94%AE%E9%A9%B1%E5%8A%A8%E7%A8%8B%E5%BA%8F/

https://www.cnpython.com/qa/1003180

以下是正常代码

import RPi.GPIO as GPIO
import time
from sys import version_info
import uinput
device = uinput.Device([
    uinput.KEY_LEFTSHIFT,
    uinput.KEY_TAB,
    uinput.KEY_SPACE,
    uinput.KEY_LEFTALT
    ])

if version_info.major == 3:
    raw_input = input

# Set up pins
# Rotary A Pin
RoAPin = 17
# Rotary B Pin
RoBPin = 18
# Rotary Switch Pin
RoSPin = 27

def print_message():
    print ("========================================")
    print ("|            Rotary Encoder            |")
    print ("|    ------------------------------    |")
    print ("|        Pin A connect to GPIO17       |")
    print ("|        Pin B connect to GPIO18       |")
    print ("|     Button Pin connect to GPIO27     |")
    print ("|                                      |")
    print ("|         Use a Rotary Encoder         |")
    print ("|     Rotary to add/minus counter      |")
    print ("|      Press to set counter to 0       |")
    print ("|                                      |")
    print ("|                            SunFounder|")
    print ("========================================\n")
    print ("Program is running...")
    print ("Please press Ctrl+C to end the program...")
    #raw_input ("Press Enter to begin\n")

def setup():
    global counter
    global Last_RoB_Status, Current_RoB_Status
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(RoAPin, GPIO.IN)
    GPIO.setup(RoBPin, GPIO.IN)
    GPIO.setup(RoSPin,GPIO.IN, pull_up_down=GPIO.PUD_UP)
    # Set up a falling edge detect to callback clear
    GPIO.add_event_detect(RoSPin, GPIO.FALLING, callback=clear)

    # Set up a counter as a global variable
    counter = 0
    Last_RoB_Status = 0
    Current_RoB_Status = 0

# Define a function to deal with rotary encoder
def rotaryDeal():
    global counter
    global Last_RoB_Status, Current_RoB_Status

    flag = 0
    Last_RoB_Status = GPIO.input(RoBPin)
    # When RoAPin level changes
    while(not GPIO.input(RoAPin)):
        Current_RoB_Status = GPIO.input(RoBPin)
        flag = 1
    if flag == 1:
        # Reset flag
        flag = 0
        if (Last_RoB_Status == 0) and (Current_RoB_Status == 1):
            counter = counter + 1
#            time.sleep(0.5)
            device.emit_combo([
            #    uinput.KEY_LEFTALT,
#                uinput.KEY_LEFTALT,
                uinput.KEY_TAB,
                ])
        if (Last_RoB_Status == 1) and (Current_RoB_Status == 0):
            device.emit_combo([
                uinput.KEY_SPACE,
#                uinput.KEY_TAB,
                ])
            counter = counter - 1
        print ("counter = %d" % counter)

# Define a callback function on switch, to clean "counter"
def clear(ev=None):
    global counter
    counter = 0

def main():
    print_message()
    while True:
        rotaryDeal()

def destroy():
    # Release resource
    GPIO.cleanup()

# If run this script directly, do:
if __name__ == '__main__':
    setup()
    try:
        main()
    # When 'Ctrl+C' is pressed, the child program
    # destroy() will be  executed.
    except KeyboardInterrupt:
        destroy()

实现TAB/SHIFT+TAB键的前进后退

import RPi.GPIO as GPIO
import time
from sys import version_info
import uinput
device = uinput.Device([
    uinput.KEY_LEFTSHIFT,
    uinput.KEY_TAB,
    uinput.KEY_SPACE,
    uinput.KEY_LEFTALT
    ])

if version_info.major == 3:
    raw_input = input

# Set up pins
# Rotary A Pin
RoAPin = 17
# Rotary B Pin
RoBPin = 18
# Rotary Switch Pin
RoSPin = 27

def print_message():
    print ("========================================")
    print ("|            Rotary Encoder            |")
    print ("|    ------------------------------    |")
    print ("|        Pin A connect to GPIO17       |")
    print ("|        Pin B connect to GPIO18       |")
    print ("|     Button Pin connect to GPIO27     |")
    print ("|                                      |")
    print ("|         Use a Rotary Encoder         |")
    print ("|     Rotary to add/minus counter      |")
    print ("|      Press to set counter to 0       |")
    print ("|                                      |")
    print ("|                            SunFounder|")
    print ("========================================\n")
    print ("Program is running...")
    print ("Please press Ctrl+C to end the program...")
    #raw_input ("Press Enter to begin\n")

def setup():
    global counter
    global Last_RoB_Status, Current_RoB_Status
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(RoAPin, GPIO.IN)
    GPIO.setup(RoBPin, GPIO.IN)
    GPIO.setup(RoSPin,GPIO.IN, pull_up_down=GPIO.PUD_UP)
    # Set up a falling edge detect to callback clear
    GPIO.add_event_detect(RoSPin, GPIO.FALLING, callback=clear)

    # Set up a counter as a global variable
    counter = 0
    Last_RoB_Status = 0
    Current_RoB_Status = 0

# Define a function to deal with rotary encoder
def rotaryDeal():
    global counter
    global Last_RoB_Status, Current_RoB_Status

    flag = 0
    Last_RoB_Status = GPIO.input(RoBPin)
    # When RoAPin level changes
    while(not GPIO.input(RoAPin)):
        Current_RoB_Status = GPIO.input(RoBPin)
        flag = 1
    if flag == 1:
        # Reset flag
        flag = 0
        if (Last_RoB_Status == 0) and (Current_RoB_Status == 1):
            counter = counter + 1
#            time.sleep(0.5)
            device.emit_combo([
            #    uinput.KEY_LEFTALT,
#                uinput.KEY_LEFTALT,
                uinput.KEY_TAB,
                ])
        if (Last_RoB_Status == 1) and (Current_RoB_Status == 0):
            device.emit_combo([
                uinput.KEY_LEFTSHIFT,
                uinput.KEY_TAB,
                ])
            counter = counter - 1
#        print ("counter = %d" % counter)

# Define a callback function on switch, to clean "counter"
def clear(ev=None):
    global counter
    counter = 0

def main():
    print_message()
    while True:
        rotaryDeal()

def destroy():
    # Release resource
    GPIO.cleanup()

# If run this script directly, do:
if __name__ == '__main__':
    setup()
    try:
        main()
    # When 'Ctrl+C' is pressed, the child program
    # destroy() will be  executed.
    except KeyboardInterrupt:
        destroy()

一个正常运行的例子,可以移动光标并按下确定按钮

import multiprocessing
import time
import RPi.GPIO as GPIO
import uinput

# 初始化输入GPIO引脚,将引脚拉低
GPIO.setmode(GPIO.BCM) # 使用BCM方式编号
up_pin = 5 # 可对照前文中管脚编号定义
down_pin = 6
left_pin = 13
right_pin = 19
left_button_pin = 20
right_button_pin = 12
GPIO.setup(up_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) # 将管脚均设置为输入上拉模式
GPIO.setup(down_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(left_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(right_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(left_button_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(right_button_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

# 创建虚拟输入设备(鼠标)
device = uinput.Device([uinput.BTN_LEFT, uinput.BTN_RIGHT, uinput.REL_X, uinput.REL_Y])

XY_STEP = 1 # 用于调节光标的移动步长

# 定义一个通用四向摇杆按键操作的进程处理函数
def direction_key_process(queue, pin, direction):
    while True:
        GPIO.wait_for_edge(pin, GPIO.BOTH) # 该进程Pending等待四向摇杆按键发生动作
        while GPIO.input(pin) == direction: # 如果是有效的低电平,那么它将持续向队列写入管脚编号
            queue.put(pin)
            time.sleep(0.005) # 此延迟可调节光标灵敏度

# 定义一个通用处理左右按键操作的进程
def leftright_key_process(queue, pin):
    while True:
        GPIO.wait_for_edge(pin, GPIO.BOTH) # 该进程Pending等待左/右按键发生动作
        queue.put(pin) # 一旦有状态变化,则将按键编号写入队列

# 定义一个更新光标位置的进程处理函数
def update_position_device():
    while True:
        pin = position_queue.get() # 该进程Pending等待方向按键队列数据
        if pin == up_pin:
            device.emit(uinput.REL_Y, -XY_STEP) # 根据按键方向移动光标XY轴位置
        elif pin == down_pin:
            device.emit(uinput.REL_Y, XY_STEP)
        elif pin == left_pin:
            device.emit(uinput.REL_X, -XY_STEP)
        elif pin == right_pin:
            device.emit(uinput.REL_X, XY_STEP)

# 定义一个更新左右按钮状态的进程处理函数
def update_button_device():
    while True:
        pin = button_queue.get() # 该进程Pending等待左右按键队列数据
        if pin == left_button_pin:
            if GPIO.input(pin) == GPIO.LOW:
                device.emit(uinput.BTN_LEFT, 1) # 按键按下
            else:
                device.emit(uinput.BTN_LEFT, 0) # 按键释放
        elif pin == right_button_pin:
            if GPIO.input(pin) == GPIO.LOW:
                device.emit(uinput.BTN_RIGHT, 1)
            else:
                device.emit(uinput.BTN_RIGHT, 0)

# 为方向按键、左右建分别创建一个队列来保存按键操作的引脚
position_queue = multiprocessing.Queue()
button_queue = multiprocessing.Queue()

# 创建多个进程来处理按键和按钮事件(每个按钮一个进程,互不影响)
processes = [
    multiprocessing.Process(target=direction_key_process, args=(position_queue, up_pin, GPIO.LOW)),
    multiprocessing.Process(target=direction_key_process, args=(position_queue, down_pin, GPIO.LOW)),
    multiprocessing.Process(target=direction_key_process, args=(position_queue, left_pin, GPIO.LOW)),
    multiprocessing.Process(target=direction_key_process, args=(position_queue, right_pin, GPIO.LOW)),
    multiprocessing.Process(target=leftright_key_process, args=(button_queue, left_button_pin)),
    multiprocessing.Process(target=leftright_key_process, args=(button_queue, right_button_pin))
]

# 启动所有进程
for p in processes:
    p.daemon = True # 设置为守护进程,即在主进程结束时自动结束
    p.start()

# 启动两个进程来更新设备状态
update_position_process = multiprocessing.Process(target=update_position_device)
update_position_process.daemon = True # 设置为守护进程,即在主进程结束时自动结束
update_position_process.start()

update_button_process = multiprocessing.Process(target=update_button_device)
update_button_process.daemon = True # 设置为守护进程,即在主进程结束时自动结束
update_button_process.start()

# 等待所有进程结束
for p in processes:
    p.join()

update_position_process.join()
update_button_process.join()

主要代码都做了注释解释,其中关键几点是:

  • 为每个按键都创建了一个进程(非线程),用于检测按键是否有动作;同时还为方向、左右键的动作更新到uinput分别创建了两个进程;使用了两个队列,来建立按键检测进程和uinput更新进程之间的数据通信。
  • 所有进程都要配置为守护进程,使得主进程结束时,可以将所有进程同步结束,否则可能出现Python脚本已退出,但创建的进程还在运行的情况。
  • 所有进程都使用Pending等待的方式等待事件发生(按键检测进程等待IO边沿发生,更新设备状态进程等待队列有数据),才执行代码,以便在无按键动作时尽可能不占用CPU资源

配置开启自启动

若开机自动运行该脚本,则可配置开机自启动。

  1. 将脚本拷贝到/usr目录下在/usr下建立一个pyscript目录,专门存放python脚本,以后调试好的python脚本都放这里1
    cp keydriver_multiprocess.py /usr/pyscript/
  2. 编辑 /etc/rc.local 文件1
    sudo nano /etc/rc.local
    在 exit 0 前添加执行语句1
    2
    3
    4
    5
    6
    ...
    ...

    /usr/bin/python3 /usr/pyscript/keydriver_multiprocess.py

    exit 0
可正常运行的旋转编码器版本
#!/usr/bin/env python
import RPi.GPIO as GPIO
import uinput
device = uinput.Device([
    uinput.KEY_LEFTSHIFT,
    uinput.KEY_TAB,
    uinput.KEY_SPACE
    ])

RoAPin = 17    # CLK Pin
RoBPin = 18    # DT Pin
BtnPin = 27    # Button Pin

globalCounter = 0

flag = 0
Last_RoB_Status = 0
Current_RoB_Status = 0

def setup():
    GPIO.setmode(GPIO.BCM)       # Numbers GPIOs by physical location
    GPIO.setup(RoAPin, GPIO.IN)    # input mode
    GPIO.setup(RoBPin, GPIO.IN)
    GPIO.setup(BtnPin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

def rotaryDeal():
    global flag
    global Last_RoB_Status
    global Current_RoB_Status
    global globalCounter
    Last_RoB_Status = GPIO.input(RoBPin)  
    while(not GPIO.input(RoAPin)):      #未旋转时,GPIO.input(RoAPin)值为1,旋转时会变为0
        Current_RoB_Status = GPIO.input(RoBPin)  #旋转时的当前值
        flag = 1
    if flag == 1:
        flag = 0
        if (Last_RoB_Status == 1) and (Current_RoB_Status == 0):
            globalCounter = globalCounter + 1  #顺时针旋转,角位移增大
            device.emit_combo([uinput.KEY_LEFTSHIFT,uinput.KEY_TAB,])
        if (Last_RoB_Status == 0) and (Current_RoB_Status == 1):
            globalCounter = globalCounter - 1  #逆时针旋转,数值减小
            device.emit_combo([uinput.KEY_TAB,])

def btnISR(channel):
    global globalCounter
    globalCounter = 0
    device.emit_combo([uinput.KEY_SPACE,])

def loop():
    global globalCounter
    tmp = 0 # Rotary Temperary
    GPIO.add_event_detect(BtnPin, GPIO.FALLING, callback=btnISR)
    #当按下按钮时,调用回调函数btnISR
    while True:
        rotaryDeal()
        if tmp != globalCounter:
            print('globalCounter = %d' % globalCounter)
            tmp = globalCounter

def destroy():
    GPIO.cleanup()             # Release resource

if __name__ == '__main__':     # Program start from here
    setup()
    try:
        loop()
    except KeyboardInterrupt:  # When 'Ctrl+C' is pressed, the child program destroy() will be  executed.
        destroy()


sudo nano /etc/rc.local

# rc.local
#
# This script is executed at the end of each multiuser runlevel.
# Make sure that the script will "exit 0" on success or any other
# value on error.
#
# In order to enable or disable this script just change the execution
# bits.
#
# By default this script does nothing.

# Print the IP address
_IP=$(hostname -I) || true
if [ "$_IP" ]; then
  printf "My IP address is %s\n" "$_IP"
fi
sudo modprobe uinput
sudo /usr/bin/python3 /home/jack/Downloads/r_gpio_ok_over.py &
exit 0

旋转编码器定稿程序(含正反旋转、空格按键),开机启动参见上篇

#!/usr/bin/env python
import RPi.GPIO as GPIO
import uinput
device = uinput.Device([
    uinput.KEY_LEFTSHIFT,
    uinput.KEY_TAB,
    uinput.KEY_SPACE
    ])

RoAPin = 17    # CLK Pin
RoBPin = 18    # DT Pin
BtnPin = 27    # Button Pin

globalCounter = 0
flag = 0
BtnFlag = 0
Last_RoB_Status = 0
Current_RoB_Status = 0
Current_Btn_Status = 0

def setup():
    GPIO.setmode(GPIO.BCM)       # Numbers GPIOs by physical location
    GPIO.setup(RoAPin, GPIO.IN)    # input mode
    GPIO.setup(RoBPin, GPIO.IN)
    GPIO.setup(BtnPin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

def rotaryDeal():
    global flag
    global Last_RoB_Status
    global Current_RoB_Status
    global globalCounter
    global BtnFlag
    global Current_Btn_Status
    Last_RoB_Status = GPIO.input(RoBPin)
    while(not GPIO.input(RoAPin)):      #未旋转时,GPIO.input(RoAPin)值为1,旋转时会变为0
        Current_RoB_Status = GPIO.input(RoBPin)  #旋转时的当前值
        flag = 1
    if flag == 1:
        flag = 0
        if (Last_RoB_Status == 1) and (Current_RoB_Status == 0):
            globalCounter = globalCounter + 1  #顺时针旋转,角位移增大
            device.emit_combo([uinput.KEY_LEFTSHIFT,uinput.KEY_TAB,])
        if (Last_RoB_Status == 0) and (Current_RoB_Status == 1):
            globalCounter = globalCounter - 1  #逆时针旋转,数值减小
            device.emit_combo([uinput.KEY_TAB,])
    while(not GPIO.input(BtnPin)):      #未按下按钮时,GPIO.input(BtnPin)值为1,按下时会变为0
        Current_Btn_Status = GPIO.input(BtnPin)  #按下按钮时的当前值
        BtnFlag = 1
    if BtnFlag == 1:
        BtnFlag = 0
        device.emit_combo([uinput.KEY_SPACE])

def btnISR(channel):
    global globalCounter
    globalCounter = 0

def loop():
    global globalCounter
    tmp = 0 # Rotary Temperary
    GPIO.add_event_detect(BtnPin, GPIO.FALLING, callback=btnISR)
    #当按下按钮时,调用回调函数btnISR
    while True:
        rotaryDeal()
        if tmp != globalCounter:
            print('globalCounter = %d' % globalCounter)
            tmp = globalCounter

def destroy():
    GPIO.cleanup()             # Release resource

if __name__ == '__main__':     # Program start from here
    setup()
    try:
        loop()
    except KeyboardInterrupt:  # When 'Ctrl+C' is pressed, the child program destroy() will be  executed.
        destroy()

https://mp.weixin.qq.com/s/ouzEOft6yZ4l0St4kO_79g