“Flink点击流和销售额计算”的版本间的差异
(→编写flink程序) |
|||
(未显示同一用户的1个中间版本) | |||
第39行: | 第39行: | ||
} | } | ||
}</nowiki> | }</nowiki> | ||
+ | |||
+ | filter:过滤空行和长度不等于11的 | ||
+ | |||
+ | map:统计销售额和访问量, | ||
+ | |||
+ | keyBy(0):对统计数据以第0项(时间项)为键 进行分组 | ||
+ | |||
+ | timeWindow: 确定时间窗口 | ||
+ | |||
+ | reduce: 对同组数据进行归约、合并 | ||
flink中的时间窗口:https://blog.csdn.net/zg_hover/article/details/87592060 | flink中的时间窗口:https://blog.csdn.net/zg_hover/article/details/87592060 | ||
第138行: | 第148行: | ||
我们还可以将flink程序 放到集群中去运行,这就需要将flink打包。 | 我们还可以将flink程序 放到集群中去运行,这就需要将flink打包。 | ||
+ | |||
+ | [[文件:bd20-6-25.png|600px]] | ||
+ | |||
+ | [[文件:bd20-6-26.png|600px]] |
2020年8月16日 (日) 04:07的最新版本
目录
编写flink程序
flinkclass
实现flink数据处理
(1)在flinkclass里面创建一个object对象
Flink实时处理编程过程
package flinkclass import java.util.Properties import mysqlsink.SaleSQLSink import objectclass.StringAndDouble import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema object SaleVolumn { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //配置Kafka参数 val pro = new Properties() pro.setProperty("bootstrap.servers","192.168.128.130:9092") pro.setProperty("group.id","test") //创建数据源 val stream = env.addSource(new FlinkKafkaConsumer[String]("shop",new SimpleStringSchema(),pro)) //数据处理过程 val data = stream.map(x=>x.split(",")).filter(x=>x.nonEmpty && x.length==11) .map{x=>if(x(7).contains("buy")) (x(x.length-1),(x(5).toDouble,1.0)) else (x(x.length-1),(0.0,1.0))}.keyBy(0).timeWindow(Time.minutes(1)).reduce((x,y)=>(x._1,(x._2._1+y._2._1,x._2._2+y._2._2))) //保存每日销售额到MYSQL表 data.map(x=>new StringAndDouble(x._1,x._2._1)).addSink(new SaleSQLSink("salevolume")) //保存每日点击流到mysql表 data.map(x=>new StringAndDouble(x._1,x._2._2)).addSink(new SaleSQLSink("visitcount_everyday")) env.execute() } }
filter:过滤空行和长度不等于11的
map:统计销售额和访问量,
keyBy(0):对统计数据以第0项(时间项)为键 进行分组
timeWindow: 确定时间窗口
reduce: 对同组数据进行归约、合并
flink中的时间窗口:https://blog.csdn.net/zg_hover/article/details/87592060
运行flink程序
关闭flink
[root@master local]# cd /usr/local/flink-1.10.1/bin
[root@master bin]# stop-cluster.sh
Stopping taskexecutor daemon (pid: 7986) on host slave1.centos.com. Stopping taskexecutor daemon (pid: 5760) on host slave2.centos.com. Stopping taskexecutor daemon (pid: 3219) on host slave3.centos.com. Stopping standalonesession daemon (pid: 11586) on host master.centos.com.
关闭kafka
①在所有节点查看kafka的状态,如果运行则kill掉
jps
kill -9 3507
关闭zookeeper
启动zookeeper
必须先启动zookeeper 再启动kafka,不然启动无效,启动kafka里面的zookeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties >log/zookeeper/zookeeper.log 2>1 &
启动kafka
③在所有节点启动zookeeper后在master,slave1,slave2上启动kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiec >/tmp/klog &
④Jps查看进程
创建kafka主题
查看主题是否创建没有则创建shop主题
[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --list --zookeeper 192.168.128.130:2181
__consumer_offsets shop test
[root@master kafka_2.11-2.3.1]#bin/kafka-topics.sh --zookeeper 192.168.128.130:2181 --delete --topic shop
[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh -create --zookeeper 192.168.128.130:2181 -replication-factor 1 --partitions 1 --topic shop
[root@master kafka_2.11-2.3.1]# bin/kafka-topics.sh --list --zookeeper 192.168.128.130:2181
__consumer_offsets shop test
启动flink集群
⑤在master启动flink集群
/usr/local/flink/start-cluster.sh
启动flume采集
在IDEA中启动flume采集方案时,把之间采集的数据删除掉, 检查模拟采集是否启动
crontab -e
查看采集的数据:
[root@master bin]# rm -rf /opt/flinkproject/
[root@master bin]# mkdir /opt/flinkproject
[root@master bin]# ls /opt/flinkproject/
2020-07-25-20-05-01.log
启动之前写好的flume采集方案
/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/spool-memory-kafka.conf -n client -Dflume.root.logger=INFO,console
运行flink客户端程序-SaleVolumn
终于到了IDEA中本地运行项目SaleVolumn,无报错后,进入mysql中查看最开始定义的两个表,mysql表里面就有每日的访问流量和每日的销售额了
将flink程序打包
刚才我们是将flink在客户端(PC机)上运行的,
我们还可以将flink程序 放到集群中去运行,这就需要将flink打包。