summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--recitalMachine2.py494
-rw-r--r--recital_dataset.py2
2 files changed, 325 insertions, 171 deletions
diff --git a/recitalMachine2.py b/recitalMachine2.py
index 8ac7aaf..3db1785 100644
--- a/recitalMachine2.py
+++ b/recitalMachine2.py
@@ -4,194 +4,348 @@ import time
import json
from datetime import datetime
import os
+import random
+from collections import defaultdict
#TODO
-#Restart game
+#Restart game - Done - Need to reset iterator
#Local knowledge base
#dynamic difficulty
#save log locally - Done
-client = openai.OpenAI(
- # 若没有配置环境变量,请用阿里云百炼API Key将下行替换为:api_key="sk-xxx",
- api_key="sk-8563fbb803fb41868b54d2ab8ba563e4",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
-)
-system_prompt = """
-你是架空朝代的皇帝,文治武功,英明神武,权术高超,文武百官无不敬畏。我是你的儿子,今年十岁。
-今日你来检查我的学业,心情尚可,但要求极为严苛。
-
-# 角色设定:
-- **身份**:严父与君王的结合体,对继承人期望极高。
-- **语气**:威严、简洁、不容置疑,带有帝王般的压迫感。常用“朕”、“皇子”、“皇儿”称呼。
-- **核心行为**:化身无情的出题机器,持续考察皇子对国学经典的掌握。
-
-# 出题规则:
-1. **出题内容**:你自己不需要出题,用户会在提示词中给出“已知上句,回答下句”的题目。你需要用皇帝的语气复述这道题。
-2. **反馈机制**:
- - 若皇子答对,给予简单的正反馈,然后立即出下一题。保持压力。
- - 若皇子答错,你必须立即予以斥责,并打皇子十下戒尺。你会提示正确答案,然后让皇子再背。
-3. **边界条件**:如果皇子的回答十分出格,例如不背书了要去蹴鞠,你要给予额外的严厉惩罚。惩罚完继续出题。
-4. **终止条件**: 除非皇子主动哭泣、求饶(说出“我错了”、“别打了”、“疼”等类似词),否则绝不停下出题。一旦皇子求饶,你可表现出失望又略带一丝心疼的情绪,并结束考验。结束时总结一共答对了几题,答错了几题,惩罚有哪些,并输出<游戏结束>作为标记。
-"""
-
-# 游戏状态
-game_state = {
- "conversation_history": [], # 对话历史
- "correct_count": 0, # 连续答对次数
- "total_rounds": 0, # 总答题数
- "difficulty_level": 1, # 难度等级 (1, 2, 3)
- "is_game_over": False # 游戏是否结束
-}
-
-
-game_state["conversation_history"].append({"role": "system", "content": system_prompt})
-
-def get_ai_response():
- """调用API,让皇帝出题"""
- response = client.chat.completions.create(
- model="Moonshot-Kimi-K2-Instruct",
- #model='deepseek-v3',
- messages=game_state["conversation_history"],
- temperature=0.7, # 温度不宜过高,保证出题的准确性
- top_p=0.95,
- stream=True,
- #max_tokens=150,
- )
-
- full_response = ""
- for chunk in response:
- if chunk.choices[0].delta.content is not None:
- chunk_content = chunk.choices[0].delta.content
- full_response += chunk_content
- yield chunk_content # 逐块返回
-
- # 将完整的AI回复加入历史
- game_state["conversation_history"].append({"role": "assistant", "content": full_response})
+QUESTIONS_DATASET_PATH='./recitalMachine/recitalMachine_dataset.json'
+QUESTION_MODE='LOCAL' #LOCAL:本地题库,AI:完全由AI负责出题
- # ai_response = response.choices[0].message.content.strip()
- # # 将AI的回复(题目)加入历史
- # game_state["conversation_history"].append({"role": "assistant", "content": ai_response})
- # return ai_response
+class GameController:
+ model="Moonshot-Kimi-K2-Instruct"
+ #model="deepseek-v3"
-def chat_with_ai(message, chat_history):
- # 添加用户消息到历史
- game_state["conversation_history"].append({"role": "user", "content": message})
+ client = openai.OpenAI(
+ api_key=os.getenv('ALI_BAILIAN_API_KEY'),
+ base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
+ )
- #ai_response = get_ai_response()
+ system_prompt = """
+ 你是架空朝代的皇帝,文治武功,英明神武,权术高超,文武百官无不敬畏。我是你的儿子,今年十岁。
+ 今日你来检查我的学业,心情尚可,但要求极为严苛。
- chat_history.append({'role':'user','content':message})
- chat_history.append({'role':'assistant', 'content':""})
-
- #return "",chat_history
- # 获取流式响应并逐步更新聊天界面
- full_response = ""
- for chunk in get_ai_response():
- full_response += chunk
- chat_history[-1] = {'role':'assistant', 'content': full_response}
- yield "", chat_history # 逐步更新界面
-
- return "", chat_history
+ # 角色设定:
+ - **身份**:严父与君王的结合体,对继承人期望极高。
+ - **语气**:威严、简洁、不容置疑,带有帝王般的压迫感。常用“朕”、“皇子”、“皇儿”称呼。
+ - **核心行为**:化身无情的出题机器,持续考察皇子对国学经典的掌握。
-def get_initial_chat_display():
- initial_message = "(在书房里恭敬地站在你面前) 父皇今日可要考校儿臣功课?"
- game_state["conversation_history"].append({"role": "user", "content": initial_message})
-
- # 获取初始响应(非流式)
- response = client.chat.completions.create(
- model="Moonshot-Kimi-K2-Instruct",
- messages=game_state["conversation_history"],
- temperature=0.7,
- top_p=0.95
- )
-
- ai_response = response.choices[0].message.content.strip()
- game_state["conversation_history"].append({"role": "assistant", "content": ai_response})
+ # 出题规则:
+ 1. **出题内容**:系统会在提示词中给出“已知上句,回答下句”的题目。你需要用皇帝的语气复述这道题。
+ 2. **反馈机制**:
+ - 若皇子答对,给予简单的正反馈,然后立即出下一题。保持压力。
+ - 若皇子答错,你必须立即予以斥责,并打皇子十下戒尺。你会提示正确答案,然后让皇子再背。如果系统给出了下一题的提示,要等这一题答对之后再问。
+ 3. **边界条件**:如果皇子的回答十分出格,例如不背书了要去蹴鞠,你要给予额外的严厉惩罚。惩罚完继续出题。
+ 4. **终止条件**: 除非皇子主动哭泣、求饶(说出“我错了”、“别打了”、“疼”等类似词),否则绝不停下出题。一旦皇子求饶,你可表现出失望又略带一丝心疼的情绪,并结束考验。结束时总结一共答对了几题,答错了几题,惩罚有哪些,并输出<游戏结束>作为标记。
+ """
+
+ def __init__(self):
+ # 游戏状态
+ self.questionIterator=QuestionIterator()
+ self.game_state = {
+ "conversation_history_hidden": [], # 对话历史,AI看到的
+ "correct_count": 0, # 连续答对次数
+ "is_correct": False, # 总答题数
+ "difficulty_level": 1, # 难度等级 (1, 2, 3)
+ "is_game_over": False, # 游戏是否结束
+ "initial_message": [],
+ "next_question": None,
+ }
+
+ self.game_state["conversation_history_hidden"].append({"role": "system", "content": GameController.system_prompt})
+ self.get_initial_chat_display()
- return [
- {'role':'user','content':initial_message},
- {'role':'assistant','content':ai_response}
- ]
-
-def clear_history():
- """清空对话历史"""
- game_state["conversation_history"] = []
- return "历史已清空"
-
-def show_history():
- """显示当前对话历史"""
- history_text = ""
- for msg in game_state["conversation_history"]:
- role = "用户" if msg["role"] == "user" else "AI" if msg["role"] == "assistant" else "系统"
- history_text += f"{role}: {msg['content']}\n\n"
- return history_text if history_text else "暂无对话历史"
-
-def save_conversation_history():
- """保存对话历史到JSON文件"""
- try:
- # 创建保存目录(如果不存在)
- os.makedirs("gradio_history", exist_ok=True)
+ def get_ai_response(self):
+ '''
+ Given the current hidden conversation history, get AI response, and append the response in the history
+ '''
+ response = GameController.client.chat.completions.create(
+ model=GameController.model,
+ messages=self.game_state["conversation_history_hidden"],
+ temperature=0.7, # 温度不宜过高,保证出题的准确性
+ top_p=0.95,
+ stream=True,
+ #max_tokens=150,
+ )
+
+ full_response = ""
+ for chunk in response:
+ if chunk.choices[0].delta.content is not None:
+ chunk_content = chunk.choices[0].delta.content
+ full_response += chunk_content
+ yield chunk_content # 逐块返回
+
+ # 将完整的AI回复加入历史
+ self.game_state["conversation_history_hidden"].append({"role": "assistant", "content": full_response})
+ self.update_status(full_response)
+
+
+
+ def chat_with_ai(self,message, chat_history):
+ '''
+ Take the chat history from gradio, get ai response and return the updated chat history
+ @params
+ message: the new user input
+ chat_history: a list of chat history visible to users
+ '''
+ # 添加用户消息到历史
+ self.game_state["conversation_history_hidden"].append({"role": "user", "content": message})
+ if not self.game_state['is_game_over']:
+ if self.game_state['is_correct']:
+ self.hint_next_question()
+ self.game_state['is_correct'] = False
+
+ chat_history.append({'role':'user','content':message})
+ chat_history.append({'role':'assistant', 'content':""})
- # 生成文件名(包含时间戳)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"gradio_history/conversation_{timestamp}.json"
+ # 获取流式响应并逐步更新聊天界面
+ full_response = ""
+ for chunk in self.get_ai_response():
+ full_response += chunk
+ chat_history[-1] = {'role':'assistant', 'content': full_response}
+ yield "", chat_history # 逐步更新界面
- # 准备要保存的数据
- save_data = {
- "save_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
- "total_rounds": game_state["total_rounds"],
- "correct_count": game_state["correct_count"],
- "conversation": game_state["conversation_history"]
+ return "", chat_history
+
+ def get_initial_chat_display(self):
+ initial_message = "(在书房里恭敬地站在你面前) 父皇今日可要考校儿臣功课?"
+ self.game_state["conversation_history_hidden"].append({"role": "user", "content": initial_message})
+ self.hint_next_question()
+
+ # 获取初始响应(非流式)
+ response = GameController.client.chat.completions.create(
+ model=GameController.model,
+ messages=self.game_state["conversation_history_hidden"],
+ temperature=0.7,
+ top_p=0.95
+ )
+
+ ai_response = response.choices[0].message.content.strip()
+ self.game_state["conversation_history_hidden"].append({"role": "assistant", "content": ai_response})
+ self.update_status(ai_response)
+
+ self.game_state['initial_message'].append({'role':'user','content':initial_message})
+ self.game_state['initial_message'].append({'role':'assistant','content':ai_response})
+
+ def update_status(self,ai_response):
+ end_mark="游戏结束"
+ if not self.game_state['is_game_over']:
+ if end_mark in ai_response:
+ self.game_state['is_game_over']=True
+ return
+ if self.game_state['next_question'] in ai_response:
+ self.game_state['is_correct'] = True
+ self.game_state['correct_count'] += 1
+ self.game_state['difficulty_level'] += 1
+
+
+ def hint_next_question(self):
+ if self.game_state['difficulty_level'] > 9:
+ return
+ if self.game_state['difficulty_level'] == 9:
+ self.game_state["conversation_history_hidden"].append({"role": "user", "content": '[系统提示]:这是最后一题,答完这题无论对错都停止游戏'})
+
+ try:
+ next_question = self.questionIterator.get_next_question(self.game_state['difficulty_level'])['question']
+ self.game_state["conversation_history_hidden"].append({"role": "user", "content": f'[系统提示]:下一题,{next_question}'})
+ self.game_state['next_question']=next_question
+ except:
+ return
+
+ def restart(self,chat_history):
+ self.game_state = {
+ "conversation_history_hidden": [], # 对话历史,AI看到的
+ "correct_count": 0, # 连续答对次数
+ "is_correct": False, # 总答题数
+ "difficulty_level": 1, # 难度等级 (1, 2, 3)
+ "is_game_over": False, # 游戏是否结束
+ "initial_message": [],
+ "next_question": None,
}
+
+ self.game_state["conversation_history_hidden"].append({"role": "system", "content": GameController.system_prompt})
+ self.get_initial_chat_display()
+ chat_history=self.game_state['initial_message']
+ self.questionIterator.reset()
+
+ return chat_history
+
+ def clear_history(self):
+ """清空对话历史"""
+ self.game_state["conversation_history_hidden"] = []
+ return "历史已清空"
+
+ def show_history(self):
+ """显示当前对话历史"""
+ history_text = ""
+ for msg in self.game_state["conversation_history_hidden"]:
+ role = "用户" if msg["role"] == "user" else "AI" if msg["role"] == "assistant" else "系统"
+ history_text += f"{role}: {msg['content']}\n\n"
+ return history_text if history_text else "暂无对话历史"
+
+ def save_conversation_history(self):
+ """保存对话历史到JSON文件"""
+ try:
+ # 创建保存目录(如果不存在)
+ os.makedirs("gradio_history", exist_ok=True)
+
+ # 生成文件名(包含时间戳)
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename = f"gradio_history/conversation_{timestamp}.json"
+
+ # 准备要保存的数据
+ save_data = {
+ "save_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+ #"total_rounds": self.game_state["total_rounds"],
+ "correct_count": self.game_state["correct_count"],
+ "conversation": self.game_state["conversation_history_hidden"]
+ }
+
+ # 保存到文件
+ with open(filename, 'w', encoding='utf-8') as f:
+ json.dump(save_data, f, ensure_ascii=False, indent=2)
+
+ print(f"对话历史已保存到: {filename}")
+ return
- # 保存到文件
- with open(filename, 'w', encoding='utf-8') as f:
- json.dump(save_data, f, ensure_ascii=False, indent=2)
+ except Exception as e:
+ print(f"保存失败: {str(e)}")
+ return
+
+class QuestionIterator:
+ question_set=None
+ def __init__(self):
+ if QuestionIterator.question_set is None:
+ QuestionIterator.question_set=QuestionIterator.load_question_set(QUESTIONS_DATASET_PATH)
+ # # For debug purpose, print the question set to make sure it is correctly loaded
+ # for i, question_list in QuestionIterator.question_set.items():
+ # print(question_list)
+ # print("-" * 50)
- return f"对话历史已保存到: {filename}"
+ self.iterator_dict = {}
- except Exception as e:
- return f"保存失败: {str(e)}"
-
-# 创建界面
-with gr.Blocks(title="皇帝出题机") as demo:
- gr.Markdown("# 皇帝出题机")
-
- with gr.Row():
- with gr.Column(scale=2):
- chatbot = gr.Chatbot(label="对话界面",height=600,value=get_initial_chat_display(),type='messages')
- msg = gr.Textbox(label="输入消息", placeholder="在这里输入你的消息...")
-
- with gr.Column():
- btn_send = gr.Button("发送", variant="primary")
- btn_save = gr.Button("保存对话历史", variant="secondary")
-
- # with gr.Column(scale=1):
- # gr.Markdown("### 历史管理")
- # history_display = gr.Textbox(label="当前对话历史", interactive=False, lines=15)
- # btn_show_history = gr.Button("刷新历史显示")
- # btn_clear = gr.Button("清空历史")
- # status = gr.Textbox(label="状态", interactive=False)
-
- # # 事件处理
- # btn_send.click(chat_with_ai, [msg, chatbot], [msg, chatbot]).then(
- # lambda: "", None, msg
- # )
-
- btn_send.click(chat_with_ai, [msg, chatbot], [msg, chatbot])
- msg.submit(chat_with_ai, [msg, chatbot], [msg, chatbot]) # 回车键触发
- btn_save.click(
- fn=save_conversation_history,
- inputs=[],
- outputs=[]
- )
-
- # btn_show_history.click(show_history, None, history_display)
- # btn_clear.click(clear_history, None, status).then(
- # lambda: "历史已清空", None, history_display
- # )
+ for difficulty_str, question_list in QuestionIterator.question_set.items():
+ difficulty = int(difficulty_str)
+ shuffled_questions = question_list.copy()
+ random.shuffle(shuffled_questions)
+ self.iterator_dict[difficulty]=iter(shuffled_questions)
+
+
+
+ def load_question_set(json_file_path):
+ with open(json_file_path, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+
+ # 将字典转换回便于使用的格式
+ sub_dataframes_dict = {}
+ for difficulty_str, records in data.items():
+ difficulty = int(difficulty_str)
+ sub_dataframes_dict[difficulty] = records
+
+ return sub_dataframes_dict
+
+ # 使用示例
+ # loaded_data = load_sub_dataframes('sub_dataframes.json')
+ # print(loaded_data[4]) # 访问难度4的数据
+
+ def get_next_question(self,difficulty):
+ '''
+ generate next question
+ @param
+ difficulty: (int) the difficulty of the next question
+ @return
+ question: the question
+ length: the length of the expected answer
+ '''
+ # 检查该难度是否有可用问题
- # # 初始化显示历史
- # demo.load(show_history, None, history_display)
+ try:
+ item = next(self.iterator_dict[difficulty])
+ print(f"下一题: {item}")
+ except StopIteration:
+ raise Exception("这个难度的题出完了")
+
+
+ return item
+
+ def reset_difficulty(self, difficulty):
+ """
+ 重置指定难度的使用记录
+ @param difficulty: 要重置的难度
+ """
+ difficulty = int(difficulty)
+ if str(difficulty) in QuestionIterator.question_set:
+ shuffled_questions = QuestionIterator.question_set[difficulty].copy()
+ random.shuffle(shuffled_questions)
+ self.iterator_dict[difficulty]=iter(shuffled_questions)
+
+ def reset(self):
+ for difficulty in [1,2,3,4,5,6,7,8,9]:
+ self.reset_difficulty(difficulty)
+
+def create_interface(g:GameController):
+ with gr.Blocks(title="皇帝出题机") as demo:
+ gr.Markdown("# 皇帝出题机")
+
+ with gr.Row():
+ with gr.Column(scale=2):
+ chatbot = gr.Chatbot(label="对话界面",height=600,value=g.game_state['initial_message'],type='messages')
+ msg = gr.Textbox(label="输入消息", placeholder="在这里输入你的消息...")
+
+ with gr.Column():
+ btn_send = gr.Button("发送", variant="primary")
+ with gr.Row():
+ btn_save = gr.Button("保存对话历史", variant="secondary")
+ btn_restart = gr.Button("重新开始", variant="secondary")
+
+ # with gr.Column(scale=1):
+ # gr.Markdown("### 历史管理")
+ # history_display = gr.Textbox(label="当前对话历史", interactive=False, lines=15)
+ # btn_show_history = gr.Button("刷新历史显示")
+ # btn_clear = gr.Button("清空历史")
+ # status = gr.Textbox(label="状态", interactive=False)
+
+ # # 事件处理
+ # btn_send.click(chat_with_ai, [msg, chatbot], [msg, chatbot]).then(
+ # lambda: "", None, msg
+ # )
+
+ btn_send.click(g.chat_with_ai, [msg, chatbot], [msg, chatbot])
+ msg.submit(g.chat_with_ai, [msg, chatbot], [msg, chatbot]) # 回车键触发
+ btn_save.click(
+ fn=g.save_conversation_history,
+ inputs=[],
+ outputs=[]
+ )
+ btn_restart.click(fn=g.restart,inputs=[chatbot],outputs=[chatbot])
+
+ # btn_show_history.click(show_history, None, history_display)
+ # btn_clear.click(clear_history, None, status).then(
+ # lambda: "历史已清空", None, history_display
+ # )
+
+ # # 初始化显示历史
+ # demo.load(show_history, None, history_display)
+
+ demo.launch()
if __name__ == "__main__":
- demo.launch() \ No newline at end of file
+ g=GameController()
+ question_iterator = QuestionIterator()
+
+ # while True:
+ # try:
+ # question = question_iterator.get_next_question(4)
+ # print(f"问题: {question['question']}, 长度: {question['length_of_answer']}")
+ # except:
+ # break
+
+ # question = question_iterator.get_next_question(6)
+ # print(f"问题: {question['question']}, 长度: {question['length_of_answer']}")
+
+ # question_iterator.reset_difficulty(4)
+ create_interface(g) \ No newline at end of file
diff --git a/recital_dataset.py b/recital_dataset.py
index 73a9aca..a142c92 100644
--- a/recital_dataset.py
+++ b/recital_dataset.py
@@ -3,7 +3,7 @@ import random
import json
# 读取CSV文件
-df = pd.read_csv('recitalMachine_dataset.csv') # 使用制表符分隔
+df = pd.read_csv('./recitalMachine/recitalMachine_dataset.csv') # 使用制表符分隔
df['length_of_answer'] = df['answer'].apply(lambda x: len(str(x).split(',')))
# 按difficulty分组并创建子DataFrame列表