Python源码示例:binascii.a2b_hex()
示例1
def test_body_encoding(self):
unicode_body = u("\xe9")
byte_body = binascii.a2b_hex(b"e9")
# unicode string in body gets converted to utf8
response = self.fetch("/echopost", method="POST", body=unicode_body,
headers={"Content-Type": "application/blah"})
self.assertEqual(response.headers["Content-Length"], "2")
self.assertEqual(response.body, utf8(unicode_body))
# byte strings pass through directly
response = self.fetch("/echopost", method="POST",
body=byte_body,
headers={"Content-Type": "application/blah"})
self.assertEqual(response.headers["Content-Length"], "1")
self.assertEqual(response.body, byte_body)
# Mixing unicode in headers and byte string bodies shouldn't
# break anything
response = self.fetch("/echopost", method="POST", body=byte_body,
headers={"Content-Type": "application/blah"},
user_agent=u("foo"))
self.assertEqual(response.headers["Content-Length"], "1")
self.assertEqual(response.body, byte_body)
示例2
def test_body_encoding(self):
unicode_body = u("\xe9")
byte_body = binascii.a2b_hex(b"e9")
# unicode string in body gets converted to utf8
response = self.fetch("/echopost", method="POST", body=unicode_body,
headers={"Content-Type": "application/blah"})
self.assertEqual(response.headers["Content-Length"], "2")
self.assertEqual(response.body, utf8(unicode_body))
# byte strings pass through directly
response = self.fetch("/echopost", method="POST",
body=byte_body,
headers={"Content-Type": "application/blah"})
self.assertEqual(response.headers["Content-Length"], "1")
self.assertEqual(response.body, byte_body)
# Mixing unicode in headers and byte string bodies shouldn't
# break anything
response = self.fetch("/echopost", method="POST", body=byte_body,
headers={"Content-Type": "application/blah"},
user_agent=u("foo"))
self.assertEqual(response.headers["Content-Length"], "1")
self.assertEqual(response.body, byte_body)
示例3
def test_hash_password(tor_cmd):
"""
Hash a controller password. It's salted so can't assert that we get a
particular value. Also, tor's output is unnecessarily verbose so including
hush to cut it down.
"""
output = run_tor(tor_cmd, '--hush', '--hash-password', 'my_password').splitlines()[-1]
if not re.match('^16:[0-9A-F]{58}$', output):
raise AssertionError("Unexpected response from 'tor --hash-password my_password': %s" % output)
# I'm not gonna even pretend to understand the following. Ported directly
# from tor's test_cmdline_args.py.
output_hex = binascii.a2b_hex(stem.util.str_tools._to_bytes(output).strip()[3:])
salt, how, hashed = output_hex[:8], output_hex[8], output_hex[9:]
count = (16 + (how & 15)) << ((how >> 4) + 6)
stuff = salt + b'my_password'
repetitions = count // len(stuff) + 1
inp = (stuff * repetitions)[:count]
assert_equal(hashlib.sha1(inp).digest(), hashed)
示例4
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
# The cipher should work like a stream cipher
# Test counter mode encryption, 3 bytes at a time
ct3 = []
cipher = self._new()
for i in range(0, len(plaintext), 3):
ct3.append(cipher.encrypt(plaintext[i:i+3]))
ct3 = b2a_hex(b("").join(ct3))
self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time)
# Test counter mode decryption, 3 bytes at a time
pt3 = []
cipher = self._new()
for i in range(0, len(ciphertext), 3):
pt3.append(cipher.encrypt(ciphertext[i:i+3]))
# PY3K: This is meant to be text, do not change to bytes (data)
pt3 = b2a_hex(b("").join(pt3))
self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)
示例5
def hex_decode(input,errors='strict'):
""" Decodes the object input and returns a tuple (output
object, length consumed).
input must be an object which provides the bf_getreadbuf
buffer slot. Python strings, buffer objects and memory
mapped files are examples of objects providing this slot.
errors defines the error handling to apply. It defaults to
'strict' handling which is the only currently supported
error handling for this codec.
"""
assert errors == 'strict'
output = binascii.a2b_hex(input)
return (output, len(input))
示例6
def handle_email_opened(self, query):
# image size: 43 Bytes
img_data = '47494638396101000100800100000000ffffff21f90401000001002c00000000'
img_data += '010001000002024c01003b'
img_data = binascii.a2b_hex(img_data)
self.send_response(200)
self.send_header('Content-Type', 'image/gif')
self.send_header('Content-Length', str(len(img_data)))
self.end_headers()
self.wfile.write(img_data)
msg_id = self.get_query('id')
if not msg_id:
return
self.semaphore_acquire()
query = self._session.query(db_models.Message)
query = query.filter_by(id=msg_id, opened=None)
message = query.first()
if message and not message.campaign.has_expired:
message.opened = db_models.current_timestamp()
message.opener_ip = self.get_client_ip()
message.opener_user_agent = self.headers.get('user-agent', None)
self._session.commit()
signals.send_safe('email-opened', self.logger, self)
self.semaphore_release()
示例7
def hex_decode(input,errors='strict'):
""" Decodes the object input and returns a tuple (output
object, length consumed).
input must be an object which provides the bf_getreadbuf
buffer slot. Python strings, buffer objects and memory
mapped files are examples of objects providing this slot.
errors defines the error handling to apply. It defaults to
'strict' handling which is the only currently supported
error handling for this codec.
"""
assert errors == 'strict'
output = binascii.a2b_hex(input)
return (output, len(input))
示例8
def YARACompile(ruledata):
if ruledata.startswith('#'):
if ruledata.startswith('#h#'):
rule = binascii.a2b_hex(ruledata[3:])
elif ruledata.startswith('#b#'):
rule = binascii.a2b_base64(ruledata[3:])
elif ruledata.startswith('#s#'):
rule = 'rule string {strings: $a = "%s" ascii wide nocase condition: $a}' % ruledata[3:]
elif ruledata.startswith('#q#'):
rule = ruledata[3:].replace("'", '"')
else:
rule = ruledata[1:]
return yara.compile(source=rule)
else:
dFilepaths = {}
if os.path.isdir(ruledata):
for root, dirs, files in os.walk(ruledata):
for file in files:
filename = os.path.join(root, file)
dFilepaths[filename] = filename
else:
for filename in ProcessAt(ruledata):
dFilepaths[filename] = filename
return yara.compile(filepaths=dFilepaths)
示例9
def test_dispatch_opcode_iquery(self):
# DNS packet with IQUERY opcode
payload = "271109000001000000000000076578616d706c6503636f6d0000010001"
# expected response is an error code REFUSED. The other fields are
# id 10001
# opcode IQUERY
# rcode REFUSED
# flags QR RD
# ;QUESTION
# example.com. IN A
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b"271189050001000000000000076578616d706c6503636f"
b"6d0000010001")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例10
def test_dispatch_opcode_status(self):
# DNS packet with STATUS opcode
payload = "271211000001000000000000076578616d706c6503636f6d0000010001"
# expected response is an error code REFUSED. The other fields are
# id 10002
# opcode STATUS
# rcode REFUSED
# flags QR RD
# ;QUESTION
# example.com. IN A
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b"271291050001000000000000076578616d706c6503636f"
b"6d0000010001")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例11
def test_dispatch_opcode_query_non_existent_zone(self):
# DNS packet with QUERY opcode
# query is for example.com. IN A
payload = ("271501200001000000000001076578616d706c6503636f6d0000010001"
"0000291000000000000000")
# expected_response is an error code REFUSED. The other fields are
# id 10005
# opcode QUERY
# rcode REFUSED
# flags QR RD
# edns 0
# payload 8192
# ;QUESTION
# example.com. IN A
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b"271581050001000000000001076578616d706c6503636f"
b"6d00000100010000292000000000000000")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例12
def test_dispatch_opcode_query_unsupported_recordtype(self):
# query is for example.com. IN DNAME
payload = "271901000001000000000000076578616d706c6503636f6d0000270001"
# expected_response is REFUSED. The other fields are
# id 10009
# opcode QUERY
# rcode REFUSED
# flags QR RD
# ;QUESTION
# example.com. IN DNAME
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b"271981050001000000000000076578616d706c6503636f"
b"6d0000270001")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例13
def test_send_notify_message(self):
# id 10001
# opcode NOTIFY
# rcode NOERROR
# flags QR AA
# ;QUESTION
# example.com. IN SOA
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_notify_response = ("2711a4000001000000000000076578616d706c650"
"3636f6d0000060001")
context = self.get_context()
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
binascii.a2b_hex(expected_notify_response))):
response, retry = self.notify.notify_zone_changed(
context, objects.Zone.from_dict(self.test_zone),
self.nameserver.host, self.nameserver.port, 0, 0, 2, 0)
self.assertEqual(response, dns.message.from_wire(
binascii.a2b_hex(expected_notify_response)))
self.assertEqual(retry, 1)
示例14
def test_poll_for_serial_number(self):
# id 10001
# opcode QUERY
# rcode NOERROR
# flags QR AA
# ;QUESTION
# example.com. IN SOA
# ;ANSWER
# example.com. 3600 IN SOA example-ns.com. admin.example.com. 100 3600
# 600 86400 3600
# ;AUTHORITY
# ;ADDITIONAL
poll_response = ("271184000001000100000000076578616d706c6503636f6d0000"
"060001c00c0006000100000e1000290a6578616d706c652d6e73"
"c0140561646d696ec00c0000006400000e100000025800015180"
"00000e10")
context = self.get_context()
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
binascii.a2b_hex(poll_response))):
status, serial, retries = self.notify.get_serial_number(
context, objects.Zone.from_dict(self.test_zone),
self.nameserver.host, self.nameserver.port, 0, 0, 2, 0)
self.assertEqual(status, 'SUCCESS')
self.assertEqual(serial, self.test_zone['serial'])
self.assertEqual(retries, 2)
示例15
def test_poll_for_serial_number_lower_serial(self):
# id 10001
# opcode QUERY
# rcode NOERROR
# flags QR AA
# ;QUESTION
# example.com. IN SOA
# ;ANSWER
# example.com. 3600 IN SOA example-ns.com. admin.example.com. 99 3600
# 600 86400 3600
# ;AUTHORITY
# ;ADDITIONAL
poll_response = ("271184000001000100000000076578616d706c6503636f6d0000"
"060001c00c0006000100000e1000290a6578616d706c652d6e73"
"c0140561646d696ec00c0000006300000e100000025800015180"
"00000e10")
context = self.get_context()
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
binascii.a2b_hex(poll_response))):
status, serial, retries = self.notify.get_serial_number(
context, objects.Zone.from_dict(self.test_zone),
self.nameserver.host, self.nameserver.port, 0, 0, 2, 0)
self.assertEqual(status, 'ERROR')
self.assertEqual(serial, 99)
self.assertEqual(retries, 0)
示例16
def test_poll_for_serial_number_higher_serial(self):
# id 10001
# opcode QUERY
# rcode NOERROR
# flags QR AA
# ;QUESTION
# example.com. IN SOA
# ;ANSWER
# example.com. 3600 IN SOA example-ns.com. admin.example.com. 101 3600
# 600 86400 3600
# ;AUTHORITY
# ;ADDITIONAL
poll_response = ("271184000001000100000000076578616d706c6503636f6d0000"
"060001c00c0006000100000e1000290a6578616d706c652d6e73"
"c0140561646d696ec00c0000006500000e100000025800015180"
"00000e10")
context = self.get_context()
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
binascii.a2b_hex(poll_response))):
status, serial, retries = self.notify.get_serial_number(
context, objects.Zone.from_dict(self.test_zone),
self.nameserver.host, self.nameserver.port, 0, 0, 2, 0)
self.assertEqual(status, 'SUCCESS')
self.assertEqual(serial, 101)
self.assertEqual(retries, 2)
示例17
def test_receive_notify(self, mock_doaxfr, mock_query):
"""
Get a NOTIFY and ensure the response is right,
and an AXFR is triggered
"""
payload = ('1a7220000001000000000000076578616d706c6503636f6d000006'
'0001')
# expected response is NOERROR, other fields are
# opcode NOTIFY
# rcode NOERROR
# flags QR AA
# ;QUESTION
# example.com. IN SOA
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b'1a72a4000001000000000000076578616d706c6503'
b'636f6d0000060001')
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': ['0.0.0.0', 1234]}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例18
def test_receive_notify_bad_notifier(self):
payload = '243520000001000000000000076578616d706c6503636f6d0000060001'
# expected response is REFUSED, other fields are
# opcode NOTIFY
# rcode REFUSED
# flags QR
# ;QUESTION
# example.com. IN SOA
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b'2435a0050001000000000000076578616d706c6503636f'
b'6d0000060001')
request = dns.message.from_wire(binascii.a2b_hex(payload))
# Bad 'requester'
request.environ = {'addr': ['6.6.6.6', 1234]}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例19
def test_receive_create_bad_notifier(self):
payload = '8dfd70000001000000000000076578616d706c6503636f6d00ff02ff00'
# expected response is REFUSED, other fields are
# opcode 14
# rcode REFUSED
# flags QR
# ;QUESTION
# example.com. CLASS65280 TYPE65282
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b'8dfdf0050001000000000000076578616d706c6503636f'
b'6d00ff02ff00')
request = dns.message.from_wire(binascii.a2b_hex(payload))
# Bad 'requester'
request.environ = {'addr': ['6.6.6.6', 1234]}
response = next(self.handler(request)).to_wire()
self.assertEqual(binascii.b2a_hex(response), expected_response)
示例20
def test_receive_delete(self, mock_execute):
payload = '3b9970000001000000000000076578616d706c6503636f6d00ff03ff00'
# Expected NOERROR other fields are
# opcode 14
# rcode NOERROR
# flags QR AA
# ;QUESTION
# example.com. CLASS65280 TYPE65283
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b'3b99f4000001000000000000076578616d706c6503636f'
b'6d00ff03ff00')
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': ['0.0.0.0', 1234]}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例21
def test_receive_delete_bad_notifier(self):
payload = 'e6da70000001000000000000076578616d706c6503636f6d00ff03ff00'
# expected response is REFUSED, other fields are
# opcode 14
# rcode REFUSED
# flags QR
# ;QUESTION
# example.com. CLASS65280 TYPE65283
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b'e6daf0050001000000000000076578616d706c6503636f'
b'6d00ff03ff00')
request = dns.message.from_wire(binascii.a2b_hex(payload))
# Bad 'requester'
request.environ = {'addr': ['6.6.6.6', 1234]}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例22
def test_transfer_source(self, mock_doaxfr, mock_query):
payload = '735d70000001000000000000076578616d706c6503636f6d00ff02ff00'
# Expected NOERROR other fields are
# opcode 14
# rcode NOERROR
# flags QR AA
# ;QUESTION
# example.com. CLASS65280 TYPE65282
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b'735df4000001000000000000076578616d706c6503636f'
b'6d00ff02ff00')
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': ['0.0.0.0', 1234]}
with mock.patch.object(
designate.backend.agent_backend.impl_fake.FakeBackend,
'find_zone_serial', return_value=None):
response = next(self.handler(request)).to_wire()
mock_doaxfr.assert_called_with(
'example.com.', [], source='1.2.3.4'
)
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例23
def check(ip, port, timeout):
try:
socket.setdefaulttimeout(timeout)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ip, int(port)))
data = binascii.a2b_hex(
"3a000000a741000000000000d40700000000000061646d696e2e24636d640000000000ffffffff130000001069736d6173746572000100000000")
s.send(data)
result = s.recv(1024)
if "ismaster" in result:
getlog_data = binascii.a2b_hex(
"480000000200000000000000d40700000000000061646d696e2e24636d6400000000000100000021000000026765744c6f670010000000737461727475705761726e696e67730000")
s.send(getlog_data)
result = s.recv(1024)
if "totalLinesWritten" in result:
return u"未授权访问"
except Exception, e:
pass
示例24
def hex_decode(input,errors='strict'):
""" Decodes the object input and returns a tuple (output
object, length consumed).
input must be an object which provides the bf_getreadbuf
buffer slot. Python strings, buffer objects and memory
mapped files are examples of objects providing this slot.
errors defines the error handling to apply. It defaults to
'strict' handling which is the only currently supported
error handling for this codec.
"""
assert errors == 'strict'
output = binascii.a2b_hex(input)
return (output, len(input))
示例25
def hex_decode(input,errors='strict'):
""" Decodes the object input and returns a tuple (output
object, length consumed).
input must be an object which provides the bf_getreadbuf
buffer slot. Python strings, buffer objects and memory
mapped files are examples of objects providing this slot.
errors defines the error handling to apply. It defaults to
'strict' handling which is the only currently supported
error handling for this codec.
"""
assert errors == 'strict'
output = binascii.a2b_hex(input)
return (output, len(input))
示例26
def number_to_bytes(num, num_bytes):
padded_hex = '%0*x' % (2 * num_bytes, num)
big_endian = binascii.a2b_hex(padded_hex.encode('ascii'))
return big_endian
示例27
def _readPage(self, page, tries=3):
"""Read a page from the flash and receive it's contents"""
self._debug('Command: FLASH_READ_PAGE %d' % page)
# Load page into the buffer
crc = self._loadPageMultiple(page, tries)
for _ in range(tries):
# Dump the buffer
self._sendCommand(COMMAND_BUFFER_LOAD)
# Wait for data start
if not self._waitForMessage(COMMAND_BUFFER_LOAD):
self._debug('Invalid / no response for BUFFER_LOAD command')
continue
# Load successful -> read sector with 2 nibbles per byte
page_data = self._readExactly(self.page_size * 2)
if page_data is None:
self._debug('Invalid / no response for page data')
continue
try:
data = binascii.a2b_hex(page_data.decode(ENCODING))
if crc == binascii.crc32(data):
self._debug('CRC did match with read data')
return data
else:
self._debug('CRC did not match with read data')
continue
except TypeError:
self._debug('CRC could not be parsed')
continue
self._debug('Page read tries exceeded')
return None
示例28
def _read_register(self, cmd, name):
"""Generic read register function, send cmd and read a <CMD><LEN><DATA> response"""
self._sendCommand(cmd)
if not self._waitForMessage(cmd):
self._debug('Invalid / no response for %s command' % (name,))
logError('Invalid response')
return None
length_str = self._readExactly(2).decode(ENCODING)
if length_str is None:
self._debug('Invalid / no response for %s length' % (name,))
logError('Invalid response')
return None
try:
length = int(length_str, 16)
except ValueError:
self._debug('Could not decode %s length' % (name,))
logError('Invalid register length')
return None
data_str = self._readExactly(length * 2).decode(ENCODING)
if data_str is None:
self._debug('Invalid / no response for %s check' % (name,))
logError('Invalid response')
return None
try: # Check if valid data
decoded_data = binascii.a2b_hex(data_str)
except TypeError:
self._debug('Could not decode %s content' % (name))
logError('Invalid response')
return None
return data_str
示例29
def _decode_xsrf_token(self, cookie):
"""把_get_raw_xsrf_token返回的cookie字符串转换成元组形式.
"""
try:
m = _signed_value_version_re.match(utf8(cookie))
if m:
version = int(m.group(1))
if version == 2:
_, mask, masked_token, timestamp = cookie.split("|")
mask = binascii.a2b_hex(utf8(mask))
token = _websocket_mask(
mask, binascii.a2b_hex(utf8(masked_token)))
timestamp = int(timestamp)
return version, token, timestamp
else:
# Treat unknown versions as not present instead of failing.
raise Exception("Unknown xsrf cookie version")
else:
version = 1
try:
token = binascii.a2b_hex(utf8(cookie))
except (binascii.Error, TypeError):
token = utf8(cookie)
# We don't have a usable timestamp in older versions.
timestamp = int(time.time())
return (version, token, timestamp)
except Exception:
# Catch exceptions and return nothing instead of failing.
gen_log.debug("Uncaught exception in _decode_xsrf_token",
exc_info=True)
return None, None, None
示例30
def test_cookie_tampering_future_timestamp(self):
handler = CookieTestRequestHandler()
# this string base64-encodes to '12345678'
handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
version=1)
cookie = handler._cookies['foo']
match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
self.assertTrue(match)
timestamp = match.group(1)
sig = match.group(2)
self.assertEqual(
_create_signature_v1(handler.application.settings["cookie_secret"],
'foo', '12345678', timestamp),
sig)
# shifting digits from payload to timestamp doesn't alter signature
# (this is not desirable behavior, just confirming that that's how it
# works)
self.assertEqual(
_create_signature_v1(handler.application.settings["cookie_secret"],
'foo', '1234', b'5678' + timestamp),
sig)
# tamper with the cookie
handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
to_basestring(timestamp), to_basestring(sig)))
# it gets rejected
with ExpectLog(gen_log, "Cookie timestamp in future"):
self.assertTrue(
handler.get_secure_cookie('foo', min_version=1) is None)