📚 系列文章
這篇文章要解決什麼問題?
使用者:「又要記一組新密碼?我已經有公司信箱、NAS、ERP 三組密碼了…」
IT:「大家密碼都設太簡單,還到處貼便利貼,資安風險很高。」
老闆:「有沒有辦法讓員工少記密碼,又能確保安全?」
後端工程師:「我們直接用 NAS 帳號驗證,員工用熟悉的帳密登入,我們不另外存密碼,安全責任回歸 NAS。」
IT:「這樣離職員工停用 NAS 帳號,這個系統也自動不能用了,管理很方便!」
開發內部系統時,最頭痛的問題之一就是帳號管理:
- 要不要自己建一套帳號密碼系統?
- 使用者又要記一組新密碼?
- 密碼忘記了誰來重設?
- 密碼怎麼安全地儲存?
如果公司已經有 NAS(網路儲存設備),裡面本來就有一套帳號系統。能不能借用 NAS 的帳號來登入我們的系統?這樣使用者用熟悉的帳密就能登入,還能直接存取 NAS 上的檔案。這就是本篇要實作的「用 NAS 帳號實現 SSO 效果」。
技術概念
什麼是 SSO?
SSO(Single Sign-On,單一登入)是指使用者只需要登入一次,就能存取多個系統。
登入一次
使用者 ──────────────────────▶ 公司 NAS
╱ │ ╲
╱ │ ╲
▼ ▼ ▼
系統A 系統B 系統C
真正的 SSO 需要 LDAP、OAuth 等協定,設定較複雜。我們的做法是簡化版 SSO:
- 使用者輸入 NAS 帳密
- 我們拿這組帳密去 NAS 驗證
- 驗證成功就發 Session Token
Session Token 機制
登入成功後,我們不可能每次 API 請求都要求使用者輸入帳密。所以需要一個「通行證」來證明「這個人已經登入過了」。
登入流程:
使用者 ──帳號密碼──▶ 後端 ──SMB認證──▶ NAS
│
▼ 驗證成功
產生 Session Token
│
▼
使用者 ◀──Token──────┘
存入 localStorage
後續請求:
使用者 ──帶 Token──▶ 後端 ──查詢 Token──▶ Session Store
│
▼ Token 有效
回傳資料或執行操作
我們的 Session 設計
| 項目 | 選擇 | 原因 |
|---|---|---|
| Token 格式 | UUID v4 | 足夠隨機,不可預測 |
| 儲存位置 | 後端記憶體 | 簡單、快速、重啟即失效 |
| 有效時間 | 8 小時 | 一個工作天 |
| 密碼暫存 | 是 | 需要密碼才能存取 NAS 檔案 |
跟著做:Step by Step
步驟 1:定義 Session 資料結構
首先定義 Session 需要儲存哪些資料:
# models/auth.py
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SessionData:
"""Session 資料結構"""
username: str # 使用者帳號
password: str # SMB 密碼(後續檔案操作需要)
nas_host: str # NAS 主機位址
user_id: int | None # 資料庫使用者 ID
created_at: datetime # 建立時間
expires_at: datetime # 過期時間
為什麼要存密碼?因為後續使用者要瀏覽 NAS 檔案時,每次 SMB 連線都需要帳密。
步驟 2:實作 Session Manager
接下來建立管理 Session 的類別:
# services/session.py
import asyncio
import uuid
from datetime import datetime, timedelta
from typing import Optional
class SessionManager:
"""Session 管理器
以記憶體儲存 session 資料,server 重啟後 session 失效。
"""
def __init__(self):
self._sessions: dict[str, SessionData] = {}
self._cleanup_task: asyncio.Task | None = None
def create_session(
self,
username: str,
password: str,
nas_host: str | None = None,
user_id: int | None = None
) -> str:
"""建立新 session,回傳 token"""
token = str(uuid.uuid4()) # 產生隨機 token
now = datetime.now()
# 預設 8 小時過期
expires_at = now + timedelta(hours=8)
self._sessions[token] = SessionData(
username=username,
password=password,
nas_host=nas_host or "192.168.11.50",
user_id=user_id,
created_at=now,
expires_at=expires_at,
)
return token
def get_session(self, token: str) -> Optional[SessionData]:
"""取得 session,若過期則回傳 None"""
session = self._sessions.get(token)
if session is None:
return None
# 檢查是否過期
if datetime.now() > session.expires_at:
self.delete_session(token)
return None
return session
def delete_session(self, token: str) -> bool:
"""刪除 session(登出用)"""
if token in self._sessions:
del self._sessions[token]
return True
return False
# 建立全域實例
session_manager = SessionManager()
步驟 3:實作登入 API
登入 API 的流程:
- 收到帳密
- 用 SMB 向 NAS 驗證
- 驗證成功則建立 Session
- 回傳 Token 給前端
# api/auth.py
from fastapi import APIRouter, HTTPException, Request, status
from pydantic import BaseModel
from ..services.session import session_manager
from ..services.smb import create_smb_service, SMBAuthError, SMBConnectionError
router = APIRouter(prefix="/api/auth", tags=["auth"])
class LoginRequest(BaseModel):
"""登入請求"""
username: str
password: str
class LoginResponse(BaseModel):
"""登入回應"""
success: bool
token: str | None = None
username: str | None = None
error: str | None = None
@router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest, req: Request) -> LoginResponse:
"""登入並建立 session
使用 NAS SMB 認證驗證使用者身份。
"""
# 建立 SMB 服務來測試認證
smb = create_smb_service(request.username, request.password)
try:
# 嘗試連線 NAS,驗證帳密
smb.test_auth()
except SMBAuthError:
# 帳密錯誤
return LoginResponse(success=False, error="帳號或密碼錯誤")
except SMBConnectionError:
# NAS 連不上
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="無法連線至檔案伺服器",
)
# 認證成功,建立 session
token = session_manager.create_session(
username=request.username,
password=request.password
)
return LoginResponse(
success=True,
token=token,
username=request.username,
)
步驟 4:實作登出 API
登出就是刪除 Session:
# api/auth.py(續)
from fastapi import Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer(auto_error=False)
def get_token(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> str:
"""從 Authorization header 取得 token"""
if credentials is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="未授權,請重新登入",
)
return credentials.credentials
class LogoutResponse(BaseModel):
success: bool
@router.post("/logout", response_model=LogoutResponse)
async def logout(token: str = Depends(get_token)) -> LogoutResponse:
"""登出並清除 session"""
session_manager.delete_session(token)
return LogoutResponse(success=True)
步驟 5:保護需要認證的 API
建立一個依賴注入函式,用來驗證 Token:
# api/auth.py(續)
async def get_current_session(token: str = Depends(get_token)) -> SessionData:
"""驗證 token 並取得目前 session
用於保護需要登入才能存取的 API。
"""
session = session_manager.get_session(token)
if session is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="未授權,請重新登入",
)
return session
使用方式:
# api/nas.py
from .auth import get_current_session
@router.get("/files")
async def list_files(
path: str,
session: SessionData = Depends(get_current_session)
):
"""列出檔案(需要登入)"""
# session.username 和 session.password 可用於 SMB 連線
smb = create_smb_service(session.username, session.password)
return smb.list_directory(path)
步驟 6:前端登入實作
// login.js
const LoginModule = (function() {
async function login(username, password) {
const response = await fetch('/api/auth/login', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ username, password })
});
const data = await response.json();
if (data.success) {
// 儲存 token 到 localStorage
localStorage.setItem('session_token', data.token);
localStorage.setItem('username', data.username);
return { success: true };
} else {
return { success: false, error: data.error };
}
}
async function logout() {
const token = localStorage.getItem('session_token');
if (token) {
await fetch('/api/auth/logout', {
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`
}
});
}
// 清除本地儲存
localStorage.removeItem('session_token');
localStorage.removeItem('username');
}
function getToken() {
return localStorage.getItem('session_token');
}
function isLoggedIn() {
return !!getToken();
}
return { login, logout, getToken, isLoggedIn };
})();
步驟 7:API 請求自動帶 Token
// api-client.js
const ApiClient = (function() {
async function request(url, options = {}) {
const token = LoginModule.getToken();
// 自動加入 Authorization header
const headers = {
...options.headers,
};
if (token) {
headers['Authorization'] = `Bearer ${token}`;
}
const response = await fetch(url, {
...options,
headers
});
// 若收到 401,跳轉登入頁
if (response.status === 401) {
LoginModule.logout();
window.location.href = '/login.html';
throw new Error('Session expired');
}
return response.json();
}
return {
get: (url) => request(url),
post: (url, data) => request(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data)
})
};
})();
進階技巧與踩坑紀錄
1. 為什麼用記憶體而不用 Redis?
| 方案 | 優點 | 缺點 |
|---|---|---|
| 記憶體 | 簡單、快速、無額外依賴 | 重啟失效、無法水平擴展 |
| Redis | 持久化、可跨服務共享 | 需額外維護、複雜度增加 |
對於單機部署的內部系統,記憶體儲存完全夠用。重啟後使用者重新登入即可。
2. Session 定期清理
過期的 Session 會累積在記憶體中,需要定期清理:
class SessionManager:
# ... 前面的程式碼 ...
def cleanup_expired(self) -> int:
"""清理過期的 session"""
now = datetime.now()
expired_tokens = [
token
for token, session in self._sessions.items()
if now > session.expires_at
]
for token in expired_tokens:
del self._sessions[token]
return len(expired_tokens)
async def start_cleanup_task(self):
"""啟動背景清理任務(每 30 分鐘執行一次)"""
async def cleanup_loop():
while True:
await asyncio.sleep(30 * 60) # 30 分鐘
count = self.cleanup_expired()
if count > 0:
print(f"Cleaned up {count} expired sessions")
self._cleanup_task = asyncio.create_task(cleanup_loop())
在應用程式啟動時啟動清理任務:
# main.py
@app.on_event("startup")
async def startup():
await session_manager.start_cleanup_task()
3. 密碼安全考量
Session 中儲存明文密碼是必要的(SMB 需要),但要注意:
- 只存記憶體:不寫入資料庫或日誌
- 重啟即失效:Server 重啟,密碼隨之消失
- 過期即清除:Session 過期,密碼也被清除
- HTTPS 傳輸:前後端通訊必須加密
4. Token 放 Header 還是 Query?
正常情況下,Token 應該放在 Authorization header。但有些情況無法設定 header,例如:
<img src="/api/file/xxx"><a href="/api/download/xxx">
這時可以允許 Query Parameter:
def get_session_from_token_or_query(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
token: str | None = None, # Query parameter
) -> SessionData:
"""從 header 或 query parameter 取得 session"""
# 優先使用 header
actual_token = None
if credentials is not None:
actual_token = credentials.credentials
elif token is not None:
actual_token = token
if actual_token is None:
raise HTTPException(status_code=401, detail="未授權")
session = session_manager.get_session(actual_token)
if session is None:
raise HTTPException(status_code=401, detail="Session 已過期")
return session
使用方式:
<!-- 圖片預覽 -->
<img src="/api/file/preview?path=/photos/test.jpg&token=xxx-xxx-xxx">
5. 取得客戶端真實 IP
如果前面有 Nginx 反向代理,request.client.host 會是 Nginx 的 IP。需要從 header 取得真實 IP:
def get_client_ip(req: Request) -> str:
"""取得客戶端真實 IP"""
# 檢查 X-Forwarded-For(經過代理)
forwarded = req.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
# 檢查 X-Real-IP
real_ip = req.headers.get("x-real-ip")
if real_ip:
return real_ip
# 直接連線
return req.client.host if req.client else "127.0.0.1"
記得在 Nginx 設定:
location /api {
proxy_pass http://backend:8000;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Real-IP $remote_addr;
}
小結
這篇文章實作了:
- NAS SMB 認證:借用既有帳號系統,無需自己管理密碼
- Session Token 機制:登入後發放 Token,後續請求帶 Token
- Session Manager:記憶體儲存、自動過期、定期清理
- API 認證保護:透過依賴注入保護需要登入的 API
- 前端整合:localStorage 儲存、自動帶 Token
下一篇我們會加入登入追蹤功能,記錄每次登入的裝置、地點等資訊,偵測異常登入行為。
完整程式碼
Session Manager
"""Session 管理服務"""
import asyncio
import uuid
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Optional
@dataclass
class SessionData:
"""Session 資料結構"""
username: str
password: str
nas_host: str
user_id: int | None
created_at: datetime
expires_at: datetime
class SessionManager:
"""Session 管理器"""
def __init__(self, ttl_hours: int = 8):
self._sessions: dict[str, SessionData] = {}
self._cleanup_task: asyncio.Task | None = None
self._ttl_hours = ttl_hours
def create_session(
self,
username: str,
password: str,
nas_host: str = "192.168.11.50",
user_id: int | None = None
) -> str:
"""建立新 session,回傳 token"""
token = str(uuid.uuid4())
now = datetime.now()
expires_at = now + timedelta(hours=self._ttl_hours)
self._sessions[token] = SessionData(
username=username,
password=password,
nas_host=nas_host,
user_id=user_id,
created_at=now,
expires_at=expires_at,
)
return token
def get_session(self, token: str) -> Optional[SessionData]:
"""取得 session,若過期則回傳 None"""
session = self._sessions.get(token)
if session is None:
return None
if datetime.now() > session.expires_at:
self.delete_session(token)
return None
return session
def delete_session(self, token: str) -> bool:
"""刪除 session"""
if token in self._sessions:
del self._sessions[token]
return True
return False
def cleanup_expired(self) -> int:
"""清理過期 session"""
now = datetime.now()
expired = [t for t, s in self._sessions.items() if now > s.expires_at]
for token in expired:
del self._sessions[token]
return len(expired)
async def start_cleanup_task(self, interval_minutes: int = 30):
"""啟動背景清理任務"""
async def cleanup_loop():
while True:
await asyncio.sleep(interval_minutes * 60)
count = self.cleanup_expired()
if count > 0:
print(f"Cleaned up {count} expired sessions")
self._cleanup_task = asyncio.create_task(cleanup_loop())
@property
def active_count(self) -> int:
"""目前活躍 session 數量"""
return len(self._sessions)
# 全域實例
session_manager = SessionManager()
認證 API
"""認證 API"""
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from ..services.session import session_manager, SessionData
from ..services.smb import create_smb_service, SMBAuthError, SMBConnectionError
router = APIRouter(prefix="/api/auth", tags=["auth"])
security = HTTPBearer(auto_error=False)
class LoginRequest(BaseModel):
username: str
password: str
class LoginResponse(BaseModel):
success: bool
token: str | None = None
username: str | None = None
error: str | None = None
class LogoutResponse(BaseModel):
success: bool
def get_token(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> str:
"""取得 token"""
if credentials is None:
raise HTTPException(status_code=401, detail="未授權,請重新登入")
return credentials.credentials
async def get_current_session(token: str = Depends(get_token)) -> SessionData:
"""取得目前 session(用於保護 API)"""
session = session_manager.get_session(token)
if session is None:
raise HTTPException(status_code=401, detail="未授權,請重新登入")
return session
@router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest) -> LoginResponse:
"""登入"""
smb = create_smb_service(request.username, request.password)
try:
smb.test_auth()
except SMBAuthError:
return LoginResponse(success=False, error="帳號或密碼錯誤")
except SMBConnectionError:
raise HTTPException(status_code=503, detail="無法連線至檔案伺服器")
token = session_manager.create_session(request.username, request.password)
return LoginResponse(success=True, token=token, username=request.username)
@router.post("/logout", response_model=LogoutResponse)
async def logout(token: str = Depends(get_token)) -> LogoutResponse:
"""登出"""
session_manager.delete_session(token)
return LogoutResponse(success=True)
前端登入模組
/**
* 登入模組
*/
const LoginModule = (function() {
const TOKEN_KEY = 'session_token';
const USERNAME_KEY = 'username';
async function login(username, password) {
const response = await fetch('/api/auth/login', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ username, password })
});
const data = await response.json();
if (data.success) {
localStorage.setItem(TOKEN_KEY, data.token);
localStorage.setItem(USERNAME_KEY, data.username);
return { success: true };
}
return { success: false, error: data.error };
}
async function logout() {
const token = getToken();
if (token) {
try {
await fetch('/api/auth/logout', {
method: 'POST',
headers: { 'Authorization': `Bearer ${token}` }
});
} catch (e) {
// 忽略登出 API 錯誤
}
}
localStorage.removeItem(TOKEN_KEY);
localStorage.removeItem(USERNAME_KEY);
}
function getToken() {
return localStorage.getItem(TOKEN_KEY);
}
function getUsername() {
return localStorage.getItem(USERNAME_KEY);
}
function isLoggedIn() {
return !!getToken();
}
return { login, logout, getToken, getUsername, isLoggedIn };
})();