0%

模块 01 · 数据融合逻辑

模块 1 - 数据融合

核心定位: 数据融合是智能安全体系的"数据基础层",通过多源异构数据的统一采集、实时汇聚、智能富化、弹性存储,为上层安全分析提供高质量数据底座。


1. 功能概述

1.1 业务背景

传统安全运营中,企业部署了数十种安全设备(防火墙、WAF、IDS、HIDS、SIEM等),但各设备独立运行、数据孤立,形成严重的数据孤岛。安全分析师需要跨平台手动关联数据,耗时耗力且效果不佳。

核心问题:

问题 现状 影响
数据格式不统一 100+ 种日志格式,互不兼容 人工解析成本高,扩展性差
数据孤岛严重 各安全设备独立运行 跨源关联困难,漏报率高
实时性差 分钟级延迟甚至小时级 错过最佳响应时机
上下文缺失 单点告警无关联信息 无法判断真伪和影响范围

1.2 设计目标

目标 量化指标 价值
统一数据底座 接入 100+ 数据源,格式标准化率 > 95% 消除数据孤岛
实时数据汇聚 端到端延迟 < 10s 快速发现威胁
智能语义富化 自动关联覆盖率 > 85% 上下文完整
弹性存储扩展 PB 级存储,180 天+历史追溯 支持深度溯源

1.3 设计原则

  1. 标准化优先 — 统一数据格式,建立规范的数据模型
  2. 实时性保障 — 流式处理架构,保证数据时效性
  3. 智能化为辅 — AI 解决人力密集型工作,释放分析师精力
  4. 可扩展架构 — 插件式设计,支持快速接入新数据源
  5. 可靠性保证 — 多级容灾,数据不丢失

2. 功能架构

2.1 整体架构

graph TB subgraph 数据源层 direction TB DS1["网络设备\\n防火墙/交换机/IDS"] DS2["主机系统\\nWindows/Linux/EDR"] DS3["应用系统\\nWeb应用/数据库/API"] DS4["身份认证\\nIAM/IDaaS/AD"] DS5["威胁情报\\nSTIX/OTX/自产情报"] end subgraph 采集接入层 direction TB C1["OTEL Agent\\n标准化采集"] C2["主机Agent\\n轻量代理"] C3["API采集器\\nREST/Webhook"] C4["Syslog收集\\n日志转发"] C5["情报接口\\nTAXII/STIX"] end subgraph 智能处理层 direction TB P1["协议解析\\n多格式支持"] P2["Smart Routing\\n智能分发"] P3["语义映射\\n字段标准化"] P4["知识网络\\n实体关联"] P5["智能降噪\\n告警聚合"] end subgraph 数据存储层 direction TB S1["实时流数据\\nKafka/Flink"] S2["结构化存储\\nElasticsearch"] S3["向量存储\\nMilvus"] S4["图数据库\\nNeo4j"] S5["冷存储\\n对象存储"] end subgraph 数据服务层 direction TB SV1["统一查询接口"] SV2["实时事件API"] SV3["历史检索API"] SV4["实体关系API"] end DS1 & DS2 & DS3 & DS4 & DS5 --> C1 & C2 & C3 & C4 & C5 C1 & C2 & C3 & C4 & C5 --> P1 --> P2 --> P3 --> P4 --> P5 P1 & P2 & P3 & P4 & P5 --> S1 & S2 & S3 & S4 & S5 S1 & S2 & S3 & S4 & S5 --> SV1 & SV2 & SV3 & SV4 style DS1 fill:#e3f2fd,stroke:#1565c0 style DS2 fill:#fff3e0,stroke:#e65100 style DS3 fill:#e8f5e9,stroke:#2e7d32 style DS4 fill:#fce4ec,stroke:#c62828 style DS5 fill:#f3e5f5,stroke:#7b1fa2 style C1 fill:#bbdefb,stroke:#1565c0 style P1 fill:#bbdefb,stroke:#1565c0 style S1 fill:#bbdefb,stroke:#1565c0 style SV1 fill:#e1f5fe,stroke:#01579b,stroke-width:3px

2.2 数据流设计

