MongoDB


MongoDB 是一个基于文档的开源 NoSQL 数据库,使用类似 JSON 的文档模型灵活存储数据,旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。
MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。
https://www.mongodb.com/zh-cn/docs/manual/crud/

MongoDB 概念 类比 MySQL 概念 类比 Elasticsearch 概念 核心解析
文档 (Document) 行 (Row) 文档 (Document) 数据的基本单元,是一个键值对的有序集合。它类似于一条完整的记录。
集合 (Collection) 表 (Table) 索引 (Index) 一组文档的容器。它类似于一张数据表。
数据库 (Database) 数据库 (Database) 无直接对应(可类比为索引的逻辑分组) 最高层的命名空间,用于组织和管理多个集合。一个 MongoDB 实例可以运行多个数据库,每个数据库在文件系统上有独立的文件。
字段 (Field) 列 (Column) 字段 (Field) 文档中的键值对,代表一个数据属性。
  • 关键特性与对比
    1. BSON:MongoDB 的“增强版JSON”
    BSON(Binary JSON)是 MongoDB 用来存储文档和数据交换的二进制格式。您可以把它理解为 JSON 的超级版,它支持 JSON 的所有数据类型,但更强大: 二进制编码:比 JSON 更紧凑,存储和扫描效率更高。更丰富的数据类型:除了字符串、数字、数组、对象外,还支持日期、对象ID(ObjectId)、二进制数据等特定类型。
    2. 灵活的模式(Schema)
    这是 MongoDB 与 MySQL 最根本的区别之一。同一个集合内的文档不需要具有相同的字段集。您可以随时为文档添加新字段,而无需像关系型数据库那样进行复杂的 ALTER TABLE 操作。
  • 如何根据场景选择
    特性 MongoDB MySQL Elasticsearch
    数据模型 文档模型,动态模式 关系模型,固定模式 文档模型,可定义映射
    查询语言 面向对象的 API 方法 标准 SQL 语句 JSON-based DSL
    核心优势 敏捷开发、水平扩展、存储复杂数据结构 复杂查询、事务一致性、数据完整性 全文搜索、日志分析、复杂聚合
    典型场景 内容管理系统、用户画像、实时分析、物联网 金融交易系统、ERP、CRM等需要严格事务的系统 搜索引擎、日志和指标分析、应用内搜索
  • Mongo CRUD汇总
    操作类型 方法示例 功能说明
    创建 (Create) db.hosts.insertOne({hostname: "web-01", ip: "192.168.1.100"}) 向集合中插入一条新文档。若集合不存在,会自动创建。
    批量创建 db.hosts.insertMany([{hostname: "web-01"}, {hostname: "db-01"}]) 批量插入多条文档。
    查询 (Read) db.hosts.find({status: "running"}) 查询所有符合条件的文档。不传参数则返回所有文档。
    条件查询 db.hosts.find({"specs.memory": {$gte: 8}}, {hostname: 1, _id: 0}) 使用查询操作符进行过滤,并使用投影选择返回的字段。
    更新 (Update) db.hosts.updateOne({hostname: "web-01"}, {$set: {"specs.memory": 32}}) 更新一条匹配的文档。$set 操作符用于修改特定字段的值。
    批量更新 db.hosts.updateMany({environment: "production"}, {$inc: {visits: 1}}) 更新所有匹配的文档。$inc 操作符用于将字段的值增加指定数额。
    删除 (Delete) db.hosts.deleteOne({hostname: "web-01"}) 删除一条匹配的文档。
    批量删除 db.hosts.deleteMany({status: "terminated"}) 删除所有匹配的文档。

