相比直接编写 Hadoop MR 要方便不少,在调用时

 必赢官网     |      2019-12-27 12:32

原标题:MaxCompute重装上阵 第五弹 - SELECT TRANSFOR

UDTF

  • Hive中UDTF编写和使用

Hive 提供了 Transform 这一关键字,使用 python 脚本处理hive 的数据,实现 Map/Reduce 的效果,在一些场景下,相比直接编写 Hadoop MR 要方便不少。

摘要: MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台, 尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。 MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

UDAF

  • Hive udaf开发入门和运行过程详解
  • Hive通用型自定义聚合函数(UDAF)

简介

首先简要介绍一下 hive sql 语句的编写逻辑以及 python 脚本的编写方法。

MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台, 尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。 MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

Hive中的TRANSFORM:使用脚本完成Map/Reduce

转自: http://www.coder4.com/archives/4052

首先来看一下数据:

hive> select * from test;
OK
1       3
2       2
3       1

假设,我们要输出每一列的md5值。在目前的hive中是没有这个udf的。

我们看一下Python的代码:

#!/home/tops/bin/python

import sys
import hashlib

for line in sys.stdin:
    line = line.strip()
    arr = line.split()
    md5_arr = []
    for a in arr:
        md5_arr.append(hashlib.md5(a).hexdigest())
    print "t".join(md5_arr)

在Hive中,使用脚本,首先要将他们加入:

add file /xxxx/test.py

然后,在调用时,使用TRANSFORM语法。

SELECT 
    TRANSFORM (col1, col2) 
    USING './test.py' 
    AS (new1, new2) 
FORM 
    test;

这里,我们使用了AS,指定输出的若干个列,分别对应到哪个列名。如果省略这句,则Hive会将第1个tab前的结果作为key,后面其余作为value。

这里有一个小坑:有时候,我们结合INSERT OVERWRITE使用上述TRANSFORM,而目标表,其分割副可能不是t。但是请牢记:TRANSFORM的分割符号,传入、传出脚本的,永远是t。不要考虑外面其他的分割符号!

最后,解释一下MAP、REDUCE。

在有的Hive语句中,大家可能会看到SELECT MAP (…) USING ‘xx.py’这样的语法。

然而,在Hive中,MAP、REDUCE只不过是TRANSFORM的别名,Hive不保证一定会在map/reduce中调用脚本。看看官方文档是怎么说的:

Formally, MAP ... and REDUCE ... are syntactic transformations of SELECT TRANSFORM ( ... ). In other words, they serve as comments or notes to the reader of the query. BEWARE: Use of these keywords may be dangerous as (e.g.) typing "REDUCE" does not force a reduce phase to occur and typing "MAP" does not force a new map phase!

所以、混用map reduce语法关键字,甚至会引起混淆,所以建议大家还是都用TRANSFORM吧。

友情提示:如果脚本不是Python,而是awk、sed等系统内置命令,可以直接使用,而不用add file。

如果表中有MAP,ARRAY等复杂类型,怎么用TRANSFORM生成?

例如:

CREATE TABLE features
(
    id BIGINT,
    norm_features MAP<STRING, FLOAT> 
);

答案是,要在脚本的输出中,对特殊字段按照HDFS文件中的格式输出即可。

例如,以上面的表结构为例,每行输出应为:

1^Ifeature1^C1.0^Bfeature2^C2.0

其中I是tab键,这是TRANSFORM要求的分割符号。B和^C是Hive存储时MAP类型的KV分割符。

另外,在Hive的TRANSFORM语句的时候,要注意AS中加上类型声明:

SELECT TRANSFORM(stuff)
USING 'script'
AS (thing1 INT, thing2 MAP<STRING, FLOAT>)

www.56.net ,hive 部分编写

hive transform sql 的一个很常用的模式是:

  • hive sql 通过查询语句获取输入源数据
  • 调用 python 脚本 MAP、REDUCE 处理数据
  • hive sql 将 python 的处理结果入库或其他操作后续操作

执行 hive 前要先加载 python 脚本,脚本可以上传到 hdfs 上,通过语句 ADD FILE hdfs://xxxx 加载。

比如从 表A 中 读取数据,通过 python MAP/REDUCE脚本处理后将处理结果写入表 B,对应的 hive 语句约为:

ADD FILE hdfs://xxxx;
FROM (
  FROM (
    SELECT *
            FROM TABLE-A
  ) T  
  MAP T.a, T.b, T.c
  USING 'python ./map.py'
  AS d, e, f
  CLUSTER BY d
) map_out
INSERT OVERWRITE TABLE-B
REDUCE map_out.d, map_out.e, map_out.f
USING 'python ./reduce.py'
AS (g ,h ,i)

MaxCompute基于ODPS2.0新一代的SQL引擎,显著提升了SQL语言编译过程的易用性与语言的表达能力。我们在此推出MaxCompute(ODPS2.0)重装上阵系列文章

Hive中的TRANSFORM:自定义Mapper和Reducer完成Map/Reduce

/**
 * Mapper.
 */
public interface Mapper {
  /**
   * Maps a single row into an intermediate rows.
   * 
   * @param record
   *          input record
   * @param output
   *          collect mapped rows.
   * @throws Exception
   *           on error
   */
  void map(String[] record, Output output) throws Exception;
}

可以将一列拆分为多列

使用样例:

public class ExecuteMap {

    private static final String FULL_PATH_CLASS = "com.***.dpop.ods.mr.impl.";

    private static final Map<String, Mapper> mappers = new HashMap<String, Mapper>();

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new Exception("Process class must be given");
        }

        new GenericMR().map(System.in, System.out,
                getMapper(args[0], Arrays.copyOfRange(args, 1, args.length)));
    }

    private static Mapper getMapper(String parserClass, String[] args)
            throws ClassNotFoundException {
        if (mappers.containsKey(parserClass)) {
            return mappers.get(parserClass);
        }

        Class[] classes = new Class[args.length];
        for (int i = 0; i < classes.length; ++i) {
            classes[i] = String.class;
        }
        try {
            Mapper mapper = (Mapper) Class.forName(FULL_PATH_CLASS + parserClass).getConstructor(classes).newInstance(args);
            mappers.put(parserClass, mapper);
            return mapper;
        } catch (ClassNotFoundException e) {
            throw new ClassNotFoundException("Unknown MapperClass:" + parserClass, e);
        } catch (Exception e) {
            throw new  ClassNotFoundException("Error Constructing processor", e);
        }

    }
}

MR_USING=" USING 'java -Xmx512m -Xms512m -cp ods-mr-1.0.jar:hive-contrib-2.3.33.jar com.***.dpop.ods.mr.api.ExecuteMap "

COMMAND="FROM dw_rtb.event_fact_adx_auction "
COMMAND="${COMMAND} INSERT overwrite TABLE dw_rtb.event_fact_mid_adx_auction_ad PARTITION(yymmdd=${CURRENT_DATE}) SELECT transform(search_id, print_time, pthread_id, ad_s) ${MR_USING} EventFactMidAdxAuctionAdMapper' as search_id, print_time, pthread_id, ad_s, ssp_id WHERE $INSERT_PARTITION and original = 'exinternal' "

必赢 ,python 部分编写

python 脚本的处理逻辑大概可以分为三部分:

  • 从 hive 获取输入数据
  • map、reduce 操作
  • 输出数据给 hive

其中输入、输出部分是利用系统标准输入输出流实现的,python 从 sys.stdin 中获取 hive 传入的数据,将处理结果通过 sys.stdout 传给 hive。

python 标准输入获取的每一行对应 hive sql 的一条数据,每一行通过 t 区分 hive 表的各个字段值。同样的,输出给 hive 的每一行中不同的字段值也要通过 't' 连接,否则 hive 会解析错误。

以 map 处理为例,python 脚本通过用的模式如下:

#!/usr/bin/env python
# coding: utf8
import sys

def map_field(a,b):
    return a+1, b+1

for line in sys.stdin:
    a, b = line.split('t')
    c, d = map_field(a,b)
    print c + 't' + d

第一弹 - 善用MaxCompute编译器的错误和警告

Hive Python Streaming的原理及写法

http://www.tuicool.com/articles/vmumUjA

Example

下面介绍一个使用 hive-transform 统计用户 Get/Post 请求数的例子。在这个例子中将从一张记录所有用户请求记录的表 member_source_request 中读取源数据,并过滤掉 OPTION 等请求,只统计 GET,POST,PUT,DELET 四种请求。并将记录结果写入到一张 member_method_count 表中。

member_source_request 表 schema:

Name Type
1 member_id int
2 method string
3 url string
4 ip string
5 ... ...

创建 member_method_count 表:

CREATE TABLE tmp.member_method_count(
    member_id INT,
    get_request BIGINT,
    put_request BIGINT,
    post_request BIGINT,
    delete_request BIGINT
)
partitioned by (`date` INT)

第二弹 - 新的基本数据类型与内建函数

Hive SQL

ADD FILE hdfs:///member-method.py; 
FROM (
    FROM (
        SELECT *
        FROM tmp.member_source_request
        WHERE member_id is not null
    ) T
    MAP T.member_id, T.method, T.url
    USING 'python ./member-method.py --mapper'
    AS member_id, method, url
    CLUSTER BY member_id
) map_out
INSERT OVERWRITE TABLE tmp.member_method_count PARTITION (date=20180220)
REDUCE map_out.member_id, map_out.method, map_out.url
USING 'python ./member-method.py --reducer'
AS member_id, get_request, put_request, post_request, delete_request

第三弹 - 复杂类型

python 脚本

MemberRequestJob 为具体实现 map、reduce 逻辑,其父类可服用

#!/usr/bin/env python
# coding: utf8
import sys
from collections import defaultdict

class MRJob(object):
    def __init__(self, sep='t'):
        self.sep = sep

    def map(self, line):
        raise NotImplementedError()

    def reduce(self, key, value):
        raise NotImplementedError()

    def map_end(self):
        pass

    def reduce_end(self):
        pass

    def run_mapper(self):
        for line in sys.stdin:
            line = line.strip('n').strip('t')
            self.map(line)
        if hasattr(self, 'map_end'):
            self.map_end()

    def run_reducer(self):
        for line in sys.stdin:
            line = line.strip('n').strip('t')
            key, value = line.split(self.sep, 1)
            self.reduce(key, value)
        if hasattr(self, 'reduce_end'):
            self.reduce_end()

    def output(self, key=None, value=None):
        print str(key) + self.sep + str(value)

    def run(self):
        if len(sys.argv) <= 1:
            raise Exception('--mapper or --reducer must be set')
        self.args = tuple(sys.argv[2:])
        if sys.argv[1] == '--mapper':
            self.run_mapper()
        elif sys.argv[1] == '--reducer':
            self.run_reducer()


class FieldMRJob(MRJob):

    def __init__(self, field_sep='t', sep='t'):
        MRJob.__init__(self, sep)
        self.field_sep = field_sep

    def map_fields(fields):
        raise NotImplementedError()

    def reduce_fields(key, fields):
        raise NotImplementedError()

    def map(self, line):
        fields = line.split(self.field_sep)
        self.map_fields(fields)

    def reduce(self, key, value):
        values = value.split(self.field_sep)
        self.reduce_fields(key, values)

    def output(self, key=None, values=()):
        value = self.field_sep.join(map(str, values))
        MRJob.output(self, key, value)


