fastapi 集成队列

作者: adm 分类: python 发布时间: 2025-02-24

Celery 的核心是一个 分布式任务队列系统,主要用于:

异步任务处理:将耗时的任务(如发送邮件、处理文件、调用AI模型)放入队列,由Worker进程异步执行,避免阻塞主程序。
分布式架构:支持多Worker跨机器并行处理任务,通过消息中间件(如Redis/RabbitMQ)协调。
任务重试/错误处理:任务失败后可以自动重试,保证可靠性。
celery_app.py

import logging
import time

from celery import Celery
from sdk.redis.redis_manager import app_configs

celery_app = Celery(
    "queues",
    broker=app_configs['redis'],
    backend=app_configs['redis'],
    include=[
        "queues.tasks",
        "queues.callback"
             ],
    broker_transport_options={
        'socket_timeout': 10,
        'socket_connect_timeout': 5,
        'retry_on_timeout': True,
        'max_retries': 3,
        'health_check_interval': 30,  # 健康检查间隔
        'socket_keepalive': True,     # 启用 TCP keepalive
    },  # 增加 broker 超时
    redis_backend_health_check_interval=30,  # 可选:增加健康检查间隔
    redis_socket_timeout=120,  # 增加 socket 超时
    redis_max_connections=20,
    broker_connection_max_retries=3,  # 最大重试次数
    broker_connection_retry=True,     # 启用连接重试
    broker_pool_limit=10,            # 连接池大小
    broker_pool_timeout=30,          # 获取连接超时时间
    broker_connection_retry_on_startup=True,
)

# 可选配置
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60, # 任务超时30分钟
    broker_connection_retry=True,  # 启用连接重试
    broker_connection_max_retries=5,  # 最大重试次数
    broker_pool_limit=10,  # 连接池大小
    broker_pool_timeout=30,  # 获取连接超时时间
    worker_heartbeat = 30,
    broker_heartbeat=30,  # 30秒一次心跳
    broker_heartbeat_checkrate=2.0, # 心跳检查频率
    result_expires=3600,
    task_default_queue="celery", # 统一队列名
    beat_schedule={
        'check_redis_connection': {
            'task': 'queues.tasks.check_redis_connection',
            'schedule': 300.0,  # 每5分钟执行一次
        },
        'ai_interview_task': {
            'task': 'queues.tasks.ai_interview_task',
            'schedule': 3600.0,
        },
    },

)


connection_check.py


from celery import shared_task
import logging

from queues.connection_manager import ConnectionManager

conn_manager=ConnectionManager()

@shared_task
def check_redis_connection():
    logging.info("执行Redis连接检查...")
    conn_manager.ensure_connection()
    return "Connection check completed"


connection_manager.py

import logging
import time


class ConnectionManager:
    def __init__(self, app):
        self.app = app
        self.last_heartbeat = time.time()
        self.last_connection_check = 0
        self.connection_check_interval = 60  # 每60秒检查一次

    def ensure_connection(self):
        now = time.time()
        if now - self.last_connection_check < self.connection_check_interval:
            return

        self.last_connection_check = now
        try:
            with self.app.pool.acquire(block=True) as conn:
                print("执行ping")
                # 执行一个简单的Redis命令测试连接
                conn.client().ping()
        except Exception as e:
            print(f"连接检查失败: {str(e)},尝试重新连接...")
            logging.warning(f"连接检查失败: {str(e)},尝试重新连接...")
            try:
                self.app.pool.force_close_all()
                self.app.pool.disconnect()
                # 重新建立连接
                with self.app.pool.acquire(block=True) as conn:
                    conn.client().ping()
                logging.info("连接重新建立成功")
            except Exception as e:
                logging.error(f"重新连接失败: {str(e)}")


tasks.py


import asyncio
import uuid
from datetime import datetime

from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from celery.result import AsyncResult
from queues.tasks import ai_interview_task

def create_task(task_request: any):
    """创建新任务并加入Celery队列"""
    task_id = task_request.get("task_id")
    task_data = {
        "task_id": task_request.get("task_id"),
        "report_data": task_request.get("report_data"),
        "jd_title": task_request.get("jd_title"),
        "jd_portrait": task_request.get("jd_portrait"),
        "resume_portrait": task_request.get("resume_portrait"),
        "company_portrait": task_request.get("company_portrait"),
        "timestamp": str(datetime.now())
    }

    # 发送任务到Celery

    task =ai_interview_task.apply_async(
        kwargs={"task_data": task_data},
        task_id=task_id
    )
    if task is not None:
        return {
            "message": "任务已接收",
            "task_id": task_id,
            "status_url": f"/tasks/{task_id}"
        }

    return {
            "message": "任务接收失败",
            "task_id": task_id,
            "status_url": f"/tasks/{task_id}"
        }




