如何进行flink中的kafka源码分析
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,今天就跟大家聊聊有关如何进行flink中的kafka源码分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。最近一直在弄flink sql相关的
千家信息网最后更新 2025年12月02日如何进行flink中的kafka源码分析
今天就跟大家聊聊有关如何进行flink中的kafka源码分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
最近一直在弄flink sql相关的东西,第一阶段的目标是从解决kafka的消费和写入的问题。不过也有些同学并不是很了解,今天我们来详细分析一下包的继承层次。

flink源码如下:
public class KafkaTableSourceFactory implements StreamTableSourceFactory{ private ConcurrentHashMap
kafkaTableSources = new ConcurrentHashMap<>(); @Override public Map requiredContext() { Map context = new HashMap<>(); context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE); context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION)); return context; } @Override public List supportedProperties() { List properties = new ArrayList<>(); properties.add(KafkaConnectorDescriptor.DATABASE_KEY); properties.add(KafkaConnectorDescriptor.TABLE_KEY); return properties; } @Override public StreamTableSource createStreamTableSource(Map
properties) { //避免频繁的触发 是否需要加缓存 KafkaTableSource kafkaTableSource; String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY); String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY); if (!kafkaTableSources.containsKey(dataBase + table)) { Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder(); kafkaTableSource = builder .cluster(dataBase) .subject(table) .build(); kafkaTableSources.put(dataBase + table,kafkaTableSource); } else { kafkaTableSource = kafkaTableSources.get(dataBase + table); } return kafkaTableSource; }}
class Kafka08PBTableSource protected(topic: String, properties: Properties, schema: TableSchema, typeInformation: TypeInformation[Row], paramMap: util.LinkedHashMap[String, AnyRef], entryClass: String) extends KafkaTableSource(schema, topic, properties, new PBRowDeserializationSchema(typeInformation, paramMap,entryClass)) { override def createKafkaConsumer(topic: String, properties: Properties, deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = { this.setStartupMode(StartupMode.EARLIEST) new FlinkKafkaConsumer08(topic, deserializationSchema, properties).setStartFromEarliest() }}下面用户自定义的kafka的sink类:
class Kafka08UDMPBTableSink (topic: String, properties: Properties, partitioner: Optional[FlinkKafkaPartitioner[Row]], paramMap: util.LinkedHashMap[String, AnyRef], serializationSchema: SerializationSchema[Row], fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]] ) extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) { override def createKafkaProducer(topic: String, properties: Properties, serializationSchema: SerializationSchema[Row], partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row]={ new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) } override def createSerializationSchema(rowSchema: RowTypeInfo) = serializationSchema override def createCopy = new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes) override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = { super.configure(this.fieldNames, this.fieldTypes) } override def getFieldNames: Array[String]=this.fieldNames /** Returns the types of the table fields. */ override def getFieldTypes: Array[TypeInformation[_]]=this.fieldTypes override def emitDataStream(dataStream: DataStream[Row]): Unit = { val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner) dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames)) }}public class TrackRowDeserializationSchema implements SerializationSchema, DeserializationSchema
{ private static final long serialVersionUID = -2885556750743978636L; /** Type information describing the input type. */ private TypeInformation
typeInfo = null; private LinkedHashMap paraMap; private String inSchema; private String outSchema; private String inClass; private String outClass;}
public class TrackRowFormatFactory extends TableFormatFactoryBaseimplements SerializationSchemaFactory
, DeserializationSchemaFactory
{ public TrackRowFormatFactory() { super(TrackValidator.FORMAT_TYPE_VALUE, 1, false); } public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) { super(type, version, supportsSchemaDerivation); } @Override protected List
supportedFormatProperties() { final List properties = new ArrayList<>(); properties.add(TrackValidator.FORMAT_IN_SCHEMA); properties.add(TrackValidator.FORMAT_IN_CLASS); properties.add(TrackValidator.FORMAT_OUT_CLASS); properties.add(TrackValidator.FORMAT_OUT_SCHEMA); properties.add(TrackValidator.FORMAT_TYPE_INFORMATION); properties.add(TrackValidator.FORMAT_TYPE_VALUE); return properties; }}
看完上述内容,你们对如何进行flink中的kafka源码分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
源码
分析
内容
频繁
东西
同学
层次
更多
用户
目标
知识
篇文章
缓存
行业
资讯
资讯频道
问题
阶段
频道
进一
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
北京先进软件开发不二之选
网络技术的主要课程
软件开发游戏公司公司
网络安全技术能手竞赛试题
西峡直播软件开发
学职业网络技术怎么样
山西回收服务器小卡
网络3D游戏数据库
群晖服务器照片重新索引
山东电信服务器租用云服务器
如何重新构建数据库 ps4
服务器内存条顺序
知名的华为服务器
中职网络安全管理与评估
软件开发经验证明
2020年网络安全风险评估报告
网络安全法起草原则
米3清除所有数据库
录播服务器怎么选
无线网络技术速查方法
当你在服务器里有很多钻石时
组态软件开发报价
共享网站用什么服务器好
单位网络安全考核情况
宝德服务器前面维修口ip和密码
软件开发生命周期任务
游戏软件开发投资合同
中国图书全文数据库新课书馆
服务器机箱和主板怎么连接
数据库表的主键怎么查询