"""Text-to-speech (piper-tts) playback and microphone-capture utilities."""
import pygame, os, subprocess, language, wave, pyaudio, time, numpy as np, random
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
|
|
# Project root; falls back to this file's directory when ROOT_DIR is not
# provided by the environment (.env is loaded above via load_dotenv()).
ROOT_DIR = os.getenv('ROOT_DIR', os.path.dirname(__file__))

# WAV file piper-tts writes its synthesized speech to (see piper()).
PIPER_AUDIO_OUTPUT_FILE = os.path.join(ROOT_DIR, 'llm_media/chat_voice_out.wav')

# Default WAV file capture_audio_until_silence() saves recordings to.
AUDIO_INPUT_FILE = os.path.join(ROOT_DIR, 'llm_media/input_audio.wav')
|
|
|
|
def play_audio(audio_file, volume=0.5):
    """Play an audio file through pygame's mixer, blocking until playback ends.

    audio_file: path to a file pygame.mixer.music can load (e.g. WAV).
    volume: playback volume in [0.0, 1.0].
    """
    # Initialize pygame mixer (idempotent if already initialized)
    pygame.mixer.init()
    pygame.mixer.music.load(audio_file)
    # Set the volume BEFORE starting playback; the original called
    # set_volume() after play(), so the first samples could play at the
    # previous/default volume.
    pygame.mixer.music.set_volume(volume)
    pygame.mixer.music.play()
    # Sleep while waiting instead of spinning: the original
    # `while get_busy(): continue` busy-waited, pinning a CPU core for
    # the entire duration of the clip.
    while pygame.mixer.music.get_busy():
        time.sleep(0.05)
|
|
|
|
|
|
|
|
def piper(text, model, config):
    """Synthesize `text` to speech with the piper-tts CLI and play the result.

    text: sentence(s) to speak.
    model: path to the piper voice model.
    config: path to the matching voice config.

    Writes the audio to PIPER_AUDIO_OUTPUT_FILE, then plays it back.
    """
    # Feed the text to piper-tts on stdin directly instead of spawning an
    # extra `echo` subprocess just to produce the bytes. The trailing
    # newline matches what `echo` emitted.
    subprocess.run(
        ['piper-tts', '--sentence-silence', '0.5',
         '--model', model, '--config', config,
         '--output_file', PIPER_AUDIO_OUTPUT_FILE],
        input=(text + '\n').encode('utf-8'),
    )
    play_audio(PIPER_AUDIO_OUTPUT_FILE)
|
|
|
|
def eng_piper(text):
    """Speak `text` with the English piper voice resolved at module load."""
    piper(text, eng_piper_model, eng_piper_conf)
|
|
|
|
|
|
|
|
def play_prompt(fallback_prompt: str = "Missing Fallback Prompt", audio_files=None, use_custom: bool = True):
    """Play one randomly-chosen pre-recorded prompt, falling back to TTS.

    fallback_prompt: text spoken via eng_piper() when no recording is usable
        or use_custom is False.
    audio_files: list of candidate audio file paths; entries that do not
        exist on disk are skipped. Defaults to no candidates. (The original
        used the mutable-default-argument anti-pattern `audio_files: list = []`;
        a None sentinel is the idiomatic, behavior-identical fix.)
    use_custom: when False, skip the recordings entirely and always use TTS.
    """
    # Guard clause: TTS-only mode.
    if not use_custom:
        eng_piper(fallback_prompt)
        return

    # Keep only candidates that actually exist on disk.
    valid_files = [f for f in (audio_files or []) if os.path.exists(f)]
    if not valid_files:
        eng_piper(fallback_prompt)
        return

    # random.choice replaces the original's manual randint() indexing.
    play_audio(random.choice(valid_files), volume=1)
|
|
|
|
eng_piper_model, eng_piper_conf = language.files_language('en')
|
|
|
|
|
|
|
|
# Capture the audio input
|
|
|
|
def capture_audio_until_silence(threshold=800, silence_duration=3, output_file=AUDIO_INPUT_FILE):
    """
    Capture microphone audio until a period of silence is detected, then
    save the recording as a WAV file.

    threshold: peak int16 sample amplitude at or below which a chunk counts
        as silence.
    silence_duration: seconds of continuous silence that stop the recording.
    output_file: path of the WAV file the recording is saved to.
    """
    # PyAudio configuration
    p = pyaudio.PyAudio()
    chunk = 1024
    sample_format = pyaudio.paInt16
    channels = 2
    rate = 44100

    # Query the sample width up front: the original called
    # p.get_sample_size() AFTER p.terminate(), i.e. on a torn-down instance.
    sample_width = p.get_sample_size(sample_format)

    # Start recording
    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=rate,
                    input=True,
                    frames_per_buffer=chunk)

    print("Listening...")

    audio_frames = []
    last_time = time.time()

    try:
        while True:
            # exception_on_overflow=False keeps a slow loop iteration from
            # killing the recording with an input-overflow error; overflowed
            # chunks are simply dropped by PortAudio.
            data = stream.read(chunk, exception_on_overflow=False)
            audio_frames.append(data)

            # Interleaved int16 frames; the absolute peak across both
            # channels is our loudness measure.
            audio_data = np.frombuffer(data, dtype=np.int16)
            peak = np.abs(audio_data).max()

            if peak > threshold:
                last_time = time.time()  # sound detected: reset the silence timer
            elif time.time() - last_time > silence_duration:
                print(f"No sound detected for {silence_duration} seconds. Stopping...")
                break

    except KeyboardInterrupt:
        print("Stopped by user.")

    finally:
        # Stop and close the stream
        stream.stop_stream()
        stream.close()
        p.terminate()

    # Save whatever was captured — also on Ctrl-C, since the
    # KeyboardInterrupt is swallowed above.
    with wave.open(output_file, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        wf.writeframes(b''.join(audio_frames))
|
|
|
|
|
|
|
|
if __name__ == "__main__":

    # Demo: resolve the Chinese voice, then speak one sentence in each
    # language through piper-tts.
    zh_piper_model, zh_piper_conf = language.files_language('zh')

    eng_piper("Hello, I am Piper. I am a text-to-speech model.")

    piper("你好,我是派珀。我是一个文本转语音模型。", model = zh_piper_model, config = zh_piper_conf)

    # capture_audio_until_silence() # you can try out whisper on the recording you have just made by running whisper.py