|
import decimal |
|
import json as _json |
|
import sys |
|
import re |
|
from functools import reduce |
|
|
|
from _plotly_utils.optional_imports import get_module |
|
from _plotly_utils.basevalidators import ImageUriValidator |
|
|
|
|
|
def cumsum(x):
    """
    Custom cumsum to avoid a numpy import.

    Values are combined with ``+``, so anything that supports addition
    (numbers, strings, lists, ...) can be accumulated.

    :param (iterable) x: The values to accumulate.
    :return (list): The running totals, one entry per element of ``x``.
        An empty input gives an empty list.
    """
    totals = []
    for value in x:
        # Grow the result in place; building a new list on every step (as a
        # reduce over list concatenation does) is quadratic in the input size.
        totals.append(totals[-1] + value if totals else value)
    return totals
|
|
|
|
|
class PlotlyJSONEncoder(_json.JSONEncoder):
    """
    Meant to be passed as the `cls` kwarg to json.dumps(obj, cls=..)

    See PlotlyJSONEncoder.default for more implementation information.

    Additionally, this encoder overrides nan functionality so that 'Inf',
    'NaN' and '-Inf' encode to 'null'. Which is stricter JSON than the Python
    version.

    """

    def coerce_to_strict(self, const):
        """
        This is used to ultimately *encode* into strict JSON, see `encode`

        :param (str) const: The constant name handed to the `parse_constant`
            hook of `json.loads`.
        :return: None for 'Infinity', '-Infinity' and 'NaN'; any other value
            is returned unchanged.
        """
        if const in ("Infinity", "-Infinity", "NaN"):
            return None
        else:
            return const

    def encode(self, o):
        """
        Load and then dump the result using parse_constant kwarg

        Note that setting invalid separators will cause a failure at this step.

        :param o: The object to serialize.
        :return (str): JSON text in which NaN, Infinity and -Infinity values
            have been replaced by null.
        :raises ValueError: If the intermediate JSON text cannot be parsed
            back (for example because invalid separators were configured).
        """
        # Errors for objects that cannot be serialized surface from here.
        encoded_o = super(PlotlyJSONEncoder, self).encode(o)

        # Cheap substring test to skip the decode / re-encode round trip when
        # it cannot be needed. A false positive (e.g. a label containing the
        # text "NaN") only costs the extra round trip.
        if not ("NaN" in encoded_o or "Infinity" in encoded_o):
            return encoded_o

        # `loads` maps the extended constants to None through
        # `coerce_to_strict`; `dumps` then writes them as 'null'.
        try:
            new_o = _json.loads(encoded_o, parse_constant=self.coerce_to_strict)
        except ValueError:
            raise ValueError(
                "Encoding into strict JSON failed. Did you set the separators "
                "valid JSON separators?"
            )
        else:
            return _json.dumps(
                new_o,
                sort_keys=self.sort_keys,
                indent=self.indent,
                separators=(self.item_separator, self.key_separator),
                # Forward ensure_ascii as well, so that the presence of a NaN
                # does not change how non-ASCII text is written.
                ensure_ascii=self.ensure_ascii,
            )

    def default(self, obj):
        """
        Accept an object (of unknown type) and try to encode with priority:
        1. plotly: objects exposing a `to_plotly_json` method
        2. sage: sage math cloud
        3. numpy: masked constant / datetime64 ndarrays
        4. pandas: NaT / NA
        5. datetime: objects exposing `isoformat`
        6. date: objects exposing `isoformat` (plotly time string)
        7. list: objects exposing `tolist`
        8. decimal: decimal.Decimal
        9. pil: PIL images

        Each method raises a NotEncodable exception if it fails. When every
        method fails, the object is handed to `json.JSONEncoder.default`.

        The default method will only get hit if the object is not a type that
        is naturally encoded by json:

            Normal objects:
                dict                object
                list, tuple         array
                str, unicode        string
                int, long, float    number
                True                true
                False               false
                None                null

            Extended objects:
                float('nan')        'NaN'
                float('infinity')   'Infinity'
                float('-infinity')  '-Infinity'

        Therefore, we only anticipate either unknown iterables or values here.

        """
        # Order matters: the first method that does not raise NotEncodable
        # decides the encoding.
        encoding_methods = (
            self.encode_as_plotly,
            self.encode_as_sage,
            self.encode_as_numpy,
            self.encode_as_pandas,
            self.encode_as_datetime,
            self.encode_as_date,
            self.encode_as_list,
            self.encode_as_decimal,
            self.encode_as_pil,
        )
        for encoding_method in encoding_methods:
            try:
                return encoding_method(obj)
            except NotEncodable:
                pass
        return _json.JSONEncoder.default(self, obj)

    @staticmethod
    def encode_as_plotly(obj):
        """
        Attempt to use a builtin `to_plotly_json` method.

        :raises NotEncodable: If calling `obj.to_plotly_json()` raises
            AttributeError.
        """
        try:
            return obj.to_plotly_json()
        except AttributeError:
            raise NotEncodable

    @staticmethod
    def encode_as_list(obj):
        """
        Attempt to use `tolist` method to convert to normal Python list.

        :raises NotEncodable: If `obj` has no `tolist` attribute.
        """
        if hasattr(obj, "tolist"):
            return obj.tolist()
        else:
            raise NotEncodable

    @staticmethod
    def encode_as_sage(obj):
        """
        Attempt to convert sage.all.RR to floats and sage.all.ZZ to ints

        :raises NotEncodable: If `get_module("sage.all")` returns a falsy
            value, or `obj` is in neither `RR` nor `ZZ`.
        """
        sage_all = get_module("sage.all")
        if not sage_all:
            raise NotEncodable

        if obj in sage_all.RR:
            return float(obj)
        elif obj in sage_all.ZZ:
            return int(obj)
        else:
            raise NotEncodable

    @staticmethod
    def encode_as_pandas(obj):
        """
        Attempt to convert pandas.NaT / pandas.NA

        :return: None when `obj` is `pandas.NaT` or `pandas.NA`.
        :raises NotEncodable: If `get_module("pandas", should_load=False)`
            returns a falsy value, or `obj` is neither of the two singletons.
        """
        pandas = get_module("pandas", should_load=False)
        if not pandas:
            raise NotEncodable

        if obj is pandas.NaT:
            return None

        # Guarded with hasattr because the pandas module in use may not
        # define `NA`.
        if hasattr(pandas, "NA") and obj is pandas.NA:
            return None

        raise NotEncodable

    @staticmethod
    def encode_as_numpy(obj):
        """
        Attempt to convert numpy.ma.core.masked

        Also converts ndarrays whose dtype kind is "M" to a list of strings
        through `numpy.datetime_as_string`.

        :return: `float("nan")` for the masked constant (which `encode` then
            writes as null), or a list of strings for "M"-kind arrays.
        :raises NotEncodable: If `get_module("numpy", should_load=False)`
            returns a falsy value, `obj` matches neither case, or
            `datetime_as_string` raises TypeError.
        """
        numpy = get_module("numpy", should_load=False)
        if not numpy:
            raise NotEncodable

        if obj is numpy.ma.core.masked:
            return float("nan")
        elif isinstance(obj, numpy.ndarray) and obj.dtype.kind == "M":
            try:
                return numpy.datetime_as_string(obj).tolist()
            except TypeError:
                # Fall through to NotEncodable below.
                pass

        raise NotEncodable

    @staticmethod
    def encode_as_datetime(obj):
        """
        Convert datetime objects to iso-format strings

        :raises NotEncodable: If calling `obj.isoformat()` raises
            AttributeError.
        """
        try:
            return obj.isoformat()
        except AttributeError:
            raise NotEncodable

    @staticmethod
    def encode_as_date(obj):
        """
        Attempt to convert to utc-iso time string using date methods.

        The result of `obj.isoformat()` is passed through
        `iso_to_plotly_time_string`.

        :raises NotEncodable: If calling `obj.isoformat()` raises
            AttributeError.
        """
        try:
            time_string = obj.isoformat()
        except AttributeError:
            raise NotEncodable
        else:
            return iso_to_plotly_time_string(time_string)

    @staticmethod
    def encode_as_decimal(obj):
        """
        Attempt to encode decimal by converting it to float

        :raises NotEncodable: If `obj` is not a `decimal.Decimal`.
        """
        if isinstance(obj, decimal.Decimal):
            return float(obj)
        else:
            raise NotEncodable

    @staticmethod
    def encode_as_pil(obj):
        """
        Attempt to convert PIL.Image.Image to base64 data uri

        :raises NotEncodable: If `get_module("PIL.Image")` returns None or
            `obj` is not an instance of its `Image` class.
        """
        image = get_module("PIL.Image")
        if image is not None and isinstance(obj, image.Image):
            return ImageUriValidator.pil_image_to_uri(obj)
        else:
            raise NotEncodable
