
Custom Functions for Parsing Tree-Graph Paths

1 Tree Graph Parsing

Building and traversing a tree graph with a Hive UDF or UDTF works when the tree is small, but it is inefficient for large trees, because the whole tree has to be loaded into memory.

For complex tree-structure analysis, a dedicated graph-processing tool or framework is the better choice, such as Apache Giraph, Apache Flink's Gelly library, or Spark's GraphX library. These tools provide more powerful and efficient implementations of graph algorithms.
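For a sense of what that buys you, here is a rough, untested Giraph-style sketch (added for illustration; the class name, the parent-to-child edge direction, and the convention that the input format marks root vertices with the value "ROOT" are all assumptions of the sketch). Each superstep pushes partial paths one level down the tree, so no single worker ever has to hold the whole tree in memory:

package com.lx.hive.graph;

import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

// Untested sketch: assumes edges point from parent to child and that the
// input format sets the vertex value of every root to "ROOT".
public class TreePathComputation extends BasicComputation<Text, Text, NullWritable, Text> {

    @Override
    public void compute(Vertex<Text, Text, NullWritable> vertex, Iterable<Text> messages)
            throws IOException {
        if (getSuperstep() == 0) {
            // Roots start a partial path consisting of just their own id
            if ("ROOT".equals(vertex.getValue().toString())) {
                sendMessageToAllEdges(vertex, vertex.getId());
            }
        } else {
            // Extend every incoming partial path with this vertex's id
            for (Text partialPath : messages) {
                Text path = new Text(partialPath + "->" + vertex.getId());
                if (vertex.getNumEdges() == 0) {
                    vertex.setValue(path); // leaf: keep the finished path as the vertex value
                } else {
                    sendMessageToAllEdges(vertex, path);
                }
            }
        }
        vertex.voteToHalt();
    }
}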

2 UDF

This approach only suits small trees: the whole parent-child relation has to be assembled into a single map, which is inefficient.

package com.lx.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

import java.util.*;

// Takes the whole tree as a map (child -> parent) and returns every path
// starting from a root node. Example:
// SELECT explode(tree_path(map(
//     'A', null,
//     'B', 'A',
//     'C', 'A',
//     'D', 'B',
//     'E', 'B',
//     'F', 'C'
// ))) AS path;
public class TreePathUDF extends UDF {
    public static List<Text> evaluate(Map<String, String> tree) {
        List<Text> paths = new ArrayList<>();
        buildPaths(tree, paths);
        return paths;
    }

    // Find every root node (null or empty parent) and walk down from it.
    private static void buildPaths(Map<String, String> tree, List<Text> paths) {
        for (Map.Entry<String, String> entry : tree.entrySet()) {
            String nodeId = entry.getKey();
            String parentId = entry.getValue();

            if (parentId == null || parentId.isEmpty()) { // root node
                buildPathsFromNode(nodeId, "", tree, paths);
            }
        }
    }

    // Record the path from the root down to this node, then recurse into its children.
    private static void buildPathsFromNode(String nodeId, String currentPath, Map<String, String> tree, List<Text> paths) {
        String newPath = currentPath.isEmpty() ? nodeId : currentPath + "->" + nodeId;
        paths.add(new Text(newPath));

        for (Map.Entry<String, String> entry : tree.entrySet()) {
            if (nodeId.equals(entry.getValue())) {
                buildPathsFromNode(entry.getKey(), newPath, tree, paths);
            }
        }
    }

    public static void main(String[] args) {
        // A HashMap cannot hold duplicate keys, so this local test uses an
        // IdentityHashMap, which compares keys by reference rather than by value:
        // every new String(...) key stays a distinct entry even when the text repeats.
        // The map passed to the UDF would need the same new String() treatment on
        // all of its keys; that part is not reproduced here.
        Map<String, String> tree = new IdentityHashMap<>();

        tree.put("1", "");
        tree.put("2", "");
        tree.put(new String("3"), "1");
        tree.put(new String("3"), "2");
        tree.put("4", "1");
        tree.put("5", "2");
        tree.put(new String("7"), "3");
        tree.put("8", "3");
        tree.put(new String("7"), "5");
        tree.put(new String("7"), "4");
        tree.put("11", "6"); // parent "6" never appears as a node, so these two
        tree.put("12", "6"); // entries are unreachable from any root

        for (Text text : evaluate(tree)) {
            System.out.println(text);
        }
    }
}
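The IdentityHashMap workaround above is fragile, because duplicate child keys only survive as long as every key is a distinct String instance. A hypothetical alternative (a sketch added here, not part of the original post; the class name and edge layout are made up) is to hand the UDF the edges as a list of (child, parent) pairs, for example collected with collect_list on the Hive side, since a list can hold several entries for the same child:

package com.lx.hive.udf;

import java.util.*;

// Hypothetical sketch: the same path expansion as TreePathUDF, but over a list
// of (child, parent) pairs, so a child with several parents needs no special casing.
public class TreePathFromEdges {
    // Each edge is a String[]{child, parent}; a null or empty parent marks a root.
    public static List<String> evaluate(List<String[]> edges) {
        Map<String, List<String>> children = new HashMap<>();
        List<String> roots = new ArrayList<>();
        for (String[] edge : edges) {
            String child = edge[0], parent = edge[1];
            if (parent == null || parent.isEmpty()) {
                roots.add(child);
            } else {
                children.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
            }
        }
        List<String> paths = new ArrayList<>();
        for (String root : roots) {
            dfs(root, root, children, paths);
        }
        return paths;
    }

    // Record every root-to-node prefix, mirroring the behaviour of TreePathUDF.
    private static void dfs(String node, String path,
                            Map<String, List<String>> children, List<String> paths) {
        paths.add(path);
        for (String child : children.getOrDefault(node, Collections.emptyList())) {
            dfs(child, path + "->" + child, children, paths);
        }
    }

    public static void main(String[] args) {
        List<String[]> edges = Arrays.asList(
                new String[]{"1", ""}, new String[]{"2", ""},
                new String[]{"3", "1"}, new String[]{"3", "2"}, // "3" has two parents
                new String[]{"4", "1"}, new String[]{"5", "2"});
        evaluate(edges).forEach(System.out::println);
    }
}

With this representation, node 3 simply shows up under both parent 1 and parent 2, and no per-key workaround is needed.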

3 UDTF

In the code below, we assume each input row carries an id and a parent_id field. The process method reads every input row and adds the edge to a children map (tree), whose key is a parent-node ID and whose value is the list of that parent's children; root nodes are stored under the null key. Once all rows have been consumed, close() calls generatePaths to recursively generate every root-to-leaf path, forwarding each path as one output row.

package com.lx.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// ADD JAR /path/to/your/compiled/TreePathUDTF.jar;
// CREATE TEMPORARY FUNCTION tree_path_udtf AS 'your.package.TreePathUDTF';
// SELECT path FROM your_table LATERAL VIEW tree_path_udtf(id, parent_id) paths AS path;
public class TreePathUDTF extends GenericUDTF {

    // Children keyed by parent id, accumulated across all input rows;
    // root nodes are stored under the null key.
    private final Map<String, List<String>> tree = new HashMap<>();

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        // Assume args[0] is the id and args[1] is the parent_id
        if (args.length != 2 || args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                || args[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("TreePathUDTF requires two primitive arguments: id and parent_id.");
        }

        // Define the output schema: a single string column named "path"
        List<String> fieldNames = new ArrayList<>();
        List<ObjectInspector> fieldTypes = new ArrayList<>();
        fieldNames.add("path");
        fieldTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldTypes);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Accumulate one edge per input row; the paths are generated in close(),
        // once the whole tree has been seen.
        String currentId = args[0].toString();
        String parentId = args[1] == null ? null : args[1].toString();

        tree.computeIfAbsent(parentId, k -> new ArrayList<>()).add(currentId);
    }

    // Depth-first traversal emitting one output row per root-to-leaf path.
    private void generatePaths(String currentNode, String currentPath) throws HiveException {
        List<String> children = tree.get(currentNode);
        if (children == null || children.isEmpty()) {
            // Leaf node: forward the completed path as a single-column row
            forward(new Object[]{currentPath});
            return;
        }
        for (String child : children) {
            generatePaths(child, currentPath.isEmpty() ? child : currentPath + "->" + child);
        }
    }

    @Override
    public void close() throws HiveException {
        // All input rows have been consumed; emit the paths now.
        if (tree.containsKey(null)) {
            generatePaths(null, "");
        }
    }
}
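Since the UDTF only forwards rows in close(), it can be sanity-checked outside Hive with a small harness. GenericUDTF exposes setCollector(...), so the sketch below (added here; the harness class and the sample rows are made up) feeds in a few (id, parent_id) rows and prints whatever the UDTF forwards:

package com.lx.hive.udf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.Collector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.Arrays;

// Local driver for TreePathUDTF; not part of the original post.
public class TreePathUDTFTest {
    public static void main(String[] args) throws HiveException {
        TreePathUDTF udtf = new TreePathUDTF();
        udtf.initialize(new ObjectInspector[]{
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaStringObjectInspector});

        // Print every row the UDTF forwards.
        udtf.setCollector(new Collector() {
            @Override
            public void collect(Object row) {
                System.out.println(Arrays.toString((Object[]) row));
            }
        });

        // Feed (id, parent_id) rows; a null parent marks a root.
        udtf.process(new Object[]{"A", null});
        udtf.process(new Object[]{"B", "A"});
        udtf.process(new Object[]{"C", "A"});
        udtf.process(new Object[]{"D", "B"});
        udtf.close(); // emits the paths: [A->B->D] and [A->C]
    }
}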

4 Python

Hive does not natively support Python UDFs that return multiple rows (i.e. UDTF functionality); that normally has to be implemented in Java or another supported language. As a workaround, the Python side can return a structured string (such as JSON or CSV), which the Hive query then parses to obtain multiple rows.

Three approaches were written and tested at the time; the UDTF above gave the best results, so the variants below were never fully tested.

# find_paths.py
import sys

def find_all_paths(edges, start_node=None):
    """Iterative DFS returning every root-to-leaf path.

    `edges` maps a parent id to the list of its children; the root nodes
    are stored under the `start_node` key (None by default).
    """
    paths = []
    # Seed the stack with the root nodes (the children of the sentinel key).
    stack = [(root, [root]) for root in edges.get(start_node, [])]
    while stack:
        node, path = stack.pop()
        children = edges.get(node, [])
        if not children:  # leaf node: the path is complete
            paths.append(path)
            continue
        for child in children:
            stack.append((child, path + [child]))
    return paths

def main():
    edges = {}
    for line in sys.stdin:
        parent, child = line.strip().split('\t')
        if parent == 'NULL':  # assuming the root node's parent is marked as 'NULL'
            parent = None
        edges.setdefault(parent, []).append(child)

    # The tree starts from the root nodes, whose parent is None
    for path in find_all_paths(edges):
        print('->'.join(path))

if __name__ == '__main__':
    main()



ADD FILE /path/to/find_paths.py; -- replace with the actual path to your Python script

SELECT TRANSFORM (parent, child)
USING 'python find_paths.py'
AS path
FROM tree_data;
Variant 2: a Python function that returns every path as one JSON string

# tree_paths_udf.py
import json

def find_all_paths(edges, start_node=None):
    """
    Find all paths from the start_node to the leaves.
    If start_node is None, it starts from the root nodes (nodes without a parent).
    """
    # Build an adjacency-list representation of the tree
    adjacency_list = {}
    for edge in edges:
        parent, child = edge['parent'], edge['child']
        adjacency_list.setdefault(parent, []).append(child)

    # DFS to find all paths
    def dfs(node, path, paths):
        path.append(node)
        if node not in adjacency_list:  # leaf node
            paths.append(list(path))
        else:
            for child in adjacency_list[node]:
                dfs(child, path, paths)
        path.pop()

    paths = []
    if start_node is not None:
        dfs(start_node, [], paths)
    else:
        # If no start_node is specified, start from all root nodes
        root_nodes = set(adjacency_list.keys()) - set(
            child for children in adjacency_list.values() for child in children)
        for root in root_nodes:
            dfs(root, [], paths)

    return json.dumps(paths)

# This function would be called from the Hive side
def tree_paths(edges_json):
    edges = json.loads(edges_json)
    return find_all_paths(edges)



ADD FILE hdfs:///path/to/your/tree_paths_udf.zip; -- or the path of the uploaded Python file

-- NOTE: untested, as mentioned above. Vanilla Hive cannot register a Python
-- function through CREATE FUNCTION, so this registration step is only a sketch.
CREATE TEMPORARY FUNCTION tree_paths_udf AS 'tree_paths'
USING 'python tree_paths_udf.zip'; -- adjust to match the type of file you uploaded


WITH edges AS (
    SELECT CONCAT('{"parent": "', parent, '", "child": "', child, '"}') AS edge_json
    FROM parent_child_relations
)
SELECT
    path
FROM (
    SELECT
        -- collect_list returns an array, so concat_ws is needed to join it into one JSON array string
        explode(split(regexp_replace(tree_paths_udf(concat('[', concat_ws(',', collect_list(edge_json)), ']')), '\\\\"', '"'), ',')) AS path_json
    FROM edges
) t
LATERAL VIEW json_tuple(path_json, 'value') exploded_table AS path;


Variant 3: a TRANSFORM script with a fixed root node


# find_paths.py
import sys

# Global mapping from a parent id to the list of its children
tree = {}

def build_tree(parent_id, child_id):
    """Add one edge to the tree."""
    tree.setdefault(parent_id, []).append(child_id)

def find_all_paths(node, path=None):
    """Recursively yield every root-to-leaf path starting at `node`."""
    path = (path or []) + [node]
    if node not in tree:  # leaf node
        yield path
    for child in tree.get(node, []):
        yield from find_all_paths(child, path)

def main():
    """Read (parent_id, child_id) rows from stdin and print all paths."""
    for line in sys.stdin:
        parent_id, child_id = line.strip().split()
        build_tree(parent_id, child_id)

    # Assumes the root's parent_id is 'root'; adjust for a different start node
    root_node = 'root'
    for path in find_all_paths(root_node):
        print('->'.join(path))

if __name__ == "__main__":
    main()




ADD FILE /path/to/find_paths.py;

CREATE TABLE all_paths AS
SELECT TRANSFORM (parent_id, child_id)
USING 'python find_paths.py'
AS path
FROM parent_child_relations;