Source code for cenpy.explorer

import requests as r
from json import JSONDecodeError
from six import iteritems as diter
import pandas as pd
import os
import six

if six.PY3:
    unicode = str

fp = os.path.dirname(os.path.realpath(__file__))

resp = r.get('https://api.census.gov/data.json')
try:
    resp.raise_for_status()
    raw_APIs = resp.json()['dataset']
    APIs = {entry['identifier'].split('id')[-1].lstrip('/'):
            {key: value for key, value in diter(entry) if key != 'identifier'}
            for entry in raw_APIs}
except r.HTTPError:
    raise r.HTTPError('The main Census API Endpoint (https://api.census.gov/data.json) is not available.'
                      ' Try visiting https://api.census.gov/data.json in a web browser to verify connectivity.')
except JSONDecodeError:
    # json.JSONDecodeError requires (msg, doc, pos), not just a message
    raise JSONDecodeError('The main Census API Endpoint (https://api.census.gov/data.json) returned malformed content.'
                          ' Try visiting https://api.census.gov/data.json in a web browser to verify connectivity.',
                          resp.text, 0)
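
# A sketch of what the module-level parse above yields, assuming the endpoint
# was reachable: APIs maps shortcodes (derived from each entry's 'identifier'
# URL) to that entry's remaining metadata. The shortcode and key below are
# illustrative, not guaranteed for any given data.json payload:
#
#   >>> list(APIs)[:3]               # doctest: +SKIP
#   >>> APIs['acs/acs5']['title']    # doctest: +SKIP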

def available(verbose=True):
    """
    Returns available identifiers for Census Data APIs.

    NOTE: we do not support the Economic Indicators Time Series API yet.

    Parameters
    ----------
    verbose : bool
        whether to return the full metadata table or just the list of
        identifiers (default: True)

    Returns
    -------
    pandas.DataFrame or list
        the full metadata table if verbose, otherwise a list of identifiers
    """
    av_apis = [api for api in APIs.keys() if 'eits' not in api]
    av_apis = [api for api in av_apis
               if APIs[api]['distribution'][0]['format'] == 'API']
    if verbose:
        return _parse_results_table_from_response(raw_APIs).sort_index()
    else:
        return av_apis
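
# Usage sketch for available(), assuming the module-level fetch succeeded.
# verbose=False returns the filtered list of shortcodes; verbose=True returns
# the full metadata DataFrame indexed by shortcode:
#
#   >>> ids = available(verbose=False)     # doctest: +SKIP
#   >>> available().loc[ids].head()        # doctest: +SKIP
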
def _parse_results_table_from_response(datajson):
    """ parse the raw data.json response into something more useful """
    raw_table = pd.DataFrame(datajson)
    shortcodes = [entry['identifier'].split('id')[-1].lstrip('/')
                  for entry in datajson]
    raw_table.index = shortcodes
    raw_table = raw_table[[col for col in raw_table.columns
                           if not col.startswith('@')]]
    listcols = raw_table.applymap(lambda x: isinstance(x, list)).any()
    listcols = listcols.index[listcols]
    raw_table[listcols] = raw_table[listcols].apply(_delist)
    raw_table['publisher'] = raw_table['publisher'].apply(
        lambda x: x.get('name', None))
    raw_table.rename(columns=dict(identifier='identifier_url',
                                  c_vintage='vintage'), inplace=True)
    for col in raw_table:
        if isinstance(raw_table[col].iloc[0], str):
            if raw_table[col].iloc[0].startswith('http://'):
                raw_table.drop(col, axis=1, inplace=True)
    return raw_table[raw_table.columns[::-1]]


def _delist(series):
    """ turn listed cols into tuples, or extract their single element """
    series = series.copy(deep=True)
    lens = series.dropna().apply(len).unique()
    if len(lens) > 1:  # cast to tuples
        series[~series.isnull()] = series.dropna().apply(tuple)
    elif len(lens) == 1 and lens.item() == 1:  # grab single element
        series[~series.isnull()] = series.dropna().apply(lambda x: x[0])
    return series
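
# _delist in a nutshell: a column whose lists all have length one is unwrapped
# to its single elements, while mixed-length lists are cast to (hashable)
# tuples. A minimal illustration on toy Series (behavior as of the pandas
# versions this module targets):
#
#   >>> _delist(pd.Series([[1], [2], [3]])).tolist()
#   [1, 2, 3]
#   >>> _delist(pd.Series([[1], [2, 3]])).tolist()
#   [(1,), (2, 3)]
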
def explain(identifier=None, verbose=False):
    """
    Explains datasets currently available via the Census API

    Parameters
    ----------
    identifier : str
        shortcode identifying which dataset in the API to use
    verbose : bool
        flag governing whether to provide the full API record or just the
        title and description (default: False)

    Returns
    -------
    dict
        title and description (if verbose: and full API information)
    """
    if identifier is None:
        raise ValueError(
            'No identifier provided. Use available() to discover identifiers')
    elif not verbose:
        return {APIs[identifier]['title']: APIs[identifier]['description']}
    else:
        return APIs[identifier]
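
# Usage sketch for explain(); shortcodes come from available(verbose=False),
# and 'acs/acs5' is illustrative. Without verbose it returns a one-item
# {title: description} dict, otherwise the full metadata record:
#
#   >>> explain('acs/acs5')                  # doctest: +SKIP
#   >>> explain('acs/acs5', verbose=True)    # doctest: +SKIP
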
def fips_table(kind, in_state=''):
    """
    Pulls a table of FIPS codes for reference

    Parameters
    ----------
    kind : str
        the kind of census geography needed, down to sub-county or VTD FIPS
    in_state : str
        filter to only grab FIPS codes from within a state. Use this to
        avoid large data downloads if you're looking for specific data.
        (default: '')

    Returns
    -------
    pandas.DataFrame
        FIPS codes and names of the geographies in question
    """
    qurl = u'https://www2.census.gov/geo/docs/reference/codes/files/'
    tdict = {'AIA': 'aia.txt', 'COUNTY': 'county.txt',
             'SUBCOUNTY': 'cousub.txt', 'PLACE': 'places.txt',
             'SCHOOLDISTRICT': 'schdist.txt', 'VTD': 'vtd.txt',
             'STATE': None}
    kind = kind.upper()
    if len(kind.split(' ')) > 1:
        kind = ''.join(kind.split(' '))
    in_state = in_state.upper()
    stfips = pd.read_csv(fp + '/stfipstable.csv')
    if kind == 'STATE':
        return stfips
    elif kind in tdict.keys():
        if in_state == '':
            qurl += 'national_' + tdict[kind]
        else:
            if in_state in stfips['State Abbreviation'].tolist():
                fips = stfips[stfips['State Abbreviation']
                              == in_state]['FIPS Code'].values[0]
            elif in_state in stfips['State Name'].tolist():
                fips = stfips[stfips['State Name']
                              == in_state]['FIPS Code'].values[0]
                in_state = stfips[stfips['State Name']
                                  == in_state]['State Abbreviation'].values[0]
            elif in_state in stfips['FIPS Code'].tolist():
                fips = in_state
                in_state = stfips[stfips['FIPS Code']
                                  == fips]['State Abbreviation'].values[0]
            else:
                raise KeyError('Did not find State Abbreviation or Name')
            if kind == 'COUNTY':
                qurl += 'st' + unicode(fips).rjust(2, '0') + '_' + \
                    unicode(in_state).lower() + '_' + 'cou.txt'
            else:
                qurl += 'st' + unicode(fips).rjust(2, '0') + '_' + \
                    unicode(in_state).lower() + '_' + tdict[kind]
    else:
        raise KeyError('Requested kind not in {}'.format(list(tdict.keys())))
    if kind in ['PLACE', 'VTD']:
        sep = '|'
        header = 0
    else:
        sep = ','
        header = None
    return pd.read_csv(qurl, sep=sep, header=header, encoding='latin1')
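
# Usage sketch for fips_table(), assuming www2.census.gov is reachable.
# 'AZ' is an illustrative abbreviation; it resolves to FIPS 04, so the county
# lookup below fetches .../files/st04_az_cou.txt:
#
#   >>> counties = fips_table('COUNTY', in_state='AZ')   # doctest: +SKIP
#   >>> states = fips_table('STATE')   # reads the bundled stfipstable.csv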