def get_task_status(task_id: str):
    """获取任务状态"""
    task_result = AsyncResult(task_id)
    print("task_result", task_result)

    if not task_result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"任务 {task_id} 不存在"
        )

    response = {
        "task_id": task_id,
        "status": task_result.status,
        "result": None
    }

    if task_result.status == "SUCCESS":
        response["result"] = task_result.result
    elif task_result.status == "FAILURE":
        response["result"] = str(task_result.result)
    elif task_result.status == "PROGRESS":
        response.update(task_result.info)

    print("response", response)

    return response


if __name__ == "__main__":
    data=[
        {
            "question":"作为董事长,请结合您过往的团队管理经验,谈谈如何有效激励并保持一支高绩效AI技术团队的稳定性?",
            "answer":"首先,设定明确的目标和期望,第二,提供成长的成长与发展机会。第三。创建支持性的工作环境,第四,实施公平的奖励机制,五、第五,关注员工福利,第六。领导层的透明沟通。",
            "feedback":{
                "score":65,
                "evaluateContent":"回答内容整体逻辑较为清晰,涵盖了团队管理中的一些关键要素,如目标设定、员工成长、工作环境、奖励机制、员工福利以及透明沟通。但存在一些小问题:首先,表述中有重复和不规范的地方,例如‘第二,提供成长的成长与发展机会’应为‘提供成长与发展机会’,‘第三。创建支持性的工作环境’中的句号使用不当。其次,虽然提到了多个激励措施,但缺乏深度阐述,尤其是针对AI技术团队的特点没有具体展开,比如如何结合AI领域的前沿发展来激励团队成员。最后,关于高绩效团队稳定性的长期策略可以进一步细化。总体来说,答案与标准答案匹配度约为65%。",
            }
        }
    ]
    jd_title="董事长"
    jd_portrait={
      "content": {
         "工作职能": "",
         "风格要求": 3,
         "职位名称": "前端端端端端开发总经理 + 2",
         "工作地点": "浙江省宁波市鄞州区明楼街道明北社区",
         "其他补充/备注": "76uii语客语n",
         "年龄要求": "27-48",
         "工作职权": "",
         "薪酬范围": "14-67",
         "月工资": 0,
         "职位偏好": "用户运营",
         "入职时间": "2025-05-14",
         "是否统招": 1,
         "下级人数": "0人",
         "院校要求": 4,
         "详细岗位要求": "岗位说明:前端开发总经理\n\n公司简介:我们是一家位于武汉市东湖新技术开发区的大型企业,注册资本为伍佰万元整,主要提供人力资源管理服务及培训。我们的经营范围涵盖了人力/行政/法务和教育培训两大领域。公司规模在599-1999人之间,员工以知识型为主,强调学习氛围和企业文化。\n\n岗位职责:\n1. 负责公司前端技术团队的整体规划与管理,带领团队完成各项开发任务。\n2. 参与产品设计和用户体验优化,确保前端技术方案符合用户运营需求。\n3. 制定并实施前端技术发展战略,跟踪行业前沿技术,推动技术创新。\n4. 协调跨部门合作,确保项目按时高质量交付。\n\n任职要求:\n1. 本科及以上学历,计算机相关专业优先。\n2. 拥有5-8年前端开发相关工作经验,其中至少3年以上管理经验。\n3. 精通HTML、CSS、JavaScript等前端开发技术,熟悉主流前端框架(如React、Vue等)。\n4. 对用户运营有深刻理解,能够从用户角度出发进行   术方案设计。\n5. 具备良好的沟通能力和团队协作精神,能够激励团队成员达成目标。\n6. 对人力/行政/法务或教育培训行业有一定了解者优先。\n\n薪资待遇:每月14-67k,具体根据个人能力和经验面议。\n\n工作地点:湖北省武汉市东湖新技术开发区\n\n我们重视每一位员工的成长与发展,期待有志之士加入我们的团队,共同创造更加辉煌的未来!",
         "工龄要求": "5-8",
         "岗位优势": "加入我们,成为前端开发领域的领航者!作为一家位于武汉东湖新技术开发区的大型企业,我们专注于人力资源管理服务与培训,同时涉足教育培训行业。公司规模达599-1999人,以知识型员工为主,学习氛围浓厚,企业文化卓越。\n\n作为前端开发总经理,您将带领团队制定技术战略、优化用户体验,并推动技术创新。我们提供具有竞争力的薪资(14-67k/月),以及完善的福利体系:节日福利、补充   疗保险、健康体检、弹性工作制、生日福利和定期团建。\n\n在这里,您不仅能与优秀的人才共事,还能享受广阔的职业发展空间。我们期待热爱技术、关注用户运营、具备管理经验的您,携手共创未来!",
         "学历要求": "本科",
         "所属行业": "客服/运营",
         "职位类别": "业务运营",
         "试用期时长": 3,
         "工作性质": 2,
         "福利待遇": "节日福利-补充医疗保险-健康体检-弹性工作制-生日福利-定期团建"
      }
    }
    resume_portrait={
        "基本信息": {
            "年龄": 28,
            "性别": "",
            "工作年限": 5,
            "参加工作时间": "2008-07-01",
            "当前职位": "",
            "在职状态": "",
            "换岗频次": 3,
            "当前月薪(下限)": "",
            "当前月薪(上限)": "",
            "工作性质": "",
            "当前所在地": "",
            "当前所在地(规范化)": "",
            "简历解析时间": "",
            "简历更新时间": ""
        },
        "教育信息": [
            {
                "学历": "本科",
                "毕业院校": "南京师范大学",
                "是否统招": "否",
                "毕业学校类型": "普通",
                "毕业学校国内排名": "",
                "毕业学校国际排名": "",
                "专业": ""
            }
        ],
        "求职期望": {
            "期望工作": "人力资源、培训、互联网/电子商务/网游",
            "期望薪资": "4500~5999元/月",
            "期望薪资下限": "4.5K",
            "期望薪资上限": "6K",
            "期望行业": "广告、法律、贸易/进出口、公关/市场推广/会展、互联网/电子商务",
            "期望工作性质": "",
            "当前离职/在职状态": "未知",
            "期望工作地址": "",
            "期望工作地址(规范化)": ""
        },
        "工作经历": [
            {
                "开始时间": "2008-07-01",
                "结束时间": "2008-08-01",
                "公司名称": "***公司",
                "公司性质": "",
                "公司规模": "",
                "公司描述": "",
                "行业": "",
                "职位": "实习",
                "职能类型": "",
                "工作性质": "",
                "工作薪资": "",
                "工作地点": "",
                "持续时间": "一个月",
                "工作能力": "",
                "工作内容": "实习经历,具体职责不详"
            },
            {
                "开始时间": "2009-02-01",
                "结束时间": "2011-02-01",
                "公司名称": "***公司",
                "公司性质": "",
                "公司规模": "",
                "公司描述": "",
                "行业": "",
                "职位": "经理助理/秘书",
                "职能类型": "",
                "工作性质": "",
                "工作薪资": "",
                "工作地点": "",
                "持续时间": "两年",
                "工作能力": "具备一定的行政管理能力和沟通协调技巧",
                "工作内容": "协助部门经理处理日常事务,参与文件编写和会议组织等"
            },
            {
                "开始时间": "2011-03-01",
                "结束时间": "2013-03-01",
                "公司名称": "***公司",
                "公司性质": "",
                "公司规模": "",
                "公司描述": "",
                "行业": "",
                "职位": "招聘专员/助理",
                "职能类型": "",
                "工作性质": "",
                "工作薪资": "",
                "工作地点": "",
                "持续时间": "两年",
                "工作能力": "熟悉招聘流程,具有较强的沟通能力和问题解决能力",
                "工作内容": "负责简历筛选、面试安排及入职手续办理等工作"
            },
            {
                "开始时间": "2013-08-01",
                "结束时间": "至今",
                "公司名称": "***公司",
                "公司性质": "",
                "公司规模": "",
                "公司描述": "",
                "行业": "",
                "职位": "招聘专员/助理",
                "职能类型": "",
                "工作性质": "",
                "工作薪资": "",
                "工作地点": "",
                "持续时间": "至今",
                "工作能力": "具备丰富的招聘经验,擅长团队协作与沟通",
                "工作内容": "负责员工招聘、培训及绩效评估等相关工作"
            }
        ],
        "项目经历": [],
        "培训经历": [],
        "语言技能": [],
        "所有证书及奖项": [],
        "画像总结": "该候选人拥有5年的HR工作经验,主要担任招聘专员/助理的职务。在南京师范大学本科毕业后,先后在多家公司积累了丰富的人力资源管理工作经验,特别是在招聘流程和沟通协调方面表现出色。期望从事人力资源或互联网相关领域的工作,并寻求4.5K至6K元/月的薪酬。"
    }
    company_portrait="三五有限公司"
    task_data = {
        "task_id": "bb",
        "report_data": data,
        "jd_title": jd_title,
        "jd_portrait": jd_portrait,
        "resume_portrait": resume_portrait,
        "company_portrait":company_portrait,
        "timestamp": str(datetime.now())
    }
    #sss=create_task(task_data)
    #print(sss)
    get_task_status("bbbb")

