前言

這是 Jaba AI 技術分享系列 的第六篇文章。

專案整合 時,我們將 jaba(FastAPI + LINE Bot SDK v3)和 jaba-line-bot(Flask 代理)整合成單一應用。這篇文章分享 LINE Bot SDK v3 與 FastAPI 整合的實作細節。


LINE Bot SDK v3 的特點

LINE Bot SDK v3 有幾個與舊版不同的設計:

套件結構

from linebot.v3.webhook import WebhookParser
from linebot.v3.messaging import MessagingApi, Configuration, ApiClient
from linebot.v3.messaging import TextMessage, ReplyMessageRequest

特點:

  • 命名空間包含版本號 linebot.v3
  • API 客戶端需要明確的 Configuration
  • 訊息類別名稱簡潔(TextMessage 而非 TextSendMessage

API 初始化流程

# v3 需要三步驟初始化
configuration = Configuration(access_token=access_token)
api_client = ApiClient(configuration)
messaging_api = MessagingApi(api_client)

# 使用 Request 物件包裝參數
messaging_api.reply_message(
    ReplyMessageRequest(
        reply_token=reply_token,
        messages=[TextMessage(text="Hello")]
    )
)

初始化步驟:

  1. 建立 Configuration 物件(設定 access token)
  2. 建立 ApiClient 物件
  3. 建立 MessagingApi 物件
  4. 使用 Request 物件包裝 API 參數

Webhook 設定

FastAPI 路由

# app/routers/line_webhook.py
from fastapi import APIRouter, Depends, HTTPException, Request, Header
from linebot.v3.webhook import WebhookParser
from linebot.v3.exceptions import InvalidSignatureError
from linebot.v3.webhooks import (
    MessageEvent,
    TextMessageContent,
    JoinEvent,
    LeaveEvent,
    PostbackEvent,
)
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.database import get_db
from app.services import LineService

router = APIRouter(prefix="/api/webhook", tags=["webhook"])

# Webhook Parser
parser = WebhookParser(settings.line_channel_secret)


@router.post("/line")
async def line_callback(
    request: Request,
    x_line_signature: str = Header(...),
    db: AsyncSession = Depends(get_db),
):
    """LINE Webhook 回調"""
    # 讀取請求 body
    body = await request.body()
    body_str = body.decode("utf-8")

    # 驗證簽章並解析事件
    try:
        events = parser.parse(body_str, x_line_signature)
    except InvalidSignatureError:
        raise HTTPException(status_code=400, detail="Invalid signature")

    # 處理事件
    service = LineService(db)

    for event in events:
        try:
            if isinstance(event, MessageEvent):
                if isinstance(event.message, TextMessageContent):
                    await handle_text_message(service, event)
            elif isinstance(event, JoinEvent):
                await handle_join_event(service, event)
            elif isinstance(event, LeaveEvent):
                await handle_leave_event(service, event)
            elif isinstance(event, PostbackEvent):
                await handle_postback_event(service, event)
        except Exception as e:
            logger.error(f"Error handling event: {e}")

    return {"status": "ok"}

事件處理函數

async def handle_text_message(service: LineService, event: MessageEvent):
    """處理文字訊息"""
    text = event.message.text
    user_id = event.source.user_id

    # 判斷來源類型
    source_type = event.source.type
    group_id = None

    if source_type == "group":
        group_id = event.source.group_id
    elif source_type == "room":
        group_id = event.source.room_id

    # 處理訊息
    await service.handle_message(
        user_id=user_id,
        group_id=group_id,
        text=text,
        reply_token=event.reply_token,
    )


async def handle_join_event(service: LineService, event: JoinEvent):
    """處理加入群組事件"""
    source_type = event.source.type

    if source_type == "group":
        group_id = event.source.group_id
    elif source_type == "room":
        group_id = event.source.room_id
    else:
        return

    await service.handle_join(
        group_id=group_id,
        reply_token=event.reply_token,
    )

LINE Service 設計

初始化

# app/services/line_service.py
from linebot.v3.messaging import (
    ApiClient,
    Configuration,
    MessagingApi,
    ReplyMessageRequest,
    PushMessageRequest,
    TextMessage,
    QuickReply,
    QuickReplyItem,
    PostbackAction,
)

class LineService:
    """LINE 服務"""

    def __init__(self, session: AsyncSession):
        self.session = session
        self.channel_secret = settings.line_channel_secret
        self.channel_access_token = settings.line_channel_access_token

        # 設定 API 客戶端
        configuration = Configuration(access_token=self.channel_access_token)
        self.api_client = ApiClient(configuration)
        self.messaging_api = MessagingApi(self.api_client)

        # 初始化 Repositories
        self.user_repo = UserRepository(session)
        self.group_repo = GroupRepository(session)
        # ...

回覆訊息

async def reply_message(self, reply_token: str, message: str) -> None:
    """回覆訊息"""
    try:
        self.messaging_api.reply_message(
            ReplyMessageRequest(
                reply_token=reply_token,
                messages=[TextMessage(text=message)],
            )
        )
    except Exception as e:
        logger.error(f"Reply message error: {e}")

推送訊息

async def push_message(self, to: str, message: str) -> None:
    """推送訊息(不需要 reply_token)"""
    try:
        self.messaging_api.push_message(
            PushMessageRequest(
                to=to,
                messages=[TextMessage(text=message)],
            )
        )
    except Exception as e:
        logger.error(f"Push message error: {e}")

Quick Reply 按鈕

async def _reply_with_quick_reply(
    self, reply_token: str, message: str, items: list
) -> None:
    """回覆帶有 Quick Reply 按鈕的訊息"""
    try:
        self.messaging_api.reply_message(
            ReplyMessageRequest(
                reply_token=reply_token,
                messages=[
                    TextMessage(
                        text=message,
                        quick_reply=QuickReply(items=items),
                    )
                ],
            )
        )
    except Exception as e:
        logger.error(f"Quick reply error: {e}")


# 使用範例
async def show_menu_options(self, reply_token: str):
    """顯示菜單選項"""
    items = [
        QuickReplyItem(
            action=PostbackAction(label="今日菜單", data="action=menu")
        ),
        QuickReplyItem(
            action=PostbackAction(label="開始點餐", data="action=start")
        ),
        QuickReplyItem(
            action=PostbackAction(label="查看訂單", data="action=orders")
        ),
    ]

    await self._reply_with_quick_reply(
        reply_token,
        "請選擇操作:",
        items
    )

取得使用者資料

個人使用者

async def get_user_profile(self, user_id: str) -> Optional[dict]:
    """取得使用者資料"""
    try:
        profile = self.messaging_api.get_profile(user_id)
        return {
            "user_id": profile.user_id,
            "display_name": profile.display_name,
            "picture_url": profile.picture_url,
        }
    except Exception as e:
        logger.error(f"Get user profile error: {e}")
        return None

群組成員

async def get_group_member_profile(
    self, group_id: str, user_id: str
) -> Optional[dict]:
    """取得群組成員資料"""
    try:
        profile = self.messaging_api.get_group_member_profile(group_id, user_id)
        return {
            "user_id": profile.user_id,
            "display_name": profile.display_name,
            "picture_url": profile.picture_url,
        }
    except Exception as e:
        logger.error(f"Get group member profile error: {e}")
        return None

群組資訊

async def get_group_name(self, group_id: str) -> str:
    """取得群組名稱"""
    try:
        summary = self.messaging_api.get_group_summary(group_id)
        return summary.group_name
    except Exception as e:
        logger.error(f"Get group name error: {e}")
        return ""

訊息處理流程

主入口

async def handle_message(
    self,
    user_id: str,
    group_id: Optional[str],
    text: str,
    reply_token: str,
) -> None:
    """處理訊息 - 主入口"""
    # 1. 取得或建立使用者
    user = await self.user_repo.get_or_create(user_id)

    # 2. 檢查封鎖狀態
    if user.is_banned:
        return  # 靜默忽略

    # 3. 嘗試取得顯示名稱
    if not user.display_name:
        if group_id:
            profile = await self.get_group_member_profile(group_id, user_id)
        else:
            profile = await self.get_user_profile(user_id)
        if profile:
            user.display_name = profile["display_name"]
            await self.user_repo.update(user)

    # 4. 區分個人/群組訊息
    if group_id:
        await self._handle_group_message(user, group_id, text, reply_token)
    else:
        await self._handle_personal_message(user, text, reply_token)

群組訊息處理

async def _handle_group_message(
    self,
    user: User,
    line_group_id: str,
    text: str,
    reply_token: str,
) -> None:
    """處理群組訊息"""
    # 1. 取得或建立群組
    group = await self.group_repo.get_or_create(line_group_id)

    # 2. 更新群組名稱
    if not group.name:
        group_name = await self.get_group_name(line_group_id)
        if group_name:
            group.name = group_name
            await self.group_repo.update(group)

    # 3. 檢查群組狀態
    if group.status == "suspended":
        return  # 被凍結的群組不回應

    if group.status != "active":
        # 未啟用的群組,引導申請
        await self._handle_pending_group_chat(user, group, text, reply_token)
        return

    # 4. 記錄成員
    await self.member_repo.add_member(group.id, user.id)

    # 5. 檢查點餐狀態
    active_session = await self.session_repo.get_active_session(group.id)
    is_ordering = active_session is not None

    # 6. 處理快捷指令
    quick_response = await self._handle_quick_command(
        user, group, text.strip(), active_session
    )
    if quick_response:
        await self.reply_message(reply_token, quick_response)
        return

    # 7. 根據狀態決定是否回應
    should_reply = self._should_respond_in_group(text, is_ordering)
    if not should_reply:
        return

    # 8. 呼叫 AI 處理
    await self._handle_ai_chat(user, group, active_session, text, reply_token)

回應策略

def _should_respond_in_group(
    self, text: str, is_ordering: bool
) -> tuple[bool, str]:
    """判斷群組中是否應該回應

    Returns:
        (should_respond, cleaned_message)
    """
    text_lower = text.lower()

    if is_ordering:
        # 點餐中:所有訊息都回應
        return True, text

    # 非點餐中:只回應特定指令
    if text in ["開單", "菜單"]:
        return True, text

    # 呼叫幫助(@呷爸、呷爸)
    trigger_keywords = ["jaba", "呷爸", "點餐"]
    for keyword in trigger_keywords:
        if text_lower in [keyword.lower(), f"@{keyword.lower()}"]:
            return True, "help"

    return False, text

回傳 tuple 的好處是可以同時決定是否回應,並清理訊息(例如將 @呷爸 轉換成 help 指令)。


簽章驗證

SDK v3 的 WebhookParser 會自動驗證簽章:

try:
    events = parser.parse(body_str, x_line_signature)
except InvalidSignatureError:
    raise HTTPException(status_code=400, detail="Invalid signature")

如果需要手動驗證:

import hashlib
import hmac
import base64

def verify_signature(self, body: str, signature: str) -> bool:
    """手動驗證 LINE 簽章"""
    hash = hmac.new(
        self.channel_secret.encode("utf-8"),
        body.encode("utf-8"),
        hashlib.sha256,
    ).digest()
    expected_signature = base64.b64encode(hash).decode("utf-8")
    return hmac.compare_digest(signature, expected_signature)

注意事項

1. Reply Token 有效期

Reply Token 只能使用一次,且有時效限制(約 1 分鐘):

# 錯誤:同一個 token 用兩次
await self.reply_message(reply_token, "訊息 1")
await self.reply_message(reply_token, "訊息 2")  # 會失敗

# 正確:合併成一次回覆
await self.reply_message(reply_token, "訊息 1\n\n訊息 2")

2. 非同步注意

SDK v3 的 API 呼叫是同步的,但我們在 FastAPI 的 async 函數中使用。目前的做法是直接呼叫(會阻塞一小段時間):

async def reply_message(self, reply_token: str, message: str):
    # 這裡的 reply_message 是同步呼叫
    self.messaging_api.reply_message(...)

如果需要真正的非同步,可以用 asyncio.to_thread

import asyncio

async def reply_message(self, reply_token: str, message: str):
    await asyncio.to_thread(
        self.messaging_api.reply_message,
        ReplyMessageRequest(...)
    )

3. 錯誤處理

LINE API 可能因為各種原因失敗,需要妥善處理:

async def reply_message(self, reply_token: str, message: str) -> None:
    try:
        self.messaging_api.reply_message(...)
    except Exception as e:
        logger.error(f"Reply message error: {e}")
        # 不要 raise,避免影響整體流程

總結

LINE Bot SDK v3 與 FastAPI 整合的重點:

項目 說明
命名空間 使用 linebot.v3.*
API 初始化 Configuration → ApiClient → MessagingApi
訊息類別 TextMessageReplyMessageRequest
Webhook 處理 WebhookParser 解析事件
非同步整合 在 async 函數中呼叫同步 SDK API

SDK v3 的好處:

  • 清晰的 API 結構
  • 良好的型別提示
  • 與 FastAPI 架構整合良好

下一篇

下一篇文章會說明群組權限系統的設計:LINE 群組權限設計:從申請到審核的完整流程


系列文章