Initial commit

chickenflyshigh 2024-11-01 15:18:25 +11:00
commit ee4b3ed43e
12 changed files with 794 additions and 0 deletions

5
.gitignore vendored Normal file

@@ -0,0 +1,5 @@
*.ipynb
*.log
translate/
__pycache__/
.*

0
README.md Normal file

132
chinese_to_eng.py Normal file

@@ -0,0 +1,132 @@
###################################################################################
##### IMPORT LIBRARIES #####
import os, time, ast
from dotenv import load_dotenv
from helpers.translation import init_TRANSLATE, translate
from helpers.utils import printsc, convert_image_to_bytes, bytes_to_image, standardize_lang, which_ocr_lang
from helpers.ocr import id_lang, get_words, init_OCR
from logging_config import setup_logger
from helpers.draw import modify_image_bytes
load_dotenv()
###################################################################################
#### LOGGING ####
logger = setup_logger('chinese_to_eng', log_file='chinese_to_eng.log')
###################################################################################
##### Variables to edit #####
INTERVAL = int(os.getenv('INTERVAL', '10'))  # seconds between screenshots; defaults to 10 if unset
### available languages: 'ch_sim', 'ch_tra', 'ja', 'ko', 'en'
SOURCE_LANG = os.getenv('SOURCE_LANG', 'ja')
TARGET_LANG = os.getenv('TARGET_LANG', 'en')
### Translation
TRANSLATION_MODEL = os.getenv('TRANSLATION_MODEL', 'opus')  # 'opus' or 'm2m'; opus is a lot more lightweight
MAX_TRANSLATE = 200
### OCR
OCR_USE_GPU = ast.literal_eval(os.getenv('OCR_USE_GPU', 'True'))
OCR_MODEL = os.getenv('OCR_MODEL', 'easy')  # 'easy', 'paddle' or 'rapid'; easy is the most accurate, paddle is the fastest with CUDA, rapid is the fastest on CPU
REGION = ast.literal_eval(os.getenv('REGION', '(0,0,2560,1440)'))
###################################################################################
### normalise the configured languages to the codes each backend expects
OCR_LANG_KEY = which_ocr_lang(OCR_MODEL)  # which key of standardize_lang's output the OCR backend needs
OCR_LANGUAGES = list(dict.fromkeys([standardize_lang(SOURCE_LANG)[OCR_LANG_KEY], standardize_lang(TARGET_LANG)[OCR_LANG_KEY], 'en']))
SRC_MODEL_LANG = standardize_lang(SOURCE_LANG)['translation_model_lang']  # e.g. 'ch_sim' -> 'zh'
TGT_MODEL_LANG = standardize_lang(TARGET_LANG)['translation_model_lang']
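### A minimal example .env for the settings above (hypothetical values; adjust to your setup):
# INTERVAL=10
# SOURCE_LANG=ch_sim
# TARGET_LANG=en
# TRANSLATION_MODEL=opus
# OCR_MODEL=easy
# OCR_USE_GPU=True
# REGION=(0,0,2560,1440)
# FONT_FILE=/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf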
latest_image = None

def main():
    global latest_image
    # screenshot
    untranslated_image = printsc(REGION)
    byte_image = convert_image_to_bytes(untranslated_image)
    ###################################################################################
    ##### Initialize the OCR #####
    ocr = init_OCR(model=OCR_MODEL, ocr_languages=OCR_LANGUAGES, use_GPU=OCR_USE_GPU)
    ocr_output = id_lang(ocr, byte_image, SRC_MODEL_LANG)
    curr_words = set(get_words(ocr_output))
    prev_words = set()
    ##### Initialize the translation #####
    init_TRANSLATE(TRANSLATION_MODEL)
    ###################################################################################
    while True:
        print('Running')
        if prev_words != curr_words:
            print('Translating')
            to_translate = get_words(ocr_output)[:MAX_TRANSLATE]
            translation = translate(to_translate, TRANSLATION_MODEL, from_lang=SRC_MODEL_LANG, target_lang=TGT_MODEL_LANG)
            print(translation)
            translated_image = modify_image_bytes(byte_image, ocr_output, translation)
            latest_image = bytes_to_image(translated_image)
            prev_words = curr_words
            logger.info(f"Successfully translated image. Prev words are:\n{prev_words}")
        else:
            logger.info("The image has remained the same.")
        # torch.cuda.empty_cache()
        logger.info(f'Sleeping for {INTERVAL} seconds')
        time.sleep(INTERVAL)
        untranslated_image = printsc(REGION)
        byte_image = convert_image_to_bytes(untranslated_image)
        ocr_output = id_lang(ocr, byte_image, SRC_MODEL_LANG)
        curr_words = set(get_words(ocr_output))
        logger.info(f'Curr words to translate are:\n{curr_words}')

if __name__ == "__main__":
    main()

153
draw_translation.py Executable file

@@ -0,0 +1,153 @@
import easyocr
from pypinyin import pinyin
from PIL import Image, ImageDraw, ImageFont
import os, time, logging, torch, subprocess
from helpers.translation import init_M2M, translate_M2M
import langid
import numpy as np
##### Variables to edit #####
text_color = "#ff0000"
font_file = "/home/James/.local/share/fonts/Arial-Unicode-Bold.ttf"
font_size = 16
pyin = True  # whether to add pinyin or not
max_translate = 100
# paths for the raw screenshot and the annotated copy (undefined in the original script; the values below are assumed)
image_old = '/tmp/ocr_screenshot.png'
image_new = '/tmp/ocr_translated.png'
# language detection is used to filter out other languages; text is only drawn when it is detected to be src_lang
src_lang = "zh"
tgt_lang = "en"
# af, am, an, ar, as, az, be, bg, bn, br, bs, ca, cs, cy, da, de, dz, el, en, eo, es, et, eu, fa, fi, fo, fr, ga, gl, gu, he, hi, hr, ht, hu, hy, id, is, it, ja, jv, ka, kk, km, kn, ko, ku, ky, la, lb, lo, lt, lv, mg, mk, ml, mn, mr, ms, mt, nb, ne, nl, nn, no, oc, or, pa, pl, ps, pt, qu, ro, ru, rw, se, si, sk, sl, sq, sr, sv, sw, ta, te, th, tl, tr, ug, uk, ur, vi, vo, wa, xh, zh, zu
langid.set_languages([src_lang, tgt_lang, 'en'])
# for the translator (M2M100)
from_lang = "zh"
target_lang = "en"
# Afrikaans (af), Amharic (am), Arabic (ar), Asturian (ast), Azerbaijani (az), Bashkir (ba), Belarusian (be), Bulgarian (bg), Bengali (bn), Breton (br), Bosnian (bs), Catalan; Valencian (ca), Cebuano (ceb), Czech (cs), Welsh (cy), Danish (da), German (de), Greeek (el), English (en), Spanish (es), Estonian (et), Persian (fa), Fulah (ff), Finnish (fi), French (fr), Western Frisian (fy), Irish (ga), Gaelic; Scottish Gaelic (gd), Galician (gl), Gujarati (gu), Hausa (ha), Hebrew (he), Hindi (hi), Croatian (hr), Haitian; Haitian Creole (ht), Hungarian (hu), Armenian (hy), Indonesian (id), Igbo (ig), Iloko (ilo), Icelandic (is), Italian (it), Japanese (ja), Javanese (jv), Georgian (ka), Kazakh (kk), Central Khmer (km), Kannada (kn), Korean (ko), Luxembourgish; Letzeburgesch (lb), Ganda (lg), Lingala (ln), Lao (lo), Lithuanian (lt), Latvian (lv), Malagasy (mg), Macedonian (mk), Malayalam (ml), Mongolian (mn), Marathi (mr), Malay (ms), Burmese (my), Nepali (ne), Dutch; Flemish (nl), Norwegian (no), Northern Sotho (ns), Occitan (post 1500) (oc), Oriya (or), Panjabi; Punjabi (pa), Polish (pl), Pushto; Pashto (ps), Portuguese (pt), Romanian; Moldavian; Moldovan (ro), Russian (ru), Sindhi (sd), Sinhala; Sinhalese (si), Slovak (sk), Slovenian (sl), Somali (so), Albanian (sq), Serbian (sr), Swati (ss), Sundanese (su), Swedish (sv), Swahili (sw), Tamil (ta), Thai (th), Tagalog (tl), Tswana (tn), Turkish (tr), Ukrainian (uk), Urdu (ur), Uzbek (uz), Vietnamese (vi), Wolof (wo), Xhosa (xh), Yiddish (yi), Yoruba (yo), Chinese (zh), Zulu (zu)
# for easyOCR
OCR_languages = ['ch_sim', 'en']  # languages to recognise
# https://www.jaided.ai/easyocr/
log_directory = '/var/log/ocr'
printsc = lambda path: subprocess.run(f"grim -t png -o DP-1 -l 0 {path}", shell=True)
# Configure the logger
os.makedirs(log_directory, exist_ok=True)
logging.basicConfig(
    filename=os.path.join(log_directory, 'ocr.log'),
    level=logging.DEBUG,  # Set the logging level
    format='%(asctime)s - %(message)s',  # Define the format for logging
    datefmt='%Y-%m-%d %H:%M:%S'  # Define the date format
)
# screenshot
printsc(image_old)
time.sleep(1)
# EasyOCR
reader = easyocr.Reader(OCR_languages)  # this needs to run only once to load the model into memory
def results():
    result = reader.readtext(image_old)
    # keep only the entries whose text is classified as src_lang
    return [entry for entry in result if langid.classify(entry[1])[0] == src_lang]
# each result entry is a tuple with the following structure:
# ([top_left, top_right, bottom_right, bottom_left], text, confidence)
# where the four points are the corner coordinates of the bounding box
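# e.g. a single entry might look like (hypothetical values):
# ([[10, 20], [110, 20], [110, 52], [10, 52]], '你好世界', 0.98)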
ocr_output = results()
curr_words = set(entry[1] for entry in ocr_output)
prev_words = set()
# translator = GoogleTranslator(source=from_language, target=target_language)
# load the M2M100 model once before entering the loop (translate_M2M needs it initialised)
init_M2M()
font = ImageFont.truetype(font_file, font_size)
# a function for checking whether one axis of a shape intercepts with another
def intercepts(x, y):
    # both x and y are two-dimensional tuples representing the ends of a line on one dimension
    x1, x2 = x
    y1, y2 = y
    return (x1 <= y1 <= x2) or (x1 <= y2 <= x2) or (y1 <= x1 <= y2) or (y1 <= x2 <= y2)
while True:
    print('Running')
    if prev_words != curr_words:
        print('Translating')
        image = Image.open(image_old)
        draw = ImageDraw.Draw(image)
        to_translate = [entry[1] for entry in ocr_output][:max_translate]
        translation = translate_M2M(to_translate, from_lang=from_lang, target_lang=target_lang)
        # counter for limiting the number of translations
        translated_number = 0
        bounding_boxes = []
        for i, (position, words, confidence) in enumerate(ocr_output):
            if translated_number >= max_translate:
                break
            top_left, _, _, _ = position
            position = (top_left[0], top_left[1] - 60)
            if pyin:
                py = ' '.join([p[0] for p in pinyin(words)])
                text_content = f"{translation[i]}\n{py}\n{words}"
            else:
                text_content = f"{translation[i]}\n{words}"
            lines = text_content.split('\n')
            x, y = position
            max_width = 0
            total_height = 0
            line_spacing = 3
            line_height = font_size
            for line in lines:
                bbox = draw.textbbox(position, line, font=font)
                line_width = bbox[2] - bbox[0]
                max_width = max(max_width, line_width)
                total_height += line_height + line_spacing
            bounding_box = (x, y, x + max_width, y + total_height, words)
            print(f"Bounding Box of Interest: {bounding_box}")
            y = np.max([y, 0])
            for box in bounding_boxes:
                print(f'Investigating box: {box}')
                # shift this label below any earlier label it overlaps with
                if intercepts((box[0], box[2]), (bounding_box[0], bounding_box[2])) and intercepts((box[1], box[3]), (y, y + total_height)):
                    print(f'Overlapping, adjusting the position of {words}')
                    y = np.max([y, box[3]]) + line_spacing
                    print(y, box[3])
                    print(f'Changed to {(x, y, x + max_width, y + total_height, words)}')
            adjusted_bounding_box = (x, y, x + max_width, y + total_height, words)
            bounding_boxes.append(adjusted_bounding_box)
            draw.rectangle([(x, y), (x + max_width, y + total_height)], outline="black", width=1)
            position = (x, y)
            for line in lines:
                draw.text(position, line, fill=text_color, font=font)
                y += font_size + line_spacing
                position = (x, y)
            print("Adjusted_bounding_box:", adjusted_bounding_box)
            print('\n')
            translated_number += 1
        image.save(image_new)
        logging.info(f"Saved the image to {image_new}")
        prev_words = curr_words
        logging.info(f"Successfully translated image. Prev words are:\n{prev_words}")
    else:
        logging.info("The image has remained the same.")
    torch.cuda.empty_cache()
    print('Sleeping')
    time.sleep(10)
    printsc(image_old)
    ocr_output = results()
    curr_words = set(entry[1] for entry in ocr_output)
    logging.info(f'Curr words are:\n{curr_words}')

0
helpers/__init__.py Normal file

88
helpers/draw.py Normal file

@@ -0,0 +1,88 @@
from PIL import Image, ImageDraw, ImageFont
from dotenv import load_dotenv
import os
import io
import numpy as np
import ast
from helpers.utils import romanize, intercepts
load_dotenv()
MAX_TRANSLATE = int(os.getenv('MAX_TRANSLATION', 200))
FONT_FILE = os.getenv('FONT_FILE')
FONT_SIZE = int(os.getenv('FONT_SIZE', 16))
LINE_SPACING = int(os.getenv('LINE_SPACING', 3))
TEXT_COLOR = os.getenv('TEXT_COLOR', "#ff0000")
LINE_HEIGHT = FONT_SIZE
TO_ROMANIZE = ast.literal_eval(os.getenv('TO_ROMANIZE', 'True'))
# fall back to PIL's built-in font if FONT_FILE is not set, rather than crashing at import time
font = ImageFont.truetype(FONT_FILE, FONT_SIZE) if FONT_FILE else ImageFont.load_default()
def modify_image_bytes(image_bytes: bytes, ocr_output: list, translation: list) -> bytes:
    # Load the image from bytes
    with io.BytesIO(image_bytes) as byte_stream:
        image = Image.open(byte_stream)
        image_format = image.format  # remember the original format before drawing on the image
        draw = ImageDraw.Draw(image)
        translate_image(draw, translation, ocr_output, MAX_TRANSLATE)
        # Save the modified image back to bytes without changing the format
        with io.BytesIO() as out_stream:
            image.save(out_stream, format=image_format)
            modified_image_bytes = out_stream.getvalue()
    return modified_image_bytes
def translate_image(draw: ImageDraw.ImageDraw, translation: list, ocr_output: list, max_translate: int) -> ImageDraw.ImageDraw:
    translated_number = 0
    bounding_boxes = []
    for i, (position, untranslated_phrase, confidence) in enumerate(ocr_output):
        if translated_number >= max_translate:
            break
        translate_one_phrase(draw, translation[i], position, bounding_boxes, untranslated_phrase)
        translated_number += 1
    return draw
def translate_one_phrase(draw: ImageDraw.ImageDraw, translated_phrase: str, position: tuple, bounding_boxes: list, untranslated_phrase: str) -> ImageDraw.ImageDraw:
    # Place the label just above the detected text
    top_left, _, _, _ = position
    position = (top_left[0], top_left[1] - 60)
    text_content = f"{translated_phrase}\n{romanize(untranslated_phrase, TO_ROMANIZE)}\n{untranslated_phrase}"
    lines = text_content.split('\n')
    x, y = position
    max_width = 0
    total_height = len(lines) * (LINE_HEIGHT + LINE_SPACING)
    for line in lines:
        bbox = draw.textbbox(position, line, font=font)
        line_width = bbox[2] - bbox[0]
        max_width = max(max_width, line_width)
    bounding_box = (x, y, x + max_width, y + total_height, untranslated_phrase)
    print(f"Bounding Box of Interest: {bounding_box}")
    adjust_if_intersects(x, y, bounding_box, bounding_boxes, untranslated_phrase, max_width, total_height)
    adjusted_x, adjusted_y, adjusted_max_x, adjusted_max_y, _ = bounding_boxes[-1]
    draw.rectangle([(adjusted_x, adjusted_y), (adjusted_max_x, adjusted_max_y)], outline="black", width=1)
    position = (adjusted_x, adjusted_y)
    for line in lines:
        draw.text(position, line, fill=TEXT_COLOR, font=font)
        adjusted_y += FONT_SIZE + LINE_SPACING
        position = (adjusted_x, adjusted_y)
    print(f"Adjusted_bounding_box: {bounding_boxes[-1]}.\n")
    return draw
def adjust_if_intersects(x: int, y: int, bounding_box: tuple, bounding_boxes: list, untranslated_phrase: str, max_width: int, total_height: int) -> tuple:
    y = np.max([y, 0])
    for box in bounding_boxes:
        print(f'Investigating box: {box}')
        # overlap on both axes means the two labels would be drawn on top of each other
        if intercepts((box[0], box[2]), (bounding_box[0], bounding_box[2])) and intercepts((box[1], box[3]), (y, y + total_height)):
            print(f'Overlapping, adjusting the position of {untranslated_phrase}')
            y = np.max([y, box[3]]) + LINE_SPACING
            print(y, box[3])
            print(f'Changed to {(x, y, x + max_width, y + total_height, untranslated_phrase)}')
    adjusted_bounding_box = (x, y, x + max_width, y + total_height, untranslated_phrase)
    bounding_boxes.append(adjusted_bounding_box)
    return adjusted_bounding_box
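# A hypothetical walk-through of the stacking logic above: if (0, 0, 100, 60, 'prev')
# is already in bounding_boxes and a new label also starts at (0, 0) with the same size,
# adjust_if_intersects pushes the new box down to y = 60 + LINE_SPACING, so the two
# labels are drawn stacked instead of on top of each other.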

87
helpers/ocr.py Normal file

@@ -0,0 +1,87 @@
from paddleocr import PaddleOCR
import easyocr
from rapidocr_onnxruntime import RapidOCR
import langid
from helpers.utils import contains_lang
from concurrent.futures import ThreadPoolExecutor
# PaddleOCR
# PaddleOCR supports Chinese, English, French, German, Korean and Japanese;
# set the parameter `lang` to `ch`, `en`, `fr`, `german`, `korean` or `japan`
# to switch the language model.
# This needs to run only once to download and load the model into memory.
def _paddle_init(lang='ch', use_angle_cls=False, use_GPU=True):
    return PaddleOCR(use_angle_cls=use_angle_cls, lang=lang, use_gpu=use_GPU)
def _paddle_ocr(ocr, image) -> list:
    ### return a list containing the bounding box, text and confidence of the detected text
    result = ocr.ocr(image, cls=False)[0]
    if not isinstance(result, list):
        return []
    return [(pos, text[0], text[1]) for pos, text in result]
# EasyOCR has support for many languages
def _easy_init(ocr_languages: list, use_GPU=True):
    return easyocr.Reader(ocr_languages, gpu=use_GPU)
def _easy_ocr(ocr, image) -> list:
    return ocr.readtext(image)
# RapidOCR, mostly for Mandarin and some other Asian languages
def _rapid_init(use_GPU=True):
    return RapidOCR(use_gpu=use_GPU)
def _rapid_ocr(ocr, image) -> list:
    return ocr(image)
### Initialize the chosen OCR model; only easyocr needs the language list up front
def init_OCR(model='paddle', ocr_languages=None, use_GPU=True, **kwargs):
    if model == 'paddle':
        return _paddle_init(use_GPU=use_GPU, **kwargs)
    elif model == 'easy':
        return _easy_init(ocr_languages or ['en'], use_GPU=use_GPU)
    elif model == 'rapid':
        return _rapid_init(use_GPU=use_GPU)
    else:
        raise ValueError("Invalid OCR model. Please use one of 'easy', 'paddle', or 'rapid'.")
### Perform OCR on the image
def identify(ocr, image) -> list:
    if isinstance(ocr, PaddleOCR):
        return _paddle_ocr(ocr, image)
    elif isinstance(ocr, easyocr.Reader):
        return _easy_ocr(ocr, image)
    elif isinstance(ocr, RapidOCR):
        return _rapid_ocr(ocr, image)
    else:
        raise ValueError("Invalid OCR model. Please initialise the OCR model first with init_OCR() and pass it as an argument to identify().")
### Filter out the results that are not in the source language
def id_filtered(ocr, image, lang) -> list:
    result = identify(ocr, image)
    ### Parallelise since langid is slow
    def classify_text(entry):
        return entry if langid.classify(entry[1])[0] == lang else None
    with ThreadPoolExecutor() as executor:
        filtered = list(filter(None, executor.map(classify_text, result)))
    return filtered
# lang is one of 'zh', 'ja', 'ko' (or 'en'); see contains_lang in helpers.utils
def id_lang(ocr, image, lang) -> list:
    result = identify(ocr, image)
    return [entry for entry in result if contains_lang(entry[1], lang)]
def get_words(ocr_output) -> list:
    return [entry[1] for entry in ocr_output]
def get_positions(ocr_output) -> list:
    return [entry[0] for entry in ocr_output]
def get_confidences(ocr_output) -> list:
    return [entry[2] for entry in ocr_output]
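# A minimal usage sketch (hypothetical image path):
#   ocr = init_OCR(model='easy', ocr_languages=['ch_sim', 'en'], use_GPU=False)
#   output = id_lang(ocr, 'screenshot.png', 'zh')
#   words = get_words(output)          # the recognised strings
#   positions = get_positions(output)  # their bounding boxes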

76
helpers/translation.py Normal file

@@ -0,0 +1,76 @@
from transformers import M2M100Tokenizer, M2M100ForConditionalGeneration, AutoTokenizer, AutoModelForSeq2SeqLM
import torch, os
from dotenv import load_dotenv
load_dotenv()
if os.getenv('TRANSLATION_USE_GPU', 'True').lower() in ('false', '0', 'no'):
    device = torch.device("cpu")
else:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# float16 only helps on CUDA; fall back to float32 on CPU
dtype = torch.float16 if device.type == 'cuda' else torch.float32
### Batch translate a list of strings
# M2M100 model
def init_M2M():
    global tokenizer, model
    tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_418M", local_files_only=True)
    model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_418M", local_files_only=True, torch_dtype=dtype).to(device)
    model.eval()
def translate_M2M(text, from_lang='zh', target_lang='en'):
    if len(text) == 0:
        return []
    tokenizer.src_lang = from_lang
    with torch.no_grad():
        encoded = tokenizer(text, return_tensors="pt", padding=True).to(device)
        generated_tokens = model.generate(**encoded,
                                          forced_bos_token_id=tokenizer.get_lang_id(target_lang))
        translated = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
    return translated
# Helsinki-NLP Opus-MT model
def init_OPUS():
    global tokenizer, model
    tokenizer = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-tc-bible-big-zhx-en", local_files_only=True)
    model = AutoModelForSeq2SeqLM.from_pretrained("Helsinki-NLP/opus-mt-tc-bible-big-zhx-en", local_files_only=True, torch_dtype=dtype).to(device)
    model.eval()
def translate_OPUS(text: list[str]) -> list[str]:
    if len(text) == 0:
        return []
    with torch.no_grad():
        encoded = tokenizer(text, return_tensors="pt", padding=True).to(device)
        generated_tokens = model.generate(**encoded)
        translated = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
    return translated
###
def init_TRANSLATE(model):  # model = 'opus' or 'm2m'
    if model == 'opus':
        init_OPUS()
    elif model == 'm2m':
        init_M2M()
    else:
        raise ValueError("Invalid model. Please use 'opus' or 'm2m'.")
###
def translate(text, model, **kwargs):
    if model == 'opus':
        return translate_OPUS(text)  # this Opus model is zh -> en only, so no language arguments
    elif model == 'm2m':
        return translate_M2M(text, **kwargs)  # pass from_lang and target_lang here
    else:
        raise ValueError("Invalid model. Please use 'opus' or 'm2m'.")
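# A minimal usage sketch (assumes the model weights are already downloaded locally,
# since from_pretrained is called with local_files_only=True):
#   init_TRANSLATE('m2m')
#   translate(['你好世界'], 'm2m', from_lang='zh', target_lang='en')  # -> e.g. ['Hello world']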

155
helpers/utils.py Normal file

@@ -0,0 +1,155 @@
import re, uroman as ur
from pypinyin import pinyin
import pyscreenshot as ImageGrab  # used on Wayland; may not work on other machines, in which case mss is used instead
import mss, io, os
from PIL import Image
import jaconv
import MeCab
import unidic
mecab = MeCab.Tagger('-d "{}"'.format(unidic.DICDIR))
uroman = ur.Uroman()
# a function for checking whether one axis of a shape intercepts with another
def intercepts(x, y):
    # both x and y are two-dimensional tuples representing the ends of a line on one dimension
    x1, x2 = x
    y1, y2 = y
    return (x1 <= y1 <= x2) or (x1 <= y2 <= x2) or (y1 <= x1 <= y2) or (y1 <= x2 <= y2)
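# e.g. intercepts((0, 10), (5, 15)) -> True (the spans share [5, 10]),
#      intercepts((0, 10), (12, 20)) -> False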
def is_wayland():
    return 'WAYLAND_DISPLAY' in os.environ
# take a screenshot of a region of the monitor, optionally saving it to a path
def printsc_wayland(region, save: bool = False, path: str = None):
    im = ImageGrab.grab(bbox=region)
    if save:
        im.save(path)
    return im
def printsc_non_wayland(region, save: bool = False, path: str = None):
    # use mss to capture the screen
    with mss.mss() as sct:
        # grab the screen
        img = sct.grab(region)
        # convert the raw capture to a PIL image
        image = Image.frombytes("RGB", img.size, img.bgra, "raw", "BGRX")
        # save the image if save is True
        if save:
            image.save(path)
        return image
def printsc(region, save: bool = False, path: str = None):
    try:
        if is_wayland():
            return printsc_wayland(region, save, path)
        return printsc_non_wayland(region, save, path)
    except Exception as e:
        print(f'Error: {e}')
def convert_image_to_bytes(img):
    with io.BytesIO() as byte_stream:
        img.save(byte_stream, format='PNG')  # Save the image to the byte stream
        return byte_stream.getvalue()  # Get the byte representation
def bytes_to_image(image_bytes):
    # Load the image from bytes
    byte_stream = io.BytesIO(image_bytes)
    # Open the image from the BytesIO stream
    image = Image.open(byte_stream)
    return image
# for Japanese: append a hiragana reading in brackets after each word that contains kanji
def add_furigana(text):
    parsed = mecab.parse(text).split('\n')[:-2]
    furigana_string = ''
    for i in parsed:
        words = i.split('\t')[0]
        features = i.split('\t')[1].split(',')
        # feature field 6 holds the katakana reading in unidic's output
        reading = f'({jaconv.kata2hira(features[6])})' if len(features) > 6 else ''
        to_add = reading if contains_kanji(words) else ''
        furigana_string += words + to_add
    return furigana_string
def contains_kanji(text):
    return bool(re.search(r'[\u4E00-\u9FFF]', text))
def contains_hiragana(text):
    return bool(re.search(r'[\u3040-\u309F]', text))
def contains_katakana(text):
    return bool(re.search(r'[\u30A0-\u30FF]', text))
def romanize(text, piny=False):
    if piny:
        return ' '.join([py[0] for py in pinyin(text, heteronym=True)])
    return uroman.romanize_string(text)
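# e.g. romanize('こんにちは') returns a Latin transliteration via uroman,
# while romanize('你好', piny=True) returns the pinyin reading instead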
# check if a string contains characters from a language
def contains_lang(text, lang):
    # Matches any character in the Unicode range of the language
    if lang == 'zh':
        return bool(re.search(r'[\u4e00-\u9fff]', text))
    elif lang == 'ja':
        return bool(re.search(r'[\u3040-\u30ff]', text))
    elif lang == 'ko':
        return bool(re.search(r'[\uac00-\ud7af]', text))
    elif lang == 'en':
        return bool(re.search(r'[a-zA-Z]', text))
    else:
        raise ValueError("Invalid language. Please use one of 'en', 'zh', 'ja', or 'ko'.")
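# e.g. contains_lang('こんにちは world', 'ja') -> True, contains_lang('hello', 'zh') -> False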
### Map an OCR language code ('en', 'ch_sim', 'ch_tra', 'ja', 'ko') to the code each
### backend expects; rapidocr only has Chinese and English at the moment.
_LANG_MAP = {
    'ch_sim': {'easyocr_lang': 'ch_sim', 'paddleocr_lang': 'ch', 'rapidocr_lang': 'ch', 'translation_model_lang': 'zh'},
    'ch_tra': {'easyocr_lang': 'ch_tra', 'paddleocr_lang': 'ch', 'rapidocr_lang': 'ch', 'translation_model_lang': 'zh'},
    'ja': {'easyocr_lang': 'ja', 'paddleocr_lang': 'ja', 'rapidocr_lang': 'ja', 'translation_model_lang': 'ja'},
    'ko': {'easyocr_lang': 'korean', 'paddleocr_lang': 'ko', 'rapidocr_lang': 'ko', 'translation_model_lang': 'ko'},
    'en': {'easyocr_lang': 'en', 'paddleocr_lang': 'en', 'rapidocr_lang': 'en', 'translation_model_lang': 'en'},
}
def standardize_lang(lang):
    try:
        return _LANG_MAP[lang]
    except KeyError:
        raise ValueError("Invalid language. Please use one of 'en', 'ch_sim', 'ch_tra', 'ja', or 'ko'.")
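# e.g. standardize_lang('ch_sim')['translation_model_lang'] -> 'zh'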
def which_ocr_lang(model):
    if model == 'easy':
        return 'easyocr_lang'
    elif model == 'paddle':
        return 'paddleocr_lang'
    elif model == 'rapid':
        return 'rapidocr_lang'
    else:
        raise ValueError("Invalid OCR model. Please use one of 'easy', 'paddle', or 'rapid'.")
if __name__ == "__main__":
    # Example usage
    japanese_text = "本が好きです。こんにちは"
    print(add_furigana(japanese_text))

36
logging_config.py Normal file

@@ -0,0 +1,36 @@
import logging, os
from dotenv import load_dotenv
load_dotenv()
# Configure the logger
def setup_logger(name: str, log_file: str = None, level: int = logging.INFO) -> logging.Logger:
    """Set up a logger with the specified name and level."""
    if log_file is None:
        log_file = f"{name}.log"
    # Create a logger
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Avoid adding duplicate handlers if the logger was already configured
    if logger.handlers:
        return logger
    # Create file handler
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(level)
    # Create console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    # Create a formatter and set it for both handlers
    formatter = logging.Formatter('%(asctime)s - %(name)s - [%(levelname)s] %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    # Add handlers to the logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
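# Usage sketch:
#   logger = setup_logger('my_module')  # logs to my_module.log and the console
#   logger.info('hello')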

39
main.py Normal file

@@ -0,0 +1,39 @@
from flask import Flask, Response, render_template
import threading
import io
import chinese_to_eng

app = Flask(__name__)

# chinese_to_eng.main() runs in a background thread and updates this module-level image
def curr_image():
    return chinese_to_eng.latest_image

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/image')
def stream_image():
    if curr_image() is None:
        return "No image generated yet.", 503
    print('streaming')
    print(curr_image())
    file_object = io.BytesIO()
    curr_image().save(file_object, 'PNG')
    file_object.seek(0)
    response = Response(file_object.getvalue(), mimetype='image/png')
    response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'  # HTTP 1.1
    response.headers['Pragma'] = 'no-cache'  # HTTP 1.0
    response.headers['Expires'] = '0'  # Proxies
    return response

if __name__ == '__main__':
    # Start the image updating thread
    threading.Thread(target=chinese_to_eng.main, daemon=True).start()
    # Start the Flask web server; keep debug off so the reloader does not start the worker thread twice
    app.run(host='0.0.0.0', port=5000, debug=False)
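# To fetch the latest frame by hand (host/port as configured in app.run above):
#   curl http://localhost:5000/image -o latest.png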

23
templates/index.html Normal file

@@ -0,0 +1,23 @@
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta
      name="viewport"
      content="width=device-width, initial-scale=1.0" />
  </head>
  <body>
    <img
      id="live-image"
      src="/image"
      alt="Live Image"
      style="max-width: 100%; height: auto" />
    <script>
      setInterval(function () {
        document.getElementById("live-image").src =
          "/image?" + new Date().getTime();
      }, 3500); // Update every 3.5 seconds
    </script>
  </body>
</html>