Python - BigQuery utility script

October 24, 2017

Get the bigquery connector util from below article.

https://balakumar.net.in/python-connecting-to-bigquery/

Now lets create a new bigquery client utility script (bigquery_client.py) by using the connect service bigquery_connect.py base class.

This utility script is for

  • Listing all the datasets
  • Creating new dataset
  • Checking the existence of a dataset
  • Creating new table and checking for existence
  • Getting the table data
  • Creating new view and checking for existence
import re
from googleapiclient.errors import HttpError
from app.config import config
from app.libs.bq.bigquery_connect import BigqueryConnect

class BigqueryClient(BigqueryConnect):

    _service = None

    def __init__(self):
        self._service = self.getClientService()
        self._response = {
            'status': None,
            'data': {}
        }

    def getDatasets(self):
        """
        List of all datasets
        :returns: dict
        """
        ds = []

        for dataset in self._service.list_datasets():  # API request(s)
            if re.match('^z_.+', dataset.name) is not None:

                #dsInfo = self._service.dataset( dataset.name ).execute()
                #dsInfo = bigquery.dataset.Dataset( dataset.name, self._service)
                #dsInfo.friendly_name

                ds.append( {
                    'name': dataset.name,
                    'friendly_name': dataset.friendly_name,
                    'description': dataset.description
                } )

        return ds

    def isDatasetExists(self, datasetName):
        """
        Check whether Dataset exist or not
        :param datasetName: Name of the Dataset
        :returns: Boolean
        """
        dataset = self._service.dataset(datasetName)

        if dataset.exists():
            return True

        return False

    def createDataset(self, name, friendlyName, description=None):
        """
        Creating new Dataset
        :param name: Name of the Dataset
        :param friendlyName: Display name of the Dataset
        :param description: (Optional) Description about the Dataset
        :returns: Object with response details
        """

        try:
            dataset = self._service.dataset(name)

            if not dataset.exists():
                dataset.friendly_name = friendlyName

                if description is not None:
                    dataset.description = description

                dataset.create()

                self._response['status'] = 'success'
                self._response['data'] = {
                    'message': 'Dataset created successfully',
                    'bqObject': dataset
                }
            else:
                self._response = self._responseExists()

        except HttpError, err:
            self._response = self._responseError(err._get_reason())

        return self._response

    def getView(self, datasetName, viewName):
        """
        Check whether View exist or not
        :param datasetName: Name of the Dataset
        :param viewName: Name of the View
        :returns: Boolean
        """
        dataset = self._service.dataset(datasetName)
        view = dataset.table(viewName)

        if view.exists():
            view.reload()
            return view

        return False

    def createView(self, datasetName, viewName, viewSql, displayName=None, description=None):
        """
        Creating new View under a dataset
        :param datasetName: Dataset object
        :param viewName: Name of the View
        :param viewSql: SQL view query
        :param displayName: (Optional) Friendly name of the View
        :param description: (Optional) Description about the View
        :returns: Object with response details
        """

        dataset = self._service.dataset(datasetName)
        table = dataset.table(viewName)

        try:
            table.type = 'VIEW'
            table.view_query = viewSql

            if not table.exists():
                if displayName is not None:
                    table.friendlyName = displayName

                if description is not None:
                    table.description = description

                table.create()

                self._response['data'] = {
                    'message': 'created',
                    'bqObject': table
                }
            else:
                table.update()

                self._response['data'] = {
                    'message': 'updated',
                    'bqObject': table
                }

            self._response['status'] = 'success'
        except HttpError, err:
            self._response = self._responseError(err._get_reason())

        return self._response

    def createTable(self, datasetName, tableName, schema):
        """
        Creating new View under a dataset
        :param datasetName: Dataset object
        :param tableName: Name of the Table
        :param schema: Table schema
        :returns: Object with response details
        """

        dataset = self._service.dataset(datasetName)
        table = dataset.table(tableName)

        try:
            table.schema = schema

            if not table.exists():
                table.create()

                self._response['data'] = {
                    'message': 'created',
                    'bqObject': table
                }
            else:
                table.update()

                self._response['data'] = {
                    'message': 'updated',
                    'bqObject': table
                }

            self._response['status'] = 'success'
        except HttpError, err:
            self._response = self._responseError(err._get_reason())

        return self._response

    def getTableData(self, datasetName, tableName, maxResults=None):
        """
        Check whether View exist or not
        :param datasetName: Name of the Dataset
        :param tableName: Name of the Table
        :param maxResults: Return rows limit
        :returns: List
        """
        dataset = self._service.dataset(datasetName)
        table = dataset.table(tableName)

        if table.exists():
            table.reload()
            if maxResults is not None:
                tableData = table.fetch_data(max_results=maxResults)
            else:
                tableData = table.fetch_data()

            dataList = [list(i) for i in tableData]

            return {
                'fields': [field.name for field in table.schema],
                'data': dataList
            }

        return {}

    def _responseExists(self):
        return {
            'status': 'error',
            'data': {
                'message': 'exists'
            }
        }

    def _responseError(self, error):
        return {
            'status': 'error',
            'data': {
                'message': error
            }
        }