大数据与云原生组件的搭建及介绍

6/26/2022 HadoopFlink流处理引擎k8s云原生gRPCDubbo

# 1. Hadoop基本介绍

# 1.1 Hadoop简介

# 1.1.1 Hadoop是什么

Hadoop 是一个开源的分布式计算和存储框架,由 Apache 基金会开发和维护。

  • Hadoop 为庞大的计算机集群提供可靠的、可伸缩的应用层计算和存储支持,它允许使用简单的编程模型跨计算机群集分布式处理大型数据集,并且支持在单台计算机到几千台计算机之间进行扩展。
  • Hadoop 使用 Java 开发,所以可以在多种不同硬件平台的计算机上部署和使用。其核心部件包括分布式文件系统 (HDFS)、集群资源管理YARN 和高性能并行计算MapReduce。

# 1.1.2 Hadoop特点

Hadoop具备以下特点。

  • 分布式:存储和处理并非构建在一台大型超级计算机之上,而是分布在一群小型电脑上,这些电脑之间可以相互通信并协同工作。
  • 水平可伸缩性:只需添加新机器就可以很容易地扩展Hadoop集群,每台新机器都相应地增加了Hadoop集群的总存储和处理能力。
  • 容错:即使一些硬件或软件组件不能正常工作,Hadoop也能继续运行。
  • 成本:Hadoop不需要昂贵的高端服务器,而且在没有商业许可证的情况下也可以正常工作。
  • 编程抽象:Hadoop负责处理与分布式计算相关的所有纷杂的细节。由于有高级API,用户可以专注于实现业务逻辑,解决他们在现实世界中的问题。
  • 数据本地化:Hadoop不会将大型数据集迁移到应用程序正在运行的位置,而是在数据所在位置运行应用程序。

# 1.2 Hadoop生态圈

随着Hadoop的不断发展,Hadoop生态体系越来越完善,现如今已经发展成一个庞大的生态体系。Hadoop生态体系包含了很多子系统,下面介绍一些常见的子系统,具体如下:

Hadoop生态圈

[1] HDFS分布式文件系统

HDFS是Hadoop分布式文件系统,它是Hadoop生态系统中的核心项目之一,是分布式计算中数据存储管理基础。HDFS具有高容错性的数据备份机制,它能检测和应对硬件故障,并在低成本的通用硬件上运行。另外,HDFS具备流式的数据访问特点,提供高吞吐量应用程序数据访问功能,适合带有大型数据集的应用程序。

[2] MapReduce分布式计算框架

MapReduce是一种计算模型,用于大规模数据集(大于1TB)的并行运算。“Map”对数据集上的独立元素进行指定的操作,生成键值对形式中间结果;“Reduce”则对中间结果中相同“键”的所有“值”进行规约,以得到最终结果。MapReduce这种“分而治之”的思想,极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

[3] Yarn资源管理框架

Yarn(Yet Another Resource Negotiator)是Hadoop 2.0中的资源管理器,它可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

[4] Sqoop数据迁移工具

Sqoop是一款开源的数据导入导出工具,主要用于在Hadoop与传统的数据库间进行数据的转换,它可以将一个关系型数据库(例如,MySQL、Oracle等)中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导出到关系型数据库中,使数据迁移变得非常方便。

[5] Hbase分布式存储系统

HBase是Google Bigtable克隆版,它是一个针对结构化数据的可伸缩、高可靠、高性能、分布式和面向列的动态模式数据库。和传统关系数据库不同,HBase采用了BigTable的数据模型:增强的稀疏排序映射表(Key/Value),其中,键由行关键字、列关键字和时间戳构成。HBase提供了对大规模数据的随机、实时读写访问,同时,HBase中保存的数据可以使用MapReduce来处理,它将数据存储和并行计算完美地结合在一起。

[6] Zookeeper分布式协作服务

Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和HBase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等用于构建分布式应用,减少分布式应用程序所承担的协调任务。

[7] Hive数据仓库

Hive是基于Hadoop的一个分布式数据仓库工具,可以将结构化的数据文件映射为一张数据库表,将SQL语句转换为MapReduce任务进行运行。其优点是操作简单,降低学习成本,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。

[8] Flume日志收集工具

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

# 1.3 Hadoop部署模式

Hadoop有单机模式(Standalone Mode)、伪分布式模式(Pseudo-Distrubuted Mode)、全分布式集群模式(Full-Distributed Mode)3种部署方式。

Hadoop各种部署模式的区分依据主要是HDFS的NameNode、DataNode,YARN的ResourceManager、NodeManager、AppMaster等模块运行在几个JVM进程以及几个机器节点上。

Hadoop部署模式

# 1.3.1 单机模式

默认情况下,Hadoop即处于该模式,无需对配置文件进行修改,用于开发和调式。使用本地文件系统,而不是分布式文件系统。Hadoop不会启动NameNode、DataNode、JobTracker、TaskTracker等守护进程,Map()和Reduce()任务作为同一个进程的不同部分来执行的。用于对MapReduce程序的逻辑进行调试,确保程序的正确。

# 1.3.2 伪分布式模式

Hadoop的守护进程运行在本机机器,模拟一个小规模的集群。在一台主机模拟多主机。Hadoop启动NameNode、DataNode、JobTracker、TaskTracker这些守护进程都在同一台机器上运行,是相互独立的Java进程。在这种模式下,Hadoop使用的是分布式文件系统,各个作业也是由JobTraker服务,来管理的独立进程。在单机模式之上增加了代码调试功能,允许检查内存使用情况,HDFS输入输出,以及其他的守护进程交互。类似于完全分布式模式,因此,这种模式常用来开发测试Hadoop程序的执行是否正确。

修改3个配置文件:core-site.xml(Hadoop集群的特性,作用于全部进程及客户端)、hdfs-site.xml(配置HDFS集群的工作属性)、mapred-site.xml(配置MapReduce集群的属性),格式化文件系统。

# 1.3.3 全分布式集群模式

Hadoop的守护进程运行在一个集群上。Hadoop的守护进程运行在由多台主机搭建的集群上,是真正的生产环境。在所有的主机上安装JDK和Hadoop,组成相互连通的网络。在主机间设置SSH免密码登录,把各从节点生成的公钥添加到主节点的信任列表。

修改3个配置文件:core-site.xml、hdfs-site.xml、mapred-site.xml,指定NameNode和JobTraker的位置和端口,设置文件的副本等参数,格式化文件系统。

# 2. Hadoop部署环境准备

本文在 MacOS 上以伪分布式的方式搭建 Hadoop 集群。

  • 系统版本:macOS Ventura 13.0.1

  • JDK8版本:jdk-8u333-macosx-x64.dmg

  • Hadoop版本:hadoop-3.3.4.tar.gz

# 2.1 安装JDK环境

# 2.1.1 Hadoop与JDK版本关系

Hadoop与JDK版本关系如下:

  • Apache Hadoop 3.3 及更高版本支持 Java 8 和 Java 11(仅限运行时,请使用 Java 8 编译 Hadoop,不支持使用 Java 11 编译 Hadoop)
  • Apache Hadoop 从 3.0.x 到 3.2.x 现在只支持 Java 8
  • Apache Hadoop 从 2.7.x 到 2.10.x 支持 Java 7 和 8

详见:https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Java+Versions (opens new window)

# 2.1.2 配置JDK8环境

JDK官网:https://www.oracle.com/java/technologies/downloads/#java8 (opens new window)

Step1:去官网下载 jdk-8u333-macosx-x64.dmg(以实际版本为准即可),然后安装。

Step2:配置环境变量

$ cd ~/
$ open .bash_profile

添加如下一行配置:
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_333.jdk/Contents/Home
1
2
3
4
5

# 2.2 开启SSH服务并配置免密登录

# 2.2.1 开启SSH远程登录服务

MacOS 系统已经默认安装了SSH,但是SSH服务并未启用,下面我们需要将之开启。

通用——共享——远程登录——打开,并将详细配置项改为“所有用户”。

MacOS开启SSH远程登录

# 2.2.2 创建密钥并配置免密登录

执行以下命令创建密钥(如果之前配置Github、Gitlab等服务远程连接时生成过,则不需要重新生成)

$ cd ~/.ssh && ls -l               // 查看是否创建过密钥
$ ssh-keygen -t rsa -C “email”     // 创建密钥
1
2