tasks.py

import asyncio
import logging
import time

from datetime import datetime


from interview.agent.complete_agent import CompleteAgent
from queues.celery_app import celery_app
from queues.callback import send_callback_task


@celery_app.task(bind=True, autoretry_for=(TimeoutError,), max_retries=3)
def ai_interview_task(self,task_data: dict):
    """模拟长时间运行的任务"""
    try:
        # 获取任务信息
        task_id = task_data.get("task_id","")
        print(f"[{datetime.now()}] 开始处理任务 {task_id},-- {task_data}")

        jd_title = task_data.get("jd_title", "")
        jd_portrait = task_data.get("jd_portrait", "")
        resume_portrait = task_data.get("resume_portrait", "")
        company_portrait = task_data.get("company_portrait", "")
        report_data = task_data.get("report_data", "")
        if not jd_title:
            return {"code": 0, "data": {}, "message": "jd_title参数不存在"}
        if not jd_portrait:
            return {"code": 0, "data": {}, "message": "jd_portrait参数不存在"}
        if not resume_portrait:
            return {"code": 0, "data": {}, "message": "resume_portrait参数不存在"}
        if not company_portrait:
            return {"code": 0, "data": {}, "message": "company_portrait参数不存在"}



        feedback=get_feedback(jd_title, jd_portrait,resume_portrait,company_portrait,report_data)
        #
        #return
        '''agent = CompleteAgent()
        agent.jd_title = jd_title
        agent.jd_portrait = jd_portrait
        agent.resume_portrait = resume_portrait
        agent.company_portrait = company_portrait
        logging.info(f"[{datetime.now()}] --ai面试报告--  {report_data}")'''
        #feedback = await agent._run(report_data)'''
        #feedback = {"code": 200, "data": "feedback", "processed_at": str(datetime.now()), "message": "任务成功处理"}
        #print("feedback555", feedback)
        score=feedback.get("score")
        if score >=0:
            result = {"code": 200,"status":"SUCCESS","data": feedback,"processed_at": str(datetime.now()),"message": "任务成功处理"}
            logging.info(f"[{datetime.now()}] 任务 {task_id} 完成")
            print(f"[{datetime.now()}] 任务 {task_id} 完成")
        else:
            result = {"code": 0,"status":"FAILURE","data": feedback,"processed_at": str(datetime.now()),"message": "任务处理失败"}
            logging.info(f"[{datetime.now()}] 任务 {task_id} 失败")
            print(f"[{datetime.now()}] 任务 {task_id} 失败")

        # 使用链式任务来发送回调,避免任务整体重试
        send_callback_task.si(task_id, feedback).apply_async(countdown=10)
        return result

    except TimeoutError as e:
        # 当前重试次数: self.request.retries
        # 下次重试等待时间: 2^retries 秒
        self.retry(exc=e, countdown=2**self.request.retries)
    except Exception as e:
        print(f"[{datetime.now()}] 任务 {task_id} 失败: {str(e)}")
        raise self.retry(exc=e, countdown=60,max_retries=3)  # 60秒后重试,



