“Flink实时数据处理”的版本间的差异

来自CloudWiki
跳转至: 导航搜索
1.pom.xml配置,打开IDEA
第173行: 第173行:
  
 
以上就是配置完成IDEA中的环境,数据处理将在该工程下实现
 
以上就是配置完成IDEA中的环境,数据处理将在该工程下实现
 +
 +
=项目开发一=
 +
a)编程读取kafka缓存数据
 +
 +
b)每日'''访问流量''',每日'''销售额'''实时统计
 +
 +
c)设计与创建mysql结果存储表
 +
 +
d)打包程序
 +
 +
e)提交任务
 +
 +
f)查看运行结果
 +
 +
==创建mysql数据表==
 +
我们会将处理后的数据存入mysql数据表中,所以现在进入master节点下,
 +
查看mysql是否是启动状态
 +
 +
<nowiki>#service mysqld status</nowiki>
 +
 +
<nowiki>mysqld (pid  1409) is running.</nowiki>
 +
 +
输入以下代码进入mysql中
 +
 +
<nowiki>mysql -uroot -proot</nowiki>
 +
 +
create database fk_shop;  //创建数据存放库
 +
use fk_shop; //进入数据库
 +
create table visitcount_everyday(datetime varchar(20) ,visicount double);//创建点击流表
 +
create table salevolume(datetime varchar(20),salevolume double);//创建销售额表
 +
show tables; //查看表
 +
 +
==根据mysql数据表结构在idea中创建mysqlsink==
 +
 +
1)在scala中创建一个class对象:里面包括实时统计的日期和点击量(放入objectclass包里)
 +
 +
[[文件:objectclass.png]]
 +
 +
[[文件:StringAndDouble.png]]
 +
 +
更改StringAndDouble.class
 +
 +
package objectclass
 +
class StringAndDouble(datetime:String,sale:Double) {
 +
  def getDate()= {datetime}
 +
  def getSale() ={sale}
 +
}
 +
 +
2)在scala中再创建一个class对象 (放入mysqlsink包里)
 +
 +
[[文件:Mysqlsink1.png]]
 +
 +
[[文件:Salesqlsink.png]]
 +
 +
更改SaleSQLsink'
 +
 +
package mysqlsink
 +
import java.sql._
 +
import objectclass.StringAndDouble
 +
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 +
class SaleSQLsink (table:String) extends RichSinkFunction[StringAndDouble] with
 +
Serializable {
 +
  //定义mysql配置连接
 +
  var conn:Connection=_
 +
  //创建语句
 +
  var ps:PreparedStatement=_
 +
  //  定义连接的mysql配置参数
 +
  var user="root"
 +
  var password="root"
 +
  var driver= "com.mysql.jdbc.Driver" 
 +
  var url = "jdbc:mysql://192.168.128.130:3306/fk_shop"  //连接到master节点下的fk_shop数据库
 +
//重写invoke方法
 +
  override def invoke(value: StringAndDouble): Unit ={
 +
    Class.forName(driver)//加载并注册驱动
 +
    //执行语句d
 +
    conn = DriverManager.getConnection(url,user,password)
 +
    val sql = "insert into" + table + "values(?,?)"
 +
    ps = conn.prepareStatement(sql)
 +
    ps.setString(1,value.getDate())
 +
    ps.setDouble(2,value.getSale())
 +
    ps.executeUpdate()
 +
    if(ps!=null){ps.close()}
 +
    if(conn != null) conn.close()
 +
  }
 +
}
 +
 +
==实现Flink数据处理==
 +
 +
在scala里面创建一个object对象 (放到flinkclass包里面)
 +
 +
[[文件:Flinkclass.png]]
 +
 +
Flink实时处理编程过程
 +
 +
package flinkclass
 +
//导入相关包
 +
import java.util.Properties
 +
import mysqlsink.SaleSQLsink
 +
import objectclass.StringAndDouble
 +
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 +
import org.apache.flink.streaming.api.scala._
 +
import org.apache.flink.streaming.api.windowing.time.Time
 +
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
 +
