
Implementing Secure Authentication with JWT in Python
Understanding JWT Security Fundamentals
Before diving into implementation details, it's essential to understand JWT security principles. A secure JWT implementation requires careful attention to signing algorithms, secret management, token expiration, and payload validation. The header should specify a strong signing algorithm such as HS256 or RS256, never use none algorithms, and ensure secrets are properly managed using environment variables or secure key management systems.
The payload should contain minimal required information and never include sensitive data. Tokens must have appropriate expiration times, and refresh tokens should be implemented for long-lived sessions. Additionally, secure HTTP headers should be configured to prevent XSS and CSRF attacks.
Core Implementation with PyJWT
The PyJWT library provides a robust foundation for JWT operations in Python. Here's a secure implementation pattern:
import jwt
import datetime
from typing import Dict, Any
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
class SecureJWT:
def __init__(self, secret_key: str, algorithm: str = "HS256"):
self.secret_key = secret_key
self.algorithm = algorithm
self.token_expiry = datetime.timedelta(hours=1)
def create_token(self, payload: Dict[str, Any]) -> str:
"""Create a secure JWT token with proper expiration"""
payload['exp'] = datetime.datetime.utcnow() + self.token_expiry
payload['iat'] = datetime.datetime.utcnow()
payload['nbf'] = datetime.datetime.utcnow()
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token: str) -> Dict[str, Any]:
"""Verify and decode a JWT token"""
try:
decoded = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
options={"verify_signature": True}
)
return decoded
except jwt.ExpiredSignatureError:
raise ValueError("Token has expired")
except jwt.InvalidTokenError:
raise ValueError("Invalid token")
# Usage example
jwt_handler = SecureJWT("your-secure-secret-key-here")
user_payload = {
"user_id": 12345,
"username": "john_doe",
"role": "user"
}
token = jwt_handler.create_token(user_payload)
print(f"Generated token: {token}")Advanced Security Considerations
Token Refresh Patterns
Implementing token refresh mechanisms prevents session hijacking while maintaining user convenience:
import secrets
import hashlib
from datetime import datetime, timedelta
class RefreshTokenManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.refresh_expiry = timedelta(days=7)
def generate_refresh_token(self, user_id: int) -> str:
"""Generate a secure refresh token with unique identifier"""
token_data = {
"user_id": user_id,
"token_id": secrets.token_urlsafe(32),
"created_at": datetime.utcnow().isoformat()
}
return jwt.encode(
token_data,
self.secret_key,
algorithm="HS256"
)
def validate_refresh_token(self, token: str) -> Dict[str, Any]:
"""Validate refresh token with expiration check"""
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=["HS256"]
)
# Check if refresh token has expired
created_at = datetime.fromisoformat(payload["created_at"])
if datetime.utcnow() > created_at + self.refresh_expiry:
raise ValueError("Refresh token expired")
return payload
except jwt.ExpiredSignatureError:
raise ValueError("Refresh token expired")
except jwt.InvalidTokenError:
raise ValueError("Invalid refresh token")
# Usage
refresh_manager = RefreshTokenManager("refresh-secret-key")
refresh_token = refresh_manager.generate_refresh_token(12345)Security Vulnerability Prevention
Mitigating Common Attack Vectors
| Attack Vector | Prevention Method | Implementation |
|---|---|---|
| Weak Algorithm | Use HS256 or RS256 | Specify algorithm in encode/decode |
| Token Replay | Add unique identifiers | Include token_id in payload |
| Signature Bypass | Verify algorithm | Use options parameter |
| Information Disclosure | Avoid sensitive data | Keep payload minimal |
# Secure token creation with additional protections
def create_secure_token(self, payload: Dict[str, Any]) -> str:
# Add unique token identifier
payload['jti'] = secrets.token_urlsafe(32)
# Set explicit expiration
payload['exp'] = datetime.datetime.utcnow() + self.token_expiry
# Add creation timestamp
payload['iat'] = datetime.datetime.utcnow()
# Add not-before timestamp
payload['nbf'] = datetime.datetime.utcnow()
# Add audience and issuer for additional security
payload['aud'] = "your-app-name"
payload['iss'] = "authentication-service"
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)Environment-Based Configuration
Secure JWT implementation requires proper configuration management:
import os
from dotenv import load_dotenv
load_dotenv()
class JWTConfig:
SECRET_KEY = os.getenv('JWT_SECRET_KEY')
ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
EXPIRY_HOURS = int(os.getenv('JWT_EXPIRY_HOURS', '1'))
REFRESH_EXPIRY_DAYS = int(os.getenv('JWT_REFRESH_EXPIRY_DAYS', '7'))
@classmethod
def validate_config(cls):
if not cls.SECRET_KEY:
raise ValueError("JWT_SECRET_KEY environment variable required")
if cls.ALGORITHM not in ['HS256', 'HS384', 'HS512']:
raise ValueError("Unsupported JWT algorithm")
# Configuration validation
JWTConfig.validate_config()HTTP Header Security
Proper HTTP header configuration prevents common attacks:
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.after_request
def add_security_headers(response):
"""Add security headers to prevent XSS and CSRF"""
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
return response
@app.route('/login', methods=['POST'])
def login():
# Authentication logic here
token = jwt_handler.create_token({"user_id": 123})
response = jsonify({"token": token})
response.headers['Set-Cookie'] = (
f"auth_token={token}; "
f"HttpOnly; "
f"Secure; "
f"SameSite=Strict; "
f"Max-Age={3600}"
)
return responseToken Blacklisting and Revocation
Implementing token blacklisting prevents compromised tokens from being used:
import redis
from typing import Set
class TokenBlacklist:
def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
self.blacklist_ttl = 3600 # 1 hour
def add_to_blacklist(self, token: str, expiry: int = 3600):
"""Add token to blacklist with expiration"""
# Extract token ID from payload
try:
payload = jwt.decode(token, JWTConfig.SECRET_KEY, algorithms=[JWTConfig.ALGORITHM])
token_id = payload.get('jti')
if token_id:
self.redis_client.setex(f"blacklist:{token_id}", expiry, "1")
except:
pass
def is_blacklisted(self, token: str) -> bool:
"""Check if token is blacklisted"""
try:
payload = jwt.decode(token, JWTConfig.SECRET_KEY, algorithms=[JWTConfig.ALGORITHM])
token_id = payload.get('jti')
if token_id:
return self.redis_client.exists(f"blacklist:{token_id}") == 1
except:
pass
return False
# Usage in authentication middleware
blacklist = TokenBlacklist()
def authenticate_request():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if blacklist.is_blacklisted(token):
return False, "Token has been revoked"
try:
payload = jwt_handler.verify_token(token)
return True, payload
except ValueError as e:
return False, str(e)Testing Secure JWT Implementation
Comprehensive testing ensures security:
import unittest
class TestJWTSecurity(unittest.TestCase):
def setUp(self):
self.jwt_handler = SecureJWT("test-secret-key")
def test_token_creation(self):
payload = {"user_id": 123, "username": "test"}
token = self.jwt_handler.create_token(payload)
self.assertIsInstance(token, str)
self.assertNotEqual(token, "")
def test_token_verification(self):
payload = {"user_id": 123, "username": "test"}
token = self.jwt_handler.create_token(payload)
verified_payload = self.jwt_handler.verify_token(token)
self.assertEqual(verified_payload["user_id"], 123)
def test_expired_token(self):
# This would require mocking time or using a very short expiry
pass
if __name__ == '__main__':
unittest.main()