数据采集与缓存

来自CloudWiki
跳转至: 导航搜索

虚拟机环境

master 192.168.128.130  2G、1核
slave1 192.168.128.131  1.5G、1核
slave2 192.168.128.132  1.5G、1核
slave3 192.168.128.133  1.5G、1核

已配置无密码登录、jdk、时间同步服务

集群环境

已安装Hadoop、Flume、Kafka、Flink、MySQL、ZooKeeper
Hadoop:ResourceManager(master)、NameNode(master)、SecondaryNameNode(master)、DataNode(slave1-slave3)、NodeManager(slave1-slave3)
Flume:master
Kafka:broker0(master)、broker1(slave1)、broker2(slave2)
ZooKeeper:slave1、slave2、slave3
MySQL:master
Flink:JobManager(master)、TaskManager(slave1-slave3)

开发环境

本地安装IDEA

本地安装JDK

模拟数据产生

通过python实现一个模拟数据产生的脚本

在Linux虚拟机中通过crontab设置定时任务,每分钟运行一次脚本,每次生成1000条数据并写入以当前时间命名的日志文件

crontab -e

 * * * * * python /opt/data.py /opt/flinkproject/$(date +"\%Y-\%m-\%d-\%H-\%M-\%S").log 1000

data.py(python2):

# -*- coding:UTF-8 -*-
import random
import string
import sys
import time

# Upper-case and lower-case ASCII letters. Despite the "_list" suffix these are
# plain strings; they serve as the alphabets for the random name/email helpers.
alphabet_upper_list = string.ascii_uppercase
alphabet_lower_list = string.ascii_lowercase


# Build a random string of the requested length.
def get_random(instr, length):
    """Return ``length`` elements sampled from ``instr``, joined into a string.

    Sampling is done with ``random.sample`` (no replacement), so every
    position of ``instr`` is used at most once and asking for more elements
    than ``instr`` holds raises ``ValueError``.
    """
    return ''.join(random.sample(instr, length))

# Every rowkey handed out so far, in generation order.
rowkey_tmp_list = []
# Set mirror of rowkey_tmp_list so the uniqueness check is a hash lookup
# instead of a scan over an ever-growing list.
_rowkey_seen = set()
# The 100 possible two-character prefixes: "00", "01", ..., "99".
_ROWKEY_PREFIXES = tuple(str(n).zfill(2) for n in range(100))


# Build a unique rowkey.
def get_random_rowkey():
    """Return a rowkey that has not been returned before by this process.

    A rowkey is a zero-padded two-digit random number (00-99) followed by
    the current Unix timestamp in whole seconds.  The new key is also
    appended to ``rowkey_tmp_list``.

    At most 100 distinct keys exist for any given second.  Once they are
    used up the function sleeps in short steps until the clock moves on,
    rather than spinning on the CPU.
    """
    # Re-sync the lookup set if the public list was changed from outside.
    if len(_rowkey_seen) != len(rowkey_tmp_list):
        _rowkey_seen.clear()
        _rowkey_seen.update(rowkey_tmp_list)

    while True:
        suffix = str(int(time.time()))
        unused = [p for p in _ROWKEY_PREFIXES if p + suffix not in _rowkey_seen]
        if unused:
            rowkey = random.choice(unused) + suffix
            _rowkey_seen.add(rowkey)
            rowkey_tmp_list.append(rowkey)
            return rowkey
        # All prefixes for this second are taken; wait for the next second.
        time.sleep(0.01)

# Build a user name.
def get_random_name(length):
    """Return ``length`` random lower-case letters passed through ``string.capwords``."""
    lowercase_letters = get_random(alphabet_lower_list, length)
    return string.capwords(lowercase_letters)


# Pick an age.
def get_random_age():
    """Return a random age from 18 to 60 (both inclusive) as a string."""
    age = random.randint(18, 60)
    return str(age)


# Pick a sex.
def get_random_sex():
    """Return either "woman" or "man", chosen at random."""
    options = ("woman", "man")
    return random.choice(options)


# Pick a product ID.
def get_random_goods_no():
    """Return one entry picked at random from the fixed product ID table."""
    # NOTE(review): "230121" appears twice in the table, so it is drawn twice
    # as often as the other IDs - confirm whether that is intended.
    product_ids = (
        "220902", "430031", "550012", "650012", "532120", "230121",
        "250983", "480071", "580016", "950013", "152121", "230121",
    )
    return random.choice(product_ids)

# Build a product price (decimal number rendered as text).
def get_random_goods_price():
    """Return a random price as a string with exactly two decimal digits.

    The integer part is drawn from 1 to 999 and the fractional part from
    1 to 99 (both ranges inclusive).  The fractional part is zero-padded to
    two digits, so a draw of 5 is rendered as ".05" rather than ".5", which
    would read as fifty cents.
    """
    price_int = random.randint(1, 999)
    price_decimal = random.randint(1, 99)
    return str(price_int) + "." + str(price_decimal).zfill(2)