读写

  • 写入确认:默认情况下,MongoDB 的写入操作是异步的。如果你的应用需要确保数据已经安全写入,可以设置 写关注(Write Concern)来控制确认级别。
  • 单文档原子性:所有 CRUD 操作在 单个文档 级别是原子的。例如,一条文档的更新操作要么完全成功,要么完全失败,不会出现只更新一半字段的情况。需要注意的是,insertMany 这样的多文档操作,默认情况下并非一个整体事务。
  • 比较操作符:$gt (大于), $gte (大于等于), $lt (小于), $lte (小于等于), $ne (不等于)。
    逻辑操作符:$and, $or, $in (匹配数组中任意值), $nin (不匹配数组中任何值)。
  • 关键操作:
    1
    2
    3
    4
    5
    6
    7
    // 创建
    db.hosts.insertOne({
    hostname: "web-01",
    ip: "192.168.1.100",
    specs: { cpu: 8, memory: 16 }, // 嵌入式文档
    tags: ["web", "production"]
    })
    1
    2
    3
    4
    5
    // 查询与投影
    db.hosts.find(
    { "specs.memory": { $gte: 8 }, "status": "running" },
    { hostname: 1, ip: 1, _id: 0 } // 投影:只返回指定字段
    )
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 更新操作符
    db.hosts.updateOne(
    { hostname: "web-01" },
    {
    $set: { "specs.memory": 32 }, // 设置字段
    $push: { tags: "upgraded" }, // 数组追加
    $inc: { "stats.visits": 1 } // 数值递增
    }
    )
  • 复杂查询模式:
    1
    2
    3
    4
    5
    6
    7
    // 逻辑组合查询
    db.hosts.find({
    $or: [ // $or逻辑操作符用于筛选出符合任意一组给定条件的文档
    { "environment": "production", "status": "running" }, // 每个条件组内部使用隐式的AND逻辑,需同时满足
    { "criticality": "high", "last_backup": { $gte: yesterday } } // $gte是“大于或等于”的比较操作符
    ]
    })
    1
    2
    3
    4
    5
    // 数组查询
    db.hosts.find({
    "tags": { $all: ["web", "load_balancer"] }, // 包含所有指定标签
    "ips": { $size: 3 } // 数组长度匹配
    })
    1
    2
    3
    4
    // 正则表达式与文本搜索
    db.hosts.find({
    "hostname": { $regex: "^web-.*", $options: "i" }
    })

聚合框架

  • 聚合和管道是 MongoDB 最强大、最独特的特性之一。核心概念用工厂流水线来理解:想象一下数据处理就像一条工厂装配线
    • 普通查询:像是在成品仓库里筛选符合条件的产品。例如:“给我找出所有红色的汽车。” 你得到的是原始产品列表。
    • 聚合管道:像是把原材料送进一条多阶段的加工流水线,每个工位(阶段)都对数据进行特定处理,最终产出全新的、经过深度加工的产品。例如:原材料 → 切割 → 焊接 → 喷漆 → 组装 → 全新的汽车模型+统计报告。
  • 目的:对数据进行多阶段转换、分组、计算,生成全新的数据结构。
  • 详细对比解析
    1. 普通查询:检索和筛选文档。你得到的是原始数据。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      // 场景:找出所有生产环境的Linux主机
      db.hosts.find({
      environment: "production",
      "os.type": "Linux"
      })

      // 返回结果:原始文档数组
      [
      {hostname: "web-01", environment: "production", os: {type: "Linux"}, memory_gb: 16, ...},
      {hostname: "db-01", environment: "production", os: {type: "Linux"}, memory_gb: 32, ...},
      // ... 直接返回所有匹配的文档
      ]
    2. 复杂查询:在检索时进行一些简单计算或逻辑处理。
      1
      2
      3
      4
      5
      6
      7
      // 场景:找出内存大于8G的生产环境主机,按内存降序排列,只显示主机名和内存
      db.hosts.find(
      { environment: "production", memory_gb: { $gt: 8 } },
      { hostname: 1, memory_gb: 1, _id: 0 } // 投影:选择字段
      ).sort({ memory_gb: -1 }).limit(10)

      // 返回结果:经过筛选和排序的文档,但结构基本不变
    3. 聚合管道 (Aggregation Pipeline) :对数据进行多阶段转换、分组、计算,生成全新的数据结构
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      // 目标:查询生产环境的应用,并获取其运行主机的详细信息
      db.applications.aggregate([ // 在'applications'集合上执行聚合管道
      // 阶段 1: $match - 筛选文档
      {
      $match: { // 过滤条件:只处理符合以下条件的应用文档
      "environment": "production", // 条件1: 环境为'production'
      "status": "active" // 条件2: 状态为'active'
      }
      },

      // 阶段 2: $lookup - 关联查询(连表查询)
      {
      $lookup: { // 从左集合(applications)关联到右集合(hosts)
      from: "hosts", // 要关联的集合名称:'hosts'
      localField: "app_name", // 左集合(applications)中的关联字段:应用名称
      foreignField: "running_apps", // 右集合(hosts)中的关联字段:运行的应用列表
      as: "host_details" // 将关联匹配到的主机文档存入此新字段(是一个数组)
      }
      },

      // 阶段 3: $unwind - 展开数组字段
      {
      $unwind: "$host_details" // 将`host_details`数组拆分为多条独立的文档,每条包含一个应用和一个主机
      },

      // 阶段 4: $project - 重塑文档结构
      {
      $project: { // 定义输出文档的字段
      "_id": 0, // 不显示原始的_id字段(0为排除)
      "application": "$app_name", // 将app_name重命名为application
      "team": "$owner_team", // 显示负责团队
      "criticality": 1, // 显示应用关键程度(1为包含)
      // 从关联的主机文档中提取信息
      "hostname": "$host_details.hostname", // 显示主机名
      "ip_address": "$host_details.ip_address", // 显示IP地址
      "cpu_cores": "$host_details.specs.cpu_cores", // 显示CPU核心数
      "memory_gb": "$host_details.specs.memory_gb", // 显示内存大小
      "disk_gb": "$host_details.specs.disk_gb" // 显示磁盘大小
      }
      },

      // 阶段 5: $sort - 结果排序
      {
      $sort: { // 排序条件
      "team": 1, // 首先按团队名称升序排列 (1: 升序, -1: 降序)
      "criticality": -1, // 其次按应用关键程度降序排列 (high比medium优先级高)
      "memory_gb": -1 // 关键程度相同时,按主机内存降序排列
      }
      }
      ])
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      // 应用文档示例
      {
      "_id": ObjectId("67a1b2c3d4e5f6a1b2c3d4e5"),
      "app_name": "订单服务",
      "environment": "production", // 环境: production, staging, development
      "owner_team": "order-team",
      "criticality": "high"
      }
      // 主机文档示例
      {
      "_id": ObjectId("77a1b2c3d4e5f6a1b2c3d4e5"),
      "hostname": "prod-order-01",
      "ip_address": "192.168.1.100",
      "environment": "production",
      "specs": {
      "cpu_cores": 8,
      "memory_gb": 32,
      "disk_gb": 500
      },
      "running_apps": ["订单服务", "用户服务"] // 在此主机上运行的应用名称数组
      }
      // 返回结果:完全重新组织的数据
      [
      {
      "application": "订单服务",
      "team": "order-team",
      "criticality": "high",
      "hostname": "prod-order-01",
      "ip_address": "192.168.1.100",
      "cpu_cores": 8,
      "memory_gb": 32,
      "disk_gb": 500
      },
      {
      "application": "支付网关",
      "team": "payment-team",
      "criticality": "critical",
      "hostname": "prod-payment-01",
      "ip_address": "192.168.1.101",
      "cpu_cores": 16,
      "memory_gb": 64,
      "disk_gb": 1000
      }
      // ... 其他结果
      ]
    4. CMDB 应用场景
      问题1:”显示每个团队负责的主机数量,并按环境分别统计”
      1
      2
      3
      4
      5
      6
      7
      8
      db.hosts.aggregate([
      { $match: { status: "running" } },
      { $group: {
      _id: { team: "$owner_team", env: "$environment" },
      host_count: { $sum: 1 }
      }},
      { $sort: { host_count: -1 } }
      ])
      问题2:”找出生产环境中,哪些应用依赖的主机平均内存使用率超过80%”
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      db.applications.aggregate([
      { $match: { environment: "production" } },
      { $lookup: {
      from: "hosts",
      localField: "dependent_hosts",
      foreignField: "hostname",
      as: "host_details"
      }},
      { $project: {
      app_name: 1,
      avg_memory_usage: { $avg: "$host_details.memory_usage_percent" }
      }},
      { $match: { avg_memory_usage: { $gt: 80 } } }
      ])

索引策略

  1. 索引类型与应用场景:
  • 单字段索引:db.hosts.createIndex({ hostname: 1 })
  • 复合索引:db.hosts.createIndex({ environment: 1, status: 1 })
  • 多键索引:数组字段索引 db.hosts.createIndex({ tags: 1 })
  • 文本索引:全文搜索 db.hosts.createIndex({ description: "text" })
  • TTL索引:自动过期数据 db.logs.createIndex({ createdAt: 1 }, { expireAfterSeconds: 3600 })
  1. 索引优化原则:
  • ESR规则:Equality(等值) → Sort(排序) → Range(范围)
  • 覆盖查询:索引包含所有查询字段,避免回表
  • 索引交集:MongoDB可自动组合多个索引

高可用与扩展

1. 复制集(Replica Set)深度

  • 架构组成
    1
    2
    3
    4
    5
    Primary(主节点)← 读写操作
    ↓ 数据同步
    Secondary-1(从节点)← 只读查询
    Secondary-2(从节点)← 只读查询
    Arbiter(仲裁节点)← 仅参与选举
  • 配置示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 复制集配置
    cfg = {
    _id: "cmdb-rs",
    members: [
    {_id: 0, host: "mongo1:27017", priority: 2},
    {_id: 1, host: "mongo2:27017", priority: 1},
    {_id: 2, host: "mongo3:27017", priority: 1, arbiterOnly: true}
    ]
    }
    rs.initiate(cfg)
  • 读写关注
    1
    2
    3
    4
    5
    6
    7
    // 强一致性写操作
    db.hosts.insertOne(doc, {
    writeConcern: { w: "majority", wtimeout: 5000 }
    })

    // 从节点读取(最终一致性)
    db.hosts.find().readPref("secondary")

2. 分片集群(Sharded Cluster)

  • 分片策略选择
    • 基于范围分片:适合范围查询,可能产生热点
    • 基于哈希分片:数据分布均匀,范围查询效率低
    • 区域分片:基于标签的智能数据分布
  • 分片键选择考量
    1
    2
    3
    4
    5
    // 好的分片键:基数高、分布均匀
    sh.shardCollection("cmdb.hosts", { "tenant_id": 1, "hostname": 1 })

    // 避免的选择:低基数、单调递增
    // sh.shardCollection("cmdb.hosts", { "created_date": 1 }) // 不好!