import org.apache.flink.api.common.serialization.SimpleStringSchema
 +
object SaleVolumn {
 +
  def main(args: Array[String]): Unit = {
 +
    //初始化env
 +
    val env = StreamExecutionEnvironment.getExecutionEnvironment
 +
    //配置kafka参数
 +
    val pro = new Properties()
 +
    pro.setProperty("bootstrap.servers","192.168.128.130:9092")
 +
    pro.setProperty("group.id","test")
 +
    //创建数据源
 +
    val stream = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro))
 +
    //数据处理过程
 +
    val data = stream.map(x => x.split(",")).filter(x => x.nonEmpty && x.length == 11)
 +
  .map { x => if (x(7).contains("buy")) (x(x.length - 1), (x(5).toDouble, 1.0)) else
 +
(x(x.length - 1), (0.0, 1.0)) }.keyBy(0).timeWindow(Time.minutes(1)).reduce((x, y) =>
 +
(x._1, (x._2._1 + y._2._1, x._2._2 + y._2._2)))
 +
    //保存每日销售额到mysql表
 +
    data.map(x=>new StringAndDouble(x._1,x._2._1)).addSink(new SaleSQLsink("salevolume"))
 +
    //保存每日点击流到mysql表
 +
    data.map(x=>new StringAndDouble(x._1,x._2._2)).addSink(new SaleSQLsink("visitcount_everyday"))
 +
    env.execute()
 +
  }
 +
}
 +
 +
运行flink程序
 +
 +
1) 在所有节点查看kafka的状态,如果运行则kill掉
 +
 +
[[文件:Killkafka.png]]
 +
 +
jps
 +
kill -9 3507
 +
 +
2) 必须先启动zookeeper 再启动kafka,不然启动无效
 +
 +
zkServer.sh stop 关闭zookeeper
 +
zkServer.sh start  启动zookeeper
 +
zkServer.sh status 查看zookeeper的状态
 +
 +
3) 在所有节点启动zookeeper后在master,slave1,slave2上启动kafka
 +
 +
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiec
 +
>/tmp/klog &
 +
 +
4) Jps查看
 +
 +
5) 在master启动flink集群
 +
 +
/usr/local/flink/start-cluster.sh
 +
 +
6) 查看主题是否创建没有则创建shop主题
 +
 +
kafka-topics.sh –zookeeper 192.168.128.130:2181–list  查看已创建主题
 +
kafka-topics.sh –zookeeper 192.168.128.130:2181 –delete –topic shop 删除shop主题
 +
kafka-topics.sh –zookeeper 192.168.128.130:2181 –create –topic shop –partitions 1
 +
–replication -factor 1  创建shop主题
 +
 +
7) 在IDEA中启动flume采集方案时,把之间采集的数据删除掉, 检查模拟采集是否启动
 +
 +
crontab -e
 +
 +
8) 启动之前写好的flume采集方案
 +
 +
fume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory-
 +
kafka.conf -n client -Dflume.root.logger=INFO,console
 +
 +
9) 终于到了IDEA中本地运行salelumn.scala

2020年8月6日 (四) 20:28的版本

Flink实时统计目标:

1.编程读取kafka缓存数据

2.商品实时统计:

(1)每日销售额,每日访问流量实时统计

(2)每日每家门店总销售额实时统计

(3)每日商品销量实时统计,并选出10大热销商品

(实现过程中,时间单位设置为1~2分钟)

配置开发环境:intellij & IDEA && Maven &&JDK

1.pom.xml配置,打开IDEA<我这里使用的是2018.3.6版本>

(1)首先创建一个新项目Create New Project,操作如下

Newproject1.png

Newproject2.png

Newproject3.png

Newproject4.png

(2)右下角都会出现 Maven projects need to be imported(项目需要导入依赖)<IDEA2020版本取消了导入默认依赖提示>

Enable Auto.png

