ElasticSearch Notes (3): Using the Java API and Chinese Word Segmentation in ES


[TOC]


pom.xml

The test project for the ES Java API is built with Maven; the dependencies it uses are as follows:

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.dom4j</groupId>
        <artifactId>dom4j</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.10</version>
    </dependency>

ES API: Basic CRUD Operations

The tests are run with JUnit; the global variables and setUp function they share are shown below:

    private TransportClient client;
    private String index = "bigdata";   // the index to operate on is "bigdata"
    private String type = "product";    // the type to operate on is "product"

    @Before
    public void setup() throws UnknownHostException {
        // We are connecting to an ES cluster, so the cluster name must be supplied,
        // otherwise the client cannot be created
        Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
        client = TransportClient.builder().settings(settings).build();
        TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
        TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
        TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
        client.addTransportAddresses(ta1, ta2, ta3);
        /*settings = client.settings();
        Map<String, String> asMap = settings.getAsMap();
        for (Map.Entry<String, String> setting : asMap.entrySet()) {
            System.out.println(setting.getKey() + "::" + setting.getValue());
        }*/
    }
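For symmetry with setUp, it helps to release the client when a test finishes. The article only does this in its final test class, but the same @After hook (org.junit.After) can be added here as well:

    @After
    public void cleanUp() {
        client.close();  // release the transport client's resources after each test
    }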

Adding documents: JSON

    /**
     * Note: there are 4 ways to add data to ES:
     * 1. JSON
     * 2. Map
     * 3. Java Bean
     * 4. XContentBuilder
     *
     * 1. The JSON approach
     */
    @Test
    public void testAddJSON() {
        String source = "{\"name\":\"sqoop\", \"author\": \"apache\", \"version\": \"1.4.6\"}";
        IndexResponse response = client.prepareIndex(index, type, "4").setSource(source).get();
        System.out.println(response.isCreated());
    }

Adding documents: Map

    /**
     * Adding data:
     * 2. The Map approach
     */
    @Test
    public void testAddMap() {
        Map<String, Object> source = new HashMap<>();
        source.put("name", "flume");
        source.put("author", "Cloudera");
        source.put("version", "1.8.0");
        IndexResponse response = client.prepareIndex(index, type, "5").setSource(source).get();
        System.out.println(response.isCreated());
    }

Adding documents: Java Bean

    /**
     * Adding data:
     * 3. The Java Bean approach
     *
     * If the object is not first converted to a JSON string, the following exception is thrown:
     * The number of object passed must be even but was [1]
     */
    @Test
    public void testAddObj() throws JsonProcessingException {
        Product product = new Product("kafka", "linkedIn", "0.10.0.1", "kafka.apache.org");
        ObjectMapper objectMapper = new ObjectMapper();
        String json = objectMapper.writeValueAsString(product);
        System.out.println(json);
        IndexResponse response = client.prepareIndex(index, type, "6").setSource(json).get();
        System.out.println(response.isCreated());
    }
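The Product bean referenced above is not shown in the original. Since Lombok is already among the pom.xml dependencies, a minimal sketch of what it presumably looks like (field order matching the constructor call in testAddObj) could be:

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    @Data                 // generates the getters Jackson needs for serialization
    @AllArgsConstructor   // generates Product(name, author, version, url)
    @NoArgsConstructor
    public class Product {
        private String name;
        private String author;
        private String version;
        private String url;
    }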

Adding documents: XContentBuilder

    /**
     * Adding data:
     * 4. The XContentBuilder approach
     */
    @Test
    public void testAddXContentBuilder() throws IOException {
        XContentBuilder source = XContentFactory.jsonBuilder();
        source.startObject()
            .field("name", "redis")
            .field("author", "redis")
            .field("version", "3.2.0")
            .field("url", "redis.cn")
            .endObject();
        IndexResponse response = client.prepareIndex(index, type, "7").setSource(source).get();
        System.out.println(response.isCreated());
    }

Getting a document

    /**
     * Look up a specific document
     */
    @Test
    public void testGet() {
        GetResponse response = client.prepareGet(index, type, "6").get();
        Map<String, Object> map = response.getSource();
        /*for (Map.Entry<String, Object> me : map.entrySet()) {
            System.out.println(me.getKey() + "=" + me.getValue());
        }*/
        // lambda expression, available since JDK 1.8
        map.forEach((k, v) -> System.out.println(k + "=" + v));
        // map.keySet().forEach(key -> System.out.println(key + "xxx"));
    }

Updating a document

    /**
     * A partial update works the same way as the curl version:
     * curl -XPOST http://uplooking01:9200/bigdata/product/AWA184kojrSrzszxL-Zs/_update -d '{"doc":{"name":"sqoop", "author":"apache"}}'
     *
     * For a full (whole-document) update, do not use prepareUpdate either; use prepareIndex directly instead
     */
    @Test
    public void testUpdate() throws Exception {
        /*String source = "{\"doc\":{\"url\": \"http://flume.apache.org\"}}";
        UpdateResponse response = client.prepareUpdate(index, type, "4").setSource(source.getBytes()).get();*/
        // The following approach also works
        String source = "{\"url\": \"http://flume.apache.org\"}";
        UpdateResponse response = client.prepareUpdate(index, type, "4").setDoc(source.getBytes()).get();
        System.out.println(response.getVersion());
    }
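A related note: prepareUpdate fails with a DocumentMissingException when the target document does not exist yet. A hedged sketch of one way around this, using setDocAsUpsert so that the partial doc is indexed as a new document whenever id "4" is missing:

    // Sketch: create-or-update in one call; if document "4" is absent,
    // the partial doc itself is indexed as the initial version
    String source = "{\"url\": \"http://flume.apache.org\"}";
    UpdateResponse response = client.prepareUpdate(index, type, "4")
        .setDoc(source.getBytes())
        .setDocAsUpsert(true)
        .get();
    System.out.println(response.getVersion());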

Deleting a document

    /**
     * Delete operation
     */
    @Test
    public void testDelete() {
        DeleteResponse response = client.prepareDelete(index, type, "5").get();
        System.out.println(response.getVersion());
    }
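In ES 2.x the DeleteResponse can also report whether the document actually existed, which the version number alone does not tell you; a small addition to the test:

    // isFound() is false when no document with the given id existed
    System.out.println("found and deleted: " + response.isFound());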

Bulk operations

    /**
     * Bulk operations
     */
    @Test
    public void testBulk() {
        IndexRequestBuilder indexRequestBuilder = client.prepareIndex(index, type, "8")
            .setSource("{\"name\":\"elasticsearch\", \"url\":\"http://www.elastic.co\"}");
        UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate(index, type, "1")
            .setDoc("{\"url\":\"http://hadoop.apache.org\"}");
        BulkRequestBuilder bulk = client.prepareBulk();
        BulkResponse bulkResponse = bulk.add(indexRequestBuilder).add(updateRequestBuilder).get();
        Iterator<BulkItemResponse> it = bulkResponse.iterator();
        while (it.hasNext()) {
            BulkItemResponse response = it.next();
            System.out.println(response.getId() + "<--->" + response.getVersion());
        }
    }

Getting the document count of an index

    /**
     * Get the number of documents in an index
     */
    @Test
    public void testCount() {
        CountResponse response = client.prepareCount(index).get();
        System.out.println("document count: " + response.getCount());
    }
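Worth noting: in ES 2.x the count API is deprecated in favor of a search that fetches no documents, so the same number can be obtained like this:

    // equivalent to prepareCount: a search returning only the hit total, no documents
    SearchResponse response = client.prepareSearch(index).setSize(0).get();
    System.out.println("document count: " + response.getHits().getTotalHits());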

ES API: Advanced Queries

These tests are also JUnit-based; the shared setUp and showResult functions they use are listed first:

Global variables and setUp:

    private TransportClient client;
    private String index = "bigdata";
    private String type = "product";
    private String[] indics = {"bigdata", "bank"};

    @Before
    public void setUp() throws UnknownHostException {
        Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
        client = TransportClient.builder().settings(settings).build();
        TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
        TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
        TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
        client.addTransportAddresses(ta1, ta2, ta3);
    }

showResult:

    /**
     * Pretty-print the search results
     * @param response
     */
    private void showResult(SearchResponse response) {
        SearchHits searchHits = response.getHits();
        float maxScore = searchHits.getMaxScore();  // highest document score in the results
        System.out.println("maxScore: " + maxScore);
        long totalHits = searchHits.getTotalHits(); // total number of matching documents
        System.out.println("totalHits: " + totalHits);
        SearchHit[] hits = searchHits.getHits();    // the hits actually returned
        System.out.println("hits returned in this response: " + hits.length);
        for (SearchHit hit : hits) {
            long version = hit.version();
            String id = hit.getId();
            String index = hit.getIndex();
            String type = hit.getType();
            float score = hit.getScore();
            System.out.println("===================================================");
            String source = hit.getSourceAsString();
            System.out.println("version: " + version);
            System.out.println("id: " + id);
            System.out.println("index: " + index);
            System.out.println("type: " + type);
            System.out.println("score: " + score);
            System.out.println("source: " + source);
        }
    }

Notes on ES search types

There are four search types:

    query and fetch      (fastest; returns N times the requested amount of data)
    query then fetch     (the default search type)
    DFS query and fetch
    DFS query then fetch (the most precise control over scoring and ranking)

The Javadoc on the SearchType API describes them as follows:

    /**
     * Same as {@link #QUERY_THEN_FETCH}, except for an initial scatter phase which goes and computes the distributed
     * term frequencies for more accurate scoring.
     */
    DFS_QUERY_THEN_FETCH((byte) 0),
    /**
     * The query is executed against all shards, but only enough information is returned (not the document content).
     * The results are then sorted and ranked, and based on it, only the relevant shards are asked for the actual
     * document content. The return number of hits is exactly as specified in size, since they are the only ones that
     * are fetched. This is very handy when the index has a lot of shards (not replicas, shard id groups).
     */
    QUERY_THEN_FETCH((byte) 1),
    /**
     * Same as {@link #QUERY_AND_FETCH}, except for an initial scatter phase which goes and computes the distributed
     * term frequencies for more accurate scoring.
     */
    DFS_QUERY_AND_FETCH((byte) 2),
    /**
     * The most naive (and possibly fastest) implementation is to simply execute the query on all relevant shards
     * and return the results. Each shard returns size results. Since each shard already returns size hits, this
     * type actually returns size times number of shards results back to the caller.
     */
    QUERY_AND_FETCH((byte) 3),

A note on DFS:

What does DFS stand for? The D is probably Distributed, the F probably frequency, and the S probably Scatter, so the whole abbreviation likely refers to the distributed scattering of term and document frequencies. What, then, is the initial scatter phase? From the ES website we can see that it simply means that, before the real query runs, the term and document frequencies of every shard are collected first, and each shard then searches and ranks using these global frequencies. Clearly, a search with DFS_QUERY_THEN_FETCH is the least efficient, since a single search may send up to three rounds of requests to the shards; on the other hand, the DFS variants should give the highest scoring accuracy.

Summary:

In short: performance-wise, QUERY_AND_FETCH is the fastest and DFS_QUERY_THEN_FETCH the slowest; accuracy-wise, the DFS variants score more accurately than the non-DFS ones.
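To see the accuracy difference in practice, a small sketch (reusing the client and indics fields from setUp above) can run the same query under both fetch modes and compare the scores; with only a handful of documents per shard, the per-shard idf of the non-DFS variant often skews them:

    @Test
    public void compareSearchTypes() {
        for (SearchType searchType : new SearchType[]{SearchType.QUERY_THEN_FETCH, SearchType.DFS_QUERY_THEN_FETCH}) {
            SearchResponse response = client.prepareSearch(indics)
                .setSearchType(searchType)
                .setQuery(QueryBuilders.termQuery("author", "apache"))
                .get();
            // with DFS, scoring uses global term frequencies, so maxScore may differ
            System.out.println(searchType + " -> maxScore: " + response.getHits().getMaxScore());
        }
    }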

Exact queries

    /**
     * 1. Exact queries
     * termQuery
     * a term is a single field value
     */
    @Test
    public void testSearch2() {
        SearchRequestBuilder searchQuery = client.prepareSearch(indics) // prepareSearch() takes the list of indices to search
            .setSearchType(SearchType.DEFAULT)  // the search type: QUERY_AND_FETCH, QUERY_THEN_FETCH, DFS_QUERY_AND_FETCH or DFS_QUERY_THEN_FETCH
            .setQuery(QueryBuilders.termQuery("author", "apache")); // the query used for retrieval; for termQuery, name is the field in the doc and value is the exact value to find
        // if no query is set, all documents are returned
        SearchResponse response = searchQuery.get();
        showResult(response);
    }

Fuzzy queries

    /**
     * 2. Fuzzy-style queries
     * prefixQuery
     */
    @Test
    public void testSearch3() {
        SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.prefixQuery("name", "h"))
            .get();
        showResult(response);
    }

Paged queries

    /**
     * 3. Paged queries
     * From the index "bank", query the records
     * whose age is in (25, 35]
     *
     * Paging scheme:
     *     decide which page to fetch and how many records per page.
     *     With 10 records per page, fetching page 4 means:
     *         setFrom(30 = (4 - 1) * size)
     *         setSize(10)
     *     so the start offset of page N is (N - 1) * pageSize
     */
    @Test
    public void testSearch4() {
        // Note that QUERY_THEN_FETCH and QUERY_AND_FETCH return different numbers of hits:
        // the former returns 10 by default, the latter 50 (10 per shard x 5 shards)
        SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            // setFrom and setSize below paginate the result set
            .setFrom(0)
            .setSize(5)
            .get();
        showResult(response);
    }
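The formula in the comment above generalizes into a small helper; a hypothetical sketch, reusing the same range query, for fetching a 1-based page number:

    /**
     * Fetch page pageNum (1-based) with pageSize hits per page:
     * the start offset is (pageNum - 1) * pageSize
     */
    private SearchResponse fetchPage(int pageNum, int pageSize) {
        return client.prepareSearch(indics)
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            .setFrom((pageNum - 1) * pageSize)
            .setSize(pageSize)
            .get();
    }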

Highlighted queries

    /**
     * 4. Highlighted queries
     * We want documents containing "apache"; it may appear not only in author
     * but also in url or name
     *   author or url   ---> the should clause of a boolQuery
     *   an and relation ---> the must clause
     *   a not relation  ---> the mustNot clause
     * The match-style operation compares the query keyword against the given fields
     * and returns the document when they match
     */
    @Test
    public void testSearch5() {
        SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DEFAULT)
            // .setQuery(QueryBuilders.multiMatchQuery("apache", "author", "url"))
            // .setQuery(QueryBuilders.regexpQuery("url", ".*apache.*"))
            // .setQuery(QueryBuilders.termQuery("author", "apache"))
            .setQuery(QueryBuilders.boolQuery()
                      .should(QueryBuilders.regexpQuery("url", ".*apache.*"))
                      .should(QueryBuilders.termQuery("author", "apache")))
            // highlighting ---> set the opening and closing tags
            // (the original tags were lost when the page was rendered; <em>/</em> stand in for them here)
            .setHighlighterPreTags("<em>")
            .setHighlighterPostTags("</em>")
            // which fields should be highlighted
            .addHighlightedField("author")
            .addHighlightedField("url")
            .get();
        SearchHits searchHits = response.getHits();
        float maxScore = searchHits.getMaxScore();  // highest document score in the results
        System.out.println("maxScore: " + maxScore);
        long totalHits = searchHits.getTotalHits(); // total number of matching documents
        System.out.println("totalHits: " + totalHits);
        SearchHit[] hits = searchHits.getHits();    // the hits actually returned
        System.out.println("hits returned in this response: " + hits.length);
        for (SearchHit hit : hits) {
            System.out.println("========================================================");
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            for (Map.Entry<String, HighlightField> me : highlightFields.entrySet()) {
                System.out.println("--------------------------------------");
                String key = me.getKey();
                HighlightField highlightField = me.getValue();
                String name = highlightField.getName();
                System.out.println("key: " + key + ", name: " + name);
                Text[] texts = highlightField.fragments();
                String value = "";
                for (Text text : texts) {
                    // System.out.println("text: " + text.toString());
                    value += text.toString();
                }
                System.out.println("value: " + value);
            }
        }
    }

Sorted queries

    /**
     * 5. Sorted queries
     * Sort the result set by balance, from high to low
     */
    @Test
    public void testSearch6() {
        SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            .addSort("balance", SortOrder.DESC)
            // setFrom and setSize below paginate the result set
            .setFrom(0)
            .setSize(5)
            .get();
        showResult(response);
    }

Aggregation queries: computing an average

    /**
     * 6. Aggregation queries: computing an average
     */
    @Test
    public void testSearch7() {
        indics = new String[]{"bank"};
        SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            /*
                select avg(age) as avg_name from person;
                here the name "avg_balance" plays the role of the alias avg_name in the result
             */
            .addAggregation(AggregationBuilders.avg("avg_balance").field("balance"))
            .addAggregation(AggregationBuilders.max("max").field("balance"))
            .get();
        // System.out.println(response);
        /*
            The aggregations contained in the response:
                "aggregations" : {
                    "max" : {
                      "value" : 49741.0
                    },
                    "avg_balance" : {
                      "value" : 25142.137373737372
                    }
                }
            so a single aggregation is:
                {
                  "value" : 49741.0
                }
         */
        Aggregations aggregations = response.getAggregations();
        List<Aggregation> aggregationList = aggregations.asList();
        for (Aggregation aggregation : aggregationList) {
            System.out.println("========================================");
            String name = aggregation.getName();
            // Map<String, Object> map = aggregation.getMetaData();
            System.out.println("name: " + name);
            // System.out.println(map);
            Object obj = aggregation.getProperty("value");
            System.out.println(obj);
        }
        /*Aggregation avgBalance = aggregations.get("avg_balance");
        Object obj = avgBalance.getProperty("value");
        System.out.println(obj);*/
    }
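Instead of the generic getProperty("value") access above, the results can also be read through the typed metric classes (Avg and Max under org.elasticsearch.search.aggregations.metrics in the 2.x client); a sketch that would replace the loop at the end of testSearch7:

    // typed access to the two aggregations declared above
    Avg avgBalance = response.getAggregations().get("avg_balance");
    Max maxBalance = response.getAggregations().get("max");
    System.out.println("avg balance: " + avgBalance.getValue());
    System.out.println("max balance: " + maxBalance.getValue());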

Chinese Word Segmentation in ES: Integrating the IK Analyzer

If our data contains Chinese and we want searches to tokenize it properly, the Lucene analyzers that ES ships with handle Chinese poorly, so it is worth switching to another analyzer, such as the IK Chinese analyzer described here. The steps to integrate it into ES are as follows:

    1) Download it from: https://github.com/medcl/elasticsearch-analysis-ik
    2) Build the source with Maven: mvn clean install -DskipTests (or mvn package)
    3) Copy the zip file under target/releases into the ES_HOME/plugins/analysis-ik directory and unzip it
    4) Copy the conf/ik directory from the downloaded plugin to ES_HOME/config
    5) Edit ES_HOME/config/elasticsearch.yml and add: index.analysis.analyzer.default.type: ik
       (this makes IK the default analyzer; the step is optional)
    6) Restart the ES service
    7) Test the tokenization (a sketch of such a test follows below)
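For step 7, one way to verify the plugin is active is to ask ES to tokenize a sample sentence through the analyze API. A minimal sketch (hypothetical class name; cluster name and host as used throughout this article, and assuming the "chinese" index from the test class below already exists):

    import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    import java.net.InetAddress;

    public class IkAnalyzeCheck {
        public static void main(String[] args) throws Exception {
            Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
            TransportClient client = TransportClient.builder().settings(settings).build();
            client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300));
            // run the IK analyzer over a sample sentence and print each token
            AnalyzeResponse response = client.admin().indices()
                .prepareAnalyze("chinese", "中华人民共和国")  // index context, text to tokenize
                .setAnalyzer("ik")                            // the analyzer installed above
                .get();
            for (AnalyzeResponse.AnalyzeToken token : response.getTokens()) {
                System.out.println(token.getTerm());
            }
            client.close();
        }
    }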

Note that the data must be re-ingested and analyzed with IK; in other words, every index that is expected to use IK Chinese segmentation has to be rebuilt. A sketch of rebuilding such an index follows.
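This is a hedged sketch only: the index and field names follow the test class below, while the mapping type "article" is a hypothetical stand-in (the original does not show it). If elasticsearch.yml already makes IK the default analyzer, the explicit analyzer settings here are redundant.

    // Sketch: create the "chinese" index with a "content" field analyzed by IK
    @Test
    public void createChineseIndex() throws IOException {
        XContentBuilder mapping = XContentFactory.jsonBuilder()
            .startObject()
                .startObject("article")                  // hypothetical mapping type
                    .startObject("properties")
                        .startObject("content")
                            .field("type", "string")     // ES 2.x string field
                            .field("analyzer", "ik")     // analyze with IK at index time
                            .field("search_analyzer", "ik")
                        .endObject()
                    .endObject()
                .endObject()
            .endObject();
        client.admin().indices().prepareCreate("chinese")
            .addMapping("article", mapping)
            .get();
    }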

The test code is as follows:

    package cn.xpleaf.bigdata.elasticsearch;

    import org.elasticsearch.action.search.SearchRequestBuilder;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchType;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.text.Text;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.aggregations.Aggregation;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.Aggregations;
    import org.elasticsearch.search.highlight.HighlightField;
    import org.elasticsearch.search.sort.SortOrder;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.List;
    import java.util.Map;

    /**
     * Using the Java API to operate on an ES cluster
     *
     * A Transport represents a cluster;
     * the client talks to the cluster through a TransportClient
     *
     * Here prepareSearch is used for full-text retrieval with
     * Chinese word segmentation
     */
    public class ElasticSearchTest3 {

        private TransportClient client;
        private String index = "bigdata";
        private String type = "product";
        private String[] indics = {"chinese"};

        @Before
        public void setUp() throws UnknownHostException {
            Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
            client = TransportClient.builder().settings(settings).build();
            TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
            TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
            TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
            client.addTransportAddresses(ta1, ta2, ta3);
        }

        /**
         * Chinese word segmentation in action:
         * 1. searching for data starting with "中" finds two records
         * 2. searching for data starting with "中国" finds none
         * 3. searching for data containing "烂" finds one record
         * 4. searching for data containing "烂摊子" finds none
         *
         * Segmentation: why does searching behave this way?
         *   English: China is the greatest country~
         *   Chinese: 中国最牛逼
         *
         * For "×××" a Chinese analyzer may produce tokens such as:
         *   中华 / 人民 / 共和国 / 中华人民 / 人民共和国 / 华人 / 共和
         *
         * Well-known Chinese segmentation approaches:
         *   庖丁解牛 (PaoDing)
         *   IK
         *   搜狗 (Sogou)
         */
        @Test
        public void testSearch2() {
            SearchResponse response = client.prepareSearch(indics) // the indices to search
                .setSearchType(SearchType.DEFAULT) // QUERY_AND_FETCH, QUERY_THEN_FETCH, DFS_QUERY_AND_FETCH or DFS_QUERY_THEN_FETCH
                // .setQuery(QueryBuilders.prefixQuery("content", "烂摊子"))
                // .setQuery(QueryBuilders.regexpQuery("content", ".*烂摊子.*"))
                .setQuery(QueryBuilders.prefixQuery("content", "中国"))
                .get();
            showResult(response);
        }

        /**
         * Pretty-print the search results
         * @param response
         */
        private void showResult(SearchResponse response) {
            SearchHits searchHits = response.getHits();
            float maxScore = searchHits.getMaxScore();  // highest document score in the results
            System.out.println("maxScore: " + maxScore);
            long totalHits = searchHits.getTotalHits(); // total number of matching documents
            System.out.println("totalHits: " + totalHits);
            SearchHit[] hits = searchHits.getHits();    // the hits actually returned
            System.out.println("hits returned in this response: " + hits.length);
            for (SearchHit hit : hits) {
                long version = hit.version();
                String id = hit.getId();
                String index = hit.getIndex();
                String type = hit.getType();
                float score = hit.getScore();
                System.out.println("===================================================");
                String source = hit.getSourceAsString();
                System.out.println("version: " + version);
                System.out.println("id: " + id);
                System.out.println("index: " + index);
                System.out.println("type: " + type);
                System.out.println("score: " + score);
                System.out.println("source: " + source);
            }
        }

        @After
        public void cleanUp() {
            client.close();
        }
    }

The test code for this series has been uploaded to GitHub: https://github.com/xpleaf/elasticsearch-study
