import os
import random
import subprocess
import time
import wave

import numpy as np
import pyaudio
import pygame
from dotenv import load_dotenv

import language

load_dotenv()

# Root of the project; falls back to this file's directory when the
# ROOT_DIR environment variable is not set.
ROOT_DIR = os.getenv('ROOT_DIR', os.path.dirname(__file__))
PIPER_AUDIO_OUTPUT_FILE = os.path.join(ROOT_DIR, 'llm_media/chat_voice_out.wav')
AUDIO_INPUT_FILE = os.path.join(ROOT_DIR, 'llm_media/input_audio.wav')


def play_audio(audio_file, volume=0.5):
    """Play an audio file through pygame's mixer, blocking until playback ends.

    audio_file: path to a file pygame.mixer.music can load (e.g. WAV).
    volume: playback volume in [0.0, 1.0].
    """
    pygame.mixer.init()
    pygame.mixer.music.load(audio_file)
    # Set the volume BEFORE starting playback so the first samples are not
    # emitted at whatever volume the mixer was last left at.
    pygame.mixer.music.set_volume(volume)
    pygame.mixer.music.play()
    while pygame.mixer.music.get_busy():
        # Sleep briefly instead of busy-spinning, which would peg a CPU core.
        time.sleep(0.05)


def piper(text, model, config):
    """Synthesize `text` with the piper-tts CLI and play the result.

    model: path to the piper voice model file.
    config: path to the voice's JSON config file.
    The synthesized speech is written to PIPER_AUDIO_OUTPUT_FILE and then
    played back immediately.
    """
    # Feed the text to piper-tts on stdin directly; spawning an extra
    # `echo` process just to produce the bytes is unnecessary.
    subprocess.run(
        ['piper-tts', '--sentence-silence', '0.5',
         '--model', model, '--config', config,
         '--output_file', PIPER_AUDIO_OUTPUT_FILE],
        input=text.encode('utf-8'),
    )
    play_audio(PIPER_AUDIO_OUTPUT_FILE)


def eng_piper(text):
    """Speak `text` using the English piper voice (module-level model/config)."""
    piper(text, model=eng_piper_model, config=eng_piper_conf)


def play_prompt(fallback_prompt: str = "Missing Fallback Prompt",
                audio_files=None,
                use_custom: bool = True):
    """Play a random pre-recorded prompt, or synthesize a fallback with TTS.

    fallback_prompt: text spoken via eng_piper when no recording is usable.
    audio_files: optional list of candidate audio file paths; entries that
        do not exist on disk are skipped. Defaults to no candidates.
        (A None sentinel is used instead of `[]` to avoid the shared
        mutable-default-argument pitfall.)
    use_custom: when False, always fall back to TTS.
    """
    if audio_files is None:
        audio_files = []
    if use_custom:
        valid_files = [f for f in audio_files if os.path.exists(f)]
        if not valid_files:
            eng_piper(fallback_prompt)
            return
        # Pick one of the existing recordings at random.
        play_audio(random.choice(valid_files), volume=1)
    else:
        eng_piper(fallback_prompt)


eng_piper_model, eng_piper_conf = language.files_language('en')


# Capture the audio input
def capture_audio_until_silence(threshold=800, silence_duration=3,
                                output_file=AUDIO_INPUT_FILE):
    """Capture microphone audio until a period of silence is detected.

    threshold: peak sample amplitude (int16) below which audio counts as silence.
    silence_duration: seconds of continuous silence that stop the recording.
    output_file: WAV file path the recording is saved to.
    """
    # PyAudio configuration
    p = pyaudio.PyAudio()
    chunk = 1024
    sample_format = pyaudio.paInt16
    channels = 2
    rate = 44100

    # Start recording
    stream = p.open(format=sample_format, channels=channels, rate=rate,
                    input=True, frames_per_buffer=chunk)
    print("Listening...")
    audio_frames = []
    last_time = time.time()
    try:
        while True:
            # Read one buffer of audio and keep it for the output file.
            data = stream.read(chunk)
            audio_frames.append(data)

            # Analyze the buffer's peak level to distinguish sound from silence.
            audio_data = np.frombuffer(data, dtype=np.int16)
            peak = np.abs(audio_data).max()
            if peak > threshold:
                last_time = time.time()  # sound detected: reset the silence timer
            elif time.time() - last_time > silence_duration:
                print(f"No sound detected for {silence_duration} seconds. Stopping...")
                break
    except KeyboardInterrupt:
        print("Stopped by user.")
    finally:
        # Stop and close the stream
        stream.stop_stream()
        stream.close()
        p.terminate()

    # get_sample_size is a pure lookup on the format constant, so calling it
    # after terminate() is safe (matches original behavior).
    with wave.open(output_file, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(p.get_sample_size(sample_format))
        wf.setframerate(rate)
        wf.writeframes(b''.join(audio_frames))


if __name__ == "__main__":
    zh_piper_model, zh_piper_conf = language.files_language('zh')
    eng_piper("Hello, I am Piper. I am a text-to-speech model.")
    piper("你好,我是派珀。我是一个文本转语音模型。", model=zh_piper_model, config=zh_piper_conf)
    # capture_audio_until_silence()
    # you can try out whisper on the recording you have just made by running whisper.py