数据源格式描述:
输入t1.txt源数据,数据文件分隔符”*&*”,字段说明如下:
字段序号 | 字段英文名称 | 字段中文名称 | 字段类型 | 字段长度 |
1 | TIME_ID | 时间(到时) | 字符型 | 12 |
2 | Session | 会话时长 | 数值型 | 8 |
3 | MSISDN | 用户号码 | 字符型 | 11 |
4 | SP_DOMAIN | SP域名 | 数值型 | 64 |
5 | USER_AGENT_ORIGN | 终端字串 | 字符型 | 128 |
6 | USER_AGENT | 终端类别 | 字符型 | 64 |
7 | UPSTREAM_VOL | 上行流量 | 数值型 | 8 |
8 | DOWNSTREAM_VOL | 下行流量 | 数值型 | 8 |
9 | URL_CNT | 访问次数 | 数值型 | 20 |
用mapreduce实现单表汇总:
在数据源的基础上,根据终端类型汇总出总流量及访问次数。汇总模型字段说明如下:
字段序号 | 字段英文名称 | 字段中文名称 | 字段类型 | 字段长度 |
1 | USER_AGENT | 终端类型 | 字符型 | |
2 | TOT_FLUX | 总流量 | 数值型 | 30 |
3 | URL_CNT | 访问次数 | 数值型 | 30 |
代码如下:
package mianshi;
import java.io.DataInput;
import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;import com.google.protobuf.TextFormat;
public class Test1 {
/**
* @param args * @throws IOException * @throws InterruptedException * @throws ClassNotFoundException */ public static void main(String[] args) throws Exception { //创建配置文件 Configuration conf=new Configuration(); //创建job Job job = new Job(conf,Test1.class.getName()); //设置jar包运行 job.setJarByClass(Test1.class); //设置输入路径 FileInputFormat.setInputPaths(job, new Path(args[0])); //设置输入格式 job.setInputFormatClass(TextInputFormat.class); //设置自定义Mapper job.setMapperClass(MyMapper.class); //设置Map输出的Value类型,也就是V2 job.setMapOutputValueClass(Model.class); //设置Map输出的Key类型,也就是K2 job.setMapOutputKeyClass(Text.class); //设置分区类型 job.setPartitionerClass(HashPartitioner.class); //设置Rudece任务数 job.setNumReduceTasks(1); //设置自定义Reduce类 job.setReducerClass(MyReducer.class); //设置输出K3的类型 job.setOutputKeyClass(Text.class); //设置输出的V3类型 job.setOutputValueClass(Model.class); //设置输出的格式 job.setOutputFormatClass(TextOutputFormat.class); //指定输出路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交job job.waitForCompletion(true);}
static class MyMapper extends Mapper<LongWritable, Text, Text, Model>{ @Override protected void map(LongWritable k1, Text v1,Context context) throws IOException, InterruptedException { /** * 切割字符串有点意思! * “*”是特殊字符,需要用[] * "&"需要用\\转义 * * */ String[] split = v1.toString().split("[*]\\&[*]"); Text user_agent = new Text(split[5]); Long tot_flux = new Long(split[6])+new Long(split[7]); Long url_cnt = new Long(split[8]); Model v2 = new Model(tot_flux, url_cnt); context.write(user_agent, v2); } } static class MyReducer extends Reducer<Text, Model, Text, Model>{ @Override protected void reduce(Text k2, Iterable<Model> v2s,Context context) throws IOException, InterruptedException { //定义计数器 long sum_flux =0L; long sum_url = 0L; for(Model model : v2s){ sum_flux += model.tot_flux; sum_url += model.url_cnt; } Model v3 = new Model(sum_flux,sum_url); context.write(k2, v3); } }}
/**
* 自定义类型必须实现Writable * @author Sky * */class Model implements Writable{ long tot_flux; long url_cnt; public Model(){} public Model(Long tot_flux,Long url_cnt){ this.tot_flux = tot_flux; this.url_cnt = url_cnt; }public void write(DataOutput out) throws IOException {
//序列化出去 out.writeLong(tot_flux); out.writeLong(url_cnt); }public void readFields(DataInput in) throws IOException {
//和序列化出去的一样 this.tot_flux = in.readLong(); this.url_cnt = in.readLong(); } //必须覆写toString方法,否则输出的值是内存值 @Override public String toString() { return tot_flux+"\t"+url_cnt; } }
文章参考论坛: