博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop之MultipleOutputs
阅读量:4112 次
发布时间:2019-05-25

本文共 4432 字,大约阅读时间需要 14 分钟。

背景:

根据业务输出有规则的业务数据,比如都在/abc/a/下他们根据业务不同,其文件名称也不同

/abc/a/good-001

/abc/a/bad-001

那么下个job可以基于文件名做相应的业务操作

hadoop版本信息:

[ ~]$ hadoop versionHadoop 0.20.2-cdh3u4Subversion git://ubuntu-slave01/var/lib/jenkins/workspace/CDH3u4-Full-RC/build/cdh3/hadoop20/0.20.2-cdh3u4/source -r 214dd731e3bdb687cb55988d3f47dd9e248c5690Compiled by jenkins on Mon May  7 13:01:39 PDT 2012From source with checksum a60c9795e41a3248b212344fb131c12c

实现方式:

1.基于MultipleOutputs

 

实现代码:

mapper:访问hbase某个表然后利用MultipleOutputs写

import java.io.IOException;import java.util.Arrays;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import org.apache.commons.lang.StringUtils;import org.apache.commons.lang.math.RandomUtils;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.mapreduce.TableSplit;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;import com.alibaba.fastjson.JSONObject;public class CommentMapper extends TableMapper
{ private static final Log LOGGER = LogFactory.getLog(CommentMapper.class); private static Set
set = new HashSet
(); private org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
mos; @Override public void setup(Context context) { mos = new MultipleOutputs
(context); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); super.cleanup(context); } @Override protected final void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { try { List
list = value.list(); Iterator
iterator = list.iterator(); Map
map = new HashMap
(); while (iterator.hasNext()) { KeyValue keyValue = iterator.next(); byte[] bytes = value.getValue(keyValue.getFamily(), keyValue.getQualifier()); String keyId = StringUtils.lowerCase(Bytes.toString(keyValue.getFamily())) + StringUtils.capitalize(StringUtils.lowerCase(Bytes.toString(keyValue .getQualifier()))); if (set.contains(keyId)) { continue; } if ("eS".equals(keyId)) { map.put(keyId, Float.toString(Bytes.toFloat(bytes))); } else { map.put(keyId, Bytes.toString(bytes)); } } JSONObject json = new JSONObject(map); mos.write(RandomUtils.nextBoolean() + "", NullWritable.get(), new Text(json.toJSONString())); LOGGER.info("working dir:" + context.getWorkingDirectory().getName()); LOGGER.info("getInputSplit:" + Arrays.toString(context.getInputSplit().getLocations())); } catch (Throwable e) { LOGGER.error("Error occurs when running CommentMapper", e); throw new RunTimeException("Error occurs when running CommentMapper", e); } }}

Job执行:

private static void runJob() {	String inputTableName = "RECMD_JD_COMMENT";	Configuration conf = HBaseConfiguration.create();	conf.set("hbase.master", XXX);	conf.set("hbase.zookeeper.quorum", XXX);	conf.set("hbase.cluster.distributed", "true");	conf.set("mapreduce.job.counters.limit", "100000");	conf.set("mapreduce.job.counters.max", "100000");	String outPathStr = "/user/search/test/CommentText";	conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);	conf.set("mapreduce.output.basename", "val");	try {		HadoopUtil.delete(conf, new Path(outPathStr));		Scan scan = new Scan();		scan.setCacheBlocks(false);		scan.setCaching(200);		Job job = new Job(conf, "CommentDDTask");		job.setJarByClass(DDTask.class);		TableMapReduceUtil.initTableMapperJob(inputTableName, scan, CommentMapper.class,			NullWritable.class, Text.class, job);		TextOutputFormat.setOutputPath(job, new Path(outPathStr));		MultipleOutputs.addNamedOutput(job, "true", TextOutputFormat.class, NullWritable.class,			Text.class);		MultipleOutputs.addNamedOutput(job, "false", TextOutputFormat.class,			NullWritable.class, Text.class);		job.setNumReduceTasks(0);		job.waitForCompletion(true);	} catch (Throwable e) {		throw new RuntimeException("Run DDTask error! ", e);	} finally {		HConnectionManager.deleteConnection(conf, true);	}}

 

小技巧:

可以通过mapreduce.output.basename来控制写文件生成的名称

 

转载地址:http://bsqsi.baihongyu.com/

你可能感兴趣的文章
js-高德地图规划路线
查看>>
常用js收集
查看>>
mydata97的日期控件
查看>>
如何防止sql注入
查看>>
maven多工程构建与打包
查看>>
springmvc传值
查看>>
Java 集合学习一 HashSet
查看>>
在Eclipse中查看Android源码
查看>>
Android-Socket登录实例
查看>>
Android使用webservice客户端实例
查看>>
层在页面中的定位
查看>>
[转]C语言printf
查看>>
C 语言 学习---获取文本框内容及字符串拼接
查看>>
C 语言学习 --设置文本框内容及进制转换
查看>>
C 语言 学习---判断文本框取得的数是否是整数
查看>>
C 语言 学习---ComboBox相关、简单计算器
查看>>
C 语言 学习---ComboBox相关、简易“假”管理系统
查看>>
C 语言 学习---回调、时间定时更新程序
查看>>
C 语言 学习---复选框及列表框的使用
查看>>
第四章 - 程序计数器
查看>>