|
|
|
|
|
class NotEncodable(Exception):
    """Raised by an `encode_as_*` helper that cannot handle the given object."""
|
|
|
|
|
def iso_to_plotly_time_string(iso_string):
    """
    Remove a zero UTC offset and replace the 'T' delimiter with ' ' (ws).

    A string that ends in midnight ("T00:00:00") after the offset has been
    removed is reduced to its date part instead.

    NOTE(review): the guard below reads as if it was meant to reject strings
    that carry timezone info, but its first comparison tests a list against
    a string (always False) and its second one only matches input whose text
    before the first "+" is exactly "00:00". It is reproduced unchanged to
    keep current behavior; confirm the intended check before tightening it.

    :param (str) iso_string: An ISO-8601 style date/time string.
    :return (str): The converted time string.
    :raises Exception: Only when the text before the first "+" is "00:00".
    """
    leading_dash_fields = iso_string.split("-")[:3]
    text_before_plus = iso_string.split("+")[0]
    if leading_dash_fields == "00:00" or text_before_plus == "00:00":
        raise Exception(
            "Plotly won't accept timestrings with timezone info.\n"
            "All timestrings are assumed to be in UTC."
        )

    without_offset = iso_string.replace("-00:00", "").replace("+00:00", "")

    midnight = "T00:00:00"
    if without_offset.endswith(midnight):
        return without_offset.replace(midnight, "")
    return without_offset.replace("T", " ")
