import json
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta

import pandas as pd
import pendulum
import requests
class WeatherUnderground:
    """Client for historical observations from the weather.com v1 API.

    One HTTP request is made per day in the requested range; the requests run
    concurrently on a thread pool and their results are merged into a single
    DataFrame indexed by local datetime.
    """

    # Time zone used to interpret the "YYYY-MM-DD" start/end date strings.
    TIMEZONE = 'Asia/Bangkok'
    # Fixed offset added to the UTC observation timestamps in prepare_data.
    # NOTE(review): must agree with TIMEZONE; a fixed offset is only correct
    # for zones that do not observe daylight saving time.
    UTC_OFFSET = timedelta(hours=7)

    def __init__(self, city, api_key):
        """
        :param city: location id placed in the API URL path, e.g. "YSCB:9:AU"
        :param api_key: key sent as the ``apiKey`` query parameter
        """
        self.city = city
        self.api_key = api_key

    def get_weather_by_date_range_threaded(self, _start, _end):
        """Get cleaned observations for every day in ``[_start, _end)``.

        :param _start: first date to fetch, "YYYY-MM-DD" (inclusive)
        :param _end: end date, "YYYY-MM-DD" (exclusive: this day itself is
            not fetched)
        :return: DataFrame indexed by local ``datetime`` with ``temp_wu`` and
            ``dew_point`` converted to Celsius
        :raises ValueError: if ``_end`` is not after ``_start``
        :raises requests.RequestException: if a daily request fails or
            returns an HTTP error status
        """
        _end_time, _valid_time = self.get_end_and_start_time(_end, _start)
        data, worker = self.create_worker()
        _df = self.get_data(_end_time, _valid_time, data, worker)
        _df = self.clean_data(_df)
        return self.prepare_data(_df)

    @staticmethod
    def prepare_data(_df):
        """Index observations by local time and convert temperatures to Celsius.

        :param _df: observations with ``valid_time_gmt`` (Unix seconds) and
            ``temp`` / ``dewPt`` columns; the conversion below assumes these
            are Fahrenheit (``units=e`` in the request)
        :return: new DataFrame sorted by time, indexed by ``datetime``, with
            ``temp_wu`` and ``dew_point`` columns in Celsius; the input frame
            is not modified
        """
        _df = _df.sort_values(by="valid_time_gmt")  # returns a copy
        # valid_time_gmt is epoch seconds in UTC; shift it to naive local time.
        _df["datetime"] = (
            pd.to_datetime(_df["valid_time_gmt"], unit="s")
            + WeatherUnderground.UTC_OFFSET
        )
        _df = _df.drop(columns=["valid_time_gmt"])
        _df = _df.set_index("datetime")
        _df = _df.rename(columns={'temp': 'temp_wu', 'dewPt': 'dew_point'})
        _df['temp_wu'] = (_df['temp_wu'] - 32) * 5 / 9  # Fahrenheit -> Celsius
        _df['dew_point'] = (_df['dew_point'] - 32) * 5 / 9  # Fahrenheit -> Celsius
        return _df

    def get_data(self, _end_time, _valid_time, data, worker, num_threads=20):
        """Fetch every day from ``_valid_time`` up to, but excluding, ``_end_time``.

        Each day is fetched by calling ``worker(day, self.city, self.api_key)``
        on a pool of at most ``num_threads`` threads, where ``day`` is the date
        formatted as "YYYYMMDD". ``worker`` is expected to store its result in
        ``data`` (see ``create_worker``).

        :param _end_time: exclusive upper bound (pendulum datetime)
        :param _valid_time: first day to fetch (pendulum datetime)
        :param data: dict the worker fills with one DataFrame per day
        :param worker: callable ``(day_str, city, api_key)``
        :param num_threads: maximum number of concurrent requests
        :return: all DataFrames in ``data`` concatenated; row order follows
            completion order, not date order
        :raises ValueError: if the range is empty (``_end_time <= _valid_time``)
        :raises Exception: whatever the worker raised for any day, so a failed
            day is never silently missing from the result
        """
        days = []
        while _end_time > _valid_time:
            days.append(_valid_time.strftime("%Y%m%d"))
            _valid_time = _valid_time.add(days=1)
        if not days:
            raise ValueError("end date must be later than start date")

        with ThreadPoolExecutor(max_workers=num_threads) as pool:
            futures = [
                pool.submit(worker, day, self.city, self.api_key) for day in days
            ]
            for future in futures:
                # Re-raise any exception from the worker thread here; plain
                # threading.Thread would only print it and drop the day.
                future.result()
        return pd.concat(list(data.values()))

    @staticmethod
    def create_worker(timeout=30):
        """Create a one-day fetch function together with the dict it fills.

        :param timeout: seconds to wait for each HTTP request
        :return: ``(data, worker)`` where ``worker(day, city, api_key)``
            requests the observations for ``day`` ("YYYYMMDD") and stores
            them as ``data[day]`` (a DataFrame)
        """
        data = {}

        def worker(_dt, _city, _api_key):
            response = requests.get(
                f'https://api.weather.com/v1/location/{_city}/observations/historical.json',
                params={'apiKey': _api_key, 'units': 'e', 'startDate': _dt},
                timeout=timeout,
            )
            # Surface HTTP errors directly instead of a KeyError on the body.
            response.raise_for_status()
            # Each call writes its own distinct key.
            data[_dt] = pd.DataFrame(response.json()['observations'])

        return data, worker

    def get_end_and_start_time(self, _end, _start):
        """Parse the start and end date strings into datetimes in TIMEZONE.

        :param _end: end date string, "YYYY-MM-DD"
        :param _start: start date string, "YYYY-MM-DD"
        :return: ``(end_time, start_time)``; note the end comes first
        """
        _valid_time = self.convert_date_string_to_datetime(_start)
        _end_time = self.convert_date_string_to_datetime(_end)
        return _end_time, _valid_time

    @staticmethod
    def clean_data(_df):
        """Drop API metadata columns and sparsely populated columns.

        :param _df: raw concatenated observations
        :return: new DataFrame without the metadata columns and keeping only
            columns that are at least 90% non-null
        """
        columns = ['key', 'class', 'expire_time_gmt', 'obs_id', 'obs_name']
        # errors="ignore": tolerate responses that lack some metadata columns.
        _df = _df.drop(columns=columns, errors="ignore")
        # thresh = minimum number of non-null values a column needs to be kept.
        _df = _df.dropna(axis=1, thresh=int(len(_df) * 0.9))
        return _df

    @staticmethod
    def convert_date_string_to_datetime(date_string):
        """Convert a date string to a datetime in ``WeatherUnderground.TIMEZONE``.

        :param date_string: date string in format YYYY-MM-DD
        :return: pendulum datetime
        """
        return pendulum.parse(date_string, tz=WeatherUnderground.TIMEZONE)
if __name__ == "__main__":
    import os

    # Example run: fetch observations for location YSCB:9:AU from 2019-01-01
    # up to (but not including) 2019-01-31. The API key is read from the
    # WU_API_KEY environment variable so it does not need to be edited into
    # the source; the placeholder is used when the variable is unset.
    start = "2019-01-01"
    end = "2019-01-31"
    wu = WeatherUnderground("YSCB:9:AU", os.environ.get("WU_API_KEY", "YOUR_API_KEY"))
    df = wu.get_weather_by_date_range_threaded(start, end)
    print(df)