def get_feedback(jd_title,jd_portrait,resume_portrait,company_portrait,report_data):

    agent = CompleteAgent()
    agent.jd_title = jd_title
    agent.jd_portrait = jd_portrait
    agent.resume_portrait = resume_portrait
    agent.company_portrait = company_portrait
    logging.info(f"[{datetime.now()}] --ai面试报告--  {report_data}")
    feedback = asyncio.run(
        asyncio.wait_for(agent._run(report_data), timeout=300.0)
    )
    return feedback


callback.py

import logging
import time
from datetime import datetime

import httpx
from celery.exceptions import MaxRetriesExceededError, Retry

from queues.celery_app import celery_app
from sdk.app.config import ai_config


@celery_app.task(bind=True,
                 autoretry_for=(TimeoutError,),
                 max_retries=24,
                 default_retry_delay=60,
                 expires=3600,
                 retry_backoff=True,
                 retry_backoff_max=600,  # 最大退避时间10分钟
                 retry_jitter=True)  # 添加随机抖动避免惊群
def send_callback_task(self, task_id: str, payload: dict):
    """
    增强版回调任务,具有以下特性:
    1. 自动重试所有异常(包括PENDING状态)
    2. 指数退避重试机制
    3. 详细的日志记录
    4. 状态检查机制
    """
    print(f"[{datetime.now()}]  {task_id}  {payload}")
    # 检查任务是否已经超过最大重试次数
    if self.request.retries >= self.max_retries:
        logging.critical(f"[{datetime.now()}] 任务 {task_id} 已达到最大重试次数")
        return {"status": "failed", "reason": "max_retries_exceeded"}

    # 检查任务是否处于PENDING状态过久(超过1小时)
    print(f"[{datetime.now()}]  {self.request}")
    if (self.request.delivery_info and
            getattr(self.request, 'is_pending', False) and
            time.time() - self.request.delivery_info.get('timestamp', 0) > 3600):
        logging.warning(f"[{datetime.now()}] 检测到长时间PENDING的任务 {task_id},触发重试")
        raise Retry()

    callback_url = ai_config.get("interview_push_uri")
    if not callback_url:
        error_msg = f"[{datetime.now()}] 回调地址未配置"
        logging.error(error_msg)
        raise ValueError(error_msg)

    if not isinstance(payload, dict) or not payload:
        error_msg = f"[{datetime.now()}] 无效的 payload: {payload}"
        logging.warning(error_msg)
        raise ValueError(error_msg)

    logging.info(f"[{datetime.now()}] 开始发送回调,任务ID: {task_id}, 重试次数: {self.request.retries}")

    try:
        # 使用更健壮的HTTP客户端配置
        transport = httpx.HTTPTransport(retries=3)
        timeout = httpx.Timeout(60.0, connect=10.0)

        with httpx.Client(transport=transport, timeout=timeout) as client:
            response = client.post(
                callback_url,
                json={
                    "feedback":payload,
                    "task_id":task_id
                },
                headers={
                    "X-Task-ID": task_id,
                    "X-Retry-Count": str(self.request.retries),
                    "User-Agent": "AI-Callback/1.0"
                }
            )

            logging.info(f"[{datetime.now()}] 回调响应: {response.status_code} {response.text[:200]}")

            # 对非2xx响应也记录详细日志
            if not response.is_success:
                logging.warning(
                    f"[{datetime.now()}] 非成功响应: {response.status_code}\nHeaders: {response.headers}\nBody: {response.text[:500]}")

            response.raise_for_status()

            return {
                "status": "success",
                "code": response.status_code,
                "body": response.text,
                "retries": self.request.retries
            }

    except TimeoutError as e:
        # 当前重试次数: self.request.retries
        # 下次重试等待时间: 2^retries 秒
        self.retry(exc=e, countdown=2**self.request.retries)

    except httpx.TimeoutException as exc:
        error_msg = f"[{datetime.now()}] 请求超时: {exc}"
        logging.error(error_msg, exc_info=True)
        raise self.retry(exc=exc)

    except httpx.HTTPStatusError as exc:
        # 对不同的HTTP状态码采用不同策略
        if exc.response.status_code in (502, 503, 504):
            retry_delay = min(60 * (self.request.retries + 1), 300)  # 最大延迟5分钟
            error_msg = f"[{datetime.now()}] 服务器错误 {exc.response.status_code}"
            logging.error(error_msg)
            raise self.retry(exc=exc, countdown=retry_delay)
        else:
            error_msg = f"[{datetime.now()}] HTTP错误 {exc.response.status_code}"
            logging.error(error_msg)
            raise exc  # 非服务器错误不再重试

    except httpx.RequestError as exc:
        error_msg = f"[{datetime.now()}] 请求失败: {exc}"
        logging.error(error_msg, exc_info=True)
        raise self.retry(exc=exc)

    except Exception as exc:
        error_msg = f"[{datetime.now()}] 未知错误: {exc}"
        logging.exception(error_msg)
        raise self.retry(exc=exc)

运行
celery -A queues.celery_app worker --loglevel=info --concurrency=4

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!