|
|
|
|
|
def template_doc(**names):
    """
    Build a decorator that fills `str.format` placeholders in a docstring.

    The decorated function itself is returned; only its `__doc__` is
    rewritten. Nothing is substituted when the function has no docstring or
    when running on Python 3.2.

    :param names: Keyword arguments passed on to `str.format`.
    :return (callable): The decorator.
    """

    def _decorator(func):
        if sys.version_info[:2] != (3, 2) and func.__doc__ is not None:
            func.__doc__ = func.__doc__.format(**names)
        return func

    return _decorator
|
|
|
|
|
def _natural_sort_strings(vals, reverse=False):
    """
    Sort strings so that runs of digits inside them compare as numbers.

    Each string is split around its digit runs; the digit runs are compared
    as integers and everything else as text, so "x2" sorts before "x10".

    :param (iterable) vals: The strings to sort.
    :param (bool) reverse: Sort in descending order when True.
    :return (list): A new, sorted list.
    """

    def _int_or_text(chunk):
        try:
            return int(chunk)
        except ValueError:
            # Not a number: keep the chunk as text.
            return chunk

    def _natural_key(text):
        return tuple(_int_or_text(chunk) for chunk in re.split(r"(\d+)", text))

    return sorted(vals, key=_natural_key, reverse=reverse)
|
|
|
|
|
def _get_int_type():
    """
    Return the tuple of types to treat as integers.

    `numpy.integer` is included when `get_module("numpy", should_load=False)`
    returns a truthy module.

    :return (tuple): `(int, numpy.integer)` or `(int,)`.
    """
    numpy = get_module("numpy", should_load=False)
    return (int, numpy.integer) if numpy else (int,)
|
|
|
|
|
def split_multichar(ss, chars):
    """
    Split all the strings in ss at any of the characters in chars.
    Example:

        >>> ss = ["a.string[0].with_separators"]
        >>> chars = list(".[]_")
        >>> split_multichar(ss, chars)
        ['a', 'string', '0', '', 'with', 'separators']

    :param (list) ss: A list of strings.
    :param (list|str) chars: The separators. The argument is left
        unmodified, so a list can be reused and a plain string works too.
    :return (list): The pieces, in their original order. ``ss`` itself is
        returned when ``chars`` is empty.
    """
    if len(chars) == 0:
        return ss
    pieces = list(ss)
    # Separators are applied last-to-first, matching the order in which they
    # used to be popped off the list; iterating over a copy keeps the
    # caller's `chars` intact.
    for separator in reversed(list(chars)):
        pieces = [part for piece in pieces for part in piece.split(separator)]
    return pieces
|
|
|
|
|
def split_string_positions(ss):
    """
    Given a list of strings split using split_multichar, return a list of
    integers representing the indices of the first character of every string in
    the original string.
    Example:

        >>> ss = ["a.string[0].with_separators"]
        >>> chars = list(".[]_")
        >>> ss_split = split_multichar(ss, chars)
        >>> ss_split
        ['a', 'string', '0', '', 'with', 'separators']
        >>> split_string_positions(ss_split)
        [0, 2, 9, 11, 12, 17]

    :param (list) ss: A list of strings.
    :return (list): One start index per string in ``ss``.
    """
    positions = []
    consumed = 0
    for index, piece in enumerate(ss):
        # `index` accounts for the single separator character that preceded
        # each earlier piece; `consumed` for the characters of those pieces.
        positions.append(index + consumed)
        consumed += len(piece)
    return positions
