JobData/ecs_full_pipeline.py

521 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import time
import base64
from datetime import datetime, timedelta, timezone
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ecs20140526 import models as ecs_models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_util import models as util_models
# Polling interval (ms) between status checks in run_and_wait_instances().
INSTANCE_STATUS_CHECK_INTERVAL_MILLISECOND = 1000
# Total polling budget (ms) before run_and_wait_instances() gives up waiting.
INSTANCE_STATUS_TOTAL_CHECK_TIME_ELAPSE_MILLISECOND = 500000
# Deadline (s) used by ensure_cloud_assistant_ready().
CLOUD_ASSISTANT_READY_TIMEOUT_SECONDS = 500
# Timeout (s) that main() passes to create_and_invoke_command().
COMMAND_TIMEOUT_SECONDS = 500
# Fixed environment configuration.
# NOTE: main() rebinds most of the values below via `global` before any API
# call is made, so these act as import-time defaults only.
AMOUNT = 20  # number of instances to purchase
MIN_AMOUNT = 20  # minimum number of instances to purchase
REGION_ID = "cn-shanghai"
ZONE_ID = "cn-shanghai-b"
INSTANCE_TYPE = "ecs.t5-lc1m1.small"
IMAGE_ID = "ubuntu_24_04_x64_20G_alibase_20251102.vhd"
INSTANCE_NAME_PREFIX = "launch-advisor-20251121"
VSWITCH_ID = "vsw-uf6twb2lrd02fi7q2i605"
SECURITY_GROUP_ID = "sg-uf60380ctrux2uusucad"
SYSTEM_DISK_SIZE = 40
SYSTEM_DISK_CATEGORY = "cloud_efficiency"
INTERNET_CHARGE_TYPE = "PayByBandwidth"
INTERNET_MAX_BW_OUT = 1
SPOT_STRATEGY = "SpotAsPriceGo"
SPOT_BEHAVIOR = "Terminate"
IO_OPTIMIZED = "optimized"
SECURITY_ENHANCE = "Active"
HTTP_TOKENS = "optional"
# Bound once at import time to the prefix above; main() does not rebind it.
DELETE_NAME_PREFIX = INSTANCE_NAME_PREFIX
def init_ecs_client() -> EcsClient:
    """Create an ECS client backed by the default credential chain.

    The client is bound to the module-level ``REGION_ID`` and to the matching
    regional endpoint ``ecs.<region>.aliyuncs.com``.

    Returns:
        EcsClient: client used for all subsequent ECS API calls.
    """
    client_config = open_api_models.Config(
        credential=CredentialClient(),
        region_id=REGION_ID,
    )
    client_config.endpoint = "ecs." + REGION_ID + ".aliyuncs.com"
    return EcsClient(client_config)
def init_ecs_client_with_aksk(ak: str, sk: str) -> EcsClient:
    """Create an ECS client from an explicit AccessKey pair.

    Intended for callers that must inject the key pair directly; the keys
    should come from a secret store or the environment, never from source.

    Args:
        ak: AccessKey ID.
        sk: AccessKey secret.

    Returns:
        EcsClient: client bound to ``REGION_ID`` and its regional endpoint.
    """
    region = REGION_ID
    settings = {
        "access_key_id": ak,
        "access_key_secret": sk,
        "region_id": region,
    }
    client_config = open_api_models.Config(**settings)
    client_config.endpoint = "ecs.{}.aliyuncs.com".format(region)
    return EcsClient(client_config)
def compute_auto_release_time() -> str:
    """Return the UTC auto-release timestamp for newly created instances.

    The offset from "now" is read from the optional environment variables
    ``AUTO_RELEASE_HOURS`` (default 6) and ``AUTO_RELEASE_MINUTES``
    (default 0) and is clamped to a minimum of 30 minutes.

    The result is rounded *up* to a whole minute. The ECS API documents that
    non-zero seconds are truncated to the start of the minute, so without the
    round-up a value clamped to exactly 30 minutes would effectively land
    inside the forbidden 30-minute window.

    Returns:
        str: ISO 8601 UTC time such as ``2025-11-21T08:00:00Z``.

    Raises:
        ValueError: if either environment variable is set to a non-integer.
    """
    hours = int(os.getenv("AUTO_RELEASE_HOURS") or "6")
    minutes = int(os.getenv("AUTO_RELEASE_MINUTES") or "0")
    delta = max(timedelta(hours=hours, minutes=minutes), timedelta(minutes=30))
    target = datetime.now(timezone.utc) + delta
    if target.second or target.microsecond:
        # Round up, never down, so the requested offset is always honoured.
        target = target.replace(second=0, microsecond=0) + timedelta(minutes=1)
    return target.strftime("%Y-%m-%dT%H:%M:%SZ")
