Flink Real-Time Data Processing

From CloudWiki

Goals of Flink real-time statistics:

1. Read the buffered data from Kafka programmatically

2. Compute real-time product statistics:

Project 1: real-time statistics of the daily sales volume and daily visit traffic

Project 2: real-time statistics of each store's total daily sales

Project 3: real-time statistics of daily product sales, selecting the top 10 best-selling products

Project Development 1

a) Design and create the MySQL result tables

b) Read the buffered data from Kafka

c) Compute the daily visit traffic and the daily sales volume in real time

d) Run the program in IDEA


Creating the MySQL tables

We will store the processed data in MySQL tables, so log in to the master node first.

(1) Check whether MySQL is running:

#service mysqld status

mysqld (pid 1409) is running.

(2) Enter MySQL with the following command (password: root):

mysql -uroot -proot
create database fk_shop;  -- create the database
use fk_shop;  -- switch to the database
create table visitcount_everyday(datetime varchar(20), visicount double);  -- click-stream (visit count) table
create table salevolume(datetime varchar(20), salevolume double);  -- sales volume table
show tables;  -- list the tables

Defining the MySQL sink

Create a scala source folder in the project and, based on the MySQL table structure, create classes similar to the ones below (the names can be customized).

Scalaflies.png

otherclass

This package holds the first scala file; its job is to define the data fields to be processed.

Since the sales volume and the click count are of type double in the MySQL tables and the date is a string,

Project 1 needs a class that holds a daily click count or a daily sales volume next to its date.

(1) In otherclass, create the first scala file, named StringAndDouble (the name is up to you).

Otherclass.png

(2) Edit StringAndDouble.scala:

package otherclass

class StringAndDouble(datetime:String,sale:Double) {
    def getDate()={datetime}
    def getSale()={sale}
}
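As an aside (a sketch, not part of the project), the same holder could be written as a Scala case class, which generates the accessors, equals and toString automatically:

// Hypothetical alternative to the hand-written class above:
// fields become public accessors, so x.datetime and x.sale
// replace getDate() and getSale().
case class StringAndDouble(datetime: String, sale: Double)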

mysqlsink

(1) In mysqlsink, create the second scala file, named MySqlSinkToSale (the name is up to you).

Mysqlsink.png

Edit MySqlSinkToSale:

package mysqlsink
import java.sql._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import otherclass.StringAndDouble
class MySqlSinkToSale(table:String) extends RichSinkFunction[StringAndDouble] with Serializable {
  // JDBC connection parameters
  var conn:Connection=_
  var ps:PreparedStatement=_
  val user="root"
  val password="root"
  val driver="com.mysql.jdbc.Driver"
  val url="jdbc:mysql://192.168.128.130:3306/fk_shop"
  // called once per record: open a connection, insert one (date, value) row, close again
  override def invoke(value: StringAndDouble): Unit = {
    Class.forName(driver)
    conn=DriverManager.getConnection(url,user,password)
    val sql = "insert into "+ table +" values(?,?)"
    ps=conn.prepareStatement(sql)
    ps.setString(1,value.getDate())
    ps.setDouble(2,value.getSale())
    ps.executeUpdate()
    if(ps!=null) ps.close()
    if(conn!=null) conn.close()
  }
}
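Opening and closing a JDBC connection for every record is expensive. A minimal sketch of the usual optimization (the class name PooledSaleSink is illustrative, not from this page) moves the connection handling into the open() and close() lifecycle methods of RichSinkFunction:

package mysqlsink
import java.sql._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import otherclass.StringAndDouble

// Hypothetical variant: one connection per sink instance instead of per record.
class PooledSaleSink(table: String) extends RichSinkFunction[StringAndDouble] {
  var conn: Connection = _
  var ps: PreparedStatement = _

  // called once when the sink task starts
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://192.168.128.130:3306/fk_shop", "root", "root")
    ps = conn.prepareStatement("insert into " + table + " values(?,?)")
  }

  // called once per record: reuse the open connection and prepared statement
  override def invoke(value: StringAndDouble): Unit = {
    ps.setString(1, value.getDate())
    ps.setDouble(2, value.getSale())
    ps.executeUpdate()
  }

  // called once when the sink task shuts down
  override def close(): Unit = {
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}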


flinkclass

Implements the Flink data processing.

(1) In flinkclass, create an object.

Flinkclass1.png

The Flink real-time processing program:

package flinkclass
import java.util.Properties

import mysqlsink.MySqlSinkToSale
import otherclass.StringAndDouble
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
object SaleVolumn {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka consumer configuration
    val pro = new Properties()
    pro.setProperty("bootstrap.servers","192.168.128.130:9092")
    pro.setProperty("group.id","test")
    // create the source: read the "shop" topic from Kafka
    val stream  = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro))
    // processing: per date, sum (sale amount, click count) over 1-minute windows;
    // "buy" records contribute their amount, every record counts one click
    val data = stream.map(x=>x.split(",")).filter(x=>x.nonEmpty && x.length==11)
      .map{x=>if(x(7).contains("buy")) (x(x.length-1),(x(5).toDouble,1.0)) else (x(x.length-1),(0.0,1.0))}.keyBy(0).timeWindow(Time.minutes(1)).reduce((x,y)=>(x._1,(x._2._1+y._2._1,x._2._2+y._2._2)))
    // write the daily sales volume to MySQL
    data.map(x=>new StringAndDouble(x._1,x._2._1)).addSink(new MySqlSinkToSale("salevolume"))
    // write the daily click stream to MySQL
    data.map(x=>new StringAndDouble(x._1,x._2._2)).addSink(new MySqlSinkToSale("visitcount_everyday"))
    env.execute()
  }
}
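The job assumes each Kafka message is an 11-field comma-separated record. A tiny sketch of the layout, inferred purely from the indices used by the three jobs on this page (the sample values are made up; the real Flume log schema is not shown here):

package flinkclass

// Sketch only: a hypothetical 11-field record and the indices the jobs read.
object RecordLayoutDemo {
  def main(args: Array[String]): Unit = {
    val line = "u01,2020,07,25,goods_01,59.0,store_02,buy,web,cn,2020-07-25"
    val x = line.split(",")
    println(x(4))           // goods id        (used by GoodsOrder)
    println(x(5).toDouble)  // purchase amount (used by SaleVolumn and Store_Saleflink)
    println(x(6))           // store id        (used by Store_Saleflink)
    println(x(7))           // action type, filtered for "buy"
    println(x(x.length-1))  // date, used as the key
  }
}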

Time windows in Flink: https://blog.csdn.net/zg_hover/article/details/87592060
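For orientation, a toy sketch (not project code) contrasting the two forms of timeWindow on keyed streams; with a bounded source and processing time the windows may not all fire before the job ends, so treat it as illustrative:

package flinkclass
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical demo of tumbling vs sliding time windows.
object WindowDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val counts = env.fromElements(("a", 1), ("b", 2), ("a", 3)).keyBy(0)
    // tumbling: consecutive, non-overlapping 1-minute windows
    counts.timeWindow(Time.minutes(1)).sum(1).print()
    // sliding: 1-minute windows re-evaluated every 10 seconds (overlapping)
    counts.timeWindow(Time.minutes(1), Time.seconds(10)).sum(1).print()
    env.execute()
  }
}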

Running the Flink program

Stopping Flink

[root@master local]# cd /usr/local/flink-1.10.1/bin

[root@master bin]# stop-cluster.sh

Stopping taskexecutor daemon (pid: 7986) on host slave1.centos.com.
Stopping taskexecutor daemon (pid: 5760) on host slave2.centos.com.
Stopping taskexecutor daemon (pid: 3219) on host slave3.centos.com.
Stopping standalonesession daemon (pid: 11586) on host master.centos.com.

Stopping Kafka

① On every node, check Kafka's status; if it is running, kill the process:

Killkafka.png

jps
kill -9 3507

Stopping ZooKeeper

Starting ZooKeeper

② ZooKeeper must be started before Kafka, otherwise the Kafka startup fails; here we start the ZooKeeper that ships with Kafka:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties >log/zookeeper/zookeeper.log 2>&1 &

Starting Kafka

③ After ZooKeeper is up on all nodes, start Kafka on master, slave1 and slave2:

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >/tmp/klog &

④ Check the processes with jps:

Jincheng.png

Creating the Kafka topic

Check whether the shop topic exists; if not, create it:

[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --list --zookeeper 192.168.128.130:2181

__consumer_offsets
shop
test

[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --zookeeper 192.168.128.130:2181 --delete --topic shop

[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --create --zookeeper 192.168.128.130:2181 --replication-factor 1 --partitions 1 --topic shop

[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --list --zookeeper 192.168.128.130:2181

__consumer_offsets
shop
test

Starting the Flink cluster

⑤ Start the Flink cluster on master:

/usr/local/flink/start-cluster.sh

Starting Flume collection

Before running the job in IDEA, delete the previously collected data and check that the simulated collection (the crontab job) is running:

crontab -e

Inspect the collected data:

[root@master bin]# rm -rf /opt/flinkproject/

[root@master bin]# mkdir /opt/flinkproject

[root@master bin]# ls /opt/flinkproject/

2020-07-25-20-05-01.log

Start the Flume agent written earlier:

/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory-kafka.conf -n client -Dflume.root.logger=INFO,console

Running the Flink client program: SaleVolumn

Finally, run the SaleVolumn project locally in IDEA. Once it runs without errors, open MySQL and query the two tables defined at the beginning (for example with select * from salevolume; and select * from visitcount_everyday;); they now contain the daily visit traffic and the daily sales volume.

Bd20-6-16.png

Mysqlsale1.png Msyqlsale1-1.png

Project Development 2

a) Design and create the MySQL result table

b) Read the buffered data from Kafka

c) Compute each store's daily sales volume in real time

d) Run the program in IDEA

Creating the MySQL table

We will store the processed data in a MySQL table, so log in to the master node first.

(1) Check whether MySQL is running:

#service mysqld status

mysqld (pid 1409) is running.

(2) Enter MySQL with the following command (password: root):

mysql -uroot -proot
create database fk_shop;  -- already created in Project 1; skip if it exists
use fk_shop;  -- switch to the database
create table store_sale(datetime varchar(20), store_id varchar(20), sale double);  -- per-store daily sales table
show tables;  -- list the tables

Defining the MySQL sink

The scala folder and its packages were already created in the project, so we only need to add the scala programs.

Scalaflies.png

otherclass

This package holds the first scala file; it defines the data fields to be processed.

Since we now need the date (string), the store (string) and the sales amount (double),

this project defines a class with three fields.

(1) In otherclass, create the first scala file, named Store_SaleF (the name is up to you).

Store SaleF.png

(2) Edit Store_SaleF.scala:

package otherclass
class Store_SaleF (datetime:String,store:String,sale:Double){
  def getDate()={
    datetime
  }
  def getStore()={
    store
  }
  def getSale()={
    sale
  }
}

mysqlsink

(1) In mysqlsink, create the second scala file, named Mysql_StoreSale (the name is up to you; the main program below uses this name).

Mysql StoreSale.png

Edit Mysql_StoreSale:

package mysqlsink

import java.sql._

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import otherclass.Store_SaleF
class Mysql_StoreSale(table:String) extends RichSinkFunction[Store_SaleF] with Serializable {
  // JDBC connection parameters
  var conn:Connection=_
  var ps:PreparedStatement=_
  val user="root"
  val password="root"
  val driver="com.mysql.jdbc.Driver"
  val url="jdbc:mysql://192.168.128.130:3306/fk_shop"

  // insert one (date, store, sale) row per record, matching the store_sale table
  override def invoke(value: Store_SaleF): Unit = {
    Class.forName(driver)
    conn = DriverManager.getConnection(url,user,password)
    val sql="insert into "+table+" values(?,?,?)"
    ps = conn.prepareStatement(sql)
    ps.setString(1,value.getDate())
    ps.setString(2,value.getStore())
    ps.setDouble(3,value.getSale())
    ps.executeUpdate()
    if(ps!=null) ps.close()
    if(conn!=null) conn.close()
  }
}

flinkclass

Implements the Flink data processing.

(1) In flinkclass, create an object named Store_Saleflink.

Store Saleflink.png

The Flink real-time processing program:

package flinkclass
import java.util.Properties
import mysqlsink.Mysql_StoreSale
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import otherclass.Store_SaleF
object Store_Saleflink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka consumer configuration
    val pro = new Properties()
    pro.setProperty("bootstrap.servers","192.168.128.130:9092")
    pro.setProperty("group.id","test")
    // create the source
    val stream = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro))
    // transformation: keep "buy" records, key by (date, store id), sum the amounts per 1-minute window
    val data=stream.map(x=>x.split(",")).filter(x=>x.nonEmpty && x.length==11 && x(7).contains("buy")).map(x=>((x(x.length-1),x(6)),x(5).toDouble)).keyBy(0).timeWindow(Time.minutes(1)).sum(1).map(x=>new Store_SaleF(x._1._1,x._1._2,x._2))
    // sink: store the results
    data.addSink(new Mysql_StoreSale("store_sale"))
    env.execute()
  }
}


Running the Flink program

① On every node, check Kafka's status; if it is running, kill the process:

Killkafka.png

jps
kill -9 3507

② ZooKeeper must be started before Kafka, otherwise the Kafka startup fails; start the ZooKeeper that ships with Kafka:

/usr/local/kafka_2.11-2.3.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-2.3.1/config/zookeeper.properties >/tmp/startzklog &

③ After ZooKeeper is up on all nodes, start Kafka on master, slave1, slave2 and slave3:

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >/tmp/klog &

④ Check the processes with jps:

Jincheng.png

⑤ Start the Flink cluster on master:

/usr/local/flink/start-cluster.sh

⑥ Check whether the shop topic exists; if not, create it:

kafka-topics.sh --zookeeper 192.168.128.130:2181 --list                 # list existing topics
kafka-topics.sh --zookeeper 192.168.128.130:2181 --delete --topic shop  # delete the shop topic
kafka-topics.sh --zookeeper 192.168.128.130:2181 --create --topic shop --partitions 1 --replication-factor 1  # create the shop topic

⑦ Before running the job in IDEA, delete the previously collected data and check that the simulated collection (the crontab job) is running:

crontab -e

⑧ Start the Flume agent written earlier:

flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory-kafka.conf -n client -Dflume.root.logger=INFO,console

⑨ Finally, run the Store_Saleflink project locally in IDEA. Once it runs without errors, open MySQL and query the table defined at the beginning (for example with select * from store_sale;); it now contains each store's daily sales volume.

Mysqlsale2.png

Project Development 3

Real-time product sales statistics, selecting the top 10 best-selling products.

a) Write the Flink program

b) Start the Flink cluster

c) Run the program in IDEA


The scala folder and its packages were already created in the project, so we only need to add the scala program.

Scalaflies.png


flinkclass

Implements the Flink data processing.

(1) In flinkclass, create an object named GoodsOrder.

GoodsOrder.png

The Flink real-time processing program:

package flinkclass
import java.util.Properties
import otherclass.StringAndDouble
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.util.Collector
object GoodsOrder {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka consumer configuration
    val pro = new Properties()
    pro.setProperty("bootstrap.servers","master:9092")
    pro.setProperty("group.id","test")
    // create the source
    val stream = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro))
    // transformation: wrap each "buy" record as (goods id, 1.0) -- StringAndDouble is reused
    // here with the goods id in its date field -- then key by goods id and count sales per
    // 1-minute window with a ProcessWindowFunction[In,Out,Key,Window]
    val data=stream.map(x=>x.split(",")).filter(x=>x.nonEmpty && x.length==11 && x(7).contains("buy")).map(x=>new StringAndDouble(x(4),1.0)).keyBy(x=>x.getDate()).timeWindow(Time.minutes(1)).process(new ProcessWindowFunction[StringAndDouble,String,String,TimeWindow]{
      override def process(key: String, context: Context, elements: Iterable[StringAndDouble], out: Collector[String]): Unit = {
        // sum the sale counts per goods id in this window
        val getsum = elements.toList.groupBy(x=>x.getDate()).map{tmp=>val ll=tmp._2.map(x=>x.getSale()).sum;(tmp._1,ll)}
        // sort descending and keep at most 10 entries
        val e = getsum.toList.sortBy(x=>x._2).reverse.take(10)
        // emit "<epoch millis>:<goods id>:<count>"
        e.map(x=>out.collect(System.currentTimeMillis().toString()+":"+x._1+":"+x._2))
      }
    })
    // print the results instead of writing to MySQL
    data.print()
    env.execute()
  }
}
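Because the stream is keyed by goods id, each window instance only ever sees a single product, so the take(10) above emits one line per product rather than a global ranking. A sketch of a true top-10 across all products (my adaptation, not code from this page) replaces the keyed window with a non-keyed timeWindowAll:

package flinkclass
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

// Hypothetical variant: global top-10 best sellers per 1-minute window.
object GoodsTop10All {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "master:9092")
    pro.setProperty("group.id", "test")
    val stream = env.addSource(new FlinkKafkaConsumer[String]("shop", new SimpleStringSchema(), pro))
    stream.map(_.split(",")).filter(x => x.nonEmpty && x.length == 11 && x(7).contains("buy"))
      .map(x => (x(4), 1.0))          // (goods id, one sale)
      .timeWindowAll(Time.minutes(1)) // one window over ALL products (runs with parallelism 1)
      .process(new ProcessAllWindowFunction[(String, Double), String, TimeWindow] {
        override def process(context: Context, elements: Iterable[(String, Double)], out: Collector[String]): Unit = {
          // total sales per goods id within this window, highest first
          val totals = elements.groupBy(_._1).mapValues(_.map(_._2).sum).toList
          totals.sortBy(-_._2).take(10).foreach(x => out.collect(x._1 + ":" + x._2))
        }
      })
      .print()
    env.execute()
  }
}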


Running the Flink program

① On every node, check Kafka's status; if it is running, kill the process:

Killkafka.png

jps
kill -9 3507

② ZooKeeper must be started before Kafka, otherwise the Kafka startup fails; start the ZooKeeper that ships with Kafka:

/usr/local/kafka_2.11-2.3.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-2.3.1/config/zookeeper.properties >/tmp/startzklog &

③ After ZooKeeper is up on all nodes, start Kafka on master, slave1, slave2 and slave3:

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >/tmp/klog &

④ Check the processes with jps:

Jincheng.png

⑤ Start the Flink cluster on master:

/usr/local/flink/start-cluster.sh

⑥ Check whether the shop topic exists; if not, create it:

kafka-topics.sh --zookeeper 192.168.128.130:2181 --list                 # list existing topics
kafka-topics.sh --zookeeper 192.168.128.130:2181 --delete --topic shop  # delete the shop topic
kafka-topics.sh --zookeeper 192.168.128.130:2181 --create --topic shop --partitions 1 --replication-factor 1  # create the shop topic

⑦ Before running the job in IDEA, delete the previously collected data and check that the simulated collection (the crontab job) is running:

crontab -e

⑧ Start the Flume agent written earlier:

flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory-kafka.conf -n client -Dflume.root.logger=INFO,console

⑨ Finally, run the GoodsOrder project locally in IDEA and watch the console output; each result line has the form <epoch millis>:<goods id>:<count>.

GoodsOrderflink.png