mirror of
https://github.com/vale981/tridactyl
synced 2025-03-05 09:31:41 -05:00

Python's tempfile.mkstemp method returns an open file descriptor, but native_main.py was ignoring that and re-opening the file. This commit modifies the code to use os.fdopen so that the file descriptor is closed when exiting the 'with ...' context manager.
174 lines
4.7 KiB
Python
Executable file
174 lines
4.7 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import struct
|
|
import subprocess
|
|
import tempfile
|
|
|
|
# Version of this native messenger script; sent back in reply to the
# 'version' command.
VERSION = "0.1.3"
|
|
|
|
class NoConnectionError(Exception):
    """Error type for the case where stdin cannot be read."""
|
|
|
|
|
|
def eprint(*args, **kwargs):
    """Print to stderr and flush straight away.

    Takes the same arguments as the built-in print(), except that ``file``
    and ``flush`` are fixed here (passing either raises TypeError).  When the
    script is run by Firefox, stderr is echoed in the browser console.
    """
    stream = sys.stderr
    print(*args, file=stream, flush=True, **kwargs)
|
|
|
|
|
|
def getenv(variable, default):
    """Return the value of environment variable ``variable``.

    ``default`` is returned instead when the variable is unset or is set to
    an empty string.
    """
    value = os.environ.get(variable)
    return value if value else default
|
|
|
|
def getMessage():
    """Read one message from stdin and return it as a decoded JSON value.

    Wire format, as described by MDN:

    "Each message is serialized using JSON, UTF-8 encoded and is preceded with
    a 32-bit value containing the message length in native byte order."

    https://developer.mozilla.org/en-US/Add-ons/WebExtensions/Native_messaging#App_side

    If no bytes at all can be read for the length prefix, the process exits
    with status 0.
    """
    stream = sys.stdin.buffer

    header = stream.read(4)
    if not header:
        # stdin is exhausted: nothing more will arrive, so stop cleanly.
        sys.exit(0)

    # '@I' = unsigned int in native byte order; unpack returns a 1-tuple.
    (size,) = struct.unpack('@I', header)
    payload = stream.read(size)
    return json.loads(payload.decode('utf-8'))
|
|
|
|
|
|
def encodeMessage(messageContent):
    """Serialize ``messageContent`` for sending.

    Returns a dict with two bytes values: 'content' is the JSON text encoded
    as UTF-8, and 'length' is the byte length of that content packed as an
    unsigned int in native byte order.
    """
    payload = json.dumps(messageContent).encode('utf-8')
    return {
        'length': struct.pack('@I', len(payload)),
        'content': payload,
    }
|
|
|
|
|
|
def sendMessage(encodedMessage):
    """Write an encoded message to stdout and flush it.

    ``encodedMessage`` is a mapping with bytes values under 'length' and
    'content', which are written in that order.  If a 'code' entry is also
    present it is written after the content; otherwise it is skipped.
    """
    out = sys.stdout.buffer

    for part in ('length', 'content'):
        out.write(encodedMessage[part])

    # 'code' is optional, so its absence is not an error.
    if 'code' in encodedMessage:
        out.write(encodedMessage['code'])

    out.flush()
|
|
|
|
|
|
def findUserConfigFile():
    """Locate the user's config file.

    The candidates are tried in this order and the first one that is an
    existing file is returned:

    1. <config dir>/tridactyl/tridactylrc, where <config dir> is
       $XDG_CONFIG_HOME if that is set and non-empty, otherwise ~/.config
    2. ~/.tridactylrc

    Returns the matching path, or None if neither file exists.
    """
    xdg_dir = getenv('XDG_CONFIG_HOME', os.path.expanduser('~/.config'))

    # Listed in priority order.
    search_order = (
        os.path.join(xdg_dir, "tridactyl", "tridactylrc"),
        os.path.join(os.path.expanduser('~'), '.tridactylrc'),
    )

    # next() stops at the first hit and falls back to None when none match.
    return next(
        (candidate for candidate in search_order if os.path.isfile(candidate)),
        None,
    )
|
|
|
|
|
|
def getUserConfig():
    """Return the contents of the user's config file as a string.

    Returns None when findUserConfigFile() finds no config file.
    """
    # Look the path up afresh on every call - the user could have moved or
    # deleted the file since the last request.
    cfg_file = findUserConfigFile()

    if not cfg_file:
        return None

    # For now this is a simple file read, but if config files can ever
    # include other files, that will need more work.
    # The context manager closes the handle deterministically instead of
    # leaving it to the garbage collector.
    with open(cfg_file, 'r') as cfg:
        return cfg.read()