|
|
|
|
|
def display_string_positions(p, i=None, offset=0, length=1, char="^", trim=True):
    """
    Return a string that is whitespace except at p[i] which is replaced with char.
    If i is None then all the indices of the string in p are replaced with char.

    Example:

        >>> ss = ["a.string[0].with_separators"]
        >>> chars = list(".[]_")
        >>> ss_split = split_multichar(ss, chars)
        >>> ss_split
        ['a', 'string', '0', '', 'with', 'separators']
        >>> ss_pos = split_string_positions(ss_split)
        >>> ss[0]
        'a.string[0].with_separators'
        >>> display_string_positions(ss_pos,4)
        '            ^'
        >>> display_string_positions(ss_pos,4,offset=1,length=3,char="~",trim=False)
        '             ~~~      '
        >>> display_string_positions(ss_pos)
        '^ ^      ^ ^^    ^'

    :param (list) p: A non-empty list of integers.
    :param (integer|None) i: Optional index of p to display.
    :param (integer) offset: Allows adding a number of spaces to the replacement.
    :param (integer) length: Allows adding a replacement that is the char
                             repeated length times.
    :param (str) char: allows customizing the replacement character.
    :param (boolean) trim: trims the remaining whitespace if True.
    :return (str): The marker string.
    :raises ValueError: If ``p`` is empty.
    """
    canvas = [" "] * (max(p) + 1 + offset + length)
    starts = p if i is None else [p[i]]
    marked = [start + offset + step for start in starts for step in range(length)]
    for address in marked:
        canvas[address] = char
    ret = "".join(canvas)
    if trim:
        # Cut after the right-most marker. Taking the maximum (rather than
        # the last address written) keeps every marker when `p` is unsorted.
        maxaddr = max(marked) if marked else 0
        ret = ret[: maxaddr + 1]
    return ret
|
|
|
|
|
def chomp_empty_strings(strings, c, reverse=False):
    """
    Given a list of strings, some of which are the empty string "", replace the
    empty strings with c and combine them with the closest non-empty string on
    the left or "" if it is the first string.
    Examples:
    for c="_"
    ['hey', '', 'why', '', '', 'whoa', '', ''] -> ['hey_', 'why__', 'whoa__']
    ['', 'hi', '', "I'm", 'bob', '', ''] -> ['_', 'hi_', "I'm", 'bob__']
    ['hi', "i'm", 'a', 'good', 'string'] -> ['hi', "i'm", 'a', 'good', 'string']
    Some special cases are:
    [] -> []
    [''] -> ['']
    ['', ''] -> ['_']
    ['', '', '', ''] -> ['___']
    If reverse is true, empty strings are combined with closest non-empty string
    on the right or "" if it is the last string.

    :param (list) strings: The strings to combine.
    :param (str) c: The text that stands in for each empty string.
    :param (bool) reverse: Attach to the right-hand neighbour when True.
    :return (list): The combined strings.
    """

    def _mirror(items):
        # Reverse every string and the order of the list.
        return [s[::-1] for s in items][::-1]

    if reverse:
        # Work on the mirror image and mirror the result back. `c` is
        # mirrored as well so a multi-character `c` comes out the right way
        # round after the final mirroring.
        return _mirror(chomp_empty_strings(_mirror(strings), c[::-1]))
    if not len(strings):
        return strings
    if sum(map(len, strings)) == 0:
        return [c * (len(strings) - 1)]

    # The leading "" collects replacements for empty strings that come
    # before the first non-empty one; it is dropped again if it stays empty.
    merged = [""]
    for s in strings:
        if len(s) == 0:
            merged[-1] += c
        else:
            merged.append(s)
    return [s for s in merged if len(s)]
|
|
|
|
|
|
|
|
|
def levenshtein(s1, s2):
    """
    Return the Levenshtein (edit) distance between two sequences.

    Only two rows of the distance table are kept at any time; the shorter
    input is the one laid out along the row.

    :param s1: First sequence (typically a string).
    :param s2: Second sequence (typically a string).
    :return (int): The minimum number of single-element insertions,
        deletions and substitutions turning one input into the other.
    """
    longer, shorter = (s2, s1) if len(s1) < len(s2) else (s1, s2)
    if len(shorter) == 0:
        return len(longer)

    above = list(range(len(shorter) + 1))
    for row_number, item_long in enumerate(longer, start=1):
        row = [row_number]
        for col, item_short in enumerate(shorter):
            row.append(
                min(
                    above[col + 1] + 1,  # insertion
                    row[col] + 1,  # deletion
                    above[col] + (item_long != item_short),  # substitution
                )
            )
        above = row
    return above[-1]
|
|
|
|
|
def find_closest_string(string, strings):
    """
    Return the entry of `strings` with the smallest Levenshtein distance to
    `string`.

    Candidates at the same distance are ordered by their own value, so the
    result does not depend on the order of `strings`.

    :param (str) string: The string to match.
    :param (iterable) strings: The candidate strings.
    :return (str): The closest candidate.
    :raises IndexError: If `strings` is empty.
    """
    ranked = sorted(
        strings,
        key=lambda candidate: (levenshtein(candidate, string), candidate),
    )
    return ranked[0]
|
|