千家信息网

怎么使用enrich processor

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍"怎么使用enrich processor",在日常操作中,相信很多人在怎么使用enrich processor问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家
千家信息网最后更新 2025年12月03日怎么使用enrich processor

这篇文章主要介绍"怎么使用enrich processor",在日常操作中,相信很多人在怎么使用enrich processor问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"怎么使用enrich processor"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

enrich processor 简介

ingest pipeline 可以在传入的文档被索引之前,对文档进行预处理,通过 processor 中定义的一系列规则来修改文档的内容(例如大小写转换等)。

在 Elasticsearch 7.5 版本引入了 enrich processor,可以将现有索引(source index)中的数据添加到传入的文档(incoming document)中。

比如,你可以在如下的场景中用到:

  • 根据已知的 IP 地址识别 Web 服务或供应商。

  • 根据产品 ID 将产品信息添加到零售订单中。

  • 根据电子邮件地址补充联系信息。

  • 根据用户坐标添加邮政编码。

使用 enrich processor

使用 enrich processor 有如下几个步骤:

  • 1.添加 enrich data:添加 document (enrich data)到一个或者多个的 source index 中,这些 document 中应包含之后要添加到 incoming documents 中的数据。

  • 2.创建 enrich policy:enrich policy 中应至少包含如下参数:

    • 指定source index的。

    • 指定 incoming documents 和 source index 用于匹配的属性。

    • 指定要添加到 incoming documents 中的属性。

  • 3.执行 enrich policy:执行完后会自动创建相应的 enrich index, enrich index 和普通索引不同,进行了优化。

  • 4.在 ingest pipeline 使用 enrich processor:enrich processor 使用 enrich index 来查询。

背景说明

source index 的内容如下:

locnumcompany
广东省A1001腾讯
上海市B1001Bilibili
浙江省C1001阿里巴巴

incoming document 传入的文档如下,通过 num 字段查到对应 source index 中的 loc 的值,添加到 incoming document 中新增 enrich_loc 属性中。

numcompany
A1001腾讯
B1001Bilibili
C1001阿里巴巴

第一步:添加 enrich data

通过 _bulk API 批量添加文档到 location 索引,这些文档和普通的文档一样。

POST _bulk{"index": {"_index":"location"}}{"loc":"广东省","company":"腾讯","num":"A1001"}{"index": {"_index":"location"}}{"loc":"上海市","company":"Bilibili","num":"B1001"}{"index": {"_index":"location"}}{"loc":"浙江省","company":"阿里巴巴","num":"C1001"}

第二步:创建 enrich policy

enrich policy 一旦创建,就不能更新或者修改。

