需求背景:跨协议单点登录的挑战

这是我们公司一位年度运维服务客户的真实需求场景与服务案例。

该用户在内部 IT 运维管理上具有极其严格的安全规范,所有的远程运维操作,无论是应用系统、网络设备还是平台级资源,必须先通过企业零信任网关(aTrust)接入安全网络,再通过堡垒机进行身份认证与运维操作,并确保全流程可审计。

运维流程:工程师 → aTrust 网关登录 → JumpServer 堡垒机登录 → 运维目标系统

然而,在实际日常服务过程中,我们发现频繁出现一个非常典型但严重影响效率的问题:

外部远程运维人员经常忘记自己的 JumpServer 密码,导致无法进入堡垒机,需要客户手动登录后台重置账号密码,严重影响了应急响应与运维流程的连续性。

为解决这个“重复登录 + 密码遗忘”痛点,客户提出了明确需求:

希望通过登录一次 aTrust 后,能够无感进入 JumpServer,完成堡垒机登录(即单点登录)

但现实情况是——aTrust 使用 OAuth2 协议进行认证,而 JumpServer(社区版 v3.10.18) 仅支持 CAS 协议。两者协议不兼容,无法直接打通。

于是,我们决定为客户DIY一套协议桥接服务:在 aTrust 登录后自动通过中转服务转换为符合 CAS 协议格式的登录流程,完成用户在 JumpServer 的自动认证与登录。

技术方案设计:构建 OAuth2 与 CAS 协议的桥梁

面对 OAuth2 与 CAS 认证机制之间的“语言不通”,我们决定设计一个中间层——协议中转服务。这个服务的目标非常明确:

在用户通过 aTrust 完成 OAuth2 登录认证之后,自动模拟一个符合 CAS 协议的登录流程,将用户无感知地送入 JumpServer。

协议差异挑战

  • aTrust 零信任平台 提供的是标准 OAuth2 接口,通过 auth2ssoLogin 获取授权码(code),然后通过 getUserInfoByCode 获取用户信息。
  • JumpServer 堡垒机 仅接受 CAS 协议认证,包括:
    • /cas/login 用于跳转和 ticket 生成
    • /cas/serviceValidate/cas/p3/serviceValidate 校验 ticket 并返回 XML 格式认证响应

OAuth2 的核心在于 access_token 和授权码机制,而 CAS 依赖的是 ticket 与 service 校验机制。两者在协议模型、流程细节、响应格式等方面完全不同,无法直接互通。

中转服务的职责

我们实现的中转服务需要完成以下任务:

  1. 接收 JumpServer 的 CAS 登录请求(模拟 /cas/login 接口)
  2. 重定向至 aTrust 登录页面,携带必要的 redirect_url 和签名
  3. 处理 aTrust 的 OAuth2 回调,获取 code 后调用其 API 获取用户信息
  4. 为该用户生成一个 CAS ticket,存入临时内存结构,并跳转回 JumpServer
  5. 响应 JumpServer 对 ticket 的校验请求,返回符合 CAS 协议格式的认证 XML
  6. 可选支持注销流程,实现 /cas/logout 路由,用于清除缓存和重定向

这一中转服务不依赖任何数据库或外部存储,所有 ticket 信息都使用内存字典保存,并设定自动过期机制,满足 JumpServer 对票据有效期的要求。

技术实现:服务实现细节与关键模块说明

整个中转服务使用 Python 3 编写,基于轻量的 Flask 框架构建,具备部署灵活、逻辑清晰、易于维护等优势。下文将按模块说明关键功能:

1. 路由与协议转换逻辑

服务共暴露 5 个核心路由:

路径说明
/cas/login模拟 CAS 登录入口,拦截 JumpServer 的登录请求
/callback接收 aTrust 的 OAuth2 授权回调,完成 code 交换
/cas/serviceValidateCAS 2.0 校验接口,响应用户 ticket 认证信息
/cas/p3/serviceValidateCAS 3.0 校验接口,功能相同,响应格式略不同
/cas/logout兼容 CAS 注销机制,可选清理缓存并跳转服务页

登录流程中,/cas/login 接收到来自 JumpServer 的请求后,会拼接 redirect URL 并跳转至 aTrust 的 auth2ssoLogin 接口。用户完成 OAuth2 登录后,aTrust 回调 /callback,此处会使用提供的 appid、secret、code 进行签名计算并调用其 getUserInfoByCode 接口获取用户标识。

随后生成一个 UUID 票据(ticket),缓存于服务中,并通过带有该 ticket 的 URL 跳转回 JumpServer。JumpServer 随即发起 ticket 校验请求到 /cas/serviceValidate,中转服务会根据用户信息返回标准 CAS XML 响应,实现整个链路的闭环。

2. 签名生成与参数校验

aTrust 要求 OAuth2 请求带签名参数(sign),该签名基于请求参数及 secret 使用 HMAC-SHA256 方式生成。服务内通过标准 Python 库 hmac + hashlib 实现,确保认证请求合法可信,避免伪造。

签名格式为:

sign = BASE64( HMAC-SHA256(appSecret, appId + timestamp + redirectUrl) )

这一签名机制保证了中转服务对接上游 OAuth2 接口的安全性和可靠性。

3. Ticket 票据生成与验证

每一个登录用户会被赋予一个临时 UUID 票据,结构如下:

{
"ticket": "uuid-token",
"username": "actual_username",
"expire_at": timestamp
}

所有票据保存在一个内存级字典 ticket_store 中,在校验接口中比对 ticket 是否有效,并定时清理过期票据。

4. 认证结果格式化输出

CAS 要求返回 XML 格式的认证信息,类似如下:

<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
<cas:authenticationSuccess>
<cas:user>username</cas:user>
</cas:authenticationSuccess>
</cas:serviceResponse>

我们根据 JumpServer 的要求实现了 CAS 2.0 与 3.0 两种格式输出,确保兼容。

5. 后台运行与日志管理

服务支持通过 nohup + run.sh 脚本形式启动,可自动输出日志至 /var/log/oauth2_cas_gateway/YYYY-MM-DD.log,并支持每日切割、自动清理旧日志。也可通过 systemd 注册为服务,适合生产环境部署。

6. 主程序代码

from flask import Flask, request, redirect, make_response
import requests
import hashlib
import hmac
import time
import uuid
import urllib3

# 禁用 SSL 警告
urllib3.disable_warnings()

app = Flask(__name__)

# ===== 配置项 =====
ATRUST_BASE = 'https://aTrust客户端接入地址:端口'
APP_ID = '应用中获取'
APP_SECRET = '应用中获取'
CALLBACK_URL = 'http://ip/callback'  # 中转服务地址

# ===== Ticket 缓存(内存) =====
ticket_store = {}  # {ticket: {"username": ..., "expire": ...}}
TICKET_TTL = 300  # 5分钟有效期

# ===== 生成签名 =====
def generate_signature(appid, code):
    raw = f"appid={appid}\ncode={code}"
    return hmac.new(APP_SECRET.encode(), raw.encode(), hashlib.sha256).hexdigest()

# ===== 路由定义 =====

@app.route('/cas/login')
def cas_login():
    service = request.args.get('service')
    if not service:
        return 'Missing service parameter', 400

    state = str(uuid.uuid4())
    redirect_url = f"{CALLBACK_URL}?service={service}&state={state}"
    auth_url = (
        f"{ATRUST_BASE}/passport/v1/public/auth2ssoLogin"
        f"?redirectUrl={redirect_url}&appid={APP_ID}&responseType=code"
    )
    return redirect(auth_url)

@app.route('/callback')
def callback():
    code = request.args.get('code')
    service = request.args.get('service')
    if not code or not service:
        return 'Missing code or service', 400

    sign = generate_signature(APP_ID, code)
    headers = {"X-SDP-Signature": sign}
    params = {"appid": APP_ID, "code": code}
    userinfo_url = f"{ATRUST_BASE}/passport/v1/user/getUserInfoByCode"

    try:
        res = requests.get(userinfo_url, headers=headers, params=params, verify=False, timeout=5)
        data = res.json()
    except Exception as e:
        return f"Error contacting aTrust: {str(e)}", 500

    if res.status_code != 200 or data.get("code") != 0:
        return f"Failed to retrieve user info: {data.get('message')}", 403

    username = data.get("data", {}).get("name")
    if not username:
        return 'Username not found in user info', 403

    ticket = str(uuid.uuid4())
    ticket_store[ticket] = {"username": username, "expire": time.time() + TICKET_TTL}

    # 修复:正确拼接 ticket
    from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
    parsed = urlparse(service)
    qs = parse_qs(parsed.query)
    qs['ticket'] = [ticket]
    new_query = urlencode(qs, doseq=True)
    new_url = urlunparse(parsed._replace(query=new_query))
    return redirect(new_url)

@app.route('/cas/p3/serviceValidate')
def cas_p3_service_validate():
    return service_validate()

@app.route('/cas/serviceValidate')
def service_validate():
    service = request.args.get('service')
    ticket = request.args.get('ticket')

    if not ticket or ticket not in ticket_store:
        return make_response("""
            <cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
                <cas:authenticationFailure code="INVALID_TICKET">Ticket not found</cas:authenticationFailure>
            </cas:serviceResponse>
        """, 200, {'Content-Type': 'application/xml'})

    entry = ticket_store[ticket]
    if time.time() > entry['expire']:
        del ticket_store[ticket]
        return make_response("""
            <cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
                <cas:authenticationFailure code="TICKET_EXPIRED">Ticket expired</cas:authenticationFailure>
            </cas:serviceResponse>
        """, 200, {'Content-Type': 'application/xml'})

    username = entry['username']
    return make_response(f"""
        <cas:serviceResponse xmlns:cas='http://www.yale.edu/t/tp/cas'>
            <cas:authenticationSuccess>
                <cas:user>{username}</cas:user>
            </cas:authenticationSuccess>
        </cas:serviceResponse>
    """, 200, {'Content-Type': 'application/xml'})

@app.route('/cas/logout')
def cas_logout():
    ticket_store.clear()
    service = request.args.get('service')
    return f"""
        <html><body>
        <h2>您已成功退出 CAS 登录</h2>
        <p><a href="{service}">返回系统</a></p>
        </body></html>
    """

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=80)

技术总结:服务实现细节与关键模块说明

本案例成功为客户构建了一个 OAuth2 与 CAS 协议之间的桥接服务,解决了两个认证系统协议不兼容的问题,达成以下目标:

  • 单点登录体验优化:用户只需登录一次零信任平台(aTrust),即可自动跳转并完成 JumpServer 登录,无需重复输入密码;
  • 减少人工干预:彻底消除了运维人员因密码遗忘而频繁找客户重置账号的操作;
  • 协议兼容性保障:中转服务完全模拟 CAS 协议行为,兼容 JumpServer 原生登录机制;
  • 无侵入部署:中转服务独立部署,不需要修改 JumpServer 或 aTrust 任一系统;
  • 轻量稳定:使用 Python + Flask 实现,启动快速,资源占用低,支持 systemd 管理或 nohup 自运行。

相关新闻

联系我们

联系我们

400-0512-768

邮件:support@sworditsys.com

工作时间:周一至周五 8:00 - 21:00

分享本页
返回顶部