def list_instance_ids(ecs_client: EcsClient, prefix: str | None = None, ctx=None) -> list:
    """List instance IDs in ``REGION_ID``, optionally filtered by name prefix.

    Pages through DescribeInstances 100 rows at a time until an empty page is
    returned. Listing is best-effort: if a page request fails, the error is
    printed and the IDs collected so far are returned, so the result may be
    incomplete.

    Args:
        ecs_client: ECS client used for the DescribeInstances calls.
        prefix: when given, only instances whose name starts with this prefix
            are returned; instances without a name are skipped.
        ctx: unused; kept for call-site compatibility.

    Returns:
        list: instance ID strings.
    """
    region_id = REGION_ID
    ids = []
    page = 1
    while True:
        req = ecs_models.DescribeInstancesRequest(region_id=region_id, page_size=100, page_number=page)
        try:
            resp = ecs_client.describe_instances(req)
        except Exception as e:
            # Keep the best-effort contract, but never fail silently: callers
            # treat an empty list as "nothing to clean up".
            print(f"DescribeInstances 调用失败:{e}")
            break
        body = resp.body if hasattr(resp, "body") else resp
        # The container attribute may exist but be None on an empty response.
        container = getattr(body, "instances", None)
        items = getattr(container, "instance", None) or []
        if not items:
            break
        for it in items:
            name = getattr(it, "instance_name", None) or getattr(it, "InstanceName", None)
            iid = getattr(it, "instance_id", None) or getattr(it, "instanceId", None)
            if not iid:
                continue
            if prefix and not (name and name.startswith(prefix)):
                continue
            ids.append(iid)
        page += 1
    return ids
def wait_instances_status(ecs_client: EcsClient, instance_ids, target_status, timeout_seconds=600, ctx=None) -> bool:
    """Poll until every given instance reports ``target_status``.

    Polls DescribeInstances every 3 seconds (the first check also happens
    after a 3-second sleep). Request errors are printed and retried until the
    deadline.

    Args:
        ecs_client: ECS client used for the DescribeInstances calls.
        instance_ids: iterable of instance ID strings to watch.
        target_status: status string to wait for, e.g. ``"Running"``.
        timeout_seconds: overall polling budget in seconds.
        ctx: unused; kept for call-site compatibility.

    Returns:
        bool: True when all instances reached the status before the deadline
        (also True for an empty ``instance_ids``), otherwise False.
    """
    region_id = REGION_ID
    # DescribeInstances accepts at most 100 IDs per request.
    max_ids_per_request = 100
    deadline = time.time() + timeout_seconds
    pending = set(instance_ids)
    while pending and time.time() < deadline:
        time.sleep(3)
        batch = list(pending)[:max_ids_per_request]
        req = ecs_models.DescribeInstancesRequest(
            region_id=region_id,
            instance_ids=json.dumps(batch),
            # The API returns only 10 rows per page by default; without an
            # explicit page size part of the batch would be invisible.
            page_size=max_ids_per_request,
        )
        try:
            resp = ecs_client.describe_instances(req)
        except Exception as e:
            print(f"DescribeInstances 调用失败:{e}")
            continue
        body = resp.body if hasattr(resp, "body") else resp
        container = getattr(body, "instances", None)
        items = getattr(container, "instance", None) or []
        for it in items:
            if getattr(it, "status", None) != target_status:
                continue
            iid = getattr(it, "instance_id", None) or getattr(it, "instanceId", None)
            pending.discard(iid)
    return not pending