执行以下命令,将自己的密钥放在ssh授权目录,这样ssh登录自身就不需要输入密码了。

$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys    // 将密钥放在ssh授权目录
$ ssh localhost                                      // 输出Last login即为配置成功
1
2

# 3. 搭建Hadoop伪分布式集群

# 3.1 下载Hadoop安装包

Hadoop官网下载:https://hadoop.apache.org/releases.html (opens new window)(我这里选择了 hadoop-3.3.4 (opens new window) 版本)

Hadoop官网下载

将下载得到的hadoop-3.3.4.tar.gz文件解压,如果只需要Hadoop单机模式,现在就可以了,但是单机模式没有HDFS,因此接下来要做伪分布模式的设置。

# 3.2 Hadoop伪分布设置

# 3.2.1 修改配置信息

进入 ./hadoop-3.3.4/etc/hadoop目录,修改成以下配置

  • 打开hadoop-env.sh文件,增加JAVA的路径设置:

    export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_333.jdk/Contents/Home
    
    1
  • 打开core-site.xml文件,将configuration节点改为如下内容:

    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
      </property>
    </configuration>
    
    1
    2
    3
    4
    5
    6
  • 打开hdfs-site.xml文件,将configuration节点改为如下内容:

    <configuration>
      <property>
        <name>dfs.replication</name>
        <value>1</value>
      </property>
    </configuration>
    
    1
    2
    3
    4
    5
    6
  • 打开mapred-site.xml文件,将configuration节点改为如下内容:

    <configuration>
        <property>
             <name>mapreduce.framework.name</name>
             <value>yarn</value>
         </property>
    </configuration>
    
    1
    2
    3
    4
    5
    6
  • 打开yarn-site.xml文件,将configuration节点改为如下内容:

    <configuration>
        <property> 
            <name>yarn.nodemanager.aux-services</name> 
            <value>mapreduce_shuffle</value> 
        </property>
        <property> 
            <name>yarn.nodemanager.env-whitelist</name>
                      <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
        </property>
    </configuration>
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

# 3.2.2 初始化HDFS

./hadoop-3.3.4/bin目录执行以下命令,初始化HDFS

$ ./hdfs namenode -format
1

初始化HDFS

# 3.3 启动及关闭Hadoop

# 3.3.1 启动Hadoop及yarn服务

进入目录./hadoop-3.3.4/sbin目录,执行./start-dfs.sh启动HDFS

$ cd ./hadoop-3.3.4/sbin
$ ./start-dfs.sh
1
2

启动Hadoop

注:上面的警告不会影响使用。

Chrome浏览器访问地址:http://localhost:9870 ,可见Hadoop的Web页面如下图所示:

Hadoop的Web页面

进入./hadoop-3.3.4/sbin目录,执行./start-yarn.sh启动Yarn

$ cd ./hadoop-3.3.4/sbin
$ ./start-yarn.sh
1
2

启动Yarn服务

Chrome浏览器访问地址:http://localhost:8088 ,可见Yarn的Web页面如下图所示:

Yarn的Web页面

执行jps命令查看所有java进程,正常情况下可以见到以下进程:

$ jps
1

jps命令查看进程

至此,Hadoop3伪分布式环境的部署、设置、启动都已经完成。

# 3.3.2 停止所有Hadoop服务

进入./hadoop-3.3.4/sbin目录,执行./stop-all.sh即可关闭Hadoop的所有服务

$ cd ./hadoop-3.3.4/sbin
$ ./stop-all.sh
1
2

停止所有Hadoop服务

# 4. Flink基本介绍

# 4.1 Flink是什么

Apache Flink 是一个分布式流处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

项目地址:https://github.com/apache/flink (opens new window)

官方文档:https://flink.apache.org/zh/ (opens new window)

Apache Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。Flink 不仅可以运行在包括 YARN、 Mesos、Kubernetes 在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核心,其状态可以达到 TB 级别,且仍能保持高吞吐、低延迟的特性。世界各地有很多要求严苛的流处理应用都运行在 Flink 之上。

Flink简介

# 4.2 PyFlink简介

PyFlink 是 Apache Flink 的 Python API,允许构建可扩展的批处理和流式工作负载,例如实时数据处理管道、大规模探索性数据分析、机器学习 (ML) 管道和 ETL 过程。如果已经熟悉 Python 和 Pandas 等库,那么 PyFlink 可以更简单地利用 Flink 生态系统的全部功能。根据需要的抽象级别,PyFlink 中可以使用两种不同的 API:PyFlink Table API 和 PyFlink DataStream API。

# 4.3 Flink重要概念

# 4.3.1 批处理与流处理

批处理与流处理:

  • 批处理的特点是有界、持久、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。
  • 流处理的特点是无界、实时,无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。

在Spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。而在Flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。

  • 无界流:有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  • 有界流:有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

有界流与无界流

Flink 擅长处理无界和有界数据集,精确的时间控制和状态化使得Flink的运行时能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

# 4.3.2 部署应用到任何地方

Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了常见的集群资源管理器,例如 Hadoop YARN、Apache Mesos 和 Kubernetes,但同时也可以作为独立集群运行。

Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。

部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成。

# 4.3.3 利用内存性能

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。

Flink利用内存性能

# 4.4 Flink基本原理

# 4.4.1 统一的批处理与流处理系统

在大数据处理领域,批处理任务与流处理任务一般被认为是两种不同的任务,一个大数据项目一般会被设计为只能处理其中一种任务,例如Apache Storm、Apache Smaza只支持流处理任务,而Aapche MapReduce、Apache Tez、Apache Spark只支持批处理任务。Spark Streaming是Apache Spark之上支持流处理任务的子系统,看似一个特例,实则不然——Spark Streaming采用了一种micro-batch的架构,即把输入的数据流切分成细粒度的batch,并为每一个batch数据提交一个批处理的Spark任务,所以Spark Streaming本质上还是基于Spark批处理系统对流式数据进行处理,和Apache Storm、Apache Smaza等完全流式的数据处理方式完全不同。通过其灵活的执行引擎,Flink能够同时支持批处理任务与流处理任务。

在执行引擎这一层,流处理系统与批处理系统最大不同在于节点间的数据传输方式。

  • 对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理。
  • 而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点。

这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求。Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型。Flink以固定的缓存块为单位进行网络数据传输,用户可以通过缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟。如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量。同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量。

Flink执行引擎数据传输模式

# 4.4.2 Flink流处理的容错机制

对于一个分布式系统来说,单个进程或是节点崩溃导致整个Job失败是经常发生的事情,在异常发生时不会丢失用户数据并能自动恢复才是分布式系统必须支持的特性之一。本节主要介绍Flink流处理系统任务级别的容错机制。

批处理系统比较容易实现容错机制,由于文件可以重复访问,当某个任务失败后,重启该任务即可。但是到了流处理系统,由于数据源是无限的数据流,从而导致一个流处理任务执行几个月的情况,将所有数据缓存或是持久化,留待以后重复访问基本上是不可行的。Flink基于分布式快照与可部分重发的数据源实现了容错。用户可自定义对整个Job进行快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近一次快照,并从数据源重发快照之后的数据。Flink的分布式快照实现借鉴了Chandy和Lamport在1985年发表的一篇关于分布式快照的论文,其实现的主要思想如下:

按照用户自定义的分布式快照间隔时间,Flink会定时在所有数据源中插入一种特殊的快照标记消息,这些快照标记消息和其他消息一样在DAG中流动,但是不会被用户定义的业务逻辑所处理,每一个快照标记消息都将其所在的数据流分成两部分:本次快照数据和下次快照数据。

Flink包含快照标记消息的消息流

快照标记消息沿着DAG流经各个操作符,当操作符处理到快照标记消息时,会对自己的状态进行快照,并存储起来。当一个操作符有多个输入的时候,Flink会将先抵达的快照标记消息及其之后的消息缓存起来,当所有的输入中对应该次快照的快照标记消息全部抵达后,操作符对自己的状态快照并存储,之后处理所有快照标记消息之后的已缓存消息。操作符对自己的状态快照并存储可以是异步与增量的操作,并不需要阻塞消息的处理。分布式快照的流程下图所示:

Flink分布式快照流程图

当所有的Data Sink(终点操作符)都收到快照标记信息并对自己的状态快照和存储后,整个分布式快照就完成了,同时通知数据源释放该快照标记消息之前的所有消息。若之后发生节点崩溃等异常情况时,只需要恢复之前存储的分布式快照状态,并从数据源重发该快照以后的消息就可以了。

Exactly-Once是流处理系统需要支持的一个非常重要的特性,它保证每一条消息只被流处理系统处理一次,许多流处理任务的业务逻辑都依赖于Exactly-Once特性。相对于At-Least-Once或是At-Most-Once, Exactly-Once特性对流处理系统的要求更为严格,实现也更加困难。Flink基于分布式快照实现了Exactly-Once特性。

相对于其他流处理系统的容错方案,Flink基于分布式快照的方案在功能和性能方面都具有很多优点,包括:

  • 低延迟。由于操作符状态的存储可以异步,所以进行快照的过程基本上不会阻塞消息的处理,因此不会对消息延迟产生负面影响。
  • 高吞吐量。当操作符状态较少时,对吞吐量基本没有影响。当操作符状态较多时,相对于其他的容错机制,分布式快照的时间间隔是用户自定义的,所以用户可以权衡错误恢复时间和吞吐量要求来调整分布式快照的时间间隔。
  • 与业务逻辑的隔离。Flink的分布式快照机制与用户的业务逻辑是完全隔离的,用户的业务逻辑不会依赖或是对分布式快照产生任何影响。
  • 错误恢复代价。分布式快照的时间间隔越短,错误恢复的时间越少,与吞吐量负相关。

# 4.4.3 Flink流处理的时间窗口

对于流处理系统来说,流入的消息不存在上限,所以对于聚合或是连接等操作,流处理系统需要对流入的消息进行分段,然后基于每一段数据进行聚合或是连接。消息的分段即称为窗口,流处理系统支持的窗口有很多类型,最常见的就是时间窗口,基于时间间隔对消息进行分段处理。

对于目前大部分流处理系统来说,时间窗口一般是根据Task所在节点的本地时钟进行切分,这种方式实现起来比较容易,不会产生阻塞。但是可能无法满足某些应用需求,比如:消息本身带有时间戳,用户希望按照消息本身的时间特性进行分段处理。由于不同节点的时钟可能不同,以及消息在流经各个节点的延迟不同,在某个节点属于同一个时间窗口处理的消息,流到下一个节点时可能被切分到不同的时间窗口中,从而产生不符合预期的结果。

Flink支持3种类型的时间窗口,分别适用于用户对于时间窗口不同类型的要求:

  • Operator Time。根据Task所在节点的本地时钟来切分的时间窗口。
  • Event Time。消息自带时间戳,根据消息的时间戳进行处理,确保时间戳在同一个时间窗口的所有消息一定会被正确处理。由于消息可能乱序流入Task,所以Task需要缓存当前时间窗口消息处理的状态,直到确认属于该时间窗口的所有消息都被处理,才可以释放,如果乱序的消息延迟很高会影响分布式系统的吞吐量和延迟。
  • Ingress Time。有时消息本身并不带有时间戳信息,但用户依然希望按照消息而不是节点时钟划分时间窗口,例如避免上面提到的第二个问题,此时可以在消息源流入Flink流处理系统时自动生成增量的时间戳赋予消息,之后处理的流程与Event Time相同。Ingress Time可以看成是Event Time的一个特例,由于其在消息源处时间戳一定是有序的,所以在流处理系统中,相对于Event Time,其乱序的消息延迟不会很高,因此对Flink分布式系统的吞吐量和延迟的影响也会更小。

# 4.4.4 Event Time时间窗口的实现

Flink借鉴了Google的MillWheel项目,通过WaterMark来支持基于Event Time的时间窗口。

当操作符通过基于Event Time的时间窗口来处理数据时,它必须在确定所有属于该时间窗口的消息全部流入此操作符后才能开始数据处理。但是由于消息可能是乱序的,所以操作符无法直接确认何时所有属于该时间窗口的消息全部流入此操作符。WaterMark包含一个时间戳,Flink使用WaterMark标记所有小于该时间戳的消息都已流入,Flink的数据源在确认所有小于某个时间戳的消息都已输出到Flink流处理系统后,会生成一个包含该时间戳的WaterMark,插入到消息流中输出到Flink流处理系统中,Flink操作符按照时间窗口缓存所有流入的消息,当操作符处理到WaterMark时,它对所有小于该WaterMark时间戳的时间窗口数据进行处理并发送到下一个操作符节点,然后也将WaterMark发送到下一个操作符节点。

为了保证能够处理所有属于某个时间窗口的消息,操作符必须等到大于这个时间窗口的WaterMark之后才能开始对该时间窗口的消息进行处理,相对于基于Operator Time的时间窗口,Flink需要占用更多内存,且会直接影响消息处理的延迟时间。对此,一个可能的优化措施是,对于聚合类的操作符,可以提前对部分消息进行聚合操作,当有属于该时间窗口的新消息流入时,基于之前的部分聚合结果继续计算,这样的话,只需缓存中间计算结果即可,无需缓存该时间窗口的所有消息。

对于基于Event Time时间窗口的操作符来说,流入WaterMark的时间戳与当前节点的时钟一致是最简单理想的状况,但是在实际环境中是不可能的,由于消息的乱序以及前面节点处理效率的不同,总是会有某些消息流入时间大于其本身的时间戳,真实WaterMark时间戳与理想情况下WaterMark时间戳的差别称为Time Skew,如下图所示:

WaterMark的Time-Skew图

Time Skew决定了该WaterMark与上一个WaterMark之间的时间窗口所有数据需要缓存的时间,Time Skew时间越长,该时间窗口数据的延迟越长,占用内存的时间也越长,同时会对流处理系统的吞吐量产生负面影响。

# 4.4.5 基于时间戳的排序

在流处理系统中,由于流入的消息是无限的,所以对消息进行排序基本上被认为是不可行的。但是在Flink流处理系统中,基于WaterMark,Flink实现了基于时间戳的全局排序。排序的实现思路如下:排序操作符缓存所有流入的消息,当其接收到WaterMark时,对时间戳小于该WaterMark的消息进行排序,并发送到下一个节点,在此排序操作符中释放所有时间戳小于该WaterMark的消息,继续缓存流入的消息,等待下一个WaterMark触发下一次排序。

由于WaterMark保证了在其之后不会出现时间戳比它小的消息,所以可以保证排序的正确性。需要注意的是,如果排序操作符有多个节点,只能保证每个节点的流出消息是有序的,节点之间的消息不能保证有序,要实现全局有序,则只能有一个排序操作符节点。

通过支持基于Event Time的消息处理,Flink扩展了其流处理系统的应用范围,使得更多的流处理任务可以通过Flink来执行。

# 4.4.6 定制的内存管理

Flink项目基于Java及Scala等JVM语言,JVM本身作为一个各种类型应用的执行平台,其对Java对象的管理也是基于通用的处理策略,其垃圾回收器通过估算Java对象的生命周期对Java对象进行有效率的管理。

针对不同类型的应用,用户可能需要针对该类型应用的特点,配置针对性的JVM参数更有效率的管理Java对象,从而提高性能。JVM调优需要用户对应用本身及JVM的各参数有深入了解,极大地提高了分布式计算平台的调优门槛。Flink框架本身了解计算逻辑每个步骤的数据传输,相比于JVM垃圾回收器,其了解更多的Java对象生命周期,从而为更有效率地管理Java对象提供了可能。

# 4.4.7 JVM存在的问题

[1] Java对象开销

相对于c/c++等更加接近底层的语言,Java对象的存储密度相对偏低,例如,“abcd”这样简单的字符串在UTF-8编码中需要4个字节存储,但采用了UTF-16编码存储字符串的Java则需要8个字节,同时Java对象还有header等其他额外信息,一个4字节字符串对象在Java中需要48字节的空间来存储。对于大部分的大数据应用,内存都是稀缺资源,更有效率地内存存储,意味着CPU数据访问吞吐量更高,以及更少磁盘落地的存在。

[2] 对象存储结构引发的cache miss

