tridactyl/native/native_main.py
2019-06-23 10:04:21 -03:00

523 lines
14 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import os
import pathlib
import re
import shutil
import struct
import subprocess
import sys
import tempfile
import time
import unicodedata
# When true, handleMessage() appends every incoming message to the debug log
# via write_log().
DEBUG = False
# Version string sent back in reply to the "version" command.
VERSION = "0.1.11"
class NoConnectionError(Exception):
    """Signals that stdin could not be read."""
def is_command_on_path(command):
    """Report whether `command` can be found on the user's $PATH.

    Returns True when shutil.which() resolves the command, False otherwise.
    """
    return bool(shutil.which(command))
def eprint(*args, **kwargs):
    """Write to stderr with print() semantics, flushing immediately.

    When this script is run by Firefox, stderr output is echoed in the
    browser console. Extra keyword arguments are forwarded to print().
    """
    error_stream = sys.stderr
    print(*args, file=error_stream, flush=True, **kwargs)
def getenv(variable, default):
    """Look up an environment variable, falling back to `default`.

    The fallback is used both when the variable is unset and when it is
    set to an empty string.
    """
    value = os.environ.get(variable)
    if value:
        return value
    return default
def getMessage():
    """Read one message from stdin and decode it.

    "Each message is serialized using JSON, UTF-8 encoded and is preceded with
    a 32-bit value containing the message length in native byte order."
    https://developer.mozilla.org/en-US/Add-ons/WebExtensions/Native_messaging#App_side

    Returns the decoded JSON value.

    Exits the process with status 0 when stdin is already at end-of-file,
    and with an error message (non-zero status) when the stream ends
    part-way through the length header or the message body.
    """
    header_size = 4
    rawLength = sys.stdin.buffer.read(header_size)
    if len(rawLength) == 0:
        # Nothing left to read: the other side closed the pipe.
        sys.exit(0)
    if len(rawLength) < header_size:
        # struct.unpack() would raise struct.error on a short buffer.
        sys.exit("Truncated message header on stdin")
    messageLength = struct.unpack("@I", rawLength)[0]
    payload = sys.stdin.buffer.read(messageLength)
    if len(payload) < messageLength:
        # A partial body cannot be valid; do not hand it to the JSON decoder.
        sys.exit("Truncated message body on stdin")
    return json.loads(payload.decode("utf-8"))
def encodeMessage(messageContent):
    """Serialise `messageContent` for transmission.

    Returns a dict holding the UTF-8 encoded JSON text under "content" and
    its byte length, packed with struct format "@I", under "length".
    """
    payload = json.dumps(messageContent).encode("utf-8")
    return {
        "length": struct.pack("@I", len(payload)),
        "content": payload,
    }
def sendMessage(encodedMessage):
    """Write an encoded message to stdout and flush it.

    `encodedMessage` maps "length" and "content" to bytes, which are written
    in that order. If a "code" entry is present its bytes are written last;
    it is skipped when absent.
    """
    out = sys.stdout.buffer
    out.write(encodedMessage["length"])
    out.write(encodedMessage["content"])
    try:
        trailer = encodedMessage["code"]
    except KeyError:
        pass
    else:
        out.write(trailer)
    out.flush()
def findUserConfigFile():
    """Locate the user's config file.

    Returns the path of the first candidate that exists as a regular file,
    or None when none of them does.
    """
    home_dir = os.path.expanduser("~")
    xdg_dir = getenv("XDG_CONFIG_HOME", os.path.join(home_dir, ".config"))

    # Listed in priority order; the first existing file wins.
    search_order = (
        os.path.join(xdg_dir, "tridactyl", "tridactylrc"),
        os.path.join(home_dir, ".tridactylrc"),
        os.path.join(home_dir, "_config", "tridactyl", "tridactylrc"),
        os.path.join(home_dir, "_tridactylrc"),
    )
    return next(
        (candidate for candidate in search_order if os.path.isfile(candidate)),
        None,
    )
def getUserConfig():
    """Return the text of the user's config file, or None if there is none.

    The file is looked up afresh on every call because the user could have
    moved or deleted it since the last request. It is read as UTF-8.
    """
    cfg_file = findUserConfigFile()
    if not cfg_file:
        return None

    # For now this is a simple file read; if config files ever include
    # other files, that will need more work.
    with open(cfg_file, "r", encoding="utf-8") as rc_file:
        return rc_file.read()
