flink 笔记(scala + gradle)
flink Notes
flink 集群安装(standalon集群) (centos8环境)
flink要部署的集群,master节点要开放端口(8081,6123,37243), 并配置好master节点可以无密码登陆worker节点(通过ssh密钥登陆).安装
java, 到flink官网下载最新压缩包,并解压. 参考flink官方a.
dnf install java-1.8.0-openjdk-develb. 修改
flink路径下的config/flink.yml, 修改配置: ``` jobmanager.rpc.address: ${master节点的ip/domain} jobmanager.rpc.port: 6123jobmanager.heap.size: 1024m taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 3 parallelism.default: 3 ``` `jobmanager.rpc.address` 配置项指向 master 节点。也应该通过设置 `jobmanager.memory.process.size` 和 `taskmanager.memory.process.size` 配置项来定义 `Flink` 允许在每个节点上分配的最大内存值。这些值的单位是 MB。如果一些 worker 节点上有你想分配到 Flink 系统的多余内存,你可以在这些特定节点的 conf/flink-conf.yaml 文件中重写` taskmanager.memory.process.size` 或 `taskmanager.memory.flink.size` 的默认值。最后,你必须提供集群上会被用作为 worker 节点的所有节点列表,也就是运行 TaskManager 的节点。编辑文件 conf/workers 并输入每个 worker 节点的 IP 或主机名。c. 向
config/masters文件中添加master节点的ip:portd. 向
config/workers文件添加所有的worker节点e. 把修改过配置的
flink配置文件+flink代码全部同步到其他的节点f. 在
master节点执行命令,启动集群./bin/start-cluster.sh
在Intellij IDEA中创建项目(scala + gradle)
gradle 项目
1. 新建 gradle 项目,
2. 在 app 文件夹上 右键 新建目录 加 scala
3. 在`build.gradle` 中加 `scala-library` 依赖; 设置好 mainClass
4. gradle build
maven 项目
file > Project Structure > Global Libraries : scala 版本要跟flink 对应的scala版本一致;
创建 flink 数据源 source (kafka), 使用函数的方式参考
通过 table api,得到datastream
val env = StreamExecutionEnvironment.getExecutionEnvironment
val t_env = StreamTableEnvironment.create(env)
t_env.executeSql(
"""
| CREATE TABLE xx (
| name STRING
| ) WITH (
| 'connector' = 'kafka'
| .....
| )
|""".stripMargin)
val xxTable = t_env.from("xx")
val xxStream = t_env.toDataStream(xxTable, classOf[XX) // 如果不指定 `XX` 类型,会得到`DataStream[ROW]`类型
输出flink处理结果: sink (elasticsearch)
t_env.executeSql(
"""
| CREATE TABLE sink (
| name STRING
| ) WITH (
| 'connector' = 'print'
| )
|""".stripMargin)
val dst = t_env.fromDataStream[XX](stream) // stream 为经过处理后的流, 如果`stream`不是`DataStream[ROW]`类型,需要指定 `XX` 类型
dst.executeInsert("sink")
gradle 打包
使用的包可以在maven repository搜索jar包
plugins {
id 'java'
id 'scala'
id 'application' // 这也是要添加的
id 'com.github.johnrengelman.shadow' version '7.0.0' // shadowjar 用于合并 manifest 等资源文件, 防止出现 xxx_factory 不对的情况
}
group 'org.static'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.scala-lang:scala-library:2.12.14' //添加后, IDEA 才能知道项目使用的scala sdk
// https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-blink
implementation 'org.apache.flink:flink-table-planner-blink_2.12:1.13.2'
// https://mvnrepository.com/artifact/com.google.code.gson/gson
implementation 'com.google.code.gson:gson:2.8.8'
application {
mainClass = 'sql.main' // 设置entry文件
}
sourceCompatibility = 1.8
archivesBaseName = "zz"
version = 1.2
shadowJar {
zip64 = true // 要开启,防止文件数量过多报错
manifest {
attributes 'Main-Class': 'sql.main'
}
dependencies { // 在打包的时候,不需要打入包的依赖
exclude(dependency('org.apache.flink:flink-table-planner-blink_2.12:1.13.2'))
}
mergeServiceFiles() // 当项目使用多个`Java Service Provider`接口(interface)的库(SPI),需要修改META-INF文件
}
使用 shadowjar 打fat jar, 参考Gradle Shadow plugin + Gradle 历险记(四): 正确的打包jar的方式
遇到的问题
错误
scala Static methods in interface require: 需要在IDEAPreference > scala Compiler > Additional Compiler options : -target:jvm-1.8 参考 https://blog.csdn.net/YongDaiMe/article/details/108586263scala版本 一定要跟flinkscala版本一致提交集群时出现
Exception in thread "main" java.lang.NoClassDefFoundError: scala/Predef$错误 可能是因为没有在 gradle 的 build.gradle 依赖里加入 scala-library-{scala-version} 依赖/ scala 编译的版本跟flink集群编译用的scala版本不一致在
IDEA刷新gradle配置,scala类型的文件出现重新设置scala版本, 肯定是build.gradle缺少了scala-library依赖Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath., 这个可能是使用了多个SPI, 但是META-INF文件没有被修改导致的, 需要添加shadowjar { mergeServiceFiles() }
Pyflink 使用 参考
环境
# 安装 pyenv
sudo dnf groupinstall -y "Development Tools"
sudo dnf install -y zlib zlib-devel bzip2-devel openssl-devel sqlite-devel readline-devel
curl https://pyenv.run | bash
export PYENV_ROOT="$HOME/.pyenv"
export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init --path)"
eval "$(pyenv init -)"
exec "$SHELL"
pyenv install xxx
yum install libffi-devel # 这一步很重要,不然会报错
pyenv uninstall 3.7.6
pyenv install 3.7.6
python -m pip install apache-flink==1.13.2
pyflink 本地执行 py 文件的时候, 如果用到了 jar 文件, 需要在代码中指定 t_env.get_config().get_configuration().set_string("pipeline.jars", "file://~/flink-sql-connector-kafka_2.11-1.13.0.jar")
如果是用 flink 提交集群运行, jar依赖包指定方式
- 要么放到
flink/lib目录下,然后重启集群; flink run -C file://xxxxx.jar -py xxxx.py就可以了,不需要重启flink集群flink run -m yarn-cluster -j flink-sql-connector-kafka_2.11-1.11.2.jar -pyarch venv.zip -pyexec venv.zip/venv/bin/python3 -py main.py通过 -j 参数来指定一个 jar 包路径,多个 jar 包则使用多个 -j. 参考
java pojo 类:
- 类必须是 public 的
- 字段必须是 public的/private的(要有getter+setters)
flink 设置 job name
table_env.getConfig.getConfiguration.setString("pipeline.name", "job name")
flink state evolution 可以通过 avrohugger 利用 avro schema 生成 SpecificRecord 进行演进, 但是有问题, 实现不了删除字段
avrohugger {
// sourceDirectories = files('src-avro')
// destinationDirectory = file('src-scala')
// namespaceMapping = [ 'com.example' : 'com.zlad' ]
// typeMapping {
// protocolType = ScalaADT
// }
sourceFormat = SpecificRecord
}
flink state evolution 通过avro实现
plugins {
id 'java'
id 'application'
id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}
import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask
dependencies {
// https://mvnrepository.com/artifact/com.github.davidmc24.gradle.plugin/gradle-avro-plugin
implementation 'com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.3.0'
implementation "org.apache.avro:avro:1.11.0"
}
avro {
fieldVisibility = "PUBLIC"
}
def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) {
source("src/main/avro/*.avdl")
outputDir = file("src/main/avros")
}
tasks.named("compileJava").configure {
source(generateAvro)
}
通过修改 avdl , 可实现state field增删. 在flink job 代码中直接引用编译好的 avdl 对用的 java class.
