千家信息网

集算器实现外部数据并行计算

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,文本并行SPL可将文本文件按体积大致分为N段,只读取其中一段。比如cardInfo.txt存储着一千万条人口信息,将其分为十份,取第二份,代码可以写作:AB1=file("d:\\temp\\
千家信息网最后更新 2025年12月02日集算器实现外部数据并行计算

文本并行

SPL可将文本文件按体积大致分为N段,只读取其中一段。比如cardInfo.txt存储着一千万条人口信息,将其分为十份,取第二份,代码可以写作:


AB
1=file("d:\\temp\\cardInfo千万.txt")
2=A1.import@t(;2:10)/直接读入内存
3=A1.cursor@t(;2:10).fetch@x()/游标方式读取

按体积大致分段,而不是按行数精确分段,目的是提高分段性能。比如在IDE中观察A2或A3的前几个字段,可以看到行数并非精确的100万(与具体数据有关):

indexcardNonamegenderprovincemobile
1308200310180525Alison ClintonfemaleIdaho1024627490
2709198311300191Abby WoodfemaleKansas19668466
31005199807060610George BushmaleCalifornia1019879226
1000005405199907050256Mark RowswellmaleIdaho1168620176

分段读取可应用于多线程计算,从而提高读取性能。比如用2个线程分别读取cardInfo.txt,各线程计算本段行数,最后合并为总行数,可用如下代码:

5fork to(2)=A1.cursor@t(;A5:2).total(count(1))/2线程分段
6=A5.sum()
/合并结果

语句fork语句适合算法较复杂的情况,当算法比较简单时,可用cursor@m直接分段读取。比如前面的代码可以改写如下:

7=A1.cursor@tm(;2).total(count(1))/2线程分段

上述代码指定了线程数,如果省略线程数,则用配置文件中的"parallet limit"当做默认线程数。假设parallet limit=2,则上述代码可以改写成:

8=A1.cursor@tm().total(count(1))/默认线程数分段

为了验证分段读取前后的性能差异,下面设计一个算法,分别用单线程和2线程计算cardInfo.txt的总行数,可以看到性能显著提升:

11=now()
12=A1.cursor@t().total(count(1))
13=interval@ms(A11,now())/未分段,20882ms
14

15=now()
16=A1.cursor@tm(;2).total(count(1))
17=interval@ms(A15,now())/2线程分段,12217ms

JDBC 并行

通过JDBC取数时,有时会遇到数据库负载虽然不重,但取数性能仍然较差的情况,这种情况下可以用并行取数提高性能。

比如Oracle数据库有一张通话记录表callrecord,记录数100万条,索引字段是callTime,且数据基本按该字段平均分布。采用非并行取数时,可以发现性能不够理想,代码如下:


AB
1=now()/记录时间,用于测试性能
2=connect("orcl")
3=A2.query@x("select * from callrecord")
4=interval@ms(A1,now())/非并行取数,17654ms

改为2线程并行取数后,可以看到性能提升明显,代码如下:

6=now()
7=connect("orcl").query@x("select min(callTime),max(callTime) from callrecordA")
8=2.(range(A7.#1,elapse@s(A7.#2,1),~:2))/时间区间参数列表
9fork A8=connect("orcl")
10
=B9.query@x("select * from callrecordA where callTime>=? and callTime
11=A9.conj()
12=interval@ms(A6,now())/并行取数,10045ms

既然要并行取数,就要把源数据分成多个区间,使每区间的数据量大致相等。在这个例子中,索引字段是时间类型callTime,所以先用A7求出callTime的数据范围,再用A8将该范围平均分成2个时间区间。之后在A9进行并行计算,每个线程以各自的时间区间为参数执行SQL,取数结果将大致相等。最后合并多线程的取数结果,作为最终结果。

函数range非常适合对数据分段。该函数可将某范围平均分为N个区间,获得第i个区间,且可根据范围的数据类型自动调整区间的数据类型。本例的范围类型是datetime,则函数range将范围按秒均分,返回类型也是datetime。如果范围类型是date,则函数range按天均分;如果范围类型是整数,则函数range按整数均分。

上面例子中,分段字段是索引,如果没有建立索引,则查询性能会出现下降。在这种情况下,并行取数仍然可以带来明显的性能提升,所以可以用相同的方法。

上面例子中,源数据基本按callTime平均分布,因此容易使各区间的数据量大致相等,如果源数据分布很不平均,可以考虑按行号分段。每种数据库都有生成行号的方法,比如oralce可用rownum。

除了单表单SQL并行取数,SPL也支持多表多SQL并行取数。比如某报表格式较复杂,需要SPL执行多个SQL,并按一定的格式拼出结果集。当采用非并行取数时,可以发现性能不够理想,代码如下:


AB
1=now()=connect("orcl")
2select count(1) from callrecordA where to_char(calltime,'yyyy')='2015'=B1.query(A2)
3select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201501'=B1.query(A3)
4select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201502'=B1.query(A4)
5select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201503'=B1.query(A5)
6select count(1) from callrecordA where to_char(calltime,'yyyy')='2016'=B1.query(A6)
7select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201601'=B1.query(A7)
8select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201602'=B1.query(A8)
9select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201603'=B1.query(A9)
10
=B1.close()
11=[B2:B9].new(~.#1:data)
12=interval@ms(A1,now())/非并行取数,2195毫秒

改为4线程并行取数后,可以看到性能提升明显,代码如下:

14=now()
15fork [A2:A9]=connect("orcl")
16
=B15.query@x(A15)
17=A15.new(~.#1:data)
18=interval@ms(A14,now())/4并行取数,1320毫秒

需要注意的是,并行取数时任务数可大于并行数。比如上面代码共8个任务,但同时执行的任务只有4个,其他待执行的任务排在队列中,如果某个小任务先执行完成,SPL会从队列中取下一个任务并执行它。可以看到,当任务数较多时,即使各任务负载相差较大,也能充分发挥硬件性能。

混合并行

当数据量太大时,除了分库计算,还可以进行混合数据源并行计算,后者性能更高。具体做法是:把数据分为两部分(或多部分),一部分存储在数据库中,通常是当前实时数据,一部分存储在组文件,通常是历史数据,再对两种数据源进行并行计算,从而获得更高性能。

比如历史订单存储在orders.ctx中,当前订单存储在数据库orcl中,请按年、月分组,对各组数据的amount字段求和。SPL代码如下:


AB
1forkselect extract(year from orderTime)y,extract(month from orderTime)m,sum(amount) amount from orders group by extract(year from orderTime),extract(month from orderTime)
2
=connect("orcl")
3
=B2.query@x(B1)
4fork=file("orders.ctx").create()
5
=B4.groups(year(ORDERTIME):Y,month(ORDERTIME):M;sum(AMOUNT):AMOUNT)
6=[A1,A4].conj()
7=A6.groups(Y,M;sum(AMOUNT):AMOUNT)

注意fork……fork……的用法。如果fork语句块下接非fork语句块,则两者顺序执行,如果fork语句块下接fork语句块,则两者并行执行。


数据 线程 性能 代码 区间 任务 范围 类型 字段 语句 函数 数据库 时间 结果 存储 情况 索引 明显 例子 文件 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全大客户经理职责 鼎隆网络技术 陕西服务器机柜货期 app 软件开发 工具 个体软件开发需要税务登记吗 软件开发在线计划 服务器保护后显示器黑屏 网络安全论文写作方向 mysql数据库监控脚本 淘宝网店数据库设计 绿园区通用网络技术服务哪家好 什么软件开发抖音直播 主机游戏登不上服务器 网络安全三同步的原因 商家打印订单显示请求服务器超时 数据库统计姓名分类的记录数 两个数据库用日期和名称筛选 软件测试跟软件开发哪个简单 信息网络安全的防范与措施 不慎删除的数据库能恢复回来吗 最牛逼的软件开发企业有哪些 从单片机转向软件开发怎么做 局机关网络安全工作制度 oracle数据库掉线 福州网络安全培训学费多少 2015年网络安全会议 阿里云服务器管理员密码忘了 网络安全科学技术 鹏城互联网科技有限公司 sql 查看数据库 命令
0