flink 读取hive的数据
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,flink1.8 对hive 的支持不够好,造成300W的数据,居然读了2个小时,打算将程序迁移至spark。 先把代码贴上。 后发现sql不应该有where条件,去掉后速度还行。maven
千家信息网最后更新 2025年12月03日flink 读取hive的数据
flink1.8 对hive 的支持不够好,造成300W的数据,居然读了2个小时,打算将程序迁移至spark。 先把代码贴上。 后发现sql不应该有where条件,去掉后速度还行。
maven
org.apache.hive hive-jdbc 1.1.0 org.apache.hadoop hadoop-common 3.1.2 jdk.tools jdk.tools 1.8 system ${JAVA_HOME}/lib/tools.jar java
private final static String driverName = "org.apache.hive.jdbc.HiveDriver";// jdbc驱动路径 private final static String url = ";";// hive库地址+库名 private final static String user = "";// 用户名 private final static String password = "!";// 密码 private final static String table=""; private final static String sql = " "; public static void main(String[] arg) throws Exception { long time=System.currentTimeMillis(); HttpClientUtil.sendDingMessage("开始同步hive-"+table+";"+Utils.getTimeString()); /** * 初始化环境 */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); try { TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}; String[] colName = new String[]{"user","name"}; RowTypeInfo rowTypeInfo = new RowTypeInfo(types, colName); JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat().setDrivername(driverName) .setDBUrl(url) .setUsername(user).setPassword(password); Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date()); calendar.add(Calendar.DATE, -1); //用昨天产出的数据 SimpleDateFormat sj = new SimpleDateFormat("yyyyMMdd"); String d=sj.format(calendar.getTime()); JDBCInputFormat jdbcInputFormat = builder.setQuery(sql+" and dt='"+d+"' limit 100000000").setRowTypeInfo(rowTypeInfo).finish(); DataSource rowlist = env.createInput(jdbcInputFormat); DataSet temp= rowlist.filter(new FilterFunction(){ @Override public boolean filter(Row row) throws Exception { String key=row.getField(0).toString(); String value=row.getField(1).toString(); if(key.length()<5 || key.startsWith("-") || key.startsWith("$") || value.length()<5 || value.startsWith("-") || value.startsWith("$")) { return false; }else { return true; } } }).map(new MapFunction(){ @Override public RedisDataModel map(Row value) throws Exception { RedisDataModel m=new RedisDataModel(); m.setExpire(-1); m.setKey(JobConstants.REDIS_FLINK_IMEI_USER+value.getField(0).toString()); m.setGlobal(true); m.setValue(value.getField(1).toString()); return m; } }); HttpClientUtil.sendDingMessage("同步hive-"+table+"完成;开始推送模型,共有"+temp.count()+"条;"+Utils.getTimeString()); RedisOutputFormat redisOutput = RedisOutputFormat.buildRedisOutputFormat() .setHostMaster(AppConfig.getProperty(JobConstants.REDIS_HOST_MASTER)) .setHostSentinel(AppConfig.getProperty(JobConstants.REDIS_HOST_SENTINELS)) .setMaxIdle(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXIDLE))) .setMaxTotal(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXTOTAL))) .setMaxWaitMillis(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXWAITMILLIS))) .setTestOnBorrow(Boolean.parseBoolean(AppConfig.getProperty(JobConstants.REDIS_TESTONBORROW))) .finish(); temp.output(redisOutput); env.execute("hive-"+table+" sync"); HttpClientUtil.sendDingMessage("同步hive-"+table+"完成,耗时:"+(System.currentTimeMillis()-time)/1000+"s"); } catch (Exception e) { logger.error("",e); HttpClientUtil.sendDingMessage("同步hive-"+table+"失败,时间戳:"+time+",原因:"+e.toString()); }
同步
数据
不够
代码
原因
地址
密码
小时
时间
条件
模型
环境
用户
用户名
程序
路径
速度
产出
推送
支持
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网易如何创造一个自己的服务器
软件开发公司起什么名字好听
分布式关系数据库创新
软件开发能力大于学历吗
网络安全知识防范恶意软件
方舟 服务器 工具
新华互联网科技历届发布会
金融行业与软件开发
公安部网络安全保卫局白学敏
长沙软件开发公司营业执照
加强互联网金融网络安全
inux服务器开源
数据库 加拿大
软件开发投标书模板免费下载
江西免费审批管控软件开发平台
地理数据库er图讲解
开封三年制计算机网络技术
体脂秤软件开发
qq空间留言时服务器繁忙
5e平台csgo服务器选择
对网络安全主管部门的建议书
国际服怎么更改第二次服务器
数据库 rt
软件开发项目合作意向书
acc数据库介绍
网络安全小故事小学生
数据库技术图片大全
贵州移动服务器显示当前网络不稳
天津华为网络安全
录播服务器