python怎么结合shell自动创建kafka的连接器
发表于:2025-11-09 作者:千家信息网编辑
千家信息网最后更新 2025年11月09日,这篇"python怎么结合shell自动创建kafka的连接器"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,
千家信息网最后更新 2025年11月09日python怎么结合shell自动创建kafka的连接器
这篇"python怎么结合shell自动创建kafka的连接器"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇"python怎么结合shell自动创建kafka的连接器"文章吧。
环境
cat /etc/redhat-release CentOS Linux release 7.5.1804 (Core) [root@localhost ~]# uname -aLinux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linuxpython -VPython 2.7.5
安装连接oracle的python包
pip install cx_Oracle==7.3
获取oracle表信息
cat query_oracle.py #!/usr/bin/env pythonimport cx_Oracleimport sysimport osimport csvimport tracebackfile = open("oracle.txt", 'w').close()user = "test"passwd = "test"listener = '10.0.2.15:1521/orcl'conn = cx_Oracle.connect(user, passwd, listener)cursor = conn.cursor()sql = "select table_name from user_tables" cursor.execute(sql)LIST1=[]while True: row = cursor.fetchone() if row == None: break for table in row: #print table LIST1.append(table)LIST2=[]for i in LIST1: sql3 = "select COLUMN_NAME,DATA_TYPE,DATA_PRECISION,DATA_SCALE from cols WHERE TABLE_name=upper('%s')" %i cursor.execute(sql3) cursor.execute(sql3) row3 = cursor.fetchall() for data in row3: #LIST2.append(i) LIST2.extend(list(data)) LIST2.append(i) f=open('oracle.txt','a+') print >> f,LIST2 LIST2=[]#f=open('test.txt','a+')#select table_name,column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student'); #select column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');去掉多余部分
cat auto.sh #!/bin/bash#python query_oracle.py |tr "," ' '|tr "'" ' '|tr "[" " "|tr "]" " "#>oracle.txt>oracle_tables.txtcat oracle.txt |tr "[],'" " "|sed "s#[ ][ ]*# #g"|sed 's/^[ \t]*//g' >> oracle_tables.txt
cat oracle_tables.txt SNO NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT DATE_DATE SNO2 NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT2 INPUT_TIMESNO3 NUMBER 19 2 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT3 DATA_DATE
shell 脚本处理表信息文件
cat connect.sh #!/bin/bash#获取临时文件的行数FILE_NUM=$(cat oracle_tables.txt |egrep -v '#|^$'|wc -l)#清空自动创建连接器的脚本>create-connect.sh#循环临时文件每一行for i in `seq $FILE_NUM`do FILE_LINE=$(sed -n ${i}p oracle_tables.txt) TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF-1)}') COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}') REAL_COL_NUM=`expr $COL_NUM - 2` #清空临时文件 >${TABLE_NAME}.txt >${TABLE_NAME}.sql #循环临时文件每行列名所在的列 for j in `seq 1 4 $REAL_COL_NUM` do k=`expr $j + 1` m=`expr $j + 2` n=`expr $j + 3` COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j}) COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k}) COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m}) COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n}) #判断列的数据类型是否是NUMBER if [ "$COL_DATA_TYPE" = "NUMBER" ] then #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中 echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ${TABLE_NAME}.txt else #循环拼接SQL查询中的列名部分,追加到临时文件中 echo "$COL_NAME" >> ${TABLE_NAME}.txt fi done #拼接完整的SQL语句,追加到临时文件中 echo "select $(cat ${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from $TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)> ${TABLE_NAME}.sql#循环追加每个表对应的连接器到自动创建连接器的脚本中cat >> create-connect.sh << EOFcurl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_source_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:orcl","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_$TABLE_NAME","mode": "{{ CONNECT_MODE }}","query": "$(cat ${TABLE_NAME}.sql)"}}' >/dev/null 2>&1EOFdone 说明:脚本中{{ 变量名 }}部分的内容是获取ansible中的变量,这个脚本是和ansible结合使用的。
增强版处理表信息脚本
#!/bin/bash#获取临时文件的行数FILE_NUM=$(cat oracle_time_tables.txt |egrep -v '#|^$'|wc -l)#清空创建连接器的脚本并追加echos函数> create-jdbc-connect.shcat >> create-jdbc-connect.sh << EOF#!/bin/bashechos(){case \$1 inred) echo -e "\033[31m \$2 \033[0m";;green) echo -e "\033[32m \$2 \033[0m";;yellow) echo -e "\033[33m \$2 \033[0m";;blue) echo -e "\033[34m \$2 \033[0m";;purple) echo -e "\033[35m \$2 \033[0m";;*) echo "\$2";;esac}EOF> create-jdbc-connect-time.shcat >> create-jdbc-connect-time.sh << EOF#!/bin/bashechos(){case \$1 inred) echo -e "\033[31m \$2 \033[0m";;green) echo -e "\033[32m \$2 \033[0m";;yellow) echo -e "\033[33m \$2 \033[0m";;blue) echo -e "\033[34m \$2 \033[0m";;purple) echo -e "\033[35m \$2 \033[0m";;*) echo "\$2";;esac}EOF#创建表相关文件目录mkdir -p ./TABLE_TIME#循环临时文件每一行for i in `seq $FILE_NUM`do FILE_LINE=$(sed -n ${i}p oracle_time_tables.txt) TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF)}') COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}') REAL_COL_NUM=`expr $COL_NUM - 2` #清空临时文件 >./TABLE_TIME/${TABLE_NAME}_time.txt >./TABLE_TIME/${TABLE_NAME}_time.sql >./TABLE_TIME/${TABLE_NAME}.sql #循环临时文件每行列名所在的列 for j in `seq 1 4 $REAL_COL_NUM` do k=`expr $j + 1` m=`expr $j + 2` n=`expr $j + 3` COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j}) COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k}) COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m}) COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n}) #判断列的数据类型是否是NUMBER if [ "$COL_DATA_TYPE" = "NUMBER" ] then #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中 echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt else #循环拼接SQL查询中的列名部分,追加到临时文件中 echo "$COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt fi #判断是否存在hosts中定义的时间列,如果有就追加该列名进一个临时文件中 TIME_COL=({{ TABLE_TIME_COL }}) for TIME in ${TIME_COL[@]} do if [ "$COL_NAME" = "$TIME" ] then echo "$COL_NAME" > ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt fi done done #拼接完整的SQL语句,追加到临时文件中 if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ] then #echo "select $(cat ./TABLE_TIME/${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)> ./TABLE_TIME/${TABLE_NAME}_time.sql echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)>=trunc(sysdate-2) and $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)> ./TABLE_TIME/${TABLE_NAME}_time.sql else echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME" >> ./TABLE_TIME/${TABLE_NAME}.sql fi#循环追加每个表对应的连接器到自动创建连接器的脚本中if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]thencat >> create-jdbc-connect-time.sh << EOF#创建表 $TABLE_NAME 连接器的命令如下curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_time_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_${TABLE_NAME}_INSERT","poll.interval.ms": "86400000","mode": "{{ CONNECT_MODE }}","numeric.mapping": "best_fit","query": "$(cat ./TABLE_TIME/${TABLE_NAME}_time.sql)"}}' >/dev/null 2>&1#判断连接器是否创建成功if [ \$? -eq 0 ]then echos green "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器成功"else echos red "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器失败"fiEOFelsecat >> create-jdbc-connect.sh << EOF#创建表 $TABLE_NAME 连接器的命令如下curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_${TABLE_NAME}_INSERT","poll.interval.ms": "86400000","mode": "{{ CONNECT_MODE }}","numeric.mapping": "best_fit","query": "$(cat ./TABLE_TIME/${TABLE_NAME}.sql)"}}' >/dev/null 2>&1#判断连接器是否创建成功if [ \$? -eq 0 ]then echos green "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器成功"else echos red "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器失败"fiEOFfidone 以上就是关于"python怎么结合shell自动创建kafka的连接器"这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注行业资讯频道。
连接器
文件
循环
脚本
内容
部分
成功
查询
信息
变量
所在
数据
文章
知识
篇文章
类型
语句
处理
一行
价值
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
东莞最大软件开发公司
服务器安全加固品牌
ftp服务器帐号分配
阿厦网络安全
无机晶体数据库
安尼卡服务器关闭
南大通用数据库 政府采购价
高中网络安全教育
怎么连service服务器
网络技术开发合同 百度文库
计算机网络技术有这个专业吗
苹果8连接苹果id服务器出错
三维模型数据库建设流程图
数据库控制参数表
服务器限制端口访问
网络安全法注销网站
镇江专业软件开发咨询报价
国产多媒体服务器
香港服务器好处
广播局网络安全部岗位职责
服务器后面叹号灯亮了
福建推荐服务器租用云主机
互联网科技头条新闻
腾讯软件开发专业
网络安全受两会代表关注
湖北数据库日志审计价格
刺激战场连接到服务器失败
计算机网络技术utp术语翻译
单位网络安全第一责任人是谁
服务器磁盘读写很高