=Flink real-time statistics goals=
1. Read the buffered data from Kafka programmatically

2. Real-time product statistics:

Project 1: real-time statistics of daily sales and daily visit traffic

Project 2: real-time statistics of each store's total daily sales

Project 3: real-time statistics of daily product sales volume, selecting the top 10 best-selling products
=Project 1=
a) Design and create the MySQL result tables

b) Read the buffered data from Kafka programmatically

c) Real-time statistics of daily visit traffic and daily sales

d) Run the program in IDEA
==Creating the MySQL tables==
The processed data will be stored in MySQL tables, so log in to the master node first.

(1) Check whether MySQL is running
#service mysqld status
mysqld (pid 1409) is running.
(2) Enter MySQL with the following command (password: root)
mysql -uroot -proot
<nowiki>create database fk_shop;                                                    -- create the database
use fk_shop;                                                                 -- switch to it
create table visitcount_everyday(datetime varchar(20), visicount double);   -- daily click-stream (visit count) table
create table salevolume(datetime varchar(20), salevolume double);           -- daily sales table
show tables;                                                                 -- list the tables</nowiki>
==Defining the MySQL sink==
Create a scala source folder in the project and, following the MySQL table structure, create packages like the ones below (the names are up to you).
===otherclass===
This package holds the first Scala file; its job is to define the data fields to be processed.

In the MySQL tables created above, the sales amount and the click-stream count are both of type double and the date is a string,

so for Project 1 we need to define the daily click-stream count and the daily sales amount.

(1) In otherclass, create the first Scala file, named StringAndDouble (name is up to you)

(2) Edit StringAndDouble.scala
<nowiki>package otherclass

class StringAndDouble(datetime: String, sale: Double) {
  def getDate() = { datetime }
  def getSale() = { sale }
}</nowiki>
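A Scala case class would give the same two read-only fields without hand-written getters. This is only a sketch of an alternative, not what the rest of this page uses (the name SaleRecord is made up; the following sections keep using StringAndDouble):

<nowiki>package otherclass

// Hypothetical alternative to StringAndDouble: the constructor parameters become
// public vals, and equals/hashCode/toString are generated automatically.
case class SaleRecord(datetime: String, sale: Double)</nowiki>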
===mysqlsink===
(1) In mysqlsink, create the second Scala file, named MySqlSinkToSale (name is up to you)

Edit MySqlSinkToSale:
<nowiki>package mysqlsink

import java.sql.DriverManager
import java.sql._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import otherclass.StringAndDouble

class MySqlSinkToSale(table: String) extends RichSinkFunction[StringAndDouble] with Serializable {
  var conn: Connection = _
  var ps: PreparedStatement = _
  val user = "root"
  val password = "root"
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://192.168.128.130:3306/fk_shop"

  override def invoke(value: StringAndDouble): Unit = {
    Class.forName(driver)
    conn = DriverManager.getConnection(url, user, password)
    val sql = "insert into " + table + " values(?,?)"
    ps = conn.prepareStatement(sql)
    ps.setString(1, value.getDate())
    ps.setDouble(2, value.getSale())
    ps.executeUpdate()
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}</nowiki>
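Opening and closing a JDBC connection inside invoke() works, but it creates a new connection for every record. A sketch of a variant (not part of the original project; the class name MySqlSinkReuse is made up) that reuses one connection via the open()/close() lifecycle methods of RichSinkFunction:

<nowiki>package mysqlsink

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import otherclass.StringAndDouble

class MySqlSinkReuse(table: String) extends RichSinkFunction[StringAndDouble] {
  var conn: Connection = _
  var ps: PreparedStatement = _

  // Called once per parallel sink instance: open the connection and prepare the statement.
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://192.168.128.130:3306/fk_shop", "root", "root")
    ps = conn.prepareStatement("insert into " + table + " values(?,?)")
  }

  // Called once per record: only bind the parameters and execute.
  override def invoke(value: StringAndDouble): Unit = {
    ps.setString(1, value.getDate())
    ps.setDouble(2, value.getSale())
    ps.executeUpdate()
  }

  // Called on shutdown: release the JDBC resources.
  override def close(): Unit = {
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}</nowiki>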
===flinkclass===
Implements the Flink data processing.

(1) In flinkclass, create a Scala object

The Flink real-time processing program:
<nowiki>package flinkclass

import java.util.Properties

import mysqlsink.MySqlSinkToSale
import otherclass.StringAndDouble
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema

object SaleVolumn {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka parameters
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "192.168.128.130:9092")
    pro.setProperty("group.id", "test")
    // Create the source
    val stream = env.addSource(new FlinkKafkaConsumer[String]("shop", new SimpleStringSchema(), pro))
    // Processing: every record becomes (date, (sale, 1.0)); "buy" records carry their sale amount, all others 0.0
    val data = stream.map(x => x.split(",")).filter(x => x.nonEmpty && x.length == 11)
      .map { x => if (x(7).contains("buy")) (x(x.length - 1), (x(5).toDouble, 1.0)) else (x(x.length - 1), (0.0, 1.0)) }
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .reduce((x, y) => (x._1, (x._2._1 + y._2._1, x._2._2 + y._2._2)))
    // Save the daily sales amount to MySQL
    data.map(x => new StringAndDouble(x._1, x._2._1)).addSink(new MySqlSinkToSale("salevolume"))
    // Save the daily click-stream count to MySQL
    data.map(x => new StringAndDouble(x._1, x._2._2)).addSink(new MySqlSinkToSale("visitcount_everyday"))
    env.execute()
  }
}</nowiki>
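To see what the reduce step accumulates, here is a minimal sketch with plain Scala collections (sample values are made up; paste into a Scala worksheet or the REPL): each record becomes (date, (sale, 1.0)), and reducing the pairs for one key yields (total sales, number of visits).

<nowiki>// Illustration only: three visits on 2020-07-25, one of which is a "buy" of 35.0.
val records = List(("2020-07-25", (35.0, 1.0)), ("2020-07-25", (0.0, 1.0)), ("2020-07-25", (0.0, 1.0)))
val daily = records.reduce((x, y) => (x._1, (x._2._1 + y._2._1, x._2._2 + y._2._2)))
println(daily) // (2020-07-25,(35.0,3.0)) -> 35.0 of sales, 3 visits</nowiki>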
Time windows in Flink: https://blog.csdn.net/zg_hover/article/details/87592060
==Running the Flink program==
===Stopping Flink===
[root@master local]# cd /usr/local/flink-1.10.1/bin
[root@master bin]# stop-cluster.sh
<nowiki>Stopping taskexecutor daemon (pid: 7986) on host slave1.centos.com.
Stopping taskexecutor daemon (pid: 5760) on host slave2.centos.com.
Stopping taskexecutor daemon (pid: 3219) on host slave3.centos.com.
Stopping standalonesession daemon (pid: 11586) on host master.centos.com.</nowiki>
===Stopping Kafka===
① On every node, check whether Kafka is running; if it is, kill the process
jps
kill -9 3507
===Stopping ZooKeeper===

===Starting ZooKeeper===

ZooKeeper must be started before Kafka, otherwise Kafka will not start properly. Start the ZooKeeper that ships with Kafka:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties >log/zookeeper/zookeeper.log 2>&1 &
===Starting Kafka===
③ Once ZooKeeper is running on all nodes, start Kafka on master, slave1 and slave2
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >/tmp/klog &
④ Check the processes with jps

===Creating the Kafka topic===
Check whether the shop topic exists; if not, create it
[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --list --zookeeper 192.168.128.130:2181
<nowiki>__consumer_offsets
shop
test</nowiki>
[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --zookeeper 192.168.128.130:2181 --delete --topic shop

[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --create --zookeeper 192.168.128.130:2181 --replication-factor 1 --partitions 1 --topic shop
[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --list --zookeeper 192.168.128.130:2181
<nowiki>__consumer_offsets
shop
test</nowiki>
===Starting the Flink cluster===

⑤ Start the Flink cluster on master
/usr/local/flink-1.10.1/bin/start-cluster.sh
===Starting Flume collection===
Before running the project in IDEA, delete the previously collected data and check that the simulated collection (cron job) is running:
crontab -e
Clear the old data, then check that new data is being collected:
[root@master bin]# rm -rf /opt/flinkproject/
[root@master bin]# mkdir /opt/flinkproject
[root@master bin]# ls /opt/flinkproject/
2020-07-25-20-05-01.log
Start the Flume agent with the configuration written earlier:
/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory-kafka.conf -n client -Dflume.root.logger=INFO,console
===Running the Flink client program: SaleVolumn===

Finally, run SaleVolumn locally in IDEA. If it runs without errors, open MySQL and query the two tables defined at the beginning; they now hold the daily visit traffic and the daily sales.
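If you prefer checking the result from code instead of the mysql client, here is a small JDBC sketch (not part of the original project; it reuses the same URL and credentials as the sink) that prints the salevolume table:

<nowiki>import java.sql.DriverManager

// Hypothetical verification helper.
object CheckSaleVolume {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.128.130:3306/fk_shop", "root", "root")
    val rs = conn.createStatement().executeQuery("select datetime, salevolume from salevolume")
    while (rs.next()) println(rs.getString(1) + " -> " + rs.getDouble(2))
    conn.close()
  }
}</nowiki>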
=Project 2=
a) Design and create the MySQL result table

b) Read the buffered data from Kafka programmatically

c) Real-time statistics of each store's daily sales

d) Run the program in IDEA

==Creating the MySQL table==
The processed data will be stored in a MySQL table, so log in to the master node first.

(1) Check whether MySQL is running
#service mysqld status
mysqld (pid 1409) is running.
(2) Enter MySQL with the following command
mysql -uroot -proot
<nowiki>create database fk_shop;   -- already created in Project 1; skip if it exists
use fk_shop;                -- switch to the database
create table store_sale(datetime varchar(20), store_id varchar(20), sale double);   -- per-store daily sales table
show tables;                -- list the tables</nowiki>
==Defining the MySQL sink==

The scala folder and its packages were already created in Project 1, so only the Scala programs need to be written.

===otherclass===

This package holds the first Scala file; its job is to define the data fields to be processed.

Since we need the date (string), the store (string) and the sales amount (double),

we define a class with these three fields for Project 2.

(1) In otherclass, create a Scala file named Store_SaleF (name is up to you)

(2) Edit Store_SaleF.scala
<nowiki>package otherclass

class Store_SaleF(datetime: String, store: String, sale: Double) {
  def getDate() = { datetime }
  def getStore() = { store }
  def getSale() = { sale }
}</nowiki>
===mysqlsink===

(1) In mysqlsink, create the second Scala file, named Mysql_StoreSale (name is up to you)

Edit Mysql_StoreSale so it writes the three columns of store_sale:
<nowiki>package mysqlsink

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import java.sql._

import otherclass.Store_SaleF

class Mysql_StoreSale(table: String) extends RichSinkFunction[Store_SaleF] with Serializable {
  // MySQL connection parameters
  var conn: Connection = _
  var ps: PreparedStatement = _
  val user = "root"
  val password = "root"
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://192.168.128.130:3306/fk_shop"

  override def invoke(value: Store_SaleF): Unit = {
    Class.forName(driver)
    conn = DriverManager.getConnection(url, user, password)
    val sql = "insert into " + table + " values(?,?,?)"
    ps = conn.prepareStatement(sql)
    ps.setString(1, value.getDate())
    ps.setString(2, value.getStore())
    ps.setDouble(3, value.getSale())
    ps.executeUpdate()
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}</nowiki>
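A quick way to smoke-test the sink without Kafka is to feed it a couple of hard-coded records from a bounded stream. This is a sketch only (the object name and sample values are made up; it assumes the Store_SaleF and Mysql_StoreSale classes above):

<nowiki>package flinkclass

import mysqlsink.Mysql_StoreSale
import otherclass.Store_SaleF
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

// Hypothetical local test: writes two rows into store_sale and exits.
object StoreSaleSinkSmokeTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(new Store_SaleF("2020-07-25", "store_01", 199.0),
                     new Store_SaleF("2020-07-25", "store_02", 88.5))
      .addSink(new Mysql_StoreSale("store_sale"))
    env.execute("store_sale sink smoke test")
  }
}</nowiki>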
===flinkclass===

Implements the Flink data processing.

(1) In flinkclass, create a Scala object named Store_Saleflink

The Flink real-time processing program:
<nowiki>package flinkclass

import java.util.Properties

import mysqlsink.Mysql_StoreSale
import otherclass.Store_SaleF
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema

object Store_Saleflink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka parameters
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "192.168.128.130:9092")
    pro.setProperty("group.id", "test")
    // Create the source
    val stream = env.addSource(new FlinkKafkaConsumer[String]("shop", new SimpleStringSchema(), pro))
    // Transformation: keep only "buy" records, key by (date, store_id), sum the sales per key
    val data = stream.map(x => x.split(","))
      .filter(x => x.nonEmpty && x.length == 11 && x(7).contains("buy"))
      .map(x => ((x(x.length - 1), x(6)), x(5).toDouble))
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .sum(1)
      .map(x => new Store_SaleF(x._1._1, x._1._2, x._2))
    // Store the result
    data.addSink(new Mysql_StoreSale("store_sale"))
    env.execute()
  }
}</nowiki>
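The keyBy(0) above keys on the (date, store_id) tuple, so sum(1) adds up the sales per store per day within each window. A minimal sketch of the same aggregation with plain collections (values are made up; paste into a Scala worksheet or the REPL):

<nowiki>// Illustration only: two stores, one day.
val purchases = List((("2020-07-25", "store_01"), 35.0),
                     (("2020-07-25", "store_01"), 12.0),
                     (("2020-07-25", "store_02"), 80.0))
val perStore = purchases.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
println(perStore) // Map((2020-07-25,store_01) -> 47.0, (2020-07-25,store_02) -> 80.0)</nowiki>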
==Running the Flink program==

① On every node, check whether Kafka is running; if it is, kill the process
jps

kill -9 3507
② ZooKeeper must be started before Kafka, otherwise Kafka will not start properly. Start the ZooKeeper that ships with Kafka:

/usr/local/kafka_2.11-2.3.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-2.3.1/config/zookeeper.properties >/tmp/startzklog &
③ Once ZooKeeper is running on all nodes, start Kafka on master, slave1, slave2 and slave3

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >/tmp/klog &
④ Check the processes with jps

⑤ Start the Flink cluster on master
/usr/local/flink-1.10.1/bin/start-cluster.sh
⑥ Check whether the shop topic exists; if not, create it

<nowiki>kafka-topics.sh --zookeeper 192.168.128.130:2181 --list                                                         # list existing topics
kafka-topics.sh --zookeeper 192.168.128.130:2181 --delete --topic shop                                          # delete the shop topic
kafka-topics.sh --zookeeper 192.168.128.130:2181 --create --topic shop --partitions 1 --replication-factor 1   # create the shop topic</nowiki>
⑦ Before running the project in IDEA, delete the previously collected data and check that the simulated collection (cron job) is running
crontab -e
⑧ Start the Flume agent with the configuration written earlier

/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory-kafka.conf -n client -Dflume.root.logger=INFO,console
⑨ Finally, run Store_Saleflink locally in IDEA. If it runs without errors, open MySQL and query the store_sale table defined at the beginning; it now holds each store's daily sales.
=Project 3=

Real-time statistics of product sales volume, selecting the top 10 best-selling products.

a) Write the Flink program

b) Start the Flink cluster

c) Run the program in IDEA

The scala folder and its packages already exist in the project, so only the Scala program needs to be written.
===flinkclass===

Implements the Flink data processing.

(1) In flinkclass, create a Scala object named GoodsOrder

The Flink real-time processing program:
<nowiki>package flinkclass

import java.util.Properties

import otherclass.StringAndDouble
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.util.Collector

object GoodsOrder {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka parameters
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "master:9092")
    pro.setProperty("group.id", "test")
    // Create the source
    val stream = env.addSource(new FlinkKafkaConsumer[String]("shop", new SimpleStringSchema(), pro))
    // Transformation
    // ProcessWindowFunction[In, Out, Key, Window]
    val data = stream.map(x => x.split(","))
      .filter(x => x.nonEmpty && x.length == 11 && x(7).contains("buy"))
      .map(x => new StringAndDouble(x(4), 1.0))   // reuse StringAndDouble: here getDate() holds the product id
      .keyBy(x => x.getDate())
      .timeWindow(Time.minutes(1))
      .process(new ProcessWindowFunction[StringAndDouble, String, String, TimeWindow] {
        override def process(key: String, context: Context, elements: Iterable[StringAndDouble], out: Collector[String]): Unit = {
          val getsum = elements.toList.groupBy(x => x.getDate()).map { tmp => val ll = tmp._2.map(x => x.getSale()).sum; (tmp._1, ll) }
          val e = getsum.toList.sortBy(x => x._2).reverse.take(10)
          e.map(x => out.collect(System.currentTimeMillis().toString() + ":" + x._1 + ":" + x._2))
        }
      })
    // Print the result
    data.print()
    env.execute()
  }
}</nowiki>
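Inside the ProcessWindowFunction, getDate() carries the product id (field x(4)) and getSale() carries the constant 1.0, so the groupBy/sum builds a per-product count and take(10) keeps the ten largest. The same chain on a plain list may make that easier to follow (a sketch with made-up product ids, assuming the StringAndDouble class from Project 1 is on the classpath):

<nowiki>import otherclass.StringAndDouble

// Illustration only: one element per "buy" record in the window.
val buys = List(new StringAndDouble("p01", 1.0), new StringAndDouble("p02", 1.0),
                new StringAndDouble("p01", 1.0), new StringAndDouble("p03", 1.0))
val counts = buys.groupBy(_.getDate()).map { case (id, xs) => (id, xs.map(_.getSale()).sum) }
val top10 = counts.toList.sortBy(_._2).reverse.take(10)
println(top10) // List((p01,2.0), (p02,1.0), (p03,1.0)) -- order of ties may vary</nowiki>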
==Running the Flink program==

① On every node, check whether Kafka is running; if it is, kill the process
jps

kill -9 3507
② ZooKeeper must be started before Kafka, otherwise Kafka will not start properly. Start the ZooKeeper that ships with Kafka:

/usr/local/kafka_2.11-2.3.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-2.3.1/config/zookeeper.properties >/tmp/startzklog &
③ Once ZooKeeper is running on all nodes, start Kafka on master, slave1, slave2 and slave3

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >/tmp/klog &
④ Check the processes with jps

⑤ Start the Flink cluster on master
/usr/local/flink-1.10.1/bin/start-cluster.sh
⑥ Check whether the shop topic exists; if not, create it

<nowiki>kafka-topics.sh --zookeeper 192.168.128.130:2181 --list                                                         # list existing topics
kafka-topics.sh --zookeeper 192.168.128.130:2181 --delete --topic shop                                          # delete the shop topic
kafka-topics.sh --zookeeper 192.168.128.130:2181 --create --topic shop --partitions 1 --replication-factor 1   # create the shop topic</nowiki>
⑦ Before running the project in IDEA, delete the previously collected data and check that the simulated collection (cron job) is running
crontab -e
⑧ Start the Flume agent with the configuration written earlier

/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory-kafka.conf -n client -Dflume.root.logger=INFO,console
⑨ Finally, run GoodsOrder locally in IDEA and check the printed output.