Source code for src.data._load_es

"""
Functions to pre proccess and load data into elasticsearch.

Takes the two data frames produced users and tweets to do a join on user id.
This is done since elasticsearch is a NoSQL database and performs better with
de-normalized (redundant) data

This dataframe is then inserted into the elasticsearch using the columns names
from the pandas.
"""
import logging
from elasticsearch import Elasticsearch, helpers

log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_fmt)


def doc_generator(
    df,
    id_col,
    filterKeys
):
    """
    Iterate over the dataframe and yield documents to upload

    Parameters
    ----------
    df : DataFrame
        Pandas DataFrame containing the transformed Twitter data.
    id_col : str
        Column to be used for the id in the database.
    filterKeys : function
        Function to return what you want from the pandas series.

    Returns
    -------
    documents : generator
        Generator with dictionaries to insert into the database in bulk.
    """
    df_iter = df.iterrows()
    for i, document in df_iter:
        doc = {
            "_id": document[id_col],
        }
        doc.update(filterKeys(document))
        yield doc


[docs]def load_es( df_merged, ip_address='localhost', verbose=False ): """ Loads a dataframe into the Elastic Search database. Parameters ---------- df_merged : DataFrame Merged dataframe of tweets and users. ip_address : str Elastic Search database's ip address. Returns ------- actions : int Number of successully executed actions errors : list List of errors """ es = Elasticsearch([ip_address]) # no args, connect to localhost:9200 def filterKeys(document): # just return all the data from each row return document.to_dict() logger = logging.getLogger(__name__) logger.propagate = verbose logger.info('Loading data into Elastic Search') if not verbose: logging.disable(logging.CRITICAL) actions, errors = helpers.bulk( client=es, index='twitter', actions=doc_generator(df_merged, 'tweet_id', filterKeys), stats_only=(not verbose) ) if not verbose: logging.disable(logging.NOTSET) return actions, errors