Error log:
File "/root/ssbc-master/workers/metadata.py", line 155, in save_metadata
dbcurr.connection.commit()
File "/usr/lib64/python2.7/site-packages/pymysql/connections.py", line 787, in commit
self._execute_command(COMMAND.COM_QUERY, "COMMIT")
File "/usr/lib64/python2.7/site-packages/pymysql/connections.py", line 1071, in _execute_command
raise err.InterfaceError("(0, '')")
InterfaceError: (0, '')
Traceback (most recent call last):
  File "/root/ssbc-master/workers/metadata.py", line 155, in save_metadata
    dbcurr.connection.commit()
  File "/usr/lib64/python2.7/site-packages/pymysql/connections.py", line 787, in commit
    self._execute_command(COMMAND.COM_QUERY, "COMMIT")
  File "/usr/lib64/python2.7/site-packages/pymysql/connections.py", line 1071, in _execute_command
    raise err.InterfaceError("(0, '')")
InterfaceError: (0, '')

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
    self.run()
  File "simdht_worker.py", line 351, in run
    self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
  File "/usr/lib/python2.7/site-packages/DBUtils/SteadyDB.py", line 580, in tough_method
    result = method(*args, **kwargs)  # try to execute
  File "/usr/lib64/python2.7/site-packages/pymysql/cursors.py", line 166, in execute
    result = self._query(query)
  File "/usr/lib64/python2.7/site-packages/pymysql/cursors.py", line 322, in _query
    conn.query(q)
  File "/usr/lib64/python2.7/site-packages/pymysql/connections.py", line 855, in query
    self._execute_command(COMMAND.COM_QUERY, sql)
  File "/usr/lib64/python2.7/site-packages/pymysql/connections.py", line 1071, in _execute_command
    raise err.InterfaceError("(0, '')")
InterfaceError: (0, '')
metadata.py:
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import traceback
import pygeoip
import threading
import socket
import sys
import hashlib
import datetime
import time
import json
import metautils
from bencode import bencode, bdecode

geoip = pygeoip.GeoIP('GeoIP.dat')

def decode(encoding, s):
    if type(s) is list:
        s = ';'.join(s)
    u = s
    for x in (encoding, 'utf8', 'gbk', 'big5'):
        try:
            u = s.decode(x)
            return u
        except:
            pass
    return s.decode(encoding, 'ignore')

def decode_utf8(encoding, d, i):
    if i+'.utf-8' in d:
        return d[i+'.utf-8'].decode('utf8')
    return decode(encoding, d[i])

def parse_metadata(data):
    info = {}
    encoding = 'utf8'
    try:
        torrent = bdecode(data)
        if not torrent.get('name'):
            return None
    except:
        return None
    try:
        info['create_time'] = datetime.datetime.fromtimestamp(float(torrent['creation date']))
    except:
        info['create_time'] = datetime.datetime.utcnow()
    if torrent.get('encoding'):
        encoding = torrent['encoding']
    if torrent.get('announce'):
        info['announce'] = decode_utf8(encoding, torrent, 'announce')
    if torrent.get('comment'):
        info['comment'] = decode_utf8(encoding, torrent, 'comment')[:200]
    if torrent.get('publisher-url'):
        info['publisher-url'] = decode_utf8(encoding, torrent, 'publisher-url')
    if torrent.get('publisher'):
        info['publisher'] = decode_utf8(encoding, torrent, 'publisher')
    if torrent.get('created by'):
        info['creator'] = decode_utf8(encoding, torrent, 'created by')[:15]
    if 'info' in torrent:
        detail = torrent['info']
    else:
        detail = torrent
    info['name'] = decode_utf8(encoding, detail, 'name')
    if 'files' in detail:
        info['files'] = []
        for x in detail['files']:
            if 'path.utf-8' in x:
                v = {'path': decode(encoding, '/'.join(x['path.utf-8'])), 'length': x['length']}
            else:
                v = {'path': decode(encoding, '/'.join(x['path'])), 'length': x['length']}
            if 'filehash' in x:
                v['filehash'] = x['filehash'].encode('hex')
            info['files'].append(v)
        info['length'] = sum([x['length'] for x in info['files']])
    else:
        info['length'] = detail['length']
    info['data_hash'] = hashlib.md5(detail['pieces']).hexdigest()
    if 'profiles' in detail:
        info['profiles'] = detail['profiles']
    return info

def save_metadata(dbcurr, binhash, address, start_time, data, blacklist):
    utcnow = datetime.datetime.utcnow()
    name = threading.currentThread().getName()
    try:
        info = parse_metadata(data)
        if not info:
            return
    except:
        traceback.print_exc()
        return
    info_hash = binhash.encode('hex')
    info['info_hash'] = info_hash
    # need to build tags
    info['tagged'] = False
    info['classified'] = False
    info['requests'] = 1
    info['last_seen'] = utcnow
    info['source_ip'] = address[0]
    for item in blacklist:
        if str(item) in info['name']:
            return
    if info.get('files'):
        files = [z for z in info['files'] if not z['path'].startswith('_')]
        if not files:
            files = info['files']
    else:
        files = [{'path': info['name'], 'length': info['length']}]
    files.sort(key=lambda z: z['length'], reverse=True)
    bigfname = files[0]['path']
    info['extension'] = metautils.get_extension(bigfname).lower()
    info['category'] = metautils.get_category(info['extension'])
    if info['category'] == 'software':
        return
    if info['category'] == 'other':
        return
    if 'files' in info:
        try:
            dbcurr.execute('INSERT INTO search_filelist VALUES(%s, %s)', (info['info_hash'], json.dumps(info['files'])))
        except:
            print name, 'insert error', sys.exc_info()[1]
        del info['files']
    try:
        try:
            print '\n', 'Saved', info['info_hash'], info['name'], (time.time()-start_time), 's', address[0], geoip.country_name_by_addr(address[0]),
        except:
            print '\n', 'Saved', info['info_hash'], sys.exc_info()[1]
        try:
            ret = dbcurr.execute('INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged,' +
                'length,create_time,last_seen,requests,comment,creator) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
                (info['info_hash'], info['category'], info['data_hash'], info['name'], info['extension'], info['classified'],
                 info['source_ip'], info['tagged'], info['length'], info['create_time'], info['last_seen'], info['requests'],
                 info.get('comment', ''), info.get('creator', '')))
        except:
            print 'insert search_hash err: ', info['info_hash']
        dbcurr.connection.commit()
    except:
        print name, 'save error', info
        traceback.print_exc()
        return
simdht_worker.py:
def run(self):
    self.name = threading.currentThread().getName()
    print self.name, 'started'
    while True:
        while self.metadata_queue.qsize() > 0:
            self.got_torrent()
        address, binhash, dtype = self.queue.get()
        if binhash in self.visited:
            continue
        if len(self.visited) > 100000:
            self.visited = set()
        self.visited.add(binhash)
        self.n_reqs += 1
        info_hash = binhash.encode('hex')
        utcnow = datetime.datetime.utcnow()
        date = (utcnow + datetime.timedelta(hours=8))
        date = datetime.datetime(date.year, date.month, date.day)
        # Check if we have this info_hash
        self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
        y = self.dbcurr.fetchone()
        if y:
            self.n_valid += 1
            # Update the last-seen time and the request count
            self.dbcurr.execute('UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
        else:
            if dtype == 'pt':
                t = threading.Thread(target=simMetadata.download_metadata, args=(address, binhash, self.metadata_queue))
                t.setDaemon(True)
                t.start()
                self.n_downloading_pt += 1
            elif dtype == 'lt' and self.n_downloading_lt < MAX_QUEUE_LT:
                t = threading.Thread(target=ltMetadata.download_metadata, args=(address, binhash, self.metadata_queue))
                t.setDaemon(True)
                t.start()
                self.n_downloading_lt += 1
        if self.n_reqs >= 1000:
            self.dbcurr.execute('INSERT INTO search_statusreport(date,new_hashes,total_requests,valid_requests) VALUES(%s,%s,%s,%s) ON DUPLICATE KEY UPDATE ' +
                'total_requests=total_requests+%s, valid_requests=valid_requests+%s, new_hashes=new_hashes+%s',
                (date, self.n_new, self.n_reqs, self.n_valid, self.n_reqs, self.n_valid, self.n_new))
            self.dbconn.commit()
            print '\n', time.ctime(), 'n_reqs', self.n_reqs, 'n_valid', self.n_valid, 'n_new', self.n_new, 'n_queue', self.queue.qsize(),
            print 'n_d_pt', self.n_downloading_pt, 'n_d_lt', self.n_downloading_lt,
            self.n_reqs = self.n_valid = self.n_new = 0

def log_announce(self, binhash, address=None):
    self.queue.put([address, binhash, 'pt'])

def log_hash(self, binhash, address=None):
    if not lt:
        return
    if is_ip_allowed(address[0]):
        return
    if self.n_downloading_lt < MAX_QUEUE_LT:
        self.queue.put([address, binhash, 'lt'])
I searched Google but could not find anything relevant. How can this be fixed?
Wouldn't dbcurr.commit() be enough? Why go through connection?
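For what it's worth: pymysql raises InterfaceError: (0, '') when the underlying socket has already been closed, typically because MySQL timed out an idle connection or because several threads share one raw connection. A cursor has no commit() of its own, which is presumably why the code goes through dbcurr.connection; the catch is that dbcurr.connection is the raw pymysql connection, so the commit bypasses the DBUtils SteadyDB wrapper visible in the traceback, and a dead socket is never reopened. Below is a minimal sketch of one possible workaround, assuming dbcurr.connection really is a pymysql connection and that losing the not-yet-committed statements on a reconnect is acceptable; safe_commit is an illustrative helper, not part of ssbc.

import pymysql

def safe_commit(dbcurr):
    # Retry the COMMIT once after the connection has dropped.
    conn = dbcurr.connection
    try:
        conn.commit()
    except (pymysql.err.InterfaceError, pymysql.err.OperationalError):
        # (0, '') means the socket is gone; ping(reconnect=True) reopens it.
        # Anything executed before the drop is lost and would have to be
        # re-run by the caller if it matters.
        conn.ping(reconnect=True)
        conn.commit()

# in save_metadata, instead of dbcurr.connection.commit():
#     safe_commit(dbcurr)

The cleaner direction is probably to commit through the DBUtils-wrapped connection object itself rather than through cursor.connection, since SteadyDB only reopens connections for operations routed through its wrapper, and to give each worker thread its own connection instead of sharing one.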