class MemberRequestJob(FieldMRJob):
    def __init__(self):
        FieldMRJob.__init__(self)
        self.all_member = set()
        self.all_get_counts = defaultdict(int)
        self.all_post_counts = defaultdict(int)
        self.all_put_counts = defaultdict(int)
        self.all_delete_counts = defaultdict(int)

    def map_fields(self, fields):
        member_id, method, url = fields
        if method in ['GET', 'PUT', 'POST', 'DELETE']:
            self.output(member_id, (method, url))

    def reduce_fields(self, member_id, fields):
        method, url = fields
        self.all_member.add(member_id)
        if method == 'GET':
            self.all_get_counts[member_id] += 1
        elif method == 'POST':
            self.all_post_counts[member_id] += 1
        elif method == 'PUT':
            self.all_put_counts[member_id] += 1
        else:
            self.all_delete_counts[member_id] += 1

    def reduce_end(self):
        for member_id in list(self.all_member):
            self.output(member_id, (self.all_get_counts.get(member_id, 0),
                                    self.all_put_counts.get(member_id, 0),
                                    self.all_post_counts.get(member_id, 0),
                                    self.all_delete_counts.get(member_id, 0)
                                    )
                        )


if __name__ == '__main__':
    MemberRequestJob().run()

第四弹 - CTE,VALUES,SEMIJOIN

上次向您介绍了CTE,VALUES,SEMIJOIN,本篇向您介绍MaxCompute对其他脚本语言的支持

  • SELECT TRANSFORM。

  • 场景1

  • 我的系统要迁移到MaxCompute平台上,系统中原来有很多功能是使用脚本来完成的,包括python,shell,ruby等脚本。 要迁移到MaxCompute上,我需要把这些脚本全部都改造成UDF/UDAF/UDTF。改造过程不仅需要耗费时间人力,还需要做一遍又一遍的测试,从而保证改造成的udf和原来的脚本在逻辑上是等价的。我希望能有更简单的迁移方式。
  • 场景2
  • SQL比较擅长的是集合操作,而我需要做的事情要对一条数据做更多的精细的计算,现有的内置函数不能方便的实现我想要的功能,而UDF的框架不够灵活,并且Java/Python我都不太熟悉。相比之下我更擅长写脚本。我就希望能够写一个脚本,数据全都输入到我的脚本里来,我自己来做各种计算,然后把结果输出。而MaxCompute平台就负责帮我把数据做好切分,让我的脚本能够分布式执行,负责数据的输入表和输出表的管理,负责JOIN,UNION等关系操作就好了。

上述功能可以使用SELECT TRANSFORM来实现

SELECT TRANSFORM 介绍

此文中采用MaxCompute Studio作展示,首先,安装MaxCompute Studio,导入测试MaxCompute项目,创建工程,建立一个新的MaxCompute脚本文件, 如下

www.56.net 1

提交作业可以看到执行计划(全部展开后的视图):

www.56.net 2

Select transform允许sql用户指定在服务器上执行一句shell命令,将上游数据各字段用tab分隔,每条记录一行,逐行输入shell命令的stdin,并从stdout读取数据作为输出,送到下游。Shell命令的本质是调用Unix的一些utility,因此可以启动其他的脚本解释器。包括python,java,php,awk,ruby等。

该命令兼容Hive的Transform功能,可以参考Hive的文档。一些需要注意的点如下:

  1. Using 子句指定的是要执行的命令,而非资源列表,这一点和大多数的MaxCompute SQL语法不一样,这么做是为了和hive的语法保持兼容。

  2. 输入从stdin传入,输出从stdout传出;

  3. 可以配置分隔符,默认使用 t 分隔列,用换行分隔行;

  4. 可以自定义reader/writer,但用内置的reader/writer会快很多

  5. 使用自定义的资源(脚本文件,数据文件等),可以使用 set odps.sql.session.resources=foo.sh,bar.txt; 来指定。可以指定多个resource文件,用逗号隔开(因此不允许resource名字中包含逗号和分号)。此外我们还提供了resources子句,可以在using 子句后面指定 resources 'foo.sh', 'bar.txt' 来指定资源,两种方式是等价的(参考“用odps跑测试”的例子);