Integrating Hive UDF's in Python

I need to do some pretty flexible things in my Hive queries, so flexible that it’s beyond the capability of Hive QL. Writing a Hive UDF (user defined function) is an option. However, all the online examples I could find require the UDF to be a standing-alone script, placed at a known location in HDFS, and used via the ADD FILE statement that is understood by the Hive CLI. Having to put the script in HDFS and use it on a machine that has the Hive CLI installed means interruption to my Python code flow, which I hate. I want the Hive UDF to be seamlessly integrated into my Python code. How can I do that?

Preparations

Before describing my solution to UDF, I need to make two preparations: the first is a Python client that easily interacts with Hive server, as long as UDF is not involved; the second is a toy Hive table for testing.

First, the Python client is developed separately and is out of the scope of this post. Skipping all the details, the client is used like this to create and query a table mytable:

hive = Hive()

sql = '''
CREATE TABLE mydb.mytable (
    a INT, 
    b STRING, 
    c STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE'''

hive.write(sql)

sql = 'SELECT a, b, c FROM mydb.mytable'
hive.read(sql)
rows = hive.fetchall()

This can execute any valid query without ever leaving the Python code flow.

Second, the toy table, mydb.cars, contains two columns: an integer id, and a JSON string info_json. The following rows are inserted into the table:

1, '{"make": "honda", "price": 1000}'
2, '{"make": "ford", "price": 2000}'
3, '{"make": "ford"}'
4, '{"make": "tesla", "price": 3000}'
5, '{"make": "honda", "price": 2000}'
6, '{"make": "ford", "price": 4000}'

The UDF syntax

A Hive UDF in Python is a regular Python script. Suppose the script file is myudf.py. The basic syntax for using the UDF is as follows:

ADD FILE hdfs:///tmp/myudf.py;

SELECT 
    TRANSFORM (id, info_json)
    USING 'myudf.py'
    AS (out STRING)
FROM mydb.mytable

Here, FROM ... determines what records are processed by the UDF; TRANSFORM (...) specifies the columns to be sent to the UDF (if may skip some columns that are retrieved by FROM ...); the UDF prints out a single column, which is called out here. The script myudf.py is located in hdfs:///tmp/.

The FROM clause, as usual, can use a sub-query like this:

...
FROM (
    SELECT * FROM mydb.cars
) AS tmp

with all kinds of variations.

Let’s make myudf.py very simple—it simply returns the second field:

#!/usr/bin/env python
from __future__ import print_function
import sys

SEP = '\t'
NULL = '\\N'

def main():
    for line in sys.stdin:
        _, info_json = line.strip().split(SEP)
        print(info_json)


if __name__ == '__main__':
    main()

Here’s how a UDF script works:

  1. Input rows are fed to the script on stdin.
  2. The script takes each input row and prints out something, which is taken as the output. If the print-out contains the field separator '\t', then it is interpreted as multiple columns. In the example above, the print-out contains a single column, which is named out in the HiveQL statement that uses this UDF.
  3. Because print-outs to stdout are taken as the result, there is no way to enforce that each input row has to correspond to one output row. If the script prints multiple lines for an input line, they are simply all result rows. If the script does not print anything while processing a certain input line, that amounts to filtering out that input row.

I came to the following critical realization, which is not emphasized in any of the online tutorials I found:

The xyz specified in TRANSFORM (...) USING 'xyz' must be a program that can run on the Hive server. If it requires some other program, an environment variable, or any other dependencies, these should all be in place, because the program xyz must execute on that server machine. This also implies that the program must be on the system PATH so that it can be found (or xyz may include the full path). The program should take input and print out output as described above.

And this is all that needs to be satisfied. It does not matter what language the script is written in—e.g. it’s perfectly fine if it’s a binary executable produced by some C++ code—as long as it just runs. The ADD FILE statement is not essential—again, as long as xyz runs.

Let’s verify it. Let’s make a UDF that is simply echo xyz. This UDF ignores any input, and just prints out a single line of output, xyz. I logged into a machine that has the hive CLI, and did this:

$ hive
hive> use mydb;
OK
Time taken: 0.014 seconds
hive> SELECT TRANSFORM(id, info_json) USING 'echo xyz' AS (out STRING) FROM cars;
Total jobs = 1
Launching Job 1 out of 1
... ...
Total MapReduce CPU Time Spent: 1 seconds 940 msec
OK
xyz
Time taken: 47.852 seconds, Fetched: 1 row(s)
hive>

