Milvus 的 Python Sdk-分布式
项目描述
Milvus Python SDK
用于Milvus的 Python SDK 。要为本项目贡献代码,请先阅读我们的贡献指南。
如需详细的 SDK 文档,请参阅API 文档。
- 新的功能
- 开始使用
- 基本操作
- 创建/删除集合
- 在集合中创建/删除分区
- 在集合中创建/删除索引
- 在集合/分区中插入/删除向量
- 将一个或多个集合中的数据刷新到磁盘
- 压缩集合中的所有段
- 在集合/分区中搜索实体
- 与 Milvus 服务器断开连接
- 常问问题
新的功能
-
remove
get_index_info,索引信息可以通过get_collection_info -
添加
get_collection_stats,更详细的收集统计信息。
开始使用
先决条件
pymilvus 仅支持 Python 3.5 或更高版本。
安装 pymilvus
您可以通过 Python3pip或pip3为 Python3 安装 pymilvus:
$ pip3 install pymilvus
以下合集展示了 Milvus 版本和推荐的 pymilvus 版本:
| Milvus 版本 | 推荐pymilvus版本 |
|---|---|
| 0.3.0 | 0.1.13 |
| 0.3.1 | 0.1.25 |
| 0.4.0 | 0.2.2 |
| 0.5.0 | 0.2.3 |
| 0.5.1 | 0.2.3 |
| 0.5.2 | 0.2.3 |
| 0.5.3 | 0.2.5 |
| 0.6.0 | 0.2.6, 0.2.7 |
| 0.7.0 | 0.2.8 |
| 0.7.1 | 0.2.9 |
| 0.8.0 | 0.2.10 |
| 0.9.0 | 0.2.11 |
| 0.9.1 | 0.2.12 |
| 0.10.0 | 0.2.13 |
| >=0.10.1, <0.11.0 | 0.2.14 |
| 0.11.0 | 0.3.0 |
您可以通过以下方式安装特定版本的 pymilvus:
$ pip install pymilvus==0.3.0
您可以通过以下方式升级pymilvus到最新版本:
$ pip install --upgrade pymilvus
例子
有关更多示例程序,请参阅示例。
基本操作
连接 Milvus 服务器
- 导入pymilvus。
# Import pymilvus
>>> from milvus import Milvus, DataType
- 使用以下方法之一创建 Milvus 服务器的客户端:
# Connect to Milvus server
>>> client = Milvus(host='localhost', port='19530')
注意:在上面的代码中,默认值用于
host和port参数。随意将它们更改为您为 Milvus 服务器设置的 IP 地址和端口。
>>> client = Milvus(uri='tcp://localhost:19530')
创建/删除集合
创建一个集合
- 准备集合参数。
# create collection name
>>> collection_name = 'test01'
# create a collection of 4 fields, fields A, B and C are int type fields
# and Vec is a float vector field.
# segment_row_limit is default as 524288 if not specified
>>> collection_param = {
... "fields": [
... {"name": "A", "type": DataType.INT32},
... {"name": "B", "type": DataType.INT32},
... {"name": "C", "type": DataType.INT64},
... {"name": "Vec", "type": DataType.FLOAT_VECTOR, "params": {"dim": 128}}
... ],
... "segment_row_limit": 4096,
... "auto_id": True
... }
- 创建维度为 128 的集合
test01,Milvus 自动创建索引的数据文件大小为 4096。如果metric_type不提供,则默认度量类型为欧几里德距离(L2)。对于FLOAT_VECTOR领域,dim是必须的。
# Create a collection
>>> client.create_collection(collection_name, collection_param)
- 您可以通过以下方式查看收集信息
get_collection_info
>>> info = client.get_collection_info('test01')
>>> info
{'fields': [
{'name': 'A', 'type': <DataType.INT32: 4>, 'params': {}, 'indexes': [{}]},
{'name': 'C', 'type': <DataType.INT64: 5>, 'params': {}, 'indexes': [{}]},
{'name': 'B', 'type': <DataType.INT32: 4>, 'params': {}, 'indexes': [{}]},
{'name': 'Vec', 'type': <DataType.FLOAT_VECTOR: 101>, 'params': {'dim': 128},
'indexes': [{}]}
],
'auto_id': True,
'segment_row_limit': 4096
}
从信息中可以看出,auto_idcollection info中有一个选项,True默认是这个选项。因此,如果您有自己的 id 并且不想要自动生成的 id,您可能希望
在创建集合时设置auto_id为。False
删除一个集合
# Drop collection
>>> status = client.drop_collection('test01')
>>> status
Status(code=0, message='OK')
在集合中创建/删除分区
创建分区
您可以按分区标签将集合拆分为分区,以提高搜索性能。
# Create partition
>>> client.create_partition(collection_name='test01', partition_tag='tag01')
用于list_partitions()验证是否创建了分区。
# Show partitions
>>> partitions = client.list_partitions(collection_name='test01')
>>> partitions
['_default', 'tag01']
删除分区
# Drop partitions
>>> status = client.drop_partition(collection_name='test01', partition_tag='tag01')
Status(code=0, message='OK')
在集合中创建/删除索引
创建索引
注意:在生产中,建议在将向量插入集合之前创建索引。导入向量时会自动构建索引。但是,在向量插入过程完成后,您需要再次创建相同的索引,因为某些数据文件可能不符合,
index_file_size并且不会为这些数据文件自动建立索引。
IVF_FLAT为nlist = 100集合创建一个索引。
# Create index
>>> status = client.create_index('test01', "Vec", {"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 100}})
>>> status
Status(code=0, message='OK')
- 您可以通过以下方式查看索引信息
get_collection_info
>>> info = client.get_collection_info('test01')
>>> info
{'fields': [
{'name': 'A', 'type': <DataType.INT32: 4>, 'params': {}, 'indexes': [{}]},
{'name': 'C', 'type': <DataType.INT64: 5>, 'params': {}, 'indexes': [{}]},
{'name': 'B', 'type': <DataType.INT32: 4>, 'params': {}, 'indexes': [{}]},
{'name': 'Vec',
'type': <DataType.FLOAT_VECTOR: 101>,
'params': {'dim': 128, 'metric_type': 'L2'},
'indexes': [{'index_type': 'IVF_FLAT', 'metric_type': 'L2', 'params': {'nlist': 100}}]}],
'auto_id': True,
'segment_row_limit': 4096
}
删除索引
# Drop an index of a specific field "Vec"
>>> status = client.drop_index('test01', "Vec")
Status(code=0, message='OK')
在集合/分区中插入/删除实体
在集合中插入实体
- 生成 5000 个 128 维向量和一个整数列表。
>>> import random
>>> num = 5000
# Generate a list of integer.
>>> list_of_int = [random.randint(0, 255) for _ in range(num)]
# Generate 20 vectors of 128 dimension
>>> vectors = [[random.random() for _ in range(128)] for _ in range(num)]
- 创建混合实体
>>> hybrid_entities = [
{"name": "A", "values": list_of_int, "type": DataType.INT32},
{"name": "B", "values": list_of_int, "type": DataType.INT32},
{"name": "C", "values": list_of_int, "type": DataType.INT64},
{"name": "Vec", "values": vectors, "type":DataType.FLOAT_VECTOR}
]
- 插入混合实体。
如果您使用 新建集合auto_id = True,Milvus 会自动为向量生成 ID。
# Insert vectors
>>> ids = client.insert('test01', hybrid_entities)
如果您使用 来创建新集合auto_id = False,则必须提供用户定义的向量 id:
# Generate fake custom ids
>>> vector_ids = [id for id in range(num)]
# Insert to the non-auto-id collection
>>> ids = client.insert('test01', hybrid_entities, ids=vector_ids)
下面的示例基于auto_id = True.
在分区中插入实体
>>> inserted_vector_ids = client.insert('test01', hybrid_entities, partition_tag="tag01")
要验证您插入的实体,请使用get_entity_by_id().
>>> entities = client.get_entity_by_id(collection_name='test01', ids=inserted_vector_ids[:10])
按 ID 删除实体
您可以通过以下方式删除这些实体:
>>> status = client.delete_entity_by_id('test01', ids[:10])
>>> status
Status(code=0, message='OK')
将一个或多个集合中的数据刷新到磁盘
在执行与数据更改相关的操作时,您可以将数据从内存刷新到磁盘,以避免可能的数据丢失。Milvus 还支持自动刷新,它以固定的时间间隔运行,将所有集合中的数据刷新到磁盘。您可以使用Milvus 服务器配置文件来设置间隔。
>>> client.flush(['test01'])
压缩集合中的所有段
段是 Milvus 通过合并插入的向量数据自动创建的数据文件。一个集合可以包含多个段。如果从段中删除了某些向量,则被删除的向量占用的空间不能自动释放。您可以压缩集合中的段以释放空间。
>>> status = client.compact('test01')
>>> status
Status(code=0, message='OK')
在集合/分区中搜索实体
搜索集合中的实体
- 准备搜索参数。
"term"并且"range"是可选的,"params"in"vector"代表索引参数。
# This dsl will search topk `entities` that are
# close to vectors[:1] searched by `IVF_FLAT` index with `nprobe = 10` and `metric_type = L2`,
# AND field "A" in [1, 2, 5],
# AND field "B" greater than 1 less than 100
>>> dsl = {
... "bool": {
... "must":[
... {
... "term": {"A": [1, 2, 5]}
... },
... {
... "range": {"B": {"GT": 1, "LT": 100}}
... },
... {
... "vector": {
... "Vec": {"topk": 10, "query": vectors[:1], "metric_type": "L2", "params": {"nprobe": 10}}
... }
... }
... ]
... }
... }
没有带有IVF_FLAT索引的混合条件的搜索将类似于:
>>> dsl = {
... "bool": {
... "must":[
... {
... "vector": {
... "Vec": {"topk": 10, "query": vectors[:1], "metric_type": "L2", "params": {"nprobe": 10}}
... }
... }
... ]
... }
... }
搜索不需要FLAT索引参数,因此查询类似于:
>>> dsl = {
... "bool": {
... "must":[
... {
... "vector": {
... "Vec": {"topk": 10, "query": vectors[0], "metric_type": "L2"}
... }
... }
... ]
... }
... }
- 搜索实体。
有了fields=["B"],你不仅可以获得实体id和距离,还可以获得一个空间字段B的值。
# search entities and get entity field B back
>>> results = client.search('test01', dsl, fields=["B"])
您可以通过结果中的实体获取 id、距离和字段。
# Results consist of number-of-query entities
>>> entities = results[0]
# Entities consists of topk entity
>>> entity = entities[0]
# You can get all ids and distances by entities
>>> all_ids = entities.ids
>>> all_distances = entities.distances
# Or you can get them one by one by entity
>>> a_id = entity.id
>>> a_distance = entity.distance
>>> a_field = entity.entity.B # getattr(entity.entity, "B")
注意:如果您在搜索中不提供字段,您将只能获得 id 和距离。
搜索分区中的实体
# Search entities in a partition `tag01`
>>> client.search(collection_name='test01', dsl=dsl, partition_tags=['tag01'])
注意:如果不指定
partition_tags,Milvus 会搜索整个集合。
关闭客户端
>>> client.close()
常问问题
从 Gunicorn 上提供的应用程序连接到 Milvus 时,我从 gRPC 收到随机的“非套接字上的套接字操作”错误
确保设置环境变量GRPC_ENABLE_FORK_SUPPORT=1。参考见https://zhuanlan.zhihu.com/p/136619485