def clear_instances(ecs_client: EcsClient, delete_prefix: str | None = None):
    """Stop, delete and then re-list instances to leave a clean environment.

    Every step is best-effort: failures are printed and the next step is
    still attempted. Deletion uses ``force=True``, so it is attempted even
    when the instances could not be stopped first.

    Args:
        ecs_client: ECS client used for all calls.
        delete_prefix: when given, only instances whose name starts with this
            prefix are removed; otherwise every instance found in the region.

    Returns:
        None
    """
    ids = list_instance_ids(ecs_client, delete_prefix)
    region_id = REGION_ID
    if not ids:
        print("当前地域无实例或无匹配实例,无需清理")
        return
    print(f"准备清理 {len(ids)} 台实例:{json.dumps(ids)}")
    try:
        stop_req = ecs_models.StopInstancesRequest(instance_id=ids, region_id=region_id)
        ecs_client.stop_instances(stop_req)
    except Exception as e:
        # Nothing was asked to stop, so waiting for "Stopped" would only burn
        # the full timeout; go straight to the forced delete.
        print(f"停止实例失败:{e}")
    else:
        stopped = wait_instances_status(ecs_client, ids, target_status="Stopped", timeout_seconds=600)
        if not stopped:
            print("部分实例未在超时时间内进入 Stopped,继续尝试强制删除")
    try:
        del_req = ecs_models.DeleteInstancesRequest(instance_id=ids, force=True, region_id=region_id)
        ecs_client.delete_instances(del_req)
    except Exception as e:
        # Fall back to deleting one by one so a single bad instance does not
        # block the rest of the batch.
        print(f"批量删除实例失败,改为逐台删除:{e}")
        for iid in ids:
            try:
                one = ecs_models.DeleteInstanceRequest(instance_id=iid, force=True, region_id=region_id)
                ecs_client.delete_instance(one)
            except Exception as e:
                print(f"删除实例 {iid} 失败:{e}")
    # Give the API a moment before verifying what is left.
    time.sleep(5)
    left = list_instance_ids(ecs_client, delete_prefix)
    if not left:
        print("实例已全部清理完毕")
    else:
        print(f"仍有实例未删除:{json.dumps(left)}")
def compose_run_instances_request() -> ecs_models.RunInstancesRequest:
    """Build the RunInstances request from the module-level configuration.

    Billing, placement, instance type, image, system disk, network and count
    settings come from the module constants; the auto-release time comes from
    ``compute_auto_release_time()``.

    Returns:
        RunInstancesRequest: request ready to be passed to RunInstances.
    """
    disk_spec = ecs_models.RunInstancesRequestSystemDisk(
        size=SYSTEM_DISK_SIZE,
        category=SYSTEM_DISK_CATEGORY,
    )
    image_opts = ecs_models.RunInstancesRequestImageOptions(login_as_non_root=False)
    dns_opts = ecs_models.RunInstancesRequestPrivateDnsNameOptions(hostname_type="Custom")
    request_fields = {
        # Billing and placement.
        "instance_charge_type": "PostPaid",
        "region_id": REGION_ID,
        "zone_id": ZONE_ID,
        "instance_type": INSTANCE_TYPE,
        "io_optimized": IO_OPTIMIZED,
        "spot_strategy": SPOT_STRATEGY,
        "spot_interruption_behavior": SPOT_BEHAVIOR,
        # Image and disk.
        "image_id": IMAGE_ID,
        "security_enhancement_strategy": SECURITY_ENHANCE,
        "system_disk": disk_spec,
        # Network.
        "internet_charge_type": INTERNET_CHARGE_TYPE,
        "internet_max_bandwidth_out": INTERNET_MAX_BW_OUT,
        "v_switch_id": VSWITCH_ID,
        "security_group_id": SECURITY_GROUP_ID,
        # Naming and miscellaneous options.
        "image_options": image_opts,
        "instance_name": INSTANCE_NAME_PREFIX,
        "private_dns_name_options": dns_opts,
        "unique_suffix": False,
        "http_tokens": HTTP_TOKENS,
        "tenancy": "default",
        "affinity": "default",
        # Batch size and lifetime.
        "amount": AMOUNT,
        "min_amount": MIN_AMOUNT,
        "auto_release_time": compute_auto_release_time(),
    }
    return ecs_models.RunInstancesRequest(**request_fields)
def run_and_wait_instances(ecs_client: EcsClient):
    """Create the instance batch and poll until the instances are Running.

    Args:
        ecs_client: ECS client used for RunInstances and DescribeInstances.

    Returns:
        list: IDs of the created instances. The full list is returned both
        when every instance reached Running and when the polling budget
        (``INSTANCE_STATUS_TOTAL_CHECK_TIME_ELAPSE_MILLISECOND``) ran out; an
        empty list is returned when creation failed or returned no IDs.
    """
    request = compose_run_instances_request()
    print("[创建] 正在提交创建实例请求")
    try:
        runtime = util_models.RuntimeOptions(connect_timeout=5000)
        resp = ecs_client.run_instances_with_options(request, runtime)
    except Exception as error:
        print(getattr(error, "code", str(type(error))))
        print(getattr(error, "message", str(error)))
        data = getattr(error, "data", None)
        if isinstance(data, dict) and data.get("Recommend") is not None:
            print(data.get("Recommend"))
        return []
    body = resp.body if hasattr(resp, "body") else resp
    id_sets = getattr(body, "instance_id_sets", None)
    instance_ids = list(getattr(id_sets, "instance_id_set", None) or [])
    if not instance_ids:
        print("[创建] RunInstances 未返回实例ID")
        return []
    print(f"[创建] 创建成功实例ID列表{json.dumps(instance_ids)}")
    start_time = int(time.time() * 1000)
    pending = list(instance_ids)
    region_id = REGION_ID
    print(f"[创建] 开始轮询实例状态,共 {len(pending)}")
    while True:
        time.sleep(INSTANCE_STATUS_CHECK_INTERVAL_MILLISECOND / 1000.0)
        req = ecs_models.DescribeInstancesRequest(
            region_id=region_id,
            instance_ids=json.dumps(pending),
            # The API returns only 10 rows per page by default, fewer than a
            # typical batch; ask for the maximum page size instead.
            page_size=100,
        )
        try:
            runtime = util_models.RuntimeOptions(connect_timeout=5000)
            dresp = ecs_client.describe_instances_with_options(req, runtime)
        except Exception as error:
            # Fall through to the timeout check below: a persistently failing
            # API must not turn this loop into an infinite one.
            print(getattr(error, "message", str(error)))
        else:
            dbody = dresp.body if hasattr(dresp, "body") else dresp
            container = getattr(dbody, "instances", None)
            for inst in getattr(container, "instance", None) or []:
                if getattr(inst, "status", None) != "Running":
                    continue
                iid = getattr(inst, "instance_id", None) or getattr(inst, "instanceId", None)
                if iid in pending:
                    pending.remove(iid)
                    print(f"[创建] 实例已运行:{iid}")
            if not pending:
                print("[创建] 所有实例启动完成")
                return instance_ids
        if int(time.time() * 1000) - start_time > INSTANCE_STATUS_TOTAL_CHECK_TIME_ELAPSE_MILLISECOND:
            print(f"[创建] 在 {int(INSTANCE_STATUS_TOTAL_CHECK_TIME_ELAPSE_MILLISECOND/60000)} 分钟内仍未全部启动,剩余:{json.dumps(pending)}")
            return instance_ids
def install_cloud_assistant_and_reboot(ecs_client: EcsClient, instance_ids) -> bool:
    """Install Cloud Assistant on the instances, reboot them, wait for Running.

    Args:
        ecs_client: ECS client used for all calls.
        instance_ids: IDs of the instances to prepare.

    Returns:
        bool: False when ``instance_ids`` is empty or when the install or
        reboot request fails; otherwise the result of waiting (up to 600
        seconds) for every instance to report Running again.
    """
    if not instance_ids:
        return False
    region = REGION_ID

    # Step 1: request the Cloud Assistant installation.
    print(f"[云助手] 正在为 {len(instance_ids)} 台实例安装云助手")
    try:
        install_request = ecs_models.InstallCloudAssistantRequest(region_id=region, instance_id=instance_ids)
        install_runtime = util_models.RuntimeOptions(connect_timeout=5000)
        ecs_client.install_cloud_assistant_with_options(install_request, install_runtime)
    except Exception as e:
        print(f"InstallCloudAssistant 调用失败:{e}")
        return False

    # Step 2: reboot so that the installation takes effect.
    print("[云助手] 正在重启实例以完成安装")
    try:
        reboot_request = ecs_models.RebootInstancesRequest(region_id=region, instance_id=instance_ids)
        reboot_runtime = util_models.RuntimeOptions(connect_timeout=5000)
        ecs_client.reboot_instances_with_options(reboot_request, reboot_runtime)
    except Exception as e:
        print(f"RebootInstances 调用失败:{e}")
        return False

    # Step 3: block until the instances are back up.
    print("[云助手] 等待实例重启后恢复为 Running")
    return wait_instances_status(ecs_client, instance_ids, target_status="Running", timeout_seconds=600)
