应用背景:业务逻辑需要写一些模糊匹配项,比如column1字段包含"abc"的数据,我们经常会这样: column like '%abc%'
或 column regexp 'abc'
。假如要包含的不仅仅是abc,可能还有abc1、abc2、….,那我们就需要写很多or like语句,对内存消耗是极大的,那下面这个东西就是就是为了解决这个匹配多词的。
一、构造关键词字典数
import java.util.*;
/**
* 构造字典树
* @Description: TODO
* @author zhangjinke
* @date 2021-03-31 14:13:57
* @version V1.0
*/
public class Trie
{
public Node StartNode;//起始状态
public Map<String, Integer> m_Keywords = new HashMap<String, Integer>();//记录了当前关键词集合
//初始化Trie
public Trie()
{
StartNode = new Node(null);
StartNode.m_Failure = StartNode;
}
// 清除所有Trie节点
public void closeTrie()
{
CleanStates(StartNode);
}
// 增加关键词
public void AddKeyword(String keyword)
{
m_Keywords.put(keyword, 0);
//RebuildTrie();
}
// 删除关键词
public void DeleteKeyword(String keyword)
{
m_Keywords.remove(keyword);
RebuildTrie();
}
// 检索字符串text是否包含关键词,返回字符串
public String Search(String text)
{
Node curState = StartNode;
int i;
HashSet<String> res=new HashSet<String>();
// 查看状态机中当前状态下该字符对应的下一状态,如果在当前状态下找不到满足该个字符的状态路线,
// 则返回到当前状态的失败状态下继续寻找,直到初始状态
for (i = 0; i < text.length(); ++i)
{
// while (!curState.m_Goto.containsKey(text.charAt(i)) == false)
while (!curState.m_Goto.containsKey(text.charAt(i)))
{
if (curState.m_Failure != StartNode)
{
if (curState == curState.m_Failure)
{ //陷入死循环了...
System.out.println("Trie Failure");
break;
}
curState = curState.m_Failure; // 返回到当前状态的失败状态
}
else
{
curState = StartNode;
break;
}
}
// 如果当前状态下能找到该字符对应的下一状态,则跳到下一状态m,
// 如果状态m包含了m_Output,表示匹配到了关键词,具体原因请继续往下看
if (curState.m_Goto.containsKey(text.charAt(i)))
{
curState = curState.m_Goto.get(text.charAt(i));
if (!curState.m_Output.isEmpty())
{
for(int j=0;j<curState.m_Output.size();j++)
{
res.add(curState.m_Output.get(j));
}
}
}
}
return res.toString();
}
// 检索字符串text是否包含关键词,返回HashSet
public HashSet<String> SearchSet(String text)
{
Node curState = StartNode;
int i;
HashSet<String> res=new HashSet<String>();
// 查看状态机中当前状态下该字符对应的下一状态,如果在当前状态下找不到满足该个字符的状态路线,
// 则返回到当前状态的失败状态下继续寻找,直到初始状态
for (i = 0; i < text.length(); ++i)
{
while (!curState.m_Goto.containsKey(text.charAt(i)))
{
if (curState.m_Failure != StartNode)
{
if (curState == curState.m_Failure)
{ //陷入死循环了...
System.out.println("Trie Failure");
break;
}
curState = curState.m_Failure; // 返回到当前状态的失败状态
}
else
{
curState = StartNode;
break;
}
}
// 如果当前状态下能找到该字符对应的下一状态,则跳到下一状态m,
// 如果状态m包含了m_Output,表示匹配到了关键词,具体原因请继续往下看
if (curState.m_Goto.containsKey(text.charAt(i)))
{
curState = curState.m_Goto.get(text.charAt(i));
if (!curState.m_Output.isEmpty())
{
for(int j=0;j<curState.m_Output.size();j++)
{
res.add(curState.m_Output.get(j));
}
}
}
}
return res;
}
/**
* 内部类Node,表示Trie的状态节点
*/
private class Node
{
public Node(Node parent)
{
this.m_Parent = parent;
this.m_Failure = null;
}
// 记录了该状态节点下,字符-->另一个状态的对应关系
public Map<Character, Node> m_Goto = new HashMap<Character, Node>();
// 如果该状态下某具体字符找不到对应的下一状态,应该跳转到m_Failure状态继续查找
public Node m_Failure;
// 该状态节点的前一个节点
public Node m_Parent;
// 记录了到达该节点时,匹配到的关键词
public List<String> m_Output = new ArrayList<String>();
// 为当前状态节点添加字符c对应的下一状态
Node AddGoto(char c)
{
if (!m_Goto.containsKey(c))
{
// not in the goto table
Node newState = new Node(this);
m_Goto.put(c, newState);
return newState;
}
else
{
return m_Goto.get(c);
}
}
};
// 添加关键词到Trie节点
void DoAddWord(String keyword)
{
int i;
Node curState = StartNode;
for (i = 0; i < keyword.length(); i++)
{
curState = curState.AddGoto(keyword.charAt(i));
}
curState.m_Output.add(keyword);
}
// 建立Trie
public void RebuildTrie()
{
CleanStates(StartNode);
StartNode = new Node(null);
StartNode.m_Failure = StartNode;
// add all keywords
for (String key : m_Keywords.keySet())
{
DoAddWord(key);
}
// 为每个状态节点设置失败跳转的状态节点
DoFailure();
}
// 清除state下的所有状态节点
void CleanStates(Node state)
{
for (char key : state.m_Goto.keySet())
{
CleanStates(state.m_Goto.get(key));
}
state = null;
}
// 为每个状态节点设置失败跳转的状态节点
void DoFailure()
{
LinkedList<Node> q = new LinkedList<Node>();
// 首先设置起始状态下的所有子状态,设置他们的m_Failure为起始状态,并将他们添加到q中
for (char c : StartNode.m_Goto.keySet())
{
q.add(StartNode.m_Goto.get(c));
StartNode.m_Goto.get(c).m_Failure = StartNode;
}
while (!q.isEmpty())
{
// 获得q的第一个element,并获取它的子节点,为每个子节点设置失败跳转的状态节点
Node r = q.getFirst();
Node state;
q.remove();
for (char c : r.m_Goto.keySet())
{
q.add(r.m_Goto.get(c));
// 从父节点的m_Failure(m1)开始,查找包含字符c对应子节点的节点,
// 如果m1找不到,则到m1的m_Failure查找,依次类推
state = r.m_Failure;
while (!state.m_Goto.containsKey(c))
{
state = state.m_Failure;
if (state == StartNode)
{
break;
}
}
// 如果找到了,设置该子节点的m_Failure为找到的目标节点(m2),
// 并把m2对应的关键词列表添加到该子节点中
if (state.m_Goto.containsKey(c))
{
r.m_Goto.get(c).m_Failure = state.m_Goto.get(c);
for (String str : r.m_Goto.get(c).m_Failure.m_Output)
{
r.m_Goto.get(c).m_Output.add(str);
}
}
else
{ //找不到,设置该子节点的m_Failure为初始节点
r.m_Goto.get(c).m_Failure = StartNode;
}
}
}
}
};
二、自定义UDF函数
import org.apache.hadoop.hive.ql.exec.UDF;
import sun.rmi.runtime.Log;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashSet;
public class KeywordsUDF extends UDF
{
public static Trie trie;
public KeywordsUDF()
{
trie=new Trie();
try
{
InputStream is=this.getClass().getResourceAsStream("/resource/dic.txt");
BufferedReader br=new BufferedReader(new InputStreamReader(is));
String line=br.readLine();
int i=0;
while(null!=line)
{
i++;
trie.AddKeyword(line.trim().replace(" ",""));
line=br.readLine();
}
br.close();
trie.RebuildTrie();
}
catch(Exception e){
e.printStackTrace();
}
}
public String evaluate(String comment)
{
if(comment!=null&&!comment.isEmpty()) {
HashSet<String> hashSet = new HashSet<>();
hashSet = trie.SearchSet(comment);
if (hashSet.size() > 0) {
return "bad";
} else return "pass";
}
else return "null";
}
}
三、字典文件
dic.txt(dic.dic也行),字典里面关键词就是按Windows的回车换行分隔,将字典文件放到项目的resource目录下【没有resource或者不想放那的话,等打包后自己手动把字典后加到对应文件夹下】。
四、准备上传的jar包
把将你的工程【包含字典树类和udf的项目】打成jar包,一定要保证jar包里有你的字典!!!
五、上传使用
项目内打JAR包,上传HDFS集群
ADD jar /home/hadoop_hive/zhangjinke/TestAnalysis-test.jar;
CREATE TEMPORARY FUNCTION is_Get AS 'KeywordsUDF'; -- 这里的KeywordsUDF类我是直接放到了java文件夹下的
select is_Get('包含关键词的语句'); -- 输出bad
-- select is_Get('未包含字典中任何一关键词的字符串'); -- 输出pass
六、注意事项
udf函数中的路径默认从本地机器读取,hive是分布式的,所以你要把字典【dic.txt】上传到每一个机器上,不易操作
那么我们就直接放到jar包中,使用 InputStream is=this.getClass().getResourceAsStream("/resource/dic.txt");
将字典确保每台机器顺利从有jar包【包里有字典】的机器上拿字典及编译后的字节码文件,这种方法避免了出现找不到文件路径的错误。
Comments NOTHING