"""
WebWhatsAPI module
.. moduleauthor:: Mukul Hase <mukulhase@gmail.com>, Adarsh Sanjeev <adarshsanjeev@gmail.com>
"""
import binascii
import logging
import os
import shutil
import tempfile
from base64 import b64decode, b64encode
from io import BytesIO
from json import dumps, loads
import magic
from axolotl.kdf.hkdfv3 import HKDFv3
from axolotl.util.byteutil import ByteUtil
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from .objects.chat import UserChat, factory_chat
from .objects.contact import Contact
from .objects.message import MessageGroup, factory_message
from .objects.number_status import NumberStatus
from .wapi_js_wrapper import WapiJsWrapper
__version__ = '2.0.3'
class WhatsAPIDriverStatus(object):
    """String constants naming the states reported for the driver."""

    # Fallback when none of the other states applies.
    Unknown = 'Unknown'
    # No webdriver instance.
    NoDriver = 'NoDriver'
    # Webdriver has no live session.
    NotConnected = 'NotConnected'
    # Session is up, account not paired yet.
    NotLoggedIn = 'NotLoggedIn'
    # Session is up and the account is paired.
    LoggedIn = 'LoggedIn'
class WhatsAPIException(Exception):
    """Base class for the errors raised by this module."""
class ChatNotFoundError(WhatsAPIException):
    """Raised when a chat lookup (by id, name or phone number) finds nothing."""
