oauth2_login.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. import base64
  2. from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app, jsonify
  3. from flask_cors import CORS
  4. import sqlite3
  5. import os
  6. import json
  7. from datetime import datetime, timedelta, timezone
  8. import dateutil
  9. import dateutil.parser
  10. import dateutil.tz
  11. import requests
  12. import hashlib
  13. import re
  14. from requests.auth import AuthBase, HTTPBasicAuth
  15. from requests_oauthlib import OAuth2Session
  16. from flask import url_for as og_url_for
  17. app_access_token = os.environ.get("BEARER_TOKEN")
  18. app_consumer_key = os.environ.get("TWITTER_CONSUMER_KEY")
  19. app_secret_key = os.environ.get("TWITTER_CONSUMER_SECRET")
  20. TWITTER_SCOPES = ["bookmark.read", "bookmark.write", "tweet.read", "tweet.write", "dm.read", "users.read", "like.read", "offline.access", "follows.read"]
  21. oauth2_login = Blueprint('oauth2_login', 'oauth2_login',
  22. static_folder='static',
  23. static_url_path='',
  24. url_prefix='/oauth2')
  25. def url_for_with_me (route, *args, **kwargs):
  26. #print('url_for_with_me')
  27. if route.endswith('.static') or not 'me' in g:
  28. return og_url_for(route, *args, **kwargs)
  29. return og_url_for(route, *args, **{'me': g.me, **kwargs})
  30. @oauth2_login.before_request
  31. def add_me ():
  32. g.me = request.args.get('me')
  33. #if me.startswith('twitter') and me in session:
  34. g.twitter_user = session.get(g.me)
  35. if not g.twitter_user:
  36. return
  37. now = datetime.now(timezone.utc).timestamp()
  38. if g.twitter_user['expires_at'] < now - 10:
  39. print('token is expired... refreshing')
  40. # FIXME we should have a lock here in case 2 requests quickly come in.
  41. # the later will fail as of now. should be rare since static resources aren't authenticated
  42. # and we don't use APIs w/ JS.
  43. refresh_token()
  44. @oauth2_login.context_processor
  45. def inject_me():
  46. return {'me': g.me, 'twitter_user': g.twitter_user, 'url_for': url_for_with_me}
  47. @oauth2_login.route('/logout.html')
  48. def get_logout_html ():
  49. del session[g.me]
  50. return redirect('/')
  51. # def add_me(endpoint, values):
  52. ##values['me'] = request.args.get('me')
  53. # g.me = request.args.get('me')
  54. # twitter_app.url_value_preprocessor(add_me)
  55. @oauth2_login.route('/logged-in.html')
  56. def get_loggedin_html ():
  57. client_id = os.environ.get('TWITTER_CLIENT_ID')
  58. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  59. code_verifier = session['twitter_code_verifier']
  60. code = request.args.get('code')
  61. state = request.args.get('state')
  62. endpoint_url = g.app_url
  63. redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
  64. authorization_response = redirect_uri + '?code={}&state={}'.format(code, state)
  65. # Fetch your access token
  66. token_url = "https://api.twitter.com/2/oauth2/token"
  67. # The following line of code will only work if you are using a type of App that is a public client
  68. auth = False
  69. # If you are using a confidential client you will need to pass in basic encoding of your client ID and client secret.
  70. # Please remove the comment on the following line if you are using a type of App that is a confidential client
  71. auth = HTTPBasicAuth(client_id, client_secret)
  72. scopes = TWITTER_SCOPES
  73. #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
  74. oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
  75. token = oauth.fetch_token(
  76. token_url=token_url,
  77. authorization_response=authorization_response,
  78. auth=auth,
  79. client_id=client_id,
  80. include_client_id=True,
  81. code_verifier=code_verifier,
  82. )
  83. # Your access token
  84. access = token["access_token"]
  85. refresh = token["refresh_token"]
  86. #expires_at = token["expires_at"] # expires_in
  87. expires_at = (datetime.now(timezone.utc) + timedelta(seconds=token['expires_in'])).timestamp()
  88. # Make a request to the users/me endpoint to get your user ID
  89. user_me = requests.request(
  90. "GET",
  91. "https://api.twitter.com/2/users/me",
  92. headers={"Authorization": "Bearer {}".format(access)},
  93. ).json()
  94. user_id = user_me["data"]["id"]
  95. user_username = user_me["data"]["username"]
  96. user_name = user_me["data"]["name"]
  97. del session['twitter_code_verifier']
  98. me = 'twitter:{}'.format(user_id)
  99. session[ me ] = {
  100. 'expires_at': expires_at,
  101. 'access_token': access,
  102. 'refresh_token': refresh,
  103. 'id': user_id,
  104. 'display_name': user_name,
  105. 'username': user_username
  106. }
  107. g.me = me
  108. g.twitter_user = session[ me ]
  109. #run_script('on_loggedin', {'twitter_user': g.twitter_user})
  110. return redirect(url_for_with_me('twitter_v2_facade.get_timeline_home_html'))
  111. @oauth2_login.route('/login.html')
  112. def get_login_html ():
  113. client_id = os.environ.get('TWITTER_CLIENT_ID')
  114. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  115. #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
  116. endpoint_url = g.app_url
  117. redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
  118. # Set the scopes
  119. scopes = TWITTER_SCOPES
  120. # Create a code verifier
  121. code_verifier = base64.urlsafe_b64encode(os.urandom(30)).decode("utf-8")
  122. code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
  123. # Create a code challenge
  124. code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
  125. code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
  126. code_challenge = code_challenge.replace("=", "")
  127. # Start an OAuth 2.0 session
  128. oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
  129. # Create an authorize URL
  130. auth_url = "https://twitter.com/i/oauth2/authorize"
  131. authorization_url, state = oauth.authorization_url(
  132. auth_url, code_challenge=code_challenge, code_challenge_method="S256"
  133. )
  134. session['twitter_code_verifier'] = code_verifier
  135. return redirect(authorization_url)
  136. def refresh_token ():
  137. """
  138. Refresh twitter oauth access_token with refresh token.
  139. Works in the session, so must be done within a logged in request context.
  140. """
  141. client_id = os.environ.get('TWITTER_CLIENT_ID')
  142. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  143. token = g.twitter_user['refresh_token']
  144. basic_auth = base64.b64encode('{}:{}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8')
  145. headers = {
  146. 'Authorization': 'Basic ' + basic_auth,
  147. }
  148. data = {
  149. 'refresh_token': token,
  150. 'grant_type': 'refresh_token'
  151. }
  152. response = requests.post('https://api.twitter.com/2/oauth2/token', data=data, headers=headers)
  153. result = json.loads(response.text)
  154. if 'access_token' in result:
  155. expires_at = (datetime.now(timezone.utc) + timedelta(seconds=result['expires_in'])).timestamp()
  156. g.twitter_user['expires_at'] = expires_at
  157. g.twitter_user['refresh_token'] = result['refresh_token']
  158. g.twitter_user['access_token'] = result['access_token']
  159. session[ g.me ] = g.twitter_user
  160. return response.text
  161. @oauth2_login.route('/refresh-token', methods=['GET'])
  162. def get_twitter_refresh_token (response_format='json'):
  163. refresh_token()
  164. return 'ok'
  165. @oauth2_login.route('/app/refresh-token', methods=['GET'])
  166. def get_twitter_app_refresh_token ():
  167. basic_auth = base64.b64encode('{}:{}'.format(app_consumer_key, app_secret_key).encode('utf-8')).decode('utf-8')
  168. headers = {
  169. 'Authorization': 'Basic ' + basic_auth,
  170. }
  171. data = {
  172. 'grant_type': 'client_credentials'
  173. }
  174. response = requests.post('https://api.twitter.com/oauth2/token', data=data, headers=headers)
  175. result = json.loads(response.text)
  176. if 'access_token' in result:
  177. app_access_token = result['access_token']
  178. return 'ok'