千家信息网

Flink提交任务的方法是什么

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇内容主要讲解"Flink提交任务的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink提交任务的方法是什么"吧!一、关键组件任务提交过程
千家信息网最后更新 2025年12月02日Flink提交任务的方法是什么

本篇内容主要讲解"Flink提交任务的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink提交任务的方法是什么"吧!

一、关键组件

任务提交过程中有三个重要组件:Dispatcher、JobMaster、JobManagerRunnerImpl。通过下面调用路径先找到MiniDispatcher:

YarnJobClusterEntrypoint的main() -> ClusterEntrypoint的runCluster() -> DefaultDispatcherResourceManagerComponentFactory的create() -> DefaultDispatcherRunnerFactory的createDispatcherRunner() -> DefaultDispatcherRunner的grantLeadership() -> JobDispatcherLeaderProcess的onStart() -> DefaultDispatcherGatewayServiceFactory的create() -> JobDispatcherFactory的createDispatcher() -> MiniDispatcher的start()

(1)Dispatcher

负责接收任务提交请求,并分给JobManager执行;

Dispatcher启动时,会运行startRecoveredJobs()来启动需要恢复的任务。当Flink on Yarn模式时,MiniDispatcher将当前任务传入到需要恢复的任务中,这样就实现了任务的提交启动

(2)JobManagerRunner

负责运行JobMaster

(3)JobMaster

负责运行任务,对应旧版的JobManager;

一个任务对应一个JobMaster;

二、JobMaster执行任务

在JobMaster中通过Scheduler、Execution组件来执行一个任务。将任务DAG中每个节点算子分配给TaskManager中的TaskExecutor运行。

Execution的start()方法中通过rpc远程调用TaskExecutor的submitTask()方法:

  public void deploy() throws JobException {                        ......                try {                        ......                        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();                        final ComponentMainThreadExecutor jobMasterMainThreadExecutor =                                vertex.getExecutionGraph().getJobMasterMainThreadExecutor();                                                CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)                                .thenCompose(Function.identity())                                .whenCompleteAsync(                                        .....,                                        jobMasterMainThreadExecutor);                }                catch (Throwable t) {                        ......                }        }

三、TaskExecutor运行算子节点任务

TaskExecutor的submitTask()方法中通过创建org.apache.flink.runtime.taskmanager.Task来运行算子任务。Task的doRun()方法中通过算子节点对应的执行类AbstractInvokable来运行算子的处理逻辑,每个算子对应的执行类AbstractInvokable在客户端提交任务时确定,StreamExecutionEnvironment的addOperator():

    public  void addOperator(                        Integer vertexID,                        @Nullable String slotSharingGroup,                        @Nullable String coLocationGroup,                        StreamOperatorFactory operatorFactory,                        TypeInformation inTypeInfo,                        TypeInformation outTypeInfo,                        String operatorName) {                Class invokableClass =                                operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;                addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,                                outTypeInfo, operatorName, invokableClass);        }

当是流式任务时,调用StreamTask的invoke()方法。当是source节点时,通过调用链 StreamTask.invoke() -> StreamTask.runMailboxLoop() -> MailboxProcessor.runMailboxLoop() -> SourceStreamTask.processInput() :

    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {                controller.suspendDefaultAction();                // Against the usual contract of this method, this implementation is not step-wise but blocking instead for                // compatibility reasons with the current source interface (source functions run as a loop, not in steps).                sourceThread.setTaskDescription(getName());                sourceThread.start();                sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {                        if (isCanceled() && ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {                                mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));                        } else if (!isFinished && sourceThreadThrowable != null) {                                mailboxProcessor.reportThrowable(sourceThreadThrowable);                        } else {                                mailboxProcessor.allActionsCompleted();                        }                });        }

创建线程LegacySourceFunctionThread实例,来开启单独生产数据的线程。LegacySourceFunctionThread的run()方法中调用StreamSource的run()方法:

     public void run(final Object lockingObject,                        final StreamStatusMaintainer streamStatusMaintainer,                        final Output> collector,                        final OperatorChain operatorChain) throws Exception {                final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();                final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();                final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()                        ? getExecutionConfig().getLatencyTrackingInterval()                        : configuration.getLong(MetricOptions.LATENCY_INTERVAL);                LatencyMarksEmitter latencyEmitter = null;                if (latencyTrackingInterval > 0) {                        latencyEmitter = new LatencyMarksEmitter<>(                                getProcessingTimeService(),                                collector,                                latencyTrackingInterval,                                this.getOperatorID(),                                getRuntimeContext().getIndexOfThisSubtask());                }                final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();                this.ctx = StreamSourceContexts.getSourceContext(                        timeCharacteristic,                        getProcessingTimeService(),                        lockingObject,                        streamStatusMaintainer,                        collector,                        watermarkInterval,                        -1);                try {                        userFunction.run(ctx);                        // if we get here, then the user function either exited after being done (finite source)                        // or the function was canceled or stopped. For the finite source case, we should emit                        // a final watermark that indicates that we reached the end of event-time, and end inputs                        // of the operator chain                        if (!isCanceledOrStopped()) {                                // in theory, the subclasses of StreamSource may implement the BoundedOneInput interface,                                // so we still need the following call to end the input                                synchronized (lockingObject) {                                        operatorChain.endHeadOperatorInput(1);                                }                        }                } finally {                        if (latencyEmitter != null) {                                latencyEmitter.close();                        }                }        }

StreamSource的run()方法中调用 userFunction.run(ctx); 当数据源是kafka时,userFunction为FlinkKafkaConsumerBase

3.1 userFunction和 headOperator

最后执行run()的headOperator和算子程序userFunction是在添加算子时确定的,比如添加kafka数据源时

 environment.addSource(new FlinkKafkaConsumer(......));

最后调用的addSource()方法:

   public  DataStreamSource addSource(SourceFunction function, String sourceName, TypeInformation typeInfo) {                TypeInformation resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);                boolean isParallel = function instanceof ParallelSourceFunction;                clean(function);                final StreamSource sourceOperator = new StreamSource<>(function);                return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);        }

headOperator为StreamSource,StreamSource中的userFunction为FlinkKafkaConsumer

到此,相信大家对"Flink提交任务的方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

任务 方法 算子 运行 节点 中通 数据 组件 内容 数据源 线程 学习 实用 更深 重要 三个 关键 兴趣 实例 实用性 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 如何管理阿里云服务器的数据库 excel服务器做企业管理软件 北京约牛网络技术有限公司 潍坊市网络技术公司电话 工信部网络安全局杜广达 网络技术招聘启事文案 数据库分析为什么最难 exl导入erp数据库数据 网络安全宣传展板图片 计算机系统网络安全管理办法 mysql数据库的优化 青海省西宁市软件开发公司 数据库在生活应用 青岛app软件开发哪家靠谱 湖北网络技术开发概况 数据库实体怎么恢复到数据库 饥荒联机本地服务器无法启动 崇明区海航软件开发哪家好 瀚海国际网络技术有限公司 视频会议拼接服务器 阿里云服务器618大促 法国对外安全总局网络安全 网络技术招聘启事文案 测试中数据库重要吗 湖州移动网络技术有限公司 网络安全事件应急处理 审判之逝电竞社无法连接服务器 设置ipv6对网络安全 学习网络技术往哪方面创业 西安网络安全违法举报网站
0