mirror of
https://github.com/vale981/tridactyl
synced 2025-03-04 09:01:39 -05:00
523 lines
14 KiB
Python
Executable file
523 lines
14 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import re
|
|
import shutil
|
|
import struct
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import unicodedata
|
|
|
|
DEBUG = False
|
|
VERSION = "0.1.11"
|
|
|
|
|
|
class NoConnectionError(Exception):
|
|
""" Exception thrown when stdin cannot be read """
|
|
|
|
|
|
def is_command_on_path(command):
|
|
""" Returns 'True' if the if the specified command is found on
|
|
user's $PATH.
|
|
"""
|
|
if shutil.which(command):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
def eprint(*args, **kwargs):
|
|
""" Print to stderr, which gets echoed in the browser console
|
|
when run by Firefox
|
|
"""
|
|
print(*args, file=sys.stderr, flush=True, **kwargs)
|
|
|
|
|
|
def getenv(variable, default):
|
|
""" Get an environment variable value, or use the default provided """
|
|
return os.environ.get(variable) or default
|
|
|
|
|
|
def getMessage():
|
|
"""Read a message from stdin and decode it.
|
|
|
|
"Each message is serialized using JSON, UTF-8 encoded and is preceded with
|
|
a 32-bit value containing the message length in native byte order."
|
|
|
|
https://developer.mozilla.org/en-US/Add-ons/WebExtensions/Native_messaging#App_side
|
|
|
|
"""
|
|
rawLength = sys.stdin.buffer.read(4)
|
|
if len(rawLength) == 0:
|
|
sys.exit(0)
|
|
messageLength = struct.unpack("@I", rawLength)[0]
|
|
message = sys.stdin.buffer.read(messageLength).decode("utf-8")
|
|
return json.loads(message)
|
|
|
|
|
|
# Encode a message for transmission,
|
|
# given its content.
|
|
def encodeMessage(messageContent):
|
|
""" Encode a message for transmission, given its content."""
|
|
encodedContent = json.dumps(messageContent).encode("utf-8")
|
|
encodedLength = struct.pack("@I", len(encodedContent))
|
|
return {"length": encodedLength, "content": encodedContent}
|
|
|
|
|
|
# Send an encoded message to stdout
|
|
def sendMessage(encodedMessage):
|
|
""" Send an encoded message to stdout."""
|
|
sys.stdout.buffer.write(encodedMessage["length"])
|
|
sys.stdout.buffer.write(encodedMessage["content"])
|
|
try:
|
|
sys.stdout.buffer.write(encodedMessage["code"])
|
|
except KeyError:
|
|
pass
|
|
|
|
sys.stdout.buffer.flush()
|
|
|
|
|
|
def findUserConfigFile():
|
|
""" Find a user config file, if it exists. Return the file path, or None
|
|
if not found
|
|
"""
|
|
home = os.path.expanduser("~")
|
|
config_dir = getenv(
|
|
"XDG_CONFIG_HOME", os.path.join(home, ".config")
|
|
)
|
|
|
|
# Will search for files in this order
|
|
candidate_files = [
|
|
os.path.join(config_dir, "tridactyl", "tridactylrc"),
|
|
os.path.join(home, ".tridactylrc"),
|
|
os.path.join(home, "_config", "tridactyl", "tridactylrc"),
|
|
os.path.join(home, "_tridactylrc"),
|
|
]
|
|
|
|
config_path = None
|
|
|
|
# find the first path in the list that exists
|
|
for path in candidate_files:
|
|
if os.path.isfile(path):
|
|
config_path = path
|
|
break
|
|
|
|
return config_path
|
|
|
|
|
|
def getUserConfig():
|
|
# look it up freshly each time - the user could have moved or killed it
|
|
cfg_file = findUserConfigFile()
|
|
|
|
# no file, return
|
|
if not cfg_file:
|
|
return None
|
|
|
|
# for now, this is a simple file read, but if the files can
|
|
# include other files, that will need more work
|
|
return open(cfg_file, "r", encoding="utf-8").read()
|
|
|
|
|
|
def sanitizeFilename(fn):
|
|
""" Transform a string to make it suitable for use as a filename.
|
|
|
|
From https://stackoverflow.com/a/295466/147356"""
|
|
|
|
fn = (
|
|
unicodedata.normalize("NFKD", fn)
|
|
.encode("ascii", "ignore")
|
|
.decode("ascii")
|
|
)
|
|
fn = re.sub("[^\w\s/.-]", "", fn).strip().lower()
|
|
fn = re.sub("\.\.+", "", fn)
|
|
fn = re.sub("[-/\s]+", "-", fn)
|
|
return fn
|
|
|
|
|
|
def is_valid_firefox_profile(profile_dir):
|
|
is_valid = False
|
|
validity_indicator = "times.json"
|
|
|
|
if pathlib.WindowsPath(profile_dir).is_dir():
|
|
test_path = "%s\\%s" % (profile_dir, validity_indicator)
|
|
|
|
if pathlib.WindowsPath(test_path).is_file():
|
|
is_valid = True
|
|
|
|
return is_valid
|
|
|
|
|
|
def win_firefox_restart(message):
|
|
"""Handle 'win_firefox_restart' message."""
|
|
reply = {}
|
|
profile_dir = None
|
|
browser_cmd = None
|
|
|
|
try:
|
|
profile_dir = message["profiledir"].strip()
|
|
browser_cmd = message["browsercmd"].strip()
|
|
except KeyError:
|
|
reply = {
|
|
"code": -1,
|
|
"cmd": "error",
|
|
"error": "Error parsing 'restart' message.",
|
|
}
|
|
return reply
|
|
|
|
if (
|
|
profile_dir
|
|
and profile_dir != "auto"
|
|
and not is_valid_firefox_profile(profile_dir)
|
|
):
|
|
reply = {
|
|
"code": -1,
|
|
"cmd": "error",
|
|
"error": "%s %s %s"
|
|
% (
|
|
"Invalid profile directory specified.",
|
|
"Vaild profile directory path(s) can be found by",
|
|
"navigating to 'about:support'.",
|
|
),
|
|
}
|
|
|
|
elif browser_cmd and not is_command_on_path(browser_cmd):
|
|
reply = {
|
|
"code": -1,
|
|
"cmd": "error",
|
|
"error": "%s %s %s"
|
|
% (
|
|
"'{0}' wasn't found on %PATH%.".format(browser_cmd),
|
|
"Please set valid browser by",
|
|
"'set browser [browser-command]'.",
|
|
),
|
|
}
|
|
|
|
else:
|
|
# {{{
|
|
# Native messenger can't seem to create detached process on
|
|
# Windows while Firefox is quitting, which is essential to
|
|
# trigger restarting Firefox. So, below we are resorting to
|
|
# create a scheduled task with the task start-time set in
|
|
# the near future.
|
|
#
|
|
|
|
#
|
|
# subprocess.Popen(
|
|
# [ff_bin_path, "-profile", profile_dir],
|
|
# shell=False,
|
|
# creationflags=0x208 \
|
|
# | subprocess.CREATE_NEW_PROCESS_GROUP)
|
|
#
|
|
|
|
#
|
|
# 'schtasks.exe' is limited as in it doesn't support
|
|
# task-time with granularity in seconds. So, falling back
|
|
# to PowerShell as the last resort.
|
|
#
|
|
|
|
# out_str = ""
|
|
# task_time = time.strftime("%H:%M",
|
|
# time.localtime(
|
|
# time.time() + 60))
|
|
#
|
|
# out_str = subprocess.check_output(
|
|
# ["schtasks.exe",
|
|
# "/Create",
|
|
# "/F",
|
|
# "/SC",
|
|
# "ONCE",
|
|
# "/TN",
|
|
# "tridactyl",
|
|
# "/TR",
|
|
# "calc",
|
|
# "/IT",
|
|
# "/ST",
|
|
# task_time],
|
|
# shell=True)
|
|
# }}}
|
|
|
|
ff_lock_name = "parent.lock"
|
|
|
|
ff_bin_name = browser_cmd
|
|
ff_bin_path = '"%s"' % shutil.which(ff_bin_name)
|
|
|
|
ff_bin_dir = '"%s"' % str(
|
|
pathlib.WindowsPath(shutil.which(ff_bin_name)).parent
|
|
)
|
|
|
|
if profile_dir == "auto":
|
|
ff_lock_path = ff_bin_path
|
|
ff_args = '"%s"' % ("-foreground")
|
|
else:
|
|
ff_lock_path = '"%s/%s"' % (profile_dir, ff_lock_name)
|
|
ff_args = '"%s","%s","%s"' % (
|
|
"-foreground",
|
|
"-profile",
|
|
profile_dir,
|
|
)
|
|
|
|
try:
|
|
restart_ps1_content = """
|
|
$env:PATH=$env:PATH;{ff_bin_dir}
|
|
Set-Location -Path {ff_bin_dir}
|
|
$profileDir = "{profile_dir}"
|
|
if ($profileDir -ne "auto") {{
|
|
$lockFilePath = {ff_lock_path}
|
|
$locked = $true
|
|
$num_try = 10
|
|
}} else {{
|
|
$locked = $false
|
|
}}
|
|
while (($locked -eq $true) -and ($num_try -gt 0)) {{
|
|
try {{
|
|
[IO.File]::OpenWrite($lockFilePath).close()
|
|
$locked=$false
|
|
}} catch {{
|
|
$num_try-=1
|
|
Write-Host "[+] Trial: $num_try [lock == true]"
|
|
Start-Sleep -Seconds 1
|
|
}}
|
|
}}
|
|
if ($locked -eq $true) {{
|
|
$errorMsg = "Restarting Firefox failed. Please restart manually."
|
|
Write-Host "$errorMsg"
|
|
# Add-Type -AssemblyName System.Windows.Forms
|
|
# [System.Windows.MessageBox]::Show(
|
|
# $errorMsg,
|
|
# "Tridactyl")
|
|
}} else {{
|
|
Write-Host "[+] Restarting Firefox ..."
|
|
Start-Process `
|
|
-WorkingDirectory {ff_bin_dir} `
|
|
-FilePath {ff_bin_path} `
|
|
-ArgumentList {ff_args} `
|
|
-WindowStyle Normal
|
|
}}
|
|
""".format(
|
|
ff_bin_dir=ff_bin_dir,
|
|
profile_dir=profile_dir,
|
|
ff_lock_path=ff_lock_path,
|
|
ff_bin_path=ff_bin_path,
|
|
ff_args=ff_args,
|
|
)
|
|
|
|
delay_sec = 1.5
|
|
task_name = "firefox-restart"
|
|
native_messenger_dirname = ".tridactyl"
|
|
|
|
powershell_cmd = "powershell"
|
|
powershell_args = "%s %s" % (
|
|
"-NoProfile",
|
|
"-ExecutionPolicy Bypass",
|
|
)
|
|
|
|
restart_ps1_path = "%s\\%s\\%s" % (
|
|
os.path.expanduser("~"),
|
|
native_messenger_dirname,
|
|
"win_firefox_restart.ps1",
|
|
)
|
|
|
|
task_cmd = "cmd"
|
|
task_arg = '/c "%s %s -File %s"' % (
|
|
powershell_cmd,
|
|
powershell_args,
|
|
restart_ps1_path,
|
|
)
|
|
|
|
open(restart_ps1_path, "w+").write(restart_ps1_content)
|
|
|
|
startupinfo = subprocess.STARTUPINFO()
|
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
|
|
|
subprocess.check_output(
|
|
[
|
|
"powershell",
|
|
"-NonInteractive",
|
|
"-NoProfile",
|
|
"-WindowStyle",
|
|
"Minimized",
|
|
"-InputFormat",
|
|
"None",
|
|
"-ExecutionPolicy",
|
|
"Bypass",
|
|
"-Command",
|
|
"Register-ScheduledTask \
|
|
-TaskName '%s' \
|
|
-Force \
|
|
-Action (New-ScheduledTaskAction \
|
|
-Execute '%s' \
|
|
-Argument '%s') \
|
|
-Trigger (New-ScheduledTaskTrigger \
|
|
-Once \
|
|
-At \
|
|
(Get-Date).AddSeconds(%d).ToString('HH:mm:ss'))"
|
|
% (task_name, task_cmd, task_arg, delay_sec),
|
|
],
|
|
shell=False,
|
|
startupinfo=startupinfo,
|
|
)
|
|
|
|
reply = {
|
|
"code": 0,
|
|
"content": "Restarting in %d seconds..."
|
|
% delay_sec,
|
|
}
|
|
|
|
except subprocess.CalledProcessError:
|
|
reply = {
|
|
"code": -1,
|
|
"cmd": "error",
|
|
"error": "error creating restart task.",
|
|
}
|
|
|
|
return reply
|
|
|
|
|
|
def write_log(msg):
|
|
debug_log_dirname = ".tridactyl"
|
|
debug_log_filename = "native_main.log"
|
|
|
|
debug_log_path = os.path.join(
|
|
os.path.expanduser("~"),
|
|
debug_log_dirname,
|
|
debug_log_filename,
|
|
)
|
|
|
|
open(debug_log_path, "a+").write(msg)
|
|
|
|
|
|
def handleMessage(message):
|
|
""" Generate reply from incoming message. """
|
|
cmd = message["cmd"]
|
|
reply = {"cmd": cmd}
|
|
|
|
if DEBUG:
|
|
msg = "%s %s\n" % (
|
|
time.strftime("%H:%M:%S %p", time.localtime()),
|
|
str(message),
|
|
)
|
|
write_log(msg)
|
|
|
|
if cmd == "version":
|
|
reply = {"version": VERSION}
|
|
|
|
elif cmd == "getconfig":
|
|
file_content = getUserConfig()
|
|
if file_content:
|
|
reply["content"] = file_content
|
|
else:
|
|
reply["code"] = "File not found"
|
|
|
|
elif cmd == "getconfigpath":
|
|
reply["content"] = findUserConfigFile()
|
|
reply["code"] = 0
|
|
if reply["content"] is None:
|
|
reply["code"] = "Path not found"
|
|
|
|
elif cmd == "run":
|
|
commands = message["command"]
|
|
stdin = message.get("content", "").encode("utf-8")
|
|
|
|
p = subprocess.Popen(commands, shell=True,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE)
|
|
|
|
reply["content"] = p.communicate(stdin)[0].decode("utf-8")
|
|
reply["code"] = p.returncode
|
|
|
|
elif cmd == "eval":
|
|
output = eval(message["command"])
|
|
reply["content"] = output
|
|
|
|
elif cmd == "read":
|
|
try:
|
|
with open(
|
|
os.path.expandvars(
|
|
os.path.expanduser(message["file"])
|
|
),
|
|
"r", encoding="utf-8"
|
|
) as file:
|
|
reply["content"] = file.read()
|
|
reply["code"] = 0
|
|
except FileNotFoundError:
|
|
reply["content"] = ""
|
|
reply["code"] = 2
|
|
|
|
elif cmd == "mkdir":
|
|
os.makedirs(
|
|
os.path.relpath(message["dir"]),
|
|
exist_ok=message["exist_ok"],
|
|
)
|
|
reply["content"] = ""
|
|
reply["code"] = 0
|
|
|
|
elif cmd == "move":
|
|
dest = os.path.expanduser(message["to"])
|
|
if (os.path.isfile(dest)):
|
|
reply["code"] = 1
|
|
else:
|
|
try:
|
|
shutil.move(os.path.expanduser(message["from"]), dest)
|
|
reply["code"] = 0
|
|
except Exception:
|
|
reply["code"] = 2
|
|
|
|
elif cmd == "write":
|
|
with open(message["file"], "w", encoding="utf-8") as file:
|
|
file.write(message["content"])
|
|
|
|
elif cmd == "writerc":
|
|
path = os.path.expanduser(message["file"])
|
|
if not os.path.isfile(path) or message["force"]:
|
|
try:
|
|
with open(path, "w", encoding="utf-8") as file:
|
|
file.write(message["content"])
|
|
reply["code"] = 0 # Success.
|
|
except EnvironmentError:
|
|
reply["code"] = 2 # Some OS related error.
|
|
else:
|
|
reply["code"] = 1 # File exist, send force="true" or try another filename.
|
|
|
|
elif cmd == "temp":
|
|
prefix = message.get("prefix")
|
|
if prefix is None:
|
|
prefix = ""
|
|
prefix = "tmp_{}_".format(sanitizeFilename(prefix))
|
|
|
|
(handle, filepath) = tempfile.mkstemp(prefix=prefix, suffix=".txt")
|
|
with os.fdopen(handle, "w", encoding="utf-8") as file:
|
|
file.write(message["content"])
|
|
reply["content"] = filepath
|
|
|
|
elif cmd == "env":
|
|
reply["content"] = getenv(message["var"], "")
|
|
|
|
elif cmd == "win_firefox_restart":
|
|
reply = win_firefox_restart(message)
|
|
|
|
elif cmd == "list_dir":
|
|
path = os.path.expanduser(message.get("path"))
|
|
reply["sep"] = os.sep
|
|
reply["isDir"] = os.path.isdir(path)
|
|
if not reply["isDir"]:
|
|
path = os.path.dirname(path)
|
|
if not path:
|
|
path = "./"
|
|
reply["files"] = os.listdir(path)
|
|
|
|
else:
|
|
reply = {"cmd": "error", "error": "Unhandled message"}
|
|
eprint("Unhandled message: {}".format(message))
|
|
|
|
return reply
|
|
|
|
|
|
while True:
|
|
message = getMessage()
|
|
reply = handleMessage(message)
|
|
sendMessage(encodeMessage(reply))
|