Python源码示例:binascii.b2a_hex()
示例1
def encrypt(self, text, appid):
"""对明文进行加密
@param text: 需要加密的明文
@return: 加密得到的字符串
"""
# 16位随机字符串添加到明文开头
len_str = struct.pack("I", socket.htonl(len(text.encode())))
# text = self.get_random_str() + binascii.b2a_hex(len_str).decode() + text + appid
text = self.get_random_str() + len_str + text.encode() + appid
# 使用自定义的填充方式对明文进行补位填充
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# 加密
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# 使用BASE64对加密后的字符串进行编码
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext).decode('utf8')
except Exception as e:
return ierror.WXBizMsgCrypt_EncryptAES_Error, None
示例2
def get_client_key_exchange_record(
cls,
robot_payload_enum: RobotPmsPaddingPayloadEnum,
tls_version: tls_parser.tls_version.TlsVersionEnum,
modulus: int,
exponent: int,
) -> TlsRsaClientKeyExchangeRecord:
"""A client key exchange record with a hardcoded pre_master_secret, and a valid or invalid padding.
"""
pms_padding = cls._compute_pms_padding(modulus)
tls_version_hex = binascii.b2a_hex(TlsRecordTlsVersionBytes[tls_version.name].value).decode("ascii")
pms_with_padding_payload = cls._CKE_PAYLOADS_HEX[robot_payload_enum]
final_pms = pms_with_padding_payload.format(
pms_padding=pms_padding, tls_version=tls_version_hex, pms=cls._PMS_HEX
)
cke_robot_record = TlsRsaClientKeyExchangeRecord.from_parameters(
tls_version, exponent, modulus, int(final_pms, 16)
)
return cke_robot_record
示例3
def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_REQUEST_TOKEN_URL
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
if callback_uri == "oob":
args["oauth_callback"] = "oob"
elif callback_uri:
args["oauth_callback"] = urlparse.urljoin(
self.request.full_url(), callback_uri)
if extra_params:
args.update(extra_params)
signature = _oauth10a_signature(consumer_token, "GET", url, args)
else:
signature = _oauth_signature(consumer_token, "GET", url, args)
args["oauth_signature"] = signature
return url + "?" + urllib_parse.urlencode(args)
示例4
def _oauth_access_token_url(self, request_token):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_token=escape.to_basestring(request_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if "verifier" in request_token:
args["oauth_verifier"] = request_token["verifier"]
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, "GET", url, args,
request_token)
else:
signature = _oauth_signature(consumer_token, "GET", url, args,
request_token)
args["oauth_signature"] = signature
return url + "?" + urllib_parse.urlencode(args)
示例5
def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_REQUEST_TOKEN_URL
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
if callback_uri == "oob":
args["oauth_callback"] = "oob"
elif callback_uri:
args["oauth_callback"] = urlparse.urljoin(
self.request.full_url(), callback_uri)
if extra_params:
args.update(extra_params)
signature = _oauth10a_signature(consumer_token, "GET", url, args)
else:
signature = _oauth_signature(consumer_token, "GET", url, args)
args["oauth_signature"] = signature
return url + "?" + urllib_parse.urlencode(args)
示例6
def _oauth_access_token_url(self, request_token):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_token=escape.to_basestring(request_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if "verifier" in request_token:
args["oauth_verifier"] = request_token["verifier"]
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, "GET", url, args,
request_token)
else:
signature = _oauth_signature(consumer_token, "GET", url, args,
request_token)
args["oauth_signature"] = signature
return url + "?" + urllib_parse.urlencode(args)
示例7
def sqlite3_savepoint(db):
"""Savepoint context manager. On return, commit; on exception, rollback.
Savepoints are like transactions, but they may be nested in
transactions or in other savepoints.
"""
# This is not symmetric with sqlite3_transaction because ROLLBACK
# undoes any effects and makes the transaction cease to be,
# whereas ROLLBACK TO undoes any effects but leaves the savepoint
# as is. So for either success or failure we must release the
# savepoint explicitly.
savepoint = binascii.b2a_hex(os.urandom(32))
db.cursor().execute("SAVEPOINT x%s" % (savepoint,))
ok = False
try:
yield
ok = True
finally:
if not ok:
db.cursor().execute("ROLLBACK TO x%s" % (savepoint,))
db.cursor().execute("RELEASE x%s" % (savepoint,))
示例8
def test_FortunaPool(self):
"""FortunaAccumulator.FortunaPool"""
pool = FortunaAccumulator.FortunaPool()
self.assertEqual(0, pool.length)
self.assertEqual("5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456", pool.hexdigest())
pool.append(b('abc'))
self.assertEqual(3, pool.length)
self.assertEqual("4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358", pool.hexdigest())
pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
self.assertEqual(56, pool.length)
self.assertEqual(b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af'), b2a_hex(pool.digest()))
pool.reset()
self.assertEqual(0, pool.length)
pool.append(b('a') * 10**6)
self.assertEqual(10**6, pool.length)
self.assertEqual(b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688'), b2a_hex(pool.digest()))
示例9
def runTest(self):
from Crypto.Cipher import DES
from binascii import b2a_hex
X = []
X[0:] = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]
for i in range(16):
c = DES.new(X[i],DES.MODE_ECB)
if not (i&1): # (num&1) returns 1 for odd numbers
X[i+1:] = [c.encrypt(X[i])] # even
else:
X[i+1:] = [c.decrypt(X[i])] # odd
self.assertEqual(b2a_hex(X[16]),
b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
示例10
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
ct1 = b2a_hex(self._new().encrypt(plaintext))
pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
ct2 = b2a_hex(self._new().encrypt(plaintext))
pt2 = b2a_hex(self._new(1).decrypt(ciphertext))
if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
# In PGP mode, data returned by the first encrypt()
# is prefixed with the encrypted IV.
# Here we check it and then remove it from the ciphertexts.
eilen = len(self.encrypted_iv)
self.assertEqual(self.encrypted_iv, ct1[:eilen])
self.assertEqual(self.encrypted_iv, ct2[:eilen])
ct1 = ct1[eilen:]
ct2 = ct2[eilen:]
self.assertEqual(self.ciphertext, ct1) # encrypt
self.assertEqual(self.ciphertext, ct2) # encrypt (second time)
self.assertEqual(self.plaintext, pt1) # decrypt
self.assertEqual(self.plaintext, pt2) # decrypt (second time)
示例11
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
# The cipher should work like a stream cipher
# Test counter mode encryption, 3 bytes at a time
ct3 = []
cipher = self._new()
for i in range(0, len(plaintext), 3):
ct3.append(cipher.encrypt(plaintext[i:i+3]))
ct3 = b2a_hex(b("").join(ct3))
self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time)
# Test counter mode decryption, 3 bytes at a time
pt3 = []
cipher = self._new()
for i in range(0, len(ciphertext), 3):
pt3.append(cipher.encrypt(ciphertext[i:i+3]))
# PY3K: This is meant to be text, do not change to bytes (data)
pt3 = b2a_hex(b("").join(pt3))
self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)
示例12
def _oauth_request_token_url(self, callback_uri= None, extra_params=None):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_REQUEST_TOKEN_URL
args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=getattr(self, "_OAUTH_VERSION", "1.0a"),
)
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
if callback_uri:
args["oauth_callback"] = urlparse.urljoin(
self.request.full_url(), callback_uri)
if extra_params: args.update(extra_params)
signature = _oauth10a_signature(consumer_token, "GET", url, args)
else:
signature = _oauth_signature(consumer_token, "GET", url, args)
args["oauth_signature"] = signature
return url + "?" + urllib.urlencode(args)
示例13
def _oauth_access_token_url(self, request_token):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL
args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_token=request_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=getattr(self, "_OAUTH_VERSION", "1.0a"),
)
if "verifier" in request_token:
args["oauth_verifier"]=request_token["verifier"]
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, "GET", url, args,
request_token)
else:
signature = _oauth_signature(consumer_token, "GET", url, args,
request_token)
args["oauth_signature"] = signature
return url + "?" + urllib.urlencode(args)
示例14
def _oauth_access_token_url(self, request_token: Dict[str, Any]) -> str:
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_token=escape.to_basestring(request_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
if "verifier" in request_token:
args["oauth_verifier"] = request_token["verifier"]
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(
consumer_token, "GET", url, args, request_token
)
else:
signature = _oauth_signature(
consumer_token, "GET", url, args, request_token
)
args["oauth_signature"] = signature
return url + "?" + urllib.parse.urlencode(args)
示例15
def key_derivation(pwd, salt="'):
"""returns (key, verifier, salt) where
* key is a 128-bit int (as siphash.SipHash requires)
* verifier is an 8-byte string
* salt is an 8-byte string
"""
# if salt not given, pick a random one
if not salt:
salt = b2a_hex(os.urandom(SALT_BYTES))
# key generation
salt_as_int = int(salt, 16)
mask = 0xffffffffffffffff
key_hi = SIPHASH_SLOW(pwd+'0', salt_as_int) & mask
key_lo = SIPHASH_SLOW(pwd+'1', salt_as_int) & mask
key = (key_hi << 64) | key_lo
# verifier generation
verifier = tohex(SIPHASH_FAST(salt, key))
return (key, verifier, salt)
示例16
def test_dispatch_opcode_iquery(self):
# DNS packet with IQUERY opcode
payload = "271109000001000000000000076578616d706c6503636f6d0000010001"
# expected response is an error code REFUSED. The other fields are
# id 10001
# opcode IQUERY
# rcode REFUSED
# flags QR RD
# ;QUESTION
# example.com. IN A
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b"271189050001000000000000076578616d706c6503636f"
b"6d0000010001")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例17
def test_dispatch_opcode_status(self):
# DNS packet with STATUS opcode
payload = "271211000001000000000000076578616d706c6503636f6d0000010001"
# expected response is an error code REFUSED. The other fields are
# id 10002
# opcode STATUS
# rcode REFUSED
# flags QR RD
# ;QUESTION
# example.com. IN A
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b"271291050001000000000000076578616d706c6503636f"
b"6d0000010001")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例18
def test_dispatch_opcode_query_non_existent_zone(self):
# DNS packet with QUERY opcode
# query is for example.com. IN A
payload = ("271501200001000000000001076578616d706c6503636f6d0000010001"
"0000291000000000000000")
# expected_response is an error code REFUSED. The other fields are
# id 10005
# opcode QUERY
# rcode REFUSED
# flags QR RD
# edns 0
# payload 8192
# ;QUESTION
# example.com. IN A
# ;ANSWER
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b"271581050001000000000001076578616d706c6503636f"
b"6d00000100010000292000000000000000")
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
response = next(self.handler(request)).to_wire()
self.assertEqual(expected_response, binascii.b2a_hex(response))
示例19
def generate_jwt_secret():
# 32 bytes = 256 bits, recommended key length for HS256.
material = os.urandom(32)
return binascii.b2a_hex(material)
示例20
def bytes_to_number(string):
return int(binascii.b2a_hex(string), 16)
示例21
def __str__(self):
return binascii.b2a_hex(self._value)
示例22
def _oauth_request_parameters(self, url, access_token, parameters={},
method="GET"):
"""Returns the OAuth parameters as a dict for the given request.
parameters should include all POST arguments and query string arguments
that will be sent with the request.
"""
consumer_token = self._oauth_consumer_token()
base_args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_token=escape.to_basestring(access_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
args = {}
args.update(base_args)
args.update(parameters)
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, method, url, args,
access_token)
else:
signature = _oauth_signature(consumer_token, method, url, args,
access_token)
base_args["oauth_signature"] = escape.to_basestring(signature)
return base_args
示例23
def get(self, arg):
def describe(s):
if type(s) == bytes:
return ["bytes", native_str(binascii.b2a_hex(s))]
elif type(s) == unicode_type:
return ["unicode", s]
raise Exception("unknown type")
self.write({'path': describe(arg),
'query': describe(self.get_argument("foo")),
})
示例24
def _oauth_request_parameters(self, url, access_token, parameters={},
method="GET"):
"""Returns the OAuth parameters as a dict for the given request.
parameters should include all POST arguments and query string arguments
that will be sent with the request.
"""
consumer_token = self._oauth_consumer_token()
base_args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_token=escape.to_basestring(access_token["key"]),
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
oauth_version="1.0",
)
args = {}
args.update(base_args)
args.update(parameters)
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, method, url, args,
access_token)
else:
signature = _oauth_signature(consumer_token, method, url, args,
access_token)
base_args["oauth_signature"] = escape.to_basestring(signature)
return base_args
示例25
def get(self, arg):
def describe(s):
if type(s) == bytes:
return ["bytes", native_str(binascii.b2a_hex(s))]
elif type(s) == unicode_type:
return ["unicode", s]
raise Exception("unknown type")
self.write({'path': describe(arg),
'query': describe(self.get_argument("foo")),
})
示例26
def generate_prize_key(rand, *, prize=None, key=None, prize_winner=None, winner=None):
prize_key = PrizeKey()
prize_key.key = key or '-'.join(
binascii.b2a_hex(os.urandom(2)).decode('utf-8') for _ in range(4)
)
prize_key.prize_id = prize.id if prize else pick_random_instance(rand, Prize).id
if not prize_winner and winner:
prize_winner = PrizeWinner.objects.create(prize=prize, winner=winner)
prize_key.prize_winner = prize_winner
prize_key.clean()
return prize_key
示例27
def parse_ntlm_resp(msg3, seq):
'''
Parse the 3rd msg in NTLM handshake
Thanks to psychomario
'''
if seq in challenge_acks:
challenge = challenge_acks[seq]
else:
challenge = 'CHALLENGE NOT FOUND'
if len(msg3) > 43:
# Thx to psychomario for below
lmlen, lmmax, lmoff, ntlen, ntmax, ntoff, domlen, dommax, domoff, userlen, usermax, useroff = struct.unpack("12xhhihhihhihhi", msg3[:44])
lmhash = binascii.b2a_hex(msg3[lmoff:lmoff+lmlen])
nthash = binascii.b2a_hex(msg3[ntoff:ntoff+ntlen])
domain = msg3[domoff:domoff+domlen].replace("\0", "")
user = msg3[useroff:useroff+userlen].replace("\0", "")
# Original check by psychomario, might be incorrect?
#if lmhash != "0"*48: #NTLMv1
if ntlen == 24: #NTLMv1
msg = '%s %s' % ('NETNTLMv1:', user+"::"+domain+":"+lmhash+":"+nthash+":"+challenge)
return msg
elif ntlen > 60: #NTLMv2
msg = '%s %s' % ('NETNTLMv2:', user+"::"+domain+":"+challenge+":"+nthash[:32]+":"+nthash[32:])
return msg
示例28
def parse_ntlm_resp(msg3, seq):
'''
Parse the 3rd msg in NTLM handshake
Thanks to psychomario
'''
if seq in challenge_acks:
challenge = challenge_acks[seq]
else:
challenge = 'CHALLENGE NOT FOUND'
if len(msg3) > 43:
# Thx to psychomario for below
lmlen, lmmax, lmoff, ntlen, ntmax, ntoff, domlen, dommax, domoff, userlen, usermax, useroff = struct.unpack("12xhhihhihhihhi", msg3[:44])
lmhash = binascii.b2a_hex(msg3[lmoff:lmoff+lmlen])
nthash = binascii.b2a_hex(msg3[ntoff:ntoff+ntlen])
domain = msg3[domoff:domoff+domlen].replace("\0", "")
user = msg3[useroff:useroff+userlen].replace("\0", "")
# Original check by psychomario, might be incorrect?
#if lmhash != "0"*48: #NTLMv1
if ntlen == 24: #NTLMv1
msg = '%s %s' % ('NETNTLMv1:', user+"::"+domain+":"+lmhash+":"+nthash+":"+challenge)
return msg
elif ntlen > 60: #NTLMv2
msg = '%s %s' % ('NETNTLMv2:', user+"::"+domain+":"+challenge+":"+nthash[:32]+":"+nthash[32:])
return msg
示例29
def mssql(self,user,pass_):#author:hos@YSRC
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((self.ip,self.port))
hh=binascii.b2a_hex(self.ip)
husername=binascii.b2a_hex(user)
lusername=len(user)
lpassword=len(pass_)
ladd=len(self.ip)+len(str(self.port))+1
hladd=hex(ladd).replace('0x','')
hpwd=binascii.b2a_hex(pass_)
pp=binascii.b2a_hex(str(self.port))
address=hh+'3a'+pp
hhost= binascii.b2a_hex(self.ip)
data="0200020000000000123456789000000000000000000000000000000000000000000000000000ZZ5440000000000000000000000000000000000000000000000000000000000X3360000000000000000000000000000000000000000000000000000000000Y373933340000000000000000000000000000000000000000000000000000040301060a09010000000002000000000070796d7373716c000000000000000000000000000000000000000000000007123456789000000000000000000000000000000000000000000000000000ZZ3360000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000Y0402000044422d4c6962726172790a00000000000d1175735f656e676c69736800000000000000000000000000000201004c000000000000000000000a000000000000000000000000000069736f5f31000000000000000000000000000000000000000000000000000501353132000000030000000000000000"
data1=data.replace(data[16:16+len(address)],address)
data2=data1.replace(data1[78:78+len(husername)],husername)
data3=data2.replace(data2[140:140+len(hpwd)],hpwd)
if lusername>=16:
data4=data3.replace('0X',str(hex(lusername)).replace('0x',''))
else:
data4=data3.replace('X',str(hex(lusername)).replace('0x',''))
if lpassword>=16:
data5=data4.replace('0Y',str(hex(lpassword)).replace('0x',''))
else:
data5=data4.replace('Y',str(hex(lpassword)).replace('0x',''))
hladd = hex(ladd).replace('0x', '')
data6=data5.replace('ZZ',str(hladd))
data7=binascii.a2b_hex(data6)
sock.send(data7)
packet=sock.recv(1024)
if 'master' in packet:
#return "username:%s,password:%s" % (user,pass_)
self.return_result.append({'server':str(self.server),'url':str(self.url),'u':str(user),'p':str(pass_),})
except:
return 3
finally:
sock.close()
示例30
def get_auth_data(self,user,password,scramble,plugin):
user_hex = binascii.b2a_hex(user)
pass_hex = binascii.b2a_hex(self.get_hash(password,scramble))
data = "85a23f0000000040080000000000000000000000000000000000000000000000" + user_hex + "0014" + pass_hex
if plugin:data+=binascii.b2a_hex(plugin)+ "0055035f6f73076f737831302e380c5f636c69656e745f6e616d65086c69626d7973716c045f7069640539323330360f5f636c69656e745f76657273696f6e06352e362e3231095f706c6174666f726d067838365f3634"
len_hex = hex(len(data)/2).replace("0x","")
auth_data = len_hex + "000001" +data
return binascii.a2b_hex(auth_data)