test(01-02): write sign algorithm unit tests for crawler_core
- Add tests/crawler_core/test_boss_sign.py: 13 tests for BossSign, _compute_checksum, _generate_uuid - Add tests/crawler_core/test_qcwy_sign.py: 10 tests for Job51Sign and SIGN_KEY - Add tests/crawler_core/test_zhilian_sign.py: 13 tests for ZhilianSign - Add conftest.py at project root to add project root to sys.path - Update pyproject.toml with [tool.pytest.ini_options] pythonpath config - Fix crawler_core/__init__.py: wrap heavy-dep imports in try/except so sign subpackages are importable in lightweight envs without requests_go installed - Remove tests/crawler_core/__init__.py to prevent namespace shadowing of crawler_core package
This commit is contained in:
parent
d7c8bec287
commit
333a6d155e
4
conftest.py
Normal file
4
conftest.py
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
import sys
|
||||||
|
import os
|
||||||
|
# Ensure project root is on sys.path for crawler_core imports
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||||
@ -84,6 +84,9 @@ lint.ignore = [
|
|||||||
"F405",
|
"F405",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[tool.pytest.ini_options]
|
||||||
|
pythonpath = ["."]
|
||||||
|
|
||||||
[tool.aerich]
|
[tool.aerich]
|
||||||
tortoise_orm = "app.settings.TORTOISE_ORM"
|
tortoise_orm = "app.settings.TORTOISE_ORM"
|
||||||
location = "./migrations"
|
location = "./migrations"
|
||||||
|
|||||||
84
tests/crawler_core/test_boss_sign.py
Normal file
84
tests/crawler_core/test_boss_sign.py
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
"""Unit tests for crawler_core.boss.sign — BossSign and helper functions.
|
||||||
|
|
||||||
|
All tests are pure function assertions: no HTTP, no network, no mocks.
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||||
|
|
||||||
|
import re
|
||||||
|
import pytest
|
||||||
|
from crawler_core.boss.sign import BossSign, _compute_checksum, _generate_uuid, _CHARS
|
||||||
|
|
||||||
|
|
||||||
|
class TestBossSignGenerateTraceid:
|
||||||
|
def test_traceid_format(self):
|
||||||
|
tid = BossSign.generate_traceid()
|
||||||
|
assert re.match(r'^M-W[0-9a-f]{13}[0-9a-zA-Z]{6}[0-9a-zA-Z]{3}$', tid), \
|
||||||
|
f"Traceid format wrong: {tid}"
|
||||||
|
|
||||||
|
def test_traceid_length(self):
|
||||||
|
tid = BossSign.generate_traceid()
|
||||||
|
assert len(tid) == 25, f"Expected 25 chars, got {len(tid)}: {tid}"
|
||||||
|
|
||||||
|
def test_traceid_custom_prefix(self):
|
||||||
|
tid = BossSign.generate_traceid(prefix="X-Y")
|
||||||
|
assert tid.startswith("X-Y"), f"Expected X-Y prefix, got: {tid}"
|
||||||
|
|
||||||
|
def test_traceid_uniqueness(self):
|
||||||
|
t1 = BossSign.generate_traceid()
|
||||||
|
t2 = BossSign.generate_traceid()
|
||||||
|
assert t1 != t2, "Two calls should return different traceids"
|
||||||
|
|
||||||
|
def test_bosssign_init_defaults(self):
|
||||||
|
sign = BossSign()
|
||||||
|
assert sign.mpt == ""
|
||||||
|
assert sign.wt2 == ""
|
||||||
|
|
||||||
|
def test_bosssign_init_with_tokens(self):
|
||||||
|
sign = BossSign(mpt="mpt_token", wt2="wt2_token")
|
||||||
|
assert sign.mpt == "mpt_token"
|
||||||
|
assert sign.wt2 == "wt2_token"
|
||||||
|
|
||||||
|
|
||||||
|
class TestComputeChecksum:
|
||||||
|
def test_checksum_length(self):
|
||||||
|
checksum = _compute_checksum("1234567890abc456789") # 19 chars
|
||||||
|
assert len(checksum) == 3, f"Expected 3 chars, got {len(checksum)}"
|
||||||
|
|
||||||
|
def test_checksum_chars_in_base62(self):
|
||||||
|
checksum = _compute_checksum("1234567890abc456789")
|
||||||
|
for ch in checksum:
|
||||||
|
assert ch in _CHARS, f"Char {ch!r} not in base62 set"
|
||||||
|
|
||||||
|
def test_checksum_deterministic(self):
|
||||||
|
uuid_str = "1234567890abc456789"
|
||||||
|
c1 = _compute_checksum(uuid_str)
|
||||||
|
c2 = _compute_checksum(uuid_str)
|
||||||
|
assert c1 == c2, "Same input must produce same checksum"
|
||||||
|
|
||||||
|
def test_checksum_differs_for_different_input(self):
|
||||||
|
# Different inputs should (almost always) produce different checksums
|
||||||
|
c1 = _compute_checksum("1234567890abc456789")
|
||||||
|
c2 = _compute_checksum("9876543210xyz456789")
|
||||||
|
# Not guaranteed to differ but extremely likely
|
||||||
|
# We test at least that they are valid 3-char strings
|
||||||
|
assert len(c1) == 3 and len(c2) == 3
|
||||||
|
|
||||||
|
|
||||||
|
class TestGenerateUuid:
|
||||||
|
def test_generate_uuid_length(self):
|
||||||
|
uuid = _generate_uuid()
|
||||||
|
assert len(uuid) == 19, f"Expected 19 chars, got {len(uuid)}: {uuid}"
|
||||||
|
|
||||||
|
def test_generate_uuid_hex_prefix(self):
|
||||||
|
uuid = _generate_uuid()
|
||||||
|
hex_part = uuid[:13]
|
||||||
|
assert re.match(r'^[0-9a-f]{13}$', hex_part), \
|
||||||
|
f"First 13 chars should be hex: {hex_part}"
|
||||||
|
|
||||||
|
def test_generate_uuid_base62_suffix(self):
|
||||||
|
uuid = _generate_uuid()
|
||||||
|
rand_part = uuid[13:]
|
||||||
|
for ch in rand_part:
|
||||||
|
assert ch in _CHARS, f"Char {ch!r} in random suffix not in base62"
|
||||||
76
tests/crawler_core/test_qcwy_sign.py
Normal file
76
tests/crawler_core/test_qcwy_sign.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
"""Unit tests for crawler_core.qcwy.sign — Job51Sign.
|
||||||
|
|
||||||
|
All tests are pure function assertions: no HTTP, no network, no mocks.
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||||
|
|
||||||
|
import re
|
||||||
|
import pytest
|
||||||
|
from crawler_core.qcwy.sign import Job51Sign, SIGN_KEY
|
||||||
|
|
||||||
|
|
||||||
|
class TestJob51SignInit:
|
||||||
|
def test_default_sign_key(self):
|
||||||
|
signer = Job51Sign()
|
||||||
|
assert signer.sign_key == SIGN_KEY
|
||||||
|
assert len(SIGN_KEY) == 64 # 64-char hex key
|
||||||
|
|
||||||
|
def test_custom_sign_key(self):
|
||||||
|
custom_key = "a" * 64
|
||||||
|
signer = Job51Sign(sign_key=custom_key)
|
||||||
|
assert signer.sign_key == custom_key
|
||||||
|
|
||||||
|
|
||||||
|
class TestJob51SignBuildSignPath:
|
||||||
|
def setup_method(self):
|
||||||
|
self.signer = Job51Sign()
|
||||||
|
|
||||||
|
def test_returns_tuple_of_two_strings(self):
|
||||||
|
result = self.signer.build_sign_path("open/test")
|
||||||
|
assert isinstance(result, tuple)
|
||||||
|
assert len(result) == 2
|
||||||
|
assert all(isinstance(s, str) for s in result)
|
||||||
|
|
||||||
|
def test_get_path_format(self):
|
||||||
|
path, sign = self.signer.build_sign_path("open/test", "GET")
|
||||||
|
assert path.startswith("/open/test?api_key=51job×tamp="), \
|
||||||
|
f"Path format wrong: {path}"
|
||||||
|
|
||||||
|
def test_sign_hex_length(self):
|
||||||
|
_, sign = self.signer.build_sign_path("open/test")
|
||||||
|
assert len(sign) == 64, f"Sign should be 64-char hex, got {len(sign)}: {sign}"
|
||||||
|
|
||||||
|
def test_sign_hex_format(self):
|
||||||
|
_, sign = self.signer.build_sign_path("open/test")
|
||||||
|
assert re.match(r'^[0-9a-f]{64}$', sign), f"Sign not hex: {sign}"
|
||||||
|
|
||||||
|
def test_get_vs_post_different_sign(self):
|
||||||
|
_, get_sign = self.signer.build_sign_path("open/test", "GET")
|
||||||
|
_, post_sign = self.signer.build_sign_path("open/test", "POST", body={"k": "v"})
|
||||||
|
assert get_sign != post_sign, "GET and POST should produce different signatures"
|
||||||
|
|
||||||
|
def test_get_with_params_includes_params_in_path(self):
|
||||||
|
path, _ = self.signer.build_sign_path("open/test", "GET", params={"city": "shanghai"})
|
||||||
|
assert "city" in path and "shanghai" in path, \
|
||||||
|
f"Params should appear in path: {path}"
|
||||||
|
|
||||||
|
def test_sign_key_in_path(self):
|
||||||
|
path, _ = self.signer.build_sign_path("open/jobs")
|
||||||
|
assert "api_key=51job" in path, f"api_key=51job missing from path: {path}"
|
||||||
|
|
||||||
|
|
||||||
|
class TestJob51SignGenerateUuid:
|
||||||
|
def test_generate_uuid_is_string(self):
|
||||||
|
uuid = Job51Sign.generate_uuid()
|
||||||
|
assert isinstance(uuid, str)
|
||||||
|
|
||||||
|
def test_generate_uuid_length(self):
|
||||||
|
uuid = Job51Sign.generate_uuid()
|
||||||
|
# 13-char ms timestamp + 10-char random int = 23 chars
|
||||||
|
assert len(uuid) == 23, f"Expected 23 chars, got {len(uuid)}: {uuid}"
|
||||||
|
|
||||||
|
def test_generate_uuid_numeric(self):
|
||||||
|
uuid = Job51Sign.generate_uuid()
|
||||||
|
assert uuid.isdigit(), f"UUID should be all digits: {uuid}"
|
||||||
116
tests/crawler_core/test_zhilian_sign.py
Normal file
116
tests/crawler_core/test_zhilian_sign.py
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
"""Unit tests for crawler_core.zhilian.sign — ZhilianSign.
|
||||||
|
|
||||||
|
All tests are pure function assertions: no HTTP, no network, no mocks.
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||||
|
|
||||||
|
import re
|
||||||
|
import pytest
|
||||||
|
from crawler_core.zhilian.sign import ZhilianSign
|
||||||
|
|
||||||
|
EXPECTED_HEADER_KEYS = {
|
||||||
|
"x-zp-at", "x-zp-rt", "x-zp-action-id", "x-zp-page-code",
|
||||||
|
"x-zp-version", "x-zp-channel", "x-zp-platform", "x-zp-device-id",
|
||||||
|
"x-zp-business-system",
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPECTED_PARAM_KEYS = {"at", "rt", "channel", "platform", "version", "d"}
|
||||||
|
|
||||||
|
|
||||||
|
class TestZhilianSignInit:
|
||||||
|
def test_defaults(self):
|
||||||
|
sign = ZhilianSign()
|
||||||
|
assert sign.at == ""
|
||||||
|
assert sign.rt == ""
|
||||||
|
assert sign.version == "4.1.259"
|
||||||
|
assert sign.channel == "wxxiaochengxu"
|
||||||
|
assert sign.platform == "12"
|
||||||
|
assert sign.device_id # auto-generated, not empty
|
||||||
|
|
||||||
|
def test_custom_tokens(self):
|
||||||
|
sign = ZhilianSign(at="at_token", rt="rt_token")
|
||||||
|
assert sign.at == "at_token"
|
||||||
|
assert sign.rt == "rt_token"
|
||||||
|
|
||||||
|
def test_custom_device_id(self):
|
||||||
|
sign = ZhilianSign(device_id="CUSTOM-DEVICE-ID")
|
||||||
|
assert sign.device_id == "CUSTOM-DEVICE-ID"
|
||||||
|
|
||||||
|
def test_auto_device_id_is_uuid4_format(self):
|
||||||
|
sign = ZhilianSign()
|
||||||
|
uuid_pattern = r'^[0-9A-F]{8}-[0-9A-F]{4}-4[0-9A-F]{3}-[89AB][0-9A-F]{3}-[0-9A-F]{12}$'
|
||||||
|
assert re.match(uuid_pattern, sign.device_id), \
|
||||||
|
f"device_id not UUID4 format: {sign.device_id}"
|
||||||
|
|
||||||
|
|
||||||
|
class TestZhilianSignHeaders:
|
||||||
|
def setup_method(self):
|
||||||
|
self.sign = ZhilianSign(at="at123", rt="rt456")
|
||||||
|
|
||||||
|
def test_keys_exactly_nine(self):
|
||||||
|
headers = self.sign.sign_headers()
|
||||||
|
assert set(headers.keys()) == EXPECTED_HEADER_KEYS, \
|
||||||
|
f"Header keys wrong: {set(headers.keys())}"
|
||||||
|
|
||||||
|
def test_business_system_is_73(self):
|
||||||
|
headers = self.sign.sign_headers()
|
||||||
|
assert headers["x-zp-business-system"] == "73"
|
||||||
|
|
||||||
|
def test_tokens_reflected(self):
|
||||||
|
headers = self.sign.sign_headers()
|
||||||
|
assert headers["x-zp-at"] == "at123"
|
||||||
|
assert headers["x-zp-rt"] == "rt456"
|
||||||
|
|
||||||
|
def test_action_id_is_uuid4_format(self):
|
||||||
|
headers = self.sign.sign_headers()
|
||||||
|
action_id = headers["x-zp-action-id"]
|
||||||
|
uuid_pattern = r'^[0-9A-F]{8}-[0-9A-F]{4}-4[0-9A-F]{3}-[89AB][0-9A-F]{3}-[0-9A-F]{12}$'
|
||||||
|
assert re.match(uuid_pattern, action_id), \
|
||||||
|
f"action_id not UUID4 format: {action_id}"
|
||||||
|
|
||||||
|
def test_action_id_unique_per_call(self):
|
||||||
|
h1 = self.sign.sign_headers()
|
||||||
|
h2 = self.sign.sign_headers()
|
||||||
|
assert h1["x-zp-action-id"] != h2["x-zp-action-id"], \
|
||||||
|
"action_id must be freshly generated on each call"
|
||||||
|
|
||||||
|
def test_device_id_in_headers(self):
|
||||||
|
headers = self.sign.sign_headers()
|
||||||
|
assert headers["x-zp-device-id"] == self.sign.device_id
|
||||||
|
|
||||||
|
|
||||||
|
class TestZhilianSignParams:
|
||||||
|
def setup_method(self):
|
||||||
|
self.sign = ZhilianSign(at="at789", rt="rt012", device_id="DEV-ID")
|
||||||
|
|
||||||
|
def test_keys_exactly_six(self):
|
||||||
|
params = self.sign.sign_params()
|
||||||
|
assert set(params.keys()) == EXPECTED_PARAM_KEYS, \
|
||||||
|
f"Param keys wrong: {set(params.keys())}"
|
||||||
|
|
||||||
|
def test_device_id_as_d(self):
|
||||||
|
params = self.sign.sign_params()
|
||||||
|
assert params["d"] == "DEV-ID"
|
||||||
|
|
||||||
|
def test_tokens_reflected(self):
|
||||||
|
params = self.sign.sign_params()
|
||||||
|
assert params["at"] == "at789"
|
||||||
|
assert params["rt"] == "rt012"
|
||||||
|
|
||||||
|
|
||||||
|
class TestZhilianGenerateUuid:
|
||||||
|
def test_uuid4_format(self):
|
||||||
|
uuid = ZhilianSign.generate_uuid()
|
||||||
|
uuid_pattern = r'^[0-9A-F]{8}-[0-9A-F]{4}-4[0-9A-F]{3}-[89AB][0-9A-F]{3}-[0-9A-F]{12}$'
|
||||||
|
assert re.match(uuid_pattern, uuid), \
|
||||||
|
f"UUID not UUID4 format: {uuid}"
|
||||||
|
|
||||||
|
def test_uuid_length(self):
|
||||||
|
uuid = ZhilianSign.generate_uuid()
|
||||||
|
assert len(uuid) == 36, f"Expected 36 chars, got {len(uuid)}"
|
||||||
|
|
||||||
|
def test_uuid_version_4(self):
|
||||||
|
uuid = ZhilianSign.generate_uuid()
|
||||||
|
assert uuid[14] == "4", f"Version digit should be 4, got: {uuid[14]}"
|
||||||
Loading…
x
Reference in New Issue
Block a user