sequenceDiagram participant DS as 数据源 participant CA as 采集Agent participant KB as Kafka participant SR as Smart Router participant L1 as L1规则层 participant L2 as L2轻量模型 participant L3 as L3大模型 participant EN as 实体网络 participant ST as 存储层 DS->>CA: 原始日志 CA->>KB: 结构化事件 KB->>SR: 事件分发 SR->>L1: 规则匹配 alt L1 命中 L1-->>ST: 直接存储 else L1 未命中 SR->>L2: 语义分析 alt L2 可处理 L2-->>ST: 标准化事件 else L2 无法处理 L2->>L3: 深度推理 L3-->>EN: 实体关联 EN-->>ST: 富化事件 end end ST->>EN: 实体更新 EN-->>ST: 关联查询

3. 核心功能模块

3.1 多源数据采集引擎

3.1.1 功能描述

统一接入网络设备、主机系统、应用系统、身份认证、威胁情报等 5 大类数据源,支持 100+ 种日志格式的自动解析与标准化。

3.1.2 采集类型矩阵

数据源类型 采集方式 技术组件 数据格式 采集频率
网络设备 流量镜像/API SPAN/TAP + Flow Generator NetFlow/sFlow/IPFIX 实时
主机系统 Agent/ Syslog EDR/HIDS Agent JSON/Syslog 实时
应用系统 API/日志文件 REST Polling/Webhook JSON/Plain Text 按需/实时
身份认证 日志/API IAM Connector JSON/Syslog 实时
威胁情报 TAXII/STIX 情报接口 STIX/CSV/JSON 定期/实时

3.1.3 采集能力指标

指标 目标值 说明
并发采集源 1000+ 支持大规模部署
采集延迟 < 1s 从源到系统的延迟
吞吐量 50,000 events/s 峰值处理能力
数据源类型 100+ 种 覆盖主流设备
可用性 99.9% 采集服务可用性

3.1.4 采集流程

graph LR subgraph 发现 D1["数据源发现\\n自动识别"] D2["格式检测\\n样本分析"] end subgraph 配置 C1["采集配置\\n模板匹配"] C2["字段映射\\n规则生成"] end subgraph 执行 E1["数据采集\\n实时/定期"] E2["协议解析\\n格式转换"] E3["初步校验\\n数据清洗"] end subgraph 输出 O1["标准化事件\\n结构化"] O2["原始日志\\n归档存储"] end D1 --> D2 --> C1 --> C2 --> E1 --> E2 --> E3 --> O1 & O2

3.2 智能 Pipeline 处理

3.2.1 Smart Routing 三层分治

为解决大模型处理海量日志的成本与效率问题,采用三层分治架构:

层级 处理方式 日志占比 延迟 技术选型 成本
L1 规则层 正则/模板匹配 60-70% < 5ms Redis + Lua 极低
L2 轻量模型 语义分类(7B) 20-30% < 100ms Qwen2-7B
L3 大模型 深度推理(72B+) 1-5% 500ms-1s Qwen-Max

核心思想: 简单日志规则处理,复杂日志才调用大模型,实现成本降低 95%+

3.2.2 Pipeline 处理阶段

阶段 功能描述 智能化能力 处理能力
① 协议解析 多格式协议解析 自动识别格式类型 50,000/s
② Smart Routing 智能路由分发 L1/L2/L3 分层处理 30,000/s
③ 语义映射 字段名标准化 语义理解自动映射 8,000/s
④ 实体关联 跨源实体关联 知识网络图推理 5,000/s
⑤ 上下文富化 上下文补全 自动补全资产/用户 10,000/s
⑥ 智能降噪 告警聚合去重 语义聚合相似告警 20,000/s

3.2.3 向量缓存机制

为减少大模型调用次数,引入向量缓存:

缓存策略 命中率 效果
原始日志向量 首次处理后缓存 相同日志 0ms
解析结果缓存 格式+内容哈希 重复格式快速返回
关联结果缓存 实体关系图缓存 关联查询加速

效果: 大模型实际调用量从 100% 降至 1-5%,成本降低 95%+

3.3 知识网络实体关联

3.3.1 实体类型定义

实体类型 标识字段 关联属性
主机资产 asset_id IP、主机名、操作系统、业务系统
用户身份 user_id 账号、部门、角色、认证方式
网络实体 ip/mac 地理位置、归属组织、风险标签
应用服务 service_id 端口、协议、依赖关系
安全设备 device_id 设备类型、规则版本、告警阈值
威胁情报 ioc_id IOC类型、置信度、关联攻击队

3.3.2 实体关系图谱

