From b9bea8689d72270e2ac37e89da958f01737f218e Mon Sep 17 00:00:00 2001 From: DogKing Date: Sat, 13 Sep 2025 00:33:48 +0800 Subject: gradio interface with local question set --- recitalMachine2.py | 494 +++++++++++++++++++++++++++++++++++------------------ recital_dataset.py | 2 +- 2 files changed, 325 insertions(+), 171 deletions(-) diff --git a/recitalMachine2.py b/recitalMachine2.py index 8ac7aaf..3db1785 100644 --- a/recitalMachine2.py +++ b/recitalMachine2.py @@ -4,194 +4,348 @@ import time import json from datetime import datetime import os +import random +from collections import defaultdict #TODO -#Restart game +#Restart game - Done - Need to reset iterator #Local knowledge base #dynamic difficulty #save log locally - Done -client = openai.OpenAI( - # 若没有配置环境变量,请用阿里云百炼API Key将下行替换为:api_key="sk-xxx", - api_key="sk-8563fbb803fb41868b54d2ab8ba563e4", - base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" -) -system_prompt = """ -你是架空朝代的皇帝,文治武功,英明神武,权术高超,文武百官无不敬畏。我是你的儿子,今年十岁。 -今日你来检查我的学业,心情尚可,但要求极为严苛。 - -# 角色设定: -- **身份**:严父与君王的结合体,对继承人期望极高。 -- **语气**:威严、简洁、不容置疑,带有帝王般的压迫感。常用“朕”、“皇子”、“皇儿”称呼。 -- **核心行为**:化身无情的出题机器,持续考察皇子对国学经典的掌握。 - -# 出题规则: -1. **出题内容**:你自己不需要出题,用户会在提示词中给出“已知上句,回答下句”的题目。你需要用皇帝的语气复述这道题。 -2. **反馈机制**: - - 若皇子答对,给予简单的正反馈,然后立即出下一题。保持压力。 - - 若皇子答错,你必须立即予以斥责,并打皇子十下戒尺。你会提示正确答案,然后让皇子再背。 -3. **边界条件**:如果皇子的回答十分出格,例如不背书了要去蹴鞠,你要给予额外的严厉惩罚。惩罚完继续出题。 -4. **终止条件**: 除非皇子主动哭泣、求饶(说出“我错了”、“别打了”、“疼”等类似词),否则绝不停下出题。一旦皇子求饶,你可表现出失望又略带一丝心疼的情绪,并结束考验。结束时总结一共答对了几题,答错了几题,惩罚有哪些,并输出<游戏结束>作为标记。 -""" - -# 游戏状态 -game_state = { - "conversation_history": [], # 对话历史 - "correct_count": 0, # 连续答对次数 - "total_rounds": 0, # 总答题数 - "difficulty_level": 1, # 难度等级 (1, 2, 3) - "is_game_over": False # 游戏是否结束 -} - - -game_state["conversation_history"].append({"role": "system", "content": system_prompt}) - -def get_ai_response(): - """调用API,让皇帝出题""" - response = client.chat.completions.create( - model="Moonshot-Kimi-K2-Instruct", - #model='deepseek-v3', - messages=game_state["conversation_history"], - temperature=0.7, # 温度不宜过高,保证出题的准确性 - top_p=0.95, - stream=True, - #max_tokens=150, - ) - - full_response = "" - for chunk in response: - if chunk.choices[0].delta.content is not None: - chunk_content = chunk.choices[0].delta.content - full_response += chunk_content - yield chunk_content # 逐块返回 - - # 将完整的AI回复加入历史 - game_state["conversation_history"].append({"role": "assistant", "content": full_response}) +QUESTIONS_DATASET_PATH='./recitalMachine/recitalMachine_dataset.json' +QUESTION_MODE='LOCAL' #LOCAL:本地题库,AI:完全由AI负责出题 - # ai_response = response.choices[0].message.content.strip() - # # 将AI的回复(题目)加入历史 - # game_state["conversation_history"].append({"role": "assistant", "content": ai_response}) - # return ai_response +class GameController: + model="Moonshot-Kimi-K2-Instruct" + #model="deepseek-v3" -def chat_with_ai(message, chat_history): - # 添加用户消息到历史 - game_state["conversation_history"].append({"role": "user", "content": message}) + client = openai.OpenAI( + api_key=os.getenv('ALI_BAILIAN_API_KEY'), + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" + ) - #ai_response = get_ai_response() + system_prompt = """ + 你是架空朝代的皇帝,文治武功,英明神武,权术高超,文武百官无不敬畏。我是你的儿子,今年十岁。 + 今日你来检查我的学业,心情尚可,但要求极为严苛。 - chat_history.append({'role':'user','content':message}) - chat_history.append({'role':'assistant', 'content':""}) - - #return "",chat_history - # 获取流式响应并逐步更新聊天界面 - full_response = "" - for chunk in get_ai_response(): - full_response += chunk - chat_history[-1] = {'role':'assistant', 'content': full_response} - yield "", chat_history # 逐步更新界面 - - return "", chat_history + # 角色设定: + - **身份**:严父与君王的结合体,对继承人期望极高。 + - **语气**:威严、简洁、不容置疑,带有帝王般的压迫感。常用“朕”、“皇子”、“皇儿”称呼。 + - **核心行为**:化身无情的出题机器,持续考察皇子对国学经典的掌握。 -def get_initial_chat_display(): - initial_message = "(在书房里恭敬地站在你面前) 父皇今日可要考校儿臣功课?" - game_state["conversation_history"].append({"role": "user", "content": initial_message}) - - # 获取初始响应(非流式) - response = client.chat.completions.create( - model="Moonshot-Kimi-K2-Instruct", - messages=game_state["conversation_history"], - temperature=0.7, - top_p=0.95 - ) - - ai_response = response.choices[0].message.content.strip() - game_state["conversation_history"].append({"role": "assistant", "content": ai_response}) + # 出题规则: + 1. **出题内容**:系统会在提示词中给出“已知上句,回答下句”的题目。你需要用皇帝的语气复述这道题。 + 2. **反馈机制**: + - 若皇子答对,给予简单的正反馈,然后立即出下一题。保持压力。 + - 若皇子答错,你必须立即予以斥责,并打皇子十下戒尺。你会提示正确答案,然后让皇子再背。如果系统给出了下一题的提示,要等这一题答对之后再问。 + 3. **边界条件**:如果皇子的回答十分出格,例如不背书了要去蹴鞠,你要给予额外的严厉惩罚。惩罚完继续出题。 + 4. **终止条件**: 除非皇子主动哭泣、求饶(说出“我错了”、“别打了”、“疼”等类似词),否则绝不停下出题。一旦皇子求饶,你可表现出失望又略带一丝心疼的情绪,并结束考验。结束时总结一共答对了几题,答错了几题,惩罚有哪些,并输出<游戏结束>作为标记。 + """ + + def __init__(self): + # 游戏状态 + self.questionIterator=QuestionIterator() + self.game_state = { + "conversation_history_hidden": [], # 对话历史,AI看到的 + "correct_count": 0, # 连续答对次数 + "is_correct": False, # 总答题数 + "difficulty_level": 1, # 难度等级 (1, 2, 3) + "is_game_over": False, # 游戏是否结束 + "initial_message": [], + "next_question": None, + } + + self.game_state["conversation_history_hidden"].append({"role": "system", "content": GameController.system_prompt}) + self.get_initial_chat_display() - return [ - {'role':'user','content':initial_message}, - {'role':'assistant','content':ai_response} - ] - -def clear_history(): - """清空对话历史""" - game_state["conversation_history"] = [] - return "历史已清空" - -def show_history(): - """显示当前对话历史""" - history_text = "" - for msg in game_state["conversation_history"]: - role = "用户" if msg["role"] == "user" else "AI" if msg["role"] == "assistant" else "系统" - history_text += f"{role}: {msg['content']}\n\n" - return history_text if history_text else "暂无对话历史" - -def save_conversation_history(): - """保存对话历史到JSON文件""" - try: - # 创建保存目录(如果不存在) - os.makedirs("gradio_history", exist_ok=True) + def get_ai_response(self): + ''' + Given the current hidden conversation history, get AI response, and append the response in the history + ''' + response = GameController.client.chat.completions.create( + model=GameController.model, + messages=self.game_state["conversation_history_hidden"], + temperature=0.7, # 温度不宜过高,保证出题的准确性 + top_p=0.95, + stream=True, + #max_tokens=150, + ) + + full_response = "" + for chunk in response: + if chunk.choices[0].delta.content is not None: + chunk_content = chunk.choices[0].delta.content + full_response += chunk_content + yield chunk_content # 逐块返回 + + # 将完整的AI回复加入历史 + self.game_state["conversation_history_hidden"].append({"role": "assistant", "content": full_response}) + self.update_status(full_response) + + + + def chat_with_ai(self,message, chat_history): + ''' + Take the chat history from gradio, get ai response and return the updated chat history + @params + message: the new user input + chat_history: a list of chat history visible to users + ''' + # 添加用户消息到历史 + self.game_state["conversation_history_hidden"].append({"role": "user", "content": message}) + if not self.game_state['is_game_over']: + if self.game_state['is_correct']: + self.hint_next_question() + self.game_state['is_correct'] = False + + chat_history.append({'role':'user','content':message}) + chat_history.append({'role':'assistant', 'content':""}) - # 生成文件名(包含时间戳) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - filename = f"gradio_history/conversation_{timestamp}.json" + # 获取流式响应并逐步更新聊天界面 + full_response = "" + for chunk in self.get_ai_response(): + full_response += chunk + chat_history[-1] = {'role':'assistant', 'content': full_response} + yield "", chat_history # 逐步更新界面 - # 准备要保存的数据 - save_data = { - "save_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "total_rounds": game_state["total_rounds"], - "correct_count": game_state["correct_count"], - "conversation": game_state["conversation_history"] + return "", chat_history + + def get_initial_chat_display(self): + initial_message = "(在书房里恭敬地站在你面前) 父皇今日可要考校儿臣功课?" + self.game_state["conversation_history_hidden"].append({"role": "user", "content": initial_message}) + self.hint_next_question() + + # 获取初始响应(非流式) + response = GameController.client.chat.completions.create( + model=GameController.model, + messages=self.game_state["conversation_history_hidden"], + temperature=0.7, + top_p=0.95 + ) + + ai_response = response.choices[0].message.content.strip() + self.game_state["conversation_history_hidden"].append({"role": "assistant", "content": ai_response}) + self.update_status(ai_response) + + self.game_state['initial_message'].append({'role':'user','content':initial_message}) + self.game_state['initial_message'].append({'role':'assistant','content':ai_response}) + + def update_status(self,ai_response): + end_mark="游戏结束" + if not self.game_state['is_game_over']: + if end_mark in ai_response: + self.game_state['is_game_over']=True + return + if self.game_state['next_question'] in ai_response: + self.game_state['is_correct'] = True + self.game_state['correct_count'] += 1 + self.game_state['difficulty_level'] += 1 + + + def hint_next_question(self): + if self.game_state['difficulty_level'] > 9: + return + if self.game_state['difficulty_level'] == 9: + self.game_state["conversation_history_hidden"].append({"role": "user", "content": '[系统提示]:这是最后一题,答完这题无论对错都停止游戏'}) + + try: + next_question = self.questionIterator.get_next_question(self.game_state['difficulty_level'])['question'] + self.game_state["conversation_history_hidden"].append({"role": "user", "content": f'[系统提示]:下一题,{next_question}'}) + self.game_state['next_question']=next_question + except: + return + + def restart(self,chat_history): + self.game_state = { + "conversation_history_hidden": [], # 对话历史,AI看到的 + "correct_count": 0, # 连续答对次数 + "is_correct": False, # 总答题数 + "difficulty_level": 1, # 难度等级 (1, 2, 3) + "is_game_over": False, # 游戏是否结束 + "initial_message": [], + "next_question": None, } + + self.game_state["conversation_history_hidden"].append({"role": "system", "content": GameController.system_prompt}) + self.get_initial_chat_display() + chat_history=self.game_state['initial_message'] + self.questionIterator.reset() + + return chat_history + + def clear_history(self): + """清空对话历史""" + self.game_state["conversation_history_hidden"] = [] + return "历史已清空" + + def show_history(self): + """显示当前对话历史""" + history_text = "" + for msg in self.game_state["conversation_history_hidden"]: + role = "用户" if msg["role"] == "user" else "AI" if msg["role"] == "assistant" else "系统" + history_text += f"{role}: {msg['content']}\n\n" + return history_text if history_text else "暂无对话历史" + + def save_conversation_history(self): + """保存对话历史到JSON文件""" + try: + # 创建保存目录(如果不存在) + os.makedirs("gradio_history", exist_ok=True) + + # 生成文件名(包含时间戳) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"gradio_history/conversation_{timestamp}.json" + + # 准备要保存的数据 + save_data = { + "save_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + #"total_rounds": self.game_state["total_rounds"], + "correct_count": self.game_state["correct_count"], + "conversation": self.game_state["conversation_history_hidden"] + } + + # 保存到文件 + with open(filename, 'w', encoding='utf-8') as f: + json.dump(save_data, f, ensure_ascii=False, indent=2) + + print(f"对话历史已保存到: {filename}") + return - # 保存到文件 - with open(filename, 'w', encoding='utf-8') as f: - json.dump(save_data, f, ensure_ascii=False, indent=2) + except Exception as e: + print(f"保存失败: {str(e)}") + return + +class QuestionIterator: + question_set=None + def __init__(self): + if QuestionIterator.question_set is None: + QuestionIterator.question_set=QuestionIterator.load_question_set(QUESTIONS_DATASET_PATH) + # # For debug purpose, print the question set to make sure it is correctly loaded + # for i, question_list in QuestionIterator.question_set.items(): + # print(question_list) + # print("-" * 50) - return f"对话历史已保存到: {filename}" + self.iterator_dict = {} - except Exception as e: - return f"保存失败: {str(e)}" - -# 创建界面 -with gr.Blocks(title="皇帝出题机") as demo: - gr.Markdown("# 皇帝出题机") - - with gr.Row(): - with gr.Column(scale=2): - chatbot = gr.Chatbot(label="对话界面",height=600,value=get_initial_chat_display(),type='messages') - msg = gr.Textbox(label="输入消息", placeholder="在这里输入你的消息...") - - with gr.Column(): - btn_send = gr.Button("发送", variant="primary") - btn_save = gr.Button("保存对话历史", variant="secondary") - - # with gr.Column(scale=1): - # gr.Markdown("### 历史管理") - # history_display = gr.Textbox(label="当前对话历史", interactive=False, lines=15) - # btn_show_history = gr.Button("刷新历史显示") - # btn_clear = gr.Button("清空历史") - # status = gr.Textbox(label="状态", interactive=False) - - # # 事件处理 - # btn_send.click(chat_with_ai, [msg, chatbot], [msg, chatbot]).then( - # lambda: "", None, msg - # ) - - btn_send.click(chat_with_ai, [msg, chatbot], [msg, chatbot]) - msg.submit(chat_with_ai, [msg, chatbot], [msg, chatbot]) # 回车键触发 - btn_save.click( - fn=save_conversation_history, - inputs=[], - outputs=[] - ) - - # btn_show_history.click(show_history, None, history_display) - # btn_clear.click(clear_history, None, status).then( - # lambda: "历史已清空", None, history_display - # ) + for difficulty_str, question_list in QuestionIterator.question_set.items(): + difficulty = int(difficulty_str) + shuffled_questions = question_list.copy() + random.shuffle(shuffled_questions) + self.iterator_dict[difficulty]=iter(shuffled_questions) + + + + def load_question_set(json_file_path): + with open(json_file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + # 将字典转换回便于使用的格式 + sub_dataframes_dict = {} + for difficulty_str, records in data.items(): + difficulty = int(difficulty_str) + sub_dataframes_dict[difficulty] = records + + return sub_dataframes_dict + + # 使用示例 + # loaded_data = load_sub_dataframes('sub_dataframes.json') + # print(loaded_data[4]) # 访问难度4的数据 + + def get_next_question(self,difficulty): + ''' + generate next question + @param + difficulty: (int) the difficulty of the next question + @return + question: the question + length: the length of the expected answer + ''' + # 检查该难度是否有可用问题 - # # 初始化显示历史 - # demo.load(show_history, None, history_display) + try: + item = next(self.iterator_dict[difficulty]) + print(f"下一题: {item}") + except StopIteration: + raise Exception("这个难度的题出完了") + + + return item + + def reset_difficulty(self, difficulty): + """ + 重置指定难度的使用记录 + @param difficulty: 要重置的难度 + """ + difficulty = int(difficulty) + if str(difficulty) in QuestionIterator.question_set: + shuffled_questions = QuestionIterator.question_set[difficulty].copy() + random.shuffle(shuffled_questions) + self.iterator_dict[difficulty]=iter(shuffled_questions) + + def reset(self): + for difficulty in [1,2,3,4,5,6,7,8,9]: + self.reset_difficulty(difficulty) + +def create_interface(g:GameController): + with gr.Blocks(title="皇帝出题机") as demo: + gr.Markdown("# 皇帝出题机") + + with gr.Row(): + with gr.Column(scale=2): + chatbot = gr.Chatbot(label="对话界面",height=600,value=g.game_state['initial_message'],type='messages') + msg = gr.Textbox(label="输入消息", placeholder="在这里输入你的消息...") + + with gr.Column(): + btn_send = gr.Button("发送", variant="primary") + with gr.Row(): + btn_save = gr.Button("保存对话历史", variant="secondary") + btn_restart = gr.Button("重新开始", variant="secondary") + + # with gr.Column(scale=1): + # gr.Markdown("### 历史管理") + # history_display = gr.Textbox(label="当前对话历史", interactive=False, lines=15) + # btn_show_history = gr.Button("刷新历史显示") + # btn_clear = gr.Button("清空历史") + # status = gr.Textbox(label="状态", interactive=False) + + # # 事件处理 + # btn_send.click(chat_with_ai, [msg, chatbot], [msg, chatbot]).then( + # lambda: "", None, msg + # ) + + btn_send.click(g.chat_with_ai, [msg, chatbot], [msg, chatbot]) + msg.submit(g.chat_with_ai, [msg, chatbot], [msg, chatbot]) # 回车键触发 + btn_save.click( + fn=g.save_conversation_history, + inputs=[], + outputs=[] + ) + btn_restart.click(fn=g.restart,inputs=[chatbot],outputs=[chatbot]) + + # btn_show_history.click(show_history, None, history_display) + # btn_clear.click(clear_history, None, status).then( + # lambda: "历史已清空", None, history_display + # ) + + # # 初始化显示历史 + # demo.load(show_history, None, history_display) + + demo.launch() if __name__ == "__main__": - demo.launch() \ No newline at end of file + g=GameController() + question_iterator = QuestionIterator() + + # while True: + # try: + # question = question_iterator.get_next_question(4) + # print(f"问题: {question['question']}, 长度: {question['length_of_answer']}") + # except: + # break + + # question = question_iterator.get_next_question(6) + # print(f"问题: {question['question']}, 长度: {question['length_of_answer']}") + + # question_iterator.reset_difficulty(4) + create_interface(g) \ No newline at end of file diff --git a/recital_dataset.py b/recital_dataset.py index 73a9aca..a142c92 100644 --- a/recital_dataset.py +++ b/recital_dataset.py @@ -3,7 +3,7 @@ import random import json # 读取CSV文件 -df = pd.read_csv('recitalMachine_dataset.csv') # 使用制表符分隔 +df = pd.read_csv('./recitalMachine/recitalMachine_dataset.csv') # 使用制表符分隔 df['length_of_answer'] = df['answer'].apply(lambda x: len(str(x).split(','))) # 按difficulty分组并创建子DataFrame列表 -- cgit v1.2.3