下面是小编为大家整理的大数据培训心得(全文完整),供大家参考。
2021.5.29 和 2021.6.5 大数据师资培训心得和收获
第一阶段
flume 日志采集
Flume 主要包括三部分 source,channel,sink Flume 最主要的作用是,实时读取服务器本地磁盘的数据,将数据写到 HDFS。
Source 负责接收数据,Channel 是位于 Source 和 Sink 之间的缓冲区,Sink 不断地轮询 Channel 中的事件且批量移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Event Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。
安装 配置 简单使用见以下链接:
https://blog.csdn.net/weixin_42837961/article/details/104533147
本次实验命令 flume-ng agent --conf conf --conf-file job/dataCollect.conf -name a1 -Dflume.root.logger=INFO,console 实验思路:
flume 的 source 监听一个日志文件/home/disastrous/project/access.log,将监听到的数据发送给channel管道在将数据推送给sink
sink关联的就是HDFS上的路径
注意:
1.Flume 的文件夹下面建了一个 job 文件集,里面放每次执行时的配置文件,本次执行的配置文件:
a1.sources = r1 a1.sinks = k1 a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/disastrous/project/access.log
a1.sinks.k1.type = hdfs
#指定 hdfs 目录格式:年月日 (小时:/%y-%m-%d/%H/
日小时:/%y-%m-%d/%d-%H/)
a1.sinks.k1.hdfs.path = hdfs://node1:9000/flume/webClick/logs/%y-%m-%d
#生成文件前缀 a1.sinks.k1.hdfs.filePrefix = webClick-
###
在 hdfs 上生成文件策略:三种策略,满足一个就会执行
### #以下策略:每隔 60s 或者文件大小超过 10M 的时候产生新文件 # hdfs 有多少条消息时新建文件,0 不基于消息个数 a1.sinks.k1.hdfs.rollCount=0 # hdfs 创建多长时间新建文件,0 不基于时间 a1.sinks.k1.hdfs.rollInterval=60 # hdfs 多大时新建文件,0 不基于文件大小 单位:byte
a1.sinks.k1.hdfs.rollSize=1024000000
# 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件 a1.sinks.k1.hdfs.idleTimeout=6 a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.useLocalTimeStamp=true
###
在 hdfs 上生成目录策略
每五分钟生成一个新目录: ### # 是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,如果启用,则会影响除了%t 的其他所有时间表达式 a1.sinks.k1.hdfs.round=true # 时间上进行“舍弃”的值; a1.sinks.k1.hdfs.roundValue=24 # 时间上进行”舍弃”的单位,包含:second,minute,hour a1.sinks.k1.hdfs.roundUnit=hour
a1.channels.c1.type = memory a1.channels.c1.capacity = 20000 a1.channels.c1.keep-alive = 1000 a1.channels.c1.transactionCapacity = 10000
a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
[记录报错] 1.执行后报错:
org.apache.flume.ChannelFullException: Space for commit to queue couldn"t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight 解决方法:
a1.channels.c1.capacity = 20000
//改大点 a1.channels.c1.transactionCapacity = 10000
//改大点
a1.channels.c1.keep-alive = 1000
//填上这句
可能是一下到达的数据太多了 //缓冲区大小最大多少和虚拟机配置的内存大小有关系
参考链接:https://www.cnblogs.com/zlslch/p/7253943.html
2.执行后在 HDFS 上查看文件只有 1.75kb 原因:我们刚开始的数据文件 access.log 不应该在/home/disastrous/project 下,要模仿后续到达,如果一开始就有这个文件,只会默认读取 10 条数据。
解决方法:删除/home/disastrous/project/access.log 在 flume-ng 执行后再使用 cp 命令将数据文件复制到/home/disastrous/project 下
第二阶段 MapReduce 预处理 参考 eclipse 项目 webclick
定义数据对象类:WebLogBean package webclick;
public class WebLogBean {
private Boolean valid;
private String remote_addr;
private String time_local;
private String request;
private String status;
private String body_bytes_sent;
private String http_referer;
private String from_browser;
private String province;
private String latitude;
private String longitude;
private int age;
public Boolean getValid() {
return valid;
}
public void setValid(Boolean valid) {
this.valid = valid;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getTime_local() {
return time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getFrom_browser() {
return from_browser;
}
public void setFrom_browser(String from_browser) {
this.from_browser = from_browser;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getLatitude() {
return latitude;
}
public void setLatitude(String latitude) {
this.latitude = latitude;
}
public String getLongitude() {
return longitude;
}
public void setLongitude(String longitude) {
this.longitude = longitude;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.remote_addr);
sb.append(",").append(this.time_local);
sb.append(",").append(this.request);
sb.append(",").append(this.status);
sb.append(",").append(this.body_bytes_sent);
sb.append(",").append(this.http_referer); //
sb.append(",").append(this.from_browser);
sb.append(",").append(this.province);
sb.append(",").append(this.latitude);
sb.append(",").append(this.longitude);
sb.append(",").append(this.age);
return sb.toString();
}
}
预处理类:WebLogParser package webclick;
public class WebLogParser {
public static WebLogBean parser(String line) {
WebLogBean webLogBean = new WebLogBean();
String[] arr = line.split(" ");
webLogBean.setValid(true);
webLogBean.setRemote_addr(arr[0]);
webLogBean.setTime_local(arr[3]+" "+arr[4]);
webLogBean.setRequest(arr[6]);
webLogBean.setStatus(arr[8]);
webLogBean.setBody_bytes_sent(arr[9]);
webLogBean.setHttp_referer(arr[10]);
//
String browser = ""; //
for(int i=11;i<arr.length-4;i++) { //
browser += arr[i] + " "; //
} //
webLogBean.setFrom_browser(browser);
webLogBean.setProvince(arr[arr.length-4]);
webLogBean.setLatitude(arr[arr.length-3]);
webLogBean.setLongitude(arr[arr.length-2]);
webLogBean.setAge(Integer.parseInt(arr[arr.length-1]));
if( Integer.parseInt(webLogBean.getStatus()) >= 400) {
webLogBean.setValid(false);
}
if(webLogBean.getRemote_addr().equals("-")) {
webLogBean.setValid(false);
}
return webLogBean;
} }
Mapper 类:ClickMapper package webclick;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class ClickMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Text newkey = new Text();
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
WebLogBean webLogBean = WebLogParser.parser(line);
if(!webLogBean.getValid()) {
return ;
}
System.out.println(webLogBean.toString());
newkey.set(webLogBean.toString());
context.write(newkey, NullWritable.get());
} }
主类:test package webclick;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class test {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.100.3:9000");
Job job = Job.getInstance(conf);
job.setJarByClass(test.class);
job.setMapperClass(ClickMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//
FileInputFormat.setInputPaths(job, new Path("C://Users//Administrator//Desktop//access.log"));
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.100.3:9000//flume/webClick/logs/21-06-05/*"));
Path outpath = new Path("hdfs://192.168.100.3:9000//flume/webClick/result"); //
如果有之前输出文件 result,就删除
try(FileSystem filesystem = FileSystem.get(conf)){
if(filesystem.exists(outpath)) {
filesystem.delete(outpath,true);
}
}
FileOutputFormat.setOutputPath(job, outpath);
System.exit(job.waitForCompletion(true)?0:1);
}
}
第三阶段:hive sqoop [错误记录] linux 中用 sqoop 从 hdfs 导出数据到 mysql,汉字显示成问号 解决方法:
vi /etc/my.cnf
加入下面两条 character-set-server=utf8 init_connect="SET NAMES utf8" service mysql restart 参考博客:https://blog.csdn.net/weixin_45102492/article/details/90720206 [心得]命令参考
如果多次插入一张表,那么在写--export-dir 下面的路径时应该是 area_pvs/* 多次插入在文件里报错的就是这个样子: