from dataclasses import asdict
from typing import List
import json
import sqlite3

from dacite import from_dict
import requests

from twitter_v2.types import TweetSearchResponse, DMEventsResponse, UserSearchResponse


class ArchiveTweetSource:
    """Read tweets from the ``tweet`` table of a local SQLite database.

    Columns selected by :meth:`get_user_timeline`:
    id, created_at, retweeted, favorited, retweet_count, favorite_count,
    full_text, in_reply_to_status_id_str, in_reply_to_user_id,
    in_reply_to_screen_name
    """

    def __init__(self, archive_path, db_path=".data/tweet.db", archive_user_id=None):
        """Store the archive location, database path and archive owner id.

        Nothing is opened here; a database connection is created per query.
        """
        self.archive_path = archive_path
        self.user_id = archive_user_id
        self.db_path = db_path

    def get_db(self) -> sqlite3.Connection:
        """Open and return a new SQLite connection to ``self.db_path``.

        The caller owns the connection and is responsible for closing it.
        """
        return sqlite3.connect(self.db_path)

    def get_user_timeline(self, author_id=None, max_results=10, since_id=None) -> List[dict]:
        """Return tweets ordered by ascending numeric id.

        Args:
            author_id: When truthy, echoed back on every row as a text
                ``author_id`` column. NOTE: rows are *not* filtered by
                author; the table is queried as a whole.
            max_results: Maximum number of rows; ``None`` means unlimited.
            since_id: When truthy, only rows whose id (cast to integer) is
                strictly greater than this value are returned.

        Returns:
            A list of row dicts keyed by column name.
        """
        if max_results is None:
            # SQLite treats a negative LIMIT as "no upper bound".
            max_results = -1

        select_cols = (
            "id, created_at, retweeted, favorited, retweet_count, "
            "favorite_count, full_text, in_reply_to_status_id_str, "
            "in_reply_to_user_id, in_reply_to_screen_name"
        )
        sql_params = []

        if author_id:
            # Bind the value instead of formatting it into the SQL text so a
            # quote in author_id can neither break nor inject into the query.
            # str() keeps the column textual, as the former quoted literal was.
            select_cols += ", ? as author_id"
            sql_params.append(str(author_id))

        where_sql = ""
        if since_id:
            # The cast guards against ids stored as text, where a plain
            # comparison would be lexicographic rather than numeric.
            where_sql = "where cast(id as integer) > ?"
            sql_params.append(since_id)

        sql = "select {} from tweet {} order by cast(id as integer) asc limit ?".format(
            select_cols, where_sql
        )
        sql_params.append(max_results)

        return self.search_tweets_sql(sql, sql_params)

    def get_tweet(self, id_):
        """Return the tweet row dict with id ``id_``, or ``None`` if absent."""
        tweets = self.get_tweets([id_])
        return tweets[0] if tweets else None

    def get_tweets(self, ids) -> List[dict]:
        """Return the tweets whose id is in ``ids``, in the order of ``ids``.

        Ids that are not present in the database are silently skipped.
        """
        ids = list(ids)
        placeholders = ",".join("?" * len(ids))
        sql = "select * from tweet where id in ({})".format(placeholders)
        results = self.search_tweets_sql(sql, ids)

        # Position of the first occurrence of each requested id. Keys are
        # normalised to str so a row still sorts correctly when the stored
        # id type differs from the requested one (e.g. 123 vs "123"), which
        # previously raised ValueError from list.index().
        position = {}
        for pos, wanted in enumerate(ids):
            position.setdefault(str(wanted), pos)

        results.sort(key=lambda t: position.get(str(t["id"]), len(ids)))
        return results

    def search_tweets_sql(self, sql, sql_params=None) -> List[dict]:
        """Execute ``sql`` with ``sql_params`` and return rows as dicts.

        Args:
            sql: SQL statement using ``?`` placeholders.
            sql_params: Sequence of values for the placeholders; defaults to
                no parameters.

        Returns:
            A list with one ``dict`` (column name -> value) per result row.
        """
        params = [] if sql_params is None else sql_params

        db = self.get_db()
        try:
            # ``with db`` only commits/rolls back the transaction; it does
            # not close the connection, hence the explicit close() below.
            with db:
                cur = db.cursor()
                cur.row_factory = sqlite3.Row
                results = [dict(row) for row in cur.execute(sql, params)]
        finally:
            db.close()

        print(f'search_tweets_sql {len(results)}')
        return results