Flink: Installing and Running a Local Cluster

Background

Flink is a popular stream-processing framework. Jobs can be submitted to a cluster or driven through its API, and for development and testing you can stand up a single-node Flink cluster locally.

Setting Up the Base Environment

  • OS: macOS
  • Java: install Java 1.8 or later

Setting Up and Running a Local Flink Cluster

  • Install Flink with Homebrew

    brew install apache-flink
  • Verify the Flink installation

    $ flink --version
    Version: 1.8.0, Commit ID: 4caec0d
  • Check the installation path

    $ brew info apache-flink
    apache-flink: stable 1.8.0, HEAD
    Scalable batch and stream data processing
    https://flink.apache.org/
    /usr/local/Cellar/apache-flink/1.8.0 (155 files, 320.2MB) *
    Built from source on 2019-09-25 at 09:47:27
    From: https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/homebrew-core.git/Formula/apache-flink.rb
    ==> Requirements
    Required: java = 1.8 ✔
    ==> Options
    --HEAD
    Install HEAD version
    ==> Analytics
    install: 1,780 (30 days), 5,638 (90 days), 17,424 (365 days)
    install_on_request: 1,776 (30 days), 5,604 (90 days), 17,243 (365 days)
    build_error: 0 (30 days)

    As shown above, Flink is installed under /usr/local/Cellar/apache-flink/1.8.0.

    The start/stop scripts and the logs all live under this directory.

  • Change into the Flink installation directory

    $ cd /usr/local/Cellar/apache-flink/1.8.0
  • Run the startup script

    $ ./libexec/bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host zhengkaideMacBook-Pro.local.
    Starting taskexecutor daemon on host zhengkaideMacBook-Pro.local.
  • Check the web UI: open http://localhost:8081 in a browser (the same port also serves Flink's REST API; a short sketch follows this list).

    [screenshot: flink_ui]

  • View the Flink logs, located in the log directory under the installation path:

    $ tail libexec/log/flink-zhengk-standalonesession-0-zhengkaideMacBook-Pro.local.log
    2019-09-26 10:53:59,030 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Rest endpoint listening at localhost:8081
    2019-09-26 10:53:59,031 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://localhost:8081 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
    2019-09-26 10:53:59,031 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Web frontend listening at http://localhost:8081.
    2019-09-26 10:53:59,118 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
    2019-09-26 10:53:59,137 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
    2019-09-26 10:53:59,158 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - ResourceManager akka.tcp://flink@localhost:6123/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
    2019-09-26 10:53:59,159 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Starting the SlotManager.
    2019-09-26 10:53:59,170 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Dispatcher akka.tcp://flink@localhost:6123/user/dispatcher was granted leadership with fencing token 00000000-0000-0000-0000-000000000000
    2019-09-26 10:53:59,172 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Recovering all persisted jobs.
    2019-09-26 10:53:59,496 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering TaskManager with ResourceID deb7db492d9f5d89bb6070517aeb5aa7 (akka.tcp://flink@192.168.1.104:61286/user/taskmanager_0) at ResourceManager
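
The port that serves the web UI also exposes Flink's monitoring REST API, which is handy for scripting quick health checks against the local cluster. Below is a minimal sketch in Java; the class name is made up for illustration, and it assumes the cluster started above is running and that the /overview endpoint of the REST API is available. It simply prints the raw JSON response.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ClusterOverviewCheck {

    public static void main(String[] args) throws Exception {
        // Same port as the web UI; /overview returns cluster-level stats as JSON.
        URL url = new URL("http://localhost:8081/overview");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            // Print the raw JSON (task manager count, slots, running jobs, ...).
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            conn.disconnect();
        }
    }
}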

The demo below consumes a stream of text from a socket port and counts words. The code follows; it ultimately needs to be packaged into an executable jar (e.g. with mvn package in a Maven project). A sketch for running it straight from the IDE appears right after the code.

package org.apache.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJob {

    public static void main(String[] args) throws Exception {

        // Read the port to listen on from the --port argument.
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.out.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to the socket and read lines of text.
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // Split lines into words, then count per word over a 5-second window
        // that slides every 1 second.
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // Print the results to stdout (the TaskManager's .out file when run on the cluster).
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // POJO holding a word and its running count.
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
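
Before packaging the jar, the job can also be smoke-tested straight from the IDE: getExecutionEnvironment() already returns a local environment when the program is not submitted to a cluster. The sketch below makes that explicit with createLocalEnvironment(); the class name is illustrative, and it assumes nc is already listening on port 9000 as described in the next section. It only echoes the socket lines, as a connectivity check before wiring in the windowed word count.

package org.apache.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSocketEcho {

    public static void main(String[] args) throws Exception {
        // Explicit local environment: runs an embedded mini-cluster inside the JVM,
        // so no standalone cluster or `flink run` is needed for a quick test.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Echo whatever arrives on the socket to stdout.
        env.socketTextStream("localhost", 9000, "\n")
                .print();

        env.execute("Local socket echo");
    }
}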

Running the Demo

  • In a separate terminal, use nc to listen on port 9000 (any free port works; if nc is unavailable, a plain-Java substitute is sketched at the end of this section)

    $ nc -l 9000
  • Submit the demo job with flink run, specifying port 9000

    $ cd /usr/local/Cellar/apache-flink/1.8.0
    $ ./libexec/bin/flink run /Users/zhengk/IdeaProjects/flinkquickstartjava/target/flink-quickstart-java-1.0-SNAPSHOT.jar --port 9000
    Starting execution of program
  • Check the running job in the web UI at http://localhost:8081

    [screenshot: example_ui]

    Click the job to see its details:

    [screenshot: example_detail]

  • Type some text into the nc terminal (anything will do; each line is streamed to the job)

    lorem ipsum
    ipsum ipsum ipsum
    bye
    hello
  • Check the Flink logs to see the word-count output. Because the job uses a 5-second window that slides every 1 second, the same count is emitted for several overlapping windows:

    $ cd /usr/local/Cellar/apache-flink/1.8.0
    $ cd libexec/log
    $ tail -f flink-zhengk-taskexecutor-0-zhengkaideMacBook-Pro.local.out
    ipsum : 3
    ipsum : 3
    ipsum : 3
    bye : 1
    bye : 1
    ipsum : 3
    ipsum : 3
    bye : 1
    bye : 1
    bye : 1
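
If nc is not available, a plain Java socket server can stand in for it: it listens on port 9000 and forwards whatever is typed on stdin to the connected Flink job. A minimal sketch (the class name is hypothetical; no Flink dependency needed):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class NcLikeServer {

    public static void main(String[] args) throws Exception {
        // Listen on port 9000, like `nc -l 9000`.
        try (ServerSocket server = new ServerSocket(9000);
             Socket client = server.accept();  // wait for the Flink job to connect
             PrintWriter out = new PrintWriter(client.getOutputStream(), true);
             BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            // Forward each line typed on stdin to the connected job.
            while ((line = stdin.readLine()) != null) {
                out.println(line);
            }
        }
    }
}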

Stopping the Flink Cluster

  • Change into the Flink installation directory

    $ cd /usr/local/Cellar/apache-flink/1.8.0
  • Run the stop script

    $ ./libexec/bin/stop-cluster.sh
    Stopping taskexecutor daemon (pid: 38310) on host zhengkaideMacBook-Pro.local.
    Stopping standalonesession daemon (pid: 37893) on host zhengkaideMacBook-Pro.local.

Further Reading

The official tutorial:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html