JobData/app/core/proxy_rule.py

66 lines
2.1 KiB
Python

import json
import re
import string
from typing import Any, Dict, List

from app.models.cleaning import ProxyProvider
def _resolve_path(data: Any, path: str) -> Any:
    """Walk *data* along a dot-separated *path* and return what is found.

    Each segment of the path is applied to the value reached so far:
    a dict is queried with the segment as key, a list is indexed with the
    segment parsed as a non-negative integer.

    Args:
        data: Nested structure of dicts and lists to traverse.
        path: Dot-separated segments, e.g. ``"data.items.0"``. A falsy
            path returns *data* unchanged.

    Returns:
        The value at the end of the path, or ``None`` when a key is
        missing, a list segment is not an integer or is out of range
        (negative indexes are rejected), or a non-container value is
        reached while segments remain.
    """
    if not path:
        return data

    node = data
    for segment in path.split("."):
        if isinstance(node, dict):
            node = node.get(segment)
            continue
        if not isinstance(node, list):
            # Scalars (and None from a missing key) cannot be descended into.
            return None
        try:
            position = int(segment)
        except ValueError:
            return None
        if not 0 <= position < len(node):
            return None
        node = node[position]
    return node
def _template_fields(template: str) -> List[str]:
    """Return the root names of the replacement fields used by *template*."""
    return [
        # "ip.attr" / "ip[0]" both draw their value from the "ip" argument.
        re.split(r"[.\[]", field_name, maxsplit=1)[0]
        for _, field_name, _, _ in string.Formatter().parse(template)
        if field_name
    ]


def parse_proxies(raw_body: str, provider: ProxyProvider) -> List[str]:
    """Render proxy strings from a provider response body.

    In ``"json"`` mode the body is decoded, the item list is located via
    ``provider.list_path`` (a single non-list value is treated as a
    one-item list) and the ``ip``/``port``/``username``/``password``
    values are looked up on every item through the corresponding
    ``*_path`` attributes that are set on the provider.

    In ``"text"`` mode ``provider.pattern`` is searched repeatedly in the
    body and the named groups of each match supply the values.

    Every set of values is rendered with ``provider.template`` (default
    ``"{ip}:{port}"``). Entries for which a field used by the template
    resolved to ``None`` are skipped, so the result never contains
    strings such as ``"1.2.3.4:None"``.

    Args:
        raw_body: Raw response text returned by the provider.
        provider: Provider configuration (mode, template, paths, pattern).

    Returns:
        The rendered proxy strings, in source order. Empty when the mode
        is unknown, the text pattern is not configured, or the JSON list
        path resolves to nothing.

    Raises:
        ValueError: The body is not valid JSON in ``"json"`` mode
            (``json.JSONDecodeError``), or the template is malformed.
        re.error: ``provider.pattern`` is not a valid regular expression.
        KeyError: The template references a field for which no path or
            named group is configured.
    """
    mode = provider.mode
    template = provider.template or "{ip}:{port}"

    contexts: List[Dict[str, Any]] = []
    if mode == "json":
        obj = json.loads(raw_body)
        items = _resolve_path(obj, provider.list_path) if provider.list_path else obj
        if items is None:
            return []
        if not isinstance(items, list):
            items = [items]
        field_paths = {
            "ip": provider.ip_path,
            "port": provider.port_path,
            "username": provider.username_path,
            "password": provider.password_path,
        }
        contexts = [
            {
                name: _resolve_path(item, path)
                for name, path in field_paths.items()
                if path
            }
            for item in items
        ]
    elif mode == "text":
        if not provider.pattern:
            return []
        contexts = [
            match.groupdict()
            for match in re.finditer(provider.pattern, raw_body)
        ]

    if not contexts:
        return []

    used_fields = _template_fields(template)
    result: List[str] = []
    for context in contexts:
        # A configured field that came back as None would be rendered as the
        # literal text "None"; drop the entry instead. Fields missing from
        # the context altogether are a configuration error and still raise
        # KeyError from str.format below.
        if any(
            name in context and context[name] is None
            for name in used_fields
        ):
            continue
        result.append(template.format(**context))
    return result
async def parse_proxies_with_provider(provider_id: int, raw_body: str) -> List[str]:
    """Fetch the provider with id *provider_id* and parse *raw_body* with it.

    Args:
        provider_id: Value passed as ``id`` to ``ProxyProvider.get``.
        raw_body: Raw response text handed to ``parse_proxies``.

    Returns:
        Whatever ``parse_proxies`` returns for the fetched provider.

    NOTE(review): errors from ``ProxyProvider.get`` (e.g. an unknown id)
    are not handled here and presumably propagate to the caller - confirm.
    """
    return parse_proxies(raw_body, await ProxyProvider.get(id=provider_id))