# Source code for src.data._transform
"""
Preprocessing functions to convert raw .json tweet data into a clean(er)
dataframe.
"""
import numpy as np
import pandas as pd
import logging
# Module-wide logging setup: timestamped records tagged with logger name/level.
# NOTE(review): calling basicConfig at import time configures the root logger
# as a side effect of importing this module — confirm that is intended.
log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_fmt)
def merge_dataframes(
    df_users,
    df_tweets,
    verbose=False
):
    """
    Merges users and tweets dataframe to input into elasticsearch.

    Parameters
    ----------
    df_users : DataFrame
        Dataframe with all users from the data. Must contain an
        ``id_str`` column (and optionally ``created_at``).
    df_tweets : DataFrame
        Dataframe with all tweets pulled. Must contain a
        ``user_id_str`` column referencing ``df_users['id_str']``.
    verbose : int or bool, default=False
        Controls the verbosity when pulling data.

    Returns
    -------
    df_merged : DataFrame
        Dataframe with the dataframes joined on user_id so every
        row has the tweet and the user information.
    """
    logger = logging.getLogger(__name__)
    logger.propagate = verbose
    logger.info('Merging dataframes for ES')
    # Inner join: one row per tweet, enriched with its author's profile.
    df_merged = pd.merge(
        df_users, df_tweets, left_on="id_str", right_on="user_id_str"
    )
    # Both inputs carry 'id_str' and 'created_at'; pandas suffixes the
    # clashes with _x (users) / _y (tweets) -- rename them to clear names.
    df_merged = df_merged.rename(columns={
        'id_str_x': 'user_id',
        'id_str_y': 'tweet_id',
        'created_at_x': 'user_created_at',
        'created_at_y': 'tweet_created_at',
    })
    # Twitter's timestamp layout, e.g. "Mon Apr 06 22:19:45 +0000 2020".
    date_format = '%a %b %d %H:%M:%S %z %Y'
    # pd.to_datetime(errors='ignore') is deprecated (removed in pandas 3.0);
    # replicate its semantics explicitly: leave the column untouched when
    # the values cannot be parsed with the expected format.
    try:
        df_merged['user_created_at'] = pd.to_datetime(
            df_merged['user_created_at'], format=date_format
        )
    except (ValueError, TypeError):
        logger.info('Could not parse user_created_at; leaving it unparsed')
    # Elasticsearch expects None (-> JSON null) rather than NaN.
    df_merged = df_merged.where(df_merged.notnull(), None)
    return df_merged
def transform(json_path, verbose=False):
    """
    Converts a raw .json file containing Tweets' data into a clean(er)
    dataset.

    Parameters
    ----------
    json_path : str
        Path to json file containing the pertinent data (one JSON
        tweet object per line).
    verbose : int or bool, default=False
        Controls the verbosity when pulling data.

    Returns
    -------
    df_merged : DataFrame
        Dataframe with the dataframes joined on user_id so every
        row has the tweet and the user information.
    """
    logger = logging.getLogger(__name__)
    logger.propagate = verbose
    logger.info('Reading json file')
    df = pd.read_json(json_path, lines=True)
    # add the original tweets that are missing in the dataset
    new_tweets = df['retweeted_status'].dropna().tolist()\
        + df['quoted_status'].dropna().tolist()
    # reset_index: concat leaves duplicate index labels behind, which would
    # make the later index-aligned column assignments ambiguous.
    df = pd.concat([df, pd.DataFrame(new_tweets)])\
        .drop_duplicates('id_str')\
        .reset_index(drop=True)
    df_users = pd.DataFrame(df['user'].tolist())
    logger.info('Dropping irrelevant columns')
    df_tweets = df.drop(columns=[
        'id', 'in_reply_to_status_id', 'in_reply_to_user_id', 'user',
        'coordinates', 'place', 'quoted_status_id', 'favorited',
        'retweeted', 'retweeted_status', 'matching_rules', 'geo',
        'filter_level', 'display_text_range', 'contributors',
        'quoted_status', 'quoted_status_id', 'quoted_status_permalink',
        'in_reply_to_screen_name', 'text', 'extended_tweet', 'truncated',
    ])
    # drop columns with 100% missing values
    all_missing = df_users.columns[
        (df_users.isna().sum(0) / df_users.shape[0]) == 1
    ].tolist()
    # reset_index: drop_duplicates leaves gaps in the index, which would
    # misalign the axis=1 concat with df_derived below.
    df_users = df_users.drop(columns=[
        'id', 'url', 'default_profile', 'profile_image_url',
        'profile_image_url_https', 'profile_banner_url',
        'profile_background_image_url', 'profile_background_image_url_https',
        'profile_background_tile', 'profile_link_color',
        'profile_sidebar_fill_color', 'profile_text_color',
        'profile_use_background_image', 'default_profile_image',
        'translator_type', 'contributors_enabled', 'is_translator',
        'profile_background_color', 'profile_sidebar_border_color'
    ]+all_missing).drop_duplicates(subset='id_str', keep='first')\
        .reset_index(drop=True)
    logger.info('Adding ID columns and retrieving full text of all tweets')
    df_tweets['user_id_str'] = df['user'].apply(lambda x: x['id_str'])

    def get_full_text(row):
        """Return the untruncated text of a tweet (or of its retweet)."""
        if not isinstance(row['retweeted_status'], dict):
            # plain tweet: long tweets hide full text in 'extended_tweet'
            if not row['truncated']:
                return row['text']
            return row['extended_tweet']['full_text']
        # retweet: take the original tweet's (possibly extended) text
        if row['retweeted_status']['truncated']:
            return row['retweeted_status']['extended_tweet']['full_text']
        return row['retweeted_status']['text']

    df_tweets['full_text'] = df.apply(get_full_text, axis=1)

    def get_retweet_id(row):
        """returns: is_retweet, original_tweet_id_str"""
        if isinstance(row['retweeted_status'], dict):
            return True, row['retweeted_status']['id_str']
        return False, np.nan

    df_tweets['is_retweet'], df_tweets['original_tweet_id_str'] = \
        zip(*df.apply(get_retweet_id, axis=1))
    # a tweet is a reply iff it references another status id
    df_tweets['is_reply'] = ~df['in_reply_to_status_id'].isna()
    df_tweets['created_at'] = pd.to_datetime(
        df_tweets['created_at'].reset_index(drop=True), utc=True
    )
    logger.info('Extracting some basic info from users dataframe')
    # 'derived' holds enrichment metadata; keep only the first location
    df_derived = pd.DataFrame(
        df_users.derived.apply(
            lambda x: x['locations'][0] if isinstance(x, dict) else {}
        ).tolist()
    )
    # flatten the nested 'geo' dict into its own columns
    df_geo = pd.DataFrame(
        df_derived.geo.apply(
            lambda x: x if isinstance(x, dict) else {}
        ).tolist()
    )
    df_derived = pd.concat([df_derived, df_geo], axis=1).drop(columns='geo')
    logger.info('Concatenating extracted features from users dataframe')
    df_users = pd.concat(
        [df_users, df_derived.rename(
            columns={i: f'users_derived_{i}' for i in df_derived.columns})],
        axis=1
    ).drop(columns='derived')
    logger.info('Merging tweets and users dataframe')
    # forward verbose so the merge step's logging follows the caller's choice
    return merge_dataframes(
        df_users,
        df_tweets,
        verbose=verbose
    ).drop_duplicates(subset='tweet_id')