一、用对执行环境
Flink 支持将程序打包为 JAR 再提交到集群执行。要让相同代码在本地/集群都“识大体”,务必使用:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 当你从 命令行或 Web UI 提交 JAR 时,它会自动作为集群环境运行。
- 当你在 IDE 里直接运行
main()时,它会自动退化为本地环境(Local)。
这意味着:同一套代码可以在不同环境下复用,无需条件编译或分支判断。
二、目录结构与最小可运行示例
flink-demo/
├─ src/main/java/***/example/WordCountJob.java
├─ pom.xml
└─ ...
package ***.example;
import org.apache.flink.api.***mon.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WordCountJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.fromElements("hello flink", "hello world");
lines
.flatMap((String s, org.apache.flink.util.Collector<Tuple2<String, Integer>> out) -> {
for (String w : s.split("\\s+")) out.collect(Tuple2.of(w, 1));
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t -> t.f0)
.sum(1)
.print();
env.execute("word-count-demo");
}
}
三、设置入口类 & 正确依赖范围
1)、pom.xml 关键点
-
入口类写进 Manifest:
Main-Class - Flink 依赖一般用
provided(由集群提供),避免把整套 Flink 打进你的 JAR - 使用 maven-shade-plugin 生成 fat-jar(只打进你的业务依赖;如需 relocate 冲突包请配置)
<project>
<properties>
<maven.***piler.source>11</maven.***piler.source>
<maven.***piler.target>11</maven.***piler.target>
<flink.version>YOUR_FLINK_VERSION</flink.version>
</properties>
<dependencies>
<!-- 由集群提供,避免重复打包 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- 若需要 Table/Connector/其他,请按需添加,通常也为 provided -->
</dependencies>
<build>
<plugins>
<!-- 统一打包并写入 Manifest 的 Main-Class -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>***.example.WordCountJob</mainClass>
</transformer>
</transformers>
<!-- 如有 Guava/Jackson/***ty 冲突,使用 relocate:
<relocations>
<relocation>
<pattern>***.google.***mon</pattern>
<shadedPattern>shadow.***.google.***mon</shadedPattern>
</relocation>
</relocations>
-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2) 构建命令
mvn -DskipTests clean package
# 产物:target/your-artifact-*-shaded.jar (名称依配置而定)
四、JAR 的入口类识别:program-class vs main-class
Flink 在加载你的 JAR 时,按以下顺序寻找入口:
-
Manifest 中的
program-class或main-class-
两者都存在时,
program-class优先
-
两者都存在时,
-
如果 Manifest 里都没有,命令行和 Web UI 都支持手动指定入口类
1) 手动覆盖入口(命令行)
bin/flink run \
-c ***.example.WordCountJob \ # 覆盖入口类
target/app-1.0-SNAPSHOT-shaded.jar \
--yourArg1 foo --yourArg2 bar
五、命令行 & Web UI
1) 命令行(最通用)
# 基本提交流程
bin/flink run target/app-1.0-SNAPSHOT-shaded.jar --key value
# 覆盖入口类
bin/flink run -c ***.example.WordCountJob target/app-1.0-SNAPSHOT-shaded.jar
- 适用于 Standalone / YARN / K8s 等多种部署模式(集群准备好后,
flink run即可)。 -
参数传递建议配合
ParameterTool.fromArgs(args)读取。
2) Web UI(可视化上传)
- 打开 Flink Web 前端 → “Submit New Job”
- 上传 JAR
- 如果 Manifest 无入口,在页面上选择/填写 Entry Class
- 填写程序参数,点击提交
六、远程执行与本地验证
-
本地 IDEA 运行
main():用getExecutionEnvironment()自动起本地运行时,调试快、反馈直观。 - 集群执行:打包 JAR 提交即可,代码不变。
- 这套机制也支持 CI:构建产物后,通过自动化脚本
bin/flink run提交。
七、常见问题速查
1) ClassNotFoundException / NoSuchMethodError
- 检查是否错误地把 Flink 依赖 打进了 JAR(不要),或版本与集群不一致
- 业务依赖冲突(Guava/Jackson/***ty)→ relocate 或与集群对齐
2) “入口类找不到”
- Manifest 未写
main-class/program-class且命令行/Web UI 未指定-c - 入口类的包名拼写、可见性、方法签名是否正确(
public static void main(String[] args))
3) 本地能跑,集群报错
- 本地 JDK/依赖版本与集群不同
- 读取外部文件路径、权限、HDFS/S3 凭据在集群环境不可用
- 在本地加载的配置/字典未打包或未挂载到集群
4) 依赖体积过大
- 只打业务依赖;Flink 及其 Connector(多数情况)由集群侧提供(
provided) - 若必须打进(如独立运行环境),请确认与运行时 Flink 版本完全兼容
八、交付级清单(Checklist)
-
代码中统一使用
StreamExecutionEnvironment.getExecutionEnvironment() -
Maven:Flink 依赖
provided,Manifest 写好Main-Class -
若有多入口,清楚
program-class优先级,并能用-c覆盖 -
使用
ParameterTool接收参数,避免硬编码 - 如有依赖冲突,使用 shade + relocate
- 本地/集群分别验证:路径、权限、凭证、JDK/依赖版本
-
CI/CD:构建产物 → 归档 →
flink run提交脚本自动化
九、结语
打包不是“把代码 zip 一下”这么简单:入口类识别、依赖范围、环境一致性都决定了上线是否顺滑。
遵循本文的“环境统一 + 正确打包 + 清晰入口 + 参数化配置”,你的 Flink 程序就能在 IDE → 集群 的迁移中“无感切换”,稳定交付。祝你 All Green!