目标:配置 Claude Code 的可观测性,实现全链路监控和性能分析
预计时间:30 分钟
对应官方文档:Monitoring Usage、Agent SDK Observability
为什么需要可观测性?
在团队和企业环境中,你需要了解:
- 📊 用量:谁在用?花了多少?
- 🔍 性能:响应有多快?哪里瓶颈?
- 🐛 问题:为什么出错?如何修复?
- 📈 趋势:使用量增长如何?
OpenTelemetry 集成
架构
配置 Claude Code
# 启用 OpenTelemetry
export OTEL_EXPORTER_OTLP_ENDPOINT=http://collector:4317
export OTEL_SERVICE_NAME=claude-code-team
export OTEL_RESOURCE_ATTRIBUTES="team=backend,env=production"
# 启动 Claude Code
claude
Agent SDK 配置
from claude_agent_sdk import Agent, ObservabilityConfig
agent = Agent(
model="claude-sonnet-4-6",
observability=ObservabilityConfig(
exporter="otlp",
endpoint="http://otel-collector:4317",
service_name="code-review-agent",
attributes={
"team": "platform",
"project": "auth-service"
}
)
)
收集的遥测数据
Traces(调用链)
{
"trace_id": "abc123",
"span_id": "def456",
"name": "claude.task",
"start_time": "2025-06-18T10:00:00Z",
"end_time": "2025-06-18T10:00:05Z",
"attributes": {
"claude.model": "claude-sonnet-4-6",
"claude.input_tokens": 4520,
"claude.output_tokens": 890,
"claude.tool_calls": 3,
"claude.files_read": 5,
"claude.files_written": 2
},
"events": [
{"time": "10:00:01", "name": "thinking_start"},
{"time": "10:00:03", "name": "tool_call", "attributes": {"tool": "file_read"}},
{"time": "10:00:05", "name": "completion"}
]
}
Metrics(指标)
| 指标名 | 类型 | 说明 |
|---|---|---|
| claude.tokens.input | Counter | 输入 token 数 |
| claude.tokens.output | Counter | 输出 token 数 |
| claude.requests.duration | Histogram | 请求耗时 |
| claude.tool.calls | Counter | 工具调用次数 |
| claude.errors | Counter | 错误次数 |
Logs(日志)
{
"timestamp": "2025-06-18T10:00:05Z",
"level": "INFO",
"message": "Task completed",
"attributes": {
"session_id": "sess_abc",
"task": "refactor auth",
"duration_ms": 5200,
"success": true
}
}
监控仪表盘
Grafana 配置
# docker-compose.yaml
# Three services: grafana, prometheus and jaeger, each on its upstream "latest" image.
version: "3"
services:
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      # Local dashboards directory mounted into Grafana's provisioning path.
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
  prometheus:
    image: prom/prometheus:latest
    volumes:
      # Local prometheus.yml mounted as the container's configuration file.
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
关键看板
┌─────────────────────────────────────────────┐
│ Claude Code 监控仪表盘 │
├─────────────────────────────────────────────┤
│ │
│ 今日用量: $45.20 │ 活跃会话: 12 │
│ 较昨日: +15% │ 等待确认: 3 │
│ │
├─────────────────────────────────────────────┤
│ 请求耗时分布 │
│ ▓▓▓▓▓▓▓░░░ 平均 3.2s │
│ P50: 2.1s P95: 8.5s P99: 15.2s │
│ │
├─────────────────────────────────────────────┤
│ 错误率趋势 │
│ ──────╱╲────── 2.3% │
│ │
├─────────────────────────────────────────────┤
│ Top 5 用户 Top 5 项目 │
│ 1. alice $12.50 1. backend $18.20 │
│ 2. bob $10.30 2. mobile $15.40 │
│ ... │
└─────────────────────────────────────────────┘
告警配置
Prometheus Rules
# alerting-rules.yaml
groups:
  - name: claude-alerts
    rules:
      # Spend in the last 24h is more than double the spend in the 24h before it.
      # increase() is used because dividing two raw cumulative totals drifts toward 1
      # and does not measure a daily spike.
      # NOTE(review): assumes cost is exported as the counter claude_cost_usd_total
      # (the name the dashboard queries in this document use) — confirm.
      - alert: HighCostSpike
        expr: |
          (
            sum(increase(claude_cost_usd_total[1d]))
            / sum(increase(claude_cost_usd_total[1d] offset 1d))
          ) > 2
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "Claude Code 成本激增"
      # Share of failed requests over the last 5 minutes, aggregated across all series.
      - alert: HighErrorRate
        expr: |
          (
            sum(rate(claude_errors_total[5m]))
            / sum(rate(claude_requests_total[5m]))
          ) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "错误率超过 5%"
      # Buckets are summed by `le` first so a single overall P95 is evaluated
      # instead of one quantile per label combination.
      - alert: SlowResponse
        expr: |
          histogram_quantile(0.95,
            sum by (le) (rate(claude_request_duration_seconds_bucket[5m]))
          ) > 30
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "P95 响应时间超过 30 秒"
调试技巧
查看详细 Trace
# 导出当前会话的 trace
claude export-trace --session sess_abc123 --output trace.json
# 分析
jq '.spans[] | {name, duration_ms}' trace.json
性能分析
# 找出最耗时的操作
claude profile --last-hour
# 输出示例:
# Operation Count Avg(ms) Total(ms)
# file_read 45 120 5400
# bash_exec 12 850 10200
# model_call 8 3200 25600
完整的 Grafana 仪表盘配置
{
  "dashboard": {
    "title": "Claude Code 企业监控",
    "tags": ["claude", "ai", "monitoring"],
    "timezone": "browser",
    "panels": [
      {
        "title": "实时用量概览",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(claude_tokens_input_total) + sum(claude_tokens_output_total)",
            "legendFormat": "总 Token 数"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "short",
            "color": {
              "mode": "thresholds"
            },
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 1000000},
                {"color": "red", "value": 5000000}
              ]
            }
          }
        }
      },
      {
        "title": "Top 10 活跃用户",
        "type": "table",
        "targets": [
          {
            "expr": "topk(10, sum by (user) (claude_cost_usd_total))",
            "format": "table"
          }
        ]
      },
      {
        "title": "模型使用分布",
        "type": "piechart",
        "targets": [
          {
            "expr": "sum by (model) (claude_requests_total)",
            "legendFormat": "{{model}}"
          }
        ]
      },
      {
        "title": "错误率趋势",
        "type": "timeseries",
        "targets": [
          {
            "expr": "sum(rate(claude_errors_total[5m])) / sum(rate(claude_requests_total[5m]))",
            "legendFormat": "错误率"
          }
        ]
      },
      {
        "title": "请求耗时热力图",
        "type": "heatmap",
        "targets": [
          {
            "expr": "sum by (le) (rate(claude_request_duration_seconds_bucket[5m]))",
            "format": "heatmap"
          }
        ]
      }
    ]
  }
}
Docker Compose 一键部署
# observability-stack.yaml
# Prometheus, Jaeger, Loki, Grafana and an OpenTelemetry Collector in one Compose project.
version: "3.8"
services:
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    ports:
      - "9090:9090"
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    volumes:
      - ./loki-config.yml:/etc/loki/local-config.yaml
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml
      - grafana-data:/var/lib/grafana
    environment:
      # The admin password is read from the caller's environment instead of being
      # committed as "admin"; Compose aborts with this message when it is unset.
      GF_SECURITY_ADMIN_PASSWORD: "${GRAFANA_ADMIN_PASSWORD:?set GRAFANA_ADMIN_PASSWORD before starting the stack}"
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"
      - "4318:4318"