|
|
|
|
|
|
def handleMessage(message):
    """Generate the reply for one incoming message.

    ``message`` is a dict whose "cmd" entry selects the action.  Unless noted
    otherwise the reply is a dict that starts as {'cmd': cmd} and gains the
    keys listed below:

    - version:   reply is exactly {'version': VERSION} (no 'cmd' key).
    - getconfig: 'content' is the user config text; if getUserConfig()
                 returns nothing (or an empty string), 'code' is set to
                 'File not found' instead.
    - run:       runs message["command"] through the shell; 'content' is its
                 stdout and 'code' its exit status.
    - eval:      'content' is the result of evaluating message["command"] as
                 a Python expression.
    - read:      'content' is the text of message["file"] (with ~ and
                 environment variables expanded) and 'code' is 0; if the
                 file does not exist, 'content' is "" and 'code' is 2.
    - mkdir:     creates message["dir"], passing message["exist_ok"] to
                 os.makedirs; 'content' is "" and 'code' is 0.
    - write:     writes message["content"] to message["file"]; adds no keys.
    - temp:      writes message["content"] to a new temporary file and puts
                 its path in 'content'.
    - env:       'content' is the value of environment variable
                 message["var"], or "" if it is unset or empty.

    Any other command is logged to stderr and answered with
    {'cmd': 'error', 'error': 'Unhandled message'}.

    Raises KeyError if "cmd", or a field the selected command reads, is
    missing from ``message``.
    """
    cmd = message["cmd"]
    reply = {'cmd': cmd}

    if cmd == 'version':
        reply = {'version': VERSION}

    elif cmd == 'getconfig':
        file_content = getUserConfig()
        if file_content:
            reply['content'] = file_content
        else:
            reply['code'] = 'File not found'

    elif cmd == 'run':
        commands = message["command"]

        # SECURITY: this executes a shell command taken verbatim from the
        # message.  That is the purpose of the 'run' command, but it means
        # whoever can send messages to this process can run anything.
        try:
            p = subprocess.check_output(commands, shell=True)
            # Command output is not guaranteed to be valid UTF-8; replace
            # undecodable bytes rather than letting UnicodeDecodeError
            # propagate and terminate the messenger.
            reply["content"] = p.decode("utf-8", errors="replace")
            reply["code"] = 0

        except subprocess.CalledProcessError as process:
            # Non-zero exit status: still report whatever was printed.
            reply["code"] = process.returncode
            reply["content"] = process.output.decode(
                "utf-8", errors="replace"
            )

    elif cmd == 'eval':
        # SECURITY: eval() of message-supplied text executes arbitrary
        # Python.  Left in place because it is the command's purpose, but
        # the sender must be fully trusted.
        output = eval(message["command"])
        reply['content'] = output

    elif cmd == 'read':
        path = os.path.expandvars(os.path.expanduser(message["file"]))
        try:
            with open(path, "r") as fh:
                reply['content'] = fh.read()
                reply['code'] = 0
        except FileNotFoundError:
            reply['content'] = ""
            reply['code'] = 2

    elif cmd == 'mkdir':
        os.makedirs(
            os.path.relpath(message["dir"]), exist_ok=message["exist_ok"]
        )
        reply['content'] = ""
        reply['code'] = 0

    elif cmd == 'write':
        with open(message["file"], "w") as fh:
            fh.write(message["content"])

    elif cmd == 'temp':
        # mkstemp returns an already-open descriptor; wrap it with fdopen so
        # it is closed when the with-block ends instead of being leaked.
        (handle, filepath) = tempfile.mkstemp()
        with os.fdopen(handle, "w") as fh:
            fh.write(message["content"])
        reply['content'] = filepath

    elif cmd == 'env':
        reply['content'] = getenv(message["var"], "")

    else:
        reply = {'cmd': 'error', 'error': 'Unhandled message'}
        eprint('Unhandled message: {}'.format(message))

    return reply
|
|
|
|
|
|
def main():
    """Serve requests in an endless read / handle / reply loop.

    Each iteration reads one message from stdin, builds its reply and writes
    the encoded reply to stdout.  The loop itself never ends; getMessage()
    exits the process once stdin has no more data.
    """
    while True:
        message = getMessage()
        reply = handleMessage(message)
        sendMessage(encodeMessage(reply))


# Only start serving when executed as a script, so that importing this module
# (for example from a test) does not block reading stdin.
if __name__ == "__main__":
    main()
|