from dataclasses import asdict from typing import List from dacite import from_dict import json import requests import sqlite3 from twitter_v2.types import TweetSearchResponse, DMEventsResponse, UserSearchResponse # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/curate-a-collection/api-reference/get-collections-entries # we can perhaps steal a token from the TweetDeck Console, otherwise we need to apply for Standard v1.1 / Elevated class ApiV11TweetCollectionSource: def __init__ (self, token): self.token = token def create_collection (self, name): return def bulk_add_to_collection (self, collection_id, items): return def add_to_collection (self, collection_id, item): return def get_collection_tweets (self, collection_id): return class TwitterApiV2SocialGraph: def __init__ (self, token): self.token = token def get_user (self, user_id, is_username=False, return_dataclass=False): # GET /2/users/:id # GET /2/users/by/:username return self.get_users([user_id], is_username, return_dataclass=return_dataclass) def get_users (self, user_ids, are_usernames=False, return_dataclass=False): # GET /2/users/by?usernames= # GET /2/users?ids= user_fields = ["id", "created_at", "name", "username", "location", "profile_image_url", "verified", "description", "public_metrics", "protected", "pinned_tweet_id", "url"] params = { 'user.fields' : ','.join(user_fields), } if are_usernames: url = "https://api.twitter.com/2/users/by" params['usernames'] = user_ids else: url = "https://api.twitter.com/2/users" params['ids'] = user_ids headers = { 'Authorization': 'Bearer {}'.format(self.token) } response = requests.get(url, params=params, headers=headers) result = json.loads(response.text) typed_result = from_dict(data_class=UserSearchResponse, data=result) if return_dataclass: return typed_result result = cleandict(asdict(typed_result)) return result def get_following (self, user_id, max_results = 50, pagination_token = None, return_dataclass=False): # GET /2/users/:id/following url = "https://api.twitter.com/2/users/{}/following".format(user_id) user_fields = ["id", "created_at", "name", "username", "location", "profile_image_url", "verified"] params = { 'user.fields' : ','.join(user_fields), 'max_results': max_results } if pagination_token: params['pagination_token'] = pagination_token headers = { 'Authorization': 'Bearer {}'.format(self.token) } response = requests.get(url, params=params, headers=headers) result = json.loads(response.text) typed_result = from_dict(data_class=UserSearchResponse, data=result) if return_dataclass: return typed_result result = cleandict(asdict(typed_result)) return result def get_followers (self, user_id, max_results = 50, pagination_token = None, return_dataclass=False): # GET /2/users/:id/followers url = "https://api.twitter.com/2/users/{}/followers".format(user_id) user_fields = ["id", "created_at", "name", "username", "location", "profile_image_url", "verified", "description", "public_metrics", "protected", "pinned_tweet_id", "url"] params = { 'user.fields' : ','.join(user_fields), 'max_results': max_results } if pagination_token: params['pagination_token'] = pagination_token headers = { 'Authorization': 'Bearer {}'.format(self.token) } response = requests.get(url, params=params, headers=headers) result = json.loads(response.text) typed_result = from_dict(data_class=UserSearchResponse, data=result) if return_dataclass: return typed_result result = cleandict(asdict(typed_result)) return result def follow_user (self, user_id, target_user_id): # POST /2/users/:id/following # {target_user_id} return def unfollow_user (self, user_id, target_user_id): # DELETE /2/users/:source_user_id/following/:target_user_id return class ApiV2ConversationSource: def __init__ (self, token): self.token = token def get_recent_events (self, max_results = None, pagination_token = None): # https://developer.twitter.com/en/docs/twitter-api/direct-messages/lookup/api-reference/get-dm_events url = "https://api.twitter.com/2/dm_events" tweet_fields = ["created_at", "conversation_id", "referenced_tweets", "text", "public_metrics", "entities", "attachments"] media_fields = ["alt_text", "type", "preview_image_url", "public_metrics", "url", "media_key", "duration_ms", "width", "height", "variants"] user_fields = ["created_at", "name", "username", "location", "profile_image_url", "verified"] params = { "dm_event.fields": "id,event_type,text,created_at,dm_conversation_id,sender_id,participant_ids,referenced_tweets,attachments", "expansions": ",".join(["sender_id", "participant_ids", "referenced_tweets.id", "attachments.media_keys"]), "user.fields": ",".join(user_fields), "tweet.fields": ",".join(tweet_fields), "media.fields": ",".join(media_fields) } if max_results: params['max_results'] = max_results if pagination_token: params['pagination_token'] = pagination_token headers = {"Authorization": "Bearer {}".format(self.token)} response = requests.get(url, params=params, headers=headers) response_json = json.loads(response.text) #print(response_json) typed_resp = from_dict(data=response_json, data_class=DMEventsResponse) return typed_resp def get_conversation (self, dm_conversation_id, max_results = None, pagination_token = None): return def get_conversation_with_user (self, user_id, max_results = None, pagination_token = None): return def send_message (self, dm_conversation_id, text, attachments = None): url = f'/2/dm_conversations/{dm_conversation_id}/messages' body = { 'text': text } if attachments: body['attachments'] = attachments headers = {"Authorization": "Bearer {}".format(self.token)} resp = requests.post(url, data=json.dumps(body), headers=headers) result = json.loads(resp.text) example_resp_text = """ { "dm_conversation_id": "1346889436626259968", "dm_event_id": "128341038123" } """ return result class ApiV2TweetSource: def __init__ (self, token): self.token = token def create_tweet (self, text, reply_to_tweet_id = None, quote_tweet_id = None): url = "https://api.twitter.com/2/tweets" tweet = { 'text': text } if reply_to_tweet_id: tweet['reply'] = { 'in_reply_to_tweet_id': reply_to_tweet_id } if quote_tweet_id: tweet['quote_tweet_id'] = quote_tweet_id body = json.dumps(tweet) headers = { 'Authorization': 'Bearer {}'.format(self.token), 'Content-Type': 'application/json' } response = requests.post(url, data=body, headers=headers) result = json.loads(response.text) return result def retweet (self, tweet_id, user_id): url = "https://api.twitter.com/2/users/{}/retweets".format(user_id) retweet = { 'tweet_id': tweet_id } body = json.dumps(retweet) headers = { 'Authorization': 'Bearer {}'.format(self.token), 'Content-Type': 'application/json' } response = requests.post(url, data=body, headers=headers) result = json.loads(response.text) return result def delete_retweet (self, tweet_id, user_id): url = "https://api.twitter.com/2/users/{}/retweets/{}".format(user_id, tweet_id) headers = { 'Authorization': 'Bearer {}'.format(self.token) } response = requests.delete(url, headers=headers) result = json.loads(response.text) return result def bookmark (self, tweet_id, user_id): url = "https://api.twitter.com/2/users/{}/bookmarks".format(user_id) bookmark = { 'tweet_id': tweet_id } body = json.dumps(bookmark) headers = { 'Authorization': 'Bearer {}'.format(self.token), 'Content-Type': 'application/json' } response = requests.post(url, data=body, headers=headers) result = json.loads(response.text) return result def delete_bookmark (self, tweet_id, user_id): url = "https://api.twitter.com/2/users/{}/bookmarks/{}".format(user_id, tweet_id) headers = { 'Authorization': 'Bearer {}'.format(self.token) } response = requests.delete(url, headers=headers) print(response.status_code) result = json.loads(response.text) return result def get_home_timeline (self, user_id, variant = 'reverse_chronological', max_results = 10, pagination_token = None, since_id = None, until_id = None, end_time = None, start_time=None) -> TweetSearchResponse: """ Get a user's timeline as viewed by the user themselves. """ path = 'users/{}/timelines/{}'.format(user_id, variant) return self.get_timeline(path, max_results=max_results, pagination_token=pagination_token, since_id=since_id, until_id=until_id, end_time=end_time, start_time=start_time, return_dataclass=True) def get_timeline (self, path, max_results = 10, pagination_token = None, since_id = None, until_id = None, end_time = None, start_time = None, non_public_metrics = False, exclude_replies=False, exclude_retweets=False, return_dataclass=False): """ Get any timeline, including custom curated timelines built by Tweet Deck / ApiV11. Max 3,200 for Essential access, and 800 if exclude_replies=True """ token = self.token url = "https://api.twitter.com/2/{}".format(path) tweet_fields = ["created_at", "conversation_id", "referenced_tweets", "text", "public_metrics", "entities", "attachments"] media_fields = ["alt_text", "type", "preview_image_url", "public_metrics", "url", "media_key", "duration_ms", "width", "height", "variants"] user_fields = ["created_at", "name", "username", "location", "profile_image_url", "verified"] expansions = ["entities.mentions.username", "attachments.media_keys", "author_id", "referenced_tweets.id", "referenced_tweets.id.author_id"] if non_public_metrics: tweet_fields.append("non_public_metrics") media_fields.append("non_public_metrics") params = { "expansions": ",".join(expansions), "media.fields": ",".join(media_fields), "tweet.fields": ",".join(tweet_fields), "user.fields": ",".join(user_fields), "max_results": max_results, } exclude = [] if exclude_replies: exclude.append('replies') if exclude_retweets: exclude.append('retweets') if len(exclude): print(f'get_timeline exclude={exclude}') params['exclude'] = ','.join(exclude) if pagination_token: params['pagination_token'] = pagination_token if since_id: params['since_id'] = since_id if until_id: params['until_id'] = until_id if end_time: params['end_time'] = end_time if start_time: params['start_time'] = start_time headers = {"Authorization": "Bearer {}".format(token)} #headers = {"Authorization": "access_token {}".format(access_token)} response = requests.get(url, params=params, headers=headers) response_json = json.loads(response.text) try: #print(json.dumps(response_json, indent = 2)) typed_resp = from_dict(data=response_json, data_class=TweetSearchResponse) except: print('error converting response to dataclass') print(json.dumps(response_json, indent = 2)) if not return_dataclass: return response_json raise 'error converting response to dataclass' if return_dataclass: return typed_resp checked_resp = cleandict(asdict(typed_resp)) print('using checked response to get_timeline') #print(json.dumps(checked_resp, indent=2)) #print('og=') #print(json.dumps(response_json, indent=2)) return checked_resp def get_mentions_timeline (self, user_id, max_results = 10, pagination_token = None, since_id = None, return_dataclass=False): path = "users/{}/mentions".format(user_id) return self.get_timeline(path, max_results=max_results, pagination_token=pagination_token, since_id=since_id, return_dataclass=return_dataclass) def get_user_timeline (self, user_id, max_results = 10, pagination_token = None, since_id = None, until_id = None, start_time = None, end_time = None, non_public_metrics=False, exclude_replies=False, exclude_retweets=False, return_dataclass=False): """ Get a user's Tweets as viewed by another. """ path = "users/{}/tweets".format(user_id) return self.get_timeline(path, max_results=max_results, pagination_token=pagination_token, since_id=since_id, until_id=until_id,start_time=start_time, non_public_metrics = non_public_metrics, exclude_replies=exclude_replies, exclude_retweets=exclude_retweets, return_dataclass=return_dataclass) def get_tweet (self, id_, non_public_metrics = False, return_dataclass=False): return self.get_tweets([id_], non_public_metrics = non_public_metrics, return_dataclass=return_dataclass) def get_tweets (self, ids, non_public_metrics = False, return_dataclass = False): token = self.token url = "https://api.twitter.com/2/tweets" tweet_fields = ["created_at", "conversation_id", "referenced_tweets", "text", "public_metrics", "entities", "attachments"] media_fields = ["alt_text", "type", "preview_image_url", "public_metrics", "url", "media_key", "duration_ms", "width", "height", "variants"] user_fields = ["created_at", "name", "username", "location", "profile_image_url", "verified"] expansions = ["entities.mentions.username", "attachments.media_keys", "author_id", "referenced_tweets.id", "referenced_tweets.id.author_id"] if non_public_metrics: tweet_fields.append("non_public_metrics") media_fields.append("non_public_metrics") params = { "ids": ','.join(ids), "expansions": ",".join(expansions), "media.fields": ",".join(media_fields), "tweet.fields": ",".join(tweet_fields), "user.fields": ",".join(user_fields) } headers = {"Authorization": "Bearer {}".format(token)} #print(params) response = requests.get(url, params=params, headers=headers) response_json = json.loads(response.text) print(json.dumps(response_json, indent=2)) typed_resp = from_dict(data=response_json, data_class=TweetSearchResponse) if return_dataclass: return typed_resp checked_resp = cleandict(asdict(typed_resp)) print('using checked response to search_tweets') return checked_resp def search_tweets (self, query, pagination_token = None, since_id = None, max_results = 10, sort_order = None, non_public_metrics = False, return_dataclass = False ): token = self.token url = "https://api.twitter.com/2/tweets/search/recent" tweet_fields = ["created_at", "conversation_id", "referenced_tweets", "text", "public_metrics", "entities", "attachments"] media_fields = ["alt_text", "type", "preview_image_url", "public_metrics", "url", "media_key", "duration_ms", "width", "height", "variants"] user_fields = ["created_at", "name", "username", "location", "profile_image_url", "verified"] expansions = ["entities.mentions.username", "attachments.media_keys", "author_id", "referenced_tweets.id", "referenced_tweets.id.author_id"] if non_public_metrics: tweet_fields.append("non_public_metrics") media_fields.append("non_public_metrics") params = { "expansions": ",".join(expansions), "media.fields": ",".join(media_fields), "tweet.fields": ",".join(tweet_fields), "user.fields": ",".join(user_fields), "query": query, "max_results": max_results, } if pagination_token: params['pagination_token'] = pagination_token if since_id: params['since_id'] = since_id if until_id: params['until_id'] = until_id if start_time: params['start_time'] = start_time if end_time: params['end_time'] = end_time if sort_order: params['sort_order'] = sort_order headers = {"Authorization": "Bearer {}".format(token)} response = requests.get(url, params=params, headers=headers) response_json = json.loads(response.text) try: typed_resp = from_dict(data=response_json, data_class=TweetSearchResponse) except: print('error converting tweet search response to TweetSearchResponse') print(response_json) raise 'error converting tweet search response to TweetSearchResponse' if return_dataclass: return typed_resp checked_resp = cleandict(asdict(typed_resp)) print('using checked response to search_tweets') return checked_resp def count_tweets (self, query, since_id = None, granularity = 'hour' ): """ App rate limit (Application-only): 300 requests per 15-minute window shared among all users of your app = once per 3 seconds. """ token = self.token url = "https://api.twitter.com/2/tweets/counts/recent" params = { "query": query } if since_id: params['since_id'] = since_id headers = {"Authorization": "Bearer {}".format(token)} response = requests.get(url, params=params, headers=headers) #print(response.status_code) #print(response.text) response_json = json.loads(response.text) return response_json #def get_conversation (self, tweet_id, pagination_token = None, # TODO def get_thread (self, tweet_id, author_id = None, only_replies = False, pagination_token = None, since_id = None, max_results = 10, sort_order = None, return_dataclass=False ): # FIXME author_id can be determined from a Tweet object query = "" if author_id: query += " from:{}".format(author_id) if only_replies: query += " in_reply_to_tweet_id:{}".format(tweet_id) else: query += " conversation_id:{}".format(tweet_id) print("get_thread query=" + query) return self.search_tweets(query, pagination_token = pagination_token, since_id = since_id, max_results = max_results, sort_order = sort_order, return_dataclass=return_dataclass) def get_bookmarks (self, user_id, max_results = 10, pagination_token = None, since_id = None, return_dataclass=False): path = "users/{}/bookmarks".format(user_id) return self.get_timeline(path, max_results=max_results, pagination_token=pagination_token, since_id=since_id, return_dataclass=return_dataclass) def get_media_tweets (self, author_id = None, has_media = True, has_links = None, has_images = None, has_videos = None, is_reply = None, is_retweet = None, pagination_token = None, since_id = None, max_results = 10, sort_order = None, return_dataclass=False ): # FIXME author_id can be determined from a Tweet object query = "" if has_media != None: if not has_media: query += "-" query += "has:media " if has_links != None: if not has_links: query += " -" query += "has:links " if has_images != None: if not has_images: query += " -" query += "has:images " if has_videos != None: if not has_videos: query += " -" query += "has:videos " if is_reply != None: if not is_reply: query += " -" query += "is:reply " if is_retweet != None: if not is_retweet: query += " -" query += "is:retweet " if author_id: query += "from:{} ".format(author_id) return self.search_tweets(query, pagination_token = pagination_token, since_id = since_id, max_results = max_results, sort_order = sort_order, return_dataclass = return_dataclass) def get_retweets (self, tweet_id): # GET /2/tweets/:id/retweeted_by return def get_quote_tweets( self, tweet_id): # GET /2/tweets/:id/quote_tweets return def get_liked_tweets (self, user_id, max_results = 10, pagination_token = None, since_id = None, return_dataclass=False): # GET /2/users/:id/liked_tweets # User rate limit (User context): 75 requests per 15-minute window per each authenticated user path = "users/{}/liked_tweets".format(user_id) print('get_liked_tweets') return self.get_timeline(path, max_results=max_results, pagination_token=pagination_token, since_id=since_id, return_dataclass=return_dataclass) def get_liking_users (self, tweet_id, max_results = None, pagination_token = None, return_dataclass=False): # GET /2/tweets/:id/liking_users # User rate limit (User context): 75 requests per 15-minute window per each authenticated user url = f"https://api.twitter.com/2/tweets/{tweet_id}/liking_users" user_fields = ["id", "created_at", "name", "username", "location", "profile_image_url", "verified", "description", "public_metrics", "protected", "pinned_tweet_id", "url"] expansions = [] params = cleandict({ "user.fields": ','.join(user_fields), "max_results": max_results, "pagination_token": pagination_token, "expansions": ','.join(expansions), }) headers = { "Authorization": f"Bearer {self.token}" } resp = requests.get(url, headers=headers, params=params) result = json.loads(resp.text) typed_result = from_dict(data_class=UserSearchResponse, data=result) #print(typed_result) if return_dataclass: return typed_result result = cleandict(asdict(typed_result)) return result def like_tweet (self, tweet_id): # POST /2/users/:user_id/likes # {id: tweet_id} return def delete_like (self, tweet_id, user_id): url = "https://api.twitter.com/2/users/{}/likes/{}".format(user_id, tweet_id) headers = { 'Authorization': 'Bearer {}'.format(self.token) } response = requests.delete(url, headers=headers) print(response.status_code) result = json.loads(response.text) return result def get_list_tweets (self, list_id): # GET /2/lists/:id/tweets return def cleandict(d): if isinstance(d, dict): return {k: cleandict(v) for k, v in d.items() if v is not None} elif isinstance(d, list): return [cleandict(v) for v in d] else: return d