Pyflink教程(五):连接mysql
·
需要加载mysql的flink-connector-jdbc-1.16.0.jar和mysql-connector-java-8.0.30.jar
下载:https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.30/
下载:https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.16.0/
import os
from pyflink.table import EnvironmentSettings, DataTypes, StreamTableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
jars = []
for file in os.listdir(os.path.abspath(os.path.dirname(__file__))):
if file.endswith('.jar'):
file_path = os.path.abspath(file)
jars.append(file_path)
str_jars = ';'.join(['file:///' + jar for jar in jars])
print(str_jars)
table_env.get_config().get_configuration().set_string("pipeline.jars", str_jars)
# table_env.execute_sql("""
# CREATE TABLE my_source (
# a VARCHAR
# ) WITH (
# 'connector' = 'datagen',
# 'number-of-rows' = '10'
# )
# """)
# tab = table_env.from_path('my_source')
source_data = table_env.from_elements([(1, 2, "Leeaaa"), (3, 4, "Jaybbb"), (5, 6, "Jayccc"), (7, 8, "Leeddd")],
["value", "count", "name"])
table_env.create_temporary_view("my_source",source_data)
@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
return s[begin:end]
#注册udf 可以在sql中调用
table_env.create_temporary_function("sub_string_fun", sub_string)
transformed_tab = source_data.select(sub_string(col('name'), 0, 3))
#这块 CREATE TABLE 不是创建表,而是创建连接,在你的数据库里应该先建好my_sink表
table_env.execute_sql("""
CREATE TABLE my_sink (
sum_res VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/flink_st',
'driver'= 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '123456',
'table-name' = 'my_sink'
)
""")
table_env.execute_sql("INSERT INTO my_sink SELECT name FROM my_source").wait()
# 也可以运行下面语句插入,但是老报错。
# transformed_tab.execute_insert("my_sink")
# 插入数据后 ,就可以直接使用了
# table_env.execute_sql("SELECT name FROM my_source").print()
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)