fastapi 集成队列
Celery 的核心是一个 分布式任务队列系统,主要用于:
异步任务处理:将耗时的任务(如发送邮件、处理文件、调用AI模型)放入队列,由Worker进程异步执行,避免阻塞主程序。
分布式架构:支持多Worker跨机器并行处理任务,通过消息中间件(如Redis/RabbitMQ)协调。
任务重试/错误处理:任务失败后可以自动重试,保证可靠性。
celery_app.py
import logging import time from celery import Celery from sdk.redis.redis_manager import app_configs celery_app = Celery( "queues", broker=app_configs['redis'], backend=app_configs['redis'], include=[ "queues.tasks", "queues.callback" ], broker_transport_options={ 'socket_timeout': 10, 'socket_connect_timeout': 5, 'retry_on_timeout': True, 'max_retries': 3, 'health_check_interval': 30, # 健康检查间隔 'socket_keepalive': True, # 启用 TCP keepalive }, # 增加 broker 超时 redis_backend_health_check_interval=30, # 可选:增加健康检查间隔 redis_socket_timeout=120, # 增加 socket 超时 redis_max_connections=20, broker_connection_max_retries=3, # 最大重试次数 broker_connection_retry=True, # 启用连接重试 broker_pool_limit=10, # 连接池大小 broker_pool_timeout=30, # 获取连接超时时间 broker_connection_retry_on_startup=True, ) # 可选配置 celery_app.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", timezone="UTC", enable_utc=True, task_track_started=True, task_time_limit=30 * 60, # 任务超时30分钟 broker_connection_retry=True, # 启用连接重试 broker_connection_max_retries=5, # 最大重试次数 broker_pool_limit=10, # 连接池大小 broker_pool_timeout=30, # 获取连接超时时间 worker_heartbeat = 30, broker_heartbeat=30, # 30秒一次心跳 broker_heartbeat_checkrate=2.0, # 心跳检查频率 result_expires=3600, task_default_queue="celery", # 统一队列名 beat_schedule={ 'check_redis_connection': { 'task': 'queues.tasks.check_redis_connection', 'schedule': 300.0, # 每5分钟执行一次 }, 'ai_interview_task': { 'task': 'queues.tasks.ai_interview_task', 'schedule': 3600.0, }, }, )
connection_check.py
from celery import shared_task import logging from queues.connection_manager import ConnectionManager conn_manager=ConnectionManager() @shared_task def check_redis_connection(): logging.info("执行Redis连接检查...") conn_manager.ensure_connection() return "Connection check completed"
connection_manager.py
import logging import time class ConnectionManager: def __init__(self, app): self.app = app self.last_heartbeat = time.time() self.last_connection_check = 0 self.connection_check_interval = 60 # 每60秒检查一次 def ensure_connection(self): now = time.time() if now - self.last_connection_check < self.connection_check_interval: return self.last_connection_check = now try: with self.app.pool.acquire(block=True) as conn: print("执行ping") # 执行一个简单的Redis命令测试连接 conn.client().ping() except Exception as e: print(f"连接检查失败: {str(e)},尝试重新连接...") logging.warning(f"连接检查失败: {str(e)},尝试重新连接...") try: self.app.pool.force_close_all() self.app.pool.disconnect() # 重新建立连接 with self.app.pool.acquire(block=True) as conn: conn.client().ping() logging.info("连接重新建立成功") except Exception as e: logging.error(f"重新连接失败: {str(e)}")
tasks.py
import asyncio import uuid from datetime import datetime from fastapi import HTTPException, status from fastapi.responses import JSONResponse from celery.result import AsyncResult from queues.tasks import ai_interview_task def create_task(task_request: any): """创建新任务并加入Celery队列""" task_id = task_request.get("task_id") task_data = { "task_id": task_request.get("task_id"), "report_data": task_request.get("report_data"), "jd_title": task_request.get("jd_title"), "jd_portrait": task_request.get("jd_portrait"), "resume_portrait": task_request.get("resume_portrait"), "company_portrait": task_request.get("company_portrait"), "timestamp": str(datetime.now()) } # 发送任务到Celery task =ai_interview_task.apply_async( kwargs={"task_data": task_data}, task_id=task_id ) if task is not None: return { "message": "任务已接收", "task_id": task_id, "status_url": f"/tasks/{task_id}" } return { "message": "任务接收失败", "task_id": task_id, "status_url": f"/tasks/{task_id}" } def get_task_status(task_id: str): """获取任务状态""" task_result = AsyncResult(task_id) print("task_result", task_result) if not task_result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"任务 {task_id} 不存在" ) response = { "task_id": task_id, "status": task_result.status, "result": None } if task_result.status == "SUCCESS": response["result"] = task_result.result elif task_result.status == "FAILURE": response["result"] = str(task_result.result) elif task_result.status == "PROGRESS": response.update(task_result.info) print("response", response) return response if __name__ == "__main__": data=[ { "question":"作为董事长,请结合您过往的团队管理经验,谈谈如何有效激励并保持一支高绩效AI技术团队的稳定性?", "answer":"首先,设定明确的目标和期望,第二,提供成长的成长与发展机会。第三。创建支持性的工作环境,第四,实施公平的奖励机制,五、第五,关注员工福利,第六。领导层的透明沟通。", "feedback":{ "score":65, "evaluateContent":"回答内容整体逻辑较为清晰,涵盖了团队管理中的一些关键要素,如目标设定、员工成长、工作环境、奖励机制、员工福利以及透明沟通。但存在一些小问题:首先,表述中有重复和不规范的地方,例如‘第二,提供成长的成长与发展机会’应为‘提供成长与发展机会’,‘第三。创建支持性的工作环境’中的句号使用不当。其次,虽然提到了多个激励措施,但缺乏深度阐述,尤其是针对AI技术团队的特点没有具体展开,比如如何结合AI领域的前沿发展来激励团队成员。最后,关于高绩效团队稳定性的长期策略可以进一步细化。总体来说,答案与标准答案匹配度约为65%。", } } ] jd_title="董事长" jd_portrait={ "content": { "工作职能": "", "风格要求": 3, "职位名称": "前端端端端端开发总经理 + 2", "工作地点": "浙江省宁波市鄞州区明楼街道明北社区", "其他补充/备注": "76uii语客语n", "年龄要求": "27-48", "工作职权": "", "薪酬范围": "14-67", "月工资": 0, "职位偏好": "用户运营", "入职时间": "2025-05-14", "是否统招": 1, "下级人数": "0人", "院校要求": 4, "详细岗位要求": "岗位说明:前端开发总经理\n\n公司简介:我们是一家位于武汉市东湖新技术开发区的大型企业,注册资本为伍佰万元整,主要提供人力资源管理服务及培训。我们的经营范围涵盖了人力/行政/法务和教育培训两大领域。公司规模在599-1999人之间,员工以知识型为主,强调学习氛围和企业文化。\n\n岗位职责:\n1. 负责公司前端技术团队的整体规划与管理,带领团队完成各项开发任务。\n2. 参与产品设计和用户体验优化,确保前端技术方案符合用户运营需求。\n3. 制定并实施前端技术发展战略,跟踪行业前沿技术,推动技术创新。\n4. 协调跨部门合作,确保项目按时高质量交付。\n\n任职要求:\n1. 本科及以上学历,计算机相关专业优先。\n2. 拥有5-8年前端开发相关工作经验,其中至少3年以上管理经验。\n3. 精通HTML、CSS、JavaScript等前端开发技术,熟悉主流前端框架(如React、Vue等)。\n4. 对用户运营有深刻理解,能够从用户角度出发进行 术方案设计。\n5. 具备良好的沟通能力和团队协作精神,能够激励团队成员达成目标。\n6. 对人力/行政/法务或教育培训行业有一定了解者优先。\n\n薪资待遇:每月14-67k,具体根据个人能力和经验面议。\n\n工作地点:湖北省武汉市东湖新技术开发区\n\n我们重视每一位员工的成长与发展,期待有志之士加入我们的团队,共同创造更加辉煌的未来!", "工龄要求": "5-8", "岗位优势": "加入我们,成为前端开发领域的领航者!作为一家位于武汉东湖新技术开发区的大型企业,我们专注于人力资源管理服务与培训,同时涉足教育培训行业。公司规模达599-1999人,以知识型员工为主,学习氛围浓厚,企业文化卓越。\n\n作为前端开发总经理,您将带领团队制定技术战略、优化用户体验,并推动技术创新。我们提供具有竞争力的薪资(14-67k/月),以及完善的福利体系:节日福利、补充 疗保险、健康体检、弹性工作制、生日福利和定期团建。\n\n在这里,您不仅能与优秀的人才共事,还能享受广阔的职业发展空间。我们期待热爱技术、关注用户运营、具备管理经验的您,携手共创未来!", "学历要求": "本科", "所属行业": "客服/运营", "职位类别": "业务运营", "试用期时长": 3, "工作性质": 2, "福利待遇": "节日福利-补充医疗保险-健康体检-弹性工作制-生日福利-定期团建" } } resume_portrait={ "基本信息": { "年龄": 28, "性别": "", "工作年限": 5, "参加工作时间": "2008-07-01", "当前职位": "", "在职状态": "", "换岗频次": 3, "当前月薪(下限)": "", "当前月薪(上限)": "", "工作性质": "", "当前所在地": "", "当前所在地(规范化)": "", "简历解析时间": "", "简历更新时间": "" }, "教育信息": [ { "学历": "本科", "毕业院校": "南京师范大学", "是否统招": "否", "毕业学校类型": "普通", "毕业学校国内排名": "", "毕业学校国际排名": "", "专业": "" } ], "求职期望": { "期望工作": "人力资源、培训、互联网/电子商务/网游", "期望薪资": "4500~5999元/月", "期望薪资下限": "4.5K", "期望薪资上限": "6K", "期望行业": "广告、法律、贸易/进出口、公关/市场推广/会展、互联网/电子商务", "期望工作性质": "", "当前离职/在职状态": "未知", "期望工作地址": "", "期望工作地址(规范化)": "" }, "工作经历": [ { "开始时间": "2008-07-01", "结束时间": "2008-08-01", "公司名称": "***公司", "公司性质": "", "公司规模": "", "公司描述": "", "行业": "", "职位": "实习", "职能类型": "", "工作性质": "", "工作薪资": "", "工作地点": "", "持续时间": "一个月", "工作能力": "", "工作内容": "实习经历,具体职责不详" }, { "开始时间": "2009-02-01", "结束时间": "2011-02-01", "公司名称": "***公司", "公司性质": "", "公司规模": "", "公司描述": "", "行业": "", "职位": "经理助理/秘书", "职能类型": "", "工作性质": "", "工作薪资": "", "工作地点": "", "持续时间": "两年", "工作能力": "具备一定的行政管理能力和沟通协调技巧", "工作内容": "协助部门经理处理日常事务,参与文件编写和会议组织等" }, { "开始时间": "2011-03-01", "结束时间": "2013-03-01", "公司名称": "***公司", "公司性质": "", "公司规模": "", "公司描述": "", "行业": "", "职位": "招聘专员/助理", "职能类型": "", "工作性质": "", "工作薪资": "", "工作地点": "", "持续时间": "两年", "工作能力": "熟悉招聘流程,具有较强的沟通能力和问题解决能力", "工作内容": "负责简历筛选、面试安排及入职手续办理等工作" }, { "开始时间": "2013-08-01", "结束时间": "至今", "公司名称": "***公司", "公司性质": "", "公司规模": "", "公司描述": "", "行业": "", "职位": "招聘专员/助理", "职能类型": "", "工作性质": "", "工作薪资": "", "工作地点": "", "持续时间": "至今", "工作能力": "具备丰富的招聘经验,擅长团队协作与沟通", "工作内容": "负责员工招聘、培训及绩效评估等相关工作" } ], "项目经历": [], "培训经历": [], "语言技能": [], "所有证书及奖项": [], "画像总结": "该候选人拥有5年的HR工作经验,主要担任招聘专员/助理的职务。在南京师范大学本科毕业后,先后在多家公司积累了丰富的人力资源管理工作经验,特别是在招聘流程和沟通协调方面表现出色。期望从事人力资源或互联网相关领域的工作,并寻求4.5K至6K元/月的薪酬。" } company_portrait="三五有限公司" task_data = { "task_id": "bb", "report_data": data, "jd_title": jd_title, "jd_portrait": jd_portrait, "resume_portrait": resume_portrait, "company_portrait":company_portrait, "timestamp": str(datetime.now()) } #sss=create_task(task_data) #print(sss) get_task_status("bbbb")
tasks.py
import asyncio import logging import time from datetime import datetime from interview.agent.complete_agent import CompleteAgent from queues.celery_app import celery_app from queues.callback import send_callback_task @celery_app.task(bind=True, autoretry_for=(TimeoutError,), max_retries=3) def ai_interview_task(self,task_data: dict): """模拟长时间运行的任务""" try: # 获取任务信息 task_id = task_data.get("task_id","") print(f"[{datetime.now()}] 开始处理任务 {task_id},-- {task_data}") jd_title = task_data.get("jd_title", "") jd_portrait = task_data.get("jd_portrait", "") resume_portrait = task_data.get("resume_portrait", "") company_portrait = task_data.get("company_portrait", "") report_data = task_data.get("report_data", "") if not jd_title: return {"code": 0, "data": {}, "message": "jd_title参数不存在"} if not jd_portrait: return {"code": 0, "data": {}, "message": "jd_portrait参数不存在"} if not resume_portrait: return {"code": 0, "data": {}, "message": "resume_portrait参数不存在"} if not company_portrait: return {"code": 0, "data": {}, "message": "company_portrait参数不存在"} feedback=get_feedback(jd_title, jd_portrait,resume_portrait,company_portrait,report_data) # #return '''agent = CompleteAgent() agent.jd_title = jd_title agent.jd_portrait = jd_portrait agent.resume_portrait = resume_portrait agent.company_portrait = company_portrait logging.info(f"[{datetime.now()}] --ai面试报告-- {report_data}")''' #feedback = await agent._run(report_data)''' #feedback = {"code": 200, "data": "feedback", "processed_at": str(datetime.now()), "message": "任务成功处理"} #print("feedback555", feedback) score=feedback.get("score") if score >=0: result = {"code": 200,"status":"SUCCESS","data": feedback,"processed_at": str(datetime.now()),"message": "任务成功处理"} logging.info(f"[{datetime.now()}] 任务 {task_id} 完成") print(f"[{datetime.now()}] 任务 {task_id} 完成") else: result = {"code": 0,"status":"FAILURE","data": feedback,"processed_at": str(datetime.now()),"message": "任务处理失败"} logging.info(f"[{datetime.now()}] 任务 {task_id} 失败") print(f"[{datetime.now()}] 任务 {task_id} 失败") # 使用链式任务来发送回调,避免任务整体重试 send_callback_task.si(task_id, feedback).apply_async(countdown=10) return result except TimeoutError as e: # 当前重试次数: self.request.retries # 下次重试等待时间: 2^retries 秒 self.retry(exc=e, countdown=2**self.request.retries) except Exception as e: print(f"[{datetime.now()}] 任务 {task_id} 失败: {str(e)}") raise self.retry(exc=e, countdown=60,max_retries=3) # 60秒后重试, def get_feedback(jd_title,jd_portrait,resume_portrait,company_portrait,report_data): agent = CompleteAgent() agent.jd_title = jd_title agent.jd_portrait = jd_portrait agent.resume_portrait = resume_portrait agent.company_portrait = company_portrait logging.info(f"[{datetime.now()}] --ai面试报告-- {report_data}") feedback = asyncio.run( asyncio.wait_for(agent._run(report_data), timeout=300.0) ) return feedback
callback.py
import logging import time from datetime import datetime import httpx from celery.exceptions import MaxRetriesExceededError, Retry from queues.celery_app import celery_app from sdk.app.config import ai_config @celery_app.task(bind=True, autoretry_for=(TimeoutError,), max_retries=24, default_retry_delay=60, expires=3600, retry_backoff=True, retry_backoff_max=600, # 最大退避时间10分钟 retry_jitter=True) # 添加随机抖动避免惊群 def send_callback_task(self, task_id: str, payload: dict): """ 增强版回调任务,具有以下特性: 1. 自动重试所有异常(包括PENDING状态) 2. 指数退避重试机制 3. 详细的日志记录 4. 状态检查机制 """ print(f"[{datetime.now()}]{task_id} {payload}") # 检查任务是否已经超过最大重试次数 if self.request.retries >= self.max_retries: logging.critical(f"[{datetime.now()}] 任务 {task_id} 已达到最大重试次数") return {"status": "failed", "reason": "max_retries_exceeded"} # 检查任务是否处于PENDING状态过久(超过1小时) print(f"[{datetime.now()}] {self.request}") if (self.request.delivery_info and getattr(self.request, 'is_pending', False) and time.time() - self.request.delivery_info.get('timestamp', 0) > 3600): logging.warning(f"[{datetime.now()}] 检测到长时间PENDING的任务 {task_id},触发重试") raise Retry() callback_url = ai_config.get("interview_push_uri") if not callback_url: error_msg = f"[{datetime.now()}] 回调地址未配置" logging.error(error_msg) raise ValueError(error_msg) if not isinstance(payload, dict) or not payload: error_msg = f"[{datetime.now()}] 无效的 payload: {payload}" logging.warning(error_msg) raise ValueError(error_msg) logging.info(f"[{datetime.now()}] 开始发送回调,任务ID: {task_id}, 重试次数: {self.request.retries}") try: # 使用更健壮的HTTP客户端配置 transport = httpx.HTTPTransport(retries=3) timeout = httpx.Timeout(60.0, connect=10.0) with httpx.Client(transport=transport, timeout=timeout) as client: response = client.post( callback_url, json={ "feedback":payload, "task_id":task_id }, headers={ "X-Task-ID": task_id, "X-Retry-Count": str(self.request.retries), "User-Agent": "AI-Callback/1.0" } ) logging.info(f"[{datetime.now()}] 回调响应: {response.status_code} {response.text[:200]}") # 对非2xx响应也记录详细日志 if not response.is_success: logging.warning( f"[{datetime.now()}] 非成功响应: {response.status_code}\nHeaders: {response.headers}\nBody: {response.text[:500]}") response.raise_for_status() return { "status": "success", "code": response.status_code, "body": response.text, "retries": self.request.retries } except TimeoutError as e: # 当前重试次数: self.request.retries # 下次重试等待时间: 2^retries 秒 self.retry(exc=e, countdown=2**self.request.retries) except httpx.TimeoutException as exc: error_msg = f"[{datetime.now()}] 请求超时: {exc}" logging.error(error_msg, exc_info=True) raise self.retry(exc=exc) except httpx.HTTPStatusError as exc: # 对不同的HTTP状态码采用不同策略 if exc.response.status_code in (502, 503, 504): retry_delay = min(60 * (self.request.retries + 1), 300) # 最大延迟5分钟 error_msg = f"[{datetime.now()}] 服务器错误 {exc.response.status_code}" logging.error(error_msg) raise self.retry(exc=exc, countdown=retry_delay) else: error_msg = f"[{datetime.now()}] HTTP错误 {exc.response.status_code}" logging.error(error_msg) raise exc # 非服务器错误不再重试 except httpx.RequestError as exc: error_msg = f"[{datetime.now()}] 请求失败: {exc}" logging.error(error_msg, exc_info=True) raise self.retry(exc=exc) except Exception as exc: error_msg = f"[{datetime.now()}] 未知错误: {exc}" logging.exception(error_msg) raise self.retry(exc=exc) 运行celery -A queues.celery_app worker --loglevel=info --concurrency=4