import json
import os
import sys
import re
import argparse
import time
from math import floor
from os.path import dirname
from subprocess import Popen, PIPE, STDOUT
from blessings import Terminal
class Heatmap(object):
coords = [
[
[ 4, 0], [ 4, 2], [ 2, 0], [ 1, 0], [ 2, 2], [ 3, 0], [ 3, 2],
[ 3, 4], [ 3, 6], [ 2, 4], [ 1, 2], [ 2, 6], [ 4, 4], [ 4, 6],
],
[
[ 8, 0], [ 8, 2], [ 6, 0], [ 5, 0], [ 6, 2], [ 7, 0], [ 7, 2],
[ 7, 4], [ 7, 6], [ 6, 4], [ 5, 2], [ 6, 6], [ 8, 4], [ 8, 6],
],
[
[12, 0], [12, 2], [10, 0], [ 9, 0], [10, 2], [11, 0], [ ],
[ ], [11, 2], [10, 4], [ 9, 2], [10, 6], [12, 4], [12, 6],
],
[
[17, 0], [17, 2], [15, 0], [14, 0], [15, 2], [16, 0], [13, 0],
[13, 2], [16, 2], [15, 4], [14, 2], [15, 6], [17, 4], [17, 6],
],
[
[20, 0], [20, 2], [19, 0], [18, 0], [19, 2], [], [], [], [],
[19, 4], [18, 2], [19, 6], [20, 4], [20, 6], [], [], [], []
],
[
[ ], [23, 0], [22, 2], [22, 0], [22, 4], [21, 0], [21, 2],
[24, 0], [24, 2], [25, 0], [25, 4], [25, 2], [26, 0], [ ],
],
]
def set_attr_at(self, block, n, attr, fn, val):
blk = self.heatmap[block][n]
if attr in blk:
blk[attr] = fn(blk[attr], val)
else:
blk[attr] = fn(None, val)
def coord(self, col, row):
return self.coords[row][col]
@staticmethod
def set_attr(orig, new):
return new
def set_bg(self, coords, color):
(block, n) = coords
self.set_attr_at(block, n, "c", self.set_attr, color)
def set_tap_info(self, coords, count, cap):
(block, n) = coords
def _set_tap_info(o, _count, _cap):
ns = 4 - o.count ("\n")
return o + "\n" * ns + "%.02f%%" % (float(_count) / float(_cap) * 100)
if not cap:
cap = 1
self.heatmap[block][n + 1] = _set_tap_info (self.heatmap[block][n + 1], count, cap)
@staticmethod
def heatmap_color (v):
colors = [ [0.3, 0.3, 1], [0.3, 1, 0.3], [1, 1, 0.3], [1, 0.3, 0.3]]
fb = 0
if v <= 0:
idx1, idx2 = 0, 0
elif v >= 1:
idx1, idx2 = len(colors) - 1, len(colors) - 1
else:
val = v * (len(colors) - 1)
idx1 = int(floor(val))
idx2 = idx1 + 1
fb = val - float(idx1)
r = (colors[idx2][0] - colors[idx1][0]) * fb + colors[idx1][0]
g = (colors[idx2][1] - colors[idx1][1]) * fb + colors[idx1][1]
b = (colors[idx2][2] - colors[idx1][2]) * fb + colors[idx1][2]
r, g, b = [x * 255 for x in (r, g, b)]
return "#%02x%02x%02x" % (int(r), int(g), int(b))
def __init__(self, layout):
self.log = {}
self.total = 0
self.max_cnt = 0
self.layout = layout
def update_log(self, coords):
(c, r) = coords
if not (c, r) in self.log:
self.log[(c, r)] = 0
self.log[(c, r)] = self.log[(c, r)] + 1
self.total = self.total + 1
if self.max_cnt < self.log[(c, r)]:
self.max_cnt = self.log[(c, r)]
def get_heatmap(self):
with open("%s/heatmap-layout.%s.json" % (dirname(sys.argv[0]), self.layout), "r") as f:
self.heatmap = json.load (f)
for row in self.coords:
for coord in row:
if coord != []:
self.set_bg (coord, "#d9dae0")
for (c, r) in self.log:
coords = self.coord(c, r)
b, n = coords
cap = self.max_cnt
if cap == 0:
cap = 1
v = float(self.log[(c, r)]) / cap
self.set_bg (coords, self.heatmap_color (v))
self.set_tap_info (coords, self.log[(c, r)], self.total)
return self.heatmap
def get_stats(self):
usage = [
[0, 0, 0, 0, 0],
[0, 0, 0, 0, 0]
]
finger_map = [0, 0, 1, 2, 3, 3, 3, 1, 1, 1, 2, 3, 4, 4]
for (c, r) in self.log:
if r == 5: if c <= 6: usage[0][4] = usage[0][4] + self.log[(c, r)]
else:
usage[1][0] = usage[1][0] + self.log[(c, r)]
elif r == 4 and (c == 4 or c == 9): if c <= 6: usage[0][4] = usage[0][4] + self.log[(c, r)]
else:
usage[1][0] = usage[1][0] + self.log[(c, r)]
else:
fc = c
hand = 0
if fc >= 7:
hand = 1
fm = finger_map[fc]
usage[hand][fm] = usage[hand][fm] + self.log[(c, r)]
hand_usage = [0, 0]
for f in usage[0]:
hand_usage[0] = hand_usage[0] + f
for f in usage[1]:
hand_usage[1] = hand_usage[1] + f
total = self.total
if total == 0:
total = 1
stats = {
"total-keys": total,
"hands": {
"left": {
"usage": round(float(hand_usage[0]) / total * 100, 2),
"fingers": {
"pinky": 0,
"ring": 0,
"middle": 0,
"index": 0,
"thumb": 0,
}
},
"right": {
"usage": round(float(hand_usage[1]) / total * 100, 2),
"fingers": {
"thumb": 0,
"index": 0,
"middle": 0,
"ring": 0,
"pinky": 0,
}
},
}
}
hmap = ['left', 'right']
fmap = ['pinky', 'ring', 'middle', 'index', 'thumb',
'thumb', 'index', 'middle', 'ring', 'pinky']
for hand_idx in range(len(usage)):
hand = usage[hand_idx]
for finger_idx in range(len(hand)):
stats['hands'][hmap[hand_idx]]['fingers'][fmap[finger_idx + hand_idx * 5]] = round(float(hand[finger_idx]) / total * 100, 2)
return stats
def dump_all(out_dir, heatmaps):
stats = {}
t = Terminal()
t.clear()
sys.stdout.write("\x1b[2J\x1b[H")
print ('{t.underline}{outdir}{t.normal}\n'.format(t=t, outdir=out_dir))
keys = list(heatmaps.keys())
keys.sort()
for layer in keys:
if len(heatmaps[layer].log) == 0:
continue
with open ("%s/%s.json" % (out_dir, layer), "w") as f:
json.dump(heatmaps[layer].get_heatmap(), f)
stats[layer] = heatmaps[layer].get_stats()
left = stats[layer]['hands']['left']
right = stats[layer]['hands']['right']
print ('{t.bold}{layer}{t.normal} ({total:,} taps):'.format(t=t, layer=layer,
total=int(stats[layer]['total-keys'] / 2)))
print (('{t.underline} | ' + \
'left ({l[usage]:6.2f}%) | ' + \
'right ({r[usage]:6.2f}%) |{t.normal}').format(t=t, l=left, r=right))
print ((' {t.bright_magenta}pinky{t.white} | {left[pinky]:6.2f}% | {right[pinky]:6.2f}% |\n' + \
' {t.bright_cyan}ring{t.white} | {left[ring]:6.2f}% | {right[ring]:6.2f}% |\n' + \
' {t.bright_blue}middle{t.white} | {left[middle]:6.2f}% | {right[middle]:6.2f}% |\n' + \
' {t.bright_green}index{t.white} | {left[index]:6.2f}% | {right[index]:6.2f}% |\n' + \
' {t.bright_red}thumb{t.white} | {left[thumb]:6.2f}% | {right[thumb]:6.2f}% |\n' + \
'').format(left=left['fingers'], right=right['fingers'], t=t))
def process_line(line, heatmaps, opts, stamped_log = None):
m = re.search ('KL: col=(\d+), row=(\d+), pressed=(\d+), layer=(.*)', line)
if not m:
return False
if stamped_log is not None:
if line.startswith("KL:"):
print ("%10.10f %s" % (time.time(), line),
file = stamped_log, end = '')
else:
print (line,
file = stamped_log, end = '')
stamped_log.flush()
(c, r, l) = (int(m.group (2)), int(m.group (1)), m.group (4))
if (c, r) not in opts.allowed_keys:
return False
heatmaps[l].update_log ((c, r))
return True
def setup_allowed_keys(opts):
if len(opts.only_key):
incmap={}
for v in opts.only_key:
m = re.search ('(\d+),(\d+)', v)
if not m:
continue
(c, r) = (int(m.group(1)), int(m.group(2)))
incmap[(c, r)] = True
else:
incmap={}
for r in range(0, 6):
for c in range(0, 14):
incmap[(c, r)] = True
for v in opts.ignore_key:
m = re.search ('(\d+),(\d+)', v)
if not m:
continue
(c, r) = (int(m.group(1)), int(m.group(2)))
del(incmap[(c, r)])
return incmap
def main(opts):
heatmaps = {"Dvorak": Heatmap("Dvorak"),
"ADORE": Heatmap("ADORE")
}
cnt = 0
out_dir = opts.outdir
if not os.path.exists(out_dir):
os.makedirs(out_dir)
opts.allowed_keys = setup_allowed_keys(opts)
if not opts.one_shot:
try:
with open("%s/stamped-log" % out_dir, "r") as f:
while True:
line = f.readline()
if not line:
break
if not process_line(line, heatmaps, opts):
continue
except:
pass
stamped_log = open ("%s/stamped-log" % (out_dir), "a+")
else:
stamped_log = None
while True:
line = sys.stdin.readline()
if not line:
break
if not process_line(line, heatmaps, opts, stamped_log):
continue
cnt = cnt + 1
if opts.dump_interval != -1 and cnt >= opts.dump_interval and not opts.one_shot:
cnt = 0
dump_all(out_dir, heatmaps)
dump_all (out_dir, heatmaps)
if __name__ == "__main__":
parser = argparse.ArgumentParser (description = "keylog to heatmap processor")
parser.add_argument ('outdir', action = 'store',
help = 'Output directory')
parser.add_argument ('--dump-interval', dest = 'dump_interval', action = 'store', type = int,
default = 100, help = 'Dump stats and heatmap at every Nth event, -1 for dumping at EOF only')
parser.add_argument ('--ignore-key', dest = 'ignore_key', action = 'append', type = str,
default = [], help = 'Ignore the key at position (x, y)')
parser.add_argument ('--only-key', dest = 'only_key', action = 'append', type = str,
default = [], help = 'Only include key at position (x, y)')
parser.add_argument ('--one-shot', dest = 'one_shot', action = 'store_true',
help = 'Do not load previous data, and do not update it, either.')
args = parser.parse_args()
if len(args.ignore_key) and len(args.only_key):
print ("--ignore-key and --only-key are mutually exclusive, please only use one of them!",
file = sys.stderr)
sys.exit(1)
main(args)