用 Python 实现哈希算法检测重复图片
水之心 关注
0.5142018.11.25 14:25:45字数 1,261阅读 3,363
在 Python 中导入 hashlib
模块,调用函数就可以生成某一个字符串或者文件的哈希值。这个算法对于未被篡改的上传文件非常有效,如果输入数据有细微变化,加密哈希算法都会导致雪崩效应,从而造成新文件的哈希值完全不同于原始文件哈希值。
某些情况下,我们需要检测图片之间的相似性,进行我们需要的处理:删除同一张图片、标记盗版等。
如何判断是同一张图片呢?最简单的方法是使用加密哈希(例如 MD5, SHA-1)判断。但是局限性非常大。例如一个 txt 文档,其 MD5 值是根据这个 txt 的二进制数据计算的,如果是这个 txt 文档的完全复制版,那他们的 MD5 值是完全相同的。但是,一旦改变副本的内容,哪怕只是副本的缩进格式,其 MD5 也会天差地别。比如,下面的两个字符串只是一个 .
符号的差别,MD5 却变化很大:
1 2 3 4 txt = b'The quick brown fox jumps over the lazy dog' print (txt, hashlib.md5(txt).hexdigest())print (txt+b'.' , hashlib.md5(txt+b'.' ).hexdigest())
1 2 b'The quick brown fox jumps over the lazy dog' 9e107d9d372bb6826bd81d3542a419d6 b'The quick brown fox jumps over the lazy dog.' e4d909c290d0fb1ca068ffaddf22cbd0
因此加密哈希只能用于判断两个完全一致、未经修改的文件,如果是一张经过调色或者缩放的图片,根本无法判断其与另一张图片是否为同一张图片。 那么如何判断一张被PS过的图片是否与另一张图片本质上相同呢?比较简单、易用的解决方案是采用感知哈希算法(Perceptual Hash Algorithm)。
感知哈希算法是一类算法的总称,包括 aHash、pHash、dHash。顾名思义,感知哈希不是以严格的方式计算 Hash 值,而是以更加相对的方式计算哈希值,因为“相似”与否,就是一种相对的判定。[1]
aHash:平均值哈希。速度比较快,但是常常不太精确。 pHash:余弦感知哈希。精确度比较高,但是速度方面较差一些。 dHash:差异值哈希。Amazing!精确度较高,且速度也非常快。 我们先看看一张图片:
1 2 3 4 5 6 7 import cv2from IPython.display import Imagefrom matplotlib import pyplot as plt %matplotlib inline img_name = 'E:/Data/URLimg/猫/喜马拉雅猫/27.jpg' Image(img_name)
https://cdn.jsdelivr.net/gh/MuyanGit/pic_url@master/img/202110200802975.png
https://cdn.jsdelivr.net/gh/MuyanGit/pic_url@master/img/image-20211108171149008.png
output_6_0.jpeg
下面我们主要研究以图搜图 ,它最核心的东西就是怎么让电脑识别图片。为了了解以图搜图 ,我们先看看哈希感知算法基本原理:
把图片转成一个可识别的字符串,这个字符串也叫哈希值 和其他图片匹配字符串,通过哈希值计算两张图片的汉明距离(Hamming Distance),通过汉明距离的大小,判断两张图片的相似程度。 ahash 均值哈希算法,
1 2 3 4 5 6 7 8 9 10 11 12 13 def aHash (image_path, hash_size=8 ): ''' get image ahash string ''' img = plt.imread(image_path) gray_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) resize_img = cv2.resize(gray_img, (hash_size, hash_size)) img_ = resize_img > resize_img.mean() img_bi = '' .join(img_.astype('B' ).flatten().astype('U' ).tolist()) return '' .join(map (lambda x:'%x' % int (img_bi[x:x+4 ],2 ), range (0 ,64 ,4 )))
1 print ('图片的 aHash:' , aHash(img_name))
1 图片的 aHash: bdc1c041767e7ca8
dHash 缩放图片 如果我们要计算上图的 dHash 值,第一步是把它缩放到足够小。为什么需要缩放呢?因为原图的分辨率一般都非常高。一张 ![200 \times 200](https://math.jianshu.com/math?formula=200 \times 200) 的图片,就有整整 万个像素点,每一个像素点都保存着一个 RGB 值, 万个 RGB,是相当庞大的信息量,非常多的细节需要处理。因此,我们需要把图片缩放到非常小,隐藏它的细节部分,只见森林,不见树木。建议缩放为 ,虽然可以缩放为任意大小,但是这个值是相对合理的。而且宽度为 ,有利于我们转换为 hash 值,往下面看,你就明白了。
1 2 3 4 5 6 7 8 img = plt.imread(img_name) hash_size = 8 gray_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) resize_img = cv2.resize(gray_img, (hash_size+1 , hash_size)) plt.imshow(resize_img) plt.show()
output_11_0.png
具体的流程和 aHash 差不多,只需要将均值改为水平梯度计算即可。该算法计算相邻像素之间的亮度差异并确定相对梯度。感知哈希算法从文件内容的各种特征中获得一个能够灵活分辨不同文件微小区别的多媒体文件指纹。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def dhash (image_path, hash_size=8 ): ''' get image dhash string ''' img = plt.imread(image_path) gray_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) resize_img = cv2.resize(gray_img, (hash_size+1 , hash_size)) differences = [] for t in range (resize_img.shape[1 ]-1 ): differences.append(resize_img[:, t] > resize_img[:, t+1 ]) img_ = np.stack(differences).T img_bi = '' .join(img_.astype('B' ).flatten().astype('U' ).tolist()) return '' .join(map (lambda x: '%x' % int (img_bi[x:x+4 ], 2 ), range (0 , 64 , 4 )))
为了方便,我将其封装为一个类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 class XHash : ''' 感知 Hash 算法 ''' def __init__ (self, image_path, hash_type ): self.image_path = image_path self.hash_size = 8 self.type = hash_type if self.type == 'aHash' : self.hash = self.__aHash() elif self.type == 'dHash' : self.hash = self.__dHash() def __get_gray (self, img ): ''' 读取 RGB 图片 并转换为灰度图 ''' return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) def __difference (self ): ''' 比较左右像素的差异 ''' img = plt.imread(self.image_path) resize_img = cv2.resize(img, (self.hash_size+1 , self.hash_size)) gray = self.__get_gray(resize_img) differences = [] for t in range (resize_img.shape[1 ]-1 ): differences.append(gray[:, t] > gray[:, t+1 ]) return np.stack(differences).T def __average (self ): ''' 与像素均值进行比较 ''' img = plt.imread(self.image_path) resize_img = cv2.resize(img, (self.hash_size, self.hash_size)) gray = self.__get_gray(resize_img) return gray > gray.mean() def __binarization (self, hash_image ): ''' 二值化 ''' return '' .join(hash_image.astype('B' ).flatten().astype('U' ).tolist()) def __seg (self, hash_image ): img_bi = self.__binarization(hash_image) return '' .join(map (lambda x: '%x' % int (img_bi[x:x+4 ], 2 ), range (0 , 64 , 4 ))) def __aHash (self ): return self.__seg(self.__average()) def __dHash (self ): return self.__seg(self.__difference())class XHash_Haming : ''' 计算两张图片的相似度 ''' def __init__ (self, image_path1, image_path2, hash_type ): self.hash_img1 = XHash(image_path1, hash_type).hash self.hash_img2 = XHash(image_path2, hash_type).hash def hash_haming (self ): ''' 计算两张通过哈希感知算法编码的图片的汉明距离 ''' return np.array([self.hash_img1[x] != self.hash_img2[x] for x in range (16 )], dtype='B' ).sum ()
1 2 3 4 5 import os dir_name = 'E:/Data/URLimg/猫/test/' print (os.listdir(dir_name))
1 [ '16 - 副本.jpg', '6 - 副本.jpg', '6. jpg', '8 - 副本 - 副本.jpg', '8 - 副本.jpg']
1 2 3 import sys sys.path.append('E:/zlab' )from dhash import XHash, XHash_Haming
我们这里有两个副本,我们看看它们的 dHash:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 class Pairs : ''' 使用 dHash 实现哈希感知算法 ''' def __init__ (self, root ): if root == None : root = os.getcwd() self.names = { j: os.path.join(root, name) for j, name in enumerate (os.listdir(root)) } self.__hashs = np.array([ XHash(self.names[name], 'dHash' ).hash for name in self.names.keys() ]) self.__cal_haming_distance(self.__hashs) def __cal_haming_distance (self, hashs ): ''' 计算两两之间的距离 ''' j = 0 pairs = {} while j < hashs.shape[0 ]: for i in range (j + 1 , hashs.shape[0 ]): pairs[j] = pairs.get(j, []) + \ [np.array(hashs[i] != hashs[j]).sum ()] continue j += 1 self.pairs = pairs def get_names (self ): n = len (self.pairs) temp = {} while n > 0 : n -= 1 for i, d in enumerate (self.pairs[n]): if d == 0 : temp[n] = temp.get(n, []) + [i + n + 1 ] continue return temp def del_repeat (self ): P = self.get_names() for j in P: for i in P[j]: try : os.remove(self.names[i]) except FileNotFoundError: print ('已经移除,无需再次移除!' ) print ('删除完成!' )``` ```python pairs = Pairs(dir_name) pairs.pairs
1 {0: [15, 15, 13, 13], 1: [0, 16, 16], 2: [16, 16], 3: [0]}
我们可以通过汉明距离判定:0
与 1
、3
与 4
号图片分别是同一张图片,仅仅保留一张,删除重复图片:
1 [ '16 - 副本.jpg', '6 - 副本.jpg', '6. jpg', '8 - 副本 - 副本.jpg', '8 - 副本.jpg']
1 2 pairs.del_repeat() os.listdir(dir_name)
1 [ '16 - 副本.jpg', '6 - 副本.jpg', '8 - 副本 - 副本.jpg']
自动把重复的图片删除了!
1 2 3 {0: 'E:/Data/URLimg/猫/test/16 - 副本.jpg' , 1: 'E:/Data/URLimg/猫/test/6 - 副本.jpg' , 3: 'E:/Data/URLimg/猫/test/8 - 副本 - 副本.jpg' }
该代码被我放在 GitHub ,在不断的改进中。
Erum: https://www.jianshu.com/p/193f0089b7a2 ↩ python 查找重复文件,以及查找重复视频的一些思路
watfe 2020-02-24 19:15:15 996 收藏 2
分类专栏: Python
版权
Python 专栏收录该内容
30 篇文章0 订阅
订阅专栏
查找重复文件(文件大小一致、md5相同) 思路很简单:
找出指定目录及子目录下所有文件 找出大小重复的 进一步确认md5也重复的,则认为是重复文件 这里md5,为了加速计算,没有算文件的完整md5。(之前看到过这种算法,忘了在哪里看来的,大概是用于上传文件时,快速判断是否与已有文件对比验证用的)将文件分成256块,每块取前8个字节计算md5,这样能快速计算出一个大概可以用于判断文件唯一性的md5。
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 import osimport reimport timeimport hashlibdef main (): path = 'd:/' fp_arr = file_search(path,repat=r'.*\.mp4' ) du_arr = find_duplicate_file(fp_arr) def file_search (path='.' ,repat = r'.*' ): """ 文件查找: 文件夹及子文件夹下,所有匹配文件,返回list文件列表,绝对路径形式 Args: path: 文件路径(默认当前路径) repat: 文件名正则匹配,不区分大小写(默认匹配所有文件) return: 文件列表(绝对路径) Returns: files_match: 文件列表 """ folders,files = [],[] st = time.time() repat = '^' +repat+'$' for record in os.walk(path): fop = record[0 ] folders.append(fop) for fip in record[2 ]: fip = os.path.abspath(os.path.join(fop,fip)).replace('\\' ,'/' ) files.append(fip) files_match = [] for file in files: a = re.findall(repat,file.lower()) if a: files_match+=a print ('找到{0}个文件' .format (len (files_match))) return files_matchdef fastmd5 (file_path,split_piece=256 ,get_front_bytes=8 ): """ 快速计算一个用于区分文件的md5(非全文件计算,是将文件分成s段后,取每段前d字节,合并后计算md5,以加快计算速度) Args: file_path: 文件路径 split_piece: 分割块数 get_front_bytes: 每块取前多少字节 """ size = os.path.getsize(file_path) block = size//split_piece h = hashlib.md5() if size < split_piece*get_front_bytes: with open (file_path, 'rb' ) as f: h.update(f.read()) else : with open (file_path, 'rb' ) as f: index = 0 for i in range (split_piece): f.seek(index) h.update(f.read(get_front_bytes)) index+=block return h.hexdigest()def find_duplicate_file (fp_arr ): """ 查找重复文件 Args: fp_arr:文件列表 """ d = {} for fp in fp_arr: size = os.path.getsize(fp) d[size]=d.get(size,[])+[fp] l = [] for k in d: if len (d[k])>1 : l.append(d[k]) ll = [] for f_arr in l: d = {} for f in f_arr: fmd5 = fastmd5(f) d[fmd5]=d.get(fmd5,[])+[f] for k in d: if len (d[k])>1 : ll.append(d[k]) print ('查重完毕,发现{0}处重复' .format (len (ll))) for i in ll: print (i) return llif __name__ == '__main__' : main()123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
视频查重(部分完成) 思路:对视频进行抽帧,然后比对是否有关键帧的图片指纹是否一致
这里写一下研究过程,实现代码:
视频抽帧 图像指纹生成 找出包含同样图像指纹的视频 这个过程试过一些方案也都记录一下: 曾经考虑subprocess.Popen()执行ffmpeg抽帧,但是太慢了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 def external_cmd (cmd, msg_in='' ): try : proc = subprocess.Popen(cmd, shell=True , stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout_value, stderr_value = proc.communicate(msg_in) return stdout_value, stderr_value except ValueError as err: return None , None except IOError as err: return None , None '''方法一''' external_cmd('ffmpeg -i "{0}" -r 0.05 -q:v 2 -f image2 ./%08d.000000.jpg' .format (video_path))'''方法二''' timeF = 20 for i in range (1 ,video_duration//timeF): h,m,s = (i*timeF)//3600 , ((i*timeF)%3600 )//60 , (i*timeF)%60 external_cmd('ffmpeg -i "{0}" -ss {1:0=2}:{2:0=2}:{3:0=2} -vframes 1 {4}.jpg' .format (video_path,h,m,s,i)) '''方法三''' timeF = 20 for i in range (1 ,video_duration//timeF): h,m,s = (i*timeF)//3600 , ((i*timeF)%3600 )//60 , (i*timeF)%60 hw = '{0}x{0}' .format (100 ) external_cmd('ffmpeg -i "{0}" -ss {1:0=2}:{2:0=2}:{3:0=2} -vframes 1 -s {5} -f image2 {4}.jpeg' .format (video_path,h,m,s,i,hw))12345678910111213141516171819202122232425262728293031323334353637383940
最后选定的还是cv2抽帧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 这个是一开始想的,将抽到的帧保存为单张图像,发现还是慢。''' # 视频抽帧测试,这种抽帧方式太慢了,1000帧大概45秒长度视频,花费5秒左右 videopath = '01.mp4' vc = cv2.cv2.VideoCapture(videopath) if vc.isOpened(): # 是否正常打开 rval,frame = vc.read() else: rval = False timeF =1000 # 抽帧频率 c = 1 while rval: rval,frame = vc.read() if(c%timeF==0): cv2.imwrite('{0:0=3}.jpg'.format(c),frame) cv2.waitKey(1) c+=1 vc.release() ''' 1234567891011121314151617181920 然后换成了这种,不存图像了,直接将抽到图像计算成dhash保存,总算速度上来了。 v = 'c:/users/kindle/desktop/test/01.mp4' cap = cv2.VideoCapture(v) n_frames = int (cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) dur = n_frames / fps cap.set (cv2.CAP_PROP_POS_MSEC, (5 *1000 )) success, image_np = cap.read() img = Image.fromarray(cv2.cvtColor(image_np,cv2.COLOR_BGR2RGB)) imgrsz = img.resize((100 ,100 )) 12345678910111213
计算图像指纹,直接用了现成的模块,imagehash里的dhash
1 2 h5 = str (imagehash.dhash(imgrsz)) 1
在上述基础上,视频转换为图像指纹组的函数基本如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def video2imageprint (filepath ): """ 返回整个视频的图片指纹列表 从3秒开始,每60秒抽帧,计算一张图像指纹 """ cap = cv2.VideoCapture(filepath) n_frames = int (cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) dur = n_frames / fps *1000 cap_set = 3000 hash_int_arr = [] while cap_set<dur-3000 : cap.set (cv2.CAP_PROP_POS_MSEC, cap_set) success, image_np = cap.read() if success: img = Image.fromarray(cv2.cvtColor(image_np,cv2.COLOR_BGR2RGB)) h = str (imagehash.dhash(img)) hash_arr.append(h) else : print ('fail' ,cap_set/1000 ,filepath) cap_set+=1000 *60 cap.release() return hash_arr12345678910111213141516171819202122232425
然后将建立字典,key为图像指纹,value为地址列表。
1 2 3 4 5 6 7 8 9 10 11 12 db = shelve.open ('videocheck.db' )for h in hash_arr: fp_arr = db.get(h, []) if fp_arr==[]: db[h]=[filepath] elif filepath not in fp_arr: db[h]=db[h]+[filepath] db.close()1234567891011
后面就是检查哪个指纹,对应的地址列表中,大于1个文件。 则说明有多个视频包含该指纹。 为了验证指纹相同的图像是否一致,还写了一个合并图像输出的函数。这个函数写成了这样,是考虑以后可以用作给视频生成多图合并的缩略图玩。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 def imgjoin (imgs,tags=[],width_height=(0 ,0 ),column_row=(0 ,0 ),blank=(0 ,0 ,0 ,0 ,0 ,0 ) ): ''' 多张图片,合并成一张视频抽帧缩略图合并大图那种。 可以每张图片上方加注释,也可以文件顶部只加一行注释。 每张图片宽高,行列间距,四外边距都可以自定义 args: imgs: pil图片数组 tags: 如果标签数和图片数相同,每张图片上方加文字。如果只有一个标签,则只在图片最顶部加1条文字。 width_height: 合并后图片中,每张缩略图宽高,如未指定以第一张图标为基准 column_row:横排和竖排数量 blank_cr:空白分布(列间,行间,左右,上下,标题,标签) return: 返回合并好的图片 ''' from PIL import Image,ImageDraw,ImageFont if len (imgs)>100 : print ('imgs当前上限100张图合并' ) return '' elif imgs==[]: print ('imgs中没有包含图片,请检查' ) return '' elif 1 <len (tags)<len (imgs): print ('tags文字数组和图片对不上,请只输入1条或和图片一样多' ) return '' else : pass if column_row==(0 ,0 ): cr = 1 while len (imgs)>cr**2 : cr+=1 column_row=(cr,cr) c,r = column_row if width_height==(0 ,0 ): width_height = imgs[0 ].size for i,m in enumerate (imgs): if m.size!=width_height: imgs[i] = m.resize(width_height) w,h = width_height bw,bh,blr,btb,btitle,btag = blank if blank==(0 ,0 ,0 ,0 ,0 ,0 ): if len (tags)==1 : btitle = h J_width = w*c + bw*(c-1 ) + blr*2 J_height= h*r + bh*(r-1 ) + btb*2 + btitle + btag*r J_img = Image.new('RGB' , (J_width,J_height),(255 ,255 ,255 )) draw=ImageDraw.Draw(J_img) newfont=ImageFont.truetype('simkai.ttf' ,12 ) for i,m in enumerate (imgs): if i==0 : x,y=blr,btb+btitle+btag elif i%c==0 : x,y=blr,y+bh+btag+h else : x,y=x+bw+w,y J_img.paste(m, (x, y, x+w, y+h)) if len (tags)>1 : draw.text((x,y-btag),tags[i],(0 ,0 ,0 ),font=newfont) return J_img12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
到这里最开始的研究就完成了, 最开始的实现思路,就是上面这样。
================
后来发现图像指纹是有可能不是完全一致的, 而是相似的,还要考虑到相似的图像指纹。
imagehash.dhash算出来的图像指纹,本身的type类型不是字符串。 为了保存,转为字符串后,后续计算两个字符串的相似度,哪怕是很简单的字符串每一位是否与另一字符串每一位相等,数以10w个图像指纹,互相计算都要花费很长时间。 计算两个指纹的相似度,试了几种方法效率,最后发现bin最快,这个方法还是从dhash的官网看来的。 2020-5-13 看到还有一种写法是num = 1 - (aHash - bHash)/len(aHash.hash)**2
直接imagehash计算,速度和bin的差不多,推荐使用这个。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 '''关于dhash相似度比较方法研究,得到bin的方法计算最快,我的家用电脑10w次大概0.057秒。''' import time a = 'a1a8739f324eb01c' b = 'a1a8749f323eb01c' ai = int (str (a),16 ) bi = int (str (b),16 ) st = time.time()for i in range (100000 ): num = 1 -bin (ai^bi).count("1" )/64 et = time.time()print (num,et-st)123456789101112131415
接下来的考虑思路就是
计算得到相似图像指纹 找到相似指纹对应的视频 检查视频是否有连续相同地方 列出相似视频对比缩略图 1秒比对200w个感觉是挺快 但是1000个,长度为1小时的视频,就需要30分钟比对完。 这个计算量感觉太大,即使写出来,为提高效率可能需要其他算法之类的优化。
关于效率处理这里,并没有完全想好,也没有时间测试,暂时就搁置了。 因为是个人闲暇研究,扔了可能后续就忘了,捡不起来了。 所以这里把之前的研究过程记录一下,希望其他有用到的人能得到一些参考。