需要加载mysql的flink-connector-jdbc-1.16.0.jar和mysql-connector-java-8.0.30.jar

下载:https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.30/

下载:https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.16.0/

import os

from pyflink.table import EnvironmentSettings, DataTypes, StreamTableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

jars = []
for file in os.listdir(os.path.abspath(os.path.dirname(__file__))):
    if file.endswith('.jar'):
        file_path = os.path.abspath(file)
        jars.append(file_path)

str_jars = ';'.join(['file:///' + jar for jar in jars])
print(str_jars)
table_env.get_config().get_configuration().set_string("pipeline.jars", str_jars)
# table_env.execute_sql("""
#             CREATE TABLE my_source (
#               a VARCHAR
#             ) WITH (
#               'connector' = 'datagen',
#               'number-of-rows' = '10'
#             )
#         """)

# tab = table_env.from_path('my_source')
source_data = table_env.from_elements([(1, 2, "Leeaaa"), (3, 4, "Jaybbb"), (5, 6, "Jayccc"), (7, 8, "Leeddd")],
                                      ["value", "count", "name"])

table_env.create_temporary_view("my_source",source_data)


@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
    return s[begin:end]

#注册udf 可以在sql中调用
table_env.create_temporary_function("sub_string_fun", sub_string)
transformed_tab = source_data.select(sub_string(col('name'), 0, 3))
#这块  CREATE TABLE  不是创建表,而是创建连接,在你的数据库里应该先建好my_sink表
table_env.execute_sql("""
        CREATE TABLE my_sink (
          sum_res VARCHAR
        ) WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:mysql://localhost:3306/flink_st',
          'driver'= 'com.mysql.cj.jdbc.Driver',
          'username' = 'root',
          'password' = '123456',
          'table-name' = 'my_sink'
        )
    """)
table_env.execute_sql("INSERT INTO my_sink SELECT name FROM my_source").wait()
# 也可以运行下面语句插入,但是老报错。
# transformed_tab.execute_insert("my_sink")
# 插入数据后 ,就可以直接使用了
# table_env.execute_sql("SELECT name FROM my_source").print()
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