为了缓解CPU处理速度与内存访问速度的差距,现代CPU数据访问一般都会有多级缓存。当从内存加载数据到缓存时,一般是以cache line为单位加载数据,所以当CPU访问的数据如果是在内存中连续存储的话,访问的效率会非常高。如果CPU要访问的数据不在当前缓存所有的cache line中,则需要从内存中加载对应的数据,这被称为一次cache miss。当cache miss非常高的时候,CPU大部分的时间都在等待数据加载,而不是真正的处理数据。Java对象并不是连续的存储在内存上,同时很多的Java数据结构的数据聚集性也不好。

[3] 大数据的垃圾回收

Java的垃圾回收机制一方面它免去了开发者自己回收资源的步骤,提高了开发效率,减少了内存泄漏的可能,另一方面垃圾回收也是Java应用的不定时炸弹,有时秒级甚至是分钟级的垃圾回收极大影响了Java应用的性能和可用性。在时下数据中心,大容量内存得到了广泛的应用,甚至出现了单台机器配置TB内存的情况,同时,大数据分析通常会遍历整个源数据集,对数据进行转换、清洗、处理等步骤。在这个过程中,会产生海量的Java对象,JVM的垃圾回收执行效率对性能有很大影响。通过JVM参数调优提高垃圾回收效率需要用户对应用和分布式计算框架以及JVM的各参数有深入了解,而且有时候这也远远不够。

[4] OOM问题

OutOfMemoryError是分布式计算框架经常会遇到的问题,当JVM中所有对象大小超过分配给JVM的内存大小时,就会出现OutOfMemoryError错误,JVM崩溃,分布式框架的健壮性和性能都会受到影响。通过JVM管理内存,同时试图解决OOM问题的应用,通常都需要检查Java对象的大小,并在某些存储Java对象特别多的数据结构中设置阈值进行控制。但是JVM并没有提供官方检查Java对象大小的工具,第三方的工具类库可能无法准确通用地确定Java对象大小。侵入式的阈值检查也会为分布式计算框架的实现增加很多额外与业务逻辑无关的代码。

# 4.4.8 Flink的处理策略

为了解决以上提到的问题,高性能分布式计算框架通常需要以下技术:

  • 定制的序列化工具。显式内存管理的前提步骤就是序列化,将Java对象序列化成二进制数据存储在内存上(on heap或是off-heap)。通用的序列化框架,如Java默认使用java.io.Serializable将Java对象及其成员变量的所有元信息作为其序列化数据的一部分,序列化后的数据包含了所有反序列化所需的信息。这在某些场景中十分必要,但是对于Flink这样的分布式计算框架来说,这些元数据信息可能是冗余数据。定制的序列化框架,如Hadoop的org.apache.hadoop.io.Writable需要用户实现该接口,并自定义类的序列化和反序列化方法。这种方式效率最高,但需要用户额外的工作,不够友好。
  • 显式的内存管理。一般通用的做法是批量申请和释放内存,每个JVM实例有一个统一的内存管理器,所有内存的申请和释放都通过该内存管理器进行。这可以避免常见的内存碎片问题,同时由于数据以二进制的方式存储,可以大大减轻垃圾回收压力。

缓存友好的数据结构和算法。对于计算密集的数据结构和算法,直接操作序列化后的二进制数据,而不是将对象反序列化后再进行操作。同时,只将操作相关的数据连续存储,可以最大化的利用L1/L2/L3缓存,减少Cache miss的概率,提升CPU计算的吞吐量。以排序为例,由于排序的主要操作是对Key进行对比,如果将所有排序数据的Key与Value分开并对Key连续存储,那么访问Key时的Cache命中率会大大提高。

# 4.4.9 Flink排序算法

以Flink中的排序为例,排序通常是分布式计算框架中一个非常重的操作,Flink通过特殊设计的排序算法获得了非常好的性能,其排序算法的实现如下:

  • 将待排序的数据经过序列化后存储在两个不同的MemorySegment集中。数据全部的序列化值存放于其中一个MemorySegment集中。数据序列化后的Key和指向第一个MemorySegment集中值的指针存放于第二个MemorySegment集中。
  • 对第二个MemorySegment集中的Key进行排序,如需交换Key位置,只需交换对应的Key+Pointer的位置,第一个MemorySegment集中的数据无需改变。 当比较两个Key大小时,TypeComparator提供了直接基于二进制数据的对比方法,无需反序列化任何数据。
  • 排序完成后,访问数据时,按照第二个MemorySegment集中Key的顺序访问,并通过Pointer值找到数据在第一个MemorySegment集中的位置,通过TypeSerializer反序列化成Java对象返回。

Flink排序算法

这样实现的好处有:

  • 通过Key和Full data分离存储的方式尽量将被操作的数据最小化,提高Cache命中的概率,从而提高CPU的吞吐量。
  • 移动数据时,只需移动Key+Pointer,而无须移动数据本身,大大减少了内存拷贝的数据量。
  • TypeComparator直接基于二进制数据进行操作,节省了反序列化的时间。

通过定制的内存管理,Flink通过充分利用内存与CPU缓存,大大提高了CPU的执行效率,同时由于大部分内存都由框架自己控制,也很大程度提升了系统的健壮性,减少了OOM出现的可能。

# 4.5 Flink特点及应用场景

# 4.5.1 Flink特点

Apache Flink是一个集合众多具有竞争力特性于一身的第三代流处理引擎,它的以下特点使得它能够在同类系统中脱颖而出。

[1] 同时支持高吞吐、低延迟、高性能。

  • Flink是目前开源社区中唯一一套集高吞吐、低延迟、高性能三者于一身的分布式流式处理框架。像Apache Spark也只能兼顾高吞吐和高性能特性,主要因为在Spark Streaming流式计算中无法做到低延迟保障;而流式计算框架Apache Storm只能支持低延迟和高性能特性,但是无法满足高吞吐的要求。

[2] 同时支持事件时间和处理时间语义。

  • 在流式计算领域中,窗口计算的地位举足轻重,但目前大多数框架窗口计算采用的都是处理时间,也就是事件传输到计算框架处理时系统主机的当前时间。Flink能够支持基于事件时间语义进行窗口计算,也就是使用事件产生的时间,这种基于事件驱动的机制使得事件即使乱序到达,流系统也能够计算出精确的结果,保证了事件原本的时序性。

[3] 支持有状态计算,并提供精确一次的状态一致性保障。

  • 所谓状态就是在流式计算过程中将算子的中间结果数据保存着内存或者文件系统中,等下一个事件进入算子后可以从之前的状态中获取中间结果中计算当前的结果,从而不须每次都基于全部的原始数据来统计结果,这种方式极大地提升了系统的性能,并降低了数据计算过程的资源消耗。

[4] 基于轻量级分布式快照实现的容错机制。

  • Flink能够分布式运行在上千个节点上,将一个大型计算任务的流程拆解成小的计算过程,然后将Task分布到并行节点上进行处理。在任务执行过程中,能够自动发现事件处理过程中的错误而导致的数据不一致问题,在这种情况下,通过基于分布式快照技术的Checkpoints,将执行过程中的状态信息进行持久化存储,一旦任务出现异常终止,Flink就能够从Checkpoints中进行任务的自动恢复,以确保数据中处理过程中的一致性。

[5] 保证了高可用,动态扩展,实现7 * 24小时全天候运行。

  • 支持高可用性配置(无单点失效),和Kubernetes、YARN、Apache Mesos紧密集成,快速故障恢复,动态扩缩容作业等。基于上述特点,它可以7 X 24小时运行流式应用,几乎无须停机。当需要动态更新或者快速恢复时,Flink通过Savepoints技术将任务执行的快照保存在存储介质上,当任务重启的时候可以直接从事先保存的Savepoints恢复原有的计算状态,使得任务继续按照停机之前的状态运行。

[6] 支持高度灵活的窗口操作。

  • Flink将窗口划分为基于Time、Count、Session,以及Data-driven等类型的窗口操作,窗口可以用灵活的触发条件定制化来达到对复杂流传输模式的支持,用户可以定义不同的窗口触发机制来满足不同的需求。

# 4.5.2 Flink应用场景

在实际生产的过程中,大量数据在不断地产生,例如金融交易数据、互联网订单数据、GPS定位数据、传感器信号、移动终端产生的数据、通信信号数据等,以及我们熟悉的网络流量监控、服务器产生的日志数据,这些数据最大的共同点就是实时从不同的数据源中产生,然后再传输到下游的分析系统。

针对这些数据类型主要包括以下场景,Flink对这些场景都有非常好的支持。

