flink Notes

  1. flink要部署的集群,master节点要开放端口(8081,6123,37243), 并配置好 master节点可以无密码登陆worker节点(通过ssh密钥登陆).

  2. 安装java, 到flink 官网下载最新压缩包,并解压. 参考flink官方

    a. dnf install java-1.8.0-openjdk-devel

    b. 修改flink路径下的config/flink.yml, 修改配置: ``` jobmanager.rpc.address: ${master节点的ip/domain} jobmanager.rpc.port: 6123

     jobmanager.heap.size: 1024m
     taskmanager.memory.process.size: 1568m
    
     taskmanager.numberOfTaskSlots: 3
     parallelism.default: 3
     ```
     `jobmanager.rpc.address` 配置项指向 master 节点。也应该通过设置 `jobmanager.memory.process.size` 和 `taskmanager.memory.process.size` 配置项来定义 `Flink` 允许在每个节点上分配的最大内存值。这些值的单位是 MB。如果一些 worker 节点上有你想分配到 Flink 系统的多余内存,你可以在这些特定节点的 conf/flink-conf.yaml 文件中重写` taskmanager.memory.process.size` 或 `taskmanager.memory.flink.size` 的默认值。最后,你必须提供集群上会被用作为 worker 节点的所有节点列表,也就是运行 TaskManager 的节点。编辑文件 conf/workers 并输入每个 worker 节点的 IP 或主机名。
    

    c. 向config/masters文件中添加master节点的ip:port

    d. 向config/workers文件添加所有的worker节点

    e. 把修改过配置的flink配置文件+flink代码全部同步到其他的节点

    f. 在master节点执行命令,启动集群 ./bin/start-cluster.sh

Intellij IDEA中创建项目(scala + gradle)

gradle 项目
1. 新建 gradle 项目, 
2. 在 app 文件夹上 右键 新建目录 加 scala
3. 在`build.gradle` 中加 `scala-library` 依赖; 设置好 mainClass 
4. gradle build
maven 项目
file > Project Structure > Global Libraries : scala 版本要跟flink 对应的scala版本一致;

通过 table api,得到datastream

val env = StreamExecutionEnvironment.getExecutionEnvironment
val t_env = StreamTableEnvironment.create(env)

t_env.executeSql(
"""
| CREATE TABLE xx (
|     name STRING
| ) WITH (
|     'connector' = 'kafka'
|     .....
| )
|""".stripMargin)

