Source code for l2tscaffolder.scaffolders.plaso_sqlite

# -*- coding: utf-8 -*-
"""The scaffolder interface classes."""
import os
import sqlite3

from typing import Dict
from typing import Iterator
from typing import Tuple

from l2tscaffolder.lib import errors
from l2tscaffolder.scaffolders import interface
from l2tscaffolder.scaffolders import plaso
from l2tscaffolder.scaffolders import manager

[docs]class SQLQuestion(interface.DictQuestion): """SQL Query question."""
[docs] def ValidateAnswer(self, answer: dict): """Validates the answer to the SQL query question. The answer should be a dict that has query names as key values and valid SQLite commands as values. This function attempts to verify that the SQL commands do not have syntax errors in them by attempting to run it against an empty SQLite database stored in memory. The function also makes sure the key value confirms to the style guide of plaso, to be in the form of CamelCase, eg. BookmarkRow. Args: answer (dict): the answer to the question asked. Raises: errors.UnableToConfigure: if the answer is invalid. """ super(SQLQuestion, self).ValidateAnswer(answer) temp_db = sqlite3.connect(':memory:') for query_name, query in answer.items(): if ' ' in query_name: raise errors.UnableToConfigure(( 'Wrong format for key value, key cannot contain spaces ' '[{0:s}]').format(query_name)) if not query_name[0].isupper(): raise errors.UnableToConfigure(( 'Wrong format for key value, key needs to start with an ' 'upper case letter [{0:s}]').format(query_name)) if query_name.startswith('Parse'): raise errors.UnableToConfigure(( 'Wrong format for key value, key should not start with the ' 'Parse (that is added by template) [{0:s}]').format(query_name)) try: temp_db.execute(query) except ValueError as exception: raise errors.UnableToConfigure( 'Unable to run query [{0:s}] with error: {1!s}'.format( query_name, exception)) except (sqlite3.DatabaseError, sqlite3.OperationalError) as exception: if str(exception).endswith('syntax error'): raise errors.UnableToConfigure( 'Unable to run query [{0:s}] with error: {1!s}'.format( query_name, exception)) star_items = [x for x in query.split() if x == '*'] if star_items: raise errors.UnableToConfigure(( 'Unable to generate parser while using * to select columns. ' 'Please adjust your SELECT statements to include explicit ' 'column names.'))
[docs]class PlasoSQLiteScaffolder(plaso.PlasoPluginScaffolder): """The plaso SQLite plugin scaffolder. Attributes: database_name (str): name of the test SQLite database for the plugin. database_schema (dict): a dict containing all table names (keys) and the SQL statement used to create the table (value), derived from the test database. data_types (dict): a dict containing all the data types generated for the parser, the key is the name for each SQL statement run against the database and the value is the data type used for each generated event resulting from that SQL statement. queries (dict): a dict containing query name and SQL statements or queries run against the database. query_columns (dict): for each SQL statement run against the database, with the key being query name and value being a list of all SQL column names that are returned for each query. required_tables (list): a list of all required tables needed for the plugin to parse this particular database. timestamp_columns (dict): a dict containing a list of all columns with timestamp values, with query names as the key. """ # The name of the plugin or parser this scaffolder provides. NAME = 'sqlite' DESCRIPTION = 'Provides a scaffolder to generate a plaso SQLite plugin.' SCHEMA_QUERY = ( 'SELECT tbl_name, sql ' 'FROM sqlite_master ' 'WHERE type = "table" AND tbl_name != "xp_proc" ' 'AND tbl_name != "sqlite_sequence"') # Filenames of templates. TEMPLATE_PARSER_FILE = 'sqlite_plugin.jinja2' TEMPLATE_PARSER_TEST = 'sqlite_plugin_test.jinja2' TEMPLATE_FORMATTER_FILE = 'sqlite_plugin_formatter.jinja2' TEMPLATE_FORMATTER_TEST = 'sqlite_plugin_formatter_test.jinja2' # Questions, a list that contains all the needed questions that the # user should be prompted about before the plugin or parser is created. # Each element in the list needs to be an instance of BaseQuestion. QUESTIONS = [ SQLQuestion( attribute='queries', prompt=( 'Define the name of the callback function (key) that will be\n' 'called for every row returned from the SQL query (value).\n' 'The plugin will execute the SQL query and call the callback\n' 'once for each resulting row. The name of the function should\n' 'follow style guide and be descriptive. An example of that is\n' 'a SQL statement that fetches bookmarks, the key name should be\n' 'Bookmark, or if the SQL statement collects GPS coordinates\n' 'it could be called Location.'), key_prompt='Callback function name', value_prompt='SQL Statement'), interface.ListQuestion( attribute='required_tables', prompt='List of required tables')] def __init__(self): """Initializes the plaso SQLite plugin scaffolder.""" super(PlasoSQLiteScaffolder, self).__init__() self.database_name = '' self.database_schema = {} self.data_types = {} self.queries = {} self.query_columns = {} self.required_tables = [] self.timestamp_columns = {} def _GetQueryColumns(self, query: str) -> Iterator[str]: """Generates column names from a SQL statement. Args: query (str): a SQL query. Yields: str: a column name extracted from a SQL statement. """ _, _, query_string = query.lower().partition('select') query_column_string, _, _ = query_string.partition('from') for column in query_column_string.split(','): _, _, column_alias = column.partition(' as ') column = column_alias or column _, _, column = column.rpartition('.') if not column: continue yield column.strip() def _GetSchema(self, database_path: str) -> Dict[str, str]: """Returns the schema of a SQLite database as a dict. Args: database_path (str): full path to the SQLite database. Returns: dict: containing: str: name of a table in the str: SQL command that was used to create the table. Raises: sqlite3.DatabaseError: if the database cannot be read. """ database = sqlite3.connect(database_path) try: database.row_factory = sqlite3.Row cursor = database.cursor() sql_results = cursor.execute(self.SCHEMA_QUERY) schema = { table_name: ' '.join(query.split()) for table_name, query in sql_results} except sqlite3.DatabaseError: database.close() raise return schema
[docs] def GetJinjaContext(self) -> Dict[str, object]: """Returns a dict that can be used as a context for Jinja2 templates. Returns: dict: containing: str: name of Jinja argument. object: Jinja argument value. """ context = super(PlasoSQLiteScaffolder, self).GetJinjaContext() context['database_name'] = self.database_name context['database_schema'] = self.database_schema context['data_types'] = self.data_types context['queries'] = self.queries context['query_columns'] = self.query_columns context['required_tables'] = self.required_tables context['timestamp_columns'] = self.timestamp_columns return context
[docs] def GenerateFiles(self) -> Iterator[Tuple[str, str]]: """Generates files required for the SQLite plugin. Yields: tuple: file name and content of the file to be written to disk. Raises: errors.UnableToConfigure: if it is not possible to generate the files. """ _, _, database_name = self.test_file.rpartition(os.sep) self.database_name = database_name self.data_types = {} sql_columns = {} timestamp_columns = {} for query_name, query in self.queries.items(): self.data_types[query_name] = '{0:s}:{1:s}'.format( self._output_name.lower().replace('_', ':'), query_name.lower()) timestamp_columns[query_name] = [] sql_columns[query_name] = [] for column in self._GetQueryColumns(query): if column == '*': raise errors.UnableToConfigure( 'Cannot use a "*" as a column name, please select a different ' 'SELECT statement.') if 'time' in column: timestamp_columns[query_name].append(column.strip()) sql_columns[query_name].append(column) self.query_columns = sql_columns self.timestamp_columns = timestamp_columns self.database_schema = self._GetSchema(self.test_file) for name, content in super(PlasoSQLiteScaffolder, self).GenerateFiles(): yield name, content