命令行界面

Flink提供命令行界面(CLI)来运行打包为JAR文件的程序,并控制它们的执行。CLI是任何Flink设置的一部分,可在本地单节点设置和分布式设置中使用。它位于<flink-home>/bin/flink 默认情况下,并连接到从同一安装目录启动的正在运行的Flink主服务器(JobManager)。

使用命令行界面的先决条件是Flink主机(JobManager)已启动(通过 <flink-home>/bin/start-cluster.sh)或YARN环境可用。

命令行可用于

例子

  • 运行没有参数的示例程序:

    ./bin/flink run ./examples/batch/WordCount.jar
    
  • 使用输入和结果文件的参数运行示例程序:

    ./bin/flink run ./examples/batch/WordCount.jar \
                         --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 运行带有并行性的示例程序16以及输入和结果文件的参数:

    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
                         --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 运行禁用flink log输出的示例程序:

        ./bin/flink run -q ./examples/batch/WordCount.jar
    
  • 以分离模式运行示例程序:

        ./bin/flink run -d ./examples/batch/WordCount.jar
    
  • 在特定的JobManager上运行示例程序:

    ./bin/flink run -m myJMHost:8081 \
                           ./examples/batch/WordCount.jar \
                           --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 以特定类作为入口点运行示例程序:

    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
                           ./examples/batch/WordCount.jar \
                           --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 使用具有2个TaskManagers 每作业YARN群集运行示例程序

    ./bin/flink run -m yarn-cluster -yn 2 \
                           ./examples/batch/WordCount.jar \
                           --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
    
  • 以Word为单位显示WordCount示例程序的优化执行计划:

    ./bin/flink info ./examples/batch/WordCount.jar \
                            --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 列出计划和正在运行的作业(包括其JobID):

    ./bin/flink list
    
  • 列出预定作业(包括其作业ID):

    ./bin/flink list -s
    
  • 列出正在运行的作业(包括其作业ID):

    ./bin/flink list -r
    
  • 列出所有现有工作(包括其作业ID):

    ./bin/flink list -a
    
  • 列出在Flink YARN会话中运行Flink作业:

    ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
    
  • 取消工作:

    ./bin/flink cancel <jobID>
    
  • 使用保存点取消作业:

    ./bin/flink cancel -s [targetDirectory] <jobID>
    
  • 停止工作(仅限流处理工作):

    ./bin/flink stop <jobID>
    
  • 修改正在运行的作业(仅限流式处理作业):. / bin/flink modify -p

注意:取消和停止(流处理)作业的区别如下:

在取消呼叫中,作业中的算子立即接收cancel()方法调用以尽快取消它们。如果算子在取消呼叫后没有停止,Flink将开始定期中断线程,直到它停止。

“停止”呼叫是一种更优雅的方式来停止正在运行的流处理作业。Stop仅适用于使用实现StoppableFunction接口的源的作业当用户请求停止作业时,所有源都将接收stop()方法调用。该工作将继续运行,直到所有资源正常关闭。这允许作业完成处理所有飞行数据。

保存点

保存点通过命令行客户端控制:

触发保存点

./bin/flink savepoint <jobId> [savepointDirectory]

这将触发具有ID的作业的保存点jobId,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。

此外,您可以选择指定目标文件系统目录以存储保存点。该目录需要可由JobManager访问。

如果未指定目标目录,则需要配置默认目录否则,触发保存点将失败。

使用YARN触发保存点

./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>

这将触发具有ID jobId和YARN应用程序ID 的作业的保存点yarnAppId,并返回创建的保存点的路径。

其他所有内容与上面触发保存点部分中描述的相同

使用保存点取消

您可以自动触发保存点并取消作业。

./bin/flink cancel -s [savepointDirectory] <jobID>

如果未配置保存点目录,则需要为Flink安装配置默认保存点目录(请参阅保存点)。

只有保存点成功,才会取消该作业。

恢复保存点

./bin/flink run -s <savepointPath> ...

run命令有一个保存点标志来提交作业,该作业从保存点恢复其状态。savepoint trigger命令返回保存点路径。

默认情况下,我们尝试将所有保存点状态与正在提交的作业进行匹配。如果要允许跳过无法使用新作业恢复的保存点状态,可以设置allowNonRestoredState标志。如果在触发保存点并且仍想使用保存点时从程序中删除了作为程序一部分的 算子,则需要允许此 算子操作。

./bin/flink run -s <savepointPath> -n ...

如果您的程序删除了属于保存点的 算子,这将非常有用。

配置保存点

./bin/flink savepoint -d <savepointPath>

在给定路径处理保存点。savepoint trigger命令返回保存点路径。

如果使用自定义状态实例(例如自定义还原状态或RocksDB状态),则必须指定触发保存点的程序JAR的路径,以便使用用户代码类加载器处理保存点:

./bin/flink savepoint -d <savepointPath> -j <jarFile>

否则,你会遇到一个ClassNotFoundException

用法

命令行语法如下:

./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point
                                          ("main" method or "getPlan()" method.
                                          Only needed if the JAR file does not
                                          specify the class in its manifest.
     -C,--classpath <url>                 Adds a URL to each user code
                                          classloader  on all nodes in the
                                          cluster. The paths must specify a
                                          protocol (e.g. file://) and be
                                          accessible on all nodes (e.g. by means
                                          of a NFS share). You can use this
                                          option multiple times for specifying
                                          more than one URL. The protocol must
                                          be supported by the {@link
                                          java.net.URLClassLoader}.
     -d,--detached                        If present, runs the job in detached
                                          mode
     -n,--allowNonRestoredState           Allow to skip savepoint state that
                                          cannot be restored. You need to allow
                                          this if you removed an operator from
                                          your program that was part of the
                                          program when the savepoint was
                                          triggered.
     -p,--parallelism <parallelism>       The parallelism with which to run the
                                          program. Optional flag to override the
                                          default value specified in the
                                          configuration.
     -q,--sysoutLogging                   If present, suppress logging output to
                                          standard out.
     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                          from (for example
                                          hdfs:///flink/savepoint-1537).
  Options for yarn-cluster mode:
     -d,--detached                        If present, runs the job in detached
                                          mode
     -m,--jobmanager <arg>                Address of the JobManager (master) to
                                          which to connect. Use this flag to
                                          connect to a different JobManager than
                                          the one specified in the
                                          configuration.
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached
                                          mode (deprecated; use non-YARN
                                          specific option instead)
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container
                                          with optional unit (default: MB)
     -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                          (=Number of Task Managers)
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yst,--yarnstreaming                 Start Flink in streaming mode
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container
                                          with optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for
                                          the YARN application
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                          sub-paths for high availability mode

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode



Action "info" shows the optimized execution plan of the program (JSON).

  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
     -c,--class <classname>           Class with the program entry point ("main"
                                      method or "getPlan()" method. Only needed
                                      if the JAR file does not specify the class
                                      in its manifest.
     -p,--parallelism <parallelism>   The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the
                                      configuration.


Action "list" lists running and scheduled programs.

  Syntax: list [OPTIONS]
  "list" action options:
     -r,--running     Show only running programs and their JobIDs
     -s,--scheduled   Show only scheduled programs and their JobIDs
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Address of the JobManager (master) to
                                      which to connect. Use this flag to connect
                                      to a different JobManager than the one
                                      specified in the configuration.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                      sub-paths for high availability mode

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode



Action "stop" stops a running program (streaming jobs only).

  Syntax: stop [OPTIONS] <Job ID>
  "stop" action options:

  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Address of the JobManager (master) to
                                      which to connect. Use this flag to connect
                                      to a different JobManager than the one
                                      specified in the configuration.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                      sub-paths for high availability mode

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode



Action "cancel" cancels a running program.

  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
     -s,--withSavepoint <targetDirectory>   Trigger savepoint and cancel job.
                                            The target directory is optional. If
                                            no directory is specified, the
                                            configured default directory
                                            (state.savepoints.dir) is used.
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Address of the JobManager (master) to
                                      which to connect. Use this flag to connect
                                      to a different JobManager than the one
                                      specified in the configuration.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                      sub-paths for high availability mode

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode



Action "savepoint" triggers savepoints for a running job or disposes existing ones.

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint" action options:
     -d,--dispose <arg>       Path of savepoint to dispose.
     -j,--jarfile <jarfile>   Flink program JAR file.
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Address of the JobManager (master) to
                                      which to connect. Use this flag to connect
                                      to a different JobManager than the one
                                      specified in the configuration.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                      sub-paths for high availability mode

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode



Action "modify" modifies a running job (e.g. change of parallelism).

  Syntax: modify <Job ID> [OPTIONS]
  "modify" action options:
     -h,--help                           Show the help message for the CLI
                                         Frontend or the action.
     -p,--parallelism <newParallelism>   New parallelism for the specified job.
     -v,--verbose                        This option is deprecated.
  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Address of the JobManager (master) to
                                      which to connect. Use this flag to connect
                                      to a different JobManager than the one
                                      specified in the configuration.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                      sub-paths for high availability mode

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode

回到顶部