当前位置:首页 > 心得体会 >

大数据培训心得

时间:2022-08-02 08:40:05 来源:网友投稿

下面是小编为大家整理的大数据培训心得(全文完整),供大家参考。

大数据培训心得(全文完整)

 

 2021.5.29 和 2021.6.5 大数据师资培训心得和收获

 第一阶段

 flume 日志采集

 Flume 主要包括三部分 source,channel,sink Flume 最主要的作用是,实时读取服务器本地磁盘的数据,将数据写到 HDFS。

 Source 负责接收数据,Channel 是位于 Source 和 Sink 之间的缓冲区,Sink 不断地轮询 Channel 中的事件且批量移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

 Event Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。

 安装 配置 简单使用见以下链接:

 https://blog.csdn.net/weixin_42837961/article/details/104533147

 本次实验命令 flume-ng agent --conf conf --conf-file job/dataCollect.conf -name a1 -Dflume.root.logger=INFO,console 实验思路:

 flume 的 source 监听一个日志文件/home/disastrous/project/access.log,将监听到的数据发送给channel管道在将数据推送给sink

 sink关联的就是HDFS上的路径

 注意:

 1.Flume 的文件夹下面建了一个 job 文件集,里面放每次执行时的配置文件,本次执行的配置文件:

 a1.sources = r1 a1.sinks = k1 a1.channels = c1

 a1.sources.r1.type = exec

  a1.sources.r1.command = tail -F /home/disastrous/project/access.log

 a1.sinks.k1.type = hdfs

  #指定 hdfs 目录格式:年月日 (小时:/%y-%m-%d/%H/

 日小时:/%y-%m-%d/%d-%H/)

 a1.sinks.k1.hdfs.path = hdfs://node1:9000/flume/webClick/logs/%y-%m-%d

 #生成文件前缀 a1.sinks.k1.hdfs.filePrefix = webClick-

  ###

 在 hdfs 上生成文件策略:三种策略,满足一个就会执行

 ### #以下策略:每隔 60s 或者文件大小超过 10M 的时候产生新文件 # hdfs 有多少条消息时新建文件,0 不基于消息个数 a1.sinks.k1.hdfs.rollCount=0 # hdfs 创建多长时间新建文件,0 不基于时间 a1.sinks.k1.hdfs.rollInterval=60 # hdfs 多大时新建文件,0 不基于文件大小 单位:byte

 a1.sinks.k1.hdfs.rollSize=1024000000

  # 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件 a1.sinks.k1.hdfs.idleTimeout=6 a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.useLocalTimeStamp=true

 ###

 在 hdfs 上生成目录策略

  每五分钟生成一个新目录: ### # 是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,如果启用,则会影响除了%t 的其他所有时间表达式 a1.sinks.k1.hdfs.round=true # 时间上进行“舍弃”的值; a1.sinks.k1.hdfs.roundValue=24 # 时间上进行”舍弃”的单位,包含:second,minute,hour a1.sinks.k1.hdfs.roundUnit=hour

 a1.channels.c1.type = memory a1.channels.c1.capacity = 20000 a1.channels.c1.keep-alive = 1000 a1.channels.c1.transactionCapacity = 10000

  a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

 [记录报错] 1.执行后报错:

 org.apache.flume.ChannelFullException: Space for commit to queue couldn"t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight 解决方法:

 a1.channels.c1.capacity = 20000

  //改大点 a1.channels.c1.transactionCapacity = 10000

 //改大点

 a1.channels.c1.keep-alive = 1000

  //填上这句

 可能是一下到达的数据太多了 //缓冲区大小最大多少和虚拟机配置的内存大小有关系

 参考链接:https://www.cnblogs.com/zlslch/p/7253943.html

 2.执行后在 HDFS 上查看文件只有 1.75kb 原因:我们刚开始的数据文件 access.log 不应该在/home/disastrous/project 下,要模仿后续到达,如果一开始就有这个文件,只会默认读取 10 条数据。

 解决方法:删除/home/disastrous/project/access.log 在 flume-ng 执行后再使用 cp 命令将数据文件复制到/home/disastrous/project 下

 第二阶段 MapReduce 预处理 参考 eclipse 项目 webclick

 定义数据对象类:WebLogBean package webclick;

 public class WebLogBean {

 private Boolean valid;

 private String remote_addr;

 private String time_local;

 private String request;

 private String status;

 private String body_bytes_sent;

 private String http_referer;

 private String from_browser;

 private String province;

 private String latitude;

 private String longitude;

 private int age;

  public Boolean getValid() {

  return valid;

 }

 public void setValid(Boolean valid) {

  this.valid = valid;

 }

 public String getRemote_addr() {

  return remote_addr;

 }

 public void setRemote_addr(String remote_addr) {

  this.remote_addr = remote_addr;

 }

 public String getTime_local() {

  return time_local;

 }

 public void setTime_local(String time_local) {

  this.time_local = time_local;

 }

 public String getRequest() {

  return request;

 }

 public void setRequest(String request) {

  this.request = request;

 }

 public String getStatus() {

  return status;

 }

 public void setStatus(String status) {

  this.status = status;

 }

 public String getBody_bytes_sent() {

  return body_bytes_sent;

 }

 public void setBody_bytes_sent(String body_bytes_sent) {

  this.body_bytes_sent = body_bytes_sent;

 }

 public String getHttp_referer() {

  return http_referer;

 }

 public void setHttp_referer(String http_referer) {

  this.http_referer = http_referer;

 }

 public String getFrom_browser() {

  return from_browser;

  }

 public void setFrom_browser(String from_browser) {

  this.from_browser = from_browser;

 }

 public String getProvince() {

  return province;

 }

 public void setProvince(String province) {

  this.province = province;

 }

 public String getLatitude() {

  return latitude;

 }

 public void setLatitude(String latitude) {

  this.latitude = latitude;

 }

 public String getLongitude() {

  return longitude;

 }

 public void setLongitude(String longitude) {

  this.longitude = longitude;

 }

 public int getAge() {

  return age;

 }

 public void setAge(int age) {

  this.age = age;

 }

 public String toString() {

  StringBuilder sb = new StringBuilder();

  sb.append(this.remote_addr);

  sb.append(",").append(this.time_local);

  sb.append(",").append(this.request);

  sb.append(",").append(this.status);

  sb.append(",").append(this.body_bytes_sent);

  sb.append(",").append(this.http_referer); //

 sb.append(",").append(this.from_browser);

  sb.append(",").append(this.province);

  sb.append(",").append(this.latitude);

  sb.append(",").append(this.longitude);

  sb.append(",").append(this.age);

  return sb.toString();

 }

 }

 预处理类:WebLogParser package webclick;

 public class WebLogParser {

  public static WebLogBean parser(String line) {

 WebLogBean webLogBean = new WebLogBean();

 String[] arr = line.split(" ");

  webLogBean.setValid(true);

  webLogBean.setRemote_addr(arr[0]);

  webLogBean.setTime_local(arr[3]+" "+arr[4]);

  webLogBean.setRequest(arr[6]);

  webLogBean.setStatus(arr[8]);

  webLogBean.setBody_bytes_sent(arr[9]);

  webLogBean.setHttp_referer(arr[10]);

 //

 String browser = ""; //

 for(int i=11;i<arr.length-4;i++) { //

  browser += arr[i] + " "; //

 } //

 webLogBean.setFrom_browser(browser);

  webLogBean.setProvince(arr[arr.length-4]);

  webLogBean.setLatitude(arr[arr.length-3]);

  webLogBean.setLongitude(arr[arr.length-2]);

  webLogBean.setAge(Integer.parseInt(arr[arr.length-1]));

 if( Integer.parseInt(webLogBean.getStatus()) >= 400) {

 webLogBean.setValid(false);

  }

 if(webLogBean.getRemote_addr().equals("-")) {

 webLogBean.setValid(false);

  }

 return webLogBean;

 } }

 Mapper 类:ClickMapper package webclick;

 import java.io.IOException;

 import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

 public class ClickMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

 Text newkey = new Text();

 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)

 throws IOException, InterruptedException {

  String line = value.toString();

  WebLogBean webLogBean = WebLogParser.parser(line);

  if(!webLogBean.getValid()) {

 return ;

  }

  System.out.println(webLogBean.toString());

  newkey.set(webLogBean.toString());

  context.write(newkey, NullWritable.get());

 } }

 主类:test package webclick;

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 public class test {

 public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();

 conf.set("fs.defaultFS", "hdfs://192.168.100.3:9000");

 Job job = Job.getInstance(conf);

  job.setJarByClass(test.class);

  job.setMapperClass(ClickMapper.class);

  job.setOutputKeyClass(Text.class);

 job.setOutputValueClass(NullWritable.class);

 //

  FileInputFormat.setInputPaths(job, new Path("C://Users//Administrator//Desktop//access.log"));

 FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.100.3:9000//flume/webClick/logs/21-06-05/*"));

 Path outpath = new Path("hdfs://192.168.100.3:9000//flume/webClick/result"); //

  如果有之前输出文件 result,就删除

 try(FileSystem filesystem = FileSystem.get(conf)){

  if(filesystem.exists(outpath)) {

 filesystem.delete(outpath,true);

  }

 }

 FileOutputFormat.setOutputPath(job, outpath);

 System.exit(job.waitForCompletion(true)?0:1);

  }

 }

 第三阶段:hive sqoop [错误记录] linux 中用 sqoop 从 hdfs 导出数据到 mysql,汉字显示成问号 解决方法:

 vi /etc/my.cnf

 加入下面两条 character-set-server=utf8 init_connect="SET NAMES utf8" service mysql restart 参考博客:https://blog.csdn.net/weixin_45102492/article/details/90720206 [心得]命令参考

 如果多次插入一张表,那么在写--export-dir 下面的路径时应该是 area_pvs/* 多次插入在文件里报错的就是这个样子:

推荐访问:大数据培训心得 完整 心得 培训