123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- import base64
- from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app, jsonify
- from flask_cors import CORS
- import sqlite3
- import os
- import json
- from datetime import datetime, timedelta, timezone
- import dateutil
- import dateutil.parser
- import dateutil.tz
- import requests
- import hashlib
- import re
- from requests.auth import AuthBase, HTTPBasicAuth
- from requests_oauthlib import OAuth2Session
- from flask import url_for as og_url_for
- app_access_token = os.environ.get("BEARER_TOKEN")
- app_consumer_key = os.environ.get("TWITTER_CONSUMER_KEY")
- app_secret_key = os.environ.get("TWITTER_CONSUMER_SECRET")
- TWITTER_SCOPES = ["bookmark.read", "bookmark.write", "tweet.read", "tweet.write", "dm.read", "users.read", "like.read", "offline.access", "follows.read"]
- oauth2_login = Blueprint('oauth2_login', 'oauth2_login',
- static_folder='static',
- static_url_path='',
- url_prefix='/oauth2')
- def url_for_with_me (route, *args, **kwargs):
- #print('url_for_with_me')
- if route.endswith('.static') or not 'me' in g:
- return og_url_for(route, *args, **kwargs)
-
- return og_url_for(route, *args, **{'me': g.me, **kwargs})
- @oauth2_login.before_request
- def add_me ():
- g.me = request.args.get('me')
-
- #if me.startswith('twitter') and me in session:
- g.twitter_user = session.get(g.me)
-
- if not g.twitter_user:
- return
-
- now = datetime.now(timezone.utc).timestamp()
-
- if g.twitter_user['expires_at'] < now - 10:
- print('token is expired... refreshing')
- # FIXME we should have a lock here in case 2 requests quickly come in.
- # the later will fail as of now. should be rare since static resources aren't authenticated
- # and we don't use APIs w/ JS.
- refresh_token()
- @oauth2_login.context_processor
- def inject_me():
-
- return {'me': g.me, 'twitter_user': g.twitter_user, 'url_for': url_for_with_me}
-
- @oauth2_login.route('/logout.html')
- def get_logout_html ():
- del session[g.me]
- return redirect('/')
-
-
- # def add_me(endpoint, values):
- ##values['me'] = request.args.get('me')
- # g.me = request.args.get('me')
- # twitter_app.url_value_preprocessor(add_me)
- @oauth2_login.route('/logged-in.html')
- def get_loggedin_html ():
- client_id = os.environ.get('TWITTER_CLIENT_ID')
- client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
-
- code_verifier = session['twitter_code_verifier']
-
- code = request.args.get('code')
- state = request.args.get('state')
-
- endpoint_url = g.app_url
- redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
-
- authorization_response = redirect_uri + '?code={}&state={}'.format(code, state)
-
- # Fetch your access token
- token_url = "https://api.twitter.com/2/oauth2/token"
-
-
- # The following line of code will only work if you are using a type of App that is a public client
- auth = False
- # If you are using a confidential client you will need to pass in basic encoding of your client ID and client secret.
- # Please remove the comment on the following line if you are using a type of App that is a confidential client
- auth = HTTPBasicAuth(client_id, client_secret)
-
- scopes = TWITTER_SCOPES
- #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
- oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
-
- token = oauth.fetch_token(
- token_url=token_url,
- authorization_response=authorization_response,
- auth=auth,
- client_id=client_id,
- include_client_id=True,
- code_verifier=code_verifier,
- )
- # Your access token
- access = token["access_token"]
- refresh = token["refresh_token"]
- #expires_at = token["expires_at"] # expires_in
- expires_at = (datetime.now(timezone.utc) + timedelta(seconds=token['expires_in'])).timestamp()
-
-
- # Make a request to the users/me endpoint to get your user ID
- user_me = requests.request(
- "GET",
- "https://api.twitter.com/2/users/me",
- headers={"Authorization": "Bearer {}".format(access)},
- ).json()
- user_id = user_me["data"]["id"]
- user_username = user_me["data"]["username"]
- user_name = user_me["data"]["name"]
-
- del session['twitter_code_verifier']
-
- me = 'twitter:{}'.format(user_id)
-
- session[ me ] = {
- 'expires_at': expires_at,
- 'access_token': access,
- 'refresh_token': refresh,
- 'id': user_id,
- 'display_name': user_name,
- 'username': user_username
- }
-
- g.me = me
- g.twitter_user = session[ me ]
-
- #run_script('on_loggedin', {'twitter_user': g.twitter_user})
-
- return redirect(url_for_with_me('twitter_v2_facade.get_timeline_home_html'))
-
- @oauth2_login.route('/login.html')
- def get_login_html ():
- client_id = os.environ.get('TWITTER_CLIENT_ID')
- client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
-
- #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
-
- endpoint_url = g.app_url
- redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
-
- # Set the scopes
- scopes = TWITTER_SCOPES
- # Create a code verifier
- code_verifier = base64.urlsafe_b64encode(os.urandom(30)).decode("utf-8")
- code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
- # Create a code challenge
- code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
- code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
- code_challenge = code_challenge.replace("=", "")
- # Start an OAuth 2.0 session
- oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
- # Create an authorize URL
- auth_url = "https://twitter.com/i/oauth2/authorize"
- authorization_url, state = oauth.authorization_url(
- auth_url, code_challenge=code_challenge, code_challenge_method="S256"
- )
-
- session['twitter_code_verifier'] = code_verifier
-
-
- return redirect(authorization_url)
- def refresh_token ():
- """
- Refresh twitter oauth access_token with refresh token.
-
- Works in the session, so must be done within a logged in request context.
-
- """
- client_id = os.environ.get('TWITTER_CLIENT_ID')
- client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
- token = g.twitter_user['refresh_token']
-
- basic_auth = base64.b64encode('{}:{}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8')
-
- headers = {
- 'Authorization': 'Basic ' + basic_auth,
- }
-
- data = {
- 'refresh_token': token,
- 'grant_type': 'refresh_token'
- }
-
-
- response = requests.post('https://api.twitter.com/2/oauth2/token', data=data, headers=headers)
-
- result = json.loads(response.text)
-
- if 'access_token' in result:
- expires_at = (datetime.now(timezone.utc) + timedelta(seconds=result['expires_in'])).timestamp()
- g.twitter_user['expires_at'] = expires_at
- g.twitter_user['refresh_token'] = result['refresh_token']
- g.twitter_user['access_token'] = result['access_token']
-
- session[ g.me ] = g.twitter_user
-
- return response.text
- @oauth2_login.route('/refresh-token', methods=['GET'])
- def get_twitter_refresh_token (response_format='json'):
- refresh_token()
-
- return 'ok'
- @oauth2_login.route('/app/refresh-token', methods=['GET'])
- def get_twitter_app_refresh_token ():
-
- basic_auth = base64.b64encode('{}:{}'.format(app_consumer_key, app_secret_key).encode('utf-8')).decode('utf-8')
-
- headers = {
- 'Authorization': 'Basic ' + basic_auth,
- }
-
- data = {
- 'grant_type': 'client_credentials'
- }
-
- response = requests.post('https://api.twitter.com/oauth2/token', data=data, headers=headers)
-
- result = json.loads(response.text)
-
- if 'access_token' in result:
-
- app_access_token = result['access_token']
-
-
-
- return 'ok'
|