1、第 16 期2023 年 8 月无线互联科技Wireless Internet TechnologyNo.16August,2023作者简介:刘潇(1986),男,山西长治人,工程师,硕士;研究方向:计算机技术应用。通信作者:季英凯(1971),男,吉林吉林人,高级工程师,硕士;研究方向:疾控信息化建设。基于 Flink 的电子疾病档案数据处理模型设计与实现刘 潇,季英凯(江苏省疾病预防控制中心 公共卫生信息所,江苏 南京 210009)摘要:“十四五”期间,疾控面临着汇集各业务条线的数据以形成动态实时的电子疾病档案的任务。针对现阶段疾控信息化工作的现状,文章基于 Flink 构建了一个电子疾
2、病档案的实时数据处理模型,使用消息中间件实现各个业务条线数据的发布与订阅。数据在 Flink 集群中实现了按主题目录的分流,检查与转换等操作,最终持久化写入 HBase 数据库,形成以个人信息为基础的各类业务数据的关联。实验与应用结果表明,该模型具有良好的数据处理能力,有效而可靠的实现了电子疾病档案的数据汇集。关键词:电子疾病档案;数据汇集;消息中间件;Flink中图分类号:TP391 文献标志码:A0 引言 随着疾控信息化工作的不断深入,疾控的传染病、公共卫生突发事件、慢性病、免疫规划、精神卫生等业务条线的信息系统在不断地建立与完善,由于业务系统在建设之初缺乏总体规划,各自为政,各类数据难以
3、支撑有效的业务协同服务,造成信息孤岛1。在当前各级疾控业务协作日渐紧密、内部一体化集成日渐成熟的大背景下2,为了有效地对业务数据进行汇聚与利用,中国疾病预防控制中心制定的疾病预防控制信息系统建设指导方案(2018 年版)要求,以国家和省统筹区域两级建设为重点,依托全员人口信息库等基础设施,构建实时共享的动态电子疾病档案(electronic diseases records,EDR)3-4,以个人健康为核心,贯穿整个生命周期,以出生和死亡 2 个重要的生命节点为开始和结束,全程记录疾病发生、发展及转归的监测信息,形成以个人基础信息(人口学信息、出生登记、死亡登记)为基础,包含体检筛查史、疾病诊
4、断史、检验检测史、治疗随访史、流行病学史和预防接种史等内容的主题数据目录5,以支撑疾控各类业务的交互协同,为政府决策分析提供有效的支持。如何利用现有的业务系统实现各类业务数据的汇集,以形成实时共享的电子疾病档案成了疾控信息化建设面临的一个新的问题。当前,疾控的各类业务系统于不同的时间由不同的开发公司建设,所采用的技术架构、业务流程以及业务数据的格式各不相同,各业务数据与电子疾病档案的数据标准均存在一定程度的差异。基于这些问题,为了实现电子疾病档案所需数据的抽取并进行有效的汇集,本文基于流式计算框架 Flink 建立一个电子疾病档案数据的实时处理模型,使用消息中间件Kafka 实现各类业务主题数
5、据的发布与订阅,使用Flink 实现各类业务数据按电子疾病档案主题目录数据的实时处理,使用 HBase 作为数据库实现电子疾病档案数据的汇集与有效关联,通过实验证明该模型的可行性与高效性。1 技术简介1.1 Flink 介绍 Apache Flink 是一个低延迟、高吞吐的分布式计算框架6,可对无界数据流与有界数据流进行计算7-8,相对于以 MapReduce 为代表的批处理计算框架、以 Spark Streaming 为代表的微批处理方案、以Storm 为 代 表 的 流 处 理 计 算 框 架,Flink 提 供 了DataStream API 与 DataSet API 分别对流处理与批
6、处理予以支持,其将批数据看成是有界的流数据,本质25第 16 期2023 年 8 月无线互联科技软件开发No.16August,2023上还是进行数据流的处理9。这种批流一体的特性使得 Flink 在执行计算时具有极高的灵活性与极低的延迟性。Flink 可在单独集群中运行,也可以在 Yarn、Mesos 等资源调度与管理框架上运行10。一个 Flink集群总是包含一个 JobManager 以及一个或多个TaskManager11。JobManager 负责处理 Job 提交、Job监控以及资源管理;TaskManager 运行 worker 进程,负责实际任务 Tasks 的执行。这些任务共
7、同组成了一个 Job。Flink 应用程序中的任务由用户定义的算子转换而来的流式 Dataflows 所组成,以构成有向无环图(DAG)以对数据流行处理,如图 1 所示。Flink 的Dataflows 由 Source、Transformation、Sink3 大部分组成。Source 可从数据源不断获取数据,Transformation 通过Flink 提供或用户自定义的各类算子灵活组合将获取的数据流进行各种业务逻辑的处理,最终由 Sink 输出到外部。Source 与 Sink 阶段都提供了包括 Kafka、ElasticSearch、MySQL、InfluxDB 等多种数据库引擎的专用
8、算子用以获取或输出数据,使用十分方便。图 1 Flink Dataflow Flink 的开发同时支持 Java 和 Scala 语言,提供了4 个不同抽象层级的 API,提供给用户以开发应用程序。第一层的 ProcessFunction API 是其他三层的调用与封装的基础。Flink 在这个 API 上实现最基础的流式处理能力。在该层用户可以进行有状态编程,实现复杂的时间语义处理。第二层是 Core APIs。该层包含应用于无界或有界数据流的 DataStream API 和应用于有界数据集的 DataSet API,提供各种形式的用户自定义转换(transformations)、联接(j
9、oins)、聚合(aggregations)、窗口(windows)和状态(state)等操作。第三层是 Table API。该层通过定义数据的 schema 提供类似于关系模型中的操作,比如 select、join、group-by 和 aggregate 等。第四层是 SQL API。这层在语义和程序表达式上都类似于 Table API,但其程序都是通过 SQL 表达式的形式来实现。1.2 Kafka 介绍 Kafka 是一个分布式的消息发布-订阅模式12的中间件系统。Kafka 在主题中保存消息的信息,生产者向主题写入数据,消费者从主题读取数据,从而实现数据的传递。这种隐式调用的风格使生
10、产者与消费者相互解耦,使其可以相互独立的变化。高性能、高吞吐、低延时是 Kafka 的显著的特性,虽然 Kafka 的消息保存在磁盘上,但由于采用了顺序写入、MMFiles(Memory Mapped Files)、Zero Copy、批量压缩等技术优化了读写性能13,使其可以突破传统的数据库、消息队列等数据引擎所受限的磁盘 IO瓶颈。因此即使是部署在普通的单机服务器上,Kafka 也能轻松支持每秒百万级的写入请求14,读写速度超过大部分的消息中间件,这种特性使得 Kafka在海量数据场景中应用广泛。2 设计与实现2.1 架构设计本文设计的电子疾病档案数据处理模型采用Kafka+Flink+H
11、Base 建立实时数据流的处理框架。在Kafka 中建立主题 Topic,疾控的各个业务系统作为生产者按规定的格式向主题中生产数据,通过这种模式将数据生产的任务交给各个数据系统的开发维护单位,避免直接操作多个业务系统带来不可预知的风险。Flink 程序作为消费者去 Kafka 的主题中获取数据并发送至下游进行数据类目的识别、分流以及格式转换等业务逻辑的处理操作,最终写入 Hbase 进行数据的持久化,并通过 HBase 表结构的设计实现以个人35第 16 期2023 年 8 月无线互联科技软件开发No.16August,2023信息为基础,各类电子疾病档案主题目录数据的关联。模型架构设计如图
12、2 所示。图 2 模型架构2.2 模型设计与实现2.2.1 消息中间件设计 由于疾控业务系统多,电子疾病档案所包含的数据类目繁杂且数据格式各异,而儿童预防接种等业务系统在某些时间段内产生的实时数据量很大,使用消息队列 Kafka 作为中间件来接收各类业务系统产生的数据是一个很适合的选择。一方面,利用 Kafka 出色的性能提高业务数据写入的响应时间,保证稳定性;另一方面,使得各个业务系统与 Flink 数据处理程序解耦。数据生产层的业务系统既可以将业务系统完成审批后的实时数据流接入消息队列,也可以批量对历史数据进行处理,业务数据生产者与消费程序各自独立运行。数据生产层的各个业务系统按约定的格式
13、将电子疾病档案系统需要的数据写入 Kafka 的主题中,为了使下游的消费者 Flink 程序能正确解析与处理,数据结构可以分为 3 个部分:第一部分代表该数据所属的电子疾病档案主题目录;第二部分代表该条数据的操作类型(新增、修改或删除);第三部分为业务数据所包含的具体内容,各个数据项统一使用制表符分隔(见表 1)。表 1 生产者数据结构数据类型数据表示内容第一部分6 位数字编码电子疾病档案主题目录第二部分1 位数字编码操作类型第三部分字符串电子疾病档案数据2.2.2 Flink 程序设计 Flink 程序使用 Scala 语言设计并实现,程序的Source 阶段使用 Flink 框架自带的 F
14、linkKafkaCon-sumer 不 断 地 从 Kafka 的 主 题 内 消 费 数 据。在Transformation 阶段,对获取的数据按电子疾病档案的主题目录进行分流,在各个主题目录对应的数据流中对其所属的数据进行各自不同的操作类型识别、数据格式检查、数据格式转换等操作。由于从 Kafak 获取的数据包含了电子疾病档案的各类数据目录,程序根据数据的前 6 位字符判断该条数据所属的主题目录,利用 Flink 提供的旁路输出 Side Output 功能对数据流进行切分,每类主题目录的数据被发送至其对应的子数据流中调用自定义的各类主题目录的数据处理函数进行处理,最终在各个子数据流的
15、Sink 阶段,将处理好的电子疾病档案数据写入 HBase 进行持久化。由于 Flink 没有提供已封装好的 HBase 的 Sink 算子,自定义算子实现 Flink 提供的 RichSinkFunction 接口来完成相关的对 HBase 的操作,Flink 程序的拓扑结构,如图 3 所示。图 3 程序拓扑结构45第 16 期2023 年 8 月无线互联科技软件开发No.16August,2023以死亡报告主题目录数据为例,Flink 程序关键代码如下:val environment=StreamExecutionEnvironment.getExecutionEnvironment/创建
16、可执行环境val outsideStream=environment.addSource(new FlinkKafkaConsumer String(topic,new SimpleStringSchema(),props)/使用FlinkKafkaConsumer 从 Kafka 消费数据.process(new ProcessFunctionString,String /在基本处理函数 ProcessFunction 中使用 Side Output函数对数据进行分流override def processElement(source:String,context:ProcessFuncti
17、on String,String#Context,collector:CollectorString):Unit=Source.substring(0,6)match /截取数据前六位获取所属主题目录 case 320601=context.output(death01,source.substring(7)/判断所属主题目录为死亡报告数据后获取剩余数据部分并标记其对应的数据流标识 death01 进行分流./其他主题目录判断与获取case _=context.output(or,source.substring(7).setParallelism(1)/分流算子并行度设置val deathS
18、tream=outsideStream.getSideOutput(death01)/分流后主题目录数据流的获取.deathStream.map(deathTransformation(_).setParallelism(2)/在自定义的 deathTransformation函数中完成其对应主题目录数据的操作类型识别、格式检查、转换操作.addSink(new deathHbaseSink()/自 定 义deathHbaseSink 函数对其对应主题目录数据进行持久化操作2.2.3 HBase 设计 在 HBase 电子疾病档案所属的命名空间 Name-space 中为各类主题目录建立各自所
19、属的表,考虑到之后的业务需要通过身份证号以个人信息为单位对 疾病档案进行综合展示与分析,对唯一标识表中的一行数据的 RowKey15使用身份证号加业务主键号的方式来生成,这样既保证了表中 RowKey 的唯一性,又可以使未来的业务端调用 Hbase 的 API 时,通过其提供的前置过滤器将身份证提取出来进行匹配与查询,而无需再建立二级索引或借助其他的工具。在Flink 程序进行数据持久化时,使用 Protobuf 工具提供的功能对电子疾病档案数据进行序列化压缩后,再关联其对应的 RowKey 写入 Hbase,使每个 RowKey 在表中只存储一个,提升空间的利用率。3 仿真实验 在疾控内部局
20、域网部署应用模型进行测试,使用的各类软件信息,如表 2 所示。集群硬件环境包含 10台 Cpu8 核,内存 32 G,操作系统为 64 位 Linux Centos 7.7 的虚拟机。其中,7 台为 Hadoop 与 Hbase 集群的部署提供支持。Hadoop 集群中包含了 2 个互为主从的管理节点 NameNode,5 个计算节点 DataNode,2 个资源调度管理节点 ResourceManager;HBase 集群中包含了两个互为主从的 HMaster 节点,3 个 RegionServer节点;Flink 集群部署于 DataNode 所在的 5 个计算节点,使用 Yarn Ses
21、sion 的模式对 Flink 作业进行调度与管理。Kafka 与 ZooKeeper 等中间件部署在另外三台虚拟机上。表 2 软件版本信息软件版本节点信息Hadoop-2.9.1NameNode:2,DataNode:5,ResourceManager:2HBase-2.2.2HMaster:2,RegionServer:3Flink-1.10.1TaskManager:3kafka_2.12 Broker:3ZooKeeper-3.4.143 节点实验从疾控内网数据库批量取出死亡报告信息与儿童预防接种信息各 10 000 条写入 Kafka 的电子疾病档案主题内,对 Flink 程序中的分
22、流算子以及死亡报告信息与儿童预防接种信息的数据处理算子分别设置不同的并行度的情况下,记录数据全部写入HBase 的运行时间情况,运行时间皆为 5 次实验后的平均值,结果如表 3 所示。55第 16 期2023 年 8 月无线互联科技软件开发No.16August,2023表 3 实验结果算子并行度运行时间/ms16 93623 44041 847从实验结果可以看出,在数据量较大的情况下,该模型的数据处理的实时性良好,且合理的提高分流算子或数据处理算子的并发度能有效地提高模型的数据处理能力,实验证明该模型具有良好的数据实时处理能力及弹性扩展能力。4 结语 为了建设电子疾病档案系统,实现疾控各个业
23、务条线数据的有效汇集,进而为业务协同以及辅助决策提供数据支持,本文根据疾控业务信息化建设的现状设计了一个数据实时处理模型,并使用消息中间件Kafka,流式计算框架 Flink 及宽列数据库 HBase 给出了模型的具体实现。实践表明,模型具有良好的数据处理能力,满足预计的设计目标,其分层设计的架构风格使得每一层都可以灵活根据需求独立的变化而不影响其他层。在业务生产数据量比较大的场景,可以单独为某类业务数据另设一个 Kafka 以提高数据吞吐量。在 Flink 程序的 Source 阶段,先进行数据的合流后再进行接下来的处理;而对于某些实时处理的数据量较大、数据处理任务较复杂的业务,也可以提高该
24、业务数据处理算子的并行度以提高数据处理速度。目前,该模型已成功应用在江苏省疾控中心的死亡报告、预防接种等部分业务条线数据的处理与汇集中,下一步的工作是根据电子疾病档案的数据规范要求进一步汇集数据,并结合统计分析工具与相关算法进行数据的利用与分析。参考文献1张诚,道理,毛丹,等.疾病预防控制信息化建设标准体系研究及应用J.中国卫生信息管理杂志,2022(1):55-62,73.2道理,夏天,张诚,等.新时代上海市疾病预防控制体系的信息化建设J.中国卫生资源,2022(1):126-128.3马家奇,赵自雄.“十三五”时期中国疾病控制信息化建设成效与展望J.中国卫生信息管理杂志,2021(3):3
25、14-318.4马家奇.全民健康信息化及其对慢性病防控的重要作用:慢性病全生命周期信息监测J.中华预防医学杂志,2020(4):378-384.5郭青,赵自雄,苏雪梅,等.基于电子疾病档案的疾病动态监测全周期管理模式研究J.中国卫生信息管理杂志,2020(4):411-415.6CARBONE P,KATSIFODIMOS A,EWEN S,et al.ApacheFlink:Stream and batch processing in a single engine J.Bulletin of the IEEE Computer Society Technical Committee on
26、Data Engineering,2015(4):28-38.7赵润发,娄渊胜,叶枫,等.基于 Flink 的工业大数据平台研究与应用J.计算机工程与设计,2022(3):886-894.8梁方玮,薛涛.面向物流服务的海量日志实时流处理平台J.计算机系统应用,2021(10):68-75.9毛亚青,王亮,胡俊峰.基于 Flink 的海量医学图像检索系统设计与实现J.计算机测量与控制,2020(9):212-217.10THAMSEN L,BEILHARZ J,VINH T T,et al.Mary,Hugo,and Hugo:Learning to schedule distributed d
27、ata-parallel processing jobs on shared clusters J.Concurrency and Computation:Practice and Experience,2021(18):1-12.11汪文豪,史雪荣.基于异构 Flink 集群的节点优先级调度策略J.计算机工程,2022(3):197-203.12EUGSTER P T,FELBER P A,RACHID GUERRAOUI,et al.The many faces of publish/subscribeJ.ACM Computing Surveys(CSUR),2003(2):114131
28、.13牟大恩.Kafka 入门与实践M.北京:人民邮电出版社,2017.14朱幼普,卢军.基于 Kafka 的分布式能效管理平台的设计与实现J.计算机与数字工程,2018(12):2620-2623.15圣文顺,徐爱萍.基于行键的 HBase 大数据文件存储转换与快速检索研究J.计算机应用研究,2019(12):3806-3810.(编辑 姚 鑫)(下转第 66 页)65第 16 期2023 年 8 月无线互联科技软件开发No.16August,2023Design of forest fire image detection system based on embedded AIPan Xi
29、ao1 Wang Xun2 Su Dongsheng1 Feng Lihong1 Wang Jun1 1.School of Industry&Art-Design Guangxi ECO-Engineering Vocational and Technology College Liuzhou 545004 China 2.School of Electronic Engineering Guangxi Normal University Guilin 541004 China Abstract In order to solve the problems of inconvenience
30、and high cost in current forest fire detection a forest fire smoke recognition system was designed using the embedded AI microcontroller of Kendryte K210.Utilize the existing forest fire smoke image data set use YOLOv2-Tiny convolutional neural network obtain the training model file through MaixHub
31、online Tensorflow in-depth learning open source framework and deploy it to the K210 microcontroller.Take the image through the OV2640 camera.Once the image contains the smog and flame form it can be identified on the display screen.The test results show that the FPS recognized by the system is about
32、 1315 frames and the accuracy of identifying and detecting smoke and flame can reach 91%and 90%respectively.Key words K210 YOLOv2-Tiny fire detection(上接第 56 页)Design and implementation of data processing model for electronic disease records based on FlinkLiu Xiao Ji Yingkai Department of Public Heal
33、th Information Jiangsu Center for Disease Control and Prevention Nanjing 210009 China Abstract During the 14th Five-Year Plan period CDC is faced with the task of collecting data from various business lines to form dynamic and real-time electronic diseases records.On the basic of the existing work f
34、oundation of CDC informatization a real-time processing model for electronic diseases records is designed and implemented based on Flink the message-oriented middleware is used to implement publication and subscription of data which come from various business lines Flink Cluster is used to split dat
35、a stream check and convert data by subject catalog HBase is used to store data which associated with various businesses based on personal information.Experimental and application results show that the model has good data processing ability effectively and reliably realizes the data collection of electronic disease records.Key words electronic diseases records data collection message-oriented middleware Flink66