As expected, the ‘program’ echo xyz ran and printed a single row

xyz

This points in the direction of effort: if I can somehow include the entire UDF in the string ‘…’ after USING, then I don’t need to deal with creating a script, putting it in HDFS, and so on. It should just work!

Strategy

My goal is to write “in-line” Hive UDF in Python and send this code over to Hive server as part of the HiveQL statement, hence avoiding any script files. The way to run Python code without a script file is

$ python -c "python_code"

therefore the idea is to write something like

TRANSFORM (...)
USING 'python -c "python_udf_code"'

On the Hive server that I work with, the Python version is 2.7, and the Hive version is 0.13. Both old versions will come haunt me in just a little bit. Keep reading.

First, let’s make up a naive UDF to see it can work. The most naive UDF I can think of contains one statement, print 123:

hive> SELECT TRANSFORM(id, info_json) USING 'python -c "print 123"' AS (out STRING) FROM cars;
Total jobs = 1
Launching Job 1 out of 1
... ...
Total MapReduce CPU Time Spent: 1 seconds 900 msec
OK
123
Time taken: 303.91 seconds, Fetched: 1 row(s)
hive>

Although painfully slow, the 123 in the print-out is really encouraging!

Trouble begins

What if I want to print a string instead of the number 123? The quotes around the string need to be escaped, which shouldn’t be much of an issue, right? Let’s try it:

hive> SELECT TRANSFORM(id, info_json) USING 'python -c "print \'abc\'"' AS (out STRING) FROM cars;
...
Ended Job = job_1558314961036_2285 with errors
Error during job, obtaining debugging information...
...
Cased by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20003]: An error occurred when trying to close the Operator running your custom script.
...
FAILED: Execution Error, returning code 20003 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. An error occurred when trying to close the Operator running your custom script.
...

Ooops! The straightforward quote escaping does not work. Let’s skip this for now. The above is running the Hive CLI on a “gateway” box. Let’s switch to my laptop and experiment with the Hive client code mentioned above.

>>> hive = Hive()
>>> sql = '''
>>>     SELECT
>>>         TRANSFORM(
>>>             id,
>>>             info_json
>>>         )
>>>         USING 'python -c "print 123"'
>>>         AS (out STRING)
>>>     FROM mydb.cars
>>> '''
>>>
>>> hive.read(sql)
>>> hive.fetchall()
[('123',)]

It worked.

Now let’s try printing a string.

>>> sql = '''
>>>     SELECT
>>>         TRANSFORM(
>>>             id,
>>>             info_json
>>>         )
>>>         USING 'python -c "print \'abc\'"'
>>>         AS (out STRING)
>>>     FROM mydb.cars
>>> '''
>>>
>>> hive.read(sql)
...
...
impala.error.HiveServer2Error: Failed after retrying 3 times

The error message is not the most useful. For two or three hours, I tried every way I could think of to escape the quotation marks. No luck. This is not working. Printing strings has to be supported. The need is simply inevitable, of course.

The trouble is this:

  • the program after USING has to be quoted, like 'python ...';
  • the Python script in there has to be quoted, like 'python -c "..."';
  • then in the script, quotation mark has to be escaped.

But it does not work.

After a few more hours combing the Internet, I found the reason: Hive had a bug about “string literal escaping”. The bug has been fixed in version 2.0.0, but was not back-ported. Remember my Hive version is 0.13? It came out in April 2014. On the other hand, Hive 2.0.0 came out in Feb 2016. I don’t know our Hive server admin. What can I do?

On a second thought, even if escaping worked, it probably is not enough—my Python UDF may be a sophisticated piece of code. It’s hard to prescribe what can be written in there. To pass that code as a long string in this fragile HiveQL statement is likely going to fail any moment. So the challenge is: how can I pass a block of arbitrary Python code (the UDF) through HiveQL, where quotation escaping does not work?

Passing Python code through HiveQL

The solution is to transform the Python code into something that would be safe to pass in HiveQL, and back-transform it in the UDF and execute it. This transformed format should be a simple blob of characters without all kinds of peculiarities. Roughly, I need something like this:

USING 'python -c "exec(back_transform(blob))"'

What should ‘blob’ be? Is it a string that needs to be quoted? That would go back to ground zero…

Regarding the “transform”, occurred to me first was “compress”. But quickly that gave way to Base64 encoding/decoding. I had not used it before, and knew (and know) very little about it. Somehow it looked right to me. According to Wikipedia,

