
Analysis of the Flink Kafka Consumer (Kafka Data Source)


This article walks through the Flink Kafka Consumer as a Kafka data source. The explanations are plain and easy to follow; read along step by step to dig into how FlinkKafkaConsumer actually works.

1. When is open() called?

FlinkKafkaConsumer inherits from RichFunction and therefore has the lifecycle method open(). So when does Flink actually invoke FlinkKafkaConsumer's open() method?
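For orientation, here is a minimal sketch of the same lifecycle hook on an ordinary RichFunction (hypothetical user code, not from Flink itself); the call chain traced below drives FlinkKafkaConsumer's open() in exactly the same way:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Any RichFunction exposes the open() lifecycle hook analyzed in this section.
    public class UpperCaseMapper extends RichMapFunction<String, String> {
        private transient long openedAtMillis; // hypothetical field initialized in open()

        @Override
        public void open(Configuration parameters) throws Exception {
            // Invoked once per parallel subtask, before the first map() call.
            openedAtMillis = System.currentTimeMillis();
        }

        @Override
        public String map(String value) {
            return value.toUpperCase();
        }
    }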

Before entering the operator processing loop, StreamTask executes beforeInvoke(), which initializes every operator's state and then calls its open() method:

    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());

initializeStateAndOpenOperators() loops over all operators and initializes each one:

    protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
            operator.initializeState(streamTaskStateInitializer);
            operator.open();
        }
    }

The operator backing the Kafka source is StreamSource; its open() method (inherited from AbstractUdfStreamOperator) is:

    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }

FunctionUtils.openFunction() calls the function's open() method, provided the function implements RichFunction:

    public static void openFunction(Function function, Configuration parameters) throws Exception {
        if (function instanceof RichFunction) {
            RichFunction richFunction = (RichFunction) function;
            richFunction.open(parameters);
        }
    }

2. When is the RuntimeContext assigned?

The chain is StreamTask.beforeInvoke() -> new OperatorChain() -> StreamOperatorFactoryUtil.createOperator(): in the OperatorChain constructor, the StreamOperator is created through a StreamOperatorFactory. For the Kafka source the StreamOperatorFactory is SimpleOperatorFactory, whose createStreamOperator() method calls the StreamOperator's setup():

    public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
        if (operator instanceof AbstractStreamOperator) {
            ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
        }
        if (operator instanceof SetupableStreamOperator) {
            ((SetupableStreamOperator) operator).setup(
                parameters.getContainingTask(),
                parameters.getStreamConfig(),
                parameters.getOutput());
        }
        return (T) operator;
    }

The StreamOperator for the Kafka source is StreamSource, which implements the SetupableStreamOperator interface. Its setup() method lives in the parent class AbstractUdfStreamOperator:

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
    }

FunctionUtils.setFunctionRuntimeContext() hands the RuntimeContext to the function. The RuntimeContext being set is created in AbstractStreamOperator's setup() as a StreamingRuntimeContext:

    this.runtimeContext = new StreamingRuntimeContext(
        environment,
        environment.getAccumulatorRegistry().getUserMap(),
        getMetricGroup(),
        getOperatorID(),
        getProcessingTimeService(),
        null,
        environment.getExternalResourceInfoProvider());
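Once setup() has run, user functions can read that context. A minimal sketch (hypothetical user code) of what the assignment enables:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // After setup() has injected the StreamingRuntimeContext, getRuntimeContext()
    // works anywhere in the function's lifecycle, including open().
    public class SubtaskTagger extends RichMapFunction<String, String> {
        private transient int subtaskIndex;

        @Override
        public void open(Configuration parameters) {
            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public String map(String value) {
            return "subtask-" + subtaskIndex + ": " + value;
        }
    }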

3. FlinkKafkaConsumer's run() method

Flink calls FlinkKafkaConsumer's run() method to produce data. The run() method's logic is as follows:

① Create a KafkaFetcher to pull the data:

    this.kafkaFetcher = createFetcher(
        sourceContext,
        subscribedPartitionsToStartOffsets,
        watermarkStrategy,
        (StreamingRuntimeContext) getRuntimeContext(),
        offsetCommitMode,
        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
        useMetrics);

② The KafkaFetcher's runFetchLoop() starts a KafkaConsumerThread that pulls Kafka data in a loop. The KafkaConsumerThread fetches records through a KafkaConsumer and hands them to the Handover:

    if (records == null) {
        try {
            records = consumer.poll(pollTimeout);
        }
        catch (WakeupException we) {
            continue;
        }
    }

    try {
        handover.produce(records);
        records = null;
    }
    catch (Handover.WakeupException e) {
        // fall through the loop
    }

The KafkaFetcher then obtains the fetched Kafka records from the Handover (a simplified sketch of this hand-off follows the excerpt):

    while (running) {
        // this blocks until we get the next records
        // it automatically re-throws exceptions encountered in the consumer thread
        final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

        // get the records for each topic partition
        for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
            List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                records.records(partition.getKafkaPartitionHandle());
            partitionConsumerRecordsHandler(partitionRecords, partition);
        }
    }
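To make the Handover's role concrete, here is a minimal sketch of a single-slot blocking exchange between the two threads. It is a simplification: the real Handover also propagates consumer-thread exceptions and supports wakeups, and the class below only mirrors, rather than reproduces, Flink's implementation:

    import org.apache.kafka.clients.consumer.ConsumerRecords;

    // Simplified single-slot exchange: KafkaConsumerThread produces, the fetcher consumes.
    public final class SimpleHandover {
        private final Object lock = new Object();
        private ConsumerRecords<byte[], byte[]> next; // the single slot

        // Called by the consumer thread: blocks while the slot is still full.
        public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
            synchronized (lock) {
                while (next != null) {
                    lock.wait();
                }
                next = records;
                lock.notifyAll();
            }
        }

        // Called by the fetcher: blocks until the slot has been filled.
        public ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
            synchronized (lock) {
                while (next == null) {
                    lock.wait();
                }
                ConsumerRecords<byte[], byte[]> records = next;
                next = null;
                lock.notifyAll();
                return records;
            }
        }
    }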

③ Send the data to the next operator through the Output<StreamRecord<T>> inside the SourceContext:

    public void collect(T element) {
        synchronized (lock) {
            output.collect(reuse.replace(element));
        }
    }

The SourceContext is created in StreamSource's run() method via StreamSourceContexts.getSourceContext(). The Output<StreamRecord<OUT>> is created in, and returned by, OperatorChain's createOutputCollector():

    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

With a single output, the collector is a RecordWriterOutput; with multiple outputs it is a CopyingDirectedOutput or DirectedOutput.

④ In the single-output RecordWriterOutput case, emission goes through the member RecordWriter instance. The RecordWriter is created by StreamTask's createRecordWriterDelegate(); RecordWriterDelegate is a proxy that holds the RecordWriter instance(s) internally:

    public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(
            StreamConfig configuration,
            Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = createRecordWriters(
            configuration,
            environment);
        if (recordWrites.size() == 1) {
            return new SingleRecordWriter<>(recordWrites.get(0));
        } else if (recordWrites.size() == 0) {
            return new NonRecordWriter<>();
        } else {
            return new MultipleRecordWriters<>(recordWrites);
        }
    }

    private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
            StreamConfig configuration,
            Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());

        for (int i = 0; i < outEdgesInOrder.size(); i++) {
            StreamEdge edge = outEdgesInOrder.get(i);
            recordWriters.add(
                createRecordWriter(
                    edge,
                    i,
                    environment,
                    environment.getTaskInfo().getTaskName(),
                    edge.getBufferTimeout()));
        }
        return recordWriters;
    }

outEdgesInOrder comes from the List<StreamEdge> outEdges of the StreamNode in the StreamGraph.

When creating the RecordWriter, the isBroadcast() method of the StreamEdge's StreamPartitioner<?> outputPartitioner decides between a BroadcastRecordWriter and a ChannelSelectorRecordWriter:

    public RecordWriter<T> build(ResultPartitionWriter writer) {
        if (selector.isBroadcast()) {
            return new BroadcastRecordWriter<>(writer, timeout, taskName);
        } else {
            return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
        }
    }

When no partitioner is set explicitly, outputPartitioner is chosen by whether the upstream and downstream parallelisms match (a usage sketch follows the excerpt):

    if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
        partitioner = new ForwardPartitioner<Object>();
    } else if (partitioner == null) {
        partitioner = new RebalancePartitioner<Object>();
    }
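A small, hypothetical job sketch illustrating how these defaults and an explicit choice play out (the class and job names are made up for illustration):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PartitionerDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // fromElements() is a non-parallel source, so its parallelism is 1.
            DataStream<String> src = env.fromElements("a", "b", "c");

            // Same parallelism, no explicit partitioner -> ForwardPartitioner on the edge.
            src.map(String::toUpperCase).setParallelism(1);

            // Different parallelism, still no explicit partitioner -> RebalancePartitioner.
            src.map(String::toLowerCase).setParallelism(2);

            // Explicit broadcast(): isBroadcast() is true, so the builder above
            // would create a BroadcastRecordWriter for this edge.
            src.broadcast().map(String::trim).setParallelism(2);

            env.execute("partitioner-demo");
        }
    }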

Both BroadcastRecordWriter and ChannelSelectorRecordWriter ultimately emit data by calling flush() on their member ResultPartitionWriter targetPartition. The ResultPartitionWriter is produced in ConsumableNotifyingResultPartitionWriterDecorator's decorate(), which decides per ResultPartitionDeploymentDescriptor whether to wrap the writer in a ConsumableNotifyingResultPartitionWriterDecorator or to pass the given partitionWriters through unchanged. The decorator makes the output immediately consumable by the next node, announcing it through a ResultPartitionConsumableNotifier:

    public static ResultPartitionWriter[] decorate(
            Collection<ResultPartitionDeploymentDescriptor> descs,
            ResultPartitionWriter[] partitionWriters,
            TaskActions taskActions,
            JobID jobId,
            ResultPartitionConsumableNotifier notifier) {

        ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
        int counter = 0;
        for (ResultPartitionDeploymentDescriptor desc : descs) {
            if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) {
                consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriterDecorator(
                    taskActions,
                    jobId,
                    partitionWriters[counter],
                    notifier);
            } else {
                consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
            }
            counter++;
        }
        return consumableNotifyingPartitionWriters;
    }

partitionWriters are created via NettyShuffleEnvironment's createResultPartitionWriters() -> ResultPartitionFactory's create(). The ResultPartition emits through its member ResultSubpartition[] subpartitions, which are produced in ResultPartitionFactory's createSubpartitions():

    private void createSubpartitions(
            ResultPartition partition,
            ResultPartitionType type,
            BoundedBlockingSubpartitionType blockingSubpartitionType,
            ResultSubpartition[] subpartitions) {
        // Create the subpartitions.
        if (type.isBlocking()) {
            initializeBoundedBlockingPartitions(
                subpartitions,
                partition,
                blockingSubpartitionType,
                networkBufferSize,
                channelManager);
        } else {
            for (int i = 0; i < subpartitions.length; i++) {
                subpartitions[i] = new PipelinedSubpartition(i, partition);
            }
        }
    }

For streaming jobs, the ResultSubpartition is a PipelinedSubpartition.

4. Writing the data out

4.1 Notification via ResultPartitionConsumableNotifier

The ResultPartitionConsumableNotifier is created in TaskExecutor's associateWithJobManager():

    private JobTable.Connection associateWithJobManager(
            JobTable.Job job,
            ResourceID resourceID,
            JobMasterGateway jobMasterGateway) {
        ......
        ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
            jobMasterGateway,
            getRpcService().getExecutor(),
            taskManagerConfiguration.getTimeout());
        ......
    }

RpcResultPartitionConsumableNotifier makes a remote call to the JobMaster's scheduleOrUpdateConsumers() method, passing in the ResultPartitionID partitionId.
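A paraphrased sketch of that remote call, wired from the constructor arguments shown above; this is not verbatim Flink source, and the failure handling shown is an assumption:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
    import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    import org.apache.flink.runtime.messages.Acknowledge;
    import org.apache.flink.runtime.taskmanager.TaskActions;

    // Sketch of the notifier: forward the partition id to the JobMaster and
    // fail the producing task if the RPC itself fails.
    public class SketchConsumableNotifier {
        private final JobMasterGateway jobMasterGateway;
        private final Executor executor;
        private final Time timeout;

        public SketchConsumableNotifier(JobMasterGateway gateway, Executor executor, Time timeout) {
            this.jobMasterGateway = gateway;
            this.executor = executor;
            this.timeout = timeout;
        }

        public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, TaskActions taskActions) {
            CompletableFuture<Acknowledge> acknowledgeFuture =
                jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);

            acknowledgeFuture.whenCompleteAsync(
                (ack, failure) -> {
                    if (failure != null) {
                        // assumed: surface a lost notification by failing the task
                        taskActions.failExternally(failure);
                    }
                },
                executor);
        }
    }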

4.1.1 JobMaster's scheduleOrUpdateConsumers()

The JobMaster notifies the downstream consuming operators through ExecutionGraph's scheduleOrUpdateConsumers().

Two pieces of code are key here:

① From the member Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions of the producing ExecutionVertex, look up the production and consumption metadata for this partition, which is stored in an IntermediateResultPartition:

    void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
        .......
        final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId());
        .......
        if (partition.getIntermediateResult().getResultType().isPipelined()) {
            // Schedule or update receivers of this partition
            execution.scheduleOrUpdateConsumers(partition.getConsumers());
        }
        else {
            throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" +
                "pipelined partitions.");
        }
    }

From the IntermediateResultPartition, take the consumers List<List<ExecutionEdge>> allConsumers;

from each ExecutionEdge's ExecutionVertex target, take the Execution currentExecution as the task to notify (this navigation is sketched below).
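A paraphrased sketch of that navigation, not verbatim Flink source; the wrapper class and method name are made up, while the Flink types and getters are real:

    import java.util.List;

    import org.apache.flink.runtime.executiongraph.Execution;
    import org.apache.flink.runtime.executiongraph.ExecutionEdge;
    import org.apache.flink.runtime.executiongraph.ExecutionVertex;
    import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;

    // partition -> consumer edges -> target vertex -> current execution attempt
    public class ConsumerNavigationSketch {
        static void visitConsumers(IntermediateResultPartition partition) {
            for (List<ExecutionEdge> consumers : partition.getConsumers()) {
                for (ExecutionEdge edge : consumers) {
                    ExecutionVertex target = edge.getTarget();
                    Execution currentExecution = target.getCurrentExecutionAttempt();
                    // the Execution is what receives the partition-info RPC (see ② below)
                }
            }
        }
    }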

② Execution's sendUpdatePartitionInfoRpcCall() method makes an RPC to TaskExecutor's updatePartitions() so the downstream consumer tasks start (or update) their consumption:

    private void sendUpdatePartitionInfoRpcCall(
            final Iterable<PartitionInfo> partitionInfos) {

        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();

            CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);

            updatePartitionsResultFuture.whenCompleteAsync(
                (ack, failure) -> {
                    // fail if there was a failure
                    if (failure != null) {
                        fail(new IllegalStateException("Update to task [" + getVertexWithAttempt() +
                            "] on TaskManager " + taskManagerLocation + " failed", failure));
                    }
                }, getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
        }
    }

4.1.2 TaskExecutor's updatePartitions()

TaskExecutor's updatePartitions() refreshes the partition info. If an InputChannel was previously unknown, it is replaced; see SingleInputGate's updateInputChannel():

    public void updateInputChannel(
            ResourceID localLocation,
            NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                // There was a race with a task failure/cancel
                return;
            }

            IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();

            InputChannel current = inputChannels.get(partitionId);

            if (current instanceof UnknownInputChannel) {
                UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
                boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
                InputChannel newChannel;
                if (isLocal) {
                    newChannel = unknownChannel.toLocalInputChannel();
                } else {
                    RemoteInputChannel remoteInputChannel =
                        unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
                    remoteInputChannel.assignExclusiveSegments();
                    newChannel = remoteInputChannel;
                }
                LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

                inputChannels.put(partitionId, newChannel);
                channels[current.getChannelIndex()] = newChannel;

                if (requestedPartitionsFlag) {
                    newChannel.requestSubpartition(consumedSubpartitionIndex);
                }

                for (TaskEvent event : pendingEvents) {
                    newChannel.sendTaskEvent(event);
                }

                if (--numberOfUninitializedChannels == 0) {
                    pendingEvents.clear();
                }
            }
        }
    }

4.2 Writing out through PipelinedSubpartition

Records are first written to the in-memory queue ArrayDeque<BufferConsumer> buffers; the consumer is then notified via the PipelinedSubpartitionView readView's notifyDataAvailable() -> the BufferAvailabilityListener availabilityListener's notifyDataAvailable().
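A simplified sketch of this buffer-and-notify pattern (hypothetical class names; the real subpartition stores BufferConsumer instances and has richer availability logic):

    import java.util.ArrayDeque;

    // Append to an in-memory deque, then notify a registered listener
    // that data became available, mirroring PipelinedSubpartition's pattern.
    public class SketchSubpartition {
        public interface AvailabilityListener {
            void notifyDataAvailable();
        }

        private final ArrayDeque<byte[]> buffers = new ArrayDeque<>(); // stands in for ArrayDeque<BufferConsumer>
        private AvailabilityListener listener; // stands in for the read view's BufferAvailabilityListener

        public synchronized void createReadView(AvailabilityListener listener) {
            this.listener = listener;
        }

        public synchronized void add(byte[] record) {
            boolean wasEmpty = buffers.isEmpty();
            buffers.add(record);
            // Only wake the consumer when data actually became available.
            if (wasEmpty && listener != null) {
                listener.notifyDataAvailable();
            }
        }

        public synchronized byte[] poll() {
            return buffers.poll();
        }
    }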

4.2.1 When is the BufferAvailabilityListener created?

① When TaskManagerServices creates the ShuffleEnvironment, the chain NettyShuffleServiceFactory's createNettyShuffleEnvironment() -> new NettyConnectionManager() -> new NettyServer() -> ServerChannelInitializer's initChannel() -> NettyProtocol's getServerChannelHandlers() yields the Netty server-side handler PartitionRequestServerHandler:

    public ChannelHandler[] getServerChannelHandlers() {
        PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
        PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
            partitionProvider,
            taskEventPublisher,
            queueOfPartitionQueues);

        return new ChannelHandler[] {
            messageEncoder,
            new NettyMessage.NettyMessageDecoder(),
            serverHandler,
            queueOfPartitionQueues
        };
    }

② When PartitionRequestServerHandler receives a PartitionRequest message from the client, it creates a CreditBasedSequenceNumberingViewReader and registers it through requestSubpartitionView() -> ResultPartitionManager's createSubpartitionView() -> ResultPartition's createSubpartitionView().

③ CreditBasedSequenceNumberingViewReader's notifyDataAvailable() calls PartitionRequestQueue's notifyReaderNonEmpty(), which notifies the downstream operator:

    void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
        // The notification might come from the same thread. For the initial writes this
        // might happen before the reader has set its reference to the view, because
        // creating the queue and the initial notification happen in the same method call.
        // This can be resolved by separating the creation of the view and allowing
        // notifications.

        // TODO This could potentially have a bad performance impact as in the
        // worst case (network consumes faster than the producer) each buffer
        // will trigger a separate event loop task being scheduled.
        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
    }

Thanks for reading. That wraps up the analysis of the Flink Kafka Consumer as a Kafka data source; hopefully the call chains above leave you with a clearer picture, though the details are worth verifying in practice against your own Flink version.