volumes:
  prometheus-data: {}
  grafana-data: {}
遥测数据流转全景
下面这张图把「客户端 → Collector → 存储 → 告警/分析」的数据走向串起来,落地时按团队规模裁剪:
要点:
- 边缘 sidecar 不做采样,只做 batch + memory_limiter,避免节点本地丢数据;
- 采样在中心 collector 做,统一策略、便于对比;
- 安全相关日志(permission_decision=deny)单独走 Security DW,保留期与告警阈值都更严;
- FinOps 看板独立从 Prometheus 拉,避免和工程师看的 SLO 看板互相干扰。
遵循 OTLP 与事件语义
Claude Code 生成的 trace/metrics/log 中有几个一定会遇到的 attribute,提前记住会大幅减少仪表盘调试时间:
| Attribute | 含义 | 典型查询场景 |
|---|---|---|
| claude.session_id | 单次 CLI 会话 | 复现单次故障 |
| claude.turn_index | 会话内轮次序号 | 看单轮调用链 |
| claude.tool | 工具名称 | 拆分工具耗时 |
| claude.tool_args_hash | 工具参数 SHA256 | 去重定位重复调用 |
| claude.model | 模型名 | 模型对比 |
| claude.tokens.input/output | Token 计量 | 成本分析 |
| claude.permission_decision | allow/deny/confirm | 安全事件分析 |
| claude.user.email | 企业 SSO 帐号 | 按人统计 |
设置环境变量让 SDK 主动带上这些字段:
export OTEL_EXPORTER_OTLP_ENDPOINT="http://otel-collector.corp.local:4317"
export OTEL_EXPORTER_OTLP_PROTOCOL="grpc"
export OTEL_RESOURCE_ATTRIBUTES="service.name=claude-code,deployment.environment=prod,team=$TEAM"
export OTEL_LOG_LEVEL=warn
export CLAUDE_TELEMETRY="on"
export CLAUDE_TELEMETRY_INCLUDE_TOOL_IO="redacted" # 含敏感字段自动脱敏
OTel Collector 生产配置
上面 docker-compose 里裸跑 collector 的配置是入门版,生产环境要补:尾部采样、脱敏、分流、限流。
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  memory_limiter:
    check_interval: 1s
    limit_percentage: 75
    spike_limit_percentage: 25
  batch:
    timeout: 5s
    send_batch_size: 1024
  # Tail sampling: keep 100% of error / slow / high-cost traces and 10% of the rest.
  tail_sampling:
    decision_wait: 10s
    policies:
      - name: errors
        type: status_code
        status_code: { status_codes: [ERROR] }
      - name: slow
        type: latency
        latency: { threshold_ms: 5000 }
      - name: high-cost
        type: numeric_attribute
        numeric_attribute: { key: claude.tokens.output, min_value: 8000 }
      - name: random
        type: probabilistic
        probabilistic: { sampling_percentage: 10 }
  # Redact sensitive fields: raw tool arguments are dropped, prompt/response text is hashed.
  attributes/redact:
    actions:
      - { key: claude.tool_args_raw, action: delete }
      - { key: prompt.text, action: hash }
      - { key: response.text, action: hash }
  # Fill in resource attributes from the collector's environment.
  # The ${env:...} references are quoted: braces are not allowed in an unquoted
  # scalar inside a flow mapping, so the unquoted form fails to parse as YAML.
  resource:
    attributes:
      - { key: dc, value: "${env:DEPLOY_DC}", action: insert }
      - { key: cluster, value: "${env:CLUSTER_NAME}", action: insert }