graph TB subgraph 实体 H["主机资产\\nHOST-001"] U["用户身份\\nzhangsan"] N["网络实体\\n192.168.1.100"] A["应用服务\\nAPI-Gateway"] D["安全设备\\nWAF-01"] I["威胁情报\\nmalicious.com"] end subgraph 关系 H -->|运行| A U -->|登录| H N -->|属于| U N -->|访问| A D -->|监控| N I -->|解析| N end style H fill:#e3f2fd,stroke:#1565c0 style U fill:#fff3e0,stroke:#e65100 style N fill:#e8f5e9,stroke:#2e7d32 style A fill:#fce4ec,stroke:#c62828 style D fill:#f3e5f5,stroke:#7b1fa2 style I fill:#eceff1,stroke:#455a64

3.3.3 关联能力指标

关联维度 覆盖率 延迟
IP → 用户 90% < 100ms
IP → 资产 95% < 100ms
用户 → 行为 85% < 500ms
跨源关联 85% < 1s

3.4 统一数据模型

3.4.1 事件数据结构

{
  "event": {
    "metadata": {
      "event_id": "evt_20260601_abc123",
      "timestamp": "2026-06-01T12:33:00Z",
      "event_type": "network_connection",
      "source": "waf-01",
      "source_type": "network_device"
    },
    "severity": {
      "level": "high",
      "confidence_score": 0.95,
      "risk_level": "critical"
    },
    "actor": {
      "ip": "192.168.1.100",
      "port": 54321,
      "user": "zhangsan@corp.com",
      "asset_id": "HOST-001",
      "process": "chrome.exe",
      "entity_id": "entity_abc123"
    },
    "target": {
      "ip": "10.0.0.50",
      "port": 443,
      "domain": "evil.com",
      "service": "api-gateway"
    },
    "context": {
      "session_id": "sess_abc123",
      "geo_location": "CN",
      "related_entities": ["entity_xyz789", "entity_def456"],
      "kill_chain_phase": "command_and_control",
      "mitre_attack": ["T1041", "T1071"]
    },
    "enrichment": {
      "asset_info": {
        "asset_type": "server",
        "os": "Linux 4.19",
        "department": "IT",
        "criticality": "high"
      },
      "threat_intel": {
        "threat_actor": "APT29",
        "campaign": "SolarStorm",
        "ttps": ["T1041", "T1071"]
      }
    }
  }
}

3.4.2 字段标准化规范

原始字段 标准字段 数据类型 说明
src_ip / sip / source_ip actor.ip string 源IP
dst_ip / dip / dest_ip target.ip string 目标IP
user / username / account actor.user string 用户名
hostname / host / computer actor.asset_id string 主机标识
timestamp / time / datetime metadata.timestamp datetime 事件时间
severity / level / priority severity.level enum 严重级别

3.5 智能降噪引擎

3.5.1 降噪策略

策略 原理 降噪效果
时间窗口聚合 5分钟内同类告警合并 60% 压缩
攻击链关联 同阶段告警归并 40% 压缩
上下文去重 相同上下文去重 50% 压缩
语义相似聚合 大模型理解语义相似合并 70% 压缩

3.5.2 降噪效果指标

指标 降噪前 降噪后 压缩比
日均告警量 10,000+ < 500 95%
误报率 70%+ < 15% 79%
事件上下文 3个 20+ 个 6.7x

4. 技术实现

4.1 技术选型

组件 技术选型 作用 关键配置
消息队列 Apache Kafka 事件缓冲与分发 3 Broker,副本因子 3
流处理 Apache Flink 实时流计算 100 并行度,Checkpoint
规则引擎 Redis + Lua 高速规则匹配 < 5ms 响应
轻量模型 Qwen2-7B 语义分类与解析 GPU 加速
大模型 Qwen-Max 深度推理与富化 API 调用,缓存优化
图数据库 Neo4j 实体关系存储 知识图谱查询
向量数据库 Milvus 向量相似度检索 大模型结果缓存
搜索引擎 Elasticsearch 日志存储与检索 6 节点集群
对象存储 MinIO/S3 冷数据归档 180 天存储

4.2 高可用架构