CMDB助手查询工具设计(基于MCP)

  • MCP工具架构设计
    1
    自然语言查询 → MCP Server → 查询解析器 → MongoDB查询生成器 → 结果格式化
  • MCP Server 核心代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    from mcp import MCPServer, Context
    import pymongo
    from typing import Dict, List, Any

    class CMDBAgentMcpServer:
    def __init__(self, mongo_uri: str):
    self.client = pymongo.MongoClient(mongo_uri)
    self.db = self.client.cmdb
    self.setup_tools()

    @mcp_tool(
    name="query_hosts",
    description="根据条件查询主机信息",
    args_schema={
    "type": "object",
    "properties": {
    "environment": {"type": "string", "enum": ["production", "staging", "development"]},
    "os_type": {"type": "string"},
    "min_memory": {"type": "integer"},
    "status": {"type": "string", "enum": ["running", "stopped", "maintenance"]},
    "tags": {"type": "array", "items": {"type": "string"}},
    "limit": {"type": "integer", "default": 10}
    }
    }
    )
    async def query_hosts(self, ctx: Context, **filters) -> List[Dict]:
    """智能主机查询工具"""
    # 构建MongoDB查询
    query = self._build_host_query(filters)

    # 执行查询
    cursor = self.db.hosts.find(query["filter"], query["projection"])
    cursor = cursor.sort(query["sort"]).limit(query["limit"])

    results = list(cursor)
    return self._format_host_results(results)

    def _build_host_query(self, filters: Dict) -> Dict:
    """将自然语言参数转换为MongoDB查询"""
    query_filter = {}

    # 环境过滤
    if filters.get("environment"):
    query_filter["environment"] = filters["environment"]

    # 操作系统过滤(支持模糊匹配)
    if filters.get("os_type"):
    query_filter["os.type"] = {"$regex": filters["os_type"], "$options": "i"}

    # 内存条件
    if filters.get("min_memory"):
    query_filter["specs.memory_gb"] = {"$gte": filters["min_memory"]}

    # 状态过滤
    if filters.get("status"):
    query_filter["status"] = filters["status"]

    # 标签查询
    if filters.get("tags"):
    query_filter["tags"] = {"$all": filters["tags"]}

    return {
    "filter": query_filter,
    "projection": {"_id": 0, "hostname": 1, "ip": 1, "environment": 1, "os": 1, "specs": 1},
    "sort": [("last_seen", -1)],
    "limit": filters.get("limit", 10)
    }

    @mcp_tool(
    name="analyze_infrastructure",
    description="基础设施统计分析",
    args_schema={
    "type": "object",
    "properties": {
    "analysis_type": {"type": "string", "enum": ["environment", "os", "team"]},
    "metric": {"type": "string", "enum": ["count", "memory", "cpu"]}
    }
    }
    )
    async def analyze_infrastructure(self, ctx: Context, analysis_type: str, metric: str = "count"):
    """聚合分析工具"""
    pipeline = self._build_analysis_pipeline(analysis_type, metric)
    results = list(self.db.hosts.aggregate(pipeline))
    return self._format_analysis_results(results, analysis_type, metric)

    # 使用示例
    server = CMDBAgentMcpServer("mongodb://localhost:27017/cmdb")
  • Agent提示词设计
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    你是一个CMDB专家,可以查询和分析基础设施信息。

    可用工具:
    1. query_hosts - 查询主机信息,支持按环境、操作系统、内存等条件过滤
    2. analyze_infrastructure - 统计分析基础设施资源分布

    示例交互:
    用户:列出生产环境中所有运行CentOS的主机
    AI:我将使用query_hosts工具查询...
    工具调用:query_hosts(environment="production", os_type="CentOS")

    用户:按环境统计主机数量
    AI:使用analyze_infrastructure进行分析...
    工具调用:analyze_infrastructure(analysis_type="environment", metric="count")
  • 高级查询特性支持
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    # 支持复杂查询的扩展工具
    @mcp_tool(
    name="advanced_host_search",
    description="高级主机搜索,支持复杂条件组合"
    )
    async def advanced_search(self, ctx: Context, search_expression: str):
    """支持自然语言复杂查询"""
    # 使用LLM解析复杂查询条件
    parsed_query = await self.llm_parse_search(search_expression)
    return await self.execute_advanced_query(parsed_query)

    # 关系查询工具
    @mcp_tool(
    name="find_related_assets",
    description="查找相关资产和依赖关系"
    )
    async def find_related_assets(self, ctx: Context, hostname: str, relation_type: str):
    """查询资产关系"""
    pipeline = [
    {"$match": {"hostname": hostname}},
    {"$graphLookup": { # 使用图查询查找关系
    "from": "asset_relations",
    "startWith": "$_id",
    "connectFromField": "from_asset",
    "connectToField": "to_asset",
    "as": "relationships"
    }}
    ]
    return list(self.db.hosts.aggregate(pipeline))