怎么使用Flink TableAPI和SQL /Elasticsearch
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要讲解了"怎么使用Flink TableAPI和SQL /Elasticsearch",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么使用F
千家信息网最后更新 2025年12月02日怎么使用Flink TableAPI和SQL /Elasticsearch
这篇文章主要讲解了"怎么使用Flink TableAPI和SQL /Elasticsearch",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么使用Flink TableAPI和SQL /Elasticsearch"吧!
使用Tbale&SQL与Flink Elasticsearch Connector 连接器将数据写入Elasticsearch引擎的索引
示例环境
java.version: 1.8.xflink.version: 1.11.1elasticsearch:6.x
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToEs.java
package com.flink.examples.elasticsearch;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.StatementSet;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 使用Tbale&SQL与Flink Elasticsearch连接器将数据写入Elasticsearch引擎的索引 */public class InsertToEs { /** * Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。 * 参考官方:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html */ //参见属性配置类:ElasticsearchValidator static String table_sql = "CREATE TABLE my_users (\n" + " user_id STRING,\n" + " user_name STRING,\n" + " uv BIGINT,\n" + " pv BIGINT,\n" + " PRIMARY KEY (user_id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector.type' = 'elasticsearch',\n" + " 'connector.version' = '6',\n" + " 'connector.property-version' = '1', \n" + " 'connector.hosts' = 'http://192.168.110.35:9200',\n" + " 'connector.index' = 'users',\n" + " 'connector.document-type' = 'doc',\n" + " 'format.type' = 'json',\n" + " 'update-mode'='append' -- append|upsert\n" + ")"; public static void main(String[] args) { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认流时间方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册kafka数据维表 tEnv.executeSql(table_sql); //Elasticsearch connector 目前只支持了 sink,不支持 source 。不能SELECT elasticsearch table,因此只能通过insert的方式提交数据; String sql = "insert into my_users (user_id,user_name,uv,pv) values('10003','tom',31,10)";// TableResult tableResult = tEnv.executeSql(sql); //第二种方式:声明一个操作集合来执行sql StatementSet stmtSet = tEnv.createStatementSet(); stmtSet.addInsertSql(sql); TableResult tableResult = stmtSet.execute(); tableResult.print(); }}打印结果
+-------------------------------------------+| default_catalog.default_database.my_users |+-------------------------------------------+| -1 |+-------------------------------------------+1 row in set
感谢各位的阅读,以上就是"怎么使用Flink TableAPI和SQL /Elasticsearch"的内容了,经过本文的学习后,相信大家对怎么使用Flink TableAPI和SQL /Elasticsearch这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
示例
方式
学习
内容
引擎
模块
环境
索引
连接器
支持
官方
就是
属性
思路
情况
数据源
文章
时间
更多
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
宿城区多功能网络技术
魔兽世界世界数据库
南宫软件开发哪家正规
计算机网络技术和人工智能
网络安全工程师考研吗
闵行区信息软件开发网上价格
数据库大文本类型
网络安全加密上市公司
校园网络安全教育知识讲座
黎明觉醒怎么登录服务器
软件开发+坑
数据库文件是流式文件吗
网络安全法新增的内容
什么是应用程序软件开发
spss数据库的建立
电脑进入界面后无法连接服务器
内部网络安全现状
广西定时模块服务器
数据网络技术就业前景
虚拟网络安全中心
网络安全手抄报怎么涂颜色
服务器主板可以连接手机吗
你画我猜无法连接至游戏服务器
美国服务器多IP
取数软件开发公司
安徽省网络安全宣传周先进集体
镇江百事通网络技术有限公司
国内科研软件开发
海康威视服务器关机命令
网络安全建设的含义