graph TB subgraph 采集层 CA1["采集Agent-1"] CA2["采集Agent-2"] CA3["采集Agent-N"] end subgraph 消息层 KB1["Kafka-1"] KB2["Kafka-2"] KB3["Kafka-3"] end subgraph 处理层 FL1["Flink-1"] FL2["Flink-2"] end subgraph 存储层 ES1["ES-1"] ES2["ES-2"] ES3["ES-N"] end CA1 & CA2 & CA3 -->|负载均衡| KB1 & KB2 & KB3 KB1 & KB2 & KB3 -->|流处理| FL1 & FL2 FL1 & FL2 -->|写入| ES1 & ES2 & ES3 style CA1 fill:#e3f2fd,stroke:#1565c0 style KB1 fill:#fff3e0,stroke:#e65100 style FL1 fill:#e8f5e9,stroke:#2e7d32 style ES1 fill:#e1f5fe,stroke:#01579b

4.3 容灾设计

故障场景 影响 应对策略 恢复时间
采集Agent宕机 单源采集中断 自动切换备份Agent < 30s
Kafka Broker故障 消息缓冲中断 副本自动切换 < 10s
Flink任务失败 实时处理中断 Checkpoint 自动恢复 < 1min
ES节点故障 存储可用性下降 副本自动补充 < 5min
大模型服务不可用 富化能力降级 降级为规则处理 自动切换

5. 接口设计

5.1 采集配置接口

POST /api/v1/collection/config
{
  "source_name": "waf-01",
  "source_type": "network_device",
  "collect_method": "syslog",
  "endpoint": "192.168.1.10:514",
  "format": "json",
  "template": "waf_json_v1",
  "enabled": true
}

5.2 事件查询接口

GET /api/v1/events?start_time=2026-06-01T00:00:00Z&end_time=2026-06-02T00:00:00Z&severity=high&limit=100
{
  "total": 1523,
  "events": [
    {
      "event_id": "evt_20260601_abc123",
      "timestamp": "2026-06-01T12:33:00Z",
      "event_type": "network_connection",
      "severity": "high",
      "actor": { "ip": "192.168.1.100", "user": "zhangsan@corp.com" },
      "target": { "ip": "10.0.0.50", "port": 443 }
    }
  ]
}

5.3 实体关系接口

GET /api/v1/entities/{entity_id}/relations?depth=2
{
  "entity_id": "entity_abc123",
  "entity_type": "host",
  "relations": [
    {
      "target": "entity_xyz789",
      "relation": "communicated_with",
      "direction": "outbound",
      "last_seen": "2026-06-01T12:33:00Z"
    }
  ]
}

6. 量化指标

6.1 核心指标达成

指标 当前值 目标值 提升幅度 状态
数据采集覆盖率 60% 95% +35% 🚧 进行中
端到端延迟 5-15min < 10s -99% 🚧 进行中
多源关联自动化率 20% 85% +65% 🚧 进行中
日志存储周期 30天 180天 +150天 ✅ 已达成
新数据源接入时间 2-3 天 < 10min 350x 🚧 进行中
告警误报率 70%+ < 15% 降低 4.7x 🚧 进行中

6.2 性能指标

指标 目标值 峰值能力 状态
采集吞吐量 50,000 events/s 80,000 events/s
Pipeline 处理能力 30,000 events/s 50,000 events/s
查询响应时间 P99 < 1s P99 = 500ms
系统可用性 99.9% 99.99% 🚧

6.3 业务价值

价值维度 传统方案 智能化方案 提升
数据源接入效率 2-3 天/个 < 10 分钟/个 350x
关联分析时间 30min+ < 30s 60x
告警处理效率 100个/人天 500个/人天 5x
存储成本 全量高成本存储 冷热分层存储 -60%

7. 部署架构

7.1 组件部署

组件 部署模式 资源配置 数量
OTEL Agent DaemonSet 0.5CPU/512MB 每节点
主机Agent Host 0.2CPU/256MB 每主机
Kafka Cluster 4CPU/8GB 3节点
Flink Cluster 8CPU/16GB 5节点
Elasticsearch Cluster 8CPU/32GB 6节点
Neo4j Standalone 8CPU/32GB 1节点
Milvus Cluster 8CPU/16GB 3节点

7.2 网络拓扑

graph TB subgraph DMZ FW["防火墙"] end subgraph Core SW["核心交换机"] end subgraph Security Zone KB["Kafka集群"] FL["Flink集群"] ES["ES集群"] end subgraph Data Zone NG["Neo4j"] MV["Milvus"] MN["MinIO"] end FW --> SW --> KB --> FL --> ES FL --> NG & MV ES --> MN style FW fill:#fce4ec,stroke:#c62828 style KB fill:#e3f2fd,stroke:#1565c0 style ES fill:#e1f5fe,stroke:#01579b