修改settings.xml的配置(关于settings.xml位置参照上方newproject第3张图),将原settings.xml内容删除,没有则创建一个,将下列代码复制到里面

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
   <mirrors>
       <mirror>
           <id>alimaven</id>
           <name>aliyun maven</name>
           <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
           <mirrorOf>central</mirrorOf>
       </mirror>
       <mirror>
           <id>uk</id>
           <mirrorOf>central</mirrorOf>
           <name>Human Readable Name for this Mirror.</name>
           <url>http://uk.maven.org/maven2/</url>
       </mirror>
       <mirror>
           <id>CN</id>
           <name>OSChina Central</name>
           <url>http://maven.oschina.net/content/groups/public/</url>
           <mirrorOf>central</mirrorOf>
       </mirror>
       <mirror>
           <id>nexus</id>
           <name>internal nexus repository</name>
           <url>http://repo.maven.apache.org/maven2</url>
           <mirrorOf>central</mirrorOf>
       </mirror>
   </mirrors>
</settings>


(3)修改修改java编译程序的版本为1.8(直接修改-确认)与pom.xml默认依赖(根据以下代码进行局部更改)

Javacompiler.png

<properties>
       <maven.compiler.target>1.8</maven.compiler.target>
       <maven.compiler.source>1.8</maven.compiler.source>
</properties>
<build>
   <plugins>
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
           <version>3.6.1</version>
           <configuration>
               <source>1.8</source>
               <target>1.8</target>
           </configuration>
       </plugin>
   </plugins>
</build>

(4)向pom.xml中的<dependencies></dependencies>添加依赖

<!-1flink-steaming依赖:流计算-->
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-scala_2.11</artifactId>
   <version>1.10.1</version>
</dependency>
<!-2flink-client客户端依赖-->
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients_2.11</artifactId>
   <version>1.10.1</version>
</dependency>
<!-3flink连接kafka依赖-->
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_2.11</artifactId>
   <version>1.10.1</version>
</dependency>
<!-4mysql依赖:将数据存入mysql中-->
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.45</version>
</dependency>
<!-5添加定义日志格式-->
<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.7</version>
   <scope>runtime</scope>
</dependency>
<dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.17</version>
   <scope>runtime</scope>
</dependency>

(5)添加完依赖以后就可以看到左侧External libraries里面有导入的依赖包,以及它的jar文件

Externallibraries.png

2.添加scala支持

(1)在src下main中创建一个名为scala的包

Createscala.png

(2)选中刚刚创建的scala包,右键Mark Directory as 点击 Sources Root:将scala做成一个源码包

Scalabag.png

(3)在setting,plugins中安装scala(时间可能较久),然后选择项目文件添加scala支持

Addscalasupport.png

3.IDEA中log配置

(1) 在所创建项目中的main里面创建文件夹resources,增加文件log4j.properties

(2) 增加日志:log4j.properties中的日志来源于master节点

cd /usr/local/flink-1.9.2/conf/

cat log4j-console.properties

将cat到的内容全部复制到IDEA相应文件中如下所示

Log.png

以上就是配置完成IDEA中的环境,数据处理将在该工程下实现

项目开发一

a)编程读取kafka缓存数据

b)每日访问流量,每日销售额实时统计

c)设计与创建mysql结果存储表

d)打包程序

e)提交任务

f)查看运行结果

创建mysql数据表

我们会将处理后的数据存入mysql数据表中,所以现在进入master节点下, 查看mysql是否是启动状态

#service mysqld status

mysqld (pid 1409) is running.

输入以下代码进入mysql中

mysql -uroot -proot
create database fk_shop;  //创建数据存放库
use fk_shop; //进入数据库
create table visitcount_everyday(datetime varchar(20) ,visicount double);//创建点击流表
create table salevolume(datetime varchar(20),salevolume double);//创建销售额表
show tables; //查看表

根据mysql数据表结构在idea中创建mysqlsink

1)在scala中创建一个class对象:里面包括实时统计的日期和点击量(放入objectclass包里)

Objectclass.png

StringAndDouble.png

更改StringAndDouble.class

