前言
在完成模組化重構之後,系統的程式碼結構已經大幅改善。但在實際運行中,我們碰到了兩個棘手的問題:
- Server 重啟後,所有使用者被強制登出 – 因為 session 存在記憶體裡,重啟就歸零
- SMB 和文件操作會卡住整個 event loop – 一個人在下載大檔案,其他人的 API 回應就變慢
這篇文章記錄我們如何用 PostgreSQL Session 持久化和 Thread Pool 來解決這兩個問題。
問題分析:記憶體 Session 的局限
原本的 session 管理很直觀 – 用一個 Python dict 存放所有 session:
# 舊架構(示意)
_sessions: dict[str, SessionData] = {}
def create_session(username, password):
token = str(uuid4())
_sessions[token] = SessionData(username=username, password=password, ...)
return token
def get_session(token):
return _sessions.get(token)
這在開發階段沒什麼問題,但到了實際部署就暴露了三個缺陷:
| 問題 | 影響 |
|---|---|
| Server 重啟 | 所有 session 消失,全員重新登入 |
| 多 Worker 部署 | 各 worker 的 dict 獨立,session 無法共享 |
| 密碼明文儲存 | SessionData.password 直接存放明文,記憶體 dump 可見 |
尤其是第一點,我們使用 Uvicorn 搭配 Docker 部署,每次更新程式碼或設定變更都需要重啟容器,使用者就得重新登入,體驗很差。
Session 持久化:PostgreSQL + AES-256-GCM
資料表設計
我們用 Alembic migration 建立 sessions 表:
# migrations/versions/005_sessions_table.py
def upgrade() -> None:
op.create_table(
'sessions',
sa.Column('token', sa.Text(), primary_key=True),
sa.Column('user_id', sa.Integer(),
sa.ForeignKey('users.id', ondelete='CASCADE'), nullable=True),
sa.Column('username', sa.Text(), nullable=False),
sa.Column('password_enc', sa.Text(), nullable=False, server_default=''),
sa.Column('nas_host', sa.Text(), nullable=False),
sa.Column('role', sa.Text(), nullable=False, server_default='user'),
sa.Column('app_permissions', JSONB(), nullable=False, server_default='{}'),
sa.Column('created_at', sa.DateTime(timezone=True),
nullable=False, server_default=sa.text('NOW()')),
sa.Column('expires_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('last_accessed_at', sa.DateTime(timezone=True),
nullable=False, server_default=sa.text('NOW()')),
)
op.create_index('idx_sessions_expires', 'sessions', ['expires_at'])
op.create_index('idx_sessions_user_id', 'sessions', ['user_id'])
幾個設計重點:
token作為 primary key:UUID v4,不需要自增 IDpassword_enc:加密後的密碼,不是明文DateTime(timezone=True):所有時間欄位都帶時區,避免 naive datetime 造成的混亂last_accessed_at:追蹤最後存取時間,方便後續分析使用者活躍度expires_at索引:加速過期 session 的清理查詢
密碼加密:AES-256-GCM
因為 SMB 操作需要使用者的明文密碼(smbprotocol 要求),我們不能用 hash,只能用對稱加密。選擇 AES-256-GCM 的原因:
- 同時提供加密和認證(Authenticated Encryption)
- GCM 模式不需要 padding,效能好
- 業界標準,
cryptography套件原生支援
# utils/crypto.py
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
def _get_encryption_key() -> bytes:
"""從環境變數取得金鑰,SHA-256 產生固定 32 bytes"""
key_str = os.getenv("BOT_SECRET_KEY", "")
if not key_str:
key_str = "ching-tech-os-default-dev-key-2024" # 僅限開發環境
return hashlib.sha256(key_str.encode()).digest()
def encrypt_credential(plaintext: str) -> str:
"""AES-256-GCM 加密,回傳 base64(nonce + ciphertext + tag)"""
key = _get_encryption_key()
aesgcm = AESGCM(key)
nonce = secrets.token_bytes(12) # GCM 建議 12 bytes nonce
ciphertext = aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None)
return base64.b64encode(nonce + ciphertext).decode("ascii")
def decrypt_credential(encrypted: str) -> str:
"""解密"""
key = _get_encryption_key()
aesgcm = AESGCM(key)
data = base64.b64decode(encrypted.encode("ascii"))
nonce = data[:12]
ciphertext = data[12:]
plaintext = aesgcm.decrypt(nonce, ciphertext, None)
return plaintext.decode("utf-8")
加密流程:
明文密碼 → AES-256-GCM 加密 → base64 編碼 → 存入 password_enc 欄位
↑
BOT_SECRET_KEY 環境變數 → SHA-256 → 32 bytes 金鑰
注意:生產環境必須設定
BOT_SECRET_KEY環境變數。應用程式啟動時會檢查並發出警告。
SessionManager 實作
# services/session.py
class SessionManager:
"""Session 管理器(PostgreSQL 持久化)"""
def __init__(self):
self._cleanup_task: asyncio.Task | None = None
self._cache = _SessionCache()
async def create_session(self, username, password, nas_host=None,
user_id=None, role="user", app_permissions=None):
token = str(uuid4())
password_enc = encrypt_credential(password) if password else ""
async with get_connection() as conn:
await conn.execute("""
INSERT INTO sessions
(token, user_id, username, password_enc, nas_host,
role, app_permissions, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7,
NOW() + $8 * INTERVAL '1 hour')
""", token, user_id, username, password_enc,
nas_host or settings.nas_host, role,
app_permissions or {}, float(settings.session_ttl_hours))
return token
Session 的生命週期由 expires_at 控制,預設 8 小時(透過 SESSION_TTL_HOURS 環境變數設定)。
TTL Cache:減少資料庫查詢
Session 驗證是最高頻的操作 – 每個 API 請求都要驗證 session。如果每次都查資料庫,效能會很差。
設計思路
我們實作了一個簡易的 TTL(Time-To-Live)cache,不需要引入 Redis 或其他外部套件:
class _SessionCache:
"""簡易 TTL cache(不需外部套件)"""
def __init__(self, ttl: int = 30): # 預設 30 秒
self._store: dict[str, tuple[SessionData, float]] = {}
self._ttl = ttl
def get(self, token: str) -> SessionData | None:
entry = self._store.get(token)
if entry is None:
return None
data, expire_at = entry
if time.monotonic() > expire_at:
del self._store[token]
return None
return data
def set(self, token: str, data: SessionData) -> None:
self._store[token] = (data, time.monotonic() + self._ttl)
def delete(self, token: str) -> None:
self._store.pop(token, None)
查詢流程
async def get_session(self, token: str) -> Optional[SessionData]:
# 1. 先查 cache
cached = self._cache.get(token)
if cached is not None:
return cached
# 2. Cache miss → 查 DB 並更新 last_accessed_at
async with get_connection() as conn:
row = await conn.fetchrow("""
UPDATE sessions SET last_accessed_at = NOW()
WHERE token = $1 AND expires_at > NOW()
RETURNING username, password_enc, nas_host, user_id,
created_at, expires_at, role, app_permissions
""", token)
if row is None:
return None
# 3. 解密密碼
password = decrypt_credential(row["password_enc"]) if row["password_enc"] else ""
session = SessionData(
username=row["username"],
password=password,
nas_host=row["nas_host"],
# ... 其他欄位
)
# 4. 寫入 cache
self._cache.set(token, session)
return session
這裡有幾個細節值得說明:
| 設計 | 理由 |
|---|---|
| TTL 30 秒 | 在「資料新鮮度」和「DB 負擔」之間取得平衡 |
time.monotonic() |
不受系統時鐘調整影響,比 time.time() 可靠 |
UPDATE ... RETURNING |
用一條 SQL 同時更新 last_accessed_at 和取回資料,減少 round trip |
| Cache miss 才查 DB | 30 秒內的重複請求直接從記憶體回傳 |
last_accessed_at 追蹤
last_accessed_at 讓我們可以追蹤使用者的活躍狀況:
- 哪些 session 已經長時間未使用?
- 使用者的實際使用頻率如何?
- 配合 TTL cache,
last_accessed_at的更新頻率自然被限制在每 30 秒一次,不會造成過多的 DB 寫入
背景清理任務
過期的 session 不會自動消失,需要定期清理:
async def start_cleanup_task(self):
"""啟動背景清理任務"""
async def cleanup_loop():
interval = settings.session_cleanup_interval_minutes * 60 # 10 分鐘
while True:
await asyncio.sleep(interval)
try:
count = await self.cleanup_expired()
if count > 0:
logger.info("Cleaned up %d expired sessions", count)
except Exception as e:
logger.error("Session cleanup failed: %s", e)
self._cleanup_task = asyncio.create_task(cleanup_loop())
清理邏輯很單純:
async def cleanup_expired(self) -> int:
async with get_connection() as conn:
result = await conn.execute(
"DELETE FROM sessions WHERE expires_at < NOW()"
)
return int(result.split()[-1]) # "DELETE 5" → 5
配合 idx_sessions_expires 索引,即使 session 數量很大,清理也很快。
Thread Pool:讓 SMB 和文件操作不阻塞
問題:asyncio 的致命弱點
FastAPI 底層用的是 asyncio event loop – 所有的 async 函式都在同一條執行緒上跑。如果某個操作是同步阻塞的(例如 SMB 網路 I/O、文件解析),它會卡住整個 event loop:
時間軸:
User A 請求 SMB 下載(阻塞 3 秒)
├── User B 的 API 請求 → 等待中...
├── User C 的 API 請求 → 等待中...
└── 完成 → User B、C 才開始處理
smbprotocol 和 python-docx/pptx 等套件都是同步的,直接在 async 函式裡呼叫會阻塞。
解決方案:專用執行緒池
我們建立了兩個專用的 ThreadPoolExecutor:
# services/workers/thread_pool.py
from concurrent.futures import ThreadPoolExecutor
# SMB 操作執行緒池(I/O 密集,4 條執行緒)
_smb_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="smb")
# 文件解析執行緒池(CPU 密集,2 條執行緒)
_doc_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="doc")
async def run_in_smb_pool(func, *args, **kwargs):
"""在 SMB 執行緒池中執行阻塞式操作"""
loop = asyncio.get_running_loop()
if kwargs:
return await loop.run_in_executor(_smb_pool, partial(func, *args, **kwargs))
return await loop.run_in_executor(_smb_pool, partial(func, *args) if args else func)
async def run_in_doc_pool(func, *args, **kwargs):
"""在文件解析執行緒池中執行阻塞式操作"""
loop = asyncio.get_running_loop()
if kwargs:
return await loop.run_in_executor(_doc_pool, partial(func, *args, **kwargs))
return await loop.run_in_executor(_doc_pool, partial(func, *args) if args else func)
def shutdown_pools():
"""關閉所有執行緒池(應用程式關閉時呼叫)"""
_smb_pool.shutdown(wait=False)
_doc_pool.shutdown(wait=False)
為什麼分成兩個池?
| 池 | 用途 | Worker 數 | 特性 |
|---|---|---|---|
_smb_pool |
NAS 檔案操作、SMB 認證 | 4 | I/O 密集,等待網路回應 |
_doc_pool |
文件解析(PDF、DOCX、PPTX) | 2 | CPU 密集,實際計算 |
分開的好處是隔離:即使 4 個 SMB 操作同時進行,文件解析仍有獨立的 2 條執行緒可用,不會互相搶奪資源。
實際使用範例
SMB 認證(登入):
# api/auth.py
from ..services.workers import run_in_smb_pool
# 原本會阻塞 event loop
# smb.test_auth()
# 改為在執行緒池中執行
await run_in_smb_pool(smb.test_auth)
NAS 檔案列表:
# api/nas.py
from ..services.workers import run_in_smb_pool
async def _list_files(path: str, session: SessionData):
def _exec():
smb = create_smb_service(session.username, session.password)
return smb.list_directory(path)
return await run_in_smb_pool(_exec)
文件內容擷取(MCP 工具):
# services/mcp/nas_tools.py
from ..workers import run_in_doc_pool
result = await run_in_doc_pool(document_reader.extract_text, str(full_path))
簡報上傳至 NAS:
# services/presentation.py
from .workers import run_in_smb_pool
await run_in_smb_pool(_upload)
修改後的時間軸
時間軸(Thread Pool):
User A 請求 SMB 下載 → 交給 smb-0 執行緒
├── User B 的 API 請求 → event loop 立刻處理 ✓
├── User C 的 API 請求 → event loop 立刻處理 ✓
└── smb-0 完成 → 回傳給 User A
Event loop 不再被阻塞,所有非 SMB/文件的操作都能即時回應。
時區修正
在 Session 持久化過程中,我們順手修正了一個長期存在的問題:時區處理不一致。
問題
PostgreSQL 的 NOW() 回傳的是帶時區的 timestamp(預設 UTC),但程式中有些地方用 naive datetime(沒有時區資訊),導致:
- 前端顯示的時間可能差 8 小時
- Session 過期判斷可能不準確
解決
- 資料庫層:所有
DateTime欄位都加上timezone=True - 應用層:統一使用
timezone.utc或台北時區
# services/mcp/server.py
TAIPEI_TZ = timezone(timedelta(hours=8))
def to_taipei_time(dt: datetime) -> datetime:
"""將 datetime 轉換為台北時區"""
if dt is None:
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc) # naive → 視為 UTC
return dt.astimezone(TAIPEI_TZ)
- Session 表:
created_at、expires_at、last_accessed_at全部使用DateTime(timezone=True)
應用程式生命週期整合
所有新功能都整合到 FastAPI 的 lifespan 管理:
# main.py
@asynccontextmanager
async def lifespan(app: FastAPI):
# 啟動時
await init_db_pool() # 資料庫連線池
await session_manager.start_cleanup_task() # Session 清理
await terminal_service.start_cleanup_task() # Terminal 清理
start_scheduler() # 排程器
yield
# 關閉時
stop_scheduler()
await terminal_service.stop_cleanup_task()
terminal_service.close_all()
await session_manager.stop_cleanup_task()
shutdown_pools() # 關閉執行緒池
await close_db_pool() # 關閉資料庫
關閉順序很重要:先停排程、再停 session 清理、然後關執行緒池、最後才關資料庫連線。反過來的話,正在執行的任務可能會因為資料庫已關閉而拋出例外。
效能改善對比
| 指標 | 改善前 | 改善後 |
|---|---|---|
| Server 重啟 | 全員重新登入 | Session 不受影響 |
| 密碼儲存 | 記憶體明文 | AES-256-GCM 加密 |
| Session 驗證(cache hit) | N/A | < 1ms(記憶體讀取) |
| Session 驗證(cache miss) | N/A | ~2ms(單次 DB query) |
| SMB 下載(對其他用戶影響) | 阻塞 event loop 3-5 秒 | 0 影響 |
| 文件解析(對其他用戶影響) | 阻塞 event loop 1-3 秒 | 0 影響 |
| 多 Worker 部署 | Session 不共享 | 透過 PostgreSQL 共享 |
小結
這次的優化主要解決兩個核心問題:
- Session 持久化 – 從記憶體
dict搬到 PostgreSQL,搭配 AES-256-GCM 加密密碼、TTL cache 減少查詢、背景任務清理過期資料 - Thread Pool – 將阻塞式的 SMB 和文件操作移至專用執行緒池,讓 asyncio event loop 保持暢通
這些改動不涉及前端,也不改變 API 介面,屬於純後端的基礎設施升級。對使用者來說,最直觀的改善就是「更新後不用重新登入」和「別人在操作檔案時,我的操作不會變慢」。
下一步可以考慮的方向:
- Session 自動延展:活躍使用者的 session 自動延長過期時間
- 連線池監控:追蹤 thread pool 的使用率,動態調整 worker 數量
- 分散式 cache:如果未來需要多機部署,TTL cache 可以換成 Redis