oauth2_login.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. import base64
  2. from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app, jsonify
  3. from flask_cors import CORS
  4. import sqlite3
  5. import os
  6. import json
  7. from datetime import datetime, timedelta, timezone
  8. import dateutil
  9. import dateutil.parser
  10. import dateutil.tz
  11. import requests
  12. import hashlib
  13. import re
  14. from requests.auth import AuthBase, HTTPBasicAuth
  15. from requests_oauthlib import OAuth2Session
  16. from flask import url_for as og_url_for
  17. app_access_token = os.environ.get("BEARER_TOKEN")
  18. app_consumer_key = os.environ.get("TWITTER_CONSUMER_KEY")
  19. app_secret_key = os.environ.get("TWITTER_CONSUMER_SECRET")
  20. TWITTER_SCOPES = ["bookmark.read", "bookmark.write", "tweet.read", "tweet.write", "dm.read", "users.read", "like.read", "like.write", "offline.access", "follows.read"]
  21. oauth2_login = Blueprint('oauth2_login', 'oauth2_login',
  22. static_folder='static',
  23. static_url_path='',
  24. url_prefix='/oauth2')
  25. def url_for_with_me (route, *args, **kwargs):
  26. #print('url_for_with_me')
  27. if route.endswith('.static') or not 'me' in g:
  28. return og_url_for(route, *args, **kwargs)
  29. return og_url_for(route, *args, **{'me': g.me, **kwargs})
  30. @oauth2_login.before_request
  31. def add_me ():
  32. g.me = request.args.get('me')
  33. #if me.startswith('twitter') and me in session:
  34. g.twitter_user = session.get(g.me)
  35. if not g.twitter_user:
  36. return
  37. now = datetime.now(timezone.utc).timestamp()
  38. if g.twitter_user['expires_at'] < now - 10:
  39. print('token is expired... refreshing')
  40. # FIXME we should have a lock here in case 2 requests quickly come in.
  41. # the later will fail as of now. should be rare since static resources aren't authenticated
  42. # and we don't use APIs w/ JS.
  43. refresh_token()
  44. print('DEBUG: twitter_user')
  45. print(g.twitter_user)
  46. @oauth2_login.context_processor
  47. def inject_me():
  48. return {'me': g.me, 'twitter_user': g.twitter_user, 'url_for': url_for_with_me}
  49. @oauth2_login.route('/logout.html')
  50. def get_logout_html ():
  51. del session[g.me]
  52. return redirect('/')
  53. # def add_me(endpoint, values):
  54. ##values['me'] = request.args.get('me')
  55. # g.me = request.args.get('me')
  56. # twitter_app.url_value_preprocessor(add_me)
  57. @oauth2_login.route('/logged-in.html')
  58. def get_loggedin_html ():
  59. client_id = os.environ.get('TWITTER_CLIENT_ID')
  60. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  61. code_verifier = session['twitter_code_verifier']
  62. code = request.args.get('code')
  63. state = request.args.get('state')
  64. endpoint_url = g.app_url
  65. redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
  66. authorization_response = redirect_uri + '?code={}&state={}'.format(code, state)
  67. # Fetch your access token
  68. token_url = "https://api.twitter.com/2/oauth2/token"
  69. # The following line of code will only work if you are using a type of App that is a public client
  70. auth = False
  71. # If you are using a confidential client you will need to pass in basic encoding of your client ID and client secret.
  72. # Please remove the comment on the following line if you are using a type of App that is a confidential client
  73. auth = HTTPBasicAuth(client_id, client_secret)
  74. scopes = TWITTER_SCOPES
  75. #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
  76. oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
  77. token = oauth.fetch_token(
  78. token_url=token_url,
  79. authorization_response=authorization_response,
  80. auth=auth,
  81. client_id=client_id,
  82. include_client_id=True,
  83. code_verifier=code_verifier,
  84. )
  85. # Your access token
  86. access = token["access_token"]
  87. refresh = token["refresh_token"]
  88. #expires_at = token["expires_at"] # expires_in
  89. expires_at = (datetime.now(timezone.utc) + timedelta(seconds=token['expires_in'])).timestamp()
  90. # Make a request to the users/me endpoint to get your user ID
  91. user_me = requests.request(
  92. "GET",
  93. "https://api.twitter.com/2/users/me",
  94. headers={"Authorization": "Bearer {}".format(access)},
  95. ).json()
  96. user_id = user_me["data"]["id"]
  97. user_username = user_me["data"]["username"]
  98. user_name = user_me["data"]["name"]
  99. del session['twitter_code_verifier']
  100. me = 'twitter:{}'.format(user_id)
  101. session[ me ] = {
  102. 'expires_in': token['expires_in'],
  103. 'expires_at': expires_at,
  104. 'access_token': access,
  105. 'refresh_token': refresh,
  106. 'id': user_id,
  107. 'display_name': user_name,
  108. 'username': user_username
  109. }
  110. g.me = me
  111. g.twitter_user = session[ me ]
  112. #run_script('on_loggedin', {'twitter_user': g.twitter_user})
  113. return redirect(url_for_with_me('twitter_v2_facade.get_timeline_home_html'))
  114. @oauth2_login.route('/login.html')
  115. def get_login_html ():
  116. client_id = os.environ.get('TWITTER_CLIENT_ID')
  117. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  118. #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
  119. endpoint_url = g.app_url
  120. redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
  121. # Set the scopes
  122. scopes = TWITTER_SCOPES
  123. # Create a code verifier
  124. code_verifier = base64.urlsafe_b64encode(os.urandom(30)).decode("utf-8")
  125. code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
  126. # Create a code challenge
  127. code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
  128. code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
  129. code_challenge = code_challenge.replace("=", "")
  130. # Start an OAuth 2.0 session
  131. oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
  132. # Create an authorize URL
  133. auth_url = "https://twitter.com/i/oauth2/authorize"
  134. authorization_url, state = oauth.authorization_url(
  135. auth_url, code_challenge=code_challenge, code_challenge_method="S256"
  136. )
  137. session['twitter_code_verifier'] = code_verifier
  138. return redirect(authorization_url)
  139. def refresh_token ():
  140. """
  141. Refresh twitter oauth access_token with refresh token.
  142. Works in the session, so must be done within a logged in request context.
  143. """
  144. client_id = os.environ.get('TWITTER_CLIENT_ID')
  145. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  146. token = g.twitter_user['refresh_token']
  147. basic_auth = base64.b64encode('{}:{}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8')
  148. headers = {
  149. 'Authorization': 'Basic ' + basic_auth,
  150. }
  151. data = {
  152. 'refresh_token': token,
  153. 'grant_type': 'refresh_token'
  154. }
  155. response = requests.post('https://api.twitter.com/2/oauth2/token', data=data, headers=headers)
  156. result = json.loads(response.text)
  157. if 'access_token' in result:
  158. expires_at = (datetime.now(timezone.utc) + timedelta(seconds=result['expires_in'])).timestamp()
  159. g.twitter_user['expires_at'] = expires_at
  160. g.twitter_user['refresh_token'] = result['refresh_token']
  161. g.twitter_user['access_token'] = result['access_token']
  162. session[ g.me ] = g.twitter_user
  163. return response.text
  164. @oauth2_login.route('/refresh-token', methods=['GET'])
  165. def get_twitter_refresh_token (response_format='json'):
  166. refresh_token()
  167. return 'ok'
  168. @oauth2_login.route('/app/refresh-token', methods=['GET'])
  169. def get_twitter_app_refresh_token ():
  170. basic_auth = base64.b64encode('{}:{}'.format(app_consumer_key, app_secret_key).encode('utf-8')).decode('utf-8')
  171. headers = {
  172. 'Authorization': 'Basic ' + basic_auth,
  173. }
  174. data = {
  175. 'grant_type': 'client_credentials'
  176. }
  177. response = requests.post('https://api.twitter.com/oauth2/token', data=data, headers=headers)
  178. result = json.loads(response.text)
  179. if 'access_token' in result:
  180. app_access_token = result['access_token']
  181. return 'ok'