2022-07-20 04:41:09 +00:00
|
|
|
from socket import socket
|
|
|
|
from evdev import InputDevice, InputEvent
|
|
|
|
from select import select
|
|
|
|
import json
|
2022-07-21 21:45:30 +00:00
|
|
|
from time import time
|
2022-07-20 04:41:09 +00:00
|
|
|
|
2022-07-25 11:52:55 +00:00
|
|
|
from ..config import DeviceInfo, DeviceType
|
2022-07-20 04:41:09 +00:00
|
|
|
|
|
|
|
|
2022-07-25 11:52:55 +00:00
|
|
|
def send_device(s: socket, device: InputDevice, device_type: DeviceType):
    """Announce one input device to the peer connected on ``s``.

    The frame is written with a single ``sendall`` and has this layout
    (all integers big-endian, unsigned):

    * 4 bytes - length of the JSON payload
    * 4 bytes - ``device.fd``
    * 1 byte  - ``device_type.value``
    * N bytes - ``device.capabilities()`` serialised as JSON

    Args:
        s: Connected socket to write the frame to.
        device: Device whose capabilities and file descriptor are sent.
        device_type: Classification of the device; its ``value`` must fit
            in one unsigned byte.

    Raises:
        OverflowError: If one of the integer fields does not fit its width.
        OSError: If the socket write fails.
    """
    payload = json.dumps(device.capabilities()).encode()
    header = b''.join((
        len(payload).to_bytes(4, byteorder='big'),
        device.fd.to_bytes(4, byteorder='big'),
        device_type.value.to_bytes(1, byteorder='big'),
    ))
    # Assemble the whole frame before touching the socket: a field that fails
    # to convert can then never leave a truncated header on the wire, and the
    # frame costs one write instead of four.
    s.sendall(header + payload)
|
|
|
|
|
|
|
|
|
|
|
|
def send_input_event(s: socket, fd: int, input_event: InputEvent):
    """Forward one input event to the peer connected on ``s``.

    The event is written as a single 20-byte big-endian frame:

    * 4 bytes - ``fd`` of the originating device (unsigned)
    * 4 bytes - ``input_event.type`` (unsigned)
    * 4 bytes - ``input_event.code`` (unsigned)
    * 8 bytes - ``input_event.value`` (signed)

    Args:
        s: Connected socket to write the frame to.
        fd: File descriptor identifying the source device.
        input_event: Event whose ``type``, ``code`` and ``value`` are sent.

    Raises:
        OverflowError: If a field does not fit its width.
        OSError: If the socket write fails.
    """
    # Build the complete frame first so that a conversion error cannot leave
    # a partial event on the wire, then hand it to the socket in one call.
    frame = b''.join((
        fd.to_bytes(4, byteorder='big'),
        input_event.type.to_bytes(4, byteorder='big'),
        input_event.code.to_bytes(4, byteorder='big'),
        input_event.value.to_bytes(8, byteorder='big', signed=True),
    ))
    s.sendall(frame)
|
|
|
|
|
|
|
|
|
2022-07-21 21:45:30 +00:00
|
|
|
def exit_nice(s: socket, d: dict[int, InputDevice], guest_input: bool):
    """Tell the peer we are leaving, release the devices and terminate.

    Sends 20 bytes of ``0xff`` on ``s`` (presumably the peer's "exit" marker;
    confirm against the receiving side), ungrabs every device if they are
    currently grabbed for the guest, closes the socket and raises
    ``SystemExit``.

    Args:
        s: Connected socket to the peer; closed by this call.
        d: Open input devices keyed by file descriptor.
        guest_input: True while the devices are grabbed and must therefore be
            ungrabbed before exiting.

    Raises:
        SystemExit: Always, once the cleanup has completed.
        OSError: If the final send fails; the devices are still released and
            the socket still closed before the error propagates.
    """
    try:
        s.sendall(b'\xff' * 20)
    finally:
        # Release the devices and the socket even when the peer is already
        # gone, so a failed goodbye cannot leave them grabbed.
        if guest_input:
            for device in d.values():
                device.ungrab()
        s.close()
    print("Exiting")
    # ``exit()`` is injected by the ``site`` module for interactive use and is
    # not guaranteed to exist; raising SystemExit is the portable equivalent.
    raise SystemExit
