查看“Flink实时数据处理”的源代码
←
Flink实时数据处理
跳转至:
导航
,
搜索
因为以下原因,您没有权限编辑本页:
您所请求的操作仅限于该用户组的用户使用:
用户
您可以查看与复制此页面的源代码。
=Flink实时统计目标:= 1.编程读取kafka缓存数据 2.商品实时统计: 项目一:每日销售额,每日访问流量实时统计 项目二:每日每家门店总销售额实时统计 项目三:每日商品销量实时统计,并选出10大热销商品 =项目开发一= a)设计与创建mysql结果存储表 b)编程读取kafka缓存数据 c)每日'''访问流量'''||每日'''销售额'''实时统计 d)在IDEA中运行程序 ==创建mysql数据表== 我们会将处理后的数据存入mysql数据表中,所以现在进入master节点下 (1)查看mysql是否是启动状态 <nowiki>#service mysqld status</nowiki> <nowiki>mysqld (pid 1409) is running.</nowiki> (2)输入以下代码进入mysql中 <nowiki>mysql -uroot -proot</nowiki> create database fk_shop; //创建数据存放库 use fk_shop; //进入数据库 create table visitcount_everyday(datetime varchar(20) ,visicount double);//创建点击流表 create table salevolume(datetime varchar(20),salevolume double);//创建销售额表 show tables; //查看表 ==定义mysql sink== 在项目中创建个scala文件夹,再根据mysql数据表结构在scala文件中创建类似以下'''包'''(名称可自定义) [[文件:Scalaflies.png]] ===otherclass=== 里面存放的是第一个scala文件:主要功能是定义需要处理的数据字段 因为创建的mysql表中销售额和点击流都属于double类型,日期为string类型 所以项目一中我们需要定义'''每日的点击流'''与'''每日的销售额''' (1)在otherclass里面创建第一个scala文件,名为StringAndDouble(自定义) [[文件:Otherclass.png]] (2)更改StringAndDouble.class package <包名> class <scala文件名>(datetime:String,sale:Double) { def getDate()= {datetime} def getSale() ={sale} } ===mysqlsink=== (1)在mysqlsink里面创建第二个scala文件,名为MySqlSinkToSale(自定义) [[文件:Mysqlsink.png]] 更改MySqlSinkToSale package mysqlsink import java.sql.DriverManager import java.sql._ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import otherclass.StringAndDouble class MySqlSinkToSale(table:String) extends RichSinkFunction[StringAndDouble] with Serializable { var conn:Connection=_ var ps:PreparedStatement=_ val user="root" val password="root" val driver="com.mysql.jdbc.Driver" val url="jdbc:mysql://192.168.128.130:3306/fk_shop" override def invoke(value: StringAndDouble): Unit = { Class.forName(driver) conn=DriverManager.getConnection(url,user,password) val sql = "insert into "+ table +" values(?,?)" ps=conn.prepareStatement(sql) ps.setString(1,value.getDate()) ps.setDouble(2,value.getSale()) ps.executeUpdate() if(ps!=null) ps.close() if(conn!=null) conn.close() } } ===flinkclass=== 实现flink数据处理 (1)在flinkclass里面创建一个object对象 [[文件:flinkclass1.png]] Flink实时处理编程过程 package flinkclass //导入相关包 import java.util.Properties import mysqlsink.SaleSQLsink import objectclass.StringAndDouble import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema object SaleVolumn { def main(args: Array[String]): Unit = { //初始化env val env = StreamExecutionEnvironment.getExecutionEnvironment //配置kafka参数 val pro = new Properties() pro.setProperty("bootstrap.servers","192.168.128.130:9092") pro.setProperty("group.id","test") //创建数据源 val stream = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro)) //数据处理过程 val data = stream.map(x => x.split(",")).filter(x => x.nonEmpty && x.length == 11) .map { x => if (x(7).contains("buy")) (x(x.length - 1), (x(5).toDouble, 1.0)) else (x(x.length - 1), (0.0, 1.0)) }.keyBy(0).timeWindow(Time.minutes(1)).reduce((x, y) => (x._1, (x._2._1 + y._2._1, x._2._2 + y._2._2))) //保存每日销售额到mysql表 data.map(x=>new StringAndDouble(x._1,x._2._1)).addSink(new SaleSQLsink("salevolume")) //保存每日点击流到mysql表 data.map(x=>new StringAndDouble(x._1,x._2._2)).addSink(new SaleSQLsink("visitcount_everyday")) env.execute() } } ==运行flink程序== ①在所有节点查看kafka的状态,如果运行则kill掉 [[文件:Killkafka.png]] jps kill -9 3507 ②必须先启动zookeeper 再启动kafka,不然启动无效,启动kafka里面的zookeeper usr/local/kafka_2.11-2.3.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-2.3.1/config/zookeeper.properties >/tmp/startzklog & ③在所有节点启动zookeeper后在master,slave1,slave2,slave3上启动kafka /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiec >/tmp/klog & ④Jps查看进程 [[文件:jincheng.png]] ⑤在master启动flink集群 /usr/local/flink/start-cluster.sh ⑥查看主题是否创建没有则创建shop主题 kafka-topics.sh –zookeeper 192.168.128.130:2181–list 查看已创建主题 kafka-topics.sh –zookeeper 192.168.128.130:2181 –delete –topic shop 删除shop主题 kafka-topics.sh –zookeeper 192.168.128.130:2181 –create –topic shop –partitions 1 –replication -factor 1 创建shop主题 ⑦在IDEA中启动flume采集方案时,把之间采集的数据删除掉, 检查模拟采集是否启动 crontab -e ⑧启动之前写好的flume采集方案 fume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory- kafka.conf -n client -Dflume.root.logger=INFO,console ⑨终于到了IDEA中本地运行项目SaleVolumn,无报错后,进入mysql中查看最开始定义的两个表,mysql表里面就有每日的访问流量和每日的销售额了 [[文件:mysqlsale1.png]] [[文件:Msyqlsale1-1.png]] =项目开发二= a)设计与创建mysql结果存储表 b)编程读取kafka缓存数据 c)每日 门店 销售额 实时统计 d)IDEA中运行程序 ==创建mysql数据表== 我们会将处理后的数据存入mysql数据表中,所以现在进入master节点下 (1)查看mysql是否是启动状态 <nowiki>#service mysqld status</nowiki> <nowiki>mysqld (pid 1409) is running.</nowiki> (2)输入以下代码进入mysql中 <nowiki>mysql -uroot -proot</nowiki> create database fk_shop; //创建数据存放库 use fk_shop; //进入数据库 create table store_sale( datetime varchar(20), store_id varchar(20), sale double);//创建门店日销售额表 show tables; //查看表 ==定义mysql sink== 在项目中已经创建好了scala文件夹以及下面的包,下面就只需要写入scala程序即可 [[文件:Scalaflies.png]] ===otherclass=== 里面存放的是第一个scala文件:主要功能是定义需要处理的数据字段 因为需要统计的是每日(string)店铺(string)销售额(double) 所以项目一中我们需要定义三个字段的值 (1)在otherclass里面创建第一个scala文件,名为Store_SaleF(自定义) [[文件:Store_SaleF.png]] (2)更改Store_SaleF.class package otherclass class Store_SaleF (datetime:String,store:String,sale:Double){ def getDate()={ datetime } def getStore()={ store } def getSale()={ sale } } ===mysqlsink=== (1)在mysqlsink里面创建第二个scala文件,名为Mysql_StoreSale(自定义) [[文件:Mysql_StoreSale.png]] 更改Mysql_StoreSale package mysqlsink import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import otherclass.Store_SaleF import java.sql._ class Mysql_StoreSale(table:String) extends RichSinkFunction[Store_SaleF] with Serializable { var conn:Connection=_ var ps:PreparedStatement=_ val user="root" val password="root" val driver="com.mysql.jdbc.Driver" val url="jdbc:mysql://192.168.128.130:3306/fk_shop" override def invoke(value: Store_SaleF): Unit = { Class.forName(driver) conn=DriverManager.getConnection(url,user,password) val sql="insert into "+table+" values(?,?,?)" ps=conn.prepareStatement(sql) ps.setString(1,value.getDate()) ps.setString(2,value.getStore()) ps.setDouble(3,value.getSale()) ps.executeUpdate() if(ps!=null) ps.close() if(conn!=null) conn.close() } } ===flinkclass=== 实现flink数据处理 (1)在flinkclass里面创建一个object对象,名为Store_Saleflink [[文件:Store_Saleflink.png]] Flink实时处理编程过程 package flinkclass import java.util.Properties import mysqlsink.Mysql_StoreSale import java.util.Properties import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema import otherclass.Store_SaleF object Store_Saleflink { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //kafka集群参数配置 val pro = new Properties() pro.setProperty("bootstrap.servers","192.168.128.130:9092") pro.setProperty("group.id","test") //创建源 val stream = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro)) //数据转换 val data=stream.map(x=>x.split(",")).filter(x=>x.nonEmpty && x.length==11 && x(7).contains("buy")).map(x=>((x(x.length- 1),x(6)),x(5).toDouble)).keyBy(0).timeWindow(Time.minutes(1)).sum(1).map(x=>new Store_SaleF(x._1._1,x._1._2,x._2)) //数据存储 data.addSink(new Mysql_StoreSale("store_sale")) env.execute() } } ==运行flink程序== ①在所有节点查看kafka的状态,如果运行则kill掉 [[文件:Killkafka.png]] jps kill -9 3507 ②必须先启动zookeeper 再启动kafka,不然启动无效,启动kafka里面的zookeeper usr/local/kafka_2.11-2.3.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-2.3.1/config/zookeeper.properties >/tmp/startzklog & ③在所有节点启动zookeeper后在master,slave1,slave2,slave3上启动kafka /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiec >/tmp/klog & ④Jps查看进程 [[文件:jincheng.png]] ⑤在master启动flink集群 /usr/local/flink/start-cluster.sh ⑥查看主题是否创建没有则创建shop主题 kafka-topics.sh –zookeeper 192.168.128.130:2181–list 查看已创建主题 kafka-topics.sh –zookeeper 192.168.128.130:2181 –delete –topic shop 删除shop主题 kafka-topics.sh –zookeeper 192.168.128.130:2181 –create –topic shop –partitions 1 –replication -factor 1 创建shop主题 ⑦在IDEA中启动flume采集方案时,把之间采集的数据删除掉, 检查模拟采集是否启动 crontab -e ⑧启动之前写好的flume采集方案 fume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory- kafka.conf -n client -Dflume.root.logger=INFO,console ⑨终于到了IDEA中本地运行项目SaleVolumn,无报错后,进入mysql中查看最开始定义的一个表,mysql表里面就有每日商品销售额了 [[文件:mysqlsale2.png]]
返回至
Flink实时数据处理
。
导航菜单
个人工具
登录
命名空间
页面
讨论
变种
视图
阅读
查看源代码
查看历史
更多
搜索
导航
首页
最近更改
随机页面
帮助
工具
链入页面
相关更改
特殊页面
页面信息