class WhatsAPIDriver(object):
    """
    Main driver object: owns a selenium browser session pointed at WhatsApp Web
    and exposes the injected WAPI JavaScript functions as Python methods.

    .. note::
       Runs its own instance of selenium
    """
    _PROXY = None

    _URL = "https://web.whatsapp.com"

    _LOCAL_STORAGE_FILE = 'localStorage.json'

    # File names that are skipped whenever a profile directory is copied.
    _PROFILE_LOCK_FILES = ("parent.lock", "lock", ".parentlock")

    _SELECTORS = {
        'firstrun': "#wrapper",
        'qrCode': "img[alt=\"Scan me!\"]",
        'qrCodePlain': "div[data-ref]",
        'mainPage': ".app.two",
        'chatList': ".infinite-list-viewport",
        'messageList': "#main > div > div:nth-child(1) > div > div.message-list",
        'unreadMessageBar': "#main > div > div:nth-child(1) > div > div.message-list > div.msg-unread",
        'searchBar': ".input",
        'searchCancel': ".icon-search-morph",
        'chats': ".infinite-list-item",
        'chatBar': 'div.input',
        'sendButton': 'button.icon:nth-child(3)',
        'LoadHistory': '.btn-more',
        'UnreadBadge': '.icon-meta',
        'UnreadChatBanner': '.message-list',
        'ReconnectLink': '.action',
        'WhatsappQrIcon': 'span.icon:nth-child(2)',
        'QRReloader': 'div[data-ref] > span > div'
    }

    _CLASSES = {
        'unreadBadge': 'icon-meta',
        'messageContent': "message-text",
        'messageList': "msg"
    }

    logger = logging.getLogger(__name__)
    driver = None

    # Profile points to the Firefox profile for firefox and Chrome cache for chrome
    # Do not alter this
    _profile = None

    def get_local_storage(self):
        """Return the browser's ``window.localStorage`` as selenium serialises it."""
        return self.driver.execute_script('return window.localStorage;')

    def set_local_storage(self, data):
        """
        Write every key/value pair of ``data`` into the browser's localStorage.

        The pairs are handed to the browser as script arguments instead of being
        spliced into the JavaScript source, so quotes, backslashes and newlines
        inside keys or values are stored verbatim.

        :param data: Mapping of localStorage keys to values; non-string values are stored as ``str(value)``
        :type data: dict
        """
        items = dict(
            (str(key), value if isinstance(value, str) else str(value))
            for key, value in data.items()
        )
        script = (
            "var items = arguments[0];"
            "Object.keys(items).forEach(function (key) {"
            "window.localStorage.setItem(key, items[key]);"
            "});"
        )
        self.driver.execute_script(script, items)

    def save_firefox_profile(self, remove_old=False):
        """
        Copy the running Firefox profile over the permanent one and dump the
        current localStorage next to it as JSON.

        :param remove_old: If True the permanent profile directory is removed and copied afresh;
                           otherwise each entry is copied into the existing directory
        :type remove_old: bool
        """
        self.logger.info("Saving profile from %s to %s", self._profile.path, self._profile_path)
        ignore_locks = shutil.ignore_patterns(*self._PROFILE_LOCK_FILES)

        if remove_old:
            if os.path.exists(self._profile_path):
                try:
                    shutil.rmtree(self._profile_path)
                except OSError:
                    # Still best effort, but no longer silent.
                    self.logger.warning("Could not remove old profile at %s", self._profile_path, exc_info=True)

            shutil.copytree(self._profile.path, self._profile_path, ignore=ignore_locks)
        else:
            for item in os.listdir(self._profile.path):
                if item in self._PROFILE_LOCK_FILES:
                    continue
                source = os.path.join(self._profile.path, item)
                target = os.path.join(self._profile_path, item)
                if os.path.isdir(source):
                    shutil.copytree(source, target, ignore=ignore_locks)
                else:
                    shutil.copy2(source, target)

        with open(os.path.join(self._profile_path, self._LOCAL_STORAGE_FILE), 'w') as f:
            f.write(dumps(self.get_local_storage()))

    def set_proxy(self, proxy):
        """
        Set HTTP and SSL proxy preferences on the current profile.

        :param proxy: Proxy as ``host:port``; only the last colon separates the port
        :type proxy: str
        """
        self.logger.info("Setting proxy to %s", proxy)
        # rsplit so that a host which itself contains ':' still parses.
        proxy_address, proxy_port = proxy.rsplit(":", 1)
        proxy_port = int(proxy_port)

        self._profile.set_preference("network.proxy.type", 1)
        self._profile.set_preference("network.proxy.http", proxy_address)
        self._profile.set_preference("network.proxy.http_port", proxy_port)
        self._profile.set_preference("network.proxy.ssl", proxy_address)
        self._profile.set_preference("network.proxy.ssl_port", proxy_port)

    def close(self):
        """Closes the selenium instance"""
        self.driver.close()

    def __init__(self, client="firefox", username="API", proxy=None, command_executor=None, loadstyles=False,
                 profile=None, headless=False, autoconnect=True, logger=None, extra_params=None, chrome_options=None,
                 executable_path=None):
        """
        Initialises the webdriver

        :param client: ``"firefox"``, ``"chrome"`` or ``"remote"`` (case-insensitive)
        :param username: Used as file-name prefix for temporary QR images
        :param proxy: Optional proxy as ``host:port``
        :param command_executor: Passed to ``webdriver.Remote`` for the remote client
        :param loadstyles: If False (firefox only) stylesheets, images and flash are disabled
        :param profile: Path to an existing profile directory
        :param headless: Start the browser headless (firefox and chrome)
        :param autoconnect: Call :meth:`connect` at the end of initialisation
        :param logger: Logger to use instead of the module logger
        :param extra_params: Extra keyword arguments forwarded to the webdriver constructor
        :param chrome_options: Iterable of extra Chrome command-line arguments
        :param executable_path: Path to the Firefox driver executable
        :raises WhatsAPIException: if ``profile`` does not exist or ``client`` is not supported
        """
        self.logger = logger or self.logger
        extra_params = extra_params or {}

        if profile is not None:
            self._profile_path = profile
            self.logger.info("Checking for profile at %s", self._profile_path)
            if not os.path.exists(self._profile_path):
                self.logger.critical("Could not find profile at %s", profile)
                raise WhatsAPIException("Could not find profile at %s" % profile)
        else:
            self._profile_path = None

        self.client = client.lower()
        if self.client == "firefox":
            if self._profile_path is not None:
                self._profile = webdriver.FirefoxProfile(self._profile_path)
            else:
                self._profile = webdriver.FirefoxProfile()
            if not loadstyles:
                # Disable CSS
                self._profile.set_preference('permissions.default.stylesheet', 2)
                # Disable images
                self._profile.set_preference('permissions.default.image', 2)
                # Disable Flash
                self._profile.set_preference('dom.ipc.plugins.enabled.libflashplayer.so',
                                             'false')
            if proxy is not None:
                self.set_proxy(proxy)

            options = Options()
            if headless:
                options.set_headless()
            options.profile = self._profile

            capabilities = DesiredCapabilities.FIREFOX.copy()
            capabilities['webStorageEnabled'] = True

            self.logger.info("Starting webdriver")
            if executable_path is not None:
                executable_path = os.path.abspath(executable_path)
                self.driver = webdriver.Firefox(capabilities=capabilities, options=options,
                                                executable_path=executable_path,
                                                **extra_params)
            else:
                self.driver = webdriver.Firefox(capabilities=capabilities, options=options,
                                                **extra_params)

        elif self.client == "chrome":
            self._profile = webdriver.ChromeOptions()
            if self._profile_path is not None:
                self._profile.add_argument("user-data-dir=%s" % self._profile_path)
            if proxy is not None:
                self._profile.add_argument('--proxy-server=%s' % proxy)
            if headless:
                self._profile.add_argument('headless')
            if chrome_options is not None:
                for option in chrome_options:
                    self._profile.add_argument(option)
            self.logger.info("Starting webdriver")
            self.driver = webdriver.Chrome(chrome_options=self._profile, **extra_params)

        elif self.client == 'remote':
            # Compare the normalised name, like the other branches do.
            if self._profile_path is not None:
                self._profile = webdriver.FirefoxProfile(self._profile_path)
            else:
                self._profile = webdriver.FirefoxProfile()
            capabilities = DesiredCapabilities.FIREFOX.copy()
            self.driver = webdriver.Remote(
                command_executor=command_executor,
                desired_capabilities=capabilities,
                **extra_params
            )

        else:
            # Without a driver nothing below can work; fail with a clear error.
            self.logger.error("Invalid client: %s", client)
            raise WhatsAPIException("Invalid client: %s" % client)

        self.username = username
        self.wapi_functions = WapiJsWrapper(self.driver, self)

        self.driver.set_script_timeout(500)
        self.driver.implicitly_wait(10)

        if autoconnect:
            self.connect()

    def connect(self):
        """
        Open WhatsApp Web and, when a saved localStorage file exists, load it
        into the browser and refresh the page.
        """
        self.driver.get(self._URL)

        # For chrome the file is looked up relative to the working directory.
        profile_dir = "" if self.client == "chrome" else self._profile.path
        local_storage_file = os.path.join(profile_dir, self._LOCAL_STORAGE_FILE)
        if os.path.exists(local_storage_file):
            with open(local_storage_file) as f:
                self.set_local_storage(loads(f.read()))

            self.driver.refresh()

    def is_logged_in(self):
        """Returns if user is logged. Can be used if non-block needed for wait_for_login"""
        # Logged-in state is detected by the presence of the chat icon element.
        return self.driver.execute_script(
            "if (document.querySelector('*[data-icon=chat]') !== null) { return true } else { return false }")

    def is_connected(self):
        """Returns if user's phone is connected to the internet."""
        return self.wapi_functions.isConnected()

    def wait_for_login(self, timeout=90):
        """
        Wait until the main page element is visible.

        :param timeout: Maximum number of seconds handed to ``WebDriverWait``
        """
        WebDriverWait(self.driver, timeout).until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, self._SELECTORS['mainPage']))
        )

    def get_qr_plain(self):
        """Return the ``data-ref`` attribute of the QR code container element."""
        return self.driver.find_element_by_css_selector(self._SELECTORS['qrCodePlain']).get_attribute("data-ref")

    def get_qr(self, filename=None):
        """
        Get pairing QR code from client

        :param filename: Where to save the PNG; a temporary file is created when omitted
        :return: Path of the saved image
        :rtype: str
        """
        if "Click to reload QR code" in self.driver.page_source:
            self.reload_qr()
        qr = self.driver.find_element_by_css_selector(self._SELECTORS['qrCode'])

        if filename is None:
            fd, fn_png = tempfile.mkstemp(prefix=self.username, suffix='.png')
        else:
            fd = os.open(filename, os.O_RDWR | os.O_CREAT)
            fn_png = os.path.abspath(filename)
        try:
            self.logger.debug("QRcode image saved at %s", fn_png)
            qr.screenshot(fn_png)
        finally:
            # Release the descriptor even when the screenshot fails.
            os.close(fd)
        return fn_png

    def get_qr_base64(self):
        """Return a screenshot of the pairing QR code element as base64."""
        if "Click to reload QR code" in self.driver.page_source:
            self.reload_qr()
        qr = self.driver.find_element_by_css_selector(self._SELECTORS['qrCode'])

        return qr.screenshot_as_base64

    def screenshot(self, filename):
        """Save a screenshot of the browser window to ``filename``."""
        self.driver.get_screenshot_as_file(filename)

    def get_all_chats(self):
        """
        Fetches all chats

        :return: List of chats
        :rtype: list[Chat]
        """
        chats = self.wapi_functions.getAllChats()
        if chats:
            return [factory_chat(chat, self) for chat in chats]
        else:
            return []

    def get_all_chat_ids(self):
        """
        Fetches all chat ids

        :return: List of chat ids
        :rtype: list[str]
        """
        return self.wapi_functions.getAllChatIds()

    def get_unread(self, include_me=False, include_notifications=False, use_unread_count=False):
        """
        Fetches unread messages

        :param include_me: Include user's messages
        :type include_me: bool or None
        :param include_notifications: Include events happening on chat
        :type include_notifications: bool or None
        :param use_unread_count: If set uses chat's 'unreadCount' attribute to fetch last n messages from chat
        :type use_unread_count: bool
        :return: List of unread messages grouped by chats
        :rtype: list[MessageGroup]
        """
        raw_message_groups = self.wapi_functions.getUnreadMessages(include_me, include_notifications, use_unread_count)

        unread_messages = []
        for raw_message_group in raw_message_groups:
            chat = factory_chat(raw_message_group, self)
            built = [factory_message(message, self) for message in raw_message_group['messages']]
            # Entries for which the factory produced None are dropped.
            messages = [message for message in built if message is not None]
            messages.sort(key=lambda message: message.timestamp)
            unread_messages.append(MessageGroup(chat, messages))

        return unread_messages

    def get_unread_messages_in_chat(self,
                                    id,
                                    include_me=False,
                                    include_notifications=False):
        """
        I fetch unread messages from an asked chat.

        :param id: chat id
        :type id: str
        :param include_me: if user's messages are to be included
        :type include_me: bool
        :param include_notifications: if events happening on chat are to be included
        :type include_notifications: bool
        :return: list of unread messages from asked chat
        :rtype: list
        """
        messages = self.wapi_functions.getUnreadMessagesInChat(
            id,
            include_me,
            include_notifications
        )
        return [factory_message(message, self) for message in messages]

    def get_all_messages_in_chat(self, chat, include_me=False, include_notifications=False):
        """
        Fetches messages in chat

        :param chat: Chat whose ``id`` is queried
        :param include_me: Include user's messages
        :type include_me: bool or None
        :param include_notifications: Include events happening on chat
        :type include_notifications: bool or None
        :return: Generator yielding the messages in chat
        :rtype: generator of Message
        """
        message_objs = self.wapi_functions.getAllMessagesInChat(chat.id, include_me, include_notifications)

        for message in message_objs:
            yield factory_message(message, self)

    def get_all_message_ids_in_chat(self, chat, include_me=False, include_notifications=False):
        """
        Fetches message ids in chat

        :param chat: Chat whose ``id`` is queried
        :param include_me: Include user's messages
        :type include_me: bool or None
        :param include_notifications: Include events happening on chat
        :type include_notifications: bool or None
        :return: List of message ids in chat
        :rtype: list[str]
        """
        return self.wapi_functions.getAllMessageIdsInChat(chat.id, include_me, include_notifications)

    def get_message_by_id(self, message_id):
        """
        Fetch a message

        :param message_id: Message ID
        :type message_id: str
        :return: Message, or the falsy value returned by the lookup
        :rtype: Message
        """
        result = self.wapi_functions.getMessageById(message_id)

        if result:
            result = factory_message(result, self)

        return result

    def get_chat_from_id(self, chat_id):
        """
        Fetches a chat given its ID

        :param chat_id: Chat ID
        :type chat_id: str
        :return: Chat
        :rtype: Chat
        :raises ChatNotFoundError: if no chat has that id
        """
        chat = self.wapi_functions.getChatById(chat_id)
        if chat:
            return factory_chat(chat, self)

        raise ChatNotFoundError("Chat {0} not found".format(chat_id))

    def get_chat_from_name(self, chat_name):
        """
        Fetches a chat given its name

        :param chat_name: Chat name
        :type chat_name: str
        :return: Chat
        :rtype: Chat
        :raises ChatNotFoundError: if no chat has that name
        """
        chat = self.wapi_functions.getChatByName(chat_name)
        if chat:
            return factory_chat(chat, self)

        raise ChatNotFoundError("Chat {0} not found".format(chat_name))

    def _find_user_chat(self, number):
        """Return the first UserChat whose id contains ``number``, or None."""
        for chat in self.get_all_chats():
            if isinstance(chat, UserChat) and number in chat.id:
                return chat
        return None

    def get_chat_from_phone_number(self, number, createIfNotFound=False):
        """
        Gets chat by phone number
        Number format should be as it appears in Whatsapp ID
        For example, for the number:
        +972-51-234-5678
        This function would receive:
        972512345678

        :param number: Phone number
        :type number: str
        :param createIfNotFound: Open a new chat with the number when none exists yet
        :type createIfNotFound: bool
        :return: Chat
        :rtype: Chat
        :raises ChatNotFoundError: if no matching chat exists (or could be created)
        """
        chat = self._find_user_chat(number)
        if chat is not None:
            return chat

        if createIfNotFound:
            self.create_chat_by_number(number)
            self.wait_for_login()
            chat = self._find_user_chat(number)
            if chat is not None:
                return chat

        raise ChatNotFoundError('Chat for phone {0} not found'.format(number))

    def reload_qr(self):
        """Click the QR reloader element."""
        self.driver.find_element_by_css_selector(self._SELECTORS['QRReloader']).click()

    def get_status(self):
        """
        Returns status of the driver

        :return: Status
        :rtype: WhatsAPIDriverStatus
        """
        # NOTE(review): a missing driver reports NotConnected although
        # WhatsAPIDriverStatus.NoDriver exists; kept as is for callers.
        if self.driver is None:
            return WhatsAPIDriverStatus.NotConnected
        if self.driver.session_id is None:
            return WhatsAPIDriverStatus.NotConnected
        try:
            self.driver.find_element_by_css_selector(self._SELECTORS['mainPage'])
            return WhatsAPIDriverStatus.LoggedIn
        except NoSuchElementException:
            pass
        try:
            self.driver.find_element_by_css_selector(self._SELECTORS['qrCode'])
            return WhatsAPIDriverStatus.NotLoggedIn
        except NoSuchElementException:
            pass
        return WhatsAPIDriverStatus.Unknown

    def chat_send_message(self, chat_id, message):
        """
        Send ``message`` to ``chat_id``.

        :return: The boolean result as is, otherwise the result wrapped by ``factory_message``
        """
        result = self.wapi_functions.sendMessage(chat_id, message)

        if not isinstance(result, bool):
            return factory_message(result, self)
        return result

    def chat_reply_message(self, message_id, message):
        """
        Reply to ``message_id`` with ``message``.

        :return: The boolean result as is, otherwise the result wrapped by ``factory_message``
        """
        result = self.wapi_functions.ReplyMessage(message_id, message)

        if not isinstance(result, bool):
            return factory_message(result, self)
        return result

    def send_message_to_id(self, recipient, message):
        """
        Send a message to a chat given its ID

        :param recipient: Chat ID
        :type recipient: str
        :param message: Plain-text message to be sent.
        :type message: str
        """
        return self.wapi_functions.sendMessageToID(recipient, message)

    def convert_to_base64(self, path, is_thumbnail=False):
        """
        Read a file and encode it as base64.

        :param path: file path
        :param is_thumbnail: If True return only the base64 payload, without the ``data:`` prefix
        :return: returns the converted string and formatted for the send media function send_media
        """
        mime = magic.Magic(mime=True)
        content_type = mime.from_file(path)
        with open(path, "rb") as image_file:
            archive = b64encode(image_file.read()).decode('utf-8')
        if is_thumbnail:
            return archive
        return 'data:' + content_type + ';base64,' + archive

    def send_message_with_thumbnail(self, path, chatid, url, title, description, text):
        """
        converts the file to base64 and sends it using the sendImage function of wapi.js
        PS: The first link in text must be equals to url or thumbnail will not appear.

        :param path: image file path
        :param chatid: chatId to be sent
        :param url: of thumbnail
        :param title: of thumbnail
        :param description: of thumbnail
        :param text: under thumbnail
        :return: False when ``url`` does not occur in ``text``, otherwise the WAPI result
        """
        imgBase64 = self.convert_to_base64(path, is_thumbnail=True)
        if url not in text:
            return False
        return self.wapi_functions.sendMessageWithThumb(imgBase64, url, title, description, text, chatid)

    def chat_send_seen(self, chat_id):
        """
        Send a seen to a chat given its ID

        :param chat_id: Chat ID
        :type chat_id: str
        """
        return self.wapi_functions.sendSeen(chat_id)

    def chat_load_earlier_messages(self, chat_id):
        """Ask WAPI to load earlier messages of ``chat_id``."""
        self.wapi_functions.loadEarlierMessages(chat_id)

    def chat_load_all_earlier_messages(self, chat_id):
        """Ask WAPI to load all earlier messages of ``chat_id``."""
        self.wapi_functions.loadAllEarlierMessages(chat_id)

    def async_chat_load_all_earlier_messages(self, chat_id):
        """Call WAPI ``asyncLoadAllEarlierMessages`` for ``chat_id``."""
        self.wapi_functions.asyncLoadAllEarlierMessages(chat_id)

    def are_all_messages_loaded(self, chat_id):
        """Return WAPI's ``areAllMessagesLoaded`` result for ``chat_id``."""
        return self.wapi_functions.areAllMessagesLoaded(chat_id)

    def group_get_participants_ids(self, group_id):
        """Return the participant ids of ``group_id`` as reported by WAPI."""
        return self.wapi_functions.getGroupParticipantIDs(group_id)

    def group_get_participants(self, group_id):
        """Yield a contact for every participant of ``group_id``."""
        # NOTE(review): get_contact_from_id is not defined in the visible part
        # of this class; confirm it exists elsewhere.
        participant_ids = self.group_get_participants_ids(group_id)

        for participant_id in participant_ids:
            yield self.get_contact_from_id(participant_id['_serialized'])

    def group_get_admin_ids(self, group_id):
        """Return the admin ids of ``group_id`` as reported by WAPI."""
        return self.wapi_functions.getGroupAdmins(group_id)

    def group_get_admins(self, group_id):
        """Yield a contact for every admin of ``group_id``."""
        admin_ids = self.group_get_admin_ids(group_id)

        for admin_id in admin_ids:
            yield self.get_contact_from_id(admin_id)

    def get_profile_pic_from_id(self, id):
        """
        Get full profile pic from an id
        The ID must be on your contact book to
        successfully get their profile picture.

        :param id: ID
        :type id: str
        :return: Decoded picture bytes, or False when WAPI returned nothing
        """
        profile_pic = self.wapi_functions.getProfilePicFromId(id)
        if profile_pic:
            return b64decode(profile_pic)
        else:
            return False

    def get_profile_pic_small_from_id(self, id):
        """
        Get small profile pic from an id
        The ID must be on your contact book to
        successfully get their profile picture.

        :param id: ID
        :type id: str
        :return: Decoded picture bytes, or False when WAPI returned nothing
        """
        profile_pic_small = self.wapi_functions.getProfilePicSmallFromId(id)
        if profile_pic_small:
            return b64decode(profile_pic_small)
        else:
            return False

    def download_file(self, url):
        """Download ``url`` through WAPI and return the base64-decoded bytes."""
        return b64decode(self.wapi_functions.downloadFile(url))

    def download_file_with_credentials(self, url):
        """Like :meth:`download_file`, using WAPI ``downloadFileWithCredentials``."""
        return b64decode(self.wapi_functions.downloadFileWithCredentials(url))

    def download_media(self, media_msg, force_download=False):
        """
        Return the content of a media message as a ``BytesIO``.

        :param media_msg: Media message; ``content``, ``client_url``, ``media_key``,
                          ``crypt_keys`` and ``type`` are read from it
        :param force_download: Skip the inline ``content`` and always download
        :type force_download: bool
        :rtype: io.BytesIO
        :raises WhatsAPIException: if the download returned no data
        """
        if not force_download:
            try:
                if media_msg.content:
                    return BytesIO(b64decode(media_msg.content))
            except AttributeError:
                # Message carries no inline content; fall through to download.
                pass

        file_data = self.download_file(media_msg.client_url)

        if not file_data:
            raise WhatsAPIException('Impossible to download file')

        media_key = b64decode(media_msg.media_key)
        derivative = HKDFv3().deriveSecrets(media_key,
                                            binascii.unhexlify(media_msg.crypt_keys[media_msg.type]),
                                            112)

        # First piece is used as the CBC IV, second as the AES key.
        parts = ByteUtil.split(derivative, 16, 32)
        iv = parts[0]
        cipher_key = parts[1]
        # The trailing 10 bytes are not decrypted (presumably a MAC - TODO confirm).
        e_file = file_data[:-10]

        # NOTE(review): no padding removal happens after decryption; confirm
        # whether callers expect the padded plaintext.
        cr_obj = Cipher(algorithms.AES(cipher_key), modes.CBC(iv), backend=default_backend())
        decryptor = cr_obj.decryptor()
        return BytesIO(decryptor.update(e_file) + decryptor.finalize())

    def mark_default_unread_messages(self):
        """
        Look for the latest unreplied messages received and mark them as unread.
        """
        self.wapi_functions.markDefaultUnreadMessages()

    def get_battery_level(self):
        """
        Check the battery level of device

        :return: int: Battery level
        """
        return self.wapi_functions.getBatteryLevel()

    def leave_group(self, chat_id):
        """
        Leave a group

        :param chat_id: id of group
        :return: WAPI result
        """
        return self.wapi_functions.leaveGroup(chat_id)

    def delete_chat(self, chat_id):
        """
        Delete a chat

        :param chat_id: id of chat
        :return: WAPI result
        """
        return self.wapi_functions.deleteConversation(chat_id)

    def delete_message(self, chat_id, message_array, revoke=False):
        """
        Delete one or more messages of a chat

        :param chat_id: id of chat
        :param message_array: one or more message(s) id
        :param revoke: Set to true so the message will be deleted for everyone, not only you
        :return: WAPI result
        """
        # Forward the caller's choice instead of a hard-coded False.
        return self.wapi_functions.deleteMessage(chat_id, message_array, revoke=revoke)

    def check_number_status(self, number_id):
        """
        Check if a number is valid/registered in the whatsapp service

        :param number_id: number id
        :return: Status object built from the WAPI answer
        :rtype: NumberStatus
        """
        number_status = self.wapi_functions.checkNumberStatus(number_id)
        return NumberStatus(number_status, self)

    def subscribe_new_messages(self, observer):
        """Subscribe ``observer`` to the new-messages observable."""
        self.wapi_functions.new_messages_observable.subscribe(observer)

    def unsubscribe_new_messages(self, observer):
        """Unsubscribe ``observer`` from the new-messages observable."""
        self.wapi_functions.new_messages_observable.unsubscribe(observer)

    def quit(self):
        """Quit the WAPI wrapper, then the selenium driver."""
        self.wapi_functions.quit()
        self.driver.quit()

    def create_chat_by_number(self, number):
        """
        Navigate the browser to the "send to phone" URL for ``number``.

        :param number: Phone number
        :type number: str
        """
        url = self._URL + "/send?phone=" + number
        self.driver.get(url)

    def contact_block(self, id):
        """Block the contact ``id``."""
        return self.wapi_functions.contactBlock(id)

    def contact_unblock(self, id):
        """Unblock the contact ``id``."""
        return self.wapi_functions.contactUnblock(id)

    def remove_participant_group(self, idGroup, idParticipant):
        """Remove ``idParticipant`` from the group ``idGroup``."""
        return self.wapi_functions.removeParticipantGroup(idGroup, idParticipant)

    def promote_participant_admin_group(self, idGroup, idParticipant):
        """Promote ``idParticipant`` to admin of the group ``idGroup``."""
        return self.wapi_functions.promoteParticipantAdminGroup(idGroup, idParticipant)

    def promove_participant_admin_group(self, idGroup, idParticipant):
        """Misspelt original name of :meth:`promote_participant_admin_group`; kept for existing callers."""
        return self.promote_participant_admin_group(idGroup, idParticipant)

    def demote_participant_admin_group(self, idGroup, idParticipant):
        """Demote ``idParticipant`` from admin of the group ``idGroup``."""
        return self.wapi_functions.demoteParticipantAdminGroup(idGroup, idParticipant)