fastapi 集成队列
Celery 的核心是一个 分布式任务队列系统,主要用于:
异步任务处理:将耗时的任务(如发送邮件、处理文件、调用AI模型)放入队列,由Worker进程异步执行,避免阻塞主程序。
分布式架构:支持多Worker跨机器并行处理任务,通过消息中间件(如Redis/RabbitMQ)协调。
任务重试/错误处理:任务失败后可以自动重试,保证可靠性。
celery_app.py
import logging
import time
from celery import Celery
from sdk.redis.redis_manager import app_configs
celery_app = Celery(
"queues",
broker=app_configs['redis'],
backend=app_configs['redis'],
include=[
"queues.tasks",
"queues.callback"
],
broker_transport_options={
'socket_timeout': 10,
'socket_connect_timeout': 5,
'retry_on_timeout': True,
'max_retries': 3,
'health_check_interval': 30, # 健康检查间隔
'socket_keepalive': True, # 启用 TCP keepalive
}, # 增加 broker 超时
redis_backend_health_check_interval=30, # 可选:增加健康检查间隔
redis_socket_timeout=120, # 增加 socket 超时
redis_max_connections=20,
broker_connection_max_retries=3, # 最大重试次数
broker_connection_retry=True, # 启用连接重试
broker_pool_limit=10, # 连接池大小
broker_pool_timeout=30, # 获取连接超时时间
broker_connection_retry_on_startup=True,
)
# 可选配置
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=30 * 60, # 任务超时30分钟
broker_connection_retry=True, # 启用连接重试
broker_connection_max_retries=5, # 最大重试次数
broker_pool_limit=10, # 连接池大小
broker_pool_timeout=30, # 获取连接超时时间
worker_heartbeat = 30,
broker_heartbeat=30, # 30秒一次心跳
broker_heartbeat_checkrate=2.0, # 心跳检查频率
result_expires=3600,
task_default_queue="celery", # 统一队列名
beat_schedule={
'check_redis_connection': {
'task': 'queues.tasks.check_redis_connection',
'schedule': 300.0, # 每5分钟执行一次
},
'ai_interview_task': {
'task': 'queues.tasks.ai_interview_task',
'schedule': 3600.0,
},
},
)
connection_check.py
from celery import shared_task
import logging
from queues.connection_manager import ConnectionManager
conn_manager=ConnectionManager()
@shared_task
def check_redis_connection():
logging.info("执行Redis连接检查...")
conn_manager.ensure_connection()
return "Connection check completed"
connection_manager.py
import logging
import time
class ConnectionManager:
def __init__(self, app):
self.app = app
self.last_heartbeat = time.time()
self.last_connection_check = 0
self.connection_check_interval = 60 # 每60秒检查一次
def ensure_connection(self):
now = time.time()
if now - self.last_connection_check < self.connection_check_interval:
return
self.last_connection_check = now
try:
with self.app.pool.acquire(block=True) as conn:
print("执行ping")
# 执行一个简单的Redis命令测试连接
conn.client().ping()
except Exception as e:
print(f"连接检查失败: {str(e)},尝试重新连接...")
logging.warning(f"连接检查失败: {str(e)},尝试重新连接...")
try:
self.app.pool.force_close_all()
self.app.pool.disconnect()
# 重新建立连接
with self.app.pool.acquire(block=True) as conn:
conn.client().ping()
logging.info("连接重新建立成功")
except Exception as e:
logging.error(f"重新连接失败: {str(e)}")
tasks.py
import asyncio
import uuid
from datetime import datetime
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from celery.result import AsyncResult
from queues.tasks import ai_interview_task
def create_task(task_request: any):
"""创建新任务并加入Celery队列"""
task_id = task_request.get("task_id")
task_data = {
"task_id": task_request.get("task_id"),
"report_data": task_request.get("report_data"),
"jd_title": task_request.get("jd_title"),
"jd_portrait": task_request.get("jd_portrait"),
"resume_portrait": task_request.get("resume_portrait"),
"company_portrait": task_request.get("company_portrait"),
"timestamp": str(datetime.now())
}
# 发送任务到Celery
task =ai_interview_task.apply_async(
kwargs={"task_data": task_data},
task_id=task_id
)
if task is not None:
return {
"message": "任务已接收",
"task_id": task_id,
"status_url": f"/tasks/{task_id}"
}
return {
"message": "任务接收失败",
"task_id": task_id,
"status_url": f"/tasks/{task_id}"
}
def get_task_status(task_id: str):
"""获取任务状态"""
task_result = AsyncResult(task_id)
print("task_result", task_result)
if not task_result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"任务 {task_id} 不存在"
)
response = {
"task_id": task_id,
"status": task_result.status,
"result": None
}
if task_result.status == "SUCCESS":
response["result"] = task_result.result
elif task_result.status == "FAILURE":
response["result"] = str(task_result.result)
elif task_result.status == "PROGRESS":
response.update(task_result.info)
print("response", response)
return response
if __name__ == "__main__":
data=[
{
"question":"作为董事长,请结合您过往的团队管理经验,谈谈如何有效激励并保持一支高绩效AI技术团队的稳定性?",
"answer":"首先,设定明确的目标和期望,第二,提供成长的成长与发展机会。第三。创建支持性的工作环境,第四,实施公平的奖励机制,五、第五,关注员工福利,第六。领导层的透明沟通。",
"feedback":{
"score":65,
"evaluateContent":"回答内容整体逻辑较为清晰,涵盖了团队管理中的一些关键要素,如目标设定、员工成长、工作环境、奖励机制、员工福利以及透明沟通。但存在一些小问题:首先,表述中有重复和不规范的地方,例如‘第二,提供成长的成长与发展机会’应为‘提供成长与发展机会’,‘第三。创建支持性的工作环境’中的句号使用不当。其次,虽然提到了多个激励措施,但缺乏深度阐述,尤其是针对AI技术团队的特点没有具体展开,比如如何结合AI领域的前沿发展来激励团队成员。最后,关于高绩效团队稳定性的长期策略可以进一步细化。总体来说,答案与标准答案匹配度约为65%。",
}
}
]
jd_title="董事长"
jd_portrait={
"content": {
"工作职能": "",
"风格要求": 3,
"职位名称": "前端端端端端开发总经理 + 2",
"工作地点": "浙江省宁波市鄞州区明楼街道明北社区",
"其他补充/备注": "76uii语客语n",
"年龄要求": "27-48",
"工作职权": "",
"薪酬范围": "14-67",
"月工资": 0,
"职位偏好": "用户运营",
"入职时间": "2025-05-14",
"是否统招": 1,
"下级人数": "0人",
"院校要求": 4,
"详细岗位要求": "岗位说明:前端开发总经理\n\n公司简介:我们是一家位于武汉市东湖新技术开发区的大型企业,注册资本为伍佰万元整,主要提供人力资源管理服务及培训。我们的经营范围涵盖了人力/行政/法务和教育培训两大领域。公司规模在599-1999人之间,员工以知识型为主,强调学习氛围和企业文化。\n\n岗位职责:\n1. 负责公司前端技术团队的整体规划与管理,带领团队完成各项开发任务。\n2. 参与产品设计和用户体验优化,确保前端技术方案符合用户运营需求。\n3. 制定并实施前端技术发展战略,跟踪行业前沿技术,推动技术创新。\n4. 协调跨部门合作,确保项目按时高质量交付。\n\n任职要求:\n1. 本科及以上学历,计算机相关专业优先。\n2. 拥有5-8年前端开发相关工作经验,其中至少3年以上管理经验。\n3. 精通HTML、CSS、JavaScript等前端开发技术,熟悉主流前端框架(如React、Vue等)。\n4. 对用户运营有深刻理解,能够从用户角度出发进行 术方案设计。\n5. 具备良好的沟通能力和团队协作精神,能够激励团队成员达成目标。\n6. 对人力/行政/法务或教育培训行业有一定了解者优先。\n\n薪资待遇:每月14-67k,具体根据个人能力和经验面议。\n\n工作地点:湖北省武汉市东湖新技术开发区\n\n我们重视每一位员工的成长与发展,期待有志之士加入我们的团队,共同创造更加辉煌的未来!",
"工龄要求": "5-8",
"岗位优势": "加入我们,成为前端开发领域的领航者!作为一家位于武汉东湖新技术开发区的大型企业,我们专注于人力资源管理服务与培训,同时涉足教育培训行业。公司规模达599-1999人,以知识型员工为主,学习氛围浓厚,企业文化卓越。\n\n作为前端开发总经理,您将带领团队制定技术战略、优化用户体验,并推动技术创新。我们提供具有竞争力的薪资(14-67k/月),以及完善的福利体系:节日福利、补充 疗保险、健康体检、弹性工作制、生日福利和定期团建。\n\n在这里,您不仅能与优秀的人才共事,还能享受广阔的职业发展空间。我们期待热爱技术、关注用户运营、具备管理经验的您,携手共创未来!",
"学历要求": "本科",
"所属行业": "客服/运营",
"职位类别": "业务运营",
"试用期时长": 3,
"工作性质": 2,
"福利待遇": "节日福利-补充医疗保险-健康体检-弹性工作制-生日福利-定期团建"
}
}
resume_portrait={
"基本信息": {
"年龄": 28,
"性别": "",
"工作年限": 5,
"参加工作时间": "2008-07-01",
"当前职位": "",
"在职状态": "",
"换岗频次": 3,
"当前月薪(下限)": "",
"当前月薪(上限)": "",
"工作性质": "",
"当前所在地": "",
"当前所在地(规范化)": "",
"简历解析时间": "",
"简历更新时间": ""
},
"教育信息": [
{
"学历": "本科",
"毕业院校": "南京师范大学",
"是否统招": "否",
"毕业学校类型": "普通",
"毕业学校国内排名": "",
"毕业学校国际排名": "",
"专业": ""
}
],
"求职期望": {
"期望工作": "人力资源、培训、互联网/电子商务/网游",
"期望薪资": "4500~5999元/月",
"期望薪资下限": "4.5K",
"期望薪资上限": "6K",
"期望行业": "广告、法律、贸易/进出口、公关/市场推广/会展、互联网/电子商务",
"期望工作性质": "",
"当前离职/在职状态": "未知",
"期望工作地址": "",
"期望工作地址(规范化)": ""
},
"工作经历": [
{
"开始时间": "2008-07-01",
"结束时间": "2008-08-01",
"公司名称": "***公司",
"公司性质": "",
"公司规模": "",
"公司描述": "",
"行业": "",
"职位": "实习",
"职能类型": "",
"工作性质": "",
"工作薪资": "",
"工作地点": "",
"持续时间": "一个月",
"工作能力": "",
"工作内容": "实习经历,具体职责不详"
},
{
"开始时间": "2009-02-01",
"结束时间": "2011-02-01",
"公司名称": "***公司",
"公司性质": "",
"公司规模": "",
"公司描述": "",
"行业": "",
"职位": "经理助理/秘书",
"职能类型": "",
"工作性质": "",
"工作薪资": "",
"工作地点": "",
"持续时间": "两年",
"工作能力": "具备一定的行政管理能力和沟通协调技巧",
"工作内容": "协助部门经理处理日常事务,参与文件编写和会议组织等"
},
{
"开始时间": "2011-03-01",
"结束时间": "2013-03-01",
"公司名称": "***公司",
"公司性质": "",
"公司规模": "",
"公司描述": "",
"行业": "",
"职位": "招聘专员/助理",
"职能类型": "",
"工作性质": "",
"工作薪资": "",
"工作地点": "",
"持续时间": "两年",
"工作能力": "熟悉招聘流程,具有较强的沟通能力和问题解决能力",
"工作内容": "负责简历筛选、面试安排及入职手续办理等工作"
},
{
"开始时间": "2013-08-01",
"结束时间": "至今",
"公司名称": "***公司",
"公司性质": "",
"公司规模": "",
"公司描述": "",
"行业": "",
"职位": "招聘专员/助理",
"职能类型": "",
"工作性质": "",
"工作薪资": "",
"工作地点": "",
"持续时间": "至今",
"工作能力": "具备丰富的招聘经验,擅长团队协作与沟通",
"工作内容": "负责员工招聘、培训及绩效评估等相关工作"
}
],
"项目经历": [],
"培训经历": [],
"语言技能": [],
"所有证书及奖项": [],
"画像总结": "该候选人拥有5年的HR工作经验,主要担任招聘专员/助理的职务。在南京师范大学本科毕业后,先后在多家公司积累了丰富的人力资源管理工作经验,特别是在招聘流程和沟通协调方面表现出色。期望从事人力资源或互联网相关领域的工作,并寻求4.5K至6K元/月的薪酬。"
}
company_portrait="三五有限公司"
task_data = {
"task_id": "bb",
"report_data": data,
"jd_title": jd_title,
"jd_portrait": jd_portrait,
"resume_portrait": resume_portrait,
"company_portrait":company_portrait,
"timestamp": str(datetime.now())
}
#sss=create_task(task_data)
#print(sss)
get_task_status("bbbb")
tasks.py
import asyncio
import logging
import time
from datetime import datetime
from interview.agent.complete_agent import CompleteAgent
from queues.celery_app import celery_app
from queues.callback import send_callback_task
@celery_app.task(bind=True, autoretry_for=(TimeoutError,), max_retries=3)
def ai_interview_task(self,task_data: dict):
"""模拟长时间运行的任务"""
try:
# 获取任务信息
task_id = task_data.get("task_id","")
print(f"[{datetime.now()}] 开始处理任务 {task_id},-- {task_data}")
jd_title = task_data.get("jd_title", "")
jd_portrait = task_data.get("jd_portrait", "")
resume_portrait = task_data.get("resume_portrait", "")
company_portrait = task_data.get("company_portrait", "")
report_data = task_data.get("report_data", "")
if not jd_title:
return {"code": 0, "data": {}, "message": "jd_title参数不存在"}
if not jd_portrait:
return {"code": 0, "data": {}, "message": "jd_portrait参数不存在"}
if not resume_portrait:
return {"code": 0, "data": {}, "message": "resume_portrait参数不存在"}
if not company_portrait:
return {"code": 0, "data": {}, "message": "company_portrait参数不存在"}
feedback=get_feedback(jd_title, jd_portrait,resume_portrait,company_portrait,report_data)
#
#return
'''agent = CompleteAgent()
agent.jd_title = jd_title
agent.jd_portrait = jd_portrait
agent.resume_portrait = resume_portrait
agent.company_portrait = company_portrait
logging.info(f"[{datetime.now()}] --ai面试报告-- {report_data}")'''
#feedback = await agent._run(report_data)'''
#feedback = {"code": 200, "data": "feedback", "processed_at": str(datetime.now()), "message": "任务成功处理"}
#print("feedback555", feedback)
score=feedback.get("score")
if score >=0:
result = {"code": 200,"status":"SUCCESS","data": feedback,"processed_at": str(datetime.now()),"message": "任务成功处理"}
logging.info(f"[{datetime.now()}] 任务 {task_id} 完成")
print(f"[{datetime.now()}] 任务 {task_id} 完成")
else:
result = {"code": 0,"status":"FAILURE","data": feedback,"processed_at": str(datetime.now()),"message": "任务处理失败"}
logging.info(f"[{datetime.now()}] 任务 {task_id} 失败")
print(f"[{datetime.now()}] 任务 {task_id} 失败")
# 使用链式任务来发送回调,避免任务整体重试
send_callback_task.si(task_id, feedback).apply_async(countdown=10)
return result
except TimeoutError as e:
# 当前重试次数: self.request.retries
# 下次重试等待时间: 2^retries 秒
self.retry(exc=e, countdown=2**self.request.retries)
except Exception as e:
print(f"[{datetime.now()}] 任务 {task_id} 失败: {str(e)}")
raise self.retry(exc=e, countdown=60,max_retries=3) # 60秒后重试,
def get_feedback(jd_title,jd_portrait,resume_portrait,company_portrait,report_data):
agent = CompleteAgent()
agent.jd_title = jd_title
agent.jd_portrait = jd_portrait
agent.resume_portrait = resume_portrait
agent.company_portrait = company_portrait
logging.info(f"[{datetime.now()}] --ai面试报告-- {report_data}")
feedback = asyncio.run(
asyncio.wait_for(agent._run(report_data), timeout=300.0)
)
return feedback
callback.py
import logging
import time
from datetime import datetime
import httpx
from celery.exceptions import MaxRetriesExceededError, Retry
from queues.celery_app import celery_app
from sdk.app.config import ai_config
@celery_app.task(bind=True,
autoretry_for=(TimeoutError,),
max_retries=24,
default_retry_delay=60,
expires=3600,
retry_backoff=True,
retry_backoff_max=600, # 最大退避时间10分钟
retry_jitter=True) # 添加随机抖动避免惊群
def send_callback_task(self, task_id: str, payload: dict):
"""
增强版回调任务,具有以下特性:
1. 自动重试所有异常(包括PENDING状态)
2. 指数退避重试机制
3. 详细的日志记录
4. 状态检查机制
"""
print(f"[{datetime.now()}] {task_id} {payload}")
# 检查任务是否已经超过最大重试次数
if self.request.retries >= self.max_retries:
logging.critical(f"[{datetime.now()}] 任务 {task_id} 已达到最大重试次数")
return {"status": "failed", "reason": "max_retries_exceeded"}
# 检查任务是否处于PENDING状态过久(超过1小时)
print(f"[{datetime.now()}] {self.request}")
if (self.request.delivery_info and
getattr(self.request, 'is_pending', False) and
time.time() - self.request.delivery_info.get('timestamp', 0) > 3600):
logging.warning(f"[{datetime.now()}] 检测到长时间PENDING的任务 {task_id},触发重试")
raise Retry()
callback_url = ai_config.get("interview_push_uri")
if not callback_url:
error_msg = f"[{datetime.now()}] 回调地址未配置"
logging.error(error_msg)
raise ValueError(error_msg)
if not isinstance(payload, dict) or not payload:
error_msg = f"[{datetime.now()}] 无效的 payload: {payload}"
logging.warning(error_msg)
raise ValueError(error_msg)
logging.info(f"[{datetime.now()}] 开始发送回调,任务ID: {task_id}, 重试次数: {self.request.retries}")
try:
# 使用更健壮的HTTP客户端配置
transport = httpx.HTTPTransport(retries=3)
timeout = httpx.Timeout(60.0, connect=10.0)
with httpx.Client(transport=transport, timeout=timeout) as client:
response = client.post(
callback_url,
json={
"feedback":payload,
"task_id":task_id
},
headers={
"X-Task-ID": task_id,
"X-Retry-Count": str(self.request.retries),
"User-Agent": "AI-Callback/1.0"
}
)
logging.info(f"[{datetime.now()}] 回调响应: {response.status_code} {response.text[:200]}")
# 对非2xx响应也记录详细日志
if not response.is_success:
logging.warning(
f"[{datetime.now()}] 非成功响应: {response.status_code}\nHeaders: {response.headers}\nBody: {response.text[:500]}")
response.raise_for_status()
return {
"status": "success",
"code": response.status_code,
"body": response.text,
"retries": self.request.retries
}
except TimeoutError as e:
# 当前重试次数: self.request.retries
# 下次重试等待时间: 2^retries 秒
self.retry(exc=e, countdown=2**self.request.retries)
except httpx.TimeoutException as exc:
error_msg = f"[{datetime.now()}] 请求超时: {exc}"
logging.error(error_msg, exc_info=True)
raise self.retry(exc=exc)
except httpx.HTTPStatusError as exc:
# 对不同的HTTP状态码采用不同策略
if exc.response.status_code in (502, 503, 504):
retry_delay = min(60 * (self.request.retries + 1), 300) # 最大延迟5分钟
error_msg = f"[{datetime.now()}] 服务器错误 {exc.response.status_code}"
logging.error(error_msg)
raise self.retry(exc=exc, countdown=retry_delay)
else:
error_msg = f"[{datetime.now()}] HTTP错误 {exc.response.status_code}"
logging.error(error_msg)
raise exc # 非服务器错误不再重试
except httpx.RequestError as exc:
error_msg = f"[{datetime.now()}] 请求失败: {exc}"
logging.error(error_msg, exc_info=True)
raise self.retry(exc=exc)
except Exception as exc:
error_msg = f"[{datetime.now()}] 未知错误: {exc}"
logging.exception(error_msg)
raise self.retry(exc=exc)
运行
celery -A queues.celery_app worker --loglevel=info --concurrency=4

