MongoDB 是一个基于文档的开源 NoSQL 数据库,使用类似 JSON 的文档模型灵活存储数据,旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。
MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。
https://www.mongodb.com/zh-cn/docs/manual/crud/
| MongoDB 概念 | 类比 MySQL 概念 | 类比 Elasticsearch 概念 | 核心解析 |
|---|---|---|---|
| 文档 (Document) | 行 (Row) | 文档 (Document) | 数据的基本单元,是一个键值对的有序集合。它类似于一条完整的记录。 |
| 集合 (Collection) | 表 (Table) | 索引 (Index) | 一组文档的容器。它类似于一张数据表。 |
| 数据库 (Database) | 数据库 (Database) | 无直接对应(可类比为索引的逻辑分组) | 最高层的命名空间,用于组织和管理多个集合。一个 MongoDB 实例可以运行多个数据库,每个数据库在文件系统上有独立的文件。 |
| 字段 (Field) | 列 (Column) | 字段 (Field) | 文档中的键值对,代表一个数据属性。 |
- 关键特性与对比
1. BSON:MongoDB 的“增强版JSON”
BSON(Binary JSON)是 MongoDB 用来存储文档和数据交换的二进制格式。您可以把它理解为 JSON 的超级版,它支持 JSON 的所有数据类型,但更强大: 二进制编码:比 JSON 更紧凑,存储和扫描效率更高。更丰富的数据类型:除了字符串、数字、数组、对象外,还支持日期、对象ID(ObjectId)、二进制数据等特定类型。
2. 灵活的模式(Schema)
这是 MongoDB 与 MySQL 最根本的区别之一。同一个集合内的文档不需要具有相同的字段集。您可以随时为文档添加新字段,而无需像关系型数据库那样进行复杂的ALTER TABLE操作。 - 如何根据场景选择
特性 MongoDB MySQL Elasticsearch 数据模型 文档模型,动态模式 关系模型,固定模式 文档模型,可定义映射 查询语言 面向对象的 API 方法 标准 SQL 语句 JSON-based DSL 核心优势 敏捷开发、水平扩展、存储复杂数据结构 复杂查询、事务一致性、数据完整性 全文搜索、日志分析、复杂聚合 典型场景 内容管理系统、用户画像、实时分析、物联网 金融交易系统、ERP、CRM等需要严格事务的系统 搜索引擎、日志和指标分析、应用内搜索 - Mongo CRUD汇总
操作类型 方法示例 功能说明 创建 (Create) db.hosts.insertOne({hostname: "web-01", ip: "192.168.1.100"})向集合中插入一条新文档。若集合不存在,会自动创建。 批量创建 db.hosts.insertMany([{hostname: "web-01"}, {hostname: "db-01"}])批量插入多条文档。 查询 (Read) db.hosts.find({status: "running"})查询所有符合条件的文档。不传参数则返回所有文档。 条件查询 db.hosts.find({"specs.memory": {$gte: 8}}, {hostname: 1, _id: 0})使用查询操作符进行过滤,并使用投影选择返回的字段。 更新 (Update) db.hosts.updateOne({hostname: "web-01"}, {$set: {"specs.memory": 32}})更新一条匹配的文档。 $set操作符用于修改特定字段的值。批量更新 db.hosts.updateMany({environment: "production"}, {$inc: {visits: 1}})更新所有匹配的文档。 $inc操作符用于将字段的值增加指定数额。删除 (Delete) db.hosts.deleteOne({hostname: "web-01"})删除一条匹配的文档。 批量删除 db.hosts.deleteMany({status: "terminated"})删除所有匹配的文档。
读写
- 写入确认:默认情况下,MongoDB 的写入操作是异步的。如果你的应用需要确保数据已经安全写入,可以设置 写关注(Write Concern)来控制确认级别。
- 单文档原子性:所有 CRUD 操作在 单个文档 级别是原子的。例如,一条文档的更新操作要么完全成功,要么完全失败,不会出现只更新一半字段的情况。需要注意的是,
insertMany这样的多文档操作,默认情况下并非一个整体事务。 - 比较操作符:
$gt(大于),$gte(大于等于),$lt(小于),$lte(小于等于),$ne(不等于)。
逻辑操作符:$and,$or,$in(匹配数组中任意值),$nin(不匹配数组中任何值)。 - 关键操作:
1
2
3
4
5
6
7// 创建
db.hosts.insertOne({
hostname: "web-01",
ip: "192.168.1.100",
specs: { cpu: 8, memory: 16 }, // 嵌入式文档
tags: ["web", "production"]
})1
2
3
4
5// 查询与投影
db.hosts.find(
{ "specs.memory": { $gte: 8 }, "status": "running" },
{ hostname: 1, ip: 1, _id: 0 } // 投影:只返回指定字段
)1
2
3
4
5
6
7
8
9// 更新操作符
db.hosts.updateOne(
{ hostname: "web-01" },
{
$set: { "specs.memory": 32 }, // 设置字段
$push: { tags: "upgraded" }, // 数组追加
$inc: { "stats.visits": 1 } // 数值递增
}
) - 复杂查询模式:
1
2
3
4
5
6
7// 逻辑组合查询
db.hosts.find({
$or: [ // $or逻辑操作符用于筛选出符合任意一组给定条件的文档
{ "environment": "production", "status": "running" }, // 每个条件组内部使用隐式的AND逻辑,需同时满足
{ "criticality": "high", "last_backup": { $gte: yesterday } } // $gte是“大于或等于”的比较操作符
]
})1
2
3
4
5// 数组查询
db.hosts.find({
"tags": { $all: ["web", "load_balancer"] }, // 包含所有指定标签
"ips": { $size: 3 } // 数组长度匹配
})1
2
3
4// 正则表达式与文本搜索
db.hosts.find({
"hostname": { $regex: "^web-.*", $options: "i" }
})
聚合框架
- 聚合和管道是 MongoDB 最强大、最独特的特性之一。核心概念用工厂流水线来理解:想象一下数据处理就像一条工厂装配线
- 普通查询:像是在成品仓库里筛选符合条件的产品。例如:“给我找出所有红色的汽车。” 你得到的是原始产品列表。
- 聚合管道:像是把原材料送进一条多阶段的加工流水线,每个工位(阶段)都对数据进行特定处理,最终产出全新的、经过深度加工的产品。例如:原材料 → 切割 → 焊接 → 喷漆 → 组装 → 全新的汽车模型+统计报告。
- 目的:对数据进行多阶段转换、分组、计算,生成全新的数据结构。
- 详细对比解析
- 普通查询:检索和筛选文档。你得到的是原始数据。
1
2
3
4
5
6
7
8
9
10
11
12// 场景:找出所有生产环境的Linux主机
db.hosts.find({
environment: "production",
"os.type": "Linux"
})
// 返回结果:原始文档数组
[
{hostname: "web-01", environment: "production", os: {type: "Linux"}, memory_gb: 16, ...},
{hostname: "db-01", environment: "production", os: {type: "Linux"}, memory_gb: 32, ...},
// ... 直接返回所有匹配的文档
] - 复杂查询:在检索时进行一些简单计算或逻辑处理。
1
2
3
4
5
6
7// 场景:找出内存大于8G的生产环境主机,按内存降序排列,只显示主机名和内存
db.hosts.find(
{ environment: "production", memory_gb: { $gt: 8 } },
{ hostname: 1, memory_gb: 1, _id: 0 } // 投影:选择字段
).sort({ memory_gb: -1 }).limit(10)
// 返回结果:经过筛选和排序的文档,但结构基本不变 - 聚合管道 (Aggregation Pipeline) :对数据进行多阶段转换、分组、计算,生成全新的数据结构。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50// 目标:查询生产环境的应用,并获取其运行主机的详细信息
db.applications.aggregate([ // 在'applications'集合上执行聚合管道
// 阶段 1: $match - 筛选文档
{
$match: { // 过滤条件:只处理符合以下条件的应用文档
"environment": "production", // 条件1: 环境为'production'
"status": "active" // 条件2: 状态为'active'
}
},
// 阶段 2: $lookup - 关联查询(连表查询)
{
$lookup: { // 从左集合(applications)关联到右集合(hosts)
from: "hosts", // 要关联的集合名称:'hosts'
localField: "app_name", // 左集合(applications)中的关联字段:应用名称
foreignField: "running_apps", // 右集合(hosts)中的关联字段:运行的应用列表
as: "host_details" // 将关联匹配到的主机文档存入此新字段(是一个数组)
}
},
// 阶段 3: $unwind - 展开数组字段
{
$unwind: "$host_details" // 将`host_details`数组拆分为多条独立的文档,每条包含一个应用和一个主机
},
// 阶段 4: $project - 重塑文档结构
{
$project: { // 定义输出文档的字段
"_id": 0, // 不显示原始的_id字段(0为排除)
"application": "$app_name", // 将app_name重命名为application
"team": "$owner_team", // 显示负责团队
"criticality": 1, // 显示应用关键程度(1为包含)
// 从关联的主机文档中提取信息
"hostname": "$host_details.hostname", // 显示主机名
"ip_address": "$host_details.ip_address", // 显示IP地址
"cpu_cores": "$host_details.specs.cpu_cores", // 显示CPU核心数
"memory_gb": "$host_details.specs.memory_gb", // 显示内存大小
"disk_gb": "$host_details.specs.disk_gb" // 显示磁盘大小
}
},
// 阶段 5: $sort - 结果排序
{
$sort: { // 排序条件
"team": 1, // 首先按团队名称升序排列 (1: 升序, -1: 降序)
"criticality": -1, // 其次按应用关键程度降序排列 (high比medium优先级高)
"memory_gb": -1 // 关键程度相同时,按主机内存降序排列
}
}
])1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45// 应用文档示例
{
"_id": ObjectId("67a1b2c3d4e5f6a1b2c3d4e5"),
"app_name": "订单服务",
"environment": "production", // 环境: production, staging, development
"owner_team": "order-team",
"criticality": "high"
}
// 主机文档示例
{
"_id": ObjectId("77a1b2c3d4e5f6a1b2c3d4e5"),
"hostname": "prod-order-01",
"ip_address": "192.168.1.100",
"environment": "production",
"specs": {
"cpu_cores": 8,
"memory_gb": 32,
"disk_gb": 500
},
"running_apps": ["订单服务", "用户服务"] // 在此主机上运行的应用名称数组
}
// 返回结果:完全重新组织的数据
[
{
"application": "订单服务",
"team": "order-team",
"criticality": "high",
"hostname": "prod-order-01",
"ip_address": "192.168.1.100",
"cpu_cores": 8,
"memory_gb": 32,
"disk_gb": 500
},
{
"application": "支付网关",
"team": "payment-team",
"criticality": "critical",
"hostname": "prod-payment-01",
"ip_address": "192.168.1.101",
"cpu_cores": 16,
"memory_gb": 64,
"disk_gb": 1000
}
// ... 其他结果
] - CMDB 应用场景
问题1:”显示每个团队负责的主机数量,并按环境分别统计”问题2:”找出生产环境中,哪些应用依赖的主机平均内存使用率超过80%”1
2
3
4
5
6
7
8db.hosts.aggregate([
{ $match: { status: "running" } },
{ $group: {
_id: { team: "$owner_team", env: "$environment" },
host_count: { $sum: 1 }
}},
{ $sort: { host_count: -1 } }
])1
2
3
4
5
6
7
8
9
10
11
12
13
14db.applications.aggregate([
{ $match: { environment: "production" } },
{ $lookup: {
from: "hosts",
localField: "dependent_hosts",
foreignField: "hostname",
as: "host_details"
}},
{ $project: {
app_name: 1,
avg_memory_usage: { $avg: "$host_details.memory_usage_percent" }
}},
{ $match: { avg_memory_usage: { $gt: 80 } } }
])
- 普通查询:检索和筛选文档。你得到的是原始数据。
索引策略
- 索引类型与应用场景:
- 单字段索引:
db.hosts.createIndex({ hostname: 1 }) - 复合索引:
db.hosts.createIndex({ environment: 1, status: 1 }) - 多键索引:数组字段索引
db.hosts.createIndex({ tags: 1 }) - 文本索引:全文搜索
db.hosts.createIndex({ description: "text" }) - TTL索引:自动过期数据
db.logs.createIndex({ createdAt: 1 }, { expireAfterSeconds: 3600 })
- 索引优化原则:
- ESR规则:Equality(等值) → Sort(排序) → Range(范围)
- 覆盖查询:索引包含所有查询字段,避免回表
- 索引交集:MongoDB可自动组合多个索引
高可用与扩展
1. 复制集(Replica Set)深度
- 架构组成:
1
2
3
4
5Primary(主节点)← 读写操作
↓ 数据同步
Secondary-1(从节点)← 只读查询
Secondary-2(从节点)← 只读查询
Arbiter(仲裁节点)← 仅参与选举 - 配置示例:
1
2
3
4
5
6
7
8
9
10// 复制集配置
cfg = {
_id: "cmdb-rs",
members: [
{_id: 0, host: "mongo1:27017", priority: 2},
{_id: 1, host: "mongo2:27017", priority: 1},
{_id: 2, host: "mongo3:27017", priority: 1, arbiterOnly: true}
]
}
rs.initiate(cfg) - 读写关注:
1
2
3
4
5
6
7// 强一致性写操作
db.hosts.insertOne(doc, {
writeConcern: { w: "majority", wtimeout: 5000 }
})
// 从节点读取(最终一致性)
db.hosts.find().readPref("secondary")
2. 分片集群(Sharded Cluster)
- 分片策略选择:
- 基于范围分片:适合范围查询,可能产生热点
- 基于哈希分片:数据分布均匀,范围查询效率低
- 区域分片:基于标签的智能数据分布
- 分片键选择考量:
1
2
3
4
5// 好的分片键:基数高、分布均匀
sh.shardCollection("cmdb.hosts", { "tenant_id": 1, "hostname": 1 })
// 避免的选择:低基数、单调递增
// sh.shardCollection("cmdb.hosts", { "created_date": 1 }) // 不好!
CMDB助手查询工具设计(基于MCP)
- MCP工具架构设计
1
自然语言查询 → MCP Server → 查询解析器 → MongoDB查询生成器 → 结果格式化
- MCP Server 核心代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87from mcp import MCPServer, Context
import pymongo
from typing import Dict, List, Any
class CMDBAgentMcpServer:
def __init__(self, mongo_uri: str):
self.client = pymongo.MongoClient(mongo_uri)
self.db = self.client.cmdb
self.setup_tools()
)
async def query_hosts(self, ctx: Context, **filters) -> List[Dict]:
"""智能主机查询工具"""
# 构建MongoDB查询
query = self._build_host_query(filters)
# 执行查询
cursor = self.db.hosts.find(query["filter"], query["projection"])
cursor = cursor.sort(query["sort"]).limit(query["limit"])
results = list(cursor)
return self._format_host_results(results)
def _build_host_query(self, filters: Dict) -> Dict:
"""将自然语言参数转换为MongoDB查询"""
query_filter = {}
# 环境过滤
if filters.get("environment"):
query_filter["environment"] = filters["environment"]
# 操作系统过滤(支持模糊匹配)
if filters.get("os_type"):
query_filter["os.type"] = {"$regex": filters["os_type"], "$options": "i"}
# 内存条件
if filters.get("min_memory"):
query_filter["specs.memory_gb"] = {"$gte": filters["min_memory"]}
# 状态过滤
if filters.get("status"):
query_filter["status"] = filters["status"]
# 标签查询
if filters.get("tags"):
query_filter["tags"] = {"$all": filters["tags"]}
return {
"filter": query_filter,
"projection": {"_id": 0, "hostname": 1, "ip": 1, "environment": 1, "os": 1, "specs": 1},
"sort": [("last_seen", -1)],
"limit": filters.get("limit", 10)
}
)
async def analyze_infrastructure(self, ctx: Context, analysis_type: str, metric: str = "count"):
"""聚合分析工具"""
pipeline = self._build_analysis_pipeline(analysis_type, metric)
results = list(self.db.hosts.aggregate(pipeline))
return self._format_analysis_results(results, analysis_type, metric)
# 使用示例
server = CMDBAgentMcpServer("mongodb://localhost:27017/cmdb") - Agent提示词设计:
1
2
3
4
5
6
7
8
9
10
11
12
13
14你是一个CMDB专家,可以查询和分析基础设施信息。
可用工具:
1. query_hosts - 查询主机信息,支持按环境、操作系统、内存等条件过滤
2. analyze_infrastructure - 统计分析基础设施资源分布
示例交互:
用户:列出生产环境中所有运行CentOS的主机
AI:我将使用query_hosts工具查询...
工具调用:query_hosts(environment="production", os_type="CentOS")
用户:按环境统计主机数量
AI:使用analyze_infrastructure进行分析...
工具调用:analyze_infrastructure(analysis_type="environment", metric="count") - 高级查询特性支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29# 支持复杂查询的扩展工具
)
async def advanced_search(self, ctx: Context, search_expression: str):
"""支持自然语言复杂查询"""
# 使用LLM解析复杂查询条件
parsed_query = await self.llm_parse_search(search_expression)
return await self.execute_advanced_query(parsed_query)
# 关系查询工具
)
async def find_related_assets(self, ctx: Context, hostname: str, relation_type: str):
"""查询资产关系"""
pipeline = [
{"$match": {"hostname": hostname}},
{"$graphLookup": { # 使用图查询查找关系
"from": "asset_relations",
"startWith": "$_id",
"connectFromField": "from_asset",
"connectToField": "to_asset",
"as": "relationships"
}}
]
return list(self.db.hosts.aggregate(pipeline))