Корпус переписки Enron — подготовка данных к анализу

Один из самых громких крахов начала 2000-х годов — банкротство компании Enron, принес довольно удивительную пользу для технологий анализа данных, естественных языков и социальных сетей. Стал доступен корпус переписки сотрудников компании Enron, состоящий из более чем 600 тысяч сообщений. Такой объем реальных данных о жизни компании просто уникален и бесценен, особенно, учитывая полную легальность использования содержимого корпуса. Копия корпуса была приобретена за 10 000$ Andrew McCallum, сейчас все данные находятся в открытом доступе. Тем, кого интересут подробности истории компании Enron рекомендую книгу Hedge Hogs: The Cowboy Traders Behind Wall Street’s Largest Hedge Fund Disaster .

Увлекательнейшая книга Mining the social Web описывает пример использования документно-ориентированной базы данных MongoDb для обработки и анализа писем.

Итак, на сайте доступен полный архив корпуса сообщений. Архив распаковываем в удобный каталог (это займет некоторое время) — в коде Python каталог задан переменной MAILDIR.

Так как с исходным огромным количеством файлов работать неудобно, то преобразуем данные сообщений в стандартный почтовый формат Unix MailBox.

Устанавливаем пакет для работы с датами:

sudo pip install python_dateutil

import re
import email
from time import asctime
import os
import sys
from dateutil.parser import parse

MAILDIR = '/home/aleksandr/enron/data/maildir/'
MBOX = '/home/aleksandr/enron/data/enron.mbox'

mbox = open(MBOX, 'w')

test = 'inbox'

for (root, dirs, file_names) in os.walk(MAILDIR):
    lastFolder = root.split(os.sep)[-1].lower()
    if lastFolder == 'inbox':
        for file_name in file_names:
            file_path = os.path.join(root, file_name)
            message_text = open(file_path).read()

            _from = re.search("From: ([^r]+)", message_text).groups()[0]
            _date = re.search("Date: ([^r]+)", message_text).groups()[0]

            _date = asctime(parse(_date).timetuple())

            msg = email.message_from_string(message_text)
            msg.set_unixfrom('From %s %s' % (_from, _date))

            mbox.write(msg.as_string(unixfrom=True) + "nn")

mbox.close()

Сконвертируем в формат JSON, в котором будет происходить работа с базой MongoDB. Устанавливаем BeatifulSoup — для очистки текста:

apt-get install python-bs4

from mercurial.templatefilters import json

__author__ = 'aleksandr'

import mailbox
import email
import json
import time
import quopri
#from BeatifulSoup import BeatifulSoup
from bs4 import BeautifulSoup
from dateutil.parser import   parse

MBOX = '/home/aleksandr/enron/data/enron.mbox'
OUTFILE = '/home/aleksandr/enron/data/enron.json'

def cleanContent(msg):
    msg = quopri.decodestring(msg)

    try:
        soup = BeautifulSoup(msg,)
    except:
        return ''

    result =''.join(soup.findAll(text = True));
    return result

class Encoder(json.JSONDecoder):


    def default(self, o):

        return list(o)

    def to_JSON(self, msg):
        return json.dumps(msg, default=lambda o: o.__dict__, sort_keys = True, indent = 4)

def jsonifyMessage( msg):
    json_msg = {'parts' : []}

    for (k, v) in msg.items():
        json_msg[k] = v.decode('utf-8', 'ignore')

        for k in ['To', 'Cc', 'Bcc']:
            if not json_msg.get(k):
                continue

            json_msg [k] = json_msg[k].replace('n','').replace('t','').decode('utf-8','ignore').split(',')

        for part in msg.walk():
            json_part = {}

            if part.get_content_maintype() == 'multipart':
                continue

            json_part['contentType' ]= part.get_content_type()
            content = part.get_payload(decode=False).decode('utf-8','ignore')
            json_part['content'] = cleanContent(content)

            json_msg['parts'].append(json_part)

        #then = parse(json_msg['Date'])
        #millis = int(time.mktime(then.timetuple())*1000 +then.microsecond/1000)
        #json_msg['Date'] = {'$date' : millis}
        return json_msg

