Flink实时数据处理
目录
Flink实时统计目标:
1.编程读取kafka缓存数据
2.商品实时统计:
(1)每日销售额,每日访问流量实时统计
(2)每日每家门店总销售额实时统计
(3)每日商品销量实时统计,并选出10大热销商品
(实现过程中,时间单位设置为1~2分钟)
配置开发环境:intellij & IDEA && Maven &&JDK
1.pom.xml配置,打开IDEA<我这里使用的是2018.3.6版本>
(1)首先创建一个新项目Create New Project,操作如下
(2)右下角都会出现 Maven projects need to be imported(项目需要导入依赖)<IDEA2020版本取消了导入默认依赖提示>
修改settings.xml的配置(关于settings.xml位置参照上方newproject第3张图),将原settings.xml内容删除,没有则创建一个,将下列代码复制到里面
<?xml version="1.0" encoding="UTF-8"?> <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd"> <mirrors> <mirror> <id>alimaven</id> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <mirrorOf>central</mirrorOf> </mirror> <mirror> <id>uk</id> <mirrorOf>central</mirrorOf> <name>Human Readable Name for this Mirror.</name> <url>http://uk.maven.org/maven2/</url> </mirror> <mirror> <id>CN</id> <name>OSChina Central</name> <url>http://maven.oschina.net/content/groups/public/</url> <mirrorOf>central</mirrorOf> </mirror> <mirror> <id>nexus</id> <name>internal nexus repository</name> <url>http://repo.maven.apache.org/maven2</url> <mirrorOf>central</mirrorOf> </mirror> </mirrors> </settings>
(3)修改修改java编译程序的版本为1.8(直接修改-确认)与pom.xml默认依赖(根据以下代码进行局部更改)
<properties> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> </properties>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
(4)向pom.xml中的<dependencies></dependencies>添加依赖
<!-1flink-steaming依赖:流计算-->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.10.1</version> </dependency>
<!-2flink-client客户端依赖-->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.10.1</version> </dependency>
<!-3flink连接kafka依赖-->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.10.1</version> </dependency>
<!-4mysql依赖:将数据存入mysql中-->
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.45</version> </dependency>
<!-5添加定义日志格式-->
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <scope>runtime</scope> </dependency>
(5)添加完依赖以后就可以看到左侧External libraries里面有导入的依赖包,以及它的jar文件
2.添加scala支持
(1)在src下main中创建一个名为scala的包
(2)选中刚刚创建的scala包,右键Mark Directory as 点击 Sources Root:将scala做成一个源码包
(3)在setting,plugins中安装scala(时间可能较久),然后选择项目文件添加scala支持
3.IDEA中log配置
(1) 在所创建项目中的main里面创建文件夹resources,增加文件log4j.properties
(2) 增加日志:log4j.properties中的日志来源于master节点
cd /usr/local/flink-1.9.2/conf/
cat log4j-console.properties
将cat到的内容全部复制到IDEA相应文件中如下所示
以上就是配置完成IDEA中的环境,数据处理将在该工程下实现
项目开发一
a)编程读取kafka缓存数据
b)每日访问流量,每日销售额实时统计
c)设计与创建mysql结果存储表
d)打包程序
e)提交任务
f)查看运行结果
创建mysql数据表
我们会将处理后的数据存入mysql数据表中,所以现在进入master节点下, 查看mysql是否是启动状态
#service mysqld status
mysqld (pid 1409) is running.
输入以下代码进入mysql中
mysql -uroot -proot
create database fk_shop; //创建数据存放库 use fk_shop; //进入数据库 create table visitcount_everyday(datetime varchar(20) ,visicount double);//创建点击流表 create table salevolume(datetime varchar(20),salevolume double);//创建销售额表 show tables; //查看表
根据mysql数据表结构在idea中创建mysqlsink
1)在scala中创建一个class对象:里面包括实时统计的日期和点击量(放入objectclass包里)
更改StringAndDouble.class
package objectclass class StringAndDouble(datetime:String,sale:Double) { def getDate()= {datetime} def getSale() ={sale} }
2)在scala中再创建一个class对象 (放入mysqlsink包里)
更改SaleSQLsink'
package mysqlsink import java.sql._ import objectclass.StringAndDouble import org.apache.flink.streaming.api.functions.sink.RichSinkFunction class SaleSQLsink (table:String) extends RichSinkFunction[StringAndDouble] with Serializable { //定义mysql配置连接 var conn:Connection=_ //创建语句 var ps:PreparedStatement=_ // 定义连接的mysql配置参数 var user="root" var password="root" var driver= "com.mysql.jdbc.Driver" var url = "jdbc:mysql://192.168.128.130:3306/fk_shop" //连接到master节点下的fk_shop数据库 //重写invoke方法 override def invoke(value: StringAndDouble): Unit ={ Class.forName(driver)//加载并注册驱动 //执行语句d conn = DriverManager.getConnection(url,user,password) val sql = "insert into" + table + "values(?,?)" ps = conn.prepareStatement(sql) ps.setString(1,value.getDate()) ps.setDouble(2,value.getSale()) ps.executeUpdate() if(ps!=null){ps.close()} if(conn != null) conn.close() } }
实现Flink数据处理
在scala里面创建一个object对象 (放到flinkclass包里面)
Flink实时处理编程过程
package flinkclass //导入相关包 import java.util.Properties import mysqlsink.SaleSQLsink import objectclass.StringAndDouble import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema object SaleVolumn { def main(args: Array[String]): Unit = { //初始化env val env = StreamExecutionEnvironment.getExecutionEnvironment //配置kafka参数 val pro = new Properties() pro.setProperty("bootstrap.servers","192.168.128.130:9092") pro.setProperty("group.id","test") //创建数据源 val stream = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro)) //数据处理过程 val data = stream.map(x => x.split(",")).filter(x => x.nonEmpty && x.length == 11) .map { x => if (x(7).contains("buy")) (x(x.length - 1), (x(5).toDouble, 1.0)) else (x(x.length - 1), (0.0, 1.0)) }.keyBy(0).timeWindow(Time.minutes(1)).reduce((x, y) => (x._1, (x._2._1 + y._2._1, x._2._2 + y._2._2))) //保存每日销售额到mysql表 data.map(x=>new StringAndDouble(x._1,x._2._1)).addSink(new SaleSQLsink("salevolume")) //保存每日点击流到mysql表 data.map(x=>new StringAndDouble(x._1,x._2._2)).addSink(new SaleSQLsink("visitcount_everyday")) env.execute() } }
运行flink程序
1) 在所有节点查看kafka的状态,如果运行则kill掉
jps kill -9 3507
2) 必须先启动zookeeper 再启动kafka,不然启动无效
zkServer.sh stop 关闭zookeeper zkServer.sh start 启动zookeeper zkServer.sh status 查看zookeeper的状态
3) 在所有节点启动zookeeper后在master,slave1,slave2上启动kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiec >/tmp/klog &
4) Jps查看
5) 在master启动flink集群
/usr/local/flink/start-cluster.sh
6) 查看主题是否创建没有则创建shop主题
kafka-topics.sh –zookeeper 192.168.128.130:2181–list 查看已创建主题 kafka-topics.sh –zookeeper 192.168.128.130:2181 –delete –topic shop 删除shop主题 kafka-topics.sh –zookeeper 192.168.128.130:2181 –create –topic shop –partitions 1 –replication -factor 1 创建shop主题
7) 在IDEA中启动flume采集方案时,把之间采集的数据删除掉, 检查模拟采集是否启动
crontab -e
8) 启动之前写好的flume采集方案
fume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory- kafka.conf -n client -Dflume.root.logger=INFO,console
9) 终于到了IDEA中本地运行salelumn.scala