api.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. from dataclasses import asdict
  2. from typing import List
  3. from dacite import from_dict
  4. import json
  5. import requests
  6. import sqlite3
  7. from twitter_v2.types import TweetSearchResponse, DMEventsResponse, UserSearchResponse
  8. # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/curate-a-collection/api-reference/get-collections-entries
  9. # we can perhaps steal a token from the TweetDeck Console, otherwise we need to apply for Standard v1.1 / Elevated
  10. class ApiV11TweetCollectionSource:
  11. def __init__ (self, token):
  12. self.token = token
  13. def create_collection (self, name):
  14. return
  15. def bulk_add_to_collection (self, collection_id, items):
  16. return
  17. def add_to_collection (self, collection_id, item):
  18. return
  19. def get_collection_tweets (self, collection_id):
  20. return
  21. class TwitterApiV2SocialGraph:
  22. def __init__ (self, token):
  23. self.token = token
  24. def get_user (self, user_id, is_username=False, return_dataclass=False):
  25. # GET /2/users/:id
  26. # GET /2/users/by/:username
  27. return self.get_users([user_id], is_username, return_dataclass=return_dataclass)
  28. def get_users (self, user_ids, are_usernames=False, return_dataclass=False):
  29. # GET /2/users/by?usernames=
  30. # GET /2/users?ids=
  31. user_fields = ["id", "created_at", "name", "username", "location", "profile_image_url", "verified", "description", "public_metrics", "protected", "pinned_tweet_id", "url"]
  32. params = {
  33. 'user.fields' : ','.join(user_fields),
  34. }
  35. if are_usernames:
  36. url = "https://api.twitter.com/2/users/by"
  37. params['usernames'] = user_ids
  38. else:
  39. url = "https://api.twitter.com/2/users"
  40. params['ids'] = user_ids
  41. headers = {
  42. 'Authorization': 'Bearer {}'.format(self.token)
  43. }
  44. response = requests.get(url, params=params, headers=headers)
  45. result = json.loads(response.text)
  46. typed_result = from_dict(data_class=UserSearchResponse, data=result)
  47. if return_dataclass:
  48. return typed_result
  49. result = cleandict(asdict(typed_result))
  50. return result
  51. def get_following (self, user_id,
  52. max_results = 50, pagination_token = None, return_dataclass=False):
  53. # GET /2/users/:id/following
  54. url = "https://api.twitter.com/2/users/{}/following".format(user_id)
  55. user_fields = ["id", "created_at", "name", "username", "location", "profile_image_url", "verified"]
  56. params = {
  57. 'user.fields' : ','.join(user_fields),
  58. 'max_results': max_results
  59. }
  60. if pagination_token:
  61. params['pagination_token'] = pagination_token
  62. headers = {
  63. 'Authorization': 'Bearer {}'.format(self.token)
  64. }
  65. response = requests.get(url, params=params, headers=headers)
  66. result = json.loads(response.text)
  67. typed_result = from_dict(data_class=UserSearchResponse, data=result)
  68. if return_dataclass:
  69. return typed_result
  70. result = cleandict(asdict(typed_result))
  71. return result
  72. def get_followers (self, user_id,
  73. max_results = 50, pagination_token = None, return_dataclass=False):
  74. # GET /2/users/:id/followers
  75. url = "https://api.twitter.com/2/users/{}/followers".format(user_id)
  76. user_fields = ["id", "created_at", "name", "username", "location", "profile_image_url", "verified", "description", "public_metrics", "protected", "pinned_tweet_id", "url"]
  77. params = {
  78. 'user.fields' : ','.join(user_fields),
  79. 'max_results': max_results
  80. }
  81. if pagination_token:
  82. params['pagination_token'] = pagination_token
  83. headers = {
  84. 'Authorization': 'Bearer {}'.format(self.token)
  85. }
  86. response = requests.get(url, params=params, headers=headers)
  87. result = json.loads(response.text)
  88. typed_result = from_dict(data_class=UserSearchResponse, data=result)
  89. if return_dataclass:
  90. return typed_result
  91. result = cleandict(asdict(typed_result))
  92. return result
  93. def follow_user (self, user_id, target_user_id):
  94. # POST /2/users/:id/following
  95. # {target_user_id}
  96. return
  97. def unfollow_user (self, user_id, target_user_id):
  98. # DELETE /2/users/:source_user_id/following/:target_user_id
  99. return
  100. class ApiV2ConversationSource:
  101. def __init__ (self, token):
  102. self.token = token
  103. def get_recent_events (self, max_results = None, pagination_token = None):
  104. # https://developer.twitter.com/en/docs/twitter-api/direct-messages/lookup/api-reference/get-dm_events
  105. url = "https://api.twitter.com/2/dm_events"
  106. tweet_fields = ["created_at", "conversation_id", "referenced_tweets", "text", "public_metrics", "entities", "attachments"]
  107. media_fields = ["alt_text", "type", "preview_image_url", "public_metrics", "url", "media_key", "duration_ms", "width", "height", "variants"]
  108. user_fields = ["created_at", "name", "username", "location", "profile_image_url", "verified"]
  109. params = {
  110. "dm_event.fields": "id,event_type,text,created_at,dm_conversation_id,sender_id,participant_ids,referenced_tweets,attachments",
  111. "expansions": ",".join(["sender_id", "participant_ids", "referenced_tweets.id", "attachments.media_keys"]),
  112. "user.fields": ",".join(user_fields),
  113. "tweet.fields": ",".join(tweet_fields),
  114. "media.fields": ",".join(media_fields)
  115. }
  116. if max_results:
  117. params['max_results'] = max_results
  118. if pagination_token:
  119. params['pagination_token'] = pagination_token
  120. headers = {"Authorization": "Bearer {}".format(self.token)}
  121. response = requests.get(url, params=params, headers=headers)
  122. response_json = json.loads(response.text)
  123. #print(response_json)
  124. typed_resp = from_dict(data=response_json, data_class=DMEventsResponse)
  125. return typed_resp
  126. def get_conversation (self, dm_conversation_id,
  127. max_results = None, pagination_token = None):
  128. return
  129. def get_conversation_with_user (self, user_id,
  130. max_results = None, pagination_token = None):
  131. return
  132. def send_message (self, dm_conversation_id, text, attachments = None):
  133. url = f'/2/dm_conversations/{dm_conversation_id}/messages'
  134. body = {
  135. 'text': text
  136. }
  137. if attachments:
  138. body['attachments'] = attachments
  139. headers = {"Authorization": "Bearer {}".format(self.token)}
  140. resp = requests.post(url, data=json.dumps(body), headers=headers)
  141. result = json.loads(resp.text)
  142. example_resp_text = """
  143. {
  144. "dm_conversation_id": "1346889436626259968",
  145. "dm_event_id": "128341038123"
  146. }
  147. """
  148. return result
  149. class ApiV2TweetSource:
  150. def __init__ (self, token):
  151. self.token = token
  152. def create_tweet (self, text,
  153. reply_to_tweet_id = None, quote_tweet_id = None):
  154. url = "https://api.twitter.com/2/tweets"
  155. tweet = {
  156. 'text': text
  157. }
  158. if reply_to_tweet_id:
  159. tweet['reply'] = {
  160. 'in_reply_to_tweet_id': reply_to_tweet_id
  161. }
  162. if quote_tweet_id:
  163. tweet['quote_tweet_id'] = quote_tweet_id
  164. body = json.dumps(tweet)
  165. headers = {
  166. 'Authorization': 'Bearer {}'.format(self.token),
  167. 'Content-Type': 'application/json'
  168. }
  169. response = requests.post(url, data=body, headers=headers)
  170. result = json.loads(response.text)
  171. return result
  172. def retweet (self, tweet_id, user_id):
  173. url = "https://api.twitter.com/2/users/{}/retweets".format(user_id)
  174. retweet = {
  175. 'tweet_id': tweet_id
  176. }
  177. body = json.dumps(retweet)
  178. headers = {
  179. 'Authorization': 'Bearer {}'.format(self.token),
  180. 'Content-Type': 'application/json'
  181. }
  182. response = requests.post(url, data=body, headers=headers)
  183. result = json.loads(response.text)
  184. return result
  185. def bookmark (self, tweet_id, user_id):
  186. url = "https://api.twitter.com/2/users/{}/bookmarks".format(user_id)
  187. bookmark = {
  188. 'tweet_id': tweet_id
  189. }
  190. body = json.dumps(bookmark)
  191. headers = {
  192. 'Authorization': 'Bearer {}'.format(self.token),
  193. 'Content-Type': 'application/json'
  194. }
  195. response = requests.post(url, data=body, headers=headers)
  196. result = json.loads(response.text)
  197. return result
  198. def delete_bookmark (self, tweet_id, user_id):
  199. url = "https://api.twitter.com/2/users/{}/bookmarks/{}".format(user_id, tweet_id)
  200. headers = {
  201. 'Authorization': 'Bearer {}'.format(self.token)
  202. }
  203. response = requests.delete(url, headers=headers)
  204. result = json.loads(response.text)
  205. return result
  206. def get_home_timeline (self, user_id, variant = 'reverse_chronological', max_results = 10, pagination_token = None, since_id = None, until_id = None, end_time = None) -> TweetSearchResponse:
  207. """
  208. Get a user's timeline as viewed by the user themselves.
  209. """
  210. path = 'users/{}/timelines/{}'.format(user_id, variant)
  211. return self.get_timeline(path,
  212. max_results=max_results, pagination_token=pagination_token, since_id=since_id, until_id=until_id, end_time=end_time, return_dataclass=True)
  213. def get_timeline (self, path,
  214. max_results = 10, pagination_token = None, since_id = None,
  215. until_id = None,
  216. end_time = None,
  217. non_public_metrics = False,
  218. exclude_replies=False,
  219. exclude_retweets=False,
  220. return_dataclass=False):
  221. """
  222. Get any timeline, including custom curated timelines built by Tweet Deck / ApiV11.
  223. """
  224. token = self.token
  225. url = "https://api.twitter.com/2/{}".format(path)
  226. tweet_fields = ["created_at", "conversation_id", "referenced_tweets", "text", "public_metrics", "entities", "attachments"]
  227. media_fields = ["alt_text", "type", "preview_image_url", "public_metrics", "url", "media_key", "duration_ms", "width", "height", "variants"]
  228. user_fields = ["created_at", "name", "username", "location", "profile_image_url", "verified"]
  229. expansions = ["entities.mentions.username",
  230. "attachments.media_keys",
  231. "author_id",
  232. "referenced_tweets.id",
  233. "referenced_tweets.id.author_id"]
  234. if non_public_metrics:
  235. tweet_fields.append("non_public_metrics")
  236. media_fields.append("non_public_metrics")
  237. params = {
  238. "expansions": ",".join(expansions),
  239. "media.fields": ",".join(media_fields),
  240. "tweet.fields": ",".join(tweet_fields),
  241. "user.fields": ",".join(user_fields),
  242. "max_results": max_results,
  243. }
  244. exclude = []
  245. if exclude_replies:
  246. exclude.append('replies')
  247. if exclude_retweets:
  248. exclude.append('retweets')
  249. if len(exclude):
  250. print(f'get_timeline exclude={exclude}')
  251. params['exclude'] = ','.join(exclude)
  252. if pagination_token:
  253. params['pagination_token'] = pagination_token
  254. if since_id:
  255. params['since_id'] = since_id
  256. if until_id:
  257. params['until_id'] = until_id
  258. if end_time:
  259. params['end_time'] = end_time
  260. headers = {"Authorization": "Bearer {}".format(token)}
  261. #headers = {"Authorization": "access_token {}".format(access_token)}
  262. response = requests.get(url, params=params, headers=headers)
  263. response_json = json.loads(response.text)
  264. try:
  265. print(json.dumps(response_json, indent = 2))
  266. typed_resp = from_dict(data=response_json, data_class=TweetSearchResponse)
  267. except:
  268. print('error converting response to dataclass')
  269. print(json.dumps(response_json, indent = 2))
  270. if not return_dataclass:
  271. return response_json
  272. raise 'error converting response to dataclass'
  273. if return_dataclass:
  274. return typed_resp
  275. checked_resp = cleandict(asdict(typed_resp))
  276. print('using checked response to get_timeline')
  277. #print(json.dumps(checked_resp, indent=2))
  278. #print('og=')
  279. #print(json.dumps(response_json, indent=2))
  280. return checked_resp
  281. def get_mentions_timeline (self, user_id,
  282. max_results = 10, pagination_token = None, since_id = None, return_dataclass=False):
  283. path = "users/{}/mentions".format(user_id)
  284. return self.get_timeline(path,
  285. max_results=max_results, pagination_token=pagination_token, since_id=since_id, return_dataclass=return_dataclass)
  286. def get_user_timeline (self, user_id,
  287. max_results = 10, pagination_token = None,
  288. since_id = None,
  289. non_public_metrics=False,
  290. exclude_replies=False,
  291. exclude_retweets=False,
  292. return_dataclass=False):
  293. """
  294. Get a user's Tweets as viewed by another.
  295. """
  296. path = "users/{}/tweets".format(user_id)
  297. return self.get_timeline(path,
  298. max_results=max_results, pagination_token=pagination_token, since_id=since_id,
  299. non_public_metrics = non_public_metrics,
  300. exclude_replies=exclude_replies, exclude_retweets=exclude_retweets, return_dataclass=return_dataclass)
  301. def get_tweet (self, id_, non_public_metrics = False, return_dataclass=False):
  302. return self.get_tweets([id_], non_public_metrics = non_public_metrics, return_dataclass=return_dataclass)
  303. def get_tweets (self,
  304. ids,
  305. non_public_metrics = False,
  306. return_dataclass = False):
  307. token = self.token
  308. url = "https://api.twitter.com/2/tweets"
  309. tweet_fields = ["created_at", "conversation_id", "referenced_tweets", "text", "public_metrics", "entities", "attachments"]
  310. media_fields = ["alt_text", "type", "preview_image_url", "public_metrics", "url", "media_key", "duration_ms", "width", "height", "variants"]
  311. user_fields = ["created_at", "name", "username", "location", "profile_image_url", "verified"]
  312. expansions = ["entities.mentions.username",
  313. "attachments.media_keys",
  314. "author_id",
  315. "referenced_tweets.id",
  316. "referenced_tweets.id.author_id"]
  317. if non_public_metrics:
  318. tweet_fields.append("non_public_metrics")
  319. media_fields.append("non_public_metrics")
  320. params = {
  321. "ids": ','.join(ids),
  322. "expansions": ",".join(expansions),
  323. "media.fields": ",".join(media_fields),
  324. "tweet.fields": ",".join(tweet_fields),
  325. "user.fields": ",".join(user_fields)
  326. }
  327. headers = {"Authorization": "Bearer {}".format(token)}
  328. #print(params)
  329. response = requests.get(url, params=params, headers=headers)
  330. response_json = json.loads(response.text)
  331. print(json.dumps(response_json, indent=2))
  332. typed_resp = from_dict(data=response_json, data_class=TweetSearchResponse)
  333. if return_dataclass:
  334. return typed_resp
  335. checked_resp = cleandict(asdict(typed_resp))
  336. print('using checked response to search_tweets')
  337. return checked_resp
  338. def search_tweets (self,
  339. query,
  340. pagination_token = None,
  341. since_id = None,
  342. max_results = 10,
  343. sort_order = None,
  344. non_public_metrics = False,
  345. return_dataclass = False
  346. ):
  347. token = self.token
  348. url = "https://api.twitter.com/2/tweets/search/recent"
  349. tweet_fields = ["created_at", "conversation_id", "referenced_tweets", "text", "public_metrics", "entities", "attachments"]
  350. media_fields = ["alt_text", "type", "preview_image_url", "public_metrics", "url", "media_key", "duration_ms", "width", "height", "variants"]
  351. user_fields = ["created_at", "name", "username", "location", "profile_image_url", "verified"]
  352. expansions = ["entities.mentions.username",
  353. "attachments.media_keys",
  354. "author_id",
  355. "referenced_tweets.id",
  356. "referenced_tweets.id.author_id"]
  357. if non_public_metrics:
  358. tweet_fields.append("non_public_metrics")
  359. media_fields.append("non_public_metrics")
  360. params = {
  361. "expansions": ",".join(expansions),
  362. "media.fields": ",".join(media_fields),
  363. "tweet.fields": ",".join(tweet_fields),
  364. "user.fields": ",".join(user_fields),
  365. "query": query,
  366. "max_results": max_results,
  367. }
  368. if pagination_token:
  369. params['pagination_token'] = pagination_token
  370. if since_id:
  371. params['since_id'] = since_id
  372. if sort_order:
  373. params['sort_order'] = sort_order
  374. headers = {"Authorization": "Bearer {}".format(token)}
  375. response = requests.get(url, params=params, headers=headers)
  376. response_json = json.loads(response.text)
  377. try:
  378. typed_resp = from_dict(data=response_json, data_class=TweetSearchResponse)
  379. except:
  380. print('error converting tweet search response to TweetSearchResponse')
  381. print(response_json)
  382. raise 'error converting tweet search response to TweetSearchResponse'
  383. if return_dataclass:
  384. return typed_resp
  385. checked_resp = cleandict(asdict(typed_resp))
  386. print('using checked response to search_tweets')
  387. return checked_resp
  388. def count_tweets (self,
  389. query,
  390. since_id = None,
  391. granularity = 'hour'
  392. ):
  393. """
  394. App rate limit (Application-only): 300 requests per 15-minute window shared among all users of your app = once per 3 seconds.
  395. """
  396. token = self.token
  397. url = "https://api.twitter.com/2/tweets/counts/recent"
  398. params = {
  399. "query": query
  400. }
  401. if since_id:
  402. params['since_id'] = since_id
  403. headers = {"Authorization": "Bearer {}".format(token)}
  404. response = requests.get(url, params=params, headers=headers)
  405. #print(response.status_code)
  406. #print(response.text)
  407. response_json = json.loads(response.text)
  408. return response_json
  409. #def get_conversation (self, tweet_id, pagination_token = None,
  410. # TODO
  411. def get_thread (self, tweet_id,
  412. author_id = None,
  413. only_replies = False,
  414. pagination_token = None,
  415. since_id = None,
  416. max_results = 10,
  417. sort_order = None,
  418. return_dataclass=False
  419. ):
  420. # FIXME author_id can be determined from a Tweet object
  421. query = ""
  422. if author_id:
  423. query += " from:{}".format(author_id)
  424. if only_replies:
  425. query += " in_reply_to_tweet_id:{}".format(tweet_id)
  426. else:
  427. query += " conversation_id:{}".format(tweet_id)
  428. print("get_thread query=" + query)
  429. return self.search_tweets(query,
  430. pagination_token = pagination_token, since_id = since_id, max_results = max_results, sort_order = sort_order,
  431. return_dataclass=return_dataclass)
  432. def get_bookmarks (self, user_id,
  433. max_results = 10, pagination_token = None, since_id = None,
  434. return_dataclass=False):
  435. path = "users/{}/bookmarks".format(user_id)
  436. return self.get_timeline(path,
  437. max_results=max_results, pagination_token=pagination_token, since_id=since_id, return_dataclass=return_dataclass)
  438. def get_media_tweets (self,
  439. author_id = None,
  440. has_media = True,
  441. has_links = None,
  442. has_images = None,
  443. has_videos = None,
  444. is_reply = None,
  445. is_retweet = None,
  446. pagination_token = None,
  447. since_id = None,
  448. max_results = 10,
  449. sort_order = None,
  450. return_dataclass=False
  451. ):
  452. # FIXME author_id can be determined from a Tweet object
  453. query = ""
  454. if has_media != None:
  455. if not has_media:
  456. query += "-"
  457. query += "has:media "
  458. if has_links != None:
  459. if not has_links:
  460. query += " -"
  461. query += "has:links "
  462. if has_images != None:
  463. if not has_images:
  464. query += " -"
  465. query += "has:images "
  466. if has_videos != None:
  467. if not has_videos:
  468. query += " -"
  469. query += "has:videos "
  470. if is_reply != None:
  471. if not is_reply:
  472. query += " -"
  473. query += "is:reply "
  474. if is_retweet != None:
  475. if not is_retweet:
  476. query += " -"
  477. query += "is:retweet "
  478. if author_id:
  479. query += "from:{} ".format(author_id)
  480. return self.search_tweets(query,
  481. pagination_token = pagination_token, since_id = since_id, max_results = max_results, sort_order = sort_order, return_dataclass = return_dataclass)
  482. def get_retweets (self, tweet_id):
  483. # GET /2/tweets/:id/retweeted_by
  484. return
  485. def get_quote_tweets( self, tweet_id):
  486. # GET /2/tweets/:id/quote_tweets
  487. return
  488. def get_liked_tweets (self, user_id,
  489. max_results = 10, pagination_token = None, since_id = None, return_dataclass=False):
  490. # GET /2/users/:id/liked_tweets
  491. # User rate limit (User context): 75 requests per 15-minute window per each authenticated user
  492. path = "users/{}/liked_tweets".format(user_id)
  493. return self.get_timeline(path,
  494. max_results=max_results, pagination_token=pagination_token, since_id=since_id, return_dataclass=return_dataclass)
  495. def get_liking_users (self, tweet_id,
  496. max_results = None, pagination_token = None,
  497. return_dataclass=False):
  498. # GET /2/tweets/:id/liking_users
  499. # User rate limit (User context): 75 requests per 15-minute window per each authenticated user
  500. url = f"https://api.twitter.com/2/tweets/{tweet_id}/liking_users"
  501. user_fields = ["id", "created_at", "name", "username", "location", "profile_image_url", "verified", "description", "public_metrics", "protected", "pinned_tweet_id", "url"]
  502. expansions = []
  503. params = cleandict({
  504. "user.fields": ','.join(user_fields),
  505. "max_results": max_results,
  506. "pagination_token": pagination_token,
  507. "expansions": ','.join(expansions),
  508. })
  509. headers = {
  510. "Authorization": f"Bearer {self.token}"
  511. }
  512. resp = requests.get(url, headers=headers, params=params)
  513. result = json.loads(resp.text)
  514. typed_result = from_dict(data_class=UserSearchResponse, data=result)
  515. #print(typed_result)
  516. if return_dataclass:
  517. return typed_result
  518. result = cleandict(asdict(typed_result))
  519. return result
  520. def like_tweet (self, tweet_id):
  521. # POST /2/users/:user_id/likes
  522. # {id: tweet_id}
  523. return
  524. def get_list_tweets (self, list_id):
  525. # GET /2/lists/:id/tweets
  526. return
  527. def cleandict(d):
  528. if isinstance(d, dict):
  529. return {k: cleandict(v) for k, v in d.items() if v is not None}
  530. elif isinstance(d, list):
  531. return [cleandict(v) for v in d]
  532. else:
  533. return d