[1] 实时智能推荐

  • 利用Flink流计算帮助用户构建更加实时的智能推荐系统,对用户行为指标进行实时计算,对模型进行实时更新,对用户指标进行实时预测,并将预测的信息推送给Web/App端,帮助用户获取想要的商品信息,另一方面也帮助企业提高销售额,创造更大的商业价值。

[2] 复杂事件处理

  • 例如工业领域的复杂事件处理,这些业务类型的数据量非常大,且对数据的时效性要求较高。我们可以使用Flink提供的CEP(复杂事件处理)进行事件模式的抽取,同时应用Flink的SQL进行事件数据的转换,在流式系统中构建实时规则引擎。

[3] 实时欺诈检测

  • 在金融领域的业务中,常常出现各种类型的欺诈行为。运用Flink流式计算技术能够在毫秒内就完成对欺诈判断行为指标的计算,然后实时对交易流水进行规则判断或者模型预测,这样一旦检测出交易中存在欺诈嫌疑,则直接对交易进行实时拦截,避免因为处理不及时而导致的经济损失

[4] 实时数仓与ETL

  • 结合离线数仓,通过利用流计算等诸多优势和SQL灵活的加工能力,对流式数据进行实时清洗、归并、结构化处理,为离线数仓进行补充和优化。另一方面结合实时数据ETL处理能力,利用有状态流式计算技术,可以尽可能降低企业由于在离线数据计算过程中调度逻辑的复杂度,高效快速地处理企业需要的统计结果,帮助企业更好的应用实时数据所分析出来的结果。

[5] 流数据分析

  • 实时计算各类数据指标,并利用实时结果及时调整在线系统相关策略,在各类投放、无线智能推送领域有大量的应用。流式计算技术将数据分析场景实时化,帮助企业做到实时化分析Web应用或者App应用的各种指标。

[6] 实时报表分析

  • 实时报表分析是近年来很多公司采用的报表统计方案之一,其中最主要的应用便是实时大屏展示。利用流式计算实时得出的结果直接被推送到前段应用,实时显示出重要的指标变换,最典型的案例就是淘宝的双十一实时战报。

# 4.6 Flink与Spark对比

# 4.6.1 各自的适用场景

对于以下场景,你可以选择 Spark:

  • 数据量非常大而且逻辑复杂的批数据处理,并且对计算效率有较高要求(比如用大数据分析来构建推荐系统进行个性化推荐、广告定点投放等);
  • 基于历史数据的交互式查询,要求响应较快;
  • 基于实时数据流的数据处理,延迟性要求在在数百毫秒到数秒之间。

Spark完美满足这些场景的需求,而且它可以一站式解决这些问题,无需用别的数据处理平台。

由于Flink是为了提升流处理而创建的平台,所以它适用于各种需要非常低延迟(微秒到毫秒级)的实时数据处理场景,比如实时日志报表分析。而且 Flink 用流处理去模拟批处理的思想,比Spark 用批处理去模拟流处理的思想扩展性更好。

# 4.6.2 国内现状

Spark中分布式RDD缓存是一个非常强大的功能,在这一点上比Flink好用很多,比如在实时计算过程中还需要一些离线大数据与之关联,就可以用Spark。Spark实时计算本来就是微批处理,所以批处理能做的事情流处理都能做,代码也是批流高度统一。

Flink重在它的高实时性,是真正的实时计算,在状态数据和checkpoint容错上做的比较好,能够做到exactly once,对实时性要求高肯定用Flink。还有一点值得注意的是,Spark比Flink好的地方就是Spark的executor死了不会导致整个job挂掉,而是会创建新的executor再重新执行失败的任务。而Flink某个taskmanager死了整个job就失败了,必须设置checkpoint来进行容错。总的来说,Flink是现在最好的实时计算框架。

# 4.6.3 流处理引擎对比

以下是目前主流的流处理引擎之间的对比:

流处理引擎对比

# 5. 搭建Flink环境

以下在MacOS、Debian分别搭建PyFlink的本地开发和正式运行环境,基础环境需要提前准备好JDK8、Python3,如何搭建此处不再赘述。不熟悉的话可见我的两个其他博客:MacOS的基础设置及使用指南 (opens new window)VPS基本部署环境的搭建与配置 (opens new window)

注:即使是使用 Python 语言使用 Flink,在执行 Flink 作业时还是依赖于JDK。所以使用 PyFlink 也需要JDK 环境,JDK 8 或者 JDK 11 版本都可以。

# 5.1 在MacOS搭建本地开发环境

下载安装包:

$ wget https://dlcdn.apache.org/flink/flink-1.16.1/flink-1.16.1-bin-scala_2.12.tgz
$ tar -xzf flink-1.16.1-bin-scala_2.12.tgz
1
2

启动与停止集群:

$ cd flink-1.16.1
$ ./bin/start-cluster.sh            // 启动Flink
$ ./bin/stop-cluster.sh             // 停止Flink
1
2
3

打开 http://localhost:8081 (opens new window) 地址,即可查看到 Flink 的管理面板。

Flink作业管理面板

安装apache-flink依赖:

实验环境:MacOS13、Python3.8(亲测Python3.9安装apache-flink后不可用,可以用Conda环境搭个Python3.8的环境)

$ pip3 install pycodestyle
$ pip3 install isort 
$ pip3 install apache-flink
1
2
3

注:我直接安装 apache-flink 时报错 pycodestyle 和 isort 找不到包,所以就先安装了这两个依赖,再安装apache-flink。

安装apache-flink依赖

# 5.2 在Debian搭建正式运行环境

安装包及启动方式与MacOS的本地开发环境一致,此处不再赘述。服务器环境由于默认的 8081 端口被占用,面板默认仅 localhost 访问,因此需要修改一下配置文件。Flink 管理面板没有账号密码验证,可通过 htpasswd 工具搭配 Nginx 去实现。

[1] 修改端口号及外网访问

$ vim ./conf/flink-conf.yaml

rest.port: your_server_port

jobmanager.bind-host: your_server_ip
rest.bind-address: your_server_ip
1
2
3
4
5
6

[2] 给面板添加访问密码

htpasswd是Apache附带的程序,htpasswd生成包含用户名和密码的文本文件,每行内容格式为“用户名:密码”,用于基本身份认证。

$ apt-get install apache2-utils
$ htpasswd -c /root/Flink/.flinkpassword your_account_user
1
2

修改nginx配置,在 Nginx的 location / {} 内添加如下代码片段,可以顺便把域名和https一并配置了。

    auth_basic           "input you user name and password";
    auth_basic_user_file /root/Flink/.flinkpassword;
1
2

之后重载nginx配置,再次访问就需要密码验证了。

$ nginx -t
$ nginx -s reload
1
2

# 6. k8s与云原生基本介绍

# 6.1 k8s简介

# 6.1.1 k8s是什么

Kubernetes,简称k8s,是用8代替中间8个字符“ubernete”而成的缩写。是Google开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。用于管理云平台中多个主机上的容器化的应用,Kubernetes的目标是让部署容器化的应用简单并且高效,它提供了应用部署,规划,更新,维护的一种机制。

Kubernetes 构建在 Docker 技术之上,为跨主机的容器化应用提供资源调度、服务发现、高可用管理和弹性伸缩等一整套功能,它提供了完善的管理工具,涵盖开发、部署测试、运维监控等各个环节。它的目标不仅仅是一个编排系统,更是提供一个规范,可以让你来描述集群的架构,定义服务的最终状态,Kubernetes 可以帮你将系统自动达到和维持在这个状态。

k8s简介

项目地址:https://github.com/kubernetes/kubernetes (opens new window)

官方文档:https://kubernetes.io/zh-cn/docs/reference/ (opens new window)

# 6.1.2 k8s优势及应用场景

