Один из самых громких крахов начала 2000-х годов — банкротство компании Enron, принес довольно удивительную пользу для технологий анализа данных, естественных языков и социальных сетей. Стал доступен корпус переписки сотрудников компании Enron, состоящий из более чем 600 тысяч сообщений. Такой объем реальных данных о жизни компании просто уникален и бесценен, особенно, учитывая полную легальность использования содержимого корпуса. Копия корпуса была приобретена за 10 000$ Andrew McCallum, сейчас все данные находятся в открытом доступе. Тем, кого интересут подробности истории компании Enron рекомендую книгу Hedge Hogs: The Cowboy Traders Behind Wall Street’s Largest Hedge Fund Disaster .
Увлекательнейшая книга Mining the social Web описывает пример использования документно-ориентированной базы данных MongoDb для обработки и анализа писем.
Итак, на сайте доступен полный архив корпуса сообщений. Архив распаковываем в удобный каталог (это займет некоторое время) — в коде Python каталог задан переменной MAILDIR.
Так как с исходным огромным количеством файлов работать неудобно, то преобразуем данные сообщений в стандартный почтовый формат Unix MailBox.
Устанавливаем пакет для работы с датами:
sudo pip install python_dateutil
import re import email from time import asctime import os import sys from dateutil.parser import parse MAILDIR = '/home/aleksandr/enron/data/maildir/' MBOX = '/home/aleksandr/enron/data/enron.mbox' mbox = open(MBOX, 'w') test = 'inbox' for (root, dirs, file_names) in os.walk(MAILDIR): lastFolder = root.split(os.sep)[-1].lower() if lastFolder == 'inbox': for file_name in file_names: file_path = os.path.join(root, file_name) message_text = open(file_path).read() _from = re.search("From: ([^r]+)", message_text).groups()[0] _date = re.search("Date: ([^r]+)", message_text).groups()[0] _date = asctime(parse(_date).timetuple()) msg = email.message_from_string(message_text) msg.set_unixfrom('From %s %s' % (_from, _date)) mbox.write(msg.as_string(unixfrom=True) + "nn") mbox.close()
Сконвертируем в формат JSON, в котором будет происходить работа с базой MongoDB. Устанавливаем BeatifulSoup — для очистки текста:
apt-get install python-bs4
from mercurial.templatefilters import json __author__ = 'aleksandr' import mailbox import email import json import time import quopri #from BeatifulSoup import BeatifulSoup from bs4 import BeautifulSoup from dateutil.parser import parse MBOX = '/home/aleksandr/enron/data/enron.mbox' OUTFILE = '/home/aleksandr/enron/data/enron.json' def cleanContent(msg): msg = quopri.decodestring(msg) try: soup = BeautifulSoup(msg,) except: return '' result =''.join(soup.findAll(text = True)); return result class Encoder(json.JSONDecoder): def default(self, o): return list(o) def to_JSON(self, msg): return json.dumps(msg, default=lambda o: o.__dict__, sort_keys = True, indent = 4) def jsonifyMessage( msg): json_msg = {'parts' : []} for (k, v) in msg.items(): json_msg[k] = v.decode('utf-8', 'ignore') for k in ['To', 'Cc', 'Bcc']: if not json_msg.get(k): continue json_msg [k] = json_msg[k].replace('n','').replace('t','').decode('utf-8','ignore').split(',') for part in msg.walk(): json_part = {} if part.get_content_maintype() == 'multipart': continue json_part['contentType' ]= part.get_content_type() content = part.get_payload(decode=False).decode('utf-8','ignore') json_part['content'] = cleanContent(content) json_msg['parts'].append(json_part) #then = parse(json_msg['Date']) #millis = int(time.mktime(then.timetuple())*1000 +then.microsecond/1000) #json_msg['Date'] = {'$date' : millis} return json_msg def gen_json_msgs( mb): while 1: msg = mb.next() if msg is None: break yield jsonifyMessage(msg) mbox = mailbox.UnixMailbox(open(MBOX,'rb'),email.message_from_file) f = open (OUTFILE,'w') f.write('[') first = True for msg in gen_json_msgs(mbox): if msg!=None : encoder = Encoder() jsonText = encoder.to_JSON(msg) #json = json.dumps(msg, cls = Encoder, indent = 1 if first==False: f.write(',') f.write(jsonText + 'n') first = False f.write(']') f.close()
В результате должен сформироваться текстовый файл в формате JSON примерно следующего вида:
Стоит сказать пару слов о MongoDB. MongoDB является документно-ориентированной СУБД и относится к NoSQL системам, так как не является реляционной базой данных. Большим преимуществом является возможность работы без задания строгой схемы хранения данных(schemaless), достаточно импортировать документы JSON в нужную коллекцию и MongoDB автоматически позволит производить запросы над документами. В дальнейшем при появлении у записей новых полей(колонок данных) достаточно обновить нужные записи. Одним из немаложных достоинства решения MongoDB является полнотекстовый поиск, а также удобство репликации и горизонтального масштабирования на множество серверов.
MongoDB достаточно просто установить практически на любую операционную систему. Например, для Ubuntu инструкция по установке представлена по этому адресу. Я рекомендую сразу установить какую либо оболочку, удобную для составления запросов и наглядного просмотра данных, например, Robomongo. Robomongo — оболочка, на основе JavaScript для управления mongodb. Данное средство позволяет наглядно просматривать содержимое коллекций, делать запросы с автодополнением.
После установки mongodb можно запустить командой sudo service mongodb start. Для отладки может быть удобно запустить прямо из консоли с указанием пути, где будут храниться данные: mongod --dbpath /home/aleksandr/enron/mongodb
Теперь можно импортировать данные в MongoDB, предварительно установив пакет для работы с mongodb из python (sudo pip install pymongo).
JSON_FILE = '/home/aleksandr/enron/data/enron.json' import sys from pymongo import Connection from pymongo.errors import ConnectionFailure import json def main(): try: c = Connection(host="localhost", port = 27017) except ConnectionFailure, e: sys.stderr.write("Could not connect: %s " % e) dbh = c["enron"] assert dbh.connection == c print "Successfully set up db handle" emails = dbh.emails emails.create_index("Message-ID", unique = True) #: 1}, {"unique" : "true"} # inserting doc = { "Message-ID" : "Jane", "content" : "Hello" } try: emails.insert(doc, safe = True) except: print "Error ", sys.exc_info()[0]; with open(JSON_FILE, "r" ) as myfile: data = myfile.read() jmails = json.loads(data) for item in jmails: try: emails.insert(item) except: print "Error", sys.exc_info()[0] if __name__ == "__main__": main()
Для того, чтобы избежать дублирования записей при повторном запуске импорта данных в MongoDB был создан индекс по идентификатору MessageID, при повторной вставке сообщений будет просто сгенерировано исключений, если необходимо, можно в таком случае произвести обновление полей существующей записи с этим идентификатором.
Теперь данные подготовлены для обработки. В следующих статьях мы начнем анализ и классификацию сообщений.
Материалы:
Выбор оболочки для администрирования MongoDB
Руководство по MongoDB
Комментарии: