ScrapeMaster 2.0 - Installation Guide
1️⃣ Create a Virtual Environment
Run the following commands to create and activate a virtual environment:

```bash
python -m venv venv
venv\Scripts\activate   # on macOS/Linux: source venv/bin/activate
```
2️⃣ Install Dependencies
Create a `requirements.txt` file and copy the following dependencies into it:

```plaintext
openai
python-dotenv
pandas
pydantic
requests
beautifulsoup4
html2text
tiktoken
selenium
readability-lxml
streamlit
streamlit-tags
openpyxl
groq
google-generativeai
```
Then install all dependencies with:
```bash
pip install -r requirements.txt
```
3️⃣ Add Your API Key
Create a `.env` file and add your OpenAI, Google, and Groq API keys:

```plaintext
OPENAI_API_KEY=sk-xxxxxxxx   # place your own key
GOOGLE_API_KEY=AIzaSyxxxxxxx
GROQ_API_KEY=gskxxxxxxxxx
```
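To confirm the keys are picked up, you can run a quick sanity check with python-dotenv (a minimal sketch; it only verifies that the variables load, not that the keys are valid):

```python
# check_env.py -- verifies the .env file is found and the variables load.
# It does NOT validate the keys against the provider APIs.
import os
from dotenv import load_dotenv

load_dotenv()
for key in ("OPENAI_API_KEY", "GOOGLE_API_KEY", "GROQ_API_KEY"):
    print(key, "->", "loaded" if os.getenv(key) else "MISSING")
```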
4️⃣ Download ChromeDriver
Download ChromeDriver from the official [Chrome for Testing availability](https://googlechromelabs.github.io/chrome-for-testing/) page. Pick the build that matches your installed Chrome version, and unzip it so the executable sits at `./chromedriver-win64/chromedriver.exe`, the path the scraper script below expects (adjust the path in the script if you are not on Windows).
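Before wiring up the full scraper, you can optionally verify that Selenium can drive the downloaded binary. A minimal sketch, assuming the Windows path used later in this guide:

```python
# driver_check.py -- launches Chrome through the downloaded ChromeDriver and loads a page.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service

service = Service(r"./chromedriver-win64/chromedriver.exe")  # adjust for your OS/layout
driver = webdriver.Chrome(service=service)
try:
    driver.get("https://example.com")
    print(driver.title)  # expected: "Example Domain"
finally:
    driver.quit()
```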
5️⃣ Create the Scraper Script
Save the following script as `scraper.py`:
```python
import os
import random
import time
import re
import json
from datetime import datetime
from typing import List, Dict, Type

import pandas as pd
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field, create_model
import html2text
import tiktoken
from dotenv import load_dotenv
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from openai import OpenAI
import google.generativeai as genai
from groq import Groq

from assets import (
    USER_AGENTS, PRICING, HEADLESS_OPTIONS, SYSTEM_MESSAGE,
    USER_MESSAGE, LLAMA_MODEL_FULLNAME, GROQ_LLAMA_MODEL_FULLNAME
)

load_dotenv()

# Set up the Chrome WebDriver options
def setup_selenium():
    options = Options()

    # Randomly select a user agent from the imported list
    user_agent = random.choice(USER_AGENTS)
    options.add_argument(f"user-agent={user_agent}")

    # Add other options
    for option in HEADLESS_OPTIONS:
        options.add_argument(option)

    # Specify the path to the ChromeDriver
    service = Service(r"./chromedriver-win64/chromedriver.exe")

    # Initialize the WebDriver
    driver = webdriver.Chrome(service=service, options=options)
    return driver

def click_accept_cookies(driver):
    """
    Tries to find and click on a cookie consent button. It looks for several common patterns.
    """
    try:
        # Wait for cookie popup to load
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, "//button | //a | //div"))
        )

        # Common text variations for cookie buttons
        accept_text_variations = [
            "accept", "agree", "allow", "consent", "continue", "ok", "I agree", "got it"
        ]

        # Iterate through different element types and common text variations
        for tag in ["button", "a", "div"]:
            for text in accept_text_variations:
                try:
                    # Create an XPath to find the button by text
                    element = driver.find_element(By.XPATH, f"//{tag}[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text}')]")
                    if element:
                        element.click()
                        print(f"Clicked the '{text}' button.")
                        return
                except:
                    continue

        print("No 'Accept Cookies' button found.")
    except Exception as e:
        print(f"Error finding 'Accept Cookies' button: {e}")

def fetch_html_selenium(url):
    driver = setup_selenium()
    try:
        driver.get(url)

        # Add random delays to mimic human behavior
        time.sleep(1)  # Adjust this to simulate time for user to read or interact
        driver.maximize_window()

        # Try to find and click the 'Accept Cookies' button
        # click_accept_cookies(driver)

        # Add more realistic actions like scrolling
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)  # Simulate time taken to scroll and read
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(1)

        html = driver.page_source
        return html
    finally:
        driver.quit()

def clean_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')

    # Remove headers and footers based on common HTML tags or classes
    for element in soup.find_all(['header', 'footer']):
        element.decompose()  # Remove these tags and their content

    return str(soup)

def html_to_markdown_with_readability(html_content):
    cleaned_html = clean_html(html_content)

    # Convert to markdown
    markdown_converter = html2text.HTML2Text()
    markdown_converter.ignore_links = False
    markdown_content = markdown_converter.handle(cleaned_html)

    return markdown_content

def save_raw_data(raw_data, timestamp, output_folder='output'):
    # Ensure the output folder exists
    os.makedirs(output_folder, exist_ok=True)

    # Save the raw markdown data with timestamp in filename
    raw_output_path = os.path.join(output_folder, f'rawData_{timestamp}.md')
    with open(raw_output_path, 'w', encoding='utf-8') as f:
        f.write(raw_data)
    print(f"Raw data saved to {raw_output_path}")
    return raw_output_path

def remove_urls_from_file(file_path):
    # Regex pattern to find URLs
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'

    # Construct the new file name
    base, ext = os.path.splitext(file_path)
    new_file_path = f"{base}_cleaned{ext}"

    # Read the original markdown content
    with open(file_path, 'r', encoding='utf-8') as file:
        markdown_content = file.read()

    # Replace all found URLs with an empty string
    cleaned_content = re.sub(url_pattern, '', markdown_content)

    # Write the cleaned content to a new file
    with open(new_file_path, 'w', encoding='utf-8') as file:
        file.write(cleaned_content)
    print(f"Cleaned file saved as: {new_file_path}")
    return cleaned_content

def create_dynamic_listing_model(field_names: List[str]) -> Type[BaseModel]:
    """
    Dynamically creates a Pydantic model based on provided fields.
    field_names is a list of names of the fields to extract from the markdown.
    """
    # Create field definitions using aliases for Field parameters
    field_definitions = {field: (str, ...) for field in field_names}
    # Dynamically create the model with all fields
    return create_model('DynamicListingModel', **field_definitions)

def create_listings_container_model(listing_model: Type[BaseModel]) -> Type[BaseModel]:
    """
    Create a container model that holds a list of the given listing model.
    """
    return create_model('DynamicListingsContainer', listings=(List[listing_model], ...))

def trim_to_token_limit(text, model, max_tokens=120000):
    encoder = tiktoken.encoding_for_model(model)
    tokens = encoder.encode(text)
    if len(tokens) > max_tokens:
        trimmed_text = encoder.decode(tokens[:max_tokens])
        return trimmed_text
    return text

def generate_system_message(listing_model: BaseModel) -> str:
    """
    Dynamically generate a system message based on the fields in the provided listing model.
    """
    # Use the model_json_schema() method to introspect the Pydantic model
    schema_info = listing_model.model_json_schema()

    # Extract field descriptions from the schema
    field_descriptions = []
    for field_name, field_info in schema_info["properties"].items():
        # Get the field type from the schema info
        field_type = field_info["type"]
        field_descriptions.append(f'"{field_name}": "{field_type}"')

    # Create the JSON schema structure for the listings
    schema_structure = ",\n".join(field_descriptions)

    # Generate the system message dynamically
    system_message = f"""
    You are an intelligent text extraction and conversion assistant. Your task is to extract structured information
    from the given text and convert it into a pure JSON format. The JSON should contain only the structured data extracted from the text,
    with no additional commentary, explanations, or extraneous information.
    You could encounter cases where you can't find the data of the fields you have to extract or the data will be in a foreign language.
    Please process the following text and provide the output in pure JSON format with no words before or after the JSON:
    Please ensure the output strictly follows this schema:
    {{
        "listings": [
            {{
                {schema_structure}
            }}
        ]
    }}
    """

    return system_message

def format_data(data, DynamicListingsContainer, DynamicListingModel, selected_model):
    token_counts = {}

    if selected_model in ["gpt-4o-mini", "gpt-4o-2024-08-06"]:
        # Use OpenAI API
        client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
        completion = client.beta.chat.completions.parse(
            model=selected_model,
            messages=[
                {"role": "system", "content": SYSTEM_MESSAGE},
                {"role": "user", "content": USER_MESSAGE + data},
            ],
            response_format=DynamicListingsContainer
        )
        # Calculate tokens using tiktoken
        encoder = tiktoken.encoding_for_model(selected_model)
        input_token_count = len(encoder.encode(USER_MESSAGE + data))
        output_token_count = len(encoder.encode(json.dumps(completion.choices[0].message.parsed.dict())))
        token_counts = {
            "input_tokens": input_token_count,
            "output_tokens": output_token_count
        }
        return completion.choices[0].message.parsed, token_counts

    elif selected_model == "gemini-1.5-flash":
        # Use Google Gemini API
        genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
        model = genai.GenerativeModel(
            'gemini-1.5-flash',
            generation_config={
                "response_mime_type": "application/json",
                "response_schema": DynamicListingsContainer
            }
        )
        prompt = SYSTEM_MESSAGE + "\n" + USER_MESSAGE + data
        # Count input tokens using Gemini's method
        input_tokens = model.count_tokens(prompt)
        completion = model.generate_content(prompt)
        # Extract token counts from usage_metadata
        usage_metadata = completion.usage_metadata
        token_counts = {
            "input_tokens": usage_metadata.prompt_token_count,
            "output_tokens": usage_metadata.candidates_token_count
        }
        return completion.text, token_counts

    elif selected_model == "Llama3.1 8B":
        # Dynamically generate the system message based on the schema
        sys_message = generate_system_message(DynamicListingModel)
        # print(SYSTEM_MESSAGE)

        # Point to the local server
        client = OpenAI(base_url="http://localhost:1234/v1", api_key="lm-studio")

        completion = client.chat.completions.create(
            model=LLAMA_MODEL_FULLNAME,  # change this if needed (use a better model)
            messages=[
                {"role": "system", "content": sys_message},
                {"role": "user", "content": USER_MESSAGE + data}
            ],
            temperature=0.7,
        )

        # Extract the content from the response
        response_content = completion.choices[0].message.content
        print(response_content)
        # Convert the content from JSON string to a Python dictionary
        parsed_response = json.loads(response_content)

        # Extract token usage
        token_counts = {
            "input_tokens": completion.usage.prompt_tokens,
            "output_tokens": completion.usage.completion_tokens
        }

        return parsed_response, token_counts

    elif selected_model == "Groq Llama3.1 70b":
        # Dynamically generate the system message based on the schema
        sys_message = generate_system_message(DynamicListingModel)
        # print(SYSTEM_MESSAGE)

        # Point to the Groq API
        client = Groq(api_key=os.environ.get("GROQ_API_KEY"))

        completion = client.chat.completions.create(
            messages=[
                {"role": "system", "content": sys_message},
                {"role": "user", "content": USER_MESSAGE + data}
            ],
            model=GROQ_LLAMA_MODEL_FULLNAME,
        )

        # Extract the content from the response
        response_content = completion.choices[0].message.content

        # Convert the content from JSON string to a Python dictionary
        parsed_response = json.loads(response_content)

        # Extract token usage from completion.usage
        token_counts = {
            "input_tokens": completion.usage.prompt_tokens,
            "output_tokens": completion.usage.completion_tokens
        }

        return parsed_response, token_counts

    else:
        raise ValueError(f"Unsupported model: {selected_model}")

def save_formatted_data(formatted_data, timestamp, output_folder='output'):
    # Ensure the output folder exists
    os.makedirs(output_folder, exist_ok=True)

    # Parse the formatted data if it's a JSON string (from Gemini API)
    if isinstance(formatted_data, str):
        try:
            formatted_data_dict = json.loads(formatted_data)
        except json.JSONDecodeError:
            raise ValueError("The provided formatted data is a string but not valid JSON.")
    else:
        # Handle data from OpenAI or other sources
        formatted_data_dict = formatted_data.dict() if hasattr(formatted_data, 'dict') else formatted_data

    # Save the formatted data as JSON with timestamp in filename
    json_output_path = os.path.join(output_folder, f'sorted_data_{timestamp}.json')
    with open(json_output_path, 'w', encoding='utf-8') as f:
        json.dump(formatted_data_dict, f, indent=4)
    print(f"Formatted data saved to JSON at {json_output_path}")

    # Prepare data for DataFrame
    if isinstance(formatted_data_dict, dict):
        # If the data is a dictionary containing lists, assume these lists are records
        data_for_df = next(iter(formatted_data_dict.values())) if len(formatted_data_dict) == 1 else formatted_data_dict
    elif isinstance(formatted_data_dict, list):
        data_for_df = formatted_data_dict
    else:
        raise ValueError("Formatted data is neither a dictionary nor a list, cannot convert to DataFrame")

    # Create DataFrame
    try:
        df = pd.DataFrame(data_for_df)
        print("DataFrame created successfully.")

        # Save the DataFrame to an Excel file
        excel_output_path = os.path.join(output_folder, f'sorted_data_{timestamp}.xlsx')
        df.to_excel(excel_output_path, index=False)
        print(f"Formatted data saved to Excel at {excel_output_path}")

        return df
    except Exception as e:
        print(f"Error creating DataFrame or saving Excel: {str(e)}")
        return None

def calculate_price(token_counts, model):
    input_token_count = token_counts.get("input_tokens", 0)
    output_token_count = token_counts.get("output_tokens", 0)

    # Calculate the costs
    input_cost = input_token_count * PRICING[model]["input"]
    output_cost = output_token_count * PRICING[model]["output"]
    total_cost = input_cost + output_cost

    return input_token_count, output_token_count, total_cost

if __name__ == "__main__":
    url = 'https://webscraper.io/test-sites/e-commerce/static'
    fields = ['Name of item', 'Price']

    try:
        # Generate timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Scrape data
        raw_html = fetch_html_selenium(url)
        markdown = html_to_markdown_with_readability(raw_html)

        # Save raw data
        save_raw_data(markdown, timestamp)

        # Create the dynamic listing model
        DynamicListingModel = create_dynamic_listing_model(fields)

        # Create the container model that holds a list of the dynamic listing models
        DynamicListingsContainer = create_listings_container_model(DynamicListingModel)

        # Format data
        formatted_data, token_counts = format_data(markdown, DynamicListingsContainer, DynamicListingModel, "Groq Llama3.1 70b")  # Use markdown, not raw_html
        print(formatted_data)

        # Save formatted data
        save_formatted_data(formatted_data, timestamp)

        # Convert formatted_data back to text for token counting
        formatted_data_text = json.dumps(formatted_data.dict() if hasattr(formatted_data, 'dict') else formatted_data)

        # Automatically calculate the token usage and cost for all input and output
        input_tokens, output_tokens, total_cost = calculate_price(token_counts, "Groq Llama3.1 70b")
        print(f"Input token count: {input_tokens}")
        print(f"Output token count: {output_tokens}")
        print(f"Estimated total cost: ${total_cost:.4f}")

    except Exception as e:
        print(f"An error occurred: {e}")
```
6️⃣ Create the Streamlit App
Save the following as `streamlit_app.py`:
```python
import streamlit as st
from streamlit_tags import st_tags_sidebar
import pandas as pd
import json
from datetime import datetime
from scraper import (
    fetch_html_selenium, save_raw_data, format_data, save_formatted_data,
    calculate_price, html_to_markdown_with_readability,
    create_dynamic_listing_model, create_listings_container_model
)
from assets import PRICING

# Initialize Streamlit app
st.set_page_config(page_title="Universal Web Scraper")
st.title("Universal Web Scraper 🦑")

# Sidebar components
st.sidebar.title("Web Scraper Settings")
model_selection = st.sidebar.selectbox("Select Model", options=list(PRICING.keys()), index=0)
url_input = st.sidebar.text_input("Enter URL")

# Tags input specifically in the sidebar
tags = st.sidebar.empty()  # Create an empty placeholder in the sidebar
tags = st_tags_sidebar(
    label='Enter Fields to Extract:',
    text='Press enter to add a tag',
    value=[],        # Default values if any
    suggestions=[],  # You can still offer suggestions, or keep it empty for complete freedom
    maxtags=-1,      # Set to -1 for unlimited tags
    key='tags_input'
)

st.sidebar.markdown("---")

# Process tags into a list
fields = tags

# Initialize variables to store token and cost information
input_tokens = output_tokens = total_cost = 0  # Default values

# Buttons to trigger scraping
# Define the scraping function
def perform_scrape():
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    raw_html = fetch_html_selenium(url_input)
    markdown = html_to_markdown_with_readability(raw_html)
    save_raw_data(markdown, timestamp)
    DynamicListingModel = create_dynamic_listing_model(fields)
    DynamicListingsContainer = create_listings_container_model(DynamicListingModel)
    formatted_data, tokens_count = format_data(markdown, DynamicListingsContainer, DynamicListingModel, model_selection)
    input_tokens, output_tokens, total_cost = calculate_price(tokens_count, model=model_selection)
    df = save_formatted_data(formatted_data, timestamp)

    return df, formatted_data, markdown, input_tokens, output_tokens, total_cost, timestamp

# Handling button press for scraping
if 'perform_scrape' not in st.session_state:
    st.session_state['perform_scrape'] = False

if st.sidebar.button("Scrape"):
    with st.spinner('Please wait... Data is being scraped.'):
        st.session_state['results'] = perform_scrape()
        st.session_state['perform_scrape'] = True

if st.session_state.get('perform_scrape'):
    df, formatted_data, markdown, input_tokens, output_tokens, total_cost, timestamp = st.session_state['results']

    # Display the DataFrame and other data
    st.write("Scraped Data:", df)
    st.sidebar.markdown("## Token Usage")
    st.sidebar.markdown(f"**Input Tokens:** {input_tokens}")
    st.sidebar.markdown(f"**Output Tokens:** {output_tokens}")
    st.sidebar.markdown(f"**Total Cost:** :green-background[***${total_cost:.4f}***]")

    # Create columns for download buttons
    col1, col2, col3 = st.columns(3)
    with col1:
        st.download_button("Download JSON", data=json.dumps(formatted_data.dict() if hasattr(formatted_data, 'dict') else formatted_data, indent=4), file_name=f"{timestamp}_data.json")
    with col2:
        # Convert formatted data to a dictionary if it's not already (assuming it has a .dict() method)
        if isinstance(formatted_data, str):
            # Parse the JSON string into a dictionary
            data_dict = json.loads(formatted_data)
        else:
            data_dict = formatted_data.dict() if hasattr(formatted_data, 'dict') else formatted_data

        # Access the data under the dynamic key
        first_key = next(iter(data_dict))  # Safely get the first key
        main_data = data_dict[first_key]   # Access data using this key

        # Create DataFrame from the data
        df = pd.DataFrame(main_data)

        # data_dict = json.dumps(formatted_data.dict(), indent=4)
        st.download_button("Download CSV", data=df.to_csv(index=False), file_name=f"{timestamp}_data.csv")
    with col3:
        st.download_button("Download Markdown", data=markdown, file_name=f"{timestamp}_data.md")

# Ensure that these UI components are persistent and don't rely on re-running the scrape function
if 'results' in st.session_state:
    df, formatted_data, markdown, input_tokens, output_tokens, total_cost, timestamp = st.session_state['results']
```
7️⃣ Run the Streamlit App
Run the following command:
```bash
streamlit run streamlit_app.py
```