Kubernetes为你提供了一个可弹性运行分布式系统的框架。Kubernetes 会满足你的扩展要求、故障转移、部署模式等。

  • 服务发现和负载均衡:Kubernetes 可以使用 DNS 名称或自己的 IP 地址公开容器,如果进入容器的流量很大, Kubernetes 可以负载均衡并分配网络流量,从而使部署稳定。
  • 存储编排:Kubernetes 允许你自动挂载你选择的存储系统,例如本地存储、公共云提供商等。
  • 自动部署和回滚:你可以使用 Kubernetes 描述已部署容器的所需状态,它可以以受控的速率将实际状态 更改为期望状态。例如,你可以自动化 Kubernetes 来为你的部署创建新容器, 删除现有容器并将它们的所有资源用于新容器。
  • 自动完成装箱计算:Kubernetes 允许你指定每个容器所需 CPU 和内存(RAM)。当容器指定了资源请求时,Kubernetes 可以做出更好的决策来管理容器的资源。
  • 自我修复:Kubernetes 重新启动失败的容器、替换容器、杀死不响应用户定义的 运行状况检查的容器,并且在准备好服务之前不将其通告给客户端。
  • 密钥与配置管理:Kubernetes 允许你存储和管理敏感信息,例如密码、OAuth 令牌和 ssh 密钥。你可以在不重建容器镜像的情况下部署和更新密钥和应用程序配置,也无需在堆栈配置中暴露密钥。
  • 高可扩展性:Kubernetes具体很高的可扩展性,这体现在整个架构的方方面面,包括CRI、CSI、CNI等等。使我们可以更好的扩展k8s,使其更加适应我们的业务需求。

# 6.1.3 k8s基本架构及术语

k8s 由众多组件组成,组件间通过 API 互相通信,归纳起来主要分为三个部分。

  • Controller Manager,即控制平面,用于调度程序以及节点状态检测。
  • Nodes,构成了Kubernetes集群的集体计算能力,实际部署容器运行的地方。
  • Pods,Kubernetes集群中资源的最小单位。
k8s基本架构

下面简单介绍一下k8s的常用术语,完整的详见:k8s官方文档-术语表 (opens new window)

  • 主机(Master):用于控制 Kubernetes 节点的计算机。所有任务分配都来自于此。

  • 节点(Node):负责执行请求和所分配任务的计算机。由 Kubernetes 主机负责对节点进行控制。

  • 容器集(Pod):被部署在单个节点上的,且包含一个或多个容器的容器组。同一容器集中的所有容器共享同一个 IP 地址、IPC、主机名称及其它资源。容器集会将网络和存储从底层容器中抽象出来。这样,您就能更加轻松地在集群中移动容器。

  • 复制控制器(Replication Controller):用于控制应在集群某处运行的完全相同的容器集副本数量。

  • 服务(Service):将工作内容与容器集分离。Kubernetes 服务代理会自动将服务请求分发到正确的容器集——无论这个容器集会移到集群中的哪个位置,甚至可以被替换掉。

# 6.2 云原生简介

# 6.2.1 云原生是什么

云原生(CloudNative)指的是基于在云中构建、运行应用程序的理念,而打造的一套技术体系。 不同于传统数据中心的软件开发、构建模式,云原生包含有“原生为云而设计”的含义,能够有效提升云上资源利用率、缩短开发周期。

# 6.2.2 云原生优缺点

云原生优点

  • 快速迭代:借助于云原生的微服务技术,在应用程序开发时,IT 技术人员能够将代码,解耦成独立的模块单元,以敏捷的开发周期和单体的独立功能,提升拓展性、降低互联性,加快项目交付节奏。
  • 自动部署:相比传统开发模式,云原生技术能够有效解决代码质量低、发布流程长等难题,以自动化部署、模块单元组合的方式,构建编译、测试、部署等环节一体化的高效能流程,推进智能化开发。
  • 独立高效:在开发创新的思路跃迁上,云原生的微服务化框架,完全突破旧有开发模式瓶颈,聚焦打造可独立发布单体应用目标,全方位解构僵化的传统 IT 架构系统,助力不同团队优化协作流程。

云原生缺点

  • 安全隐患:云原生因应用容器技术,容易对数据安全造成一定程度的安全隐患。攻击者如果从某个容器链接至主机,有可能在下一步攻击行动中,入侵更多非授权容器,导致大规模数据风险。

# 7. 搭建k8s开发环境

# 7.1 安装Docker-Desktop

MacOS 的 Docker-Desktop 自带单体k8s开发环境的支持,用它来安装非常方便。

mac-docker

# 7.2 启用Kubernetes服务

设置按钮——Kubernetes——勾选上Enable Kubernetes(第一次开启会自动进行下载安装)——Apply & restart

启用Kubernetes服务

完成后使用kubectl命令检查一下版本:

$ kubectl version --short

Flag --short has been deprecated, and will be removed in the future. The --short output will become the default.
Client Version: v1.25.4
Kustomize Version: v4.5.7
Server Version: v1.25.4
1
2
3
4
5
6

查看唯一的master节点:

$ kubectl get node

NAME             STATUS   ROLES           AGE   VERSION
docker-desktop   Ready    control-plane   31m   v1.25.4
1
2
3
4

查看默认的名称空间:

$ kubectl get ns  (全称 kubectl get namespace)

NAME              STATUS   AGE
default           Active   33m
kube-node-lease   Active   33m
kube-public       Active   33m
kube-system       Active   33m
1
2
3
4
5
6
7

查看默认的pods:

$ kubectl get pods --all-namespaces

NAMESPACE     NAME                                     READY   STATUS    RESTARTS   AGE
kube-system   coredns-565d847f94-bkv82                 1/1     Running   3          33m
kube-system   coredns-565d847f94-w4v5f                 1/1     Running   3          33m
kube-system   etcd-docker-desktop                      1/1     Running   4          33m
kube-system   kube-apiserver-docker-desktop            1/1     Running   4          33m
kube-system   kube-controller-manager-docker-desktop   1/1     Running   4          33m
kube-system   kube-proxy-bpnjq                         1/1     Running   3          33m
kube-system   kube-scheduler-docker-desktop            1/1     Running   4          33m
kube-system   storage-provisioner                      1/1     Running   4          33m
kube-system   vpnkit-controller                        1/1     Running   5          33m
1
2
3
4
5
6
7
8
9
10
11
12

# 8. RPC技术概述

# 8.1 什么是RPC

RPC(Remote Procedure Call Protocol)远程过程调用协议。一个通俗的描述是:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样。比较正式的描述是:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

那么我们至少从这样的描述中挖掘出几个要点:

  • RPC是协议:既然是协议就只是一套规范,那么就需要有人遵循这套规范来进行实现。目前典型的RPC实现包括:Dubbo、Thrift、gRPC等。
  • 网络协议和网络IO模型对其透明:既然RPC的客户端认为自己是在调用本地对象。那么传输层使用的是TCP/UDP还是HTTP协议,又或者是一些其他的网络协议它就不需要关心了。
  • 信息格式对其透明:我们知道在本地应用程序中,对于某个对象的调用需要传递一些参数,并且会返回一个调用结果。至于被调用的对象内部是如何使用这些参数,并计算出处理结果的,调用方是不需要关心的。那么对于远程调用来说,这些参数会以某种信息格式传递给网络上的另外一台计算机,这个信息格式是怎样构成的,调用方是不需要关心的。
  • 应该有跨语言能力:为什么这样说呢?因为调用方实际上也不清楚远程服务器的应用程序是使用什么语言运行的。那么对于调用方来说,无论服务器方使用的是什么语言,本次调用都应该成功,并且返回值也应该按照调用方程序语言所能理解的形式进行描述。
RPC过程调用模型图

# 8.2 为什么要用RPC

其实这是应用开发到一定的阶段的强烈需求驱动的。如果我们开发简单的单一应用,逻辑简单、用户不多、流量不大,那我们用不着。当我们的系统访问量增大、业务增多时,我们会发现单机系统已经无法承受。此时我们可以将业务拆分成几个互不关联的应用,分别部署在各自机器上,以划清逻辑并减小压力。此时,我们也可以不需要RPC,因为应用之间是互不关联的。

当我们的业务越来越多、应用也越来越多时,自然的,我们会发现有些功能已经不能简单划分开来或者划分不出来。此时可以将公共业务逻辑抽离出来,将之组成独立的服务Service应用 。而原有的、新增的应用都可以与那些独立的Service应用交互,以此来完成完整的业务功能。所以此时,我们急需一种高效的应用程序之间的通讯手段来完成这种需求,这时候就需要RPC来大显身手了。

# 8.3 RPC的基本原理

# 8.3.1 RPC基本组成

在一个典型 RPC 的使用场景中,包含了服务发现、负载、容错、网络传输、序列化等组件。

完整RPC框架的组成