def gen_json_msgs( mb):
    while 1:
        msg = mb.next()
        if msg is None:
            break
        yield jsonifyMessage(msg)



mbox = mailbox.UnixMailbox(open(MBOX,'rb'),email.message_from_file)

f = open (OUTFILE,'w')
f.write('[')
first = True
for msg in gen_json_msgs(mbox):
    if msg!=None :
        encoder = Encoder()
        jsonText = encoder.to_JSON(msg)
        #json = json.dumps(msg, cls = Encoder, indent = 1
        if first==False:
            f.write(',')
        f.write(jsonText + 'n')
        first = False

f.write(']')
f.close()

В результате должен сформироваться текстовый файл в формате JSON примерно следующего вида:

Стоит сказать пару слов о MongoDB. MongoDB является документно-ориентированной СУБД и относится к NoSQL системам, так как не является реляционной базой данных. Большим преимуществом является возможность работы без задания строгой схемы хранения данных(schemaless), достаточно импортировать документы JSON в нужную коллекцию и MongoDB автоматически позволит производить запросы над документами. В дальнейшем при появлении у записей новых полей(колонок данных) достаточно обновить нужные записи. Одним из немаложных достоинства решения MongoDB является полнотекстовый поиск, а также удобство репликации и горизонтального масштабирования на множество серверов.

MongoDB достаточно просто установить практически на любую операционную систему. Например, для Ubuntu инструкция по установке представлена по этому адресу. Я рекомендую сразу установить какую либо оболочку, удобную для составления запросов и наглядного просмотра данных, например, Robomongo. Robomongo — оболочка, на основе JavaScript для управления mongodb. Данное средство позволяет наглядно просматривать содержимое коллекций, делать запросы с автодополнением.

После установки mongodb можно запустить командой sudo service mongodb start. Для отладки может быть удобно запустить прямо из консоли с указанием пути, где будут храниться данные:

mongod --dbpath /home/aleksandr/enron/mongodb

Теперь можно импортировать данные в MongoDB, предварительно установив пакет для работы с mongodb из python (sudo pip install pymongo).


JSON_FILE = '/home/aleksandr/enron/data/enron.json'

import  sys
from  pymongo import Connection
from pymongo.errors import ConnectionFailure
import json

def main():

    try:
        c = Connection(host="localhost", port = 27017)
    except ConnectionFailure, e:
        sys.stderr.write("Could not connect: %s " % e)

    dbh = c["enron"]

    assert dbh.connection == c
    print "Successfully set up db handle"

    emails = dbh.emails
    emails.create_index("Message-ID", unique = True) #: 1}, {"unique" : "true"}
    # inserting
    doc = {
        "Message-ID" : "Jane",
        "content" : "Hello"
    }

    try:
        emails.insert(doc, safe = True)
    except:
        print "Error ", sys.exc_info()[0];

    with open(JSON_FILE, "r" ) as  myfile:
        data = myfile.read()
    jmails = json.loads(data)

    for item in jmails:
        try:
            emails.insert(item)
        except:
            print "Error", sys.exc_info()[0]


if __name__ == "__main__":
    main()

Для того, чтобы избежать дублирования записей при повторном запуске импорта данных в MongoDB был создан индекс по идентификатору MessageID, при повторной вставке сообщений будет просто сгенерировано исключений, если необходимо, можно в таком случае произвести обновление полей существующей записи с этим идентификатором.

Теперь данные подготовлены для обработки. В следующих статьях мы начнем анализ и классификацию сообщений.

Материалы:
Выбор оболочки для администрирования MongoDB
Руководство по MongoDB


Комментарии:

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *