需求背景:跨协议单点登录的挑战
这是我们公司一位年度运维服务客户的真实需求场景与服务案例。
该用户在内部 IT 运维管理上具有极其严格的安全规范,所有的远程运维操作,无论是应用系统、网络设备还是平台级资源,必须先通过企业零信任网关(aTrust)接入安全网络,再通过堡垒机进行身份认证与运维操作,并确保全流程可审计。
运维流程:工程师 → aTrust 网关登录 → JumpServer 堡垒机登录 → 运维目标系统
然而,在实际日常服务过程中,我们发现频繁出现一个非常典型但严重影响效率的问题:
外部远程运维人员经常忘记自己的 JumpServer 密码,导致无法进入堡垒机,需要客户手动登录后台重置账号密码,严重影响了应急响应与运维流程的连续性。
为解决这个“重复登录 + 密码遗忘”痛点,客户提出了明确需求:
希望通过登录一次 aTrust 后,能够无感进入 JumpServer,完成堡垒机登录(即单点登录)。
但现实情况是——aTrust 使用 OAuth2 协议进行认证,而 JumpServer(社区版 v3.10.18) 仅支持 CAS 协议。两者协议不兼容,无法直接打通。
于是,我们决定为客户DIY一套协议桥接服务:在 aTrust 登录后自动通过中转服务转换为符合 CAS 协议格式的登录流程,完成用户在 JumpServer 的自动认证与登录。
技术方案设计:构建 OAuth2 与 CAS 协议的桥梁
面对 OAuth2 与 CAS 认证机制之间的“语言不通”,我们决定设计一个中间层——协议中转服务。这个服务的目标非常明确:
在用户通过 aTrust 完成 OAuth2 登录认证之后,自动模拟一个符合 CAS 协议的登录流程,将用户无感知地送入 JumpServer。
协议差异挑战
- aTrust 零信任平台 提供的是标准 OAuth2 接口,通过
auth2ssoLogin
获取授权码(code),然后通过getUserInfoByCode
获取用户信息。 - JumpServer 堡垒机 仅接受 CAS 协议认证,包括:
/cas/login
用于跳转和 ticket 生成/cas/serviceValidate
或/cas/p3/serviceValidate
校验 ticket 并返回 XML 格式认证响应
OAuth2 的核心在于 access_token 和授权码机制,而 CAS 依赖的是 ticket 与 service 校验机制。两者在协议模型、流程细节、响应格式等方面完全不同,无法直接互通。
中转服务的职责
我们实现的中转服务需要完成以下任务:
- 接收 JumpServer 的 CAS 登录请求(模拟
/cas/login
接口) - 重定向至 aTrust 登录页面,携带必要的 redirect_url 和签名
- 处理 aTrust 的 OAuth2 回调,获取 code 后调用其 API 获取用户信息
- 为该用户生成一个 CAS ticket,存入临时内存结构,并跳转回 JumpServer
- 响应 JumpServer 对 ticket 的校验请求,返回符合 CAS 协议格式的认证 XML
- 可选支持注销流程,实现
/cas/logout
路由,用于清除缓存和重定向
这一中转服务不依赖任何数据库或外部存储,所有 ticket 信息都使用内存字典保存,并设定自动过期机制,满足 JumpServer 对票据有效期的要求。
技术实现:服务实现细节与关键模块说明
整个中转服务使用 Python 3 编写,基于轻量的 Flask 框架构建,具备部署灵活、逻辑清晰、易于维护等优势。下文将按模块说明关键功能:
1. 路由与协议转换逻辑
服务共暴露 5 个核心路由:
路径 | 说明 |
---|---|
/cas/login | 模拟 CAS 登录入口,拦截 JumpServer 的登录请求 |
/callback | 接收 aTrust 的 OAuth2 授权回调,完成 code 交换 |
/cas/serviceValidate | CAS 2.0 校验接口,响应用户 ticket 认证信息 |
/cas/p3/serviceValidate | CAS 3.0 校验接口,功能相同,响应格式略不同 |
/cas/logout | 兼容 CAS 注销机制,可选清理缓存并跳转服务页 |
登录流程中,/cas/login
接收到来自 JumpServer 的请求后,会拼接 redirect URL 并跳转至 aTrust 的 auth2ssoLogin
接口。用户完成 OAuth2 登录后,aTrust 回调 /callback
,此处会使用提供的 appid、secret、code 进行签名计算并调用其 getUserInfoByCode
接口获取用户标识。
随后生成一个 UUID 票据(ticket),缓存于服务中,并通过带有该 ticket 的 URL 跳转回 JumpServer。JumpServer 随即发起 ticket 校验请求到 /cas/serviceValidate
,中转服务会根据用户信息返回标准 CAS XML 响应,实现整个链路的闭环。
2. 签名生成与参数校验
aTrust 要求 OAuth2 请求带签名参数(sign),该签名基于请求参数及 secret 使用 HMAC-SHA256 方式生成。服务内通过标准 Python 库 hmac
+ hashlib
实现,确保认证请求合法可信,避免伪造。
签名格式为:
sign = BASE64( HMAC-SHA256(appSecret, appId + timestamp + redirectUrl) )
这一签名机制保证了中转服务对接上游 OAuth2 接口的安全性和可靠性。
3. Ticket 票据生成与验证
每一个登录用户会被赋予一个临时 UUID 票据,结构如下:
{
"ticket": "uuid-token",
"username": "actual_username",
"expire_at": timestamp
}
所有票据保存在一个内存级字典 ticket_store
中,在校验接口中比对 ticket 是否有效,并定时清理过期票据。
4. 认证结果格式化输出
CAS 要求返回 XML 格式的认证信息,类似如下:
<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
<cas:authenticationSuccess>
<cas:user>username</cas:user>
</cas:authenticationSuccess>
</cas:serviceResponse>
我们根据 JumpServer 的要求实现了 CAS 2.0 与 3.0 两种格式输出,确保兼容。
5. 后台运行与日志管理
服务支持通过 nohup
+ run.sh
脚本形式启动,可自动输出日志至 /var/log/oauth2_cas_gateway/YYYY-MM-DD.log
,并支持每日切割、自动清理旧日志。也可通过 systemd
注册为服务,适合生产环境部署。
6. 主程序代码
from flask import Flask, request, redirect, make_response
import requests
import hashlib
import hmac
import time
import uuid
import urllib3
# 禁用 SSL 警告
urllib3.disable_warnings()
app = Flask(__name__)
# ===== 配置项 =====
ATRUST_BASE = 'https://aTrust客户端接入地址:端口'
APP_ID = '应用中获取'
APP_SECRET = '应用中获取'
CALLBACK_URL = 'http://ip/callback' # 中转服务地址
# ===== Ticket 缓存(内存) =====
ticket_store = {} # {ticket: {"username": ..., "expire": ...}}
TICKET_TTL = 300 # 5分钟有效期
# ===== 生成签名 =====
def generate_signature(appid, code):
raw = f"appid={appid}\ncode={code}"
return hmac.new(APP_SECRET.encode(), raw.encode(), hashlib.sha256).hexdigest()
# ===== 路由定义 =====
@app.route('/cas/login')
def cas_login():
service = request.args.get('service')
if not service:
return 'Missing service parameter', 400
state = str(uuid.uuid4())
redirect_url = f"{CALLBACK_URL}?service={service}&state={state}"
auth_url = (
f"{ATRUST_BASE}/passport/v1/public/auth2ssoLogin"
f"?redirectUrl={redirect_url}&appid={APP_ID}&responseType=code"
)
return redirect(auth_url)
@app.route('/callback')
def callback():
code = request.args.get('code')
service = request.args.get('service')
if not code or not service:
return 'Missing code or service', 400
sign = generate_signature(APP_ID, code)
headers = {"X-SDP-Signature": sign}
params = {"appid": APP_ID, "code": code}
userinfo_url = f"{ATRUST_BASE}/passport/v1/user/getUserInfoByCode"
try:
res = requests.get(userinfo_url, headers=headers, params=params, verify=False, timeout=5)
data = res.json()
except Exception as e:
return f"Error contacting aTrust: {str(e)}", 500
if res.status_code != 200 or data.get("code") != 0:
return f"Failed to retrieve user info: {data.get('message')}", 403
username = data.get("data", {}).get("name")
if not username:
return 'Username not found in user info', 403
ticket = str(uuid.uuid4())
ticket_store[ticket] = {"username": username, "expire": time.time() + TICKET_TTL}
# 修复:正确拼接 ticket
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
parsed = urlparse(service)
qs = parse_qs(parsed.query)
qs['ticket'] = [ticket]
new_query = urlencode(qs, doseq=True)
new_url = urlunparse(parsed._replace(query=new_query))
return redirect(new_url)
@app.route('/cas/p3/serviceValidate')
def cas_p3_service_validate():
return service_validate()
@app.route('/cas/serviceValidate')
def service_validate():
service = request.args.get('service')
ticket = request.args.get('ticket')
if not ticket or ticket not in ticket_store:
return make_response("""
<cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
<cas:authenticationFailure code="INVALID_TICKET">Ticket not found</cas:authenticationFailure>
</cas:serviceResponse>
""", 200, {'Content-Type': 'application/xml'})
entry = ticket_store[ticket]
if time.time() > entry['expire']:
del ticket_store[ticket]
return make_response("""
<cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
<cas:authenticationFailure code="TICKET_EXPIRED">Ticket expired</cas:authenticationFailure>
</cas:serviceResponse>
""", 200, {'Content-Type': 'application/xml'})
username = entry['username']
return make_response(f"""
<cas:serviceResponse xmlns:cas='http://www.yale.edu/t/tp/cas'>
<cas:authenticationSuccess>
<cas:user>{username}</cas:user>
</cas:authenticationSuccess>
</cas:serviceResponse>
""", 200, {'Content-Type': 'application/xml'})
@app.route('/cas/logout')
def cas_logout():
ticket_store.clear()
service = request.args.get('service')
return f"""
<html><body>
<h2>您已成功退出 CAS 登录</h2>
<p><a href="{service}">返回系统</a></p>
</body></html>
"""
if __name__ == '__main__':
app.run(host='0.0.0.0', port=80)
技术总结:服务实现细节与关键模块说明
本案例成功为客户构建了一个 OAuth2 与 CAS 协议之间的桥接服务,解决了两个认证系统协议不兼容的问题,达成以下目标:
- 单点登录体验优化:用户只需登录一次零信任平台(aTrust),即可自动跳转并完成 JumpServer 登录,无需重复输入密码;
- 减少人工干预:彻底消除了运维人员因密码遗忘而频繁找客户重置账号的操作;
- 协议兼容性保障:中转服务完全模拟 CAS 协议行为,兼容 JumpServer 原生登录机制;
- 无侵入部署:中转服务独立部署,不需要修改 JumpServer 或 aTrust 任一系统;
- 轻量稳定:使用 Python + Flask 实现,启动快速,资源占用低,支持 systemd 管理或 nohup 自运行。