一个 RPC 的核心功能主要有 5 个部分组成,分别是:客户端、客户端 Stub、网络传输模块、服务端 Stub、服务端等。

  • 客户端(Client):服务调用方。

  • 客户端存根(Client Stub):存放服务端地址信息,将客户端的请求参数数据信息打包成网络消息,再通过网络传输发送给服务端。

  • 服务端存根(Server Stub):接收客户端发送过来的请求消息并进行解包,然后再调用本地服务进行处理。

  • 服务端(Server):服务的真正提供者。

  • 网络传输服务(Network Service):底层传输,可以是 TCP 或 HTTP。

    RPC网络传输的流程

# 8.3.2 RPC调用流程

一次RPC调用流程包含以下步骤。

  • Step1:服务消费者(Client 客户端)通过本地调用的方式调用服务。
  • Step2:客户端存根(Client Stub)接收到调用请求后负责将方法、入参等信息序列化(组装)成能够进行网络传输的消息体。
  • Step3:客户端存根(Client Stub)找到远程的服务地址,并且将消息通过网络发送给服务端。
  • Step4:服务端存根(Server Stub)收到消息后进行解码(反序列化操作)。
  • Step5:服务端存根(Server Stub)根据解码结果调用本地的服务进行相关处理
  • Step6:服务端(Server)本地服务业务处理。
  • Step7:处理结果返回给服务端存根(Server Stub)。
  • Step8:服务端存根(Server Stub)序列化结果。
  • Step9:服务端存根(Server Stub)将结果通过网络发送至消费方。
  • Step10:客户端存根(Client Stub)接收到消息,并进行解码(反序列化)。
  • Step11:服务消费方得到最终结果。

RPC调用流程

# 8.4 RPC技术选型

RPC 协议只规定了 Client 与 Server 之间的点对点调用流程,包括 stub、通信协议、RPC 消息解析等部分,在实际应用中,还需要考虑服务的高可用、负载均衡等问题,所以这里的 RPC 框架指的是能够完成 RPC 调用的解决方案,除了点对点的 RPC 协议的具体实现外,还可以包括服务的发现与注销、提供服务的多台 Server 的负载均衡、服务的高可用等更多的功能。 目前的 RPC 框架大致有两种不同的侧重方向,一种偏重于服务治理,另一种偏重于跨语言调用 。

偏重于服务治理——服务治理型

  • 服务治理型的RPC框架有Dubbo、DubboX等,Dubbo是阿里开源的分布式服务框架,能够实现高性能RPC调用,并且提供了丰富的管理功能,是十分优秀的RPC框架。DubboX是基于Dubbo框架开发的RPC框架,支持REST风格远程调用,并增加了一些新的feature。
  • 这类RPC框架的特点是功能丰富,提供高性能的远程调用以及服务发现及治理功能,适用于大型服务的微服务化拆分以及管理,对于特定语言如java的项目可以十分友好的透明化接入,但缺点是语言耦合度较高,跨语言支持难度较大。

偏重于跨语言调用——跨语言调用型

  • 跨语言调用型的RPC框架有Thrift、gRPC、Hessian、Hprose等,这一类的RPC框架重点关注于服务的跨语言调用,能够支持大部分的语言进行语言无关的调用,非常适合于为不同语言提供通用远程服务的场景。但这类框架没有服务发现相关机制,实际使用时一般需要代理层进行请求转发和负载均衡策略控制。

# 8.4.1 gRPC

[1] 基本介绍

gRPC是一个高性能、开源、通用的RPC框架,由Google推出,基于HTTP2协议标准设计开发,默认采用Protocol Buffers数据序列化协议,支持多种开发语言。gRPC提供了一种简单的方法来精确的定义服务,并且为客户端和服务端自动生成可靠的功能库。

在gRPC客户端可以直接调用不同服务器上的远程程序,使用姿势看起来就像调用本地程序一样,很容易去构建分布式应用和服务。和很多RPC系统一样,服务端负责实现定义好的接口并处理客户端的请求,客户端根据接口描述直接调用需要的服务。客户端和服务端可以分别使用gRPC支持的不同语言实现。

gRPC基本介绍

官方网站:https://grpc.io/ (opens new window)(具体要在哪个语言里使用gRPC就去官网找对应语言的教程即可)

gRPC官网

[2] 主要特性

gRPC的主要特性如下:

  • 强大的IDL:gRPC使用ProtoBuf来定义服务,ProtoBuf是由Google开发的一种数据序列化协议(类似于XML、JSON、hessian)。ProtoBuf能够将数据进行序列化,并广泛应用在数据存储、通信协议等方面。
  • 多语言支持:gRPC支持多种语言,并能够基于语言自动生成客户端和服务端功能库。目前已提供了C版本grpc、Java版本grpc-java 和 Go版本grpc-go,其它语言的版本正在积极开发中,其中,grpc支持C、C++、Node.js、Python、Ruby、Objective-C、PHP和C#等语言,grpc-java已经支持Android开发。
  • HTTP2:gRPC基于HTTP2标准设计,所以相对于其他RPC框架,gRPC带来了更多强大功能,如双向流、头部压缩、多复用请求等。这些功能给移动设备带来重大益处,如节省带宽、降低TCP链接次数、节省CPU使用和延长电池寿命等。同时,gRPC还能够提高了云端服务和Web应用的性能。gRPC既能够在客户端应用,也能够在服务器端应用,从而以透明的方式实现客户端和服务器端的通信和简化通信系统的构建。

# 8.4.2 Dubbo

[1] 基本介绍

Apache Dubbo 是一款 RPC 服务开发框架,用于解决微服务架构下的服务治理与通信问题,官方提供了 Java、Golang 等多语言 SDK 实现。使用 Dubbo 开发的微服务原生具备相互之间的远程地址发现与通信能力, 利用 Dubbo 提供的丰富服务治理特性,可以实现诸如服务发现、负载均衡、流量调度等服务治理诉求。Dubbo 被设计为高度可扩展,用户可以方便的实现流量拦截、选址的各种定制逻辑。

Dubbo3 定义为面向云原生的下一代 RPC 服务框架。3.0 基于 Dubbo 2.x 演进而来,在保持原有核心功能特性的同时, Dubbo3 在易用性、超大规模微服务实践、云原生基础设施适配、安全性等几大方向上进行了全面升级。

Dubbo架构图

官方网站:https://dubbo.apache.org/zh/ (opens new window)(目前主要支持Java、Go等多种语言)

Dubbo官网

[2] 主要特性

Dubbo的主要特性如下:

  • 高性能 RPC 通信协议:跨进程或主机的服务通信是 Dubbo 的一项基本能力,Dubbo RPC 以预先定义好的协议编码方式将请求数据(Request)发送给后端服务,并接收服务端返回的计算结果(Response)。RPC 通信对用户来说是完全透明的,使用者无需关心请求是如何发出去的、发到了哪里,每次调用只需要拿到正确的调用结果。
  • 自动服务(地址)发现:Dubbo 的服务发现机制,让微服务组件之间可以独立演进并任意部署,消费端可以在无需感知对端部署位置与 IP 地址的情况下完成通信。Dubbo 提供的是 Client-Based 的服务发现机制,使用者可以有多种方式启用服务发现。
  • 运行态流量管控:透明地址发现让 Dubbo 请求可以被发送到任意 IP 实例上,这个过程中流量被随机分配。当需要对流量进行更丰富、更细粒度的管控时,就可以用到 Dubbo 的流量管控策略,Dubbo 提供了包括负载均衡、流量路由、请求超时、流量降级、重试等策略,基于这些基础能力可以轻松的实现更多场景化的路由方案,Dubbo 支持流控策略在运行态动态生效,无需重新部署。
  • 丰富的扩展组件及生态:Dubbo 强大的服务治理能力不仅体现在核心框架上,还包括其优秀的扩展能力以及周边配套设施的支持。通过 Filter、Router、Protocol 等几乎存在于每一个关键流程上的扩展点定义,可以丰富 Dubbo 的功能或实现与其他微服务配套系统的对接,包括 Transaction、Tracing 目前都有通过 SPI 扩展的实现方案。
  • 面向云原生设计:Dubbo 从设计上是完全遵循云原生微服务开发理念的,这体现在多个方面,首先是对云原生基础设施与部署架构的支持,包括 容器、Kubernetes 等;另一方面,Dubbo 众多核心组件都已面向云原生升级,包括 Triple 协议、统一路由规则、对多语言的支持。

# 9. gRPC与Dubbo官方示例实践

# 9.1 gRPC官方示例实践

gRPC官方提供了使用示例,也许是依赖环境不同,照着官方文档操作会踩一些坑。本文的测试环境为 MacOS13 + Python3.9 + Java8 + Maven3.8.6。

# 9.1.1 gRPC-Python

gRPC-Python示例的官方文档:https://grpc.io/docs/languages/python/quickstart/ (opens new window)

[1] 下载示例源码并安装依赖

项目地址:https://github.com/grpc/grpc (opens new window)(示例代码在 grpc/examples/python/helloworld 目录下)

运行示例项目需要安装 grpcio 和 protobuf 库。

$ pip3 install grpcio
$ pip3 install protobuf==4.21
1
2

注:protobuf库需要指定版本,否则运行示例项目时会报如下错误ImportError: cannot import name 'builder' from 'google.protobuf.internal'

[2] 测试gRPC-Python示例程序

运行 greeter_server.py 和 greeter_client.py 即可,输出如下内容即为启动成功。

$ python3 greeter_server.py
Server started, listening on 50051

$ python3 greeter_client.py
Will try to greet world ...
Greeter client received: Hello, you!
1
2
3
4
5
6

# 9.1.2 gRPC-Java

[1] 下载示例源码并安装依赖

项目地址:https://github.com/grpc/grpc-java (opens new window)(示例代码在 grpc-java/examples 目录下)

$ git clone https://github.com/grpc/grpc-java
$ cd grpc-java/examples
1
2

配置好Maven,并拉取相关依赖,然后执行 mvn compile命令编译一下,这时虽然IDEA里依然显示爆红,但已经可以运行了。

[2] 测试gRPC-Java示例程序

先启动 HelloWorldServer.java

十二月 05, 2022 7:28:20 下午 io.grpc.examples.helloworld.HelloWorldServer start
信息: Server started, listening on 50051
1
2

后启动 HelloWorldClient.java

十二月 05, 2022 7:28:37 下午 io.grpc.examples.helloworld.HelloWorldClient greet
信息: Will try to greet world ...
十二月 05, 2022 7:28:38 下午 io.grpc.examples.helloworld.HelloWorldClient greet
信息: Greeting: Hello world
1
2
3
4

输出以上内容即为示例程序启动成功。

注意事项:跑gRPC-Java示例程序时,HelloWorldClient遇到了以下报错。我把50051端口的进程kill掉,重启就好了。

RPC failed: Status{code=UNKNOWN, description=null, cause=null}
1

# 9.2 Dubbo官方示例实践

Dubbo-Java示例的官方文档:https://cn.dubbo.apache.org/zh/docs3-v2/java-sdk/quick-start/spring-boot/ (opens new window)

# 9.2.1 搭建ZooKeeper服务

Dubbo 的运行需要依赖 ZooKeeper 服务,如果没有的话,可以使用 Docker Compose 搭建一个。

docker-compose.yml

version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    restart: always
1
2
3
4
5
6
7
8

执行如下命令运行容器。

$ docker-compose up -d     // 运行容器
1

# 9.2.2 测试Dubbo-Java示例程序

[1] 下载示例源码并修改配置

$ git clone https://github.com/apache/dubbo-samples.git
$ cd dubbo-samples/1-basic/dubbo-samples-spring-boot
$ tree -L 2

.
├── README.md
├── dubbo-samples-spring-boot-consumer
│   ├── pom.xml
│   └── src
├── dubbo-samples-spring-boot-interface
│   ├── pom.xml
│   └── src
├── dubbo-samples-spring-boot-provider
│   ├── pom.xml
│   └── src
└── pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

配置好Maven,并拉取相关依赖。修改 application.yml 里的 Zookeeper 服务地址,默认是 zookeeper://127.0.0.1:2181

[2] 运行Dubbo-Java示例程序

先启动 ProviderApplication.java

Hello World, request from consumer: xxx.xxx.xxx.xxx
1

后启动 ConsumerApplication.java

result: Hello World
1

输出以上内容即为示例程序启动成功。

# 10. 参考资料

[1] 大数据Hadoop生态圈包含哪些子系统 from 传智教育 (opens new window)

[2] Hadoop设置单节点集群 from Hadoop官方文档 (opens new window)

[3] Mac部署hadoop3(伪分布式) from 腾讯云 (opens new window)

[4] 在Mac OS X上开启ssh服务 from CSDN (opens new window)

[5] Hadoop(二)搭建伪分布式集群 from LanceToBigData (opens new window)

[6] Hadoop学习笔记:Hadoop及其生态系统简介 from 知乎 (opens new window)

[7] 什么是Flink?Flink能用来做什么?from CSDN (opens new window)

[8] 实时数据流计算引擎Flink和Spark剖析 from 知乎 (opens new window)

[9] Flink 如何支持特征工程、在线学习、在线预测等 AI 场景?from 阿里云 (opens new window)

[10] 基于 PyFlink 的学习文档,通过一个个小实践,便于小伙伴们快速入手 PyFlink from Github (opens new window)

[11] Flink的学习文档,包含 Flink 入门、概念、原理、实战、性能调优、源码解析等内容 from Github (opens new window)

[12] 如何从 0 到 1 开发 PyFlink API 作业 from Flink学习网 (opens new window)

[13] 这个项目是为了帮助用户更容易地编写他们的 pyflink 作业 from Github (opens new window)

[14] Spark与Flink对比 from 51CTO博客 (opens new window)

[15] flink实现简单的用户密码认证 from 51CTO (opens new window)

[16] Flink 生态:一个案例快速上手 PyFlink from 阿里云 (opens new window)

[17] kubernetes官方文档 from kubernetes官网 (opens new window)

[18] Kubernetes 是什么? from redhat (opens new window)

[19] awesome-kubernetes-notes from sphinx (opens new window)

[20] k8s 教程 from GIthub (opens new window)

[21] Kubernetes中文文档 from Kubernetes中文社区 (opens new window)

[22] Kubernetes 基础教程 from 云原生资料库 (opens new window)

[23] 云原生架构概述 from dockone (opens new window)

[24] Kubernetes 介绍篇:是什么?为什么要用?from InfoQ (opens new window)

[25] Docker不香吗?为什么还要用K8s from 51CTO (opens new window)

[26] sealos--基于 Kubernetes 的云操作系统 from Github (opens new window)

[27] k9s--Kubernetes CLI 以风格管理您的集群 from Github (opens new window)

[28] kind--Kubernetes IN Docker - 用于测试 Kubernetes 的本地集群 from Github (opens new window)

[29] kopf--用 Python 操作 Kubernetes 的框架 from Github (opens new window)

[30] kubernetes 初体验-mac版 from 稀土掘金 (opens new window)

[31] 安装 Kubernetes Dashboard from k8s折腾笔记 (opens new window)

[32] RPC核心原理是什么?以及常用技术有哪些?from CSDN (opens new window)

[33] RPC框架:从原理到选型,一文带你搞懂RPC from 51CTO (opens new window)

[34] 如何理解 RPC 远程服务调用 from 技术文章摘抄 (opens new window)

[35] 架构设计:如何实现一个高性能分布式 RPC 框架 from 技术文章摘抄 (opens new window)

[36] RPC介绍与原理 from CSDN (opens new window)

[37] RPC框架(一)RPC简介 from CSDN (opens new window)

[38] RPC简介以及与RESTful对比 from Helloted Blog (opens new window)

[39] gRPC简介 from Gitbook (opens new window)

[40] Dubbo简介 from Dubbo官网 (opens new window)

[41] rpc框架之 grpc vs dubbo 性能比拼 from CSDN (opens new window)

[42] 示例演示了如何使用 Spring Boot 方式快速开发 Dubbo 应用 from Dubbo官网 (opens new window)

[43] ImportError: cannot import name 'builder' from 'google.protobuf.internal' from stackoverflow (opens new window)

[44] Flink 原理与实现:深入理解Flink核心技术 from 简书 (opens new window)

[45] Flink的设计与运行原理 from 厦门大学数据库实验室 (opens new window)

[46] Flink的原理和用法 from 亿速云 (opens new window)

[47] Flink分布式架构 from Flink官方文档 (opens new window)

Last Updated: 7/29/2023, 3:43:27 PM