“数据采集与缓存”的版本间的差异
来自CloudWiki
第28行: | 第28行: | ||
在Linux虚拟机中设置定时任务运行脚本,定时生产数据到文件 | 在Linux虚拟机中设置定时任务运行脚本,定时生产数据到文件 | ||
− | * * * * * python /opt/data.py /opt/flinkproject/$(date +"\%Y-\%m-\%d-\%H-\%M-\%S").log 1000 | + | crontab -e |
+ | |||
+ | * * * * * python /opt/data.py /opt/flinkproject/$(date +"\%Y-\%m-\%d-\%H-\%M-\%S").log 1000 | ||
+ | |||
+ | data.py(python2): | ||
+ | |||
+ | <nowiki># -*- coding:UTF-8 -*- | ||
+ | import random | ||
+ | import string | ||
+ | import sys | ||
+ | import time | ||
+ | |||
+ | # 大小写字母 | ||
+ | alphabet_upper_list = string.ascii_uppercase | ||
+ | alphabet_lower_list = string.ascii_lowercase | ||
+ | |||
+ | |||
+ | # 随机生成指定位数的字符串 | ||
+ | def get_random(instr, length): | ||
+ | # 从指定序列中随机获取指定长度的片段并组成数组,例如:['a', 't', 'f', 'v', 'y'] | ||
+ | res = random.sample(instr, length) | ||
+ | # 将数组内的元素组成字符串 | ||
+ | result = ''.join(res) | ||
+ | return result | ||
+ | |||
+ | # 放置生成的并且不存在的rowkey | ||
+ | rowkey_tmp_list = [] | ||
+ | # 制作rowkey | ||
+ | def get_random_rowkey(): | ||
+ | import time | ||
+ | pre_rowkey = "" | ||
+ | while True: | ||
+ | # 获取00~99的两位数字,包含00与99 | ||
+ | num = random.randint(00, 99) | ||
+ | # 获取当前10位的时间戳 | ||
+ | timestamp = int(time.time()) | ||
+ | # str(num).zfill(2)为字符串不满足2位,自动将该字符串补0 | ||
+ | pre_rowkey = str(num).zfill(2) + str(timestamp) | ||
+ | if pre_rowkey not in rowkey_tmp_list: | ||
+ | rowkey_tmp_list.append(pre_rowkey) | ||
+ | break | ||
+ | return pre_rowkey | ||
+ | |||
+ | # 创建用户名 | ||
+ | def get_random_name(length): | ||
+ | name = string.capwords(get_random(alphabet_lower_list, length)) | ||
+ | return name | ||
+ | |||
+ | |||
+ | # 获取年龄 | ||
+ | def get_random_age(): | ||
+ | return str(random.randint(18, 60)) | ||
+ | |||
+ | |||
+ | # 获取性别 | ||
+ | def get_random_sex(): | ||
+ | return random.choice(["woman", "man"]) | ||
+ | |||
+ | |||
+ | # 获取商品ID | ||
+ | def get_random_goods_no(): | ||
+ | goods_no_list = ["220902", "430031", "550012", "650012", "532120","230121","250983", "480071", "580016", "950013", "152121","230121"] | ||
+ | return random.choice(goods_no_list) | ||
+ | |||
+ | # 获取商品价格(浮点型) | ||
+ | def get_random_goods_price(): | ||
+ | # 随机生成商品价格的整数位,1~999的三位数字,包含1与999 | ||
+ | price_int = random.randint(1, 999) | ||
+ | # 随机生成商品价格的小数位,1~99的两位数字,包含1与99 | ||
+ | price_decimal = random.randint(1, 99) | ||
+ | goods_price = str(price_int) +"." + str(price_decimal) | ||
+ | return goods_price | ||
+ | |||
+ | # 获取门店ID | ||
+ | def get_random_store_id(): | ||
+ | store_id_list = ["313012", "313013", "313014", "313015", "313016","313017","313018", "313019", "313020", "313021", "313022","313023"] | ||
+ | return random.choice(store_id_list) | ||
+ | |||
+ | # 获取购物行为类型 | ||
+ | def get_random_goods_type(): | ||
+ | goods_type_list = ["pv", "buy", "cart", "fav","scan"]#点击、购买、加购、收藏、浏览 | ||
+ | return random.choice(goods_type_list) | ||
+ | |||
+ | # 获取电话号码 | ||
+ | def get_random_tel(): | ||
+ | pre_list = ["130", "131", "132", "133", "134", "135", "136", "137", "138", "139", "147", "150", | ||
+ | "151", "152", "153", "155", "156", "157", "158", "159", "186", "187", "188"] | ||
+ | return random.choice(pre_list) + ''.join(random.sample('0123456789', 8)) | ||
+ | |||
+ | |||
+ | # 获取邮箱名 | ||
+ | def get_random_email(length): | ||
+ | alphabet_list = alphabet_lower_list + alphabet_upper_list | ||
+ | email_list = ["163.com", "126.com", "qq.com", "gmail.com","huawei.com"] | ||
+ | return get_random(alphabet_list, length) + "@" + random.choice(email_list) | ||
+ | |||
+ | |||
+ | # 获取商品购买日期(统计最近7天数据) | ||
+ | def get_random_buy_time(): | ||
+ | buy_time_list = ["2019-08-01", "2019-08-02", "2019-08-03", "2019-08-04", "2019-08-05", "2019-08-06", "2019-08-07"] | ||
+ | return random.choice(buy_time_list) | ||
+ | |||
+ | # 生成一条数据 | ||
+ | def get_random_record(): | ||
+ | return get_random_rowkey() + "," + get_random_name( | ||
+ | 5) + "," + get_random_age() + "," + get_random_sex() + "," + get_random_goods_no() + ","+get_random_goods_price()+ "," +get_random_store_id()+ "," +get_random_goods_type() + ","+ get_random_tel() + "," + get_random_email( | ||
+ | 10) + "," +get_random_buy_time() | ||
+ | |||
+ | # 获取随机整数用于休眠 | ||
+ | def get_random_sleep_time(): | ||
+ | return random.randint(5, 10) | ||
+ | |||
+ | # 将记录写到文本中 | ||
+ | def write_record_to_file(): | ||
+ | # 覆盖文件内容,重新写入 | ||
+ | f = open(sys.argv[1], 'w') | ||
+ | i = 0 | ||
+ | while i < int(sys.argv[2]): | ||
+ | record = get_random_record() | ||
+ | f.write(record) | ||
+ | # 换行写入 | ||
+ | f.write('\n') | ||
+ | i += 1 | ||
+ | f.close() | ||
+ | |||
+ | if __name__ == "__main__": | ||
+ | write_record_to_file() | ||
+ | |||
+ | |||
+ | |||
+ | </nowiki> |
2020年7月21日 (二) 07:08的版本
虚拟机环境
master 192.168.128.130 2G、1核 slave1 192.168.128.131 1.5G 、1核 slave2 192.168.128.132 1.5G 、1核 slave3 192.168.128.133 1.5G 、1核 已配置无密码登录、jdk、时间同步服务
集群环境
已安装Hadoop、Flume、Kafka、Flink、MYSQL、Zookeeper Hadoop:ResourceManager(master)、NameNode(master)、SecondaryNameNode(master)、DataNode(slave1-slave3)、NodeManager(slave1-slave3) Flume:master Kafka:broker0(master)、broker1(slave1)、broker2(slave2) Zookeeper:slave1、slave2、slave3 MYSQL:master Flink:JobManager(master)、TaskManager(slave1-slave3)
开发环境
本地安装IDEA
本地安装JDK
模拟数据产生
通过python实现一个模拟数据产生的脚本
在Linux虚拟机中设置定时任务运行脚本,定时生产数据到文件
crontab -e
* * * * * python /opt/data.py /opt/flinkproject/$(date +"\%Y-\%m-\%d-\%H-\%M-\%S").log 1000
data.py(python2):
# -*- coding:UTF-8 -*- import random import string import sys import time # 大小写字母 alphabet_upper_list = string.ascii_uppercase alphabet_lower_list = string.ascii_lowercase # 随机生成指定位数的字符串 def get_random(instr, length): # 从指定序列中随机获取指定长度的片段并组成数组,例如:['a', 't', 'f', 'v', 'y'] res = random.sample(instr, length) # 将数组内的元素组成字符串 result = ''.join(res) return result # 放置生成的并且不存在的rowkey rowkey_tmp_list = [] # 制作rowkey def get_random_rowkey(): import time pre_rowkey = "" while True: # 获取00~99的两位数字,包含00与99 num = random.randint(00, 99) # 获取当前10位的时间戳 timestamp = int(time.time()) # str(num).zfill(2)为字符串不满足2位,自动将该字符串补0 pre_rowkey = str(num).zfill(2) + str(timestamp) if pre_rowkey not in rowkey_tmp_list: rowkey_tmp_list.append(pre_rowkey) break return pre_rowkey # 创建用户名 def get_random_name(length): name = string.capwords(get_random(alphabet_lower_list, length)) return name # 获取年龄 def get_random_age(): return str(random.randint(18, 60)) # 获取性别 def get_random_sex(): return random.choice(["woman", "man"]) # 获取商品ID def get_random_goods_no(): goods_no_list = ["220902", "430031", "550012", "650012", "532120","230121","250983", "480071", "580016", "950013", "152121","230121"] return random.choice(goods_no_list) # 获取商品价格(浮点型) def get_random_goods_price(): # 随机生成商品价格的整数位,1~999的三位数字,包含1与999 price_int = random.randint(1, 999) # 随机生成商品价格的小数位,1~99的两位数字,包含1与99 price_decimal = random.randint(1, 99) goods_price = str(price_int) +"." + str(price_decimal) return goods_price # 获取门店ID def get_random_store_id(): store_id_list = ["313012", "313013", "313014", "313015", "313016","313017","313018", "313019", "313020", "313021", "313022","313023"] return random.choice(store_id_list) # 获取购物行为类型 def get_random_goods_type(): goods_type_list = ["pv", "buy", "cart", "fav","scan"]#点击、购买、加购、收藏、浏览 return random.choice(goods_type_list) # 获取电话号码 def get_random_tel(): pre_list = ["130", "131", "132", "133", "134", "135", "136", "137", "138", "139", "147", "150", "151", "152", "153", "155", "156", "157", "158", "159", "186", "187", "188"] return random.choice(pre_list) + ''.join(random.sample('0123456789', 8)) # 获取邮箱名 def get_random_email(length): alphabet_list = alphabet_lower_list + alphabet_upper_list email_list = ["163.com", "126.com", "qq.com", "gmail.com","huawei.com"] return get_random(alphabet_list, length) + "@" + random.choice(email_list) # 获取商品购买日期(统计最近7天数据) def get_random_buy_time(): buy_time_list = ["2019-08-01", "2019-08-02", "2019-08-03", "2019-08-04", "2019-08-05", "2019-08-06", "2019-08-07"] return random.choice(buy_time_list) # 生成一条数据 def get_random_record(): return get_random_rowkey() + "," + get_random_name( 5) + "," + get_random_age() + "," + get_random_sex() + "," + get_random_goods_no() + ","+get_random_goods_price()+ "," +get_random_store_id()+ "," +get_random_goods_type() + ","+ get_random_tel() + "," + get_random_email( 10) + "," +get_random_buy_time() # 获取随机整数用于休眠 def get_random_sleep_time(): return random.randint(5, 10) # 将记录写到文本中 def write_record_to_file(): # 覆盖文件内容,重新写入 f = open(sys.argv[1], 'w') i = 0 while i < int(sys.argv[2]): record = get_random_record() f.write(record) # 换行写入 f.write('\n') i += 1 f.close() if __name__ == "__main__": write_record_to_file()