Wayland screenshots (SS) delegated to Grim at reduced quality for faster speeds
This commit is contained in:
parent
11600ae70f
commit
56d8c18871
12
config.py
12
config.py
@ -1,4 +1,4 @@
|
|||||||
import os, ast, torch
|
import os, ast, torch, platform
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
load_dotenv(override=True)
|
load_dotenv(override=True)
|
||||||
|
|
||||||
@ -7,13 +7,19 @@ load_dotenv(override=True)
|
|||||||
|
|
||||||
### available languages: 'ch_sim', 'ch_tra', 'ja', 'ko', 'en'
|
### available languages: 'ch_sim', 'ch_tra', 'ja', 'ko', 'en'
|
||||||
|
|
||||||
INTERVAL = int(os.getenv('INTERVAL'))
|
INTERVAL = float(os.getenv('INTERVAL'))
|
||||||
|
|
||||||
### OCR
|
### OCR
|
||||||
IMAGE_CHANGE_THRESHOLD = float(os.getenv('IMAGE_CHANGE_THRESHOLD', 0.75)) # higher values mean more sensitivity to changes in the screen, too high and the screen will constantly refresh
|
IMAGE_CHANGE_THRESHOLD = float(os.getenv('IMAGE_CHANGE_THRESHOLD', 0.75)) # higher values mean more sensitivity to changes in the screen, too high and the screen will constantly refresh
|
||||||
OCR_MODEL = os.getenv('OCR_MODEL', 'easy') # 'easy', 'paddle', 'rapid' ### easy is the most accurate, paddle is the fastest with CUDA and rapid is the fastest with CPU. Rapid has only between Chinese and English unless you add more languages
|
OCR_MODEL = os.getenv('OCR_MODEL', 'easy') # 'easy', 'paddle', 'rapid' ### easy is the most accurate, paddle is the fastest with CUDA and rapid is the fastest with CPU. Rapid has only between Chinese and English unless you add more languages
|
||||||
OCR_USE_GPU = ast.literal_eval(os.getenv('OCR_USE_GPU', 'True'))
|
OCR_USE_GPU = ast.literal_eval(os.getenv('OCR_USE_GPU', 'True'))
|
||||||
|
|
||||||
|
if platform.system() == 'Windows':
|
||||||
|
default_tmp_dir = "C:\\Users\\AppData\\Local\\Temp"
|
||||||
|
elif platform.system() in ['Linux', 'Darwin']:
|
||||||
|
default_tmp_dir = "/tmp"
|
||||||
|
|
||||||
|
TEMP_IMG_DIR = os.getenv('TEMP_IMG_PATH', default_tmp_dir) # where the temporary images are stored
|
||||||
|
|
||||||
### Drawing/Overlay Config
|
### Drawing/Overlay Config
|
||||||
FILL_COLOUR = os.getenv('FILL_COLOUR', 'white')
|
FILL_COLOUR = os.getenv('FILL_COLOUR', 'white')
|
||||||
@ -65,7 +71,7 @@ if TRANSLATION_USE_GPU is False:
|
|||||||
else:
|
else:
|
||||||
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||||
|
|
||||||
|
TEMP_IMG_PATH = os.path.join(TEMP_IMG_DIR, 'tempP_img91258102.png')
|
||||||
### Just for info
|
### Just for info
|
||||||
|
|
||||||
available_langs = ['ch_sim', 'ch_tra', 'ja', 'ko', 'en'] # there are limitations with the languages that can be used with the OCR models
|
available_langs = ['ch_sim', 'ch_tra', 'ja', 'ko', 'en'] # there are limitations with the languages that can be used with the OCR models
|
||||||
|
|||||||
Binary file not shown.
19
draw.py
19
draw.py
@ -10,20 +10,27 @@ font = ImageFont.truetype(FONT_FILE, FONT_SIZE)
|
|||||||
|
|
||||||
#### CREATE A CLASS LATER so it doesn't have to inherit the same arguments all the way too confusing :| its so ass like this man i had no foresight
|
#### CREATE A CLASS LATER so it doesn't have to inherit the same arguments all the way too confusing :| its so ass like this man i had no foresight
|
||||||
|
|
||||||
def modify_image_bytes(image_bytes: io.BytesIO, ocr_output, translation: list) -> bytes:
|
def modify_image(input: io.BytesIO | str, ocr_output, translation: list) -> bytes:
|
||||||
"""Modify the image bytes with the translated text and return the modified image bytes"""
|
"""Modify the image bytes with the translated text and return the modified image bytes. If it is a path then open directly."""
|
||||||
|
# if input is str, then check if it exists
|
||||||
with io.BytesIO(image_bytes) as byte_stream:
|
if isinstance(input, str):
|
||||||
image = Image.open(byte_stream)
|
image = Image.open(input)
|
||||||
draw = ImageDraw.Draw(image)
|
draw = ImageDraw.Draw(image)
|
||||||
draw_on_image(draw, translation, ocr_output, MAX_TRANSLATE)
|
draw_on_image(draw, translation, ocr_output, MAX_TRANSLATE)
|
||||||
|
elif isinstance(input, io.BytesIO):
|
||||||
|
with io.BytesIO(input) as byte_stream:
|
||||||
|
image = Image.open(byte_stream)
|
||||||
|
draw = ImageDraw.Draw(image)
|
||||||
|
draw_on_image(draw, translation, ocr_output, MAX_TRANSLATE)
|
||||||
|
else:
|
||||||
|
raise TypeError('Incorrect filetype input')
|
||||||
# Save the modified image back to bytes without changing the format
|
# Save the modified image back to bytes without changing the format
|
||||||
with io.BytesIO() as byte_stream:
|
with io.BytesIO() as byte_stream:
|
||||||
image.save(byte_stream, format=image.format) # Save in original format
|
image.save(byte_stream, format=image.format) # Save in original format
|
||||||
modified_image_bytes = byte_stream.getvalue()
|
modified_image_bytes = byte_stream.getvalue()
|
||||||
return modified_image_bytes
|
return modified_image_bytes
|
||||||
|
|
||||||
|
|
||||||
def draw_on_image(draw: ImageDraw, translation: list, ocr_output: list, max_translate: int, draw_mode: str = DRAW_TRANSLATIONS_MODE) -> ImageDraw:
|
def draw_on_image(draw: ImageDraw, translation: list, ocr_output: list, max_translate: int, draw_mode: str = DRAW_TRANSLATIONS_MODE) -> ImageDraw:
|
||||||
"""Draw the original, translated and optionally the romanisation of the texts on the image"""
|
"""Draw the original, translated and optionally the romanisation of the texts on the image"""
|
||||||
translated_number = 0
|
translated_number = 0
|
||||||
|
|||||||
@ -192,13 +192,12 @@ class ApiModel():
|
|||||||
#prompt = f"Without any additional remarks, and without any code, translate the following items of the Python list from {self.from_lang} into {self.target_lang} and output as a Python list ensuring proper escaping of characters and ensuring the length of the list given is exactly equal to the length of the list you provide. Do not output in any other language other than the specified target language: {texts_to_translate}"
|
#prompt = f"Without any additional remarks, and without any code, translate the following items of the Python list from {self.from_lang} into {self.target_lang} and output as a Python list ensuring proper escaping of characters and ensuring the length of the list given is exactly equal to the length of the list you provide. Do not output in any other language other than the specified target language: {texts_to_translate}"
|
||||||
prompt = f"""INSTRUCTIONS:
|
prompt = f"""INSTRUCTIONS:
|
||||||
- Provide ONE and ONLY ONE translation to each text provided in the JSON array given.
|
- Provide ONE and ONLY ONE translation to each text provided in the JSON array given.
|
||||||
|
- Respond using ONLY valid JSON array syntax. Do not use any Python-like dictionary syntax and therefore it must not contain any keys or curly braces.
|
||||||
|
- Do not include explanations or additional text
|
||||||
- The translations must preserve the original order.
|
- The translations must preserve the original order.
|
||||||
- Each translation must be from the Source language to the Target language
|
- Each translation must be from the Source language to the Target language
|
||||||
- Source language: {self.from_lang}
|
- Source language: {self.from_lang}
|
||||||
- Target language: {self.target_lang}
|
- Target language: {self.target_lang}
|
||||||
- Texts are provided in JSON array syntax.
|
|
||||||
- Respond using ONLY valid JSON array syntax.
|
|
||||||
- Do not include explanations or additional text
|
|
||||||
- Escape special characters properly
|
- Escape special characters properly
|
||||||
|
|
||||||
Input texts:
|
Input texts:
|
||||||
@ -212,7 +211,8 @@ Translation:"""
|
|||||||
response_list = ast.literal_eval(response.strip())
|
response_list = ast.literal_eval(response.strip())
|
||||||
logger.debug(repr(self))
|
logger.debug(repr(self))
|
||||||
logger.info(f'{self.model} translated texts from: {texts_to_translate} to {response_list}.')
|
logger.info(f'{self.model} translated texts from: {texts_to_translate} to {response_list}.')
|
||||||
|
if not isinstance(response_list, list):
|
||||||
|
raise TypeError(f"Incorrect response type. Expected list, got {type(response_list)}")
|
||||||
if len(response_list) != len(texts_to_translate) and len(texts_to_translate) <= MAX_TRANSLATE:
|
if len(response_list) != len(texts_to_translate) and len(texts_to_translate) <= MAX_TRANSLATE:
|
||||||
logger.error(f"{self.model} model failed to translate all the texts. Number of translations to make: {len(texts_to_translate)}; Number of translated texts: {len(response_list)}.")
|
logger.error(f"{self.model} model failed to translate all the texts. Number of translations to make: {len(texts_to_translate)}; Number of translated texts: {len(response_list)}.")
|
||||||
if store:
|
if store:
|
||||||
@ -220,7 +220,6 @@ Translation:"""
|
|||||||
else:
|
else:
|
||||||
if store:
|
if store:
|
||||||
self._db_add_translation(texts_to_translate, response_list)
|
self._db_add_translation(texts_to_translate, response_list)
|
||||||
print(response_list)
|
|
||||||
return response_list
|
return response_list
|
||||||
|
|
||||||
class Groq(ApiModel):
|
class Groq(ApiModel):
|
||||||
|
|||||||
@ -86,7 +86,6 @@ def _id_filtered(ocr, image, lang) -> list:
|
|||||||
def _id_lang(ocr, image, lang) -> list:
|
def _id_lang(ocr, image, lang) -> list:
|
||||||
result = _identify(ocr, image)
|
result = _identify(ocr, image)
|
||||||
lang = standardize_lang(lang)['id_model_lang']
|
lang = standardize_lang(lang)['id_model_lang']
|
||||||
print(result)
|
|
||||||
try:
|
try:
|
||||||
filtered = [entry for entry in result if contains_lang(entry[1], lang)]
|
filtered = [entry for entry in result if contains_lang(entry[1], lang)]
|
||||||
except:
|
except:
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import jaconv, MeCab, unidic, pykakasi
|
|||||||
from sklearn.metrics.pairwise import cosine_similarity
|
from sklearn.metrics.pairwise import cosine_similarity
|
||||||
from sklearn.feature_extraction.text import TfidfVectorizer
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
import subprocess
|
||||||
# for creating furigana
|
# for creating furigana
|
||||||
mecab = MeCab.Tagger('-d "{}"'.format(unidic.DICDIR))
|
mecab = MeCab.Tagger('-d "{}"'.format(unidic.DICDIR))
|
||||||
uroman = ur.Uroman()
|
uroman = ur.Uroman()
|
||||||
@ -25,33 +26,26 @@ def intercepts(x,y):
|
|||||||
def is_wayland():
|
def is_wayland():
|
||||||
return 'WAYLAND_DISPLAY' in os.environ
|
return 'WAYLAND_DISPLAY' in os.environ
|
||||||
|
|
||||||
# path to save screenshot of monitor to
|
# please install grim otherwise this is way too slow for wayland
|
||||||
def printsc_wayland(region, save: bool = False, path: str = None):
|
def printsc_wayland(region: tuple, path: str):
|
||||||
if save:
|
subprocess.run(['grim','-g', f'{region[0]},{region[1]} {region[2]-region[0]}x{region[3]-region[1]}', '-t', 'jpeg', '-q','95', path])
|
||||||
im = ImageGrab.grab(bbox=region)
|
|
||||||
im.save(path)
|
|
||||||
else:
|
|
||||||
return ImageGrab.grab(bbox=region)
|
|
||||||
|
|
||||||
|
def printsc_non_wayland(region: tuple, path: str):
|
||||||
def printsc_non_wayland(region, save: bool = False, path: str = None):
|
|
||||||
# use mss to capture the screen
|
# use mss to capture the screen
|
||||||
with mss.mss() as sct:
|
with mss.mss() as sct:
|
||||||
# grab the screen
|
# grab the screen
|
||||||
img = sct.grab(region)
|
img = sct.grab(region)
|
||||||
# convert the image to a PIL image
|
# convert the image to a PIL image
|
||||||
image = Image.frombytes("RGB", img.size, img.bgra, "raw", "BGRX")
|
image = Image.frombytes("RGB", img.size, img.bgra, "raw", "BGRX")
|
||||||
# save the image if save is True
|
image.save(path)
|
||||||
if save:
|
|
||||||
image.save(path)
|
|
||||||
|
|
||||||
|
|
||||||
def printsc(region, save: bool = False, path: str = None):
|
def printsc(region: tuple, path: str):
|
||||||
try:
|
try:
|
||||||
if is_wayland():
|
if is_wayland():
|
||||||
return printsc_wayland(region, save, path)
|
printsc_wayland(region, path)
|
||||||
else:
|
else:
|
||||||
return printsc_non_wayland(region, save, path)
|
printsc_non_wayland(region, path)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'Error {e}')
|
print(f'Error {e}')
|
||||||
|
|
||||||
@ -187,5 +181,5 @@ def similar_tfidf(list1,list2,threshold) -> float:
|
|||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# Example usage
|
# Example usage
|
||||||
japanesetext = "本が好きにちは"
|
x = printsc_wayland((0,0,1920,1080), save = False)
|
||||||
print(add_furigana(japanesetext))
|
print(x)
|
||||||
@ -64,5 +64,5 @@ def setup_logger(
|
|||||||
print(f"Failed to setup logger: {e}")
|
print(f"Failed to setup logger: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
logger = setup_logger('on_screen_translator', log_file='translate.log', level=logging.DEBUG)
|
logger = setup_logger('on_screen_translator', log_file='translate.log', level=logging.INFO)
|
||||||
|
|
||||||
|
|||||||
22
main.py
22
main.py
@ -5,12 +5,12 @@ import os, time, sys, threading, subprocess
|
|||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'helpers'))
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'helpers'))
|
||||||
|
|
||||||
from translation import translate_Seq_LLM, translate_API_LLM, init_API_LLM, init_Seq_LLM
|
from translation import translate_Seq_LLM, translate_API_LLM, init_API_LLM, init_Seq_LLM
|
||||||
from utils import printsc, convert_image_to_bytes, bytes_to_image, similar_tfidf
|
from utils import printsc, convert_image_to_bytes, bytes_to_image, similar_tfidf, is_wayland
|
||||||
from ocr import get_words, init_OCR, id_keep_source_lang
|
from ocr import get_words, init_OCR, id_keep_source_lang
|
||||||
from data import Base, engine, create_tables
|
from data import Base, engine, create_tables
|
||||||
from draw import modify_image_bytes
|
from draw import modify_image
|
||||||
import config, asyncio
|
import config, asyncio
|
||||||
from config import SOURCE_LANG, TARGET_LANG, OCR_MODEL, OCR_USE_GPU, LOCAL_FILES_ONLY, REGION, INTERVAL, MAX_TRANSLATE, TRANSLATION_MODEL, IMAGE_CHANGE_THRESHOLD
|
from config import SOURCE_LANG, TARGET_LANG, OCR_MODEL, OCR_USE_GPU, LOCAL_FILES_ONLY, REGION, INTERVAL, MAX_TRANSLATE, TRANSLATION_MODEL, IMAGE_CHANGE_THRESHOLD, TEMP_IMG_PATH
|
||||||
from logging_config import logger
|
from logging_config import logger
|
||||||
import web_app
|
import web_app
|
||||||
import view_buffer_app
|
import view_buffer_app
|
||||||
@ -40,16 +40,15 @@ async def main():
|
|||||||
# try:
|
# try:
|
||||||
while True:
|
while True:
|
||||||
logger.debug("Capturing screen")
|
logger.debug("Capturing screen")
|
||||||
untranslated_image = printsc(REGION)
|
printsc(REGION, TEMP_IMG_PATH)
|
||||||
logger.debug(f"Screen Captured. Proceeding to perform OCR.")
|
logger.debug(f"Screen Captured. Proceeding to perform OCR.")
|
||||||
byte_image = convert_image_to_bytes(untranslated_image)
|
ocr_output = id_keep_source_lang(ocr, TEMP_IMG_PATH, SOURCE_LANG) # keep only phrases containing the source language
|
||||||
ocr_output = id_keep_source_lang(ocr, byte_image, SOURCE_LANG) # keep only phrases containing the source language
|
|
||||||
logger.debug(f"OCR completed. Detected {len(ocr_output)} phrases.")
|
logger.debug(f"OCR completed. Detected {len(ocr_output)} phrases.")
|
||||||
if runs == 0:
|
if runs == 0:
|
||||||
logger.info('Initial run')
|
logger.info('Initial run')
|
||||||
prev_words = set()
|
prev_words = set()
|
||||||
else:
|
else:
|
||||||
logger.info(f'Run number: {runs}.')
|
logger.debug(f'Run number: {runs}.')
|
||||||
runs += 1
|
runs += 1
|
||||||
|
|
||||||
curr_words = set(get_words(ocr_output))
|
curr_words = set(get_words(ocr_output))
|
||||||
@ -67,18 +66,15 @@ async def main():
|
|||||||
time.sleep(30)
|
time.sleep(30)
|
||||||
continue
|
continue
|
||||||
logger.debug('Translation complete. Modifying image.')
|
logger.debug('Translation complete. Modifying image.')
|
||||||
translated_image = modify_image_bytes(byte_image, ocr_output, translation)
|
translated_image = modify_image(TEMP_IMG_PATH, ocr_output, translation)
|
||||||
# view_buffer_app.show_buffer_image(translated_image, label)
|
# view_buffer_app.show_buffer_image(translated_image, label)
|
||||||
web_app.latest_image = bytes_to_image(translated_image)
|
web_app.latest_image = bytes_to_image(translated_image)
|
||||||
logger.debug("Image modified. Saving image.")
|
logger.debug("Image modified. Saving image.")
|
||||||
# web_app.latest_image.save('/home/James/Pictures/translated.png') # home use
|
|
||||||
# logger.debug("Image saved.")
|
|
||||||
prev_words = curr_words
|
prev_words = curr_words
|
||||||
else:
|
else:
|
||||||
logger.info("Skipping translation. No significant change in the screen detected.")
|
logger.info(f"Skipping translation. No significant change in the screen detected. Total translation attempts so far: {runs}.")
|
||||||
logger.debug("Continuing to next iteration.")
|
logger.debug("Continuing to next iteration.")
|
||||||
# logger.debug(f'Sleeping for {INTERVAL} seconds')
|
time.sleep(INTERVAL)
|
||||||
asyncio.sleep(INTERVAL)
|
|
||||||
# finally:
|
# finally:
|
||||||
# label.close()
|
# label.close()
|
||||||
# app.quit()
|
# app.quit()
|
||||||
|
|||||||
@ -17,7 +17,7 @@
|
|||||||
setInterval(function () {
|
setInterval(function () {
|
||||||
document.getElementById("live-image").src =
|
document.getElementById("live-image").src =
|
||||||
"/image?" + new Date().getTime();
|
"/image?" + new Date().getTime();
|
||||||
}, 2500); // Update every 2.5 seconds. Beware that if the image fails to reload on time, the browser will continuously refresh without being able to display the images.
|
}, 1500); // Update every 1.5 seconds. Beware that if the image fails to reload on time, the browser will continuously refresh without being able to display the images.
|
||||||
</script>
|
</script>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user