Python源码示例:binascii.Error()
示例1
def b64decode(s, altchars=None, validate=False):
"""Decode a Base64 encoded byte string.
s is the byte string to decode. Optional altchars must be a
string of length 2 which specifies the alternative alphabet used
instead of the '+' and '/' characters.
The decoded string is returned. A binascii.Error is raised if s is
incorrectly padded.
If validate is False (the default), non-base64-alphabet characters are
discarded prior to the padding check. If validate is True,
non-base64-alphabet characters in the input result in a binascii.Error.
"""
s = _bytes_from_decode_data(s)
if altchars is not None:
altchars = _bytes_from_decode_data(altchars)
assert len(altchars) == 2, repr(altchars)
s = s.translate(bytes.maketrans(altchars, b'+/'))
if validate and not re.match(b'^[A-Za-z0-9+/]*={0,2}$', s):
raise binascii.Error('Non-base64 digit found')
return binascii.a2b_base64(s)
示例2
def urlsafe_b64decode(s):
"""Decode a byte string encoded with the standard Base64 alphabet.
s is the byte string to decode. The decoded byte string is
returned. binascii.Error is raised if the input is incorrectly
padded or if there are non-alphabet characters present in the
input.
The alphabet uses '-' instead of '+' and '_' instead of '/'.
"""
s = _bytes_from_decode_data(s)
s = s.translate(_urlsafe_decode_translation)
return b64decode(s)
# Base32 encoding/decoding must be done in Python
示例3
def b16decode(s, casefold=False):
"""Decode a Base16 encoded byte string.
s is the byte string to decode. Optional casefold is a flag
specifying whether a lowercase alphabet is acceptable as input.
For security purposes, the default is False.
The decoded byte string is returned. binascii.Error is raised if
s were incorrectly padded or if there are non-alphabet characters
present in the string.
"""
s = _bytes_from_decode_data(s)
if casefold:
s = s.upper()
if re.search(b'[^0-9A-F]', s):
raise binascii.Error('Non-base16 digit found')
return binascii.unhexlify(s)
# Legacy interface. This code could be cleaned up since I don't believe
# binascii has any line length limitations. It just doesn't seem worth it
# though. The files should be opened in binary mode.
示例4
def wrap_exception():
def helper(function):
@functools.wraps(function)
def decorated_function(self, context, container, *args, **kwargs):
try:
return function(self, context, container, *args, **kwargs)
except exception.DockerError as e:
with excutils.save_and_reraise_exception(reraise=False):
LOG.error("Error occurred while calling Docker API: %s",
str(e))
except Exception as e:
with excutils.save_and_reraise_exception(reraise=False):
LOG.exception("Unexpected exception: %s", str(e))
return decorated_function
return helper
示例5
def _handle_request_exception(self, e):
if isinstance(e, Finish):
# Not an error; just finish the request without logging.
if not self._finished:
self.finish(*e.args)
return
try:
self.log_exception(*sys.exc_info())
except Exception:
# An error here should still get a best-effort send_error()
# to avoid leaking the connection.
app_log.error("Error in exception logger", exc_info=True)
if self._finished:
# Extra errors after the request has been finished should
# be logged, but there is no reason to continue to try and
# send a response.
return
if isinstance(e, HTTPError):
if e.status_code not in httputil.responses and not e.reason:
gen_log.error("Bad HTTP status code: %d", e.status_code)
self.send_error(500, exc_info=sys.exc_info())
else:
self.send_error(e.status_code, exc_info=sys.exc_info())
else:
self.send_error(500, exc_info=sys.exc_info())
示例6
def _handle_request_exception(self, e):
if isinstance(e, Finish):
# Not an error; just finish the request without logging.
if not self._finished:
self.finish(*e.args)
return
try:
self.log_exception(*sys.exc_info())
except Exception:
# An error here should still get a best-effort send_error()
# to avoid leaking the connection.
app_log.error("Error in exception logger", exc_info=True)
if self._finished:
# Extra errors after the request has been finished should
# be logged, but there is no reason to continue to try and
# send a response.
return
if isinstance(e, HTTPError):
if e.status_code not in httputil.responses and not e.reason:
gen_log.error("Bad HTTP status code: %d", e.status_code)
self.send_error(500, exc_info=sys.exc_info())
else:
self.send_error(e.status_code, exc_info=sys.exc_info())
else:
self.send_error(500, exc_info=sys.exc_info())
示例7
def is_valid_user_id(b64_content: str) -> bool:
"""
Check potential token to see if it contains a valid Discord user ID.
See: https://discordapp.com/developers/docs/reference#snowflakes
"""
b64_content = utils.pad_base64(b64_content)
try:
decoded_bytes = base64.urlsafe_b64decode(b64_content)
string = decoded_bytes.decode('utf-8')
# isdigit on its own would match a lot of other Unicode characters, hence the isascii.
return string.isascii() and string.isdigit()
except (binascii.Error, ValueError):
return False
示例8
def is_valid_timestamp(b64_content: str) -> bool:
"""
Return True if `b64_content` decodes to a valid timestamp.
If the timestamp is greater than the Discord epoch, it's probably valid.
See: https://i.imgur.com/7WdehGn.png
"""
b64_content = utils.pad_base64(b64_content)
try:
decoded_bytes = base64.urlsafe_b64decode(b64_content)
timestamp = int.from_bytes(decoded_bytes, byteorder="big")
except (binascii.Error, ValueError) as e:
log.debug(f"Failed to decode token timestamp '{b64_content}': {e}")
return False
# Seems like newer tokens don't need the epoch added, but add anyway since an upper bound
# is not checked.
if timestamp + TOKEN_EPOCH >= DISCORD_EPOCH:
return True
else:
log.debug(f"Invalid token timestamp '{b64_content}': smaller than Discord epoch")
return False
示例9
def parse_netntlm_resp_msg(headers, resp_header, seq):
'''
Parse the client response to the challenge
'''
try:
header_val3 = headers[resp_header]
except KeyError:
return
header_val3 = header_val3.split(' ', 1)
# The header value can either start with NTLM or Negotiate
if header_val3[0] == 'NTLM' or header_val3[0] == 'Negotiate':
try:
msg3 = base64.decodestring(header_val3[1])
except binascii.Error:
return
return parse_ntlm_resp(msg3, seq)
示例10
def parse_netntlm_resp_msg(headers, resp_header, seq):
'''
Parse the client response to the challenge
'''
try:
header_val3 = headers[resp_header]
except KeyError:
return
header_val3 = header_val3.split(' ', 1)
# The header value can either start with NTLM or Negotiate
if header_val3[0] == 'NTLM' or header_val3[0] == 'Negotiate':
try:
msg3 = base64.decodestring(header_val3[1])
except binascii.Error:
return
return parse_ntlm_resp(msg3, seq)
示例11
def load_user_from_auth_header(header_val):
if header_val.startswith('Basic '):
header_val = header_val.replace('Basic ', '', 1)
basic_username = basic_password = ''
try:
header_val = base64.b64decode(header_val).decode('utf-8')
basic_username = header_val.split(':')[0]
basic_password = header_val.split(':')[1]
except (TypeError, UnicodeDecodeError, binascii.Error):
pass
user = _fetch_user_by_name(basic_username)
if user and config.config_login_type == constants.LOGIN_LDAP and services.ldap:
if services.ldap.bind_user(str(user.password), basic_password):
return user
if user and check_password_hash(str(user.password), basic_password):
return user
return
示例12
def _get_user(self, environ):
user = None
http_auth = environ.get("HTTP_AUTHORIZATION")
if http_auth and http_auth.startswith('Basic'):
auth = http_auth.split(" ", 1)
if len(auth) == 2:
try:
# b64decode doesn't accept unicode in Python < 3.3
# so we need to convert it to a byte string
auth = base64.b64decode(auth[1].strip().encode('utf-8'))
if PY3: # b64decode returns a byte string in Python 3
auth = auth.decode('utf-8')
auth = auth.split(":", 1)
except TypeError as exc:
self.debug("Couldn't get username: %s", exc)
return user
except binascii.Error as exc:
self.debug("Couldn't get username: %s", exc)
return user
if len(auth) == 2:
user = auth[0]
return user
示例13
def b64decode(s, altchars=None):
"""Decode a Base64 encoded string.
s is the string to decode. Optional altchars must be a string of at least
length 2 (additional characters are ignored) which specifies the
alternative alphabet used instead of the '+' and '/' characters.
The decoded string is returned. A TypeError is raised if s were
incorrectly padded or if there are non-alphabet characters present in the
string.
"""
if altchars is not None:
s = _translate(s, {altchars[0]: '+', altchars[1]: '/'})
try:
return binascii.a2b_base64(s)
except binascii.Error, msg:
# Transform this exception for consistency
raise TypeError(msg)
示例14
def loads(self, bstruct):
"""
Given a ``bstruct`` (a bytestring), verify the signature and then
deserialize and return the deserialized value.
A ``ValueError`` will be raised if the signature fails to validate.
"""
try:
b64padding = b'=' * (-len(bstruct) % 4)
fstruct = base64.urlsafe_b64decode(bytes_(bstruct) + b64padding)
except (binascii.Error, TypeError) as e:
raise ValueError('Badly formed base64 data: %s' % e)
cstruct = fstruct[self.digest_size:]
expected_sig = fstruct[:self.digest_size]
sig = hmac.new(
self.salted_secret, bytes_(cstruct), self.digestmod).digest()
if strings_differ(sig, expected_sig):
raise ValueError('Invalid signature')
return self.serializer.loads(cstruct)
示例15
def b64decode(s, altchars=None):
"""Decode a Base64 encoded string.
s is the string to decode. Optional altchars must be a string of at least
length 2 (additional characters are ignored) which specifies the
alternative alphabet used instead of the '+' and '/' characters.
The decoded string is returned. A TypeError is raised if s is
incorrectly padded. Characters that are neither in the normal base-64
alphabet nor the alternative alphabet are discarded prior to the padding
check.
"""
if altchars is not None:
s = s.translate(string.maketrans(altchars[:2], '+/'))
try:
return binascii.a2b_base64(s)
except binascii.Error, msg:
# Transform this exception for consistency
raise TypeError(msg)
示例16
def _get_policies(self, access_control_policy_ids):
def get_policy(aid):
try:
return self.api.handle_onem2m_request(
OneM2MRequest(
OneM2MOperation.retrieve, aid, fr=self._abs_cse_id
)
).get().content
except OneM2MErrorResponse as error:
if error.response_status_code == STATUS_NOT_FOUND:
self.logger.debug("Policy '%s' NOT FOUND.", aid)
else:
self.logger.debug("Error getting policy: %s:", error)
return None
return [_f for _f in map(get_policy, access_control_policy_ids) if _f]
# def _notify_das_server(self, notify_uri, payload):
示例17
def b64decode(s, altchars=None):
"""Decode a Base64 encoded string.
s is the string to decode. Optional altchars must be a string of at least
length 2 (additional characters are ignored) which specifies the
alternative alphabet used instead of the '+' and '/' characters.
The decoded string is returned. A TypeError is raised if s were
incorrectly padded or if there are non-alphabet characters present in the
string.
"""
if altchars is not None:
s = _translate(s, {altchars[0]: '+', altchars[1]: '/'})
try:
return binascii.a2b_base64(s)
except binascii.Error, msg:
# Transform this exception for consistency
raise TypeError(msg)
示例18
def build_status_header(self):
"""Build org-admin header for internal status delivery."""
try:
encoded_auth_header = self._identity_header.get("x-rh-identity")
identity = json.loads(b64decode(encoded_auth_header))
account = identity["identity"]["account_number"]
identity_header = {
"identity": {
"account_number": account,
"type": "User",
"user": {"username": "cost-mgmt", "email": "cost-mgmt@redhat.com", "is_org_admin": True},
}
}
json_identity = json_dumps(identity_header)
cost_internal_header = b64encode(json_identity.encode("utf-8"))
return {"x-rh-identity": cost_internal_header}
except (binascii.Error, json.JSONDecodeError, TypeError, KeyError) as error:
LOG.error(f"Unable to build internal status header. Error: {str(error)}")
示例19
def _handle_request_exception(self, e: BaseException) -> None:
if isinstance(e, Finish):
# Not an error; just finish the request without logging.
if not self._finished:
self.finish(*e.args)
return
try:
self.log_exception(*sys.exc_info())
except Exception:
# An error here should still get a best-effort send_error()
# to avoid leaking the connection.
app_log.error("Error in exception logger", exc_info=True)
if self._finished:
# Extra errors after the request has been finished should
# be logged, but there is no reason to continue to try and
# send a response.
return
if isinstance(e, HTTPError):
self.send_error(e.status_code, exc_info=sys.exc_info())
else:
self.send_error(500, exc_info=sys.exc_info())
示例20
def b64decode(s, altchars=None, validate=False):
"""Decode the Base64 encoded bytes-like object or ASCII string s.
Optional altchars must be a bytes-like object or ASCII string of length 2
which specifies the alternative alphabet used instead of the '+' and '/'
characters.
The result is returned as a bytes object. A binascii.Error is raised if
s is incorrectly padded.
If validate is False (the default), characters that are neither in the
normal base-64 alphabet nor the alternative alphabet are discarded prior
to the padding check. If validate is True, these non-alphabet characters
in the input result in a binascii.Error.
"""
s = _bytes_from_decode_data(s)
if altchars is not None:
altchars = _bytes_from_decode_data(altchars)
assert len(altchars) == 2, repr(altchars)
s = s.translate(bytes.maketrans(altchars, b'+/'))
if validate and not re.match(b'^[A-Za-z0-9+/]*={0,2}$', s):
raise binascii.Error('Non-base64 digit found')
return binascii.a2b_base64(s)
示例21
def get_decoded_data(self):
"""Decode the result data from base64."""
try:
return base64.b64decode(self.data)
except binascii.Error:
return None
示例22
def is_valid(token):
"""
Validate a OTP token.
:param token: OTP token
:type token: str
:return: bool
"""
try:
TOTP(token).now()
return True
except (binascii.Error, ValueError, TypeError):
return False
示例23
def update(self):
"""
Generate a new OTP based on the same token.
"""
try:
self.pin = self.now()
except binascii.Error:
self.pin = None
示例24
def standard_b64decode(s):
"""Decode a byte string encoded with the standard Base64 alphabet.
s is the byte string to decode. The decoded byte string is
returned. binascii.Error is raised if the input is incorrectly
padded or if there are non-alphabet characters present in the
input.
"""
return b64decode(s)
示例25
def _load(self, jwt):
if isinstance(jwt, text_type):
jwt = jwt.encode('utf-8')
if not issubclass(type(jwt), binary_type):
raise DecodeError("Invalid token type. Token must be a {0}".format(
binary_type))
try:
signing_input, crypto_segment = jwt.rsplit(b'.', 1)
header_segment, payload_segment = signing_input.split(b'.', 1)
except ValueError:
raise DecodeError('Not enough segments')
try:
header_data = base64url_decode(header_segment)
except (TypeError, binascii.Error):
raise DecodeError('Invalid header padding')
try:
header = json.loads(header_data.decode('utf-8'))
except ValueError as e:
raise DecodeError('Invalid header string: %s' % e)
if not isinstance(header, Mapping):
raise DecodeError('Invalid header string: must be a json object')
try:
payload = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
raise DecodeError('Invalid payload padding')
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error):
raise DecodeError('Invalid crypto padding')
return (payload, signing_input, header, signature)
示例26
def decode_b(encoded):
defects = []
pad_err = len(encoded) % 4
if pad_err:
defects.append(errors.InvalidBase64PaddingDefect())
padded_encoded = encoded + b'==='[:4-pad_err]
else:
padded_encoded = encoded
try:
# The validate kwarg to b64decode is not supported in Py2.x
if not re.match(b'^[A-Za-z0-9+/]*={0,2}$', padded_encoded):
raise binascii.Error('Non-base64 digit found')
return base64.b64decode(padded_encoded), defects
except binascii.Error:
# Since we had correct padding, this must an invalid char error.
defects = [errors.InvalidBase64CharactersDefect()]
# The non-alphabet characters are ignored as far as padding
# goes, but we don't know how many there are. So we'll just
# try various padding lengths until something works.
for i in 0, 1, 2, 3:
try:
return base64.b64decode(encoded+b'='*i), defects
except (binascii.Error, TypeError): # Py2 raises a TypeError
if i==0:
defects.append(errors.InvalidBase64PaddingDefect())
else:
# This should never happen.
raise AssertionError("unexpected binascii.Error")
示例27
def decode_file_data(data):
# Py3 raises binascii.Error instead of TypeError as in Py27
try:
return base64.b64decode(data)
except (TypeError, binascii.Error):
raise exception.Base64Exception()
示例28
def validateUserPasswordDynamoDB(app, username, password):
"""
validateUserPassword: verify user and password.
throws exception if not valid
Note: make this async since we'll eventually need some sort of http request to validate user/passwords
"""
if getPassword(app, username) is None:
# look up name in dynamodb table
dynamodb = getDynamoDBClient(app)
table_name = config.get("aws_dynamodb_users_table")
log.info(f"looking for user: {username} in DynamoDB table: {table_name}")
try:
response = await dynamodb.get_item(
TableName=table_name,
Key={'username': {'S': username}}
)
except ClientError as e:
log.error("Unable to read dyanamodb table: {}".format(e.response['Error']['Message']))
raise HTTPInternalServerError() # 500
if "Item" not in response:
log.info(f"user: {username} not found")
raise HTTPUnauthorized() # 401
item = response['Item']
if "password" not in item:
log.error("Expected to find password key in DynamoDB table")
raise HTTPInternalServerError() # 500
password_item = item["password"]
if 'S' not in password_item:
log.error("Expected to find 'S' key for password item")
raise HTTPInternalServerError() # 500
log.debug(f"password: {password_item}")
if password_item['S'] != password:
log.warn(f"user password is not valid for user: {username}")
raise HTTPUnauthorized() # 401
# add user/password to user_db map
setPassword(app, username, password)
示例29
def _decode_telegram_base64(string):
"""
Decodes an url-safe base64-encoded string into its bytes
by first adding the stripped necessary padding characters.
This is the way Telegram shares binary data as strings,
such as Bot API-style file IDs or invite links.
Returns `None` if the input string was not valid.
"""
try:
return base64.urlsafe_b64decode(string + '=' * (len(string) % 4))
except (binascii.Error, ValueError, TypeError):
return None # not valid base64, not valid ascii, not a string
示例30
def _encode_telegram_base64(string):
"""
Inverse for `_decode_telegram_base64`.
"""
try:
return base64.urlsafe_b64encode(string).rstrip(b'=').decode('ascii')
except (binascii.Error, ValueError, TypeError):
return None # not valid base64, not valid ascii, not a string