Source code for langchain_google_community.drive

# Prerequisites:
# 1. Create a Google Cloud project
# 2. Enable the Google Drive API:
#   https://console.cloud.google.com/flows/enableapi?apiid=drive.googleapis.com
# 3. Authorize credentials for desktop app:
#   https://developers.google.com/drive/api/quickstart/python#authorize_credentials_for_a_desktop_application # noqa: E501
# 4. For service accounts visit
#   https://cloud.google.com/iam/docs/service-accounts-create

import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union

from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import BaseModel, root_validator, validator

SCOPES = ["https://www.googleapis.com/auth/drive.file"]


[docs] class GoogleDriveLoader(BaseLoader, BaseModel): """Load Google Docs from `Google Drive`.""" service_account_key: Path = Path.home() / ".credentials" / "keys.json" """Path to the service account key file.""" credentials_path: Path = Path.home() / ".credentials" / "credentials.json" """Path to the credentials file.""" token_path: Path = Path.home() / ".credentials" / "token.json" """Path to the token file.""" folder_id: Optional[str] = None """The folder id to load from.""" document_ids: Optional[List[str]] = None """The document ids to load from.""" file_ids: Optional[List[str]] = None """The file ids to load from.""" recursive: bool = False """Whether to load recursively. Only applies when folder_id is given.""" file_types: Optional[Sequence[str]] = None """The file types to load. Only applies when folder_id is given.""" load_trashed_files: bool = False """Whether to load trashed files. Only applies when folder_id is given.""" # NOTE(MthwRobinson) - changing the file_loader_cls to type here currently # results in pydantic validation errors file_loader_cls: Any = None """The file loader class to use.""" file_loader_kwargs: Dict["str", Any] = {} """The file loader kwargs to use.""" load_auth: bool = False """Whether to load authorization identities.""" load_extended_metadata: bool = False """Whether to load extended metadata.""" def _get_file_size_from_id(self, id: str) -> str: """Fetch the size of the file.""" try: import googleapiclient.errors # type: ignore[import] from googleapiclient.discovery import build # type: ignore[import] except ImportError as exc: raise ImportError( "You must run " "`pip install --upgrade " "google-api-python-client` " "to load authorization identities." ) from exc creds = self._load_credentials() service = build("drive", "v3", credentials=creds) try: file = service.files().get(fileId=id, fields="size").execute() return file["size"] except googleapiclient.errors.HttpError: print( f"insufficientFilePermissions: The user does not have sufficient \ permissions to retrieve size for the file with fileId: {id}" ) return "unknown" except Exception as exc: print( f"Error occurred while fetching the size for the file with fileId: {id}" ) print(f"Error: {exc}") return "unknown" def _get_owner_metadata_from_id(self, id: str) -> str: """Fetch the owner of the file.""" try: import googleapiclient.errors # type: ignore[import] from googleapiclient.discovery import build # type: ignore[import] except ImportError as exc: raise ImportError( "You must run " "`pip install --upgrade " "google-api-python-client` " "to load authorization identities." ) from exc creds = self._load_credentials() service = build("drive", "v3", credentials=creds) try: file = service.files().get(fileId=id, fields="owners").execute() return file["owners"][0].get("emailAddress") except googleapiclient.errors.HttpError: print( f"insufficientFilePermissions: The user does not have sufficient \ permissions to retrieve owner for the file with fileId: {id}" ) return "unknown" except Exception as exc: print( f"Error occurred while fetching the owner for the file with fileId: \ {id} with error: {exc}" ) return "unknown" def _get_file_path_from_id(self, id: str) -> str: """Fetch the full path of the file starting from the root.""" try: import googleapiclient.errors # type: ignore[import] from googleapiclient.discovery import build # type: ignore[import] except ImportError as exc: raise ImportError( "You must run " "`pip install --upgrade " "google-api-python-client` " "to load authorization identities." ) from exc creds = self._load_credentials() service = build("drive", "v3", credentials=creds) path = [] current_id = id while True: try: file = ( service.files() .get(fileId=current_id, fields="name, parents") .execute() ) path.append(file["name"]) if "parents" in file: current_id = file["parents"][0] else: break except googleapiclient.errors.HttpError: print( f"insufficientFilePermissions: The user does not have sufficient\ permissions to retrieve path for the file with fileId: {id}" ) break path.reverse() return "/".join(path) def _get_identity_metadata_from_id(self, id: str) -> List[str]: """Fetch the list of people having access to ID file.""" try: import googleapiclient.errors # type: ignore[import] from googleapiclient.discovery import build # type: ignore[import] except ImportError as exc: raise ImportError( "You must run " "`pip install --upgrade " "google-api-python-client` " "to load authorization identities." ) from exc authorized_identities: list = [] creds = self._load_credentials() service = build("drive", "v3", credentials=creds) # Build the service try: permissions = service.permissions().list(fileId=id).execute() except googleapiclient.errors.HttpError: print( f"insufficientFilePermissions: The user does not have sufficient \ permissions to retrieve permission for the file with fileId: {id}" ) return authorized_identities except Exception as exc: print( f"Error occurred while fetching the permissions for the file with \ fileId: {id}" ) print(f"Error: {exc}") return authorized_identities for perm in permissions.get("permissions", {}): email_id = ( service.permissions() .get(fileId=id, permissionId=perm.get("id", ""), fields="emailAddress") .execute() .get("emailAddress") ) if email_id: authorized_identities.append(email_id) return authorized_identities @root_validator def validate_inputs(cls, values: Dict[str, Any]) -> Dict[str, Any]: """Validate that either folder_id or document_ids is set, but not both.""" if values.get("folder_id") and ( values.get("document_ids") or values.get("file_ids") ): raise ValueError( "Cannot specify both folder_id and document_ids nor " "folder_id and file_ids" ) if ( not values.get("folder_id") and not values.get("document_ids") and not values.get("file_ids") ): raise ValueError("Must specify either folder_id, document_ids, or file_ids") file_types = values.get("file_types") if file_types: if values.get("document_ids") or values.get("file_ids"): raise ValueError( "file_types can only be given when folder_id is given," " (not when document_ids or file_ids are given)." ) type_mapping = { "document": "application/vnd.google-apps.document", "sheet": "application/vnd.google-apps.spreadsheet", "pdf": "application/pdf", } allowed_types = list(type_mapping.keys()) + list(type_mapping.values()) short_names = ", ".join([f"'{x}'" for x in type_mapping.keys()]) full_names = ", ".join([f"'{x}'" for x in type_mapping.values()]) for file_type in file_types: if file_type not in allowed_types: raise ValueError( f"Given file type {file_type} is not supported. " f"Supported values are: {short_names}; and " f"their full-form names: {full_names}" ) # replace short-form file types by full-form file types def full_form(x: str) -> str: return type_mapping[x] if x in type_mapping else x values["file_types"] = [full_form(file_type) for file_type in file_types] return values @validator("credentials_path") def validate_credentials_path(cls, v: Any, **kwargs: Any) -> Any: """Validate that credentials_path exists.""" if not v.exists(): raise ValueError(f"credentials_path {v} does not exist") return v def _load_credentials(self) -> Any: """Load credentials.""" # Adapted from https://developers.google.com/drive/api/v3/quickstart/python try: from google.auth import default # type: ignore[import] from google.auth.transport.requests import Request # type: ignore[import] from google.oauth2 import service_account # type: ignore[import] from google.oauth2.credentials import Credentials # type: ignore[import] from google_auth_oauthlib.flow import ( # type: ignore[import] InstalledAppFlow, ) except ImportError: raise ImportError( "Could execute GoogleDriveLoader. " "Please, install drive dependency group: " "`pip install langchain-google-community[drive]`" ) creds = None if self.service_account_key.exists(): return service_account.Credentials.from_service_account_file( str(self.service_account_key), scopes=SCOPES ) if self.token_path.exists(): creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) elif "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ: creds, project = default() creds = creds.with_scopes(SCOPES) # no need to write to file if creds: return creds else: flow = InstalledAppFlow.from_client_secrets_file( str(self.credentials_path), SCOPES ) creds = flow.run_local_server(port=0) with open(self.token_path, "w") as token: token.write(creds.to_json()) return creds def _load_sheet_from_id(self, id: str) -> List[Document]: """Load a sheet and all tabs from an ID.""" from googleapiclient.discovery import build # type: ignore[import] creds = self._load_credentials() sheets_service = build("sheets", "v4", credentials=creds) spreadsheet = sheets_service.spreadsheets().get(spreadsheetId=id).execute() sheets = spreadsheet.get("sheets", []) if self.load_auth: authorized_identities = self._get_identity_metadata_from_id(id) if self.load_extended_metadata: owner = self._get_owner_metadata_from_id(id) size = self._get_file_size_from_id(id) full_path = self._get_file_path_from_id(id) documents = [] for sheet in sheets: sheet_name = sheet["properties"]["title"] result = ( sheets_service.spreadsheets() .values() .get(spreadsheetId=id, range=sheet_name) .execute() ) values = result.get("values", []) if not values: continue # empty sheet header = values[0] for i, row in enumerate(values[1:], start=1): metadata = { "source": ( f"https://docs.google.com/spreadsheets/d/{id}/" f"edit?gid={sheet['properties']['sheetId']}" ), "title": f"{spreadsheet['properties']['title']} - {sheet_name}", "row": i, } if self.load_auth: metadata["authorized_identities"] = authorized_identities if self.load_extended_metadata: metadata["owner"] = owner metadata["size"] = size metadata["full_path"] = full_path content = [] for j, v in enumerate(row): title = header[j].strip() if len(header) > j else "" content.append(f"{title}: {v.strip()}") page_content = "\n".join(content) documents.append(Document(page_content=page_content, metadata=metadata)) return documents def _load_document_from_id(self, id: str) -> Document: """Load a document from an ID.""" from io import BytesIO from googleapiclient.discovery import build from googleapiclient.errors import HttpError # type: ignore[import] from googleapiclient.http import MediaIoBaseDownload # type: ignore[import] creds = self._load_credentials() service = build("drive", "v3", credentials=creds) if self.load_auth: authorized_identities = self._get_identity_metadata_from_id(id) if self.load_extended_metadata: owner = self._get_owner_metadata_from_id(id) size = self._get_file_size_from_id(id) full_path = self._get_file_path_from_id(id) file = ( service.files() .get(fileId=id, supportsAllDrives=True, fields="modifiedTime,name") .execute() ) request = service.files().export_media(fileId=id, mimeType="text/plain") fh = BytesIO() downloader = MediaIoBaseDownload(fh, request) done = False try: while done is False: status, done = downloader.next_chunk() except HttpError as e: if e.resp.status == 404: print("File not found: {}".format(id)) # noqa: T201 else: print("An error occurred: {}".format(e)) # noqa: T201 text = fh.getvalue().decode("utf-8") metadata = { "source": f"https://docs.google.com/document/d/{id}/edit", "title": f"{file.get('name')}", "when": f"{file.get('modifiedTime')}", } if self.load_auth: metadata["authorized_identities"] = authorized_identities # type: ignore if self.load_extended_metadata: metadata["owner"] = owner metadata["size"] = size metadata["full_path"] = full_path return Document(page_content=text, metadata=metadata) def _load_documents_from_folder( self, folder_id: str, *, file_types: Optional[Sequence[str]] = None ) -> List[Document]: """Load documents from a folder.""" from googleapiclient.discovery import build creds = self._load_credentials() service = build("drive", "v3", credentials=creds) files = self._fetch_files_recursive(service, folder_id) # If file types filter is provided, we'll filter by the file type. if file_types: _files = [f for f in files if f["mimeType"] in file_types] # type: ignore else: _files = files returns = [] for file in _files: if file["trashed"] and not self.load_trashed_files: continue elif file["mimeType"] == "application/vnd.google-apps.document": returns.append(self._load_document_from_id(file["id"])) # type: ignore elif file["mimeType"] == "application/vnd.google-apps.spreadsheet": returns.extend(self._load_sheet_from_id(file["id"])) # type: ignore elif ( file["mimeType"] == "application/pdf" or self.file_loader_cls is not None ): returns.extend(self._load_file_from_id(file["id"])) # type: ignore else: pass return returns def _fetch_files_recursive( self, service: Any, folder_id: str ) -> List[Dict[str, Union[str, List[str]]]]: """Fetch all files and subfolders recursively.""" results = ( service.files() .list( q=f"'{folder_id}' in parents", pageSize=1000, includeItemsFromAllDrives=True, supportsAllDrives=True, fields="nextPageToken, files(id, name, mimeType, parents, trashed)", ) .execute() ) files = results.get("files", []) returns = [] for file in files: if file["mimeType"] == "application/vnd.google-apps.folder": if self.recursive: returns.extend(self._fetch_files_recursive(service, file["id"])) else: returns.append(file) return returns def _load_documents_from_ids(self) -> List[Document]: """Load documents from a list of IDs.""" if not self.document_ids: raise ValueError("document_ids must be set") return [self._load_document_from_id(doc_id) for doc_id in self.document_ids] def _load_file_from_id(self, id: str) -> List[Document]: """Load a file from an ID.""" from io import BytesIO from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseDownload creds = self._load_credentials() service = build("drive", "v3", credentials=creds) if self.load_auth: authorized_identities = self._get_identity_metadata_from_id(id) if self.load_extended_metadata: owner = self._get_owner_metadata_from_id(id) size = self._get_file_size_from_id(id) full_path = self._get_file_path_from_id(id) file = service.files().get(fileId=id, supportsAllDrives=True).execute() request = service.files().get_media(fileId=id) fh = BytesIO() downloader = MediaIoBaseDownload(fh, request) done = False while done is False: status, done = downloader.next_chunk() if self.file_loader_cls is not None: fh.seek(0) loader = self.file_loader_cls(file=fh, **self.file_loader_kwargs) docs = loader.load() for doc in docs: doc.metadata["source"] = f"https://drive.google.com/file/d/{id}/view" if "title" not in doc.metadata: doc.metadata["title"] = f"{file.get('name')}" if self.load_auth: doc.metadata["authorized_identities"] = authorized_identities if self.load_extended_metadata: doc.metadata["owner"] = owner doc.metadata["size"] = size doc.metadata["full_path"] = full_path return docs else: from PyPDF2 import PdfReader # type: ignore[import] content = fh.getvalue() pdf_reader = PdfReader(BytesIO(content)) docs = [] for i, page in enumerate(pdf_reader.pages): metadata = { "source": f"https://drive.google.com/file/d/{id}/view", "title": f"{file.get('name')}", "page": i, } if self.load_auth: metadata["authorized_identities"] = authorized_identities if self.load_extended_metadata: metadata["owner"] = owner metadata["size"] = size metadata["full_path"] = full_path docs.append( Document( page_content=page.extract_text(), metadata=metadata, ) ) return docs def _load_file_from_ids(self) -> List[Document]: """Load files from a list of IDs.""" if not self.file_ids: raise ValueError("file_ids must be set") docs = [] for file_id in self.file_ids: docs.extend(self._load_file_from_id(file_id)) return docs
[docs] def load(self) -> List[Document]: """Load documents.""" if self.folder_id: return self._load_documents_from_folder( self.folder_id, file_types=self.file_types ) elif self.document_ids: return self._load_documents_from_ids() else: return self._load_file_from_ids()