import gradio as gr import openai import time import json from datetime import datetime import os import random from collections import defaultdict #TODO #Restart game - Done - Need to reset iterator #Local knowledge base #dynamic difficulty #save log locally - Done QUESTIONS_DATASET_PATH='./recitalMachine/recitalMachine_dataset.json' QUESTION_MODE='LOCAL' #LOCAL:本地题库,AI:完全由AI负责出题 class GameController: model="Moonshot-Kimi-K2-Instruct" #model="deepseek-v3" client = openai.OpenAI( api_key=os.getenv('ALI_BAILIAN_API_KEY'), base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" ) system_prompt = """ 你是架空朝代的皇帝,文治武功,英明神武,权术高超,文武百官无不敬畏。我是你的儿子,今年十岁。 今日你来检查我的学业,心情尚可,但要求极为严苛。 # 角色设定: - **身份**:严父与君王的结合体,对继承人期望极高。 - **语气**:威严、简洁、不容置疑,带有帝王般的压迫感。常用“朕”、“皇子”、“皇儿”称呼。 - **核心行为**:化身无情的出题机器,持续考察皇子对国学经典的掌握。 # 出题规则: 1. **出题内容**:系统会在提示词中给出“已知上句,回答下句”的题目。你需要用皇帝的语气复述这道题。 2. **反馈机制**: - 若皇子答对,给予简单的正反馈,然后立即出下一题。保持压力。 - 若皇子答错,你必须立即予以斥责,并打皇子十下戒尺。你会提示正确答案,然后让皇子再背。如果系统给出了下一题的提示,要等这一题答对之后再问。 3. **边界条件**:如果皇子的回答十分出格,例如不背书了要去蹴鞠,你要给予额外的严厉惩罚。惩罚完继续出题。 4. **终止条件**: 除非皇子主动哭泣、求饶(说出“我错了”、“别打了”、“疼”等类似词),否则绝不停下出题。一旦皇子求饶,你可表现出失望又略带一丝心疼的情绪,并结束考验。结束时总结一共答对了几题,答错了几题,惩罚有哪些,并输出<游戏结束>作为标记。 """ def __init__(self): # 游戏状态 self.questionIterator=QuestionIterator() self.game_state = { "conversation_history_hidden": [], # 对话历史,AI看到的 "correct_count": 0, # 连续答对次数 "is_correct": False, # 总答题数 "difficulty_level": 1, # 难度等级 (1, 2, 3) "is_game_over": False, # 游戏是否结束 "initial_message": [], "next_question": None, } self.game_state["conversation_history_hidden"].append({"role": "system", "content": GameController.system_prompt}) self.get_initial_chat_display() def get_ai_response(self): ''' Given the current hidden conversation history, get AI response, and append the response in the history ''' response = GameController.client.chat.completions.create( model=GameController.model, messages=self.game_state["conversation_history_hidden"], temperature=0.7, # 温度不宜过高,保证出题的准确性 top_p=0.95, stream=True, #max_tokens=150, ) full_response = "" for chunk in response: if chunk.choices[0].delta.content is not None: chunk_content = chunk.choices[0].delta.content full_response += chunk_content yield chunk_content # 逐块返回 # 将完整的AI回复加入历史 self.game_state["conversation_history_hidden"].append({"role": "assistant", "content": full_response}) self.update_status(full_response) def chat_with_ai(self,message, chat_history): ''' Take the chat history from gradio, get ai response and return the updated chat history @params message: the new user input chat_history: a list of chat history visible to users ''' # 添加用户消息到历史 self.game_state["conversation_history_hidden"].append({"role": "user", "content": message}) if not self.game_state['is_game_over']: if self.game_state['is_correct']: self.hint_next_question() self.game_state['is_correct'] = False chat_history.append({'role':'user','content':message}) chat_history.append({'role':'assistant', 'content':""}) # 获取流式响应并逐步更新聊天界面 full_response = "" for chunk in self.get_ai_response(): full_response += chunk chat_history[-1] = {'role':'assistant', 'content': full_response} yield "", chat_history # 逐步更新界面 return "", chat_history def get_initial_chat_display(self): initial_message = "(在书房里恭敬地站在你面前) 父皇今日可要考校儿臣功课?" self.game_state["conversation_history_hidden"].append({"role": "user", "content": initial_message}) self.hint_next_question() # 获取初始响应(非流式) response = GameController.client.chat.completions.create( model=GameController.model, messages=self.game_state["conversation_history_hidden"], temperature=0.7, top_p=0.95 ) ai_response = response.choices[0].message.content.strip() self.game_state["conversation_history_hidden"].append({"role": "assistant", "content": ai_response}) self.update_status(ai_response) self.game_state['initial_message'].append({'role':'user','content':initial_message}) self.game_state['initial_message'].append({'role':'assistant','content':ai_response}) def update_status(self,ai_response): end_mark="游戏结束" if not self.game_state['is_game_over']: if end_mark in ai_response: self.game_state['is_game_over']=True return if self.game_state['next_question'] in ai_response: self.game_state['is_correct'] = True self.game_state['correct_count'] += 1 self.game_state['difficulty_level'] += 1 def hint_next_question(self): if self.game_state['difficulty_level'] > 9: return if self.game_state['difficulty_level'] == 9: self.game_state["conversation_history_hidden"].append({"role": "user", "content": '[系统提示]:这是最后一题,答完这题无论对错都停止游戏'}) try: next_question = self.questionIterator.get_next_question(self.game_state['difficulty_level'])['question'] self.game_state["conversation_history_hidden"].append({"role": "user", "content": f'[系统提示]:下一题,{next_question}'}) self.game_state['next_question']=next_question except: return def restart(self,chat_history): self.game_state = { "conversation_history_hidden": [], # 对话历史,AI看到的 "correct_count": 0, # 连续答对次数 "is_correct": False, # 总答题数 "difficulty_level": 1, # 难度等级 (1, 2, 3) "is_game_over": False, # 游戏是否结束 "initial_message": [], "next_question": None, } self.game_state["conversation_history_hidden"].append({"role": "system", "content": GameController.system_prompt}) self.get_initial_chat_display() chat_history=self.game_state['initial_message'] self.questionIterator.reset() return chat_history def clear_history(self): """清空对话历史""" self.game_state["conversation_history_hidden"] = [] return "历史已清空" def show_history(self): """显示当前对话历史""" history_text = "" for msg in self.game_state["conversation_history_hidden"]: role = "用户" if msg["role"] == "user" else "AI" if msg["role"] == "assistant" else "系统" history_text += f"{role}: {msg['content']}\n\n" return history_text if history_text else "暂无对话历史" def save_conversation_history(self): """保存对话历史到JSON文件""" try: # 创建保存目录(如果不存在) os.makedirs("gradio_history", exist_ok=True) # 生成文件名(包含时间戳) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"gradio_history/conversation_{timestamp}.json" # 准备要保存的数据 save_data = { "save_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), #"total_rounds": self.game_state["total_rounds"], "correct_count": self.game_state["correct_count"], "conversation": self.game_state["conversation_history_hidden"] } # 保存到文件 with open(filename, 'w', encoding='utf-8') as f: json.dump(save_data, f, ensure_ascii=False, indent=2) print(f"对话历史已保存到: {filename}") return except Exception as e: print(f"保存失败: {str(e)}") return class QuestionIterator: question_set=None def __init__(self): if QuestionIterator.question_set is None: QuestionIterator.question_set=QuestionIterator.load_question_set(QUESTIONS_DATASET_PATH) # # For debug purpose, print the question set to make sure it is correctly loaded # for i, question_list in QuestionIterator.question_set.items(): # print(question_list) # print("-" * 50) self.iterator_dict = {} for difficulty_str, question_list in QuestionIterator.question_set.items(): difficulty = int(difficulty_str) shuffled_questions = question_list.copy() random.shuffle(shuffled_questions) self.iterator_dict[difficulty]=iter(shuffled_questions) def load_question_set(json_file_path): with open(json_file_path, 'r', encoding='utf-8') as f: data = json.load(f) # 将字典转换回便于使用的格式 sub_dataframes_dict = {} for difficulty_str, records in data.items(): difficulty = int(difficulty_str) sub_dataframes_dict[difficulty] = records return sub_dataframes_dict # 使用示例 # loaded_data = load_sub_dataframes('sub_dataframes.json') # print(loaded_data[4]) # 访问难度4的数据 def get_next_question(self,difficulty): ''' generate next question @param difficulty: (int) the difficulty of the next question @return question: the question length: the length of the expected answer ''' # 检查该难度是否有可用问题 try: item = next(self.iterator_dict[difficulty]) print(f"下一题: {item}") except StopIteration: raise Exception("这个难度的题出完了") return item def reset_difficulty(self, difficulty): """ 重置指定难度的使用记录 @param difficulty: 要重置的难度 """ difficulty = int(difficulty) if str(difficulty) in QuestionIterator.question_set: shuffled_questions = QuestionIterator.question_set[difficulty].copy() random.shuffle(shuffled_questions) self.iterator_dict[difficulty]=iter(shuffled_questions) def reset(self): for difficulty in [1,2,3,4,5,6,7,8,9]: self.reset_difficulty(difficulty) def create_interface(g:GameController): with gr.Blocks(title="皇帝出题机") as demo: gr.Markdown("# 皇帝出题机") with gr.Row(): with gr.Column(scale=2): chatbot = gr.Chatbot(label="对话界面",height=600,value=g.game_state['initial_message'],type='messages') msg = gr.Textbox(label="输入消息", placeholder="在这里输入你的消息...") with gr.Column(): btn_send = gr.Button("发送", variant="primary") with gr.Row(): btn_save = gr.Button("保存对话历史", variant="secondary") btn_restart = gr.Button("重新开始", variant="secondary") # with gr.Column(scale=1): # gr.Markdown("### 历史管理") # history_display = gr.Textbox(label="当前对话历史", interactive=False, lines=15) # btn_show_history = gr.Button("刷新历史显示") # btn_clear = gr.Button("清空历史") # status = gr.Textbox(label="状态", interactive=False) # # 事件处理 # btn_send.click(chat_with_ai, [msg, chatbot], [msg, chatbot]).then( # lambda: "", None, msg # ) btn_send.click(g.chat_with_ai, [msg, chatbot], [msg, chatbot]) msg.submit(g.chat_with_ai, [msg, chatbot], [msg, chatbot]) # 回车键触发 btn_save.click( fn=g.save_conversation_history, inputs=[], outputs=[] ) btn_restart.click(fn=g.restart,inputs=[chatbot],outputs=[chatbot]) # btn_show_history.click(show_history, None, history_display) # btn_clear.click(clear_history, None, status).then( # lambda: "历史已清空", None, history_display # ) # # 初始化显示历史 # demo.load(show_history, None, history_display) demo.launch() if __name__ == "__main__": g=GameController() question_iterator = QuestionIterator() # while True: # try: # question = question_iterator.get_next_question(4) # print(f"问题: {question['question']}, 长度: {question['length_of_answer']}") # except: # break # question = question_iterator.get_next_question(6) # print(f"问题: {question['question']}, 长度: {question['length_of_answer']}") # question_iterator.reset_difficulty(4) create_interface(g)