PUT /_enrich/policy/my-policy{  "match": {    "indices": "location",  #source index 索引名,就是前面创建的 enrich data 对应的索引    "match_field": "num", #source index 中的属性名,用于incoming documents 和 source index 匹配的属性,属性名一样都是 num    "enrich_fields": ["loc"], #添加到 incoming documents 中的属性    # 可选,过滤 source index 的文档,只有 loc.keyword 是上海市的 enrich data 才能将属性添加到 incoming documents 中    "query": {      "match": {        "loc.keyword": "上海市"      }    }  }}

第三步:执行 enrich policy

当创建了 enrich policy 后,你可以通过 execute enrich policy API 去执行 enrich policy。当执行 enrich policy 后,会自动创建 enrich index。

直接将 incoming document 与 source index 中的文档匹配可能会很慢且占用大量资源。 为了加快处理速度,enrich processor 使用了 enrich index。 enrich index 包含来自 source index 的 enrich data,enrich index 具有一些特殊属性可帮助简化它们:

  • 它们是系统索引,这意味着它们由 Elasticsearch 在内部进行管理,仅适用于 enrich processor。

  • 它们始终以 .enrich- * 开头。

  • 它们是只读的,这意味着你不能直接更改它们。

  • 它们被强制合并以便快速检索。

当 source index 中新增或者修改了数据,只需要重新执行 enrich policy 就可以更改 enrich index,从而更新 enrich processor。

通过以下命令执行 enrich policy:

PUT /_enrich/policy/my-policy/_execute

查看自动创建的 enrich index:

GET _cat/indices/.enrich*# 返回结果green open .enrich-my-policy-1616136526661 Vxal9lLBSlKS5lmzMpFfwQ 1 3 1 0 13.4kb 3.3kb

我感觉 enrich policy 这里有个小 bug,当删除 enrich policy 时,例如删除的 enrich policy 为 my-policy-1,会同时删除 my-policy-1 的 enrich index 和 enrich policy ,但是如果原先还有个 my-policy-2(两个 enrich policy 在-之前是一样的),会把 my-policy-2 的 enrich index 也误删了(enrich policy 不删)。

第四步:在 ingest pipeline 使用 enrich processor

PUT _ingest/pipeline/loc-pipeline{  "processors": [    {      "enrich": {        "policy_name": "my-policy", #引用前面创建的 enrich policy        "field": "num",  # incoming document 中的属性名,用于和 source index 中的属性匹配值        #在incoming document 中新增的属性,        #包含在 enrich policy 中定义的 match_field 和 enrich_fields 的值        "target_field": "enrich_loc"       }    }  ]}

验证

使用 simulate 用来调试 ingest pipeline的效果,由于 source index 中匹配到的 loc.keyword 不是上海市,不会对这个文档进行处理:

POST _ingest/pipeline/loc-pipeline/_simulate{  "docs": [    {      "_source": {        "num": "A1001",        "company": "腾讯"      }    }  ]}# 返回结果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "company" : "腾讯",          "num" : "A1001"        },        "_ingest" : {          "timestamp" : "2021-03-19T06:56:45.754486259Z"        }      }    }  ]}

这个文档的 loc.keyword 是上海市,因此会添加上 enrich data 中指定的属性:

POST _ingest/pipeline/loc-pipeline/_simulate{  "docs": [    {      "_source": {        "num": "B1001",        "company": "Bilibili"      }    }  ]}# 返回结果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "company" : "Bilibili",          "enrich_loc" : {            "loc" : "上海市",            "num" : "B1001"          },          "num" : "B1001"        },        "_ingest" : {          "timestamp" : "2021-03-19T06:56:29.393585306Z"        }      }    }  ]}

在 simulate 调试成功之后,我们在插入文档的时候指定 ingest pipeline:

# 方式一:单条插入POST origin-location/_doc?pipeline=loc-pipeline{  "num": "A1001",  "company": "腾讯"}POST origin-location/_doc?pipeline=loc-pipeline{  "num": "B1001",  "company": "Bilibili"}# 方式二:批量插入POST _bulk?pipeline=loc-pipeline{"index":{"_index":"origin-location"}}{"num":"A1001","company":"腾讯"}{"index":{"_index":"origin-location"}}{"num":"B1001","company":"Bilibili"}

查看插入的结果:

GET origin-location/_search#返回结果{  "took" : 12,  "timed_out" : false,  "_shards" : {    "total" : 1,    "successful" : 1,    "skipped" : 0,    "failed" : 0  },  "hits" : {    "total" : {      "value" : 2,      "relation" : "eq"    },    "max_score" : 1.0,    "hits" : [      {        "_index" : "origin-location",        "_type" : "_doc",        "_id" : "zXxLSXgBUc4opBV-QiOv",        "_score" : 1.0,        "_source" : {          "num" : "A1001",          "company" : "腾讯"        }      },      {        "_index" : "origin-location",        "_type" : "_doc",        "_id" : "znxLSXgBUc4opBV-SCPk",        "_score" : 1.0,        "_source" : {          "num" : "B1001",          "company" : "Bilibili",          "enrich_loc" : {            "loc" : "上海市",            "num" : "B1001"          }        }      }    ]  }}

也可以指定索引默认使用的 ingest pipeline ,这样就不用每次在插入文档的时候指定 ingest pipeline了:

# 指定索引默认使用的 ingest pipelinePUT origin-location2{  "settings": {    "default_pipeline": "loc-pipeline"    }}# 插入数据POST _bulk{"index":{"_index":"origin-location2"}}{"num":"A1001","company":"腾讯"}{"index":{"_index":"origin-location2"}}{"num":"B1001","company":"Bilibili"}# 查看结果GET origin-location2/_search# 输出结果{  "took" : 8,  "timed_out" : false,  "_shards" : {    "total" : 1,    "successful" : 1,    "skipped" : 0,    "failed" : 0  },  "hits" : {    "total" : {      "value" : 2,      "relation" : "eq"    },    "max_score" : 1.0,    "hits" : [      {        "_index" : "origin-location2",        "_type" : "_doc",        "_id" : "CXxPSXgBUc4opBV-oyTJ",        "_score" : 1.0,        "_source" : {          "num" : "A1001",          "company" : "腾讯"        }      },      {        "_index" : "origin-location2",        "_type" : "_doc",        "_id" : "CnxPSXgBUc4opBV-oyTJ",        "_score" : 1.0,        "_source" : {          "num" : "B1001",          "company" : "Bilibili",          "enrich_loc" : {            "loc" : "上海市",            "num" : "B1001"          }        }      }    ]  }}

另外还可以使用 index template,通过正则表达式的方式匹配多个索引,来指定索引使用的 ingest pipeline:

# 使用 index templatePUT _template/my-template{  "index_patterns": ["origin-*"],  "settings": {   "default_pipeline": "loc-pipeline"  }}# 插入数据POST _bulk{"index":{"_index":"origin-location3"}}{"num":"A1001","company":"腾讯"}{"index":{"_index":"origin-location3"}}{"num":"B1001","company":"Bilibili"}# 查看结果GET origin-location3/_search# 输出结果{  "took" : 2,  "timed_out" : false,  "_shards" : {    "total" : 1,    "successful" : 1,    "skipped" : 0,    "failed" : 0  },  "hits" : {    "total" : {      "value" : 2,      "relation" : "eq"    },    "max_score" : 1.0,    "hits" : [      {        "_index" : "origin-location3",        "_type" : "_doc",        "_id" : "XnxVSXgBUc4opBV-1yRp",        "_score" : 1.0,        "_source" : {          "num" : "A1001",          "company" : "腾讯"        }      },      {        "_index" : "origin-location3",        "_type" : "_doc",        "_id" : "X3xVSXgBUc4opBV-1yRp",        "_score" : 1.0,        "_source" : {          "num" : "B1001",          "company" : "Bilibili",          "enrich_loc" : {            "loc" : "上海市",            "num" : "B1001"          }        }      }    ]  }}

到此,关于"怎么使用enrich processor"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

文档 属性 腾讯 索引 上海 上海市 结果 数据 学习 巴巴 方式 阿里 中新 阿里巴巴 帮助 普通 产品 信息 内容 地址 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 服务器机箱哪家好 数据库主健 手机远程管理服务器软件 网络安全搞笑文案 信息网络安全第一责任人 政府网络安全工作部署情况 网络安全法 四十四 fifa22中国球员数据库 集群存储管理平台服务器的功能 上海满誉软件开发有限公司 比超单上有哪些数据库 芯片仿真对服务器的要求 网络安全突发事件应急预案演练 保障网络安全包括 长春思科大学网络技术学院 小学生全国网络安全考试骗局 服务器管理员密码怎么设置 软件开发可以长期干下去吗 用典故诗句来说网络安全 软件开发里面哪个专业吃香 客户怎样找华为做软件开发 中国象棋棋谱数据库 戴尔服务器经销商苏州 数据结构和软件开发的关系 关于服务器安全方面的书籍 网络安全职业学校奇安信 乌鲁木齐网络技术公司 实验室服务器跑Python 久发远程互联网医疗科技公司 东阳腾灿网络技术有限公司
0