package objectclass
class StringAndDouble(datetime:String,sale:Double) {
  def getDate()= {datetime}
  def getSale() ={sale}
}

2)在scala中再创建一个class对象 (放入mysqlsink包里)

Mysqlsink1.png

Salesqlsink.png

更改SaleSQLsink'

package mysqlsink
import java.sql._
import objectclass.StringAndDouble
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
class SaleSQLsink (table:String) extends RichSinkFunction[StringAndDouble] with 
Serializable {
 //定义mysql配置连接
 var conn:Connection=_
 //创建语句
 var ps:PreparedStatement=_
 //  定义连接的mysql配置参数
 var user="root"
 var password="root"
 var driver= "com.mysql.jdbc.Driver"  
 var url = "jdbc:mysql://192.168.128.130:3306/fk_shop"  //连接到master节点下的fk_shop数据库
//重写invoke方法
 override def invoke(value: StringAndDouble): Unit ={
   Class.forName(driver)//加载并注册驱动
   //执行语句d
   conn = DriverManager.getConnection(url,user,password)
   val sql = "insert into" + table + "values(?,?)"
   ps = conn.prepareStatement(sql)
   ps.setString(1,value.getDate())
   ps.setDouble(2,value.getSale())
   ps.executeUpdate()
   if(ps!=null){ps.close()}
   if(conn != null) conn.close()
 }
}

实现Flink数据处理

在scala里面创建一个object对象 (放到flinkclass包里面)

Flinkclass.png

Flink实时处理编程过程

package flinkclass
//导入相关包
import java.util.Properties
import mysqlsink.SaleSQLsink
import objectclass.StringAndDouble
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
object SaleVolumn {
  def main(args: Array[String]): Unit = {
    //初始化env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //配置kafka参数
    val pro = new Properties()
    pro.setProperty("bootstrap.servers","192.168.128.130:9092")
    pro.setProperty("group.id","test")
    //创建数据源
    val stream = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro))
    //数据处理过程
    val data = stream.map(x => x.split(",")).filter(x => x.nonEmpty && x.length == 11)
  .map { x => if (x(7).contains("buy")) (x(x.length - 1), (x(5).toDouble, 1.0)) else 
(x(x.length - 1), (0.0, 1.0)) }.keyBy(0).timeWindow(Time.minutes(1)).reduce((x, y) => 
(x._1, (x._2._1 + y._2._1, x._2._2 + y._2._2)))
    //保存每日销售额到mysql表
    data.map(x=>new StringAndDouble(x._1,x._2._1)).addSink(new SaleSQLsink("salevolume"))
    //保存每日点击流到mysql表
    data.map(x=>new StringAndDouble(x._1,x._2._2)).addSink(new SaleSQLsink("visitcount_everyday"))
    env.execute()
  }
}

运行flink程序

1) 在所有节点查看kafka的状态,如果运行则kill掉

Killkafka.png

jps
kill -9 3507

2) 必须先启动zookeeper 再启动kafka,不然启动无效

zkServer.sh stop 关闭zookeeper
zkServer.sh start  启动zookeeper
zkServer.sh status 查看zookeeper的状态

3) 在所有节点启动zookeeper后在master,slave1,slave2上启动kafka

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiec 
>/tmp/klog &

4) Jps查看

5) 在master启动flink集群

/usr/local/flink/start-cluster.sh

6) 查看主题是否创建没有则创建shop主题

kafka-topics.sh –zookeeper 192.168.128.130:2181–list  查看已创建主题
kafka-topics.sh –zookeeper 192.168.128.130:2181 –delete –topic shop 删除shop主题
kafka-topics.sh –zookeeper 192.168.128.130:2181 –create –topic shop –partitions 1 
–replication -factor 1  创建shop主题

7) 在IDEA中启动flume采集方案时,把之间采集的数据删除掉, 检查模拟采集是否启动

crontab -e

8) 启动之前写好的flume采集方案

fume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory- 
kafka.conf -n client -Dflume.root.logger=INFO,console

9) 终于到了IDEA中本地运行salelumn.scala