def sanitizeFilename(fn):
    """Transform a string to make it suitable for use as a filename.

    From https://stackoverflow.com/a/295466/147356

    The text is NFKD-normalised and reduced to ASCII, stripped of every
    character other than word characters, whitespace, "/", "." and "-",
    trimmed and lower-cased. Runs of two or more dots are removed, and each
    run of dashes, slashes and whitespace becomes a single "-".
    """
    fn = (
        unicodedata.normalize("NFKD", fn)
        .encode("ascii", "ignore")
        .decode("ascii")
    )
    # Raw strings: "\w", "\s" and "\." are not valid escapes in plain literals.
    fn = re.sub(r"[^\w\s/.-]", "", fn).strip().lower()
    fn = re.sub(r"\.\.+", "", fn)
    fn = re.sub(r"[-/\s]+", "-", fn)
    return fn
def is_valid_firefox_profile(profile_dir):
    """Check whether `profile_dir` looks like a Firefox profile directory.

    Returns True only when `profile_dir` is a directory that contains a
    file named "times.json"; otherwise returns False. Paths are handled as
    Windows paths.
    """
    marker_name = "times.json"
    if not pathlib.WindowsPath(profile_dir).is_dir():
        return False
    marker_path = "%s\\%s" % (profile_dir, marker_name)
    return pathlib.WindowsPath(marker_path).is_file()
