参考

事前準備

必要なライブラリ(aiohttp, Adafruit_PCA9685)をインストールしておく。

1
2
% pipenv install aiohttp
% pipenv install Adafruit_PCA9685

WebSocketでキー入力情報をサーバに送信

index.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!DOCTYPE html>
<html>
  <head>
    <title>WebSocketによるキー入力送信のサンプル</title>
    <meta charset="UTF-8">
    <script src="/static/controller.js"></script>
  </head>

  <body id="mainBox">
    <h1>WebSocketによるキー入力送信のサンプル</h1>
    <div id="messageBox">
    </div>
  </body>
</html>

/static/controller.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
socket = null;

function connectWS(event){
    socket = new WebSocket('ws://' + location.hostname + ':8080/controller');
    socket.onclose = reconnect;
    socket.onmessage = receiveMessage;
}

function receiveMessage(event){
    console.log(event);
    let msgBox = document.getElementById('messageBox');
    msgBox.innerHTML = event.data;
}

function reconnect(event){
    setTimeout(() => {connectWS();}, 5000);
}

function keyboardEvent(event){
    let data = {
        'type':    event.type,
        'keyCode': event.keyCode,
        'key':     event.key,
        'code':    event.code,
        'altkey':  event.altkey,
        'ctrlkey': event.ctrlkey,
        'metakey': event.metakey,
        'repeat':  event.repeat
    }
    console.log(data);
    socket.send(JSON.stringify(data));
    event.stopPropagation();
    event.preventDefault();
    return false;
}

function initHID(){
    let inputBox = document.getElementById('mainBox');
    inputBox.focus();
    ['keydown', 'keyup', 'keypress'].forEach((type)=>{
        inputBox.addEventListener(type, keyboardEvent);
    });
}

window.onload = ()=>{
    initHID();
    connectWS();
};

サーバ側でキー入力情報を受け取る

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from aiohttp import web
import json

async def index(request):
    return web.FileResponse('index.html')

async def wscontroller(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.WSMsgType.text:
            req = json.loads(msg.data)
            print(req)
            await ws.send_str(json.dumps(req))
        elif msg.type == web.WSMsgType.binary:
            print("Unknown binary data received.")
            break
        elif msg.type == web.WSMsgType.close:
            break

    return ws

app = web.Application()
app.add_routes([web.get('/', index),
                web.get('/controller', wscontroller),
                web.static('/static', 'static'),
                ])
if __name__ == '__main__':
    web.run_app(app)

サーボモータに反映する

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from aiohttp import web
import json
from servocontroller import ServoController

svc = ServoController()
x = 0.0
y = 0.0

def controller(cmd):
    global x
    global y
    if cmd['type'] == 'keydown':
        if cmd['key'] == 'ArrowUp':
            y += 0.01
        elif cmd['key'] == 'ArrowDown':
            y -= 0.01
        elif cmd['key'] == 'ArrowLeft':
            x -= 0.01
        elif cmd['key'] == 'ArrowRight':
            x += 0.01
        if x >= 1.0:
            x = 1.0
        elif x <= -1.0:
            x = -1.0
        if y >= 1.0:
            y = 1.0
        elif y <= -1.0:
            y = 1.0
        svc.set_pos(x, y)

async def index(request):
    return web.FileResponse('index.html')

async def dash(request):
    return web.FileResponse('dash.html')

async def wscontroller(request):
    global x
    global y
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.WSMsgType.text:
            req = json.loads(msg.data)
            print(req)
            controller(req)
            req["pos_x"] = x
            req["pos_y"] = y
            await ws.send_str(json.dumps(req))
        elif msg.type == web.WSMsgType.binary:
            print("Unknown binary data received.")
            break
        elif msg.type == web.WSMsgType.close:
            break

    return ws

app = web.Application()
app.add_routes([web.get('/', index),
                web.get('/controller', wscontroller),
                web.static('/static', 'static'),
                ])
if __name__ == '__main__':
    web.run_app(app)

servocontroller.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#-*- coding: utf-8 -*-

import Adafruit_PCA9685

class ServoController:
  def __init__(self, horizontal_id=12, vertical_id=13):
    self.pwm = Adafruit_PCA9685.PCA9685()
    self.pwm.set_pwm_freq(50)
    self.horizontal_value = 0.0
    self.vertical_value   = 0.0
    self.update()

  def reset(self):
    self.horizontal_value = 0.0
    self.vertical_value = 0.0
    self.update()

  def set_pos(self, x, y):
    self.horizontal_value = round(x, 2)
    self.vertical_value = round(y, 2)
    self.update()

  def update(self):
    if self.horizontal_value >= 1.0:
      self.horizontal_value = 0.99
    elif self.horizontal_value <= -1.0:
      self.horizontal_value = -0.99
    if self.vertical_value >= 1.0:
      self.vertical_value = 0.99
    elif self.vertical_value <= -1.0:
      self.vertical_value = -0.99
    self.pwm.set_pwm(1, 0, self.convert_deg(self.horizontal_value * 90))
    self.pwm.set_pwm(0, 0, self.convert_deg(self.vertical_value * 90))

  def convert_deg(self, deg, freq=50):
    step = 4096
    max_pulse = 2.5
    min_pulse = 0.5
    center_pulse = (max_pulse - min_pulse) / 2 + min_pulse
    one_pulse = round((max_pulse - min_pulse) / 180, 2)
    deg_pulse = center_pulse + deg * one_pulse
    deg_num = int(deg_pulse / (1.0 / freq * 1000 / step))
    return deg_num