"""
DualShield API client using OAuth 2.0 Client Credentials Flow.
The client fetches a Bearer token from /das5/rest/oauth/token on first use
and attaches it to all subsequent requests. The token is refreshed
automatically 60 seconds before it expires.
Requires: pip install requests
"""
import time
import requests
import urllib3
# Requests in this module are sent with certificate verification switched off
# (self-signed server certs), so silence the InsecureRequestWarning that
# urllib3 would otherwise emit for them.
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

# Path prefix placed between the server base URL and every endpoint name.
APP_CONTEXT = "/das5/rest/"
class DualShieldClient:
def __init__(self, host: str, port: int, client_id: str, client_secret: str):
self.base_url = f"https://{host}:{port}"
self.client_id = client_id
self.client_secret = client_secret
self.headers = {"Content-Type": "application/json"}
self._access_token: str | None = None
self._token_expiry: float = 0.0 # Unix timestamp
# -------------------------------------------------------------------------
# Token management
# -------------------------------------------------------------------------
def _ensure_valid_token(self):
if self._access_token is None or time.time() > self._token_expiry - 60:
self._fetch_token()
def _fetch_token(self):
body = {"client_id": self.client_id, "client_secret": self.client_secret}
response = requests.post(
self.base_url + APP_CONTEXT + "oauth/token",
json=body,
headers=self.headers,
verify=False,
)
response.raise_for_status()
data = response.json()
if data.get("error", 0) != 0:
raise RuntimeError(f"OAuth token request failed: {data.get('message')}")
self._access_token = data["access_token"]
self._token_expiry = time.time() + data["expires_in"]
# -------------------------------------------------------------------------
# HTTP execution
# -------------------------------------------------------------------------
def execute(self, method: str, body: dict) -> dict:
self._ensure_valid_token()
auth_headers = {**self.headers, "Authorization": f"Bearer {self._access_token}"}
response = requests.post(
self.base_url + APP_CONTEXT + method,
json=body,
headers=auth_headers,
verify=False,
)
response.raise_for_status()
return response.json()
@staticmethod
def _user(username: str, domain: str) -> dict:
return {"loginName": username, "domain.name": domain}
# -------------------------------------------------------------------------
# API methods
# -------------------------------------------------------------------------
def hello(self) -> dict:
return self.execute("auth/hello", {})
def verify_spass(self, username: str, domain: str, password: str) -> dict:
return self.execute("auth/verify", {
"user": self._user(username, domain),
"credential": {"method": "SPASS", "password": password},
})
def verify_otp(self, username: str, domain: str, otp: str) -> dict:
return self.execute("auth/verify", {
"user": self._user(username, domain),
"credential": {"method": "OTP", "password": otp},
})
def send_otp(self, username: str, domain: str) -> dict:
return self.execute("auth/sendOTP", {
"user": self._user(username, domain),
"credential": {"method": "OTPoD"},
"options": {"channel": "SMS"},
})
def verify_otpod(self, username: str, domain: str, otp: str) -> dict:
return self.execute("auth/verify", {
"user": self._user(username, domain),
"credential": {"method": "OTPoD", "password": otp},
})
# -------------------------------------------------------------------------
# Example usage
# -------------------------------------------------------------------------
if __name__ == "__main__":
    import json

    def _show(label, payload):
        """Print one API response as indented JSON after its label."""
        print(label, json.dumps(payload, indent=2))

    # Placeholder connection details; replace with real values before running.
    api = DualShieldClient(
        "your-dualshield-server.example.com",
        8071,
        "example-dea0b139e7bb45bda7054b39b00e021e",
        "example-secret-f07505e427f547418a7adeb394a69d29",
    )
    # (username, domain) pair reused by every authentication call below.
    who = ("testuser", "YourDomain")

    # 1. Hello
    _show("hello: ", api.hello())

    # 2. Static password
    _show("spass: ", api.verify_spass(*who, "s3cr3t"))

    # 3. OTP (SafeID)
    _show("otp: ", api.verify_otp(*who, "123456"))

    # 4. On-demand OTP: request delivery first, then verify what the user types
    _show("sendOTP: ", api.send_otp(*who))
    received = input("Enter the OTP you received: ")
    _show("otpod: ", api.verify_otpod(*who, received))
# Overview
# Content Tools