本文共 5063 字，大约阅读时间需要 16 分钟。
统计手机流量：按号码汇总上行/下行包数和上行/下行流量，并把手机号（长度为 11 位的号码）的统计结果输出到一个文件，非手机号的统计结果输出到另一个文件。
文件信息
/**
 * Traffic record for one phone number: upstream/downstream packet counts and
 * upstream/downstream payload sizes. Used as the map output value and reduce output value.
 *
 * <p>{@link #write} and {@link #readFields} serialize the fields in the order
 * phoneNo, upPackNo, downPackNo, upPayLoad, downPayLoad; the two methods must stay in sync.
 */
public class TelphoneName implements Writable {

    /** Column index of the phone number in a split input line. */
    private static final int PHONE_COLUMN = 1;
    /** Column index of the upstream packet count. */
    private static final int UP_PACK_COLUMN = 6;
    /** Column index of the downstream packet count. */
    private static final int DOWN_PACK_COLUMN = 7;
    /** Column index of the upstream payload size. */
    private static final int UP_PAYLOAD_COLUMN = 8;
    /** Column index of the downstream payload size (the highest column read). */
    private static final int DOWN_PAYLOAD_COLUMN = 9;
    /** Minimum number of columns required by {@link #TelphoneName(String[])}. */
    private static final int MIN_COLUMNS = DOWN_PAYLOAD_COLUMN + 1;

    private String phoneNo;
    private int upPackNo;
    private int downPackNo;
    private int upPayLoad;
    private int downPayLoad;

    public String getPhoneNo() {
        return phoneNo;
    }

    public void setPhoneNo(String phoneNo) {
        this.phoneNo = phoneNo;
    }

    public int getUpPackNo() {
        return upPackNo;
    }

    public void setUpPackNo(int upPackNo) {
        this.upPackNo = upPackNo;
    }

    public int getDownPackNo() {
        return downPackNo;
    }

    public void setDownPackNo(int downPackNo) {
        this.downPackNo = downPackNo;
    }

    public int getUpPayLoad() {
        return upPayLoad;
    }

    public void setUpPayLoad(int upPayLoad) {
        this.upPayLoad = upPayLoad;
    }

    public int getDownPayLoad() {
        return downPayLoad;
    }

    public void setDownPayLoad(int downPayLoad) {
        this.downPayLoad = downPayLoad;
    }

    /**
     * No-arg constructor. Keep it: Writable values are presumably instantiated reflectively
     * by Hadoop and then populated via {@link #readFields}.
     */
    public TelphoneName() {
    }

    /** Creates a record from explicit values. */
    public TelphoneName(String phoneNo, int upPackNo, int downPackNo, int upPayLoad, int downPayLoad) {
        this.phoneNo = phoneNo;
        this.upPackNo = upPackNo;
        this.downPackNo = downPackNo;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
    }

    /**
     * Creates a record from the columns of one split input line.
     *
     * @param split columns of a log line; column 1 is the phone number and columns 6..9 are the
     *     up/down packet counts and up/down payload sizes
     * @throws IllegalArgumentException if {@code split} is null or has fewer than 10 columns
     * @throws NumberFormatException if one of the counter columns is not an integer
     */
    public TelphoneName(String[] split) {
        // requireColumns runs first (arguments are evaluated left to right), so the
        // index accesses below are safe.
        this(requireColumns(split)[PHONE_COLUMN],
                Integer.parseInt(split[UP_PACK_COLUMN]),
                Integer.parseInt(split[DOWN_PACK_COLUMN]),
                Integer.parseInt(split[UP_PAYLOAD_COLUMN]),
                Integer.parseInt(split[DOWN_PAYLOAD_COLUMN]));
    }

    /** Returns {@code split} unchanged after checking it has enough columns. */
    private static String[] requireColumns(String[] split) {
        if (split == null || split.length < MIN_COLUMNS) {
            throw new IllegalArgumentException("expected at least " + MIN_COLUMNS
                    + " columns but got " + (split == null ? "null" : String.valueOf(split.length)));
        }
        return split;
    }

    /**
     * Serializes this record.
     *
     * @throws NullPointerException if {@code phoneNo} has not been set
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(java.util.Objects.requireNonNull(phoneNo, "phoneNo must be set before write()"));
        out.writeInt(upPackNo);
        out.writeInt(downPackNo);
        out.writeInt(upPayLoad);
        out.writeInt(downPayLoad);
    }

    /** Deserializes this record; reads fields in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNo = in.readUTF();
        upPackNo = in.readInt();
        downPackNo = in.readInt();
        upPayLoad = in.readInt();
        downPayLoad = in.readInt();
    }

    /**
     * Tab-separated upPackNo, downPackNo, upPayLoad, downPayLoad. The phone number is
     * intentionally omitted; the job emits it separately as the output key.
     */
    @Override
    public String toString() {
        return upPackNo + "\t" + downPackNo + "\t" + upPayLoad + "\t" + downPayLoad;
    }
}
/**
 * Parses one traffic log line and emits (phone number, traffic record).
 *
 * <p>Lines are split on single whitespace characters (not runs of whitespace), so empty
 * columns are kept and column positions stay fixed. Lines with too few columns or with
 * non-numeric counters are skipped and counted under the
 * {@code TrafficCount / MALFORMED_LINES} counter instead of failing the task.
 */
public class TrafficCountMapper extends Mapper<LongWritable, Text, Text, TelphoneName> {

    /** The highest column read from a line is index 9 (downstream payload), so 10 are needed. */
    private static final int MIN_COLUMNS = 10;
    /** Column index of the phone number. */
    private static final int PHONE_COLUMN = 1;

    private static final String COUNTER_GROUP = "TrafficCount";
    private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] columns = value.toString().split("\\s");
        if (columns.length < MIN_COLUMNS) {
            // Blank or truncated line: record it and move on rather than crash the task.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        TelphoneName record;
        try {
            record = new TelphoneName(columns);
        } catch (NumberFormatException ignored) {
            // A counter column is not an integer; the counter makes the skip visible.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }
        context.write(new Text(columns[PHONE_COLUMN]), record);
    }
}
/**
 * Sums the packet counts and payload sizes of all records for one phone number and emits
 * (phone number, totals).
 *
 * <p>NOTE(review): the driver class of this job is also named {@code Reducer}. If it lives in
 * the same package, this file must use a single-type import of
 * {@code org.apache.hadoop.mapreduce.Reducer}; an on-demand import would lose to the
 * same-package class.
 */
public class TrafficCountReducer extends Reducer<Text, TelphoneName, Text, TelphoneName> {

    @Override
    protected void reduce(Text key, Iterable<TelphoneName> values, Context context)
            throws IOException, InterruptedException {
        int upPackNo = 0;
        int downPackNo = 0;
        int upPayLoad = 0;
        int downPayLoad = 0;

        // Hadoop may reuse one object for every element of values, so only copy its
        // primitive fields out and never keep a reference to it.
        for (TelphoneName record : values) {
            // addExact fails loudly instead of silently wrapping to a wrong (negative) total.
            // Widening the totals to long would require changing TelphoneName's wire format.
            upPackNo = Math.addExact(upPackNo, record.getUpPackNo());
            downPackNo = Math.addExact(downPackNo, record.getDownPackNo());
            upPayLoad = Math.addExact(upPayLoad, record.getUpPayLoad());
            downPayLoad = Math.addExact(downPayLoad, record.getDownPayLoad());
        }

        TelphoneName total = new TelphoneName(key.toString(), upPackNo, downPackNo, upPayLoad, downPayLoad);
        context.write(key, total);
    }
}
//分区class MyPartitioner extends Partitioner{ @Override public int getPartition(Text key, TelphoneName telphoneName, int i) { String keyString = key.toString(); return (keyString.length()==11)?0:1; }}public class Reducer { public static void main(String[] args) throws Exception { //实现一个job的实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //设置jar加载路径 job.setJarByClass(Reducer.class); //设置mapper 和reducer类路径 job.setMapperClass(TrafficCountMapper.class); job.setReducerClass(TrafficCountReducer.class); //设置mapper 输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TelphoneName.class); //设置reducer输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(TelphoneName.class); //map阶段执行分区 job.setPartitionerClass(MyPartitioner.class); job.setNumReduceTasks(2); //设置输入输出 FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //提交 job.waitForCompletion(true); }}
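<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.5.5</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass>hadoop.mapreduce.serialize1.Reducer</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>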
转载地址:http://eiazi.baihongyu.com/