Managing datasets¶
Datasets belong to a given project, so all access to datasets implies getting a handle of the project first.
Basic operations¶
The list of all datasets of a project is accessible via list_datasets()
project = client.get_project('TEST_PROJECT')
datasets = project.list_datasets()
prettyprinter.pprint(datasets)
outputs
[ { 'checklists': { 'checklists': []},
'customMeta': { 'kv': { }},
'flowOptions': { 'crossProjectBuildBehavior': 'DEFAULT',
'rebuildBehavior': 'NORMAL'},
'formatParams': { 'arrayMapFormat': 'json',
'charset': 'utf8',
'compress': '',
'dateSerializationFormat': 'ISO',
'escapeChar': '\\',
'hiveSeparators': [ '\x02',
'\x03',
'\x04',
'\x05',
'\x06',
'\x07',
'\x08'],
'parseHeaderRow': True,
'probableNumberOfRecords': 8,
'quoteChar': '"',
'separator': ',',
'skipRowsAfterHeader': 0,
'skipRowsBeforeHeader': 0,
'style': 'excel'},
'formatType': 'csv',
'managed': False,
'name': 'train_set',
'params': { },
'partitioning': { 'dimensions': [], 'ignoreNonMatchingFile': False},
'projectKey': 'TEST_PROJECT',
'schema': { 'columns': [ { 'maxLength': -1,
'name': 'col0',
'type': 'string'},
{ 'maxLength': -1,
'name': 'col1',
'type': 'string'},
...
],
'userModified': False},
'tags': ['creator_admin'],
'type': 'UploadedFiles'},
...
]
Datasets can be created. For example loading the csv files of a folder
project = client.get_project('TEST_PROJECT')
folder_path = 'path/to/folder/'
for file in listdir(folder_path):
if not file.endswith('.csv'):
continue
dataset = project.create_dataset(file[:-4] # dot is not allowed in dataset names
,'Filesystem'
, params={
'connection': 'filesystem_root'
,'path': folder_path + file
}, formatType='csv'
, formatParams={
'separator': ','
,'style': 'excel' # excel-style quoting
,'parseHeaderRow': True
})
df = pandas.read_csv(folder_path + file)
dataset.set_schema({'columns': [{'name': column, 'type':'string'} for column in df.columns]})
Datasets can be deleted.
dataset = project.get_dataset('TEST_DATASET')
dataset.delete()
The metadata of the dataset can be modified. It is advised to first retrieve the current settings state with the get_metadata call, modify the returned object, and then set it back on the DSS instance.
dataset_metadata = dataset.get_metadata()
dataset_metadata['tag'] = ['tag1','tag2']
dataset.set_metadata(dataset_metadata)
Accessing the dataset data¶
The data of a dataset can be streamed over http to the API client with the iter_rows() method. This call returns the raw data, so that in most cases it is necessary to first get the dataset’s schema with a call to get_schema(). For example, printing the first 10 rows can be done with
columns = [column['name'] for column in dataset.get_schema()['columns']]
print(columns)
row_count = 0
for row in dataset.iter_rows():
print(row)
row_count = row_count + 1
if row_count >= 10:
break
outputs
['tube_assembly_id', 'supplier', 'quote_date', 'annual_usage', 'min_order_quantity', 'bracket_pricing', 'quantity', 'cost']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '1', '21.9059330191461']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '2', '12.3412139792904']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '5', '6.60182614356538']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '10', '4.6877695119712']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '25', '3.54156118026073']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '50', '3.22440644770007']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '100', '3.08252143576504']
['TA-00002', 'S-0066', '2013-07-07', '0', '0', 'Yes', '250', '2.99905966403855']
['TA-00004', 'S-0066', '2013-07-07', '0', '0', 'Yes', '1', '21.9727024365273']
['TA-00004', 'S-0066', '2013-07-07', '0', '0', 'Yes', '2', '12.4079833966715']
The schema of a data can be modified with the set_schema() method:
schema = dataset.get_schema()
schema['columns'].append({'name' : 'new_column', 'type' : 'bigint'})
dataset.set_schema(schema)
For partitioned datasets, the list of partitions is retrieved with list_partitions():
partitions = dataset.list_partitions()
And the data of a given partition can be retrieved by passing the appropriate partition spec as parameter to iter_rows():
row_count = 0
for row in dataset.iter_rows(partitions='partition_spec1,partition_spec2'):
print(row)
row_count = row_count + 1
if row_count >= 10:
break
Dataset operations¶
The rows of the dataset can be cleared, entirely or on a per-partition basis, with the clear() method.
dataset = project.get_dataset('SOME_DATASET')
dataset.clear(['partition_spec_1', 'partition_spec_2']) # clears specified partitions
dataset.clear() # clears all partitions
For datasets associated with a table in the Hive metastore, the synchronization of the table definition in the metastore with the dataset’s schema in DSS will be needed before it can be visible to Hive, and usable by Impala queries.
dataset = project.get_dataset('SOME_HDFS_DATASET')
dataset.synchronize_hive_metastore()