def win_firefox_restart(message):
    """Handle 'win_firefox_restart' message.

    Validates the request, writes a PowerShell restart script to
    ~\\.tridactyl\\win_firefox_restart.ps1 and registers a one-shot
    scheduled task that runs the script shortly afterwards.

    message: dict with string values under "profiledir" (a profile
        directory path, or "auto") and "browsercmd" (the browser command).

    Returns a reply dict: {"code": 0, "content": ...} once the task has
    been registered, or {"code": -1, "cmd": "error", "error": ...} when the
    message is malformed, the profile directory or browser command is not
    usable, or registering the task fails.
    """
    try:
        profile_dir = message["profiledir"].strip()
        browser_cmd = message["browsercmd"].strip()
    except KeyError:
        return {
            "code": -1,
            "cmd": "error",
            "error": "Error parsing 'restart' message.",
        }

    if (
        profile_dir
        and profile_dir != "auto"
        and not is_valid_firefox_profile(profile_dir)
    ):
        return {
            "code": -1,
            "cmd": "error",
            "error": "%s %s %s"
            % (
                "Invalid profile directory specified.",
                "Vaild profile directory path(s) can be found by",
                "navigating to 'about:support'.",
            ),
        }

    # An empty browser command can never be resolved; report it the same way
    # as a command that is missing from PATH instead of failing later on
    # shutil.which() returning None.
    if not browser_cmd or not is_command_on_path(browser_cmd):
        return {
            "code": -1,
            "cmd": "error",
            "error": "%s %s %s"
            % (
                "'{0}' wasn't found on %PATH%.".format(browser_cmd),
                "Please set valid browser by",
                "'set browser [browser-command]'.",
            ),
        }

    # Native messenger can't seem to create detached process on Windows
    # while Firefox is quitting, which is essential to trigger restarting
    # Firefox. So, below we are resorting to create a scheduled task with
    # the task start-time set in the near future.
    #
    # 'schtasks.exe' is limited as in it doesn't support task-time with
    # granularity in seconds. So, falling back to PowerShell as the last
    # resort.
    ff_lock_name = "parent.lock"
    ff_bin_location = shutil.which(browser_cmd)
    ff_bin_path = '"%s"' % ff_bin_location
    ff_bin_dir = '"%s"' % str(pathlib.WindowsPath(ff_bin_location).parent)

    if profile_dir == "auto":
        # The generated script never reads the lock path in "auto" mode.
        ff_lock_path = ff_bin_path
        ff_args = '"%s"' % ("-foreground")
    else:
        ff_lock_path = '"%s/%s"' % (profile_dir, ff_lock_name)
        ff_args = '"%s","%s","%s"' % (
            "-foreground",
            "-profile",
            profile_dir,
        )

    # NOTE(review): the first script line assigns $env:PATH to itself and
    # then evaluates the quoted directory as a separate statement; confirm
    # whether appending the directory to PATH was intended.
    restart_ps1_content = """
$env:PATH=$env:PATH;{ff_bin_dir}
Set-Location -Path {ff_bin_dir}
$profileDir = "{profile_dir}"
if ($profileDir -ne "auto") {{
$lockFilePath = {ff_lock_path}
$locked = $true
$num_try = 10
}} else {{
$locked = $false
}}
while (($locked -eq $true) -and ($num_try -gt 0)) {{
try {{
[IO.File]::OpenWrite($lockFilePath).close()
$locked=$false
}} catch {{
$num_try-=1
Write-Host "[+] Trial: $num_try [lock == true]"
Start-Sleep -Seconds 1
}}
}}
if ($locked -eq $true) {{
$errorMsg = "Restarting Firefox failed. Please restart manually."
Write-Host "$errorMsg"
# Add-Type -AssemblyName System.Windows.Forms
# [System.Windows.MessageBox]::Show(
# $errorMsg,
# "Tridactyl")
}} else {{
Write-Host "[+] Restarting Firefox ..."
Start-Process `
-WorkingDirectory {ff_bin_dir} `
-FilePath {ff_bin_path} `
-ArgumentList {ff_args} `
-WindowStyle Normal
}}
""".format(
        ff_bin_dir=ff_bin_dir,
        profile_dir=profile_dir,
        ff_lock_path=ff_lock_path,
        ff_bin_path=ff_bin_path,
        ff_args=ff_args,
    )

    # Formatted with %d below, so the fractional part is dropped.
    delay_sec = 1.5
    task_name = "firefox-restart"
    native_messenger_dirname = ".tridactyl"
    powershell_cmd = "powershell"
    powershell_args = "%s %s" % (
        "-NoProfile",
        "-ExecutionPolicy Bypass",
    )
    restart_ps1_path = "%s\\%s\\%s" % (
        os.path.expanduser("~"),
        native_messenger_dirname,
        "win_firefox_restart.ps1",
    )
    task_cmd = "cmd"
    task_arg = '/c "%s %s -File %s"' % (
        powershell_cmd,
        powershell_args,
        restart_ps1_path,
    )
    register_task_cmd = (
        "Register-ScheduledTask "
        "-TaskName '%s' "
        "-Force "
        "-Action (New-ScheduledTaskAction "
        "-Execute '%s' "
        "-Argument '%s') "
        "-Trigger (New-ScheduledTaskTrigger "
        "-Once "
        "-At "
        "(Get-Date).AddSeconds(%d).ToString('HH:mm:ss'))"
    ) % (task_name, task_cmd, task_arg, delay_sec)

    try:
        # Close the script before the task that runs it is registered, so
        # its contents are fully written to disk.
        with open(restart_ps1_path, "w+") as ps1_file:
            ps1_file.write(restart_ps1_content)

        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

        subprocess.check_output(
            [
                "powershell",
                "-NonInteractive",
                "-NoProfile",
                "-WindowStyle",
                "Minimized",
                "-InputFormat",
                "None",
                "-ExecutionPolicy",
                "Bypass",
                "-Command",
                register_task_cmd,
            ],
            shell=False,
            startupinfo=startupinfo,
        )
        reply = {
            "code": 0,
            "content": "Restarting in %d seconds..." % delay_sec,
        }
    except subprocess.CalledProcessError:
        reply = {
            "code": -1,
            "cmd": "error",
            "error": "error creating restart task.",
        }
    return reply
def write_log(msg):
    """Append `msg` to the debug log at ~/.tridactyl/native_main.log.

    The text is written as-is; callers supply their own line endings.
    Raises OSError if the log file cannot be opened (for example when the
    ".tridactyl" directory does not exist).
    """
    debug_log_dirname = ".tridactyl"
    debug_log_filename = "native_main.log"
    debug_log_path = os.path.join(
        os.path.expanduser("~"),
        debug_log_dirname,
        debug_log_filename,
    )
    # Close the handle straight away rather than leaving it to the
    # garbage collector.
    with open(debug_log_path, "a+") as log_file:
        log_file.write(msg)
