ScrapeMaster 1.0 - Installation Guide
1️⃣ Create a Virtual Environment
Run the following command to create and activate a virtual environment:
```bash
python -m venv venv
```
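The command above creates the environment; activate it with the standard command for your platform before installing anything else:

```bash
# Windows (PowerShell or cmd)
venv\Scripts\activate

# macOS / Linux
source venv/bin/activate
```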
2️⃣ Install Dependencies
Create a `requirements.txt` file and copy the following dependencies:
```plaintext
openai
python-dotenv
pandas
pydantic
requests
beautifulsoup4
html2text
tiktoken
selenium
readability-lxml
streamlit
streamlit-tags
openpyxl
```
Then install all dependencies with:
```bash
pip install -r requirements.txt
```
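If you want a quick sanity check that the core packages installed into the virtual environment, this one-liner (a convenience, not part of the project itself) should print a confirmation:

```bash
python -c "import openai, pandas, pydantic, bs4, html2text, tiktoken, selenium, streamlit; print('Dependencies OK')"
```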
3️⃣ Add Your API Key
Create a `.env` file and add your OpenAI API key (replace the placeholder with your own key):
```plaintext
OPENAI_API_KEY=sk-xxxxxxxx
```
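To confirm the key is picked up the same way `scraper.py` loads it (via `python-dotenv`), you can run this minimal check from the project root; it only reports whether a key was found:

```python
import os
from dotenv import load_dotenv

# Load variables from the .env file in the current directory
load_dotenv()

# Report only whether the key was found, never the key itself
print("OPENAI_API_KEY loaded:", bool(os.getenv("OPENAI_API_KEY")))
```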
4️⃣ Download ChromeDriver
Download ChromeDriver from the official website: Chrome for Testing availability. Choose the driver build that matches your installed Chrome version and extract it into the project folder.
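The scraper script in the next step expects the driver at `./chromedriver-win64/chromedriver.exe` (the Windows layout of the Chrome for Testing archive); adjust that path in `setup_selenium()` if you are on macOS or Linux. A minimal sanity check, assuming the driver sits at that path, looks like this:

```python
from selenium import webdriver
from selenium.webdriver.chrome.service import Service

# Point Selenium at the downloaded driver; adjust the path for your platform
service = Service(r"./chromedriver-win64/chromedriver.exe")
driver = webdriver.Chrome(service=service)

# Load a simple page to confirm the driver and browser versions are compatible
driver.get("https://example.com")
print("Loaded page title:", driver.title)
driver.quit()
```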
5️⃣ Create the Scraper Script
Save the following script as `scraper.py`:
```python
import os
import time
import re
import json
from datetime import datetime
from typing import List, Dict, Type

import pandas as pd
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field, create_model
import html2text
import tiktoken
from dotenv import load_dotenv
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from openai import OpenAI

load_dotenv()


# Set up the Chrome WebDriver options
def setup_selenium():
    options = Options()

    # Adding arguments
    options.add_argument("--disable-gpu")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--window-size=1920,1080")

    # Randomize user-agent to mimic different users
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")

    # Specify the path to the ChromeDriver
    service = Service(r"./chromedriver-win64/chromedriver.exe")

    # Initialize the WebDriver
    driver = webdriver.Chrome(service=service, options=options)
    return driver


def fetch_html_selenium(url):
    driver = setup_selenium()
    try:
        driver.get(url)

        # Add random delays to mimic human behavior
        time.sleep(5)  # Adjust this to simulate time for user to read or interact

        # Add more realistic actions like scrolling
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(3)  # Simulate time taken to scroll and read

        html = driver.page_source
        return html
    finally:
        driver.quit()


def clean_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')

    # Remove headers and footers based on common HTML tags or classes
    for element in soup.find_all(['header', 'footer']):
        element.decompose()  # Remove these tags and their content

    return str(soup)


def html_to_markdown_with_readability(html_content):
    cleaned_html = clean_html(html_content)

    # Convert to markdown
    markdown_converter = html2text.HTML2Text()
    markdown_converter.ignore_links = False
    markdown_content = markdown_converter.handle(cleaned_html)

    return markdown_content


# Define the pricing for the supported models without Batch API
pricing = {
    "gpt-4o-mini": {
        "input": 0.150 / 1_000_000,   # $0.150 per 1M input tokens
        "output": 0.600 / 1_000_000,  # $0.600 per 1M output tokens
    },
    "gpt-4o-2024-08-06": {
        "input": 2.5 / 1_000_000,     # $2.50 per 1M input tokens
        "output": 10 / 1_000_000,     # $10.00 per 1M output tokens
    },
    # Add other models and their prices here if needed
}

model_used = "gpt-4o-mini"


def save_raw_data(raw_data, timestamp, output_folder='output'):
    # Ensure the output folder exists
    os.makedirs(output_folder, exist_ok=True)

    # Save the raw markdown data with timestamp in filename
    raw_output_path = os.path.join(output_folder, f'rawData_{timestamp}.md')
    with open(raw_output_path, 'w', encoding='utf-8') as f:
        f.write(raw_data)
    print(f"Raw data saved to {raw_output_path}")
    return raw_output_path


def remove_urls_from_file(file_path):
    # Regex pattern to find URLs
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'

    # Construct the new file name
    base, ext = os.path.splitext(file_path)
    new_file_path = f"{base}_cleaned{ext}"

    # Read the original markdown content
    with open(file_path, 'r', encoding='utf-8') as file:
        markdown_content = file.read()

    # Replace all found URLs with an empty string
    cleaned_content = re.sub(url_pattern, '', markdown_content)

    # Write the cleaned content to a new file
    with open(new_file_path, 'w', encoding='utf-8') as file:
        file.write(cleaned_content)
    print(f"Cleaned file saved as: {new_file_path}")
    return cleaned_content


def create_dynamic_listing_model(field_names: List[str]) -> Type[BaseModel]:
    """
    Dynamically creates a Pydantic model based on provided fields.
    field_names is a list of names of the fields to extract from the markdown.
    """
    # Create field definitions using aliases for Field parameters
    field_definitions = {field: (str, ...) for field in field_names}
    # Dynamically create the model with all fields
    return create_model('DynamicListingModel', **field_definitions)


def create_listings_container_model(listing_model: Type[BaseModel]) -> Type[BaseModel]:
    """Create a container model that holds a list of the given listing model."""
    return create_model('DynamicListingsContainer', listings=(List[listing_model], ...))


def trim_to_token_limit(text, model, max_tokens=200000):
    encoder = tiktoken.encoding_for_model(model)
    tokens = encoder.encode(text)
    if len(tokens) > max_tokens:
        trimmed_text = encoder.decode(tokens[:max_tokens])
        return trimmed_text
    return text


def format_data(data, DynamicListingsContainer):
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

    system_message = """You are an intelligent text extraction and conversion assistant. Your task is to extract structured information
                        from the given text and convert it into a pure JSON format. The JSON should contain only the structured data extracted from the text,
                        with no additional commentary, explanations, or extraneous information.
                        You could encounter cases where you can't find the data of the fields you have to extract or the data will be in a foreign language.
                        Please process the following text and provide the output in pure JSON format with no words before or after the JSON:"""

    user_message = f"Extract the following information from the provided text:\nPage content:\n\n{data}"

    completion = client.beta.chat.completions.parse(
        model=model_used,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message},
        ],
        response_format=DynamicListingsContainer
    )
    return completion.choices[0].message.parsed


def save_formatted_data(formatted_data, timestamp, output_folder='output'):
    # Ensure the output folder exists
    os.makedirs(output_folder, exist_ok=True)

    # Prepare formatted data as a dictionary
    formatted_data_dict = formatted_data.dict() if hasattr(formatted_data, 'dict') else formatted_data

    # Save the formatted data as JSON with timestamp in filename
    json_output_path = os.path.join(output_folder, f'sorted_data_{timestamp}.json')
    with open(json_output_path, 'w', encoding='utf-8') as f:
        json.dump(formatted_data_dict, f, indent=4)
    print(f"Formatted data saved to JSON at {json_output_path}")

    # Prepare data for DataFrame
    if isinstance(formatted_data_dict, dict):
        # If the data is a dictionary containing lists, assume these lists are records
        data_for_df = next(iter(formatted_data_dict.values())) if len(formatted_data_dict) == 1 else formatted_data_dict
    elif isinstance(formatted_data_dict, list):
        data_for_df = formatted_data_dict
    else:
        raise ValueError("Formatted data is neither a dictionary nor a list, cannot convert to DataFrame")

    # Create DataFrame
    try:
        df = pd.DataFrame(data_for_df)
        print("DataFrame created successfully.")

        # Save the DataFrame to an Excel file
        excel_output_path = os.path.join(output_folder, f'sorted_data_{timestamp}.xlsx')
        df.to_excel(excel_output_path, index=False)
        print(f"Formatted data saved to Excel at {excel_output_path}")

        return df
    except Exception as e:
        print(f"Error creating DataFrame or saving Excel: {str(e)}")
        return None


def calculate_price(input_text, output_text, model=model_used):
    # Initialize the encoder for the specific model
    encoder = tiktoken.encoding_for_model(model)

    # Encode the input text to get the number of input tokens
    input_token_count = len(encoder.encode(input_text))

    # Encode the output text to get the number of output tokens
    output_token_count = len(encoder.encode(output_text))

    # Calculate the costs
    input_cost = input_token_count * pricing[model]["input"]
    output_cost = output_token_count * pricing[model]["output"]
    total_cost = input_cost + output_cost

    return input_token_count, output_token_count, total_cost


if __name__ == "__main__":
    url = 'https://news.ycombinator.com/'
    fields = ['Title', 'Number of Points', 'Creator', 'Time Posted', 'Number of Comments']

    try:
        # Generate timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Scrape data
        raw_html = fetch_html_selenium(url)
        markdown = html_to_markdown_with_readability(raw_html)

        # Save raw data
        save_raw_data(markdown, timestamp)

        # Create the dynamic listing model
        DynamicListingModel = create_dynamic_listing_model(fields)

        # Create the container model that holds a list of the dynamic listing models
        DynamicListingsContainer = create_listings_container_model(DynamicListingModel)

        # Format data
        formatted_data = format_data(markdown, DynamicListingsContainer)  # Use markdown, not raw_html

        # Save formatted data
        save_formatted_data(formatted_data, timestamp)

        # Convert formatted_data back to text for token counting
        formatted_data_text = json.dumps(formatted_data.dict())

        # Automatically calculate the token usage and cost for all input and output
        input_tokens, output_tokens, total_cost = calculate_price(markdown, formatted_data_text, model=model_used)
        print(f"Input token count: {input_tokens}")
        print(f"Output token count: {output_tokens}")
        print(f"Estimated total cost: ${total_cost:.4f}")

    except Exception as e:
        print(f"An error occurred: {e}")
```
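Because the `__main__` block is wired to Hacker News with a fixed set of fields, you can test the scraper on its own before building the UI. It writes `rawData_<timestamp>.md`, `sorted_data_<timestamp>.json`, and `sorted_data_<timestamp>.xlsx` into an `output/` folder:

```bash
python scraper.py
```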
6️⃣ Create the Streamlit App
Save the following as `streamlit_app.py`:
```python
import streamlit as st
from streamlit_tags import st_tags_sidebar
import pandas as pd
import json
from datetime import datetime
from scraper import fetch_html_selenium, save_raw_data, format_data, save_formatted_data, calculate_price, html_to_markdown_with_readability, create_dynamic_listing_model, create_listings_container_model

# Initialize Streamlit app
st.set_page_config(page_title="Universal Web Scraper")
st.title("Universal Web Scraper 🦑")

# Sidebar components
st.sidebar.title("Web Scraper Settings")
model_selection = st.sidebar.selectbox("Select Model", options=["gpt-4o-mini", "gpt-4o-2024-08-06"], index=0)
url_input = st.sidebar.text_input("Enter URL")

# Tags input specifically in the sidebar
tags = st.sidebar.empty()  # Create an empty placeholder in the sidebar
tags = st_tags_sidebar(
    label='Enter Fields to Extract:',
    text='Press enter to add a tag',
    value=[],        # Default values if any
    suggestions=[],  # You can still offer suggestions, or keep it empty for complete freedom
    maxtags=-1,      # Set to -1 for unlimited tags
    key='tags_input'
)

st.sidebar.markdown("---")

# Process tags into a list
fields = tags

# Initialize variables to store token and cost information
input_tokens = output_tokens = total_cost = 0  # Default values


# Buttons to trigger scraping
# Define the scraping function
def perform_scrape():
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    raw_html = fetch_html_selenium(url_input)
    markdown = html_to_markdown_with_readability(raw_html)
    save_raw_data(markdown, timestamp)
    DynamicListingModel = create_dynamic_listing_model(fields)
    DynamicListingsContainer = create_listings_container_model(DynamicListingModel)
    formatted_data = format_data(markdown, DynamicListingsContainer)
    formatted_data_text = json.dumps(formatted_data.dict())
    input_tokens, output_tokens, total_cost = calculate_price(markdown, formatted_data_text, model=model_selection)
    df = save_formatted_data(formatted_data, timestamp)

    return df, formatted_data, markdown, input_tokens, output_tokens, total_cost, timestamp


# Handling button press for scraping
if 'perform_scrape' not in st.session_state:
    st.session_state['perform_scrape'] = False

if st.sidebar.button("Scrape"):
    with st.spinner('Please wait... Data is being scraped.'):
        st.session_state['results'] = perform_scrape()
        st.session_state['perform_scrape'] = True

if st.session_state.get('perform_scrape'):
    df, formatted_data, markdown, input_tokens, output_tokens, total_cost, timestamp = st.session_state['results']

    # Display the DataFrame and other data
    st.write("Scraped Data:", df)
    st.sidebar.markdown("## Token Usage")
    st.sidebar.markdown(f"**Input Tokens:** {input_tokens}")
    st.sidebar.markdown(f"**Output Tokens:** {output_tokens}")
    st.sidebar.markdown(f"**Total Cost:** :green-background[***${total_cost:.4f}***]")

    # Create columns for download buttons
    col1, col2, col3 = st.columns(3)
    with col1:
        st.download_button("Download JSON", data=json.dumps(formatted_data.dict(), indent=4), file_name=f"{timestamp}_data.json")
    with col2:
        # Convert formatted data to a dictionary if it's not already (assuming it has a .dict() method)
        data_dict = formatted_data.dict() if hasattr(formatted_data, 'dict') else formatted_data

        # Access the data under the dynamic key
        first_key = next(iter(data_dict))  # Safely get the first key
        main_data = data_dict[first_key]   # Access data using this key

        # Create DataFrame from the data
        df = pd.DataFrame(main_data)

        # data_dict=json.dumps(formatted_data.dict(), indent=4)
        st.download_button("Download CSV", data=df.to_csv(index=False), file_name=f"{timestamp}_data.csv")
    with col3:
        st.download_button("Download Markdown", data=markdown, file_name=f"{timestamp}_data.md")

# Ensure that these UI components are persistent and don't rely on re-running the scrape function
if 'results' in st.session_state:
    df, formatted_data, markdown, input_tokens, output_tokens, total_cost, timestamp = st.session_state['results']
```
7️⃣ Run the Streamlit App
Run the following command:
```bash
streamlit run streamlit_app.py
```