def ensure_cloud_assistant_ready(ecs_client: EcsClient, instance_ids) -> bool:
    """Poll until Cloud Assistant is reported ready on every instance.

    Calls DescribeCloudAssistantStatus every 5 seconds until the number of
    ready entries equals ``len(instance_ids)`` or
    ``CLOUD_ASSISTANT_READY_TIMEOUT_SECONDS`` elapses. Request errors are
    printed and retried.

    Args:
        ecs_client: ECS client used for the status calls.
        instance_ids: IDs of the instances that must be ready.

    Returns:
        bool: True when all instances are ready before the deadline.
    """
    region_id = REGION_ID
    deadline = time.time() + CLOUD_ASSISTANT_READY_TIMEOUT_SECONDS
    print(f"[云助手] 正在检查就绪状态,共 {len(instance_ids)}")
    while time.time() < deadline:
        try:
            req = ecs_models.DescribeCloudAssistantStatusRequest(
                region_id=region_id,
                instance_id=instance_ids,
                page_size=50,
            )
            runtime = util_models.RuntimeOptions(connect_timeout=5000)
            resp = ecs_client.describe_cloud_assistant_status_with_options(req, runtime)
        except Exception as e:
            # Retry until the deadline, but surface the cause so a persistent
            # failure (bad credentials, throttling) is diagnosable.
            print(f"DescribeCloudAssistantStatus 调用失败:{e}")
            time.sleep(5)
            continue
        body = resp.body if hasattr(resp, "body") else resp
        # Try the known response layouts in turn. Intermediate containers may
        # be present but None, so every hop goes through getattr.
        statuses = getattr(body, "instance_cloud_assistant_status", None)
        if not statuses:
            status_set = getattr(body, "instance_cloud_assistant_status_set", None)
            statuses = getattr(status_set, "instance_cloud_assistant_status", None)
        if not statuses:
            legacy = getattr(body, "cloud_assistant", None)
            statuses = getattr(legacy, "instance_cloud_assistant_status", None)
        ready_count = 0
        not_ready_ids = []
        for s in statuses or []:
            st = (
                getattr(s, "cloud_assistant_status", None)
                or getattr(s, "CloudAssistantStatus", None)
                or getattr(s, "status", None)
                or getattr(s, "Status", None)
            )
            iid = getattr(s, "instance_id", None) or getattr(s, "InstanceId", None)
            # The status may arrive as a bool or as a string flag.
            is_ready = (st is True) or (isinstance(st, str) and st.lower() in ("true", "enabled", "running"))
            if is_ready:
                ready_count += 1
            elif iid:
                not_ready_ids.append(iid)
        print(f"[云助手] 已就绪 {ready_count}/{len(instance_ids)}")
        if not_ready_ids:
            print(f"[云助手] 未就绪实例:{json.dumps(not_ready_ids)}")
        if ready_count == len(instance_ids):
            return True
        time.sleep(5)
    return False