# Pick a store ID.
def get_random_store_id():
    """Return one of the twelve store IDs "313012" through "313023" at random."""
    # The IDs are consecutive, so the table is generated instead of spelled out.
    store_ids = [str(number) for number in range(313012, 313024)]
    return random.choice(store_ids)

# Pick a shopping behaviour type.
def get_random_goods_type():
    """Return one of the behaviour codes "pv", "buy", "cart", "fav" or "scan" at random."""
    # Meaning of the codes, in order: click, purchase, add to cart,
    # add to favourites, browse.
    behaviour_codes = ("pv", "buy", "cart", "fav", "scan")
    return random.choice(behaviour_codes)

# Build a phone number.
def get_random_tel():
    """Return an 11-digit phone number: a known 3-digit prefix plus 8 random digits.

    The 8 trailing digits are drawn with ``random.sample``, so no digit
    repeats within them.
    """
    prefixes = (
        "130", "131", "132", "133", "134", "135", "136", "137",
        "138", "139", "147", "150", "151", "152", "153", "155",
        "156", "157", "158", "159", "186", "187", "188",
    )
    head = random.choice(prefixes)
    tail_digits = random.sample('0123456789', 8)
    return head + ''.join(tail_digits)


# Build an e-mail address.
def get_random_email(length):
    """Return a random e-mail address whose local part has ``length`` letters.

    The local part is drawn from the combined lower- and upper-case
    alphabets; the domain is picked at random from a fixed table.
    """
    letters = alphabet_lower_list + alphabet_upper_list
    domains = ["163.com", "126.com", "qq.com", "gmail.com", "huawei.com"]
    local_part = get_random(letters, length)
    return local_part + "@" + random.choice(domains)


# Pick a purchase date.
def get_random_buy_time():
    """Return a random date from the fixed week 2019-08-01 .. 2019-08-07.

    The seven dates are hard-coded; they are not derived from the current
    date.
    """
    purchase_dates = ["2019-08-%02d" % day for day in range(1, 8)]
    return random.choice(purchase_dates)

# Build one data record.
def get_random_record():
    """Return one comma-separated record.

    Field order: rowkey, name (5 letters), age, sex, product ID, price,
    store ID, behaviour type, phone number, e-mail (10-letter local part),
    purchase date.
    """
    # The list is evaluated left to right, so the generators run in field order.
    fields = [
        get_random_rowkey(),
        get_random_name(5),
        get_random_age(),
        get_random_sex(),
        get_random_goods_no(),
        get_random_goods_price(),
        get_random_store_id(),
        get_random_goods_type(),
        get_random_tel(),
        get_random_email(10),
        get_random_buy_time(),
    ]
    return ",".join(fields)

# Pick a random integer intended as a sleep duration.
def get_random_sleep_time():
    """Return a random integer from 5 to 10, both inclusive."""
    duration = random.randint(5, 10)
    return duration

# Write the generated records to a text file.
def write_record_to_file():
    """Write random records, one per line, to the file named on the command line.

    ``sys.argv[1]`` is the output path; an existing file is overwritten.
    ``sys.argv[2]`` is the number of records to write.

    The count is parsed once, before the file is opened, so an invalid
    count no longer truncates the target file.  The ``with`` block closes
    the file even if record generation raises.
    """
    output_path = sys.argv[1]
    record_count = int(sys.argv[2])
    with open(output_path, 'w') as f:
        for _ in range(record_count):
            f.write(get_random_record() + '\n')

# Generate the data file only when this module is run as a script; the output
# path and record count are read from the command line by write_record_to_file().
if __name__ == "__main__":
    write_record_to_file()




数据结果(每行字段依次为:rowkey、用户名、年龄、性别、商品ID、商品价格、门店ID、购物行为类型、电话号码、邮箱、购买日期)

121595340842,Bsiae,29,woman,230121,294.93,313017,scan,18838094567,fLNBbuYinv@126.com,2019-08-02
611595340842,Mtepc,41,woman,650012,790.62,313012,buy,15754092678,ZfWJCcMIyi@qq.com,2019-08-06
241595340842,Qokhn,18,man,430031,967.3,313023,scan,18665021834,zwCBInbHJT@huawei.com,2019-08-04
671595340842,Yjhzt,51,woman,250983,687.42,313023,cart,15520398617,knLOAFuswg@gmail.com,2019-08-07
161595340842,Lzaeu,54,woman,580016,438.8,313013,pv,13685410963,JkAPyizDgG@126.com,2019-08-04
821595340842,Gxnpr,34,man,152121,93.45,313013,buy,15798105326,AkvqwYQTJp@126.com,2019-08-06
541595340842,Wplbk,22,man,650012,909.8,313014,fav,18775360194,gqDXIMckLO@huawei.com,2019-08-06
581595340842,Oqcpl,44,man,430031,893.24,313014,buy,14785761329,OfrDzvcAUF@huawei.com,2019-08-03
441595340842,Irzbf,46,man,230121,801.86,313016,cart,13239810547,iEMTgnGNYm@qq.com,2019-08-03