connectors:
  # Split logs by permission decision. Denied calls go to the security pipeline;
  # everything else goes to the default log pipeline. Only log-capable exporters
  # are reachable from here (Jaeger and Prometheus do not accept logs).
  # NOTE(review): assumes claude.permission_decision is a log-record attribute — confirm.
  routing/logs:
    default_pipelines: [logs/default]
    error_mode: ignore
    table:
      - context: log
        statement: 'route() where attributes["claude.permission_decision"] == "deny"'
        pipelines: [logs/security]

exporters:
  prometheus:
    endpoint: 0.0.0.0:8889
    metric_expiration: 30m
    resource_to_telemetry_conversion: { enabled: true }
  otlp/jaeger:
    endpoint: jaeger:4317
    tls: { insecure: true }
    sending_queue: { enabled: true, queue_size: 5000 }
    retry_on_failure: { enabled: true, initial_interval: 5s, max_elapsed_time: 5m }
  otlp/security:
    endpoint: security-collector.corp.local:4317
    tls: { insecure_skip_verify: false, ca_file: /etc/ssl/corp-ca.pem }
  otlphttp/loki:
    endpoint: http://loki:3100/otlp

extensions:
  health_check: { endpoint: 0.0.0.0:13133 }
  pprof: { endpoint: 0.0.0.0:1777 }
  zpages: { endpoint: 0.0.0.0:55679 }

