ORC文件读写工具类和Flink输出ORC格式文件的方法
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇内容主要讲解"ORC文件读写工具类和Flink输出ORC格式文件的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"ORC文件读写工具类和Flink输
千家信息网最后更新 2025年12月01日ORC文件读写工具类和Flink输出ORC格式文件的方法
本篇内容主要讲解"ORC文件读写工具类和Flink输出ORC格式文件的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"ORC文件读写工具类和Flink输出ORC格式文件的方法"吧!
一.ORC文件:
压缩
压缩比例在1:7到1:10之间,3份副本的话会节省接近10倍空间
调查数据周末要给出
数据压缩后要注意负载均衡问题,可以尝试reblance
导出
hive的orc文件使用sqoop导出到mysql使用hcatalog直接增加一些配置参数即可
查看
以json方式查看orc文件
hive --orcfiledump -j -p /user/hive/warehouse/dim.db/dim_province/000000_0
下载
以KV形式查看orc文件
hive --orcfiledump -d /user/hive/warehouse/dim.db/dim_province/000000_0 > myfile.txt
orc读取会查找字段在min和max中的值,不包含则跳过,所以速度会快
二,orc读写工具类
注意事项: 在windows读写时,请务必保证classpath ,path中不要有hadoop的环境变量! 如果有,请先删除,并且重启IDE
2.1 读:
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.orc.OrcFile;import org.apache.orc.Reader;import org.apache.orc.RecordReader;import org.apache.orc.TypeDescription;import java.io.IOException;public class CoreReader { public static void main(Configuration conf, String[] args) throws IOException { // Get the information from the file footer Reader reader = OrcFile.createReader(new Path("my-file.orc"), OrcFile.readerOptions(conf)); System.out.println("File schema: " + reader.getSchema()); System.out.println("Row count: " + reader.getNumberOfRows()); // Pick the schema we want to read using schema evolution TypeDescription readSchema = TypeDescription.fromString("struct"); // Read the row data VectorizedRowBatch batch = readSchema.createRowBatch(); RecordReader rowIterator = reader.rows(reader.options() .schema(readSchema)); LongColumnVector z = (LongColumnVector) batch.cols[0]; BytesColumnVector y = (BytesColumnVector) batch.cols[1]; LongColumnVector x = (LongColumnVector) batch.cols[2]; while (rowIterator.nextBatch(batch)) { for(int row=0; row < batch.size; ++row) { int zRow = z.isRepeating ? 0: row; int xRow = x.isRepeating ? 0: row; System.out.println("z: " + (z.noNulls || !z.isNull[zRow] ? z.vector[zRow] : null)); System.out.println("y: " + y.toString(row)); System.out.println("x: " + (x.noNulls || !x.isNull[xRow] ? x.vector[xRow] : null)); } } rowIterator.close(); } public static void main(String[] args) throws IOException { main(new Configuration(), args); }} 2.2,写:
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.orc.OrcFile;import org.apache.orc.TypeDescription;import org.apache.orc.Writer;import java.io.IOException;import java.nio.charset.StandardCharsets;public class CoreWriter { public static void main(Configuration conf, String[] args) throws IOException { TypeDescription schema = TypeDescription.fromString("struct"); Writer writer = OrcFile.createWriter(new Path("my-file.orc"), OrcFile.writerOptions(conf) .setSchema(schema)); VectorizedRowBatch batch = schema.createRowBatch(); LongColumnVector x = (LongColumnVector) batch.cols[0]; BytesColumnVector y = (BytesColumnVector) batch.cols[1];for(int r=0; r < 10000; ++r) { int row = batch.size++; x.vector[row] = r; byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8); y.setRef(row, buffer, 0, buffer.length); // If the batch is full, write it out and start over. if (batch.size == batch.getMaxSize()) { writer.addRowBatch(batch); batch.reset(); } }if (batch.size != 0) { writer.addRowBatch(batch); } writer.close(); } public static void main(String[] args) throws IOException {main(new Configuration(), args); }} 2.3 Flink Sink ORC文件示例:(基于flink1.12.3版本)
import org.apache.flink.core.fs.Path;import org.apache.flink.orc.OrcSplitReaderUtil;import org.apache.flink.orc.vector.RowDataVectorizer;import org.apache.flink.orc.writer.OrcBulkWriterFactory;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.table.data.GenericRowData;import org.apache.flink.table.data.RowData;import org.apache.flink.table.types.logical.DoubleType;import org.apache.flink.table.types.logical.IntType;import org.apache.flink.table.types.logical.LogicalType;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.table.types.logical.VarCharType;import org.apache.hadoop.conf.Configuration;import org.apache.orc.TypeDescription;import java.util.Properties;public class StreamingWriteFileOrc { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10000); env.setParallelism(1); DataStream dataStream = env.addSource( new MySource()); //写入orc格式的属性 final Properties writerProps = new Properties(); writerProps.setProperty("orc.compress", "LZ4"); //定义类型和字段名 LogicalType[] orcTypes = new LogicalType[]{ new IntType(), new DoubleType(), new VarCharType()}; String[] fields = new String[]{"a", "b", "c"}; TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of( orcTypes, fields)); //构造工厂类OrcBulkWriterFactory final OrcBulkWriterFactory factory = new OrcBulkWriterFactory<>( new RowDataVectorizer(typeDescription.toString(), orcTypes), writerProps, new Configuration()); StreamingFileSink orcSink = StreamingFileSink .forBulkFormat(new Path("file:///tmp/aaaa"), factory) .build(); dataStream.addSink(orcSink); env.execute(); } public static class MySource implements SourceFunction{ @Override public void run(SourceContext sourceContext) throws Exception{ while (true){ GenericRowData rowData = new GenericRowData(3); rowData.setField(0, (int) (Math.random() * 100)); rowData.setField(1, Math.random() * 100); rowData.setField(2, org.apache.flink.table.data.StringData.fromString(String.valueOf(Math.random() * 100))); sourceContext.collect(rowData); Thread.sleep(1); } } @Override public void cancel(){ } }} 2.4 POM依赖
UTF-8 1.8 UTF-8 UTF-8 1.8 UTF-8 1.8 2.11 2.11 1.12.3 1.2.0 1.7.21 1.3.1 compile commons-cli commons-cli 1.4 commons-codec commons-codec 1.15 junit junit 4.11 test org.apache.hbase hbase-client ${hbase.version} org.apache.hadoop hadoop-yarn-common org.apache.hadoop hadoop-yarn-api hadoop-mapreduce-client-core org.apache.hadoop hadoop-auth org.apache.hadoop hadoop-common org.apache.hadoop commons-lang commons-lang 2.6 org.apache.commons commons-lang3 3.3.2 mysql mysql-connector-java 5.1.47 com.alibaba fastjson 1.2.28 org.apache.flink flink-java ${flink.cluster.version} ${scope.value} org.apache.flink flink-table ${flink.cluster.version} pom ${scope.value} org.apache.flink flink-table-api-scala-bridge_2.11 ${flink.cluster.version} ${scope.value} org.apache.flink flink-table-api-java-bridge_2.11 ${flink.cluster.version} ${scope.value} org.apache.flink flink-connector-filesystem_2.11 1.11.3 org.apache.flink flink-connector-filesystem_${scala.version} 1.11.3 org.apache.flink flink-orc_2.11 1.12.3 ${scope.value} org.apache.flink flink-ml_${scala.version} 1.8.1 ${scope.value} org.apache.flink flink-table-planner-blink_2.11 ${flink.cluster.version} ${scope.value} org.apache.flink flink-table-common ${flink.cluster.version} ${scope.value} org.apache.flink flink-streaming-java_${scala.version} 1.12.3 ${scope.value} org.apache.flink flink-streaming-scala_${scala.version} ${flink.cluster.version} commons-lang3 org.apache.commons commons-cli commons-cli ${scope.value} org.apache.flink flink-connector-kafka_${scala.version} ${flink.cluster.version} log4j log4j org.slf4j slf4j-log4j12 org.apache.hadoop hadoop-common 2.7.3 ${scope.value} org.apache.hadoop hadoop-hdfs 2.7.3 ${scope.value} xml-apis xml-apis org.apache.flink flink-parquet_${scala.version} ${flink.cluster.version} org.apache.flink flink-avro ${flink.cluster.version} org.slf4j slf4j-api ${slf4j.version} ch.qos.logback logback-core ${logback.version} ch.qos.logback logback-classic ${logback.version} redis.clients jedis 3.0.0 org.apache.commons commons-pool2 2.5.0 com.alibaba druid 1.0.11 org.apache.flink flink-clients_2.11 ${flink.cluster.version} org.apache.hive hive-jdbc 1.2.1 org.apache.hadoop hadoop-client 2.7.3
到此,相信大家对"ORC文件读写工具类和Flink输出ORC格式文件的方法"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
文件
工具
方法
格式
UTF-8
输出
内容
字段
数据
学习
实用
更深
均衡
之间
事项
兴趣
副本
参数
变量
实用性
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
ipv6 服务器应用
软件开发红冲线
网络安全法生效
服务器托管河北虚拟主机
东至新能源软件开发服务价钱
社区基础数据库
广西启路网络技术有限公司
我的世界刷什么会占服务器空间
数据库结构和库表关系
如何查询数据库表格表头
17网络安全信息表怎么填
软件开发与设计就业
博山物流竞价软件开发报价
云帮手支持centos服务器吗
没有加载数据库
数据库自连接怎么解释
生物信息学实验报告数据库检索
wed服务器采用的传输协议
猫爪论坛网站代理服务器
软件开发项目利润分配
serv-u服务器软件
网络安全法生效
厦门英九网络技术
网络安全法专题研讨材料免费复制
lol进去老是说无法连接服务器
邯郸软件开发需要多少钱
暗黑黎明2服务器
临沂vr软件开发
软件开发需要学什么东西
杭州精准学网络技术有限公司