|
|
|
|
|
|
|
|
|
2022-07-25 11:52:55 +00:00
|
|
|
def give_control(s: socket, d: dict[int, InputDevice]):
    """Hand the input devices over to the peer and grab them.

    Sends 20 bytes of ``0xfe`` on ``s`` (presumably the peer's "control
    given" marker; confirm against the receiving side), waits until no
    device reports a pressed key, then grabs every device.

    While keys are still held, "Let go of all keys!" is printed at most about
    once per second.

    Args:
        s: Connected socket to the peer.
        d: Open input devices keyed by file descriptor.
    """
    from time import sleep

    s.sendall(b'\xfe' * 20)
    print("Giving up control")

    next_reminder = time()
    # Only grab once everything is released.
    # NOTE(review): presumably this avoids keys staying "stuck" on the side
    # that never sees their release; confirm.
    while any(device.active_keys() for device in d.values()):
        if time() > next_reminder:
            print("Let go of all keys!")
            next_reminder += 1
        # Poll instead of spinning so that waiting for the user does not
        # saturate a CPU core.
        sleep(0.01)

    for device in d.values():
        device.grab()
    print("Ready")
|
2022-07-25 11:52:55 +00:00
|
|
|
|
|
|
|
|
|
|
|
def take_control(s: socket, d: dict[int, InputDevice]):
    """Take the input devices back from the peer.

    Sends 20 bytes of ``0xfd`` on ``s`` (presumably the peer's "control
    taken" marker; confirm against the receiving side) and then ungrabs
    every device.

    Args:
        s: Connected socket to the peer.
        d: Open input devices keyed by file descriptor; all are expected to
            be grabbed when this is called.
    """
    print("Taking control")
    s.sendall(b'\xfd' * 20)
    for device in d.values():
        device.ungrab()
|
|
|
|
|
|
|
|
|
2022-07-21 21:45:30 +00:00
|
|
|
def double_control(s: socket,
                   d: dict[int, InputDevice],
                   active_keys: list[int],
                   guest_input: bool,
                   ctrl2: bool) -> bool:
    """React to the control hotkey being held and return the new input mode.

    * If key code 14 (``KEY_BACKSPACE`` in the Linux input event codes) is
      also pressed, the program shuts down through ``exit_nice``.
    * Otherwise, when the hotkey was not already held on the previous check
      (``ctrl2`` is False), the mode is toggled: ``give_control`` when
      switching to guest input, ``take_control`` when switching away.
    * If the hotkey was already held, nothing changes, so holding it down
      toggles only once.

    Args:
        s: Connected socket to the peer.
        d: Open input devices keyed by file descriptor.
        active_keys: Key codes currently pressed on the keyboard.
        guest_input: Current mode; True while input is forwarded to the peer.
        ctrl2: True if the hotkey was already held on the previous check.

    Returns:
        The (possibly toggled) value of ``guest_input``.
    """
    if 14 in active_keys:
        exit_nice(s, d, guest_input)
    elif not ctrl2:
        guest_input = not guest_input
        if guest_input:
            give_control(s, d)
        else:
            take_control(s, d)
    return guest_input
|
|
|
|
|
|
|
|
|
2022-07-20 04:41:09 +00:00
|
|
|
def handle_client(s: socket, devices_info: list[DeviceInfo]):
    """Serve one connected peer: announce the devices, then forward events.

    Every configured device is opened and announced with ``send_device``;
    the list is closed with a 4-byte zero (presumably read by the peer as an
    end-of-list marker; confirm against the receiving side). Control is then
    given to the peer and the function loops forever, reading events from
    whichever device becomes readable.

    On keyboards, holding key codes 29 and 97 together (``KEY_LEFTCTRL`` and
    ``KEY_RIGHTCTRL`` in the Linux input event codes) triggers
    ``double_control``. Events are forwarded to the peer only while guest
    input is active.

    Args:
        s: Connected socket to the peer.
        devices_info: Path and type of every device to open and forward.
    """
    hotkey_was_down = False
    guest_input = True
    devices = {}
    device_types = {}

    for info in devices_info:
        device = InputDevice(info.path)
        devices[device.fd] = device
        device_types[device.fd] = info.device_type
        send_device(s, device, info.device_type)
    s.sendall((0).to_bytes(4, byteorder='big'))

    give_control(s, devices)

    while True:
        # Iterating the mapping yields its keys, so select() waits on the
        # raw file descriptors and hands the ready ones back.
        readable, _, _ = select(devices, [], [])
        for fd in readable:
            source = devices[fd]
            # Drain the device first so the queue is emptied even when the
            # events end up not being forwarded.
            events = list(source.read())

            if device_types[fd] == DeviceType.KEYBOARD:
                active_keys = source.active_keys()
                hotkey_down = 29 in active_keys and 97 in active_keys
                if hotkey_down:
                    guest_input = double_control(
                        s, devices, active_keys, guest_input, hotkey_was_down)
                # Remember the state so a held hotkey toggles only once.
                hotkey_was_down = hotkey_down

            if not guest_input:
                continue
            for event in events:
                send_input_event(s, fd, event)
|