service:
  extensions: [health_check, pprof, zpages]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, attributes/redact, resource, tail_sampling, batch]
      exporters: [otlp/jaeger]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, resource, batch]
      exporters: [prometheus]
    # Ingest pipeline for logs: redact, then hand over to the routing connector.
    logs:
      receivers: [otlp]
      processors: [memory_limiter, attributes/redact]
      exporters: [routing/logs]
    # Denied permission decisions: security backend plus Loki.
    logs/security:
      receivers: [routing/logs]
      processors: [batch]
      exporters: [otlp/security, otlphttp/loki]
    # All other logs: Loki only.
    logs/default:
      receivers: [routing/logs]
      processors: [batch]
      exporters: [otlphttp/loki]
  telemetry:
    metrics: { address: 0.0.0.0:8888 }
    logs: { level: info }
SDK 内嵌埋点与自定义指标
企业场景往往需要「按项目/按团队」的业务指标。下面这个例子在 Agent SDK 外面裹一层 OTel,做出有业务语义的指标:
# observability/instrumented_agent.py
import asyncio
import hashlib
import os
import time
from contextlib import asynccontextmanager
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
def _setup_otel(service: str, env: str) -> None:
    """Register global tracer and meter providers that share one resource.

    Args:
        service: Value stored under the ``service.name`` resource attribute.
        env: Value stored under the ``deployment.environment`` resource attribute.

    Both exporters are built with no arguments, so their endpoint settings
    are not chosen here.
    """
    shared_resource = Resource.create(
        {"service.name": service, "deployment.environment": env}
    )

    # Traces: batch span processor feeding an OTLP exporter.
    tracer_provider = TracerProvider(resource=shared_resource)
    span_processor = BatchSpanProcessor(OTLPSpanExporter())
    tracer_provider.add_span_processor(span_processor)
    trace.set_tracer_provider(tracer_provider)

    # Metrics: periodic reader around an OTLP exporter.
    # NOTE(review): 30_000 is passed positionally; presumably the export
    # interval in milliseconds — confirm against the SDK signature.
    metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter(), 30_000)
    meter_provider = MeterProvider(
        resource=shared_resource,
        metric_readers=[metric_reader],
    )
    metrics.set_meter_provider(meter_provider)
# Install the global providers at import time. Service name and environment come
# from OTEL_SERVICE_NAME / DEPLOY_ENV, falling back to "claude-agent" / "dev".
_setup_otel(os.environ.get("OTEL_SERVICE_NAME", "claude-agent"),
            os.environ.get("DEPLOY_ENV", "dev"))

# Tracer and meter are both requested under the "claude.agent" name.
tracer = trace.get_tracer("claude.agent")
meter = metrics.get_meter("claude.agent")

# Counter incremented once per agent turn.
turn_counter = meter.create_counter(
    "claude_turns_total", description="Number of agent turns")
# Counters for input and output tokens (unit "token").
token_input = meter.create_counter(
    "claude_tokens_input_total", unit="token")
token_output = meter.create_counter(
    "claude_tokens_output_total", unit="token")
# Histogram of per-tool latency, recorded in seconds.
tool_latency = meter.create_histogram(
    "claude_tool_latency_seconds", unit="s",
    description="Latency per tool invocation")
# Counter of failed tool invocations.
tool_errors = meter.create_counter(
    "claude_tool_errors_total", description="Tool failures")
@asynccontextmanager
async def traced_session(project: str, user: str):
    """Open the root span for one session and tag it with business attributes.

    Args:
        project: Stored on the span as ``project``.
        user: Stored on the span as ``claude.user.email``.

    Yields:
        The active ``claude.session`` span; it ends when the context exits.
    """
    deploy_env = os.environ.get("DEPLOY_ENV", "dev")
    session_attributes = (
        ("project", project),
        ("claude.user.email", user),
        ("deploy.env", deploy_env),
    )
    with tracer.start_as_current_span("claude.session") as root_span:
        for attr_name, attr_value in session_attributes:
            root_span.set_attribute(attr_name, attr_value)
        yield root_span
async def run_task(project: str, user: str, prompt: str) -> None:
options = ClaudeAgentOptions(
system_prompt="You are an expert Python engineer.",
allowed_tools=["Read", "Grep", "Edit", "Bash"],
max_turns=12,
)
labels = {"project": project, "team": os.environ.get("TEAM", "unknown")}
async with traced_session(project, user):
async with ClaudeSDKClient(options=options) as client:
await client.query(prompt)
async for evt in client.receive_response():
if evt.type == "message_start":
turn_counter.add(1, labels)
if evt.type == "tool_use":
args_hash = hashlib.sha256(
str(evt.tool_input).encode()).hexdigest()[:16]
with tracer.start_as_current_span(f"tool.{evt.tool_name}") as s:
s.set_attribute("claude.tool", evt.tool_name)
s.set_attribute("claude.tool_args_hash", args_hash)
start = time.perf_counter()
try:
await evt.execute()
except Exception as exc:
tool_errors.add(1, {**labels, "tool": evt.tool_name,
"kind": type(exc).__name__})
s.record_exception(exc)
s.set_status(trace.Status(trace.StatusCode.ERROR))
raise
finally:
tool_latency.record(time.perf_counter() - start,
{**labels, "tool": evt.tool_name})
if evt.type == "message_delta" and getattr(evt, "usage", None):
token_input.add(evt.usage.input_tokens, labels)
token_output.add(evt.usage.output_tokens, labels)
if __name__ == "__main__":
asyncio.run(run_task(
project="finops-rules",
user=os.environ.get("USER_EMAIL", "[email protected]"),
prompt="审查 src/risk/ 下的代码并给出 3 个性能改进建议。",
))
配套 PromQL 把成本和质量画到同一张图上:
# 按团队的 Token 消耗速率
sum by (team) (rate(claude_tokens_output_total[5m]))
+ sum by (team) (rate(claude_tokens_input_total[5m]))
# 按工具的 P95 延迟
histogram_quantile(0.95,
sum by (tool, le) (rate(claude_tool_latency_seconds_bucket[5m]))
)
# 工具错误率
sum by (tool) (rate(claude_tool_errors_total[5m]))
/
sum by (tool) (rate(claude_tool_latency_seconds_count[5m]))
实用调试查询:从仪表盘到根因
OTel 数据用对了,定位「为什么这次会话慢/贵/失败」会非常顺手。下面是几条「先存到本子里」的查询:
# 1) Loki:把单次会话的所有日志按时间排出来
{service_name="claude-code"} | json | claude_session_id = "$session_id"
| line_format "{{.ts}} {{.claude_tool}} {{.message}}"
# 2) Loki:找最近 1h 内权限被拒绝的所有调用
{service_name="claude-code"} | json
| claude_permission_decision = "deny"
| line_format "{{.ts}} user={{.claude_user_email}} tool={{.claude_tool}} reason={{.deny_reason}}"
-- 3) Tempo TraceQL:找耗时 > 30s 且产生 deny 的会话
{ resource.service.name = "claude-code" }
| duration > 30s
| events.name = "permission.deny"
| select(claude.session_id, claude.user.email, duration)
把这三条做成 Grafana 面板的下钻链接(Data link → URL),从 SLO 看板可以一键跳到具体会话的日志/trace,节省大量切换时间。
企业级实战场景
场景一:FinOps 看板 + 团队预算护栏
业务背景:30 个团队共用一个 Anthropic 企业账号,月预算 80k USD。需要做到:① 实时看到每个团队消耗;② 接近预算时自动告警;③ 超预算时强制降级到便宜模型。
Prometheus 计算 Token 成本(recording rule):
# prometheus/rules/claude-finops.yml
groups:
  - name: claude-finops
    interval: 30s
    rules:
      # Spend rate per team and model: the 5m token rate is multiplied by the price
      # series joined on the model label, for input and output tokens, then added.
      - record: claude:cost_usd:rate5m
        expr: |
          (
            sum by (team, model) (rate(claude_tokens_input_total[5m])) *
            on(model) group_left() claude_model_price_input
          )
          +
          (
            sum by (team, model) (rate(claude_tokens_output_total[5m])) *
            on(model) group_left() claude_model_price_output
          )
      # Per-team total: each 5m sample of the rate is scaled by 300 and summed over
      # a 31d subquery. This is a rolling 31-day window, not a calendar month.
      - record: claude:cost_usd:month_to_date
        expr: |
          sum by (team) (
            sum_over_time(
              (claude:cost_usd:rate5m * 300)[31d:5m]
            )
          )
      # Warn when a team's total exceeds 80% of its budget series.
      - alert: ClaudeTeamBudgetWarning
        expr: claude:cost_usd:month_to_date / on(team) claude_team_budget_usd > 0.8
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "团队 {{ $labels.team }} 已用 80% 月预算"
      # Critical when the total exceeds the budget.
      - alert: ClaudeTeamBudgetExceeded
        expr: claude:cost_usd:month_to_date / on(team) claude_team_budget_usd > 1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "团队 {{ $labels.team }} 已超月预算"