def handleMessage(message):
    """Generate reply from incoming message.

    `message` is the decoded request dict. Its "cmd" entry selects the
    action; the remaining entries are command-specific arguments.

    Returns the reply dict. Most replies start as {"cmd": cmd} and gain
    "content" and/or "code" entries. The "version" and
    "win_firefox_restart" commands replace the reply entirely, and an
    unknown command yields {"cmd": "error", "error": "Unhandled message"}.

    Raises KeyError when "cmd", or a key the selected command requires, is
    missing from `message`.
    """
    cmd = message["cmd"]
    reply = {"cmd": cmd}

    if DEBUG:
        msg = "%s %s\n" % (
            time.strftime("%H:%M:%S %p", time.localtime()),
            str(message),
        )
        write_log(msg)

    if cmd == "version":
        reply = {"version": VERSION}

    elif cmd == "getconfig":
        file_content = getUserConfig()
        if file_content:
            reply["content"] = file_content
        else:
            reply["code"] = "File not found"

    elif cmd == "getconfigpath":
        reply["content"] = findUserConfigFile()
        reply["code"] = 0
        if reply["content"] is None:
            reply["code"] = "Path not found"

    elif cmd == "run":
        commands = message["command"]
        stdin_bytes = message.get("content", "").encode("utf-8")
        # The command string is handed to the shell as-is.
        p = subprocess.Popen(
            commands,
            shell=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )
        reply["content"] = p.communicate(stdin_bytes)[0].decode("utf-8")
        reply["code"] = p.returncode

    elif cmd == "eval":
        # SECURITY: eval() executes arbitrary Python taken from the message.
        # Left unchanged because callers rely on it; anything able to send
        # messages to this process can run code through this branch.
        output = eval(message["command"])
        reply["content"] = output

    elif cmd == "read":
        try:
            with open(
                os.path.expandvars(os.path.expanduser(message["file"])),
                "r",
                encoding="utf-8",
            ) as fh:
                reply["content"] = fh.read()
                reply["code"] = 0
        except FileNotFoundError:
            reply["content"] = ""
            reply["code"] = 2

    elif cmd == "mkdir":
        os.makedirs(
            os.path.relpath(message["dir"]),
            exist_ok=message["exist_ok"],
        )
        reply["content"] = ""
        reply["code"] = 0

    elif cmd == "move":
        dest = os.path.expanduser(message["to"])
        if os.path.isfile(dest):
            # Refuse to overwrite an existing file.
            reply["code"] = 1
        else:
            try:
                shutil.move(os.path.expanduser(message["from"]), dest)
                reply["code"] = 0
            except Exception:
                reply["code"] = 2

    elif cmd == "write":
        with open(message["file"], "w", encoding="utf-8") as fh:
            fh.write(message["content"])

    elif cmd == "writerc":
        path = os.path.expanduser(message["file"])
        if not os.path.isfile(path) or message["force"]:
            try:
                with open(path, "w", encoding="utf-8") as fh:
                    fh.write(message["content"])
                    reply["code"] = 0  # Success.
            except EnvironmentError:
                reply["code"] = 2  # Some OS related error.
        else:
            # File exist, send force="true" or try another filename.
            reply["code"] = 1

    elif cmd == "temp":
        prefix = message.get("prefix")
        if prefix is None:
            prefix = ""
        prefix = "tmp_{}_".format(sanitizeFilename(prefix))

        (handle, filepath) = tempfile.mkstemp(prefix=prefix, suffix=".txt")
        with os.fdopen(handle, "w", encoding="utf-8") as fh:
            fh.write(message["content"])
        reply["content"] = filepath

    elif cmd == "env":
        reply["content"] = getenv(message["var"], "")

    elif cmd == "win_firefox_restart":
        reply = win_firefox_restart(message)

    elif cmd == "list_dir":
        # A missing or null "path" would make expanduser() raise TypeError;
        # treat it as empty so the "./" fallback below applies.
        path = os.path.expanduser(message.get("path") or "")
        reply["sep"] = os.sep
        reply["isDir"] = os.path.isdir(path)
        if not reply["isDir"]:
            # Not a directory: list the directory that would contain it.
            path = os.path.dirname(path)
            if not path:
                path = "./"
        reply["files"] = os.listdir(path)

    else:
        reply = {"cmd": "error", "error": "Unhandled message"}
        eprint("Unhandled message: {}".format(message))

    return reply
# Serve requests one at a time: read a message, build its reply and send the
# encoded reply. The loop ends when getMessage() exits the process.
while True:
    sendMessage(encodeMessage(handleMessage(getMessage())))