Base64 is a group of binary-to-text encoding schemes that represent binary data in an ASCII string format… Base64 is designed to carry data stored in binary formats across channels that only reliably support text content. Base64 is particularly prevalent on the World Wide Web…

Remember I’m using Python 3.6, whereas the Hive server is using Python 2.7. Suppose the UDF code is s, I first encode it to bytes, then Base64-encode it to a string by

base64.urlsafe_b64encode(s.encode())

The encoded string may contain 'a-z', 'A-Z', '0-9', and '-', '_'. Hive will not complain about it. Let’s take the simple UDF above, repeated below:

$ python3
Python 3.6.7 (default, Oct 22 2018, 11:32:17) 
[GCC 8.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> s = '''\
... from __future__ import print_function
... import sys
... 
... SEP = '\t'
... NULL = '\\N'
... 
... def main():
...     for line in sys.stdin:
...         _, info_json = line.strip().split(SEP)
...         print(info_json)
... 
... 
... if __name__ == '__main__':
...     main()
... '''
>>> import base64
>>> encoded = base64.urlsafe_b64encode(s.encode())
>>> encoded
b'ZnJvbSBfX2Z1dHVyZV9fIGltcG9ydCBwcmludF9mdW5jdGlvbgppbXBvcnQgc3lzCgpTRVAgPSAnCScKTlVMTCA9ICdcTicKCmRlZiBtYWluKCk6CiAgICBmb3IgbGluZSBpbiBzeXMuc3RkaW46CiAgICAgICAgXywgaW5mb19qc29uID0gbGluZS5zdHJpcCgpLnNwbGl0KFNFUCkKICAgICAgICBwcmludChpbmZvX2pzb24pCgoKaWYgX19uYW1lX18gPT0gJ19fbWFpbl9fJzoKICAgIG1haW4oKQo='
>>> 

To be sure, we can get back the code (please note the switch between Python versions):

$ python
Python 2.7.15rc1 (default, Nov 12 2018, 14:31:15) 
[GCC 7.3.0] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> encoded = 'ZnJvbSBfX2Z1dHVyZV9fIGltcG9ydCBwcmludF9mdW5jdGlvbgppbXBvcnQgc3lzCgpTRVAgPSAnCScKTlVMTCA9ICdcTicKCmRlZiBtYWluKCk6CiAgICBmb3IgbGluZSBpbiBzeXMuc3RkaW46CiAgICAgICAgXywgaW5mb19qc29uID0gbGluZS5zdHJpcCgpLnNwbGl0KFNFUCkKICAgICAgICBwcmludChpbmZvX2pzb24pCgoKaWYgX19uYW1lX18gPT0gJ19fbWFpbl9fJzoKICAgIG1haW4oKQo='
>>> type(encoded)
<type 'str'>
>>> import base64
>>> decoded = base64.urlsafe_b64decode(encoded)
>>> print(decoded)
from __future__ import print_function
import sys

SEP = '	'
NULL = '\N'

def main():
    for line in sys.stdin:
        _, info_json = line.strip().split(SEP)
        print(info_json)


if __name__ == '__main__':
    main()

>>> 

This looks really promising. At this point I imagined my UDF would look like this:

import base64
code = 'ZnJvbSBfX2Z1dHVyZV9fIGltcG9ydCBwcmludF9mdW5jdGlvbgppbXBvcnQgc3lzCgpTRVAgPSAnCScKTlVMTCA9ICdcTicKCmRlZiBtYWluKCk6CiAgICBmb3IgbGluZSBpbiBzeXMuc3RkaW46CiAgICAgICAgXywgaW5mb19qc29uID0gbGluZS5zdHJpcCgpLnNwbGl0KFNFUCkKICAgICAgICBwcmludChpbmZvX2pzb24pCgoKaWYgX19uYW1lX18gPT0gJ19fbWFpbl9fJzoKICAgIG1haW4oKQo='
exec(base64.urlsafe_b64decode(code))

and I’d write the Hive snippet this way

USING 'python -c "import base64; code = ...; exec(base64.urlsafe_b64decode(code))"'

But wait, the code variable is a string. The string value needs quotation marks. Ah…, hmm… Even if code takes its value from a variable, that variable has to take its string value earlier. At some point, the string has to get into the Python code, and without quotation marks, no string value can be represented to begin with. And my adorable five-year-behind-its-time Hive server does not like those quotation marks.

I searched whether there is a way in Python to create string values without quotation marks—hoping something like quote(abc) is equivalent to 'abc'. I did not find it. (I think R has something like that.)

As I’m writing this, I think I must be in a pretty creative mode the night when I worked out everythong described in this post. I came up with a solution to this problem without too much difficulty—

How can I get a string value into Python code without the help of quotation marks? When the code takes command-line arguments, the value provided is a string that does not need to be quoted, the value received in Python is a string, and quotation marks do not appear on the receiving side, either. The code goes like this:

def make_hive_udf(s):
    encoded = base64.urlsafe_b64encode(s.encode('utf-8'))

    script = 'import sys, base64; code = sys.argv[1]; exec(base64.urlsafe_b64decode(code));'

    code = f'python -c "{script}" {str(encoded)[2:-1]}'
    return code

This function returns the string that should be used after TRANSFORM (...) USING. When used in that context, the string should be wrapped in single quotes. In the last line, when I feed the encoded string (actually bytes in Python 3) to the script, I need to get rid of the quotation marks. To this end, I first turn the bytes to string,

>>> str(encoded)
"b'ZnJvbSBfX2Z1dHVyZV9fIGltcG9ydCBwcmludF9mdW5jdGlvbgppbXBvcnQgc3lzCgpTRVAgPSAnCScKTlVMTCA9ICdcTicKCmRlZiBtYWluKCk6CiAgICBmb3IgbGluZSBpbiBzeXMuc3RkaW46CiAgICAgICAgXywgaW5mb19qc29uID0gbGluZS5zdHJpcCgpLnNwbGl0KFNFUCkKICAgICAgICBwcmludChpbmZvX2pzb24pCgoKaWYgX19uYW1lX18gPT0gJ19fbWFpbl9fJzoKICAgIG1haW4oKQo='"

This string has three extra characters I don’t want: b' at the beginning and ' at the end, so I use the str(encoded)[2:-1] substring to get rid of them. In the end, the string returned by make_hive_udf is actually this:

'python -c "import sys, base64; code = sys.argv[1]; exec(base64.urlsafe_b64decode(bytes(code)));" ZnJvbSBfX2Z1dHVyZV9fIGltcG9ydCBwcmludF9mdW5jdGlvbgppbXBvcnQgc3lzCgpTRVAgPSAnCScKTlVMTCA9ICdcTicKCmRlZiBtYWluKCk6CiAgICBmb3IgbGluZSBpbiBzeXMuc3RkaW46CiAgICAgICAgXywgaW5mb19qc29uID0gbGluZS5zdHJpcCgpLnNwbGl0KFNFUCkKICAgICAgICBwcmludChpbmZvX2pzb24pCgoKaWYgX19uYW1lX18gPT0gJ19fbWFpbl9fJzoKICAgIG1haW4oKQo='

This whole string is in single quotes; the script after -c is in double quotes; the sole command-line argument (i.e. the long encoding of the actual UDF) is not quoted. (It could be double-quoted, and that should make no difference.)

Now let’s test-use make_hive_udf to print the string 'abc'.

>>> s = 'from __future__ import print_function; print("abc")'
>>> udf = make_hive_udf(s)
>>> sql = f'''
...     SELECT
...         TRANSFORM(
...             id,
...             info_json
...         )
...         USING '{udf}'
...         AS (out STRING)
...     FROM mydb.cars;
... '''
>>> print(sql)

	SELECT
	    TRANSFORM(
		id,
		info_json
	    )
	    USING 'python -c "import sys, base64; code = sys.argv[1]; exec(base64.urlsafe_b64decode(code));" ZnJvbSBfX2Z1dHVyZV9fIGltcG9ydCBwcmludF9mdW5jdGlvbjsgcHJpbnQoImFiYyIp'
	    AS (out STRING)
	FROM mydb.cars;

>>> hive = Hive()
>>> hive.read(sql).fetchall()
[('abc',)]

It worked.

UDAF

Related to UDF (user-defined functions), there is UDAF (user-defined aggregation function). While UDF is basically a map function on the table records (although we can also do filtering by not printing anything for certain records, and augmentation by printing out multiple lines for any particular record), UDAF is a group-by function on the table records.

To support group-by, the query needs to make sure the group of records are sent to the same worker node consecutively; this is achieved by the CLUSTER BY clause of HiveQL. On the other hand, in the UDAF, the code is free to do whatever it wants on a number of consecutive records. Of course, the query containing CLUSTER BY and the UDAF need to coordinate to do useful things.

Let’s get concrete with an example. Take our car data. Let’s group by make, and calculate average price and number of missing prices per make.

Since the UDAF requires the records to be clustered by make, the caller necessarily needs to extract make from info_json and use it in the CLUSTER BY clause. So, the caller may as well provide make in addition to info_json, and the UDAF does not need to extract make again from info_json. Here we assume the caller does not provide this convenience.

The UDAF code is the following:

s = '''
from __future__ import print_function
import itertools
import json
import sys

SEP = '\t'
NULL = '\\N'

def read_data(input_data=sys.stdin):
    for line in input_data:
        info = json.loads(line.strip())
        yield info['make'], info.get('price', NULL)

def main():
    data = read_data()
    for make, group in itertools.groupby(data, key=lambda x: x[0]):
        n_good = 0
        n_bad = 0
        price_sum = 0
        for _, price in group:
            if price == NULL:
                n_bad += 1
            else:
                p = float(price)
                price_sum += p
                n_good += 1
        avg_price = round(price_sum / n_good)
        print(make + SEP + str(avg_price) + SEP + str(n_bad))


if __name__ == '__main__':
    main()
'''

The code is self-explanatory once you understand the usage of itertools.groupby(data, key=...). This function gathers a group as consecutive elements in data for which the value of key does not change.

The Hive query that uses this UDAF is

sql = f'''
SELECT
    TRANSFORM (
        info_json
    )
    USING '{make_udf(s)}'
    AS (make STRING, avg_price FLOAT, null_prices INT)
FROM (
    SELECT
        id,
        info_json
    FROM mydb.cars
    CLUSTER BY GET_JSON_OBJECT(info_json, '$.make')
    ) AS t
'''

This post by Florian Wilheml titled “Hive UDFs and UDAFs with Python” explains CLUSTER BY.

Now let’s see what we get:

>>> hive = Hive()
>>> hive.read(sql)
>>> z = hive.fetchall_pandas().sort_values(['make'])
>>> print(z)
    make  avg_price  null_prices
1   ford     3000.0            1
0  honda     1500.0            0
2  tesla     3000.0            0

Make the UDF customizable

Both UDF and UDAF are just regular Python code. As long as it relies totally on the standard libraries and does not do any I/O other than the reading data records and printing out result records, it is whatever Python can do. We are pretty unstoppable by now.

Except for one limitation: the self-contained UDF or UDAF script is doing exactly one particular thing. If we want to do another, only slightly different thing, we have to write another self-contained script, likely repeating much code. In other words, the re-usability story is lacking.

We can not import non-standard-library code in the UDF/UDAF. This can not be overcome. However, we could make one UDF/UDAF do different things according to control parameters. These parameters are in addition to the data records. How do we pass these parameters to the UDF/UDAF?

This question can be asked in this concrete way: we launch a Python script, which waits for data records to be fed on stdin; but before data records start to come in, we want the script to read some known number of other values first. How do we do that?

We have done it. In the function make_hive_udf, the Python reads in one parameter, which is the encoded Python code. After that, the script continues to wait for data records:

code = f'python -c "{script}" {str(encoded)[2:-1]}'

To make the script more versatile, we can pass more parameters to it, let it process them before starting to wait for data records.

Let use our cars data to make up a simple example. Suppose we want to print out the cars’ make and price. When the price is missing for any particular record, we want to print a default value—this is a parameter. In addition, we wan to specify what country’s cars to print—this is another parameter. Here is the code:

s = '''
from __future__ import print_function
import json
import sys

SEP = '\t'
NULL = '\\N'
MAKES = {
    'jap': ['honda'],
    'america': ['ford', 'tesla']
}

def main(country, price_default):
    for line in sys.stdin:
        _, info_json = line.strip().split(SEP)
        info = json.loads(info_json)
        make = info['make']
        if country == 'all' or make in MAKES[country]:
            price = info.get('price', price_default)
            print(make + SEP + str(price))

if __name__ == '__main__':
    country = sys.argv[2]
    price_default = float(sys.argv[3])
    assert country == 'all' or country in MAKES
    main(country, price_default)
'''

Note that the parameters country and price_default are taken in as sys.argv[2] and sys.argv[3], respectively. This is because sys.argv[1] is used to take in the encoded code. To support such parameters, we revise the function make_hive_udf as follows:

def make_hive_udf(s, *args):
    encoded = base64.urlsafe_b64encode(s.encode('utf-8'))

    script = 'import sys, base64; code = sys.argv[1]; exec(base64.urlsafe_b64decode(code));'

    code = f'python -c "{script}" {str(encoded)[2:-1]}'

    if args:
        code = code + ' ' + ' '.join(str(v) for v in args)

    return code

We can use this UDF like the following:

>>> country = 'america'
>>> default_price = 250
>>> code = make_hive_udf(s, country, default_price)
>>> sql = f'''
>>> SELECT
>>>     TRANSFORM (
>>>         id,
>>>         info_json
>>>     )
>>>     USING '{code}'
>>>     AS (make STRING, price FLOAT)
>>> FROM mydb.cars
>>> '''
>>>
>>> hive = Hive()
>>> hive.read(sql)
>>> z = hive.fetchall_pandas().sort_values(['make', 'price'])
>>> print(z)
    make   price
2   ford   250.0
1   ford  2000.0
5   ford  4000.0
3  tesla  3000.0

Use UDF in dedicated modules

Remember our UDF is a proper Python script. It can be quite complex, containing multiple helper functions and such. It’s natural, and often more feasible, to define such a UDF in a separate file rather than in an inline string.

Let’s make a UDF a “module” for easy import. When we pass the UDF module object to the function make_udf, we can use the inspect module to get the source code of it as a string, then proceed as before. The function is revised as follows:

import base64
import inspect
from types import ModuleType
from typing import Union

def make_udf(module_or_code: Union[ModuleType, str], *args, py_command: str='python') -> str:
    '''
    This function takes a Python module or a code string,
    encodes it into a byte string, which is suitable for transmission over the Internet,
    and returns a simple Python code snipplet that decodes and executes the original source code.

    Args:
        `module_or_code`: either a Python module *object* (not the module name), 
            or a code block as a string.

            For example, if the UDF is module `mypackage.udfs.udf`, then you may do

                import mypackage.udfs.udf
                s = make_udf(mypackage.udfs.udf)

            but not

                s = make_udf('mypackage.udfs.udf')

        `args`: if present, positional arguments to the UDF script.
            Each argument value should be a simple string or number (which will be converted to a string)
            that does not contain any special characters (such as space, quotes) that
            could potentially cause trouble on the command-line.

            In this case, `module_or_code` processes a known number of arguments
            specified on the command-line before processing records coming from `stdin`.
            The number of args must be fixed; there can not be optional arguments with
            default values, because the UDF assumes data records begin after the known number
            of arguments.

    Suppose `s` is the output of this function, then it is used like this 
    to construct a HiveQL statement:

        sql = f"""
            SELECT
                TRANSFORM ( ...input_columns... )
                USING '{s}'
                AS (...output_columns...)
            FROM {db_name}.{table_name}
        """

    (The part after `FROM` is often a subquery rather than a table directly.)

    An important detail is that this string (`s`) is wrapped by single quotes 
    in the HiveQL statement, as shown above.

    If `module_or_code` is a UDAF rather than UDF, a `CLUSTER BY` clause is needed, like this:

        sql = f"""
            SELECT
                TRANSFORM (
                    ...input_columns_including_aggregation_columns... 
                    )
                USING '{s}'
                AS (...output_columns...)
            FROM (
                SELECT
                    ...columns_including_aggregation_columns...
                FROM {db_name}.{table_name}
                CLUSTER BY ...aggregation_columns...
                ) AS t
        """

    This function suppose 4 scenarios:

        1. UDF
        2. UDAF
        3. UDF with arguments
        4. UDAF with arguments

    The difference between UDF and UDAF does not appear in this function, but rather
    in `module_or_code` and in the HiveQL statements that use the output of this function
    (i.e. presence of `CLUSTER BY`).

    While the current function is written in Python 3.6+, the UDF `module_or_code` 
    is written in the Python version that is installed on the Hive server.
    '''

    if inspect.ismodule(module_or_code):
        s = inspect.getsource(module_or_code)
    else:
        assert isinstance(module_or_code, str)
        s = module_or_code

    encoded = base64.urlsafe_b64encode(s.encode('utf-8'))

    assert py_command in ('python', 'python3')
    if py_command == 'python':  # 'python' is assumed to be Python 2 here.
        script = 'import sys, base64; code = sys.argv[1]; exec(base64.urlsafe_b64decode(code));'
    else:
        script = 'import sys, base64; code = sys.argv[1]; exec(base64.urlsafe_b64decode(code).decode());'

    code = f'{py_command} -c "{script}" {str(encoded)[2:-1]}'

    if args:
        code = code + ' ' + ' '.join(str(v) for v in args)

    return code

This is our final version. A lot of power is packed in these 20 lines of code.

(First half drafted in May; finished in October, 2019.)

Written on October 26, 2019