76 changes: 48 additions & 28 deletions linkedin_scraper/company.py
@@ -9,6 +9,7 @@
from .person import Person
import time
import os
from urllib.parse import quote_plus
import json

AD_BANNER_CLASSNAME = ('ad-banner-container', '__ad')
@@ -49,7 +50,7 @@ class Company(Scraper):
employees = []
headcount = None

def __init__(self, linkedin_url = None, name = None, about_us =None, website = None, phone = None, headquarters = None, founded = None, industry = None, company_type = None, company_size = None, specialties = None, showcase_pages =[], affiliated_companies = [], driver = None, scrape = True, get_employees = True, close_on_complete = True):
def __init__(self, linkedin_url = None, name = None, about_us =None, website = None, phone = None, headquarters = None, founded = None, industry = None, company_type = None, company_size = None, specialties = None, showcase_pages =[], affiliated_companies = [], driver = None, scrape = True, get_employees = True, close_on_complete = True, employee_search_keywords = None, timeout = 10):
self.linkedin_url = linkedin_url
self.name = name
self.about_us = about_us
@@ -63,6 +64,12 @@ def __init__(self, linkedin_url = None, name = None, about_us =None, website = N
self.specialties = specialties
self.showcase_pages = showcase_pages
self.affiliated_companies = affiliated_companies
self.employee_search_keywords = employee_search_keywords
self.timeout = timeout

# Validation: Check if keywords provided but get_employees is False
if employee_search_keywords and not get_employees:
raise ValueError("Cannot use employee_search_keywords when get_employees=False. Set get_employees=True to filter employees by keywords.")

if driver is None:
try:
@@ -118,53 +125,67 @@ def __parse_employee__(self, employee_raw):
# print(e)
return None

def get_employees(self, wait_time=10):

def get_employees(self, keywords=None):
total = []
list_css = "list-style-none"
employee_xpath = '//div[contains(@class, "artdeco-entity-lockup")]'
next_xpath = '//button[@aria-label="Next"]'
driver = self.driver

try:
see_all_employees = driver.find_element(By.XPATH,'//a[@data-control-name="topcard_see_all_employees"]')
except:
pass
driver.get(os.path.join(self.linkedin_url, "people"))

# Construct URL with keyword search if provided
people_url = f"{self.linkedin_url}/people"
if keywords:
# Join keywords and URL encode them
keyword_string = " ".join(keywords) if isinstance(keywords, list) else str(keywords)
encoded_keywords = quote_plus(keyword_string)
people_url = f"{people_url}/?keywords={encoded_keywords}"

driver.get(people_url)

_ = WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.XPATH, '//span[@dir="ltr"]')))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_all_elements_located((By.XPATH, employee_xpath)))

driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight/2));")
time.sleep(1)
driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight*3/4));")
time.sleep(1)

results_list = driver.find_element(By.CLASS_NAME, list_css)
results_li = results_list.find_elements(By.TAG_NAME, "li")
for res in results_li:
total.append(self.__parse_employee__(res))
# Get employee elements directly
employee_elements = driver.find_elements(By.XPATH, employee_xpath)
for res in employee_elements:
employee = self.__parse_employee__(res)
if employee:
total.append(employee)

def is_loaded(previous_results):
loop = 0
driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight));")
results_li = results_list.find_elements(By.TAG_NAME, "li")
while len(results_li) == previous_results and loop <= 5:
employee_elements = driver.find_elements(By.XPATH, employee_xpath)
while len(employee_elements) == previous_results and loop <= 5:
time.sleep(1)
driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight));")
results_li = results_list.find_elements(By.TAG_NAME, "li")
employee_elements = driver.find_elements(By.XPATH, employee_xpath)
loop += 1
return loop <= 5

def get_data(previous_results):
results_li = results_list.find_elements(By.TAG_NAME, "li")
for res in results_li[previous_results:]:
total.append(self.__parse_employee__(res))

results_li_len = len(results_li)
while is_loaded(results_li_len):
employee_elements = driver.find_elements(By.XPATH, employee_xpath)
for res in employee_elements[previous_results:]:
employee = self.__parse_employee__(res)
if employee:
total.append(employee)

employee_count = len(employee_elements)
while is_loaded(employee_count):
try:
driver.find_element(By.XPATH,next_xpath).click()
except:
pass
_ = WebDriverWait(driver, wait_time).until(EC.presence_of_element_located((By.CLASS_NAME, list_css)))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_element_located((By.XPATH, employee_xpath)))

driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight/2));")
time.sleep(1)
@@ -175,8 +196,8 @@ def get_data(previous_results):
driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight));")
time.sleep(1)

get_data(results_li_len)
results_li_len = len(total)
get_data(employee_count)
employee_count = len(total)
return total


@@ -186,7 +207,7 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):

driver.get(self.linkedin_url)

_ = WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.XPATH, '//div[@dir="ltr"]')))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_all_elements_located((By.XPATH, '//div[@dir="ltr"]')))

navigation = driver.find_element(By.CLASS_NAME, "org-page-navigation__items ")

@@ -201,7 +222,7 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):
except:
driver.get(os.path.join(self.linkedin_url, "about"))

_ = WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.TAG_NAME, 'section')))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_all_elements_located((By.TAG_NAME, 'section')))
time.sleep(3)

if 'Cookie Policy' in driver.find_elements(By.TAG_NAME, "section")[1].text or any(classname in driver.find_elements(By.TAG_NAME, "section")[1].get_attribute('class') for classname in AD_BANNER_CLASSNAME):
@@ -255,9 +276,8 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):

driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight/2));")


try:
_ = WebDriverWait(driver, 3).until(EC.presence_of_element_located((By.CLASS_NAME, 'company-list')))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_element_located((By.CLASS_NAME, 'company-list')))
showcase, affiliated = driver.find_elements(By.CLASS_NAME, "company-list")
driver.find_element(By.ID,"org-related-companies-module__show-more-btn").click()

@@ -284,7 +304,7 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):
pass

if get_employees:
self.employees = self.get_employees()
self.employees = self.get_employees(keywords=self.employee_search_keywords)

driver.get(self.linkedin_url)

@@ -313,7 +333,7 @@ def scrape_not_logged_in(self, close_on_complete = True, retry_limit = 10, get_e
# get showcase
try:
driver.find_element(By.ID,"view-other-showcase-pages-dialog").click()
WebDriverWait(driver, 3).until(EC.presence_of_element_located((By.ID, 'dialog')))
WebDriverWait(driver, self.timeout).until(EC.presence_of_element_located((By.ID, 'dialog')))

showcase_pages = driver.find_elements(By.CLASS_NAME, "company-showcase-pages")[1]
for showcase_company in showcase_pages.find_elements(By.TAG_NAME, "li"):
@@ -344,7 +364,7 @@ def scrape_not_logged_in(self, close_on_complete = True, retry_limit = 10, get_e
pass

if get_employees:
self.employees = self.get_employees()
self.employees = self.get_employees(keywords=self.employee_search_keywords)

driver.get(self.linkedin_url)

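Taken together, the company.py changes introduce two constructor parameters: employee_search_keywords, which is URL-encoded with quote_plus and appended to the company's /people page as a ?keywords= filter, and timeout, which replaces the hard-coded 3-second WebDriverWait values. A minimal usage sketch, assuming a session signed in via the package's usual actions.login helper; the credentials and company URL are placeholders:

from selenium import webdriver
from linkedin_scraper import Company, actions

driver = webdriver.Chrome()
actions.login(driver, "email@example.com", "password")  # placeholder credentials

# Keywords may be a list (joined with spaces before encoding) or a single string;
# passing them with get_employees=False raises ValueError at construction time.
company = Company(
    "https://www.linkedin.com/company/example",  # placeholder company URL
    driver=driver,
    get_employees=True,
    employee_search_keywords=["software engineer", "python"],
    timeout=15,
    close_on_complete=False,
)
print(company.employees)

Note that get_employees() now also skips entries that fail to parse instead of appending None, so the returned list contains only successfully scraped employees.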
144 changes: 73 additions & 71 deletions linkedin_scraper/job_search.py
@@ -1,91 +1,93 @@
import os
from typing import List
from time import sleep
import urllib.parse
from selenium.common.exceptions import TimeoutException

from .objects import Scraper
from . import constants as c
from .jobs import Job

from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys


class JobSearch(Scraper):
AREAS = ["recommended_jobs", None, "still_hiring", "more_jobs"]

def __init__(self, driver, base_url="https://www.linkedin.com/jobs/", close_on_complete=False, scrape=True, scrape_recommended_jobs=True):
class Job(Scraper):

def __init__(
self,
linkedin_url=None,
job_title=None,
company=None,
company_linkedin_url=None,
location=None,
posted_date=None,
applicant_count=None,
job_description=None,
benefits=None,
driver=None,
close_on_complete=True,
scrape=True,
):
super().__init__()
self.linkedin_url = linkedin_url
self.job_title = job_title
self.driver = driver
self.base_url = base_url
self.company = company
self.company_linkedin_url = company_linkedin_url
self.location = location
self.posted_date = posted_date
self.applicant_count = applicant_count
self.job_description = job_description
self.benefits = benefits

if scrape:
self.scrape(close_on_complete, scrape_recommended_jobs)
self.scrape(close_on_complete)

def __repr__(self):
return f"<Job {self.job_title} {self.company}>"

def scrape(self, close_on_complete=True, scrape_recommended_jobs=True):
def scrape(self, close_on_complete=True):
if self.is_signed_in():
self.scrape_logged_in(close_on_complete=close_on_complete, scrape_recommended_jobs=scrape_recommended_jobs)
self.scrape_logged_in(close_on_complete=close_on_complete)
else:
raise NotImplemented("This part is not implemented yet")


def scrape_job_card(self, base_element) -> Job:
job_div = self.wait_for_element_to_load(name="job-card-list__title", base=base_element)
job_title = job_div.text.strip()
linkedin_url = job_div.get_attribute("href")
company = base_element.find_element_by_class_name("artdeco-entity-lockup__subtitle").text
location = base_element.find_element_by_class_name("job-card-container__metadata-wrapper").text
job = Job(linkedin_url=linkedin_url, job_title=job_title, company=company, location=location, scrape=False, driver=self.driver)
return job


def scrape_logged_in(self, close_on_complete=True, scrape_recommended_jobs=True):
def to_dict(self):
return {
"linkedin_url": self.linkedin_url,
"job_title": self.job_title,
"company": self.company,
"company_linkedin_url": self.company_linkedin_url,
"location": self.location,
"posted_date": self.posted_date,
"applicant_count": self.applicant_count,
"job_description": self.job_description,
"benefits": self.benefits
}


def scrape_logged_in(self, close_on_complete=True):
driver = self.driver
driver.get(self.base_url)
if scrape_recommended_jobs:
self.focus()
sleep(self.WAIT_FOR_ELEMENT_TIMEOUT)
job_area = self.wait_for_element_to_load(name="scaffold-finite-scroll__content")
areas = self.wait_for_all_elements_to_load(name="artdeco-card", base=job_area)
for i, area in enumerate(areas):
area_name = self.AREAS[i]
if not area_name:
continue
area_results = []
for job_posting in area.find_elements_by_class_name("jobs-job-board-list__item"):
job = self.scrape_job_card(job_posting)
area_results.append(job)
setattr(self, area_name, area_results)
return


def search(self, search_term: str) -> List[Job]:
url = os.path.join(self.base_url, "search") + f"?keywords={urllib.parse.quote(search_term)}&refresh=true"
self.driver.get(url)
self.scroll_to_bottom()

driver.get(self.linkedin_url)
self.focus()
sleep(self.WAIT_FOR_ELEMENT_TIMEOUT)

job_listing_class_name = "jobs-search-results-list"
job_listing = self.wait_for_element_to_load(name=job_listing_class_name)

self.scroll_class_name_element_to_page_percent(job_listing_class_name, 0.3)
self.focus()
sleep(self.WAIT_FOR_ELEMENT_TIMEOUT)

self.scroll_class_name_element_to_page_percent(job_listing_class_name, 0.6)
self.focus()
sleep(self.WAIT_FOR_ELEMENT_TIMEOUT)

self.scroll_class_name_element_to_page_percent(job_listing_class_name, 1)
self.focus()
sleep(self.WAIT_FOR_ELEMENT_TIMEOUT)

job_results = []
for job_card in self.wait_for_all_elements_to_load(name="job-card-list", base=job_listing):
job = self.scrape_job_card(job_card)
job_results.append(job)
return job_results
self.job_title = self.wait_for_element_to_load(name="job-details-jobs-unified-top-card__job-title").text.strip()
self.company = self.wait_for_element_to_load(name="job-details-jobs-unified-top-card__company-name").text.strip()
self.company_linkedin_url = self.wait_for_element_to_load(name="job-details-jobs-unified-top-card__company-name").find_element(By.TAG_NAME,"a").get_attribute("href")
primary_descriptions = self.wait_for_element_to_load(name="job-details-jobs-unified-top-card__primary-description-container").find_elements(By.TAG_NAME, "span")
texts = [span.text for span in primary_descriptions if span.text.strip() != ""]
self.location = texts[0]
self.posted_date = texts[3]

try:
self.applicant_count = self.wait_for_element_to_load(name="jobs-unified-top-card__applicant-count").text.strip()
except TimeoutException:
self.applicant_count = 0
job_description_elem = self.wait_for_element_to_load(name="jobs-description")
self.mouse_click(job_description_elem.find_element(By.TAG_NAME, "button"))
job_description_elem = self.wait_for_element_to_load(name="jobs-description")
job_description_elem.find_element(By.TAG_NAME, "button").click()
self.job_description = job_description_elem.text.strip()
try:
self.benefits = self.wait_for_element_to_load(name="jobs-unified-description__salary-main-rail-card").text.strip()
except TimeoutException:
self.benefits = None

if close_on_complete:
driver.close()
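The job_search.py rewrite replaces the JobSearch scraper (recommended-jobs areas plus keyword search) with a single-posting Job scraper that exposes its fields through to_dict(). A minimal sketch of the new flow, assuming a signed-in driver since scrape() still raises for anonymous sessions; the job URL below is a placeholder:

from selenium import webdriver
from linkedin_scraper import actions
from linkedin_scraper.job_search import Job

driver = webdriver.Chrome()
actions.login(driver, "email@example.com", "password")  # placeholder credentials

# scrape=True (the default) runs scrape_logged_in() from the constructor;
# close_on_complete=False keeps the driver open for further scraping.
job = Job(
    "https://www.linkedin.com/jobs/view/0000000000",  # placeholder job URL
    driver=driver,
    close_on_complete=False,
)
print(job.to_dict())

One caveat worth flagging for review: the posted date is read as the fourth non-empty span of the top card's primary description (texts[3]), which will raise IndexError on postings that render fewer spans.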