val xxTable = t_env.from("xx")
val xxStream = t_env.toDataStream(xxTable, classOf[XX) // 如果不指定 `XX` 类型,会得到`DataStream[ROW]`类型

输出flink处理结果: sink (elasticsearch)

t_env.executeSql(
"""
| CREATE TABLE sink (
|     name STRING
| ) WITH (
|     'connector' = 'print'
| )
|""".stripMargin)

val dst = t_env.fromDataStream[XX](stream) // stream 为经过处理后的流, 如果`stream`不是`DataStream[ROW]`类型,需要指定 `XX` 类型
dst.executeInsert("sink")

gradle 打包

使用的包可以在maven repository搜索jar

plugins {
    id 'java'
    id 'scala'
    id 'application' // 这也是要添加的
    id 'com.github.johnrengelman.shadow' version '7.0.0' // shadowjar 用于合并 manifest 等资源文件, 防止出现 xxx_factory 不对的情况
}

group 'org.static'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.scala-lang:scala-library:2.12.14' //添加后, IDEA 才能知道项目使用的scala sdk

    // https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-blink
    implementation 'org.apache.flink:flink-table-planner-blink_2.12:1.13.2'

    // https://mvnrepository.com/artifact/com.google.code.gson/gson
    implementation 'com.google.code.gson:gson:2.8.8'


application {
    mainClass = 'sql.main' // 设置entry文件
}


sourceCompatibility = 1.8
archivesBaseName = "zz"
version = 1.2

shadowJar {
    zip64 = true // 要开启,防止文件数量过多报错
    manifest {
        attributes 'Main-Class': 'sql.main'
    }
    dependencies { // 在打包的时候,不需要打入包的依赖
        exclude(dependency('org.apache.flink:flink-table-planner-blink_2.12:1.13.2'))
    }
    mergeServiceFiles() // 当项目使用多个`Java Service Provider`接口(interface)的库(SPI),需要修改META-INF文件
}

使用 shadowjarfat jar, 参考Gradle Shadow plugin + Gradle 历险记(四): 正确的打包jar的方式


遇到的问题

  1. 错误scala Static methods in interface require: 需要在IDEA Preference > scala Compiler > Additional Compiler options : -target:jvm-1.8 参考 https://blog.csdn.net/YongDaiMe/article/details/108586263

  2. scala版本 一定要跟flink scala版本一致

  3. 提交集群时出现Exception in thread "main" java.lang.NoClassDefFoundError: scala/Predef$ 错误 可能是因为没有在 gradle 的 build.gradle 依赖里加入 scala-library-{scala-version} 依赖/ scala 编译的版本跟flink集群编译用的scala版本不一致

  4. IDEA刷新 gradle 配置, scala类型的文件出现重新设置 scala版本, 肯定是 build.gradle 缺少了 scala-library 依赖

  5. Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath., 这个可能是使用了多个 SPI, 但是META-INF文件没有被修改导致的, 需要添加

    shadowjar {
        mergeServiceFiles()
    }
    

    参考


环境

# 安装 pyenv
sudo dnf groupinstall -y "Development Tools"
sudo dnf install -y zlib zlib-devel bzip2-devel openssl-devel sqlite-devel readline-devel

curl https://pyenv.run | bash

export PYENV_ROOT="$HOME/.pyenv"
export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init --path)"
eval "$(pyenv init -)"

exec "$SHELL"

pyenv install xxx 

yum install libffi-devel  # 这一步很重要,不然会报错
pyenv uninstall 3.7.6
pyenv install 3.7.6
python -m pip install apache-flink==1.13.2

pyflink 本地执行 py 文件的时候, 如果用到了 jar 文件, 需要在代码中指定 t_env.get_config().get_configuration().set_string("pipeline.jars", "file://~/flink-sql-connector-kafka_2.11-1.13.0.jar")

如果是用 flink 提交集群运行, jar依赖包指定方式

  1. 要么放到 flink/lib 目录下,然后重启集群;
  2. flink run -C file://xxxxx.jar -py xxxx.py 就可以了,不需要重启flink集群
  3. flink run -m yarn-cluster -j flink-sql-connector-kafka_2.11-1.11.2.jar -pyarch venv.zip -pyexec venv.zip/venv/bin/python3 -py main.py 通过 -j 参数来指定一个 jar 包路径,多个 jar 包则使用多个 -j. 参考

java pojo 类:

  1. 类必须是 public 的
  2. 字段必须是 public的/private的(要有getter+setters)

flink 设置 job name table_env.getConfig.getConfiguration.setString("pipeline.name", "job name")


flink state evolution 可以通过 avrohugger 利用 avro schema 生成 SpecificRecord 进行演进, 但是有问题, 实现不了删除字段

avrohugger {
    // sourceDirectories = files('src-avro')
    // destinationDirectory = file('src-scala')
    // namespaceMapping = [ 'com.example' : 'com.zlad' ]
    // typeMapping {
    //     protocolType = ScalaADT
    // }
    sourceFormat = SpecificRecord
}

plugins {
    id 'java'
    id 'application'
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}

import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask

dependencies {
    // https://mvnrepository.com/artifact/com.github.davidmc24.gradle.plugin/gradle-avro-plugin
    implementation 'com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.3.0'
    implementation "org.apache.avro:avro:1.11.0"
}

avro {
    fieldVisibility = "PUBLIC"
}

def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) {
    source("src/main/avro/*.avdl")
    outputDir = file("src/main/avros")
}

tasks.named("compileJava").configure {
    source(generateAvro)
}

通过修改 avdl , 可实现state field增删. 在flink job 代码中直接引用编译好的 avdl 对用的 java class.