From ee4b3ed43e5472342e9e897ed65d284b71db740c Mon Sep 17 00:00:00 2001 From: chickenflyshigh Date: Fri, 1 Nov 2024 15:18:25 +1100 Subject: [PATCH] Initial commit --- .gitignore | 5 ++ README.md | 0 chinese_to_eng.py | 132 +++++++++++++++++++++++++++++++++++ draw_translation.py | 153 ++++++++++++++++++++++++++++++++++++++++ helpers/__init__.py | 0 helpers/draw.py | 88 +++++++++++++++++++++++ helpers/ocr.py | 87 +++++++++++++++++++++++ helpers/translation.py | 76 ++++++++++++++++++++ helpers/utils.py | 155 +++++++++++++++++++++++++++++++++++++++++ logging_config.py | 36 ++++++++++ main.py | 39 +++++++++++ templates/index.html | 23 ++++++ 12 files changed, 794 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 chinese_to_eng.py create mode 100755 draw_translation.py create mode 100644 helpers/__init__.py create mode 100644 helpers/draw.py create mode 100644 helpers/ocr.py create mode 100644 helpers/translation.py create mode 100644 helpers/utils.py create mode 100644 logging_config.py create mode 100644 main.py create mode 100644 templates/index.html diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..612f423 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.ipynb +*.log +translate/ +__pycache__/ +.* diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/chinese_to_eng.py b/chinese_to_eng.py new file mode 100644 index 0000000..872b1fb --- /dev/null +++ b/chinese_to_eng.py @@ -0,0 +1,132 @@ +################################################################################### +##### IMPORT LIBRARIES ##### +import os, time, logging, ast +from helpers.translation import init_TRANSLATE, translate +from helpers.utils import intercepts, contains_lang, printsc, romanize, convert_image_to_bytes, bytes_to_image +from helpers.ocr import id_filtered, id_lang, get_words, get_positions, get_confidences, init_OCR +from logging_config import setup_logger +from helpers.draw import modify_image_bytes +################################################################################### + +#### LOGGING #### +setup_logger('chinese_to_eng', log_file='chinese_to_eng.log') + +################################################################################### +##### Variables to edit ##### + + +INTERVAL = int(os.getenv('INTERVAL')) + +### available languages: 'ch_sim', 'ch_tra', 'ja', 'ko', 'en' +SOURCE_LANG = os.getenv('SOURCE_LANG', 'ja') +TARGET_LANG = os.getenv('TARGET_LANG', 'en') + +### Translation +TRANSLATION_MODEL = os.getenv('TRANSLATION_MODEL', 'opus') # 'opus' or 'm2m' # opus is a lot more lightweight +MAX_TRANSLATE = 200 + +### OCR +OCR_USE_GPU = ast.literal_eval(os.getenv('OCR_USE_GPU', 'True')) +OCR_MODEL = os.getenv('OCR_MODEL', 'easy') # 'easy', 'paddle', 'rapid' ### easy is the most accurate, paddle is the fastest with CUDA and rapid is the fastest with CPU + +REGION = ast.literal_eval(os.getenv('REGION','(0,0,2560,1440)')) +################################################################################### + + +OCR_LANGUAGES = [SOURCE_LANG, TARGET_LANG, 'en'] +latest_image = None + +def main(): + global latest_image + # screenshot + untranslated_image = printsc(REGION) + byte_image = convert_image_to_bytes(untranslated_image) + + ################################################################################### + ##### Initialize the OCR ##### + ocr = init_OCR(model=OCR_MODEL, ocr_languages = OCR_LANGUAGES, use_GPU=OCR_USE_GPU) + ocr_output = id_lang(ocr, byte_image, 'ja') + curr_words = set(get_words(ocr_output)) + prev_words = set() + + ##### Initialize the translation ##### + init_TRANSLATE() + ################################################################################### + + while True: + print('Running') + if prev_words != curr_words: + print('Translating') + to_translate = [entry[1] for entry in ocr_output][:MAX_TRANSLATE] + translation = translate(to_translate, from_lang, target_lang) + print(translation) + translated_image = modify_image_bytes(byte_image, ocr_output, translation) + latest_image = bytes_to_image(translated_image) + prev_words = curr_words + logging.info(f"Successfully translated image. Prev words are:\n{prev_words}") + else: + logging.info("The image has remained the same.") + # torch.cuda.empty_cache() + logging.info(f'Sleeping for {INTERVAL} seconds') + time.sleep(INTERVAL) + + untranslated_image = printsc(REGION) + byte_image = convert_image_to_bytes(untranslated_image) + ocr_output = id_lang(ocr, byte_image, 'ja') + curr_words = set(get_words(ocr_output)) + logging.info(f'Curr words to translate are:\n{curr_words}') + +if __name__ == "__main__": + main() + + +# image = Image.open(SCREENSHOT_PATH) +# draw = ImageDraw.Draw(image) + +# # set counter for limiting the number of translations +# translated_number = 0 +# bounding_boxes = [] +# for i, (position,words,confidence) in enumerate(ocr_output): +# if translated_number >= MAX_TRANSLATE: +# break +# # try: +# top_left, _, _, _ = position +# position = (top_left[0], top_left[1] - 60) +# text_content = f"{translation[i]}\n{romanize(words)}\n{words}" +# lines = text_content.split('\n') +# x,y = position + +# max_width = 0 +# total_height = 0 +# line_spacing = 3 +# line_height = FONT_SIZE + +# for line in lines: +# bbox = draw.textbbox(position, line, font=font) +# line_width, _ = bbox[2] - bbox[0], bbox[3] - bbox[1] +# max_width = max(max_width, line_width) +# total_height += line_height + line_spacing + +# bounding_box = (x, y, x + max_width, y + total_height, words) +# print(f"Bounding Box of Interest: {bounding_box}") + +# y = np.max([y,0]) +# if len(bounding_boxes) > 0: +# for box in bounding_boxes: +# print(f'Investigating box: {box}') +# if intercepts((box[0],box[2]),(bounding_box[0],bounding_box[2])) and intercepts((box[1],box[3]),(y, y+total_height)): +# print(f'Overlapping change adjustment to {words}') +# y = np.max([y,box[3]]) + line_spacing +# print(y, box[3]) +# print(f'Changed to {(x,y, x+max_width, y+total_height, words)}') +# adjusted_bounding_box = (x, y, x + max_width, y + total_height, words) +# bounding_boxes.append(adjusted_bounding_box) +# draw.rectangle([(x,y), (x+max_width, y+total_height)], outline="black", width=1) +# position = (x,y) +# for line in lines: +# draw.text(position, line, fill= TEXT_COLOR, font=font) +# y += FONT_SIZE + line_spacing +# position = (x,y) +# print("Adjusted_bounding_box:",adjusted_bounding_box) +# print('\n') +# translated_number += 1 \ No newline at end of file diff --git a/draw_translation.py b/draw_translation.py new file mode 100755 index 0000000..071c5d1 --- /dev/null +++ b/draw_translation.py @@ -0,0 +1,153 @@ +import easyocr +from pypinyin import pinyin +from PIL import Image, ImageDraw, ImageFont +import os, time, logging, torch, subprocess +from helpers.translation import init_M2M, translate_M2M +import langid +import numpy as np + +##### Variables to edit + +text_color = "#ff0000" +font_file = "/home/James/.local/share/fonts/Arial-Unicode-Bold.ttf" +font_size = 16 + +pyin = True # whether to add pinyin or not +max_translate = 100 + +# for detecting language to filter out other languages. Only writes the text when it is detected to be src_lang +src_lang = "zh" +tgt_lang = "en" +# af, am, an, ar, as, az, be, bg, bn, br, bs, ca, cs, cy, da, de, dz, el, en, eo, es, et, eu, fa, fi, fo, fr, ga, gl, gu, he, hi, hr, ht, hu, hy, id, is, it, ja, jv, ka, kk, km, kn, ko, ku, ky, la, lb, lo, lt, lv, mg, mk, ml, mn, mr, ms, mt, nb, ne, nl, nn, no, oc, or, pa, pl, ps, pt, qu, ro, ru, rw, se, si, sk, sl, sq, sr, sv, sw, ta, te, th, tl, tr, ug, uk, ur, vi, vo, wa, xh, zh, zu +langid.set_languages([src_lang,tgt_lang,'en']) + +# for translator (M2M100) +from_lang = "zh" +target_lang = "en" + +# Afrikaans (af), Amharic (am), Arabic (ar), Asturian (ast), Azerbaijani (az), Bashkir (ba), Belarusian (be), Bulgarian (bg), Bengali (bn), Breton (br), Bosnian (bs), Catalan; Valencian (ca), Cebuano (ceb), Czech (cs), Welsh (cy), Danish (da), German (de), Greeek (el), English (en), Spanish (es), Estonian (et), Persian (fa), Fulah (ff), Finnish (fi), French (fr), Western Frisian (fy), Irish (ga), Gaelic; Scottish Gaelic (gd), Galician (gl), Gujarati (gu), Hausa (ha), Hebrew (he), Hindi (hi), Croatian (hr), Haitian; Haitian Creole (ht), Hungarian (hu), Armenian (hy), Indonesian (id), Igbo (ig), Iloko (ilo), Icelandic (is), Italian (it), Japanese (ja), Javanese (jv), Georgian (ka), Kazakh (kk), Central Khmer (km), Kannada (kn), Korean (ko), Luxembourgish; Letzeburgesch (lb), Ganda (lg), Lingala (ln), Lao (lo), Lithuanian (lt), Latvian (lv), Malagasy (mg), Macedonian (mk), Malayalam (ml), Mongolian (mn), Marathi (mr), Malay (ms), Burmese (my), Nepali (ne), Dutch; Flemish (nl), Norwegian (no), Northern Sotho (ns), Occitan (post 1500) (oc), Oriya (or), Panjabi; Punjabi (pa), Polish (pl), Pushto; Pashto (ps), Portuguese (pt), Romanian; Moldavian; Moldovan (ro), Russian (ru), Sindhi (sd), Sinhala; Sinhalese (si), Slovak (sk), Slovenian (sl), Somali (so), Albanian (sq), Serbian (sr), Swati (ss), Sundanese (su), Swedish (sv), Swahili (sw), Tamil (ta), Thai (th), Tagalog (tl), Tswana (tn), Turkish (tr), Ukrainian (uk), Urdu (ur), Uzbek (uz), Vietnamese (vi), Wolof (wo), Xhosa (xh), Yiddish (yi), Yoruba (yo), Chinese (zh), Zulu (zu) + +# for easyOCR +OCR_languages = ['ch_sim','en'] # languages to recognise +# https://www.jaided.ai/easyocr/ + +log_directory = '/var/log/ocr' +printsc = lambda x: subprocess.run(f"grim -t png -o DP-1 -l 0 {x}", shell=True) + +# Configure the logger +os.makedirs(log_directory, exist_ok=True) + +logging.basicConfig( + filename=os.path.join(log_directory, 'ocr.log'), + level=logging.DEBUG, # Set the logging level + format='%(asctime)s - %(message)s', # Define the format for logging + datefmt='%Y-%m-%d %H:%M:%S' # Define the date format +) + +# screenshot +printsc(image_old) +time.sleep(1) + +# EasyOCR +reader = easyocr.Reader(OCR_languages) # this needs to run only once to load the model into memory + +def results(): + result = reader.readtext(image_old) + results_no_eng = [entry for entry in result if langid.classify(entry[1])[0] == src_lang] + return results_no_eng + +# result is a list of tuples with the following structure: +# (top_left, top_right, bottom_right, bottom_left, text, confidence) +# top_left, top_right, bottom_right, bottom_left are the coordinates of the bounding box +ocr_output = results() +curr_words = set(entry[1] for entry in ocr_output) +prev_words = set() + +# translator = GoogleTranslator(source=from_language, target=target_language) + +font = ImageFont.truetype(font_file, font_size) + +# define a function for checking whether one axis of a shape intercepts with another +def intercepts(x,y): + # both x and y are two dimensional tuples representing the ends of a line on one dimension. + x1, x2 = x + y1, y2 = y + return (x1 <= y1 <= x2) or (x1 <= y2 <= x2) or (y1 <= x1 <= y2) or (y1 <= x2 <= y2) + +while True: + print('Running') + if prev_words != curr_words: + print('Translating') + image = Image.open(image_old) + draw = ImageDraw.Draw(image) + to_translate = [entry[1] for entry in ocr_output][:max_translate] + translation = translate_M2M(to_translate, from_lang = from_lang, target_lang = target_lang) + # set counter for limiting the number of translations + translated_number = 0 + bounding_boxes = [] + for i, (position,words,confidence) in enumerate(ocr_output): + if translated_number >= max_translate: + break + word = translation[i] + # try: + top_left, _, _, _ = position + position = (top_left[0], top_left[1] - 60) + if pyin: + py = ' '.join([ py[0] for py in pinyin(words)]) + text_content = f"{translation[i]}\n{py}\n{words}" + else: + text_content = f"{translation[i]}\n{words}" + lines = text_content.split('\n') + x,y = position + + max_width = 0 + total_height = 0 + line_spacing = 3 + line_height = font_size + + for line in lines: + bbox = draw.textbbox(position, line, font=font) + line_width, _ = bbox[2] - bbox[0], bbox[3] - bbox[1] + max_width = max(max_width, line_width) + total_height += line_height + line_spacing + + bounding_box = (x, y, x + max_width, y + total_height, words) + print(f"Bounding Box of Interest: {bounding_box}") + + y = np.max([y,0]) + if len(bounding_boxes) > 0: + for box in bounding_boxes: + print(f'Investigating box: {box}') + if intercepts((box[0],box[2]),(bounding_box[0],bounding_box[2])) and intercepts((box[1],box[3]),(y, y+total_height)): + print(f'Overlapping change adjustment to {words}') + y = np.max([y,box[3]]) + line_spacing + print(y, box[3]) + print(f'Changed to {(x,y, x+max_width, y+total_height, words)}') + adjusted_bounding_box = (x, y, x + max_width, y + total_height, words) + bounding_boxes.append(adjusted_bounding_box) + draw.rectangle([(x,y), (x+max_width, y+total_height)], outline="black", width=1) + position = (x,y) + for line in lines: + draw.text(position, line, fill= text_color, font=font) + y += font_size + line_spacing + position = (x,y) + print("Adjusted_bounding_box:",adjusted_bounding_box) + print('\n') + # except Exception as e: + # logging.error(e) + translated_number += 1 + image.save(image_new) + logging.info(f"Saved the image to {image_new}") + prev_words = curr_words + logging.info(f"Successfully translated image. Prev words are:\n{prev_words}") + else: + logging.info("The image has remained the same.") + torch.cuda.empty_cache() + print('Sleeping') + time.sleep(10) + + printsc(image_old) + ocr_output = results() + curr_words = set(entry[1] for entry in ocr_output) + logging.info(f'Curr words are:\n{curr_words}') + diff --git a/helpers/__init__.py b/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/helpers/draw.py b/helpers/draw.py new file mode 100644 index 0000000..130144d --- /dev/null +++ b/helpers/draw.py @@ -0,0 +1,88 @@ +from PIL import Image, ImageDraw, ImageFont +from dotenv import load_dotenv +import os +import io +import numpy as np +import ast +from helpers.utils import romanize, intercepts +load_dotenv() + +MAX_TRANSLATE = int(os.getenv('MAX_TRANSLATION', 200)) +FONT_FILE = os.getenv('FONT_FILE') +FONT_SIZE = int(os.getenv('FONT_SIZE', 16)) +LINE_SPACING = int(os.getenv('LINE_SPACING', 3)) +TEXT_COLOR = os.getenv('TEXT_COLOR', "#ff0000") +LINE_HEIGHT = FONT_SIZE +TO_ROMANIZE = ast.literal_eval(os.getenv('TO_ROMANIZE', 'True')) +font = ImageFont.truetype(FONT_FILE, FONT_SIZE) + + + +def modify_image_bytes(image_bytes: io.BytesIO, ocr_output, translation: list) -> bytes: + # Load the image from bytes + with io.BytesIO(image_bytes) as byte_stream: + image = Image.open(byte_stream) + draw = ImageDraw.Draw(image) + translate_image(draw, translation, ocr_output, MAX_TRANSLATE) + + # Save the modified image back to bytes without changing the format + with io.BytesIO() as byte_stream: + image.save(byte_stream, format=image.format) # Save in original format + modified_image_bytes = byte_stream.getvalue() + + return modified_image_bytes + +def translate_image(draw: ImageDraw, translation: list, ocr_output: list, max_translate: int) -> ImageDraw: + translation + translated_number = 0 + bounding_boxes = [] + for i, (position, untranslated_phrase, confidence) in enumerate(ocr_output): + if translated_number >= max_translate: + break + translate_one_phrase(draw, translation[i], position, bounding_boxes, untranslated_phrase) + translated_number += 1 + return draw + +def translate_one_phrase(draw: ImageDraw, translated_phrase: str, position: tuple, bounding_boxes: list, untranslated_phrase: str) -> ImageDraw: + # Draw the bounding box + top_left, _, _, _ = position + position = (top_left[0], top_left[1] - 60) + text_content = f"{translated_phrase}\n{romanize(untranslated_phrase, TO_ROMANIZE)}\n{untranslated_phrase}" + lines = text_content.split('\n') + x,y = position + max_width = 0 + total_height = 0 + total_height = len(lines) * (LINE_HEIGHT + LINE_SPACING) + for line in lines: + bbox = draw.textbbox(position, line, font=font) + line_width = bbox[2] - bbox[0] + max_width = max(max_width, line_width) + bounding_box = (x, y, x + max_width, y + total_height, untranslated_phrase) + print(f"Bounding Box of Interest: {bounding_box}") + + adjust_if_intersects(x, y, bounding_box, bounding_boxes, untranslated_phrase, max_width, total_height) + adjusted_x, adjusted_y, adjusted_max_x, adjusted_max_y, _ = bounding_boxes[-1] + draw.rectangle([(adjusted_x,adjusted_y), (adjusted_max_x, adjusted_max_y)], outline="black", width=1) + position = (adjusted_x,adjusted_y) + for line in lines: + draw.text(position, line, fill= TEXT_COLOR, font=font) + adjusted_y += FONT_SIZE + LINE_SPACING + position = (adjusted_x,adjusted_y) + print(f"Adjusted_bounding_box: {bounding_box[-1]}.\n") + +def adjust_if_intersects(x: int, y: int, bounding_box: tuple, bounding_boxes: list, untranslated_phrase: str, max_width: int, total_height: int) -> tuple: + y = np.max([y,0]) + if len(bounding_boxes) > 0: + for box in bounding_boxes: + print(f'Investigating box: {box}') + if intercepts((box[0],box[2]),(bounding_box[0],bounding_box[2])) and intercepts((box[1],box[3]),(y, y+total_height)): + print(f'Overlapping change adjustment to {untranslated_phrase}') + y = np.max([y,box[3]]) + LINE_SPACING + print(y, box[3]) + print(f'Changed to {(x,y, x+max_width, y+total_height, untranslated_phrase)}') + adjusted_bounding_box = (x, y, x + max_width, y + total_height, untranslated_phrase) + bounding_boxes.append(adjusted_bounding_box) + return adjusted_bounding_box + + + diff --git a/helpers/ocr.py b/helpers/ocr.py new file mode 100644 index 0000000..d1de989 --- /dev/null +++ b/helpers/ocr.py @@ -0,0 +1,87 @@ +from paddleocr import PaddleOCR +import easyocr +from rapidocr_onnxruntime import RapidOCR +import langid +from helpers.utils import contains_lang +from concurrent.futures import ThreadPoolExecutor +# PaddleOCR +# Paddleocr supports Chinese, English, French, German, Korean and Japanese. +# You can set the parameter `lang` as `ch`, `en`, `fr`, `german`, `korean`, `japan` +# to switch the language model in order. +# need to run only once to download and load model into memory + +def _paddle_init(lang='ch', use_angle_cls=False, use_GPU=True): + return PaddleOCR(use_angle_cls=use_angle_cls, lang=lang, use_GPU=use_GPU) + + +def _paddle_ocr(ocr, image) -> list: + ### return a list containing the bounding box, text and confidence of the detected text + result = ocr.ocr(image, cls=False)[0] + if not isinstance(result, list): + return [] + result = [ (pos, text[0], text[1]) for pos, text in result] + return result + +# EasyOCR has support for many languages + +def _easy_init(ocr_languages: list, use_GPU=True): + return easyocr.Reader(ocr_languages, gpu=use_GPU) + +def _easy_ocr(ocr,image) -> list: + return ocr.readtext(image) + +# RapidOCR mostly for mandarin and some other asian languages + +def _rapid_init(use_GPU=True): + return RapidOCR(use_gpu=use_GPU) + +def _rapid_ocr(ocr, image) -> list: + return ocr(image) + +### Initialize the OCR model +def init_OCR(model='paddle', **kwargs): + if model == 'paddle': + return _paddle_init(**kwargs) + elif model == 'easy': + return _easy_init(**kwargs) + elif model == 'rapid': + return _rapid_init(**kwargs) + +### Perform OCR on the image +def identify(ocr, image) -> list: + if isinstance(ocr, PaddleOCR): + return _paddle_ocr(ocr, image) + elif isinstance(ocr, easyocr.Reader): + return _easy_ocr(ocr, image) + elif isinstance(ocr, RapidOCR): + return _rapid_ocr(ocr, image) + else: + raise ValueError("Invalid OCR model. Please initialise the OCR model first with init() and pass it as an argument to identify().") + + +### Filter out the results that are not in the source language +def id_filtered(ocr, image, lang) -> list: + result = identify(ocr, image) + + ### Parallelise since langid is slow + def classify_text(entry): + return entry if langid.classify(entry[1])[0] == lang else None + with ThreadPoolExecutor() as executor: + results_no_eng = list(filter(None, executor.map(classify_text, result))) + return results_no_eng + + +# zh, ja, ko +def id_lang(ocr, image, lang) -> list: + result = identify(ocr, image) + filtered = [entry for entry in result if contains_lang(entry[1], lang)] + return filtered + +def get_words(ocr_output) -> list: + return [entry[1] for entry in ocr_output] + +def get_positions(ocr_output) -> list: + return [entry[0] for entry in ocr_output] + +def get_confidences(ocr_output) -> list: + return [entry[2] for entry in ocr_output] diff --git a/helpers/translation.py b/helpers/translation.py new file mode 100644 index 0000000..ccf15e9 --- /dev/null +++ b/helpers/translation.py @@ -0,0 +1,76 @@ +from transformers import M2M100Tokenizer, M2M100ForConditionalGeneration, AutoTokenizer, AutoModelForSeq2SeqLM, GPTQConfig +import torch, os +from dotenv import load_dotenv +load_dotenv() + +if os.getenv('TRANSLATION_USE_GPU') in ['False', '0', 'false', 'no', 'No', 'NO', 'FALSE']: + device = torch.device("cpu") +else: + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + +### Batch translate a list of strings + + +# M2M100 model + + +def init_M2M(): + global tokenizer, model + tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_418M", local_files_only=True) + model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_418M", local_files_only=True, torch_dtype=torch.float16).to(device) + model.eval() + + + +def translate_M2M(text, from_lang = 'zh', target_lang = 'en'): + if len(text) == 0: + return [] + tokenizer.src_lang = from_lang + with torch.no_grad(): + encoded = tokenizer(text, return_tensors="pt", padding=True).to(device) + generated_tokens = model.generate(**encoded, + forced_bos_token_id=tokenizer.get_lang_id(target_lang)) + translated = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True) + return translated + + +# Helsinki-NLP model Opus MT + + +def init_OPUS(): + global tokenizer, model + tokenizer = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-tc-bible-big-zhx-en", local_files_only=True) + model = AutoModelForSeq2SeqLM.from_pretrained("Helsinki-NLP/opus-mt-tc-bible-big-zhx-en", local_files_only=True, torch_dtype=torch.float16).to(device) + model.eval() + +def translate_OPUS(text: list[str]) -> list[str]: + if len(text) == 0: + return [] + with torch.no_grad(): + encoded = tokenizer(text, return_tensors="pt", padding=True).to(device) + generated_tokens = model.generate(**encoded) + translated = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True) + return translated + +### +def init_TRANSLATE(model): # model = 'opus' or 'm2m' + if model == 'opus': + init_OPUS() + elif model == 'm2m': + init_M2M() + else: + raise ValueError("Invalid model. Please use 'opus' or 'm2m'.") + + +### +def translate(text, model, **kwargs): + if model == 'opus': + return translate_OPUS(text) + elif model == 'm2m': + try: + return translate_M2M(text, **kwargs) + except: + raise ValueError("Please provide the from_lang and target_lang variables if you are using the M2M model.") + + else: + raise ValueError("Invalid model. Please use 'opus' or 'm2m'.") \ No newline at end of file diff --git a/helpers/utils.py b/helpers/utils.py new file mode 100644 index 0000000..64826b8 --- /dev/null +++ b/helpers/utils.py @@ -0,0 +1,155 @@ +import re, uroman as ur +from pypinyin import pinyin +import pyscreenshot as ImageGrab # wayland tings not sure if it will work on other machines alternatively use mss +import mss, io, os +from PIL import Image +import jaconv +import MeCab +import unidic +mecab = MeCab.Tagger('-d "{}"'.format(unidic.DICDIR)) +uroman = ur.Uroman() + + +# define a function for checking whether one axis of a shape intercepts with another +def intercepts(x,y): + # both x and y are two dimensional tuples representing the ends of a line on one dimension. + x1, x2 = x + y1, y2 = y + return (x1 <= y1 <= x2) or (x1 <= y2 <= x2) or (y1 <= x1 <= y2) or (y1 <= x2 <= y2) + +def is_wayland(): + return 'WAYLAND_DISPLAY' in os.environ + +# path to save screenshot of monitor to +def printsc_wayland(region, save: bool = False, path: str = None): + if save: + im = ImageGrab.grab(bbox=region) + im.save(path) + else: + return ImageGrab.grab(bbox=region) + + +def printsc_non_wayland(region, save: bool = False, path: str = None): + # use mss to capture the screen + with mss.mss() as sct: + # grab the screen + img = sct.grab(region) + # convert the image to a PIL image + image = Image.frombytes("RGB", img.size, img.bgra, "raw", "BGRX") + # save the image if save is True + if save: + image.save(path) + + +def printsc(region, save: bool = False, path: str = None): + try: + if is_wayland(): + return printsc_wayland(region, save, path) + else: + return printsc_non_wayland(region, save, path) + except Exception as e: + print(f'Error {e}') + +def convert_image_to_bytes(img): + with io.BytesIO() as byte_stream: + img.save(byte_stream, format='PNG') # Save the image to the byte stream + return byte_stream.getvalue() # Get the byte representation + + + +def bytes_to_image(image_bytes): + # Load the image from bytes + byte_stream = io.BytesIO(image_bytes) + # Open the image from the BytesIO stream + image = Image.open(byte_stream) + return image + +# for japanese + +def add_furigana(text): + parsed = mecab.parse(text).split('\n')[:-2] + furigana_string = '' + for i in parsed: + words = i.split('\t')[0] + add = f'({jaconv.kata2hira(i.split(',')[6])})' + to_add = add if contains_kanji(words) else '' + furigana_string += i.split('\t')[0] + to_add + return furigana_string + + +def contains_kanji(text): + return bool(re.search(r'[\u4E00-\u9FFF]', text)) + +def contains_hiragana(text): + return bool(re.search(r'[\u3040-\u309F]', text)) + +def contains_katakana(text): + return bool(re.search(r'[\u30A0-\u30FF]', text)) + + + +def romanize(text, piny=False): + if piny: + return ' '.join([ py[0] for py in pinyin(text, heteronym=True)]) + return uroman.romanize_string(text) + +# check if a string contains words from a language +def contains_lang(text, lang): + # Matches any character in the Unicode range of the language + if lang == 'zh': + return bool(re.search(r'[\u4e00-\u9fff]', text)) + elif lang == 'ja': + return bool(re.search(r'[\u3040-\u30ff]', text)) + elif lang == 'ko': + return bool(re.search(r'[\uac00-\ud7af]', text)) + elif lang == 'en': + return bool(re.search(r'[a-zA-Z]', text)) + else: + raise ValueError("Invalid language. Please use one of 'en', 'zh', 'ja', or 'ko'.") + +### en, ch_sim, ja, ko rapidocr only has chinese and en at the moment +def standardize_lang(lang): + if lang == 'ch_sim': + easyocr_lang = 'ch_sim' + paddleocr_lang = 'ch' + rapidocr_lang = 'ch' + translation_model_lang = 'zh' + elif lang == 'ch_tra': + easyocr_lang = 'ch_tra' + paddleocr_lang = 'ch' + rapidocr_lang = 'ch' + translation_model_lang = 'zh' + elif lang == 'ja': + easyocr_lang = 'ja' + paddleocr_lang = 'ja' + rapidocr_lang = 'ja' + translation_model_lang = 'ja' + elif lang == 'ko': + easyocr_lang = 'korean' + paddleocr_lang = 'ko' + rapidocr_lang = 'ko' + translation_model_lang = 'ko' + elif lang == 'en': + easyocr_lang = 'en' + paddleocr_lang = 'en' + rapidocr_lang = 'en' + translation_model_lang = 'en' + else: + raise ValueError("Invalid language. Please use one of 'en', 'ch_sim', 'ch_tra', 'ja', or 'ko'.") + return {'easyocr_lang': easyocr_lang, 'paddleocr_lang': paddleocr_lang, 'rapidocr_lang': rapidocr_lang, 'translation_model_lang': translation_model_lang} + +def which_ocr_lang(model): + if model == 'easy': + return 'easyocr_lang' + elif model == 'paddle': + return 'paddleocr_lang' + elif model == 'rapid': + return 'rapidocr_lang' + else: + raise ValueError("Invalid OCR model. Please use one of 'easy', 'paddle', or 'rapid'.") + + +if __name__ == "__main__": + # Example usage + japanesetext = "本が好きにちは" + print(add_furigana(japanesetext)) \ No newline at end of file diff --git a/logging_config.py b/logging_config.py new file mode 100644 index 0000000..6d52b6b --- /dev/null +++ b/logging_config.py @@ -0,0 +1,36 @@ +import logging, os +from dotenv import load_dotenv +load_dotenv() + +# Configure the logger + +def setup_logger(name: str, log_file: str = None, level: int = logging.INFO) -> logging.Logger: + """Set up a logger with the specified name and level.""" + if log_file is None: + log_file = f"{name}.log" + + # Create a logger + logger = logging.getLogger(name) + logger.setLevel(level) + + # Create file handler + file_handler = logging.FileHandler(log_file) + file_handler.setLevel(level) + + # Create console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(level) + + # Create a formatter and set it for both handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) + + + # Add handlers to the logger + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + return logger + diff --git a/main.py b/main.py new file mode 100644 index 0000000..2ef0b70 --- /dev/null +++ b/main.py @@ -0,0 +1,39 @@ +from flask import Flask, Response, render_template +import time +import threading +from PIL import Image +import io +import chinese_to_eng +app = Flask(__name__) + +# Global variable to hold the current image +def curr_image(): + return chinese_to_eng.latest_image + +@app.route('/') +def index(): + return render_template('index.html') + + +@app.route('/image') +def stream_image(): + if curr_image() is None: + return "No image generated yet.", 503 + print('streaming') + print(curr_image()) + file_object = io.BytesIO() + curr_image().save(file_object, 'PNG') + file_object.seek(0) + response = Response(file_object.getvalue(), mimetype='image/png') + response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' # HTTP 1.1 + response.headers['Pragma'] = 'no-cache' # HTTP 1.0 + response.headers['Expires'] = '0' # Proxies + + return response + +if __name__ == '__main__': + # Start the image updating thread + threading.Thread(target=chinese_to_eng.main, daemon=True).start() + + # Start the Flask web server + app.run(host='0.0.0.0', port=5000, debug=True) diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..d509599 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,23 @@ + + + + + + + + Live Image + + + +