def create_and_invoke_command(ecs_client: EcsClient, instance_ids, command: str, timeout_seconds: int):
    """Create a Cloud Assistant shell command, invoke it, and print results.

    The command text is Base64-encoded, registered with CreateCommand and
    sent with InvokeCommand. DescribeInvocationResults is then polled every
    3 seconds for up to ``timeout_seconds``; each instance's final status,
    exit code and decoded output are printed once.

    Args:
        ecs_client: ECS client used for all calls.
        instance_ids: IDs of the instances that should run the command.
        command: shell script text to execute.
        timeout_seconds: used both as the command's own timeout and as the
            polling budget.

    Returns:
        None
    """
    region_id = REGION_ID
    encoded = base64.b64encode(command.encode("utf-8")).decode("ascii")
    create_req = ecs_models.CreateCommandRequest(
        type="RunShellScript",
        command_content=encoded,
        name="batch-command",
        working_dir="/root",
        timeout=timeout_seconds,
        region_id=region_id,
    )
    print("[命令] 正在创建云助手命令")
    create_resp = ecs_client.create_command(create_req)
    cmd_id = getattr(create_resp.body, "command_id", None) if hasattr(create_resp, "body") else getattr(create_resp, "command_id", None)
    if not cmd_id:
        print("CreateCommand 未返回 command_id")
        return
    print(f"[命令] 命令已创建command_id={cmd_id}")
    invoke_req = ecs_models.InvokeCommandRequest(command_id=cmd_id, instance_id=instance_ids, region_id=region_id)
    print(f"[命令] 正在下发到 {len(instance_ids)} 台实例")
    invoke_resp = ecs_client.invoke_command(invoke_req)
    invoke_id = getattr(invoke_resp.body, "invoke_id", None) if hasattr(invoke_resp, "body") else getattr(invoke_resp, "invoke_id", None)
    if not invoke_id:
        print("InvokeCommand 未返回 invoke_id")
        return
    print(f"[命令] invoke_id={invoke_id}")
    # Statuses that mean the invocation has not finished on an instance yet.
    in_progress = ("Pending", "Scheduled", "Starting", "Running", "Stopping")
    deadline = time.time() + timeout_seconds
    printed = set()
    while time.time() < deadline:
        time.sleep(3)
        results_req = ecs_models.DescribeInvocationResultsRequest(region_id=region_id, invoke_id=invoke_id, page_size=50)
        results_resp = ecs_client.describe_invocation_results(results_req)
        body = results_resp.body if hasattr(results_resp, "body") else results_resp
        items = getattr(body, "invocation_result", None)
        if items is None and hasattr(body, "invocation"):
            items = body.invocation.invocation_results.invocation_result
        if not items:
            continue
        all_done = True
        for it in items:
            instance_id = getattr(it, "instance_id", None) or getattr(it, "instanceId", None)
            # Prefer the per-instance status fields the API documents
            # (InvocationStatus / InvokeRecordStatus); keep the older names
            # as fallbacks.
            status = (
                getattr(it, "invocation_status", None)
                or getattr(it, "invoke_record_status", None)
                or getattr(it, "invoke_status", None)
                or getattr(it, "status", None)
                or getattr(it, "InvokeStatus", None)
            )
            if status in in_progress:
                all_done = False
                continue
            if instance_id in printed:
                continue
            # Explicit None check: exit code 0 is falsy and must not be lost.
            exit_code = getattr(it, "exit_code", None)
            if exit_code is None:
                exit_code = getattr(it, "ExitCode", None)
            raw_output = getattr(it, "output", "") or getattr(it, "Output", "")
            try:
                output = base64.b64decode(raw_output).decode("utf-8", errors="ignore") if raw_output else ""
            except Exception:
                # Not valid Base64: show the raw text rather than nothing.
                output = raw_output
            print(f"[命令] [{instance_id}] 状态={status} 退出码={exit_code}")
            if output:
                print(output)
            printed.add(instance_id)
        print(f"[命令] 已完成 {len(printed)}/{len(instance_ids)}")
        if all_done and len(printed) == len(instance_ids):
            break
    else:
        # Reached only when the deadline expired without a `break`.
        print(f"[命令] 轮询超时,已完成 {len(printed)}/{len(instance_ids)}")
def get_default_command() -> str:
    """Return the built-in shell script that is run on every instance.

    The script defines a download URL with a cache-busting timestamp and then
    runs one ``&&`` chain: install unzip/curl/tmux, download and unpack
    ``jobs_spider.zip`` under ``/root``, and launch ``start_all.sh``.

    Returns:
        str: the script text, ending with a newline.
    """
    header = (
        "#!/bin/bash\n"
        'URL="https://keaiya-1259195914.cos.ap-shanghai.myqcloud.com/jobs_spider.zip?ts=$(date +%s)"\n'
    )
    steps = (
        "cd /root",
        "apt-get update -y",
        "apt-get install -y unzip curl tmux",
        'curl -fL -o jobs_spider.zip "$URL"',
        "unzip -o jobs_spider.zip",
        "cd /root/jobs_spider",
        "bash start_all.sh",
    )
    # Every step must succeed before the next one runs.
    return header + " && ".join(steps) + "\n"
