123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- from dataclasses import asdict
- from typing import List
- from dacite import from_dict
- import json
- import requests
- import sqlite3
- from twitter_v2.types import TweetSearchResponse, DMEventsResponse, UserSearchResponse
class ArchiveTweetSource:
    """Read tweets out of a SQLite database holding a Twitter archive.

    Queries run against a table named ``tweet``. The timeline query selects
    these columns: id, created_at, retweeted, favorited, retweet_count,
    favorite_count, full_text, in_reply_to_status_id_str,
    in_reply_to_user_id, in_reply_to_screen_name.
    """

    # Columns returned by get_user_timeline().
    _TIMELINE_COLUMNS = (
        "id, created_at, retweeted, favorited, retweet_count, favorite_count, "
        "full_text, in_reply_to_status_id_str, in_reply_to_user_id, "
        "in_reply_to_screen_name"
    )

    def __init__(self, archive_path, db_path=".data/tweet.db", archive_user_id=None):
        """Store the archive location, database path and archive owner id.

        Args:
            archive_path: Location of the archive; only stored here.
            db_path: Path of the SQLite database file to query.
            archive_user_id: Id of the archive's user; stored as ``user_id``.
        """
        self.archive_path = archive_path
        self.user_id = archive_user_id
        self.db_path = db_path

    def get_db(self):
        """Open and return a new sqlite3 connection to ``self.db_path``.

        The caller owns the connection and is responsible for closing it.
        """
        return sqlite3.connect(self.db_path)

    def get_user_timeline(self, author_id=None, max_results=10, since_id=None):
        """Return tweets ordered by ascending numeric id.

        Args:
            author_id: When truthy, echoed back on every row as a string
                column named ``author_id``. It is NOT used to filter rows.
            max_results: Maximum number of rows. ``None`` means no limit.
            since_id: When truthy, only tweets whose numeric id is strictly
                greater than this value are returned.

        Returns:
            A list of dicts, one per tweet row.
        """
        if max_results is None:
            # SQLite treats a negative LIMIT as "no upper bound".
            max_results = -1

        select_cols = self._TIMELINE_COLUMNS
        sql_params = []

        # Placeholders are positional, so parameters are appended in the
        # order their "?" appears in the statement: select list, where, limit.
        if author_id:
            # Bound as a parameter (not formatted into the SQL text) so quotes
            # in the value cannot break or inject into the statement. str()
            # keeps the result a string, as the quoted literal used to be.
            select_cols += ", ? as author_id"
            sql_params.append(str(author_id))

        where_sql = ""
        if since_id:
            # Ids may be stored as text, hence the cast for numeric comparison.
            where_sql = "where cast(id as integer) > ?"
            sql_params.append(since_id)

        sql = "select {} from tweet {} order by cast(id as integer) asc limit ?".format(
            select_cols, where_sql
        )
        sql_params.append(max_results)

        return self.search_tweets_sql(sql, sql_params)

    def get_tweet(self, id_):
        """Return the tweet row with id ``id_`` as a dict, or ``None`` if absent."""
        tweets = self.get_tweets([id_])
        if tweets:
            return tweets[0]
        return None

    def get_tweets(self, ids):
        """Return the tweet rows whose id is in ``ids``, in the order of ``ids``.

        Args:
            ids: Iterable of tweet ids to look up.

        Returns:
            A list of dicts (all columns of ``tweet``). Ids that are not in
            the table are simply missing from the result.
        """
        ids = list(ids)

        placeholders = ",".join(["?"] * len(ids))
        sql = "select * from tweet where id in ({})".format(placeholders)

        results = self.search_tweets_sql(sql, ids)

        # Map each requested id to its first position once, instead of calling
        # ids.index() per row. Keys are compared as strings so that a stored
        # id of a different type than the requested one (int vs str) still
        # sorts correctly instead of raising ValueError.
        position = {}
        for index, wanted in enumerate(ids):
            position.setdefault(str(wanted), index)
        results.sort(key=lambda tweet: position.get(str(tweet['id']), len(ids)))

        return results

    def search_tweets_sql(self, sql, sql_params=None):
        """Execute ``sql`` with ``sql_params`` and return the rows as dicts.

        Args:
            sql: SQL statement using ``?`` placeholders.
            sql_params: Sequence of values for the placeholders; defaults to
                no parameters.

        Returns:
            A list of dicts keyed by column name.
        """
        params = [] if sql_params is None else sql_params

        db = self.get_db()
        try:
            # "with db" only commits/rolls back the transaction; it does not
            # close the connection, so that is done explicitly in finally.
            with db:
                cur = db.cursor()
                cur.row_factory = sqlite3.Row
                results = [dict(row) for row in cur.execute(sql, params).fetchall()]
        finally:
            db.close()

        print(f'search_tweets_sql {len(results)}')

        return results
|