Alertmanager → budget-guard webhook:
# budget_guard.py —— 收到告警后写降级标记到 Redis
import json
import os
from http.server import BaseHTTPRequestHandler, HTTPServer
import redis
r = redis.Redis.from_url(os.environ["REDIS_URL"])
TTL = 6 * 3600 # 6 小时窗口
class Handler(BaseHTTPRequestHandler):
    """Alertmanager webhook receiver that stores per-team downgrade markers in Redis."""

    # Severity label -> model written into the marker key.
    _MODEL_FOR_SEVERITY = {"warning": "sonnet", "critical": "haiku"}
    # A higher rank is a stronger downgrade and wins within one payload.
    _RANK = {"sonnet": 1, "haiku": 2}

    @staticmethod
    def plan_actions(alerts):
        """Reduce a webhook payload to one decision per team.

        Args:
            alerts: The ``alerts`` list from the Alertmanager webhook body.

        Returns:
            A dict mapping team -> model name to set, or ``None`` to clear
            the marker. A firing alert always beats a resolved one for the
            same team, and ``critical`` beats ``warning``, whatever the
            order inside the payload. Alerts with no ``team`` label, and
            firing alerts with an unknown severity, are ignored.
        """
        plan = {}
        for alert in alerts:
            labels = alert.get("labels") or {}
            team = labels.get("team")
            if not team:
                continue
            status = alert.get("status")
            if status == "firing":
                model = Handler._MODEL_FOR_SEVERITY.get(labels.get("severity"))
                if model is None:
                    continue
                current = plan.get(team)
                if current is None or Handler._RANK[model] > Handler._RANK[current]:
                    plan[team] = model
            elif status == "resolved":
                # Clear only if nothing for this team is firing in this payload.
                plan.setdefault(team, None)
        return plan

    def do_POST(self):
        """Apply the payload's decisions to Redis; 204 on success, 400 on a bad body."""
        try:
            length = int(self.headers.get("Content-Length") or 0)
            body = json.loads(self.rfile.read(length))
            plan = self.plan_actions(body.get("alerts", []))
        except (ValueError, AttributeError, TypeError):
            # Missing/invalid Content-Length, invalid JSON, or unexpected shape.
            self.send_response(400)
            self.end_headers()
            return

        for team, model in plan.items():
            key = f"team:{team}:degraded"
            if model is None:
                r.delete(key)
            else:
                r.setex(key, TTL, model)

        self.send_response(204)
        self.end_headers()
if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8080), Handler).serve_forever()
SDK 侧的预运行检查:
# pre_run_check.py
import os, redis
from claude_agent_sdk import ClaudeAgentOptions
r = redis.Redis.from_url(os.environ["REDIS_URL"])
def build_options(team: str) -> ClaudeAgentOptions:
forced = r.get(f"team:{team}:degraded")
forced_model = forced.decode() if forced else None
base = ClaudeAgentOptions(
system_prompt="You are an engineering assistant.",
allowed_tools=["Read", "Grep", "Edit", "Bash"],
max_turns=10,
)
if forced_model:
base.model = forced_model # 强制降级到便宜模型
base.max_turns = min(base.max_turns, 6)
return base落地效果:Grafana 上能按团队/项目/用户看到月度消耗、Top-N 单次会话成本、token 输入输出比;超预算后 SDK 自动切换 Haiku,开发者会看到一条「当前团队进入降级模式」的提示。
场景二:异常会话「取证 → 回放」一站式
业务背景:偶发线上事故里 AI 把生产配置改坏过一次。安全团队要求:任何 permission_decision=deny 或失败大于 3 次的会话,自动归档完整 trace + 输入输出 hash + 当时的代码快照。
告警规则:
# Fires for a session with more than 3 tool failures, or any permission denial,
# in the last 2 minutes. increase() counts events in the window; rate() would be
# a per-second value, so "> 3" would mean more than 3 errors per second.
# NOTE(review): claude_session_id as a metric label is high-cardinality — confirm
# the metrics backend can carry it.
- alert: ClaudeAnomalousSession
  expr: |
    (
      sum by (claude_session_id) (increase(claude_tool_errors_total[2m])) > 3
    )
    or
    (
      sum by (claude_session_id) (
        increase(claude_permission_denials_total[2m])
      ) > 0
    )
  for: 1m
  labels:
    severity: critical
    route: forensics
  annotations:
    summary: "异常会话 {{ $labels.claude_session_id }}"
取证 Worker:
# forensics_worker.py
import hashlib
import io
import json
import os
import pathlib
import subprocess
import tarfile
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
import boto3
S3 = boto3.client("s3")
BUCKET = os.environ["EVIDENCE_BUCKET"]
TEMPO = os.environ["TEMPO_URL"]
LOG_DIR = pathlib.Path(os.environ.get("CLAUDE_LOG_DIR", "/var/log/claude"))
def fetch_trace(session_id: str) -> bytes:
    """Fetch the first trace Tempo returns for ``session_id``.

    Args:
        session_id: Value matched against the ``claude.session_id`` tag.

    Returns:
        The raw body of the trace lookup, or ``b"{}"`` when the search
        returns no traces.

    Raises:
        urllib.error.URLError: If either HTTP request fails or times out.
    """
    import urllib.parse

    # Encode the query so characters such as "&", "=" or spaces in the session
    # id cannot change the request or break the URL.
    query = urllib.parse.urlencode(
        {"tags": f"claude.session_id={session_id}", "limit": 1})
    with urllib.request.urlopen(f"{TEMPO}/api/search?{query}", timeout=15) as resp:
        meta = json.loads(resp.read())
    if not meta.get("traces"):
        return b"{}"

    trace_id = urllib.parse.quote(str(meta["traces"][0]["traceID"]), safe="")
    with urllib.request.urlopen(f"{TEMPO}/api/traces/{trace_id}", timeout=30) as resp:
        return resp.read()
def _add_bytes(tar, name: str, data: bytes) -> None:
    """Add an in-memory payload to ``tar`` under ``name``."""
    info = tarfile.TarInfo(name)
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))


def collect(session_id: str) -> tuple[bytes, str]:
    """Build a gzip'd tar evidence bundle for one session.

    The archive contains ``trace.json`` (from ``fetch_trace``),
    ``session.jsonl`` when the local log file exists, a git bundle of ``HEAD``
    when ``git bundle create`` succeeds, and ``meta.json``.

    Args:
        session_id: Session identifier used for the trace lookup and log file name.

    Returns:
        ``(archive_bytes, sha256_hex_of_archive)``.
    """
    import tempfile

    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        # Trace
        _add_bytes(tar, "trace.json", fetch_trace(session_id))

        # Local JSONL log
        log_path = LOG_DIR / f"{session_id}.jsonl"
        if log_path.exists():
            tar.add(log_path, arcname="session.jsonl")

        # Code snapshot: a git bundle of HEAD (committed state only).
        # NOTE(review): git runs in this worker's current directory — confirm
        # that is the repository the session operated on.
        rev = subprocess.run(["git", "rev-parse", "HEAD"],
                             capture_output=True, text=True).stdout.strip()
        # A private temp directory per call avoids concurrent requests
        # overwriting one fixed /tmp path, and removes the bundle afterwards.
        with tempfile.TemporaryDirectory() as tmp_dir:
            bundle_path = os.path.join(tmp_dir, "repo.bundle")
            bundle = subprocess.run(
                ["git", "bundle", "create", bundle_path, "HEAD"],
                capture_output=True)
            if bundle.returncode == 0:
                tar.add(bundle_path,
                        arcname=f"repo-{rev[:12] or 'unknown'}.bundle")

        # Metadata
        meta = {
            "session_id": session_id,
            "captured_at": int(time.time()),
            "git_head": rev,
            "host": os.uname().nodename,
        }
        _add_bytes(tar, "meta.json", json.dumps(meta, indent=2).encode())

    # The digest is taken after the tar is closed so it covers the full archive.
    raw = buf.getvalue()
    digest = hashlib.sha256(raw).hexdigest()
    return raw, digest
class Handler(BaseHTTPRequestHandler):
    """Alertmanager webhook receiver that archives evidence for firing sessions to S3."""

    @staticmethod
    def is_safe_session_id(sid) -> bool:
        """Return True if ``sid`` is 1-128 chars of letters, digits, ``_`` or ``-``.

        The id comes from an alert label and ends up in a file path and an S3
        key, so path separators and dots are rejected.
        """
        import re

        return isinstance(sid, str) and re.fullmatch(r"[A-Za-z0-9_-]{1,128}", sid) is not None

    def do_POST(self):
        """Archive each firing alert's session.

        Responds 204 when every archive succeeded, 400 on a bad body, and 500
        when at least one archive failed.
        """
        try:
            length = int(self.headers.get("Content-Length") or 0)
            payload = json.loads(self.rfile.read(length))
            alerts = list(payload.get("alerts", []))
        except (ValueError, AttributeError, TypeError):
            # Missing/invalid Content-Length, invalid JSON, or unexpected shape.
            self.send_response(400)
            self.end_headers()
            return

        failed = False
        for alert in alerts:
            labels = (alert.get("labels") or {}) if isinstance(alert, dict) else {}
            sid = labels.get("claude_session_id")
            if not sid or alert.get("status") != "firing":
                continue
            if not self.is_safe_session_id(sid):
                self.log_error("rejecting unsafe session id %r", sid)
                continue
            try:
                blob, digest = collect(sid)
                key = f"forensics/{time.strftime('%Y/%m/%d')}/{sid}.tgz"
                S3.put_object(
                    Bucket=BUCKET, Key=key, Body=blob,
                    ServerSideEncryption="aws:kms",
                    Metadata={"sha256": digest, "session_id": sid},
                    ObjectLockMode="COMPLIANCE",
                    # Retain for 365 days from now.
                    ObjectLockRetainUntilDate=time.strftime(
                        "%Y-%m-%dT%H:%M:%SZ",
                        time.gmtime(time.time() + 365 * 24 * 3600)),
                )
            except Exception as exc:
                # Keep going so one failing session does not block the others.
                failed = True
                self.log_error("archiving session %s failed: %r", sid, exc)

        self.send_response(500 if failed else 204)
        self.end_headers()
if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 9090), Handler).serve_forever()
落地要点:
- S3 桶开 Object Lock + KMS,证据写入即不可改、保留 1 年;
- 拉 Tempo 完整 trace + 本地 JSONL 日志 + 当时代码 bundle,事故复盘时能直接 git clone repo.bundle 拿到当时的代码状态;
- 取证流程不依赖被告警的工程师配合,即使他离线也能完成证据落库。
高级教程完成!
恭喜完成所有高级教程!你已经掌握了:
- ✅ Agent SDK 开发
- ✅ Hooks 自动化
- ✅ 插件开发与市场
- ✅ 动态工作流编排
- ✅ 企业部署管理
- ✅ 安全加固
- ✅ CI/CD 集成
- ✅ 可观测性监控
继续探索
- 📖 完整参考:官方文档
- 💬 社区:Discord
- 🐙 示例代码:GitHub Examples
本教程持续更新中,欢迎反馈和建议!