def main():
    """Run the full pipeline: clear, create, install Cloud Assistant, execute.

    Steps, in order:
      1. Rebind the module-level configuration for this run.
      2. Delete existing instances whose name starts with the run's prefix.
      3. Create the instance batch and wait for it to be Running.
      4. Install Cloud Assistant, reboot, and wait until it is ready.
      5. Invoke the default command on every instance.

    The pipeline stops early (after printing a message) when a step fails.
    There are no command-line arguments. Credentials are taken from the
    ``ALIBABA_CLOUD_ACCESS_KEY_ID`` / ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``
    environment variables when both are set, otherwise from the default
    credential chain.

    Returns:
        None
    """
    global REGION_ID, ZONE_ID, INSTANCE_TYPE, IMAGE_ID, INSTANCE_NAME_PREFIX, VSWITCH_ID, SECURITY_GROUP_ID
    global AMOUNT, MIN_AMOUNT, SYSTEM_DISK_SIZE, SYSTEM_DISK_CATEGORY, INTERNET_CHARGE_TYPE, INTERNET_MAX_BW_OUT
    global SPOT_STRATEGY, SPOT_BEHAVIOR, IO_OPTIMIZED, SECURITY_ENHANCE, HTTP_TOKENS
    global CLOUD_ASSISTANT_READY_TIMEOUT_SECONDS, COMMAND_TIMEOUT_SECONDS
    global INSTANCE_STATUS_CHECK_INTERVAL_MILLISECOND, INSTANCE_STATUS_TOTAL_CHECK_TIME_ELAPSE_MILLISECOND
    # Per-run configuration; overrides the module-level defaults.
    REGION_ID = "cn-qingdao"
    ZONE_ID = "cn-qingdao-b"
    INSTANCE_TYPE = "ecs.n1.tiny"
    IMAGE_ID = "ubuntu_24_04_x64_20G_alibase_20251102.vhd"
    INSTANCE_NAME_PREFIX = "launch-advisor-20251123"
    VSWITCH_ID = "vsw-m5e5oq3xv5m96jt47r4hp"
    SECURITY_GROUP_ID = "sg-m5e3rsab25rt1dhyai1c"
    AMOUNT = 20
    MIN_AMOUNT = 20
    SYSTEM_DISK_SIZE = 40
    SYSTEM_DISK_CATEGORY = "cloud_efficiency"
    INTERNET_CHARGE_TYPE = "PayByBandwidth"
    INTERNET_MAX_BW_OUT = 1
    SPOT_STRATEGY = "SpotAsPriceGo"
    SPOT_BEHAVIOR = "Terminate"
    IO_OPTIMIZED = "optimized"
    SECURITY_ENHANCE = "Active"
    HTTP_TOKENS = "optional"
    CLOUD_ASSISTANT_READY_TIMEOUT_SECONDS = 600
    COMMAND_TIMEOUT_SECONDS = 500
    INSTANCE_STATUS_CHECK_INTERVAL_MILLISECOND = 3000
    INSTANCE_STATUS_TOTAL_CHECK_TIME_ELAPSE_MILLISECOND = 60000 * 3
    # SECURITY: never embed an AccessKey pair in source. A key pair was
    # previously committed at this spot and must be treated as leaked:
    # disable and rotate it.
    ak = os.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    sk = os.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    ecs_client = init_ecs_client_with_aksk(ak, sk) if ak and sk else init_ecs_client()
    print(f"[main] start clearing instances with prefix {INSTANCE_NAME_PREFIX}")
    clear_instances(ecs_client, delete_prefix=INSTANCE_NAME_PREFIX)
    print("[main] clearing completed")
    instance_ids = run_and_wait_instances(ecs_client)
    if not instance_ids:
        print("未获得实例ID终止")
        return
    print("[main] installing Cloud Assistant and rebooting instances")
    installed = install_cloud_assistant_and_reboot(ecs_client, instance_ids)
    if not installed:
        print("Cloud Assistant 安装或重启失败,终止")
        return
    print("[main] Cloud Assistant installation + reboot completed")
    print("[main] verifying Cloud Assistant readiness")
    ready = ensure_cloud_assistant_ready(ecs_client, instance_ids)
    if not ready:
        print("Cloud Assistant 未全部就绪,终止")
        return
    print("[main] Cloud Assistant is ready on all instances")
    command = get_default_command()
    print("[main] creating and invoking batch command")
    create_and_invoke_command(ecs_client, instance_ids, command, timeout_seconds=COMMAND_TIMEOUT_SECONDS)
    print("[main] command invocation completed")


if __name__ == "__main__":
    main()