• Welcome to the world's largest Chinese hacker forum

    Welcome to the world's largest Chinese hacker forum, our forum registration is open! You can now register for technical communication with us, this is a free and open to the world of the BBS, we founded the purpose for the study of network security, please don't release business of black/grey, or on the BBS posts, to seek help hacker if violations, we will permanently frozen your IP and account, thank you for your cooperation. Hacker attack and defense cracking or network Security

    business please click here: Creation Security  From CNHACKTEAM

Recommended Posts

需求:一个主题包含很多个表信息,需要自动根据数据字符串中的字段来写入到储备不同的表对应的路径中。

发送到卡夫卡中的数据原本最外层原本没有pkDay和项目,只有数据和姓名。因为担心数据里面会空值,所以根同事商量,让他们在最外层添加了项目和pkDay字段。

pkDay字段用于表的自动分区,项目和名字合起来用于自动拼接储备表的名称为ods _项目_名称

{

数据' : {

区块号: 12371220,

链条' : '以太坊:

当前时间戳' : 1620177708,

费用: 3000,

项目":"测试一

},

名称' : '视图:

pkDay': '2021-05-05 ',

项目":"测试一

}

自定义Flume拦截器开发

公共类分块提取器实现侦听器{

记录器日志=记录器工厂。获取记录器(这。getclass());

JsonParser解析器=null

@覆盖

public void initialize() {

parser=new JSON parser();

}

@覆盖

公共事件拦截(事件事件){

//获取数据身体

byte[]body=事件。get body();

MapString,字符串头map=event。获取头();

字符串str=新字符串(正文,字符集.UTF _ 8);

JSON元素element=解析器。parse(str);

尝试{

JSON对象root=element。getasjsonobject();

日志。调试(str);

JSON对象数据obj=root。get(' data ').getAsJsonObject();

String name=root.get('name ').getAsString().toLowerCase();

string project=" none

如果(对象。非空(根。get(' project '){

project=root.get('project ').getAsString().toLowerCase();

}

String pk_day=root.get('pkDay ').getAsString();

字符串PK _ year=PK _ day。子串(0,4);

字符串PK _ month=PK _ day。子串(0,7);

日志。debug(' name======' name);

//直接取数据中的数据值输出到身体中

event.setBody(dataObj.toString().getBytes(字符集UTF _ 8);

//设置页眉

headerMap.put('pk_year ',PK _ year);

headerMap.put('pk_month ',PK _ month);

headerMap.put('pk_day ',PK _ day);

headerMap.put('name ',name);

headerMap.put('project ',project);

}catch(异常e){

日志错误(字符串);

日志。error(e . getmessage());

//异常信息单独写入

if(null!=str){

headerMap.put('error ',' error ');

事件。设置主体(字符串。getbytes(字符集.UTF _ 8));

}

e。printstacktrace();

}

返回事件;

}

@覆盖

公共列表事件拦截(列表事件事件){

对于(事件事件:事件){

拦截(事件);

}

返回事件;

}

@覆盖

公共无效关闭(){

}

公共静态类生成器实现拦截器。建造者{

@覆盖

公共拦截器构建(){

返回新的BlockExtractorInterceptor();

}

@覆盖

公共空的配置(上下文上下文){

}

}

}

我这里判断了如果项目为空时,直接丢到没有人里面,那么结果表名为ods _无_名。

幸亏提前预判了他们会出现空的项目,结果接收时,果然他们给了一些是项目为空的。然后让他们修改数据了。

事件事件中可以获取到两个属性,一个是event.getHeaders(),一个是event.getBody()。

拿到数据后用数据的值覆盖了身体中的值10 .一标题中的值存进去PK _年,主键_月,主键_日,以及名字和项目。这个后面水道配置文件中使用。

注意一个细节,解析数据串时候,使用的是gson,刚好水道的解放运动目录下也有开源项目包,所以不需要额外添加开源项目的包。之前使用使用发现了一些坑。

打包编译

编译好,把生成的冲突包放到水道的解放运动目录下。

flume配置

test.sinks.k1.type=hdfs

测试。水槽。k1。HDFS。路径=hdfs://ns1//user/hive/warehouse/ods.db/ods_%{project}_%{name}/pk_year=%{pk_year}/pk_month=%{pk_month}/pk_day=%{pk_day}

测试。水槽。k1。HDFS。文件前缀=%{project}_%{name}

test.sinks.k1.hdfs.fileSufix=.原木

测试。水槽。k1。HDFS。uselocaltimestamp=true

测试。水槽。k1。HDFS。批量=500

测试。水槽。k1。HDFS。文件类型=数据流

测试。水槽。k1。HDFS。书写格式=文本

测试。水槽。k1。HDFS。rollsize=2147483648

测试。水槽。k1。HDFS。滚动间隔=0

test.sinks.k1.hdfs.rollCount=0

测试。水槽。k1。HDFS。空闲超时=120

测试。水槽。k1。HDFS。minblockreplicas=1

test.channels.c1.type=file

测试。渠道。C1。检查点dir=/home/Hadoop/big data/flume _ job/chk dir/http://www . Sina.com/project_%

测试。渠道。C1。datadirs=/home/Hadoop/big data/flume _ job/dataDir/http://www . Sina.com/{project}

测试。来源。S1。频道=C1

test.sinks.k1.channel=c1

测试结果如下:

标记红色部分:test.sinks.k1.hdfs.path和test.sinks.k1.hdfs.filePrefix可以使用页眉中放入的字段

标记蓝色部分:test.channels.c1.checkpointDir和测试频道c1。数据目录不能使用页眉中放入的字段。

我们使用红色部分的设置已经可以满足,根据字符串中不同的字段设置不同的分布式文件系统存放路径了。

其实官方有个正则提取的拦截器也可以,不过正则提取的规则比较麻烦,就怕存在重复的多层嵌套曾在重复的字段时可能会出错,我对正则也不太熟悉吧。

总之能够解决我的需求问题就行。

Link to comment
Share on other sites