“丝路通:Flink实时统计引擎”的版本间的差异

来自CloudWiki
跳转至: 导航搜索
编写flink程序
 
第32行: 第32行:
 
     //数据处理过程
 
     //数据处理过程
 
     val data = stream.map(x=>x.split(",")).filter(x=>x.nonEmpty && x.length==14)
 
     val data = stream.map(x=>x.split(",")).filter(x=>x.nonEmpty && x.length==14)
       .map{x=>x(2),(x(7),1.0))}.keyBy(0).timeWindow(Time.minutes(1)).reduce((x,y)=>(x._1,(x._2._1+y._2._1,x._2._2+y._2._2)))
+
       .map{x=>x(16),(x(8),1.0))}.keyBy(0).timeWindow(Time.minutes(5)).reduce((x,y)=>(x._1,(x._2._1+y._2._1,x._2._2+y._2._2)))
 
     //keyBy(0) 按时间排序 ,timeWindow 时间窗口
 
     //keyBy(0) 按时间排序 ,timeWindow 时间窗口
  
    //保存每日销售额到MYSQL表
+
 
    data.map(x=>new StringAndDouble(x._1,x._2._1)).addSink(new SaleSQLSink("salevolume"))
 
 
     //保存每日点击流到mysql表
 
     //保存每日点击流到mysql表
 
     data.map(x=>new StringAndDouble(x._1,x._2._2)).addSink(new SaleSQLSink("visitcount_everyday"))
 
     data.map(x=>new StringAndDouble(x._1,x._2._2)).addSink(new SaleSQLSink("visitcount_everyday"))

2020年11月9日 (一) 11:57的最新版本

编写flink程序

flinkclass

实现flink数据处理

(1)在flinkclass里面创建一个object对象

Flinkclass1.png

Flink实时处理编程过程

package flinkclass
import java.util.Properties

import mysqlsink.SaleSQLSink
import objectclass.StringAndDouble
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
object SaleVolumn {
 def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //配置Kafka参数
    val pro = new Properties()
    pro.setProperty("bootstrap.servers","192.168.128.130:9092")
    pro.setProperty("group.id","test")//设置组ID
    //创建数据源
    val stream  = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro))
    
    //数据处理过程
    val data = stream.map(x=>x.split(",")).filter(x=>x.nonEmpty && x.length==14)
      .map{x=>x(16),(x(8),1.0))}.keyBy(0).timeWindow(Time.minutes(5)).reduce((x,y)=>(x._1,(x._2._1+y._2._1,x._2._2+y._2._2)))
     //keyBy(0) 按时间排序 ,timeWindow 时间窗口


    //保存每日点击流到mysql表
    data.map(x=>new StringAndDouble(x._1,x._2._2)).addSink(new SaleSQLSink("visitcount_everyday"))
    env.execute()
  }
}

filter:过滤空行和长度不等于11的

map:统计销售额和访问量,

keyBy(0):对统计数据以第0项(时间项)为键 进行分组

timeWindow: 确定时间窗口

reduce: 对同组数据进行归约、合并

x._1,(x._2._1+y._2._1,x._2._2+y._2._2): x._1 时间,x._2._1+y._2._1 销售额相加 x._2._2+y._2._2 点击流相加

flink中的时间窗口:https://blog.csdn.net/zg_hover/article/details/87592060

运行flink程序

关闭flink

[root@master local]# cd /usr/local/flink-1.10.1/bin

[root@master bin]# stop-cluster.sh

Stopping taskexecutor daemon (pid: 7986) on host slave1.centos.com.
Stopping taskexecutor daemon (pid: 5760) on host slave2.centos.com.
Stopping taskexecutor daemon (pid: 3219) on host slave3.centos.com.
Stopping standalonesession daemon (pid: 11586) on host master.centos.com.

关闭kafka

①在所有节点查看kafka的状态,如果运行则kill掉

Killkafka.png

jps
kill -9 3507

关闭zookeeper

启动zookeeper

必须先启动zookeeper 再启动kafka,不然启动无效,启动kafka里面的zookeeper

nohup bin/zookeeper-server-start.sh config/zookeeper.properties >log/zookeeper/zookeeper.log 2>1 &

启动kafka

③在所有节点启动zookeeper后在master,slave1,slave2上启动kafka

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiec >/tmp/klog &

④Jps查看进程

Jincheng.png

创建kafka主题

查看主题是否创建没有则创建shop主题

[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --list --zookeeper 192.168.128.130:2181

__consumer_offsets
shop
test

[root@master kafka_2.11-2.3.1]#bin/kafka-topics.sh --zookeeper 192.168.128.130:2181 --delete --topic shop

[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh -create --zookeeper 192.168.128.130:2181 -replication-factor 1 --partitions 1 --topic shop

[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --list --zookeeper 192.168.128.130:2181

__consumer_offsets
shop
test

启动flink集群

⑤在master启动flink集群

/usr/local/flink/start-cluster.sh

启动flume采集

在IDEA中启动flume采集方案时,把之间采集的数据删除掉, 检查模拟采集是否启动

crontab -e

查看采集的数据:

[root@master bin]# rm -rf /opt/flinkproject/

[root@master bin]# mkdir /opt/flinkproject

[root@master bin]# ls /opt/flinkproject/

2020-07-25-20-05-01.log

启动之前写好的flume采集方案

/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory-kafka.conf -n client -Dflume.root.logger=INFO,console

运行flink客户端程序-SaleVolumn

终于到了IDEA中本地运行项目SaleVolumn,无报错后,进入mysql中查看最开始定义的两个表,mysql表里面就有每日的访问流量和每日的销售额了

Bd20-6-16.png

Mysqlsale1.png

Msyqlsale1-1.png

将flink程序打包

刚才我们是将flink在客户端(PC机)上运行的,

我们还可以将flink程序 放到集群中去运行,这就需要将flink打包。

Bd20-6-25.png

Bd20-6-26.png