From b9aa0ee5f3b9079d9feee3113c17bf3734259a85 Mon Sep 17 00:00:00 2001 From: Surferlul Date: Mon, 25 Jul 2022 13:52:55 +0200 Subject: [PATCH] fixed keyboard --- remote_evdev/config/__init__.py | 17 ++++- remote_evdev/stream/guest.py | 130 ++++++++++++++++++++++++++++---- remote_evdev/stream/host.py | 101 +++++++++++-------------- 3 files changed, 176 insertions(+), 72 deletions(-) diff --git a/remote_evdev/config/__init__.py b/remote_evdev/config/__init__.py index 39d111d..6634459 100644 --- a/remote_evdev/config/__init__.py +++ b/remote_evdev/config/__init__.py @@ -1,12 +1,20 @@ from dataclasses import dataclass import sys from os import path +from enum import Enum + + +class DeviceType(Enum): + KEYBOARD = 0 + POINTER = 1 + TOUCH = 2 + OTHER = 3 @dataclass class DeviceInfo: path: str - device_type: str + device_type: DeviceType @dataclass @@ -57,8 +65,8 @@ def get_config() -> tuple[NetConfig, list[DeviceInfo]]: case "--event": value = "/dev/input/" case "--full-path": pass case "--type": - match args[n+1]: - case "pointer" | "keyboard": key = "type" + match args[n+1].upper(): + case "POINTER" | "KEYBOARD" | "TOUCH" | "OTHER": key = "type" case _: raise ValueError(f"Invalid device type {args[n+1]}") n += 1 match key: @@ -66,7 +74,8 @@ def get_config() -> tuple[NetConfig, list[DeviceInfo]]: value = f"{value}{args[n]}" if path.exists(value): devices_info[-1].path = value - case "type": devices_info[-1].device_type = args[n] + case "type": + devices_info[-1].device_type = getattr(DeviceType, args[n].upper()) n += 1 if cfg.ip_address == "auto": diff --git a/remote_evdev/stream/guest.py b/remote_evdev/stream/guest.py index fe88f43..53bb6a1 100644 --- a/remote_evdev/stream/guest.py +++ b/remote_evdev/stream/guest.py @@ -2,15 +2,101 @@ from evdev import UInput from socket import socket import json from typing import Generator +from ..config import DeviceType + + +class EventSequence: + def __init__(self, name: str = "", sequence: list[tuple[int, int, int]] = None): + if sequence is None: + sequence = [] + self.name = name + self.sequence = sequence + self.curr = {} + + def add(self, event_type: int, event_code: int, event_value: int): + self.sequence.append((event_type, event_code, event_value)) + + def next(self, fd: int, event_type: int, event_code: int, event_value: int): + if len(self.sequence) == 0: + return True + if fd not in self.curr: + self.curr[fd] = 0 + if self.sequence[self.curr[fd]] == (event_type, event_code, event_value): + self.curr[fd] += 1 + if self.curr[fd] == len(self.sequence): + self.curr[fd] = 0 + return True + else: + self.curr[fd] = 0 + return False + + def reset(self): + self.curr = {} + + +class SequenceCollection: + def __init__(self, name: str = "", sequences: list[EventSequence] = None): + if sequences is None: + sequences = [] + self.name = name + self.sequences = sequences + + def add(self, sequence: EventSequence): + self.sequences.append(sequence) + + def next(self, *args): + return sum([sequence.next(*args) for sequence in self.sequences]) + + def reset(self): + for sequence in self.sequences: + sequence.reset() + + +def get_double_control_sequences() -> SequenceCollection: + return SequenceCollection("doublecontrol", [ + EventSequence("rllr", [ + (4, 4, 29), + (1, 29, 1), + (0, 0, 0), + (4, 4, 29), + (0, 0, 0), + (4, 4, 157) + ]), + EventSequence("lrrl", [ + (4, 4, 157), + (1, 97, 1), + (0, 0, 0), + (4, 4, 157), + (0, 0, 0), + (4, 4, 29) + ]), + EventSequence("rlrl", [ + (4, 4, 29), + (1, 29, 1), + (0, 0, 0), + (4, 4, 157), + (0, 0, 0), + (4, 4, 29) + ]), + EventSequence("lrlr", [ + (4, 4, 157), + (1, 97, 1), + (0, 0, 0), + (4, 4, 29), + (0, 0, 0), + (4, 4, 157) + ]) + ]) def receive_devices(s: socket) -> Generator[tuple[int, UInput], None, None]: while buf_size := int.from_bytes(s.recv(4), byteorder='big'): fd = int.from_bytes(s.recv(4), byteorder='big') + device_type = DeviceType(int.from_bytes(s.recv(1), byteorder='big')) cap = json.loads(s.recv(buf_size).decode()) cap = {int(key): cap[key] for key in cap} del cap[0] - yield fd, UInput(cap, name=f"web-evdev-device-fd{fd}") + yield fd, UInput(cap, name=f"web-evdev-device-fd{fd}"), device_type def receive_event(s: socket) -> tuple[int, tuple[int, int, int]]: @@ -21,28 +107,46 @@ def receive_event(s: socket) -> tuple[int, tuple[int, int, int]]: return fd, (event_type, code, value) -def handle_client(s: socket): - d = {} - for fd, device in receive_devices(s): - d[fd] = device +def reset_control(d: dict[int: tuple[UInput, DeviceType]]): + print("trying to reset controls") + for fd in d: + if d[fd][1] != DeviceType.KEYBOARD: + continue + print(f"resetting control keys on keyboard {fd}") + for key in [29, 97]: + d[fd][0].write(1, key, 0) + d[fd][0].syn() - active_keys = {} + +def handle_client(s: socket): + control_sequences = get_double_control_sequences() + d = {} + for fd, device, device_type in receive_devices(s): + d[fd] = (device, device_type) + + got_control = False while True: fd, event = receive_event(s) - if event[0] == 1: - active_keys[event[1]] = event[2] - print(i for i in active_keys if active_keys[i]) if fd == 4294967295: s.close() for fd in d: - d[fd].close() + d[fd][0].close() print("Exiting") exit() elif fd == 4278124286: + got_control = True + control_sequences.reset() print("Control was given") elif fd == 4261281277: + reset_control(d) print("Control was taken") - elif event[0] == 0: - d[fd].syn() else: - d[fd].write(*event) \ No newline at end of file + if not got_control: + if event[0] == 0: + d[fd][0].syn() + else: + d[fd][0].write(*event) + else: + if d[fd][1] == DeviceType.KEYBOARD: + if control_sequences.next(fd, *event): + got_control = False diff --git a/remote_evdev/stream/host.py b/remote_evdev/stream/host.py index 1b42f53..32fa56f 100644 --- a/remote_evdev/stream/host.py +++ b/remote_evdev/stream/host.py @@ -4,13 +4,14 @@ from select import select import json from time import time -from ..config import DeviceInfo +from ..config import DeviceInfo, DeviceType -def send_device(s: socket, device: InputDevice): +def send_device(s: socket, device: InputDevice, device_type: DeviceType): buf = json.dumps(device.capabilities()).encode() s.sendall(len(buf).to_bytes(4, byteorder='big')) s.sendall(device.fd.to_bytes(4, byteorder='big')) + s.sendall(device_type.value.to_bytes(1, byteorder='big')) s.sendall(buf) @@ -31,57 +32,9 @@ def exit_nice(s: socket, d: dict[int: InputDevice], guest_input: bool): exit() -def double_control(s: socket, - d: dict[int: InputDevice], - active_keys: list[int], - guest_input: bool, - ctrl2: bool, - device_fd: int, - event_value: int) -> bool: - if 14 in active_keys: - exit_nice(s, d, guest_input) - elif not ctrl2: - guest_input = not guest_input - event = ({29, 157} ^ {event_value}).pop() - send_input_event(s, device_fd, InputEvent(0, 0, 4, 4, event)) - if guest_input: - send_input_event(s, device_fd, InputEvent( - 0, 0, 1, - event if event == 29 else 97, - 1, - )) - send_input_event(s, device_fd, InputEvent(0, 0, 0, 0, 0)) - print("Giving up control") - s.sendall(b'\xfe' * 20) - for fd in d: - d[fd].grab() - else: - send_input_event(s, device_fd, InputEvent( - 0, 0, 1, - event if event == 29 else 97, - 0, - )) - send_input_event(s, device_fd, InputEvent(0, 0, 0, 0, 0)) - print("Taking control") - s.sendall(b'\xfd' * 20) - for fd in d: - d[fd].ungrab() - return guest_input - - -def handle_client(s: socket, devices_info: list[DeviceInfo]): - - ctrl2 = False - guest_input = True - d = [] - device_types = {} - for device_info in devices_info: - d.append(InputDevice(device_info.path)) - device_types[device_info.path] = device_info.device_type - send_device(s, d[-1]) - s.sendall((0).to_bytes(4, byteorder='big')) - d = {device.fd: device for device in d} - device_types = {fd: device_types[d[fd].path] for fd in d} +def give_control(s: socket, d: dict[int: InputDevice]): + s.sendall(b'\xfe' * 20) + print("Giving up control") prnt = time() while True: for fd in d: @@ -95,14 +48,52 @@ def handle_client(s: socket, devices_info: list[DeviceInfo]): continue for fd in d: d[fd].grab() + + +def take_control(s: socket, d: dict[int: InputDevice]): + print("Taking control") + s.sendall(b'\xfd' * 20) + for fd in d: + d[fd].ungrab() + + +def double_control(s: socket, + d: dict[int: InputDevice], + active_keys: list[int], + guest_input: bool, + ctrl2: bool) -> bool: + if 14 in active_keys: + exit_nice(s, d, guest_input) + elif not ctrl2: + guest_input = not guest_input + if guest_input: + give_control(s, d) + else: + take_control(s, d) + return guest_input + + +def handle_client(s: socket, devices_info: list[DeviceInfo]): + ctrl2 = False + guest_input = True + d = [] + device_types = {} + for device_info in devices_info: + device = InputDevice(device_info.path) + d.append(device) + device_types[device.fd] = device_info.device_type + send_device(s, device, device_info.device_type) + s.sendall((0).to_bytes(4, byteorder='big')) + d = {device.fd: device for device in d} + give_control(s, d) while True: r, w, x = select(d, [], []) for fd in r: events = [event for event in d[fd].read()] - if device_types[fd] == "keyboard": + if device_types[fd] == DeviceType.KEYBOARD: active_keys = d[fd].active_keys() if 29 in active_keys and 97 in active_keys: - guest_input = double_control(s, d, active_keys, guest_input, ctrl2, fd, events[0].value) + guest_input = double_control(s, d, active_keys, guest_input, ctrl2) ctrl2 = True else: ctrl2 = False