构建Huffman Tree来压缩数据
2016-06-28
喝牛奶的鸵鸟
Huffman Tree的主要应用在数据压缩和通信等方面,权重高(出现次数多)的字符靠近根节点,权重低的字符远离根节点,这样编码为可变字长。
代码:
#优先队列 priorityqueue.py
class HeapNode:
def __init__(self,value,level):
self.value = value
self.level = level
class PriorityQueue:
def __init__(self):
self.heapList = [HeapNode(None,None)]
self.size = 0
def buildQueue(self,dic):
print(f'构建{dic}的优先队列......')
i = len(dic) // 2
self.size = len(dic)
for k,v in dic.items():
node = HeapNode(k,v)
self.heapList.append(node)
while (i > 0):
self.adjustDown(i)
i = i - 1
def adjustDown(self,i):
while (i * 2) <= self.size:
mlc = self.maxLevelChild(i)
if self.heapList[i].level < self.heapList[mlc].level:
self.heapList[i],self.heapList[mlc] = self.heapList[mlc],self.heapList[i]
i = mlc
def maxLevelChild(self,i):
if i * 2 + 1 > self.size:
return i * 2
else:
if self.heapList[i*2].level > self.heapList[i*2+1].level:
return i * 2
else:
return i * 2 + 1
def enqueue(self,value,level):
node = HeapNode(value,level)
self.heapList.append(node)
self.size = self.size + 1
self.adjustUp(self.size)
print(f'{value}入队,优先级为:{level}')
def adjustUp(self,i):
while i // 2 > 0:
if self.heapList[i].level > self.heapList[i // 2].level:
self.heapList[i],self.heapList[i // 2] = self.heapList[i // 2],self.heapList[i]
i = i // 2
def dequeue(self):
pop = self.heapList[1]
self.heapList[1] = self.heapList[self.size]
self.size = self.size - 1
self.heapList.pop()
self.adjustDown(1)
print(f'{pop.value}出队,优先级为:{pop.level}')
if __name__ == '__main__':
pq = PriorityQueue()
pq.buildQueue({'数据a':1,'数据b':2,'数据c':3,'数据d':4,'数据e':5})
pq.enqueue('数据f', 6)
pq.dequeue()
pq.dequeue()
pq.dequeue()
pq.enqueue('数据g', 7)
pq.dequeue()
pq.dequeue()
from priorityqueue import PriorityQueue
from collections import defaultdict
class Node:
def __init__(self, character, weight):
self.parent = None
self.leftChild = None
self.rightChild = None
self.character = character
self.weight = weight
self.huffmancode = None
self.isLeaf = False
class Huffman:
def __init__(self, message):
self.message = message
self.pq = PriorityQueue()
self.root = None
self.huffmanEncode = defaultdict(list)
self.huffmanDecode = defaultdict(list)
dic = defaultdict(lambda: 0)
for c in self.message:
dic[c] += 1
for k in dic.keys():
node = Node(k, dic[k])
self.pq.enqueue(node, dic[k])
def createHuffmanTree(self):
while self.pq.size > 0:
if self.pq.size == 1:
root = self.pq.dequeue()
break
nd1 , nd2 = self.pq.dequeue() , self.pq.dequeue()
nd1.huffmancode , nd2.huffmancode = '0' , '1'
if not nd1.leftChild:
nd1.isLeaf = True
if not nd2.rightChild:
nd2.isLeaf = True
root = Node(None, nd1.weight + nd2.weight)
root.leftChild , root.rightChild = nd1 , nd2
nd1.parent , nd2.parent = root , root
self.pq.enqueue(root, root.weight)
self.dfs(root)
def dfs(self, node):
if node:
self.dfs(node.leftChild)
self.dfs(node.rightChild)
if node.isLeaf:
code = ''
temp = node
while temp.parent:
code = ''.join([code, temp.huffmancode])
temp = temp.parent
self.huffmanEncode.setdefault(node.character,code[::-1])
self.huffmanDecode.setdefault(code[::-1],node.character)
def encode(self):
return ''.join([self.huffmanEncode[i] for i in self.message])
def decode(self, ecs):
decode = ''
temp = ''
for i in ecs:
temp = ''.join([temp, i])
if temp in self.huffmanDecode.keys():
decode = ''.join([decode, self.huffmanDecode[temp]])
temp = ''
return decode
def compress(self,ec):
result = ''
start = 0
i = 16
while i <= len(ec):
result = ' '.join([result, str(int(ec[i - 16:i], 2))])
start = i
i = i + 16
if len(ec) % 16 != 0:
result = ' '.join([result, '123456789', ec[start:]])
return result
def decompression(self,ectext):
result = ''
ecList = ectext.split()
for i in ecList:
ec = int(i)
if ec == 123456789:
result = ''.join([result, ecList[-1]])
break
else:
result = ''.join([result, bin(ec).replace('0b', '').rjust(16, '0')])
result = self.decode(result)
return result
if __name__ == '__main__':
print('开始构建Huffman Tree...')
with open('源文件.txt', 'r') as f:
hfm = Huffman(f.read())
hfm.createHuffmanTree()
print('Huffman Tree构建完毕!')
print('开始压缩...')
ec = hfm.encode()
result = hfm.compress(ec)
with open('压缩文件', 'w') as f:
f.write(result.strip())
print('压缩完毕!')
print('开始解压...')
with open('压缩文件', 'r') as f:
codetext = f.read()
result2 = hfm.decompression(codetext)
with open('解压文件.txt', 'w') as f:
f.write(result2)
print('解压完毕!')
如果文件中存在大量重复的字符效果很明显,由于Python运行效率低,所以压缩1M左右的文件作为演示,要想压缩大文件那慢的不得了,不过构建Huffman树还是很快的,这不是问题完全可以用C重写,方法是一样的。