【总结】Spark优化(1)-多Job并发执行
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,Spark程序中一个Job的触发是通过一个Action算子,比如count(), saveAsTextFile()等在这次Spark优化测试中,从Hive中读取数据,将其另外保存四份,
千家信息网最后更新 2025年12月03日【总结】Spark优化(1)-多Job并发执行
Spark程序中一个Job的触发是通过一个Action算子,比如count(), saveAsTextFile()等
在这次Spark优化测试中,从Hive中读取数据,将其另外保存四份,其中两个Job采用串行方式,另外两个Job采用并行方式。将任务提交到Yarn中执行。能够明显看出串行与兵线处理的性能。
每个Job执行时间:
| JobID | 开始时间 | 结束时间 | 耗时 |
| Job 0 | 16:59:45 | 17:00:34 | 49s |
| Job 1 | 17:00:34 | 17:01:13 | 39s |
| Job 2 | 17:01:15 | 17:01:55 | 40s |
| Job 3 | 17:01:16 | 17:02:12 | 56s |
四个Job都是自执行相同操作,Job0,Job1一组采用串行方式,Job2,Job3采用并行方式。
Job0,Job1串行方式耗时等于两个Job耗时之和 49s+39s=88s
Job2,Job3并行方式耗时等于最先开始和最后结束时间只差17:02:12-17:01:15=57s
代码:
package com.cn.ctripotb;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.DataFrame;import org.apache.spark.sql.hive.HiveContext;import java.util.*;import java.util.concurrent.Callable;import java.util.concurrent.Executors;/** * Created by Administrator on 2016/9/12. */public class HotelTest { static ResourceBundle rb = ResourceBundle.getBundle("filepath"); public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("MultiJobWithThread") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); JavaSparkContext sc = new JavaSparkContext(conf); HiveContext hiveContext = new HiveContext(sc.sc()); //测试真实数据时要把这里放开 final DataFrame df = getHotelInfo(hiveContext); //没有多线程处理的情况,连续执行两个Action操作,生成两个Job df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file1",com.hadoop.compression.lzo.LzopCodec.class); df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file2",com.hadoop.compression.lzo.LzopCodec.class); //用Executor实现多线程方式处理Job java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Callable() { @Override public Void call(){ df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class); return null; } }); executorService.submit(new Callable() { @Override public Void call(){ df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class); return null; } }); executorService.shutdown(); } public static DataFrame getHotelInfo(HiveContext hiveContext){ String sql = "select * from common.dict_hotel_ol"; return hiveContext.sql(sql); }}
方式
两个
时间
处理
数据
线程
测试
明显
相同
之和
代码
任务
性能
情况
最先
程序
算子
生成
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
守护网络安全的新颖题目
极下解析服务器错误
赫山市公安局网络安全攻防演练
轻量级服务器连接数据库错误
网络安全风险管控办法
软件开发平均日工资
软件开发平台是什么岗位
惠州支付软件开发联系人
网络安全法实务
exp怎么导出数据库的表
亚马逊服务器登录
台湾串口服务器
漯河微商软件开发
初中生网络安全知识有奖竞赛总结
phpa数据库管理
区县网络安全调研报告
从事软件开发后来辞职
做服务器运维管理
广州兰格网络技术
邢台市网络安全委员会
5g网络技术基础
网络安全与攻防 集中实训
乐安软件开发技术
网络安全平台账号查询
云南斗牛app软件开发
朱立凯无锡网络安全
服务器进入
漯河微商软件开发
华胜天成与网络安全
互联网企业网络安全怎么解决